# From: jimmy
# Date: Fri, 28 Jun 2024 01:20:54 +0000 (-0500)
# Subject: infrared remote
# X-Git-Url: https://vault307.fbx.one/gitweb/ir_remote.git/commitdiff_plain/HEAD?ds=inline
# --- a75443df2f1febce2f461ea0707c2d95bc97a7ef

# ==== ir_rx/__init__.py ====
# ir_rx __init__.py Decoder for IR remote control using synchronous code
# IR_RX abstract base class for IR receivers.

# Author: Peter Hinch
# Copyright Peter Hinch 2020-2021 Released under the MIT license

from machine import Timer, Pin
from array import array
from utime import ticks_us

# Save RAM
# from micropython import alloc_emergency_exception_buf
# alloc_emergency_exception_buf(100)


# On 1st edge start a block timer. While the timer is running, record the time
# of each edge. When the timer times out decode the data. Duration must exceed
# the worst case block transmission time, but be less than the interval between
# a block start and a repeat code start (~108ms depending on protocol)

class IR_RX():
    # Result/error codes
    # Repeat button code
    REPEAT = -1
    # Error codes
    BADSTART = -2
    BADBLOCK = -3
    BADREP = -4
    OVERRUN = -5
    BADDATA = -6
    BADADDR = -7

    # pin: input pin whose edges are timestamped.
    # nedges: maximum number of edges in one block.
    # tblock: block timer period passed to Timer.init(period=...).
    # callback(cmd, addr, ext, *args): called by do_callback on a good result.
    def __init__(self, pin, nedges, tblock, callback, *args):  # Optional args for callback
        self._pin = pin
        self._nedges = nedges
        self._tblock = tblock
        self.callback = callback
        self.args = args
        self._errf = lambda _ : None
        self.verbose = False

        self._times = array('i', (0 for _ in range(nedges + 1)))  # +1 for overrun
        pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING))
        self.edge = 0
        self.tim = Timer(-1)  # Software timer
        self.cb = self.decode  # .decode is supplied by the subclass

    # Pin interrupt. Save time of each edge for later decode.
    def _cb_pin(self, line):
        t = ticks_us()
        # On overrun ignore pulses until software timer times out
        if self.edge <= self._nedges:  # Allow 1 extra pulse to record overrun
            if not self.edge:  # First edge received
                self.tim.init(period=self._tblock , mode=Timer.ONE_SHOT, callback=self.cb)
            self._times[self.edge] = t
            self.edge += 1

    # Reset the edge count, then route the result: values >= thresh go to the
    # user callback, anything lower goes to the error function.
    def do_callback(self, cmd, addr, ext, thresh=0):
        self.edge = 0
        if cmd >= thresh:
            self.callback(cmd, addr, ext, *self.args)
        else:
            self._errf(cmd)

    def error_function(self, func):
        self._errf = func

    def close(self):
        self._pin.irq(handler = None)
        self.tim.deinit()


# ==== ir_rx/acquire.py ====
# acquire.py Acquire a pulse train from an IR remote
# Supports NEC protocol.
# For a remote using NEC see https://www.adafruit.com/products/389

# Author: Peter Hinch
# Copyright Peter Hinch 2020 Released under the MIT license

from machine import Pin, freq
from sys import platform

from utime import sleep_ms, ticks_us, ticks_diff
from ir_rx import IR_RX


class IR_GET(IR_RX):
    def __init__(self, pin, nedges=100, twait=100, display=True):
        self.display = display
        super().__init__(pin, nedges, twait, lambda *_ : None)
        self.data = None

    # Timer callback: turn the recorded edge times into a list of pulse
    # durations, print a guess at the protocol and store the list in .data.
    def decode(self, _):
        def near(v, target):
            return target * 0.8 < v < target * 1.2

        lb = self.edge - 1  # Possible length of burst
        if lb < 3:
            # Noise. _cb_pin only restarts the block timer when .edge is 0,
            # so the count must be cleared or no later burst is ever decoded.
            self.edge = 0
            return
        burst = []
        for x in range(lb):
            dt = ticks_diff(self._times[x + 1], self._times[x])
            if x > 0 and dt > 10000:  # Reached gap between repeats
                break
            burst.append(dt)
        lb = len(burst)  # Actual length
        if lb < 2:
            # One pulse then a long gap: the protocol tests index burst[1].
            self.edge = 0
            return
        # Duration of pulse train 24892 for RC-5 22205 for RC-6
        duration = ticks_diff(self._times[lb - 1], self._times[0])

        if self.display:
            for x, e in enumerate(burst):
                print('{:03d} {:5d}'.format(x, e))
            print()
            # Attempt to determine protocol
            ok = False  # Protocol not yet found
            if near(burst[0], 9000) and lb == 67:
                print('NEC')
                ok = True

            if not ok and near(burst[0], 2400) and near(burst[1], 600):  # Maybe Sony
                try:
                    nbits = {25:12, 31:15, 41:20}[lb]
                except KeyError:
                    pass
                else:
                    ok = True
                    print('Sony {}bit'.format(nbits))

            if not ok and near(burst[0], 889):  # Maybe RC-5
                if near(duration, 24892) and near(max(burst), 1778):
                    print('Philips RC-5')
                    ok = True

            if not ok and near(burst[0], 2666) and near(burst[1], 889):  # RC-6?
                if near(duration, 22205) and near(burst[2], 444):
                    print('Philips RC-6 mode 0')
                    ok = True

            if not ok and near(burst[0], 2000) and near(burst[1], 1000):
                if near(duration, 19000):
                    print('Microsoft MCE edition protocol.')
                    # Constant duration, variable burst length, presumably bi-phase
                    print('Protocol start {} {} Burst length {} duration {}'.format(burst[0], burst[1], lb, duration))
                    ok = True

            if not ok and near(burst[0], 4500) and near(burst[1], 4500) and lb == 67:  # Samsung
                print('Samsung')
                ok = True

            if not ok and near(burst[0], 3500) and near(burst[1], 1680):  # Panasonic?
                print('Unsupported protocol. Panasonic?')
                ok = True

            if not ok:
                print('Unknown protocol start {} {} Burst length {} duration {}'.format(burst[0], burst[1], lb, duration))

            print()
        self.data = burst
        # Set up for new data burst. Run null callback
        self.do_callback(0, 0, 0)

    # Block until decode() has stored a burst, then release pin and timer.
    def acquire(self):
        while self.data is None:
            sleep_ms(5)
        self.close()
        return self.data

def test():
    # Define pin according to platform
    if platform == 'pyboard':
        pin = Pin('X3', Pin.IN)
    elif platform == 'esp8266':
        freq(160000000)
        pin = Pin(13, Pin.IN)
    elif platform == 'esp32' or platform == 'esp32_LoBo':
        pin = Pin(23, Pin.IN)
    elif platform == 'rp2':
        pin = Pin(16, Pin.IN)
    else:
        # Previously fell through and failed with an unbound 'pin'.
        raise RuntimeError('Unsupported platform: {}'.format(platform))
    irg = IR_GET(pin)
    print('Waiting for IR data...')
    return irg.acquire()


# ==== ir_rx/mce.py ====
# mce.py Decoder for IR remote control using synchronous code
# Supports Microsoft MCE edition remote protocol.

# Author: Peter Hinch
# Copyright Peter Hinch 2020 Released under the MIT license

# WARNING: This is experimental and subject to change.

from utime import ticks_us, ticks_diff
from ir_rx import IR_RX

class MCE(IR_RX):
    init_cs = 4  # http://www.hifi-remote.com/johnsfine/DecodeIR.html#OrtekMCE says 3
    def __init__(self, pin, callback, *args):
        # Block lasts ~19ms and has <= 34 edges
        super().__init__(pin, 34, 25, callback, *args)

    def decode(self, _):
        def check(v):
            if self.init_cs == -1:
                return True
            csum = v >> 12
            cs = self.init_cs
            for _ in range(12):
                if v & 1:
                    cs += 1
                v >>= 1
            # Only 4 checksum bits are carried in the 16 bit frame, so compare
            # modulo 16 (matches the 16 bits sent by the ir_tx MCE encoder).
            # NOTE(review): wrap-around on a real MCE remote is unconfirmed.
            return (cs & 0xf) == csum

        try:
            t0 = ticks_diff(self._times[1], self._times[0])  # 2000μs mark
            t1 = ticks_diff(self._times[2], self._times[1])  # 1000μs space
            if not ((1800 < t0 < 2200) and (800 < t1 < 1200)):
                raise RuntimeError(self.BADSTART)
            nedges = self.edge  # No. of edges detected
            if not 14 <= nedges <= 34:
                raise RuntimeError(self.OVERRUN if nedges > 34 else self.BADSTART)
            # Manchester decode
            mask = 1
            bit = 1
            v = 0
            x = 2
            for _ in range(16):
                # -1 convert count to index, -1 because we look ahead
                if x > nedges - 2:
                    raise RuntimeError(self.BADBLOCK)
                # width is 500/1000 nominal
                width = ticks_diff(self._times[x + 1], self._times[x])
                if not 250 < width < 1350:
                    self.verbose and print('Bad block 3 Width', width, 'x', x)
                    raise RuntimeError(self.BADBLOCK)
                short = int(width < 750)
                bit ^= short ^ 1
                v |= mask if bit else 0
                mask <<= 1
                x += 1 + short

            self.verbose and print(bin(v))
            if not check(v):
                raise RuntimeError(self.BADDATA)
            val = (v >> 6) & 0x3f
            addr = v & 0xf  # Constant for all buttons on my remote
            ctrl = (v >> 4) & 3

        except RuntimeError as e:
            val, addr, ctrl = e.args[0], 0, 0
        # Set up for new data burst and run user callback/error function
        self.do_callback(val, addr, ctrl)


# ==== ir_rx/nec.py ====
# nec.py Decoder for IR remote control using synchronous code
# Supports NEC and Samsung protocols.
# With thanks to J.E.
# Tannenbaum for information re Samsung protocol

# For a remote using NEC see https://www.adafruit.com/products/389

# Author: Peter Hinch
# Copyright Peter Hinch 2020-2022 Released under the MIT license

from utime import ticks_us, ticks_diff
from ir_rx import IR_RX

class NEC_ABC(IR_RX):
    # extended: accept a 16 bit address when the address check byte fails.
    # samsung: use the shorter Samsung leader threshold.
    def __init__(self, pin, extended, samsung, callback, *args):
        # Block lasts <= 80ms (extended mode) and has 68 edges
        super().__init__(pin, 68, 80, callback, *args)
        self._extended = extended
        self._addr = 0
        self._leader = 2500 if samsung else 4000  # 4.5ms for Samsung else 9ms

    # Timer callback. Errors and REPEAT are raised as RuntimeError carrying the
    # (negative) result code, then routed through do_callback.
    def decode(self, _):
        try:
            if self.edge > 68:
                raise RuntimeError(self.OVERRUN)
            width = ticks_diff(self._times[1], self._times[0])
            if width < self._leader:  # 9ms leading mark for all valid data
                raise RuntimeError(self.BADSTART)
            width = ticks_diff(self._times[2], self._times[1])
            if width > 3000:  # 4.5ms space for normal data
                if self.edge < 68:  # Haven't received the correct number of edges
                    raise RuntimeError(self.BADBLOCK)
                # Time spaces only (marks are always 562.5µs)
                # Space is 1.6875ms (1) or 562.5µs (0)
                # Skip last bit which is always 1
                val = 0
                for edge in range(3, 68 - 2, 2):
                    val >>= 1
                    if ticks_diff(self._times[edge + 1], self._times[edge]) > 1120:
                        val |= 0x80000000
            elif width > 1700:  # 2.5ms space for a repeat code. Should have exactly 4 edges.
                raise RuntimeError(self.REPEAT if self.edge == 4 else self.BADREP)  # Treat REPEAT as error.
            else:
                raise RuntimeError(self.BADSTART)
            addr = val & 0xff  # 8 bit addr
            cmd = (val >> 16) & 0xff
            if cmd != (val >> 24) ^ 0xff:
                raise RuntimeError(self.BADDATA)
            if addr != ((val >> 8) ^ 0xff) & 0xff:  # 8 bit addr doesn't match check
                if not self._extended:
                    raise RuntimeError(self.BADADDR)
                addr |= val & 0xff00  # pass assumed 16 bit address to callback
            self._addr = addr
        except RuntimeError as e:
            cmd = e.args[0]
            addr = self._addr if cmd == self.REPEAT else 0  # REPEAT uses last address
        # Set up for new data burst and run user callback
        # thresh=REPEAT so that a repeat code reaches the user callback
        self.do_callback(cmd, addr, 0, self.REPEAT)

class NEC_8(NEC_ABC):
    def __init__(self, pin, callback, *args):
        super().__init__(pin, False, False, callback, *args)

class NEC_16(NEC_ABC):
    def __init__(self, pin, callback, *args):
        super().__init__(pin, True, False, callback, *args)

class SAMSUNG(NEC_ABC):
    def __init__(self, pin, callback, *args):
        super().__init__(pin, True, True, callback, *args)


# ==== ir_rx/philips.py ====
# philips.py Decoder for IR remote control using synchronous code
# Supports Philips RC-5 RC-6 mode 0 protocols.

# Author: Peter Hinch
# Copyright Peter Hinch 2020 Released under the MIT license

from utime import ticks_us, ticks_diff
from ir_rx import IR_RX

class RC5_IR(IR_RX):
    def __init__(self, pin, callback, *args):
        # Block lasts <= 30ms and has <= 28 edges
        super().__init__(pin, 28, 30, callback, *args)

    def decode(self, _):
        try:
            nedges = self.edge  # No. of edges detected
            if not 14 <= nedges <= 28:
                raise RuntimeError(self.OVERRUN if nedges > 28 else self.BADSTART)
            # Regenerate bitstream
            bits = 1
            bit = 1
            v = 1  # 14 bit bitstream, MSB always 1
            x = 0
            while bits < 14:
                # -1 convert count to index, -1 because we look ahead
                if x > nedges - 2:
                    # Diagnostic only: honour .verbose like every other message
                    self.verbose and print('Bad block 1 edges', nedges, 'x', x)
                    raise RuntimeError(self.BADBLOCK)
                # width is 889/1778 nominal
                width = ticks_diff(self._times[x + 1], self._times[x])
                if not 500 < width < 2100:
                    self.verbose and print('Bad block 3 Width', width, 'x', x)
                    raise RuntimeError(self.BADBLOCK)
                short = width < 1334
                if not short:
                    bit ^= 1
                v <<= 1
                v |= bit
                bits += 1
                x += 1 + int(short)
            self.verbose and print(bin(v))
            # Split into fields (val, addr, ctrl)
            val = (v & 0x3f) | (0 if ((v >> 12) & 1) else 0x40)  # Correct the polarity of S2
            addr = (v >> 6) & 0x1f
            ctrl = (v >> 11) & 1

        except RuntimeError as e:
            val, addr, ctrl = e.args[0], 0, 0
        # Set up for new data burst and run user callback
        self.do_callback(val, addr, ctrl)


