]> vault307.fbx.one Git - ir_remote.git/commitdiff
infrared remote master
authorjimmy <jimipunk88@gmail.com>
Fri, 28 Jun 2024 01:20:54 +0000 (20:20 -0500)
committerjimmy <jimipunk88@gmail.com>
Fri, 28 Jun 2024 01:20:54 +0000 (20:20 -0500)
37 files changed:
ir_rx/__init__.py [new file with mode: 0644]
ir_rx/acquire.py [new file with mode: 0644]
ir_rx/mce.py [new file with mode: 0644]
ir_rx/nec.py [new file with mode: 0644]
ir_rx/philips.py [new file with mode: 0644]
ir_rx/print_error.py [new file with mode: 0644]
ir_rx/sony.py [new file with mode: 0644]
ir_rx/test.py [new file with mode: 0644]
ir_tx/__init__.py [new file with mode: 0644]
ir_tx/mce.py [new file with mode: 0644]
ir_tx/mcetest.py [new file with mode: 0644]
ir_tx/nec.py [new file with mode: 0644]
ir_tx/philips.py [new file with mode: 0644]
ir_tx/rp2_rmt.py [new file with mode: 0644]
ir_tx/sony.py [new file with mode: 0644]
ir_tx/test.py [new file with mode: 0644]
main.py [new file with mode: 0644]
primitives/__init__.py [new file with mode: 0644]
primitives/aadc.py [new file with mode: 0644]
primitives/barrier.py [new file with mode: 0644]
primitives/condition.py [new file with mode: 0644]
primitives/delay_ms.py [new file with mode: 0644]
primitives/encoder.py [new file with mode: 0644]
primitives/events.py [new file with mode: 0644]
primitives/pushbutton.py [new file with mode: 0644]
primitives/queue.py [new file with mode: 0644]
primitives/ringbuf_queue.py [new file with mode: 0644]
primitives/semaphore.py [new file with mode: 0644]
primitives/switch.py [new file with mode: 0644]
primitives/tests/__init__.py [new file with mode: 0644]
primitives/tests/adctest.py [new file with mode: 0644]
primitives/tests/asyntest.py [new file with mode: 0644]
primitives/tests/delay_test.py [new file with mode: 0644]
primitives/tests/encoder_stop.py [new file with mode: 0644]
primitives/tests/encoder_test.py [new file with mode: 0644]
primitives/tests/event_test.py [new file with mode: 0644]
primitives/tests/switches.py [new file with mode: 0644]

