diff --git a/tools/log-converter/log_converter.py b/tools/log-converter/log_converter.py
index 099dfc022618540fdb9a05359aa3cb4d391a4184..50a2b7076d2bcd1ebfbb6e07e9e6613ea44fba48 100644
--- a/tools/log-converter/log_converter.py
+++ b/tools/log-converter/log_converter.py
@@ -1,10 +1,14 @@
 import os
-from enum import Enum
 from pathlib import Path

 import click
 import polars as pl

+NAS_CALIBRATE_STATE = 1
+MAIN_FLYING_FMM_STATE = 10
+PAYLOAD_FLYING_FMM_STATE = 9
+SECONDS_BEFORE_FLYING = 15
+

 @click.command()
 @click.argument("path", type=click.Path(exists=True))
@@ -15,304 +19,151 @@ def main(path: Path, output: Path, origin: str):
     """
     Convert the logs found in the given path to a format compliant with ARPIST.
     """
-    if origin not in ["main", "payload"]:
-        raise ValueError("Invalid origin value. Origin must be 'main' or 'payload'.")
-
-    origin = Origin(origin)
-
-    LogConverter(
-        logs_path=path,
-        output_dir=output,
-        origin=origin,
-        slow_rate=2,  # 2 Hz
-        high_rate=4,  # 4 Hz
-    ).process()
-
-
-class Origin(Enum):
-    MAIN = "main"
-    PAYLOAD = "payload"
-
-
-class LogConverter:
-    NAS_CALIBRATE_STATE = 1
-    MAIN_FLYING_FMM_STATE = 10
-    PAYLOAD_FLYING_FMM_STATE = 9
-    SECONDS_BEFORE_FLYING = 15
-
-    def __init__(
-        self,
-        logs_path: Path | str,
-        output_dir: Path | str = Path("."),
-        origin: Origin | str = Origin.MAIN,
-        slow_rate: int = 2,
-        high_rate: int = 4,
-    ):
-        """Converter of logs to ARPIST format.
-
-        Args:
-            logs_path (Path | str): The path to the logs.
-            output_dir (Path | str): The output directory.
-            origin (Origin | str): The origin of the logs.
-            slow_rate (int): The slow rate of the logs [Hz].
-            high_rate (int): The high rate of the logs [Hz]
-        """
-        # Settings
-        self.path = Path(logs_path)
-        self.output = Path(output_dir)
-        self.origin = Origin(origin)
-        self.slow_rate = slow_rate
-        self.high_rate = high_rate
-
-        # Raw DataFrames
-        self.main_nas_controller_status = None
-        self.main_nas_state = None
-        self.main_reference_values = None
-        self.main_fmm_status = None
-        self.payload_nas_controller_status = None
-        self.payload_nas_state = None
-        self.payload_gps_data = None
-        self.payload_fmm_status = None
-
-        # Intermediate DataFrames
-        self.nas_controller_status = None
-        self.fmm_status = None
-
-        # Processed DataFrames
-        self.reference_values = None
-        self.nas_state = None
-
-    def process(self):
-        # ~ Read the data ~
-        self.scrape_logs()
-
-        # - add missing columns
-        self.add_reference_timestamp()
-
-        # ~ Normalize the data ~
-        # - select columns
-        # - sort by timestamp
-        self.reference_values = normalize_raw_reference_values(self.reference_values)
-        self.nas_state = normalize_raw_nas_state(self.nas_state)
-        self.fmm_status = normalize_raw_fmm_status(self.fmm_status)
-        if self.payload_gps_data is not None:
-            self.payload_gps_data = normalize_raw_gps_data(self.payload_gps_data)
-
-        # - fix missing values (timestamps for reference) by applying corrections
-        if self.origin == Origin.PAYLOAD:
-            self.correct_reference_with_gps()
-
-        # - cut till max recorded event timestamp
-        max_recorded_ts = self.get_last_event_timestamp()
-        self.keep_up_to(max_recorded_ts)
-
-        _, end = self.get_start_end_interval()
-        self.expand_reference_up_to(end)
-
-        # - fix sample rate
-        self.reference_values = fix_sample_rate(self.reference_values, self.slow_rate)
-        self.nas_state = fix_sample_rate(self.nas_state, self.high_rate)
-
-        # - cut from 15 seconds before flying
-        if self.origin == Origin.MAIN:
-            flying_fmm_state = self.MAIN_FLYING_FMM_STATE
-        else:
-            flying_fmm_state = self.PAYLOAD_FLYING_FMM_STATE
-
-        start_ts = self.get_event_timestamp(flying_fmm_state) - pl.duration(
-            seconds=self.SECONDS_BEFORE_FLYING
-        )
-        self.keep_from(start_ts)
-
-        # - normalize timings (in order to have the same start and end timestamps)
-        start, end = self.get_start_end_interval()
-        self.keep_from(start)
-        self.keep_up_to(end)
-
-        # ~ Write the data ~
-        self.output.mkdir(parents=True, exist_ok=True)
-        write_high_rate(self.nas_state, self.output)
-        write_low_rate(self.reference_values, self.output)
-
-    def scrape_logs(self):
-        """Scrape the logs found in the given path."""
-        for p, _, fn in os.walk(self.path):
-            for f in fn:
+    # walk the directory pointed to by path and save the paths of the required log CSVs
+    nas_controller_status_path = None
+    nas_state_path = None
+    reference_values_path = None
+    fmm_status_path = None
+    gps_path = None
+
+    for p, _, fn in os.walk(path):
+        for f in fn:
+            if origin == "main":
                 if f == "main_Main_NASControllerStatus.csv":
