Skip to content
This repository has been archived by the owner on Nov 11, 2024. It is now read-only.

Commit

Permalink
Revised version: now able to take input from a serial port and send o…
Browse files Browse the repository at this point in the history
…utput to either, on Linux, a PTY or, on Windows or Linux, an existing serial port.

Test: none
  • Loading branch information
RobMeades committed Mar 18, 2024
1 parent 63a2610 commit 57e31bf
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 56 deletions.
4 changes: 4 additions & 0 deletions gnss/api/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Requirements for u_gnss_ucenter_ubx.py, install with
# pip3 install -r ./requirements.txt

pyserial
127 changes: 71 additions & 56 deletions gnss/api/u_gnss_ucenter_ubx.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,57 +172,74 @@ def line_read_character_by_character(input_handle):

return return_value

def main(source, output_file, responses_only, echo_on, baud_rate):
def main(source, destination, responses_only, echo_on, baud_rate):
'''Main as a function'''
return_value = 1

try:
# Open the source
reading_from_device = False
# Open the source
reading_from_device = False
source_handle = None
if os.path.isfile(source):
try:
source_handle = open(source, "r", encoding="utf8")
except FileNotFoundError:
print(f"Cannot open \"{source}\" as a file, trying to open it as a serial device.")
try:
# Open what is assumed to be a serial device
source_handle = serial.Serial(baudrate=baud_rate, timeout=0.05)
source_handle.port = source
source_handle.open()
reading_from_device = True
except (ValueError, serial.SerialException) as ex:
print(f"{type(ex).__name__} while opening port \"{source}\" for source.")
source_handle = None
if source_handle:
# Open either the output file or a virtual
# device that the caller can connect uCenter to
output_file_handle = None
device_output = None
if output_file:
if not os.path.splitext(output_file)[1]:
output_file += "." + OUTPUT_FILE_EXTENSION
output_file_handle = open(output_file, "wb")
except (NameError, FileNotFoundError, PermissionError) as ex:
print(f"{type(ex).__name__} while trying to open \"{source}\".")
else:
print(f"Input \"{source}\" is not a file, trying to open it as a device.")
try:
# Open what is assumed to be a serial device
source_handle = serial.Serial(baudrate=baud_rate, timeout=0.05)
source_handle.port = source
source_handle.open()
reading_from_device = True
except (ValueError, serial.SerialException) as ex:
print(f"{type(ex).__name__} while opening device \"{source}\".")
source_handle = None
if source_handle:
# Open either the output file or a virtual
# device that the caller can connect uCenter to
destination_handle = None
writing_to_device = False
try:
if destination:
if os.path.isfile(destination):
if not os.path.splitext(destination)[1]:
destination += "." + OUTPUT_FILE_EXTENSION
destination_handle = open(destination, "wb")
else:
print(f"Output \"{destination}\" is not a file, trying to open it as a device.")
try:
# Open what is assumed to be a serial device
destination_handle = serial.Serial(baudrate=baud_rate, timeout=0.05)
destination_handle.port = destination
destination_handle.open()
writing_to_device = True
except (ValueError, serial.SerialException) as ex:
print(f"{type(ex).__name__} while opening device \"{destination_handle}\".")
destination_handle = None
else:
if reading_from_device and platform.system() == "Linux":
try:
# Try to open a virtual serial port
# that we can write the output to
_, device_output = pty.openpty()
output_file = os.ttyname(device_output)
output_file_handle = serial.Serial(output_file)
destination = os.ttyname(device_output)
destination_handle = serial.Serial(destination)
writing_to_device = True
except (NameError, FileNotFoundError, PermissionError) as ex:
print(f"{type(ex).__name__} while trying to create virtual device" \
f" for output: : {str(ex)}.")
else:
if not destination_handle:
# Output file name is source file with modified extension
output_file = os.path.splitext(output_file)[0] + "." + OUTPUT_FILE_EXTENSION
output_file_handle = open(output_file, "wb")
if output_file_handle:
destination = os.path.splitext(source)[0] + "." + OUTPUT_FILE_EXTENSION
destination_handle = open(destination, "wb")
if destination_handle:
print_text = f"Reading from {source}"
if output_file:
print_text += f", writing to {output_file}"
if destination:
print_text += f", writing to {destination}"
print_text += f", looking for lines containing \"{GNSS_LOG_LINE_MARKER_RECEIVE}\""
if not responses_only:
print_text += f" and \"{GNSS_LOG_LINE_MARKER_COMMAND}\""
print_text += f" and \"{GNSS_LOG_LINE_MARKER_COMMAND}\"..."
print(print_text)
if reading_from_device:
print("Use CTRL-C to stop.")
Expand All @@ -247,8 +264,8 @@ def main(source, output_file, responses_only, echo_on, baud_rate):
message = line_parser(line_number, source_line, responses_only)
if message:
message_count += 1
if output_file_handle:
output_file_handle.write(message)
if destination_handle:
destination_handle.write(message)
else:
if not reading_from_device:
# If we are reading from a file this
Expand All @@ -259,15 +276,15 @@ def main(source, output_file, responses_only, echo_on, baud_rate):
pass
print(f"Found {message_count} UBX messages(s) in {line_number} line(s).")
source_handle.close()
if output_file and not device_output and message_count > 0:
print(f"File {output_file} has been written: you may open it in uCenter.")
if destination and not writing_to_device and message_count > 0:
print(f"File {destination} has been written: you may open it in uCenter.")
return_value = 0
if output_file_handle:
output_file_handle.close()
except (NameError, FileNotFoundError, PermissionError) as ex:
print(f"{type(ex).__name__} while trying to open {output_file} for writing.")
else:
print(f"Unable to open \"{source}\".")
if destination_handle:
destination_handle.close()
except (NameError, FileNotFoundError, PermissionError) as ex:
print(f"{type(ex).__name__} while trying to open {destination} for writing.")
else:
print(f"Unable to open \"{source}\".")