class RC6_M0(IR_RX):
    # Even on Pyboard D the 444μs nominal pulses can be recorded as up to 705μs
    # Scope shows 360-520 μs (-84μs +76μs relative to nominal)
    # Header nominal 2666, 889, 444, 889, 444, 444, 444, 444 carrier ON at end
    hdr = ((1800, 4000), (593, 1333), (222, 750), (593, 1333), (222, 750), (222, 750), (222, 750), (222, 750))
    def __init__(self, pin, callback, *args):
        # Block lasts 23ms nominal and has <=44 edges
        super().__init__(pin, 44, 30, callback, *args)

    def decode(self, _):
        try:
            nedges = self.edge  # No. of edges detected
            if not 22 <= nedges <= 44:
                # Compare against this protocol's own limit (was RC-5's 28)
                raise RuntimeError(self.OVERRUN if nedges > 44 else self.BADSTART)
            for x, lims in enumerate(self.hdr):
                width = ticks_diff(self._times[x + 1], self._times[x])
                if not (lims[0] < width < lims[1]):
                    self.verbose and print('Bad start', x, width, lims)
                    raise RuntimeError(self.BADSTART)
            x += 1
            width = ticks_diff(self._times[x + 1], self._times[x])
            # 2nd bit of last 0 is 444μs (0) or 1333μs (1)
            if not 222 < width < 1555:
                self.verbose and print('Bad block 1 Width', width, 'x', x)
                raise RuntimeError(self.BADBLOCK)
            short = width < 889
            v = int(not short)
            bit = v
            bits = 1  # Bits decoded
            x += 1 + int(short)
            width = ticks_diff(self._times[x + 1], self._times[x])
            if not 222 < width < 1555:
                self.verbose and print('Bad block 2 Width', width, 'x', x)
                raise RuntimeError(self.BADBLOCK)
            short = width < 1111
            if not short:
                bit ^= 1
            x += 1 + int(short)  # If it's short, we know width of next
            v <<= 1
            v |= bit  # MSB of result
            bits += 1
            # Decode bitstream
            while bits < 17:
                # -1 convert count to index, -1 because we look ahead
                if x > nedges - 2:
                    raise RuntimeError(self.BADBLOCK)
                # width is 444/889 nominal
                width = ticks_diff(self._times[x + 1], self._times[x])
                if not 222 < width < 1111:
                    self.verbose and print('Bad block 3 Width', width, 'x', x)
                    raise RuntimeError(self.BADBLOCK)
                short = width < 666
                if not short:
                    bit ^= 1
                v <<= 1
                v |= bit
                bits += 1
                x += 1 + int(short)

            if self.verbose:
                ss = '20-bit format {:020b} x={} nedges={} bits={}'
                print(ss.format(v, x, nedges, bits))

            val = v & 0xff
            addr = (v >> 8) & 0xff
            ctrl = (v >> 16) & 1
        except RuntimeError as e:
            val, addr, ctrl = e.args[0], 0, 0
        # Set up for new data burst and run user callback
        self.do_callback(val, addr, ctrl)


# ==== ir_rx/print_error.py ====
# print_error.py Error print for IR receiver

# Author: Peter Hinch
# Copyright Peter Hinch 2020 Released under the MIT license

from ir_rx import IR_RX

_errors = {IR_RX.BADSTART : 'Invalid start pulse',
           IR_RX.BADBLOCK : 'Error: bad block',
           IR_RX.BADREP : 'Error: repeat',
           IR_RX.OVERRUN : 'Error: overrun',
           IR_RX.BADDATA : 'Error: invalid data',
           IR_RX.BADADDR : 'Error: invalid address'}

# Print the text for an IR_RX error code, or the raw code if it is unknown.
def print_error(data):
    try:
        print(_errors[data])
    except KeyError:
        print('Unknown error code:', data)


# ==== ir_rx/sony.py ====
# sony.py Decoder for IR remote control using synchronous code
# Sony SIRC protocol.

# Author: Peter Hinch
# Copyright Peter Hinch 2020 Released under the MIT license

from utime import ticks_us, ticks_diff
from ir_rx import IR_RX

class SONY_ABC(IR_RX):  # Abstract base class
    def __init__(self, pin, bits, callback, *args):
        # 20 bit block has 42 edges and lasts <= 39ms nominal. Add 4ms to time
        # for tolerances except in 20 bit case where timing is tight with a
        # repeat period of 45ms.
        t = int(3 + bits * 1.8) + (1 if bits == 20 else 4)
        super().__init__(pin, 2 + bits * 2, t, callback, *args)
        self._addr = 0
        # Widest frame this instance accepts (was hard-coded to 20, which made
        # the width guard in decode() a no-op).
        self._bits = bits

    def decode(self, _):
        try:
            nedges = self.edge  # No. of edges detected
            self.verbose and print('nedges', nedges)
            # The edge buffer holds ._nedges + 1 entries; the extra one marks
            # an overrun. Test against this instance's limit rather than the
            # 20 bit value so 12/15 bit receivers also report OVERRUN.
            if nedges > self._nedges:
                raise RuntimeError(self.OVERRUN)
            bits = (nedges - 2) // 2
            if nedges not in (26, 32, 42) or bits > self._bits:
                raise RuntimeError(self.BADBLOCK)
            self.verbose and print('SIRC {}bit'.format(bits))
            width = ticks_diff(self._times[1], self._times[0])
            if not 1800 < width < 3000:  # 2.4ms leading mark for all valid data
                raise RuntimeError(self.BADSTART)
            width = ticks_diff(self._times[2], self._times[1])
            if not 350 < width < 1000:  # 600μs space
                raise RuntimeError(self.BADSTART)

            val = 0  # Data received, LSB 1st
            x = 2
            bit = 1
            while x <= nedges - 2:
                if ticks_diff(self._times[x + 1], self._times[x]) > 900:
                    val |= bit
                bit <<= 1
                x += 2
            cmd = val & 0x7f  # 7 bit command
            val >>= 7
            if nedges < 42:
                addr = val & 0xff  # 5 or 8 bit addr
                val = 0
            else:
                addr = val & 0x1f  # 5 bit addr
                val >>= 5  # 8 bit extended
        except RuntimeError as e:
            cmd = e.args[0]
            addr = 0
            val = 0
        self.do_callback(cmd, addr, val)

class SONY_12(SONY_ABC):
    def __init__(self, pin, callback, *args):
        super().__init__(pin, 12, callback, *args)

class SONY_15(SONY_ABC):
    def __init__(self, pin, callback, *args):
        super().__init__(pin, 15, callback, *args)

class SONY_20(SONY_ABC):
    def __init__(self, pin, callback, *args):
        super().__init__(pin, 20, callback, *args)


# ==== ir_rx/test.py ====
# test.py Test program for IR remote control decoder
# Supports Pyboard, ESP32 and ESP8266

# Author: Peter Hinch
# Copyright Peter Hinch 2020-2022 Released under the MIT license

# Run this to characterise a remote.

from sys import platform
import time
import gc
from machine import Pin, freq
from ir_rx.print_error import print_error  # Optional print of error codes

# Import all implemented classes
from ir_rx.nec import NEC_8, NEC_16, SAMSUNG
from ir_rx.sony import SONY_12, SONY_15, SONY_20
from ir_rx.philips import RC5_IR, RC6_M0
from ir_rx.mce import MCE

# Define pin according to platform
if platform == "pyboard":
    p = Pin("X3", Pin.IN)
elif platform == "esp8266":
    freq(160000000)
    p = Pin(13, Pin.IN)
elif platform == "esp32" or platform == "esp32_LoBo":
    p = Pin(23, Pin.IN)
elif platform == "rp2":
    p = Pin(16, Pin.IN)
else:
    p = None  # No pin mapping for this platform; test() reports it

# User callback
def cb(data, addr, ctrl):
    if data < 0:  # NEC protocol sends repeat codes.
        print("Repeat code.")
    else:
        print(f"Data 0x{data:02x} Addr 0x{addr:04x} Ctrl 0x{ctrl:02x}")


# Run a receiver of the class selected by proto (index into the tuple below)
# until interrupted with ctrl-c.
def test(proto=0):
    if p is None:
        # Previously surfaced as a NameError on the undefined pin.
        raise RuntimeError("Unsupported platform: {}".format(platform))
    classes = (NEC_8, NEC_16, SONY_12, SONY_15, SONY_20, RC5_IR, RC6_M0, MCE, SAMSUNG)
    ir = classes[proto](p, cb)  # Instantiate receiver
    ir.error_function(print_error)  # Show debug information
    # ir.verbose = True
    # A real application would do something here...
    try:
        while True:
            print("running")
            time.sleep(5)
            gc.collect()
    except KeyboardInterrupt:
        ir.close()


# **** DISPLAY GREETING ****
s = """Test for IR receiver. Run:
from ir_rx.test import test
test() for NEC 8 bit protocol,
test(1) for NEC 16 bit,
test(2) for Sony SIRC 12 bit,
test(3) for Sony SIRC 15 bit,
test(4) for Sony SIRC 20 bit,
test(5) for Philips RC-5 protocol,
test(6) for RC6 mode 0.
test(7) for Microsoft Vista MCE.
test(8) for Samsung.

Hit ctrl-c to stop, then ctrl-d to soft reset."""

print(s)


# ==== ir_tx/__init__.py ====
# __init__.py Nonblocking IR blaster
# Runs on Pyboard D or Pyboard 1.x (not Pyboard Lite), ESP32 and RP2

# Released under the MIT License (MIT). See LICENSE.

# Copyright (c) 2020-2021 Peter Hinch
from sys import platform
ESP32 = platform == 'esp32'  # Loboris not supported owing to RMT
RP2 = platform == 'rp2'
if ESP32:
    from machine import Pin, PWM
    from esp32 import RMT
elif RP2:
    from .rp2_rmt import RP2_RMT
else:
    from pyb import Pin, Timer  # Pyboard does not support machine.PWM

from micropython import const
from array import array
from time import ticks_us, ticks_diff
# import micropython
# micropython.alloc_emergency_exception_buf(100)


# Shared by NEC
STOP = const(0)  # End of data

# IR abstract base class. Array holds periods in μs between toggling 36/38KHz
# carrier on or off. Physical transmission occurs in an ISR context controlled
# by timer 2 and timer 5. See TRANSMITTER.md for details of operation.
class IR:
    _active_high = True  # Hardware turns IRLED on if pin goes high.
    _space = 0  # Duty ratio that causes IRLED to be off
    timeit = False  # Print timing info

    @classmethod
    def active_low(cls):
        if ESP32:
            raise ValueError('Cannot set active low on ESP32')
        cls._active_high = False
        cls._space = 100

    # pin: output pin driving the IR LED. cfreq: carrier frequency (Hz).
    # asize: number of entries in the on/off time array. duty: carrier duty (%).
    def __init__(self, pin, cfreq, asize, duty, verbose):
        if ESP32:
            self._rmt = RMT(0, pin=pin, clock_div=80, tx_carrier = (cfreq, duty, 1))
            # 1μs resolution
        elif RP2:  # PIO-based RMT-like device
            self._rmt = RP2_RMT(pin_pulse=None, carrier=(pin, cfreq, duty))  # 1μs resolution
            asize += 1  # Allow for possible extra space pulse
        else:  # Pyboard
            # Look the flag up through self: active_low() sets it on whichever
            # class it was called on, which may be a subclass rather than IR.
            if not self._active_high:
                duty = 100 - duty
            tim = Timer(2, freq=cfreq)  # Timer 2/pin produces 36/38/40KHz carrier
            self._ch = tim.channel(1, Timer.PWM, pin=pin)
            self._ch.pulse_width_percent(self._space)  # Turn off IR LED
            # Pyboard: 0 <= pulse_width_percent <= 100
            self._duty = duty
            self._tim = Timer(5)  # Timer 5 controls carrier on/off times
        self._tcb = self._cb  # Pre-allocate
        # Generator parenthesised so the call is valid in every Python dialect
        self._arr = array('H', (0 for _ in range(asize)))  # on/off times (μs)
        self._mva = memoryview(self._arr)
        # Subclass interface
        self.verbose = verbose
        self.carrier = False  # Notional carrier state while encoding biphase
        self.aptr = 0  # Index into array

    def _cb(self, t):  # T5 callback, generate a carrier mark or space
        t.deinit()
        p = self.aptr
        v = self._arr[p]
        if v == STOP:
            self._ch.pulse_width_percent(self._space)  # Turn off IR LED.
            return
        # Even indices are marks (carrier on), odd indices are spaces
        self._ch.pulse_width_percent(self._space if p & 1 else self._duty)
        self._tim.init(prescaler=84, period=v, callback=self._tcb)
        self.aptr += 1

    # Public interface
    # Before populating array, zero pointer, set notional carrier state (off).
    # With validate=True, arguments are range checked against the subclass's
    # .valid tuple (max addr, data, toggle) and ValueError is raised on failure.
    def transmit(self, addr, data, toggle=0, validate=False):  # NEC: toggle is unused
        t = ticks_us()
        if validate:
            if addr > self.valid[0] or addr < 0:
                raise ValueError('Address out of range', addr)
            if data > self.valid[1] or data < 0:
                raise ValueError('Data out of range', data)
            if toggle > self.valid[2] or toggle < 0:
                raise ValueError('Toggle out of range', toggle)
        self.aptr = 0  # Initial conditions for tx: index into array
        self.carrier = False
        self.tx(addr, data, toggle)  # Subclass populates ._arr
        self.trigger()  # Initiate transmission
        if self.timeit:
            dt = ticks_diff(ticks_us(), t)
            print('Time = {}μs'.format(dt))

    # Subclass interface
    def trigger(self):  # Used by NEC to initiate a repeat frame
        if ESP32:
            self._rmt.write_pulses(tuple(self._mva[0 : self.aptr]))
        elif RP2:
            self.append(STOP)
            self._rmt.send(self._arr)
        else:
            self.append(STOP)
            self.aptr = 0  # Reset pointer
            self._cb(self._tim)  # Initiate physical transmission.

    def append(self, *times):  # Append one or more time periods to ._arr
        for t in times:
            self._arr[self.aptr] = t
            self.aptr += 1
            self.carrier = not self.carrier  # Keep track of carrier state
            self.verbose and print('append', t, 'carrier', self.carrier)

    def add(self, t):  # Increase last time value (for biphase)
        assert t > 0
        self.verbose and print('add', t)
        # .carrier unaffected
        self._arr[self.aptr - 1] += t


# Given an iterable (e.g. list or tuple) of times, emit it as an IR stream.
class Player(IR):

    def __init__(self, pin, freq=38000, verbose=False):  # NEC specifies 38KHz
        super().__init__(pin, freq, 68, 33, verbose)  # Measured duty ratio 33%

    def play(self, lst):
        count = 0
        for count, t in enumerate(lst, 1):
            self._arr[count - 1] = t
        if not count:
            return  # Nothing to send (an empty iterable used to raise NameError)
        self.aptr = count
        self.trigger()