-                    self.main_nas_controller_status = pl.read_csv(os.path.join(p, f))
-                elif f == "main_Boardcore_NASState.csv":
-                    self.main_nas_state = pl.read_csv(os.path.join(p, f))
-                elif f == "main_Boardcore_ReferenceValues.csv":
-                    self.main_reference_values = pl.read_csv(os.path.join(p, f))
+                    nas_controller_status_path = os.path.join(p, f)
                 elif f == "main_Main_FlightModeManagerStatus.csv":
-                    self.main_fmm_status = pl.read_csv(os.path.join(p, f))
-                elif f == "payload_Payload_NASControllerStatus.csv":
-                    self.payload_nas_controller_status = pl.read_csv(os.path.join(p, f))
+                    fmm_status_path = os.path.join(p, f)
+                elif f == "main_Boardcore_NASState.csv":
+                    nas_state_path = os.path.join(p, f)
+            elif origin == "payload":
+                if f == "payload_Payload_NASControllerStatus.csv":
+                    nas_controller_status_path = os.path.join(p, f)
+                elif f == "payload_Payload_FlightModeManagerStatus.csv":
+                    fmm_status_path = os.path.join(p, f)
                 elif f == "payload_Boardcore_NASState.csv":
-                    self.payload_nas_state = pl.read_csv(os.path.join(p, f))
+                    nas_state_path = os.path.join(p, f)
                 elif f == "payload_Boardcore_UBXGPSData.csv":
-                    self.payload_gps_data = pl.read_csv(
-                        os.path.join(p, f), infer_schema_length=10000
-                    )
-                elif f == "payload_Payload_FlightModeManagerStatus.csv":
-                    self.payload_fmm_status = pl.read_csv(os.path.join(p, f))
-
-        self.reference_values = self.main_reference_values
-        if self.origin == Origin.MAIN:
-            self.nas_controller_status = self.main_nas_controller_status
-            self.nas_state = self.main_nas_state
-            self.fmm_status = self.main_fmm_status
-        elif self.origin == Origin.PAYLOAD:
-            self.nas_controller_status = self.payload_nas_controller_status
-            self.nas_state = self.payload_nas_state
-            self.fmm_status = self.payload_fmm_status
+                    gps_path = os.path.join(p, f)
+            if f == "main_Boardcore_ReferenceValues.csv":
+                reference_values_path = os.path.join(p, f)
+
+    if nas_controller_status_path is None:
+        raise FileNotFoundError("NAS controller status file not found")
+    if nas_state_path is None:
+        raise FileNotFoundError("NAS state file not found")
+    if reference_values_path is None:
+        raise FileNotFoundError("Reference values file not found")
+    if fmm_status_path is None:
+        raise FileNotFoundError("FMM status file not found")
+    if origin == "payload" and gps_path is None:
+        raise FileNotFoundError("GPS data file not found")
+
+    nas_controller_status = pl.read_csv(nas_controller_status_path)
+    nas_state = pl.read_csv(nas_state_path)
+    reference_values = pl.read_csv(reference_values_path)
+    fmm_status = pl.read_csv(fmm_status_path)
+
+    # sort by timestamp and extract the timestamps of the calibrate events
+    nas_controller_status = nas_controller_status.sort("timestamp")
+    calibrate_tms = nas_controller_status.filter(
+        pl.col("state") == NAS_CALIBRATE_STATE
+    ).select("timestamp")
+
+    # use the calibrate timestamps as the timestamp column of reference_values
+    reference_values = reference_values.with_columns(calibrate_tms.select("timestamp"))
+
+    # select the relevant columns and convert timestamps to datetimes
+    reference_values = reference_values.select(
+        pl.from_epoch(pl.col("timestamp"), time_unit="us"),
+        pl.col("refLatitude").alias("latitude"),
+        pl.col("refLongitude").alias("longitude"),
+        pl.col("refAltitude").alias("altitude"),
+    )
+    nas_state = nas_state.select(
+        pl.from_epoch(pl.col("timestamp"), time_unit="us"),
+        "n",
+        "e",
+        "d",
+        "vn",
+        "ve",
+        "vd",
+    )
+    fmm_status = fmm_status.select(
+        pl.from_epoch(pl.col("timestamp"), time_unit="us"), "state"
+    )

