Skip to content

System identification

Fit a model's physical parameters (Fit) or its noise model (NoiseFit) to recorded data. See Fit parameters from a log for a worked recipe.

Parameter fit

manta.Fit

Fit(world, parameters)

MAP parameter fit over recorded windows (see module docstring).

Args: world — the model. Finalized by the internal Sim; the fit never mutates it (until result.apply()). parameters — {param name/suffix: Prior | None}. Names resolve against the model's promotable Parameters (<craft>.<part>.<param>); None = flat prior (only safe for parameters the data fully observes).

Source code in manta/fit/_map.py
def __init__(self, world, parameters: dict) -> None:
    self.world = world
    self.sim = Sim(world, parameters=list(parameters))
    self.module = self.sim.module()
    self._spec = self.module.spec
    port = self.module.port("params")
    fulls = [f.name for f in port.fields]
    by_full = {resolve_suffix(k, fulls, label="parameter", who="Fit"): v
               for k, v in parameters.items()}

    self._blocks: list[_Block] = []
    off = 0
    for f in port.fields:           # port order == kernel `p` layout
        self._blocks.append(_Block(
            f.name, f.dim, off,
            np.asarray(f.default, dtype=float).ravel(),
            by_full.get(f.name)))
        off += f.dim
    self.n_v = off
    self._stepk_cache: dict[int, ca.Function] = {}

solve

solve(windows, *, weights=None, verbose=False, ipopt_options=None)

Build the windowed prediction-error + prior NLP and solve it.

Args: windows — the recorded data (≥ 1 Window). weights — optional per-sensor scalar weights on the squared residuals ({sensor name/suffix: w}); use 1/σ_meas² to whiten mixed-unit sensors. Default 1. verbose — IPOPT iteration output. ipopt_options — extra nlpsol options, merged last.

Source code in manta/fit/_map.py
def solve(self, windows: list[Window], *, weights: dict | None = None,
          verbose: bool = False,
          ipopt_options: dict | None = None) -> FitResult:
    """Build the windowed prediction-error + prior NLP and solve it.

    Args:
        windows — the recorded data (≥ 1 `Window`).
        weights — optional per-sensor scalar weights on the squared
                  residuals (`{sensor name/suffix: w}`); use
                  `1/σ_meas²` to whiten mixed-unit sensors. Default 1.
        verbose — IPOPT iteration output.
        ipopt_options — extra `nlpsol` options, merged last.
    """
    if not windows:
        raise ValueError("Fit.solve: needs at least one Window.")

    v = ca.MX.sym("v", self.n_v, 1)
    p = ca.vertcat(*[b.p_of_v(v[b.offset:b.offset + b.dim])
                     for b in self._blocks])

    meas_names = [pt.name for pt in
                  self.module.ports_by_role(Role.MEASUREMENT)]
    w_by_full: dict[str, float] = {}
    for k, val in (weights or {}).items():
        full = resolve_suffix(k, meas_names, label="sensor", who="Fit")
        w_by_full[full] = float(val)

    loss = ca.MX(0.0)
    residuals: list[ca.MX] = []
    for w in windows:
        loss, residuals = self._add_window(
            w, p, loss, residuals, w_by_full, meas_names)

    # MAP prior term (skipped for flat-prior components).
    loss = loss + prior_penalty(v, self._blocks)

    v_opt, objective, stats = solve_blocks_nlp(
        "fit", v, loss, self._blocks,
        verbose=verbose, ipopt_options=ipopt_options)

    # Data-only Gauss-Newton information JᵀJ at the solution.
    r = ca.vertcat(*residuals) if residuals else ca.MX.zeros(0, 1)
    J_fn = ca.Function("J", [v], [ca.jacobian(r, v)])
    J = np.asarray(ca.DM(J_fn(v_opt)))
    JtJ = J.T @ J

    return FitResult(self._blocks, v_opt, JtJ, objective,
                     stats, self.world)

manta.FitResult

FitResult(blocks, v_opt, JtJ, objective, stats, world)

Fitted values + Gauss-Newton posterior diagnostics.

