Skip to content

Transforms — Sim, EKF, LQR

The three compile-time siblings. Each takes a World, writes its math symbolically over the shared linearized system, and emits a typed Module. Lower one with a target to get a callable runtime.

Sim

manta.Sim

Sim(world, *, discretization='exact', parameters=None)

Forward-dynamics transform: model validation + the linearized tick, emitting oracle/deploy Modules.

Source code in manta/sim.py
def __init__(self, world: "World", *,
             discretization: str = "exact",
             parameters: list[str] | None = None) -> None:
    # Model validation (planet prep, requires_fields/requires_planet,
    # craft back-pointers) happens inside LinearizedSystem — the one
    # choke point every transform passes through. `discretization`
    # selects how predict_jacobian discretizes F ("exact" | "euler" —
    # see LinearizedSystem; "euler" trades an O(dt²) jacobian
    # difference for much smaller generated deploy code).
    # `parameters` promotes the named promotable Parameters to a live
    # `params` port on every emitted Module (system ID — `manta.fit`);
    # passing the port's declared defaults reproduces the baked model
    # bit-for-bit.
    self._sys = LinearizedSystem(           # full state, all sensors
        world, discretization=discretization, parameters=parameters)
    self.world = world
    self.crafts = self._sys.crafts

tick property

tick

The compiled world tick (named CasADi I/O).

module

module()

The oracle Module (simulation truth): one step entry — the full forward tick, one live noise draw → state + readings.

Source code in manta/sim.py
def module(self) -> Module:
    """The **oracle** Module (simulation truth): one `step` entry —
    the full forward tick, one live noise draw → state + readings."""
    sys = self._sys
    x_field, u_port, dtp, tp, meas_ports = self._module_scaffold()
    sensor_fulls = list(sys.sensors)
    noise_port = Port(
        "noise", Role.NOISE, (sys.n_noise,),
        fields=tuple(PortField(c.full, c.dim, 0.0, sigma=c.sigma)
                     for c in sys.noise_specs))
    p_port = self._param_port()
    kargs, kargn = [sys.x_sym, sys.u_sym, sys.n_sym], ["x", "u", "noise"]
    eargs = [StateRef("x"), PortRef("u"), PortRef("noise")]
    if p_port is not None:
        kargs.append(sys.p_sym); kargn.append("params")
        eargs.append(PortRef("params"))
    kargs += [sys.dt_sym, sys.t_sym]; kargn += ["dt", "t"]
    eargs += [PortRef("dt"), PortRef("t")]
    step_fn = ca.Function(
        "step", kargs,
        [sys.x_new_noisy] + [
            ca.reshape(sys.sensors[f].h_noisy_sym,
                       sys.sensors[f].dim, 1) for f in sensor_fulls],
        kargn,
        ["x_new"] + [entry_ident(f) for f in sensor_fulls])
    ports = [u_port, noise_port, dtp, tp, *meas_ports]
    if p_port is not None:
        ports.insert(2, p_port)
    return Module(
        name=self.world.name, state=StateLayout((x_field,)),
        ports=tuple(ports),
        functions={"step": step_fn},
        entry_points=(EntryPoint(
            "step", "step", tuple(eargs),
            writes=("x",), returns=tuple(sensor_fulls)),),
        hosting=Hosting.THREADED)

deploy_module

deploy_module()

The deploy Module (runs on a robot against real sensors): noiseless forward map + per-sensor measurement models + Jacobians.

