# max86150.py -- stream interface to the MAX86150 PPG and ECG sensor.
import sys_max86150
import uerrno
import interrupt
import ucollections

# Record type for one converted sensor sample.  MAX86150._convert fills the
# fields positionally from elements 0, 1 and 2 of a raw sample.
Max86150Data = ucollections.namedtuple("Max86150Data", ["red", "infrared", "ecg"])


class MAX86150:
    """
    The MAX86150 class provides a stream interface to the MAX86150 PPG and ECG.

    Instances can also be used as context managers; leaving the ``with``
    block calls :py:meth:`close`.

    **Example**:

    .. code-block:: python

        import max86150
        m = max86150.MAX86150()

        data = m.read()
        for sample in data:
            print("Red: {} Infrared: {} ECG: {}".format(
                sample.red, sample.infrared, sample.ecg
            ))

        m.close()
    """

    def __init__(self, callback=None, sample_buffer_len=128, sample_rate=200):
        """
        Initializes the MAX86150 (if it is not already running).

        :param callback: If not None: A callback which is called with the data
            (the list returned by :py:meth:`read`) whenever new data is
            available.
        :param sample_buffer_len: Forwarded unchanged to
            ``sys_max86150.enable_sensor`` as its first argument.
        :param sample_rate: Forwarded unchanged to
            ``sys_max86150.enable_sensor`` as its second argument
            (presumably in Hz -- confirm against the firmware API).
        """
        self.active = False
        # Placeholder until enable_sensor() stores the real stream id.
        self.stream_id = -uerrno.ENODEV
        self.interrupt_id = interrupt.MAX86150
        self._callback = callback
        self.sample_rate = sample_rate
        self.sample_buffer_len = sample_buffer_len
        self.enable_sensor()

    def enable_sensor(self):
        """
        Enables the sensor.

        Automatically called when instantiating the sensor object.
        """
        # Keep the interrupt masked while the handler is being swapped; it
        # is only unmasked below when a user callback was supplied.
        interrupt.disable_callback(self.interrupt_id)
        interrupt.set_callback(self.interrupt_id, self._interrupt)
        self.stream_id = sys_max86150.enable_sensor(
            self.sample_buffer_len, self.sample_rate
        )

        self.active = True

        if self._callback:
            interrupt.enable_callback(self.interrupt_id)

    def __enter__(self):
        return self

    def __exit__(self, _et, _ev, _t):
        # Returns None, so an exception raised inside the ``with`` block
        # keeps propagating after the sensor has been closed.
        self.close()

    def close(self):
        """
        Close the currently open connection to the sensor.

        Does nothing if the sensor is not active.
        """
        if not self.active:
            return

        self.active = False
        try:
            sys_max86150.disable_sensor()
        finally:
            # Unregister the handler even if disabling the sensor failed, so
            # no interrupt is delivered to an object already marked inactive.
            interrupt.disable_callback(self.interrupt_id)
            interrupt.set_callback(self.interrupt_id, None)

    def read(self):
        """
        Read as many samples as currently available.

        :returns: A list of ``Max86150Data`` tuples, one per sample yielded
            by ``sys_max86150.read_sensor``.
        :raises AssertionError: If the sensor is not active.
        """
        # Raised explicitly (same type and message as the former ``assert``)
        # so the guard still runs when assertions are disabled.
        if not self.active:
            raise AssertionError("Sensor is inactive")

        return [
            self._convert(sample)
            for sample in sys_max86150.read_sensor(self.stream_id)
        ]

    def _convert(self, sample):
        # Wrap the first three elements of a raw sample in the named record.
        return Max86150Data(sample[0], sample[1], sample[2])

    def _interrupt(self, _):
        # Interrupt handler registered by enable_sensor(); the interrupt
        # argument is unused.  Ignored once the sensor has been closed.
        if self.active:
            data = self.read()
            if self._callback:
                self._callback(data)