@dataclass
class XModStep:
    """Result of a model step.

    Instances are returned by a model's ``step`` call and carry the values
    the model produced together with bookkeeping about the step.
    """

    # Values produced by the step, keyed by output port name.
    outputs: Dict[str, np.ndarray]  # port name -> value
    # NOTE(review): presumably the model time after the step — confirm
    # against the concrete model implementations.
    new_time: float
    # Event flag; defaults to False when the model does not set it.
    event_occurred: bool = False
# Log results
history = []


def log(t, outputs):
    """Record and print the mass-spring position reported for time ``t``.

    Args:
        t: Time stamp to log.
        outputs: Dict mapping model name to that model's output dict. The
            "mass_spring" entry must exist; its "x" port is optional.

    The first element of the "x" port (0 when the port is absent) is
    appended to the enclosing-scope ``history`` list as ``(t, x)`` and
    printed as one formatted line.
    """
    x = outputs["mass_spring"].get("x", [0])[0]
    history.append((t, x))
    print(f"t={t:.3f}, x={x:.4f}")
# set_state: restore this object's `x` and `v` attributes from `state`, a
# dict that must contain the keys "x" and "v" (a missing key raises KeyError).
# NOTE(review): line breaks and the `#` markers of the "Example: Simple
# controller" banner appear to have been stripped from this span, and the
# PController class that follows stops after its docstring — TODO restore
# the original formatting before relying on this code.
def set_state(self, state: Dict): self.x = state["x"] self.v = state["v"] Example: Simple controller (P-controller) ---------------------------------------------------------------------- class PController(XModModel): """Outputs F = Kp * (x_ref - x_measured).""" xmod co-simulation
# I'll write it as a with a small demo.
"""
xmod co-simulation — a lightweight modular co-simulation framework.

Models exchange data via ports and advance with their own solvers.
Master coordinates time steps and data exchange.
"""

import numpy as np
from dataclasses import dataclass
from typing import Dict, List, Callable
from abc import ABC, abstractmethod


# ----------------------------------------------------------------------
# Core xmod types
# ----------------------------------------------------------------------

@dataclass
class XModPort:
    """Defines a data port for a model."""

    # Port identifier.
    name: str
    # Shape of the value carried by the port; the default () is an empty
    # tuple. NOTE(review): presumably () denotes a scalar — confirm.
    shape: tuple = ()
    # Element type of the value carried by the port.
    dtype: type = float
