--- /dev/null
+# ir_rx __init__.py Decoder for IR remote control using synchronous code
+# IR_RX abstract base class for IR receivers.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020-2021 Released under the MIT license
+
+from machine import Timer, Pin
+from array import array
+from utime import ticks_us
+
+# Save RAM
+# from micropython import alloc_emergency_exception_buf
+# alloc_emergency_exception_buf(100)
+
+
+# On 1st edge start a block timer. While the timer is running, record the time
+# of each edge. When the timer times out decode the data. Duration must exceed
+# the worst case block transmission time, but be less than the interval between
+# a block start and a repeat code start (~108ms depending on protocol)
+
class IR_RX:
    """Abstract base class for IR receivers.

    Every edge on ``pin`` is timestamped by a pin interrupt. The first edge of
    a burst starts a one-shot block timer; when it expires the subclass
    ``decode`` method runs, interprets ``self._times[0:self.edge]`` and ends by
    calling ``do_callback``. Subclasses must override ``decode``.
    """

    # Result/error codes
    # Repeat button code
    REPEAT = -1
    # Error codes
    BADSTART = -2
    BADBLOCK = -3
    BADREP = -4
    OVERRUN = -5
    BADDATA = -6
    BADADDR = -7

    def __init__(self, pin, nedges, tblock, callback, *args):  # Optional args for callback
        """Set up edge capture.

        pin: input pin object exposing ``irq()``.
        nedges: maximum number of edges in a valid block.
        tblock: period passed to the block timer.
        callback: invoked as ``callback(cmd, addr, ext, *args)`` on success.
        """
        self._pin = pin
        self._nedges = nedges
        self._tblock = tblock
        self.callback = callback
        self.args = args
        self._errf = lambda _: None  # Errors are ignored until error_function() is called
        self.verbose = False

        self._times = array('i', (0 for _ in range(nedges + 1)))  # +1 for overrun
        self.edge = 0
        self.tim = Timer(-1)  # Software timer
        self.cb = self.decode  # Bound once here so the pin handler does not create it
        # Arm the interrupt last: the handler reads .edge, .tim and .cb, so
        # they must all exist before the first edge can arrive.
        pin.irq(handler=self._cb_pin, trigger=(Pin.IRQ_FALLING | Pin.IRQ_RISING))

    def decode(self, _):
        """Block timer callback. Must be overridden by each protocol subclass."""
        raise NotImplementedError

    # Pin interrupt. Save time of each edge for later decode.
    def _cb_pin(self, line):
        t = ticks_us()
        # On overrun ignore pulses until software timer times out
        if self.edge <= self._nedges:  # Allow 1 extra pulse to record overrun
            if not self.edge:  # First edge received
                self.tim.init(period=self._tblock, mode=Timer.ONE_SHOT, callback=self.cb)
            self._times[self.edge] = t
            self.edge += 1

    def do_callback(self, cmd, addr, ext, thresh=0):
        """Re-arm for the next burst, then report the result.

        Values of ``cmd`` at or above ``thresh`` go to the user callback;
        anything lower is treated as an error code and goes to the error
        function.
        """
        self.edge = 0
        if cmd >= thresh:
            self.callback(cmd, addr, ext, *self.args)
        else:
            self._errf(cmd)

    def error_function(self, func):
        """Register ``func(code)`` to receive error codes."""
        self._errf = func

    def close(self):
        """Detach the pin interrupt and stop the block timer."""
        self._pin.irq(handler=None)
        self.tim.deinit()
--- /dev/null
+# acquire.py Acquire a pulse train from an IR remote
+# Attempts to identify NEC, Sony, Philips RC-5/RC-6, MCE and Samsung pulse trains.
+# For a remote using NEC see https://www.adafruit.com/products/389
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from machine import Pin, freq
+from sys import platform
+
+from utime import sleep_ms, ticks_us, ticks_diff
+from ir_rx import IR_RX
+
+
+class IR_GET(IR_RX): # Captures one raw burst and prints a best guess at its protocol
+ def __init__(self, pin, nedges=100, twait=100, display=True):
+ self.display = display # Read by decode() before it prints the edge timings
+ super().__init__(pin, nedges, twait, lambda *_ : None) # Null user callback
+ self.data = None # Set to the captured burst by decode(); polled by acquire()
+
+ def decode(self, _): # Block timer callback bound by IR_RX.__init__
+ def near(v, target): # True if v lies strictly within +/-20% of target
+ return target * 0.8 < v < target * 1.2
+ lb = self.edge - 1 # Possible length of burst
+ if lb < 3:
+ return # Noise
+ burst = [] # Intervals between successive recorded edges
+ for x in range(lb):
+ dt = ticks_diff(self._times[x + 1], self._times[x])
+ if x > 0 and dt > 10000: # Reached gap between repeats
+ break
+ burst.append(dt)
+ lb = len(burst) # Actual length
+ # Duration of pulse train 24892 for RC-5 22205 for RC-6
+ duration = ticks_diff(self._times[lb - 1], self._times[0])
+
+ if self.display:
+ for x, e in enumerate(burst):
+ print('{:03d} {:5d}'.format(x, e))
+ print()
+ # Attempt to determine protocol
+ ok = False # Protocol not yet found
+ if near(burst[0], 9000) and lb == 67:
+ print('NEC')
+ ok = True
+
+ if not ok and near(burst[0], 2400) and near(burst[1], 600): # Maybe Sony
+ try:
+ nbits = {25:12, 31:15, 41:20}[lb] # Burst length -> SIRC bit count
+ except KeyError:
+ pass
+ else:
+ ok = True
+ print('Sony {}bit'.format(nbits))
+
+ if not ok and near(burst[0], 889): # Maybe RC-5
+ if near(duration, 24892) and near(max(burst), 1778):
+ print('Philps RC-5')
+ ok = True
+
+ if not ok and near(burst[0], 2666) and near(burst[1], 889): # RC-6?
+ if near(duration, 22205) and near(burst[1], 889) and near(burst[2], 444):
+ print('Philips RC-6 mode 0')
+ ok = True
+
+ if not ok and near(burst[0], 2000) and near(burst[1], 1000):
+ if near(duration, 19000):
+ print('Microsoft MCE edition protocol.')
+ # Constant duration, variable burst length, presumably bi-phase
+ print('Protocol start {} {} Burst length {} duration {}'.format(burst[0], burst[1], lb, duration))
+ ok = True
+
+ if not ok and near(burst[0], 4500) and near(burst[1], 4500) and lb == 67: # Samsung
+ print('Samsung')
+ ok = True
+
+ if not ok and near(burst[0], 3500) and near(burst[1], 1680): # Panasonic?
+ print('Unsupported protocol. Panasonic?')
+ ok = True
+
+ if not ok:
+ print('Unknown protocol start {} {} Burst length {} duration {}'.format(burst[0], burst[1], lb, duration))
+
+ print()
+ self.data = burst # Publishing the burst ends the wait loop in acquire()
+ # Set up for new data burst. Run null callback
+ self.do_callback(0, 0, 0)
+
+ def acquire(self): # Block until decode() has stored a burst, then shut down and return it
+ while self.data is None:
+ sleep_ms(5)
+ self.close() # Detaches the pin IRQ and stops the timer (IR_RX.close)
+ return self.data
+
def test():
    """Capture one burst from the receiver on the platform's default pin.

    Returns the value of ``IR_GET.acquire()``: the list of intervals between
    successive edges of the captured burst.

    Raises RuntimeError if no pin is defined for the running platform.
    """
    # Define pin according to platform
    if platform == 'pyboard':
        pin = Pin('X3', Pin.IN)
    elif platform == 'esp8266':
        freq(160000000)
        pin = Pin(13, Pin.IN)
    elif platform == 'esp32' or platform == 'esp32_LoBo':
        pin = Pin(23, Pin.IN)
    elif platform == 'rp2':
        pin = Pin(16, Pin.IN)
    else:
        # Fail with a clear message instead of an unbound `pin` further down.
        raise RuntimeError('Unsupported platform: {}'.format(platform))
    irg = IR_GET(pin)
    print('Waiting for IR data...')
    return irg.acquire()
--- /dev/null
+# mce.py Decoder for IR remote control using synchronous code
+# Supports Microsoft MCE edition remote protocol.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+# WARNING: This is experimental and subject to change.
+
+from utime import ticks_us, ticks_diff
+from ir_rx import IR_RX
+
class MCE(IR_RX):
    """Receiver for the Microsoft MCE edition remote protocol (experimental).

    On success the user callback receives ``(val, addr, ctrl)``: a 6 bit
    value, a 4 bit address and a 2 bit control field. Negative error codes
    are passed to the error function instead.
    """
    init_cs = 4  # http://www.hifi-remote.com/johnsfine/DecodeIR.html#OrtekMCE says 3

    def __init__(self, pin, callback, *args):
        # Block lasts ~19ms and has <= 34 edges
        super().__init__(pin, 34, 25, callback, *args)

    def decode(self, _):
        """Block timer callback: Manchester-decode the recorded edge times."""
        def check(v):
            # Top 4 bits must equal init_cs plus the number of set bits in the
            # low 12. Setting init_cs to -1 disables the check.
            if self.init_cs == -1:
                return True
            csum = v >> 12
            cs = self.init_cs
            for _ in range(12):
                if v & 1:
                    cs += 1
                v >>= 1
            return cs == csum

        try:
            t0 = ticks_diff(self._times[1], self._times[0])  # 2000μs mark
            t1 = ticks_diff(self._times[2], self._times[1])  # 1000μs space
            if not ((1800 < t0 < 2200) and (800 < t1 < 1200)):
                raise RuntimeError(self.BADSTART)
            nedges = self.edge  # No. of edges detected
            if not 14 <= nedges <= 34:
                # Above this class's 34-edge limit is an overrun; below 14 is noise.
                raise RuntimeError(self.OVERRUN if nedges > 34 else self.BADSTART)
            # Manchester decode
            mask = 1
            bit = 1
            v = 0
            x = 2
            for _ in range(16):
                # -1 convert count to index, -1 because we look ahead
                if x > nedges - 2:
                    raise RuntimeError(self.BADBLOCK)
                # width is 500/1000 nominal
                width = ticks_diff(self._times[x + 1], self._times[x])
                if not 250 < width < 1350:
                    self.verbose and print('Bad block 3 Width', width, 'x', x)
                    raise RuntimeError(self.BADBLOCK)
                short = int(width < 750)
                bit ^= short ^ 1  # A long interval toggles the bit value
                v |= mask if bit else 0
                mask <<= 1
                x += 1 + short  # A short interval is followed by another short: skip it

            self.verbose and print(bin(v))
            if not check(v):
                raise RuntimeError(self.BADDATA)
            val = (v >> 6) & 0x3f
            addr = v & 0xf  # Constant for all buttons on my remote
            ctrl = (v >> 4) & 3

        except RuntimeError as e:
            val, addr, ctrl = e.args[0], 0, 0
        # Set up for new data burst and run user callback/error function
        self.do_callback(val, addr, ctrl)
--- /dev/null
+# nec.py Decoder for IR remote control using synchronous code
+# Supports NEC and Samsung protocols.
+# With thanks to J.E. Tannenbaum for information re Samsung protocol
+
+# For a remote using NEC see https://www.adafruit.com/products/389
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020-2022 Released under the MIT license
+
+from utime import ticks_us, ticks_diff
+from ir_rx import IR_RX
+
class NEC_ABC(IR_RX):
    """Shared decoder for the NEC (8 or 16 bit address) and Samsung protocols."""

    def __init__(self, pin, extended, samsung, callback, *args):
        # Block lasts <= 80ms (extended mode) and has 68 edges
        super().__init__(pin, 68, 80, callback, *args)
        self._extended = extended
        self._addr = 0
        # Minimum accepted leader mark: nominal 4.5ms for Samsung else 9ms
        if samsung:
            self._leader = 2500
        else:
            self._leader = 4000

    def decode(self, _):
        """Block timer callback: decode a data block or recognise a repeat code."""
        times = self._times
        try:
            if self.edge > 68:
                raise RuntimeError(self.OVERRUN)
            # Leading mark must be long enough for all valid data
            if ticks_diff(times[1], times[0]) < self._leader:
                raise RuntimeError(self.BADSTART)
            space = ticks_diff(times[2], times[1])
            if space <= 3000:
                if space > 1700:
                    # 2.5ms space for a repeat code. Should have exactly 4 edges.
                    # REPEAT travels through the error path like any other code.
                    raise RuntimeError(self.REPEAT if self.edge == 4 else self.BADREP)
                raise RuntimeError(self.BADSTART)
            # 4.5ms space for normal data
            if self.edge < 68:  # Haven't received the correct number of edges
                raise RuntimeError(self.BADBLOCK)
            # Time spaces only (marks are always 562.5µs)
            # Space is 1.6875ms (1) or 562.5µs (0)
            # Skip last bit which is always 1
            val = 0
            for i in range(3, 68 - 2, 2):
                val >>= 1
                if ticks_diff(times[i + 1], times[i]) > 1120:
                    val |= 0x80000000
            addr = val & 0xff  # 8 bit addr
            cmd = (val >> 16) & 0xff
            if cmd != (val >> 24) ^ 0xff:
                raise RuntimeError(self.BADDATA)
            if addr != ((val >> 8) ^ 0xff) & 0xff:  # 8 bit addr doesn't match check
                if not self._extended:
                    raise RuntimeError(self.BADADDR)
                addr |= val & 0xff00  # pass assumed 16 bit address to callback
            self._addr = addr
        except RuntimeError as e:
            cmd = e.args[0]
            if cmd == self.REPEAT:  # REPEAT uses last address
                addr = self._addr
            else:
                addr = 0
        # Set up for new data burst and run user callback
        self.do_callback(cmd, addr, 0, self.REPEAT)
+
class NEC_8(NEC_ABC):
    """NEC receiver that insists on a valid 8 bit address."""

    def __init__(self, pin, callback, *args):
        extended, samsung = False, False
        super().__init__(pin, extended, samsung, callback, *args)
+
class NEC_16(NEC_ABC):
    """NEC receiver that accepts extended (16 bit) addresses."""

    def __init__(self, pin, callback, *args):
        extended, samsung = True, False
        super().__init__(pin, extended, samsung, callback, *args)
+
class SAMSUNG(NEC_ABC):
    """Samsung receiver: extended addressing with the shorter leader mark."""

    def __init__(self, pin, callback, *args):
        extended, samsung = True, True
        super().__init__(pin, extended, samsung, callback, *args)
--- /dev/null
+# philips.py Decoder for IR remote control using synchronous code
+# Supports Philips RC-5 RC-6 mode 0 protocols.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from utime import ticks_us, ticks_diff
+from ir_rx import IR_RX
+
class RC5_IR(IR_RX):
    """Receiver for the Philips RC-5 protocol.

    On success the user callback receives ``(val, addr, ctrl)``: the command
    (with the inverted S2 bit as bit 6), a 5 bit address and the toggle bit.
    """

    def __init__(self, pin, callback, *args):
        # Block lasts <= 30ms and has <= 28 edges
        super().__init__(pin, 28, 30, callback, *args)

    def decode(self, _):
        """Block timer callback: regenerate the 14 bit stream from edge times."""
        try:
            nedges = self.edge  # No. of edges detected
            if not 14 <= nedges <= 28:
                raise RuntimeError(self.OVERRUN if nedges > 28 else self.BADSTART)
            # Regenerate bitstream
            bits = 1
            bit = 1
            v = 1  # 14 bit bitstream, MSB always 1
            x = 0
            while bits < 14:
                # -1 convert count to index, -1 because we look ahead
                if x > nedges - 2:
                    # Gated on verbose like every other diagnostic in the receivers
                    self.verbose and print('Bad block 1 edges', nedges, 'x', x)
                    raise RuntimeError(self.BADBLOCK)
                # width is 889/1778 nominal
                width = ticks_diff(self._times[x + 1], self._times[x])
                if not 500 < width < 2100:
                    self.verbose and print('Bad block 3 Width', width, 'x', x)
                    raise RuntimeError(self.BADBLOCK)
                short = width < 1334
                if not short:
                    bit ^= 1  # A long interval toggles the bit value
                v <<= 1
                v |= bit
                bits += 1
                x += 1 + int(short)  # A short interval is followed by another short: skip it
            self.verbose and print(bin(v))
            # Split into fields (val, addr, ctrl)
            val = (v & 0x3f) | (0 if ((v >> 12) & 1) else 0x40)  # Correct the polarity of S2
            addr = (v >> 6) & 0x1f
            ctrl = (v >> 11) & 1

        except RuntimeError as e:
            val, addr, ctrl = e.args[0], 0, 0
        # Set up for new data burst and run user callback
        self.do_callback(val, addr, ctrl)
