Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added recording class for Nonin 3231 device, to work with the cardiocieption HRD task #71

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions systole/recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,3 +748,172 @@ def read(self, duration):
def close(self):
"""Close TCPIP connections"""
self.con.close()





class Nonin3231USB:
"""Recording Nonin 3231 USB HR signal through USB connection

Parameters
----------

serial : pySerial object
The `serial` instance interfacing with the USB port.


Examples
--------
First, you will need to define a :py:func:`serial` instance, indexing the
USB port where the Nonin 3231 Pulse Oximeter is plugged.

>>> import serial
>>> ser = serial.Serial('COM4')

This instance is then used to create an :py:func:`Nonin3231USB` instance
that will be used for the recording from a Nonin 3231 USB device.

>>> from ecg.recording import Nonin3231USB
>>> exg = Nonin3231USB(serial).read(30)

Use the :py:func:`read` method to record some signal and save it in the
`exg` dictionary.

.. warning:: The signals received fom the host are appened to a list. This
process can require more time at each iteration as the signal length
increase in memory. You should alway make sure that this will not
interfer with other task and regularly save intermediate recording to
save resources.

Notes
-----
"""

def __init__(
self,
serial,
add_channels: Optional[int] = 1,
):
self.reset(serial, add_channels)

def reset(
self,
serial,
add_channels: Optional[int] = 1,
):
"""Initialize/restart the recording instance.

Parameters
----------
serial : pySerial object
The `serial` instance interfacing with the USB port.

Returns
-------
Nonin3231USB instance.
"""
self.serial = serial

# Initialize recording with empty lists
self.recording: List[float] = []
self.bpm: List[float] = []
self.SpO2: List[float] = []
self.times: List[float] = []
self.n_channels: Optional[int] = add_channels
if add_channels is not None:
self.channels: Optional[Dict[str, List]] = {}
for i in range(add_channels):
self.channels[f"Channel_{i}"] = []
else:
self.channels = None

# print('channels: '+str(self.channels))

return self

def setup(self):
self.reset(
serial=self.serial,
)

return self

def read(self, duration: float):
"""Read PPG signal for some amount of time.

Parameters
----------
duration : int or float
Length of the desired recording time.
"""
# init start ime
tstart = time.time()

# read for length of duration
while time.time() - tstart < duration:
# assert one full line of data is there
if self.serial.inWaiting() >= 12:
# Store line of data
data = list(self.serial.readline())
# assert full data line
if len(data) < 12:
continue
# get bpm reading
self.bpm.append(data[8])
# give bpm to recording as well
self.recording.append(data[8])
# get SpO2 reading (we don't use it)
self.SpO2.append(data[6])
# get 'time' reading (seconds)
self.times.append(data[5])
# Add 0 to the additional channels
if self.channels is not None:
for ch in self.channels:
self.channels[ch].append(0)

return self


def readInWaiting(self, bool = False):
"""Read in waiting Nonin data.

Parameters
----------
stop : bool
Stop the recording when an error is detected. Default is *False*.
"""

# nonin device reads out 12 bits per message
while self.serial.inWaiting() >= 12:

# Store full message line
data = list(self.serial.readline())
# assert that there is a full line of data
if len(data) < 12:
continue
# get bpm reading
self.bpm.append(data[8])
# give bpm to recording as well
self.recording.append(data[8])
# get SpO2 reading (we don't use it)
self.SpO2.append(data[6])
# get 'time' reading (seconds)
self.times.append(data[5])
# Add 0 to the additional channels for triggers
if self.channels is not None:
for ch in self.channels:
self.channels[ch].append(0)



def save(self, fname: str):
"""
Not saving the data as of now, only results
"""
None


def close(self):
"""Close serial connections"""
self.serial.close()