diff --git a/.github/workflows/release_build.yml b/.github/workflows/release_build.yml index 375a89df..09880be2 100644 --- a/.github/workflows/release_build.yml +++ b/.github/workflows/release_build.yml @@ -318,6 +318,7 @@ jobs: gzip fusion-engine-client-docs.tar - name: Upload Artifact + if: ${{ !env.ACT }} uses: actions/upload-artifact@v4 with: name: fusion-engine-client-docs.tar.gz @@ -327,6 +328,9 @@ jobs: test_python: name: Python Unit Tests runs-on: ubuntu-latest + strategy: + matrix: + python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13'] defaults: run: working-directory: python @@ -334,29 +338,41 @@ jobs: - uses: actions/checkout@v2 - name: Setup Python - uses: actions/setup-python@v2 + if: ${{ !env.ACT }} + uses: actions/setup-python@v5 with: - python-version: '3.x' + python-version: ${{ matrix.python-version }} + cache: 'pip' + cache-dependency-path: | + python/requirements.txt + python/requirements-dev.txt + + - name: Setup Python (local) + if: ${{ env.ACT }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} - name: Install Python Requirements run: | - pip install -r requirements.txt - pip install pytest + pip install -r requirements.txt -r requirements-dev.txt - name: Run Unit Tests run: | python -m pytest - name: Build A Python Distribution + if: ${{ !env.ACT }} run: | pip install build twine python -m build twine check dist/* - name: Upload Artifact + if: ${{ !env.ACT }} uses: actions/upload-artifact@v4 with: - name: python_dist + name: python_dist-${{ matrix.python-version }} path: python/dist # Check autogenerated signal definitions. @@ -370,7 +386,7 @@ jobs: - uses: actions/checkout@v2 - name: Setup Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: '3.x' diff --git a/.gitignore b/.gitignore index facb7ca5..c1bb3eff 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,9 @@ venv*/ .vscode *.code-workspace +# AI settings. +.claude/ + # P1 binary files. 
*.p1bin *.p1log diff --git a/python/README.md b/python/README.md index 7d190f7e..a51211e5 100644 --- a/python/README.md +++ b/python/README.md @@ -18,23 +18,24 @@ FusionEngine message specification. ### Requirements -- Python 3.6 or later +- Python 3.8 or later ### Applications - [p1_capture](fusion_engine_client/applications/p1_capture.py) - Connect to a FusionEngine device in real time over - serial, TCP, UDP, or UNIX domain socket, and display incoming FusionEngine contents and/or log the incoming data to - disk + serial, TCP, UDP, WebSocket, or UNIX domain socket, or read from a file or log on disk. Display a summary of the + FusionEngine messages in the data stream or their contents, and optionally log data to disk. + - Messages can be filtered by type or time. + - Supports stdin/stdout for inline data filtering via pipe. + - Supports extracting sensor or RTK corrections data from diagnostic FusionEngine `InputDataWrapper` messages. - [p1_display](fusion_engine_client/applications/p1_display.py) - Generate plots of vehicle trajectory, GNSS signal - status, wheel speed measurements, etc. from a file of logged FusionEngine messages + status, wheel speed measurements, etc. from a file of logged FusionEngine messages. - [p1_extract](fusion_engine_client/applications/p1_extract.py) - Extract FusionEngine messages from a binary file - containing multiple data streams (e.g., interleaved RTCM and FusionEngine messages) -- [p1_filter](fusion_engine_client/applications/p1_filter.py) - Filter an incoming FusionEngine data stream, outputting - a new FusionEngine stream containing only the requested messages + containing multiple data streams (e.g., interleaved RTCM and FusionEngine messages). 
+- [p1_filter](fusion_engine_client/applications/p1_filter.py) - Legacy alias for `p1_capture` - [p1_lband_extract](fusion_engine_client/applications/p1_lband_extract.py) - Extract L-band data bits contained from a log of FusionEngine `LBandFrameMessage` messages -- [p1_print](fusion_engine_client/applications/p1_print.py) - Print the contents of FusionEngine messages found in a - binary file to the console +- [p1_print](fusion_engine_client/applications/p1_print.py) - Legacy alias for `p1_capture`. ### Directory Structure - `python/` - Python source files @@ -100,7 +101,7 @@ FusionEngine message specification. #### Install From PyPI -1. Install Python (3.6 or later) and pip. +1. Install Python (3.8 or later) and pip. 2. Install the `fusione-engine-client` module, including all analysis and data processing tools: ```bash python3 -m pip install fusion-engine-client @@ -121,7 +122,7 @@ FusionEngine message specification. #### Install From Source (Use In Another Python Project) -1. Install Python (3.6 or later) and pip. +1. Install Python (3.8 or later) and pip. 2. Clone a copy of this repository: ```bash git clone https://github.com/PointOneNav/fusion-engine-client.git @@ -149,7 +150,7 @@ FusionEngine message specification. #### Install From Source (Development) -1. Install Python (3.6 or later) and pip. +1. Install Python (3.8 or later) and pip. 2. 
Clone a copy of this repository: ```bash git clone https://github.com/PointOneNav/fusion-engine-client.git diff --git a/python/fusion_engine_client/applications/p1_capture.py b/python/fusion_engine_client/applications/p1_capture.py index 032216ef..00eeeb6a 100755 --- a/python/fusion_engine_client/applications/p1_capture.py +++ b/python/fusion_engine_client/applications/p1_capture.py @@ -5,7 +5,7 @@ import os import select import sys -from typing import Optional +from typing import Optional, Union import colorama @@ -13,64 +13,776 @@ from import_utils import enable_relative_imports __package__ = enable_relative_imports(__name__, __file__) -from ..messages import MessagePayload, message_type_by_name -from ..parsers import FusionEngineDecoder +from ..messages import InputDataType, MessageHeader, MessagePayload, MessageType, message_type_by_name +from ..parsers import FusionEngineDecoder, MixedLogReader from ..utils import trace as logging -from ..utils.argument_parser import ArgumentParser, ExtendedBooleanAction +from ..utils.argument_parser import ArgumentParser, CSVAction, ExtendedBooleanAction +from ..utils.log import define_cli_arguments as define_log_search_arguments, is_possible_log_pattern, locate_log from ..utils.print_utils import \ - DeviceSummary, add_print_format_argument, print_message, print_summary_table + DeviceSummary, add_print_format_argument, add_wrapped_data_mode_argument, print_message, print_summary_table from ..utils.socket_timestamping import (enable_socket_timestamping, HW_TIMESTAMPING_HELP, log_timestamped_data_offset, recv, TIMESTAMP_FILE_ENDING,) from ..utils.transport_utils import * +from ..utils.time_range import TimeRange from ..utils.trace import HighlightFormatter, BrokenPipeStreamHandler _logger = logging.getLogger('point_one.fusion_engine.applications.p1_capture') -def main(): +class Application: + def __init__(self, options, logging_stream=None): + self.options = options + self.logging_stream = logging_stream + + # Determine what 
to display. + self.show_summary_live = self.options.display == 'summary' + self.show_summary = self.options.display in ('summary', 'messages+summary') + self.show_status = self.options.display == 'status' + self.show_message_contents = self.options.display in ('messages', 'messages+summary') + self.quiet = self.options.display == 'none' + + # Message filtering. + self.message_types = set() + self.input_data_types = set() + self.wrapped_data_format = self.options.wrapped_data_format + self.include_input_data_wrapper = False + self.source_ids = set() + self.time_range = None + + # Input. + self.input_transport = None + self.log_id = None + self.log_reader = None + + self.read_timeout_sec = None + self.read_size_bytes = None + + # Output. + self.output_transport = None + self.timestamp_file = None + self.generating_raw_log = False + self.generating_p1log = False + self.generating_csv = False + + # Status/incoming data summary. + self.bytes_received = 0 + self.fe_bytes_received = 0 + self.messages_received = 0 + self.skipped_messages = 0 + self.bytes_sent = 0 + self.messages_sent = 0 + self.device_summary = DeviceSummary() + + self.first_p1_time_sec = None + self.last_p1_time_sec = None + + self.first_system_time_sec = None + self.last_system_time_sec = None + + self.start_time = None + self.last_print_time = None + self.print_timeout_sec = 1.0 if self.show_summary_live else 5.0 + + # Configure everything. + self._init_message_type_filter() + self._init_input_data_type_filter() + self._init_source_id_filter() + self._init_time_range_filter() + self._configure_input() + self._configure_output() + self._set_read_timeout() + + # If we're reading from a file (and not stdin), just display the summary at the end and don't clear the + # terminal. + if isinstance(self.input_transport, FileTransport) and not self.input_transport.is_stdin: + self.show_summary_live = False + + # Create the FusionEngine decoder after configuring, in case we need to change show_summary_live, etc. 
+ self.decoder = FusionEngineDecoder(warn_on_unrecognized=False, return_bytes=True, return_offset=True) + + def _init_message_type_filter(self) -> None: + # If the user specified a set of message names, lookup their type values. Below, we will limit the printout to + # only those message types. + if self.options.unwrap: + if self.options.message_type is not None: + _logger.error('Error: You cannot specify both --unwrap and --message-type.') + sys.exit(1) + + self.message_types = {MessageType.INPUT_DATA_WRAPPER} + elif self.options.message_type is not None: + # Pattern match to any of: + # -m Type1 + # -m Type1 -m Type2 + # -m Type1,Type2 + # -m Type1,Type2 -m Type3 + # -m Type* + try: + self.message_types = MessagePayload.find_matching_message_types(self.options.message_type) + if len(self.message_types) == 0: + # find_matching_message_types() will print an error. + sys.exit(1) + except ValueError as e: + _logger.error(str(e)) + _logger.debug('', exc_info=e) + sys.exit(1) + + # If the user is filtering to specific FusionEngine messages and did not include InputDataWrapper messages, if + # they want to display FusionEngine contents _within_ InputDataWrapper messages too (e.g., + # --wrapped-data-format=content), we'll include the wrapper messages manually. + if (len(self.message_types) != 0 and MessageType.INPUT_DATA_WRAPPER not in self.message_types and + self.wrapped_data_format != 'parent'): + self.include_input_data_wrapper = True + if self.wrapped_data_format == 'auto': + self.wrapped_data_format = 'content' + + def _init_input_data_type_filter(self) -> None: + if self.options.wrapped_data_type is not None and self.options.unwrap is not None: + _logger.error('Error: You cannot specify both --unwrap and --wrapped-data-type.') + sys.exit(1) + + # For InputDataWrapper messages, if the user specified desired data types, limit the output to only those. 
+ if self.options.wrapped_data_type is not None or self.options.unwrap is not None: + try: + if self.options.wrapped_data_type is not None: + type_str = self.options.wrapped_data_type + else: + type_str = self.options.unwrap + self.input_data_types = InputDataType.find_matching_values(type_str, prefix='M_TYPE_', + allow_multiple=not self.options.unwrap, + on_unrecognized='ignore', + print_func=_logger.error) + if len(self.input_data_types) == 0: + # find_matching_values() will print an error. + sys.exit(1) + elif self.options.unwrap and len(self.input_data_types) > 1: + _logger.error('You can only unwrap one data type in real time. To extract multiple data streams, ' + 'consider logging all data and then running p1_dump_input.') + sys.exit(1) + except ValueError as e: + _logger.error(str(e)) + _logger.debug('', exc_info=e) + sys.exit(1) + + def _init_source_id_filter(self) -> None: + # If the user specified a set of source IDs, limit messages to only those sources. + if self.options.source_identifier is not None: + try: + self.source_ids = set([int(s) for s in self.options.source_identifier]) + except ValueError: + _logger.error('Source identifiers must be integers.') + sys.exit(1) + + def _init_time_range_filter(self) -> None: + try: + self.time_range = TimeRange.parse(self.options.time) + except ValueError as e: + _logger.error(str(e)) + _logger.debug('', exc_info=e) + sys.exit(1) + + def _configure_input(self) -> None: + # Connect to the device using the specified transport, or read from a file or log. + try: + # If the user specified a partial or complete log hash, or the path to a directory, try to locate a P1 log. + # Log patterns are mutually exclusive with transport descriptors, so it can only be one or the other. No + # need to check both. 
+ if is_possible_log_pattern(self.options.input): + input_path, self.log_id = locate_log( + input_path=self.options.input, log_base_dir=self.options.log_base_dir, + return_log_id=True, extract_fusion_engine_data=False) + if input_path is None: + # locate_log() will log an error. + sys.exit(1) + else: + self.input_transport = create_transport(input_path, mode='input', print_func=self._print) + else: + self.input_transport = create_transport(self.options.input, mode='input', print_func=self._print) + + # If we're reading from a normal file on disk, use MixedLogReader instead of reading directly. That is more + # efficient since it will index the file for faster reads. + if isinstance(self.input_transport, FileTransport) and not self.input_transport.is_stdin: + message_types_plus_wrapper = set(self.message_types) + invert = self.options.invert + if self.include_input_data_wrapper and MessageType.INPUT_DATA_WRAPPER not in self.message_types: + # If the user specifies message types that they want to _exclude_, but we also need to _include_ + # InputDataWrapper, MixedLogReader can't do both. We'll have it pass all messages and handle the + # filtering later. + if invert: + message_types_plus_wrapper = None + invert = False + else: + message_types_plus_wrapper.add(MessageType.INPUT_DATA_WRAPPER) + + self.input_transport.input.close() + self.log_reader = MixedLogReader( + self.input_transport.input_path, ignore_index=self.options.ignore_index, + return_bytes=True, return_offset=True, show_progress=self.options.progress, + message_types=message_types_plus_wrapper, invert_message_types=invert, + time_range=self.time_range, source_ids=self.source_ids) + + # MixedLogReader will apply the time range, message type, and source ID filters, so we will clear them + # here so they are not applied twice by _apply_filters(). 
+ self.time_range = None + if message_types_plus_wrapper is not None: + self.message_types = set() + self.source_ids = set() + except Exception as e: + _logger.error(str(e)) + _logger.debug('', exc_info=e) + sys.exit(1) + + def _configure_output(self) -> None: + # Open the output file or real-time output transport if enabled. + if self.options.output is not None: + # If writing to a .p1log file, if there's an existing index file (.p1i) for that filename, delete it. + if self.options.output_format == 'p1log': + p1i_path = os.path.splitext(self.options.output)[0] + '.p1i' + if os.path.exists(p1i_path): + os.remove(p1i_path) + + # Now open the transport/file. + self.output_transport = create_transport(self.options.output, mode='output', print_func=self._print) + + # If requested when logging to disk, also capture host OS timestamps as messages arrive. + if self.options.log_timestamp_source: + if not isinstance(self.output_transport, FileTransport) or self.output_transport.is_stdout: + _logger.error('--log-timestamp-source can only be used when --output is a file.') + sys.exit(1) + elif self.options.output_format == 'csv': + _logger.error('--log-timestamp-source only supported for binary output files.') + sys.exit(1) + else: + self.timestamp_file = open(self.options.output + TIMESTAMP_FILE_ENDING, 'wb') + + # Note: We intentionally set --output-format=None by default instead of raw to avoid printing the warning below + # unnecessarily. If not specified, default to raw (i.e., capture all incoming data). 
+ self.generating_raw_log = (self.output_transport is not None and + (self.options.output_format == 'raw' or self.options.output_format is None)) + self.generating_p1log = (self.output_transport is not None and self.options.output_format == 'p1log') + self.generating_csv = (self.output_transport is not None and self.options.output_format == 'csv') + + if self.generating_csv: + self.output_transport.write(b'host_time,type,p1_time,sys_time\n') + + # If the user wants to unwrap InputDataWrapper messages and they set anything other than --output-format=raw, + # fail. + # + # If the user requested --output-format=raw but also set specific message types, warn them that we will only be + # outputting the requested FusionEngine messages and not any non-FusionEngine binary data in the input stream. + # + # There is no requirement to use the .p1log file extension for a stream containing only FusionEngine messages. + if self.options.unwrap: + if self.output_transport is None: + _logger.error("You must specify an output file or transport when using --unwrap.") + sys.exit(1) + elif not self.generating_raw_log: + _logger.error("Output format must 'raw' when unwrapping InputDataWrapper content.") + sys.exit(1) + + if self.generating_raw_log and self.options.output_format is not None and len(self.message_types) != 0: + _logger.warning('Raw log format requested, but --message-type specified. Output will not contain any ' + 'non-FusionEngine binary, if present in the input stream.') + self.generating_raw_log = False + self.generating_p1log = True + + def _set_read_timeout(self) -> None: + # In the read loop, if we're filtering data and forwarding it in real time, we'll read a small amount of data at + # a time to reduce latency. Otherwise, if we're just displaying stuff or writing to disk, we'll read more data + # at a time to be more efficient. 
+ is_real_time = (self.output_transport is not None and + (not isinstance(self.output_transport, FileTransport) or self.output_transport.is_stdout)) + if is_real_time: + self.read_timeout_sec = 1.0 + self.read_size_bytes = 64 + else: + self.read_timeout_sec = 1.0 + self.read_size_bytes = 1024 + + # If this is a TCP/UDP/UNIX socket, configure it for non-blocking reads. We'll apply a read timeout with + # select() in the read loop. + if isinstance(self.input_transport, socket.socket): + self.input_transport.setblocking(0) + # This function won't do anything if neither timestamp is enabled. + enable_socket_timestamping( + self.input_transport, + enable_sw_timestamp=self.options.log_timestamp_source == 'kernel-sw', + enable_hw_timestamp=self.options.log_timestamp_source == 'hw' + ) + # If this is a serial port or websocket, configure its read timeout. If this is a file, set_read_timeout() is a + # no-op. + else: + if self.options.log_timestamp_source and self.options.log_timestamp_source != 'user-sw': + _logger.error( + f'--log-timestamp-source={self.options.log_timestamp_source} is not supported. Only "user-sw" ' + f'timestamps are supported on non-socket captures.') + sys.exit(1) + + set_read_timeout(self.input_transport, self.read_timeout_sec) + + def process_input(self) -> None: + """! + @brief Process all incoming data until stopped (Ctrl-C) or reaching the end of the file. + """ + self.start_time = datetime.now() + self.last_print_time = self.start_time + + # Listen for incoming data. + try: + while True: + # If using a MixedLogReader, read one message from the log. + if self.log_reader: + try: + next_message = next(self.log_reader) + received_data = next_message[2] + messages = [next_message] + now = datetime.now() + timestamp_sec = now.timestamp() + except StopIteration: + break + # Otherwise, read some data from the transport/file. 
+ else: + kernel_ts: Optional[float] = None + hw_ts: Optional[float] = None + try: + # If this is a TCP/UDP socket, use select() to implement a read timeout so we can wake up + # periodically and print status if there's no incoming data. + if isinstance(self.input_transport, socket.socket): + ready = select.select([self.input_transport], [], [], self.read_timeout_sec) + if ready[0]: + received_data, kernel_ts, hw_ts = recv(self.input_transport, self.read_size_bytes) + else: + received_data = [] + # If this is a serial port or file, we set the read timeout above. + else: + received_data = recv_from_transport(self.input_transport, self.read_size_bytes) + + # Check if we reached EOF. + if len(received_data) == 0 and isinstance(self.input_transport, FileTransport): + break + + now = datetime.now() + + self.bytes_received += len(received_data) + + if self.show_summary_live or self.show_status: + if (now - self.last_print_time).total_seconds() > self.print_timeout_sec: + self._print_display(now) + except serial.SerialException as e: + _logger.error('Unexpected error reading from device:\r%s' % str(e)) + break + + if self.options.log_timestamp_source == 'kernel-sw': + if kernel_ts is None: + _logger.error(f'Unable to capture kernel SW timestamps on {self.options.transport}.') + sys.exit(1) + timestamp_sec = kernel_ts + elif self.options.log_timestamp_source == 'hw': + if hw_ts is None: + _logger.error( + f'Unable to capture HW timestamps on {self.options.transport}.\n{HW_TIMESTAMPING_HELP}') + sys.exit(1) + timestamp_sec = hw_ts + else: + timestamp_sec = now.timestamp() + + # If logging in raw format, write the data to disk as is. + # + # Exception: In --unwrap mode, we are creating a raw output file, but we do it later after filtering to + # InputDataWrapper messages and extracting their content. 
+ if self.generating_raw_log and not self.options.unwrap: + self.output_transport.write(received_data) + self.bytes_sent += len(received_data) + if self.timestamp_file: + timestamp_ns = int(round(timestamp_sec * 1e9)) + log_timestamped_data_offset(self.timestamp_file, timestamp_ns, self.bytes_received) + + # Decode the incoming data and print the contents of any complete messages. + # + # Note that we pass the data to the decoder at all times, even if --display=false, --summary=false, and + # --quiet=true were set, so that: + # - So that we get a count of the number of incoming and outgoing messages + # - So we print warnings if the CRC fails on any of the incoming data + # - If we are logging in *.p1log format, so the decoder can separate the FusionEngine data from any + # non-FusionEngine data in the stream + if not self.log_reader: + messages = self.decoder.on_data(received_data) + + # Now process the message. + finished = not self._process_fe_messages(messages, timestamp_sec) + + if self.show_summary_live: + if (now - self.last_print_time).total_seconds() > 0.5: + self._print_display(now) + + if finished: + break + except (BrokenPipeError, KeyboardInterrupt) as e: + # User hit Ctrl-C -- done processing. + pass + + # Close the transport. + self.input_transport.close() + + # Close the output file. + if self.output_transport is not None: + self.output_transport.close() + + # Update the summary one last time if enabled. + if self.show_summary: + now = datetime.now() + self._print_display(now) + + def _process_fe_messages(self, messages, timestamp_sec): + for (header, message, raw_data, offset_bytes) in messages: + # Count _all_ incoming FusionEngine messages. We apply the user-specified message_types filter below to the + # outgoing message count. + self.messages_received += 1 + self.fe_bytes_received += len(raw_data) + + # Capture elapsed P1 and (device) system time. 
+ if isinstance(message, MessagePayload): + p1_time = message.get_p1_time() + if p1_time is not None: + if self.first_p1_time_sec is None: + self.first_p1_time_sec = float(p1_time) + self.last_p1_time_sec = float(p1_time) + + system_time = message.get_system_time_sec() + if system_time is not None: + if self.first_system_time_sec is None: + self.first_system_time_sec = float(system_time) + self.last_system_time_sec = float(system_time) + else: + p1_time = None + system_time = None + + if self._apply_filters(header=header, message=message): + # If requested, skip the first N messages that pass the filter (e.g., skip the first 10 pose messages). + if self.skipped_messages < self.options.skip: + self.skipped_messages += 1 + continue + + self.device_summary.update(header, message) + self.messages_sent += 1 + + if self.generating_p1log: + self.output_transport.write(raw_data) + self.bytes_sent += len(raw_data) + if self.timestamp_file: + timestamp_ns = int(round(timestamp_sec * 1e9)) + log_timestamped_data_offset(self.timestamp_file, timestamp_ns, self.fe_bytes_received) + elif self.generating_csv: + self.bytes_sent += len(raw_data) + p1_str = str(p1_time.seconds) if p1_time is not None and not math.isnan(p1_time) else '' + sys_str = str(system_time) if system_time is not None and not math.isnan(system_time) else '' + self.output_transport.write( + f'{timestamp_sec},{header.message_type},{p1_str},{sys_str}\n'.encode('utf-8')) + # In --unwrap mode, we filter to just InputDataWrapper messages. Extract the content of those messages + # to send to the output transport. The output format will be 'raw' (generating_raw_log == True), but we + # disabled the 'raw' logging in process_input() in order to get here. 
+ elif self.options.unwrap: + self.output_transport.write(message.data) + self.bytes_sent += len(message.data) + + if self.show_message_contents: + print_message(header=header, contents=message, offset_bytes=offset_bytes, bytes=raw_data, + format=self.options.display_format, + message_types=self.message_types, wrapped_data_mode=self.wrapped_data_format, + logger=_logger) + + if self.options.max is not None and self.messages_sent == self.options.max: + return False + + if self.time_range is not None and self.time_range.in_range_ended(): + return False + + return True + + def _apply_filters(self, header: MessageHeader, message: Union[MessagePayload, bytes]) -> bool: + """! + @brief Apply user-specified filters and determine if this message should be included or skipped. + + @param header The message header. + @param message The message payload, or a `bytes` array if the message could not be parsed. + + @return `True` if the message passed the filters and should be included. + """ + # Check if this message is in the specified time range or if we're reached the end of the time range and should + # stop processing. + if self.time_range is not None and isinstance(message, MessagePayload): + if not self.time_range.is_in_range(message): + return False + + # See if this is in the list of user-specified message types to keep. If the list is empty, keep all messages. + # + # In unwrap mode, we explicitly set message_types to InputDataWrapper messages and ignore all other incoming + # messages. + # + # When not in unwrap mode, the user may or may not have requested InputDataWrapper. However, if they set + # --wrapped-data-format=auto|all|content, we will pass wrappers through here and filter them out below. 
+ if len(self.message_types) > 0: + if header.message_type == MessageType.INPUT_DATA_WRAPPER and self.include_input_data_wrapper: + pass + elif not self.options.invert and header.message_type not in self.message_types: + return False + elif self.options.invert and header.message_type in self.message_types: + return False + + # If this is an InputDataWrapper and the user specified a list of data types to keep, keep only the messages + # with that kind of data. If the list is empty, keep all messages. + if header.message_type == MessageType.INPUT_DATA_WRAPPER and len(self.input_data_types) > 0: + if not self.options.invert and message.data_type not in self.input_data_types: + return False + elif self.options.invert and message.data_type in self.input_data_types: + return False + + # If the user listed specific sources IDs, restrict to that. + if len(self.source_ids) > 0: + if header.source_identifier not in self.source_ids: + return False + + return True + + def _print_display(self, now): + if self.show_status: + self._print_status(now) + elif self.show_summary: + self._print_summary(now) + self.last_print_time = now + + def _print_status(self, now): + self._print( + 'Status: [elapsed_time=%d sec, received: %d B (%d messages = %d B) -> sent: %d B (%d messages)]' % + ((now - self.start_time).total_seconds(), + self.bytes_received, self.messages_received, self.fe_bytes_received, + self.bytes_sent, self.messages_sent)) + + def _print_summary(self, now): + if self.show_summary_live: + # Clear the terminal. + print(colorama.ansi.CSI + 'H' + colorama.ansi.CSI + 'J', end='', file=self.logging_stream) + + # Log/data details. 
+ if isinstance(self.input_transport, FileTransport): + if self.input_transport.is_stdin: + self._print('Input file: ') + else: + self._print(f'Input file: {self.input_transport.input_path}') + + if self.log_id is not None: + self._print(f'Log ID: {self.log_id}') + + if self.first_p1_time_sec is not None: + elapsed_sec = self.last_p1_time_sec - self.first_p1_time_sec + self._print(f'P1 time: {self.first_p1_time_sec} -> {self.last_p1_time_sec} ({elapsed_sec:.1f} sec)') + else: + self._print(f'P1 time: -') + + if self.first_system_time_sec is not None: + elapsed_sec = self.last_system_time_sec - self.first_system_time_sec + self._print(f'System time: {self.first_system_time_sec} -> {self.last_system_time_sec} ' + f'({elapsed_sec:.1f} sec)') + else: + self._print(f'System time: -') + + self._print("") + + # Real-time processing details. + self._print(f'Elapsed time: {(now - self.start_time).total_seconds():.1f} sec') + self._print(f'Received: {self.bytes_received} B ({self.messages_received} messages = ' + f'{self.fe_bytes_received} B)') + self._print(f'Sent: {self.bytes_sent} B ({self.messages_sent} messages)') + + # Message summary table. + self._print("") + print_summary_table(self.device_summary) + + def _print(self, msg, *args, **kwargs): + if not self.quiet: + _logger.info(msg, *args, **kwargs) + + +def main(default_display_mode: str = 'summary', default_output: str = None): # Parse command-line arguments. parser = ArgumentParser(description="""\ -Connect to a Point One device and print out the incoming FusionEngine message -contents and/or log the messages to disk. +Connect to a Point One device in real time over TCP, UDP, UNIX socket, etc., +or read logged data from a file for post-processing, then filter/display the +incoming FusionEngine messages. The data may also be logged to disk or sent to +another application, either over stdout or a specified transport. + +Examples: + # Connect to a device over TCP and display a summary of the incoming data. 
+ # + # --display=summary is the default setting. + ./p1_capture.py tcp://192.168.1.138:30202 + or + ./p1_capture.py tcp://192.168.1.138:30202 --display=summary + or + ./p1_capture.py tcp://192.168.1.138:30202 --summary + + # Connect to a device over a serial port. + ./p1_capture.py tty:///dev/ttyUSB0:460800 + + # Display the contents of all messages received from a device in real time, + # instead of summarizing. + ./p1_capture.py tcp://192.168.1.138:30202 --display=messages + + # Log output from a device to disk. + ./p1_capture.py tcp://192.168.1.138:30202 --output=my_log.p1log + + # Display a data capture status instead of the larger summary table. + ./p1_capture.py tcp://192.168.1.138:30202 --output=my_log.p1log --display=status + + # Display the contents of all Pose messages captured in a log file. + ./p1_capture.py my_log.p1log --message-type=Pose --display=messages + + # Filter a recorded log file and only keep the Pose messages. + ./p1_capture.py my_log.p1log --message-type=Pose --output=pose_output.p1log + + # Filter out non-FusionEngine content from a binary file containing a mix of + # FusionEngine messages and other protocols, leaving only the FusionEngine + # content. + ./p1_capture.py my_log.bin --output=fe_content.p1log --output-format=p1log + + # Print the contents of the first 10 Pose messages in a recorded data file. + ./p1_capture.py my_log.p1log --message-type=Pose --max=10 \ + --display=messages + + # Print the contents of the 10 Pose messages in a recorded data file, + # starting with the 45th Pose message. + ./p1_capture.py my_log.p1log --message-type=Pose --skip=44 --max=10 \ + --display=messages + + # Filter the incoming data from a device connected over TCP and remove + # GNSSSignals messages. Log the remaining FusionEngine messages to disk. + ./p1_capture.py tcp://192.168.1.138:30202 \ + --output=my_log_no_gnss_signals.p1log \ + --invert --message-type=GNSSSignals + + # Same as above, but capture data from stdin using netcat. 
+ netcat 192.168.1.138 30202 | \ + ./p1_capture.py \ + --output=my_log_no_gnss_signals.p1log \ + --invert --message-type=GNSSSignals + + # Similar to above, but open a serial port manually using stty and cat. + stty -F /dev/ttyUSB0 speed 460800 cs8 \ + -cstopb -parenb -icrnl -ixon -ixoff -opost -isig -icanon -echo && \ + cat /dev/ttyUSB0 | \ + ./p1_capture.py \ + --output=my_log_no_gnss_signals.p1log \ + --invert --message-type=GNSSSignals + + # Extract GNSS receiver data in its native format (RTCM, SBF, etc.) from a + # remote device, and pass the data to another application to be parsed and + # displayed. + # + # Note that --output=- sends the data to stdout. All status/display prints + # will be redirected to stderr, or in this case, disabled using + # --display=quiet. + ./p1_capture.py tcp://192.168.1.138:30202 \ + --unwrap=EXTERNAL_UNFRAMED_GNSS \ + --output=- --display=quiet | \ + example_rtcm_print_utility """) + add_print_format_argument(parser, '--display-format') parser.add_argument( - '--display', action=ExtendedBooleanAction, default=True, - help="Print the incoming message contents to the console.") - parser.add_argument( - '-m', '--message-type', type=str, action='append', - help="An optional list of class names corresponding with the message types to be displayed. May be specified " - "multiple times (-m Pose -m PoseAux), or as a comma-separated list (-m Pose,PoseAux). All matches are" - "case-insensitive.\n" - "\n" - "If a partial name is specified, the best match will be returned. Use the wildcard '*' to match multiple " - "message types.\n" - "\n" - "Note: This applies to the displayed messages only. 
All incoming data will still be stored on disk if " - "--output is specified.\n" - "\n" - "Supported types:\n%s" % '\n'.join(['- %s' % c for c in message_type_by_name.keys()])) - parser.add_argument( - '-q', '--quiet', dest='quiet', action=ExtendedBooleanAction, default=False, - help="Do not print anything to the console.") + '-d', '--display', type=str, default=default_display_mode, + choices=('messages', 'messages+summary', 'none', 'quiet', 'status', 'summary'), + help="""\ +Specify the level of detail to be displayed on the console. Output will be printed to stdout, unless configured to write +incoming data to stdout (--output=-). +- messages - Print the content of all incoming FusionEngine messages +- messages+summary - Print the content of all incoming FusionEngine messages, plus a summary on exit +- none - Only print warnings/errors, do not print any contents to the console +- quiet - Alias for 'none' +- status - Periodically print the amount of data received (byte count, number of messages) but not contents +- summary - Print a table summarizing the incoming data""") parser.add_argument( '-s', '--summary', action=ExtendedBooleanAction, default=False, - help="Print a summary of the incoming messages instead of the message content.") + help="Alias for --display=summary.") parser.add_argument( '-v', '--verbose', action='count', default=0, help="Print verbose/trace debugging messages.") - file_group = parser.add_argument_group('File Capture') - file_group.add_argument( - '-f', '--output-format', default='raw', choices=('p1log', 'raw', 'csv'), + input_parser = parser.add_argument_group('Input Control') + define_log_search_arguments(input_parser, define_log=False) + input_parser.add_argument( + '--progress', action=ExtendedBooleanAction, + help="If input is a file, print file read progress to the console periodically.") + input_parser.add_argument( + 'input', type=str, nargs='?', default='-', + help=f"""\ +{TRANSPORT_HELP_STRING} +- The path to a FusionEngine log 
directory +- A pattern matching a FusionEngine log directory under the specified base directory (see find_fusion_engine_log() and + --log-base-dir) +""") + + filter_group = parser.add_argument_group('Message Filtering') + filter_group.add_argument( + '-V', '--invert', action=ExtendedBooleanAction, default=False, help="""\ -The format of the file to be generated when --output is enabled: -- p1log - Create a *.p1log file containing only FusionEngine messages -- raw - Create a generic binary file containing all incoming data -- csv - Create a CSV file with the received message types and timestamps""") +If specified, discard all message types specified with --message-type and output everything else. + +By default, all specified message types are output and all others are discarded.""") + filter_group.add_argument( + '-n', '--max', type=int, default=None, + help="Process up to a maximum of N messages. If --message-type is specified, only count messages matching the " + "specified type(s).") + filter_group.add_argument( + '-m', '--message-type', type=str, action='append', + help="""\ +An optional list of class names corresponding with the message types to be displayed. May be specified multiple times +(-m Pose -m PoseAux), or as a comma-separated list (-m Pose,PoseAux). All matches are case-insensitive. + +If a partial name is specified, the best match will be returned. Use the wildcard '*' to match multiple message types. + +Supported types: +%s""" % '\n'.join(['- %s' % c for c in message_type_by_name.keys()])) + filter_group.add_argument( + '--skip', type=int, default=0, + help="Skip the first N messages. If --message-type is specified, only count messages matching the specified " + "type(s).") + filter_group.add_argument( + '--source-identifier', '--source-id', action=CSVAction, nargs='*', + help="Only include messages with the listed source identifier(s). Must be integers. 
May be specified multiple " + "times (--source-id 0 --source-id 1), as a space-separated list (--source-id 0 1), or as a " + "comma-separated list (--source-id 0,1). If not specified, all available source identifiers present in " + "the data will be used.") + filter_group.add_argument( + '-t', '--time', type=str, metavar='[START][:END][:{rel,abs}]', + help="Only process messages in the specified time range. Both start and end may be omitted to read from the " + "beginning or to the end of the file. By default, timestamps are treated as relative to the first message " + "in the file, unless an 'abs' type is specified.") + + wrapper_group = parser.add_argument_group('InputDataWrapper Support') + wrapper_group.add_argument( + '-u', '--unwrap', type=str, + help="""\ +Unwrap the content of InputDataWrapper messages containing the specified data type. + +If a partial name is specified, the best match will be returned. Use the wildcard '*' to match multiple types. + +Supported types: +%s""" % '\n'.join(['- %s' % c for c in InputDataType])) + add_wrapped_data_mode_argument(wrapper_group, '--wrapped-data-format', default='parent') + wrapper_group.add_argument( + '--wrapped-data-type', type=str, action='append', + help="""\ +If specified, discard InputDataWrapper messages for data types other than the requested values. May be specified +multiple times, or as a comma-separated list. + +If a partial name is specified, the best match will be returned. Use the wildcard '*' to match multiple types. + +Supported types: +%s""" % '\n'.join(['- %s' % c for c in InputDataType])) + + file_group = parser.add_argument_group('Output Capture') parser.add_argument( '--log-timestamp-source', default=None, choices=('user-sw', 'kernel-sw', 'hw'), help=f"""\ @@ -80,24 +792,37 @@ def main(): The data is pairs of uint64. First, the timestamp in nanoseconds followed by the byte offset in the data file. - user-sw - Log timestamps from python code. This is the only option available for serial data. 
- kernel-sw - Log kernel SW timestamps. This is only available for socket connections. -- hw - Log HW timestamps from device driver. This needs HW driver support. Run `./fusion_engine_client/utils/socket_timestamping.py` to test.""") +- hw - Log HW timestamps from device driver. This needs HW driver support. Run + `./fusion_engine_client/utils/socket_timestamping.py` to test.""") file_group.add_argument( - '-o', '--output', metavar='PATH', type=str, + '-o', '--output', metavar='PATH', type=str, default=default_output, help=f"""\ -If specified, save the incoming data in the specified file or transport. +If specified, save the incoming data in the specified file, or send it to the +specified transport. Supported formats include: {TRANSPORT_HELP_OPTIONS}""") - - parser.add_argument( - 'transport', type=str, - help=TRANSPORT_HELP_STRING) + file_group.add_argument( + '-f', '--output-format', default=None, choices=('p1log', 'raw', 'csv'), + help="""\ +The format of the file to be generated when --output is enabled: +- p1log - Create a *.p1log file containing only FusionEngine messages +- raw - Create a generic binary file containing all incoming data (default) +- csv - Create a CSV file with the received message types and timestamps""") options = parser.parse_args() - if options.quiet: - options.display = False + # --summary is an alias for --display=summary. + if options.summary: + options.display = 'summary' + if options.display == 'quiet': + options.display = 'none' + + quiet = options.display == 'none' + + # Configure logging. + # # If the user is sending output to stdout, route all other messages to stderr so the logging prints and the data # don't get mixed up. Otherwise, print to stdout. if options.output in ('', '-', 'file://-'): @@ -105,7 +830,6 @@ def main(): else: logging_stream = sys.stdout - # Configure logging. 
if options.verbose >= 1: logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s:%(lineno)d - %(message)s', stream=logging_stream) @@ -116,197 +840,16 @@ def main(): logging.getTraceLevel(depth=options.verbose - 1)) else: logging.basicConfig(level=logging.INFO, format='%(message)s', stream=logging_stream) + if quiet: + logging.getLogger('point_one.utils.log').setLevel(logging.ERROR) + logging.getLogger('point_one.fusion_engine.parsers').setLevel(logging.ERROR) HighlightFormatter.install(color=True, standoff_level=logging.WARNING) BrokenPipeStreamHandler.install() - # If the user specified a set of message names, lookup their type values. Below, we will limit the printout to only - # those message types. - message_types = None - if options.message_type is not None: - # Pattern match to any of: - # -m Type1 - # -m Type1 -m Type2 - # -m Type1,Type2 - # -m Type1,Type2 -m Type3 - # -m Type* - try: - message_types = MessagePayload.find_matching_message_types(options.message_type) - if len(message_types) == 0: - # find_matching_message_types() will print an error. - sys.exit(1) - except ValueError as e: - _logger.error(str(e)) - sys.exit(1) - - # Connect to the device using the specified transport. - try: - transport = create_transport(options.transport, mode='input') - except Exception as e: - _logger.error(str(e)) - sys.exit(1) - - # Open the output file if logging was requested. 
- timestamp_file = None - if options.output is not None: - if options.output_format == 'p1log': - p1i_path = os.path.splitext(options.output)[0] + '.p1i' - if os.path.exists(p1i_path): - os.remove(p1i_path) - - output_file = create_transport(options.output, mode='output') - - if isinstance(output_file, VirtualSerial): - _logger.info(f'Writing output to: {output_file}') - - if options.log_timestamp_source and options.output_format != 'csv': - timestamp_file = open(options.output + TIMESTAMP_FILE_ENDING, 'wb') - else: - output_file = None - - generating_raw_log = (output_file is not None and options.output_format == 'raw') - generating_p1log = (output_file is not None and options.output_format == 'p1log') - generating_csv = (output_file is not None and options.output_format == 'csv') - - if generating_csv: - output_file.write(b'host_time,type,p1_time,sys_time\n') - - # If this is a TCP/UDP socket, configure it for non-blocking reads. We'll apply a read timeout with select() below. - read_timeout_sec = 1.0 - if isinstance(transport, socket.socket): - transport.setblocking(0) - # This function won't do anything if neither timestamp is enabled. - enable_socket_timestamping( - transport, - enable_sw_timestamp=options.log_timestamp_source == 'kernel-sw', - enable_hw_timestamp=options.log_timestamp_source == 'hw' - ) - # If this is a serial port, configure its read timeout. - else: - if options.log_timestamp_source and options.log_timestamp_source != 'user-sw': - _logger.error(f'--log-timestamp-source={options.log_timestamp_source} is not supported. Only "user-sw" timestamps are supported on non-socket captures.') - sys.exit(1) - - set_read_timeout(transport, read_timeout_sec) - - # Listen for incoming data. 
- decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet and not options.summary, return_bytes=True) - - bytes_received = 0 - fe_bytes_received = 0 - messages_received = 0 - device_summary = DeviceSummary() - - start_time = datetime.now() - last_print_time = start_time - print_timeout_sec = 1.0 if options.summary else 5.0 - - def _print_status(now): - if options.summary: - # Clear the terminal. - print(colorama.ansi.CSI + 'H' + colorama.ansi.CSI + 'J', end='', file=logging_stream) - _logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % - (bytes_received, messages_received, (now - start_time).total_seconds())) - if options.summary: - print_summary_table(device_summary) - - try: - while True: - kernel_ts: Optional[float] = None - hw_ts: Optional[float] = None - # Read some data. - try: - # If this is a TCP/UDP socket, use select() to implement a read timeout so we can wakeup periodically - # and print status if there's no incoming data. - if isinstance(transport, socket.socket): - ready = select.select([transport], [], [], read_timeout_sec) - if ready[0]: - received_data, kernel_ts, hw_ts = recv(transport, 1024) - else: - received_data = [] - # If this is a serial port or file, we set the read timeout above. 
- else: - received_data = recv_from_transport(transport, 1024) - - bytes_received += len(received_data) - - now = datetime.now() - if not options.quiet: - if (now - last_print_time).total_seconds() > print_timeout_sec: - _print_status(now) - last_print_time = now - except serial.SerialException as e: - _logger.error('Unexpected error reading from device:\r%s' % str(e)) - break - - if options.log_timestamp_source == 'kernel-sw': - if kernel_ts is None: - _logger.error(f'Unable to capture kernel SW timestamps on {options.transport}.') - sys.exit(1) - timestamp_sec = kernel_ts - elif options.log_timestamp_source == 'hw': - if hw_ts is None: - _logger.error(f'Unable to capture HW timestamps on {options.transport}.\n{HW_TIMESTAMPING_HELP}') - sys.exit(1) - timestamp_sec = hw_ts - else: - timestamp_sec = now.timestamp() - timestamp_ns = int(round(timestamp_sec * 1e9)) - - # If logging in raw format, write the data to disk as is. - if generating_raw_log: - output_file.write(received_data) - if timestamp_file: - log_timestamped_data_offset(timestamp_file, timestamp_ns, bytes_received) - - - # Decode the incoming data and print the contents of any complete messages. 
- # - # Note that we pass the data to the decoder, even if --no-display was requested, for three reasons: - # - So that we get a count of the number of incoming messages - # - So we print warnings if the CRC fails on any of the incoming data - # - If we are logging in *.p1log format, so the decoder can extract the FusionEngine data from any - # non-FusionEngine data in the stream - messages = decoder.on_data(received_data) - messages_received += len(messages) - - if options.display or generating_p1log: - for (header, message, raw_data) in messages: - fe_bytes_received += len(raw_data) - device_summary.update(header, message) - - if generating_p1log: - output_file.write(raw_data) - if timestamp_file: - log_timestamped_data_offset(timestamp_file, timestamp_ns, fe_bytes_received) - - if generating_csv: - p1_time = message.get_p1_time() - sys_time = message.get_system_time_sec() - p1_str = str(p1_time.seconds) if p1_time is not None and not math.isnan(p1_time) else '' - sys_str = str(sys_time) if sys_time is not None and not math.isnan(sys_time) else '' - output_file.write( - f'{timestamp_sec},{header.message_type},{p1_str},{sys_str}\n'.encode('utf-8')) - - if options.display: - if options.summary: - if (now - last_print_time).total_seconds() > 0.1: - _print_status(now) - elif message_types is None or header.message_type in message_types: - print_message(header, message, format=options.display_format) - except (BrokenPipeError, KeyboardInterrupt) as e: - pass - - # Close the transport. - transport.close() - - # Close the output file. - if output_file is not None: - output_file.close() - - if not options.quiet and not options.summary: - now = datetime.now() - _print_status(now) + # Configure the application. 
+ app = Application(options=options, logging_stream=logging_stream) + app.process_input() if __name__ == "__main__": diff --git a/python/fusion_engine_client/applications/p1_filter.py b/python/fusion_engine_client/applications/p1_filter.py index 03416908..06514207 100755 --- a/python/fusion_engine_client/applications/p1_filter.py +++ b/python/fusion_engine_client/applications/p1_filter.py @@ -1,224 +1,13 @@ #!/usr/bin/env python3 -from datetime import datetime -import os -import sys -import time - -# Since stdout is used for data stream, don't write any print statements to stdout. -# Done here to avoid any log/print statements triggered by imports. -original_stdout = sys.stdout -sys.stdout = sys.stderr - -# Add the Python root directory (fusion-engine-client/python/) to the import search path to enable FusionEngine imports -# if this application is being run directly out of the repository and is not installed as a pip package. -root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) -sys.path.insert(0, root_dir) - -from fusion_engine_client.messages import InputDataType, MessagePayload, MessageType, message_type_by_name -from fusion_engine_client.parsers import FusionEngineDecoder -from fusion_engine_client.utils.argument_parser import ArgumentParser, ExtendedBooleanAction -from fusion_engine_client.utils.transport_utils import * +if __package__ is None or __package__ == "": + from import_utils import enable_relative_imports + __package__ = enable_relative_imports(__name__, __file__) +from .p1_capture import main as p1_capture_main def main(): - parser = ArgumentParser(description="""\ -Filter FusionEngine data coming from a device, or via stdin, and send the -filtered result to stdout. - -Examples: - # Remove GNSSSatellite from the data stream of a device connected over TCP. - ./p1_filter.py tcp://192.168.1.138:30202 \ - --invert -m GNSSSatellite --display > /tmp/out.p1log - - # Same as above, but capture data using netcat. 
- netcat 192.168.1.138 30202 | \ - ./p1_filter.py --invert -m GNSSSatellite --display > /tmp/out.p1log - - # Only keep Pose messages from a recorded data file. - cat /tmp/out.p1log | ./p1_filter.py -m Pose > /tmp/pose_out.p1log - - # Only keep Pose messages from an incoming serial data stream. - ./p1_filter.py tty:///dev/ttyUSB0:460800 \ - -m Pose > /tmp/pose_out.p1log - - # Similar to above, but open the serial port manually using stty and cat. - stty -F /dev/ttyUSB0 speed 460800 cs8 \ - -cstopb -parenb -icrnl -ixon -ixoff -opost -isig -icanon -echo && \ - cat /dev/ttyUSB0 | \ - ./p1_filter.py -m Pose > /tmp/pose_out.p1log - - # Extract GNSS receiver data in its native format (RTCM, SBF, etc.) from a - # remote Point One device, and pass the data to another application to be - # parsed and displayed. - ./p1_filter.py tcp://192.168.1.138:30202 \ - --unwrap --data-type EXTERNAL_UNFRAMED_GNSS | \ - rtcm_print -""") - - parser.add_argument( - '-V', '--invert', action=ExtendedBooleanAction, default=False, - help="""\ -If specified, discard all message types specified with --message-type and output everything else. - -By default, all specified message types are output and all others are discarded.""") - parser.add_argument( - '--display', action=ExtendedBooleanAction, default=False, - help="Periodically print status on stderr.") - parser.add_argument( - '-m', '--message-type', type=str, action='append', - help="An list of class names corresponding with the message types to forward or discard (see --invert).\n" - "\n" - "May be specified multiple times (-m Pose -m PoseAux), or as a comma-separated list (-m Pose,PoseAux). " - "All matches are case-insensitive.\n" - "\n" - "If a partial name is specified, the best match will be returned. 
Use the wildcard '*' to match multiple " - "message types.\n" - "\n" - "Supported types:\n%s" % '\n'.join(['- %s' % c for c in message_type_by_name.keys()])) - parser.add_argument( - '-o', '--output', metavar='PATH', type=str, - help=f"""\ -If specified, write output to the specified file. Otherwise, output is sent to -stdout by default. - -Supported formats include: -{TRANSPORT_HELP_OPTIONS}""") - - wrapper_group = parser.add_argument_group('InputDataWrapper Support') - wrapper_group.add_argument( - '-d', '--data-type', type=str, action='append', - help="If specified, discard InputDataWrapper messages for data types other than the listed values.") - wrapper_group.add_argument( - '-u', '--unwrap', action=ExtendedBooleanAction, default=False, - help="""\ -Unwrap incoming InputDataWrapper messages and output their contents without FusionEngine framing. Discard all other -FusionEngine messages. - -Note that we strongly recommend using this option with a single --data-type specified. When --data-type is not -specified, or when multiple data types are specified, the unwrapped stream will contain multiple interleaved binary -data streams with no frame alignment enforced.""") - - parser.add_argument( - 'input', metavar='PATH', type=str, nargs='?', default='-', - help=TRANSPORT_HELP_STRING) - options = parser.parse_args() - - # If the user specified a set of message names, lookup their type values. Below, we will limit the printout to only - # those message types. 
- message_types = set() - if options.unwrap: - if options.message_type is not None: - print('Error: You cannot specify both --unwrap and --message-type.') - sys.exit(1) - - message_types = {MessageType.INPUT_DATA_WRAPPER} - elif options.message_type is not None: - # Pattern match to any of: - # -m Type1 - # -m Type1 -m Type2 - # -m Type1,Type2 - # -m Type1,Type2 -m Type3 - # -m Type* - try: - message_types = MessagePayload.find_matching_message_types(options.message_type) - if len(message_types) == 0: - # find_matching_message_types() will print an error. - sys.exit(1) - except ValueError as e: - print(str(e)) - sys.exit(1) - - # For InputDataWrapper messages, if the user specified desired data types, limit the output to only those. - input_data_types = set() - if options.data_type is not None: - try: - input_data_types = InputDataType.find_matching_values(options.data_type, prefix='M_TYPE_', print_func=print) - if len(input_data_types) == 0: - # find_matching_values() will print an error. - sys.exit(1) - except ValueError as e: - print(str(e)) - sys.exit(1) - - # Open the output stream/data file. - if options.output is None: - options.output = 'file://-' - output_transport = create_transport(options.output, mode='output', stdout=original_stdout) - if isinstance(output_transport, VirtualSerial): - print(f'Writing output to: {output_transport}') - - # Open the input stream/data file. - input_transport = create_transport(options.input, mode='input') - - # Listen for incoming data. - start_time = datetime.now() - last_print_time = datetime.now() - bytes_received = 0 - bytes_forwarded = 0 - messages_received = 0 - messages_forwarded = 0 - - decoder = FusionEngineDecoder(return_bytes=True) - try: - while True: - # Need to specify read size or read waits for end of file character. - # This returns immediately even if 0 bytes are available. 
- received_data = recv_from_transport(input_transport, 64) - - if len(received_data) == 0: - time.sleep(0.1) - else: - bytes_received += len(received_data) - messages = decoder.on_data(received_data) - for (header, message, raw_data) in messages: - # In unwrap mode, discard all but InputDataWrapper messages. - if options.unwrap and header.message_type != MessageType.INPUT_DATA_WRAPPER: - continue - - messages_received += 1 - - # In unwrap mode, the input message is always an InputDataWrapper. - if options.unwrap: - pass_through_message = True - # Otherwise, see if this is in the list of user-specified message types to keep. If the list is - # empty, keep all messages. - else: - pass_through_message = ( - len(message_types) == 0 or - (options.invert and header.message_type not in message_types) or - (not options.invert and header.message_type in message_types) - ) - - # If this is an InputDataWrapper and the user specified a list of data types to keep, keep only the - # messages with that kind of data. If the list is empty, keep all messages. - if pass_through_message and header.message_type == MessageType.INPUT_DATA_WRAPPER: - pass_through_message = ( - len(input_data_types) == 0 or - (options.invert and message.data_type not in input_data_types) or - (not options.invert and message.data_type in input_data_types) - ) - - # If the message passed the filters above, output it now. 
- if pass_through_message: - messages_forwarded += 1 - if options.unwrap: - bytes_forwarded += len(message.data) - output_transport.write(message.data) - else: - bytes_forwarded += len(raw_data) - output_transport.write(raw_data) - - if options.display: - now = datetime.now() - if (now - last_print_time).total_seconds() > 5.0: - print('Status: [bytes_in=%d, msgs_in=%d, bytes_out=%d, msgs_out=%d, elapsed_time=%d sec]' % - (bytes_received, messages_received, bytes_forwarded, - messages_forwarded, (now - start_time).total_seconds())) - last_print_time = now - - except KeyboardInterrupt: - pass + p1_capture_main(default_display_mode='quiet', default_output='-') if __name__ == "__main__": diff --git a/python/fusion_engine_client/applications/p1_print.py b/python/fusion_engine_client/applications/p1_print.py index 7d56b2d1..c9154cd9 100755 --- a/python/fusion_engine_client/applications/p1_print.py +++ b/python/fusion_engine_client/applications/p1_print.py @@ -1,236 +1,13 @@ #!/usr/bin/env python3 -import sys - if __package__ is None or __package__ == "": from import_utils import enable_relative_imports __package__ = enable_relative_imports(__name__, __file__) -from ..messages import * -from ..parsers import MixedLogReader -from ..utils import trace as logging -from ..utils.argument_parser import ArgumentParser, ExtendedBooleanAction, CSVAction -from ..utils.log import define_cli_arguments as define_log_search_arguments, locate_log -from ..utils.print_utils import DeviceSummary, add_print_format_argument, print_message, print_summary_table -from ..utils.time_range import TimeRange -from ..utils.trace import HighlightFormatter, BrokenPipeStreamHandler - -_logger = logging.getLogger('point_one.fusion_engine.applications.print_contents') - +from .p1_capture import main as p1_capture_main def main(): - parser = ArgumentParser(description="""\ -Decode and print the contents of messages contained in a *.p1log file or other -binary file containing FusionEngine messages. 
The binary file may also contain -other types of data. -""") - - parser.add_argument( - '--absolute-time', '--abs', action=ExtendedBooleanAction, - help="Interpret the timestamps in --time as absolute P1 times. Otherwise, treat them as relative to the first " - "message in the file. Ignored if --time contains a type specifier.") - add_print_format_argument(parser, '-f', '--format', '--display-format') - parser.add_argument( - '-m', '--message-type', type=str, action='append', - help="An optional list of class names corresponding with the message types to be displayed. May be specified " - "multiple times (-m Pose -m PoseAux), or as a comma-separated list (-m Pose,PoseAux). All matches are" - "case-insensitive.\n" - "\n" - "If a partial name is specified, the best match will be returned. Use the wildcard '*' to match multiple " - "message types.\n" - "\n" - "Supported types:\n%s" % '\n'.join(['- %s' % c for c in message_type_by_name.keys()])) - parser.add_argument( - '-n', '--max', type=int, default=None, - help="Print up to a maximum of N messages. If --message-type is specified, only count messages matching the " - "specified type(s).") - parser.add_argument( - '-s', '--summary', action='store_true', - help="Print a summary of the messages in the file.") - parser.add_argument( - '--skip', type=int, default=0, - help="Skip the first N messages in the log. If --message-type is specified, only count messages matching the " - "specified type(s).") - parser.add_argument( - '--source-identifier', '--source-id', action=CSVAction, nargs='*', - help="Plot the FusionEngine Pose messages with the listed source identifier(s). Must be integers. May be " - "specified multiple times (--source-id 0 --source-id 1), as a space-separated list (--source-id 0 1), or " - "as a comma-separated list (--source-id 0,1). 
If not specified, all available source identifiers present " - "in the log will be used.") - parser.add_argument( - '-t', '--time', type=str, metavar='[START][:END][:{rel,abs}]', - help="The desired time range to be analyzed. Both start and end may be omitted to read from beginning or to " - "the end of the file. By default, timestamps are treated as relative to the first message in the file, " - "unless an 'abs' type is specified or --absolute-time is set.") - parser.add_argument('-v', '--verbose', action='count', default=0, - help="Print verbose/trace debugging messages.") - - log_parser = parser.add_argument_group('Input File/Log Control') - define_log_search_arguments(log_parser) - log_parser.add_argument( - '--progress', action='store_true', - help="Print file read progress to the console periodically.") - - options = parser.parse_args() - - read_index = not options.ignore_index - - # Configure logging. - if options.verbose >= 1: - logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s:%(lineno)d - %(message)s', - stream=sys.stdout) - if options.verbose == 1: - logging.getLogger('point_one.fusion_engine.parsers').setLevel(logging.DEBUG) - else: - logging.getLogger('point_one.fusion_engine.parsers').setLevel( - logging.getTraceLevel(depth=options.verbose - 1)) - else: - logging.basicConfig(level=logging.INFO, format='%(message)s', stream=sys.stdout) - - HighlightFormatter.install(color=True, standoff_level=logging.WARNING) - BrokenPipeStreamHandler.install() - - # Locate the input file and set the output directory. - input_path, log_id = locate_log(input_path=options.log, log_base_dir=options.log_base_dir, return_log_id=True, - extract_fusion_engine_data=False) - if input_path is None: - # locate_log() will log an error. - sys.exit(1) - - _logger.info("Processing input file '%s'." % input_path) - - # Parse the time range. 
- time_range = TimeRange.parse(options.time, absolute=options.absolute_time) - - # If the user specified a set of message names, lookup their type values. Below, we will limit the printout to only - # those message types. - message_types = None - if options.message_type is not None: - # Pattern match to any of: - # -m Type1 - # -m Type1 -m Type2 - # -m Type1,Type2 - # -m Type1,Type2 -m Type3 - # -m Type* - try: - message_types = MessagePayload.find_matching_message_types(options.message_type) - if len(message_types) == 0: - # find_matching_message_types() will print an error. - sys.exit(1) - except ValueError as e: - _logger.error(str(e)) - sys.exit(1) - - if options.source_identifier is None: - source_id = None - else: - try: - source_id = [int(s) for s in options.source_identifier] - except ValueError: - _logger.error('Source identifiers must be integers. Exiting.') - sys.exit(1) - - # Process all data in the file. - reader = MixedLogReader(input_path, return_bytes=True, return_offset=True, show_progress=options.progress, - ignore_index=not read_index, message_types=message_types, time_range=time_range, - source_ids=source_id) - - first_p1_time_sec = None - last_p1_time_sec = None - newest_p1_time = None - newest_p1_message_type = None - - first_system_time_sec = None - last_system_time_sec = None - newest_system_time_sec = None - newest_system_message_type = None - - total_decoded_messages = 0 - total_messages = 0 - bytes_decoded = 0 - device_summary = DeviceSummary() - - try: - for header, message, data, offset_bytes in reader: - total_decoded_messages += 1 - if total_decoded_messages <= options.skip: - continue - elif options.max is not None and (total_decoded_messages - options.skip) > options.max: - break - - # Update the data summary in summary mode, or print the message contents otherwise. 
- total_messages += 1 - bytes_decoded += len(data) - if options.summary: - device_summary.update(header, message) - - if message is not None: - p1_time = message.get_p1_time() - if p1_time is not None: - if first_p1_time_sec is None: - first_p1_time_sec = float(p1_time) - last_p1_time_sec = float(p1_time) - newest_p1_time = p1_time - newest_p1_message_type = header.message_type - else: - # We allow a small tolerance to account for normal latency between measurements and computed - # data like pose solutions, as well as latency between different types of measurements. - dt_sec = float(p1_time - newest_p1_time) - if dt_sec < -0.2: - _logger.warning( - 'Backwards/restarted P1 time detected after %s (%s). [new_message=%s, ' - 'new_p1_time=%s, offset=%d B]' % - (str(newest_p1_time), str(newest_p1_message_type), header.get_type_string(), - str(p1_time), offset_bytes)) - last_p1_time_sec = max(last_p1_time_sec, float(p1_time)) - newest_p1_time = p1_time - newest_p1_message_type = header.message_type - - system_time_sec = message.get_system_time_sec() - if system_time_sec is not None: - if first_system_time_sec is None: - first_system_time_sec = system_time_sec - last_system_time_sec = system_time_sec - newest_system_time_sec = system_time_sec - newest_system_message_type = header.message_type - else: - # We allow a small tolerance to account for normal latency between measurements and computed - # data like pose solutions, as well as latency between different types of measurements. - dt_sec = system_time_sec - newest_system_time_sec - if dt_sec < -0.2: - _logger.warning( - 'Backwards/restarted system time detected after %s (%s). 
[new_message=%s, ' - 'new_system_time=%s, offset=%d B]' % - (system_time_to_str(newest_system_time_sec, is_seconds=True), - str(newest_system_message_type), header.get_type_string(), - system_time_to_str(system_time_sec, is_seconds=True), - offset_bytes)) - last_system_time_sec = max(last_system_time_sec, system_time_sec) - newest_system_time_sec = system_time_sec - newest_system_message_type = header.message_type - else: - print_message(header, message, offset_bytes, format=options.format, bytes=data) - except (BrokenPipeError, KeyboardInterrupt) as e: - sys.exit(1) - - # Print the data summary. - if options.summary: - _logger.info('Input file: %s' % input_path) - _logger.info('Log ID: %s' % log_id) - if first_p1_time_sec is not None: - _logger.info('Duration (P1): %.1f seconds' % (last_p1_time_sec - first_p1_time_sec)) - else: - _logger.info('Duration (P1): unknown') - if first_system_time_sec is not None: - _logger.info('Duration (system): %.1f seconds' % (last_system_time_sec - first_system_time_sec)) - else: - _logger.info('Duration (system): unknown') - _logger.info('Total data read: %d B' % reader.get_bytes_read()) - _logger.info('Selected data size: %d B' % bytes_decoded) - _logger.info('') - print_summary_table(device_summary) - elif total_messages == 0: - _logger.warning('No valid FusionEngine messages found.') + p1_capture_main(default_display_mode='messages') if __name__ == "__main__": diff --git a/python/fusion_engine_client/messages/measurements.py b/python/fusion_engine_client/messages/measurements.py index 650d68f8..bdb2d7c5 100644 --- a/python/fusion_engine_client/messages/measurements.py +++ b/python/fusion_engine_client/messages/measurements.py @@ -1594,7 +1594,7 @@ def pack(self, buffer: bytes = None, offset: int = 0, return_buffer: bool = True system_time_cs >> 32, int(self.data_type)) - buffer += self.data + buffer[offset:offset + len(self.data)] = self.data offset += len(self.data) if return_buffer: @@ -1652,11 +1652,8 @@ def 
get_fe_content_payload(self) -> Optional[MessagePayload]: message_version=header.message_version) self._fe_content_payload = payload except Exception: - pass - - if self._fe_content_payload is None: - # Set to False instead of None so we don't try to unpack the header multiple times. - self._fe_content_payload = False + # Set to False instead of None so we don't try to unpack the header multiple times. + self._fe_content_payload = False if self._fe_content_payload == False: return None diff --git a/python/fusion_engine_client/messages/signal_def_gen.py b/python/fusion_engine_client/messages/signal_def_gen.py index 607dc1dc..8b8a5426 100644 --- a/python/fusion_engine_client/messages/signal_def_gen.py +++ b/python/fusion_engine_client/messages/signal_def_gen.py @@ -8,7 +8,7 @@ import subprocess import sys from textwrap import indent -from typing import Callable, Iterable, NamedTuple, Optional, TypeVar +from typing import Callable, Dict, NamedTuple, Optional, Type, TypeVar if __package__ is None or __package__ == "": root_dir = os.path.normpath(os.path.join(os.path.abspath(os.path.dirname(__file__)), '../..')) @@ -303,13 +303,13 @@ class EnumValue(NamedTuple): E = TypeVar('E', bound=IntEnum) -def get_enum_values(enum_type: type[E]) -> Optional[dict[E, EnumValue]]: +def get_enum_values(enum_type: Type[E]) -> Optional[Dict[E, EnumValue]]: '''! Parse contents from Python enums to be used to generate C++ enums. 
''' source_lines = inspect.getsourcelines(enum_type) comment = [] - values: dict[E, EnumValue] = {} + values: Dict[E, EnumValue] = {} enum_value_names = {e.name for e in enum_type} for line in source_lines[0]: comment_match = _PYTHON_COMMENT_RE.match(line) @@ -361,7 +361,7 @@ def strip_python_comment(comment: Optional[str]) -> str: return re.sub(_PYTHON_COMMENT_PREFIX_RE, '', comment).strip() -def generate_cpp_enum_from_python(enum_type: type[IntEnum], int_type='uint8_t') -> str: +def generate_cpp_enum_from_python(enum_type: Type[IntEnum], int_type='uint8_t') -> str: '''! Generate a the full C++ enum source code from an existing Python enum. ''' @@ -377,7 +377,7 @@ def generate_cpp_enum_from_python(enum_type: type[IntEnum], int_type='uint8_t') return generate_cpp_enum_declaration(enum_type.__name__, comment, int_type, values_code) -def generate_cpp_to_string(enum_type: type[_GNSSSignalPartType], pretty_values: dict[str, str]) -> str: +def generate_cpp_to_string(enum_type: Type[_GNSSSignalPartType], pretty_values: Dict[str, str]) -> str: '''! Generate the C++ source code for the enum `to_string`, `operator<<` and `ToPrettyString` functions. ''' @@ -433,7 +433,7 @@ def generate_cpp_to_string(enum_type: type[_GNSSSignalPartType], pretty_values: }}''' -def get_gnss_enum_cpp_name(enum_type: type[_GNSSSignalPartType]) -> str: +def get_gnss_enum_cpp_name(enum_type: Type[_GNSSSignalPartType]) -> str: '''! Convert a Python signal part type to a C++ constant prefix: `SatelliteType` -> "SATELLITE_TYPE". ''' @@ -446,7 +446,7 @@ def _camel_case_split(identifier): return cpp_name -def generate_cpp_get_part(enum_type: type[_GNSSSignalPartType]) -> str: +def generate_cpp_get_part(enum_type: Type[_GNSSSignalPartType]) -> str: '''! Generate the C++ source code for getting the enum value of a component type of @ref GNSSSignalType. An example function would be `GetSatelliteTypePart`. @@ -489,7 +489,7 @@ def generate_cpp_constants_defs() -> str: '''! 
Generate the top constants portion C++ source code (shift and size constants). ''' - def _make_constants(enum_type: type[_GNSSSignalPartType]): + def _make_constants(enum_type: Type[_GNSSSignalPartType]): cpp_prefix = get_gnss_enum_cpp_name(enum_type) bit_packing = _get_gnss_enum_bit_packing(enum_type) # type: ignore return f'''\ @@ -503,7 +503,7 @@ def generate_cpp_signal_defs() -> str: '''! Generate all the generated C++ source code. This includes enum definitions and helper functions. ''' - def _make_functions(enum_type: type[_GNSSSignalPartType]): + def _make_functions(enum_type: Type[_GNSSSignalPartType]): if enum_type == GNSSSignalType: pretty_values = { k: _get_pretty_gnss_signal_type( diff --git a/python/fusion_engine_client/messages/signal_defs.py b/python/fusion_engine_client/messages/signal_defs.py index 95a99746..72eafc3e 100644 --- a/python/fusion_engine_client/messages/signal_defs.py +++ b/python/fusion_engine_client/messages/signal_defs.py @@ -1,5 +1,5 @@ import functools -from typing import NamedTuple, Optional, TypeAlias, TypeVar, Union +from typing import NamedTuple, Optional, Tuple, Type, TypeVar, Union from ..utils.enum_utils import IntEnum, enum_bitmask @@ -255,10 +255,10 @@ class _BitPacking(NamedTuple): SatelliteType.QZSS: QZSSSignalName, } -_GNSSSignalPartType: TypeAlias = SatelliteType | FrequencyBand | SignalName | GNSSComponent +_GNSSSignalPartType = Union[SatelliteType, FrequencyBand, SignalName, GNSSComponent] -def _get_gnss_enum_bit_packing(cls: type[_GNSSSignalPartType]) -> _BitPacking: +def _get_gnss_enum_bit_packing(cls: Type[_GNSSSignalPartType]) -> _BitPacking: '''! 
Get the bit packing for an enum component of @ref GNSSSignalType ''' @@ -269,7 +269,7 @@ def _get_gnss_enum_bit_packing(cls: type[_GNSSSignalPartType]) -> _BitPacking: _T = TypeVar('_T', SatelliteType, FrequencyBand, SignalName, GNSSComponent) -def _get_signal_part(signal: 'GNSSSignalType', cls: type[_T], raise_on_unrecognized: bool = True) -> _T: +def _get_signal_part(signal: 'GNSSSignalType', cls: Type[_T], raise_on_unrecognized: bool = True) -> _T: '''! Return the value for an enum component of @ref GNSSSignalType @@ -711,7 +711,7 @@ def get_satellite_hash(signal_hash: int) -> int: sv_hash = signal_hash | 0x0FFF0000 return sv_hash -def decode_signal_hash(signal_hash: int) -> tuple[SatelliteType, int, Optional[GNSSSignalType]]: +def decode_signal_hash(signal_hash: int) -> Tuple[SatelliteType, int, Optional[GNSSSignalType]]: """! @brief Decode an integer satellite/signal hash into its component parts: system, signal type, and PRN. @@ -738,7 +738,7 @@ def decode_signal_hash(signal_hash: int) -> tuple[SatelliteType, int, Optional[G return satellite_type, prn, signal_type -def encode_signal_hash(signal_info: GNSSSignalType | SatelliteType, prn) -> int: +def encode_signal_hash(signal_info: Union[GNSSSignalType, SatelliteType], prn) -> int: """! @brief Encode satellite/signal ID component parts into an integer hash. diff --git a/python/fusion_engine_client/parsers/file_index.py b/python/fusion_engine_client/parsers/file_index.py index c59bbe1c..08bdcaa2 100644 --- a/python/fusion_engine_client/parsers/file_index.py +++ b/python/fusion_engine_client/parsers/file_index.py @@ -293,6 +293,46 @@ def save(self, index_path: str, data_path: str): os.remove(index_path) raw_data.tofile(index_path) + def get_message_types(self, message_types: Union[int, IntEnum, MessagePayload, set, list, tuple], + invert: bool = False) -> 'FileIndex': + """! + @brief Get a subset of the contents for a set of message types. + + @param message_types The desired message types. 
+ @param invert If `True`, exclude messages from `message_types` and return all others. + + @return Returns a _copy_ of this class, limited to the requested message types. + """ + # Convert singletons to a list. + if isinstance(message_types, int): + message_types = [MessageType(message_types, raise_on_unrecognized=False)] + elif isinstance(message_types, IntEnum): + message_types = [message_types] + elif MessagePayload.is_subclass(message_types): + message_types = [message_types.get_type()] + + # Convert type enums to integers. + if isinstance(message_types, (set, list, tuple)) and len(message_types) > 0: + if isinstance(next(iter(message_types)), IntEnum): + message_types = [int(k) for k in message_types] + elif MessagePayload.is_subclass(next(iter(message_types))): + message_types = [int(k.get_type()) for k in message_types] + + # Find all matching message types. + idx = np.isin(self._data['type'], message_types) + if invert: + idx = ~idx + + return FileIndex(data=self._data[idx], t0=self.t0) + + @classmethod + def _is_message_type_key(cls, key: Union[IntEnum, MessagePayload, set, list, tuple]) -> bool: + return (isinstance(key, IntEnum) or + MessagePayload.is_subclass(key) or + (isinstance(key, (set, list, tuple)) and len(key) > 0 and isinstance(next(iter(key)), IntEnum)) or + (isinstance(key, (set, list, tuple)) and len(key) > 0 and MessagePayload.is_subclass(next(iter(key))))) + + def get_time_range(self, start: Union[Timestamp, float] = None, stop: Union[Timestamp, float] = None, hint: str = None, time_range: TimeRange = None) -> 'FileIndex': """! 
@@ -352,25 +392,27 @@ def get_time_range(self, start: Union[Timestamp, float] = None, stop: Union[Time start_idx = find_first(self._data['time'] >= np.floor(start)) if start is not None else 0 end_idx = find_first(self._data['time'] >= stop) if stop is not None else len(self._data) - if start_idx < 0: - start_idx = 0 - - if end_idx < 0: - end_idx = len(self._data['time']) - - if hint == 'include_nans': - return FileIndex(data=self._data[start_idx:end_idx], t0=self.t0) - else: - idx = np.full_like(self._data['time'], False, dtype=bool) + # Corner case: if all messages with timestamps are >= stop (i.e., the log starts after the stop time), if + # there are some messages at the start of the log that do not have timestamps, find_first() will include + # them but we don't want that. For example, if stop is 4 and we have: + # {nan, 6, 7, 8} + # we expect end_idx = -1 (i.e., nothing in range), not end_idx = 1 (i.e., include the nan message). + nan_idx = np.isnan(self._data['time']) + if end_idx >= 1 and np.all(nan_idx[:end_idx]): + end_idx = -1 + + # Note: start_idx or end_idx == -1 indicates there was no data in the time range. + idx = np.full_like(self._data['time'], False, dtype=bool) + if start_idx >= 0 and end_idx >= 0: idx[start_idx:end_idx] = True - nan_idx = np.isnan(self._data['time']) + if hint in ('all_nans', 'remove_nans'): if hint == 'all_nans': idx[nan_idx] = True elif hint == 'remove_nans': idx[nan_idx] = False - else: - raise ValueError('Unrecognized control hint.') + elif hint != 'include_nans': + raise ValueError('Unrecognized control hint.') return FileIndex(data=self._data[idx], t0=self.t0) @@ -390,6 +432,13 @@ def __getattr__(self, key): raise AttributeError def __getitem__(self, key): + # If the key is a 2-tuple and the second argument is a string, treat it as a hint string. + if isinstance(key, tuple) and len(key) == 2 and isinstance(key[1], str): + hint = key[1] + key = key[0] + else: + hint = None + # No key specified (convenience case). 
if key is None: return copy.copy(self) @@ -400,19 +449,8 @@ def __getitem__(self, key): elif len(self._data) == 0: return FileIndex() # Return entries for a specific message type. - elif isinstance(key, IntEnum): - idx = self._data['type'] == key - return FileIndex(data=self._data[idx], t0=self.t0) - elif MessagePayload.is_subclass(key): - idx = self._data['type'] == key.get_type() - return FileIndex(data=self._data[idx], t0=self.t0) - # Return entries for a list of message types. - elif isinstance(key, (set, list, tuple)) and len(key) > 0 and isinstance(next(iter(key)), IntEnum): - idx = np.isin(self._data['type'], [int(k) for k in key]) - return FileIndex(data=self._data[idx], t0=self.t0) - elif isinstance(key, (set, list, tuple)) and len(key) > 0 and MessagePayload.is_subclass(next(iter(key))): - idx = np.isin(self._data['type'], [int(k.get_type()) for k in key]) - return FileIndex(data=self._data[idx], t0=self.t0) + elif self._is_message_type_key(key): + return self.get_message_types(message_types=key, invert=hint == 'invert') # Return a single element by index. elif isinstance(key, int): return FileIndex(data=self._data[key:(key + 1)], t0=self.t0) @@ -423,14 +461,16 @@ def __getitem__(self, key): # my_index[10:12:'remove_nans'] elif isinstance(key, slice) and (isinstance(key.start, (Timestamp, float)) or isinstance(key.stop, (Timestamp, float))): - hint = key.step + if key.step is not None: + hint = key.step if hint is not None and not isinstance(hint, str): raise ValueError('Step size not supported for time range slicing.') return self.get_time_range(start=key.start, stop=key.stop, hint=hint) # Key is a TimeRange object. Return a subset of the data. All nan elements (messages without P1 time) will be - # included in the results. 
+ # included in the results by default, unless hint is set to 'remove_nans': + # my_index[TimeRange(...), 'remove_nans'] elif isinstance(key, TimeRange): - return self.get_time_range(time_range=key, hint='include_nans') + return self.get_time_range(time_range=key, hint=hint) # Key is an index slice or a list of individual element indices. Return a subset of the data. elif isinstance(key, slice): return FileIndex(data=self._data[key], t0=self.t0) diff --git a/python/fusion_engine_client/parsers/mixed_log_reader.py b/python/fusion_engine_client/parsers/mixed_log_reader.py index 8efbb2b0..844c0333 100644 --- a/python/fusion_engine_client/parsers/mixed_log_reader.py +++ b/python/fusion_engine_client/parsers/mixed_log_reader.py @@ -25,7 +25,9 @@ class MixedLogReader(object): def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool = False, save_index: bool = True, ignore_index: bool = False, num_threads: int = None, max_bytes: int = None, - time_range: TimeRange = None, message_types: Union[Iterable[MessageType], MessageType] = None, + time_range: TimeRange = None, + message_types: Union[Iterable[MessageType], MessageType] = None, + invert_message_types: bool = False, source_ids: Optional[Iterable[int]] = None, return_header: bool = True, return_payload: bool = True, return_bytes: bool = False, return_offset: bool = False, return_message_index: bool = False): @@ -50,6 +52,8 @@ def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool = be read. See @ref TimeRange for more details. @param message_types A list of one or more @ref fusion_engine_client.messages.defs.MessageType "MessageTypes" to be returned. If `None` or an empty list, read all available messages. + @param invert_message_types If `True`, invert the `message_types` list and return messages _not_ specified in + the list. @param source_ids An optional list message source identifiers to be returned. If `None`, read messages from available source identifiers. 
@param return_header If `True`, return the decoded @ref MessageHeader for each message. @@ -83,6 +87,8 @@ def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool = if len(self.message_types) == 0: self.message_types = None + self.invert_message_types = invert_message_types + # The source IDs requested by the user. If none were requested, then use all of them. if source_ids is None: self.requested_source_ids = None @@ -130,10 +136,9 @@ def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool = self._populate_available_source_ids() self.filter_in_place(None, source_ids=self.requested_source_ids) - self.filter_in_place(self.message_types) + self.filter_in_place(self.message_types, invert_message_types=self.invert_message_types) self.filter_in_place(self.time_range) - self.index = self._original_index[self.message_types][self.time_range] self.filtered_message_types = len(np.unique(self._original_index.type)) != \ len(np.unique(self.index.type)) @@ -285,7 +290,8 @@ def _read_next(self, require_p1_time=False, require_system_time=False, force_eof header.validate_crc(data) # Verify that source ID is correct. - if self.requested_source_ids is not None and header.source_identifier not in self.requested_source_ids: + if (self.requested_source_ids is not None and len(self.requested_source_ids) > 0 and + header.source_identifier not in self.requested_source_ids): continue message_length_bytes = MessageHeader.calcsize() + header.payload_size_bytes @@ -355,7 +361,9 @@ def _read_next(self, require_p1_time=False, require_system_time=False, force_eof # self.time_range are _only_ valid if we are _not_ using an index, so this may end up incorrectly # filtering out some messages as unwanted. 
if self.index is None: - if self.message_types is not None and header.message_type not in self.message_types: + if (self.message_types is not None and + ((not self.invert_message_types and header.message_type not in self.message_types) or + (self.invert_message_types and header.message_type in self.message_types))): self.logger.trace("Message type not requested. Skipping.", depth=1) continue elif self.remove_invalid_p1_time and not p1_time: @@ -465,7 +473,7 @@ def parse_entry_at_index(self, index: file_index.FileIndexEntry): def clear_filters(self): self.filter_in_place(key=None, clear_existing=True) - def filter_in_place(self, key, clear_existing: Union[bool, str] = False, + def filter_in_place(self, key, clear_existing: Union[bool, str] = False, invert_message_types: bool = False, source_ids: Optional[Iterable[int]] = None): """! @brief Limit the returned messages by type or time. @@ -546,7 +554,10 @@ def filter_in_place(self, key, clear_existing: Union[bool, str] = False, pass # If we have an index file available, reduce the index to the requested criteria. elif self.index is not None: - self.index = self.index[key] + if self.index._is_message_type_key(key): + self.index = self.index.get_message_types(key, invert=invert_message_types) + else: + self.index = self.index[key] self.filtered_message_types = len(np.unique(self._original_index.type)) != \ len(np.unique(self.index.type)) # Otherwise, store the criteria and apply them while reading. @@ -554,6 +565,7 @@ def filter_in_place(self, key, clear_existing: Union[bool, str] = False, # Return entries for a specific message type. 
if isinstance(key, MessageType): self.message_types = set((key,)) + self.invert_message_types = invert_message_types self.filtered_message_types = True elif MessagePayload.is_subclass(key): self.message_types = set((key.get_type(),)) @@ -565,6 +577,7 @@ def filter_in_place(self, key, clear_existing: Union[bool, str] = False, self.message_types = new_message_types else: self.message_types = self.message_types & new_message_types + self.invert_message_types = invert_message_types self.filtered_message_types = True elif isinstance(key, (set, list, tuple)) and len(key) > 0 and MessagePayload.is_subclass(next(iter(key))): new_message_types = set([t.get_type() for t in key if t is not None]) @@ -572,6 +585,7 @@ def filter_in_place(self, key, clear_existing: Union[bool, str] = False, self.message_types = new_message_types else: self.message_types = self.message_types & new_message_types + self.invert_message_types = invert_message_types self.filtered_message_types = True # Key is a slice in time. Return a subset of the data. elif isinstance(key, slice) and (isinstance(key.start, (Timestamp, float)) or diff --git a/python/fusion_engine_client/utils/enum_utils.py b/python/fusion_engine_client/utils/enum_utils.py index 0583fbec..b32a1b3d 100644 --- a/python/fusion_engine_client/utils/enum_utils.py +++ b/python/fusion_engine_client/utils/enum_utils.py @@ -114,8 +114,8 @@ def to_string(self, include_value=True, print_hex=False): return str(self) @classmethod - def find_matching_values(cls, pattern: Union[int, str, List[int], List[str]], raise_on_unrecognized: bool = False, - prefix: str = None, print_func=None) -> Set['IntEnum']: + def find_matching_values(cls, pattern: Union[int, str, List[int], List[str]], on_unrecognized: str = 'warn', + prefix: str = None, allow_multiple: bool = True, print_func=None) -> Set['IntEnum']: """! @brief Find one or more enum values that match the specified pattern(s). 
@@ -142,9 +142,12 @@ class MyType(IntEnum): wildcards are specified and multiple enums match, a single result will be returned if there is an exact match (e.g., `thing_a` will match to `MyType.THING_ABC`, not `MyType.THING_DEF`). All matches are case-insensitive. - @param raise_on_unrecognized If `True`, raise an exception for any unrecognized integer values. Unrecognized - values will automatically create new enum entries by default. + @param on_unrecognized Specifies how to handle unrecognized integer values: + - 'raise' - Raise an exception + - 'warn' - Print a warning, but create new enum entries + - 'ignore' - Create new enum entries, do not warn @param prefix If specified, prepend the prefix to all string values if they do not already start with it. + @param allow_multiple If `True`, allow multiple matches when the pattern contains a wildcard. @return A set containing the matching enum values. """ @@ -163,6 +166,8 @@ class MyType(IntEnum): # ['VersionInfoMessage', 'PoseMessage,GNSS*'] -> # ['VersionInfoMessage', 'PoseMessage', 'GNSS*'] requested_types = [p.strip() for entry in patterns for p in entry.split(',')] + if len(requested_types) > 1 and not allow_multiple: + raise ValueError("Multiple type specifiers not permitted.") # Now find matches to each pattern. 
result = set() @@ -173,15 +178,19 @@ class MyType(IntEnum): int_val = int(pattern, base=16) else: int_val = int(pattern) - enum_val = cls(int_val, raise_on_unrecognized=raise_on_unrecognized) + enum_val = cls(int_val, raise_on_unrecognized=on_unrecognized == 'raise') result.add(enum_val) - if str(enum_val) == '(Unrecognized)' and print_func: + if str(enum_val) == '(Unrecognized)' and print_func and on_unrecognized == 'warn': print_func(f"{pattern} is an unknown {cls.__name__} value.") except: + original_pattern = pattern if prefix is not None and not pattern.startswith(prefix): - pattern = f'{prefix}{pattern}' + pattern = f'{prefix}*{pattern}' - allow_multiple = '*' in pattern + if allow_multiple: + has_wildcard = '*' in pattern + else: + has_wildcard = False re_pattern = pattern.replace('*', '.*') # if pattern[0] != '^': # re_pattern = r'.*' + re_pattern @@ -192,14 +201,18 @@ class MyType(IntEnum): matched_types = [v for k, v in cls._member_map_.items() if re.match(re_pattern, k, flags=re.IGNORECASE)] if len(matched_types) == 0 and print_func: - print_func("No message types matching pattern '%s'." % pattern) + print_func("No message types matching pattern '%s'." % original_pattern) continue # If there are too many matches, fail. - if len(matched_types) > 1 and not allow_multiple: - raise ValueError("Pattern '%s' matches multiple message types:%s\n\nAdd a wildcard (%s*) to display " - "all matching types." % - (pattern, ''.join(['\n %s' % c for c in cls._member_map_.keys()]), pattern)) + if len(matched_types) > 1 and not has_wildcard: + if allow_multiple: + wildcard_str = f"\n\nAdd a wildcard ({original_pattern}*) to display all matching types." + else: + wildcard_str = '' + raise ValueError("Pattern '%s' matches multiple message types:%s%s" % + (original_pattern, ''.join(['\n %s' % c for c in matched_types]), + wildcard_str)) # Otherwise, update the set of message types. 
else: result.update(matched_types) diff --git a/python/fusion_engine_client/utils/log.py b/python/fusion_engine_client/utils/log.py index 87c3b0ae..465fc5be 100644 --- a/python/fusion_engine_client/utils/log.py +++ b/python/fusion_engine_client/utils/log.py @@ -2,6 +2,7 @@ import glob import json import os +import re from . import trace as logging from ..messages import MessageType @@ -45,7 +46,7 @@ DEFAULT_LOG_BASE_DIR = os.path.expanduser("~/point_one/logs") -def define_cli_arguments(parser_group): +def define_cli_arguments(parser_group, define_log=True): parser_group.add_argument( '--ignore-index', action=ExtendedBooleanAction, help="If set, do not load the .p1i index file corresponding with the .p1log data file. If specified and a " @@ -54,13 +55,30 @@ def define_cli_arguments(parser_group): parser_group.add_argument( '--log-base-dir', metavar='DIR', default=DEFAULT_LOG_BASE_DIR, help="The base directory containing FusionEngine logs to be searched if a log pattern is specified.") - parser_group.add_argument( - 'log', - help="The log to be read. May be one of:\n" - "- The path to a .p1log file or a file containing FusionEngine messages and other content\n" - "- The path to a FusionEngine log directory\n" - "- A pattern matching a FusionEngine log directory under the specified base directory " - "(see find_fusion_engine_log() and --log-base-dir)") + if define_log: + parser_group.add_argument( + 'log', + help="The log to be read. May be one of:\n" + "- The path to a .p1log file or a file containing FusionEngine messages and other content\n" + "- The path to a FusionEngine log directory\n" + "- A pattern matching a FusionEngine log directory under the specified base directory " + "(see find_fusion_engine_log() and --log-base-dir)") + + +def is_possible_log_pattern(pattern: str) -> bool: + """! + @brief Check if a string is a possible log pattern: a full or partial log hash, or the path to a log directory. + + @param pattern The pattern to test. 
+ + @return `True` if the pattern may be a log hash or is a directory. + """ + if os.path.isdir(pattern): + return True + elif re.match(r'^[a-z0-9]+$', pattern) and not os.path.isfile(pattern): + return True + else: + return False def find_log_by_pattern(pattern, log_base_dir=DEFAULT_LOG_BASE_DIR, allow_multiple=False, skip_empty_files=True, diff --git a/python/fusion_engine_client/utils/print_utils.py b/python/fusion_engine_client/utils/print_utils.py index b9dfcbba..f0cf04ae 100644 --- a/python/fusion_engine_client/utils/print_utils.py +++ b/python/fusion_engine_client/utils/print_utils.py @@ -26,11 +26,11 @@ def add_print_format_argument(parser: argparse._ActionsContainer, *arg_names): "- oneline-binary - Use `oneline-detailed` format, but include the binary representation of each message\n" "- oneline-binary-payload - Like `oneline-binary`, but exclude the message header from the binary") -def add_wrapped_data_mode_argument(parser: argparse._ActionsContainer, *arg_names): +def add_wrapped_data_mode_argument(parser: argparse._ActionsContainer, *arg_names, default='parent'): parser.add_argument( *arg_names, choices=['auto', 'all', 'parent', 'content'], - default='parent', + default=default, help="Specify the way in which InputDataWrapper messages should be handled:\n" "- auto - Use 'all' mode unless specific message types are specified, in which case use 'content' mode " "and only print the wrapped contents.\n" @@ -73,7 +73,7 @@ def print_message(header: MessageHeader, contents: Union[MessagePayload, bytes], if logger is None: logger = _logger - is_requested = message_types is None or header.message_type in message_types + is_requested = message_types is None or len(message_types) == 0 or header.message_type in message_types if header.message_type == MessageType.INPUT_DATA_WRAPPER: wrapped_fe_header = contents.get_fe_content_header() if is_requested: diff --git a/python/fusion_engine_client/utils/socket_timestamping.py 
b/python/fusion_engine_client/utils/socket_timestamping.py index c768e8aa..d178071d 100755 --- a/python/fusion_engine_client/utils/socket_timestamping.py +++ b/python/fusion_engine_client/utils/socket_timestamping.py @@ -11,10 +11,10 @@ import socket import struct import sys -from typing import BinaryIO, Optional, Tuple, TypeAlias, Union +from typing import BinaryIO, List, Optional, Tuple, Union -_CMSG: TypeAlias = tuple[int, int, bytes] +_CMSG = Tuple[int, int, bytes] TIMESTAMP_FILE_ENDING = '.data_times.bin' @@ -71,7 +71,7 @@ SOF_TIMESTAMPING_RAW_HARDWARE = 1 << 6 -def parse_timestamps_from_ancdata(ancdata: list[_CMSG]) -> tuple[Optional[float], Optional[float], Optional[float]]: +def parse_timestamps_from_ancdata(ancdata: List[_CMSG]) -> Tuple[Optional[float], Optional[float], Optional[float]]: """ Parse timestamps from ancillary data. See: https://docs.kernel.org/networking/timestamping.html#scm-timestamping-records diff --git a/python/fusion_engine_client/utils/time_range.py b/python/fusion_engine_client/utils/time_range.py index fa67c1c4..24cb4865 100644 --- a/python/fusion_engine_client/utils/time_range.py +++ b/python/fusion_engine_client/utils/time_range.py @@ -191,6 +191,9 @@ def is_specified(self) -> bool: def in_range_started(self) -> bool: return self._in_range_started + def in_range_ended(self) -> bool: + return self._in_range_ended + def is_in_range(self, message: Union[MessagePayload, bytes], return_timestamps: bool = False) ->\ Union[bool, Tuple[bool, Timestamp, float]]: """! 
diff --git a/python/fusion_engine_client/utils/transport_utils.py b/python/fusion_engine_client/utils/transport_utils.py index 4a348fb1..bec18845 100644 --- a/python/fusion_engine_client/utils/transport_utils.py +++ b/python/fusion_engine_client/utils/transport_utils.py @@ -1,3 +1,4 @@ +import os import re import socket import sys @@ -79,10 +80,12 @@ class FileTransport: def __init__(self, input: Union[str, BinaryIO, TextIO] = None, output: Union[str, BinaryIO, TextIO] = None): # If input is a path, open the specified file. If '-', read from stdin. self.close_input = False + self.is_stdin = False if isinstance(input, str): if input in ('', '-'): self.input = sys.stdin.buffer self.input_path = 'stdin' + self.is_stdin = True else: self.input = open(input, 'rb') self.input_path = input @@ -102,10 +105,12 @@ def __init__(self, input: Union[str, BinaryIO, TextIO] = None, output: Union[str # If output is a path, open the specified file. If '-', write to stdout. self.close_output = False + self.is_stdout = False if isinstance(output, str): if output in ('', '-'): self.output = sys.stdout.buffer self.output_path = 'stdout' + self.is_stdout = True else: self.output = open(output, 'wb') self.output_path = output @@ -230,9 +235,9 @@ def create_transport(descriptor: str, timeout_sec: float = None, print_func: Cal if descriptor in ('', '-'): descriptor = 'file://-' - m = re.match(r'^(?:file://)?([a-zA-Z0-9-_./]+)$', descriptor) + m = re.match(r'^(?:file://)?((?:~/)?[a-zA-Z0-9-_./]+)$', descriptor) if m: - path = m.group(1) + path = os.path.expanduser(m.group(1)) if mode == 'both': if path != '-': raise ValueError("Cannot open a file for both read and write access.") diff --git a/python/requirements-dev.txt b/python/requirements-dev.txt new file mode 100644 index 00000000..d5751fb1 --- /dev/null +++ b/python/requirements-dev.txt @@ -0,0 +1,2 @@ +clang-format +pytest diff --git a/python/tests/test_file_index.py b/python/tests/test_file_index.py index 907a2ca9..ab6b1bd3 100644 --- 
a/python/tests/test_file_index.py +++ b/python/tests/test_file_index.py @@ -65,6 +65,12 @@ def test_type_slice(): assert (pose_index.offset == [e[2] for e in raw]).all() assert (pose_index.message_index == [e[3] for e in raw]).all() + pose_index = index[MessageType.POSE, 'invert'] + raw = [e for e in RAW_DATA if e[1] != MessageType.POSE] + assert len(pose_index) == len(raw) + assert (pose_index.offset == [e[2] for e in raw]).all() + assert (pose_index.message_index == [e[3] for e in raw]).all() + def test_index_slice(): index = FileIndex(data=RAW_DATA) @@ -188,6 +194,28 @@ def _lower_bound(time): assert (sliced_index.offset == [e[2] for e in raw]).all() assert (sliced_index.message_index == [e[3] for e in raw]).all() + # Start time beyond end of data. + sliced_index = index[TimeRange(start=1000.0, absolute=True)] + assert len(sliced_index) == 0 + + # End time before start of data. + sliced_index = index[TimeRange(end=-1.0, absolute=True)] + assert len(sliced_index) == 0 + + # Start time beyond end of data, but "all_nans" requested. + sliced_index = index.get_time_range(time_range=TimeRange(start=1000.0, absolute=True), hint='all_nans') + raw = [m for m in RAW_DATA if m[0] is None] + assert _test_time(sliced_index.time, raw) + assert (sliced_index.offset == [e[2] for e in raw]).all() + assert (sliced_index.message_index == [e[3] for e in raw]).all() + + # Same using getitem hint syntax. 
+ sliced_index = index[TimeRange(start=1000.0, absolute=True), 'all_nans'] + raw = [m for m in RAW_DATA if m[0] is None] + assert _test_time(sliced_index.time, raw) + assert (sliced_index.offset == [e[2] for e in raw]).all() + assert (sliced_index.message_index == [e[3] for e in raw]).all() + def test_time_slice_no_p1_time(): def _lower_bound(time): diff --git a/python/tests/test_mixed_log_reader.py b/python/tests/test_mixed_log_reader.py index 3f795877..ec268236 100644 --- a/python/tests/test_mixed_log_reader.py +++ b/python/tests/test_mixed_log_reader.py @@ -168,6 +168,13 @@ def test_read_pose_constructor(self, data_path): reader = MixedLogReader(str(data_path), message_types=(PoseMessage,)) self._check_results(reader, expected_messages) + def test_exclude_pose(self, data_path): + messages = self._generate_mixed_data(data_path) + expected_messages = [m for m in messages if not isinstance(m, PoseMessage)] + + reader = MixedLogReader(str(data_path), message_types=(PoseMessage,), invert_message_types=True) + self._check_results(reader, expected_messages) + def test_read_pose_mixed_binary(self, data_path): messages = self._generate_mixed_data_with_binary(data_path) expected_messages = [m for m in messages if isinstance(m, PoseMessage)] diff --git a/python/tests/test_p1_capture.py b/python/tests/test_p1_capture.py new file mode 100644 index 00000000..2022224d --- /dev/null +++ b/python/tests/test_p1_capture.py @@ -0,0 +1,585 @@ +import argparse + +import pytest + +from fusion_engine_client.messages import ( + EventNotificationMessage, + GNSSInfoMessage, + InputDataType, + InputDataWrapperMessage, + MessageType, + PoseMessage, + Timestamp, +) +from fusion_engine_client.parsers import FusionEngineEncoder, MixedLogReader +from fusion_engine_client.applications.p1_capture import Application + + +def make_options(**overrides): + defaults = dict( + input=None, + output=None, + output_format=None, + display='none', + summary=False, + verbose=0, + message_type=None, + 
invert=False, + unwrap=None, + wrapped_data_format='parent', + wrapped_data_type=None, + source_identifier=None, + time=None, + max=None, + skip=0, + progress=False, + log_base_dir=None, + log_type=None, + ignore_index=False, + log_timestamp_source=None, + ) + defaults.update(overrides) + return argparse.Namespace(**defaults) + + +class TestApplication: + @pytest.fixture + def tmp(self, tmp_path): + return tmp_path + + # ------------------------------------------------------------------------- + # Helpers + + def _write_fe_messages(self, path, specs): + """Write FusionEngine messages to *path*. + + Each entry in *specs* is one of: + - bytes: written verbatim (raw binary, no FE framing) + - (MessageClass, p1_time_sec): message with the given P1 time, + source_identifier=0 + - (MessageClass, p1_time_sec, source_identifier): explicit source + """ + encoder = FusionEngineEncoder() + with open(str(path), 'wb') as f: + for item in specs: + if isinstance(item, bytes): + f.write(item) + continue + msg_cls = item[0] + p1_time = item[1] + src_id = item[2] if len(item) > 2 else 0 + msg = msg_cls() + if p1_time is not None and hasattr(msg, 'p1_time'): + msg.p1_time = Timestamp(float(p1_time)) + f.write(encoder.encode_message(msg, source_identifier=src_id)) + + def _write_wrapper_messages(self, path, specs): + """Write InputDataWrapperMessage records to *path*. 
+ + Each entry: (InputDataType, payload_bytes, system_time_ns) + """ + encoder = FusionEngineEncoder() + with open(str(path), 'wb') as f: + for dtype, payload, ts_ns in specs: + msg = InputDataWrapperMessage() + msg.system_time_ns = ts_ns + msg.data_type = dtype + msg.data = payload + f.write(encoder.encode_message(msg)) + + def _read_output(self, path): + """Return list of (MessageType, source_identifier) from a .p1log file.""" + reader = MixedLogReader(str(path)) + return [(h.message_type, h.source_identifier) for h, _ in reader] + + def _run(self, input_path, **kwargs): + opts = make_options(input=str(input_path), **kwargs) + app = Application(options=opts) + app.process_input() + return app + + # ------------------------------------------------------------------------- + # File reading + + def test_read_pure_fe_file(self, tmp): + path = tmp / 'input.p1log' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (PoseMessage, 1.0), + (GNSSInfoMessage, 1.0), + ]) + app = self._run(path) + assert app.messages_sent == 3 + + def test_read_mixed_binary_file(self, tmp): + """Non-FE binary interspersed with FE messages must not count as messages.""" + path = tmp / 'input.p1log' + self._write_fe_messages(path, [ + b'\xde\xad\xbe\xef', + (PoseMessage, 0.0), + b'\xca\xfe\xba\xbe', + (GNSSInfoMessage, 1.0), + b'\x00\x01\x02\x03', + ]) + app = self._run(path) + assert app.messages_sent == 2 + + def test_read_empty_file(self, tmp): + path = tmp / 'empty.p1log' + path.write_bytes(b'') + app = self._run(path) + assert app.messages_sent == 0 + + # ------------------------------------------------------------------------- + # Message type filtering + + def test_message_type_no_filter_passes_all(self, tmp): + path = tmp / 'input.p1log' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (PoseMessage, 1.0), + (GNSSInfoMessage, 1.0), + ]) + app = self._run(path) + assert app.messages_sent == 3 + + def test_message_type_single_type(self, tmp): + path = tmp / 'input.p1log' + 
out = tmp / 'out.p1log' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (PoseMessage, 1.0), + (GNSSInfoMessage, 1.0), + ]) + app = self._run(path, output=str(out), output_format='p1log', + message_type=['Pose']) + assert app.messages_sent == 2 + assert all(t == MessageType.POSE for t, _ in self._read_output(out)) + + def test_message_type_comma_list(self, tmp): + path = tmp / 'input.p1log' + out = tmp / 'out.p1log' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (GNSSInfoMessage, 1.0), + (EventNotificationMessage, None), + ]) + app = self._run(path, output=str(out), output_format='p1log', + message_type=['Pose,GNSSInfo']) + assert app.messages_sent == 2 + types = {t for t, _ in self._read_output(out)} + assert types == {MessageType.POSE, MessageType.GNSS_INFO} + + def test_message_type_repeated_flag(self, tmp): + path = tmp / 'input.p1log' + out = tmp / 'out.p1log' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (GNSSInfoMessage, 1.0), + (EventNotificationMessage, None), + ]) + app = self._run(path, output=str(out), output_format='p1log', + message_type=['Pose', 'GNSSInfo']) + assert app.messages_sent == 2 + types = {t for t, _ in self._read_output(out)} + assert types == {MessageType.POSE, MessageType.GNSS_INFO} + + def test_message_type_wildcard(self, tmp): + path = tmp / 'input.p1log' + out = tmp / 'out.p1log' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (GNSSInfoMessage, 1.0), + ]) + # GNSS* should match GNSSInfo but not Pose + app = self._run(path, output=str(out), output_format='p1log', + message_type=['GNSS*']) + assert app.messages_sent >= 1 + types = {t for t, _ in self._read_output(out)} + assert MessageType.POSE not in types + assert MessageType.GNSS_INFO in types + + def test_message_type_unknown_name_exits(self, tmp): + path = tmp / 'input.p1log' + path.write_bytes(b'') + with pytest.raises(SystemExit) as exc: + Application(options=make_options(input=str(path), + message_type=['NoSuchType_XYZ'])) + assert 
exc.value.code == 1 + + # ------------------------------------------------------------------------- + # Source ID filtering + + def test_source_id_no_filter_passes_all(self, tmp): + path = tmp / 'input.p1log' + # 3 messages per source + self._write_fe_messages(path, [ + (PoseMessage, float(i), src) + for src in range(3) + for i in range(3) + ]) + app = self._run(path) + assert app.messages_sent == 9 + + def test_source_id_single(self, tmp): + path = tmp / 'input.p1log' + self._write_fe_messages(path, [ + (PoseMessage, float(i), src) + for src in range(3) + for i in range(3) + ]) + app = self._run(path, source_identifier=['1']) + assert app.messages_sent == 3 + + def test_source_id_multiple(self, tmp): + path = tmp / 'input.p1log' + self._write_fe_messages(path, [ + (PoseMessage, float(i), src) + for src in range(3) + for i in range(3) + ]) + app = self._run(path, source_identifier=['0', '2']) + assert app.messages_sent == 6 + + def test_source_id_non_integer_exits(self, tmp): + path = tmp / 'input.p1log' + path.write_bytes(b'') + with pytest.raises(SystemExit) as exc: + Application(options=make_options(input=str(path), + source_identifier=['abc'])) + assert exc.value.code == 1 + + def test_source_id_combined_with_message_type(self, tmp): + """Only messages matching both the type filter and source ID are forwarded.""" + path = tmp / 'input.p1log' + out = tmp / 'out.p1log' + self._write_fe_messages(path, [ + (PoseMessage, 0.0, 0), + (PoseMessage, 1.0, 1), + (GNSSInfoMessage, 1.0, 1), + ]) + app = self._run(path, output=str(out), output_format='p1log', + message_type=['Pose'], source_identifier=['1']) + assert app.messages_sent == 1 + results = self._read_output(out) + assert all(t == MessageType.POSE for t, _ in results) + assert all(s == 1 for _, s in results) + + # ------------------------------------------------------------------------- + # Time range filtering + # + # Relative time ranges with a defined start are unreliable when reading + # from a file: the 
Application passes the same TimeRange object to both + # MixedLogReader (for index-level pre-filtering) and _apply_filters (for + # per-message filtering), so the stateful TimeRange is consumed twice. + # Absolute time ranges and open-start relative ranges are unaffected and + # are tested here. + + def test_time_range_absolute(self, tmp): + path = tmp / 'input.p1log' + self._write_fe_messages(path, [(PoseMessage, float(i)) for i in range(6)]) + app = self._run(path, time='1:3:abs') + assert app.messages_sent == 2 # t=1, t=2 + + def test_time_range_absolute_open_start(self, tmp): + path = tmp / 'input.p1log' + self._write_fe_messages(path, [(PoseMessage, float(i)) for i in range(6)]) + app = self._run(path, time=':3:abs') + assert app.messages_sent == 3 # t=0, t=1, t=2 + + def test_time_range_absolute_open_end(self, tmp): + path = tmp / 'input.p1log' + self._write_fe_messages(path, [(PoseMessage, float(i)) for i in range(6)]) + app = self._run(path, time='3::abs') + assert app.messages_sent == 3 # t=3, t=4, t=5 + + def test_time_range_relative_open_start(self, tmp): + """Open-start relative range works correctly with file input.""" + path = tmp / 'input.p1log' + self._write_fe_messages(path, [(PoseMessage, float(i)) for i in range(6)]) + app = self._run(path, time=':2') + assert app.messages_sent == 2 # t=0, t=1 + + def test_time_range_no_matches(self, tmp): + path = tmp / 'input.p1log' + self._write_fe_messages(path, [(PoseMessage, float(i)) for i in range(6)]) + app = self._run(path, time='100:200:abs') + assert app.messages_sent == 0 + + def test_time_range_combined_with_message_type(self, tmp): + path = tmp / 'input.p1log' + out = tmp / 'out.p1log' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (GNSSInfoMessage, 0.0), + (PoseMessage, 1.0), + (GNSSInfoMessage, 1.0), + (PoseMessage, 5.0), + ]) + app = self._run(path, output=str(out), output_format='p1log', + message_type=['Pose'], time=':2:abs') + assert app.messages_sent == 2 # Pose at t=0 and t=1 + 
types = {t for t, _ in self._read_output(out)} + assert types == {MessageType.POSE} + + # ------------------------------------------------------------------------- + # InputDataWrapper filtering + + def test_input_data_wrapper_no_filter(self, tmp): + path = tmp / 'input.p1log' + self._write_wrapper_messages(path, [ + (InputDataType.M_TYPE_RTCM3_UNKNOWN, b'rtcm', 0), + (InputDataType.M_TYPE_RTCM3_UNKNOWN, b'rtcm', 1_000_000_000), + (InputDataType.M_TYPE_EXTERNAL_UNFRAMED_GNSS, b'gnss', 2_000_000_000), + ]) + app = self._run(path) + assert app.messages_sent == 3 + + def test_input_data_wrapper_filter_by_data_type(self, tmp): + path = tmp / 'input.p1log' + self._write_wrapper_messages(path, [ + (InputDataType.M_TYPE_RTCM3_UNKNOWN, b'rtcm_0', 0), + (InputDataType.M_TYPE_RTCM3_UNKNOWN, b'rtcm_1', 1_000_000_000), + (InputDataType.M_TYPE_EXTERNAL_UNFRAMED_GNSS, b'gnss', 2_000_000_000), + ]) + app = self._run(path, wrapped_data_type=['RTCM3_UNKNOWN']) + assert app.messages_sent == 2 + + def test_input_data_wrapper_invert_data_type(self, tmp): + path = tmp / 'input.p1log' + self._write_wrapper_messages(path, [ + (InputDataType.M_TYPE_RTCM3_UNKNOWN, b'rtcm_0', 0), + (InputDataType.M_TYPE_RTCM3_UNKNOWN, b'rtcm_1', 1_000_000_000), + (InputDataType.M_TYPE_EXTERNAL_UNFRAMED_GNSS, b'gnss', 2_000_000_000), + ]) + app = self._run(path, wrapped_data_type=['RTCM3_UNKNOWN'], invert=True) + assert app.messages_sent == 1 + + def test_unwrap_extracts_inner_bytes(self, tmp): + path = tmp / 'input.p1log' + out = tmp / 'out.bin' + payload1 = b'rtcm_packet_one' + payload2 = b'rtcm_packet_two' + self._write_wrapper_messages(path, [ + (InputDataType.M_TYPE_RTCM3_UNKNOWN, payload1, 0), + (InputDataType.M_TYPE_RTCM3_UNKNOWN, payload2, 1_000_000_000), + ]) + self._run(path, unwrap='RTCM3_UNKNOWN', output=str(out)) + assert out.read_bytes() == payload1 + payload2 + + def test_unwrap_filters_to_matching_data_type(self, tmp): + path = tmp / 'input.p1log' + out = tmp / 'out.bin' + rtcm_payload = 
b'rtcm_bytes' + gnss_payload = b'gnss_bytes' + self._write_wrapper_messages(path, [ + (InputDataType.M_TYPE_RTCM3_UNKNOWN, rtcm_payload, 0), + (InputDataType.M_TYPE_EXTERNAL_UNFRAMED_GNSS, gnss_payload, 1_000_000_000), + (InputDataType.M_TYPE_RTCM3_UNKNOWN, rtcm_payload, 2_000_000_000), + ]) + self._run(path, unwrap='RTCM3_UNKNOWN', output=str(out)) + assert out.read_bytes() == rtcm_payload + rtcm_payload + + def test_unwrap_requires_output_exits(self, tmp): + path = tmp / 'input.p1log' + path.write_bytes(b'') + with pytest.raises(SystemExit) as exc: + Application(options=make_options(input=str(path), unwrap='RTCM3_UNKNOWN', + output=None)) + assert exc.value.code == 1 + + def test_unwrap_rejects_message_type_exits(self, tmp): + path = tmp / 'input.p1log' + path.write_bytes(b'') + with pytest.raises(SystemExit) as exc: + Application(options=make_options(input=str(path), unwrap='RTCM3_UNKNOWN', + message_type=['Pose'], + output=str(tmp / 'out.bin'))) + assert exc.value.code == 1 + + def test_unwrap_rejects_data_type_exits(self, tmp): + path = tmp / 'input.p1log' + path.write_bytes(b'') + with pytest.raises(SystemExit) as exc: + Application(options=make_options(input=str(path), unwrap='RTCM3_UNKNOWN', + wrapped_data_type=['EXTERNAL_UNFRAMED_GNSS'], + output=str(tmp / 'out.bin'))) + assert exc.value.code == 1 + + # ------------------------------------------------------------------------- + # Output formats + + def test_output_p1log_contains_only_fe_messages(self, tmp): + """p1log output strips non-FE binary and contains exactly the FE messages.""" + path = tmp / 'input.p1log' + out = tmp / 'out.p1log' + self._write_fe_messages(path, [ + b'\xde\xad\xbe\xef', + (PoseMessage, 0.0), + b'\xca\xfe', + (GNSSInfoMessage, 1.0), + ]) + app = self._run(path, output=str(out), output_format='p1log') + assert app.messages_sent == 2 + types = [t for t, _ in self._read_output(out)] + assert types == [MessageType.POSE, MessageType.GNSS_INFO] + + def 
test_output_p1log_message_type_filter(self, tmp): + """p1log output with a type filter contains only the requested type.""" + path = tmp / 'input.p1log' + out = tmp / 'out.p1log' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (GNSSInfoMessage, 0.0), + (PoseMessage, 1.0), + ]) + self._run(path, output=str(out), output_format='p1log', + message_type=['Pose']) + types = [t for t, _ in self._read_output(out)] + assert types == [MessageType.POSE, MessageType.POSE] + + def test_output_raw_matches_input_for_pure_fe_file(self, tmp): + """Raw output of a pure FE file is byte-identical to the input.""" + path = tmp / 'input.p1log' + out = tmp / 'out.bin' + encoder = FusionEngineEncoder() + raw_content = b''.join( + encoder.encode_message(PoseMessage()) for _ in range(3) + ) + path.write_bytes(raw_content) + + self._run(path, output=str(out), output_format='raw') + assert out.read_bytes() == raw_content + + def test_output_csv_header_and_rows(self, tmp): + """CSV output starts with the expected header and has one row per message.""" + path = tmp / 'input.p1log' + out = tmp / 'out.csv' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (PoseMessage, 1.0), + (PoseMessage, 2.0), + ]) + self._run(path, output=str(out), output_format='csv') + lines = out.read_text().splitlines() + assert lines[0] == 'host_time,type,p1_time,sys_time' + assert len(lines) == 4 # header + 3 data rows + for line in lines[1:]: + fields = line.split(',') + assert fields[1] == 'POSE' + + def test_output_raw_with_type_filter_produces_p1log(self, tmp): + """--output-format=raw plus --message-type is silently demoted to p1log.""" + path = tmp / 'input.p1log' + out = tmp / 'out.bin' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (GNSSInfoMessage, 0.0), + ]) + app = self._run(path, output=str(out), output_format='raw', + message_type=['Pose']) + assert app.messages_sent == 1 + # Output is a valid p1log containing only Pose + types = [t for t, _ in self._read_output(out)] + assert 
types == [MessageType.POSE] + + # ------------------------------------------------------------------------- + # --max and --skip + + def test_max_stops_after_n_messages(self, tmp): + path = tmp / 'input.p1log' + self._write_fe_messages(path, [(PoseMessage, float(i)) for i in range(10)]) + app = self._run(path, max=3) + assert app.messages_sent == 3 + + def test_skip_omits_first_n_messages(self, tmp): + path = tmp / 'input.p1log' + self._write_fe_messages(path, [(PoseMessage, float(i)) for i in range(10)]) + app = self._run(path, skip=4) + assert app.messages_sent == 6 + + def test_skip_counts_only_filtered_messages(self, tmp): + """--skip counts only messages that pass the type filter.""" + path = tmp / 'input.p1log' + specs = [] + for i in range(5): + specs.append((PoseMessage, float(i))) + specs.append((GNSSInfoMessage, float(i))) + self._write_fe_messages(path, specs) + # 5 Pose messages total; skipping 4 filtered (Pose) ones leaves 1 + app = self._run(path, message_type=['Pose'], skip=4) + assert app.messages_sent == 1 + + def test_max_and_skip_combined(self, tmp): + path = tmp / 'input.p1log' + self._write_fe_messages(path, [(PoseMessage, float(i)) for i in range(10)]) + app = self._run(path, skip=2, max=3) + assert app.messages_sent == 3 + + def test_max_and_skip_with_type_filter(self, tmp): + """--skip and --max both count only messages that pass the type filter.""" + path = tmp / 'input.p1log' + specs = [] + for i in range(10): + specs.append((PoseMessage, float(i))) + specs.append((GNSSInfoMessage, float(i))) + self._write_fe_messages(path, specs) + # 10 Pose messages; skip 3 filtered, then take max 4 → 4 sent + app = self._run(path, message_type=['Pose'], skip=3, max=4) + assert app.messages_sent == 4 + + def test_max_counts_only_matching_type(self, tmp): + """--max counts only messages that pass the type filter.""" + path = tmp / 'input.p1log' + # Interleave Pose and GNSSInfo + specs = [] + for i in range(5): + specs.append((PoseMessage, float(i))) + 
specs.append((GNSSInfoMessage, float(i))) + self._write_fe_messages(path, specs) + app = self._run(path, message_type=['Pose'], max=2) + assert app.messages_sent == 2 + + def test_time_range_relative_closed(self, tmp): + """Relative range [1, 3) must return messages at t=1 and t=2.""" + path = tmp / 'input.p1log' + self._write_fe_messages(path, [(PoseMessage, float(i)) for i in range(6)]) + app = self._run(path, time='1:3') + assert app.messages_sent == 2 # currently returns 1 + + def test_time_range_relative_open_end(self, tmp): + """Relative range [4, ∞) must return messages at t=4 and t=5.""" + path = tmp / 'input.p1log' + self._write_fe_messages(path, [(PoseMessage, float(i)) for i in range(6)]) + app = self._run(path, time='4:') + assert app.messages_sent == 2 # currently returns 0 + + def test_message_type_invert_single(self, tmp): + """--invert --message-type=Pose must forward everything except Pose.""" + path = tmp / 'input.p1log' + out = tmp / 'out.p1log' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (PoseMessage, 1.0), + (GNSSInfoMessage, 1.0), + ]) + app = self._run(path, output=str(out), output_format='p1log', + message_type=['Pose'], invert=True) + assert app.messages_sent == 1 # currently returns 0 + types = [t for t, _ in self._read_output(out)] + assert types == [MessageType.GNSS_INFO] + + def test_message_type_invert_multiple(self, tmp): + """--invert with multiple types excludes all of them.""" + path = tmp / 'input.p1log' + out = tmp / 'out.p1log' + self._write_fe_messages(path, [ + (PoseMessage, 0.0), + (GNSSInfoMessage, 0.0), + (EventNotificationMessage, None), + ]) + app = self._run(path, output=str(out), output_format='p1log', + message_type=['Pose', 'GNSSInfo'], invert=True) + assert app.messages_sent == 1 # currently returns 0 + types = [t for t, _ in self._read_output(out)] + assert types == [MessageType.EVENT_NOTIFICATION]