# ==== ir_tx/mce.py ====
# mce.py Encoder for IR remote control using synchronous code
# Supports Microsoft MCE edition remote protocol.

# Author: Peter Hinch
# Copyright Peter Hinch 2020 Released under the MIT license

# WARNING: This is experimental and subject to change.

from micropython import const
from ir_tx import IR

_TBIT = const(500)  # Time (μs) for pulse of carrier


class MCE(IR):
    valid = (0xf, 0x3f, 3)  # Max addr, data, toggle
    init_cs = 4  # http://www.hifi-remote.com/johnsfine/DecodeIR.html#OrtekMCE says 3

    def __init__(self, pin, freq=38000, verbose=False):
        super().__init__(pin, freq, 34, 30, verbose)

    # Build the pulse train: 2000/1000μs header, one _TBIT mark, then the
    # 16 bit word (payload plus checksum) LSB first as biphase symbols.
    def tx(self, addr, data, toggle):
        def checksum(word):
            # init_cs plus the number of set bits among the low 12 bits
            total = self.init_cs
            for pos in range(12):
                if (word >> pos) & 1:
                    total += 1
            return total

        self.append(2000, 1000, _TBIT)
        payload = ((data & 0x3f) << 6) | (addr & 0xf) | ((toggle & 3) << 4)
        payload |= checksum(payload) << 12
        self.verbose and print(bin(payload))

        for pos in range(16):
            level = bool(payload & (1 << pos))
            if level ^ self.carrier:
                # Stretch the pulse in progress, then add one half-bit
                self.add(_TBIT)
                self.append(_TBIT)
            else:
                self.append(_TBIT, _TBIT)


# ==== ir_tx/mcetest.py ====
# ir_tx.mcetest Test for nonblocking MCE IR blaster.

# Released under the MIT License (MIT). See LICENSE.

# Copyright (c) 2020 Peter Hinch

# Implements a 2-button remote control on a Pyboard with auto repeat.
from sys import platform
ESP32 = platform == 'esp32'
if ESP32:
    from machine import Pin
else:
    from pyb import Pin, LED

from micropython import const
import uasyncio as asyncio
from aswitch import Switch, Delay_ms
from ir_tx.mce import MCE

loop = asyncio.get_event_loop()
_FIRST = const(0)
_REP = const(1)
_END = const(2)
_REP_DELAY = const(60)

# One push button: sends a first frame on press, repeat frames while held and
# an end frame after release.
class Rbutton:
    def __init__(self, irb, pin, addr, data, rep_code=False):
        self.irb = irb
        self.sw = Switch(pin)
        self.addr = addr
        self.data = data
        self.rep_code = rep_code
        self.sw.close_func(self.cfunc)
        self.sw.open_func(self.ofunc)
        self.tim = Delay_ms(self.repeat)
        self.stop = False

    def cfunc(self):  # Button push: send data and set up for repeats
        self.irb.transmit(self.addr, self.data, _FIRST, True)
        self.tim.trigger(_REP_DELAY)

    def ofunc(self):  # Button release: cancel repeat timer
        self.stop = True

    async def repeat(self):
        await asyncio.sleep(0)  # Let timer stop before retriggering
        if not self.stop:  # Still held: re-arm the timer and send a repeat
            self.tim.trigger(_REP_DELAY)
            self.irb.transmit(self.addr, self.data, _REP, True)
            return
        # Button has been released: send last message
        self.stop = False
        self.tim.stop()  # Not strictly necessary
        self.irb.transmit(self.addr, self.data, _END, True)

async def main():
    pin = Pin(23, Pin.OUT, value = 0) if ESP32 else Pin('X1')
    irb = MCE(pin)  # verbose=True)
    # Uncomment the following to print transmit timing
    # irb.timeit = True

    px3 = Pin(18, Pin.IN, Pin.PULL_UP) if ESP32 else Pin('X3', Pin.IN, Pin.PULL_UP)
    px4 = Pin(19, Pin.IN, Pin.PULL_UP) if ESP32 else Pin('X4', Pin.IN, Pin.PULL_UP)
    # Rbutton instances
    b = [Rbutton(irb, px3, 0x1, 0x7), Rbutton(irb, px4, 0xe, 0xb)]
    if ESP32:
        while True:
            print('Running')
            await asyncio.sleep(5)
    else:
        led = LED(1)
        while True:
            await asyncio.sleep_ms(500)  # Obligatory flashing LED.
            led.toggle()

# Greeting strings. Common:
s = '''Test for IR transmitter. Run:
from ir_tx.mcetest import test
test()
'''
# Pyboard:
spb = '''
IR LED on pin X1
Ground pin X3 to send addr 1 data 7
Ground pin X4 to send addr 0xe data 0x0b.'''
# ESP32
sesp = '''
IR LED gate on pins 23, 21
Ground pin 18 to send addr 1 data 7
Ground pin 19 to send addr 0xe data 0x0b.'''

print(s + (sesp if ESP32 else spb))

def test():
    loop.run_until_complete(main())


# ==== ir_tx/nec.py ====
# nec.py Encoder for IR remote control using synchronous code
# NEC protocol.

# Author: Peter Hinch
# Copyright Peter Hinch 2020-2022 Released under the MIT license

# With thanks to J.E. Tannenbaum for information re Samsung protocol
from micropython import const
from ir_tx import IR, STOP

_TBURST = const(563)
_T_ONE = const(1687)

class NEC(IR):
    valid = (0xffff, 0xff, 0)  # Max addr, data, toggle
    samsung = False

    def __init__(self, pin, freq=38000, verbose=False):  # NEC specifies 38KHz also Samsung
        super().__init__(pin, freq, 68, 33, verbose)  # Measured duty ratio 33%

    # One bit: a burst followed by a long (1) or short (0) space
    def _bit(self, b):
        space = _T_ONE if b else _TBURST
        self.append(_TBURST, space)

    # Send a 16 bit word LSB first
    def _word(self, w):
        for _ in range(16):
            self._bit(w & 1)
            w >>= 1

    def tx(self, addr, data, _):  # Ignore toggle
        leader = 4500 if self.samsung else 9000
        self.append(leader, 4500)
        if addr < 256:  # Short address: append complement
            high = addr if self.samsung else (addr ^ 0xff)  # Samsung repeats the address
            addr |= high << 8
        self._word(addr)
        self._word(data | ((data ^ 0xff) << 8))
        self.append(_TBURST)

    def repeat(self):
        self.aptr = 0
        self.append(9000, 2250, _TBURST)
        self.trigger()  # Initiate physical transmission.
# ==== ir_tx/philips.py ====
# philips.py Encoder for IR remote control using synchronous code
# RC-5 and RC-6 mode 0 protocols.

# Author: Peter Hinch
# Copyright Peter Hinch 2020 Released under the MIT license

from micropython import const
from ir_tx import IR

# Philips RC5 protocol
_T_RC5 = const(889)  # Time for pulse of carrier


class RC5(IR):
    valid = (0x1f, 0x7f, 1)  # Max addr, data, toggle

    def __init__(self, pin, freq=36000, verbose=False):
        super().__init__(pin, freq, 28, 30, verbose)

    def tx(self, addr, data, toggle):  # Fix RC5X S2 bit polarity
        s2 = ((data & 0x40) ^ 0x40) << 6
        d = (data & 0x3f) | ((addr & 0x1f) << 6) | s2 | ((toggle & 1) << 11)
        self.verbose and print(bin(d))
        # The top (0x2000) bit position is always sent as one half-bit mark
        self.append(_T_RC5)
        mask = 0x1000
        while mask:
            if bool(d & mask) ^ self.carrier:
                self.add(_T_RC5)
                self.append(_T_RC5)
            else:
                self.append(_T_RC5, _T_RC5)
            mask >>= 1

# Philips RC6 mode 0 protocol
_T_RC6 = const(444)
_T2_RC6 = const(889)

class RC6_M0(IR):
    valid = (0xff, 0xff, 1)  # Max addr, data, toggle

    def __init__(self, pin, freq=36000, verbose=False):
        super().__init__(pin, freq, 44, 30, verbose)

    def tx(self, addr, data, toggle):
        # leader, 1, 0, 0, 0
        self.append(2666, _T2_RC6, _T_RC6, _T2_RC6, _T_RC6, _T_RC6, _T_RC6, _T_RC6, _T_RC6)
        # Append a single bit of twice duration
        if not toggle:
            self.append(_T2_RC6, _T2_RC6)
        else:
            self.add(_T2_RC6)
            self.append(_T2_RC6)
        d = (data & 0xff) | ((addr & 0xff) << 8)
        self.verbose and print('toggle', toggle, self.carrier, bool(d & 0x8000))
        for shift in range(15, -1, -1):  # 16 bits, MSB first
            bit = bool(d & (1 << shift))
            if bit ^ self.carrier:
                self.append(_T_RC6, _T_RC6)
            else:
                self.add(_T_RC6)
                self.append(_T_RC6)


# ==== ir_tx/rp2_rmt.py ====
# rp2_rmt.py A RMT-like class for the RP2.

# Released under the MIT License (MIT). See LICENSE.

# Copyright (c) 2021 Peter Hinch

from machine import Pin, PWM
import rp2

@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, autopull=True, pull_thresh=32)
def pulsetrain():
    wrap_target()
    out(x, 32)  # No of 1MHz ticks. Block if FIFO MT at end.
    irq(rel(0))
    set(pins, 1)  # Set pin high
    label('loop')
    jmp(x_dec,'loop')
    irq(rel(0))
    set(pins, 0)  # Set pin low
    out(y, 32)  # Low time.
    label('loop_lo')
    jmp(y_dec,'loop_lo')
    wrap()

@rp2.asm_pio(autopull=True, pull_thresh=32)
def irqtrain():
    wrap_target()
    out(x, 32)  # No of 1MHz ticks. Block if FIFO MT at end.
    irq(rel(0))
    label('loop')
    jmp(x_dec,'loop')
    wrap()

# Stand-in used when no carrier is requested: accepts and ignores duty changes
class DummyPWM:
    def duty_u16(self, _):
        pass