Attributes: values — {full param name: fitted value} (float for scalars, ndarray for vectors). labels — one entry per fitted scalar component. log_scale — per-component bool; True ⇒ the sigmas below are RELATIVE (log-space). prior_sigma — per-component prior σ (inf = no prior). posterior_sigma — per-component Gauss-Newton posterior σ from (JᵀJ + Σ₀⁻¹)⁻¹. ≈ prior σ ⇒ the data did not inform this component. JtJ — data-only Gauss-Newton information matrix in decision space; its small eigenvalues are the unidentifiable directions. objective — final loss value. stats — IPOPT return stats. converged — IPOPT's success flag; False ⇒ the values below are the failed solve's final iterate (a RuntimeWarning was emitted), not an optimum.

Source code in manta/fit/_map.py
def __init__(self, blocks, v_opt, JtJ, objective, stats, world) -> None:
    self._blocks = blocks
    self._world = world
    self.v = np.asarray(v_opt, dtype=float).ravel()
    self.JtJ = JtJ
    self.objective = float(objective)
    self.stats = stats
    self.converged = solver_converged(stats, who="Fit")

    self.values: dict[str, object] = {}
    self.labels: list[str] = []
    self.log_scale: list[bool] = []
    prior_sig = []
    for b in blocks:
        theta = b.theta_of_v(self.v[b.offset:b.offset + b.dim])
        self.values[b.full] = float(theta[0]) if b.dim == 1 else theta
        self.labels += b.labels()
        self.log_scale += [b.log] * b.dim
        prior_sig.append(b.sigma)
    self.prior_sigma = np.concatenate(prior_sig)

    prior_prec = np.where(np.isinf(self.prior_sigma), 0.0,
                          1.0 / np.square(self.prior_sigma))
    # eigh-based: flat-prior components the data never touched come
    # back inf, without poisoning the identified ones.
    self.posterior_sigma = laplace_sigma(JtJ + np.diag(prior_prec))

weak_directions

weak_directions(k=3)

The k least-informed directions of the DATA alone: list of (eigenvalue, {label: component}) for the smallest eigenvalues of JᵀJ. A near-zero eigenvalue is an unidentifiable parameter combination (e.g. the thrust/mass scale).

Source code in manta/fit/_map.py
def weak_directions(self, k: int = 3):
    """The `k` least-informed directions of the DATA alone: list of
    `(eigenvalue, {label: component})` for the smallest eigenvalues
    of JᵀJ. A near-zero eigenvalue is an unidentifiable parameter
    combination (e.g. the thrust/mass scale)."""
    vals, vecs = np.linalg.eigh(self.JtJ)
    out = []
    for i in range(min(k, len(vals))):
        comp = {lbl: float(vecs[j, i])
                for j, lbl in enumerate(self.labels)
                if abs(vecs[j, i]) > 1e-3}
        out.append((float(vals[i]), comp))
    return out

apply

apply()

Write the fitted values back onto the world's Part instances. A transform built afterwards (Sim(world), EKF(world), a C++ deploy) bakes them in as constants.

Source code in manta/fit/_map.py
def apply(self) -> None:
    """Write the fitted values back onto the world's Part instances.
    A transform built afterwards (`Sim(world)`, `EKF(world)`, a C++
    deploy) bakes them in as constants."""
    if not self.converged:
        warnings.warn(
            "FitResult.apply: the solve did NOT converge — writing the "
            "failed solve's final iterate onto the parts.",
            RuntimeWarning, stacklevel=2)
    for b in self._blocks:
        craft_name, part_name, pname = b.full.split(".", 2)
        craft = next(c for c in self._world.crafts
                     if c.name == craft_name)
        part = next(p for p in craft.parts if p.name == part_name)
        theta = b.theta_of_v(self.v[b.offset:b.offset + b.dim])
        setattr(part, pname,
                float(theta[0]) if b.dim == 1 else tuple(theta))

summary

summary()

Per-component table: fitted value, prior σ vs posterior σ. post/prior ≈ 1 flags a component the data did not inform — its fitted value is your prior talking, not the flight.

