Skip to content

Model — World, Craft, Coupling

The declarative layer: build a World, populate it with Crafts and Couplings, then hand it to a transform.

World

manta.World

World(name='world')

Top-level simulation container.

Source code in manta/world.py
def __init__(self, name: str = "world") -> None:
    from .ir.module import check_name
    self.name = check_name(name, who="World")
    # _crafts: list of dicts with craft, initial_state_overrides.
    self._crafts: list[dict[str, Any]] = []
    self._couplings: list[Coupling] = []
    # Fields keyed by exact subclass (one GravityField per world, one
    # FluidField, …). Concrete subclasses query by class.
    self._fields: dict[type, Field] = {}
    # Planets registered with this world. Each one contributes its
    # standing field disturbances at finalize() time via
    # `planet.register_disturbances(world)`. Multi-planet supported;
    # disturbances superpose into the shared field instances.
    self._planets: list = []
    # Set by finalize(): compile-time registrations have run and the
    # model is locked (the add_* mutators raise).
    self._finalized = False

finalized property

finalized

True once finalize() has locked the model.

finalize

finalize()

Resolve the compile-time registrations and lock the model.

Runs once (idempotent): planets contribute their standing disturbances to the shared fields, field-source parts emit their craft-anchored disturbances, cameras are pointed at the optical ellipsoids they can see, and PlanetState initial-state wrappers resolve to WorldFrame. Afterwards add_craft / add_field / add_planet / add_coupling raise — every transform built over this world sees the same finalized model.

Called automatically by the first transform (Sim, EKF, LQR, …); call it directly only to lock a world early.

Source code in manta/world.py
def finalize(self) -> "World":
    """Resolve the compile-time registrations and lock the model.

    Runs once (idempotent): planets contribute their standing
    disturbances to the shared fields, field-source parts emit their
    craft-anchored disturbances, cameras are pointed at the optical
    ellipsoids they can see, and `PlanetState` initial-state wrappers
    resolve to WorldFrame. Afterwards `add_craft` / `add_field` /
    `add_planet` / `add_coupling` raise — every transform built over
    this world sees the same finalized model.

    Called automatically by the first transform (`Sim`, `EKF`,
    `LQR`, …); call it directly only to lock a world early.
    """
    if self._finalized:
        return self
    for p in self._planets:
        p.register_disturbances(self)
    self._register_field_sources()
    self._resolve_planet_state_overrides()
    self._finalized = True
    return self

add_field

add_field(field)

Register a Field with this world. One instance per Field subclass is allowed — call field.add(disturbance) on the registered instance to attach more sources.

Returns self for chaining.

Source code in manta/world.py
def add_field(self, field: Field) -> "World":
    """Register a Field with this world. One instance per Field
    subclass is allowed — call `field.add(disturbance)` on the
    registered instance to attach more sources.

    Returns self for chaining.
    """
    self._require_mutable("add a field")
    if not isinstance(field, Field):
        raise TypeError(
            f"World.add_field: expected a Field, got "
            f"{type(field).__name__}")
    cls = type(field)
    if cls in self._fields:
        raise ValueError(
            f"World '{self.name}': field of type {cls.__name__} already "
            f"registered. Use `world.get_field({cls.__name__}).add(...)` "
            f"to attach additional disturbances to it.")
    self._fields[cls] = field
    return self

get_field

get_field(cls)

Return the registered field of type cls, or None.

Source code in manta/world.py
def get_field(self, cls: type) -> Field | None:
    """Return the registered field of type `cls`, or None."""
    return self._fields.get(cls)

get_or_create_field

get_or_create_field(cls)

Return the registered field of type cls, creating + adding a fresh empty instance if none is registered. Used by Planet.register_disturbances so a planet's contributions land on the same field instance whether or not the user pre-added one.

Source code in manta/world.py
def get_or_create_field(self, cls: type) -> Field:
    """Return the registered field of type `cls`, creating + adding
    a fresh empty instance if none is registered. Used by
    `Planet.register_disturbances` so a planet's contributions land
    on the same field instance whether or not the user pre-added
    one."""
    if cls in self._fields:
        return self._fields[cls]
    instance = cls()
    self.add_field(instance)
    return instance

add_planet

add_planet(planet)

Register a planet with this world. The planet's register_disturbances(world) is called at Sim(world) time, attaching its standing contributions to the world's shared fields. Multi-planet worlds superpose contributions from every registered planet.

