diff --git a/docs/user/index.rst b/docs/user/index.rst index 5afaee3a6..580b9c612 100644 --- a/docs/user/index.rst +++ b/docs/user/index.rst @@ -24,6 +24,7 @@ RocketPy's User Guide :caption: Special Case Simulations Compare Flights Class + Parachute Triggers (Acceleration-Based) Deployable Payload Air Brakes Example ../notebooks/sensors.ipynb diff --git a/docs/user/parachute_triggers.rst b/docs/user/parachute_triggers.rst new file mode 100644 index 000000000..638ec1731 --- /dev/null +++ b/docs/user/parachute_triggers.rst @@ -0,0 +1,343 @@ +Acceleration-Based Parachute Triggers +====================================== + +RocketPy supports advanced parachute deployment logic using acceleration data +from simulated IMU (Inertial Measurement Unit) sensors. This enables realistic +avionics algorithms that mimic real-world flight computers. + +Overview +-------- + +Traditional parachute triggers rely on altitude and velocity. Acceleration-based +triggers provide additional capabilities: + +- **Motor burnout detection**: Implement as a custom trigger with mission-specific thresholds +- **Apogee detection**: Use acceleration and velocity together for precise apogee +- **Freefall detection**: Identify ballistic coasting phases +- **Liftoff detection**: Confirm motor ignition via high acceleration + +These triggers can optionally include sensor noise to simulate realistic IMU +behavior, making simulations more representative of actual flight conditions. + +Built-in Trigger +---------------- + +RocketPy provides one built-in trigger string: + +Apogee Detection +~~~~~~~~~~~~~~~~ + +Detects apogee when the rocket starts descending. + +.. code-block:: python + + main = Parachute( + name="Main", + cd_s=10.0, + trigger="apogee", + sampling_rate=100, + lag=0.5 + ) + +**Detection criteria:** +- Vertical velocity becomes negative (descent starts) + +Numeric Altitude Trigger +------------------------ + +You can also pass a numeric altitude (int or float) to deploy at a fixed height +while descending. + +.. code-block:: python + + main = Parachute( + name="Main", + cd_s=10.0, + trigger=400, # meters above ground level + sampling_rate=100, + lag=0.5 + ) + +Custom Triggers +--------------- + +You can create custom triggers that use acceleration data: + +.. code-block:: python + + from rocketpy import Parachute + +Custom Trigger: Motor Burnout +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Motor burnout is highly mission-dependent, so it is recommended as a custom +trigger with user-defined thresholds: + +Logic: check for a drop in vertical or total acceleration once the rocket is +above a minimum height and still ascending. + +.. code-block:: python + + def burnout_trigger_factory( + min_height=5.0, + min_vz=0.5, + az_threshold=-8.0, + total_acc_threshold=2.0, + ): + def burnout_trigger(_pressure, height, state_vector, u_dot): + if u_dot is None or len(u_dot) < 6: + return False + + ax, ay, az = u_dot[3], u_dot[4], u_dot[5] + total_acc = (ax**2 + ay**2 + az**2) ** 0.5 + vz = state_vector[5] if len(state_vector) > 5 else 0 + + if height < min_height or vz <= min_vz: + return False + + return az < az_threshold or total_acc < total_acc_threshold + + return burnout_trigger + +Usage: + +.. code-block:: python + + drogue = Parachute( + name="Drogue", + cd_s=1.0, + trigger=burnout_trigger_factory( + min_height=10.0, + min_vz=2.0, + az_threshold=-10.0, + total_acc_threshold=3.0, + ), + sampling_rate=100, + lag=1.5, + ) + +Custom Trigger: Apogee by Acceleration +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Logic: near-zero vertical velocity with downward acceleration. + +.. code-block:: python + + def apogee_acc_trigger(_pressure, _height, state_vector, u_dot): + if u_dot is None or len(u_dot) < 6: + return False + vz = state_vector[5] + az = u_dot[5] + # Apogee is indicated by near-zero vertical velocity and downward acceleration. + return abs(vz) < 1.0 and az < -0.1 + +Usage: + +.. code-block:: python + + main = Parachute( + name="Main", + cd_s=10.0, + trigger=apogee_acc_trigger, + sampling_rate=100, + lag=0.5 + ) + +Custom Trigger: Free-fall +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Logic: low total acceleration while descending above a small height. + +.. code-block:: python + + def freefall_trigger(_pressure, height, state_vector, u_dot): + if u_dot is None or len(u_dot) < 6: + return False + ax, ay, az = u_dot[3], u_dot[4], u_dot[5] + total_acc = (ax**2 + ay**2 + az**2) ** 0.5 + vz = state_vector[5] + # Free-fall is a low-acceleration phase while descending. + return height > 5.0 and vz < -0.2 and total_acc < 11.5 + +Usage: + +.. code-block:: python + + drogue = Parachute( + name="Drogue", + cd_s=1.0, + trigger=freefall_trigger, + sampling_rate=100, + lag=1.5 + ) + +Custom Trigger: Liftoff +~~~~~~~~~~~~~~~~~~~~~~ + +Logic: detect motor ignition by high total acceleration. + +.. code-block:: python + + def liftoff_trigger(_pressure, _height, _state_vector, u_dot): + if u_dot is None or len(u_dot) < 6: + return False + + ax, ay, az = u_dot[3], u_dot[4], u_dot[5] + total_acc = (ax**2 + ay**2 + az**2) ** 0.5 + return total_acc > 15.0 + +Usage: + +.. code-block:: python + + lift = Parachute( + name="Lift", + cd_s=0.5, + trigger=liftoff_trigger, + sampling_rate=100, + lag=0.1 + ) + +Custom Trigger: Threshold-based +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Logic: this is a generic threshold example showing how to implement custom +deployment criteria based on acceleration and velocity. Adjust thresholds and +conditions to match your specific mission requirements. + +.. code-block:: python + + def custom_trigger(pressure, height, state_vector, u_dot): + if u_dot is None or len(u_dot) < 6: + return False + + ax, ay, az = u_dot[3], u_dot[4], u_dot[5] + total_acc = (ax**2 + ay**2 + az**2)**0.5 + vz = state_vector[5] + return total_acc < 5.0 and vz < -10.0 + +Usage: + +.. code-block:: python + + parachute = Parachute( + name="Custom", + cd_s=2.0, + trigger=custom_trigger, + sampling_rate=100, + lag=1.0 + ) + +Custom Trigger: Sensors and Acceleration +---------------------------------------- + +Triggers can also accept sensor data alongside acceleration. + +Logic: compare measured vertical acceleration from an Accelerometer with the +computed ``u_dot`` value. + +.. code-block:: python + + def advanced_trigger(pressure, height, state_vector, sensors, u_dot): + if len(sensors) == 0: + return False + + acc_reading = sensors[0].measurement + if acc_reading is None or len(acc_reading) < 3: + return False + + if u_dot is None or len(u_dot) < 6: + return False + + meas_az = acc_reading[2] + az = u_dot[5] + return (az < -5.0) and (meas_az < -5.0) + +Usage: + +.. code-block:: python + + parachute = Parachute( + name="Advanced", + cd_s=1.5, + trigger=advanced_trigger, + sampling_rate=100 + ) + +.. note:: + + For realistic IMU behavior, use RocketPy sensors with their own noise models. + Parachute trigger functions can receive ``sensors`` and use those measurements + directly instead of injecting noise in the flight solver. See the + :doc:`Sensor Classes ` for available sensors. + +Example: Simulating Drogue Deployment +------------------------------------- + +Logic: in RocketPy, only one parachute is active at a time, so you can simulate +dual-deploy by treating the first trigger as a drogue event and the second as the +main deployment. + +.. code-block:: python + + from rocketpy import Rocket, Parachute, Flight, Environment + + # Environment and rocket setup + env = Environment(latitude=32.99, longitude=-106.97, elevation=1400) + env.set_atmospheric_model(type="standard_atmosphere") + + my_rocket = Rocket(...) # Define your rocket + + # Drogue parachute: Deploy using a custom burnout trigger + def drogue_burnout_trigger(_pressure, height, state_vector, u_dot): + if u_dot is None or len(u_dot) < 6: + return False + az = u_dot[5] + vz = state_vector[5] + return height > 10 and vz > 1 and az < -8.0 + + drogue = Parachute( + name="Drogue", + cd_s=1.0, + trigger=drogue_burnout_trigger, + sampling_rate=100, + lag=1.5, + noise=(0, 8.3, 0.5) # Pressure sensor noise + ) + my_rocket.add_parachute(drogue) + + # Main parachute: Deploy at 800m using a custom trigger + def main_deploy_trigger(_pressure, height, state_vector, u_dot): + if u_dot is None or len(u_dot) < 6: + return False + vz = state_vector[5] + az = u_dot[5] + return height < 800 and vz < -5 and az > -15 + + main = Parachute( + name="Main", + cd_s=10.0, + trigger=main_deploy_trigger, + sampling_rate=100, + lag=0.5, + noise=(0, 8.3, 0.5) + ) + my_rocket.add_parachute(main) + + # Flight simulation + flight = Flight( + rocket=my_rocket, + environment=env, + rail_length=5.2, + inclination=85, + heading=0, + ) + + flight.all_info() + +See Also +-------- + +- :doc:`Parachute Class Reference ` +- :doc:`Flight Simulation ` +- :doc:`Sensors and Controllers ` diff --git a/rocketpy/rocket/parachute.py b/rocketpy/rocket/parachute.py index 4e0318d18..f6791d852 100644 --- a/rocketpy/rocket/parachute.py +++ b/rocketpy/rocket/parachute.py @@ -24,25 +24,34 @@ class Parachute: This parameter defines the trigger condition for the parachute ejection system. It can be one of the following: - - A callable function that takes three arguments: - 1. Freestream pressure in pascals. - 2. Height in meters above ground level. - 3. The state vector of the simulation, which is defined as: - - `[x, y, z, vx, vy, vz, e0, e1, e2, e3, wx, wy, wz]`. - - 4. A list of sensors that are attached to the rocket. The most recent - measurements of the sensors are provided with the - ``sensor.measurement`` attribute. The sensors are listed in the same - order as they are added to the rocket. - - The function should return ``True`` if the parachute ejection system - should be triggered and False otherwise. The function will be called - according to the specified sampling rate. + - A callable function that can take 3, 4, or 5 arguments: + + **3 arguments**: + 1. Freestream pressure in pascals. + 2. Height in meters above ground level. + 3. The state vector: ``[x, y, z, vx, vy, vz, e0, e1, e2, e3, wx, wy, wz]`` + + **4 arguments** (sensors OR acceleration): + 1. Freestream pressure in pascals. + 2. Height in meters above ground level. + 3. The state vector. + 4. Either: + - ``sensors``: List of sensor objects attached to the rocket, OR + - ``u_dot``: State derivative including accelerations at indices [3:6] + + **5 arguments** (sensors AND acceleration): + 1. Freestream pressure in pascals. + 2. Height in meters above ground level. + 3. The state vector. + 4. ``sensors``: List of sensor objects. + 5. ``u_dot``: State derivative with accelerations ``[vx, vy, vz, ax, ay, az, ...]`` + + The function should return ``True`` to trigger deployment, ``False`` otherwise. + The function will be called according to the specified sampling rate. - A float value, representing an absolute height in meters. In this case, the parachute will be ejected when the rocket reaches this height - above ground level. + above ground level while descending. - The string "apogee" which triggers the parachute at apogee, i.e., when the rocket reaches its highest point and starts descending. @@ -51,12 +60,12 @@ class Parachute: Parachute.triggerfunc : function Trigger function created from the trigger used to evaluate the trigger condition for the parachute ejection system. It is a callable function - that takes three arguments: Freestream pressure in Pa, Height above - ground level in meters, and the state vector of the simulation. The - returns ``True`` if the parachute ejection system should be triggered + that takes five arguments: Freestream pressure in Pa, Height above + ground level in meters, the state vector, sensors list, and u_dot. + Returns ``True`` if the parachute ejection system should be triggered and ``False`` otherwise. - .. note: + .. note:: The function will be called according to the sampling rate specified. @@ -273,54 +282,95 @@ def __init_noise(self, noise): + beta * np.random.normal(noise[0], noise[1]) ) - def __evaluate_trigger_function(self, trigger): + def __evaluate_trigger_function(self, trigger): # pylint: disable=too-many-statements """This is used to set the triggerfunc attribute that will be used to interact with the Flight class. + + Notes + ----- + The resulting triggerfunc always has signature (p, h, y, sensors, u_dot) + so Flight can pass both sensors and u_dot when needed. """ # pylint: disable=unused-argument, function-redefined - # Case 1: The parachute is deployed by a custom function - if callable(trigger): - # work around for having added sensors to parachute triggers - # to avoid breaking changes - triggerfunc = trigger - sig = signature(triggerfunc) - if len(sig.parameters) == 3: + # Helper to wrap any callable to the internal (p, h, y, sensors, u_dot) API + def _make_wrapper(fn): + sig = signature(fn) + params = list(sig.parameters.keys()) - def triggerfunc(p, h, y, sensors): - return trigger(p, h, y) + # detect if user function expects acceleration-like argument + expects_udot = any( + name.lower() in ("u_dot", "udot", "acc", "acceleration") + for name in params[3:] + ) + # detect if user function expects sensors argument + expects_sensors = any(name.lower() == "sensors" for name in params[3:]) + + def wrapper(p, h, y, sensors, u_dot): + # Support 3, 4, and 5-arg user functions + num_params = len(sig.parameters) + if num_params == 3: + return fn(p, h, y) + if num_params == 4: + # Check which 4th arg to pass + fourth_param = params[3].lower() + if fourth_param in ("u_dot", "udot", "acc", "acceleration"): + return fn(p, h, y, u_dot) + else: + return fn(p, h, y, sensors) + if num_params >= 5: + # Pass both sensors and u_dot + return fn(p, h, y, sensors, u_dot) + # If function signature is not supported, raise an error + raise TypeError( + f"Trigger function '{fn.__name__}' has unsupported signature: " + f"expected 3, 4, or 5+ arguments, got {num_params}. " + "Please check the function definition." + ) + + # attach metadata so Flight can decide whether to compute u_dot + wrapper._expects_udot = expects_udot + wrapper._expects_sensors = expects_sensors + wrapper._wrapped_fn = fn + return wrapper + + # Callable provided by user + if callable(trigger): + self.triggerfunc = _make_wrapper(trigger) + return - self.triggerfunc = triggerfunc + # Numeric altitude trigger + if isinstance(trigger, (int, float)): - # Case 2: The parachute is deployed at a given height - elif isinstance(trigger, (int, float)): - # The parachute is deployed at a given height - def triggerfunc(p, h, y, sensors): + def triggerfunc(p, h, y, sensors, u_dot): # pylint: disable=unused-argument # p = pressure considering parachute noise signal # h = height above ground level considering parachute noise signal # y = [x, y, z, vx, vy, vz, e0, e1, e2, e3, w1, w2, w3] return y[5] < 0 and h < trigger + triggerfunc._expects_udot = False + triggerfunc._expects_sensors = True self.triggerfunc = triggerfunc + return - # Case 3: The parachute is deployed at apogee - elif trigger.lower() == "apogee": - # The parachute is deployed at apogee - def triggerfunc(p, h, y, sensors): - # p = pressure considering parachute noise signal - # h = height above ground level considering parachute noise signal - # y = [x, y, z, vx, vy, vz, e0, e1, e2, e3, w1, w2, w3] + # Special case: "apogee" + if isinstance(trigger, str) and trigger.lower() == "apogee": + + def triggerfunc(p, h, y, sensors, u_dot): # pylint: disable=unused-argument return y[5] < 0 + triggerfunc._expects_udot = False + triggerfunc._expects_sensors = True self.triggerfunc = triggerfunc - - # Case 4: Invalid trigger input - else: - raise ValueError( - f"Unable to set the trigger function for parachute '{self.name}'. " - + "Trigger must be a callable, a float value or the string 'apogee'. " - + "See the Parachute class documentation for more information." - ) + return + + # If we reach this point, the trigger is invalid + raise ValueError( + f"Unable to set the trigger function for parachute '{self.name}'. " + + "Trigger must be a callable, a float value or one of the strings " + + "('apogee'). " + + "See the Parachute class documentation for more information." + ) def __str__(self): """Returns a string representation of the Parachute class. diff --git a/rocketpy/simulation/flight.py b/rocketpy/simulation/flight.py index 2293d9706..f7420aaa1 100644 --- a/rocketpy/simulation/flight.py +++ b/rocketpy/simulation/flight.py @@ -587,6 +587,9 @@ def __init__( # pylint: disable=too-many-arguments,too-many-statements A custom ``scipy.integrate.OdeSolver`` can be passed as well. For more information on the integration methods, see the scipy documentation [1]_. + simulation_mode : str, optional + Simulation mode to use. Can be "6 DOF" for 6 degrees of freedom or + "3 DOF" for 3 degrees of freedom. Default is "6 DOF". Returns ------- None @@ -706,11 +709,14 @@ def __simulate(self, verbose): ) = self.__calculate_and_save_pressure_signals( parachute, node.t, self.y_sol[2] ) - if parachute.triggerfunc( + if self._evaluate_parachute_trigger( + parachute, noisy_pressure, height_above_ground_level, self.y_sol, self.sensors, + phase.derivative, + self.t, ): # Remove parachute from flight parachutes self.parachutes.remove(parachute) @@ -923,11 +929,14 @@ def __check_and_handle_parachute_triggers( ) = self.__calculate_and_save_pressure_signals( parachute, node.t, self.y_sol[2] ) - if not parachute.triggerfunc( + if not self._evaluate_parachute_trigger( + parachute, noisy_pressure, height_above_ground_level, self.y_sol, self.sensors, + phase.derivative, + node.t, ): continue # Check next parachute @@ -1334,11 +1343,14 @@ def __check_overshootable_parachute_triggers( ) # Check for parachute trigger - if not parachute.triggerfunc( + if not self._evaluate_parachute_trigger( + parachute, noisy_pressure, height_above_ground_level, overshootable_node.y_sol, self.sensors, + phase.derivative, + overshootable_node.t, ): continue # Check next parachute @@ -1439,6 +1451,51 @@ def __calculate_and_save_pressure_signals(self, parachute, t, z): return noisy_pressure, height_above_ground_level + def _evaluate_parachute_trigger( + self, parachute, pressure, height, y, sensors, derivative_func, t + ): + """Evaluate parachute trigger, passing both sensors and u_dot to wrapper. + + This helper preserves backward compatibility with existing trigger + signatures. The wrapper in Parachute always expects (p, h, y, sensors, u_dot) + and Flight computes u_dot only when the trigger requests it (optimization). + + Parameters + ---------- + parachute : Parachute + Parachute object. + pressure : float + Noisy pressure value passed to trigger. + height : float + Height above ground level passed to trigger. + y : array + State vector at evaluation time. + sensors : list + Sensors list passed to trigger. + derivative_func : callable + Function to compute derivatives: derivative_func(t, y) + t : float + Time at which to evaluate derivatives. + + Returns + ------- + bool + True if trigger condition met, False otherwise. + """ + triggerfunc = parachute.triggerfunc + + # Check wrapper metadata for expectations + expects_udot = getattr(triggerfunc, "_expects_udot", False) + + # Compute u_dot only if needed (performance optimization) + u_dot = None + if expects_udot: + u_dot = derivative_func(t, y) + + # Call the wrapper with both sensors and u_dot + # The wrapper will decide which args to pass to the user's function + return triggerfunc(pressure, height, y, sensors, u_dot) + def __init_solution_monitors(self): # Initialize solution monitors self.out_of_rail_time = 0 diff --git a/tests/unit/test_parachute_triggers.py b/tests/unit/test_parachute_triggers.py new file mode 100644 index 000000000..907b16268 --- /dev/null +++ b/tests/unit/test_parachute_triggers.py @@ -0,0 +1,110 @@ +import numpy as np + +from rocketpy.rocket.parachute import Parachute +from rocketpy.simulation.flight import Flight + + +def test_trigger_receives_u_dot(): + def derivative_func(_t, _y): + return np.array([0, 0, 0, 1.0, 2.0, 3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(_p, _h, _y, u_dot): + recorded["u_dot"] = np.array(u_dot) + return True + + parachute = Parachute( + name="test", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=10.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + assert res is True + assert "u_dot" in recorded + assert np.allclose(recorded["u_dot"][3:6], np.array([1.0, 2.0, 3.0])) + + +def test_trigger_with_u_dot_only(): + """Test trigger that only expects u_dot (no sensors).""" + + def derivative_func(_t, _y): + return np.array([0, 0, 0, -1.0, -2.0, -3.0, 0, 0, 0, 0, 0, 0, 0]) + + recorded = {} + + def user_trigger(_p, _h, _y, u_dot): + recorded["u_dot"] = np.array(u_dot) + return False + + parachute = Parachute( + name="test_u_dot_only", + cd_s=1.0, + trigger=user_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=5.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=1.234, + ) + + assert res is False + assert "u_dot" in recorded + assert np.allclose(recorded["u_dot"][3:6], np.array([-1.0, -2.0, -3.0])) + + +def test_basic_trigger_does_not_compute_u_dot(): + def derivative_func(_t, _y): + raise RuntimeError("derivative should not be called for legacy triggers") + + called = {} + + def basic_trigger(_p, _h, _y): + called["ok"] = True + return True + + parachute = Parachute( + name="basic", + cd_s=1.0, + trigger=basic_trigger, + sampling_rate=100, + ) + + dummy = type("D", (), {})() + + res = Flight._evaluate_parachute_trigger( + dummy, + parachute, + pressure=0.0, + height=0.0, + y=np.zeros(13), + sensors=[], + derivative_func=derivative_func, + t=0.0, + ) + + assert res is True + assert called.get("ok", False) is True