Source code in manta/fit/_map.py
def summary(self) -> str:
    """Per-component table: fitted value, prior σ vs posterior σ.
    `post/prior ≈ 1` flags a component the data did not inform —
    its fitted value is your prior talking, not the flight."""
    rows = [("parameter", "fitted", "prior σ", "post σ", "post/prior")]
    i = 0
    for b in self._blocks:
        theta = b.theta_of_v(self.v[b.offset:b.offset + b.dim])
        for j, lbl in enumerate(b.labels()):
            pri, post = self.prior_sigma[i], self.posterior_sigma[i]
            ratio = ("—" if np.isinf(pri) or np.isinf(post)
                     else f"{post / pri:.3f}")
            unit = " (rel)" if b.log else ""
            rows.append((lbl, f"{theta[j]:.6g}",
                         ("inf" if np.isinf(pri)
                          else f"{pri:.3g}{unit}"),
                         ("inf" if np.isinf(post)
                          else f"{post:.3g}{unit}"),
                         ratio))
            i += 1
    return (convergence_line(self.converged, self.stats) + "\n"
            + format_table(rows))

Noise fit

manta.NoiseFit

NoiseFit(world, noise, *, sensors=None)

Innovation-NLL fit of noise σ values (see module docstring).

Args: world — the model (dynamics/geometry at their — ideally already fitted — declared values). noise — {channel name/suffix: Prior | None}. Channel names are the declaration names (drone.imu.gyro_noise, drone.imu.gyro_bias); priors are relative (log-space), None = flat. sensors — measurement outputs the filter consumes (default: all with traces required in every window).

Source code in manta/fit/_nll.py
def __init__(self, world, noise: dict, *,
             sensors: list[str] | None = None) -> None:
    self.world = world
    self.sys = LinearizedSystem(world, sensors=sensors)
    sys = self.sys

    # Resolve requested channels against the tick's noise vector.
    aliases = []
    for spec in sys.noise_specs:
        aliases.append(spec.full[:-len("_driver")]
                       if spec.full.endswith("_driver") else spec.full)
    chosen: dict[int, Prior | None] = {}
    for key, prior in noise.items():
        alias = resolve_suffix(key, aliases, label="noise channel",
                               who="NoiseFit")
        chosen[aliases.index(alias)] = prior
    self.channels = [
        _Channel(sys.noise_specs[idx], k, prior)
        for k, (idx, prior) in enumerate(sorted(chosen.items()))]
    if not self.channels:
        raise ValueError(
            "NoiseFit: no noise channels selected — name at least one "
            "channel in noise={...}.")
    self._chan_by_spec = {c.spec.full: c for c in self.channels}
    self.n_s = len(self.channels)

    self._step_fn = self._build_step()
    self._fold_cache: dict[int, ca.Function] = {}
    self._validate_R0()

solve

solve(windows, *, P0=1e-06, verbose=False, ipopt_options=None)

Minimize the windows' total innovation NLL + prior over log-σ.

Args: windows — recorded data; every chosen sensor needs a trace in every window. P0 — initial tangent covariance per window, P0 · I. Keep small when x0 is trusted (synthetic truth); grow it for estimator-seeded initial states.

Source code in manta/fit/_nll.py
def solve(self, windows: list[Window], *, P0: float = 1e-6,
          verbose: bool = False,
          ipopt_options: dict | None = None) -> NoiseFitResult:
    """Minimize the windows' total innovation NLL + prior over log-σ.

    Args:
        windows — recorded data; every chosen sensor needs a trace
                  in every window.
        P0      — initial tangent covariance per window, `P0 · I`.
                  Keep small when `x0` is trusted (synthetic truth);
                  grow it for estimator-seeded initial states.
    """
    if not windows:
        raise ValueError("NoiseFit.solve: needs at least one Window.")
    sys = self.sys
    spec = sys.spec
    tan = spec.tangent_dim
    s = ca.MX.sym("s", self.n_s, 1)

    total = ca.MX(0.0)
    for w in windows:
        x0, U, Z, K = self._window_arrays(w)
        fold = self._fold(K)
        res = fold(ca.DM(x0),
                   ca.DM((P0 * np.eye(tan)).reshape(-1, 1)),
                   ca.DM(U) if U.size else ca.DM(0, K),
                   ca.DM(Z),
                   ca.repmat(s, 1, K),
                   ca.repmat(ca.DM(float(w.dt)), 1, K),
                   ca.DM(np.array([[w.t0 + i * w.dt
                                    for i in range(K)]])))
        total = total + ca.sum2(res[2])

    # ½‖(s − s̄)/σ‖² prior (skipped for flat-prior channels).
    total = total + prior_penalty(s, self.channels, weight=0.5)

    s_opt, objective, stats = solve_blocks_nlp(
        "noise_fit", s, total, self.channels,
        verbose=verbose, ipopt_options=ipopt_options)

    H_fn = ca.Function("H", [s], [ca.hessian(total, s)[0]])
    hessian = np.asarray(ca.DM(H_fn(s_opt)))

    return NoiseFitResult(self.channels, s_opt, hessian, objective,
                          stats, self.world)