Source code in manta/world.py
def add_planet(self, planet) -> "World":
    """Register a planet with this world. The planet's
    `register_disturbances(world)` is called at `Sim(world)`
    time, attaching its standing contributions to the world's
    shared fields. Multi-planet worlds superpose contributions
    from every registered planet.
    """
    self._require_mutable("add a planet")
    from .planets.base import Planet
    if not isinstance(planet, Planet):
        raise TypeError(
            f"World.add_planet: expected a Planet, got "
            f"{type(planet).__name__}")
    for existing in self._planets:
        if existing is planet:
            raise ValueError(
                f"World '{self.name}': planet {planet.name!r} already added")
        if existing.name == planet.name:
            raise ValueError(
                f"World '{self.name}': planet name {planet.name!r} "
                f"collides with an existing planet")
    planet._world = self
    self._planets.append(planet)
    return self

add_craft

add_craft(craft, *, position=(0.0, 0.0, 0.0), orientation=(1.0, 0.0, 0.0, 0.0), velocity=(0.0, 0.0, 0.0), angular_velocity=(0.0, 0.0, 0.0), **extra_state)

Add a craft to the world.

Args: craft — the Craft instance. position — craft-origin position, WorldFrame (m). orientation — wxyz quaternion, world-from-craft. velocity — craft-origin velocity, WorldFrame (m/s). angular_velocity — body rates in CraftFrame (rad/s) — the same convention as the integrated state (what a strapped-down gyro reads). For a non-identity orientation, world-frame rates must be rotated into the body first. **extra_state — per-part state overrides (e.g., **{"wheel.angle": 0.5}).

Source code in manta/world.py
def add_craft(self,
              craft: Craft,
              *,
              position=(0.0, 0.0, 0.0),
              orientation=(1.0, 0.0, 0.0, 0.0),
              velocity=(0.0, 0.0, 0.0),
              angular_velocity=(0.0, 0.0, 0.0),
              **extra_state: Any) -> Craft:
    """Add a craft to the world.

    Args:
        craft            — the Craft instance.
        position         — craft-origin position, WorldFrame (m).
        orientation      — wxyz quaternion, world-from-craft.
        velocity         — craft-origin velocity, WorldFrame (m/s).
        angular_velocity — body rates in **CraftFrame** (rad/s) — the
                           same convention as the integrated state
                           (what a strapped-down gyro reads). For a
                           non-identity `orientation`, world-frame
                           rates must be rotated into the body first.
        **extra_state    — per-part state overrides
                           (e.g., `**{"wheel.angle": 0.5}`).
    """
    self._require_mutable("add a craft")
    # Validate uniqueness.
    for entry in self._crafts:
        if entry["craft"] is craft:
            raise ValueError(
                f"World '{self.name}': craft '{craft.name}' already added")
        if entry["craft"].name == craft.name:
            raise ValueError(
                f"World '{self.name}': craft name '{craft.name}' collides "
                f"with an existing craft")

    initial_state_overrides: dict[str, Any] = {
        "position":         position,
        "orientation":      orientation,
        "velocity":         velocity,
        "angular_velocity": angular_velocity,
        **extra_state,
    }
    self._crafts.append({
        "craft":  craft,
        "initial_state_overrides": initial_state_overrides,
    })
    return craft

add_coupling

add_coupling(coupling)

Add an inter-craft coupling. Both endpoint crafts must already be registered via add_craft. The coupling forces them into the same connected component at compile time → one shared compiled tick over both.

Source code in manta/world.py
def add_coupling(self, coupling: Coupling) -> Coupling:
    """Add an inter-craft coupling. Both endpoint crafts must already
    be registered via `add_craft`. The coupling forces them into the
    same connected component at compile time → one shared compiled
    tick over both."""
    self._require_mutable("add a coupling")
    if not isinstance(coupling, Coupling):
        raise TypeError(
            f"World.add_coupling: expected Coupling, got "
            f"{type(coupling).__name__}")
    registered = {id(e["craft"]) for e in self._crafts}
    for c, label in ((coupling.craft_a, "craft_a"),
                      (coupling.craft_b, "craft_b")):
        if id(c) not in registered:
            raise ValueError(
                f"World.add_coupling: coupling.{label} '{c.name}' is "
                f"not registered with this World. Call add_craft first.")
    self._couplings.append(coupling)
    return coupling

Craft

manta.Craft

Craft(name)

A collection of parts with shared rigid-body dynamics.

