diff --git a/tools/log-converter/log_converter.py b/tools/log-converter/log_converter.py index 50a2b7076d2bcd1ebfbb6e07e9e6613ea44fba48..099dfc022618540fdb9a05359aa3cb4d391a4184 100644 --- a/tools/log-converter/log_converter.py +++ b/tools/log-converter/log_converter.py @@ -1,14 +1,10 @@ import os +from enum import Enum from pathlib import Path import click import polars as pl -NAS_CALIBRATE_STATE = 1 -MAIN_FLYING_FMM_STATE = 10 -PAYLOAD_FLYING_FMM_STATE = 9 -SECONDS_BEFORE_FLYING = 15 - @click.command() @click.argument("path", type=click.Path(exists=True)) @@ -19,151 +15,304 @@ def main(path: Path, output: Path, origin: str): """ Convert the logs found in the given path to a format compliant with ARPIST. """ - # now walk in the directory pointed by path the files: main_Boardcore_EventData.csv, main_Boardcore_NASState.csv, main_Boardcore_ReferenceValues.csv and save their path to variables - nas_controller_status_path = None - nas_state_path = None - reference_values_path = None - fmm_status_path = None - gps_path = None - - for p, _, fn in os.walk(path): - for f in fn: - if origin == "main": + if origin not in ["main", "payload"]: + raise ValueError("Invalid origin value. Origin must be 'main' or 'payload'.") + + origin = Origin(origin) + + LogConverter( + logs_path=path, + output_dir=output, + origin=origin, + slow_rate=2, # 2 Hz + high_rate=4, # 4 Hz + ).process() + + +class Origin(Enum): + MAIN = "main" + PAYLOAD = "payload" + + +class LogConverter: + NAS_CALIBRATE_STATE = 1 + MAIN_FLYING_FMM_STATE = 10 + PAYLOAD_FLYING_FMM_STATE = 9 + SECONDS_BEFORE_FLYING = 15 + + def __init__( + self, + logs_path: Path | str, + output_dir: Path | str = Path("."), + origin: Origin | str = Origin.MAIN, + slow_rate: int = 2, + high_rate: int = 4, + ): + """Converter of logs to ARPIST format. + + Args: + logs_path (Path | str): The path to the logs. + output_dir (Path | str): The output directory. + origin (Origin | str): The origin of the logs. + slow_rate (int): The slow rate of the logs [Hz]. + high_rate (int): The high rate of the logs [Hz] + """ + # Settings + self.path = Path(logs_path) + self.output = Path(output_dir) + self.origin = Origin(origin) + self.slow_rate = slow_rate + self.high_rate = high_rate + + # Raw DataFrames + self.main_nas_controller_status = None + self.main_nas_state = None + self.main_reference_values = None + self.main_fmm_status = None + self.payload_nas_controller_status = None + self.payload_nas_state = None + self.payload_gps_data = None + self.payload_fmm_status = None + + # Intermediate DataFrames + self.nas_controller_status = None + self.fmm_status = None + + # Processed DataFrames + self.reference_values = None + self.nas_state = None + + def process(self): + # ~ Read the data ~ + self.scrape_logs() + + # - add missing columns + self.add_reference_timestamp() + + # ~ Normalize the data ~ + # - select columns + # - sort by timestamp + self.reference_values = normalize_raw_reference_values(self.reference_values) + self.nas_state = normalize_raw_nas_state(self.nas_state) + self.fmm_status = normalize_raw_fmm_status(self.fmm_status) + if self.payload_gps_data is not None: + self.payload_gps_data = normalize_raw_gps_data(self.payload_gps_data) + + # - fix missing values (timestamps for reference) by applying corrections + if self.origin == Origin.PAYLOAD: + self.correct_reference_with_gps() + + # - cut till max recorded event timestamp + max_recorded_ts = self.get_last_event_timestamp() + self.keep_up_to(max_recorded_ts) + + _, end = self.get_start_end_interval() + self.expand_reference_up_to(end) + + # - fix sample rate + self.reference_values = fix_sample_rate(self.reference_values, self.slow_rate) + self.nas_state = fix_sample_rate(self.nas_state, self.high_rate) + + # - cut from 15 seconds before flying + if self.origin == Origin.MAIN: + flying_fmm_state = self.MAIN_FLYING_FMM_STATE + else: + flying_fmm_state = self.PAYLOAD_FLYING_FMM_STATE + start_ts = self.get_event_timestamp(flying_fmm_state) - pl.duration( + seconds=self.SECONDS_BEFORE_FLYING + ) + self.keep_from(start_ts) + + # - normalize timings (in order to have the same start and end timestamps) + start, end = self.get_start_end_interval() + self.keep_from(start) + self.keep_up_to(end) + + # ~ Write the data ~ + self.output.mkdir(parents=True, exist_ok=True) + write_high_rate(self.nas_state, self.output) + write_low_rate(self.reference_values, self.output) + + def scrape_logs(self): + """Scrape the logs found in the given path.""" + for p, _, fn in os.walk(self.path): + for f in fn: if f == "main_Main_NASControllerStatus.csv": - nas_controller_status_path = os.path.join(p, f) - elif f == "main_Main_FlightModeManagerStatus.csv": - fmm_status_path = os.path.join(p, f) + self.main_nas_controller_status = pl.read_csv(os.path.join(p, f)) elif f == "main_Boardcore_NASState.csv": - nas_state_path = os.path.join(p, f) - elif origin == "payload": - if f == "payload_Payload_NASControllerStatus.csv": - nas_controller_status_path = os.path.join(p, f) - elif f == "payload_Payload_FlightModeManagerStatus.csv": - fmm_status_path = os.path.join(p, f) + self.main_nas_state = pl.read_csv(os.path.join(p, f)) + elif f == "main_Boardcore_ReferenceValues.csv": + self.main_reference_values = pl.read_csv(os.path.join(p, f)) + elif f == "main_Main_FlightModeManagerStatus.csv": + self.main_fmm_status = pl.read_csv(os.path.join(p, f)) + elif f == "payload_Payload_NASControllerStatus.csv": + self.payload_nas_controller_status = pl.read_csv(os.path.join(p, f)) elif f == "payload_Boardcore_NASState.csv": - nas_state_path = os.path.join(p, f) + self.payload_nas_state = pl.read_csv(os.path.join(p, f)) elif f == "payload_Boardcore_UBXGPSData.csv": - gps_path = os.path.join(p, f) - if f == "main_Boardcore_ReferenceValues.csv": - reference_values_path = os.path.join(p, f) - - if nas_controller_status_path is None: - raise FileNotFoundError("NAS controller status file not found") - if nas_state_path is None: - raise FileNotFoundError("NAS state file not found") - if reference_values_path is None: - raise FileNotFoundError("Reference values file not found") - if fmm_status_path is None: - raise FileNotFoundError("FMM status file not found") - if origin == "payload" and gps_path is None: - raise FileNotFoundError("GPS data file not found") - - nas_controller_status = pl.read_csv(nas_controller_status_path) - nas_state = pl.read_csv(nas_state_path) - reference_values = pl.read_csv(reference_values_path) - fmm_status = pl.read_csv(fmm_status_path) - - # sort by timestamp and extract the timestamp associated to the calibrate event and topic - nas_controller_status = nas_controller_status.sort("timestamp") - calibrate_tms = nas_controller_status.filter( - pl.col("state") == NAS_CALIBRATE_STATE - ).select("timestamp") - - # add the calibrate timestamp to the reference_values as a new column - reference_values = reference_values.with_columns(calibrate_tms.select("timestamp")) - - # select cols - reference_values = reference_values.select( - pl.from_epoch(pl.col("timestamp"), time_unit="us"), - pl.col("refLatitude").alias("latitude"), - pl.col("refLongitude").alias("longitude"), - pl.col("refAltitude").alias("altitude"), - ) - nas_state = nas_state.select( - pl.from_epoch(pl.col("timestamp"), time_unit="us"), - "n", - "e", - "d", - "vn", - "ve", - "vd", - ) - fmm_status = fmm_status.select( - pl.from_epoch(pl.col("timestamp"), time_unit="us"), "state" - ) + self.payload_gps_data = pl.read_csv( + os.path.join(p, f), infer_schema_length=10000 + ) + elif f == "payload_Payload_FlightModeManagerStatus.csv": + self.payload_fmm_status = pl.read_csv(os.path.join(p, f)) - # find stop timestamp based on last state of the fmm plus 10 seconds - stop_ts = fmm_status.select(pl.col("timestamp") + pl.duration(seconds=2))[-1, 0] + self.reference_values = self.main_reference_values + if self.origin == Origin.MAIN: + self.nas_controller_status = self.main_nas_controller_status + self.nas_state = self.main_nas_state + self.fmm_status = self.main_fmm_status + elif self.origin == Origin.PAYLOAD: + self.nas_controller_status = self.payload_nas_controller_status + self.nas_state = self.payload_nas_state + self.fmm_status = self.payload_fmm_status - # filter the reference values and nas state dataframes - reference_values = reference_values.filter(pl.col("timestamp") <= stop_ts) - nas_state = nas_state.filter(pl.col("timestamp") <= stop_ts) + if self.nas_controller_status is None: + raise FileNotFoundError("NAS controller status file not found") + if self.nas_state is None: + raise FileNotFoundError("NAS state file not found") + if self.reference_values is None: + raise FileNotFoundError("Reference values file not found") + if self.fmm_status is None: + raise FileNotFoundError("FMM status file not found") + if self.origin == Origin.PAYLOAD and self.payload_gps_data is None: + raise FileNotFoundError( + "GPS data file not found, required for payload origin" + ) + + def keep_from(self, timestamp: int): + """Cut the dataframes from the given timestamp.""" + self.reference_values = self.reference_values.filter( + pl.col("timestamp") >= timestamp + ) + self.nas_state = self.nas_state.filter(pl.col("timestamp") >= timestamp) - # apply correction to the reference values if payload origin - if origin == "payload": - gps_data = pl.read_csv(gps_path, infer_schema_length=10000) - gps_data = gps_data.select( - pl.from_epoch(pl.col("gpsTimestamp"), time_unit="us").alias("timestamp"), - "latitude", - "longitude", + def keep_up_to(self, timestamp: int): + """Cut the dataframes up to the given timestamp.""" + self.reference_values = self.reference_values.filter( + pl.col("timestamp") <= timestamp ) + self.nas_state = self.nas_state.filter(pl.col("timestamp") <= timestamp) + + def add_reference_timestamp(self): + """Add the calibrate timestamp to the reference values as a new column.""" + self.nas_controller_status = self.nas_controller_status.sort("timestamp") + calibrate_tms = self.nas_controller_status.filter( + pl.col("state") == self.NAS_CALIBRATE_STATE + ).select("timestamp") + self.reference_values = self.reference_values.with_columns(calibrate_tms) + + def correct_reference_with_gps(self): + """Correct the reference values with the GPS data.""" + if self.payload_gps_data is None: + raise ValueError( + "GPS data file not found, required for GPS correction of reference values" + ) new_reference_values = [] - for row in reference_values.rows(): + for row in self.reference_values.rows(): timestamp = row[0] (lat, lon) = ( - gps_data.filter(pl.col("timestamp") < timestamp) + self.payload_gps_data.filter(pl.col("timestamp") < timestamp) .tail(1) .to_numpy()[0, 1:] ) new_reference_values.append( [timestamp, lat, lon, row[3] + 1.5] # altitude is corrected ) - reference_values = pl.DataFrame( - new_reference_values, schema=reference_values.schema, orient="row" + self.reference_values = pl.DataFrame( + new_reference_values, schema=self.reference_values.schema, orient="row" ) - # find max timestamp - max_ts = max( - reference_values.select("timestamp").max().item(0, 0), - (nas_state.select("timestamp").max().item(0, 0)), - ) + def get_last_event_timestamp(self): + """Get the last timestamp of the given event.""" + return self.fmm_status.select("timestamp")[-1, 0] - # upsample and downsample the dataframes - last_row = reference_values.tail(1) - last_row[0, "timestamp"] = max_ts - reference_values = pl.concat([reference_values, last_row], how="vertical") - reference_values = ( - reference_values.group_by_dynamic(pl.col("timestamp"), every="500ms") - .agg(pl.all().last()) - .upsample(time_column="timestamp", every="500ms") - .fill_null(strategy="forward") + def get_event_timestamp(self, event: int): + """Get the timestamp of the given event.""" + return self.fmm_status.filter(pl.col("state") == event).select("timestamp")[ + 0, 0 + ] + + def get_start_end_interval(self): + """Get the start and end from high rate.""" + return ( + self.nas_state.select("timestamp").min().item(0, 0), + self.nas_state.select("timestamp").max().item(0, 0), + ) + + def expand_reference_up_to(self, timestamp: int): + """Expand the reference values up to the given timestamp.""" + last_row = self.reference_values.tail(1) + last_row[0, "timestamp"] = timestamp + self.reference_values = pl.concat( + [self.reference_values, last_row], how="vertical" + ) + + +def normalize_raw_reference_values(df: pl.DataFrame): + """Normalize the reference values DataFrame.""" + return df.select( + pl.from_epoch(pl.col("timestamp"), time_unit="us"), + pl.col("refLatitude").alias("latitude"), + pl.col("refLongitude").alias("longitude"), + pl.col("refAltitude").alias("altitude"), + ).sort("timestamp") + + +def normalize_raw_nas_state(df: pl.DataFrame): + """Normalize the NAS state DataFrame.""" + return df.select( + pl.from_epoch(pl.col("timestamp"), time_unit="us"), + "n", + "e", + "d", + "vn", + "ve", + "vd", + ).sort("timestamp") + + +def normalize_raw_fmm_status(df: pl.DataFrame): + """Normalize the FMM status DataFrame.""" + return df.select(pl.from_epoch(pl.col("timestamp"), time_unit="us"), "state").sort( + "timestamp" ) - nas_state = ( - nas_state.group_by_dynamic(pl.col("timestamp"), every="250ms") + + +def normalize_raw_gps_data(df: pl.DataFrame): + """Normalize the GPS data DataFrame.""" + return df.select( + pl.from_epoch(pl.col("gpsTimestamp"), time_unit="us").alias("timestamp"), + "latitude", + "longitude", + ).sort("timestamp") + + +def fix_sample_rate(df: pl.DataFrame, rate: int): + """Change the sample rate of the given DataFrame based on the given rate [Hz].""" + if rate < 1 or rate > 1000: + raise ValueError("Invalid rate value. Rate must be between 1 and 1000 Hz.") + period = f"{1000 // rate}ms" + return ( + df.group_by_dynamic(pl.col("timestamp"), every=period) .agg(pl.all().last()) - .upsample(time_column="timestamp", every="250ms") + .upsample(time_column="timestamp", every=period) .fill_null(strategy="forward") ) - # filter from 15 seconds before flying - flying_fmm_state = ( - MAIN_FLYING_FMM_STATE if origin == "main" else PAYLOAD_FLYING_FMM_STATE - ) - start_ts = fmm_status.filter(pl.col("state") == flying_fmm_state).select( - "timestamp" - )[0, 0] - pl.duration(seconds=SECONDS_BEFORE_FLYING) - reference_values = reference_values.filter(pl.col("timestamp") >= start_ts) - nas_state = nas_state.filter(pl.col("timestamp") >= start_ts) - # save the dataframes to csv - output = Path(output) - reference_values.select( +def write_low_rate(df: pl.DataFrame, output: Path): + """Write the low rate data to a CSV file.""" + df.select( pl.col("timestamp").dt.timestamp(time_unit="us"), "latitude", "longitude", "altitude", ).write_csv(output / "low_rate.csv") - nas_state.select( + + +def write_high_rate(df: pl.DataFrame, output: Path): + """Write the high rate data to a CSV file.""" + df.select( pl.col("timestamp").dt.timestamp(time_unit="us"), "n", "e",