max86150.py 2.34 KB
Newer Older
Arist's avatar
Arist committed
1
2
3
import sys_max86150
import uerrno
import interrupt
4
5
6
import ucollections

Max86150Data = ucollections.namedtuple("Max86150Data", ["red", "infrared", "ecg"])
Arist's avatar
Arist committed
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73


class MAX86150:
    """
    The MAX86150 class provides a stram interface to the MAX86150 PPG and ECG.

    .. code-block:: python

        import MAX86150
        m = max86150.MAX86150()
        m.read()
        m.close()
    """

    def __init__(self, callback=None, sample_buffer_len=128, sample_rate=200):
        """
        Initializes the MAX86150 (if it is not already running).

        :param callback: If not None: A callback which is called with the data when ever new data is available

        """
        self.active = False
        self.stream_id = -uerrno.ENODEV
        self.interrupt_id = interrupt.MAX86150
        self._callback = callback
        self.sample_rate = sample_rate
        self.sample_buffer_len = sample_buffer_len
        self.enable_sensor()

    def enable_sensor(self):
        """
        Enables the sensor. Automatically called by __init__.
        """
        interrupt.disable_callback(self.interrupt_id)
        interrupt.set_callback(self.interrupt_id, self._interrupt)
        self.stream_id = sys_max86150.enable_sensor(
            self.sample_buffer_len, self.sample_rate
        )

        self.active = True

        if self._callback:
            interrupt.enable_callback(self.interrupt_id)

    def __enter__(self):
        return self

    def __exit__(self, _et, _ev, _t):
        self.close()

    def close(self):
        """
        Close the currently open connection to the sensor.
        """

        if self.active:
            self.active = False
            sys_max86150.disable_sensor()

            interrupt.disable_callback(self.interrupt_id)
            interrupt.set_callback(self.interrupt_id, None)

    def read(self):
        """
        Read as many samples (signed integer) as currently available.
        """
        assert self.active, "Sensor is inactive"
74
75
76
77
78
79
80
        result = []
        for sample in sys_max86150.read_sensor(self.stream_id):
            result.append(self._convert(sample))
        return result

    def _convert(self, sample):
        return Max86150Data(sample[0], sample[1], sample[2])
Arist's avatar
Arist committed
81
82
83
84
85
86

    def _interrupt(self, _):
        if self.active:
            data = self.read()
            if self._callback:
                self._callback(data)