return return_value

Expand All @@ -281,20 +298,18 @@ def main(source, output_file, responses_only, echo_on, baud_rate):
PARSER.add_argument("source", help="the source of ubxlib log" \
" data; either a file or a device that is" \
" emitting ubxlib log output.")
PARSER.add_argument("output_file", nargs="?", help= "optional" \
" output file name; if provided then the" \
" output will be written to this file (if" \
" the file exists it will be overwritten," \
" if no extension is given ." + \
PARSER.add_argument("destination", nargs="?", help= "optional" \
" output name; if provided then the output" \
" will be written to this file or device" \
" (if the file exists it will be" \
" overwritten, if no extension is given ." +\
OUTPUT_FILE_EXTENSION + " will be added)." \
" If this parameter is omitted and the" \
" source is a device then, on Linux, the" \
" output will be written to a virtual COM" \
" port to which you may connect uCenter" \
" directly. Otherwise the output will" \
" be written to the same name as the source" \
" but with extension " + \
OUTPUT_FILE_EXTENSION + ".")
" source is a device then, on Linux, the" \
" output will be written to a PTY." \
" Otherwise the output will be written to" \
" the same name as the source but with" \
" extension " + OUTPUT_FILE_EXTENSION + ".")
PARSER.add_argument("-r", action="store_true", help="include" \
" only the responses from the GNSS device" \
" (i.e. leave out any commands sent to the" \
Expand All @@ -308,7 +323,7 @@ def main(source, output_file, responses_only, echo_on, baud_rate):
ARGS = PARSER.parse_args()

# Call main()
RETURN_VALUE = main(ARGS.source, ARGS.output_file, ARGS.r, ARGS.e, ARGS.b)
RETURN_VALUE = main(ARGS.source, ARGS.destination, ARGS.r, ARGS.e, ARGS.b)

sys.exit(RETURN_VALUE)

Expand Down

0 comments on commit 57e31bf

Please sign in to comment.