+
+
class RC6_M0(IR_RX):
    """Receiver for the Philips RC-6 mode 0 protocol.

    On success the user callback receives ``(val, addr, ctrl)``: an 8 bit
    command, an 8 bit address and the toggle bit.
    """
    # Even on Pyboard D the 444μs nominal pulses can be recorded as up to 705μs
    # Scope shows 360-520 μs (-84μs +76μs relative to nominal)
    # Header nominal 2666, 889, 444, 889, 444, 444, 444, 444 carrier ON at end
    hdr = ((1800, 4000), (593, 1333), (222, 750), (593, 1333), (222, 750), (222, 750), (222, 750), (222, 750))

    def __init__(self, pin, callback, *args):
        # Block lasts 23ms nominal and has <=44 edges
        super().__init__(pin, 44, 30, callback, *args)

    def decode(self, _):
        """Block timer callback: check the header, then decode toggle and 16 data bits."""
        try:
            nedges = self.edge  # No. of edges detected
            if not 22 <= nedges <= 44:
                # Above this class's 44-edge limit is an overrun; below 22 is noise.
                raise RuntimeError(self.OVERRUN if nedges > 44 else self.BADSTART)
            # Each header interval must fall inside its (min, max) limits
            for x, lims in enumerate(self.hdr):
                width = ticks_diff(self._times[x + 1], self._times[x])
                if not (lims[0] < width < lims[1]):
                    self.verbose and print('Bad start', x, width, lims)
                    raise RuntimeError(self.BADSTART)
            x += 1  # Index of the first interval after the header
            width = ticks_diff(self._times[x + 1], self._times[x])
            # 2nd bit of last 0 is 444μs (0) or 1333μs (1)
            if not 222 < width < 1555:
                self.verbose and print('Bad block 1 Width', width, 'x', x)
                raise RuntimeError(self.BADBLOCK)
            short = width < 889
            v = int(not short)
            bit = v
            bits = 1  # Bits decoded
            x += 1 + int(short)
            width = ticks_diff(self._times[x + 1], self._times[x])
            if not 222 < width < 1555:
                self.verbose and print('Bad block 2 Width', width, 'x', x)
                raise RuntimeError(self.BADBLOCK)
            short = width < 1111
            if not short:
                bit ^= 1
            x += 1 + int(short)  # If it's short, we know width of next
            v <<= 1
            v |= bit  # MSB of result
            bits += 1
            # Decode bitstream
            while bits < 17:
                # -1 convert count to index, -1 because we look ahead
                if x > nedges - 2:
                    raise RuntimeError(self.BADBLOCK)
                # width is 444/889 nominal
                width = ticks_diff(self._times[x + 1], self._times[x])
                if not 222 < width < 1111:
                    self.verbose and print('Bad block 3 Width', width, 'x', x)
                    raise RuntimeError(self.BADBLOCK)
                short = width < 666
                if not short:
                    bit ^= 1
                v <<= 1
                v |= bit
                bits += 1
                x += 1 + int(short)

            if self.verbose:
                ss = '20-bit format {:020b} x={} nedges={} bits={}'
                print(ss.format(v, x, nedges, bits))

            val = v & 0xff
            addr = (v >> 8) & 0xff
            ctrl = (v >> 16) & 1
        except RuntimeError as e:
            val, addr, ctrl = e.args[0], 0, 0
        # Set up for new data burst and run user callback
        self.do_callback(val, addr, ctrl)
--- /dev/null
+# print_error.py Error print for IR receiver
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from ir_rx import IR_RX
+
# Text printed for each IR_RX error code
_errors = {
    IR_RX.BADSTART: 'Invalid start pulse',
    IR_RX.BADBLOCK: 'Error: bad block',
    IR_RX.BADREP: 'Error: repeat',
    IR_RX.OVERRUN: 'Error: overrun',
    IR_RX.BADDATA: 'Error: invalid data',
    IR_RX.BADADDR: 'Error: invalid address',
}


def print_error(data):
    """Print the message for error code ``data``, or a fallback for unknown codes."""
    try:
        message = _errors[data]
    except KeyError:
        print('Unknown error code:', data)
    else:
        print(message)
--- /dev/null
+# sony.py Decoder for IR remote control using synchronous code
+# Sony SIRC protocol.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from utime import ticks_us, ticks_diff
+from ir_rx import IR_RX
+
class SONY_ABC(IR_RX):  # Abstract base class
    """Shared decoder for the Sony SIRC 12, 15 and 20 bit protocols.

    On success the user callback receives ``(cmd, addr, ext)``: a 7 bit
    command, the address, and the 8 bit extended field (0 unless the block
    had 42 edges).
    """

    def __init__(self, pin, bits, callback, *args):
        # 20 bit block has 42 edges and lasts <= 39ms nominal. Add 4ms to time
        # for tolerances except in 20 bit case where timing is tight with a
        # repeat period of 45ms.
        t = int(3 + bits * 1.8) + (1 if bits == 20 else 4)
        super().__init__(pin, 2 + bits * 2, t, callback, *args)
        self._addr = 0
        # Widest frame this instance accepts. Storing the requested width
        # (rather than a fixed 20) makes the `bits > self._bits` guard in
        # decode() effective.
        self._bits = bits

    def decode(self, _):
        """Block timer callback: decode an LSB-first SIRC frame."""
        try:
            nedges = self.edge  # No. of edges detected
            self.verbose and print('nedges', nedges)
            if nedges > 42:
                raise RuntimeError(self.OVERRUN)
            bits = (nedges - 2) // 2
            if nedges not in (26, 32, 42) or bits > self._bits:
                raise RuntimeError(self.BADBLOCK)
            self.verbose and print('SIRC {}bit'.format(bits))
            width = ticks_diff(self._times[1], self._times[0])
            if not 1800 < width < 3000:  # 2.4ms leading mark for all valid data
                raise RuntimeError(self.BADSTART)
            width = ticks_diff(self._times[2], self._times[1])
            if not 350 < width < 1000:  # 600μs space
                raise RuntimeError(self.BADSTART)

            val = 0  # Data received, LSB 1st
            x = 2
            bit = 1
            while x <= nedges - 2:
                # Only every second interval is measured; > 900μs means a 1
                if ticks_diff(self._times[x + 1], self._times[x]) > 900:
                    val |= bit
                bit <<= 1
                x += 2
            cmd = val & 0x7f  # 7 bit command
            val >>= 7
            if nedges < 42:
                addr = val & 0xff  # 5 or 8 bit addr
                val = 0
            else:
                addr = val & 0x1f  # 5 bit addr
                val >>= 5  # 8 bit extended
        except RuntimeError as e:
            cmd = e.args[0]
            addr = 0
            val = 0
        self.do_callback(cmd, addr, val)
+
class SONY_12(SONY_ABC):
    """Sony SIRC 12 bit receiver."""

    def __init__(self, pin, callback, *args):
        nbits = 12
        super().__init__(pin, nbits, callback, *args)
+
class SONY_15(SONY_ABC):
    """Sony SIRC 15 bit receiver."""

    def __init__(self, pin, callback, *args):
        nbits = 15
        super().__init__(pin, nbits, callback, *args)
+
class SONY_20(SONY_ABC):
    """Sony SIRC 20 bit receiver."""

    def __init__(self, pin, callback, *args):
        nbits = 20
        super().__init__(pin, nbits, callback, *args)
+
--- /dev/null
+# test.py Test program for IR remote control decoder
+# Supports Pyboard, ESP32, ESP8266 and RP2
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020-2022 Released under the MIT license
+
+# Run this to characterise a remote.
+
+from sys import platform
+import time
+import gc
+from machine import Pin, freq
+from ir_rx.print_error import print_error # Optional print of error codes
+
+# Import all implemented classes
+from ir_rx.nec import NEC_8, NEC_16, SAMSUNG
+from ir_rx.sony import SONY_12, SONY_15, SONY_20
+from ir_rx.philips import RC5_IR, RC6_M0
+from ir_rx.mce import MCE
+
+# Define pin according to platform
if platform in ("esp32", "esp32_LoBo"):
    p = Pin(23, Pin.IN)
elif platform == "esp8266":
    freq(160000000)  # Clock speed is raised before the pin is created
    p = Pin(13, Pin.IN)
elif platform == "rp2":
    p = Pin(16, Pin.IN)
elif platform == "pyboard":
    p = Pin("X3", Pin.IN)
+
+# User callback
def cb(data, addr, ctrl):
    """Print a decoded button press, or note a repeat when ``data`` is negative."""
    if data >= 0:
        print(f"Data 0x{data:02x} Addr 0x{addr:04x} Ctrl 0x{ctrl:02x}")
        return
    # NEC protocol sends repeat codes.
    print("Repeat code.")
+
+
def test(proto=0):
    """Run the receiver selected by index ``proto`` until interrupted with ctrl-c."""
    receivers = (NEC_8, NEC_16, SONY_12, SONY_15, SONY_20, RC5_IR, RC6_M0, MCE, SAMSUNG)
    receiver_cls = receivers[proto]
    ir = receiver_cls(p, cb)  # Instantiate receiver
    ir.error_function(print_error)  # Show debug information
    # Set ir.verbose = True here for decoder diagnostics.
    # A real application would do something here...
    try:
        while True:
            print("running")
            time.sleep(5)
            gc.collect()
    except KeyboardInterrupt:
        ir.close()
+
+
+# **** DISPLAY GREETING ****
+s = """Test for IR receiver. Run:
+from ir_rx.test import test
+test() for NEC 8 bit protocol,
+test(1) for NEC 16 bit,
+test(2) for Sony SIRC 12 bit,
+test(3) for Sony SIRC 15 bit,
+test(4) for Sony SIRC 20 bit,
+test(5) for Philips RC-5 protocol,
+test(6) for RC6 mode 0.
+test(7) for Microsoft Vista MCE.
+test(8) for Samsung.
+
+Hit ctrl-c to stop, then ctrl-d to soft reset."""
+
+print(s) # Runs at import; the indices above match the order of `classes` in test()
--- /dev/null
+# __init__.py Nonblocking IR blaster
+# Runs on Pyboard D or Pyboard 1.x (not Pyboard Lite), ESP32 and RP2
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2020-2021 Peter Hinch
+from sys import platform
+ESP32 = platform == 'esp32' # Loboris not supported owing to RMT
+RP2 = platform == 'rp2' # True on RP2, where output goes through the PIO-based RP2_RMT
+if ESP32:
+ from machine import Pin, PWM
+ from esp32 import RMT
+elif RP2:
+ from .rp2_rmt import RP2_RMT
+else:
+ from pyb import Pin, Timer # Pyboard does not support machine.PWM
+
+from micropython import const
+from array import array
+from time import ticks_us, ticks_diff
+# import micropython
+# micropython.alloc_emergency_exception_buf(100)
+
+
+# Shared by NEC
+STOP = const(0) # End of data: zero entry that terminates the times array
+
+# IR abstract base class. Array holds periods in μs between toggling 36/38KHz
+# carrier on or off. Physical transmission occurs in an ISR context controlled
+# by timer 2 and timer 5. See TRANSMITTER.md for details of operation.
class IR:
    """Abstract base class for IR transmitters.

    A subclass supplies ``valid`` (maximum addr, data, toggle) and a
    ``tx(addr, data, toggle)`` method that fills the times array through
    ``append`` and ``add``. ``transmit`` then sends the array using the
    mechanism available on the running platform.
    """
    _active_high = True  # Hardware turns IRLED on if pin goes high.
    _space = 0  # Duty ratio that causes IRLED to be off
    timeit = False  # Print timing info

    @classmethod
    def active_low(cls):
        """Select hardware where a low pin lights the LED. Call before instantiating."""
        if ESP32:
            raise ValueError('Cannot set active low on ESP32')
        cls._active_high = False
        cls._space = 100

    def __init__(self, pin, cfreq, asize, duty, verbose):
        """Configure the output hardware and allocate the times array.

        pin: output pin. cfreq: carrier frequency. asize: number of array
        entries needed for one frame. duty: carrier duty ratio in percent.
        verbose: print each array operation.
        """
        if ESP32:
            self._rmt = RMT(0, pin=pin, clock_div=80, tx_carrier=(cfreq, duty, 1))
            # 1μs resolution
        elif RP2:  # PIO-based RMT-like device
            self._rmt = RP2_RMT(pin_pulse=None, carrier=(pin, cfreq, duty))  # 1μs resolution
            asize += 1  # Allow for possible extra space pulse
        else:  # Pyboard
            # Read through self so a subclass that called active_low() sees the
            # same polarity here as it does for _space.
            if not self._active_high:
                duty = 100 - duty
            tim = Timer(2, freq=cfreq)  # Timer 2/pin produces 36/38/40KHz carrier
            self._ch = tim.channel(1, Timer.PWM, pin=pin)
            self._ch.pulse_width_percent(self._space)  # Turn off IR LED
            # Pyboard: 0 <= pulse_width_percent <= 100
            self._duty = duty
            self._tim = Timer(5)  # Timer 5 controls carrier on/off times
            self._tcb = self._cb  # Pre-allocate
        # Generator is parenthesised: a bare generator expression is only legal
        # in standard Python when it is the sole call argument.
        self._arr = array('H', (0 for _ in range(asize)))  # on/off times (μs)
        self._mva = memoryview(self._arr)
        # Subclass interface
        self.verbose = verbose
        self.carrier = False  # Notional carrier state while encoding biphase
        self.aptr = 0  # Index into array

    def _cb(self, t):  # T5 callback, generate a carrier mark or space
        t.deinit()
        p = self.aptr
        v = self._arr[p]
        if v == STOP:
            self._ch.pulse_width_percent(self._space)  # Turn off IR LED.
            return
        # Even entries are marks (carrier on), odd entries are spaces
        self._ch.pulse_width_percent(self._space if p & 1 else self._duty)
        self._tim.init(prescaler=84, period=v, callback=self._tcb)
        self.aptr += 1

    # Public interface
    # Before populating array, zero pointer, set notional carrier state (off).
    def transmit(self, addr, data, toggle=0, validate=False):  # NEC: toggle is unused
        """Encode and send one frame.

        With ``validate`` set, raises ValueError if addr, data or toggle lies
        outside 0..``self.valid[n]``.
        """
        t = ticks_us()
        if validate:
            if addr > self.valid[0] or addr < 0:
                raise ValueError('Address out of range', addr)
            if data > self.valid[1] or data < 0:
                raise ValueError('Data out of range', data)
            if toggle > self.valid[2] or toggle < 0:
                raise ValueError('Toggle out of range', toggle)
        self.aptr = 0  # Initial conditions for tx: index into array
        self.carrier = False
        self.tx(addr, data, toggle)  # Subclass populates ._arr
        self.trigger()  # Initiate transmission
        if self.timeit:
            dt = ticks_diff(ticks_us(), t)
            print('Time = {}μs'.format(dt))

    # Subclass interface
    def trigger(self):  # Used by NEC to initiate a repeat frame
        if ESP32:
            self._rmt.write_pulses(tuple(self._mva[0 : self.aptr]))
        elif RP2:
            self.append(STOP)
            self._rmt.send(self._arr)
        else:
            self.append(STOP)
            self.aptr = 0  # Reset pointer
            self._cb(self._tim)  # Initiate physical transmission.

    def append(self, *times):  # Append one or more time periods to ._arr
        for t in times:
            self._arr[self.aptr] = t
            self.aptr += 1
            self.carrier = not self.carrier  # Keep track of carrier state
            self.verbose and print('append', t, 'carrier', self.carrier)

    def add(self, t):  # Increase last time value (for biphase)
        assert t > 0
        self.verbose and print('add', t)
        # .carrier unaffected
        self._arr[self.aptr - 1] += t
+
+# Given an iterable (e.g. list or tuple) of times, emit it as an IR stream.
class Player(IR):
    """Emit an arbitrary iterable of mark/space times as an IR stream."""

    def __init__(self, pin, freq=38000, verbose=False):  # NEC specifies 38KHz
        super().__init__(pin, freq, 68, 33, verbose)  # Measured duty ratio 33%

    def play(self, lst):
        """Copy the times in ``lst`` into the array and transmit them.

        An empty ``lst`` is a no-op: there is nothing to send.
        """
        count = 0
        for count, t in enumerate(lst, 1):
            self._arr[count - 1] = t
        if not count:
            return
        self.aptr = count  # Index one past the last time written
        self.trigger()