# step (abstract): advance the model by `dt` starting from time `t`, using
# the values in `inputs` (a dict of str keys to arrays, per the annotation);
# implementations return an XModStep.
# NOTE(review): line breaks appear to have been stripped from this span, and
# the XModStep definition that follows stops in the middle of its docstring —
# TODO restore the original formatting before relying on this code.
@abstractmethod def step(self, t: float, dt: float, inputs: Dict[str, np.ndarray]) -> XModStep: """Advance model by dt from t with given inputs.""" pass @dataclass class XModStep: """Result of a model step
def run(self, t_start: float, t_end: float, log_callback: Callable = None):
    """Advance every model from ``t_start`` to ``t_end`` in steps of ``self.dt``.

    At each step the outputs of the previous step are routed along
    ``self.connections`` (tuples of source model, source port, target
    model, target port) and every model in ``self.models`` is stepped once
    with the inputs gathered for it. Models therefore only ever see values
    from the previous step; on the first step no inputs are available.

    Args:
        t_start: Time at which the first step begins.
        t_end: Stepping stops once the time reaches ``t_end`` (within 1e-12).
        log_callback: Optional ``callback(t, outputs)`` called after every
            step with the advanced time and a dict mapping model name to
            that model's outputs.

    Raises:
        ValueError: If steps are required but ``self.dt`` is not positive,
            because the loop could never terminate.
    """
    t = t_start
    if t < t_end - 1e-12 and not self.dt > 0:
        raise ValueError(f"dt must be positive, got {self.dt!r}")

    outputs = {name: {} for name in self.models}
    # The 1e-12 slack keeps accumulated round-off in t from adding a step.
    while t < t_end - 1e-12:
        # Gather all inputs for each model from previous outputs
        inputs_for = {name: {} for name in self.models}
        for fm, fp, tm, tp in self.connections:
            source = outputs.get(fm)
            if source is not None and fp in source:
                inputs_for[tm][tp] = source[fp]

        # Step each model
        outputs = {
            name: model.step(t, self.dt, inputs_for[name]).outputs
            for name, model in self.models.items()
        }
        t += self.dt
        if log_callback is not None:
            log_callback(t, outputs)
def run_with_iteration(self, t_start: float, t_end: float, max_iter: int = 10):
    """Jacobi-style fixed-point iteration at each time step.

    For every time step the models are stepped repeatedly, each iteration
    feeding the outputs of the previous iteration along
    ``self.connections``, until the summed norm of the output change drops
    below 1e-8 or ``max_iter`` iterations have run. One status line is
    printed per time step.

    Every model's state is checkpointed with ``get_state`` before the
    iterations start and restored with ``set_state`` before each repeated
    attempt, so a model is advanced by exactly one ``dt`` per time step no
    matter how many iterations were needed.

    Args:
        t_start: Time at which the first step begins.
        t_end: Stepping stops once the time reaches ``t_end`` (within 1e-12).
        max_iter: Maximum number of fixed-point iterations per time step.

    Raises:
        ValueError: If steps are required but ``self.dt`` is not positive
            or ``max_iter`` is smaller than 1.
    """
    import copy  # local import: keeps this block self-contained

    t = t_start
    if t < t_end - 1e-12:
        if not self.dt > 0:
            raise ValueError(f"dt must be positive, got {self.dt!r}")
        if max_iter < 1:
            raise ValueError(f"max_iter must be >= 1, got {max_iter!r}")

    outputs = {name: {} for name in self.models}
    while t < t_end - 1e-12:
        # Checkpoint every model so repeated attempts restart from time t.
        # Deep copies guard against models that hand out or mutate live state.
        saved = {
            name: copy.deepcopy(model.get_state())
            for name, model in self.models.items()
        }
        converged = False
        iterations = 0
        for attempt in range(max_iter):
            iterations = attempt + 1
            if attempt:
                # Roll back the state advanced by the previous attempt.
                for name, model in self.models.items():
                    model.set_state(copy.deepcopy(saved[name]))

            inputs_for = {name: {} for name in self.models}
            for fm, fp, tm, tp in self.connections:
                if fm in outputs and fp in outputs[fm]:
                    inputs_for[tm][tp] = outputs[fm][fp]

            new_outputs = {
                name: model.step(t, self.dt, inputs_for[name]).outputs
                for name, model in self.models.items()
            }

            # Check convergence (norm of output change)
            diff = 0.0
            for name in self.models:
                for port, value in new_outputs[name].items():
                    old_val = outputs[name].get(port, 0)
                    diff += np.linalg.norm(value - old_val)
            outputs = new_outputs
            if diff < 1e-8:
                converged = True
                break
        print(f"t={t:.3f}, iter={iterations}, converged={converged}")
        t += self.dt


# Demo
# ----------------------------------------------------------------------
if __name__ == "__main__":
    # Build system: controller -> spring-mass-damper
    plant = SpringMassDamper("mass_spring", m=1.0, k=10.0, c=0.5)
    ctrl = PController("controller", Kp=20.0, x_ref=1.0)
# Master coordinates time steps and data exchange
@abstractmethod
def get_state(self) -> Dict:
    """Return the model's state, for checkpoint/rollback.

    Abstract: concrete models must override this method.

    Returns:
        A dict describing the model state. NOTE(review): presumably the
        same mapping that ``set_state`` accepts — confirm against the
        concrete model implementations.
    """
    pass