manta.NoiseFitResult

NoiseFitResult(channels, s_opt, hessian, objective, stats, world)

Fitted σ per channel + Laplace posterior diagnostics.

prior_sigma / posterior_sigma are RELATIVE (log-space) widths; posterior ≈ prior means the data didn't inform that σ. converged is IPOPT's success flag — False ⇒ the values are the failed solve's final iterate (a RuntimeWarning was emitted), not an optimum.

Source code in manta/fit/_nll.py
def __init__(self, channels, s_opt, hessian, objective, stats,
             world) -> None:
    self._channels = channels
    self._world = world
    self.s = np.asarray(s_opt, dtype=float).ravel()
    self.objective = float(objective)
    self.stats = stats
    self.converged = solver_converged(stats, who="NoiseFit")
    self.values = {c.alias: float(np.exp(self.s[c.offset]))
                   for c in channels}
    self.labels = [c.alias for c in channels]
    self.prior_sigma = np.concatenate([c.sigma for c in channels])
    # eigh-based: a non-PD direction (indefinite/near-singular Laplace
    # Hessian) reports inf — never a fake "perfectly identified" 0.
    self.posterior_sigma = laplace_sigma(hessian)

apply

apply()

Write the fitted σ back onto the owning parts (<channel>_sigma attributes); transforms built afterwards (an EKF(world)'s auto-Q/R, a NoiseDriverd truth sim) use them.

Source code in manta/fit/_nll.py
def apply(self) -> None:
    """Write the fitted σ back onto the owning parts
    (`<channel>_sigma` attributes); transforms built afterwards
    (an `EKF(world)`'s auto-Q/R, a `NoiseDriver`d truth sim) use
    them."""
    if not self.converged:
        warnings.warn(
            "NoiseFitResult.apply: the solve did NOT converge — "
            "writing the failed solve's final iterate onto the parts.",
            RuntimeWarning, stacklevel=2)
    for c in self._channels:
        setattr(c.spec.owner, f"{c.decl_name}_sigma",
                self.values[c.alias])

Inputs

manta.Window dataclass

Window(x0, u=dict(), z=dict(), dt=0.01, t0=0.0)

One fitting window: a short recorded rollout.

Args: x0 — nested initial state dict (the sim.state shape: {craft: {slot: value}}). Slots omitted fall back to the world's initial state. u — recorded controls: {input name/suffix: scalar | (K,)}. A scalar is held for the whole window; inputs omitted hold their model default. z — recorded sensor readings: {sensor name/suffix: (K, dim) | (K,)}. Row k is the reading produced by step k (the step taken FROM state k). For Fit, only sensors present here enter the loss; for NoiseFit, every chosen sensor needs a trace. dt — fixed step, seconds. t0 — world-clock time of x0.

manta.Prior dataclass

Prior(sigma=None, mean=None, log=False)

Gaussian prior on one fitted parameter.

Args: sigma — 1-σ width. Scalar (isotropic across the parameter's components) or a per-component sequence. With log=True it is RELATIVE (log-space): sigma=0.3 ≈ ±30%. mean — prior mean. None (default) → the model's declared value. log — fit log(p) instead of p, elementwise. Strictly- positive parameters only (mass, moi, a thrust magnitude along one axis). Keeps every component positive with no constraint and makes pure scale ambiguities linear. NoiseFit ignores this flag — noise σ is ALWAYS fit in log-space, and sigma there is always relative.