Source code in manta/sim.py
def deploy_module(self) -> Module:
    """The **deploy** Module (runs on a robot against real sensors):
    noiseless forward map + per-sensor measurement models + Jacobians."""
    sys = self._sys
    spec = sys.spec
    x_field, u_port, dtp, tp, meas_ports = self._module_scaffold()
    # A measurement is dt-independent — dt is eliminated at construction,
    # so the measure kernels honestly take (x, u, t).
    tan = spec.tangent_dim
    zero_dt = ca.MX.zeros(1, 1)
    functions = {"predict": sys.predict_fn, "predict_jacobian": sys.F_fn}
    p_port = self._param_port()
    p_ref = () if p_port is None else (PortRef("params"),)
    ports = [u_port, *((p_port,) if p_port is not None else ()),
             dtp, tp, *meas_ports,
             Port("F", Role.MATRIX, (tan, tan))]
    entries = [
        EntryPoint("predict", "predict",
                   (StateRef("x"), PortRef("u"), *p_ref, PortRef("dt"),
                    PortRef("t")), writes=("x",)),
        EntryPoint("predict_jacobian", "predict_jacobian",
                   (StateRef("x"), PortRef("u"), *p_ref, PortRef("dt"),
                    PortRef("t")), returns=("F",)),
    ]
    margs = [sys.x_sym, sys.u_sym, sys.t_sym]
    margn = ["x", "u", "t"]
    if p_port is not None:
        margs.insert(2, sys.p_sym)
        margn.insert(2, "p")
    for full, s in sys.sensors.items():
        ident = entry_ident(full)
        h = ca.substitute(s.h_sym, sys.dt_sym, zero_dt)
        H = ca.substitute(s.H_sym, sys.dt_sym, zero_dt)
        functions[f"measure_{ident}"] = ca.Function(
            f"h_{ident}", margs, [h], margn, ["h"])
        functions[f"measure_{ident}_jacobian"] = ca.Function(
            f"H_{ident}", margs, [H], margn, ["H"])
        ports.append(Port(f"H_{ident}", Role.MATRIX, (s.dim, tan)))
        entries.append(EntryPoint(
            f"measure_{ident}", f"measure_{ident}",
            (StateRef("x"), PortRef("u"), *p_ref, PortRef("t")),
            returns=(full,)))
        entries.append(EntryPoint(
            f"measure_{ident}_jacobian", f"measure_{ident}_jacobian",
            (StateRef("x"), PortRef("u"), *p_ref, PortRef("t")),
            returns=(f"H_{ident}",)))
    return Module(
        name=self.world.name, state=StateLayout((x_field,)),
        ports=tuple(ports), functions=functions,
        entry_points=tuple(entries), hosting=Hosting.THREADED)

EKF

manta.EKF

EKF(world, *, track=None, sensors=None, inputs=None, discretization='exact')

Error-state EKF over a World — symbolic recursion + typed Module.

Args: track: {craft_name: SlotSet} lower bound of what to estimate (closed under the dynamics; the rest freezes). None keeps the full state. sensors: measurement full-names (or unambiguous suffixes). None keeps every output (of tracked crafts). inputs: known control inputs; None keeps all, excluded ones freeze at their default. discretization: how F discretizes the dynamics — "exact" (default; jacobian of the full discrete tick) or "euler" (F = I + dt·∂ẋ/∂δ; O(dt²) from exact, much smaller generated deploy code). See LinearizedSystem.

