diff --git a/Data/README_TRACTIVE_THERMAL_MODELING.md b/Data/README_TRACTIVE_THERMAL_MODELING.md new file mode 100644 index 0000000..42dcebc --- /dev/null +++ b/Data/README_TRACTIVE_THERMAL_MODELING.md @@ -0,0 +1,32 @@ +# Run Thermal Modeling +Todo + +### Run Tests + +```bash +python -m unittest discover -s test +``` + +#### Plot the curve +```bash +python ./TractiveBatteryThermalModelViewer.py -h +usage: TractiveBatteryThermalModelViewer.py [-h] --path_parquet PATH_PARQUET [--column-name COLUMN_NAME] [--t-end T_END] [--initial-temp INITIAL_TEMP] + +View tractive battery thermal model as a function of current draw. + +options: + -h, --help show this help message and exit + --path_parquet PATH_PARQUET + Path to the Parquet file containing current data. + --column-name COLUMN_NAME + Name of the current column in the Parquet file (default: SME_TEMP_BusCurrent). + --t-end T_END End time in seconds for the simulation (default: 60). + --initial-temp INITIAL_TEMP + Initial temperature in °C (default: 22). +``` + +Example Command : + +``` +python TractiveBatteryThermalModelViewer.py --path_parquet +``` diff --git a/Data/TractiveBatteryThermalModelViewer.py b/Data/TractiveBatteryThermalModelViewer.py new file mode 100644 index 0000000..ba776f8 --- /dev/null +++ b/Data/TractiveBatteryThermalModelViewer.py @@ -0,0 +1,116 @@ +# TractiveBatteryThermalModelViewer.py + +import argparse +import matplotlib.pyplot as plt +import numpy as np +from numpy.typing import NDArray +import polars as pl + +from fs4BatteryThermalModelingEx import thermal_ode_solve_ivp + +def load_current_from_parquet(path: str, column: str) -> NDArray[np.float64]: + """ + Load a single current column from a Parquet file as a NumPy array. + """ + # df = pl.read_parquet(path, columns=[column]) + # df = pl.read_parquet(path, columns=["ACC_SEG0_TEMPS_CELL0", "ACC_SEG0_TEMPS_CELL1"]) + # # take the average of 64 Columns and create a dataframe with the average value + # avg_df = df.select( + # pl.mean_horizontal("ACC_SEG0_TEMPS_CELL0", "ACC_SEG0_TEMPS_CELL1").alias("avg") + # ) + csv_df = pl.read_csv("../Docs/Columns.csv").select("Column Name") # or columns=["c1"] to only read c1 + + filtered_df = csv_df.filter( + # pl.col("Column Name").str.contains("ACC_SEG0_TEMPS", literal=True) + pl.col("Column Name").str.contains(r"ACC_SEG\d+_TEMPS_") # \d+ = one or more + ) + + acc_seg_temps_list: list[str] = filtered_df["Column Name"].to_list() + cleaned_acc_seg_temps_list = [s.strip("'") for s in acc_seg_temps_list] + parquet_df = pl.read_parquet(path, columns= cleaned_acc_seg_temps_list) + # take the average of 64 Columns and create a dataframe with the average value + avg_df = parquet_df.select( + pl.mean_horizontal(pl.col(cleaned_acc_seg_temps_list)).alias("avg") + ) + return avg_df["avg"].to_numpy() + + +def run_thermal_model( + current_draw: NDArray[np.float64], + t_end: float, + initial_temp: float, +): + """ + Run the thermal ODE solver over [0, t_end] for the given current profile. + """ + t_span = (0.0, t_end) + t_eval = np.linspace(t_span[0], t_span[1], len(current_draw), dtype=np.float64) + return thermal_ode_solve_ivp( + current_draw=current_draw, + t_span=t_span, + initial_temp=initial_temp, + t_eval=t_eval, + ) + + +def plot_temperature(solution) -> None: + """ + Plot temperature vs time from a solve_ivp solution. + """ + t = solution.t + T = solution.y[0] + + plt.figure() + plt.plot(t, T, label="Cell temperature") + plt.xlabel("Time [s]") + plt.ylabel("Temperature [°C]") + plt.title("Tractive Battery Thermal Model") + plt.grid(True) + plt.legend() + plt.tight_layout() + plt.show() + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="View tractive battery thermal model as a function of current draw." + ) + parser.add_argument( + "--path_parquet", + required=True, + help="Path to the Parquet file containing current data.", + ) + parser.add_argument( + "--column-name", + default="SME_TEMP_BusCurrent", + help="Name of the current column in the Parquet file " + "(default: SME_TEMP_BusCurrent).", + ) + parser.add_argument( + "--t-end", + type=float, + default=60.0, + help="End time in seconds for the simulation (default: 60).", + ) + parser.add_argument( + "--initial-temp", + type=float, + default=22.0, + help="Initial temperature in °C (default: 22).", + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + current = load_current_from_parquet(args.path_parquet, args.column_name) + + solution = run_thermal_model(current, args.t_end, args.initial_temp) + + if not solution.success: + raise RuntimeError(f"ODE solver failed: {solution.message}") + + plot_temperature(solution) + + +if __name__ == "__main__": + main() diff --git a/Data/fs4BatteryThermalModelingEx.py b/Data/fs4BatteryThermalModelingEx.py new file mode 100644 index 0000000..cab52a5 --- /dev/null +++ b/Data/fs4BatteryThermalModelingEx.py @@ -0,0 +1,104 @@ +import numpy as np +from numpy.typing import NDArray +from scipy.integrate import solve_ivp +from typing import Tuple + + +THERMAL_COEFFICIENT: float = 12.6316 / (49.9 * 1000.0) +""" +Effective thermal coefficient relating I² to temperature rate of change. + +Units +----- +(°C / s) / A² +""" + + +def thermal_ode( + t: float, + T: float, + current_draw: NDArray[np.float64], + total_time: float, +) -> float: + """ + Compute dT/dt for a single cell using a sampled current profile. + + The model assumes Joule heating only: + + dT/dt = THERMAL_COEFFICIENT * I(t)² + + where I(t) is obtained by indexing into ``current_draw``, which is + assumed to be uniformly sampled over the time interval + ``[0, total_time]``. + + Parameters + ---------- + t : float + Current time (seconds) at which the derivative is evaluated. + T : float + Current cell temperature (degrees Celsius). Currently unused, + but included for compatibility with ``scipy.integrate.solve_ivp``. + current_draw : NDArray[np.float64] + 1D array of current samples (amperes), uniformly spaced in time. + total_time : float + Total duration (seconds) covered by ``current_draw``. Used to map + the continuous time ``t`` to an index in ``current_draw``. + + Returns + ------- + float + Instantaneous temperature time derivative ``dT/dt`` in + degrees Celsius per second. + """ + idx = int(t * len(current_draw) / total_time) + idx = min(max(idx, 0), len(current_draw) - 1) + I_t = current_draw[idx] + return float(THERMAL_COEFFICIENT * (I_t ** 2)) + + +def thermal_ode_solve_ivp( + current_draw: NDArray[np.float64], + t_span: Tuple[float, float], + initial_temp: float, + t_eval: NDArray[np.float64] | None = None, +): + """ + Integrate the thermal ODE over a time interval using ``solve_ivp``. + + This solves + + dT/dt = THERMAL_COEFFICIENT * I(t)² + + where ``I(t)`` is obtained from the discrete array ``current_draw``, + assumed to be uniformly sampled over the interval ``t_span``. + + Parameters + ---------- + current_draw : NDArray[np.float64] + 1D array of current samples (amperes) over the driving cycle. + t_span : tuple of float + Integration interval ``(t0, tf)`` in seconds. + initial_temp : float + Initial temperature at ``t0`` in degrees Celsius. + t_eval : NDArray[np.float64], optional + 1D array of times at which to store the computed solution. + If ``None``, the solver chooses its own time steps. + + Returns + ------- + OdeResult + The object returned by ``scipy.integrate.solve_ivp``, containing + at least ``t`` (times) and ``y`` (temperatures). + """ + total_time = t_span[1] - t_span[0] + + sol = solve_ivp( + fun=lambda t, T: thermal_ode(t, T, current_draw, total_time), + t_span=t_span, + y0=[initial_temp], + t_eval=t_eval, + method="RK45", + rtol=1e-6, + atol=1e-8, + ) + return sol diff --git a/Data/test/test_IT_ThermalIntegrationParquet.py b/Data/test/test_IT_ThermalIntegrationParquet.py new file mode 100644 index 0000000..14208cf --- /dev/null +++ b/Data/test/test_IT_ThermalIntegrationParquet.py @@ -0,0 +1,58 @@ +import unittest +import pathlib +import numpy as np +import polars as pl +from numpy.testing import assert_array_almost_equal +from numpy.typing import NDArray + +from fs4BatteryThermalModelingEx import ( + THERMAL_COEFFICIENT, + thermal_ode_solve_ivp, +) + +HERE = pathlib.Path(__file__).parent +PARQUET_PATH = HERE / "testdata" / "parquet" / "08102025Endurance1_FirstHalf.parquet" +COLUMN_NAME = "SME_TEMP_BusCurrent" + + +class TestThermalIntegrationParquet(unittest.TestCase): + def setUp(self) -> None: + # read only the needed column + df = pl.read_parquet(PARQUET_PATH, columns=["SME_TEMP_BusCurrent"]) + self.current_draw: NDArray[np.float64] = df[ + "SME_TEMP_BusCurrent" + ].to_numpy() + + # basic time grid: assume data covers 0–60s + self.t_span = (0.0, 60.0) + n_points = len(self.current_draw) + self.t_eval = np.linspace( + self.t_span[0], self.t_span[1], n_points, dtype=np.float64 + ) + self.initial_temp = 25.0 + + def test_integration_runs_and_temperature_increases_on_average(self) -> None: + sol = thermal_ode_solve_ivp( + current_draw=self.current_draw, + t_span=self.t_span, + initial_temp=self.initial_temp, + t_eval=self.t_eval, + ) + + # 1) solver succeeded + self.assertTrue(sol.success, msg=sol.message) + + temps = sol.y[0] + + # 2) length matches t_eval / current array + self.assertEqual(temps.shape, self.t_eval.shape) + + # 3) on average, temperature should not drop far below initial + self.assertGreaterEqual(temps.mean(), self.initial_temp - 1.0) + + # 4) first value equals initial temperature + self.assertAlmostEqual(temps[0], self.initial_temp, places=6) + + +if __name__ == "__main__": + unittest.main() diff --git a/Data/test/test_UT_fs4BatteryThermalModeling.py b/Data/test/test_UT_fs4BatteryThermalModeling.py new file mode 100644 index 0000000..43b0e6d --- /dev/null +++ b/Data/test/test_UT_fs4BatteryThermalModeling.py @@ -0,0 +1,72 @@ +import unittest +import numpy as np +from numpy.typing import NDArray +from numpy.testing import assert_array_almost_equal + +from fs4BatteryThermalModelingEx import ( + THERMAL_COEFFICIENT, + thermal_ode_solve_ivp, +) + +class TestThermal_ode_solve_ivp(unittest.TestCase): + def setUp(self) -> None: + self.t_span = (0.0, 10.0) + self.n_points = 1000 + self.t_eval: NDArray[np.float64] = np.linspace( + self.t_span[0], self.t_span[1], self.n_points, dtype=np.float64 + ) + self.initial_temp: float = 25.0 + + def test_constant_current_matches_analytic(self) -> None: + """ + For constant I, dT/dt = k * I^2 -> T(t) = T0 + k * I^2 * t + """ + I_const = 5.0 + current_draw = np.full(self.n_points, I_const, dtype=np.float64) + + sol = thermal_ode_solve_ivp( + current_draw=current_draw, + t_span=self.t_span, + initial_temp=self.initial_temp, + t_eval=self.t_eval, + ) + + self.assertTrue(sol.success, msg=sol.message) + temps = sol.y[0] + + expected = self.initial_temp + THERMAL_COEFFICIENT * (I_const ** 2) * self.t_eval + assert_array_almost_equal(temps, expected, decimal=4) + + def test_zero_current_results_in_constant_temperature(self) -> None: + zero_current = np.zeros(self.n_points, dtype=np.float64) + + sol = thermal_ode_solve_ivp( + current_draw=zero_current, + t_span=self.t_span, + initial_temp=self.initial_temp, + t_eval=self.t_eval, + ) + + self.assertTrue(sol.success, msg=sol.message) + temps = sol.y[0] + expected = np.full_like(temps, self.initial_temp) + assert_array_almost_equal(temps, expected, decimal=8) + + def test_extreme_currents_no_nan_or_inf(self) -> None: + extreme_currents = np.linspace(0.0, 1e3, self.n_points, dtype=np.float64) + + sol = thermal_ode_solve_ivp( + current_draw=extreme_currents, + t_span=self.t_span, + initial_temp=self.initial_temp, + t_eval=self.t_eval, + ) + + self.assertTrue(sol.success, msg=sol.message) + temps = sol.y[0] + self.assertFalse(np.isnan(temps).any()) + self.assertFalse(np.isinf(temps).any()) + + +if __name__ == "__main__": + unittest.main() diff --git a/Data/test/testdata/parquet/08102025Endurance1_FirstHalf.parquet b/Data/test/testdata/parquet/08102025Endurance1_FirstHalf.parquet new file mode 100644 index 0000000..7a175e0 Binary files /dev/null and b/Data/test/testdata/parquet/08102025Endurance1_FirstHalf.parquet differ