From 5a37c17662e7ed4d507542c2b92fe25a6ce21837 Mon Sep 17 00:00:00 2001 From: RobMeades Date: Mon, 18 Mar 2024 19:50:16 +0000 Subject: [PATCH] Revised version: now able to take input from a serial port and, on Linux, send output to a virtual serial port. Test: none --- gnss/api/u_gnss_ucenter_ubx.py | 122 +++++++++++++++++++-------------- 1 file changed, 69 insertions(+), 53 deletions(-) diff --git a/gnss/api/u_gnss_ucenter_ubx.py b/gnss/api/u_gnss_ucenter_ubx.py index 00d7054f..8ca8fe19 100644 --- a/gnss/api/u_gnss_ucenter_ubx.py +++ b/gnss/api/u_gnss_ucenter_ubx.py @@ -172,57 +172,74 @@ def line_read_character_by_character(input_handle): return return_value -def main(source, output_file, responses_only, echo_on, baud_rate): +def main(source, destination, responses_only, echo_on, baud_rate): '''Main as a function''' return_value = 1 - try: - # Open the source - reading_from_device = False + # Open the source + reading_from_device = False + source_handle = None + if os.path.isfile(source): try: source_handle = open(source, "r", encoding="utf8") - except FileNotFoundError: - print(f"Cannot open \"{source}\" as a file, trying to open it as a serial device.") - try: - # Open what is assumed to be a serial device - source_handle = serial.Serial(baudrate=baud_rate, timeout=0.05) - source_handle.port = source - source_handle.open() - reading_from_device = True - except (ValueError, serial.SerialException) as ex: - print(f"{type(ex).__name__} while opening port \"{source}\" for source.") - source_handle = None - if source_handle: - # Open either the output file or a virtual - # device that the caller can connect uCenter to - output_file_handle = None - device_output = None - if output_file: - if not os.path.splitext(output_file)[1]: - output_file += "." + OUTPUT_FILE_EXTENSION - output_file_handle = open(output_file, "wb") + except (NameError, FileNotFoundError, PermissionError) as ex: + print(f"{type(ex).__name__} while trying to open \"{source}\".") + else: + print(f"Input \"{source}\" is not a file, trying to open it as a device.") + try: + # Open what is assumed to be a serial device + source_handle = serial.Serial(baudrate=baud_rate, timeout=0.05) + source_handle.port = source + source_handle.open() + reading_from_device = True + except (ValueError, serial.SerialException) as ex: + print(f"{type(ex).__name__} while opening device \"{source}\".") + source_handle = None + if source_handle: + # Open either the output file or a virtual + # device that the caller can connect uCenter to + destination_handle = None + writing_to_device = False + try: + if destination: + if os.path.isfile(destination): + if not os.path.splitext(destination)[1]: + destination += "." + OUTPUT_FILE_EXTENSION + destination_handle = open(destination, "wb") + else: + print(f"Output \"{destination}\" is not a file, trying to open it as a device.") + try: + # Open what is assumed to be a serial device + destination_handle = serial.Serial(baudrate=baud_rate, timeout=0.05) + destination_handle.port = destination + destination_handle.open() + writing_to_device = True + except (ValueError, serial.SerialException) as ex: + print(f"{type(ex).__name__} while opening device \"{destination_handle}\".") + destination_handle = None else: if reading_from_device and platform.system() == "Linux": try: # Try to open a virtual serial port # that we can write the output to _, device_output = pty.openpty() - output_file = os.ttyname(device_output) - output_file_handle = serial.Serial(output_file) + destination = os.ttyname(device_output) + destination_handle = serial.Serial(destination) + writing_to_device = True except (NameError, FileNotFoundError, PermissionError) as ex: print(f"{type(ex).__name__} while trying to create virtual device" \ f" for output: : {str(ex)}.") - else: + if not destination_handle: # Output file name is source file with modified extension - output_file = os.path.splitext(output_file)[0] + "." + OUTPUT_FILE_EXTENSION - output_file_handle = open(output_file, "wb") - if output_file_handle: + destination = os.path.splitext(source)[0] + "." + OUTPUT_FILE_EXTENSION + destination_handle = open(destination, "wb") + if destination_handle: print_text = f"Reading from {source}" - if output_file: - print_text += f", writing to {output_file}" + if destination: + print_text += f", writing to {destination}" print_text += f", looking for lines containing \"{GNSS_LOG_LINE_MARKER_RECEIVE}\"" if not responses_only: - print_text += f" and \"{GNSS_LOG_LINE_MARKER_COMMAND}\"" + print_text += f" and \"{GNSS_LOG_LINE_MARKER_COMMAND}\"..." print(print_text) if reading_from_device: print("Use CTRL-C to stop.") @@ -247,8 +264,8 @@ def main(source, output_file, responses_only, echo_on, baud_rate): message = line_parser(line_number, source_line, responses_only) if message: message_count += 1 - if output_file_handle: - output_file_handle.write(message) + if destination_handle: + destination_handle.write(message) else: if not reading_from_device: # If we are reading from a file this @@ -259,15 +276,15 @@ def main(source, output_file, responses_only, echo_on, baud_rate): pass print(f"Found {message_count} UBX messages(s) in {line_number} line(s).") source_handle.close() - if output_file and not device_output and message_count > 0: - print(f"File {output_file} has been written: you may open it in uCenter.") + if destination and not writing_to_device and message_count > 0: + print(f"File {destination} has been written: you may open it in uCenter.") return_value = 0 - if output_file_handle: - output_file_handle.close() - except (NameError, FileNotFoundError, PermissionError) as ex: - print(f"{type(ex).__name__} while trying to open {output_file} for writing.") - else: - print(f"Unable to open \"{source}\".") + if destination_handle: + destination_handle.close() + except (NameError, FileNotFoundError, PermissionError) as ex: + print(f"{type(ex).__name__} while trying to open {destination} for writing.") + else: + print(f"Unable to open \"{source}\".") return return_value @@ -281,18 +298,17 @@ def main(source, output_file, responses_only, echo_on, baud_rate): PARSER.add_argument("source", help="the source of ubxlib log" \ " data; either a file or a device that is" \ " emitting ubxlib log output.") - PARSER.add_argument("output_file", nargs="?", help= "optional" \ - " output file name; if provided then the" \ - " output will be written to this file (if" \ - " the file exists it will be overwritten," \ - " if no extension is given ." + \ + PARSER.add_argument("destination", nargs="?", help= "optional" \ + " output name; if provided then the output" \ + " will be written to this file or device" \ + " (if the file exists it will be" \ + " overwritten, if no extension is given ." +\ OUTPUT_FILE_EXTENSION + " will be added)." \ " If this parameter is omitted and the" \ - " source is a device then, on Linux, the" \ + " source is a device then, on Linux, the" \ " output will be written to a virtual COM" \ - " port to which you may connect uCenter" \ - " directly. Otherwise the output will" \ - " be written to the same name as the source" \ + " port. Otherwise the output will be" \ + " written to the same name as the source" \ " but with extension " + \ OUTPUT_FILE_EXTENSION + ".") PARSER.add_argument("-r", action="store_true", help="include" \ @@ -308,7 +324,7 @@ def main(source, output_file, responses_only, echo_on, baud_rate): ARGS = PARSER.parse_args() # Call main() - RETURN_VALUE = main(ARGS.source, ARGS.output_file, ARGS.r, ARGS.e, ARGS.b) + RETURN_VALUE = main(ARGS.source, ARGS.destination, ARGS.r, ARGS.e, ARGS.b) sys.exit(RETURN_VALUE)