class RP2_RMT:

    # pin_pulse: optional pin driven directly by the state machine.
    # carrier: optional (pin, freq, duty percent) tuple for a PWM carrier.
    def __init__(self, pin_pulse=None, carrier=None, sm_no=0, sm_freq=1_000_000):
        if carrier is None:
            self.pwm = DummyPWM()
            self.duty = (0, 0)
        else:
            pin_car, freq, duty = carrier
            self.pwm = PWM(pin_car)  # Set up PWM with carrier off.
            self.pwm.freq(freq)
            self.pwm.duty_u16(0)
            self.duty = (int(0xffff * duty // 100), 0)
        if pin_pulse is None:
            self.sm = rp2.StateMachine(sm_no, irqtrain, freq=sm_freq)
        else:
            self.sm = rp2.StateMachine(sm_no, pulsetrain, freq=sm_freq, set_base=pin_pulse)
        self.apt = 0  # Array index
        self.arr = None  # Array
        self.ict = None  # Current IRQ count
        self.icm = 0  # End IRQ count
        self.reps = 0  # 0 == forever n == no. of reps
        # NOTE(review): handler is always attached to PIO block 0 whatever
        # sm_no is - confirm before using a state machine on another block.
        rp2.PIO(0).irq(self._cb)

    # IRQ callback. Because of FIFO IRQ's keep arriving after STOP.
    def _cb(self, pio):
        self.pwm.duty_u16(self.duty[self.ict & 1])
        self.ict += 1
        if d := self.arr[self.apt]:  # If data available feed FIFO
            self.sm.put(d)
            self.apt += 1
        else:
            # Parenthesised so r is the repeat count itself. Unparenthesised,
            # r bound to the comparison result (always True here), so the
            # "0 == run forever" case was decremented to negative values.
            if (r := self.reps) != 1:  # All done if reps == 1
                if r:  # 0 == run forever
                    self.reps -= 1
                self.sm.put(self.arr[0])
                self.apt = 1  # Set pointer and count to state
                self.ict = 1  # after 1st IRQ

    # Arg is an array of times in μs terminated by 0.
    def send(self, ar, reps=1, check=True):
        self.sm.active(0)
        self.reps = reps
        ar[-1] = 0  # Ensure at least one STOP
        for x, d in enumerate(ar):  # Find 1st STOP
            if d == 0:
                break
        if check:
            # Pulse train must end with a space otherwise we leave carrier on.
            # So, if it ends with a mark, append a space. Note __init__.py
            # ensures that there is room in array.
            if (x & 1):
                ar[x] = 1  # space. Duration doesn't matter.
                x += 1
                ar[x] = 0  # STOP
        self.icm = x  # index of 1st STOP
        mv = memoryview(ar)
        n = min(x, 4)  # Fill FIFO if there are enough data points.
        self.sm.put(mv[0 : n])
        self.arr = ar  # Initial conditions for ISR
        self.apt = n  # Point to next data value
        self.ict = 0  # IRQ count
        self.sm.active(1)

    def busy(self):
        if self.ict is None:
            return False  # Just instantiated
        return self.ict < self.icm

    def cancel(self):
        self.reps = 1


# ==== ir_tx/sony.py ====
# sony.py Encoder for IR remote control using synchronous code
# Sony SIRC protocol.
# sony.py Encoder for IR remote control using synchronous code
# Sony SIRC protocol.

# Common base for the 12, 15 and 20 bit SIRC variants.
class SONY_ABC(IR):

    # bits: frame length, one of 12, 15 or 20. Raises ValueError otherwise.
    def __init__(self, pin, bits, freq, verbose):
        # Validate before the base class constructor runs, so that a bad
        # argument does not leave a half-constructed transmitter behind.
        if bits not in (12, 15, 20):
            raise ValueError('bits must be 12, 15 or 20.')
        super().__init__(pin, freq, 3 + bits * 2, 30, verbose)
        self.bits = bits

    # Queue one frame: start burst, then the bits LSB first.
    # Layout (from bit 0): 7 data bits, then address; the 20 bit variant
    # carries 5 address bits followed by 8 extended bits.
    def tx(self, addr, data, ext):
        self.append(2400, 600)
        bits = self.bits
        v = data & 0x7f
        if bits == 12:
            v |= (addr & 0x1f) << 7
        elif bits == 15:
            v |= (addr & 0xff) << 7
        else:
            v |= (addr & 0x1f) << 7
            v |= (ext & 0xff) << 12
        for _ in range(bits):
            self.append(1200 if v & 1 else 600, 600)
            v >>= 1

# Sony specifies 40KHz
class SONY_12(SONY_ABC):
    valid = (0x1f, 0x7f, 0)  # Max addr, data, toggle
    def __init__(self, pin, freq=40000, verbose=False):
        super().__init__(pin, 12, freq, verbose)

class SONY_15(SONY_ABC):
    valid = (0xff, 0x7f, 0)  # Max addr, data, toggle
    def __init__(self, pin, freq=40000, verbose=False):
        super().__init__(pin, 15, freq, verbose)

class SONY_20(SONY_ABC):
    valid = (0x1f, 0x7f, 0xff)  # Max addr, data, ext (third tx() argument)
    def __init__(self, pin, freq=40000, verbose=False):
        super().__init__(pin, 20, freq, verbose)
# ir_tx.test Test for nonblocking NEC/SONY/RC-5/RC-6 mode 0 IR blaster.
# Implements a 2-button remote control with auto repeat.

from sys import platform
ESP32 = platform == 'esp32'
RP2 = platform == 'rp2'
PYBOARD = platform == 'pyboard'
if ESP32 or RP2:
    from machine import Pin
else:
    from pyb import Pin, LED
import uasyncio as asyncio
from primitives.switch import Switch
from primitives.delay_ms import Delay_ms
# Import all implemented classes
from ir_tx.nec import NEC
from ir_tx.sony import SONY_12, SONY_15, SONY_20
from ir_tx.philips import RC5, RC6_M0

loop = asyncio.get_event_loop()

# Codes sent by the two buttons. The help text below is built from these
# values so that it cannot drift from what is actually transmitted.
_ADDR = 0xef00
_DATA_A = 0x03
_DATA_B = 0x02

# If button is held down normal behaviour is to retransmit
# but most NEC models send a REPEAT code
class Rbutton:
    toggle = 1  # toggle is ignored in NEC mode
    def __init__(self, irb, pin, addr, data, proto):
        self.irb = irb
        self.sw = Switch(pin)
        self.addr = addr
        self.data = data
        self.proto = proto

        self.sw.close_func(self.cfunc)
        self.sw.open_func(self.ofunc)
        self.tim = Delay_ms(self.repeat)

    # Transmit this button's code (shared by first press and auto repeat).
    def _send(self):
        tog = 0 if self.proto < 3 else Rbutton.toggle  # NEC, sony 12, 15: toggle==0
        self.irb.transmit(self.addr, self.data, tog, True)  # Test validation

    def cfunc(self):  # Button push: send data
        self._send()
        # Auto repeat. The Sony protocol specifies 45ms but this is tight.
        # In 20 bit mode a data burst can be up to 39ms long.
        self.tim.trigger(108)

    def ofunc(self):  # Button release: cancel repeat timer
        self.tim.stop()
        Rbutton.toggle ^= 1  # Toggle control

    async def repeat(self):
        await asyncio.sleep(0)  # Let timer stop before retriggering
        if not self.sw():  # Button is still pressed: retrigger
            self.tim.trigger(108)
            if self.proto == 0:
                self.irb.repeat()  # NEC special case: send REPEAT code
            else:
                self._send()

# proto: index into the classes tuple below (0 == NEC ... 5 == RC6_M0).
async def main(proto):
    # Test uses a 38KHz carrier.
    if ESP32:  # Pins for IR LED gate
        pin = Pin(23, Pin.OUT, value = 0)
    elif RP2:
        pin = Pin(17, Pin.OUT, value = 0)
    else:
        pin = Pin('X1')
    classes = (NEC, SONY_12, SONY_15, SONY_20, RC5, RC6_M0)
    irb = classes[proto](pin, 38000)  # My decoder chip is 38KHz
    # Uncomment the following to print transmit timing
    # irb.timeit = True

    b = []  # Rbutton instances
    px3 = Pin('X3', Pin.IN, Pin.PULL_UP) if PYBOARD else Pin(18, Pin.IN, Pin.PULL_UP)
    px4 = Pin('X4', Pin.IN, Pin.PULL_UP) if PYBOARD else Pin(19, Pin.IN, Pin.PULL_UP)
    b.append(Rbutton(irb, px3, _ADDR, _DATA_A, proto))
    b.append(Rbutton(irb, px4, _ADDR, _DATA_B, proto))
    if ESP32:
        while True:
            print('Running')
            await asyncio.sleep(5)
    elif RP2:
        led = Pin(25, Pin.OUT)
        while True:
            await asyncio.sleep_ms(500)  # Obligatory flashing LED.
            led(not led())
    else:
        led = LED(1)
        while True:
            await asyncio.sleep_ms(500)  # Obligatory flashing LED.
            led.toggle()

# Greeting strings. Common:
s = '''Test for IR transmitter. Run:
from ir_tx.test import test
test() for NEC protocol
test(1) for Sony SIRC 12 bit
test(2) for Sony SIRC 15 bit
test(3) for Sony SIRC 20 bit
test(4) for Philips RC-5 protocol
test(5) for Philips RC-6 mode 0.
'''

_PIN_HELP = '''
IR LED{} on pin {}
Ground pin {} to send addr {} data {}
Ground pin {} to send addr {} data {}.'''

# Per-platform wiring help, reporting the codes main() really sends.
def _pin_help(gate, led, sw_a, sw_b):
    return _PIN_HELP.format(gate, led,
                            sw_a, hex(_ADDR), hex(_DATA_A),
                            sw_b, hex(_ADDR), hex(_DATA_B))

# Pyboard:
spb = _pin_help('', 'X1', 'X3', 'X4')

# ESP32
sesp = _pin_help(' gate', 23, 18, 19)

# RP2
srp2 = _pin_help(' gate', 17, 18, 19)

if ESP32:
    print(''.join((s, sesp)))
elif RP2:
    print(''.join((s, srp2)))
else:
    print(''.join((s, spb)))

def test(proto=0):
    loop.run_until_complete(main(proto))
# __init__.py Common functions for uasyncio primitives

try:
    from micropython import const
except ImportError:  # Not MicroPython: const() is only a compile-time hint
    def const(x):
        return x


async def _g():
    pass
_c = _g()
type_coro = type(_c)  # Type of a coroutine object, used by launch()
_c.close()  # Never awaited: close it explicitly
del _c

# If a callback is passed, run it and return.
# If a coro is passed initiate it and return.
# coros are passed by name i.e. not using function call syntax.
# Returns the callback's return value, or the Task wrapping the coro.
def launch(func, tup_args):
    res = func(*tup_args)
    if isinstance(res, type_coro):
        res = asyncio.create_task(res)
    return res

# Install an event loop exception handler that prints the exception and exits.
def set_global_exception():
    def _handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(_handle_exception)

# Public name -> submodule that defines it (see __getattr__ below).
_attrs = {
    "AADC": "aadc",
    "Barrier": "barrier",
    "Condition": "condition",
    "Delay_ms": "delay_ms",
    "Encoder": "encoder",  # Was "Encode": "encoder_async": name and module were both wrong
    "Pushbutton": "pushbutton",
    "ESP32Touch": "pushbutton",
    "Queue": "queue",
    "Semaphore": "semaphore",
    "BoundedSemaphore": "semaphore",
    "Switch": "switch",
    "WaitAll": "events",
    "WaitAny": "events",
    "ESwitch": "events",
    "EButton": "events",
    "RingbufQueue": "ringbuf_queue",
}

# Copied from uasyncio.__init__.py
# Lazy loader, effectively does:
#   global attr
#   from .mod import attr
def __getattr__(attr):
    mod = _attrs.get(attr, None)
    if mod is None:
        raise AttributeError(attr)
    value = getattr(__import__(mod, None, None, True, 1), attr)
    globals()[attr] = value
    return value


# aadc.py AADC (asynchronous ADC) class

MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)

# Wraps an ADC (anything with read_u16()) as a pollable stream: the stream
# reports "readable" when the reading satisfies the configured range test.
class AADC(io.IOBase):
    def __init__(self, adc):
        self._adc = adc
        self._lower = 0
        self._upper = 65535
        self._pol = True
        self._last = None
        self._sreader = asyncio.StreamReader(self)

    # Enables "await aadc": completes with the reading that satisfied the test.
    def __iter__(self):
        b = yield from self._sreader.read(2)
        return int.from_bytes(b, 'little')

    def _adcread(self):
        self._last = self._adc.read_u16()
        return self._last

    def read(self, n):  # For use by StreamReader only
        return int.to_bytes(self._last, 2, 'little')

    def ioctl(self, req, arg):
        ret = MP_STREAM_ERROR
        if req == MP_STREAM_POLL:
            ret = 0
            if arg & MP_STREAM_POLL_RD:
                # Readable when exactly one of (_pol, value-in-range) is true
                if self._pol ^ (self._lower <= self._adcread() <= self._upper):
                    ret |= MP_STREAM_POLL_RD
        return ret

    # *** API ***

    # With normal True (the default) ioctl reports readable once the value
    # is OUTSIDE [lower, upper]; with normal False, once it is inside.
    def sense(self, normal):
        self._pol = normal

    # Return a fresh reading, or the most recent one if last is True.
    def read_u16(self, last=False):
        if last:
            return self._last
        return self._adcread()

    # Call syntax: set limits for trigger
    # lower is None: leave limits unchanged.
    # upper is None: treat lower as relative to current value.
    # both have values: treat as absolute limits.
    # Returns self so that "await aadc(lower, upper)" works.
    def __call__(self, lower=None, upper=None):
        if lower is not None:
            if upper is None:  # Relative limit
                r = self._adcread() if self._last is None else self._last
                self._lower = r - lower
                self._upper = r + lower
            else:  # Absolute limits
                self._lower = lower
                self._upper = upper
        return self
# barrier.py

# A Barrier synchronises N coros. Each issues await barrier.
# Execution pauses until all other participant coros are waiting on it.
# At that point the callback is executed. Then the barrier is 'opened' and
# execution of all participants resumes.

class Barrier():
    # participants: number of tasks that must reach the barrier.
    # func, args: optional callable, started via launch() when it opens.
    def __init__(self, participants, func=None, args=()):
        self._participants = participants
        self._count = participants
        self._func = func
        self._args = args
        self._res = None
        self._evt = asyncio.Event()

    def __await__(self):
        if self.trigger():
            return  # Other tasks have already reached barrier
        # Wait until last task reaches it. Written with "yield from" because
        # "await" in a plain def is a SyntaxError on CPython. CPython
        # coroutine objects are driven through __await__(); MicroPython
        # coroutines are plain generators and are delegated to directly.
        waiter = self._evt.wait()
        yield from (waiter.__await__() if hasattr(waiter, '__await__') else waiter)

    __iter__ = __await__

    # Value returned by launch() the last time the barrier opened.
    def result(self):
        return self._res

    # Register one arrival. Returns True if this arrival opened the barrier.
    def trigger(self):
        self._count -=1
        if self._count < 0:
            raise ValueError('Too many tasks accessing Barrier')
        if self._count > 0:
            return False  # At least 1 other task has not reached barrier
        # All other tasks are waiting
        if self._func is not None:
            self._res = launch(self._func, self._args)
        self._count = self._participants
        self._evt.set()  # Release others
        self._evt.clear()
        return True

    # True if at least one task is currently waiting at the barrier.
    def busy(self):
        return self._count < self._participants


# condition.py

# Condition class
# from primitives.condition import Condition

class Condition():
    def __init__(self, lock=None):
        self.lock = asyncio.Lock() if lock is None else lock
        self.events = []  # One Event per waiting task, oldest first

    async def acquire(self):
        await self.lock.acquire()

# enable this syntax:
# with await condition [as cond]:
    def __await__(self):
        # See Barrier.__await__ for why this is not written with "await".
        acq = self.lock.acquire()
        yield from (acq.__await__() if hasattr(acq, '__await__') else acq)
        return self

    __iter__ = __await__

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.lock.release()

    def locked(self):
        return self.lock.locked()

    def release(self):
        self.lock.release()  # Will raise RuntimeError if not locked

    # Wake up to n waiting tasks. Raises RuntimeError if the lock is not held.
    def notify(self, n=1):  # Caller controls lock
        if not self.lock.locked():
            raise RuntimeError('Condition notify with lock not acquired.')
        for _ in range(min(n, len(self.events))):
            # Longest waiter first: popping from the end woke the newest
            # waiter and could starve the oldest one.
            ev = self.events.pop(0)
            ev.set()

    def notify_all(self):
        self.notify(len(self.events))

    # Release the lock, pause until notified, then re-acquire the lock.
    async def wait(self):
        if not self.lock.locked():
            raise RuntimeError('Condition wait with lock not acquired.')
        ev = asyncio.Event()
        self.events.append(ev)
        self.lock.release()
        await ev.wait()
        await self.lock.acquire()
        assert ev not in self.events, 'condition wait assertion fail'
        return True  # CPython compatibility

    # Wait until predicate() returns a truthy value; return that value.
    async def wait_for(self, predicate):
        result = predicate()
        while not result:
            await self.wait()
            result = predicate()
        return result


# delay_ms.py Now uses ThreadSafeFlag and has extra .wait() API
# Usage:
# from primitives import Delay_ms

# Retriggerable delay: after trigger(), waits for the duration and then sets
# the timeout event and starts the optional callable via launch().
class Delay_ms:

    class DummyTimer:  # Stand-in for the timer class. Can be cancelled.
        def cancel(self):
            pass
    _fake = DummyTimer()

    def __init__(self, func=None, args=(), duration=1000):
        self._func = func
        self._args = args
        self._durn = duration  # Default duration
        self._retn = None  # Return value of launched callable
        self._tend = None  # Stop time (absolute ms).
        self._busy = False
        self._trig = asyncio.ThreadSafeFlag()
        self._tout = asyncio.Event()  # Timeout event
        self.wait = self._tout.wait  # Allow: await wait_ms.wait()
        self.clear = self._tout.clear
        self.set = self._tout.set
        self._ttask = self._fake  # Timer task
        self._mtask = asyncio.create_task(self._run())  # Main task

    # Main task: each trigger replaces the current timer task with a new
    # one covering the time remaining until ._tend.
    async def _run(self):
        while True:
            await self._trig.wait()  # Await a trigger
            self._ttask.cancel()  # Cancel and replace
            await asyncio.sleep_ms(0)
            dt = max(ticks_diff(self._tend, ticks_ms()), 0)  # Beware already elapsed.
            self._ttask = asyncio.create_task(self._timer(dt))

    async def _timer(self, dt):
        await asyncio.sleep_ms(dt)
        self._tout.set()  # Only gets here if not cancelled.
        self._busy = False
        if self._func is not None:
            self._retn = launch(self._func, self._args)

# API
    # trigger may be called from hard ISR.
    def trigger(self, duration=0):  # Update absolute end time, 0-> ctor default
        if self._mtask is None:
            raise RuntimeError("Delay_ms.deinit() has run.")
        self._tend = ticks_add(ticks_ms(), duration if duration > 0 else self._durn)
        self._retn = None  # Default in case cancelled.
        self._busy = True
        self._trig.set()

    # Cancel a running delay and clear the timeout event.
    def stop(self):
        self._ttask.cancel()
        self._ttask = self._fake
        self._busy = False
        self._tout.clear()

    def __call__(self):  # Current running status
        return self._busy

    running = __call__

    # Return value of the launched callable (None until it has run).
    def rvalue(self):
        return self._retn

    # Replace the callable and its args.
    def callback(self, func=None, args=()):
        self._func = func
        self._args = args

    # Stop the delay and cancel the main task. A second call is a no-op
    # (previously it raised AttributeError on None).
    def deinit(self):
        self.stop()
        if self._mtask is not None:
            self._mtask.cancel()
            self._mtask = None
# encoder.py Asynchronous driver for incremental quadrature encoder.

# Thanks are due to @ilium007 for identifying the issue of tracking detents,
# https://github.com/peterhinch/micropython-async/issues/82.
# Also to Mike Teachman (@miketeachman) for design discussions and testing
# against a state table design
# https://github.com/miketeachman/micropython-rotary/blob/master/rotary.py

class Encoder:

    # pin_x, pin_y: the two encoder pins (callable, with an irq() method).
    # v: initial value. div: hardware counts per reported unit.
    # vmin, vmax: optional limits. mod: optional modulus.
    # callback(value, delta, *args) runs when the reported value changes.
    # delay: pause (ms) after an edge before the count is sampled.
    # Raises ValueError if v lies outside the supplied limits.
    def __init__(self, pin_x, pin_y, v=0, div=1, vmin=None, vmax=None,
                 mod=None, callback=lambda a, b : None, args=(), delay=100):
        self._pin_x = pin_x
        self._pin_y = pin_y
        self._x = pin_x()
        self._y = pin_y()
        self._v = v * div  # Initialise hardware value
        self._cv = v  # Current (divided) value
        self.delay = delay  # Pause (ms) for motion to stop/limit callback frequency

        too_low = vmin is not None and v < vmin
        too_high = vmax is not None and v > vmax
        if too_low or too_high:
            raise ValueError('Incompatible args: must have vmin <= v <= vmax')
        self._tsf = asyncio.ThreadSafeFlag()
        edges = Pin.IRQ_RISING | Pin.IRQ_FALLING
        try:
            pin_x.irq(trigger=edges, handler=self._x_cb, hard=True)
            pin_y.irq(trigger=edges, handler=self._y_cb, hard=True)
        except TypeError:  # hard arg is unsupported on some hosts
            pin_x.irq(trigger=edges, handler=self._x_cb)
            pin_y.irq(trigger=edges, handler=self._y_cb)
        asyncio.create_task(self._run(vmin, vmax, div, mod, callback, args))

    # Hardware IRQ's. Duration 36μs on Pyboard 1 ~50μs on ESP32.
    # IRQ latency: 2nd edge may have occurred by the time ISR runs, in
    # which case there is no movement.
    def _x_cb(self, pin_x):
        level = pin_x()
        if level == self._x:
            return  # No net change on this pin
        self._x = level
        if level ^ self._pin_y():
            self._v += 1
        else:
            self._v -= 1
        self._tsf.set()

    def _y_cb(self, pin_y):
        level = pin_y()
        if level == self._y:
            return  # No net change on this pin
        self._y = level
        if level ^ self._pin_x():
            self._v -= 1
        else:
            self._v += 1
        self._tsf.set()

    # Task: turns the raw count kept by the ISRs into the reported value.
    async def _run(self, vmin, vmax, div, mod, cb, args):
        prev_hw = self._v  # Prior hardware value
        prev_div = self._cv  # Prior divided value
        limited = prev_div  # Current value after limits applied
        prev_limited = prev_div  # Previous value after limits applied
        pause = self.delay
        while True:
            await self._tsf.wait()
            await asyncio.sleep_ms(pause)  # Wait for motion to stop.
            hw = self._v  # Sample hardware (atomic read).
            if hw == prev_hw:  # A change happened but was negated before
                continue  # this got scheduled. Nothing to do.
            prev_hw = hw
            divided = round(hw / div)
            delta = divided - prev_div  # Change in divided value
            if not delta:
                continue  # No change
            limited += delta  # Divided value with limits/mod applied
            if vmax is not None:
                limited = min(vmax, limited)
            if vmin is not None:
                limited = max(vmin, limited)
            if mod is not None:
                limited = limited % mod
            self._cv = limited  # update ._cv for .value() before CB.
            if limited != prev_limited:
                cb(limited, limited - prev_limited, *args)  # Run user CB in uasyncio context
            prev_div = divided
            prev_limited = limited

    # Current value, after division and limits.
    def value(self):
        return self._cv
# events.py Event based primitives

# An Event-like class that can wait on an iterable of Event-like instances.
# .wait pauses until any passed event is set.
class WaitAny:
    def __init__(self, events):
        self.events = events
        self.trig_event = None
        self.evt = asyncio.Event()

    # Pause until one of the events is set; return that event.
    async def wait(self):
        tasks = [asyncio.create_task(self.wt(event)) for event in self.events]
        try:
            await self.evt.wait()
        finally:
            self.evt.clear()
            for task in tasks:
                task.cancel()
        return self.trig_event

    async def wt(self, event):
        await event.wait()
        self.evt.set()
        self.trig_event = event

    # The event that ended the most recent wait (None before the first).
    def event(self):
        return self.trig_event

    def clear(self):
        for evt in (x for x in self.events if hasattr(x, 'clear')):
            evt.clear()

# An Event-like class that can wait on an iterable of Event-like instances,
# .wait pauses until all passed events have been set.
class WaitAll:
    def __init__(self, events):
        self.events = events

    async def wait(self):
        async def wt(event):
            await event.wait()
        # Must be a list: a generator is exhausted by gather(*tasks), which
        # turned the cleanup loop below into a no-op.
        tasks = [asyncio.create_task(wt(event)) for event in self.events]
        try:
            await asyncio.gather(*tasks)
        finally:  # May be subject to timeout or cancellation
            for task in tasks:
                task.cancel()

    def clear(self):
        for evt in (x for x in self.events if hasattr(x, 'clear')):
            evt.clear()

# Minimal switch class having an Event based interface
class ESwitch:
    debounce_ms = 50

    def __init__(self, pin, lopen=1):  # Default is n/o switch returned to gnd
        self._pin = pin  # Should be initialised for input with pullup
        self._lopen = lopen  # Logic level in "open" state
        self.open = asyncio.Event()
        self.close = asyncio.Event()
        self._state = self._pin() ^ self._lopen  # Get initial state
        # Keep the task so that deinit() can cancel it
        self._ptask = asyncio.create_task(self._poll(ESwitch.debounce_ms))

    async def _poll(self, dt):  # Poll the button
        while True:
            if (s := self._pin() ^ self._lopen) != self._state:  # 15μs
                self._state = s
                self._cf() if s else self._of()
            await asyncio.sleep_ms(dt)  # Wait out bounce

    def _of(self):
        self.open.set()

    def _cf(self):
        self.close.set()

    # ***** API *****
    # Return current debounced state of switch: pin level XOR lopen, so
    # 0 == open, 1 == closed.
    def __call__(self):
        return self._state

    # Stop polling and clear both events. Previously this called .cancel()
    # on the bound method _poll, which raised AttributeError.
    def deinit(self):
        self._ptask.cancel()
        self.open.clear()
        self.close.clear()

# Minimal pushbutton class having an Event based interface
class EButton:
    debounce_ms = 50  # Attributes can be varied by user
    long_press_ms = 1000
    double_click_ms = 400

    def __init__(self, pin, suppress=False, sense=None):
        self._pin = pin  # Initialise for input
        self._supp = suppress
        self._sense = pin() if sense is None else sense
        self._state = self.rawstate()  # Initial logical state
        self._ltim = Delay_ms(duration = EButton.long_press_ms)
        self._dtim = Delay_ms(duration = EButton.double_click_ms)
        self.press = asyncio.Event()  # *** API ***
        self.double = asyncio.Event()
        self.long = asyncio.Event()
        self.release = asyncio.Event()  # *** END API ***
        self._tasks = [asyncio.create_task(self._poll(EButton.debounce_ms))]  # Tasks run forever. Poll contacts
        self._tasks.append(asyncio.create_task(self._ltf()))  # Handle long press
        if suppress:
            self._tasks.append(asyncio.create_task(self._dtf()))  # Double timer

    async def _poll(self, dt):  # Poll the button
        while True:
            if (s := self.rawstate()) != self._state:
                self._state = s
                self._pf() if s else self._rf()
            await asyncio.sleep_ms(dt)  # Wait out bounce

    def _pf(self):  # Button press
        if not self._supp:
            self.press.set()  # User event
        if not self._ltim():  # Don't retrigger long timer if already running
            self._ltim.trigger()
        if self._dtim():  # Press occurred while _dtim is running
            self.double.set()  # User event
            self._dtim.stop()  # _dtim's Event is only used if suppress
        else:
            self._dtim.trigger()

    def _rf(self):  # Button release
        self._ltim.stop()
        if not self._supp or not self._dtim():  # If dtim running postpone release otherwise it
            self.release.set()  # is set before press

    async def _ltf(self):  # Long timeout
        while True:
            await self._ltim.wait()
            self._ltim.clear()  # Clear the event
            self.long.set()  # User event

    async def _dtf(self):  # Double timeout (runs if suppress is set)
        while True:
            await self._dtim.wait()
            self._dtim.clear()  # Clear the event
            if not self._ltim():  # Button was released
                self.press.set()  # User events
                self.release.set()

    # ****** API ******
    # Current non-debounced logical button state: True == pressed
    def rawstate(self):
        return bool(self._pin() ^ self._sense)

    # Current debounced state of button (True == pressed)
    def __call__(self):
        return self._state

    def deinit(self):
        for task in self._tasks:
            task.cancel()
        for evt in (self.press, self.double, self.long, self.release):
            evt.clear()


# pushbutton.py

class Pushbutton:
    debounce_ms = 50
    long_press_ms = 1000
    double_click_ms = 400

    def __init__(self, pin, suppress=False, sense=None):
        self._pin = pin  # Initialise for input
        self._supp = suppress
        self._dblpend = False  # Doubleclick waiting for 2nd click
        self._dblran = False  # Doubleclick executed user function
        self._tf = False  # Press, release and double funcs: False == none
        self._ff = False
        self._df = False
        self._ld = False  # Delay_ms instance for long press
        self._dd = False  # Ditto for doubleclick
        # Convert from electrical to logical value
        self._sense = pin.value() if sense is None else sense
        self._state = self.rawstate()  # Initial state
        self._run = asyncio.create_task(self._go())  # Thread runs forever

    async def _go(self):
        while True:
            self._check(self.rawstate())
            # Ignore state changes until switch has settled. Also avoid hogging CPU.
            # See https://github.com/peterhinch/micropython-async/issues/69
            await asyncio.sleep_ms(Pushbutton.debounce_ms)

    def _check(self, state):
        if state == self._state:
            return
        # State has changed: act on it now.
        self._state = state
        if state:  # Button pressed: launch pressed func
            if self._tf:
                launch(self._tf, self._ta)
            if self._ld:  # There's a long func: start long press delay
                self._ld.trigger(Pushbutton.long_press_ms)
            if self._df:
                if self._dd():  # Second click: timer running
                    self._dd.stop()
                    self._dblpend = False
                    self._dblran = True  # Prevent suppressed launch on release
                    launch(self._df, self._da)
                else:
                    # First click: start doubleclick timer
                    self._dd.trigger(Pushbutton.double_click_ms)
                    self._dblpend = True  # Prevent suppressed launch on release
        else:  # Button release. Is there a release func?
            if self._ff:
                if self._supp:
                    d = self._ld
                    # If long delay exists, is running and doubleclick status is OK
                    if not self._dblpend and not self._dblran:
                        if (d and d()) or not d:
                            launch(self._ff, self._fa)
                else:
                    launch(self._ff, self._fa)
            if self._ld:
                self._ld.stop()  # Avoid interpreting a second click as a long push
            self._dblran = False

    def _ddto(self):  # Doubleclick timeout: no doubleclick occurred
        self._dblpend = False
        # Only launch if a release func was registered: with suppress set
        # and no release func, _ff is False and _fa does not exist.
        if self._ff and self._supp and not self._state:
            if not self._ld or (self._ld and not self._ld()):
                launch(self._ff, self._fa)

    # ****** API ******
    # In each *_func: pass a callable (plus args) to run on the event, None
    # to have an Event bound to the instance and set instead, or False
    # (the default) to clear a previous assignment.
    def press_func(self, func=False, args=()):
        if func is None:
            self.press = asyncio.Event()
        self._tf = self.press.set if func is None else func
        self._ta = args

    def release_func(self, func=False, args=()):
        if func is None:
            self.release = asyncio.Event()
        self._ff = self.release.set if func is None else func
        self._fa = args

    def double_func(self, func=False, args=()):
        if func is None:
            self.double = asyncio.Event()
            func = self.double.set
        self._df = func
        self._da = args
        if func:  # If double timer already in place, leave it
            if not self._dd:
                self._dd = Delay_ms(self._ddto)
        else:
            self._dd = False  # Clearing down double func

    def long_func(self, func=False, args=()):
        if func is None:
            self.long = asyncio.Event()
            func = self.long.set
        if func:
            if self._ld:
                self._ld.callback(func, args)
            else:
                self._ld = Delay_ms(func, args)
        else:
            self._ld = False

    # Current non-debounced logical button state: True == pressed
    def rawstate(self):
        return bool(self._pin() ^ self._sense)

    # Current debounced state of button (True == pressed)
    def __call__(self):
        return self._state

    def deinit(self):
        self._run.cancel()


class ESP32Touch(Pushbutton):
    thresh = (80 << 8) // 100

    # Set the class-wide threshold as a percentage (1-99) of the highest
    # raw reading seen so far.
    @classmethod
    def threshold(cls, val):
        if not (isinstance(val, int) and 0 < val < 100):
            raise ValueError("Threshold must be in range 1-99")
        cls.thresh = (val << 8) // 100

    def __init__(self, pin, suppress=False):
        self._thresh = 0  # Detection threshold
        self._rawval = 0
        try:
            self._pad = TouchPad(pin)
        except ValueError:
            raise ValueError(pin)  # Let's have a bit of information :)
        super().__init__(pin, suppress, False)

    # Current logical button state: True == touched
    def rawstate(self):
        rv = self._pad.read()  # ~220μs
        if rv > self._rawval:  # Either initialisation or pad was touched
            self._rawval = rv  # when initialised and has now been released
            self._thresh = (rv * ESP32Touch.thresh) >> 8
            return False  # Untouched
        return rv < self._thresh
# queue.py: adapted from uasyncio V2

# Exception raised by get_nowait().
class QueueEmpty(Exception):
    pass


# Exception raised by put_nowait().
class QueueFull(Exception):
    pass

# FIFO queue for passing items between tasks. maxsize <= 0 means unbounded.
class Queue:

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._queue = []
        self._evput = asyncio.Event()  # Triggered by put, tested by get
        self._evget = asyncio.Event()  # Triggered by get, tested by put

    def _get(self):
        # Pulse the event: wakes every task blocked in put()
        self._evget.set()
        self._evget.clear()
        return self._queue.pop(0)

    async def get(self):  # Usage: item = await queue.get()
        # Several tasks may be blocked here. After a put the first one to
        # run takes the item; the others find the queue empty and wait again.
        while self.empty():
            await self._evput.wait()
        return self._get()

    def get_nowait(self):  # Remove and return an item from the queue.
        # Return an item if one is immediately available, else raise QueueEmpty.
        if not self.empty():
            return self._get()
        raise QueueEmpty()

    def _put(self, val):
        # Pulse the event: wakes every task blocked in get()
        self._evput.set()
        self._evput.clear()
        self._queue.append(val)

    async def put(self, val):  # Usage: await queue.put(item)
        while self.full():
            # Queue full: pause until a get makes room
            await self._evget.wait()
        self._put(val)

    def put_nowait(self, val):  # Put an item into the queue without blocking.
        if not self.full():
            self._put(val)
            return
        raise QueueFull()

    def qsize(self):  # Number of items in the queue.
        return len(self._queue)

    def empty(self):  # Return True if the queue is empty, False otherwise.
        return not self._queue

    def full(self):  # Return True if there are maxsize items in the queue.
        # Note: if the Queue was initialized with maxsize=0 (the default) or
        # any negative number, then full() is never True.
        return 0 < self.maxsize <= self.qsize()


# ringbuf_queue.py Provides RingbufQueue class

# API differs from CPython
# Uses pre-allocated ring buffer: can use list or array
# Asynchronous iterator allowing consumer to use async for
# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded.

class RingbufQueue:  # MicroPython optimised
    # buf: pre-allocated buffer. One slot is kept free to tell full from
    # empty, so the queue holds at most len(buf) - 1 items.
    def __init__(self, buf):
        self._q = buf
        self._size = len(buf)
        self._wi = 0  # Write index
        self._ri = 0  # Read index
        self._evput = asyncio.Event()  # Triggered by put, tested by get
        self._evget = asyncio.Event()  # Triggered by get, tested by put

    def full(self):
        return self._ri == (self._wi + 1) % self._size

    def empty(self):
        return self._wi == self._ri

    def qsize(self):
        return (self._wi - self._ri) % self._size

    def get_nowait(self):  # Remove and return an item from the queue.
        # Return an item if one is immediately available, else raise IndexError.
        if self.empty():
            raise IndexError
        item = self._q[self._ri]
        self._ri = (self._ri + 1) % self._size
        self._evget.set()  # Schedule all tasks waiting on ._evget
        self._evget.clear()
        return item

    # Store v without blocking. If the buffer was already full the oldest
    # item is discarded and IndexError is raised; v is stored either way.
    def put_nowait(self, v):
        self._q[self._wi] = v
        self._evput.set()  # Schedule any tasks waiting on get
        self._evput.clear()
        self._wi = (self._wi + 1) % self._size
        if self._wi == self._ri:  # Would indicate empty
            self._ri = (self._ri + 1) % self._size  # Discard a message
            raise IndexError  # Caller can ignore if overwrites are OK

    async def put(self, val):  # Usage: await queue.put(item)
        while self.full():  # Queue full
            await self._evget.wait()  # May be >1 task waiting on ._evget
        self.put_nowait(val)

    def __aiter__(self):
        return self

    async def __anext__(self):
        while self.empty():  # Empty. May be more than one task waiting on ._evput
            await self._evput.wait()
        return self.get_nowait()  # Not empty here, so this cannot raise
# semaphore.py

# A Semaphore is typically used to limit the number of coros running a
# particular piece of code at once. The number is defined in the constructor.
class Semaphore():
    def __init__(self, value=1):
        self._count = value  # Remaining permits
        self._event = asyncio.Event()  # Set by release()

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        self.release()
        await asyncio.sleep(0)

    # Take a permit, pausing while none is available.
    async def acquire(self):
        self._event.clear()
        while not self._count > 0 and self._count == 0:
            # Multiple tasks may be waiting for a release
            await self._event.wait()
            self._event.clear()
            # When we yield, another task may succeed. In this case
            # the loop repeats
            await asyncio.sleep(0)
        self._count -= 1

    # Return a permit and wake waiting tasks.
    def release(self):
        self._event.set()
        self._count += 1

# Semaphore that refuses to hold more permits than it started with.
class BoundedSemaphore(Semaphore):
    def __init__(self, value=1):
        super().__init__(value)
        self._initial_value = value

    # Raises ValueError if every permit has already been returned.
    def release(self):
        if not self._count < self._initial_value:
            raise ValueError('Semaphore released more than acquired')
        super().release()


# switch.py

# Debounced switch: polls the pin and starts the registered callable, or
# sets an Event, on each change of state.
class Switch:
    debounce_ms = 50
    def __init__(self, pin):
        self.pin = pin  # Should be initialised for input with pullup
        self._open_func = False
        self._close_func = False
        self.switchstate = self.pin.value()  # Get initial state
        self._run = asyncio.create_task(self.switchcheck())  # Thread runs forever

    # func: callable to run when the switch opens. If None, an Event is
    # bound to the instance as .open and set instead.
    def open_func(self, func, args=()):
        if func is None:
            self.open = asyncio.Event()
            func = self.open.set
        self._open_func = func
        self._open_args = args

    # func: callable to run when the switch closes. If None, an Event is
    # bound to the instance as .close and set instead.
    def close_func(self, func, args=()):
        if func is None:
            self.close = asyncio.Event()
            func = self.close.set
        self._close_func = func
        self._close_args = args

    # Return current state of switch (0 = pressed)
    def __call__(self):
        return self.switchstate

    async def switchcheck(self):
        while True:
            level = self.pin.value()
            if level != self.switchstate:
                # State has changed: act on it now.
                self.switchstate = level
                if level == 0:
                    if self._close_func:
                        launch(self._close_func, self._close_args)
                elif level == 1:
                    if self._open_func:
                        launch(self._open_func, self._open_args)
            # Ignore further state changes until switch has settled
            await asyncio.sleep_ms(Switch.debounce_ms)

    def deinit(self):
        self._run.cancel()
+ v %= 4096 + await asyncio.sleep_ms(50) + +async def adctest(): + asyncio.create_task(signal()) + adc = AADC(ADC(pyb.Pin.board.X1)) + await asyncio.sleep(0) + adc.sense(normal=False) # Wait until ADC gets to 5000 + value = await adc(5000, 10000) + print('Received', value, adc.read_u16(True)) # Reduce to 12 bits + adc.sense(normal=True) # Now print all changes > 2000 + while True: + value = await adc(2000) # Trigger if value changes by 2000 + print('Received', value, adc.read_u16(True)) + +st = '''This test requires a Pyboard with pins X1 and X5 linked. +A sawtooth waveform is applied to the ADC. Initially the test waits +until the ADC value reaches 5000. It then reports whenever the value +changes by 2000. +Issue test() to start. +''' +print(st) + +def test(): + try: + asyncio.run(adctest()) + except KeyboardInterrupt: + print('Interrupted') + finally: + asyncio.new_event_loop() + print() + print(st) diff --git a/primitives/tests/asyntest.py b/primitives/tests/asyntest.py new file mode 100644 index 0000000..606e1fd --- /dev/null +++ b/primitives/tests/asyntest.py @@ -0,0 +1,664 @@ +# asyntest.py Test/demo of the 'micro' Event, Barrier and Semaphore classes +# Test/demo of official asyncio library and official Lock class + +# Copyright (c) 2017-2022 Peter Hinch +# Released under the MIT License (MIT) - see LICENSE file + +# CPython 3.8 compatibility +# (ignore RuntimeWarning: coroutine '_g' was never awaited) +# To run: +# from primitives.tests.asyntest import test + +try: + import uasyncio as asyncio +except ImportError: + import asyncio +import sys +unix = "linux" in sys.implementation._machine + +from primitives import Barrier, Semaphore, BoundedSemaphore, Condition, Queue, RingbufQueue +try: + from threadsafe import Message +except: + pass + +def print_tests(): + st = '''Available functions: +test(0) Print this list. +test(1) Test message acknowledge. +test(2) Test Message and Lock objects. +test(3) Test the Barrier class with callback. 
+test(4) Test the Barrier class with coroutine. +test(5) Test Semaphore +test(6) Test BoundedSemaphore. +test(7) Test the Condition class. +test(8) Test the Queue class. +test(9) Test the RingbufQueue class. +''' + print('\x1b[32m') + print(st) + print('\x1b[39m') + +print_tests() + +def printexp(exp, runtime=0): + print('Expected output:') + print('\x1b[32m') + print(exp) + print('\x1b[39m') + if runtime: + print('Running (runtime = {}s):'.format(runtime)) + else: + print('Running (runtime < 1s):') + +# ************ Test Message class ************ +# Demo use of acknowledge message + +async def message_wait(message, ack_message, n): + try: + await message + print(f'message_wait {n} got message: {message.value()}') + if ack_message is not None: + ack_message.set() + except asyncio.CancelledError: + print(f"message_wait {n} cancelled") + +async def run_ack(n): + message = Message() + ack1 = Message() + ack2 = Message() + for count in range(n): + t0 = asyncio.create_task(message_wait(message, ack1, 1)) + t1 = asyncio.create_task(message_wait(message, ack2, 2)) + message.set(count) + print('message was set') + await ack1 + ack1.clear() + print('Cleared ack1') + await ack2 + ack2.clear() + print('Cleared ack2') + message.clear() + print('Cleared message') + await asyncio.sleep(1) + t0.cancel() + t1.cancel() + +async def msg_send(msg, items): + for item in items: + await asyncio.sleep_ms(400) + msg.set(item) + +async def msg_recv(msg): # Receive using asynchronous iterator + async for data in msg: + print("Got", data) + msg.clear() + +async def ack_coro(): + print("Test multiple tasks waiting on a message.") + await run_ack(3) + print() + print("Test asynchronous iterator.") + msg = Message() + asyncio.create_task(msg_send(msg, (1, 2, 3))) + try: + await asyncio.wait_for(msg_recv(msg), 3) + except asyncio.TimeoutError: + pass + await asyncio.sleep(1) + print() + print("Test cancellation of first waiting task.") + t1 = asyncio.create_task(message_wait(msg, None, 1)) + t2 
= asyncio.create_task(message_wait(msg, None, 2)) + await asyncio.sleep(1) + t1.cancel() + await asyncio.sleep(1) + print("Setting message") + msg.set("Test message") + await asyncio.sleep(1) # Tasks have ended or been cancelled + msg.clear() + print() + print("Test cancellation of second waiting task.") + t1 = asyncio.create_task(message_wait(msg, None, 1)) + t2 = asyncio.create_task(message_wait(msg, None, 2)) + await asyncio.sleep(1) + t2.cancel() + await asyncio.sleep(1) + print("Setting message") + msg.set("Test message") + await asyncio.sleep(1) + msg.clear() + + print("I've seen attack ships burn on the shoulder of Orion...") + print("Time to die...") + +def ack_test(): + if unix: + print("Message class is incompatible with Unix build.") + return + printexp('''Running (runtime = 12s): +Test multiple tasks waiting on a message. +message was set +message_wait 1 got message: 0 +message_wait 2 got message: 0 +Cleared ack1 +Cleared ack2 +Cleared message +message was set +message_wait 1 got message: 1 +message_wait 2 got message: 1 +Cleared ack1 +Cleared ack2 +Cleared message +message was set +message_wait 1 got message: 2 +message_wait 2 got message: 2 +Cleared ack1 +Cleared ack2 +Cleared message + +Test asynchronous iterator. +Got 1 +Got 2 +Got 3 + +Test cancellation of first waiting task. +message_wait 1 cancelled +Setting message +message_wait 2 got message: Test message + +Test cancellation of second waiting task. +message_wait 2 cancelled +Setting message +message_wait 1 got message: Test message +I've seen attack ships burn on the shoulder of Orion... +Time to die... 
+''', 12) + asyncio.run(ack_coro()) + +# ************ Test Lock and Message classes ************ + +async def run_lock(n, lock): + print('run_lock {} waiting for lock'.format(n)) + await lock.acquire() + print('run_lock {} acquired lock'.format(n)) + await asyncio.sleep(1) # Delay to demo other coros waiting for lock + lock.release() + print('run_lock {} released lock'.format(n)) + +async def messageset(message): + print('Waiting 5 secs before setting message') + await asyncio.sleep(5) + message.set() + print('message was set') + +async def messagewait(message): + print('waiting for message') + await message + print('got message') + message.clear() + +async def run_message_test(): + print('Test Lock class') + lock = asyncio.Lock() + asyncio.create_task(run_lock(1, lock)) + asyncio.create_task(run_lock(2, lock)) + asyncio.create_task(run_lock(3, lock)) + print('Test Message class') + message = Message() + asyncio.create_task(messageset(message)) + await messagewait(message) # run_message_test runs fast until this point + print('Message status {}'.format('Incorrect' if message.is_set() else 'OK')) + print('Tasks complete') + +def msg_test(): + if unix: + print("Message class is incompatible with Unix build.") + return + printexp('''Test Lock class +Test Message class +waiting for message +run_lock 1 waiting for lock +run_lock 1 acquired lock +run_lock 2 waiting for lock +run_lock 3 waiting for lock +Waiting 5 secs before setting message +run_lock 1 released lock +run_lock 2 acquired lock +run_lock 2 released lock +run_lock 3 acquired lock +run_lock 3 released lock +message was set +got message +Message status OK +Tasks complete +''', 5) + asyncio.run(run_message_test()) + +# ************ Barrier test ************ + +async def killer(duration): + await asyncio.sleep(duration) + +def callback(text): + print(text) + +async def report(barrier): + for i in range(5): + print('{} '.format(i), end='') + await barrier + +async def do_barrier_test(): + barrier = Barrier(3, 
callback, ('Synch',)) + for _ in range(2): + for _ in range(3): + asyncio.create_task(report(barrier)) + await asyncio.sleep(1) + print() + await asyncio.sleep(1) + +def barrier_test(): + printexp('''Running (runtime = 3s): +0 0 0 Synch +1 1 1 Synch +2 2 2 Synch +3 3 3 Synch +4 4 4 Synch + +1 1 1 Synch +2 2 2 Synch +3 3 3 Synch +4 4 4 Synch +''', 3) + asyncio.run(do_barrier_test()) + +# ************ Barrier test 1 ************ + +async def my_coro(text): + try: + await asyncio.sleep_ms(0) + while True: + await asyncio.sleep(1) + print(text) + except asyncio.CancelledError: + print('my_coro was cancelled.') + +async def report1(barrier, x): + await asyncio.sleep(x) + print('report instance', x, 'waiting') + await barrier + print('report instance', x, 'done') + +async def bart(): + barrier = Barrier(4, my_coro, ('my_coro running',)) + for x in range(3): + asyncio.create_task(report1(barrier, x)) + await asyncio.sleep(4) + assert barrier.busy() + await barrier + await asyncio.sleep(0) + assert not barrier.busy() + # Must yield before reading result(). Here we wait long enough for + await asyncio.sleep_ms(1500) # coro to print + barrier.result().cancel() + await asyncio.sleep(2) + +def barrier_test1(): + printexp('''Running (runtime = 5s): +report instance 0 waiting +report instance 1 waiting +report instance 2 waiting +report instance 2 done +report instance 1 done +report instance 0 done +my_coro running +my_coro was cancelled. + +Exact report instance done sequence may vary, but 3 instances should report +done before my_coro runs. 
+''', 5) + asyncio.run(bart()) + +# ************ Semaphore test ************ + +async def run_sema(n, sema, barrier): + print('run_sema {} trying to access semaphore'.format(n)) + async with sema: + print('run_sema {} acquired semaphore'.format(n)) + # Delay demonstrates other coros waiting for semaphore + await asyncio.sleep(1 + n/10) # n/10 ensures deterministic printout + print('run_sema {} has released semaphore'.format(n)) + barrier.trigger() + +async def run_sema_test(bounded): + num_coros = 5 + barrier = Barrier(num_coros + 1) + if bounded: + semaphore = BoundedSemaphore(3) + else: + semaphore = Semaphore(3) + for n in range(num_coros): + asyncio.create_task(run_sema(n, semaphore, barrier)) + await barrier # Quit when all coros complete + try: + semaphore.release() + except ValueError: + print('Bounded semaphore exception test OK') + +def semaphore_test(bounded=False): + if bounded: + exp = '''run_sema 0 trying to access semaphore +run_sema 0 acquired semaphore +run_sema 1 trying to access semaphore +run_sema 1 acquired semaphore +run_sema 2 trying to access semaphore +run_sema 2 acquired semaphore +run_sema 3 trying to access semaphore +run_sema 4 trying to access semaphore +run_sema 0 has released semaphore +run_sema 4 acquired semaphore +run_sema 1 has released semaphore +run_sema 3 acquired semaphore +run_sema 2 has released semaphore +run_sema 4 has released semaphore +run_sema 3 has released semaphore +Bounded semaphore exception test OK + +Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.''' + else: + exp = '''run_sema 0 trying to access semaphore +run_sema 0 acquired semaphore +run_sema 1 trying to access semaphore +run_sema 1 acquired semaphore +run_sema 2 trying to access semaphore +run_sema 2 acquired semaphore +run_sema 3 trying to access semaphore +run_sema 4 trying to access semaphore +run_sema 0 has released semaphore +run_sema 3 acquired semaphore +run_sema 1 has released semaphore +run_sema 4 acquired semaphore 
+run_sema 2 has released semaphore +run_sema 3 has released semaphore +run_sema 4 has released semaphore + +Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.''' + printexp(exp, 3) + asyncio.run(run_sema_test(bounded)) + +# ************ Condition test ************ + +cond = Condition() +tim = 0 + +async def cond01(): + while True: + await asyncio.sleep(2) + with await cond: + cond.notify(2) # Notify 2 tasks + +async def cond03(): # Maintain a count of seconds + global tim + await asyncio.sleep(0.5) + while True: + await asyncio.sleep(1) + tim += 1 + +async def cond02(n, barrier): + with await cond: + print('cond02', n, 'Awaiting notification.') + await cond.wait() + print('cond02', n, 'triggered. tim =', tim) + barrier.trigger() + +def predicate(): + return tim >= 8 # 12 + +async def cond04(n, barrier): + with await cond: + print('cond04', n, 'Awaiting notification and predicate.') + await cond.wait_for(predicate) + print('cond04', n, 'triggered. tim =', tim) + barrier.trigger() + +async def cond_go(): + ntasks = 7 + barrier = Barrier(ntasks + 1) + t1 = asyncio.create_task(cond01()) + t3 = asyncio.create_task(cond03()) + for n in range(ntasks): + asyncio.create_task(cond02(n, barrier)) + await barrier # All instances of cond02 have completed + # Test wait_for + barrier = Barrier(2) + asyncio.create_task(cond04(99, barrier)) + await barrier + # cancel continuously running coros. + t1.cancel() + t3.cancel() + await asyncio.sleep_ms(0) + print('Done.') + +def condition_test(): + printexp('''cond02 0 Awaiting notification. +cond02 1 Awaiting notification. +cond02 2 Awaiting notification. +cond02 3 Awaiting notification. +cond02 4 Awaiting notification. +cond02 5 Awaiting notification. +cond02 6 Awaiting notification. +cond02 5 triggered. tim = 1 +cond02 6 triggered. tim = 1 +cond02 3 triggered. tim = 3 +cond02 4 triggered. tim = 3 +cond02 1 triggered. tim = 5 +cond02 2 triggered. tim = 5 +cond02 0 triggered. 
tim = 7 +cond04 99 Awaiting notification and predicate. +cond04 99 triggered. tim = 9 +Done. +''', 13) + asyncio.run(cond_go()) + +# ************ Queue test ************ + +async def slow_process(): + await asyncio.sleep(2) + return 42 + +async def bar(q): + print('Waiting for slow process.') + result = await slow_process() + print('Putting result onto queue') + await q.put(result) # Put result on q + +async def foo(q): + print("Running foo()") + result = await q.get() + print('Result was {}'.format(result)) + +async def q_put(n, q): + for x in range(8): + obj = (n, x) + await q.put(obj) + await asyncio.sleep(0) + +async def q_get(n, q): + for x in range(8): + await q.get() + await asyncio.sleep(0) + +async def putter(q): + # put some item, then sleep + for _ in range(20): + await q.put(1) + await asyncio.sleep_ms(50) + + +async def getter(q): + # checks for new items, and relies on the "blocking" of the get method + for _ in range(20): + await q.get() + +async def queue_go(): + q = Queue(10) + asyncio.create_task(foo(q)) + asyncio.create_task(bar(q)) + await asyncio.sleep(3) + for n in range(4): + asyncio.create_task(q_put(n, q)) + await asyncio.sleep(1) + assert q.qsize() == 10 + await q.get() + await asyncio.sleep(0.1) + assert q.qsize() == 10 + while not q.empty(): + await q.get() + await asyncio.sleep(0.1) + assert q.empty() + print('Competing put tasks test complete') + + for n in range(4): + asyncio.create_task(q_get(n, q)) + await asyncio.sleep(1) + x = 0 + while not q.full(): + await q.put(x) + await asyncio.sleep(0.3) + x += 1 + assert q.qsize() == 10 + print('Competing get tasks test complete') + await asyncio.gather( + putter(q), + getter(q) + ) + print('Queue tests complete') + print("I've seen attack ships burn off the shoulder of Orion...") + print("Time to die...") + +def queue_test(): + printexp('''Running (runtime = 20s): +Running foo() +Waiting for slow process. 
+Putting result onto queue +Result was 42 +Competing put tasks test complete +Competing get tasks test complete +Queue tests complete + + +I've seen attack ships burn off the shoulder of Orion... +Time to die... + +''', 20) + asyncio.run(queue_go()) + +# ************ RingbufQueue test ************ + +async def qread(q, lst, twr): + async for item in q: + lst.append(item) + await asyncio.sleep_ms(twr) + +async def read(q, t, twr=0): + lst = [] + try: + await asyncio.wait_for(qread(q, lst, twr), t) + except asyncio.TimeoutError: + pass + return lst + +async def put_list(q, lst, twp=0): + for item in lst: + await q.put(item) + await asyncio.sleep_ms(twp) + +async def rbq_go(): + q = RingbufQueue([0 for _ in range(10)]) # 10 elements + pl = [n for n in range(15)] + print("Read waits on slow write.") + asyncio.create_task(put_list(q, pl, 100)) + rl = await read(q, 2) + assert pl == rl + print('done') + print("Write waits on slow read.") + asyncio.create_task(put_list(q, pl)) + rl = await read(q, 2, 100) + assert pl == rl + print('done') + print("Testing full, empty and qsize methods.") + assert q.empty() + assert q.qsize() == 0 + assert not q.full() + await put_list(q, (1,2,3)) + assert not q.empty() + assert q.qsize() == 3 + assert not q.full() + print("Done") + print("Testing put_nowait and overruns.") + nfail = 0 + for x in range(4, 15): + try: + q.put_nowait(x) + except IndexError: + nfail += 1 + assert nfail == 5 + assert q.full() + rl = await read(q, 2) + assert rl == [6, 7, 8, 9, 10, 11, 12, 13, 14] + print("Testing get_nowait.") + await q.put(1) + assert q.get_nowait() == 1 + err = 0 + try: + q.get_nowait() + except IndexError: + err = 1 + assert err == 1 + print("Tests complete.") + print("I've seen attack ships burn off the shoulder of Orion...") + print("Time to die...") + +def rbq_test(): + printexp('''Running (runtime = 6s): +Read waits on slow write. +done +Write waits on slow read. +done +Testing full, empty and qsize methods. 
+Done +Testing put_nowait and overruns. +Testing get_nowait. +Tests complete. +I've seen attack ships burn off the shoulder of Orion... +Time to die... + +''', 6) + asyncio.run(rbq_go()) + +# ************ ************ +def test(n): + try: + if n == 1: + ack_test() # Test message acknowledge. + elif n == 2: + msg_test() # Test Messge and Lock objects. + elif n == 3: + barrier_test() # Test the Barrier class. + elif n == 4: + barrier_test1() # Test the Barrier class. + elif n == 5: + semaphore_test(False) # Test Semaphore + elif n == 6: + semaphore_test(True) # Test BoundedSemaphore. + elif n == 7: + condition_test() # Test the Condition class. + elif n == 8: + queue_test() # Test the Queue class. + elif n == 9: + rbq_test() # Test the RingbufQueue class. + except KeyboardInterrupt: + print('Interrupted') + finally: + asyncio.new_event_loop() + print_tests() diff --git a/primitives/tests/delay_test.py b/primitives/tests/delay_test.py new file mode 100644 index 0000000..17dde72 --- /dev/null +++ b/primitives/tests/delay_test.py @@ -0,0 +1,223 @@ +# delay_test.py Tests for Delay_ms class + +# Copyright (c) 2020 Peter Hinch +# Released under the MIT License (MIT) - see LICENSE file + +import uasyncio as asyncio +import micropython +from primitives.delay_ms import Delay_ms + +micropython.alloc_emergency_exception_buf(100) + +def printexp(exp, runtime=0): + print('Expected output:') + print('\x1b[32m') + print(exp) + print('\x1b[39m') + if runtime: + print('Running (runtime = {}s):'.format(runtime)) + else: + print('Running (runtime < 1s):') + +async def ctor_test(): # Constructor arg + s = ''' +Trigger 5 sec delay +Retrigger 5 sec delay +Callback should run +cb callback +Done +''' + printexp(s, 12) + def cb(v): + print('cb', v) + + d = Delay_ms(cb, ('callback',), duration=5000) + + print('Trigger 5 sec delay') + d.trigger() + await asyncio.sleep(4) + print('Retrigger 5 sec delay') + d.trigger() + await asyncio.sleep(4) + print('Callback should run') + await 
asyncio.sleep(2) + print('Done') + +async def launch_test(): + s = ''' +Trigger 5 sec delay +Coroutine should run: run to completion. +Coroutine starts +Coroutine ends +Coroutine should run: test cancellation. +Coroutine starts +Coroutine should run: test awaiting. +Coroutine starts +Coroutine ends +Done +''' + printexp(s, 20) + async def cb(v, ms): + print(v, 'starts') + await asyncio.sleep_ms(ms) + print(v, 'ends') + + d = Delay_ms(cb, ('coroutine', 1000)) + + print('Trigger 5 sec delay') + d.trigger(5000) # Test extending time + await asyncio.sleep(4) + print('Coroutine should run: run to completion.') + await asyncio.sleep(3) + d = Delay_ms(cb, ('coroutine', 3000)) + d.trigger(5000) + await asyncio.sleep(4) + print('Coroutine should run: test cancellation.') + await asyncio.sleep(2) + coro = d.rvalue() + coro.cancel() + d.trigger(5000) + await asyncio.sleep(4) + print('Coroutine should run: test awaiting.') + await asyncio.sleep(2) + coro = d.rvalue() + await coro + print('Done') + + +async def reduce_test(): # Test reducing a running delay + s = ''' +Trigger 5 sec delay +Callback should run +cb callback +Callback should run +cb callback +Done +''' + printexp(s, 11) + def cb(v): + print('cb', v) + + d = Delay_ms(cb, ('callback',)) + + print('Trigger 5 sec delay') + d.trigger(5000) # Test extending time + await asyncio.sleep(4) + print('Callback should run') + await asyncio.sleep(2) + d.trigger(10000) + await asyncio.sleep(1) + d.trigger(3000) + await asyncio.sleep(2) + print('Callback should run') + await asyncio.sleep(2) + print('Done') + + +async def stop_test(): # Test the .stop and .running methods + s = ''' +Trigger 5 sec delay +Running +Callback should run +cb callback +Callback returned 42 +Callback should not run +Done + ''' + printexp(s, 12) + def cb(v): + print('cb', v) + return 42 + + d = Delay_ms(cb, ('callback',)) + + print('Trigger 5 sec delay') + d.trigger(5000) # Test extending time + await asyncio.sleep(4) + if d(): + print('Running') + 
print('Callback should run') + await asyncio.sleep(2) + print('Callback returned', d.rvalue()) + d.trigger(3000) + await asyncio.sleep(1) + d.stop() + await asyncio.sleep(1) + if d(): + print('Running') + print('Callback should not run') + await asyncio.sleep(4) + print('Done') + + +async def isr_test(): # Test trigger from hard ISR + from pyb import Timer + s = ''' +Timer holds off cb for 5 secs +cb should now run +cb callback +Done +''' + printexp(s, 6) + def cb(v): + print('cb', v) + + d = Delay_ms(cb, ('callback',)) + + def timer_cb(_): + d.trigger(200) + tim = Timer(1, freq=10, callback=timer_cb) + + print('Timer holds off cb for 5 secs') + await asyncio.sleep(5) + tim.deinit() + print('cb should now run') + await asyncio.sleep(1) + print('Done') + +async def err_test(): # Test triggering de-initialised timer + s = ''' +Running (runtime = 3s): +Trigger 1 sec delay +cb callback +Success: error was raised. +Done + ''' + printexp(s, 3) + def cb(v): + print('cb', v) + return 42 + + d = Delay_ms(cb, ('callback',)) + + print('Trigger 1 sec delay') + d.trigger(1000) + await asyncio.sleep(2) + d.deinit() + try: + d.trigger(1000) + except RuntimeError: + print("Success: error was raised.") + print('Done') + +av = ''' +Run a test by issuing +delay_test.test(n) +where n is a test number. Avaliable tests: +\x1b[32m +0 Test triggering from a hard ISR (Pyboard only) +1 Test the .stop method and callback return value. 
+2 Test reducing the duration of a running timer +3 Test delay defined by constructor arg +4 Test triggering a Task +5 Attempt to trigger de-initialised instance +\x1b[39m +''' +print(av) + +tests = (isr_test, stop_test, reduce_test, ctor_test, launch_test, err_test) +def test(n=0): + try: + asyncio.run(tests[n]()) + finally: + asyncio.new_event_loop() diff --git a/primitives/tests/encoder_stop.py b/primitives/tests/encoder_stop.py new file mode 100644 index 0000000..ed75e8d --- /dev/null +++ b/primitives/tests/encoder_stop.py @@ -0,0 +1,39 @@ +# encoder_stop.py Demo of callback which occurs after motion has stopped. + +from machine import Pin +import uasyncio as asyncio +from primitives.encoder import Encoder +from primitives.delay_ms import Delay_ms + +px = Pin('X1', Pin.IN, Pin.PULL_UP) +py = Pin('X2', Pin.IN, Pin.PULL_UP) + +tim = Delay_ms(duration=400) # High value for test +d = 0 + +def tcb(pos, delta): # User callback gets args of encoder cb + global d + d = 0 + print(pos, delta) + +def cb(pos, delta): # Encoder callback occurs rapidly + global d + tim.trigger() # Postpone the user callback + tim.callback(tcb, (pos, d := d + delta)) # and update its args + +async def main(): + while True: + await asyncio.sleep(1) + +def test(): + print('Running encoder test. Press ctrl-c to teminate.') + Encoder.delay = 0 # No need for this delay + enc = Encoder(px, py, callback=cb) + try: + asyncio.run(main()) + except KeyboardInterrupt: + print('Interrupted') + finally: + asyncio.new_event_loop() + +test() diff --git a/primitives/tests/encoder_test.py b/primitives/tests/encoder_test.py new file mode 100644 index 0000000..15b919d --- /dev/null +++ b/primitives/tests/encoder_test.py @@ -0,0 +1,31 @@ +# encoder_test.py Test for asynchronous driver for incremental quadrature encoder. 
+ +# Copyright (c) 2021-2022 Peter Hinch +# Released under the MIT License (MIT) - see LICENSE file + +from machine import Pin +import uasyncio as asyncio +from primitives.encoder import Encoder + + +px = Pin(33, Pin.IN, Pin.PULL_UP) +py = Pin(25, Pin.IN, Pin.PULL_UP) + +def cb(pos, delta): + print(pos, delta) + +async def main(): + while True: + await asyncio.sleep(1) + +def test(): + print('Running encoder test. Press ctrl-c to teminate.') + enc = Encoder(px, py, v=0, vmin=0, vmax=100, callback=cb) + try: + asyncio.run(main()) + except KeyboardInterrupt: + print('Interrupted') + finally: + asyncio.new_event_loop() + +test() diff --git a/primitives/tests/event_test.py b/primitives/tests/event_test.py new file mode 100644 index 0000000..23989e3 --- /dev/null +++ b/primitives/tests/event_test.py @@ -0,0 +1,206 @@ +# event_test.py Test WaitAll, WaitAny, ESwwitch, EButton + +# Copyright (c) 2022 Peter Hinch +# Released under the MIT License (MIT) - see LICENSE file + +# from primitives.tests.event_test import * + +import uasyncio as asyncio +from primitives import Delay_ms, WaitAny, ESwitch, WaitAll, EButton +from pyb import Pin + +events = [asyncio.Event() for _ in range(4)] + +async def set_events(*ev): + for n in ev: + await asyncio.sleep(1) + print("Setting", n) + events[n].set() + +def clear(msg): + print(msg) + for e in events: + e.clear() + +async def can(obj, tim): + await asyncio.sleep(tim) + print("About to cancel") + obj.cancel() + +async def foo(tsk): + print("Waiting") + await tsk + +async def wait_test(): + msg = """ +\x1b[32m +Expected output: +Setting 0 +Tested WaitAny 0 +Setting 1 +Tested WaitAny 1 +Setting 2 +Setting 3 +Tested WaitAll 2, 3 +Setting 0 +Setting 3 +Tested WaitAny 0, 3 +Cancel in 3s +Setting 0 +Setting 1 +About to cancel +Cancelled. 
+Waiting for 4s +Timeout +done +\x1b[39m +""" + print(msg) + wa = WaitAny((events[0], events[1], WaitAll((events[2], events[3])))) + asyncio.create_task(set_events(0)) + await wa.wait() + clear("Tested WaitAny 0") + asyncio.create_task(set_events(1)) + await wa.wait() + clear("Tested WaitAny 1") + asyncio.create_task(set_events(2, 3)) + await wa.wait() + clear("Tested WaitAll 2, 3") + wa = WaitAll((WaitAny((events[0], events[1])), WaitAny((events[2], events[3])))) + asyncio.create_task(set_events(0, 3)) + await wa.wait() + clear("Tested WaitAny 0, 3") + task = asyncio.create_task(wa.wait()) + asyncio.create_task(set_events(0, 1)) # Does nothing + asyncio.create_task(can(task, 3)) + print("Cancel in 3s") + try: + await task + except asyncio.CancelledError: # TODO why must we trap this? + print("Cancelled.") + print("Waiting for 4s") + try: + await asyncio.wait_for(wa.wait(), 4) + except asyncio.TimeoutError: + print("Timeout") + print("done") + +val = 0 +fail = False +pout = None +polarity = 0 + +async def monitor(evt, v, verbose): + global val + while True: + await evt.wait() + evt.clear() + val += v + verbose and print("Got", hex(v), hex(val)) + +async def pulse(ms=100): + pout(1 ^ polarity) + await asyncio.sleep_ms(ms) + pout(polarity) + +def expect(v, e): + global fail + if v == e: + print("Pass") + else: + print(f"Fail: expected {e} got {v}") + fail = True + +async def btest(btn, verbose, supp): + global val, fail + val = 0 + events = btn.press, btn.release, btn.double, btn.long + tasks = [] + for n, evt in enumerate(events): + tasks.append(asyncio.create_task(monitor(evt, 1 << 3 * n, verbose))) + await asyncio.sleep(1) + print("Start short press test") + await pulse() + await asyncio.sleep(1) + verbose and print("Test of short press", hex(val)) + expect(val, 0x09) + val = 0 + await asyncio.sleep(1) + print("Start long press test") + await pulse(2000) + await asyncio.sleep(4) + verbose and print("Long press", hex(val)) + exp = 0x208 if supp else 0x209 + 
expect(val, exp) + val = 0 + await asyncio.sleep(1) + print("Start double press test") + await pulse() + await asyncio.sleep_ms(100) + await pulse() + await asyncio.sleep(4) + verbose and print("Double press", hex(val)) + exp = 0x48 if supp else 0x52 + expect(val, exp) + for task in tasks: + task.cancel() + +async def stest(sw, verbose): + global val, fail + val = 0 + events = sw.open, sw.close + tasks = [] + for n, evt in enumerate(events): + tasks.append(asyncio.create_task(monitor(evt, 1 << 3 * n, verbose))) + asyncio.create_task(pulse(2000)) + await asyncio.sleep(1) + expect(val, 0x08) + await asyncio.sleep(4) # Wait for any spurious events + verbose and print("Switch close and open", hex(val)) + expect(val, 0x09) + for task in tasks: + task.cancel() + +async def switch_test(pol, verbose): + global val, pout, polarity + polarity = pol + pin = Pin('Y1', Pin.IN) + pout = Pin('Y2', Pin.OUT, value=pol) + print("Testing EButton.") + print("suppress == False") + btn = EButton(pin) + await btest(btn, verbose, False) + print("suppress == True") + btn = EButton(pin, suppress=True) + await btest(btn, verbose, True) + print("Testing ESwitch") + sw = ESwitch(pin, pol) + await stest(sw, verbose) + print("Failures occurred.") if fail else print("All tests passed.") + +def tests(): + txt=""" + \x1b[32m + Available tests: + 1. test_switches(polarity=1, verbose=False) Test the ESwitch and Ebutton classe. + 2. test_wait() Test the WaitAny and WaitAll primitives. + + Switch tests assume a Pyboard with a link between Y1 and Y2. + \x1b[39m + """ + print(txt) + +tests() +def test_switches(polarity=1, verbose=False): + try: + asyncio.run(switch_test(polarity, verbose)) # polarity 1/0 is normal (off) electrical state. 
+ finally: + asyncio.new_event_loop() + tests() + +def test_wait(): + try: + asyncio.run(wait_test()) + finally: + asyncio.new_event_loop() + tests() diff --git a/primitives/tests/switches.py b/primitives/tests/switches.py new file mode 100644 index 0000000..59fa779 --- /dev/null +++ b/primitives/tests/switches.py @@ -0,0 +1,259 @@ +# Test/demo programs for Switch and Pushbutton classes +# Tested on Pyboard but should run on other microcontroller platforms +# running MicroPython with uasyncio library. + +# Copyright (c) 2018-2022 Peter Hinch +# Released under the MIT License (MIT) - see LICENSE file +# Now executes .deinit() + +# To run: +# from primitives.tests.switches import * +# test_sw() # For example + +from machine import Pin +from pyb import LED +from primitives import Switch, Pushbutton +import uasyncio as asyncio + +helptext = ''' +Test using switch or pushbutton between X1 and gnd. +Ground pin X2 to terminate test. + +''' +tests = ''' +\x1b[32m +Available tests: +test_sw Switch test. +test_swcb Switch with callback. +test_sw_event Switch with event. +test_btn Pushutton launching coros. +test_btncb Pushbutton launching callbacks. +btn_dynamic Change coros launched at runtime. +btn_event Pushbutton event interface. 
+\x1b[39m +''' +print(tests) + +# Pulse an LED (coroutine) +async def pulse(led, ms): + led.on() + await asyncio.sleep_ms(ms) + led.off() + +# Pulse an LED when an event triggered +async def evt_pulse(event, led): + while True: + event.clear() + await event.wait() + led.on() + await asyncio.sleep_ms(500) + led.off() + +# Toggle an LED (callback) +def toggle(led): + led.toggle() + +# Quit test by connecting X2 to ground +async def killer(obj): + pin = Pin('X2', Pin.IN, Pin.PULL_UP) + while pin.value(): + await asyncio.sleep_ms(50) + obj.deinit() + await asyncio.sleep_ms(0) + +def run(obj): + try: + asyncio.run(killer(obj)) + except KeyboardInterrupt: + print('Interrupted') + finally: + asyncio.new_event_loop() + print(tests) + + +# Test for the Switch class passing coros +def test_sw(): + s = ''' +close pulses green +open pulses red +''' + print('Test of switch scheduling coroutines.') + print(helptext) + print(s) + pin = Pin('X1', Pin.IN, Pin.PULL_UP) + red = LED(1) + green = LED(2) + sw = Switch(pin) + # Register coros to launch on contact close and open + sw.close_func(pulse, (green, 1000)) + sw.open_func(pulse, (red, 1000)) + run(sw) + +# Test for the switch class with a callback +def test_swcb(): + s = ''' +close toggles red +open toggles green +''' + print('Test of switch executing callbacks.') + print(helptext) + print(s) + pin = Pin('X1', Pin.IN, Pin.PULL_UP) + red = LED(1) + green = LED(2) + sw = Switch(pin) + # Register a coro to launch on contact close + sw.close_func(toggle, (red,)) + sw.open_func(toggle, (green,)) + run(sw) + +# Test for the Switch class (events) +async def do_sw_event(): + pin = Pin('X1', Pin.IN, Pin.PULL_UP) + sw = Switch(pin) + sw.open_func(None) + sw.close_func(None) + tasks = [] + for event, led in ((sw.close, 1), (sw.open, 2)): + tasks.append(asyncio.create_task(evt_pulse(event, LED(led)))) + await killer(sw) + for task in tasks: + task.cancel() + +def test_sw_event(): + s = ''' +close pulse red +open pulses green +''' + 
print('Test of switch triggering events.') + print(helptext) + print(s) + try: + asyncio.run(do_sw_event()) + except KeyboardInterrupt: + print('Interrupted') + finally: + asyncio.new_event_loop() + print(tests) + +# Test for the Pushbutton class (coroutines) +# Pass True to test suppress +def test_btn(suppress=False, lf=True, df=True): + s = ''' +press pulses red +release pulses green +double click pulses yellow +long press pulses blue +''' + print('Test of pushbutton scheduling coroutines.') + print(helptext) + print(s) + pin = Pin('X1', Pin.IN, Pin.PULL_UP) + red = LED(1) + green = LED(2) + yellow = LED(3) + blue = LED(4) + pb = Pushbutton(pin, suppress) + pb.press_func(pulse, (red, 1000)) + pb.release_func(pulse, (green, 1000)) + if df: + print('Doubleclick enabled') + pb.double_func(pulse, (yellow, 1000)) + if lf: + print('Long press enabled') + pb.long_func(pulse, (blue, 1000)) + run(pb) + +# Test for the Pushbutton class (callbacks) +def test_btncb(): + s = ''' +press toggles red +release toggles green +double click toggles yellow +long press toggles blue +''' + print('Test of pushbutton executing callbacks.') + print(helptext) + print(s) + pin = Pin('X1', Pin.IN, Pin.PULL_UP) + red = LED(1) + green = LED(2) + yellow = LED(3) + blue = LED(4) + pb = Pushbutton(pin) + pb.press_func(toggle, (red,)) + pb.release_func(toggle, (green,)) + pb.double_func(toggle, (yellow,)) + pb.long_func(toggle, (blue,)) + run(pb) + +# Test for the Pushbutton class where callback coros change dynamically +def setup(pb, press, release, dbl, lng, t=1000): + s = ''' +Functions are changed: +LED's pulse for 2 seconds +press pulses blue +release pulses red +double click pulses green +long pulses yellow +''' + pb.press_func(pulse, (press, t)) + pb.release_func(pulse, (release, t)) + pb.double_func(pulse, (dbl, t)) + if lng is not None: + pb.long_func(pulse, (lng, t)) + print(s) + +def btn_dynamic(): + s = ''' +press pulses red +release pulses green +double click pulses yellow +long press 
changes button functions. +''' + print('Test of pushbutton scheduling coroutines.') + print(helptext) + print(s) + pin = Pin('X1', Pin.IN, Pin.PULL_UP) + red = LED(1) + green = LED(2) + yellow = LED(3) + blue = LED(4) + pb = Pushbutton(pin) + setup(pb, red, green, yellow, None) + pb.long_func(setup, (pb, blue, red, green, yellow, 2000)) + run(pb) + +# Test for the Pushbutton class (events) +async def do_btn_event(): + pin = Pin('X1', Pin.IN, Pin.PULL_UP) + pb = Pushbutton(pin) + pb.press_func(None) + pb.release_func(None) + pb.double_func(None) + pb.long_func(None) + tasks = [] + for event, led in ((pb.press, 1), (pb.release, 2), (pb.double, 3), (pb.long, 4)): + tasks.append(asyncio.create_task(evt_pulse(event, LED(led)))) + await killer(pb) + for task in tasks: + task.cancel() + +def btn_event(): + s = ''' +press pulse red +release pulses green +double click pulses yellow +long press pulses blue +''' + print('Test of pushbutton triggering events.') + print(helptext) + print(s) + try: + asyncio.run(do_btn_event()) + except KeyboardInterrupt: + print('Interrupted') + finally: + asyncio.new_event_loop() + print(tests) +