-        if self.nas_controller_status is None:
-            raise FileNotFoundError("NAS controller status file not found")
-        if self.nas_state is None:
-            raise FileNotFoundError("NAS state file not found")
-        if self.reference_values is None:
-            raise FileNotFoundError("Reference values file not found")
-        if self.fmm_status is None:
-            raise FileNotFoundError("FMM status file not found")
-        if self.origin == Origin.PAYLOAD and self.payload_gps_data is None:
-            raise FileNotFoundError(
-                "GPS data file not found, required for payload origin"
-            )
+    # find the stop timestamp: last FMM state change plus a 2 second margin
+    stop_ts = fmm_status.select(pl.col("timestamp") + pl.duration(seconds=2))[-1, 0]

-    def keep_from(self, timestamp: int):
-        """Cut the dataframes from the given timestamp."""
-        self.reference_values = self.reference_values.filter(
-            pl.col("timestamp") >= timestamp
-        )
-        self.nas_state = self.nas_state.filter(pl.col("timestamp") >= timestamp)
+    # keep only the reference values and nas state samples up to the stop timestamp
+    reference_values = reference_values.filter(pl.col("timestamp") <= stop_ts)
+    nas_state = nas_state.filter(pl.col("timestamp") <= stop_ts)

-    def keep_up_to(self, timestamp: int):
-        """Cut the dataframes up to the given timestamp."""
-        self.reference_values = self.reference_values.filter(
-            pl.col("timestamp") <= timestamp
+    # for the payload origin, correct the reference values with the GPS data
+    if origin == "payload":
+        gps_data = pl.read_csv(gps_path, infer_schema_length=10000)
+        gps_data = gps_data.select(
+            pl.from_epoch(pl.col("gpsTimestamp"), time_unit="us").alias("timestamp"),
+            "latitude",
+            "longitude",
         )
-        self.nas_state = self.nas_state.filter(pl.col("timestamp") <= timestamp)
-
-    def add_reference_timestamp(self):
-        """Add the calibrate timestamp to the reference values as a new column."""
-        self.nas_controller_status = self.nas_controller_status.sort("timestamp")
-        calibrate_tms = self.nas_controller_status.filter(
-            pl.col("state") == self.NAS_CALIBRATE_STATE
-        ).select("timestamp")
-        self.reference_values = self.reference_values.with_columns(calibrate_tms)
-
-    def correct_reference_with_gps(self):
-        """Correct the reference values with the GPS data."""
-        if self.payload_gps_data is None:
-            raise ValueError(
-                "GPS data file not found, required for GPS correction of reference values"
-            )
         new_reference_values = []
-        for row in self.reference_values.rows():
+        for row in reference_values.rows():
             timestamp = row[0]
             (lat, lon) = (
-                self.payload_gps_data.filter(pl.col("timestamp") < timestamp)
+                gps_data.filter(pl.col("timestamp") < timestamp)
                 .tail(1)
                 .to_numpy()[0, 1:]
             )
             new_reference_values.append(
                 [timestamp, lat, lon, row[3] + 1.5]  # altitude is corrected
             )
-        self.reference_values = pl.DataFrame(
-            new_reference_values, schema=self.reference_values.schema, orient="row"
+        reference_values = pl.DataFrame(
+            new_reference_values, schema=reference_values.schema, orient="row"
         )

-    def get_last_event_timestamp(self):
-        """Get the last timestamp of the given event."""
-        return self.fmm_status.select("timestamp")[-1, 0]
-
-    def get_event_timestamp(self, event: int):
-        """Get the timestamp of the given event."""
-        return self.fmm_status.filter(pl.col("state") == event).select("timestamp")[
-            0, 0
-        ]
-
-    def get_start_end_interval(self):
-        """Get the start and end from high rate."""
-        return (
-            self.nas_state.select("timestamp").min().item(0, 0),
-            self.nas_state.select("timestamp").max().item(0, 0),
-        )
-
-    def expand_reference_up_to(self, timestamp: int):
-        """Expand the reference values up to the given timestamp."""
-        last_row = self.reference_values.tail(1)
-        last_row[0, "timestamp"] = timestamp
-        self.reference_values = pl.concat(
-            [self.reference_values, last_row], how="vertical"
-        )
-
-
-def normalize_raw_reference_values(df: pl.DataFrame):
-    """Normalize the reference values DataFrame."""
-    return df.select(
-        pl.from_epoch(pl.col("timestamp"), time_unit="us"),
-        pl.col("refLatitude").alias("latitude"),
-        pl.col("refLongitude").alias("longitude"),
-        pl.col("refAltitude").alias("altitude"),
-    ).sort("timestamp")
-
-
-def normalize_raw_nas_state(df: pl.DataFrame):
-    """Normalize the NAS state DataFrame."""
-    return df.select(
-        pl.from_epoch(pl.col("timestamp"), time_unit="us"),
-        "n",
-        "e",
-        "d",
-        "vn",
-        "ve",
-        "vd",
-    ).sort("timestamp")
-
-
-def normalize_raw_fmm_status(df: pl.DataFrame):
-    """Normalize the FMM status DataFrame."""
-    return df.select(pl.from_epoch(pl.col("timestamp"), time_unit="us"), "state").sort(
-        "timestamp"
+    # find max timestamp
+    max_ts = max(
+        reference_values.select("timestamp").max().item(0, 0),
+        (nas_state.select("timestamp").max().item(0, 0)),
     )

-
-def normalize_raw_gps_data(df: pl.DataFrame):
-    """Normalize the GPS data DataFrame."""
-    return df.select(
-        pl.from_epoch(pl.col("gpsTimestamp"), time_unit="us").alias("timestamp"),
-        "latitude",
-        "longitude",
-    ).sort("timestamp")
-
-
-def fix_sample_rate(df: pl.DataFrame, rate: int):
-    """Change the sample rate of the given DataFrame based on the given rate [Hz]."""
-    if rate < 1 or rate > 1000:
-        raise ValueError("Invalid rate value. Rate must be between 1 and 1000 Hz.")
-    period = f"{1000 // rate}ms"
-    return (
-        df.group_by_dynamic(pl.col("timestamp"), every=period)
+    # extend the reference values up to max_ts, then resample to 500 ms and 250 ms periods
+    last_row = reference_values.tail(1)
+    last_row[0, "timestamp"] = max_ts
+    reference_values = pl.concat([reference_values, last_row], how="vertical")
+    reference_values = (
+        reference_values.group_by_dynamic(pl.col("timestamp"), every="500ms")
+        .agg(pl.all().last())
+        .upsample(time_column="timestamp", every="500ms")
+        .fill_null(strategy="forward")
+    )
+    nas_state = (
+        nas_state.group_by_dynamic(pl.col("timestamp"), every="250ms")
         .agg(pl.all().last())
-        .upsample(time_column="timestamp", every=period)
+        .upsample(time_column="timestamp", every="250ms")
         .fill_null(strategy="forward")
     )

+    # filter from 15 seconds before flying
+    flying_fmm_state = (
+        MAIN_FLYING_FMM_STATE if origin == "main" else PAYLOAD_FLYING_FMM_STATE
+    )
+    start_ts = fmm_status.filter(pl.col("state") == flying_fmm_state).select(
+        "timestamp"
+    )[0, 0] - pl.duration(seconds=SECONDS_BEFORE_FLYING)
+    reference_values = reference_values.filter(pl.col("timestamp") >= start_ts)
+    nas_state = nas_state.filter(pl.col("timestamp") >= start_ts)

-
-def write_low_rate(df: pl.DataFrame, output: Path):
-    """Write the low rate data to a CSV file."""
-    df.select(
+    # save the dataframes to csv
+    output = Path(output)
+    reference_values.select(
         pl.col("timestamp").dt.timestamp(time_unit="us"),
         "latitude",
         "longitude",
         "altitude",
     ).write_csv(output / "low_rate.csv")
-
-
-def write_high_rate(df: pl.DataFrame, output: Path):
-    """Write the high rate data to a CSV file."""
-    df.select(
+    nas_state.select(
         pl.col("timestamp").dt.timestamp(time_unit="us"),
         "n",
         "e",