Source code in manta/estimation/ekf.py
def __init__(self, world, *,
             track: dict | None = None,
             sensors: list[str] | None = None,
             inputs: list[str] | None = None,
             discretization: str = "exact") -> None:
    """Args:
        track:   `{craft_name: SlotSet}` lower bound of what to estimate
                 (closed under the dynamics; the rest freezes). `None`
                 keeps the full state.
        sensors: measurement full-names (or unambiguous suffixes).
                 `None` keeps every output (of tracked crafts).
        inputs:  known control inputs; `None` keeps all, excluded ones
                 freeze at their default.
        discretization: how F discretizes the dynamics — "exact"
                 (default; jacobian of the full discrete tick) or
                 "euler" (F = I + dt·∂ẋ/∂δ; O(dt²) from exact, much
                 smaller generated deploy code). See LinearizedSystem.
    """
    sys = LinearizedSystem(world, track=track, sensors=sensors,
                           inputs=inputs, track_mode="closure",
                           discretization=discretization)
    self.sys = sys
    self.world = world
    self.crafts = sys.crafts
    self.spec: StateSpec = sys.spec

    # ---- the Kalman recursion, symbolically, once -------------------
    spec, n_tan = sys.spec, sys.spec.tangent_dim
    x, u = sys.x_sym, sys.u_sym
    dt, t = sys.dt_sym, sys.t_sym
    P = ca.MX.sym("P", n_tan, n_tan)
    Q = ca.MX.sym("Q", n_tan, n_tan)
    F = sys.F_sym

    # predict: auto process noise Q = L Σ Lᵀ baked into the kernel
    # (zero when the model declares none) + an explicit-Q override.
    Q_auto = lin_cov(sys.L_sym,
                     ca.DM(sys.Sigma) if sys.L_sym is not None else None,
                     n_tan)
    predict_fn = ca.Function(
        "ekf_predict", [x, P, u, dt, t],
        [sys.x_new, symmetrize(F @ P @ F.T + Q_auto)],
        ["x", "P", "u", "dt", "t"], ["x_new", "P_new"])
    predict_q_fn = ca.Function(
        "ekf_predict_with_Q", [x, P, Q, u, dt, t],
        [sys.x_new, symmetrize(F @ P @ F.T + Q)],
        ["x", "P", "Q", "u", "dt", "t"], ["x_new", "P_new"])

    # per-sensor Joseph update (the shared `joseph_update` kernel —
    # see estimation/_kalman.py). A measurement is dt-independent, so
    # dt is eliminated here (substituted to 0) — the kernel honestly
    # takes only (x, P, z, u, t).
    init_flat = flatten_nested(world._initial_state_dict())
    x0 = spec.pack_any(init_flat)
    zero_dt = ca.MX.zeros(1, 1)
    updates: dict[str, ca.Function] = {}
    for full, s in sys.sensors.items():
        z = ca.MX.sym("z", s.dim)
        h = ca.substitute(s.h_sym, dt, zero_dt)
        H = ca.substitute(s.H_sym, dt, zero_dt)
        L_h = (ca.substitute(s.L_h_sym, dt, zero_dt)
               if s.L_h_sym is not None and sys.Sigma is not None
               else None)
        R = lin_cov(L_h, ca.DM(sys.Sigma) if L_h is not None else None,
                    s.dim)
        # Refuse a σ=0 sensor (R ≡ 0 → singular S on the second fold).
        R_fn = ca.Function("R0", [x, u, t], [R])
        require_active_R(R, R_fn, ca.vertcat(x, u, t),
                         x0=x0, u_defaults=sys.u_defaults, spec=spec,
                         full=full, who="EKF")
        x_upd, P_upd, _, _ = joseph_update(x, P, h, H, R, z, spec)
        updates[full] = ca.Function(
            f"ekf_update_{entry_ident(full)}",
            [x, P, z, u, t], [x_upd, P_upd],
            ["x", "P", "z", "u", "t"], ["x_new", "P_new"])

    # ---- the typed Module -------------------------------------------
    fields = (
        StateField("x", "manifold", (spec.ambient_dim,),
                   init=x0, manifold=spec),
        StateField("P", "matrix", (n_tan, n_tan),
                   init=np.eye(n_tan) * 1e-2),
    )
    ports = [
        Port("u", Role.CONTROL, (len(sys.input_names),), fields=tuple(
            PortField(n, 1, float(sys.input_defaults[n]),
                      rate=sys.sample_rates.get(n))
            for n in sys.input_names)),
        Port("dt", Role.TIMESTEP),
        Port("t", Role.TIME),
        Port("Q", Role.MATRIX, (n_tan, n_tan)),
    ]
    functions = {"predict": predict_fn, "predict_with_Q": predict_q_fn}
    entries = [
        EntryPoint("predict", "predict",
                   (StateRef("x"), StateRef("P"), PortRef("u"),
                    PortRef("dt"), PortRef("t")),
                   writes=("x", "P")),
        EntryPoint("predict_with_Q", "predict_with_Q",
                   (StateRef("x"), StateRef("P"), PortRef("Q"),
                    PortRef("u"), PortRef("dt"), PortRef("t")),
                   writes=("x", "P")),
    ]
    for full, s in sys.sensors.items():
        ident = entry_ident(full)
        ports.append(Port(full, Role.MEASUREMENT, (s.dim,),
                          rate=sys.sample_rates.get(full)))
        functions[f"update_{ident}"] = updates[full]
        entries.append(EntryPoint(
            f"update_{ident}", f"update_{ident}",
            (StateRef("x"), StateRef("P"), PortRef(full),
             PortRef("u"), PortRef("t")),
            writes=("x", "P")))

    self._module = Module(
        name=f"{world.name}_ekf", state=StateLayout(fields),
        ports=tuple(ports), functions=functions,
        entry_points=tuple(entries), hosting=Hosting.HELD)