--- /dev/null
+# mce.py Encoder for IR remote control using synchronous code
+# Supports Microsoft MCE edition remote protocol.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+# WARNING: This is experimental and subject to change.
+
+from micropython import const
+from ir_tx import IR
+
+_TBIT = const(500) # Time (μs) for pulse of carrier; tx() emits two of these per data bit
+
+
class MCE(IR):
    """Transmitter for the Microsoft MCE edition remote protocol (experimental)."""
    valid = (0xf, 0x3f, 3)  # Max addr, data, toggle
    init_cs = 4  # http://www.hifi-remote.com/johnsfine/DecodeIR.html#OrtekMCE says 3

    def __init__(self, pin, freq=38000, verbose=False):
        super().__init__(pin, freq, 34, 30, verbose)

    def tx(self, addr, data, toggle):
        """Populate the times array with one biphase-encoded frame."""
        self.append(2000, 1000, _TBIT)
        frame = (addr & 0xf) | ((toggle & 3) << 4) | ((data & 0x3f) << 6)
        # Checksum: init_cs plus the number of set bits in the 12 bit payload
        cs = self.init_cs
        for shift in range(12):
            cs += (frame >> shift) & 1
        frame |= cs << 12
        self.verbose and print(bin(frame))

        # Send 16 bits, LSB first
        for n in range(16):
            if bool(frame & (1 << n)) ^ self.carrier:
                self.add(_TBIT)
                self.append(_TBIT)
            else:
                self.append(_TBIT, _TBIT)
--- /dev/null
+# ir_tx.mcetest Test for nonblocking MCE IR blaster.
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2020 Peter Hinch
+
+# Implements a 2-button remote control on a Pyboard with auto repeat.
+from sys import platform
+ESP32 = platform == 'esp32'
+if ESP32:
+ from machine import Pin
+else:
+ from pyb import Pin, LED
+
+from micropython import const
+import uasyncio as asyncio
+from aswitch import Switch, Delay_ms
+from ir_tx.mce import MCE
+
+loop = asyncio.get_event_loop() # Event loop on which test() runs main()
+_FIRST = const(0) # Toggle value sent when a button is pushed
+_REP = const(1) # Toggle value sent with each auto-repeat
+_END = const(2) # Toggle value sent by repeat() once the button has been released
+_REP_DELAY = const(60) # Value passed to Delay_ms.trigger() between repeats (presumably ms)
+
class Rbutton:
    """One remote-control button: sends on push, auto-repeats, sends a final frame on release."""

    def __init__(self, irb, pin, addr, data, rep_code=False):
        self.irb = irb
        self.addr = addr
        self.data = data
        self.rep_code = rep_code
        switch = Switch(pin)
        self.sw = switch
        switch.close_func(self.cfunc)
        switch.open_func(self.ofunc)
        self.tim = Delay_ms(self.repeat)
        self.stop = False

    def _send(self, toggle):
        # Every frame from this button is validated before transmission
        self.irb.transmit(self.addr, self.data, toggle, True)

    def cfunc(self):  # Button push: send data and set up for repeats
        self._send(_FIRST)
        self.tim.trigger(_REP_DELAY)

    def ofunc(self):  # Button release: cancel repeat timer
        self.stop = True

    async def repeat(self):
        await asyncio.sleep(0)  # Let timer stop before retriggering
        if not self.stop:
            self.tim.trigger(_REP_DELAY)
            self._send(_REP)
            return
        # Button has been released: send last message
        self.stop = False
        self.tim.stop()  # Not strictly necessary
        self._send(_END)
+
async def main():
    """Create the transmitter and two buttons, then idle forever."""
    if ESP32:
        pin = Pin(23, Pin.OUT, value=0)
    else:
        pin = Pin('X1')
    irb = MCE(pin)  # Pass verbose=True for array diagnostics
    # Set irb.timeit = True here to print transmit timing

    if ESP32:
        px3 = Pin(18, Pin.IN, Pin.PULL_UP)
        px4 = Pin(19, Pin.IN, Pin.PULL_UP)
    else:
        px3 = Pin('X3', Pin.IN, Pin.PULL_UP)
        px4 = Pin('X4', Pin.IN, Pin.PULL_UP)
    # Rbutton instances; held in a local list for the lifetime of main()
    b = [Rbutton(irb, px3, 0x1, 0x7), Rbutton(irb, px4, 0xe, 0xb)]
    if not ESP32:
        led = LED(1)
        while True:
            await asyncio.sleep_ms(500)  # Obligatory flashing LED.
            led.toggle()
    while True:
        print('Running')
        await asyncio.sleep(5)
+
+# Greeting strings. Common:
+s = '''Test for IR transmitter. Run:
+from ir_tx.mcetest import test
+test()
+'''
+# Pyboard:
+spb = '''
+IR LED on pin X1
+Ground pin X3 to send addr 1 data 7
+Ground pin X4 to send addr 0xe data 0x0b.'''
+# ESP32
+sesp = '''
+IR LED gate on pins 23, 21
+Ground pin 18 to send addr 1 data 7
+Ground pin 19 to send addr 0xe data 0x0b.'''
+
+print(''.join((s, sesp)) if ESP32 else ''.join((s, spb))) # Common text plus the wiring notes for this platform
+
def test():
    """Run main() on the module-level event loop."""
    coro = main()
    loop.run_until_complete(coro)
--- /dev/null
+# nec.py Encoder for IR remote control using synchronous code
+# NEC protocol.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020-2022 Released under the MIT license
+
+# With thanks to J.E. Tannenbaum for information re Samsung protocol
+from micropython import const
+from ir_tx import IR, STOP
+
+_TBURST = const(563) # Mark time for every bit; also the space time of a 0 bit
+_T_ONE = const(1687) # Space time of a 1 bit
+
class NEC(IR):
    """Transmitter for the NEC protocol; set ``samsung = True`` for Samsung framing."""
    valid = (0xffff, 0xff, 0)  # Max addr, data, toggle
    samsung = False

    def __init__(self, pin, freq=38000, verbose=False):  # NEC specifies 38KHz also Samsung
        super().__init__(pin, freq, 68, 33, verbose)  # Measured duty ratio 33%

    def _bit(self, b):
        # A fixed mark followed by a long space (1) or a short space (0)
        space = _T_ONE if b else _TBURST
        self.append(_TBURST, space)

    def tx(self, addr, data, _):  # Ignore toggle
        leader = 4500 if self.samsung else 9000
        self.append(leader, 4500)
        if addr < 256:  # Short address: fill the high byte
            # Samsung repeats the address; NEC appends its complement
            high = addr if self.samsung else addr ^ 0xff
            addr |= high << 8
        data |= (data ^ 0xff) << 8
        # Address word then data word, each sent LSB first
        for word in (addr, data):
            for _n in range(16):
                self._bit(word & 1)
                word >>= 1
        self.append(_TBURST)

    def repeat(self):
        """Send an NEC repeat frame."""
        self.aptr = 0
        self.append(9000, 2250, _TBURST)
        self.trigger()  # Initiate physical transmission.
--- /dev/null
+# philips.py Encoder for IR remote control using synchronous code
+# RC-5 and RC-6 mode 0 protocols.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from micropython import const
+from ir_tx import IR
+
+# Philips RC5 protocol
+_T_RC5 = const(889) # Time for pulse of carrier; RC5.tx() builds each bit from two of these
+
+
class RC5(IR):
    """Transmitter for the Philips RC-5 protocol."""
    valid = (0x1f, 0x7f, 1)  # Max addr, data, toggle

    def __init__(self, pin, freq=36000, verbose=False):
        super().__init__(pin, freq, 28, 30, verbose)

    def tx(self, addr, data, toggle):  # Fix RC5X S2 bit polarity
        command = data & 0x3f
        address = (addr & 0x1f) << 6
        s2 = ((data & 0x40) ^ 0x40) << 6
        tog = (toggle & 1) << 11
        d = command | address | s2 | tog
        self.verbose and print(bin(d))
        # The leading bit is always a single pulse, whatever d holds
        self.append(_T_RC5)
        mask = 0x1000
        while mask:
            if bool(d & mask) ^ self.carrier:
                self.add(_T_RC5)
                self.append(_T_RC5)
            else:
                self.append(_T_RC5, _T_RC5)
            mask >>= 1
+
+# Philips RC6 mode 0 protocol
+_T_RC6 = const(444) # Pulse time used for the header and data bits in RC6_M0.tx()
+_T2_RC6 = const(889) # Pulse time used for the double-length toggle bit
+
class RC6_M0(IR):
    """Transmitter for the Philips RC-6 mode 0 protocol."""
    valid = (0xff, 0xff, 1)  # Max addr, data, toggle

    def __init__(self, pin, freq=36000, verbose=False):
        super().__init__(pin, freq, 44, 30, verbose)

    def tx(self, addr, data, toggle):
        # leader, 1, 0, 0, 0
        header = (2666, _T2_RC6, _T_RC6, _T2_RC6, _T_RC6, _T_RC6, _T_RC6, _T_RC6, _T_RC6)
        self.append(*header)
        # Append a single bit of twice duration
        if not toggle:
            self.append(_T2_RC6, _T2_RC6)
        else:
            self.add(_T2_RC6)
            self.append(_T2_RC6)
        d = ((addr & 0xff) << 8) | (data & 0xff)
        self.verbose and print('toggle', toggle, self.carrier, bool(d & 0x8000))
        # 16 bits, MSB first
        for shift in range(15, -1, -1):
            if bool(d & (1 << shift)) ^ self.carrier:
                self.append(_T_RC6, _T_RC6)
            else:
                self.add(_T_RC6)
                self.append(_T_RC6)
--- /dev/null
+# rp2_rmt.py A RMT-like class for the RP2.
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2021 Peter Hinch
+
+from machine import Pin, PWM
+import rp2
+
+@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, autopull=True, pull_thresh=32)
+def pulsetrain(): # PIO program: drive the SET pin high then low for successive counts taken from the FIFO
+ wrap_target()
+ out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end.
+ irq(rel(0)) # Raise an IRQ before the high period
+ set(pins, 1) # Set pin high
+ label('loop')
+ jmp(x_dec,'loop') # Spin, decrementing x, for the high time
+ irq(rel(0)) # Raise an IRQ before the low period
+ set(pins, 0) # Set pin low
+ out(y, 32) # Low time.
+ label('loop_lo')
+ jmp(y_dec,'loop_lo') # Spin, decrementing y, for the low time
+ wrap()
+
+@rp2.asm_pio(autopull=True, pull_thresh=32)
+def irqtrain(): # PIO program: raise one IRQ per FIFO entry, then wait that entry's count; drives no pin
+ wrap_target()
+ out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end.
+ irq(rel(0)) # Raise an IRQ at the start of each period
+ label('loop')
+ jmp(x_dec,'loop') # Spin, decrementing x, for the period
+ wrap()
+
class DummyPWM:
    """Stand-in used when no carrier is configured: duty requests are ignored."""

    def duty_u16(self, _):
        return None