diff --git a/ir_rx/__init__.py b/ir_rx/__init__.py
new file mode 100644 (file)
index 0000000..a8ff823
--- /dev/null
@@ -0,0 +1,70 @@
+# ir_rx __init__.py Decoder for IR remote control using synchronous code
+# IR_RX abstract base class for IR receivers.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020-2021 Released under the MIT license
+
+from machine import Timer, Pin
+from array import array
+from utime import ticks_us
+
+# Save RAM
+# from micropython import alloc_emergency_exception_buf
+# alloc_emergency_exception_buf(100)
+
+
+# On 1st edge start a block timer. While the timer is running, record the time
+# of each edge. When the timer times out decode the data. Duration must exceed
+# the worst case block transmission time, but be less than the interval between
+# a block start and a repeat code start (~108ms depending on protocol)
+
+class IR_RX():
+    # Result/error codes
+    # Repeat button code
+    REPEAT = -1
+    # Error codes
+    BADSTART = -2
+    BADBLOCK = -3
+    BADREP = -4
+    OVERRUN = -5
+    BADDATA = -6
+    BADADDR = -7
+
+    def __init__(self, pin, nedges, tblock, callback, *args):  # Optional args for callback
+        self._pin = pin
+        self._nedges = nedges
+        self._tblock = tblock
+        self.callback = callback
+        self.args = args
+        self._errf = lambda _ : None
+        self.verbose = False
+
+        self._times = array('i',  (0 for _ in range(nedges + 1)))  # +1 for overrun
+        pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING))
+        self.edge = 0
+        self.tim = Timer(-1)  # Sofware timer
+        self.cb = self.decode
+
+    # Pin interrupt. Save time of each edge for later decode.
+    def _cb_pin(self, line):
+        t = ticks_us()
+        # On overrun ignore pulses until software timer times out
+        if self.edge <= self._nedges:  # Allow 1 extra pulse to record overrun
+            if not self.edge:  # First edge received
+                self.tim.init(period=self._tblock , mode=Timer.ONE_SHOT, callback=self.cb)
+            self._times[self.edge] = t
+            self.edge += 1
+
+    def do_callback(self, cmd, addr, ext, thresh=0):
+        self.edge = 0
+        if cmd >= thresh:
+            self.callback(cmd, addr, ext, *self.args)
+        else:
+            self._errf(cmd)
+
+    def error_function(self, func):
+        self._errf = func
+
+    def close(self):
+        self._pin.irq(handler = None)
+        self.tim.deinit()
diff --git a/ir_rx/acquire.py b/ir_rx/acquire.py
new file mode 100644 (file)
index 0000000..af3ab38
--- /dev/null
@@ -0,0 +1,108 @@
+# acquire.py Acquire a pulse train from an IR remote
+# Supports NEC protocol.
+# For a remote using NEC see https://www.adafruit.com/products/389
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from machine import Pin, freq
+from sys import platform
+
+from utime import sleep_ms, ticks_us, ticks_diff
+from ir_rx import IR_RX
+
+
+class IR_GET(IR_RX):
+    def __init__(self, pin, nedges=100, twait=100, display=True):
+        self.display = display
+        super().__init__(pin, nedges, twait, lambda *_ : None)
+        self.data = None
+
+    def decode(self, _):
+        def near(v, target):
+            return target * 0.8 < v < target * 1.2
+        lb = self.edge - 1  # Possible length of burst
+        if lb < 3:
+            return  # Noise
+        burst = []
+        for x in range(lb):
+            dt = ticks_diff(self._times[x + 1], self._times[x])
+            if x > 0 and dt > 10000:  # Reached gap between repeats
+                break
+            burst.append(dt)
+        lb = len(burst)  # Actual length
+        # Duration of pulse train 24892 for RC-5 22205 for RC-6
+        duration = ticks_diff(self._times[lb - 1], self._times[0])
+
+        if self.display:
+            for x, e in enumerate(burst):
+                print('{:03d} {:5d}'.format(x, e))
+            print()
+            # Attempt to determine protocol
+            ok = False  # Protocol not yet found
+            if near(burst[0], 9000) and lb == 67:
+                print('NEC')
+                ok = True
+
+            if not ok and near(burst[0], 2400) and near(burst[1], 600):  # Maybe Sony
+                try:
+                    nbits = {25:12, 31:15, 41:20}[lb]
+                except KeyError:
+                    pass
+                else:
+                    ok = True
+                    print('Sony {}bit'.format(nbits))
+
+            if not ok and near(burst[0], 889):  # Maybe RC-5
+                if near(duration, 24892) and near(max(burst), 1778):
+                    print('Philps RC-5')
+                    ok = True
+
+            if not ok and near(burst[0], 2666) and near(burst[1], 889):  # RC-6?
+                if near(duration, 22205) and near(burst[1], 889) and near(burst[2], 444):
+                    print('Philips RC-6 mode 0')
+                    ok = True
+
+            if not ok and near(burst[0], 2000) and near(burst[1], 1000):
+                if near(duration, 19000):
+                    print('Microsoft MCE edition protocol.')
+                    # Constant duration, variable burst length, presumably bi-phase
+                    print('Protocol start {} {} Burst length {} duration {}'.format(burst[0], burst[1], lb, duration))
+                    ok = True
+
+            if not ok and near(burst[0], 4500) and near(burst[1], 4500) and lb == 67:  # Samsung
+                print('Samsung')
+                ok = True
+
+            if not ok and near(burst[0], 3500) and near(burst[1], 1680):  # Panasonic?
+                print('Unsupported protocol. Panasonic?')
+                ok = True
+
+            if not ok:
+                print('Unknown protocol start {} {} Burst length {} duration {}'.format(burst[0], burst[1], lb, duration))
+
+            print()
+        self.data = burst
+        # Set up for new data burst. Run null callback
+        self.do_callback(0, 0, 0)
+
+    def acquire(self):
+        while self.data is None:
+            sleep_ms(5)
+        self.close()
+        return self.data
+
+def test():
+    # Define pin according to platform
+    if platform == 'pyboard':
+        pin = Pin('X3', Pin.IN)
+    elif platform == 'esp8266':
+        freq(160000000)
+        pin = Pin(13, Pin.IN)
+    elif platform == 'esp32' or platform == 'esp32_LoBo':
+        pin = Pin(23, Pin.IN)
+    elif platform == 'rp2':
+        pin = Pin(16, Pin.IN)
+    irg = IR_GET(pin)
+    print('Waiting for IR data...')
+    return irg.acquire()
diff --git a/ir_rx/mce.py b/ir_rx/mce.py
new file mode 100644 (file)
index 0000000..c3e3849
--- /dev/null
@@ -0,0 +1,68 @@
+# mce.py Decoder for IR remote control using synchronous code
+# Supports Microsoft MCE edition remote protocol.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+# WARNING: This is experimental and subject to change.
+
+from utime import ticks_us, ticks_diff
+from ir_rx import IR_RX
+
+class MCE(IR_RX):
+    init_cs = 4  # http://www.hifi-remote.com/johnsfine/DecodeIR.html#OrtekMCE says 3
+    def __init__(self, pin, callback, *args):
+        # Block lasts ~19ms and has <= 34 edges
+        super().__init__(pin, 34, 25, callback, *args)
+
+    def decode(self, _):
+        def check(v):
+            if self.init_cs == -1:
+                return True
+            csum = v >> 12
+            cs = self.init_cs
+            for _ in range(12):
+                if v & 1:
+                    cs += 1
+                v >>= 1
+            return cs == csum
+
+        try:
+            t0 = ticks_diff(self._times[1], self._times[0])  # 2000μs mark
+            t1 = ticks_diff(self._times[2], self._times[1])  # 1000μs space
+            if not ((1800 < t0 < 2200) and (800 < t1 < 1200)):
+                raise RuntimeError(self.BADSTART)
+            nedges = self.edge  # No. of edges detected
+            if not 14 <= nedges <= 34:
+                raise RuntimeError(self.OVERRUN if nedges > 28 else self.BADSTART)
+            # Manchester decode
+            mask = 1
+            bit = 1
+            v = 0
+            x = 2
+            for _ in range(16):
+                # -1 convert count to index, -1 because we look ahead
+                if x > nedges - 2:
+                    raise RuntimeError(self.BADBLOCK)
+                # width is 500/1000 nominal
+                width = ticks_diff(self._times[x + 1], self._times[x])
+                if not 250 < width < 1350:
+                    self.verbose and print('Bad block 3 Width', width, 'x', x)
+                    raise RuntimeError(self.BADBLOCK)
+                short = int(width < 750)
+                bit ^= short ^ 1
+                v |= mask if bit else 0
+                mask <<= 1
+                x += 1 + short
+
+            self.verbose and print(bin(v))
+            if not check(v):
+                raise RuntimeError(self.BADDATA)
+            val = (v >> 6) & 0x3f
+            addr = v & 0xf  # Constant for all buttons on my remote
+            ctrl = (v >> 4) & 3
+
+        except RuntimeError as e:
+            val, addr, ctrl = e.args[0], 0, 0
+        # Set up for new data burst and run user callback/error function
+        self.do_callback(val, addr, ctrl)
diff --git a/ir_rx/nec.py b/ir_rx/nec.py
new file mode 100644 (file)
index 0000000..011b13d
--- /dev/null
@@ -0,0 +1,69 @@
+# nec.py Decoder for IR remote control using synchronous code
+# Supports NEC and Samsung protocols.
+# With thanks to J.E. Tannenbaum for information re Samsung protocol
+
+# For a remote using NEC see https://www.adafruit.com/products/389
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020-2022 Released under the MIT license
+
+from utime import ticks_us, ticks_diff
+from ir_rx import IR_RX
+
+class NEC_ABC(IR_RX):
+    def __init__(self, pin, extended, samsung, callback, *args):
+        # Block lasts <= 80ms (extended mode) and has 68 edges
+        super().__init__(pin, 68, 80, callback, *args)
+        self._extended = extended
+        self._addr = 0
+        self._leader = 2500 if samsung else 4000  # 4.5ms for Samsung else 9ms
+
+    def decode(self, _):
+        try:
+            if self.edge > 68:
+                raise RuntimeError(self.OVERRUN)
+            width = ticks_diff(self._times[1], self._times[0])
+            if width < self._leader:  # 9ms leading mark for all valid data
+                raise RuntimeError(self.BADSTART)
+            width = ticks_diff(self._times[2], self._times[1])
+            if width > 3000:  # 4.5ms space for normal data
+                if self.edge < 68:  # Haven't received the correct number of edges
+                    raise RuntimeError(self.BADBLOCK)
+                # Time spaces only (marks are always 562.5µs)
+                # Space is 1.6875ms (1) or 562.5µs (0)
+                # Skip last bit which is always 1
+                val = 0
+                for edge in range(3, 68 - 2, 2):
+                    val >>= 1
+                    if ticks_diff(self._times[edge + 1], self._times[edge]) > 1120:
+                        val |= 0x80000000
+            elif width > 1700: # 2.5ms space for a repeat code. Should have exactly 4 edges.
+                raise RuntimeError(self.REPEAT if self.edge == 4 else self.BADREP)  # Treat REPEAT as error.
+            else:
+                raise RuntimeError(self.BADSTART)
+            addr = val & 0xff  # 8 bit addr
+            cmd = (val >> 16) & 0xff
+            if cmd != (val >> 24) ^ 0xff:
+                raise RuntimeError(self.BADDATA)
+            if addr != ((val >> 8) ^ 0xff) & 0xff:  # 8 bit addr doesn't match check
+                if not self._extended:
+                    raise RuntimeError(self.BADADDR)
+                addr |= val & 0xff00  # pass assumed 16 bit address to callback
+            self._addr = addr
+        except RuntimeError as e:
+            cmd = e.args[0]
+            addr = self._addr if cmd == self.REPEAT else 0  # REPEAT uses last address
+        # Set up for new data burst and run user callback
+        self.do_callback(cmd, addr, 0, self.REPEAT)
+
+class NEC_8(NEC_ABC):
+    def __init__(self, pin, callback, *args):
+        super().__init__(pin, False, False, callback, *args)
+
+class NEC_16(NEC_ABC):
+    def __init__(self, pin, callback, *args):
+        super().__init__(pin, True, False, callback, *args)
+
+class SAMSUNG(NEC_ABC):
+    def __init__(self, pin, callback, *args):
+        super().__init__(pin, True, True, callback, *args)
diff --git a/ir_rx/philips.py b/ir_rx/philips.py
new file mode 100644 (file)
index 0000000..cd7feaa
--- /dev/null
@@ -0,0 +1,123 @@
+# philips.py Decoder for IR remote control using synchronous code
+# Supports Philips RC-5 RC-6 mode 0 protocols.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from utime import ticks_us, ticks_diff
+from ir_rx import IR_RX
+
+class RC5_IR(IR_RX):
+    def __init__(self, pin, callback, *args):
+        # Block lasts <= 30ms and has <= 28 edges
+        super().__init__(pin, 28, 30, callback, *args)
+
+    def decode(self, _):
+        try:
+            nedges = self.edge  # No. of edges detected
+            if not 14 <= nedges <= 28:
+                raise RuntimeError(self.OVERRUN if nedges > 28 else self.BADSTART)
+            # Regenerate bitstream
+            bits = 1
+            bit = 1
+            v = 1  # 14 bit bitstream, MSB always 1
+            x = 0
+            while bits < 14:
+                # -1 convert count to index, -1 because we look ahead
+                if x > nedges - 2:
+                    print('Bad block 1 edges', nedges, 'x', x)
+                    raise RuntimeError(self.BADBLOCK)
+                # width is 889/1778 nominal
+                width = ticks_diff(self._times[x + 1], self._times[x])
+                if not 500 < width < 2100:
+                    self.verbose and print('Bad block 3 Width', width, 'x', x)
+                    raise RuntimeError(self.BADBLOCK)
+                short = width < 1334
+                if not short:
+                    bit ^= 1
+                v <<= 1
+                v |= bit
+                bits += 1
+                x += 1 + int(short)
+            self.verbose and print(bin(v))
+            # Split into fields (val, addr, ctrl)
+            val = (v & 0x3f) | (0 if ((v >> 12) & 1) else 0x40)  # Correct the polarity of S2
+            addr = (v >> 6) & 0x1f
+            ctrl = (v >> 11) & 1
+
+        except RuntimeError as e:
+            val, addr, ctrl = e.args[0], 0, 0
+        # Set up for new data burst and run user callback
+        self.do_callback(val, addr, ctrl)
+
+
+class RC6_M0(IR_RX):
+    # Even on Pyboard D the 444μs nominal pulses can be recorded as up to 705μs
+    # Scope shows 360-520 μs (-84μs +76μs relative to nominal)
+    # Header nominal 2666, 889, 444, 889, 444, 444, 444, 444 carrier ON at end
+    hdr = ((1800, 4000), (593, 1333), (222, 750), (593, 1333), (222, 750), (222, 750), (222, 750), (222, 750))
+    def __init__(self, pin, callback, *args):
+        # Block lasts 23ms nominal and has <=44 edges
+        super().__init__(pin, 44, 30, callback, *args)
+
+    def decode(self, _):
+        try:
+            nedges = self.edge  # No. of edges detected
+            if not 22 <= nedges <= 44:
+                raise RuntimeError(self.OVERRUN if nedges > 28 else self.BADSTART)
+            for x, lims in enumerate(self.hdr):
+                width = ticks_diff(self._times[x + 1], self._times[x])
+                if not (lims[0] < width < lims[1]):
+                    self.verbose and print('Bad start', x, width, lims)
+                    raise RuntimeError(self.BADSTART)
+            x += 1
+            width = ticks_diff(self._times[x + 1], self._times[x])
+            # 2nd bit of last 0 is 444μs (0) or 1333μs (1)
+            if not 222 < width < 1555:
+                self.verbose and print('Bad block 1 Width', width, 'x', x)
+                raise RuntimeError(self.BADBLOCK)
+            short = width < 889
+            v = int(not short)
+            bit = v
+            bits = 1  # Bits decoded
+            x += 1 + int(short)
+            width = ticks_diff(self._times[x + 1], self._times[x])
+            if not 222 < width < 1555:
+                self.verbose and print('Bad block 2 Width', width, 'x', x)
+                raise RuntimeError(self.BADBLOCK)
+            short = width < 1111
+            if not short:
+                bit ^= 1
+            x += 1 + int(short)  # If it's short, we know width of next
+            v <<= 1
+            v |= bit  # MSB of result
+            bits += 1
+            # Decode bitstream
+            while bits < 17:
+                # -1 convert count to index, -1 because we look ahead
+                if x > nedges - 2:
+                    raise RuntimeError(self.BADBLOCK)
+                # width is 444/889 nominal
+                width = ticks_diff(self._times[x + 1], self._times[x])
+                if not 222 < width < 1111:
+                    self.verbose and print('Bad block 3 Width', width, 'x', x)
+                    raise RuntimeError(self.BADBLOCK)
+                short = width < 666
+                if not short:
+                    bit ^= 1
+                v <<= 1
+                v |= bit
+                bits += 1
+                x += 1 + int(short)
+
+            if self.verbose:
+                 ss = '20-bit format {:020b} x={} nedges={} bits={}'
+                 print(ss.format(v, x, nedges, bits))
+
+            val = v & 0xff
+            addr = (v >> 8) & 0xff
+            ctrl = (v >> 16) & 1
+        except RuntimeError as e:
+            val, addr, ctrl = e.args[0], 0, 0
+        # Set up for new data burst and run user callback
+        self.do_callback(val, addr, ctrl)
diff --git a/ir_rx/print_error.py b/ir_rx/print_error.py
new file mode 100644 (file)
index 0000000..31ce51e
--- /dev/null
@@ -0,0 +1,19 @@
+# print_error.py Error print for IR receiver
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from ir_rx import IR_RX
+
+_errors = {IR_RX.BADSTART : 'Invalid start pulse',
+           IR_RX.BADBLOCK : 'Error: bad block',
+           IR_RX.BADREP : 'Error: repeat',
+           IR_RX.OVERRUN : 'Error: overrun',
+           IR_RX.BADDATA : 'Error: invalid data',
+           IR_RX.BADADDR : 'Error: invalid address'}
+
+def print_error(data):
+    if data in _errors:
+        print(_errors[data])
+    else:
+        print('Unknown error code:', data)
diff --git a/ir_rx/sony.py b/ir_rx/sony.py
new file mode 100644 (file)
index 0000000..1050356
--- /dev/null
@@ -0,0 +1,70 @@
+# sony.py Decoder for IR remote control using synchronous code
+# Sony SIRC protocol.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from utime import ticks_us, ticks_diff
+from ir_rx import IR_RX
+
+class SONY_ABC(IR_RX):  # Abstract base class
+    def __init__(self, pin, bits, callback, *args):
+        # 20 bit block has 42 edges and lasts <= 39ms nominal. Add 4ms to time
+        # for tolerances except in 20 bit case where timing is tight with a
+        # repeat period of 45ms.
+        t = int(3 + bits * 1.8) + (1 if bits == 20 else 4)
+        super().__init__(pin, 2 + bits * 2, t, callback, *args)
+        self._addr = 0
+        self._bits = 20
+
+    def decode(self, _):
+        try:
+            nedges = self.edge  # No. of edges detected
+            self.verbose and print('nedges', nedges)
+            if nedges > 42:
+                raise RuntimeError(self.OVERRUN)
+            bits = (nedges - 2) // 2
+            if nedges not in (26, 32, 42) or bits > self._bits:
+                raise RuntimeError(self.BADBLOCK)
+            self.verbose and print('SIRC {}bit'.format(bits))
+            width = ticks_diff(self._times[1], self._times[0])
+            if not 1800 < width < 3000:  # 2.4ms leading mark for all valid data
+                raise RuntimeError(self.BADSTART)
+            width = ticks_diff(self._times[2], self._times[1])
+            if not 350 < width < 1000:  # 600μs space
+                raise RuntimeError(self.BADSTART)
+
+            val = 0  # Data received, LSB 1st
+            x = 2
+            bit = 1
+            while x <= nedges - 2:
+                if ticks_diff(self._times[x + 1], self._times[x]) > 900:
+                    val |= bit
+                bit <<= 1
+                x += 2
+            cmd = val & 0x7f  # 7 bit command
+            val >>= 7
+            if nedges < 42:
+                addr = val & 0xff  # 5 or 8 bit addr
+                val = 0
+            else:
+                addr = val & 0x1f  # 5 bit addr
+                val >>= 5  # 8 bit extended
+        except RuntimeError as e:
+            cmd = e.args[0]
+            addr = 0
+            val = 0
+        self.do_callback(cmd, addr, val)
+
+class SONY_12(SONY_ABC):
+    def __init__(self, pin, callback, *args):
+        super().__init__(pin, 12, callback, *args)
+
+class SONY_15(SONY_ABC):
+    def __init__(self, pin, callback, *args):
+        super().__init__(pin, 15, callback, *args)
+
+class SONY_20(SONY_ABC):
+    def __init__(self, pin, callback, *args):
+        super().__init__(pin, 20, callback, *args)
+
diff --git a/ir_rx/test.py b/ir_rx/test.py
new file mode 100644 (file)
index 0000000..c4dbd7f
--- /dev/null
@@ -0,0 +1,70 @@
+# test.py Test program for IR remote control decoder
+# Supports Pyboard, ESP32 and ESP8266
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020-2022 Released under the MIT license
+
+# Run this to characterise a remote.
+
+from sys import platform
+import time
+import gc
+from machine import Pin, freq
+from ir_rx.print_error import print_error  # Optional print of error codes
+
+# Import all implemented classes
+from ir_rx.nec import NEC_8, NEC_16, SAMSUNG
+from ir_rx.sony import SONY_12, SONY_15, SONY_20
+from ir_rx.philips import RC5_IR, RC6_M0
+from ir_rx.mce import MCE
+
+# Define pin according to platform
+if platform == "pyboard":
+    p = Pin("X3", Pin.IN)
+elif platform == "esp8266":
+    freq(160000000)
+    p = Pin(13, Pin.IN)
+elif platform == "esp32" or platform == "esp32_LoBo":
+    p = Pin(23, Pin.IN)
+elif platform == "rp2":
+    p = Pin(16, Pin.IN)
+
+# User callback
+def cb(data, addr, ctrl):
+    if data < 0:  # NEC protocol sends repeat codes.
+        print("Repeat code.")
+    else:
+        print(f"Data 0x{data:02x} Addr 0x{addr:04x} Ctrl 0x{ctrl:02x}")
+
+
+def test(proto=0):
+    classes = (NEC_8, NEC_16, SONY_12, SONY_15, SONY_20, RC5_IR, RC6_M0, MCE, SAMSUNG)
+    ir = classes[proto](p, cb)  # Instantiate receiver
+    ir.error_function(print_error)  # Show debug information
+    # ir.verbose = True
+    # A real application would do something here...
+    try:
+        while True:
+            print("running")
+            time.sleep(5)
+            gc.collect()
+    except KeyboardInterrupt:
+        ir.close()
+
+
+# **** DISPLAY GREETING ****
+s = """Test for IR receiver. Run:
+from ir_rx.test import test
+test() for NEC 8 bit protocol,
+test(1) for NEC 16 bit,
+test(2) for Sony SIRC 12 bit,
+test(3) for Sony SIRC 15 bit,
+test(4) for Sony SIRC 20 bit,
+test(5) for Philips RC-5 protocol,
+test(6) for RC6 mode 0.
+test(7) for Microsoft Vista MCE.
+test(8) for Samsung.
+
+Hit ctrl-c to stop, then ctrl-d to soft reset."""
+
+print(s)
diff --git a/ir_tx/__init__.py b/ir_tx/__init__.py
new file mode 100644 (file)
index 0000000..e791b4a
--- /dev/null
@@ -0,0 +1,133 @@
+# __init__.py Nonblocking IR blaster
+# Runs on Pyboard D or Pyboard 1.x (not Pyboard Lite), ESP32 and RP2
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2020-2021 Peter Hinch
+from sys import platform
+ESP32 = platform == 'esp32'  # Loboris not supported owing to RMT
+RP2 = platform == 'rp2'
+if ESP32:
+    from machine import Pin, PWM
+    from esp32 import RMT
+elif RP2:
+    from .rp2_rmt import RP2_RMT
+else:
+    from pyb import Pin, Timer  # Pyboard does not support machine.PWM
+
+from micropython import const
+from array import array
+from time import ticks_us, ticks_diff
+# import micropython
+# micropython.alloc_emergency_exception_buf(100)
+
+
+# Shared by NEC
+STOP = const(0)  # End of data
+
+# IR abstract base class. Array holds periods in μs between toggling 36/38KHz
+# carrier on or off. Physical transmission occurs in an ISR context controlled
+# by timer 2 and timer 5. See TRANSMITTER.md for details of operation.
+class IR:
+    _active_high = True  # Hardware turns IRLED on if pin goes high.
+    _space = 0  # Duty ratio that causes IRLED to be off
+    timeit = False  # Print timing info
+
+    @classmethod
+    def active_low(cls):
+        if ESP32:
+            raise ValueError('Cannot set active low on ESP32')
+        cls._active_high = False
+        cls._space = 100
+
+    def __init__(self, pin, cfreq, asize, duty, verbose):
+        if ESP32:
+            self._rmt = RMT(0, pin=pin, clock_div=80, tx_carrier = (cfreq, duty, 1))
+            # 1μs resolution
+        elif RP2:  # PIO-based RMT-like device
+            self._rmt = RP2_RMT(pin_pulse=None, carrier=(pin, cfreq, duty))  # 1μs resolution
+            asize += 1  # Allow for possible extra space pulse
+        else:  # Pyboard
+            if not IR._active_high:
+                duty = 100 - duty
+            tim = Timer(2, freq=cfreq)  # Timer 2/pin produces 36/38/40KHz carrier
+            self._ch = tim.channel(1, Timer.PWM, pin=pin)
+            self._ch.pulse_width_percent(self._space)  # Turn off IR LED
+            # Pyboard: 0 <= pulse_width_percent <= 100
+            self._duty = duty
+            self._tim = Timer(5)  # Timer 5 controls carrier on/off times
+        self._tcb = self._cb  # Pre-allocate
+        self._arr = array('H', 0 for _ in range(asize))  # on/off times (μs)
+        self._mva = memoryview(self._arr)
+        # Subclass interface
+        self.verbose = verbose
+        self.carrier = False  # Notional carrier state while encoding biphase
+        self.aptr = 0  # Index into array
+
+    def _cb(self, t):  # T5 callback, generate a carrier mark or space
+        t.deinit()
+        p = self.aptr
+        v = self._arr[p]
+        if v == STOP:
+            self._ch.pulse_width_percent(self._space)  # Turn off IR LED.
+            return
+        self._ch.pulse_width_percent(self._space if p & 1 else self._duty)
+        self._tim.init(prescaler=84, period=v, callback=self._tcb)
+        self.aptr += 1
+
+    # Public interface
+    # Before populating array, zero pointer, set notional carrier state (off).
+    def transmit(self, addr, data, toggle=0, validate=False):  # NEC: toggle is unused
+        t = ticks_us()
+        if validate:
+            if addr > self.valid[0] or addr < 0:
+                raise ValueError('Address out of range', addr)
+            if data > self.valid[1] or data < 0:
+                raise ValueError('Data out of range', data)
+            if toggle > self.valid[2] or toggle < 0:
+                raise ValueError('Toggle out of range', toggle)
+        self.aptr = 0  # Inital conditions for tx: index into array
+        self.carrier = False
+        self.tx(addr, data, toggle)  # Subclass populates ._arr
+        self.trigger()  # Initiate transmission
+        if self.timeit:
+            dt = ticks_diff(ticks_us(), t)
+            print('Time = {}μs'.format(dt))
+
+    # Subclass interface
+    def trigger(self):  # Used by NEC to initiate a repeat frame
+        if ESP32:
+            self._rmt.write_pulses(tuple(self._mva[0 : self.aptr]))
+        elif RP2:
+            self.append(STOP)
+            self._rmt.send(self._arr)
+        else:
+            self.append(STOP)
+            self.aptr = 0  # Reset pointer
+            self._cb(self._tim)  # Initiate physical transmission.
+
+    def append(self, *times):  # Append one or more time peiods to ._arr
+        for t in times:
+            self._arr[self.aptr] = t
+            self.aptr += 1
+            self.carrier = not self.carrier  # Keep track of carrier state
+            self.verbose and print('append', t, 'carrier', self.carrier)
+
+    def add(self, t):  # Increase last time value (for biphase)
+        assert t > 0
+        self.verbose and print('add', t)
+        # .carrier unaffected
+        self._arr[self.aptr - 1] += t
+
+
+# Given an iterable (e.g. list or tuple) of times, emit it as an IR stream.
+class Player(IR):
+
+    def __init__(self, pin, freq=38000, verbose=False):  # NEC specifies 38KHz
+        super().__init__(pin, freq, 68, 33, verbose)  # Measured duty ratio 33%
+
+    def play(self, lst):
+        for x, t in enumerate(lst):
+            self._arr[x] = t
+        self.aptr = x + 1
+        self.trigger()
diff --git a/ir_tx/mce.py b/ir_tx/mce.py
new file mode 100644 (file)
index 0000000..b2b7267
--- /dev/null
@@ -0,0 +1,44 @@
+# mce.py Encoder for IR remote control using synchronous code
+# Supports Microsoft MCE edition remote protocol.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+# WARNING: This is experimental and subject to change.
+
+from micropython import const
+from ir_tx import IR
+
+_TBIT = const(500)  # Time (μs) for pulse of carrier
+
+
+class MCE(IR):
+    valid = (0xf, 0x3f, 3)  # Max addr, data, toggle
+    init_cs = 4  # http://www.hifi-remote.com/johnsfine/DecodeIR.html#OrtekMCE says 3
+
+    def __init__(self, pin, freq=38000, verbose=False):
+        super().__init__(pin, freq, 34, 30, verbose)
+
+    def tx(self, addr, data, toggle):
+        def checksum(v):
+            cs = self.init_cs
+            for _ in range(12):
+                if v & 1:
+                    cs += 1
+                v >>= 1
+            return cs
+
+        self.append(2000, 1000, _TBIT)
+        d = ((data & 0x3f) << 6) | (addr & 0xf)  | ((toggle & 3) << 4)
+        d |= checksum(d) << 12
+        self.verbose and print(bin(d))
+
+        mask = 1
+        while mask < 0x10000:
+            bit = bool(d & mask)
+            if bit ^ self.carrier:
+                self.add(_TBIT)
+                self.append(_TBIT)
+            else:
+                self.append(_TBIT, _TBIT)
+            mask <<= 1
diff --git a/ir_tx/mcetest.py b/ir_tx/mcetest.py
new file mode 100644 (file)
index 0000000..fc473d9
--- /dev/null
@@ -0,0 +1,95 @@
+# ir_tx.mcetest Test for nonblocking MCE IR blaster.
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2020 Peter Hinch
+
+# Implements a 2-button remote control on a Pyboard with auto repeat.
+from sys import platform
+ESP32 = platform == 'esp32'
+if ESP32:
+    from machine import Pin
+else:
+    from pyb import Pin, LED
+
+from micropython import const
+import uasyncio as asyncio
+from aswitch import Switch, Delay_ms
+from ir_tx.mce import MCE
+
+loop = asyncio.get_event_loop()
+_FIRST = const(0)
+_REP = const(1)
+_END = const(2)
+_REP_DELAY = const(60)
+
+class Rbutton:
+    def __init__(self, irb, pin, addr, data, rep_code=False):
+        self.irb = irb
+        self.sw = Switch(pin)
+        self.addr = addr
+        self.data = data
+        self.rep_code = rep_code
+        self.sw.close_func(self.cfunc)
+        self.sw.open_func(self.ofunc)
+        self.tim = Delay_ms(self.repeat)
+        self.stop = False
+
+    def cfunc(self):  # Button push: send data and set up for repeats
+        self.irb.transmit(self.addr, self.data, _FIRST, True)
+        self.tim.trigger(_REP_DELAY)
+
+    def ofunc(self):  # Button release: cancel repeat timer
+        self.stop = True
+
+    async def repeat(self):
+        await asyncio.sleep(0)  # Let timer stop before retriggering
+        if self.stop:  # Button has been released: send last message
+            self.stop = False
+            self.tim.stop()  # Not strictly necessary
+            self.irb.transmit(self.addr, self.data, _END, True)
+        else:
+            self.tim.trigger(_REP_DELAY)
+            self.irb.transmit(self.addr, self.data, _REP, True)
+
+async def main():
+    pin = Pin(23, Pin.OUT, value = 0) if ESP32 else Pin('X1')
+    irb = MCE(pin)  # verbose=True)
+    # Uncomment the following to print transmit timing
+    # irb.timeit = True
+
+    b = []  # Rbutton instances
+    px3 = Pin(18, Pin.IN, Pin.PULL_UP) if ESP32 else Pin('X3', Pin.IN, Pin.PULL_UP)
+    px4 = Pin(19, Pin.IN, Pin.PULL_UP) if ESP32 else Pin('X4', Pin.IN, Pin.PULL_UP)
+    b.append(Rbutton(irb, px3, 0x1, 0x7))
+    b.append(Rbutton(irb, px4, 0xe, 0xb))
+    if ESP32:
+        while True:
+            print('Running')
+            await asyncio.sleep(5)
+    else:
+        led = LED(1)
+        while True:
+            await asyncio.sleep_ms(500)  # Obligatory flashing LED.
+            led.toggle()
+
+# Greeting strings. Common:
+s = '''Test for IR transmitter. Run:
+from ir_tx.mcetest import test
+test()
+'''
+# Pyboard:
+spb = '''
+IR LED on pin X1
+Ground pin X3 to send addr 1 data 7
+Ground pin X4 to send addr 0xe data 0x0b.'''
+# ESP32
+sesp = '''
+IR LED gate on pins 23, 21
+Ground pin 18 to send addr 1 data 7
+Ground pin 19 to send addr 0xe data 0x0b.'''
+
+print(''.join((s, sesp)) if ESP32 else ''.join((s, spb)))
+
+def test():
+    loop.run_until_complete(main())
diff --git a/ir_tx/nec.py b/ir_tx/nec.py
new file mode 100644 (file)
index 0000000..be9ee28
--- /dev/null
@@ -0,0 +1,46 @@
+# nec.py Encoder for IR remote control using synchronous code
+# NEC protocol.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020-2022 Released under the MIT license
+
+# With thanks to J.E. Tannenbaum for information re Samsung protocol
+from micropython import const
+from ir_tx import IR, STOP
+
+_TBURST = const(563)
+_T_ONE = const(1687)
+
+class NEC(IR):
+    valid = (0xffff, 0xff, 0)  # Max addr, data, toggle
+    samsung = False
+
+    def __init__(self, pin, freq=38000, verbose=False):  # NEC specifies 38KHz also Samsung
+        super().__init__(pin, freq, 68, 33, verbose)  # Measured duty ratio 33%
+
+    def _bit(self, b):
+        self.append(_TBURST, _T_ONE if b else _TBURST)
+
+    def tx(self, addr, data, _):  # Ignore toggle
+        if self.samsung:
+            self.append(4500, 4500)
+        else:
+            self.append(9000, 4500)
+        if addr < 256:  # Short address: append complement
+            if self.samsung:
+              addr |= addr << 8
+            else:
+              addr |= ((addr ^ 0xff) << 8)
+        for _ in range(16):
+            self._bit(addr & 1)
+            addr >>= 1
+        data |= ((data ^ 0xff) << 8)
+        for _ in range(16):
+            self._bit(data & 1)
+            data >>= 1
+        self.append(_TBURST)
+
+    def repeat(self):
+        self.aptr = 0
+        self.append(9000, 2250, _TBURST)
+        self.trigger()  # Initiate physical transmission.
diff --git a/ir_tx/philips.py b/ir_tx/philips.py
new file mode 100644 (file)
index 0000000..e82e251
--- /dev/null
@@ -0,0 +1,65 @@
+# philips.py Encoder for IR remote control using synchronous code
+# RC-5 and RC-6 mode 0 protocols.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from micropython import const
+from ir_tx import IR
+
+# Philips RC5 protocol
+_T_RC5 = const(889)  # Time for pulse of carrier
+
+
+class RC5(IR):
+    valid = (0x1f, 0x7f, 1)  # Max addr, data, toggle
+
+    def __init__(self, pin, freq=36000, verbose=False):
+        super().__init__(pin, freq, 28, 30, verbose)
+
+    def tx(self, addr, data, toggle):  # Fix RC5X S2 bit polarity
+        d = (data & 0x3f) | ((addr & 0x1f) << 6) | (((data & 0x40) ^ 0x40) << 6) | ((toggle & 1) << 11)
+        self.verbose and print(bin(d))
+        mask = 0x2000
+        while mask:
+            if mask == 0x2000:
+                self.append(_T_RC5)
+            else:
+                bit = bool(d & mask)
+                if bit ^ self.carrier:
+                    self.add(_T_RC5)
+                    self.append(_T_RC5)
+                else:
+                    self.append(_T_RC5, _T_RC5)
+            mask >>= 1
+
+# Philips RC6 mode 0 protocol
+_T_RC6 = const(444)
+_T2_RC6 = const(889)
+
+class RC6_M0(IR):
+    valid = (0xff, 0xff, 1)  # Max addr, data, toggle
+
+    def __init__(self, pin, freq=36000, verbose=False):
+        super().__init__(pin, freq, 44, 30, verbose)
+
+    def tx(self, addr, data, toggle):
+        # leader, 1, 0, 0, 0
+        self.append(2666, _T2_RC6, _T_RC6, _T2_RC6, _T_RC6, _T_RC6, _T_RC6, _T_RC6, _T_RC6)
+        # Append a single bit of twice duration
+        if toggle:
+            self.add(_T2_RC6)
+            self.append(_T2_RC6)
+        else:
+            self.append(_T2_RC6, _T2_RC6)
+        d = (data & 0xff) | ((addr & 0xff) << 8)
+        mask = 0x8000
+        self.verbose and print('toggle', toggle, self.carrier, bool(d & mask))
+        while mask:
+            bit = bool(d & mask)
+            if bit ^ self.carrier:
+                self.append(_T_RC6, _T_RC6)
+            else:
+                self.add(_T_RC6)
+                self.append(_T_RC6)
+            mask >>= 1
diff --git a/ir_tx/rp2_rmt.py b/ir_tx/rp2_rmt.py
new file mode 100644 (file)
index 0000000..2848419
--- /dev/null
@@ -0,0 +1,107 @@
+# rp2_rmt.py A RMT-like class for the RP2.
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2021 Peter Hinch
+
+from machine import Pin, PWM
+import rp2
+
+@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, autopull=True, pull_thresh=32)
+def pulsetrain():
+    wrap_target()
+    out(x, 32)  # No of 1MHz ticks. Block if FIFO MT at end.
+    irq(rel(0))
+    set(pins, 1)  # Set pin high
+    label('loop')
+    jmp(x_dec,'loop')
+    irq(rel(0))
+    set(pins, 0)  # Set pin low
+    out(y, 32)  # Low time.
+    label('loop_lo')
+    jmp(y_dec,'loop_lo')
+    wrap()
+
+@rp2.asm_pio(autopull=True, pull_thresh=32)
+def irqtrain():
+    wrap_target()
+    out(x, 32)  # No of 1MHz ticks. Block if FIFO MT at end.
+    irq(rel(0))
+    label('loop')
+    jmp(x_dec,'loop')
+    wrap()
+
+class DummyPWM:
+    def duty_u16(self, _):
+        pass
+
+class RP2_RMT:
+
+    def __init__(self, pin_pulse=None, carrier=None, sm_no=0, sm_freq=1_000_000):
+        if carrier is None:
+            self.pwm = DummyPWM()
+            self.duty = (0, 0)
+        else:
+            pin_car, freq, duty = carrier
+            self.pwm = PWM(pin_car)  # Set up PWM with carrier off.
+            self.pwm.freq(freq)
+            self.pwm.duty_u16(0)
+            self.duty = (int(0xffff * duty // 100), 0)
+        if pin_pulse is None:
+            self.sm = rp2.StateMachine(sm_no, irqtrain, freq=sm_freq)
+        else:
+            self.sm = rp2.StateMachine(sm_no, pulsetrain, freq=sm_freq, set_base=pin_pulse)
+        self.apt = 0  # Array index
+        self.arr = None  # Array
+        self.ict = None  # Current IRQ count
+        self.icm = 0  # End IRQ count
+        self.reps = 0  # 0 == forever n == no. of reps
+        rp2.PIO(0).irq(self._cb)
+
+    # IRQ callback. Because of FIFO IRQ's keep arriving after STOP.
+    def _cb(self, pio):
+        self.pwm.duty_u16(self.duty[self.ict & 1])
+        self.ict += 1
+        if d := self.arr[self.apt]:  # If data available feed FIFO
+            self.sm.put(d)
+            self.apt += 1
+        else:
+            if r := self.reps != 1:  # All done if reps == 1
+                if r:  # 0 == run forever
+                    self.reps -= 1
+                self.sm.put(self.arr[0])
+                self.apt = 1  # Set pointer and count to state
+                self.ict = 1  # after 1st IRQ
+
+    # Arg is an array of times in μs terminated by 0. 
+    def send(self, ar, reps=1, check=True):
+        self.sm.active(0)
+        self.reps = reps
+        ar[-1] = 0  # Ensure at least one STOP
+        for x, d in enumerate(ar):  # Find 1st STOP
+            if d == 0:
+                break
+        if check:
+            # Pulse train must end with a space otherwise we leave carrier on.
+            # So, if it ends with a mark, append a space. Note __init__.py
+            # ensures that there is room in array.
+            if (x & 1):
+                ar[x] = 1  # space. Duration doesn't matter.
+                x += 1
+                ar[x] = 0  # STOP
+        self.icm = x  # index of 1st STOP
+        mv = memoryview(ar)
+        n = min(x, 4)  # Fill FIFO if there are enough data points.
+        self.sm.put(mv[0 : n])
+        self.arr = ar  # Initial conditions for ISR
+        self.apt = n  # Point to next data value
+        self.ict = 0  # IRQ count
+        self.sm.active(1)
+
+    def busy(self):
+        if self.ict is None:
+            return False  # Just instantiated
+        return self.ict < self.icm
+
+    def cancel(self):
+        self.reps = 1
diff --git a/ir_tx/sony.py b/ir_tx/sony.py
new file mode 100644 (file)
index 0000000..7372f5d
--- /dev/null
@@ -0,0 +1,48 @@
+# sony.py Encoder for IR remote control using synchronous code
+# Sony SIRC protocol.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from micropython import const
+from ir_tx import IR
+
+class SONY_ABC(IR):
+
+    def __init__(self, pin, bits, freq, verbose):
+        super().__init__(pin, freq, 3 + bits * 2, 30, verbose)
+        if bits not in (12, 15, 20):
+            raise ValueError('bits must be 12, 15 or 20.')
+        self.bits = bits
+
+    def tx(self, addr, data, ext):
+        self.append(2400, 600)
+        bits = self.bits
+        v = data & 0x7f
+        if bits == 12:
+            v |= (addr & 0x1f) << 7
+        elif bits == 15:
+            v |= (addr & 0xff) << 7
+        else:
+            v |= (addr & 0x1f) << 7
+            v |= (ext & 0xff) << 12
+        for _ in range(bits):
+            self.append(1200 if v & 1 else 600, 600)
+            v >>= 1
+
+# Sony specifies 40KHz
+class SONY_12(SONY_ABC):
+    valid = (0x1f, 0x7f, 0)  # Max addr, data, toggle
+    def __init__(self, pin, freq=40000, verbose=False):
+        super().__init__(pin, 12, freq, verbose)
+
+class SONY_15(SONY_ABC):
+    valid = (0xff, 0x7f, 0)  # Max addr, data, toggle
+    def __init__(self, pin, freq=40000, verbose=False):
+        super().__init__(pin, 15, freq, verbose)
+
+class SONY_20(SONY_ABC):
+    valid = (0x1f, 0x7f, 0xff)  # Max addr, data, toggle
+    def __init__(self, pin, freq=40000, verbose=False):
+        super().__init__(pin, 20, freq, verbose)
+
diff --git a/ir_tx/test.py b/ir_tx/test.py
new file mode 100644 (file)
index 0000000..e979b57
--- /dev/null
@@ -0,0 +1,135 @@
+# ir_tx.test Test for nonblocking NEC/SONY/RC-5/RC-6 mode 0 IR blaster.
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2020 Peter Hinch
+
+# Implements a 2-button remote control on a Pyboard with auto repeat.
+from sys import platform
+ESP32 = platform == 'esp32'
+RP2 = platform == 'rp2'
+PYBOARD = platform == 'pyboard'
+if ESP32 or RP2:
+    from machine import Pin
+else:
+    from pyb import Pin, LED
+import uasyncio as asyncio
+from primitives.switch import Switch
+from primitives.delay_ms import Delay_ms
+# Import all implemented classes
+from ir_tx.nec import NEC
+from ir_tx.sony import SONY_12, SONY_15, SONY_20
+from ir_tx.philips import RC5, RC6_M0
+
+loop = asyncio.get_event_loop()
+
+# If button is held down normal behaviour is to retransmit
+# but most NEC models send a REPEAT code
+class Rbutton:
+    toggle = 1  # toggle is ignored in NEC mode
+    def __init__(self, irb, pin, addr, data, proto):
+        self.irb = irb
+        self.sw = Switch(pin)
+        self.addr = addr
+        self.data = data
+        self.proto = proto
+
+        self.sw.close_func(self.cfunc)
+        self.sw.open_func(self.ofunc)
+        self.tim = Delay_ms(self.repeat)
+
+    def cfunc(self):  # Button push: send data
+        tog = 0 if self.proto < 3 else Rbutton.toggle  # NEC, sony 12, 15: toggle==0
+        self.irb.transmit(self.addr, self.data, tog, True)  # Test validation
+        #print(self.data)
+        # Auto repeat. The Sony protocol specifies 45ms but this is tight.
+        # In 20 bit mode a data burst can be upto 39ms long.
+        self.tim.trigger(108)
+
+    def ofunc(self):  # Button release: cancel repeat timer
+        self.tim.stop()
+        Rbutton.toggle ^= 1  # Toggle control
+
+    async def repeat(self):
+        await asyncio.sleep(0)  # Let timer stop before retriggering
+        if not self.sw():  # Button is still pressed: retrigger
+            self.tim.trigger(108)
+            if self.proto == 0:
+                self.irb.repeat()  # NEC special case: send REPEAT code
+                #print(self.data)
+            else:
+                tog = 0 if self.proto < 3 else Rbutton.toggle  # NEC, sony 12, 15: toggle==0
+                self.irb.transmit(self.addr, self.data, tog, True)  # Test validation
+                #print(self.data)
+
+async def main(proto):
+    # Test uses a 38KHz carrier.
+    if ESP32:  # Pins for IR LED gate
+        pin = Pin(23, Pin.OUT, value = 0)
+    elif RP2:
+        pin = Pin(17, Pin.OUT, value = 0)
+    else:
+        pin = Pin('X1')
+    classes = (NEC, SONY_12, SONY_15, SONY_20, RC5, RC6_M0)
+    irb = classes[proto](pin, 38000)  # My decoder chip is 38KHz
+    # Uncomment the following to print transmit timing
+    # irb.timeit = True
+
+    b = []  # Rbutton instances
+    px3 = Pin('X3', Pin.IN, Pin.PULL_UP) if PYBOARD else Pin(18, Pin.IN, Pin.PULL_UP)
+    px4 = Pin('X4', Pin.IN, Pin.PULL_UP) if PYBOARD else Pin(19, Pin.IN, Pin.PULL_UP)
+    b.append(Rbutton(irb, px3, 0xef00, 0x03, proto))
+    b.append(Rbutton(irb, px4, 0xef00, 0x02, proto))
+    if ESP32:
+        while True:
+            print('Running')
+            await asyncio.sleep(5)
+    elif RP2:
+        led = Pin(25, Pin.OUT)
+        while True:
+            await asyncio.sleep_ms(500)  # Obligatory flashing LED.
+            led(not led())
+    else:
+        led = LED(1)
+        while True:
+            await asyncio.sleep_ms(500)  # Obligatory flashing LED.
+            led.toggle()
+
+# Greeting strings. Common:
+s = '''Test for IR transmitter. Run:
+from ir_tx.test import test
+test() for NEC protocol
+test(1) for Sony SIRC 12 bit
+test(2) for Sony SIRC 15 bit
+test(3) for Sony SIRC 20 bit
+test(4) for Philips RC-5 protocol
+test(5) for Philips RC-6 mode 0.
+'''
+
+# Pyboard:
+spb = '''
+IR LED on pin X1
+Ground pin X3 to send addr 1 data 7
+Ground pin X4 to send addr 0x10 data 0x0b.'''
+
+# ESP32
+sesp = '''
+IR LED gate on pin 23
+Ground pin 18 to send addr 1 data 7
+Ground pin 19 to send addr 0x10 data 0x0b.'''
+
+# RP2
+srp2 = '''
+IR LED gate on pin 17
+Ground pin 18 to send addr 1 data 7
+Ground pin 19 to send addr 0x10 data 0x0b.'''
+
+if ESP32:
+    print(''.join((s, sesp)))
+elif RP2:
+    print(''.join((s, srp2)))
+else:
+    print(''.join((s, spb)))
+
+def test(proto=0):
+    loop.run_until_complete(main(proto))
diff --git a/main.py b/main.py
new file mode 100644 (file)
index 0000000..9e5ee6c
--- /dev/null
+++ b/main.py
@@ -0,0 +1,2 @@
+from ir_tx.test import test
+test()
\ No newline at end of file
diff --git a/primitives/__init__.py b/primitives/__init__.py
new file mode 100644 (file)
index 0000000..1dab8ba
--- /dev/null
@@ -0,0 +1,62 @@
+# __init__.py Common functions for uasyncio primitives
+
+# Copyright (c) 2018-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+try:
+    import uasyncio as asyncio
+except ImportError:
+    import asyncio
+
+
+async def _g():
+    pass
+type_coro = type(_g())
+
+# If a callback is passed, run it and return.
+# If a coro is passed initiate it and return.
+# coros are passed by name i.e. not using function call syntax.
+def launch(func, tup_args):
+    res = func(*tup_args)
+    if isinstance(res, type_coro):
+        res = asyncio.create_task(res)
+    return res
+
+def set_global_exception():
+    def _handle_exception(loop, context):
+        import sys
+        sys.print_exception(context["exception"])
+        sys.exit()
+    loop = asyncio.get_event_loop()
+    loop.set_exception_handler(_handle_exception)
+
+_attrs = {
+    "AADC": "aadc",
+    "Barrier": "barrier",
+    "Condition": "condition",
+    "Delay_ms": "delay_ms",
+    "Encode": "encoder_async",
+    "Pushbutton": "pushbutton",
+    "ESP32Touch": "pushbutton",
+    "Queue": "queue",
+    "Semaphore": "semaphore",
+    "BoundedSemaphore": "semaphore",
+    "Switch": "switch",
+    "WaitAll": "events",
+    "WaitAny": "events",
+    "ESwitch": "events",
+    "EButton": "events",
+    "RingbufQueue": "ringbuf_queue",
+}
+
+# Copied from uasyncio.__init__.py
+# Lazy loader, effectively does:
+#   global attr
+#   from .mod import attr
+def __getattr__(attr):
+    mod = _attrs.get(attr, None)
+    if mod is None:
+        raise AttributeError(attr)
+    value = getattr(__import__(mod, None, None, True, 1), attr)
+    globals()[attr] = value
+    return value
diff --git a/primitives/aadc.py b/primitives/aadc.py
new file mode 100644 (file)
index 0000000..5208520
--- /dev/null
@@ -0,0 +1,67 @@
+# aadc.py AADC (asynchronous ADC) class
+
+# Copyright (c) 2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+import io
+
+MP_STREAM_POLL_RD = const(1)
+MP_STREAM_POLL = const(3)
+MP_STREAM_ERROR = const(-1)
+
+class AADC(io.IOBase):
+    def __init__(self, adc):
+        self._adc = adc
+        self._lower = 0
+        self._upper = 65535
+        self._pol = True
+        self._last = None
+        self._sreader = asyncio.StreamReader(self)
+
+    def __iter__(self):
+        b = yield from self._sreader.read(2)
+        return int.from_bytes(b, 'little')
+
+    def _adcread(self):
+        self._last = self._adc.read_u16()
+        return self._last
+
+    def read(self, n):  # For use by StreamReader only
+        return int.to_bytes(self._last, 2, 'little')
+
+    def ioctl(self, req, arg):
+        ret = MP_STREAM_ERROR
+        if req == MP_STREAM_POLL:
+            ret = 0
+            if arg & MP_STREAM_POLL_RD:
+                if self._pol ^ (self._lower <= self._adcread() <= self._upper):
+                    ret |= MP_STREAM_POLL_RD
+        return ret
+
+    # *** API ***
+
+    # If normal will pause until ADC value is in range
+    # Otherwise will pause until value is out of range
+    def sense(self, normal):
+        self._pol = normal
+
+    def read_u16(self, last=False):
+        if last:
+            return self._last
+        return self._adcread()
+
+    # Call syntax: set limits for trigger
+    # lower is None: leave limits unchanged.
+    # upper is None: treat lower as relative to current value.
+    # both have values: treat as absolute limits.
+    def __call__(self, lower=None, upper=None):
+        if lower is not None:
+            if upper is None:  # Relative limit
+                r = self._adcread() if self._last is None else self._last
+                self._lower = r - lower
+                self._upper = r + lower
+            else:  # Absolute limits
+                self._lower = lower
+                self._upper = upper
+        return self
diff --git a/primitives/barrier.py b/primitives/barrier.py
new file mode 100644 (file)
index 0000000..445f4ed
--- /dev/null
@@ -0,0 +1,53 @@
+# barrier.py
+# Copyright (c) 2018-2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# Now uses Event rather than polling.
+
+try:
+    import uasyncio as asyncio
+except ImportError:
+    import asyncio
+
+from . import launch
+
+# A Barrier synchronises N coros. Each issues await barrier.
+# Execution pauses until all other participant coros are waiting on it.
+# At that point the callback is executed. Then the barrier is 'opened' and
+# execution of all participants resumes.
+
+class Barrier():
+    def __init__(self, participants, func=None, args=()):
+        self._participants = participants
+        self._count = participants
+        self._func = func
+        self._args = args
+        self._res = None
+        self._evt = asyncio.Event()
+
+    def __await__(self):
+        if self.trigger():
+            return  # Other tasks have already reached barrier
+        await self._evt.wait()  # Wait until last task reaches it
+
+    __iter__ = __await__
+
+    def result(self):
+        return self._res
+
+    def trigger(self):
+        self._count -=1
+        if self._count < 0:
+            raise ValueError('Too many tasks accessing Barrier')
+        if self._count > 0:
+            return False  # At least 1 other task has not reached barrier
+        # All other tasks are waiting
+        if self._func is not None:
+            self._res = launch(self._func, self._args)
+        self._count = self._participants
+        self._evt.set()  # Release others
+        self._evt.clear()
+        return True
+
+    def busy(self):
+        return  self._count < self._participants
diff --git a/primitives/condition.py b/primitives/condition.py
new file mode 100644 (file)
index 0000000..20df097
--- /dev/null
@@ -0,0 +1,68 @@
+# condition.py
+
+# Copyright (c) 2018-2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+try:
+    import uasyncio as asyncio
+except ImportError:
+    import asyncio
+
+# Condition class
+# from primitives.condition import Condition
+
+class Condition():
+    def __init__(self, lock=None):
+        self.lock = asyncio.Lock() if lock is None else lock
+        self.events = []
+
+    async def acquire(self):
+        await self.lock.acquire()
+
+# enable this syntax:
+# with await condition [as cond]:
+    def __await__(self):
+        await self.lock.acquire()
+        return self
+
+    __iter__ = __await__
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *_):
+        self.lock.release()
+
+    def locked(self):
+        return self.lock.locked()
+
+    def release(self):
+        self.lock.release()  # Will raise RuntimeError if not locked
+
+    def notify(self, n=1):  # Caller controls lock
+        if not self.lock.locked():
+            raise RuntimeError('Condition notify with lock not acquired.')
+        for _ in range(min(n, len(self.events))):
+            ev = self.events.pop()
+            ev.set()
+
+    def notify_all(self):
+        self.notify(len(self.events))
+
+    async def wait(self):
+        if not self.lock.locked():
+            raise RuntimeError('Condition wait with lock not acquired.')
+        ev = asyncio.Event()
+        self.events.append(ev)
+        self.lock.release()
+        await ev.wait()
+        await self.lock.acquire()
+        assert ev not in self.events, 'condition wait assertion fail'
+        return True  # CPython compatibility
+
+    async def wait_for(self, predicate):
+        result = predicate()
+        while not result:
+            await self.wait()
+            result = predicate()
+        return result
diff --git a/primitives/delay_ms.py b/primitives/delay_ms.py
new file mode 100644 (file)
index 0000000..bfed02d
--- /dev/null
@@ -0,0 +1,80 @@
+# delay_ms.py Now uses ThreadSafeFlag and has extra .wait() API
+# Usage:
+# from primitives import Delay_ms
+
+# Copyright (c) 2018-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+from utime import ticks_add, ticks_diff, ticks_ms
+from . import launch
+
+class Delay_ms:
+
+    class DummyTimer:  # Stand-in for the timer class. Can be cancelled.
+        def cancel(self):
+            pass
+    _fake = DummyTimer()
+
+    def __init__(self, func=None, args=(), duration=1000):
+        self._func = func
+        self._args = args
+        self._durn = duration  # Default duration
+        self._retn = None  # Return value of launched callable
+        self._tend = None  # Stop time (absolute ms).
+        self._busy = False
+        self._trig = asyncio.ThreadSafeFlag()
+        self._tout = asyncio.Event()  # Timeout event
+        self.wait = self._tout.wait  # Allow: await wait_ms.wait()
+        self.clear = self._tout.clear
+        self.set = self._tout.set
+        self._ttask = self._fake  # Timer task
+        self._mtask = asyncio.create_task(self._run()) #Main task
+
+    async def _run(self):
+        while True:
+            await self._trig.wait()  # Await a trigger
+            self._ttask.cancel()  # Cancel and replace
+            await asyncio.sleep_ms(0)
+            dt = max(ticks_diff(self._tend, ticks_ms()), 0)  # Beware already elapsed.
+            self._ttask = asyncio.create_task(self._timer(dt))
+
+    async def _timer(self, dt):
+        await asyncio.sleep_ms(dt)
+        self._tout.set()  # Only gets here if not cancelled.
+        self._busy = False
+        if self._func is not None:
+            self._retn = launch(self._func, self._args)
+
+# API
+    # trigger may be called from hard ISR.
+    def trigger(self, duration=0):  # Update absolute end time, 0-> ctor default
+        if self._mtask is None:
+            raise RuntimeError("Delay_ms.deinit() has run.")
+        self._tend = ticks_add(ticks_ms(), duration if duration > 0 else self._durn)
+        self._retn = None  # Default in case cancelled.
+        self._busy = True
+        self._trig.set()
+
+    def stop(self):
+        self._ttask.cancel()
+        self._ttask = self._fake
+        self._busy = False
+        self._tout.clear()
+
+    def __call__(self):  # Current running status
+        return self._busy
+
+    running = __call__
+
+    def rvalue(self):
+        return self._retn
+
+    def callback(self, func=None, args=()):
+        self._func = func
+        self._args = args
+
+    def deinit(self):
+        self.stop()
+        self._mtask.cancel()
+        self._mtask = None
diff --git a/primitives/encoder.py b/primitives/encoder.py
new file mode 100644 (file)
index 0000000..759422b
--- /dev/null
@@ -0,0 +1,81 @@
+# encoder.py Asynchronous driver for incremental quadrature encoder.
+
+# Copyright (c) 2021-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# Thanks are due to @ilium007 for identifying the issue of tracking detents,
+# https://github.com/peterhinch/micropython-async/issues/82.
+# Also to Mike Teachman (@miketeachman) for design discussions and testing
+# against a state table design
+# https://github.com/miketeachman/micropython-rotary/blob/master/rotary.py
+
+import uasyncio as asyncio
+from machine import Pin
+
+class Encoder:
+
+    def __init__(self, pin_x, pin_y, v=0, div=1, vmin=None, vmax=None,
+                 mod=None, callback=lambda a, b : None, args=(), delay=100):
+        self._pin_x = pin_x
+        self._pin_y = pin_y
+        self._x = pin_x()
+        self._y = pin_y()
+        self._v = v * div  # Initialise hardware value
+        self._cv = v  # Current (divided) value
+        self.delay = delay  # Pause (ms) for motion to stop/limit callback frequency
+
+        if ((vmin is not None) and v < vmin) or ((vmax is not None) and v > vmax):
+            raise ValueError('Incompatible args: must have vmin <= v <= vmax')
+        self._tsf = asyncio.ThreadSafeFlag()
+        trig = Pin.IRQ_RISING | Pin.IRQ_FALLING
+        try:
+            xirq = pin_x.irq(trigger=trig, handler=self._x_cb, hard=True)
+            yirq = pin_y.irq(trigger=trig, handler=self._y_cb, hard=True)
+        except TypeError:  # hard arg is unsupported on some hosts
+            xirq = pin_x.irq(trigger=trig, handler=self._x_cb)
+            yirq = pin_y.irq(trigger=trig, handler=self._y_cb)
+        asyncio.create_task(self._run(vmin, vmax, div, mod, callback, args))
+
+    # Hardware IRQ's. Duration 36μs on Pyboard 1 ~50μs on ESP32.
+    # IRQ latency: 2nd edge may have occured by the time ISR runs, in
+    # which case there is no movement.
+    def _x_cb(self, pin_x):
+        if (x := pin_x()) != self._x:
+            self._x = x
+            self._v += 1 if x ^ self._pin_y() else -1
+            self._tsf.set()
+
+    def _y_cb(self, pin_y):
+        if (y := pin_y()) != self._y:
+            self._y = y
+            self._v -= 1 if y ^ self._pin_x() else -1
+            self._tsf.set()
+
+    async def _run(self, vmin, vmax, div, mod, cb, args):
+        pv = self._v  # Prior hardware value
+        pcv = self._cv  # Prior divided value passed to callback
+        lcv = pcv  # Current value after limits applied
+        plcv = pcv  # Previous value after limits applied
+        delay = self.delay
+        while True:
+            await self._tsf.wait()
+            await asyncio.sleep_ms(delay)  # Wait for motion to stop.
+            hv = self._v  # Sample hardware (atomic read).
+            if hv == pv:  # A change happened but was negated before
+                continue  # this got scheduled. Nothing to do.
+            pv = hv
+            cv = round(hv / div)  # cv is divided value.
+            if not (dv := cv - pcv):  # dv is change in divided value.
+                continue  # No change
+            lcv += dv  # lcv: divided value with limits/mod applied
+            lcv = lcv if vmax is None else min(vmax, lcv)
+            lcv = lcv if vmin is None else max(vmin, lcv)
+            lcv = lcv if mod is None else lcv % mod
+            self._cv = lcv  # update ._cv for .value() before CB.
+            if lcv != plcv:
+                cb(lcv, lcv - plcv, *args)  # Run user CB in uasyncio context
+            pcv = cv
+            plcv = lcv
+
+    def value(self):
+        return self._cv
diff --git a/primitives/events.py b/primitives/events.py
new file mode 100644 (file)
index 0000000..4e5dd7e
--- /dev/null
@@ -0,0 +1,166 @@
+# events.py Event based primitives
+
+# Copyright (c) 2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+from . import Delay_ms
+
+# An Event-like class that can wait on an iterable of Event-like instances.
+# .wait pauses until any passed event is set.
+class WaitAny:
+    def __init__(self, events):
+        self.events = events
+        self.trig_event = None
+        self.evt = asyncio.Event()
+
+    async def wait(self):
+        tasks = [asyncio.create_task(self.wt(event)) for event in self.events]
+        try:
+            await self.evt.wait()
+        finally:
+            self.evt.clear()
+            for task in tasks:
+                task.cancel()
+        return self.trig_event
+
+    async def wt(self, event):
+        await event.wait()
+        self.evt.set()
+        self.trig_event = event
+    def event(self):
+        return self.trig_event
+
+    def clear(self):
+        for evt in (x for x in self.events if hasattr(x, 'clear')):
+            evt.clear()
+
+# An Event-like class that can wait on an iterable of Event-like instances,
+# .wait pauses until all passed events have been set.
+class WaitAll:
+    def __init__(self, events):
+        self.events = events
+
+    async def wait(self):
+        async def wt(event):
+            await event.wait()
+        tasks = (asyncio.create_task(wt(event)) for event in self.events)
+        try:
+            await asyncio.gather(*tasks)
+        finally:  # May be subject to timeout or cancellation
+            for task in tasks:
+                task.cancel()
+
+    def clear(self):
+        for evt in (x for x in self.events if hasattr(x, 'clear')):
+            evt.clear()
+
+# Minimal switch class having an Event based interface
+class ESwitch:
+    debounce_ms = 50
+
+    def __init__(self, pin, lopen=1):  # Default is n/o switch returned to gnd
+        self._pin = pin # Should be initialised for input with pullup
+        self._lopen = lopen  # Logic level in "open" state
+        self.open = asyncio.Event()
+        self.close = asyncio.Event()
+        self._state = self._pin() ^ self._lopen  # Get initial state
+        asyncio.create_task(self._poll(ESwitch.debounce_ms))
+
+    async def _poll(self, dt):  # Poll the button
+        while True:
+            if (s := self._pin() ^ self._lopen) != self._state:  # 15μs
+                self._state = s
+                self._cf() if s else self._of()
+            await asyncio.sleep_ms(dt)  # Wait out bounce
+
+    def _of(self):
+        self.open.set()
+
+    def _cf(self):
+        self.close.set()
+
+    # ***** API *****
+    # Return current state of switch (0 = pressed)
+    def __call__(self):
+        return self._state
+
+    def deinit(self):
+        self._poll.cancel()
+        self.open.clear()
+        self.close.clear()
+
+# Minimal pushbutton class having an Event based interface
+class EButton:
+    debounce_ms = 50  # Attributes can be varied by user
+    long_press_ms = 1000
+    double_click_ms = 400
+
+    def __init__(self, pin, suppress=False, sense=None):
+        self._pin = pin  # Initialise for input
+        self._supp = suppress
+        self._sense = pin() if sense is None else sense
+        self._state = self.rawstate()  # Initial logical state
+        self._ltim = Delay_ms(duration = EButton.long_press_ms)
+        self._dtim = Delay_ms(duration = EButton.double_click_ms)
+        self.press = asyncio.Event()  # *** API ***
+        self.double = asyncio.Event()
+        self.long = asyncio.Event()
+        self.release = asyncio.Event()  # *** END API ***
+        self._tasks = [asyncio.create_task(self._poll(EButton.debounce_ms))]  # Tasks run forever. Poll contacts
+        self._tasks.append(asyncio.create_task(self._ltf()))  # Handle long press
+        if suppress:
+            self._tasks.append(asyncio.create_task(self._dtf()))  # Double timer
+
+    async def _poll(self, dt):  # Poll the button
+        while True:
+            if (s := self.rawstate()) != self._state:
+                self._state = s
+                self._pf() if s else self._rf()
+            await asyncio.sleep_ms(dt)  # Wait out bounce
+
+    def _pf(self):  # Button press
+        if not self._supp:
+            self.press.set()  # User event
+        if not self._ltim():  # Don't retrigger long timer if already running
+            self._ltim.trigger()
+        if self._dtim():  # Press occurred while _dtim is running
+            self.double.set()  # User event
+            self._dtim.stop()  # _dtim's Event is only used if suppress
+        else:
+            self._dtim.trigger()
+
+    def _rf(self):  # Button release
+        self._ltim.stop()
+        if not self._supp or not self._dtim():  # If dtim running postpone release otherwise it
+            self.release.set()  # is set before press
+
+    async def _ltf(self):  # Long timeout
+        while True:
+            await self._ltim.wait()
+            self._ltim.clear()  # Clear the event
+            self.long.set()  # User event
+            
+    async def _dtf(self):  # Double timeout (runs if suppress is set)
+        while True:
+            await self._dtim.wait()
+            self._dtim.clear()  # Clear the event
+            if not self._ltim():  # Button was released
+                self.press.set()  # User events
+                self.release.set()
+
+    # ****** API ******
+    # Current non-debounced logical button state: True == pressed
+    def rawstate(self):
+        return bool(self._pin() ^ self._sense)
+
+    # Current debounced state of button (True == pressed)
+    def __call__(self):
+        return self._state
+
+    def deinit(self):
+        for task in self._tasks:
+            task.cancel()
+        for evt in (self.press, self.double, self.long, self.release):
+            evt.clear()
diff --git a/primitives/pushbutton.py b/primitives/pushbutton.py
new file mode 100644 (file)
index 0000000..dff541f
--- /dev/null
@@ -0,0 +1,157 @@
+# pushbutton.py
+
+# Copyright (c) 2018-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+import utime as time
+from . import launch, Delay_ms
+
+try:
+    from machine import TouchPad
+except ImportError:
+    pass
+
+
+class Pushbutton:
+    debounce_ms = 50
+    long_press_ms = 1000
+    double_click_ms = 400
+
+    def __init__(self, pin, suppress=False, sense=None):
+        self._pin = pin  # Initialise for input
+        self._supp = suppress
+        self._dblpend = False  # Doubleclick waiting for 2nd click
+        self._dblran = False  # Doubleclick executed user function
+        self._tf = False
+        self._ff = False
+        self._df = False
+        self._ld = False  # Delay_ms instance for long press
+        self._dd = False  # Ditto for doubleclick
+        # Convert from electrical to logical value
+        self._sense = pin.value() if sense is None else sense
+        self._state = self.rawstate()  # Initial state
+        self._run = asyncio.create_task(self._go())  # Thread runs forever
+
+    async def _go(self):
+        while True:
+            self._check(self.rawstate())
+            # Ignore state changes until switch has settled. Also avoid hogging CPU.
+            # See https://github.com/peterhinch/micropython-async/issues/69
+            await asyncio.sleep_ms(Pushbutton.debounce_ms)
+
+    def _check(self, state):
+        if state == self._state:
+            return
+        # State has changed: act on it now.
+        self._state = state
+        if state:  # Button pressed: launch pressed func
+            if self._tf:
+                launch(self._tf, self._ta)
+            if self._ld:  # There's a long func: start long press delay
+                self._ld.trigger(Pushbutton.long_press_ms)
+            if self._df:
+                if self._dd():  # Second click: timer running
+                    self._dd.stop()
+                    self._dblpend = False
+                    self._dblran = True  # Prevent suppressed launch on release
+                    launch(self._df, self._da)
+                else:
+                    # First click: start doubleclick timer
+                    self._dd.trigger(Pushbutton.double_click_ms)
+                    self._dblpend = True  # Prevent suppressed launch on release
+        else:  # Button release. Is there a release func?
+            if self._ff:
+                if self._supp:
+                    d = self._ld
+                    # If long delay exists, is running and doubleclick status is OK
+                    if not self._dblpend and not self._dblran:
+                        if (d and d()) or not d:
+                            launch(self._ff, self._fa)
+                else:
+                    launch(self._ff, self._fa)
+            if self._ld:
+                self._ld.stop()  # Avoid interpreting a second click as a long push
+            self._dblran = False
+
+    def _ddto(self):  # Doubleclick timeout: no doubleclick occurred
+        self._dblpend = False
+        if self._supp and not self._state:
+            if not self._ld or (self._ld and not self._ld()):
+                launch(self._ff, self._fa)
+
+    # ****** API ******
+    def press_func(self, func=False, args=()):
+        if func is None:
+            self.press = asyncio.Event()
+        self._tf = self.press.set if func is None else func
+        self._ta = args
+
+    def release_func(self, func=False, args=()):
+        if func is None:
+            self.release = asyncio.Event()
+        self._ff = self.release.set if func is None else func
+        self._fa = args
+
+    def double_func(self, func=False, args=()):
+        if func is None:
+            self.double = asyncio.Event()
+            func = self.double.set
+        self._df = func
+        self._da = args
+        if func:  # If double timer already in place, leave it
+            if not self._dd:
+                self._dd = Delay_ms(self._ddto)
+        else:
+            self._dd = False  # Clearing down double func
+
+    def long_func(self, func=False, args=()):
+        if func is None:
+            self.long = asyncio.Event()
+            func = self.long.set
+        if func:
+            if self._ld:
+                self._ld.callback(func, args)
+            else:
+                self._ld = Delay_ms(func, args)
+        else:
+            self._ld = False
+
+    # Current non-debounced logical button state: True == pressed
+    def rawstate(self):
+        return bool(self._pin() ^ self._sense)
+
+    # Current debounced state of button (True == pressed)
+    def __call__(self):
+        return self._state
+
+    def deinit(self):
+        self._run.cancel()
+
+
+class ESP32Touch(Pushbutton):
+    thresh = (80 << 8) // 100
+
+    @classmethod
+    def threshold(cls, val):
+        if not (isinstance(val, int) and 0 < val < 100):
+            raise ValueError("Threshold must be in range 1-99")
+        cls.thresh = (val << 8) // 100
+
+    def __init__(self, pin, suppress=False):
+        self._thresh = 0  # Detection threshold
+        self._rawval = 0
+        try:
+            self._pad = TouchPad(pin)
+        except ValueError:
+            raise ValueError(pin)  # Let's have a bit of information :)
+        super().__init__(pin, suppress, False)
+
+    # Current logical button state: True == touched
+    def rawstate(self):
+        rv = self._pad.read()  # ~220μs
+        if rv > self._rawval:  # Either initialisation or pad was touched
+            self._rawval = rv  # when initialised and has now been released
+            self._thresh = (rv * ESP32Touch.thresh) >> 8
+            return False  # Untouched
+        return rv < self._thresh
diff --git a/primitives/queue.py b/primitives/queue.py
new file mode 100644 (file)
index 0000000..405c857
--- /dev/null
@@ -0,0 +1,73 @@
+# queue.py: adapted from uasyncio V2
+
+# Copyright (c) 2018-2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# Code is based on Paul Sokolovsky's work.
+# This is a temporary solution until uasyncio V3 gets an efficient official version
+
+import uasyncio as asyncio
+
+
+# Exception raised by get_nowait().
+class QueueEmpty(Exception):
+    pass
+
+
+# Exception raised by put_nowait().
+class QueueFull(Exception):
+    pass
+
+class Queue:
+
+    def __init__(self, maxsize=0):
+        self.maxsize = maxsize
+        self._queue = []
+        self._evput = asyncio.Event()  # Triggered by put, tested by get
+        self._evget = asyncio.Event()  # Triggered by get, tested by put
+
+    def _get(self):
+        self._evget.set()  # Schedule all tasks waiting on get
+        self._evget.clear()
+        return self._queue.pop(0)
+
+    async def get(self):  #  Usage: item = await queue.get()
+        while self.empty():  # May be multiple tasks waiting on get()
+            # Queue is empty, suspend task until a put occurs
+            # 1st of N tasks gets, the rest loop again
+            await self._evput.wait()
+        return self._get()
+
+    def get_nowait(self):  # Remove and return an item from the queue.
+        # Return an item if one is immediately available, else raise QueueEmpty.
+        if self.empty():
+            raise QueueEmpty()
+        return self._get()
+
+    def _put(self, val):
+        self._evput.set()  # Schedule tasks waiting on put
+        self._evput.clear()
+        self._queue.append(val)
+
+    async def put(self, val):  # Usage: await queue.put(item)
+        while self.full():
+            # Queue full
+            await self._evget.wait()
+            # Task(s) waiting to get from queue, schedule first Task
+        self._put(val)
+
+    def put_nowait(self, val):  # Put an item into the queue without blocking.
+        if self.full():
+            raise QueueFull()
+        self._put(val)
+
+    def qsize(self):  # Number of items in the queue.
+        return len(self._queue)
+
+    def empty(self):  # Return True if the queue is empty, False otherwise.
+        return len(self._queue) == 0
+
+    def full(self):  # Return True if there are maxsize items in the queue.
+        # Note: if the Queue was initialized with maxsize=0 (the default) or
+        # any negative number, then full() is never True.
+        return self.maxsize > 0 and self.qsize() >= self.maxsize
diff --git a/primitives/ringbuf_queue.py b/primitives/ringbuf_queue.py
new file mode 100644 (file)
index 0000000..be44c2a
--- /dev/null
@@ -0,0 +1,67 @@
+# ringbuf_queue.py Provides RingbufQueue class
+
+# Copyright (c) 2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# API differs from CPython
+# Uses pre-allocated ring buffer: can use list or array
+# Asynchronous iterator allowing consumer to use async for
+# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded.
+
+import uasyncio as asyncio
+
+
+class RingbufQueue:  # MicroPython optimised
+    def __init__(self, buf):
+        self._q = buf
+        self._size = len(buf)
+        self._wi = 0
+        self._ri = 0
+        self._evput = asyncio.Event()  # Triggered by put, tested by get
+        self._evget = asyncio.Event()  # Triggered by get, tested by put
+
+    def full(self):
+        return ((self._wi + 1) % self._size) == self._ri
+
+    def empty(self):
+        return self._ri == self._wi
+
+    def qsize(self):
+        return (self._wi - self._ri) % self._size
+
+    def get_nowait(self):  # Remove and return an item from the queue.
+        # Return an item if one is immediately available, else raise QueueEmpty.
+        if self.empty():
+            raise IndexError
+        r = self._q[self._ri]
+        self._ri = (self._ri + 1) % self._size
+        self._evget.set()  # Schedule all tasks waiting on ._evget
+        self._evget.clear()
+        return r
+
+    def put_nowait(self, v):
+        self._q[self._wi] = v
+        self._evput.set()  # Schedule any tasks waiting on get
+        self._evput.clear()
+        self._wi = (self._wi + 1) % self._size
+        if self._wi == self._ri:  # Would indicate empty
+            self._ri = (self._ri + 1) % self._size  # Discard a message
+            raise IndexError  # Caller can ignore if overwrites are OK
+
+    async def put(self, val):  # Usage: await queue.put(item)
+        while self.full():  # Queue full
+            await self._evget.wait()  # May be >1 task waiting on ._evget
+            # Task(s) waiting to get from queue, schedule first Task
+        self.put_nowait(val)
+
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        while self.empty():  # Empty. May be more than one task waiting on ._evput
+            await self._evput.wait()
+        r = self._q[self._ri]
+        self._ri = (self._ri + 1) % self._size
+        self._evget.set()  # Schedule all tasks waiting on ._evget
+        self._evget.clear()
+        return r
diff --git a/primitives/semaphore.py b/primitives/semaphore.py
new file mode 100644 (file)
index 0000000..19b82e4
--- /dev/null
@@ -0,0 +1,48 @@
+# semaphore.py
+
+# Copyright (c) 2018-2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+try:
+    import uasyncio as asyncio
+except ImportError:
+    import asyncio
+
+# A Semaphore is typically used to limit the number of coros running a
+# particular piece of code at once. The number is defined in the constructor.
+class Semaphore():
+    def __init__(self, value=1):
+        self._count = value
+        self._event = asyncio.Event()
+
+    async def __aenter__(self):
+        await self.acquire()
+        return self
+
+    async def __aexit__(self, *args):
+        self.release()
+        await asyncio.sleep(0)
+
+    async def acquire(self):
+        self._event.clear()
+        while self._count == 0:  # Multiple tasks may be waiting for
+            await self._event.wait()  # a release
+            self._event.clear()
+            # When we yield, another task may succeed. In this case
+            await asyncio.sleep(0)  # the loop repeats
+        self._count -= 1
+
+    def release(self):
+        self._event.set()
+        self._count += 1
+
+class BoundedSemaphore(Semaphore):
+    def __init__(self, value=1):
+        super().__init__(value)
+        self._initial_value = value
+
+    def release(self):
+        if self._count < self._initial_value:
+            super().release()
+        else:
+            raise ValueError('Semaphore released more than acquired')
diff --git a/primitives/switch.py b/primitives/switch.py
new file mode 100644 (file)
index 0000000..1da2435
--- /dev/null
@@ -0,0 +1,49 @@
+# switch.py
+
+# Copyright (c) 2018-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+import utime as time
+from . import launch
+
+class Switch:
+    debounce_ms = 50
+    def __init__(self, pin):
+        self.pin = pin # Should be initialised for input with pullup
+        self._open_func = False
+        self._close_func = False
+        self.switchstate = self.pin.value()  # Get initial state
+        self._run = asyncio.create_task(self.switchcheck())  # Thread runs forever
+
+    def open_func(self, func, args=()):
+        if func is None:
+            self.open = asyncio.Event()
+        self._open_func = self.open.set if func is None else func
+        self._open_args = args
+
+    def close_func(self, func, args=()):
+        if func is None:
+            self.close = asyncio.Event()
+        self._close_func = self.close.set if func is None else func
+        self._close_args = args
+
+    # Return current state of switch (0 = pressed)
+    def __call__(self):
+        return self.switchstate
+
+    async def switchcheck(self):
+        while True:
+            state = self.pin.value()
+            if state != self.switchstate:
+                # State has changed: act on it now.
+                self.switchstate = state
+                if state == 0 and self._close_func:
+                    launch(self._close_func, self._close_args)
+                elif state == 1 and self._open_func:
+                    launch(self._open_func, self._open_args)
+            # Ignore further state changes until switch has settled
+            await asyncio.sleep_ms(Switch.debounce_ms)
+
+    def deinit(self):
+        self._run.cancel()
diff --git a/primitives/tests/__init__.py b/primitives/tests/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/primitives/tests/adctest.py b/primitives/tests/adctest.py
new file mode 100644 (file)
index 0000000..8e3a194
--- /dev/null
@@ -0,0 +1,50 @@
+# adctest.py
+
+# Copyright (c) 2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+from machine import ADC
+import pyb
+from primitives import AADC
+
+async def signal():  # Could use write_timed but this prints values
+    dac = pyb.DAC(1, bits=12, buffering=True)
+    v = 0
+    while True:
+        if not v & 0xf:
+            print('write', v << 4)  # Make value u16 as per ADC read
+        dac.write(v)
+        v += 1
+        v %= 4096
+        await asyncio.sleep_ms(50)
+
+async def adctest():
+    asyncio.create_task(signal())
+    adc = AADC(ADC(pyb.Pin.board.X1))
+    await asyncio.sleep(0)
+    adc.sense(normal=False)  # Wait until ADC gets to 5000
+    value =  await adc(5000, 10000)
+    print('Received', value, adc.read_u16(True))  # Reduce to 12 bits
+    adc.sense(normal=True)  # Now print all changes > 2000
+    while True:
+        value = await adc(2000)  # Trigger if value changes by 2000
+        print('Received', value, adc.read_u16(True))
+
+st = '''This test requires a Pyboard with pins X1 and X5 linked.
+A sawtooth waveform is applied to the ADC. Initially the test waits
+until the ADC value reaches 5000. It then reports whenever the value
+changes by 2000.
+Issue test() to start.
+'''
+print(st)
+
+def test():
+    try:
+        asyncio.run(adctest())
+    except KeyboardInterrupt:
+        print('Interrupted')
+    finally:
+        asyncio.new_event_loop()
+        print()
+        print(st)
diff --git a/primitives/tests/asyntest.py b/primitives/tests/asyntest.py
new file mode 100644 (file)
index 0000000..606e1fd
--- /dev/null
@@ -0,0 +1,664 @@
+# asyntest.py Test/demo of the 'micro' Event, Barrier and Semaphore classes
+# Test/demo of official asyncio library and official Lock class
+
+# Copyright (c) 2017-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# CPython 3.8 compatibility
+# (ignore RuntimeWarning: coroutine '_g' was never awaited)
+# To run:
+# from primitives.tests.asyntest import test
+
+try:
+    import uasyncio as asyncio
+except ImportError:
+    import asyncio
+import sys
+unix = "linux" in sys.implementation._machine
+
+from primitives import Barrier, Semaphore, BoundedSemaphore, Condition, Queue, RingbufQueue
+try:
+    from threadsafe import Message
+except:
+    pass
+
+def print_tests():
+    st = '''Available functions:
+test(0)  Print this list.
+test(1)  Test message acknowledge.
+test(2)  Test Message and Lock objects.
+test(3)  Test the Barrier class with callback.
+test(4)  Test the Barrier class with coroutine.
+test(5)  Test Semaphore
+test(6)  Test BoundedSemaphore.
+test(7)  Test the Condition class.
+test(8)  Test the Queue class.
+test(9)  Test the RingbufQueue class.
+'''
+    print('\x1b[32m')
+    print(st)
+    print('\x1b[39m')
+
+print_tests()
+
+def printexp(exp, runtime=0):
+    print('Expected output:')
+    print('\x1b[32m')
+    print(exp)
+    print('\x1b[39m')
+    if runtime:
+        print('Running (runtime = {}s):'.format(runtime))
+    else:
+        print('Running (runtime < 1s):')
+
+# ************ Test Message class ************
+# Demo use of acknowledge message
+
+async def message_wait(message, ack_message, n):
+    try:
+        await message
+        print(f'message_wait {n} got message: {message.value()}')
+        if ack_message is not None:
+            ack_message.set()
+    except asyncio.CancelledError:
+        print(f"message_wait {n} cancelled")
+
+async def run_ack(n):
+    message = Message()
+    ack1 = Message()
+    ack2 = Message()
+    for count in range(n):
+        t0 = asyncio.create_task(message_wait(message, ack1, 1))
+        t1 = asyncio.create_task(message_wait(message, ack2, 2))
+        message.set(count)
+        print('message was set')
+        await ack1
+        ack1.clear()
+        print('Cleared ack1')
+        await ack2
+        ack2.clear()
+        print('Cleared ack2')
+        message.clear()
+        print('Cleared message')
+        await asyncio.sleep(1)
+    t0.cancel()
+    t1.cancel()
+
+async def msg_send(msg, items):
+    for item in items:
+        await asyncio.sleep_ms(400)
+        msg.set(item)
+
+async def msg_recv(msg):  # Receive using asynchronous iterator
+    async for data in msg:
+        print("Got", data)
+        msg.clear()
+
+async def ack_coro():
+    print("Test multiple tasks waiting on a message.")
+    await run_ack(3)
+    print()
+    print("Test asynchronous iterator.")
+    msg = Message()
+    asyncio.create_task(msg_send(msg, (1, 2, 3)))
+    try:
+        await asyncio.wait_for(msg_recv(msg), 3)
+    except asyncio.TimeoutError:
+        pass
+    await asyncio.sleep(1)
+    print()
+    print("Test cancellation of first waiting task.")
+    t1 = asyncio.create_task(message_wait(msg, None, 1))
+    t2 = asyncio.create_task(message_wait(msg, None, 2))
+    await asyncio.sleep(1)
+    t1.cancel()
+    await asyncio.sleep(1)
+    print("Setting message")
+    msg.set("Test message")
+    await asyncio.sleep(1)  # Tasks have ended or been cancelled
+    msg.clear()
+    print()
+    print("Test cancellation of second waiting task.")
+    t1 = asyncio.create_task(message_wait(msg, None, 1))
+    t2 = asyncio.create_task(message_wait(msg, None, 2))
+    await asyncio.sleep(1)
+    t2.cancel()
+    await asyncio.sleep(1)
+    print("Setting message")
+    msg.set("Test message")
+    await asyncio.sleep(1)
+    msg.clear()
+
+    print("I've seen attack ships burn on the shoulder of Orion...")
+    print("Time to die...")
+
+def ack_test():
+    if unix:
+        print("Message class is incompatible with Unix build.")
+        return
+    printexp('''Running (runtime = 12s):
+Test multiple tasks waiting on a message.
+message was set
+message_wait 1 got message: 0
+message_wait 2 got message: 0
+Cleared ack1
+Cleared ack2
+Cleared message
+message was set
+message_wait 1 got message: 1
+message_wait 2 got message: 1
+Cleared ack1
+Cleared ack2
+Cleared message
+message was set
+message_wait 1 got message: 2
+message_wait 2 got message: 2
+Cleared ack1
+Cleared ack2
+Cleared message
+
+Test asynchronous iterator.
+Got 1
+Got 2
+Got 3
+
+Test cancellation of first waiting task.
+message_wait 1 cancelled
+Setting message
+message_wait 2 got message: Test message
+
+Test cancellation of second waiting task.
+message_wait 2 cancelled
+Setting message
+message_wait 1 got message: Test message
+I've seen attack ships burn on the shoulder of Orion...
+Time to die...
+''', 12)
+    asyncio.run(ack_coro())
+
+# ************ Test Lock and Message classes ************
+
+async def run_lock(n, lock):
+    print('run_lock {} waiting for lock'.format(n))
+    await lock.acquire()
+    print('run_lock {} acquired lock'.format(n))
+    await asyncio.sleep(1)  # Delay to demo other coros waiting for lock
+    lock.release()
+    print('run_lock {} released lock'.format(n))
+
+async def messageset(message):
+    print('Waiting 5 secs before setting message')
+    await asyncio.sleep(5)
+    message.set()
+    print('message was set')
+
+async def messagewait(message):
+    print('waiting for message')
+    await message
+    print('got message')
+    message.clear()
+
+async def run_message_test():
+    print('Test Lock class')
+    lock = asyncio.Lock()
+    asyncio.create_task(run_lock(1, lock))
+    asyncio.create_task(run_lock(2, lock))
+    asyncio.create_task(run_lock(3, lock))
+    print('Test Message class')
+    message = Message()
+    asyncio.create_task(messageset(message))
+    await messagewait(message)  # run_message_test runs fast until this point
+    print('Message status {}'.format('Incorrect' if message.is_set() else 'OK'))
+    print('Tasks complete')
+
+def msg_test():
+    if unix:
+        print("Message class is incompatible with Unix build.")
+        return
+    printexp('''Test Lock class
+Test Message class
+waiting for message
+run_lock 1 waiting for lock
+run_lock 1 acquired lock
+run_lock 2 waiting for lock
+run_lock 3 waiting for lock
+Waiting 5 secs before setting message
+run_lock 1 released lock
+run_lock 2 acquired lock
+run_lock 2 released lock
+run_lock 3 acquired lock
+run_lock 3 released lock
+message was set
+got message
+Message status OK
+Tasks complete
+''', 5)
+    asyncio.run(run_message_test())
+
+# ************ Barrier test ************
+
+async def killer(duration):
+    await asyncio.sleep(duration)
+
+def callback(text):
+    print(text)
+
+async def report(barrier):
+    for i in range(5):
+        print('{} '.format(i), end='')
+        await barrier
+
+async def do_barrier_test():
+    barrier = Barrier(3, callback, ('Synch',))
+    for _ in range(2):
+        for _ in range(3):
+            asyncio.create_task(report(barrier))
+        await asyncio.sleep(1)
+        print()
+    await asyncio.sleep(1)
+
+def barrier_test():
+    printexp('''Running (runtime = 3s):
+0 0 0 Synch
+1 1 1 Synch
+2 2 2 Synch
+3 3 3 Synch
+4 4 4 Synch
+
+1 1 1 Synch
+2 2 2 Synch
+3 3 3 Synch
+4 4 4 Synch
+''', 3)
+    asyncio.run(do_barrier_test())
+
+# ************ Barrier test 1 ************
+
+async def my_coro(text):
+    try:
+        await asyncio.sleep_ms(0)
+        while True:
+            await asyncio.sleep(1)
+            print(text)
+    except asyncio.CancelledError:
+        print('my_coro was cancelled.')
+
+async def report1(barrier, x):
+    await asyncio.sleep(x)
+    print('report instance', x, 'waiting')
+    await barrier
+    print('report instance', x, 'done')
+
+async def bart():
+    barrier = Barrier(4, my_coro, ('my_coro running',))
+    for x in range(3):
+        asyncio.create_task(report1(barrier, x))
+    await asyncio.sleep(4)
+    assert barrier.busy()
+    await barrier
+    await asyncio.sleep(0)
+    assert not barrier.busy()
+    # Must yield before reading result(). Here we wait long enough for
+    await asyncio.sleep_ms(1500)  # coro to print
+    barrier.result().cancel()
+    await asyncio.sleep(2)
+
+def barrier_test1():
+    printexp('''Running (runtime = 5s):
+report instance 0 waiting
+report instance 1 waiting
+report instance 2 waiting
+report instance 2 done
+report instance 1 done
+report instance 0 done
+my_coro running
+my_coro was cancelled.
+
+Exact report instance done sequence may vary, but 3 instances should report
+done before my_coro runs.
+''', 5)
+    asyncio.run(bart())
+
+# ************ Semaphore test ************
+
+async def run_sema(n, sema, barrier):
+    print('run_sema {} trying to access semaphore'.format(n))
+    async with sema:
+        print('run_sema {} acquired semaphore'.format(n))
+        # Delay demonstrates other coros waiting for semaphore
+        await asyncio.sleep(1 + n/10)  # n/10 ensures deterministic printout
+    print('run_sema {} has released semaphore'.format(n))
+    barrier.trigger()
+
+async def run_sema_test(bounded):
+    num_coros = 5
+    barrier = Barrier(num_coros + 1)
+    if bounded:
+        semaphore = BoundedSemaphore(3)
+    else:
+        semaphore = Semaphore(3)
+    for n in range(num_coros):
+        asyncio.create_task(run_sema(n, semaphore, barrier))
+    await barrier  # Quit when all coros complete
+    try:
+        semaphore.release()
+    except ValueError:
+        print('Bounded semaphore exception test OK')
+
+def semaphore_test(bounded=False):
+    if bounded:
+        exp = '''run_sema 0 trying to access semaphore
+run_sema 0 acquired semaphore
+run_sema 1 trying to access semaphore
+run_sema 1 acquired semaphore
+run_sema 2 trying to access semaphore
+run_sema 2 acquired semaphore
+run_sema 3 trying to access semaphore
+run_sema 4 trying to access semaphore
+run_sema 0 has released semaphore
+run_sema 4 acquired semaphore
+run_sema 1 has released semaphore
+run_sema 3 acquired semaphore
+run_sema 2 has released semaphore
+run_sema 4 has released semaphore
+run_sema 3 has released semaphore
+Bounded semaphore exception test OK
+
+Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
+    else:
+        exp = '''run_sema 0 trying to access semaphore
+run_sema 0 acquired semaphore
+run_sema 1 trying to access semaphore
+run_sema 1 acquired semaphore
+run_sema 2 trying to access semaphore
+run_sema 2 acquired semaphore
+run_sema 3 trying to access semaphore
+run_sema 4 trying to access semaphore
+run_sema 0 has released semaphore
+run_sema 3 acquired semaphore
+run_sema 1 has released semaphore
+run_sema 4 acquired semaphore
+run_sema 2 has released semaphore
+run_sema 3 has released semaphore
+run_sema 4 has released semaphore
+
+Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
+    printexp(exp, 3)
+    asyncio.run(run_sema_test(bounded))
+
+# ************ Condition test ************
+
+cond = Condition()
+tim = 0
+
+async def cond01():
+    while True:
+        await asyncio.sleep(2)
+        with await cond:
+            cond.notify(2)  # Notify 2 tasks
+
+async def cond03():  # Maintain a count of seconds
+    global tim
+    await asyncio.sleep(0.5)
+    while True:
+        await asyncio.sleep(1)
+        tim += 1
+
+async def cond02(n, barrier):
+    with await cond:
+        print('cond02', n, 'Awaiting notification.')
+        await cond.wait()
+        print('cond02', n, 'triggered. tim =', tim)
+        barrier.trigger()
+
+def predicate():
+    return tim >= 8 # 12
+
+async def cond04(n, barrier):
+    with await cond:
+        print('cond04', n, 'Awaiting notification and predicate.')
+        await cond.wait_for(predicate)
+        print('cond04', n, 'triggered. tim =', tim)
+        barrier.trigger()
+
+async def cond_go():
+    ntasks = 7
+    barrier = Barrier(ntasks + 1)
+    t1 = asyncio.create_task(cond01())
+    t3 = asyncio.create_task(cond03())
+    for n in range(ntasks):
+        asyncio.create_task(cond02(n, barrier))
+    await barrier  # All instances of cond02 have completed
+    # Test wait_for
+    barrier = Barrier(2)
+    asyncio.create_task(cond04(99, barrier))
+    await barrier
+    # cancel continuously running coros.
+    t1.cancel()
+    t3.cancel()
+    await asyncio.sleep_ms(0)
+    print('Done.')
+
+def condition_test():
+    printexp('''cond02 0 Awaiting notification.
+cond02 1 Awaiting notification.
+cond02 2 Awaiting notification.
+cond02 3 Awaiting notification.
+cond02 4 Awaiting notification.
+cond02 5 Awaiting notification.
+cond02 6 Awaiting notification.
+cond02 5 triggered. tim = 1
+cond02 6 triggered. tim = 1
+cond02 3 triggered. tim = 3
+cond02 4 triggered. tim = 3
+cond02 1 triggered. tim = 5
+cond02 2 triggered. tim = 5
+cond02 0 triggered. tim = 7
+cond04 99 Awaiting notification and predicate.
+cond04 99 triggered. tim = 9
+Done.
+''', 13)
+    asyncio.run(cond_go())
+
+# ************ Queue test ************
+
+async def slow_process():
+    await asyncio.sleep(2)
+    return 42
+
+async def bar(q):
+    print('Waiting for slow process.')
+    result = await slow_process()
+    print('Putting result onto queue')
+    await q.put(result)  # Put result on q
+
+async def foo(q):
+    print("Running foo()")
+    result = await q.get()
+    print('Result was {}'.format(result))
+
+async def q_put(n, q):
+    for x in range(8):
+        obj = (n, x)
+        await q.put(obj)
+        await asyncio.sleep(0)
+
+async def q_get(n, q):
+    for x in range(8):
+        await q.get()
+        await asyncio.sleep(0)
+
+async def putter(q):
+    # put some item, then sleep
+    for _ in range(20):
+        await q.put(1)
+        await asyncio.sleep_ms(50)
+
+
+async def getter(q):
+   # checks for new items, and relies on the "blocking" of the get method
+    for _ in range(20):
+        await q.get()
+
+async def queue_go():
+    q = Queue(10)
+    asyncio.create_task(foo(q))
+    asyncio.create_task(bar(q))
+    await asyncio.sleep(3)
+    for n in range(4):
+        asyncio.create_task(q_put(n, q))
+    await asyncio.sleep(1)
+    assert q.qsize() == 10
+    await q.get()
+    await asyncio.sleep(0.1)
+    assert q.qsize() == 10
+    while not q.empty():
+        await q.get()
+        await asyncio.sleep(0.1)
+    assert q.empty()
+    print('Competing put tasks test complete')
+
+    for n in range(4):
+        asyncio.create_task(q_get(n, q))
+    await asyncio.sleep(1)
+    x = 0
+    while not q.full():
+        await q.put(x)
+        await asyncio.sleep(0.3)
+        x += 1
+    assert q.qsize() == 10
+    print('Competing get tasks test complete')
+    await asyncio.gather(
+        putter(q),
+        getter(q)
+        )
+    print('Queue tests complete')
+    print("I've seen attack ships burn off the shoulder of Orion...")
+    print("Time to die...")
+
+def queue_test():
+    printexp('''Running (runtime = 20s):
+Running foo()
+Waiting for slow process.
+Putting result onto queue
+Result was 42
+Competing put tasks test complete
+Competing get tasks test complete
+Queue tests complete
+
+
+I've seen attack ships burn off the shoulder of Orion...
+Time to die...
+
+''', 20)
+    asyncio.run(queue_go())
+
+# ************ RingbufQueue test ************
+
+async def qread(q, lst, twr):
+    async for item in q:
+        lst.append(item)
+        await asyncio.sleep_ms(twr)
+
+async def read(q, t, twr=0):
+    lst = []
+    try:
+        await asyncio.wait_for(qread(q, lst, twr), t)
+    except asyncio.TimeoutError:
+        pass
+    return lst
+
+async def put_list(q, lst, twp=0):
+    for item in lst:
+        await q.put(item)
+        await asyncio.sleep_ms(twp)
+
+async def rbq_go():
+    q = RingbufQueue([0 for _ in range(10)])  # 10 elements
+    pl = [n for n in range(15)]
+    print("Read waits on slow write.")
+    asyncio.create_task(put_list(q, pl, 100))
+    rl = await read(q, 2)
+    assert pl == rl
+    print('done')
+    print("Write waits on slow read.")
+    asyncio.create_task(put_list(q, pl))
+    rl = await read(q, 2, 100)
+    assert pl == rl
+    print('done')
+    print("Testing full, empty and qsize methods.")
+    assert q.empty()
+    assert q.qsize() == 0
+    assert not q.full()
+    await put_list(q, (1,2,3))
+    assert not q.empty()
+    assert q.qsize() == 3
+    assert not q.full()
+    print("Done")
+    print("Testing put_nowait and overruns.")
+    nfail = 0
+    for x in range(4, 15):
+        try:
+            q.put_nowait(x)
+        except IndexError:
+            nfail += 1
+    assert nfail == 5
+    assert q.full()
+    rl = await read(q, 2)
+    assert rl == [6, 7, 8, 9, 10, 11, 12, 13, 14]
+    print("Testing get_nowait.")
+    await q.put(1)
+    assert q.get_nowait() == 1
+    err = 0
+    try:
+        q.get_nowait()
+    except IndexError:
+        err = 1
+    assert err == 1
+    print("Tests complete.")
+    print("I've seen attack ships burn off the shoulder of Orion...")
+    print("Time to die...")
+
+def rbq_test():
+    printexp('''Running (runtime = 6s):
+Read waits on slow write.
+done
+Write waits on slow read.
+done
+Testing full, empty and qsize methods.
+Done
+Testing put_nowait and overruns.
+Testing get_nowait.
+Tests complete.
+I've seen attack ships burn off the shoulder of Orion...
+Time to die...
+
+''', 6)
+    asyncio.run(rbq_go())
+
+# ************ ************
+def test(n):
+    try:
+        if n == 1:
+            ack_test()  # Test message acknowledge.
+        elif n == 2:
+            msg_test()  # Test Messge and Lock objects.
+        elif n == 3:
+            barrier_test()  # Test the Barrier class.
+        elif n == 4:
+            barrier_test1()  # Test the Barrier class.
+        elif n == 5:
+            semaphore_test(False) # Test Semaphore
+        elif n == 6:
+            semaphore_test(True)  # Test BoundedSemaphore.
+        elif n == 7:
+            condition_test()  # Test the Condition class.
+        elif n == 8:
+            queue_test()  # Test the Queue class.
+        elif n == 9:
+            rbq_test()  # Test the RingbufQueue class.
+    except KeyboardInterrupt:
+        print('Interrupted')
+    finally:
+        asyncio.new_event_loop()
+        print_tests()
diff --git a/primitives/tests/delay_test.py b/primitives/tests/delay_test.py
new file mode 100644 (file)
index 0000000..17dde72
--- /dev/null
@@ -0,0 +1,223 @@
+# delay_test.py Tests for Delay_ms class
+
+# Copyright (c) 2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+import micropython
+from primitives.delay_ms import Delay_ms
+
+micropython.alloc_emergency_exception_buf(100)
+
+def printexp(exp, runtime=0):
+    print('Expected output:')
+    print('\x1b[32m')
+    print(exp)
+    print('\x1b[39m')
+    if runtime:
+        print('Running (runtime = {}s):'.format(runtime))
+    else:
+        print('Running (runtime < 1s):')
+
+async def ctor_test():  # Constructor arg
+    s = '''
+Trigger 5 sec delay
+Retrigger 5 sec delay
+Callback should run
+cb callback
+Done
+'''
+    printexp(s, 12)
+    def cb(v):
+        print('cb', v)
+
+    d = Delay_ms(cb, ('callback',), duration=5000)
+
+    print('Trigger 5 sec delay')
+    d.trigger()
+    await asyncio.sleep(4)
+    print('Retrigger 5 sec delay')
+    d.trigger()
+    await asyncio.sleep(4)
+    print('Callback should run')
+    await asyncio.sleep(2)
+    print('Done')
+
+async def launch_test():
+    s = '''
+Trigger 5 sec delay
+Coroutine should run: run to completion.
+Coroutine starts
+Coroutine ends
+Coroutine should run: test cancellation.
+Coroutine starts
+Coroutine should run: test awaiting.
+Coroutine starts
+Coroutine ends
+Done
+'''
+    printexp(s, 20)
+    async def cb(v, ms):
+        print(v, 'starts')
+        await asyncio.sleep_ms(ms)
+        print(v, 'ends')
+
+    d = Delay_ms(cb, ('coroutine', 1000))
+
+    print('Trigger 5 sec delay')
+    d.trigger(5000)  # Test extending time
+    await asyncio.sleep(4)
+    print('Coroutine should run: run to completion.')
+    await asyncio.sleep(3)
+    d = Delay_ms(cb, ('coroutine', 3000))
+    d.trigger(5000)
+    await asyncio.sleep(4)
+    print('Coroutine should run: test cancellation.')
+    await asyncio.sleep(2)
+    coro = d.rvalue()
+    coro.cancel()
+    d.trigger(5000)
+    await asyncio.sleep(4)
+    print('Coroutine should run: test awaiting.')
+    await asyncio.sleep(2)
+    coro = d.rvalue()
+    await coro
+    print('Done')
+
+
+async def reduce_test():  # Test reducing a running delay
+    s = '''
+Trigger 5 sec delay
+Callback should run
+cb callback
+Callback should run
+cb callback
+Done
+'''
+    printexp(s, 11)
+    def cb(v):
+        print('cb', v)
+
+    d = Delay_ms(cb, ('callback',))
+
+    print('Trigger 5 sec delay')
+    d.trigger(5000)  # Test extending time
+    await asyncio.sleep(4)
+    print('Callback should run')
+    await asyncio.sleep(2)
+    d.trigger(10000)
+    await asyncio.sleep(1)
+    d.trigger(3000)
+    await asyncio.sleep(2)
+    print('Callback should run')
+    await asyncio.sleep(2)
+    print('Done')
+
+
+async def stop_test():  # Test the .stop and .running methods
+    s = '''
+Trigger 5 sec delay
+Running
+Callback should run
+cb callback
+Callback returned 42
+Callback should not run
+Done
+    '''
+    printexp(s, 12)
+    def cb(v):
+        print('cb', v)
+        return 42
+
+    d = Delay_ms(cb, ('callback',))
+
+    print('Trigger 5 sec delay')
+    d.trigger(5000)  # Test extending time
+    await asyncio.sleep(4)
+    if d():
+        print('Running')
+    print('Callback should run')
+    await asyncio.sleep(2)
+    print('Callback returned', d.rvalue())
+    d.trigger(3000)
+    await asyncio.sleep(1)
+    d.stop()
+    await asyncio.sleep(1)
+    if d():
+        print('Running')
+    print('Callback should not run')
+    await asyncio.sleep(4)
+    print('Done')
+
+
+async def isr_test():  # Test trigger from hard ISR
+    from pyb import Timer
+    s = '''
+Timer holds off cb for 5 secs
+cb should now run
+cb callback
+Done
+'''
+    printexp(s, 6)
+    def cb(v):
+        print('cb', v)
+
+    d = Delay_ms(cb, ('callback',))
+
+    def timer_cb(_):
+        d.trigger(200)
+    tim = Timer(1, freq=10, callback=timer_cb)
+
+    print('Timer holds off cb for 5 secs')
+    await asyncio.sleep(5)
+    tim.deinit()
+    print('cb should now run')
+    await asyncio.sleep(1)
+    print('Done')
+
+async def err_test():  # Test triggering de-initialised timer
+    s = '''
+Running (runtime = 3s):
+Trigger 1 sec delay
+cb callback
+Success: error was raised.
+Done
+    '''
+    printexp(s, 3)
+    def cb(v):
+        print('cb', v)
+        return 42
+
+    d = Delay_ms(cb, ('callback',))
+
+    print('Trigger 1 sec delay')
+    d.trigger(1000)
+    await asyncio.sleep(2)
+    d.deinit()
+    try:
+        d.trigger(1000)
+    except RuntimeError:
+        print("Success: error was raised.")
+    print('Done')
+
+av = '''
+Run a test by issuing
+delay_test.test(n)
+where n is a test number. Avaliable tests:
+\x1b[32m
+0 Test triggering from a hard ISR (Pyboard only)
+1 Test the .stop method and callback return value.
+2 Test reducing the duration of a running timer
+3 Test delay defined by constructor arg
+4 Test triggering a Task
+5 Attempt to trigger de-initialised instance
+\x1b[39m
+'''
+print(av)
+
+tests = (isr_test, stop_test, reduce_test, ctor_test, launch_test, err_test)
+def test(n=0):
+    try:
+        asyncio.run(tests[n]())
+    finally:
+        asyncio.new_event_loop()
diff --git a/primitives/tests/encoder_stop.py b/primitives/tests/encoder_stop.py
new file mode 100644 (file)
index 0000000..ed75e8d
--- /dev/null
@@ -0,0 +1,39 @@
+# encoder_stop.py Demo of callback which occurs after motion has stopped.
+
+from machine import Pin
+import uasyncio as asyncio
+from primitives.encoder import Encoder
+from primitives.delay_ms import Delay_ms
+
+px = Pin('X1', Pin.IN, Pin.PULL_UP)
+py = Pin('X2', Pin.IN, Pin.PULL_UP)
+
+tim = Delay_ms(duration=400)  # High value for test
+d = 0
+
+def tcb(pos, delta):  # User callback gets args of encoder cb
+    global d
+    d = 0
+    print(pos, delta)
+
+def cb(pos, delta):  # Encoder callback occurs rapidly
+    global d
+    tim.trigger()  # Postpone the user callback
+    tim.callback(tcb, (pos, d := d + delta))  # and update its args
+
+async def main():
+    while True:
+        await asyncio.sleep(1)
+
+def test():
+    print('Running encoder test. Press ctrl-c to teminate.')
+    Encoder.delay = 0  # No need for this delay
+    enc = Encoder(px, py, callback=cb)
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:
+        print('Interrupted')
+    finally:
+        asyncio.new_event_loop()
+
+test()
diff --git a/primitives/tests/encoder_test.py b/primitives/tests/encoder_test.py
new file mode 100644 (file)
index 0000000..15b919d
--- /dev/null
@@ -0,0 +1,31 @@
+# encoder_test.py Test for asynchronous driver for incremental quadrature encoder.
+
+# Copyright (c) 2021-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+from machine import Pin
+import uasyncio as asyncio
+from primitives.encoder import Encoder
+
+
+px = Pin(33, Pin.IN, Pin.PULL_UP)
+py = Pin(25, Pin.IN, Pin.PULL_UP)
+
+def cb(pos, delta):
+    print(pos, delta)
+
+async def main():
+    while True:
+        await asyncio.sleep(1)
+
+def test():
+    print('Running encoder test. Press ctrl-c to teminate.')
+    enc = Encoder(px, py, v=0, vmin=0, vmax=100, callback=cb)
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:
+        print('Interrupted')
+    finally:
+        asyncio.new_event_loop()
+
+test()
diff --git a/primitives/tests/event_test.py b/primitives/tests/event_test.py
new file mode 100644 (file)
index 0000000..23989e3
--- /dev/null
@@ -0,0 +1,206 @@
+# event_test.py Test WaitAll, WaitAny, ESwwitch, EButton
+
+# Copyright (c) 2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# from primitives.tests.event_test import *
+
+import uasyncio as asyncio
+from primitives import Delay_ms, WaitAny, ESwitch, WaitAll, EButton
+from pyb import Pin
+
+events = [asyncio.Event() for _ in range(4)]
+
+async def set_events(*ev):
+    for n in ev:
+        await asyncio.sleep(1)
+        print("Setting", n)
+        events[n].set()
+
+def clear(msg):
+    print(msg)
+    for e in events:
+        e.clear()
+
+async def can(obj, tim):
+    await asyncio.sleep(tim)
+    print("About to cancel")
+    obj.cancel()
+
+async def foo(tsk):
+    print("Waiting")
+    await tsk
+
+async def wait_test():
+    msg = """
+\x1b[32m
+Expected output:
+Setting 0
+Tested WaitAny 0
+Setting 1
+Tested WaitAny 1
+Setting 2
+Setting 3
+Tested WaitAll 2, 3
+Setting 0
+Setting 3
+Tested WaitAny 0, 3
+Cancel in 3s
+Setting 0
+Setting 1
+About to cancel
+Cancelled.
+Waiting for 4s
+Timeout
+done
+\x1b[39m
+"""
+    print(msg)
+    wa = WaitAny((events[0], events[1], WaitAll((events[2], events[3]))))
+    asyncio.create_task(set_events(0))
+    await wa.wait()
+    clear("Tested WaitAny 0")
+    asyncio.create_task(set_events(1))
+    await wa.wait()
+    clear("Tested WaitAny 1")
+    asyncio.create_task(set_events(2, 3))
+    await wa.wait()
+    clear("Tested WaitAll 2, 3")
+    wa = WaitAll((WaitAny((events[0], events[1])), WaitAny((events[2], events[3]))))
+    asyncio.create_task(set_events(0, 3))
+    await wa.wait()
+    clear("Tested WaitAny 0, 3")
+    task = asyncio.create_task(wa.wait())
+    asyncio.create_task(set_events(0, 1))  # Does nothing
+    asyncio.create_task(can(task, 3))
+    print("Cancel in 3s")
+    try:
+        await task
+    except asyncio.CancelledError:  # TODO why must we trap this?
+        print("Cancelled.")
+    print("Waiting for 4s")
+    try:
+        await asyncio.wait_for(wa.wait(), 4)
+    except asyncio.TimeoutError:
+        print("Timeout")
+    print("done")
+
+val = 0
+fail = False
+pout = None
+polarity = 0
+
+async def monitor(evt, v, verbose):
+    global val
+    while True:
+        await evt.wait()
+        evt.clear()
+        val += v
+        verbose and print("Got", hex(v), hex(val))
+
+async def pulse(ms=100):
+    pout(1 ^ polarity)
+    await asyncio.sleep_ms(ms)
+    pout(polarity)
+
+def expect(v, e):
+    global fail
+    if v == e:
+        print("Pass")
+    else:
+        print(f"Fail: expected {e} got {v}")
+        fail = True
+
+async def btest(btn, verbose, supp):
+    global val, fail
+    val = 0
+    events = btn.press, btn.release, btn.double, btn.long
+    tasks = []
+    for n, evt in enumerate(events):
+        tasks.append(asyncio.create_task(monitor(evt, 1 << 3 * n, verbose)))
+    await asyncio.sleep(1)
+    print("Start short press test")
+    await pulse()
+    await asyncio.sleep(1)
+    verbose and print("Test of short press", hex(val))
+    expect(val, 0x09)
+    val = 0
+    await asyncio.sleep(1)
+    print("Start long press test")
+    await pulse(2000)
+    await asyncio.sleep(4)
+    verbose and print("Long press", hex(val))
+    exp = 0x208 if supp else 0x209
+    expect(val, exp)
+    val = 0
+    await asyncio.sleep(1)
+    print("Start double press test")
+    await pulse()
+    await asyncio.sleep_ms(100)
+    await pulse()
+    await asyncio.sleep(4)
+    verbose and print("Double press", hex(val))
+    exp = 0x48 if supp else 0x52
+    expect(val, exp)
+    for task in tasks:
+        task.cancel()
+
+async def stest(sw, verbose):
+    global val, fail
+    val = 0
+    events = sw.open, sw.close
+    tasks = []
+    for n, evt in enumerate(events):
+        tasks.append(asyncio.create_task(monitor(evt, 1 << 3 * n, verbose)))
+    asyncio.create_task(pulse(2000))
+    await asyncio.sleep(1)
+    expect(val, 0x08)
+    await asyncio.sleep(4)  # Wait for any spurious events
+    verbose and print("Switch close and open", hex(val))
+    expect(val, 0x09)
+    for task in tasks:
+        task.cancel()
+
+async def switch_test(pol, verbose):
+    global val, pout, polarity
+    polarity = pol
+    pin = Pin('Y1', Pin.IN)
+    pout = Pin('Y2', Pin.OUT, value=pol)
+    print("Testing EButton.")
+    print("suppress == False")
+    btn = EButton(pin)
+    await btest(btn, verbose, False)
+    print("suppress == True")
+    btn = EButton(pin, suppress=True)
+    await btest(btn, verbose, True)
+    print("Testing ESwitch")
+    sw = ESwitch(pin, pol)
+    await stest(sw, verbose)
+    print("Failures occurred.") if fail else print("All tests passed.")
+
+def tests():
+    txt="""
+    \x1b[32m
+    Available tests:
+    1. test_switches(polarity=1, verbose=False) Test the ESwitch and Ebutton classe.
+    2. test_wait() Test the WaitAny and WaitAll primitives.
+
+    Switch tests assume a Pyboard with a link between Y1 and Y2.
+    \x1b[39m
+    """
+    print(txt)
+
+tests()
+def test_switches(polarity=1, verbose=False):
+    try:
+        asyncio.run(switch_test(polarity, verbose))  # polarity 1/0 is normal (off) electrical state.
+    finally:
+        asyncio.new_event_loop()
+        tests()
+
+def test_wait():
+    try:
+        asyncio.run(wait_test())
+    finally:
+        asyncio.new_event_loop()
+        tests()
diff --git a/primitives/tests/switches.py b/primitives/tests/switches.py
new file mode 100644 (file)
index 0000000..59fa779
--- /dev/null
@@ -0,0 +1,259 @@
+# Test/demo programs for Switch and Pushbutton classes
+# Tested on Pyboard but should run on other microcontroller platforms
+# running MicroPython with uasyncio library.
+
+# Copyright (c) 2018-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+# Now executes .deinit()
+
+# To run:
+# from primitives.tests.switches import *
+# test_sw()  # For example
+
+from machine import Pin
+from pyb import LED
+from primitives import Switch, Pushbutton
+import uasyncio as asyncio
+
+helptext = '''
+Test using switch or pushbutton between X1 and gnd.
+Ground pin X2 to terminate test.
+
+'''
+tests = '''
+\x1b[32m
+Available tests:
+test_sw Switch test.
+test_swcb Switch with callback.
+test_sw_event Switch with event.
+test_btn Pushutton launching coros.
+test_btncb Pushbutton launching callbacks.
+btn_dynamic Change coros launched at runtime.
+btn_event Pushbutton event interface.
+\x1b[39m
+'''
+print(tests)
+
+# Pulse an LED (coroutine)
+async def pulse(led, ms):
+    led.on()
+    await asyncio.sleep_ms(ms)
+    led.off()
+
+# Pulse an LED when an event triggered
+async def evt_pulse(event, led):
+    while True:
+        event.clear()
+        await event.wait()
+        led.on()
+        await asyncio.sleep_ms(500)
+        led.off()
+
+# Toggle an LED (callback)
+def toggle(led):
+    led.toggle()
+
+# Quit test by connecting X2 to ground
+async def killer(obj):
+    pin = Pin('X2', Pin.IN, Pin.PULL_UP)
+    while pin.value():
+        await asyncio.sleep_ms(50)
+    obj.deinit()
+    await asyncio.sleep_ms(0)
+
+def run(obj):
+    try:
+        asyncio.run(killer(obj))
+    except KeyboardInterrupt:
+        print('Interrupted')
+    finally:
+        asyncio.new_event_loop()
+        print(tests)
+
+
+# Test for the Switch class passing coros
+def test_sw():
+    s = '''
+close pulses green
+open pulses red
+'''
+    print('Test of switch scheduling coroutines.')
+    print(helptext)
+    print(s)
+    pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+    red = LED(1)
+    green = LED(2)
+    sw = Switch(pin)
+    # Register coros to launch on contact close and open
+    sw.close_func(pulse, (green, 1000))
+    sw.open_func(pulse, (red, 1000))
+    run(sw)
+
+# Test for the switch class with a callback
+def test_swcb():
+    s = '''
+close toggles red
+open toggles green
+'''
+    print('Test of switch executing callbacks.')
+    print(helptext)
+    print(s)
+    pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+    red = LED(1)
+    green = LED(2)
+    sw = Switch(pin)
+    # Register a coro to launch on contact close
+    sw.close_func(toggle, (red,))
+    sw.open_func(toggle, (green,))
+    run(sw)
+
+# Test for the Switch class (events)
+async def do_sw_event():
+    pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+    sw = Switch(pin)
+    sw.open_func(None)
+    sw.close_func(None)
+    tasks = []
+    for event, led in ((sw.close, 1), (sw.open, 2)):
+        tasks.append(asyncio.create_task(evt_pulse(event, LED(led))))
+    await killer(sw)
+    for task in tasks:
+        task.cancel()
+
+def test_sw_event():
+    s = '''
+close pulse red
+open pulses green
+'''
+    print('Test of switch triggering events.')
+    print(helptext)
+    print(s)
+    try:
+        asyncio.run(do_sw_event())
+    except KeyboardInterrupt:
+        print('Interrupted')
+    finally:
+        asyncio.new_event_loop()
+        print(tests)
+
+# Test for the Pushbutton class (coroutines)
+# Pass True to test suppress
+def test_btn(suppress=False, lf=True, df=True):
+    s = '''
+press pulses red
+release pulses green
+double click pulses yellow
+long press pulses blue
+'''
+    print('Test of pushbutton scheduling coroutines.')
+    print(helptext)
+    print(s)
+    pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+    red = LED(1)
+    green = LED(2)
+    yellow = LED(3)
+    blue = LED(4)
+    pb = Pushbutton(pin, suppress)
+    pb.press_func(pulse, (red, 1000))
+    pb.release_func(pulse, (green, 1000))
+    if df:
+        print('Doubleclick enabled')
+        pb.double_func(pulse, (yellow, 1000))
+    if lf:
+        print('Long press enabled')
+        pb.long_func(pulse, (blue, 1000))
+    run(pb)
+
+# Test for the Pushbutton class (callbacks)
+def test_btncb():
+    s = '''
+press toggles red
+release toggles green
+double click toggles yellow
+long press toggles blue
+'''
+    print('Test of pushbutton executing callbacks.')
+    print(helptext)
+    print(s)
+    pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+    red = LED(1)
+    green = LED(2)
+    yellow = LED(3)
+    blue = LED(4)
+    pb = Pushbutton(pin)
+    pb.press_func(toggle, (red,))
+    pb.release_func(toggle, (green,))
+    pb.double_func(toggle, (yellow,))
+    pb.long_func(toggle, (blue,))
+    run(pb)
+
+# Test for the Pushbutton class where callback coros change dynamically
+def setup(pb, press, release, dbl, lng, t=1000):
+    s = '''
+Functions are changed:
+LED's pulse for 2 seconds
+press pulses blue
+release pulses red
+double click pulses green
+long pulses yellow
+'''
+    pb.press_func(pulse, (press, t))
+    pb.release_func(pulse, (release, t))
+    pb.double_func(pulse, (dbl, t))
+    if lng is not None:
+        pb.long_func(pulse, (lng, t))
+        print(s)
+
+def btn_dynamic():
+    s = '''
+press pulses red
+release pulses green
+double click pulses yellow
+long press changes button functions.
+'''
+    print('Test of pushbutton scheduling coroutines.')
+    print(helptext)
+    print(s)
+    pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+    red = LED(1)
+    green = LED(2)
+    yellow = LED(3)
+    blue = LED(4)
+    pb = Pushbutton(pin)
+    setup(pb, red, green, yellow, None)
+    pb.long_func(setup, (pb, blue, red, green, yellow, 2000))
+    run(pb)
+
+# Test for the Pushbutton class (events)
+async def do_btn_event():
+    pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+    pb = Pushbutton(pin)
+    pb.press_func(None)
+    pb.release_func(None)
+    pb.double_func(None)
+    pb.long_func(None)
+    tasks = []
+    for event, led in ((pb.press, 1), (pb.release, 2), (pb.double, 3), (pb.long, 4)):
+        tasks.append(asyncio.create_task(evt_pulse(event, LED(led))))
+    await killer(pb)
+    for task in tasks:
+        task.cancel()
+
+def btn_event():
+    s = '''
+press pulse red
+release pulses green
+double click pulses yellow
+long press pulses blue
+'''
+    print('Test of pushbutton triggering events.')
+    print(helptext)
+    print(s)
+    try:
+        asyncio.run(do_btn_event())
+    except KeyboardInterrupt:
+        print('Interrupted')
+    finally:
+        asyncio.new_event_loop()
+        print(tests)
+