n_blocks property

n_blocks

Independent tangent subsystems (block-diagonal predict).

module

module()

The typed Module IR a backend lowers.

Source code in manta/estimation/ekf.py
def module(self) -> Module:
    """The typed `Module` IR a backend lowers."""
    return self._module

observability

observability(**kwargs)

Local observability of the chosen sensor set at an operating point (see manta.estimation.observability).

Source code in manta/estimation/ekf.py
def observability(self, **kwargs):
    """Local observability of the chosen sensor set at an operating
    point (see `manta.estimation.observability`)."""
    from .observability import observability
    return observability(self, **kwargs)

sigma_horizon

sigma_horizon(**kwargs)

Per-slot σ attainable after a horizon — the covariance recursion run open-loop, resolving the weak/slow observability the rank test can't (see manta.estimation.observability).

Source code in manta/estimation/ekf.py
def sigma_horizon(self, **kwargs):
    """Per-slot σ attainable after a horizon — the covariance
    recursion run open-loop, resolving the weak/slow observability the
    rank test can't (see `manta.estimation.observability`)."""
    from .observability import sigma_horizon
    return sigma_horizon(self, **kwargs)

LQR

manta.LQR

LQR(world, *, x_ref, u_ref=None, Q=None, R=None, dt=0.01, regulate=None, tol=1e-12, max_iter=10000)

Infinite-horizon discrete LQR about an operating point.

Args: world — the model. x_ref — target state (nested {owner: {slot: value}} or flat {"owner.slot": value}), merged over the world's initial state for any unspecified slot. u_ref — trim inputs ({input_name: value}), merged over each Part Input's default. The equilibrium command. Q, R — LQR cost weights (regulated-tangent², n_inputs²). Default to identity. R must be positive-definite. dt — the discrete step the controller will run at. regulate — slot full-names to regulate, taken verbatim (e.g. ["c.position", "c.velocity"]); the rest are frozen at x_ref. None regulates the full state (fully-actuated systems only). tol, max_iter — Riccati-iteration convergence: relative fixpoint tolerance (‖ΔP‖ ≤ tol·max(1, ‖P‖)) and iteration cap.

Attributes: spec (full), regulated (regulated slot names), input_names, K (n_u × tracked_tangent), A, B, x_ref/u_ref (vectors), control_fn (u(x_full, x_ref_full) baked ca.Function; runtimes default x_ref to the built reference — see NumpyRegulator.retarget).

