From e652bd25543580aece374eb7ecddaf79a2de4ecc Mon Sep 17 00:00:00 2001 From: DeflateAwning <11021263+DeflateAwning@users.noreply.github.com> Date: Wed, 2 Oct 2024 00:54:12 -0600 Subject: [PATCH] WIP: Finish pre-work data rxtx log data structure change --- cts1_ground_support/terminal_app/app_store.py | 3 +++ cts1_ground_support/terminal_app/serial_thread.py | 9 +++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cts1_ground_support/terminal_app/app_store.py b/cts1_ground_support/terminal_app/app_store.py index 68624e1..8370ad1 100644 --- a/cts1_ground_support/terminal_app/app_store.py +++ b/cts1_ground_support/terminal_app/app_store.py @@ -14,6 +14,9 @@ class AppStore: firmware_repo_path: Path | None = None uart_port_name: str = UART_PORT_NAME_DISCONNECTED + + # `rxtx_log` is a dictionary, where keys are increasing ints by the order the messages were + # added/received. Elements are popped from the start, and added to the end. rxtx_log: dict[int, RxTxLogEntry] = field( default_factory=lambda: {0: RxTxLogEntry(b"Start of Log", "notice")} ) diff --git a/cts1_ground_support/terminal_app/serial_thread.py b/cts1_ground_support/terminal_app/serial_thread.py index ba5524d..49b4325 100644 --- a/cts1_ground_support/terminal_app/serial_thread.py +++ b/cts1_ground_support/terminal_app/serial_thread.py @@ -27,16 +27,17 @@ def uart_listener() -> None: # Check for incoming data if port.in_waiting > 0: received_data: bytes = port.readline() - app_store.rxtx_log.append(RxTxLogEntry(received_data, "receive")) + app_store.append_to_rxtx_log(RxTxLogEntry(received_data, "receive")) elif len(app_store.rxtx_log) > MAX_RX_TX_LOG_ENTRIES: # Data's not coming in rapid-fire, so take a sec to purge the buffer. - app_store.rxtx_log = app_store.rxtx_log[-MAX_RX_TX_LOG_ENTRIES:] + # Remove the oldest entry. + app_store.rxtx_log.pop(next(iter(app_store.rxtx_log.keys()))) # Check for outgoing data if len(app_store.tx_queue) > 0: tx_data = app_store.tx_queue.pop(0) port.write(tx_data) - app_store.rxtx_log.append(RxTxLogEntry(tx_data, "transmit")) + app_store.append_to_rxtx_log(RxTxLogEntry(tx_data, "transmit")) app_store.last_tx_timestamp_sec = time.time() time.sleep(0.01) @@ -46,7 +47,7 @@ def uart_listener() -> None: f"Serial port forcefully disconnected (SerialException: {e}): " f"{app_store.uart_port_name}" ) - app_store.rxtx_log.append(RxTxLogEntry(msg.encode(), "error")) + app_store.append_to_rxtx_log(RxTxLogEntry(msg.encode(), "error")) logger.warning(msg) app_store.uart_port_name = UART_PORT_NAME_DISCONNECTED # propagate back to UI time.sleep(0.5)