Internally a craft is a tree of parts rooted at Craft.root (a RootPart). craft.add(part) is sugar for craft.root.add(part); craft.parts returns a flat tuple of all parts in the tree (DFS order). Nested composition (e.g. a joint hosting another joint for a pan-tilt gimbal) is supported via the standard composite add() chain on individual parts.

State (13 DOF): position : Vec3[WorldFrame] orientation : Quat[WorldFrame, CraftFrame] velocity : Vec3[WorldFrame] angular_velocity : Vec3[CraftFrame] plus one Scalar per R1 State slot declared on any of the parts.

Source code in manta/craft.py
def __init__(self, name: str) -> None:
    from .ir.module import check_name
    from .parts.base import RootPart
    self.name = check_name(name, who="Craft")
    self.root = RootPart(f"{name}_root")

parts property

parts

Flat tuple of every part in the tree, root first (DFS order). Excludes the root itself.

total_mass property

total_mass

Sum of the declared mass of every genuinely inertial part (contributes_inertia trait) — gain-like mass parameters (e.g. TrajectoryEndpoint's feedforward) don't count.

add

add(part)

Attach a part to the craft's root. Equivalent to craft.root.add(part).

Source code in manta/craft.py
def add(self, part: Part) -> Part:
    """Attach a part to the craft's root. Equivalent to
    `craft.root.add(part)`."""
    return self.root.add(part)

aggregate_inertials

aggregate_inertials()

Public-facing accessor: see _aggregate_inertials. Useful for external inspection and tests.

Source code in manta/craft.py
def aggregate_inertials(self) -> dict[str, Any]:
    """Public-facing accessor: see `_aggregate_inertials`. Useful for
    external inspection and tests."""
    return _aggregate_inertials(self.root)

sample_noise

sample_noise(rng)

Draw one tick of white-Gaussian samples for every declared Noise slot on every part. Returns a dict of "<part>.<noise>" → np.ndarray ready to merge into the state dict before calling the compiled tick.

Slots whose sigma is 0 return zero vectors without consuming RNG state (so a deterministic-seed sim stays reproducible regardless of which noise channels are active).

Source code in manta/craft.py
def sample_noise(self, rng) -> dict:
    """Draw one tick of white-Gaussian samples for every declared
    `Noise` slot on every part. Returns a dict of
    `"<part>.<noise>" → np.ndarray` ready to merge into the state
    dict before calling the compiled tick.

    Slots whose sigma is 0 return zero vectors without consuming
    RNG state (so a deterministic-seed sim stays reproducible
    regardless of which noise channels are active).
    """
    out: dict[str, Any] = {}
    for part in self.parts:
        for nname, ndecl in part.noise_declarations().items():
            sigma = float(getattr(part, f"{nname}_sigma"))
            # Inert RW channels skip RNG entirely; everyone else
            # samples into the channel's driver-input name.
            if ndecl.contributes_state and sigma <= 0.0:
                continue
            key = f"{part.name}.{ndecl.driver_input_name(nname)}"
            d = ndecl.signal_manifold.ambient_dim
            if d == 1:
                out[key] = (rng.normal(0.0, sigma)
                            if sigma > 0.0 else 0.0)
            else:
                out[key] = (rng.normal(0.0, sigma, d)
                            if sigma > 0.0
                            else np.zeros(d, dtype=float))
    return out

initial_state

initial_state(**overrides)

Build the initial state dict for the compiled tick.

Returns a dict with the rigid-body slots (position, orientation, velocity, angular_velocity) AND a "<part_name>.<state_name>" entry for every part that declares state. Defaults come from each State declaration's init; keyword overrides replace them by name.

Source code in manta/craft.py
def initial_state(self, **overrides) -> dict:
    """Build the initial state dict for the compiled tick.

    Returns a dict with the rigid-body slots (position, orientation,
    velocity, angular_velocity) AND a `"<part_name>.<state_name>"`
    entry for every part that declares state. Defaults come from each
    State declaration's `init`; keyword overrides replace them by name.
    """
    state: dict[str, Any] = {
        "position":         np.asarray((0.0, 0.0, 0.0), dtype=float),
        "orientation":      np.asarray((1.0, 0.0, 0.0, 0.0), dtype=float),
        "velocity":         np.asarray((0.0, 0.0, 0.0), dtype=float),
        "angular_velocity": np.asarray((0.0, 0.0, 0.0), dtype=float),
    }
    for part in self.parts:
        for sname, sdecl in part.state_declarations().items():
            if sdecl.manifold.kind == "scalar":
                state[f"{part.name}.{sname}"] = float(sdecl.init)
            else:
                # vec / quat — `init` is a fixed-length tuple
                # validated at declaration time. Store as ndarray
                # for symmetry with rigid-body slots.
                state[f"{part.name}.{sname}"] = np.asarray(
                    sdecl.init, dtype=float)
        # Input slots: seed from the part's current attribute (which
        # is either the constructor-time override or the declaration
        # default). These pass through Sim.step's merge so
        # the user can update them per-tick or leave them alone.
        for iname in part.input_declarations():
            state[f"{part.name}.{iname}"] = float(getattr(part, iname))
        # Noise / RW-bias slots. Seed everything at zero.
        #   * White: one slot `<part>.<nname>` (the per-tick driver).
        #     EKF leaves it at zero; sim overwrites via
        #     `craft.sample_noise(rng)`.
        #   * RW (sigma > 0): two slots — `<part>.<nname>` is the
        #     bias state, `<part>.<nname>_driver` is the per-tick
        #     driver. RW channels with sigma == 0 are inert.
        for nname, ndecl in part.noise_declarations().items():
            # Each channel declares which slots it contributes to
            # the seed dict (white: just the signal; active RW:
            # bias + driver; inert RW: nothing).
            for k, v in ndecl.initial_state_entries(nname, part).items():
                state[f"{part.name}.{k}"] = v
    unknown = set(overrides) - set(state)
    if unknown:
        raise KeyError(
            f"Craft.initial_state: unknown slot(s) {sorted(unknown)}. "
            f"Available: {sorted(state)}")
    for k, v in overrides.items():
        current = state[k]
        if isinstance(current, np.ndarray):
            state[k] = np.asarray(v, dtype=float)
        else:
            state[k] = float(v)
    return state

Coupling

manta.Coupling

Bases: ABC

Abstract base for inter-craft constraints.

A concrete subclass (e.g. Tether) declares two craft endpoints (craft_a / craft_b) and produces the extra wrench terms they exchange (compute_wrenches_sym) in the tick graph for the connected component. The presence of a Coupling forces both crafts into the same compile unit.

craft_a abstractmethod property

craft_a

The first coupled craft.

craft_b abstractmethod property

craft_b

The second coupled craft.

compute_wrenches_sym abstractmethod

compute_wrenches_sym(ctx_a, ctx_b)

The wrench pair (wrench_on_a, wrench_on_b) this coupling applies, given each craft's TickContext. Frames: each wrench is in its own craft's CraftFrame, at that craft's origin.

Source code in manta/couplings/base.py
@abstractmethod
def compute_wrenches_sym(self, ctx_a, ctx_b):
    """The wrench pair `(wrench_on_a, wrench_on_b)` this coupling
    applies, given each craft's `TickContext`. Frames: each wrench is
    in its own craft's `CraftFrame`, at that craft's origin."""

manta.couplings.Tether

Tether(craft_a, endpoint_a, craft_b, endpoint_b, *, stiffness, damping=0.0, rest_length=0.0)

Bases: Coupling

Linear spring-damper tether between two TetherEndpoint Parts.

Args: craft_a, craft_b — the two coupled crafts (Craft instances). endpoint_a, endpoint_b — names of the TetherEndpoint Parts on craft_a / craft_b (strings). stiffness — spring constant k, N/m. damping — damper constant c, N·s/m. rest_length — natural length L_rest, m. The spring exerts zero force at exactly this separation.

Convention: tension positive — when L > rest_length the tether pulls A toward B (and B toward A). Compression (L < rest_length) pushes them apart, mimicking a rigid rod. For a string that goes slack, set stiffness=0 below rest_length using a custom subclass — the v1 Tether models a rigid spring (no slack).

Source code in manta/couplings/tether.py
def __init__(self,
             craft_a,
             endpoint_a: str,
             craft_b,
             endpoint_b: str,
             *,
             stiffness: float,
             damping: float = 0.0,
             rest_length: float = 0.0) -> None:
    self._craft_a = craft_a
    self._craft_b = craft_b
    self.endpoint_a_name = str(endpoint_a)
    self.endpoint_b_name = str(endpoint_b)
    self.stiffness = float(stiffness)
    self.damping   = float(damping)
    self.rest_length = float(rest_length)
    # Resolve endpoints now — a bad name fails here, at the line that
    # wrote it, not at compile. Add endpoint Parts before the Tether.
    self.endpoint_a = self._find_endpoint(craft_a, self.endpoint_a_name)
    self.endpoint_b = self._find_endpoint(craft_b, self.endpoint_b_name)

compute_wrenches_sym

compute_wrenches_sym(ctx_a, ctx_b)

Return (wrench_on_a_at_craft_origin, wrench_on_b_at_craft_origin).

Both wrenches are in their respective CraftFrame, lifted to the body origin (force-at-offset + lever-arm torque). The compile layer adds these directly to each craft's aggregate net wrench.

Source code in manta/couplings/tether.py
def compute_wrenches_sym(self, ctx_a, ctx_b) -> tuple[Wrench, Wrench]:
    """Return (wrench_on_a_at_craft_origin, wrench_on_b_at_craft_origin).

    Both wrenches are in their respective CraftFrame, lifted to the
    body origin (force-at-offset + lever-arm torque). The compile
    layer adds these directly to each craft's aggregate net wrench.
    """
    # Endpoint offsets in each body frame. A promoted (tunable)
    # endpoint transform reads as a trace-bound IR value — keep the
    # symbol (the bound vector's tag is the generic PartFrame; an
    # endpoint hangs directly off the root, so its coords ARE craft
    # coords, same assumption the constant path makes).
    def _off(ep):
        from ..parts._trace import is_promoted
        tr = ep.transform
        if is_promoted(tr):
            return Vec3[CraftFrame].from_mx(tr._mx)
        return Vec3[CraftFrame].constant(tuple(tr))

    off_a_craft = _off(self.endpoint_a)
    off_b_craft = _off(self.endpoint_b)

    # Endpoint positions in world frame. The coupling reads each craft's
    # root ctx (root frame = CraftFrame): orientation is
    # Quat[WorldFrame, CraftFrame] and position[WorldFrame] is the craft
    # origin in world.
    off_a_anchor = ctx_a.orientation.apply(off_a_craft)
    off_b_anchor = ctx_b.orientation.apply(off_b_craft)
    p_a_anchor = ctx_a.position[WorldFrame] + off_a_anchor
    p_b_anchor = ctx_b.position[WorldFrame] + off_b_anchor

    # Vector from A to B; instantaneous length with softened sqrt.
    r = p_b_anchor - p_a_anchor
    r_mx = r._mx
    L    = soft_norm(r_mx)
    r_hat_mx = r_mx / L
    r_hat = Vec3[WorldFrame].from_mx(r_hat_mx)

    # Endpoint velocities in world frame:
    #   v_endpoint_anchor = v_origin + R · (ω_body × r_endpoint_body)
    # ω_body (craft inertial ω, in craft coords) = R^T · ω[WorldFrame].
    omega_a_craft = ctx_a.orientation.conjugate().apply(
        ctx_a.angular_velocity[WorldFrame])
    omega_b_craft = ctx_b.orientation.conjugate().apply(
        ctx_b.angular_velocity[WorldFrame])
    v_rot_a_craft = omega_a_craft.cross(off_a_craft)
    v_rot_b_craft = omega_b_craft.cross(off_b_craft)
    v_a_anchor = (ctx_a.velocity[WorldFrame]
                  + ctx_a.orientation.apply(v_rot_a_craft))
    v_b_anchor = (ctx_b.velocity[WorldFrame]
                  + ctx_b.orientation.apply(v_rot_b_craft))
    v_rel = v_b_anchor - v_a_anchor

    # Tension positive when stretched; positive v_along means A and
    # B moving apart (damper resists that).
    stretch = L - self.rest_length
    v_along = v_rel.dot(r_hat)
    F_mag = self.stiffness * stretch + self.damping * v_along

    # Force on A is along +r̂ (toward B) when stretched/separating.
    F_on_a_anchor = r_hat * F_mag
    F_on_b_anchor = F_on_a_anchor * (-1.0)

    # Rotate into each craft's frame.
    F_on_a_craft = ctx_a.orientation.conjugate().apply(F_on_a_anchor)
    F_on_b_craft = ctx_b.orientation.conjugate().apply(F_on_b_anchor)

    # Lift force-at-offset to wrench-at-craft-origin:
    #   F at offset r yields F at origin plus torque r × F about origin.
    wrench_a = Wrench(
        force=F_on_a_craft,
        torque=off_a_craft.cross(F_on_a_craft),
    )
    wrench_b = Wrench(
        force=F_on_b_craft,
        torque=off_b_craft.cross(F_on_b_craft),
    )
    return wrench_a, wrench_b