Source code in manta/control/lqr.py
def __init__(self, world, *,
             x_ref: dict,
             u_ref: dict | None = None,
             Q=None, R=None,
             dt: float = 0.01,
             regulate: list[str] | None = None,
             tol: float = 1e-12,
             max_iter: int = 10000) -> None:
    if not world.crafts:
        raise ValueError("LQR: world has no crafts.")

    # All the linearization plumbing — tick compile, signature, the
    # VERBATIM regulated subset frozen at the operating point, and
    # B = ∂f/∂u — lives in `LinearizedSystem`. `regulate` is taken
    # verbatim (NOT closed over the dynamics like the EKF's `track`):
    # for an underactuated craft the whole point is to freeze the
    # uncontrollable states (e.g. attitude) at the operating point so
    # the reduced system is stabilizable; closing the set would pull
    # them back and the Riccati solve would diverge. The reference
    # point doubles as the freeze value.
    sys = LinearizedSystem(world, track=regulate, inputs=None,
                           track_mode="verbatim", control=True,
                           ref=x_ref)
    self.sys     = sys
    self.world   = world
    self.spec    = sys.full_spec      # full layout (the law gathers from it)
    self._spec   = sys.spec           # tracked subspec
    self.regulated = sys.tracked
    self.input_names = sys.input_names
    n_u = len(self.input_names)
    if n_u == 0:
        raise ValueError(
            "LQR: world has no Part Inputs — no control authority.")

    # --- operating point ----------------------------------------------
    u_full = dict(sys.input_defaults)
    for k, v in (u_ref or {}).items():
        full = resolve_suffix(k, self.input_names, label="input", who="LQR")
        u_full[full] = float(v)
    u_ref_vec = np.array([u_full[n] for n in self.input_names], dtype=float)
    x_ref_full = sys.pack_ref(sys.full_spec)
    self.x_ref, self.u_ref = x_ref_full, u_ref_vec

    # A = F, B = ∂f/∂u, both at the operating point (subspec ambient).
    x_ref_sub = sys.pack_ref(sys.spec)
    A = np.array(sys.F_fn(x_ref_sub, u_ref_vec, dt, 0.0))
    B = np.array(sys.B_fn(x_ref_sub, u_ref_vec, dt, 0.0))
    self.A, self.B = A, B
    n_x = sys.spec.tangent_dim

    Qm = np.eye(n_x) if Q is None else np.asarray(Q, dtype=float)
    Rm = np.eye(n_u) if R is None else np.asarray(R, dtype=float)
    if Qm.shape != (n_x, n_x):
        raise ValueError(
            f"LQR: Q must be {n_x}×{n_x} (tracked tangent dim), "
            f"got {Qm.shape}.")
    if Rm.shape != (n_u, n_u):
        raise ValueError(
            f"LQR: R must be {n_u}×{n_u} (n_inputs), got {Rm.shape}.")
    if not np.allclose(Rm, Rm.T):
        raise ValueError("LQR: R must be symmetric.")
    if np.min(np.linalg.eigvalsh(Rm)) <= 0.0:
        raise ValueError(
            "LQR: R must be positive-definite (its min eigenvalue is "
            f"{np.min(np.linalg.eigvalsh(Rm)):.3g}) — every input "
            "needs a positive cost.")

    self.K, self.P = _solve_dare(A, B, Qm, Rm, tol=tol,
                                 max_iter=max_iter)

    # --- baked control law: u = u_ref − K·(x_tracked ⊟ x_ref_tracked).
    # Takes the FULL ambient state AND the reference as arguments;
    # gathers the tracked slots from each. The gain K is the baked
    # constant — feeding a moved `x_ref` retargets the regulator
    # through the same law with NO re-solve, which is exact wherever
    # the dynamics are invariant along the moved direction (e.g.
    # translating a hover setpoint under uniform gravity). A genuinely
    # different operating point (new A/B or trim) needs a new LQR.
    full_spec, spec = sys.full_spec, sys.spec
    x_full_sym = ca.MX.sym("x", full_spec.ambient_dim, 1)
    x_ref_sym = ca.MX.sym("x_ref", full_spec.ambient_dim, 1)

    def _gather(sym):
        chunks = []
        for s in spec.slots:
            fs = full_spec.slot(s.name)
            chunks.append(
                sym[fs.ambient_offset : fs.ambient_offset + fs.ambient_dim])
        return ca.vertcat(*chunks) if chunks else sym

    dx = spec.boxminus_sym(_gather(x_full_sym), _gather(x_ref_sym))
    u_expr = ca.DM(u_ref_vec.reshape(-1, 1)) - ca.DM(self.K) @ dx
    self.control_fn = ca.Function(
        "lqr_u", [x_full_sym, x_ref_sym], [u_expr], ["x", "x_ref"], ["u"])

    # --- the typed Module: stateless, one control(x, x_ref) -> u entry.
    # Both STATE ports carry the operating point as `init`, so a
    # backend can default x_ref to the built reference.
    self._module = Module(
        name=f"{world.name}_lqr", state=StateLayout(()),
        ports=(
            Port("x", Role.STATE, (full_spec.ambient_dim,),
                 manifold=full_spec, init=x_ref_full),
            Port("x_ref", Role.STATE, (full_spec.ambient_dim,),
                 manifold=full_spec, init=x_ref_full),
            Port("u", Role.CONTROL, (n_u,), fields=tuple(
                PortField(n, 1, float(u_full[n]))
                for n in self.input_names)),
        ),
        functions={"control": self.control_fn},
        entry_points=(EntryPoint("control", "control",
                                 (PortRef("x"), PortRef("x_ref")),
                                 returns=("u",)),),
        hosting=Hosting.THREADED)