+
class RP2_RMT:
    """RMT-like pulse train generator for the RP2, built on a PIO state machine.

    ``carrier`` is ``None`` or a ``(pin, freq, duty_percent)`` tuple; when
    given, a PWM carrier is switched on and off by the IRQ callback. When
    ``pin_pulse`` is given the state machine drives that pin directly.
    """

    def __init__(self, pin_pulse=None, carrier=None, sm_no=0, sm_freq=1_000_000):
        if carrier is None:
            self.pwm = DummyPWM()
            self.duty = (0, 0)
        else:
            pin_car, freq, duty = carrier
            self.pwm = PWM(pin_car)  # Set up PWM with carrier off.
            self.pwm.freq(freq)
            self.pwm.duty_u16(0)
            self.duty = (int(0xffff * duty // 100), 0)  # (mark, space) duty values
        if pin_pulse is None:
            self.sm = rp2.StateMachine(sm_no, irqtrain, freq=sm_freq)
        else:
            self.sm = rp2.StateMachine(sm_no, pulsetrain, freq=sm_freq, set_base=pin_pulse)
        self.apt = 0  # Array index
        self.arr = None  # Array
        self.ict = None  # Current IRQ count
        self.icm = 0  # End IRQ count
        self.reps = 0  # 0 == forever n == no. of reps
        rp2.PIO(0).irq(self._cb)

    # IRQ callback. Because of FIFO IRQ's keep arriving after STOP.
    def _cb(self, pio):
        self.pwm.duty_u16(self.duty[self.ict & 1])
        self.ict += 1
        d = self.arr[self.apt]
        if d:  # If data available feed FIFO
            self.sm.put(d)
            self.apt += 1
        else:
            reps = self.reps
            if reps != 1:  # All done if reps == 1
                # Only a finite count is decremented. The test is on the count
                # itself, not on the result of the comparison above, so that
                # 0 (run forever) is left untouched.
                if reps:
                    self.reps = reps - 1
                self.sm.put(self.arr[0])
                self.apt = 1  # Set pointer and count to state
                self.ict = 1  # after 1st IRQ

    # Arg is an array of times in μs terminated by 0.
    def send(self, ar, reps=1, check=True):
        """Start sending ``ar``, ``reps`` times (0 repeats until cancel())."""
        self.sm.active(0)
        self.reps = reps
        ar[-1] = 0  # Ensure at least one STOP
        for x, d in enumerate(ar):  # Find 1st STOP
            if d == 0:
                break
        if check:
            # Pulse train must end with a space otherwise we leave carrier on.
            # So, if it ends with a mark, append a space. Note __init__.py
            # ensures that there is room in array.
            if (x & 1):
                ar[x] = 1  # space. Duration doesn't matter.
                x += 1
                ar[x] = 0  # STOP
        self.icm = x  # index of 1st STOP
        mv = memoryview(ar)
        n = min(x, 4)  # Fill FIFO if there are enough data points.
        self.sm.put(mv[0 : n])
        self.arr = ar  # Initial conditions for ISR
        self.apt = n  # Point to next data value
        self.ict = 0  # IRQ count
        self.sm.active(1)

    def busy(self):
        """Return True while the IRQ count is still below the STOP index."""
        if self.ict is None:
            return False  # Just instantiated
        return self.ict < self.icm

    def cancel(self):
        """Stop repeating: the current pass becomes the last."""
        self.reps = 1
--- /dev/null
+# sony.py Encoder for IR remote control using synchronous code
+# Sony SIRC protocol.
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from micropython import const
+from ir_tx import IR
+
class SONY_ABC(IR):
    """Common encoder for the Sony SIRC 12, 15 and 20 bit variants."""

    def __init__(self, pin, bits, freq, verbose):
        """Validate bits, then initialise the IR base class.

        Raises ValueError if bits is not 12, 15 or 20. The check runs before
        the base class is initialised so that a bad argument does not leave a
        partly configured transmitter behind.
        """
        if bits not in (12, 15, 20):
            raise ValueError('bits must be 12, 15 or 20.')
        # presumably (pin, carrier freq, array size, duty %, verbose) - the
        # IR base class is not visible here; verify against ir_tx/__init__.py
        super().__init__(pin, freq, 3 + bits * 2, 30, verbose)
        self.bits = bits

    def tx(self, addr, data, ext):
        """Append the pulse train for one frame.

        The frame value holds 7 data bits in the low bits. 12 bit mode adds
        5 address bits, 15 bit mode adds 8 address bits, and 20 bit mode adds
        5 address bits followed by 8 ext bits. Bits are sent least
        significant first.
        """
        self.append(2400, 600)  # Start burst
        bits = self.bits
        v = data & 0x7f
        if bits == 12:
            v |= (addr & 0x1f) << 7
        elif bits == 15:
            v |= (addr & 0xff) << 7
        else:
            v |= (addr & 0x1f) << 7
            v |= (ext & 0xff) << 12
        for _ in range(bits):
            # A 1 bit is a 1200 mark, a 0 bit a 600 mark; each has a 600 space.
            self.append(1200 if v & 1 else 600, 600)
            v >>= 1
+
+# Sony specifies 40KHz
class SONY_12(SONY_ABC):
    """Sony SIRC encoder, 12 bit variant."""

    valid = (0x1f, 0x7f, 0)  # Max addr, data, toggle

    def __init__(self, pin, freq=40000, verbose=False):
        super().__init__(pin, bits=12, freq=freq, verbose=verbose)
+
class SONY_15(SONY_ABC):
    """Sony SIRC encoder, 15 bit variant."""

    valid = (0xff, 0x7f, 0)  # Max addr, data, toggle

    def __init__(self, pin, freq=40000, verbose=False):
        super().__init__(pin, bits=15, freq=freq, verbose=verbose)
+
class SONY_20(SONY_ABC):
    """Sony SIRC encoder, 20 bit variant."""

    valid = (0x1f, 0x7f, 0xff)  # Max addr, data, toggle

    def __init__(self, pin, freq=40000, verbose=False):
        super().__init__(pin, bits=20, freq=freq, verbose=verbose)
+
--- /dev/null
+# ir_tx.test Test for nonblocking NEC/SONY/RC-5/RC-6 mode 0 IR blaster.
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2020 Peter Hinch
+
+# Implements a 2-button remote control on a Pyboard with auto repeat.
from sys import platform

# One flag per supported host; the flags select pin numbers and the LED API.
PYBOARD, ESP32, RP2 = (platform == port for port in ('pyboard', 'esp32', 'rp2'))
if not (ESP32 or RP2):
    from pyb import Pin, LED
else:
    from machine import Pin
import uasyncio as asyncio
from primitives.switch import Switch
from primitives.delay_ms import Delay_ms
# Import all implemented classes
from ir_tx.nec import NEC
from ir_tx.sony import SONY_12, SONY_15, SONY_20
from ir_tx.philips import RC5, RC6_M0

loop = asyncio.get_event_loop()
+
+# If button is held down normal behaviour is to retransmit
+# but most NEC models send a REPEAT code
class Rbutton:
    """One remote control button with auto repeat while it is held down."""

    toggle = 1  # toggle is ignored in NEC mode

    def __init__(self, irb, pin, addr, data, proto):
        self.irb, self.addr, self.data, self.proto = irb, addr, data, proto
        self.sw = Switch(pin)
        self.sw.close_func(self.cfunc)
        self.sw.open_func(self.ofunc)
        self.tim = Delay_ms(self.repeat)

    def _send(self):
        """Transmit this button's code with validation enabled."""
        # NEC, sony 12, 15: toggle==0
        tog = Rbutton.toggle if self.proto >= 3 else 0
        self.irb.transmit(self.addr, self.data, tog, True)  # Test validation

    def cfunc(self):  # Button push: send data
        self._send()
        # Auto repeat. The Sony protocol specifies 45ms but this is tight.
        # In 20 bit mode a data burst can be upto 39ms long.
        self.tim.trigger(108)

    def ofunc(self):  # Button release: cancel repeat timer
        self.tim.stop()
        Rbutton.toggle ^= 1  # Toggle control

    async def repeat(self):
        """Timer callback: resend (or send NEC REPEAT) while still pressed."""
        await asyncio.sleep(0)  # Let timer stop before retriggering
        if self.sw():  # Switch reads nonzero: button has been released
            return
        self.tim.trigger(108)
        if self.proto == 0:
            self.irb.repeat()  # NEC special case: send REPEAT code
        else:
            self._send()
+
async def main(proto):
    """Create the transmitter and two buttons, then flash an LED forever.

    proto indexes the tuple of encoder classes (0 == NEC ... 5 == RC6_M0).
    """
    # Test uses a 38KHz carrier.
    if ESP32 or RP2:  # Pins for IR LED gate
        pin = Pin(23 if ESP32 else 17, Pin.OUT, value = 0)
    else:
        pin = Pin('X1')
    encoder = (NEC, SONY_12, SONY_15, SONY_20, RC5, RC6_M0)[proto]
    irb = encoder(pin, 38000)  # My decoder chip is 38KHz
    # Uncomment the following to print transmit timing
    # irb.timeit = True

    button_pins = ('X3', 'X4') if PYBOARD else (18, 19)
    px3, px4 = (Pin(ident, Pin.IN, Pin.PULL_UP) for ident in button_pins)
    b = [  # Rbutton instances
        Rbutton(irb, px3, 0xef00, 0x03, proto),
        Rbutton(irb, px4, 0xef00, 0x02, proto),
    ]
    if ESP32:
        while True:
            print('Running')
            await asyncio.sleep(5)
    if RP2:
        led = Pin(25, Pin.OUT)

        def flash():
            led(not led())
    else:
        flash = LED(1).toggle
    while True:
        await asyncio.sleep_ms(500)  # Obligatory flashing LED.
        flash()
+
+# Greeting strings. Common:
# The per-platform text states the address/data values that main() actually
# transmits (0xef00 with data 0x03 and 0x02).
s = '''Test for IR transmitter. Run:
from ir_tx.test import test
test() for NEC protocol
test(1) for Sony SIRC 12 bit
test(2) for Sony SIRC 15 bit
test(3) for Sony SIRC 20 bit
test(4) for Philips RC-5 protocol
test(5) for Philips RC-6 mode 0.
'''

# Pyboard:
spb = '''
IR LED on pin X1
Ground pin X3 to send addr 0xef00 data 0x03
Ground pin X4 to send addr 0xef00 data 0x02.'''

# ESP32
sesp = '''
IR LED gate on pin 23
Ground pin 18 to send addr 0xef00 data 0x03
Ground pin 19 to send addr 0xef00 data 0x02.'''

# RP2
srp2 = '''
IR LED gate on pin 17
Ground pin 18 to send addr 0xef00 data 0x03
Ground pin 19 to send addr 0xef00 data 0x02.'''

if ESP32:
    print(''.join((s, sesp)))
elif RP2:
    print(''.join((s, srp2)))
else:
    print(''.join((s, spb)))
+
def test(proto=0):
    """Run the transmitter test for the protocol selected by proto."""
    coro = main(proto)
    loop.run_until_complete(coro)
--- /dev/null
# Start the IR transmitter test with protocol 0 (NEC).
from ir_tx.test import test

test(0)
\ No newline at end of file
--- /dev/null
+# __init__.py Common functions for uasyncio primitives
+
+# Copyright (c) 2018-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+try:
+ import uasyncio as asyncio
+except ImportError:
+ import asyncio
+
+
+async def _g():
+ pass
+type_coro = type(_g())
+
+# If a callback is passed, run it and return.
+# If a coro is passed initiate it and return.
+# coros are passed by name i.e. not using function call syntax.
def launch(func, tup_args):
    """Call func(*tup_args); if that yields a coroutine, schedule it.

    Returns the plain return value for an ordinary callable, or the Task
    created for a coroutine.
    """
    outcome = func(*tup_args)
    if not isinstance(outcome, type_coro):
        return outcome
    return asyncio.create_task(outcome)
+
def set_global_exception():
    """Install an event loop exception handler that prints and exits."""
    import sys

    def _handle_exception(loop, context):
        sys.print_exception(context["exception"])
        sys.exit()

    asyncio.get_event_loop().set_exception_handler(_handle_exception)
+
+_attrs = {
+ "AADC": "aadc",
+ "Barrier": "barrier",
+ "Condition": "condition",
+ "Delay_ms": "delay_ms",
+ "Encode": "encoder_async",
+ "Pushbutton": "pushbutton",
+ "ESP32Touch": "pushbutton",
+ "Queue": "queue",
+ "Semaphore": "semaphore",
+ "BoundedSemaphore": "semaphore",
+ "Switch": "switch",
+ "WaitAll": "events",
+ "WaitAny": "events",
+ "ESwitch": "events",
+ "EButton": "events",
+ "RingbufQueue": "ringbuf_queue",
+}
+
+# Copied from uasyncio.__init__.py
+# Lazy loader, effectively does:
+# global attr
+# from .mod import attr
def __getattr__(attr):
    """Import the submodule providing attr on first access and cache it.

    Raises AttributeError if attr is not listed in _attrs.
    """
    mod = _attrs.get(attr)
    if mod is None:
        raise AttributeError(attr)
    module = __import__(mod, None, None, True, 1)  # Relative import of .mod
    value = getattr(module, attr)
    globals()[attr] = value  # Later lookups bypass __getattr__
    return value
--- /dev/null
+# aadc.py AADC (asynchronous ADC) class
+
+# Copyright (c) 2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+import io
+
# Stream ioctl values used by AADC.ioctl.
MP_STREAM_ERROR = const(-1)  # Result for any request other than poll
MP_STREAM_POLL = const(3)  # Poll request code
MP_STREAM_POLL_RD = const(1)  # Flag: data is ready to read
+
class AADC(io.IOBase):
    """Awaitable ADC: a stream device polled by a uasyncio StreamReader.

    Awaiting an instance pauses until the poll in ioctl() reports readable,
    then returns the ADC reading that satisfied it.
    """

    def __init__(self, adc):
        self._adc = adc
        self._lower = 0
        self._upper = 65535
        self._pol = True
        self._last = None  # Most recent reading
        self._sreader = asyncio.StreamReader(self)

    def __iter__(self):
        raw = yield from self._sreader.read(2)
        return int.from_bytes(raw, 'little')

    def _adcread(self):
        """Take a reading, remember it and return it."""
        value = self._adc.read_u16()
        self._last = value
        return value

    def read(self, n):  # For use by StreamReader only
        return int.to_bytes(self._last, 2, 'little')

    def ioctl(self, req, arg):
        """Answer a poll: readable when the sense XOR in-range test is true."""
        if req != MP_STREAM_POLL:
            return MP_STREAM_ERROR
        if arg & MP_STREAM_POLL_RD:
            in_range = self._lower <= self._adcread() <= self._upper
            if self._pol ^ in_range:
                return MP_STREAM_POLL_RD
        return 0

    # *** API ***

    # Select the trigger sense. As coded in ioctl(): with normal True the
    # device becomes readable when the reading is outside the limits, with
    # normal False when it is inside them.
    # NOTE(review): the previous comment stated the opposite - confirm intent.
    def sense(self, normal):
        self._pol = normal

    def read_u16(self, last=False):
        """Return the stored reading if last is set, else a fresh one."""
        return self._last if last else self._adcread()

    # Call syntax: set limits for trigger
    # lower is None: leave limits unchanged.
    # upper is None: treat lower as relative to current value.
    # both have values: treat as absolute limits.
    def __call__(self, lower=None, upper=None):
        if lower is None:
            return self
        if upper is not None:  # Absolute limits
            self._lower, self._upper = lower, upper
            return self
        # Relative limit: centre on the stored reading, or take one now.
        centre = self._last if self._last is not None else self._adcread()
        self._lower = centre - lower
        self._upper = centre + lower
        return self
--- /dev/null
+# barrier.py
+# Copyright (c) 2018-2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# Now uses Event rather than polling.
+
+try:
+ import uasyncio as asyncio
+except ImportError:
+ import asyncio
+
+from . import launch
+
+# A Barrier synchronises N coros. Each issues await barrier.
+# Execution pauses until all other participant coros are waiting on it.
+# At that point the callback is executed. Then the barrier is 'opened' and
+# execution of all participants resumes.
+
+class Barrier():
+ def __init__(self, participants, func=None, args=()):
+ self._participants = participants
+ self._count = participants
+ self._func = func
+ self._args = args
+ self._res = None
+ self._evt = asyncio.Event()
+
+ def __await__(self):
+ if self.trigger():
+ return # Other tasks have already reached barrier
+ await self._evt.wait() # Wait until last task reaches it
+
+ __iter__ = __await__
+
+ def result(self):
+ return self._res
+
+ def trigger(self):
+ self._count -=1
+ if self._count < 0:
+ raise ValueError('Too many tasks accessing Barrier')
+ if self._count > 0:
+ return False # At least 1 other task has not reached barrier
+ # All other tasks are waiting
+ if self._func is not None:
+ self._res = launch(self._func, self._args)
+ self._count = self._participants
+ self._evt.set() # Release others
+ self._evt.clear()
+ return True
+
+ def busy(self):
+ return self._count < self._participants
--- /dev/null
+# condition.py
+
+# Copyright (c) 2018-2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+try:
+ import uasyncio as asyncio
+except ImportError:
+ import asyncio
+
+# Condition class
+# from primitives.condition import Condition
+
+class Condition():
+ def __init__(self, lock=None):
+ self.lock = asyncio.Lock() if lock is None else lock
+ self.events = []
+
+ async def acquire(self):
+ await self.lock.acquire()
+
+# enable this syntax:
+# with await condition [as cond]:
+ def __await__(self):
+ await self.lock.acquire()
+ return self
+
+ __iter__ = __await__
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *_):
+ self.lock.release()
+
+ def locked(self):
+ return self.lock.locked()
+
+ def release(self):
+ self.lock.release() # Will raise RuntimeError if not locked
+
+ def notify(self, n=1): # Caller controls lock
+ if not self.lock.locked():
+ raise RuntimeError('Condition notify with lock not acquired.')
+ for _ in range(min(n, len(self.events))):
+ ev = self.events.pop()
+ ev.set()
+
+ def notify_all(self):
+ self.notify(len(self.events))
+
+ async def wait(self):
+ if not self.lock.locked():
+ raise RuntimeError('Condition wait with lock not acquired.')
+ ev = asyncio.Event()
+ self.events.append(ev)
+ self.lock.release()
+ await ev.wait()
+ await self.lock.acquire()
+ assert ev not in self.events, 'condition wait assertion fail'
+ return True # CPython compatibility
+
+ async def wait_for(self, predicate):
+ result = predicate()
+ while not result:
+ await self.wait()
+ result = predicate()
+ return result
--- /dev/null
+# delay_ms.py Now uses ThreadSafeFlag and has extra .wait() API
+# Usage:
+# from primitives import Delay_ms
+
+# Copyright (c) 2018-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+from utime import ticks_add, ticks_diff, ticks_ms
+from . import launch
+
class Delay_ms:
    """Retriggerable software delay.

    trigger() (re)starts the delay. On expiry the timeout Event is set and
    the optional callable is launched. stop() cancels a running delay.
    """

    class DummyTimer:  # Stand-in for the timer class. Can be cancelled.
        def cancel(self):
            pass

    _fake = DummyTimer()

    def __init__(self, func=None, args=(), duration=1000):
        self._func = func
        self._args = args
        self._durn = duration  # Default duration
        self._retn = None  # Return value of launched callable
        self._tend = None  # Stop time (absolute ms).
        self._busy = False
        self._trig = asyncio.ThreadSafeFlag()
        self._tout = asyncio.Event()  # Timeout event
        self.wait = self._tout.wait  # Allow: await wait_ms.wait()
        self.clear = self._tout.clear
        self.set = self._tout.set
        self._ttask = self._fake  # Timer task
        self._mtask = asyncio.create_task(self._run())  # Main task

    async def _run(self):
        """Main task: replace the timer task each time a trigger arrives."""
        while True:
            await self._trig.wait()  # Await a trigger
            self._ttask.cancel()  # Cancel and replace
            await asyncio.sleep_ms(0)
            dt = max(ticks_diff(self._tend, ticks_ms()), 0)  # Beware already elapsed.
            self._ttask = asyncio.create_task(self._timer(dt))

    async def _timer(self, dt):
        """Timer task: after dt ms flag the timeout and launch the callable."""
        await asyncio.sleep_ms(dt)
        self._tout.set()  # Only gets here if not cancelled.
        self._busy = False
        if self._func is not None:
            self._retn = launch(self._func, self._args)

    # API
    # trigger may be called from hard ISR.
    def trigger(self, duration=0):  # Update absolute end time, 0-> ctor default
        """(Re)start the delay. Raises RuntimeError after deinit()."""
        if self._mtask is None:
            raise RuntimeError("Delay_ms.deinit() has run.")
        self._tend = ticks_add(ticks_ms(), duration if duration > 0 else self._durn)
        self._retn = None  # Default in case cancelled.
        self._busy = True
        self._trig.set()

    def stop(self):
        """Cancel a running delay and clear the timeout Event."""
        self._ttask.cancel()
        self._ttask = self._fake
        self._busy = False
        self._tout.clear()

    def __call__(self):  # Current running status
        return self._busy

    running = __call__

    def rvalue(self):
        return self._retn

    def callback(self, func=None, args=()):
        """Replace the callable (and its args) launched on timeout."""
        self._func = func
        self._args = args

    def deinit(self):
        """Stop the delay and cancel the main task. Safe to call repeatedly."""
        self.stop()
        if self._mtask is not None:  # A second call has nothing to cancel
            self._mtask.cancel()
            self._mtask = None
--- /dev/null
+# encoder.py Asynchronous driver for incremental quadrature encoder.
+
+# Copyright (c) 2021-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# Thanks are due to @ilium007 for identifying the issue of tracking detents,
+# https://github.com/peterhinch/micropython-async/issues/82.
+# Also to Mike Teachman (@miketeachman) for design discussions and testing
+# against a state table design
+# https://github.com/miketeachman/micropython-rotary/blob/master/rotary.py
+
+import uasyncio as asyncio
+from machine import Pin
+
class Encoder:
    """Asynchronous driver for an incremental quadrature encoder.

    Pin interrupts maintain a raw count. A task converts the raw count to a
    divided, limited value and runs the user callback when that changes.
    """

    def __init__(self, pin_x, pin_y, v=0, div=1, vmin=None, vmax=None,
                 mod=None, callback=lambda a, b : None, args=(), delay=100):
        self._pin_x = pin_x
        self._pin_y = pin_y
        self._x = pin_x()
        self._y = pin_y()
        self._v = v * div  # Initialise hardware value
        self._cv = v  # Current (divided) value
        self.delay = delay  # Pause (ms) for motion to stop/limit callback frequency

        if (vmin is not None and v < vmin) or (vmax is not None and v > vmax):
            raise ValueError('Incompatible args: must have vmin <= v <= vmax')
        self._tsf = asyncio.ThreadSafeFlag()
        edges = Pin.IRQ_RISING | Pin.IRQ_FALLING
        try:
            pin_x.irq(trigger=edges, handler=self._x_cb, hard=True)
            pin_y.irq(trigger=edges, handler=self._y_cb, hard=True)
        except TypeError:  # hard arg is unsupported on some hosts
            pin_x.irq(trigger=edges, handler=self._x_cb)
            pin_y.irq(trigger=edges, handler=self._y_cb)
        asyncio.create_task(self._run(vmin, vmax, div, mod, callback, args))

    # Hardware IRQ's. Duration 36μs on Pyboard 1 ~50μs on ESP32.
    # IRQ latency: 2nd edge may have occurred by the time ISR runs, in
    # which case there is no movement.
    def _x_cb(self, pin_x):
        level = pin_x()
        if level == self._x:
            return
        self._x = level
        self._v += 1 if level ^ self._pin_y() else -1
        self._tsf.set()

    def _y_cb(self, pin_y):
        level = pin_y()
        if level == self._y:
            return
        self._y = level
        self._v -= 1 if level ^ self._pin_x() else -1
        self._tsf.set()

    async def _run(self, vmin, vmax, div, mod, cb, args):
        """Convert raw counts to limited values and run the callback."""
        last_raw = self._v  # Prior hardware value
        last_scaled = self._cv  # Prior divided value
        limited = last_scaled  # Current value after limits applied
        last_limited = last_scaled  # Previous value after limits applied
        pause = self.delay
        while True:
            await self._tsf.wait()
            await asyncio.sleep_ms(pause)  # Wait for motion to stop.
            raw = self._v  # Sample hardware (atomic read).
            if raw == last_raw:  # A change happened but was negated before
                continue  # this got scheduled. Nothing to do.
            last_raw = raw
            scaled = round(raw / div)  # Divided value.
            delta = scaled - last_scaled  # Change in divided value.
            if not delta:
                continue  # No change
            limited += delta  # Divided value with limits/mod applied
            if vmax is not None:
                limited = min(vmax, limited)
            if vmin is not None:
                limited = max(vmin, limited)
            if mod is not None:
                limited = limited % mod
            self._cv = limited  # update ._cv for .value() before CB.
            if limited != last_limited:
                cb(limited, limited - last_limited, *args)  # Run user CB in uasyncio context
            last_scaled = scaled
            last_limited = limited

    def value(self):
        """Return the current divided, limited value."""
        return self._cv
--- /dev/null
+# events.py Event based primitives
+
+# Copyright (c) 2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+from . import Delay_ms
+
+# An Event-like class that can wait on an iterable of Event-like instances.
+# .wait pauses until any passed event is set.
class WaitAny:
    """Event-like object whose wait() returns when any member event is set."""

    def __init__(self, events):
        self.events = events
        self.trig_event = None  # Event that ended the most recent wait
        self.evt = asyncio.Event()

    async def wait(self):
        """Pause until a member event is set; return that event."""
        watchers = []
        for member in self.events:
            watchers.append(asyncio.create_task(self.wt(member)))
        try:
            await self.evt.wait()
        finally:
            self.evt.clear()
            for watcher in watchers:
                watcher.cancel()
        return self.trig_event

    async def wt(self, event):
        """Watch one member and record it as the trigger when it is set."""
        await event.wait()
        self.trig_event = event
        self.evt.set()

    def event(self):
        return self.trig_event

    def clear(self):
        """Clear every member that has a clear method."""
        for member in self.events:
            if hasattr(member, 'clear'):
                member.clear()
+
+# An Event-like class that can wait on an iterable of Event-like instances,
+# .wait pauses until all passed events have been set.
class WaitAll:
    """Event-like object whose wait() returns when all member events are set."""

    def __init__(self, events):
        self.events = events

    async def wait(self):
        """Pause until every member event has been set."""
        async def wt(event):
            await event.wait()

        # Must be a list: a generator would be exhausted by gather(), leaving
        # the cleanup loop below with nothing to cancel.
        tasks = [asyncio.create_task(wt(event)) for event in self.events]
        try:
            await asyncio.gather(*tasks)
        finally:  # May be subject to timeout or cancellation
            for task in tasks:
                task.cancel()

    def clear(self):
        """Clear every member that has a clear method."""
        for evt in (x for x in self.events if hasattr(x, 'clear')):
            evt.clear()
+
+# Minimal switch class having an Event based interface
class ESwitch:
    """Minimal debounced switch with an Event based interface.

    The open and close Events are set on the corresponding state changes.
    The code never clears them except in deinit().
    """

    debounce_ms = 50

    def __init__(self, pin, lopen=1):  # Default is n/o switch returned to gnd
        self._pin = pin  # Should be initialised for input with pullup
        self._lopen = lopen  # Logic level in "open" state
        self.open = asyncio.Event()
        self.close = asyncio.Event()
        self._state = self._pin() ^ self._lopen  # Get initial state
        # Keep the task handle so that deinit() can cancel the polling.
        self._task = asyncio.create_task(self._poll(ESwitch.debounce_ms))

    async def _poll(self, dt):  # Poll the button
        while True:
            if (s := self._pin() ^ self._lopen) != self._state:  # 15μs
                self._state = s
                self._cf() if s else self._of()
            await asyncio.sleep_ms(dt)  # Wait out bounce

    def _of(self):
        self.open.set()

    def _cf(self):
        self.close.set()

    # ***** API *****
    # Return current debounced state: nonzero when the pin level differs from
    # the "open" level, i.e. the switch is closed.
    def __call__(self):
        return self._state

    def deinit(self):
        """Stop polling and clear both Events."""
        self._task.cancel()
        self.open.clear()
        self.close.clear()
+
+# Minimal pushbutton class having an Event based interface
class EButton:
    """Minimal debounced pushbutton with an Event based interface.

    Events press, double, long and release are set by the button logic.
    """

    debounce_ms = 50  # Attributes can be varied by user
    long_press_ms = 1000
    double_click_ms = 400

    def __init__(self, pin, suppress=False, sense=None):
        self._pin = pin  # Initialise for input
        self._supp = suppress
        self._sense = pin() if sense is None else sense
        self._state = self.rawstate()  # Initial logical state
        self._ltim = Delay_ms(duration = EButton.long_press_ms)
        self._dtim = Delay_ms(duration = EButton.double_click_ms)
        self.press = asyncio.Event()  # *** API ***
        self.double = asyncio.Event()
        self.long = asyncio.Event()
        self.release = asyncio.Event()  # *** END API ***
        self._tasks = [asyncio.create_task(self._poll(EButton.debounce_ms))]  # Tasks run forever. Poll contacts
        self._tasks.append(asyncio.create_task(self._ltf()))  # Handle long press
        if suppress:
            self._tasks.append(asyncio.create_task(self._dtf()))  # Double timer

    async def _poll(self, dt):  # Poll the button
        while True:
            if (s := self.rawstate()) != self._state:
                self._state = s
                self._pf() if s else self._rf()
            await asyncio.sleep_ms(dt)  # Wait out bounce

    def _pf(self):  # Button press
        if not self._supp:
            self.press.set()  # User event
        if not self._ltim():  # Don't retrigger long timer if already running
            self._ltim.trigger()
        if self._dtim():  # Press occurred while _dtim is running
            self.double.set()  # User event
            self._dtim.stop()  # _dtim's Event is only used if suppress
        else:
            self._dtim.trigger()

    def _rf(self):  # Button release
        self._ltim.stop()
        if not self._supp or not self._dtim():  # If dtim running postpone release otherwise it
            self.release.set()  # is set before press

    async def _ltf(self):  # Long timeout
        while True:
            await self._ltim.wait()
            self._ltim.clear()  # Clear the event
            self.long.set()  # User event

    async def _dtf(self):  # Double timeout (runs if suppress is set)
        while True:
            await self._dtim.wait()
            self._dtim.clear()  # Clear the event
            if not self._ltim():  # Button was released
                self.press.set()  # User events
                self.release.set()

    # ****** API ******
    # Current non-debounced logical button state: True == pressed
    def rawstate(self):
        return bool(self._pin() ^ self._sense)

    # Current debounced state of button (True == pressed)
    def __call__(self):
        return self._state

    def deinit(self):
        """Cancel the button's tasks and timers and clear its Events."""
        for task in self._tasks:
            task.cancel()
        # Each Delay_ms runs a background task of its own: shut those down
        # too so that nothing is left running after deinit.
        for timer in (self._ltim, self._dtim):
            timer.deinit()
        for evt in (self.press, self.double, self.long, self.release):
            evt.clear()
--- /dev/null
+# pushbutton.py
+
+# Copyright (c) 2018-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+import utime as time
+from . import launch, Delay_ms
+
+try:
+ from machine import TouchPad
+except ImportError:
+ pass
+
+
class Pushbutton:
    """Debounced pushbutton running callbacks on press, release, double
    click and long press.

    A callback may be a function, a coroutine or None. Passing None creates
    an Event (press, release, double or long) that is set instead.
    """

    debounce_ms = 50
    long_press_ms = 1000
    double_click_ms = 400

    def __init__(self, pin, suppress=False, sense=None):
        self._pin = pin  # Initialise for input
        self._supp = suppress
        self._dblpend = False  # Doubleclick waiting for 2nd click
        self._dblran = False  # Doubleclick executed user function
        self._tf = False  # Press function
        self._ff = False  # Release function
        self._df = False  # Doubleclick function
        self._ld = False  # Delay_ms instance for long press
        self._dd = False  # Ditto for doubleclick
        # Convert from electrical to logical value
        self._sense = pin.value() if sense is None else sense
        self._state = self.rawstate()  # Initial state
        self._run = asyncio.create_task(self._go())  # Thread runs forever

    async def _go(self):
        while True:
            self._check(self.rawstate())
            # Ignore state changes until switch has settled. Also avoid hogging CPU.
            # See https://github.com/peterhinch/micropython-async/issues/69
            await asyncio.sleep_ms(Pushbutton.debounce_ms)

    def _check(self, state):
        """Act on a change of logical state; do nothing if it is unchanged."""
        if state == self._state:
            return
        # State has changed: act on it now.
        self._state = state
        if state:  # Button pressed: launch pressed func
            if self._tf:
                launch(self._tf, self._ta)
            if self._ld:  # There's a long func: start long press delay
                self._ld.trigger(Pushbutton.long_press_ms)
            if self._df:
                if self._dd():  # Second click: timer running
                    self._dd.stop()
                    self._dblpend = False
                    self._dblran = True  # Prevent suppressed launch on release
                    launch(self._df, self._da)
                else:
                    # First click: start doubleclick timer
                    self._dd.trigger(Pushbutton.double_click_ms)
                    self._dblpend = True  # Prevent suppressed launch on release
        else:  # Button release. Is there a release func?
            if self._ff:
                if self._supp:
                    d = self._ld
                    # If long delay exists, is running and doubleclick status is OK
                    if not self._dblpend and not self._dblran:
                        if (d and d()) or not d:
                            launch(self._ff, self._fa)
                else:
                    launch(self._ff, self._fa)
            if self._ld:
                self._ld.stop()  # Avoid interpreting a second click as a long push
            self._dblran = False

    def _ddto(self):  # Doubleclick timeout: no doubleclick occurred
        self._dblpend = False
        # Only launch when a release function has been registered: without
        # one neither ._ff is callable nor does ._fa exist.
        if self._ff and self._supp and not self._state:
            if not self._ld or (self._ld and not self._ld()):
                launch(self._ff, self._fa)

    # ****** API ******
    def press_func(self, func=False, args=()):
        """Set the press callback. None selects the press Event."""
        if func is None:
            self.press = asyncio.Event()
        self._tf = self.press.set if func is None else func
        self._ta = args

    def release_func(self, func=False, args=()):
        """Set the release callback. None selects the release Event."""
        if func is None:
            self.release = asyncio.Event()
        self._ff = self.release.set if func is None else func
        self._fa = args

    def double_func(self, func=False, args=()):
        """Set or clear the doubleclick callback. None selects the double Event."""
        if func is None:
            self.double = asyncio.Event()
            func = self.double.set
        self._df = func
        self._da = args
        if func:  # If double timer already in place, leave it
            if not self._dd:
                self._dd = Delay_ms(self._ddto)
        else:
            self._dd = False  # Clearing down double func

    def long_func(self, func=False, args=()):
        """Set or clear the long press callback. None selects the long Event."""
        if func is None:
            self.long = asyncio.Event()
            func = self.long.set
        if func:
            if self._ld:
                self._ld.callback(func, args)
            else:
                self._ld = Delay_ms(func, args)
        else:
            self._ld = False

    # Current non-debounced logical button state: True == pressed
    def rawstate(self):
        return bool(self._pin() ^ self._sense)

    # Current debounced state of button (True == pressed)
    def __call__(self):
        return self._state

    def deinit(self):
        """Cancel the polling task."""
        self._run.cancel()
+
+
class ESP32Touch(Pushbutton):
    """Pushbutton variant reading an ESP32 touch pad."""

    thresh = (80 << 8) // 100  # Class-wide threshold scaled by 256

    @classmethod
    def threshold(cls, val):
        """Set the class-wide threshold as a percentage.

        Raises ValueError unless val is an int between 1 and 99.
        """
        if not isinstance(val, int) or not 0 < val < 100:
            raise ValueError("Threshold must be in range 1-99")
        cls.thresh = (val << 8) // 100

    def __init__(self, pin, suppress=False):
        self._thresh = 0  # Detection threshold
        self._rawval = 0  # Highest raw reading seen so far
        try:
            self._pad = TouchPad(pin)
        except ValueError:
            raise ValueError(pin)  # Let's have a bit of information :)
        super().__init__(pin, suppress, False)

    # Current logical button state: True == touched
    def rawstate(self):
        reading = self._pad.read()  # ~220μs
        if reading <= self._rawval:
            return reading < self._thresh
        # New maximum: either initialisation or pad was touched when
        # initialised and has now been released. Recalibrate.
        self._rawval = reading
        self._thresh = (reading * ESP32Touch.thresh) >> 8
        return False  # Untouched
--- /dev/null
+# queue.py: adapted from uasyncio V2
+
+# Copyright (c) 2018-2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# Code is based on Paul Sokolovsky's work.
+# This is a temporary solution until uasyncio V3 gets an efficient official version
+
+import uasyncio as asyncio
+
+
+# Exception raised by get_nowait().
class QueueEmpty(Exception):
    """Raised by Queue.get_nowait() when the queue holds no items."""
+
+
+# Exception raised by put_nowait().
class QueueFull(Exception):
    """Raised by Queue.put_nowait() when the queue is full."""
+
class Queue:
    """FIFO queue for uasyncio tasks, optionally bounded by maxsize."""

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._queue = []
        self._evput = asyncio.Event()  # Triggered by put, tested by get
        self._evget = asyncio.Event()  # Triggered by get, tested by put

    def _get(self):
        # Pulse the event so that every task waiting in put() is scheduled.
        get_event = self._evget
        get_event.set()
        get_event.clear()
        return self._queue.pop(0)

    async def get(self):  # Usage: item = await queue.get()
        """Remove and return the oldest item, waiting while the queue is empty."""
        while self.empty():  # May be multiple tasks waiting on get()
            # Suspend until a put occurs. The first of N waiting tasks gets
            # the item, the rest loop again.
            await self._evput.wait()
        return self._get()

    def get_nowait(self):
        """Remove and return the oldest item or raise QueueEmpty."""
        if not self.empty():
            return self._get()
        raise QueueEmpty()

    def _put(self, val):
        # Pulse the event so that every task waiting in get() is scheduled.
        put_event = self._evput
        put_event.set()
        put_event.clear()
        self._queue.append(val)

    async def put(self, val):  # Usage: await queue.put(item)
        """Append val, waiting while the queue is full."""
        while self.full():
            await self._evget.wait()
        self._put(val)

    def put_nowait(self, val):
        """Append val or raise QueueFull."""
        if self.full():
            raise QueueFull()
        self._put(val)

    def qsize(self):  # Number of items in the queue.
        return len(self._queue)

    def empty(self):  # Return True if the queue is empty, False otherwise.
        return not len(self._queue)

    def full(self):
        """Return True if the queue holds maxsize items.

        With maxsize 0 (the default) or negative, the queue is never full.
        """
        if self.maxsize > 0:
            return self.qsize() >= self.maxsize
        return False
--- /dev/null
+# ringbuf_queue.py Provides RingbufQueue class
+
+# Copyright (c) 2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# API differs from CPython
+# Uses pre-allocated ring buffer: can use list or array
+# Asynchronous iterator allowing consumer to use async for
+# put_nowait QueueFull exception can be ignored allowing oldest data to be discarded.
+
+import uasyncio as asyncio
+
+
class RingbufQueue:  # MicroPython optimised
    """Queue over a caller supplied ring buffer (list or array).

    One slot is always kept free, so a buffer of length N holds at most
    N - 1 items. The instance is also an asynchronous iterator.
    """

    def __init__(self, buf):
        self._q = buf
        self._size = len(buf)
        self._wi = 0  # Write index
        self._ri = 0  # Read index
        self._evput = asyncio.Event()  # Triggered by put, tested by get
        self._evget = asyncio.Event()  # Triggered by get, tested by put

    def full(self):
        return self._ri == (self._wi + 1) % self._size

    def empty(self):
        return self._wi == self._ri

    def qsize(self):
        return (self._wi - self._ri) % self._size

    def get_nowait(self):
        """Remove and return the oldest item; raise IndexError if empty."""
        if self.empty():
            raise IndexError
        idx = self._ri
        item = self._q[idx]
        self._ri = (idx + 1) % self._size
        self._evget.set()  # Schedule all tasks waiting on ._evget
        self._evget.clear()
        return item

    def put_nowait(self, v):
        """Store v. If that overwrites the oldest item, raise IndexError."""
        self._q[self._wi] = v
        self._evput.set()  # Schedule any tasks waiting on get
        self._evput.clear()
        nxt = (self._wi + 1) % self._size
        self._wi = nxt
        if nxt == self._ri:  # Would indicate empty
            self._ri = (self._ri + 1) % self._size  # Discard a message
            raise IndexError  # Caller can ignore if overwrites are OK

    async def put(self, val):  # Usage: await queue.put(item)
        """Store val, waiting while the buffer is full."""
        while self.full():
            await self._evget.wait()  # May be >1 task waiting on ._evget
        self.put_nowait(val)

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the oldest item, waiting while the buffer is empty."""
        while self.empty():  # May be more than one task waiting on ._evput
            await self._evput.wait()
        return self.get_nowait()
--- /dev/null
+# semaphore.py
+
+# Copyright (c) 2018-2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+try:
+ import uasyncio as asyncio
+except ImportError:
+ import asyncio
+
+# A Semaphore is typically used to limit the number of coros running a
+# particular piece of code at once. The number is defined in the constructor.
class Semaphore():
    """Counting semaphore, usable as an asynchronous context manager."""

    def __init__(self, value=1):
        self._count = value  # Acquisitions currently available
        self._event = asyncio.Event()

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        self.release()
        await asyncio.sleep(0)

    async def acquire(self):
        """Take one unit, waiting while none is available."""
        self._event.clear()
        while True:
            if self._count != 0:
                break
            # Multiple tasks may be waiting for a release
            await self._event.wait()
            self._event.clear()
            # When we yield, another task may succeed. In this case
            # the loop repeats
            await asyncio.sleep(0)
        self._count -= 1

    def release(self):
        """Return one unit and wake waiting tasks."""
        self._count += 1
        self._event.set()
+
class BoundedSemaphore(Semaphore):
    # Semaphore whose count may never exceed the value it was created with.
    def __init__(self, value=1):
        super().__init__(value)
        self._initial_value = value  # Upper bound checked on release

    def release(self):
        # Raises ValueError if the count is already at its initial value.
        if self._count >= self._initial_value:
            raise ValueError('Semaphore released more than acquired')
        super().release()
--- /dev/null
+# switch.py
+
+# Copyright (c) 2018-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+import utime as time
+from . import launch
+
class Switch:
    # Polls a pin and launches user functions when its state changes.
    debounce_ms = 50  # Polling interval, also the settling time after a change

    def __init__(self, pin):
        self.pin = pin  # Should be initialised for input with pullup
        self._open_func = False
        self._close_func = False
        self.switchstate = self.pin.value()  # Get initial state
        self._run = asyncio.create_task(self.switchcheck())  # Runs until deinit()

    def open_func(self, func, args=()):
        # Register what to launch on contact open. Passing None creates an
        # Event in self.open which is set instead.
        if func is None:
            self.open = asyncio.Event()
            func = self.open.set
        self._open_func = func
        self._open_args = args

    def close_func(self, func, args=()):
        # Register what to launch on contact closure. Passing None creates an
        # Event in self.close which is set instead.
        if func is None:
            self.close = asyncio.Event()
            func = self.close.set
        self._close_func = func
        self._close_args = args

    # Return current state of switch (0 = pressed)
    def __call__(self):
        return self.switchstate

    async def switchcheck(self):
        while True:
            state = self.pin.value()
            if state != self.switchstate:
                # State has changed: act on it now.
                self.switchstate = state
                if state == 0:
                    if self._close_func:
                        launch(self._close_func, self._close_args)
                elif state == 1 and self._open_func:
                    launch(self._open_func, self._open_args)
            # Ignore further state changes until switch has settled
            await asyncio.sleep_ms(Switch.debounce_ms)

    def deinit(self):
        # Stop the polling task.
        self._run.cancel()
--- /dev/null
+# adctest.py
+
+# Copyright (c) 2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+from machine import ADC
+import pyb
+from primitives import AADC
+
async def signal():
    # Step DAC 1 through a 12-bit sawtooth, one step every 50ms.
    # Could use write_timed but this prints values.
    dac = pyb.DAC(1, bits=12, buffering=True)
    level = 0
    while True:
        if (level & 0xf) == 0:  # Every 16th step
            print('write', level << 4)  # Make value u16 as per ADC read
        dac.write(level)
        level = (level + 1) % 4096
        await asyncio.sleep_ms(50)
+
async def adctest():
    # Start the sawtooth source, then report values delivered by the AADC.
    asyncio.create_task(signal())
    adc = AADC(ADC(pyb.Pin.board.X1))
    await asyncio.sleep(0)
    adc.sense(normal=False)  # Wait until ADC gets to 5000
    first = await adc(5000, 10000)
    print('Received', first, adc.read_u16(True))
    adc.sense(normal=True)  # Now print all changes > 2000
    while True:
        reading = await adc(2000)  # Trigger if value changes by 2000
        print('Received', reading, adc.read_u16(True))
+
+st = '''This test requires a Pyboard with pins X1 and X5 linked.
+A sawtooth waveform is applied to the ADC. Initially the test waits
+until the ADC value reaches 5000. It then reports whenever the value
+changes by 2000.
+Issue test() to start.
+'''
+print(st)
+
def test():
    # Run the ADC demo until interrupted, then reset uasyncio state and
    # show the instructions again.
    try:
        asyncio.run(adctest())
    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        asyncio.new_event_loop()  # Fresh loop so the test can be re-run
        print()
        print(st)
--- /dev/null
+# asyntest.py Test/demo of the 'micro' Event, Barrier and Semaphore classes
+# Test/demo of official asyncio library and official Lock class
+
+# Copyright (c) 2017-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# CPython 3.8 compatibility
+# (ignore RuntimeWarning: coroutine '_g' was never awaited)
+# To run:
+# from primitives.tests.asyntest import test
+
+try:
+ import uasyncio as asyncio
+except ImportError:
+ import asyncio
+import sys
# MicroPython reports its platform in sys.implementation._machine. CPython has
# no such attribute, so default to an empty string instead of raising
# AttributeError at import time.
unix = "linux" in getattr(sys.implementation, "_machine", "")
+
+from primitives import Barrier, Semaphore, BoundedSemaphore, Condition, Queue, RingbufQueue
+try:
+ from threadsafe import Message
+except:
+ pass
+
def print_tests():
    # Print the menu of available tests between two terminal escape sequences.
    menu = '''Available functions:
test(0) Print this list.
test(1) Test message acknowledge.
test(2) Test Message and Lock objects.
test(3) Test the Barrier class with callback.
test(4) Test the Barrier class with coroutine.
test(5) Test Semaphore
test(6) Test BoundedSemaphore.
test(7) Test the Condition class.
test(8) Test the Queue class.
test(9) Test the RingbufQueue class.
'''
    for chunk in ('\x1b[32m', menu, '\x1b[39m'):
        print(chunk)
+
+print_tests()
+
def printexp(exp, runtime=0):
    # Show the expected output of a test, then a banner giving its runtime.
    for chunk in ('Expected output:', '\x1b[32m', exp, '\x1b[39m'):
        print(chunk)
    if not runtime:
        print('Running (runtime < 1s):')
        return
    print('Running (runtime = {}s):'.format(runtime))
+
+# ************ Test Message class ************
+# Demo use of acknowledge message
+
async def message_wait(message, ack_message, n):
    # Wait for `message`, report its value and optionally acknowledge it.
    # n identifies this waiter in the printout.
    try:
        await message
        print(f'message_wait {n} got message: {message.value()}')
        if ack_message is None:
            return
        ack_message.set()
    except asyncio.CancelledError:
        print(f"message_wait {n} cancelled")
+
async def run_ack(n):
    # n rounds: two waiters receive the same message and each acknowledges it.
    message, ack1, ack2 = Message(), Message(), Message()
    for count in range(n):
        t0 = asyncio.create_task(message_wait(message, ack1, 1))
        t1 = asyncio.create_task(message_wait(message, ack2, 2))
        message.set(count)
        print('message was set')
        for ack, note in ((ack1, 'Cleared ack1'), (ack2, 'Cleared ack2')):
            await ack
            ack.clear()
            print(note)
        message.clear()
        print('Cleared message')
        await asyncio.sleep(1)
        t0.cancel()
        t1.cancel()
+
async def msg_send(msg, items):
    # Set msg to each of items in turn, pausing 400ms before every set.
    for payload in items:
        await asyncio.sleep_ms(400)
        msg.set(payload)
+
async def msg_recv(msg):
    # Receive using asynchronous iterator, clearing msg after each value.
    async for received in msg:
        print("Got", received)
        msg.clear()
+
async def ack_coro():
    # Message demo: multiple waiters, async iteration, then cancellation of
    # each of two waiting tasks in turn.
    print("Test multiple tasks waiting on a message.")
    await run_ack(3)
    print()
    print("Test asynchronous iterator.")
    msg = Message()
    asyncio.create_task(msg_send(msg, (1, 2, 3)))
    try:
        await asyncio.wait_for(msg_recv(msg), 3)
    except asyncio.TimeoutError:
        pass  # The receiver never ends by itself: the timeout stops it
    await asyncio.sleep(1)
    cases = (
        ("Test cancellation of first waiting task.", 0),
        ("Test cancellation of second waiting task.", 1),
    )
    for title, victim in cases:
        print()
        print(title)
        waiters = [asyncio.create_task(message_wait(msg, None, k)) for k in (1, 2)]
        await asyncio.sleep(1)
        waiters[victim].cancel()
        await asyncio.sleep(1)
        print("Setting message")
        msg.set("Test message")
        await asyncio.sleep(1)  # Tasks have ended or been cancelled
        msg.clear()

    print("I've seen attack ships burn on the shoulder of Orion...")
    print("Time to die...")
+
def ack_test():
    # Print the expected output, then run the Message acknowledge demo.
    # Skipped when `unix` is set.
    if unix:
        print("Message class is incompatible with Unix build.")
        return
    expected = '''Running (runtime = 12s):
Test multiple tasks waiting on a message.
message was set
message_wait 1 got message: 0
message_wait 2 got message: 0
Cleared ack1
Cleared ack2
Cleared message
message was set
message_wait 1 got message: 1
message_wait 2 got message: 1
Cleared ack1
Cleared ack2
Cleared message
message was set
message_wait 1 got message: 2
message_wait 2 got message: 2
Cleared ack1
Cleared ack2
Cleared message

Test asynchronous iterator.
Got 1
Got 2
Got 3

Test cancellation of first waiting task.
message_wait 1 cancelled
Setting message
message_wait 2 got message: Test message

Test cancellation of second waiting task.
message_wait 2 cancelled
Setting message
message_wait 1 got message: Test message
I've seen attack ships burn on the shoulder of Orion...
Time to die...
'''
    printexp(expected, 12)
    asyncio.run(ack_coro())
+
+# ************ Test Lock and Message classes ************
+
async def run_lock(n, lock):
    # Take the lock, hold it for one second, release it; n labels the printout.
    print('run_lock {} waiting for lock'.format(n))
    await lock.acquire()
    print('run_lock {} acquired lock'.format(n))
    # Holding the lock across a delay lets other coros be seen waiting for it
    await asyncio.sleep(1)
    lock.release()
    print('run_lock {} released lock'.format(n))
+
async def messageset(message):
    # Set the message after a five second delay.
    print('Waiting 5 secs before setting message')
    await asyncio.sleep(5)
    message.set()
    print('message was set')
+
async def messagewait(message):
    # Pause until the message is set, then clear it.
    print('waiting for message')
    await message
    print('got message')
    message.clear()
+
async def run_message_test():
    # Start three tasks competing for one Lock, then wait on a Message.
    print('Test Lock class')
    lock = asyncio.Lock()
    for n in (1, 2, 3):
        asyncio.create_task(run_lock(n, lock))
    print('Test Message class')
    message = Message()
    asyncio.create_task(messageset(message))
    await messagewait(message)  # run_message_test runs fast until this point
    # messagewait clears the message, so it should no longer be set
    status = 'Incorrect' if message.is_set() else 'OK'
    print('Message status {}'.format(status))
    print('Tasks complete')
+
def msg_test():
    # Print the expected output, then run the Lock and Message demo.
    # Skipped when `unix` is set.
    if unix:
        print("Message class is incompatible with Unix build.")
        return
    expected = '''Test Lock class
Test Message class
waiting for message
run_lock 1 waiting for lock
run_lock 1 acquired lock
run_lock 2 waiting for lock
run_lock 3 waiting for lock
Waiting 5 secs before setting message
run_lock 1 released lock
run_lock 2 acquired lock
run_lock 2 released lock
run_lock 3 acquired lock
run_lock 3 released lock
message was set
got message
Message status OK
Tasks complete
'''
    printexp(expected, 5)
    asyncio.run(run_message_test())
+
+# ************ Barrier test ************
+
async def killer(duration):
    # Simply sleep for `duration` seconds.
    await asyncio.sleep(duration)
+
def callback(text):
    # Barrier callback: print the text it was given.
    print(text)
+
async def report(barrier):
    # Print 0..4, waiting on the barrier after each number.
    for step in range(5):
        print('{} '.format(step), end='')
        await barrier
+
async def do_barrier_test():
    # Two rounds, each launching three report tasks on one shared barrier
    # whose callback prints 'Synch'.
    barrier = Barrier(3, callback, ('Synch',))
    for _round in range(2):
        for _task in range(3):
            asyncio.create_task(report(barrier))
        await asyncio.sleep(1)
        print()
    await asyncio.sleep(1)
+
def barrier_test():
    # Print the expected output, then run the Barrier-with-callback demo.
    expected = '''Running (runtime = 3s):
0 0 0 Synch
1 1 1 Synch
2 2 2 Synch
3 3 3 Synch
4 4 4 Synch

1 1 1 Synch
2 2 2 Synch
3 3 3 Synch
4 4 4 Synch
'''
    printexp(expected, 3)
    asyncio.run(do_barrier_test())
+
+# ************ Barrier test 1 ************
+
async def my_coro(text):
    # Print `text` once a second until cancelled.
    try:
        await asyncio.sleep_ms(0)
        while True:
            await asyncio.sleep(1)
            print(text)
    except asyncio.CancelledError:
        print('my_coro was cancelled.')
+
async def report1(barrier, x):
    # Sleep x seconds, then wait on the barrier, reporting before and after.
    await asyncio.sleep(x)
    print('report instance', x, 'waiting')
    await barrier
    print('report instance', x, 'done')
+
async def bart():
    # Barrier of four participants (three report1 tasks plus this one) which
    # is given the coroutine my_coro to launch.
    barrier = Barrier(4, my_coro, ('my_coro running',))
    for delay in range(3):
        asyncio.create_task(report1(barrier, delay))
    await asyncio.sleep(4)
    assert barrier.busy()
    await barrier
    await asyncio.sleep(0)
    assert not barrier.busy()
    # Must yield before reading result(). Here we wait long enough for
    # coro to print
    await asyncio.sleep_ms(1500)
    barrier.result().cancel()
    await asyncio.sleep(2)
+
def barrier_test1():
    # Print the expected output, then run the Barrier-with-coroutine demo.
    expected = '''Running (runtime = 5s):
report instance 0 waiting
report instance 1 waiting
report instance 2 waiting
report instance 2 done
report instance 1 done
report instance 0 done
my_coro running
my_coro was cancelled.

Exact report instance done sequence may vary, but 3 instances should report
done before my_coro runs.
'''
    printexp(expected, 5)
    asyncio.run(bart())
+
+# ************ Semaphore test ************
+
async def run_sema(n, sema, barrier):
    # Hold the semaphore for a while, then trigger the barrier when finished.
    print('run_sema {} trying to access semaphore'.format(n))
    hold_time = 1 + n/10  # n/10 ensures deterministic printout
    async with sema:
        print('run_sema {} acquired semaphore'.format(n))
        # Delay demonstrates other coros waiting for semaphore
        await asyncio.sleep(hold_time)
    print('run_sema {} has released semaphore'.format(n))
    barrier.trigger()
+
async def run_sema_test(bounded):
    # Five tasks compete for a semaphore of three, then one extra release is
    # attempted; only the bounded variant is expected to object.
    num_coros = 5
    barrier = Barrier(num_coros + 1)
    sema_class = BoundedSemaphore if bounded else Semaphore
    semaphore = sema_class(3)
    for n in range(num_coros):
        asyncio.create_task(run_sema(n, semaphore, barrier))
    await barrier  # Quit when all coros complete
    try:
        semaphore.release()
    except ValueError:
        print('Bounded semaphore exception test OK')
+
def semaphore_test(bounded=False):
    # Print the expected output for the chosen semaphore type, then run the test.
    if not bounded:
        expected = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 3 acquired semaphore
run_sema 1 has released semaphore
run_sema 4 acquired semaphore
run_sema 2 has released semaphore
run_sema 3 has released semaphore
run_sema 4 has released semaphore

Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
    else:
        expected = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 4 acquired semaphore
run_sema 1 has released semaphore
run_sema 3 acquired semaphore
run_sema 2 has released semaphore
run_sema 4 has released semaphore
run_sema 3 has released semaphore
Bounded semaphore exception test OK

Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
    printexp(expected, 3)
    asyncio.run(run_sema_test(bounded))
+
+# ************ Condition test ************
+
+cond = Condition()
+tim = 0
+
async def cond01():
    # Every two seconds take the condition and notify two waiting tasks.
    while True:
        await asyncio.sleep(2)
        with await cond:
            cond.notify(2)  # Notify 2 tasks
+
async def cond03():
    # Maintain a count of seconds in the global tim, starting after 0.5s.
    global tim
    await asyncio.sleep(0.5)
    while True:
        await asyncio.sleep(1)
        tim = tim + 1
+
async def cond02(n, barrier):
    # Wait to be notified on the shared condition, then trigger the barrier.
    with await cond:
        print('cond02', n, 'Awaiting notification.')
        await cond.wait()
        print('cond02', n, 'triggered. tim =', tim)
        barrier.trigger()
+
def predicate():
    # True once the global seconds count has reached 8.
    reached = tim >= 8  # 12
    return reached
+
async def cond04(n, barrier):
    # As cond02, but waits on the condition with predicate() via wait_for.
    with await cond:
        print('cond04', n, 'Awaiting notification and predicate.')
        await cond.wait_for(predicate)
        print('cond04', n, 'triggered. tim =', tim)
        barrier.trigger()
+
async def cond_go():
    # Run seven cond02 waiters to completion, then one cond04 waiter.
    ntasks = 7
    barrier = Barrier(ntasks + 1)
    notifier = asyncio.create_task(cond01())
    ticker = asyncio.create_task(cond03())
    for n in range(ntasks):
        asyncio.create_task(cond02(n, barrier))
    await barrier  # All instances of cond02 have completed
    # Test wait_for
    barrier = Barrier(2)
    asyncio.create_task(cond04(99, barrier))
    await barrier
    # cancel continuously running coros.
    notifier.cancel()
    ticker.cancel()
    await asyncio.sleep_ms(0)
    print('Done.')
+
def condition_test():
    # Print the expected output, then run the Condition demo.
    expected = '''cond02 0 Awaiting notification.
cond02 1 Awaiting notification.
cond02 2 Awaiting notification.
cond02 3 Awaiting notification.
cond02 4 Awaiting notification.
cond02 5 Awaiting notification.
cond02 6 Awaiting notification.
cond02 5 triggered. tim = 1
cond02 6 triggered. tim = 1
cond02 3 triggered. tim = 3
cond02 4 triggered. tim = 3
cond02 1 triggered. tim = 5
cond02 2 triggered. tim = 5
cond02 0 triggered. tim = 7
cond04 99 Awaiting notification and predicate.
cond04 99 triggered. tim = 9
Done.
'''
    printexp(expected, 13)
    asyncio.run(cond_go())
+
+# ************ Queue test ************
+
async def slow_process():
    # Take two seconds to produce the answer.
    answer = 42
    await asyncio.sleep(2)
    return answer
+
async def bar(q):
    # Producer: wait for the slow process and put its result on q.
    print('Waiting for slow process.')
    outcome = await slow_process()
    print('Putting result onto queue')
    await q.put(outcome)
+
async def foo(q):
    # Consumer: wait for one item from q and print it.
    print("Running foo()")
    received = await q.get()
    print('Result was {}'.format(received))
+
async def q_put(n, q):
    # Put eight (n, index) tuples onto q, yielding after each put.
    for index in range(8):
        await q.put((n, index))
        await asyncio.sleep(0)
+
async def q_get(n, q):
    # Take eight items from q, discarding them and yielding after each get.
    # n is not used by this function.
    for _ in range(8):
        await q.get()
        await asyncio.sleep(0)
+
async def putter(q):
    # Put the value 1 twenty times, sleeping 50ms after each put.
    remaining = 20
    while remaining:
        await q.put(1)
        await asyncio.sleep_ms(50)
        remaining -= 1
+
+
async def getter(q):
    # Take twenty items; relies on the "blocking" of the get method to wait
    # for new items.
    remaining = 20
    while remaining:
        await q.get()
        remaining -= 1
+
async def queue_go():
    # Queue demo: one producer and one consumer, then competing putters,
    # competing getters, and finally a paced putter/getter pair.
    q = Queue(10)
    asyncio.create_task(foo(q))
    asyncio.create_task(bar(q))
    await asyncio.sleep(3)

    # Four putters offer 32 items in total to a queue created with Queue(10).
    for n in range(4):
        asyncio.create_task(q_put(n, q))
    await asyncio.sleep(1)
    assert q.qsize() == 10
    await q.get()
    await asyncio.sleep(0.1)
    assert q.qsize() == 10  # A waiting putter is expected to refill the slot
    while not q.empty():
        await q.get()
        await asyncio.sleep(0.1)
    assert q.empty()
    print('Competing put tasks test complete')

    # Four getters wait; feed them slowly until the queue finally fills.
    for n in range(4):
        asyncio.create_task(q_get(n, q))
    await asyncio.sleep(1)
    value = 0
    while not q.full():
        await q.put(value)
        await asyncio.sleep(0.3)
        value += 1
    assert q.qsize() == 10
    print('Competing get tasks test complete')

    await asyncio.gather(
        putter(q),
        getter(q)
    )
    print('Queue tests complete')
    print("I've seen attack ships burn off the shoulder of Orion...")
    print("Time to die...")
+
def queue_test():
    # Print the expected output, then run the Queue demo.
    expected = '''Running (runtime = 20s):
Running foo()
Waiting for slow process.
Putting result onto queue
Result was 42
Competing put tasks test complete
Competing get tasks test complete
Queue tests complete


I've seen attack ships burn off the shoulder of Orion...
Time to die...

'''
    printexp(expected, 20)
    asyncio.run(queue_go())
+
+# ************ RingbufQueue test ************
+
async def qread(q, lst, twr):
    # Append every item read from q to lst, sleeping twr ms after each one.
    async for entry in q:
        lst.append(entry)
        await asyncio.sleep_ms(twr)
+
async def read(q, t, twr=0):
    # Read from q for at most t seconds and return the items collected.
    collected = []
    try:
        await asyncio.wait_for(qread(q, collected, twr), t)
    except asyncio.TimeoutError:
        pass  # qread never ends by itself: the timeout is the normal exit
    return collected
+
async def put_list(q, lst, twp=0):
    # Put each element of lst onto q, sleeping twp ms after each put.
    for entry in lst:
        await q.put(entry)
        await asyncio.sleep_ms(twp)
+
async def rbq_go():
    # RingbufQueue demo: slow writer, slow reader, state queries, overruns
    # with put_nowait, then get_nowait.
    q = RingbufQueue([0] * 10)  # 10 elements
    sent = list(range(15))
    print("Read waits on slow write.")
    asyncio.create_task(put_list(q, sent, 100))
    got = await read(q, 2)
    assert sent == got
    print('done')
    print("Write waits on slow read.")
    asyncio.create_task(put_list(q, sent))
    got = await read(q, 2, 100)
    assert sent == got
    print('done')
    print("Testing full, empty and qsize methods.")
    assert q.empty()
    assert q.qsize() == 0
    assert not q.full()
    await put_list(q, (1,2,3))
    assert not q.empty()
    assert q.qsize() == 3
    assert not q.full()
    print("Done")
    print("Testing put_nowait and overruns.")
    overruns = 0
    for x in range(4, 15):
        try:
            q.put_nowait(x)
        except IndexError:
            overruns += 1
    assert overruns == 5
    assert q.full()
    got = await read(q, 2)
    assert got == [6, 7, 8, 9, 10, 11, 12, 13, 14]
    print("Testing get_nowait.")
    await q.put(1)
    assert q.get_nowait() == 1
    # A second get_nowait on the now empty queue must raise IndexError
    raised = 0
    try:
        q.get_nowait()
    except IndexError:
        raised = 1
    assert raised == 1
    print("Tests complete.")
    print("I've seen attack ships burn off the shoulder of Orion...")
    print("Time to die...")
+
def rbq_test():
    # Print the expected output, then run the RingbufQueue demo.
    expected = '''Running (runtime = 6s):
Read waits on slow write.
done
Write waits on slow read.
done
Testing full, empty and qsize methods.
Done
Testing put_nowait and overruns.
Testing get_nowait.
Tests complete.
I've seen attack ships burn off the shoulder of Orion...
Time to die...

'''
    printexp(expected, 6)
    asyncio.run(rbq_go())
+
+# ************ ************
def test(n):
    # Run test number n. Any other value runs nothing; the list of tests is
    # printed afterwards in every case.
    table = (
        (1, ack_test),  # Test message acknowledge.
        (2, msg_test),  # Test Message and Lock objects.
        (3, barrier_test),  # Test the Barrier class with callback.
        (4, barrier_test1),  # Test the Barrier class with coroutine.
        (5, lambda: semaphore_test(False)),  # Test Semaphore
        (6, lambda: semaphore_test(True)),  # Test BoundedSemaphore.
        (7, condition_test),  # Test the Condition class.
        (8, queue_test),  # Test the Queue class.
        (9, rbq_test),  # Test the RingbufQueue class.
    )
    try:
        for number, runner in table:
            if n == number:
                runner()
                break
    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        asyncio.new_event_loop()  # Fresh loop so another test can be run
        print_tests()
--- /dev/null
+# delay_test.py Tests for Delay_ms class
+
+# Copyright (c) 2020 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+import uasyncio as asyncio
+import micropython
+from primitives.delay_ms import Delay_ms
+
+micropython.alloc_emergency_exception_buf(100)
+
def printexp(exp, runtime=0):
    # Show the expected output of a test, then a banner giving its runtime.
    for chunk in ('Expected output:', '\x1b[32m', exp, '\x1b[39m'):
        print(chunk)
    if not runtime:
        print('Running (runtime < 1s):')
        return
    print('Running (runtime = {}s):'.format(runtime))
+
async def ctor_test():
    # Delay duration supplied as a constructor arg; a retrigger after 4s
    # restarts the 5s period.
    expected = '''
Trigger 5 sec delay
Retrigger 5 sec delay
Callback should run
cb callback
Done
'''
    printexp(expected, 12)

    def cb(v):
        print('cb', v)

    d = Delay_ms(cb, ('callback',), duration=5000)

    for note in ('Trigger 5 sec delay', 'Retrigger 5 sec delay'):
        print(note)
        d.trigger()
        await asyncio.sleep(4)
    print('Callback should run')
    await asyncio.sleep(2)
    print('Done')
+
async def launch_test():
    # Delay_ms launching a coroutine: run to completion, cancel the launched
    # task, then await it. The expected text matches what cb really prints
    # ('coroutine starts' / 'coroutine ends', lower case, as passed in args).
    s = '''
Trigger 5 sec delay
Coroutine should run: run to completion.
coroutine starts
coroutine ends
Coroutine should run: test cancellation.
coroutine starts
Coroutine should run: test awaiting.
coroutine starts
coroutine ends
Done
'''
    printexp(s, 20)

    async def cb(v, ms):
        # Launched on timeout: runs for ms milliseconds.
        print(v, 'starts')
        await asyncio.sleep_ms(ms)
        print(v, 'ends')

    d = Delay_ms(cb, ('coroutine', 1000))

    print('Trigger 5 sec delay')
    d.trigger(5000)  # Duration passed to trigger rather than the constructor
    await asyncio.sleep(4)
    print('Coroutine should run: run to completion.')
    await asyncio.sleep(3)
    d = Delay_ms(cb, ('coroutine', 3000))
    d.trigger(5000)
    await asyncio.sleep(4)
    print('Coroutine should run: test cancellation.')
    await asyncio.sleep(2)
    coro = d.rvalue()  # The launched coroutine, still in its 3s sleep
    coro.cancel()
    d.trigger(5000)
    await asyncio.sleep(4)
    print('Coroutine should run: test awaiting.')
    await asyncio.sleep(2)
    coro = d.rvalue()
    await coro  # Returns once cb has printed 'coroutine ends'
    print('Done')
+
+
async def reduce_test():
    # Test reducing a running delay: a 10s delay is retriggered with 3s.
    expected = '''
Trigger 5 sec delay
Callback should run
cb callback
Callback should run
cb callback
Done
'''
    printexp(expected, 11)

    def cb(v):
        print('cb', v)

    d = Delay_ms(cb, ('callback',))

    print('Trigger 5 sec delay')
    d.trigger(5000)
    await asyncio.sleep(4)
    print('Callback should run')
    await asyncio.sleep(2)
    d.trigger(10000)
    await asyncio.sleep(1)
    d.trigger(3000)  # Shorten the delay while it is running
    await asyncio.sleep(2)
    print('Callback should run')
    await asyncio.sleep(2)
    print('Done')
+
+
async def stop_test():
    # Test the .stop method, the running check d() and the callback's
    # return value.
    expected = '''
Trigger 5 sec delay
Running
Callback should run
cb callback
Callback returned 42
Callback should not run
Done
 '''
    printexp(expected, 12)

    def cb(v):
        print('cb', v)
        return 42

    d = Delay_ms(cb, ('callback',))

    print('Trigger 5 sec delay')
    d.trigger(5000)
    await asyncio.sleep(4)
    running = d()
    if running:
        print('Running')
    print('Callback should run')
    await asyncio.sleep(2)
    print('Callback returned', d.rvalue())
    d.trigger(3000)
    await asyncio.sleep(1)
    d.stop()
    await asyncio.sleep(1)
    running = d()
    if running:  # Would mean stop() had not taken effect
        print('Running')
    print('Callback should not run')
    await asyncio.sleep(4)
    print('Done')
+
+
async def isr_test():
    # Test trigger from hard ISR: a 10Hz timer keeps retriggering a 200ms
    # delay, so the callback cannot run until the timer is stopped.
    from pyb import Timer
    expected = '''
Timer holds off cb for 5 secs
cb should now run
cb callback
Done
'''
    printexp(expected, 6)

    def cb(v):
        print('cb', v)

    d = Delay_ms(cb, ('callback',))

    def timer_cb(_):
        d.trigger(200)

    tim = Timer(1, freq=10, callback=timer_cb)

    print('Timer holds off cb for 5 secs')
    await asyncio.sleep(5)
    tim.deinit()
    print('cb should now run')
    await asyncio.sleep(1)
    print('Done')
+
async def err_test():
    # Test triggering de-initialised timer: RuntimeError is expected.
    expected = '''
Running (runtime = 3s):
Trigger 1 sec delay
cb callback
Success: error was raised.
Done
 '''
    printexp(expected, 3)

    def cb(v):
        print('cb', v)
        return 42

    d = Delay_ms(cb, ('callback',))

    print('Trigger 1 sec delay')
    d.trigger(1000)
    await asyncio.sleep(2)
    d.deinit()
    try:
        d.trigger(1000)
    except RuntimeError:
        print("Success: error was raised.")
    print('Done')
+
+av = '''
+Run a test by issuing
+delay_test.test(n)
+where n is a test number. Avaliable tests:
+\x1b[32m
+0 Test triggering from a hard ISR (Pyboard only)
+1 Test the .stop method and callback return value.
+2 Test reducing the duration of a running timer
+3 Test delay defined by constructor arg
+4 Test triggering a Task
+5 Attempt to trigger de-initialised instance
+\x1b[39m
+'''
+print(av)
+
+tests = (isr_test, stop_test, reduce_test, ctor_test, launch_test, err_test)
def test(n=0):
    # Run the test at index n of the tests tuple, then reset uasyncio state.
    chosen = tests[n]
    try:
        asyncio.run(chosen())
    finally:
        asyncio.new_event_loop()
--- /dev/null
+# encoder_stop.py Demo of callback which occurs after motion has stopped.
+
+from machine import Pin
+import uasyncio as asyncio
+from primitives.encoder import Encoder
+from primitives.delay_ms import Delay_ms
+
+px = Pin('X1', Pin.IN, Pin.PULL_UP)
+py = Pin('X2', Pin.IN, Pin.PULL_UP)
+
+tim = Delay_ms(duration=400) # High value for test
+d = 0
+
def tcb(pos, delta):
    # User callback gets args of encoder cb. Resets the accumulated delta d
    # and prints the values.
    global d
    d = 0
    print(pos, delta)
+
def cb(pos, delta):
    # Encoder callback occurs rapidly: each call postpones the user callback
    # and refreshes its args with the running total of delta.
    global d
    tim.trigger()  # Postpone the user callback
    d = d + delta
    tim.callback(tcb, (pos, d))  # and update its args
+
async def main():
    # Idle forever so the event loop keeps running.
    while True:
        await asyncio.sleep(1)
+
def test():
    # Create the encoder with cb as its callback and run until interrupted.
    print('Running encoder test. Press ctrl-c to teminate.')
    Encoder.delay = 0  # No need for this delay
    enc = Encoder(px, py, callback=cb)
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        asyncio.new_event_loop()  # Reset uasyncio state
+
+test()
--- /dev/null
+# encoder_test.py Test for asynchronous driver for incremental quadrature encoder.
+
+# Copyright (c) 2021-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+from machine import Pin
+import uasyncio as asyncio
+from primitives.encoder import Encoder
+
+
+px = Pin(33, Pin.IN, Pin.PULL_UP)
+py = Pin(25, Pin.IN, Pin.PULL_UP)
+
def cb(pos, delta):
    # Encoder callback: print the two values it is given.
    print(pos, delta)
+
async def main():
    # Idle forever so the event loop keeps running.
    while True:
        await asyncio.sleep(1)
+
def test():
    # Create an encoder with v=0, vmin=0, vmax=100 and run until interrupted.
    print('Running encoder test. Press ctrl-c to teminate.')
    enc = Encoder(px, py, v=0, vmin=0, vmax=100, callback=cb)
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        asyncio.new_event_loop()  # Reset uasyncio state
+
+test()
--- /dev/null
+# event_test.py Test WaitAll, WaitAny, ESwitch, EButton
+
+# Copyright (c) 2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+
+# from primitives.tests.event_test import *
+
+import uasyncio as asyncio
+from primitives import Delay_ms, WaitAny, ESwitch, WaitAll, EButton
+from pyb import Pin
+
+events = [asyncio.Event() for _ in range(4)]
+
async def set_events(*ev):
    # Set the global events whose indices are given, waiting 1s before each.
    for index in ev:
        await asyncio.sleep(1)
        print("Setting", index)
        events[index].set()
+
def clear(msg):
    # Print msg, then clear every event in the global list.
    print(msg)
    for evt in events:
        evt.clear()
+
async def can(obj, tim):
    # Cancel obj after tim seconds.
    await asyncio.sleep(tim)
    print("About to cancel")
    obj.cancel()
+
async def foo(tsk):
    # Announce, then wait for tsk to finish.
    print("Waiting")
    await tsk
+
async def wait_test():
    # Exercise WaitAny and WaitAll, including nesting, cancellation of a
    # waiting task and a timeout.
    expected = """
\x1b[32m
Expected output:
Setting 0
Tested WaitAny 0
Setting 1
Tested WaitAny 1
Setting 2
Setting 3
Tested WaitAll 2, 3
Setting 0
Setting 3
Tested WaitAny 0, 3
Cancel in 3s
Setting 0
Setting 1
About to cancel
Cancelled.
Waiting for 4s
Timeout
done
\x1b[39m
"""
    print(expected)
    # Event 0, or event 1, or both of events 2 and 3
    both_23 = WaitAll((events[2], events[3]))
    wa = WaitAny((events[0], events[1], both_23))
    stages = (
        ((0,), "Tested WaitAny 0"),
        ((1,), "Tested WaitAny 1"),
        ((2, 3), "Tested WaitAll 2, 3"),
    )
    for indices, note in stages:
        asyncio.create_task(set_events(*indices))
        await wa.wait()
        clear(note)
    # One of events 0/1 and one of events 2/3
    wa = WaitAll((WaitAny((events[0], events[1])), WaitAny((events[2], events[3]))))
    asyncio.create_task(set_events(0, 3))
    await wa.wait()
    clear("Tested WaitAny 0, 3")
    task = asyncio.create_task(wa.wait())
    asyncio.create_task(set_events(0, 1))  # Does nothing
    asyncio.create_task(can(task, 3))
    print("Cancel in 3s")
    try:
        await task
    except asyncio.CancelledError:  # TODO why must we trap this?
        print("Cancelled.")
    print("Waiting for 4s")
    try:
        await asyncio.wait_for(wa.wait(), 4)
    except asyncio.TimeoutError:
        print("Timeout")
    print("done")
+
+val = 0
+fail = False
+pout = None
+polarity = 0
+
async def monitor(evt, v, verbose):
    # Each time evt is set, clear it and add v to the global val.
    global val
    while True:
        await evt.wait()
        evt.clear()
        val += v
        if verbose:
            print("Got", hex(v), hex(val))
+
async def pulse(ms=100):
    # Drive the output pin to the opposite of `polarity` for ms milliseconds.
    active = 1 ^ polarity
    pout(active)
    await asyncio.sleep_ms(ms)
    pout(polarity)
+
def expect(v, e):
    # Compare value v with expectation e; a mismatch sets the global fail.
    global fail
    if v != e:
        print(f"Fail: expected {e} got {v}")
        fail = True
        return
    print("Pass")
+
async def btest(btn, verbose, supp):
    # Short, long and double press tests. Each event adds its own bit to
    # the global val: press 0x1, release 0x8, double 0x40, long 0x200.
    global val, fail
    val = 0
    events = btn.press, btn.release, btn.double, btn.long
    tasks = [
        asyncio.create_task(monitor(evt, 1 << 3 * n, verbose))
        for n, evt in enumerate(events)
    ]
    await asyncio.sleep(1)
    print("Start short press test")
    await pulse()
    await asyncio.sleep(1)
    if verbose:
        print("Test of short press", hex(val))
    expect(val, 0x09)
    val = 0
    await asyncio.sleep(1)
    print("Start long press test")
    await pulse(2000)
    await asyncio.sleep(4)
    if verbose:
        print("Long press", hex(val))
    expect(val, 0x208 if supp else 0x209)
    val = 0
    await asyncio.sleep(1)
    print("Start double press test")
    await pulse()
    await asyncio.sleep_ms(100)
    await pulse()
    await asyncio.sleep(4)
    if verbose:
        print("Double press", hex(val))
    expect(val, 0x48 if supp else 0x52)
    for task in tasks:
        task.cancel()
+
async def stest(sw, verbose):
    # One 2s pulse: close (0x8) is checked after 1s, close plus open (0x9)
    # after a further 4s.
    global val, fail
    val = 0
    events = sw.open, sw.close
    tasks = [
        asyncio.create_task(monitor(evt, 1 << 3 * n, verbose))
        for n, evt in enumerate(events)
    ]
    asyncio.create_task(pulse(2000))
    await asyncio.sleep(1)
    expect(val, 0x08)
    await asyncio.sleep(4)  # Wait for any spurious events
    if verbose:
        print("Switch close and open", hex(val))
    expect(val, 0x09)
    for task in tasks:
        task.cancel()
+
async def switch_test(pol, verbose):
    # Test EButton with and without suppress, then ESwitch; pin Y2 drives
    # pin Y1.
    global val, pout, polarity
    polarity = pol
    pin = Pin('Y1', Pin.IN)
    pout = Pin('Y2', Pin.OUT, value=pol)
    print("Testing EButton.")
    print("suppress == False")
    btn = EButton(pin)
    await btest(btn, verbose, False)
    print("suppress == True")
    btn = EButton(pin, suppress=True)
    await btest(btn, verbose, True)
    print("Testing ESwitch")
    sw = ESwitch(pin, pol)
    await stest(sw, verbose)
    if fail:
        print("Failures occurred.")
    else:
        print("All tests passed.")
+
def tests():
    # Print the list of available tests.
    menu = """
 \x1b[32m
 Available tests:
 1. test_switches(polarity=1, verbose=False) Test the ESwitch and Ebutton classe.
 2. test_wait() Test the WaitAny and WaitAll primitives.

 Switch tests assume a Pyboard with a link between Y1 and Y2.
 \x1b[39m
 """
    print(menu)
+
+tests()
def test_switches(polarity=1, verbose=False):
    # Run the EButton/ESwitch tests, then reset uasyncio and reprint the menu.
    # polarity 1/0 is normal (off) electrical state.
    try:
        asyncio.run(switch_test(polarity, verbose))
    finally:
        asyncio.new_event_loop()
        tests()
+
def test_wait():
    # Run the WaitAny/WaitAll tests, then reset uasyncio and reprint the menu.
    try:
        asyncio.run(wait_test())
    finally:
        asyncio.new_event_loop()
        tests()
--- /dev/null
+# Test/demo programs for Switch and Pushbutton classes
+# Tested on Pyboard but should run on other microcontroller platforms
+# running MicroPython with uasyncio library.
+
+# Copyright (c) 2018-2022 Peter Hinch
+# Released under the MIT License (MIT) - see LICENSE file
+# Now executes .deinit()
+
+# To run:
+# from primitives.tests.switches import *
+# test_sw() # For example
+
+from machine import Pin
+from pyb import LED
+from primitives import Switch, Pushbutton
+import uasyncio as asyncio
+
+helptext = '''
+Test using switch or pushbutton between X1 and gnd.
+Ground pin X2 to terminate test.
+
+'''
+tests = '''
+\x1b[32m
+Available tests:
+test_sw Switch test.
+test_swcb Switch with callback.
+test_sw_event Switch with event.
+test_btn Pushutton launching coros.
+test_btncb Pushbutton launching callbacks.
+btn_dynamic Change coros launched at runtime.
+btn_event Pushbutton event interface.
+\x1b[39m
+'''
+print(tests)
+
+# Pulse an LED (coroutine)
+async def pulse(led, ms):
+ led.on()
+ await asyncio.sleep_ms(ms)
+ led.off()
+
+# Pulse an LED when an event triggered
+async def evt_pulse(event, led):
+ while True:
+ event.clear()
+ await event.wait()
+ led.on()
+ await asyncio.sleep_ms(500)
+ led.off()
+
+# Toggle an LED (callback)
+def toggle(led):
+ led.toggle()
+
+# Quit test by connecting X2 to ground
+async def killer(obj):
+ pin = Pin('X2', Pin.IN, Pin.PULL_UP)
+ while pin.value():
+ await asyncio.sleep_ms(50)
+ obj.deinit()
+ await asyncio.sleep_ms(0)
+
+def run(obj):
+ try:
+ asyncio.run(killer(obj))
+ except KeyboardInterrupt:
+ print('Interrupted')
+ finally:
+ asyncio.new_event_loop()
+ print(tests)
+
+
+# Test for the Switch class passing coros
+def test_sw():
+ s = '''
+close pulses green
+open pulses red
+'''
+ print('Test of switch scheduling coroutines.')
+ print(helptext)
+ print(s)
+ pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+ red = LED(1)
+ green = LED(2)
+ sw = Switch(pin)
+ # Register coros to launch on contact close and open
+ sw.close_func(pulse, (green, 1000))
+ sw.open_func(pulse, (red, 1000))
+ run(sw)
+
+# Test for the switch class with a callback
+def test_swcb():
+ s = '''
+close toggles red
+open toggles green
+'''
+ print('Test of switch executing callbacks.')
+ print(helptext)
+ print(s)
+ pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+ red = LED(1)
+ green = LED(2)
+ sw = Switch(pin)
+ # Register a coro to launch on contact close
+ sw.close_func(toggle, (red,))
+ sw.open_func(toggle, (green,))
+ run(sw)
+
+# Test for the Switch class (events)
+async def do_sw_event():
+ pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+ sw = Switch(pin)
+ sw.open_func(None)
+ sw.close_func(None)
+ tasks = []
+ for event, led in ((sw.close, 1), (sw.open, 2)):
+ tasks.append(asyncio.create_task(evt_pulse(event, LED(led))))
+ await killer(sw)
+ for task in tasks:
+ task.cancel()
+
+def test_sw_event():
+ s = '''
+close pulse red
+open pulses green
+'''
+ print('Test of switch triggering events.')
+ print(helptext)
+ print(s)
+ try:
+ asyncio.run(do_sw_event())
+ except KeyboardInterrupt:
+ print('Interrupted')
+ finally:
+ asyncio.new_event_loop()
+ print(tests)
+
+# Test for the Pushbutton class (coroutines)
+# Pass True to test suppress
+def test_btn(suppress=False, lf=True, df=True):
+ s = '''
+press pulses red
+release pulses green
+double click pulses yellow
+long press pulses blue
+'''
+ print('Test of pushbutton scheduling coroutines.')
+ print(helptext)
+ print(s)
+ pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+ red = LED(1)
+ green = LED(2)
+ yellow = LED(3)
+ blue = LED(4)
+ pb = Pushbutton(pin, suppress)
+ pb.press_func(pulse, (red, 1000))
+ pb.release_func(pulse, (green, 1000))
+ if df:
+ print('Doubleclick enabled')
+ pb.double_func(pulse, (yellow, 1000))
+ if lf:
+ print('Long press enabled')
+ pb.long_func(pulse, (blue, 1000))
+ run(pb)
+
+# Test for the Pushbutton class (callbacks)
+def test_btncb():
+ s = '''
+press toggles red
+release toggles green
+double click toggles yellow
+long press toggles blue
+'''
+ print('Test of pushbutton executing callbacks.')
+ print(helptext)
+ print(s)
+ pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+ red = LED(1)
+ green = LED(2)
+ yellow = LED(3)
+ blue = LED(4)
+ pb = Pushbutton(pin)
+ pb.press_func(toggle, (red,))
+ pb.release_func(toggle, (green,))
+ pb.double_func(toggle, (yellow,))
+ pb.long_func(toggle, (blue,))
+ run(pb)
+
+# Test for the Pushbutton class where callback coros change dynamically
+def setup(pb, press, release, dbl, lng, t=1000):
+ s = '''
+Functions are changed:
+LED's pulse for 2 seconds
+press pulses blue
+release pulses red
+double click pulses green
+long pulses yellow
+'''
+ pb.press_func(pulse, (press, t))
+ pb.release_func(pulse, (release, t))
+ pb.double_func(pulse, (dbl, t))
+ if lng is not None:
+ pb.long_func(pulse, (lng, t))
+ print(s)
+
+def btn_dynamic():
+ s = '''
+press pulses red
+release pulses green
+double click pulses yellow
+long press changes button functions.
+'''
+ print('Test of pushbutton scheduling coroutines.')
+ print(helptext)
+ print(s)
+ pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+ red = LED(1)
+ green = LED(2)
+ yellow = LED(3)
+ blue = LED(4)
+ pb = Pushbutton(pin)
+ setup(pb, red, green, yellow, None)
+ pb.long_func(setup, (pb, blue, red, green, yellow, 2000))
+ run(pb)
+
+# Test for the Pushbutton class (events)
+async def do_btn_event():
+ pin = Pin('X1', Pin.IN, Pin.PULL_UP)
+ pb = Pushbutton(pin)
+ pb.press_func(None)
+ pb.release_func(None)
+ pb.double_func(None)
+ pb.long_func(None)
+ tasks = []
+ for event, led in ((pb.press, 1), (pb.release, 2), (pb.double, 3), (pb.long, 4)):
+ tasks.append(asyncio.create_task(evt_pulse(event, LED(led))))
+ await killer(pb)
+ for task in tasks:
+ task.cancel()
+
+def btn_event():
+ s = '''
+press pulse red
+release pulses green
+double click pulses yellow
+long press pulses blue
+'''
+ print('Test of pushbutton triggering events.')
+ print(helptext)
+ print(s)
+ try:
+ asyncio.run(do_btn_event())
+ except KeyboardInterrupt:
+ print('Interrupted')
+ finally:
+ asyncio.new_event_loop()
+ print(tests)
+