Skip to content

Targets and runtimes

A Target* lowers a Module to a backend. TargetNumpy returns the matching native-Python runtime view; TargetCpp emits an Eigen C++ library; TargetJax emits a jitted rollout.

Targets

manta.TargetNumpy

TargetNumpy(x, *, compile=False)

Lower a typed Module — or any transform exposing .module() (Sim, EKF, LQR, a recurrence block) — to the matching native-Python view (sim / filter / recurrence / regulator), or the bare kernel engine when no view matches.

compile=True builds the kernel's CasADi functions with cc and calls them as externals instead of interpreting the MX graph (bit-identical, ~8x per call; cached on disk). Pair with NumpySim's step_n to fold substeps for a further amortization.

Source code in manta/codegen/numpy/__init__.py
def TargetNumpy(x, *, compile: bool = False) -> NumpyRuntime:
    """Lower a typed `Module` — or any transform exposing `.module()`
    (`Sim`, `EKF`, `LQR`, a recurrence block) — to the matching
    native-Python view (sim / filter / recurrence / regulator), or the
    bare kernel engine when no view matches.

    `compile=True` builds the kernel's CasADi functions with `cc` and
    calls them as externals instead of interpreting the MX graph
    (bit-identical, ~8x per call; cached on disk). Pair with `NumpySim`'s
    `step_n` to fold substeps for a further amortization."""
    from ..target import as_module
    m = as_module(x, "TargetNumpy")
    runtime = _select_view(m)(m)
    return runtime._enable_compile() if compile else runtime

manta.TargetCpp

TargetCpp(x, out_dir, *, class_name, basename=None, namespace='manta_gen')

C++ codegen target.

Args: x — a Module, or a transform with .module(). out_dir — destination directory (created if missing). class_name — C++ class name. Conventionally PascalCase. basename — filename stem; defaults to class_name.lower(). namespace — C++ namespace enclosing the emitted class.

Returns: EmitResult with paths to every emitted file plus a small funcs summary (world_name / dims).

Source code in manta/codegen/cpp/__init__.py
def TargetCpp(x,
              out_dir: str | Path,
              *,
              class_name: str,
              basename: str | None = None,
              namespace: str = "manta_gen") -> EmitResult:
    """C++ codegen target.

    Args:
        x           — a `Module`, or a transform with `.module()`.
        out_dir     — destination directory (created if missing).
        class_name  — C++ class name. Conventionally PascalCase.
        basename    — filename stem; defaults to `class_name.lower()`.
        namespace   — C++ namespace enclosing the emitted class.

    Returns:
        `EmitResult` with paths to every emitted file plus a small `funcs`
        summary (world_name / dims).
    """
    return emit_module(x, out_dir, class_name=class_name,
                       basename=basename, namespace=namespace)

manta.TargetJax

TargetJax(x)

Lower a Module (or any transform exposing .module()) to jitted JAX kernels by name + a lax.scan rollout builder — the functional artifacts a training loop wants (see manta.codegen.jax.JaxModule). jax is imported only when this is called, so manta itself never requires it.

Source code in manta/codegen/__init__.py
def TargetJax(x):
    """Lower a Module (or any transform exposing `.module()`) to jitted
    JAX kernels by name + a `lax.scan` rollout builder — the functional
    artifacts a training loop wants (see `manta.codegen.jax.JaxModule`).
    `jax` is imported only when this is called, so manta itself never
    requires it."""
    from .jax import TargetJax as _TargetJax
    return _TargetJax(x)

manta.NoiseDriver

NoiseDriver(seed=None)

Draws the per-step samples that make an oracle Module's noise live.

Binds to the NOISE port's fields (name, dim, σ); each sample() is an independent N(0, σ²) draw per active channel. Deliberately thin and swappable — kept out of the pure kernels — and simply omitted on a deploy target.

Source code in manta/codegen/numpy/_noise.py
def __init__(self, seed: int | None = None) -> None:
    self._seed = seed
    self._rng = np.random.default_rng(seed)
    self._channels: list[tuple[str, int, float]] = []

Numpy runtime views

The view TargetNumpy(x) returns is determined by the Module's shape.

manta.codegen.NumpyRuntime

NumpyRuntime(module)