closed_loop_eigs property

closed_loop_eigs

Eigenvalues of the closed-loop tangent map A − B·K (over the tracked subspace). All inside the unit circle ⇒ stable.

module

module()

The typed Module IR a backend lowers.

Source code in manta/control/lqr.py
def module(self) -> Module:
    """The typed `Module` IR a backend lowers."""
    return self._module

PID

manta.PID

PID(kp, ki=0.0, kd=0.0, *, integral_limit=None, output_limit=None, name='pid')

Bases: RecurrenceBlock

Scalar PID controller as a recurrence block.

Args: kp, ki, kd — proportional / integral / derivative gains. integral_limit — symmetric clamp on the integral accumulator (anti-windup). None disables it. output_limit — symmetric clamp on the command. None disables. name — codegen basename / default C++ class stem.

Ports: inputs setpoint + measurement (scalars); output command. State: integral, prev_measurement, primed.

Source code in manta/control/pid.py
def __init__(self, kp: float, ki: float = 0.0, kd: float = 0.0, *,
             integral_limit: float | None = None,
             output_limit: float | None = None,
             name: str = "pid") -> None:
    self.kp = float(kp)
    self.ki = float(ki)
    self.kd = float(kd)
    self.integral_limit = (None if integral_limit is None
                           else float(integral_limit))
    self.output_limit = (None if output_limit is None
                         else float(output_limit))

    kp_, ki_, kd_ = self.kp, self.ki, self.kd
    i_lim, o_lim = self.integral_limit, self.output_limit

    def rec(x, u, dt, t):
        err   = u["setpoint"] - u["measurement"]
        integ = x["integral"] + err * dt
        if i_lim is not None:
            integ = ca.fmin(ca.fmax(integ, -i_lim), i_lim)
        # Derivative on measurement; zeroed on the first step via `primed`.
        d_meas = x["primed"] * (u["measurement"] - x["prev_measurement"]) / dt
        cmd = kp_ * err + ki_ * integ - kd_ * d_meas
        if o_lim is not None:
            cmd = ca.fmin(ca.fmax(cmd, -o_lim), o_lim)
        x_next = {
            "integral":         integ,
            "prev_measurement": u["measurement"],
            "primed":           ca.MX.ones(1, 1),
        }
        return x_next, {"command": cmd}

    self._build_recurrence(
        name=name,
        state=[("integral",         ScalarManifold()),
               ("prev_measurement", ScalarManifold()),
               ("primed",           ScalarManifold())],
        inputs=[("setpoint", 1), ("measurement", 1)],
        outputs=[("command", 1)],
        x0={"integral": 0.0, "prev_measurement": 0.0, "primed": 0.0},
        recurrence=rec)