The generic engine over a typed Module: state storage + the typed-arg gather → kernel call → scatter. Views subclass it.

Source code in manta/codegen/numpy/_runtime.py
def __init__(self, module: Module) -> None:
    self.module = module
    self._functions = module.functions      # swapped by _enable_compile()
    self._spec = module.spec
    self._state: dict[str, np.ndarray] = {}
    for f in module.state.fields:
        a = np.asarray(f.init, dtype=float)
        self._state[f.name] = (a.reshape(f.shape) if f.kind == "matrix"
                               else a.reshape(-1).copy())

    self._u_port = module.sole_port(Role.CONTROL)
    self._noise_port = module.sole_port(Role.NOISE)
    self._meas_ports_ir = module.ports_by_role(Role.MEASUREMENT)
    self._y_port = module.sole_port(Role.OUTPUT)
    self._x_port = module.sole_port(Role.STATE)
    self._param_port = module.sole_port(Role.PARAMETER)
    self._param_overrides: dict[str, np.ndarray] = {}

    self._t = 0.0

input_names property

input_names

The Module's declared control-input names, in order.

call

call(method, values=None, **kw)

Run one entry point. values/kwargs are keyed by port or state name (use the dict for dotted names); TIME ports default to 0.

The Hosting contract: a THREADED module's state is the caller's — supply state fields by name in values (unsupplied ones fall back to the engine's last-written copy) and read the fresh writes from the returned dict alongside the entry's returns. A HELD module's state lives in the runtime and is read/written in place; only the entry's returns come back.

Source code in manta/codegen/numpy/_runtime.py
def call(self, method: str, values: dict[str, Any] | None = None,
         **kw) -> dict[str, np.ndarray]:
    """Run one entry point. `values`/kwargs are keyed by port or state
    name (use the dict for dotted names); TIME ports default to 0.

    The Hosting contract: a THREADED module's state is the caller's —
    supply state fields by name in `values` (unsupplied ones fall back
    to the engine's last-written copy) and read the fresh writes from
    the returned dict alongside the entry's returns. A HELD module's
    state lives in the runtime and is read/written in place; only the
    entry's returns come back."""
    vals = dict(values or {})
    vals.update(kw)
    return self._run(self.module.entry(method), vals)

build_u

build_u(u)

Resolve a {name: value} dict (full or suffix names) to the flat control vector over the Module's declared defaults.

Source code in manta/codegen/numpy/_runtime.py
def build_u(self, u: dict[str, Any] | None) -> np.ndarray:
    """Resolve a `{name: value}` dict (full or suffix names) to the
    flat control vector over the Module's declared defaults."""
    names = self._input_names()
    source = {resolve_suffix(k, names, label="input",
                             who=type(self).__name__): v
              for k, v in (u or {}).items()}
    return pack_fields(self._u_fields(), source,
                       default=lambda f: f.default, who="build_u")

set_parameters

set_parameters(values)

Override promoted-parameter values (full or suffix names); every subsequent kernel call uses them. Values not overridden stay at the Module's declared defaults.

Source code in manta/codegen/numpy/_runtime.py
def set_parameters(self, values: dict[str, Any]) -> None:
    """Override promoted-parameter values (full or suffix names);
    every subsequent kernel call uses them. Values not overridden
    stay at the Module's declared defaults."""
    if self._param_port is None:
        raise ValueError(
            f"{self.module.name}: module declares no parameter port — "
            f"build the transform with parameters=[...] to promote "
            f"tunable Parameters.")
    names = [f.name for f in self._param_port.fields]
    dims = {f.name: f.dim for f in self._param_port.fields}
    for k, v in values.items():
        full = resolve_suffix(k, names, label="parameter",
                              who=type(self).__name__)
        arr = np.asarray(v, dtype=float).ravel()
        if arr.size != dims[full]:
            raise ValueError(
                f"set_parameters: {full!r} expects {dims[full]} "
                f"value(s), got {arr.size}.")
        self._param_overrides[full] = arr

param_vector

param_vector()

The flat promoted-parameter vector: declared defaults merged with set_parameters overrides, in port-field order.

Source code in manta/codegen/numpy/_runtime.py
def param_vector(self) -> np.ndarray:
    """The flat promoted-parameter vector: declared defaults merged
    with `set_parameters` overrides, in port-field order."""
    port = self._param_port
    if port is None:
        return np.zeros(0)
    return pack_fields(port.fields, self._param_overrides,
                       default=lambda f: f.default, who="param_vector")

manta.codegen.NumpySim

NumpySim(module)

Bases: NumpyRuntime

The simulation oracle. The runtime holds the nested state dict (sim.state); step(dt, u={...}) applies the commands and advances it, realizing that step's sensor readings. The kernel is pure: rate gating is the loop's job — build a rate_gate(...) / command_latch() from the model's declared rates and apply them yourself (uniform across backends). Read sensors with outputs() (raw nested) or reading(name) (one, by name).

Source code in manta/codegen/numpy/_sim.py
def __init__(self, module) -> None:
    super().__init__(module)
    self._driver: NoiseDriver | None = None
    self._outputs: dict[str, dict[str, Any]] = {}
    self._sim_state: dict | None = None
    self._stepn_cache: dict[int, Any] = {}   # n → folded step kernel

state property writable

state

The held nested state (lazy-seeded; mutate in place to set commands or override slots).

initial_state

initial_state()

Fresh nested initial state: the manifold slots' defaults plus input/noise placeholder entries (commands you may set; noise seeds that stay at zero — a NoiseDriver draw never enters the dict).

Source code in manta/codegen/numpy/_sim.py
def initial_state(self) -> dict[str, dict[str, Any]]:
    """Fresh nested initial state: the manifold slots' defaults plus
    input/noise placeholder entries (commands you may set; noise seeds
    that stay at zero — a `NoiseDriver` draw never enters the dict)."""
    nested = self._spec.to_nested(self.module.state.field("x").init)
    for f in self._u_fields():
        owner, rest = _split(f.name)
        nested.setdefault(owner, {}).setdefault(rest, f.default)
    if self._noise_port is not None:
        for f in self._noise_port.fields:
            owner, rest = _split(f.name)
            nested.setdefault(owner, {}).setdefault(
                rest, np.zeros(f.dim) if f.dim > 1 else 0.0)
    return nested

step

step(dt, *, t=None, u=None)

Advance the held state by dt. u is {input: value} (full or suffix names) applied this step over the held sim.state inputs. Runs the oracle kernel (one noise draw) and returns the new state dict. Rate-gated actuators: pass a command_latch()-held u.

Source code in manta/codegen/numpy/_sim.py
def step(self, dt: float, *, t: float | None = None,
         u: dict[str, Any] | None = None
         ) -> dict[str, dict[str, Any]]:
    """Advance the held state by `dt`. `u` is `{input: value}` (full or
    suffix names) applied this step over the held `sim.state` inputs.
    Runs the oracle kernel (one noise draw) and returns the new state
    dict. Rate-gated actuators: pass a `command_latch()`-held `u`."""
    if isinstance(dt, dict):
        raise TypeError(
            "NumpySim.step: the functional step(state, dt) form was "
            "removed — pass commands as step(dt, u={...}).")
    t0 = self._t if t is None else t
    self._sim_state = self._advance(self.state, float(dt), t0, u)
    self._t = t0 + float(dt)
    return self._sim_state

step_n

step_n(dt, n, *, t=None, u=None)

Advance n substeps of dt in ONE folded call — u commands held (ZOH) for the block, state chained through a mapaccum of the step kernel. Output readings + state are bit-identical to n sequential step(dt, u=u) calls; compiled (_enable_compile) it runs the whole inner loop in C.

Falls back to sequential stepping when a NoiseDriver is attached (a fresh stochastic draw per substep cannot be folded).

Source code in manta/codegen/numpy/_sim.py
def step_n(self, dt: float, n: int, *, t: float | None = None,
           u: dict[str, Any] | None = None
           ) -> dict[str, dict[str, Any]]:
    """Advance `n` substeps of `dt` in ONE folded call — `u` commands held
    (ZOH) for the block, state chained through a `mapaccum` of the step
    kernel. Output readings + state are bit-identical to `n` sequential
    `step(dt, u=u)` calls; compiled (`_enable_compile`) it runs the whole
    inner loop in C.

    Falls back to sequential stepping when a `NoiseDriver` is attached (a
    fresh stochastic draw per substep cannot be folded)."""
    if n <= 1 or self._driver is not None:
        for k in range(int(n)):
            self.step(dt, t=None if t is None else t + k * dt, u=u)
        return self._sim_state
    t0 = self._t if t is None else t
    self._sim_state = self._advance_n(self.state, float(dt), int(n), t0, u)
    self._t = t0 + n * float(dt)
    return self._sim_state

outputs

outputs()

Sensor readings from the most recent step (nested, realized with that step's noise draw).

Source code in manta/codegen/numpy/_sim.py
def outputs(self) -> dict[str, dict[str, Any]]:
    """Sensor readings from the most recent step (nested, realized
    with that step's noise draw)."""
    return self._outputs

attach_driver

attach_driver(driver)

Attach a stochastic NoiseDriver: every active (σ>0) channel of the Module's NOISE port is sampled each step, so truth is noisy with the very σ the EKF reads for R/Q. Without one the sim is a noiseless oracle.

Source code in manta/codegen/numpy/_sim.py
def attach_driver(self, driver: NoiseDriver) -> NoiseDriver:
    """Attach a stochastic `NoiseDriver`: every active (σ>0) channel of
    the Module's NOISE port is sampled each step, so truth is noisy
    with the very σ the EKF reads for R/Q. Without one the sim is a
    noiseless oracle."""
    if self._noise_port is None:
        raise ValueError(
            f"{self.module.name}: module declares no noise port.")
    driver.bind(self._noise_port.fields)
    self._driver = driver
    return driver

reading

reading(name)

The latest raw reading for a sensor (full or suffix name) from the most recent step. Readings are realized every step; gate feeding yourself with a rate_gate(name) if the sensor is rate-limited.

Source code in manta/codegen/numpy/_sim.py
def reading(self, name: str) -> Any:
    """The latest raw reading for a sensor (full or suffix name) from the
    most recent step. Readings are realized every step; gate feeding
    yourself with a `rate_gate(name)` if the sensor is rate-limited."""
    full = resolve_suffix(name, [p.name for p in self._meas_ports_ir],
                          label="output", who=type(self).__name__)
    owner, slot = _split(full)
    return self._outputs.get(owner, {}).get(slot)

rate_gate

rate_gate(name)

A RateGate at sensor name's declared rate (fires every step if the sensor has none). Apply it in your loop: if g.due(t): ekf.update(name, sim.reading(name), u=u).

Source code in manta/codegen/numpy/_sim.py
def rate_gate(self, name: str) -> RateGate:
    """A `RateGate` at sensor `name`'s declared rate (fires every step if
    the sensor has none). Apply it in your loop:
    `if g.due(t): ekf.update(name, sim.reading(name), u=u)`."""
    full = resolve_suffix(name, [p.name for p in self._meas_ports_ir],
                          label="output", who=type(self).__name__)
    return RateGate(self.module.port(full).rate)

command_latch

command_latch()

A CommandLatch over the actuators' declared intake rates (ZOH). Apply it once per step and pass the held u to BOTH sim.step and ekf.predict, so the filter predicts on the command truth acted on.

Source code in manta/codegen/numpy/_sim.py
def command_latch(self) -> CommandLatch:
    """A `CommandLatch` over the actuators' declared intake rates (ZOH).
    Apply it once per step and pass the held `u` to BOTH `sim.step` and
    `ekf.predict`, so the filter predicts on the command truth acted on."""
    return CommandLatch({f.name: f.rate for f in self._u_fields()
                         if f.rate is not None})

manta.codegen.NumpyFilter

NumpyFilter(module)

Bases: NumpyRuntime

A predict/update filter over a held x/P, with baked per-sensor update kernels — the same surface every backend emits.

You own the loop, identically in numpy and C++: fold each fresh measurement at the pre-predict state, then predict.

for nm in sensors:
    if gate[nm].due(t):
        ekf.update(nm, sim.reading(nm), u=u)   # update-then-...
ekf.predict(dt, u=u)                           # ...-predict

The update-then-predict order is yours to keep: a reading sampled at the interval start belongs against the current (pre-predict) state.

Source code in manta/codegen/numpy/_filter.py
def __init__(self, module) -> None:
    super().__init__(module)
    self._Q: np.ndarray | None = None        # default process noise

Q property writable

Q

Default process noise for predict (overridden per-call by predict(dt, Q=...); None uses the model's baked L Σ Lᵀ).

state_dict

state_dict()

Current estimate nested by owner.

Source code in manta/codegen/numpy/_filter.py
def state_dict(self) -> dict[str, dict[str, Any]]:
    """Current estimate nested by owner."""
    return self._spec.to_nested(self._state["x"])

reset

reset(state=None, *, P=None)

Reset held state. state is a nested/flat dict merged over the Module's declared initial values, or a flat ambient vector taken verbatim; P resets the covariance.

Source code in manta/codegen/numpy/_filter.py
def reset(self, state: dict | None = None, *,
          P: np.ndarray | None = None) -> None:
    """Reset held state. `state` is a nested/flat dict merged over
    the Module's declared initial values, or a flat ambient vector
    taken verbatim; `P` resets the covariance."""
    x_field = self.module.state.field("x")
    if state is not None:
        self._state["x"] = self._spec.pack_any(state, base=x_field.init)
    elif P is None:
        self._state["x"] = np.asarray(
            x_field.init, dtype=float).reshape(-1).copy()
        pf = self.module.state.field("P")
        self._state["P"] = np.asarray(
            pf.init, dtype=float).reshape(pf.shape).copy()
    if P is not None:
        P = np.asarray(P, dtype=float)
        expected = (self._spec.tangent_dim, self._spec.tangent_dim)
        if P.shape != expected:
            raise ValueError(
                f"reset: P shape {P.shape} doesn't match tangent dim "
                f"{expected}")
        self._state["P"] = P.copy()

predict

predict(dt, *, t=0.0, u=None, Q=None)

Advance the estimate by dt. Process noise: an explicit Q, else self.Q, else the model's baked L Σ Lᵀ. u is {input: value} (unset inputs fall to the Module's declared defaults); pass the same held u truth ran on.

Source code in manta/codegen/numpy/_filter.py
def predict(self, dt: float, *, t: float = 0.0,
            u: dict[str, Any] | None = None,
            Q: np.ndarray | None = None) -> None:
    """Advance the estimate by `dt`. Process noise: an explicit `Q`, else
    `self.Q`, else the model's baked `L Σ Lᵀ`. `u` is `{input: value}`
    (unset inputs fall to the Module's declared defaults); pass the same
    held `u` truth ran on."""
    self._predict_kernel(dt, t, self.build_u(u),
                         Q if Q is not None else self._Q)

update

update(target, z=None, R=None, *, t=0.0, u=None)

Fold one measurement at the current state.

  • update("gps.position", z) — by sensor name (full or suffix), through the baked Joseph-update kernel.
  • update(h_sym, z, R=R) — a caller-supplied h(x) callable + measurement covariance (custom measurements; numpy-only).
Source code in manta/codegen/numpy/_filter.py
def update(self, target, z=None, R=None, *, t: float = 0.0,
           u: dict[str, Any] | None = None) -> None:
    """Fold one measurement at the current state.

    * `update("gps.position", z)` — by sensor name (full or suffix),
      through the baked Joseph-update kernel.
    * `update(h_sym, z, R=R)` — a caller-supplied `h(x)` callable +
      measurement covariance (custom measurements; numpy-only).
    """
    if callable(target):
        if z is None or R is None:
            raise TypeError("update(h_sym, z, R=...): z and R required")
        return self._update_custom(target, z, R)
    self._fold_sensor(self._resolve_sensor(target), z,
                      self.build_u(u), t=t)

manta.codegen.NumpyRegulator

NumpyRegulator(module)

Bases: NumpyRuntime

A stateless control law: map a state estimate to commands via control(estimate) -> {input: value}.

Holds the live reference point x_ref (seeded from the Module's built operating point); retarget() moves it at runtime.

Source code in manta/codegen/numpy/_regulator.py
def __init__(self, module) -> None:
    super().__init__(module)
    st = module.ports_by_role(Role.STATE)
    self._ref_port = st[1] if len(st) > 1 else None
    self._x_ref = (np.asarray(self._ref_port.init, dtype=float)
                   .reshape(-1).copy() if self._ref_port is not None
                   else np.asarray(self._x_port.init,
                                   dtype=float).reshape(-1).copy())

x_ref property

x_ref

The live reference point (flat ambient vector).

retarget

retarget(state)

Move the reference point the law regulates to (nested or flat dict, merged over the CURRENT reference). The gain K is NOT re-solved: exact wherever the dynamics are invariant along the moved direction (e.g. translating a hover setpoint); build a new LQR for a genuinely different operating point (new A/B or trim).

Source code in manta/codegen/numpy/_regulator.py
def retarget(self, state: dict) -> None:
    """Move the reference point the law regulates to (nested or flat
    dict, merged over the CURRENT reference). The gain K is NOT
    re-solved: exact wherever the dynamics are invariant along the
    moved direction (e.g. translating a hover setpoint); build a new
    LQR for a genuinely different operating point (new A/B or trim).
    """
    if self._ref_port is None:
        raise AttributeError(
            f"{type(self).__name__}: module {self.module.name!r} has "
            "no reference port — its control law is not retargetable.")
    self._x_ref = self._ref_port.manifold.pack_any(state,
                                                   base=self._x_ref)

u

u(x_flat)

Control vector for a flat ambient state (full-spec layout).

Source code in manta/codegen/numpy/_regulator.py
def u(self, x_flat) -> np.ndarray:
    """Control vector for a flat ambient state (full-spec layout)."""
    vals = {"x": np.asarray(x_flat, dtype=float)}
    if self._ref_port is not None:
        vals[self._ref_port.name] = self._x_ref
    return self.call("control", vals)["u"]

control

control(state)

Map a state estimate (nested or flat dict) → {input: value}, merged over the live reference point (unsupplied slots sit at the reference, i.e. zero error).

Source code in manta/codegen/numpy/_regulator.py
def control(self, state: dict) -> dict[str, Any]:
    """Map a state estimate (nested or flat dict) → `{input: value}`,
    merged over the live reference point (unsupplied slots sit at
    the reference, i.e. zero error)."""
    x = self._x_port.manifold.pack_any(state, base=self._x_ref)
    return unpack_fields(self._u_fields(), self.u(x))

manta.codegen.NumpyRecurrence

NumpyRecurrence(module)

Bases: NumpyRuntime

A stateful dataflow block (PID, Madgwick, …): step(dt, **inputs) advances the held state and computes the readouts.

Source code in manta/codegen/numpy/_recurrence.py
def __init__(self, module) -> None:
    super().__init__(module)
    self._y = np.zeros(self._y_port.size)

state property

state

Held state, {slot: value} by name.

reset

reset()

Reset the held state to the Module's declared initial values.

Source code in manta/codegen/numpy/_recurrence.py
def reset(self) -> None:
    """Reset the held state to the Module's declared initial values."""
    x_field = self.module.state.field("x")
    self._state["x"] = np.asarray(
        x_field.init, dtype=float).reshape(-1).copy()
    self._y = np.zeros(self._y_port.size)

readouts

readouts()

Last-computed readouts by output-field name (scalars unwrapped).

Source code in manta/codegen/numpy/_recurrence.py
def readouts(self) -> dict[str, Any]:
    """Last-computed readouts by output-field name (scalars unwrapped)."""
    return unpack_fields(self._y_port.fields, self._y)

Rate helpers

manta.RateGate

RateGate(rate)

Window gate: due(t) fires once per 1/rate window (and records it); rate is None fires every call. Holds only the last fire time — the one bit of state a sample-and-hold window needs.

Source code in manta/rates.py
def __init__(self, rate: float | None) -> None:
    self.rate = rate
    self._last: float | None = None

manta.CommandLatch

CommandLatch(rates)

Zero-order-hold for rate-gated control inputs. latch(inputs, t) returns inputs with every gated entry replaced by the value latched at its last window boundary — mid-window writes are ignored until the next boundary. Apply it once per step and pass the held dict to BOTH sim.step and ekf.predict, so the filter predicts on the same held command truth acted on. Ungated inputs pass through untouched (and an empty gate set is a no-op that returns the dict unchanged).

Source code in manta/rates.py
def __init__(self, rates: dict[str, float]) -> None:
    self._gates = {n: RateGate(r) for n, r in rates.items()}
    self._held: dict[str, Any] = {}