From 3b6cfaf15ad930c34b0d7f87c6a841ea651ccccb Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Sat, 7 Mar 2020 12:23:06 +0000 Subject: [PATCH] Receiver now a package. --- README.md | 92 ++++++++---- ir_rx.py | 324 ------------------------------------------- ir_rx/__init__.py | 70 ++++++++++ ir_rx/nec.py | 62 +++++++++ ir_rx/philips.py | 127 +++++++++++++++++ ir_rx/print_error.py | 19 +++ ir_rx/sony.py | 71 ++++++++++ ir_rx_test.py | 57 ++++---- 8 files changed, 441 insertions(+), 381 deletions(-) delete mode 100644 ir_rx.py create mode 100644 ir_rx/__init__.py create mode 100644 ir_rx/nec.py create mode 100644 ir_rx/philips.py create mode 100644 ir_rx/print_error.py create mode 100644 ir_rx/sony.py diff --git a/README.md b/README.md index c107a78..1e1fa25 100644 --- a/README.md +++ b/README.md @@ -69,8 +69,11 @@ On import, demos print an explanation of how to run them. ## 3.1 Receiver -Copy the following files to the target filesystem: - 1. `ir_rx.py` The receiver device driver. +This is a Python package. This minimises RAM usage: applications only import +the device driver for the protocol in use. + +Copy the following to the target filesystem: + 1. `ir_rx` Directory and contents. Contains the device drivers. 2. `ir_rx_test.py` Demo of a receiver. There are no dependencies. @@ -91,10 +94,29 @@ from the official library and `aswitch.py` from # 4. Receiver -This implements a class for each supported protocol, namely `NEC_IR`, -`SONY_IR`, `RC5_IR` and `RC6_M0`. Applications should instantiate the -appropriate class with a callback. The callback will run whenever an IR pulse -train is received. +This implements a class for each supported protocol. Applications should +instantiate the appropriate class with a callback. The callback will run +whenever an IR pulse train is received. Example running on a Pyboard: + +```python +import time +from machine import Pin +from pyb import LED +from ir_rx.nec import NEC_8 # NEC remote, 8 bit addresses + +red = LED(1) + +def callback(data, addr, ctrl): + if data < 0: # NEC protocol sends repeat codes. + print('Repeat code.') + else: + print('Data {:02x} Addr {:04x}'.format(data, addr)) + +ir = NEC_8(Pin('X3', Pin.IN), callback) +while True: + time.sleep_ms(500) + red.toggle() +``` #### Common to all classes @@ -125,24 +147,46 @@ Method: it will be called if an error occurs. The value corresponds to the error code (see below). -#### Properties specific to a class - -`NEC_IR`: -`extended` `bool`. Remotes using the NEC protocol can send 8 or 16 bit -addresses. If `True` 16 bit addresses are assumed. If an 8 bit address is sent -it will be received as a 16 bit value comprising the address and (in bits 8-15) -its ones complement. Set `False` to enable error checking for remotes that -return an 8 bit address: the complement will be checked and the address will be -returned as an 8-bit value. The default is `True`. - -`SONY_IR`: -`bits` `int`. The SIRC protocol comes in 3 variants: 12, 15 and 20 bits. The -default will handle bitstreams from all three types of remote. A value matching -your remote improves the timing reducing the likelihood of errors when handling -repeats: in 20-bit mode SIRC timing when a button is held down is tight. A -worst-case 20-bit block takes 39ms nominal, yet the repeat time is 45ms nominal. -The Sony remote tested issues both 12 bit and 15 bit streams. The default is -20. +#### NEC classes + +`NEC_8`, `NEC_16` + +```python +from ir_rx.nec import NEC_8 +``` + +Remotes using the NEC protocol can send 8 or 16 bit addresses. If the `NEC_16` +class receives an 8 bit address it will get a 16 bit value comprising the +address in bits 0-7 and its one's complement in bits 8-15. +The `NEC_8` class enables error checking for remotes that return an 8 bit +address: the complement is checked and the address returned as an 8-bit value. +A 16-bit address will result in an error. + +#### Sony classes + +`SONY_12`, `SONY_15`, `SONY_20` + +```python +from ir_rx.sony import SONY_15 +``` + +The SIRC protocol comes in 3 variants: 12, 15 and 20 bits. `SONY_20` handles +bitstreams from all three types of remote. Choosing a class matching the remote +improves the timing reducing the likelihood of errors when handling repeats: in +20-bit mode SIRC timing when a button is held down is tight. A worst-case 20 +bit block takes 39ms nominal, yet the repeat time is 45ms nominal. +A single physical remote can issue more than one type of bitstream. The Sony +remote tested issued both 12 bit and 15 bit streams. + +#### Philips classes + +`RC5_IR`, `RC6_M0` + +```python +from ir_rx.philips import RC5_IR +``` + +These support the RC-5 and RC-6 mode 0 protocols respectively. # 4.1 Errors diff --git a/ir_rx.py b/ir_rx.py deleted file mode 100644 index 210d617..0000000 --- a/ir_rx.py +++ /dev/null @@ -1,324 +0,0 @@ -# ir_rx.py Decoder for IR remote control using synchronous code -# Supports RC-5 RC-6 mode 0 and NEC protocols. -# For a remote using NEC see https://www.adafruit.com/products/389 - -# Author: Peter Hinch -# Copyright Peter Hinch 2020 Released under the MIT license - -from sys import platform -from micropython import const -from machine import Timer -from array import array -from utime import ticks_us, ticks_diff - -if platform == 'pyboard': - from pyb import Pin, ExtInt -else: - from machine import Pin - -ESP32 = platform == 'esp32' or platform == 'esp32_LoBo' - -# Save RAM -# from micropython import alloc_emergency_exception_buf -# alloc_emergency_exception_buf(100) - -# Result codes (accessible to application) -# Repeat button code -REPEAT = -1 -# Error codes -BADSTART = -2 -BADBLOCK = -3 -BADREP = -4 -OVERRUN = -5 -BADDATA = -6 -BADADDR = -7 - - -# On 1st edge start a block timer. While the timer is running, record the time -# of each edge. When the timer times out decode the data. Duration must exceed -# the worst case block transmission time, but be less than the interval between -# a block start and a repeat code start (~108ms depending on protocol) - -class IR_RX(): - _erstr = "'{}' object has no attribute '{}'" - - def __init__(self, pin, nedges, tblock, callback, *args): # Optional args for callback - self._nedges = nedges - self._tblock = tblock - self.callback = callback - self.args = args - self._errf = lambda _ : None - self.verbose = False - - self._times = array('i', (0 for _ in range(nedges + 1))) # +1 for overrun - if platform == 'pyboard': - ei = ExtInt(pin, ExtInt.IRQ_RISING_FALLING, Pin.PULL_NONE, self._cb_pin) - elif ESP32: - ei = pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING)) - else: - ei = pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING), hard = True) - self._eint = ei # Keep reference?? - self.edge = 0 - self.tim = Timer(-1) # Sofware timer - self.cb = self.decode - - - # Pin interrupt. Save time of each edge for later decode. - def _cb_pin(self, line): - t = ticks_us() - # On overrun ignore pulses until software timer times out - if self.edge <= self._nedges: # Allow 1 extra pulse to record overrun - if not self.edge: # First edge received - self.tim.init(period=self._tblock , mode=Timer.ONE_SHOT, callback=self.cb) - self._times[self.edge] = t - self.edge += 1 - - @property - def extended(self): - raise AttributeError(self._erstr.format(self.__qualname__, 'extended')) - - @property - def bits(self): - raise AttributeError(self._erstr.format(self.__qualname__, 'bits')) - - def do_callback(self, cmd, addr, ext, thresh=0): - self.edge = 0 - if cmd >= thresh: - self.callback(cmd, addr, ext, *self.args) - else: - self._errf(cmd) - - def error_function(self, func): - self._errf = func - -class NEC_IR(IR_RX): - def __init__(self, pin, callback, *args): - # Block lasts <= 80ms (extended mode) and has 68 edges - super().__init__(pin, 68, 80, callback, *args) - self._extended = True - self._addr = 0 - - def decode(self, _): - try: - if self.edge > 68: - raise RuntimeError(OVERRUN) - width = ticks_diff(self._times[1], self._times[0]) - if width < 4000: # 9ms leading mark for all valid data - raise RuntimeError(BADSTART) - width = ticks_diff(self._times[2], self._times[1]) - if width > 3000: # 4.5ms space for normal data - if self.edge < 68: # Haven't received the correct number of edges - raise RuntimeError(BADBLOCK) - # Time spaces only (marks are always 562.5µs) - # Space is 1.6875ms (1) or 562.5µs (0) - # Skip last bit which is always 1 - val = 0 - for edge in range(3, 68 - 2, 2): - val >>= 1 - if ticks_diff(self._times[edge + 1], self._times[edge]) > 1120: - val |= 0x80000000 - elif width > 1700: # 2.5ms space for a repeat code. Should have exactly 4 edges. - raise RuntimeError(REPEAT if self.edge == 4 else BADREP) # Treat REPEAT as error. - else: - raise RuntimeError(BADSTART) - addr = val & 0xff # 8 bit addr - cmd = (val >> 16) & 0xff - if cmd != (val >> 24) ^ 0xff: - raise RuntimeError(BADDATA) - if addr != ((val >> 8) ^ 0xff) & 0xff: # 8 bit addr doesn't match check - if not self._extended: - raise RuntimeError(BADADDR) - addr |= val & 0xff00 # pass assumed 16 bit address to callback - self._addr = addr - except RuntimeError as e: - cmd = e.args[0] - addr = self._addr if cmd == REPEAT else 0 # REPEAT uses last address - # Set up for new data burst and run user callback - self.do_callback(cmd, addr, 0, REPEAT) - - @property - def extended(self): - return self._extended - - @extended.setter - def extended(self, value): - self._extended = bool(value) - -class SONY_IR(IR_RX): - def __init__(self, pin, callback, *args): - # 20 bit block has 42 edges and lasts <= 39ms nominal. Add 4ms to time - # for tolerances except in 20 bit case where timing is tight with a - # repeat period of 45ms. - t = int(3 + bits * 1.8) + (1 if bits == 20 else 4) - super().__init__(pin, 2 + bits * 2, t, callback, *args) - self._addr = 0 - self._bits = 20 - - def decode(self, _): - try: - nedges = self.edge # No. of edges detected - self.verbose and print('nedges', nedges) - if nedges > 42: - raise RuntimeError(OVERRUN) - bits = (nedges - 2) // 2 - if nedges not in (26, 32, 42) or bits > self._bits: - raise RuntimeError(BADBLOCK) - self.verbose and print('SIRC {}bit'.format(bits)) - width = ticks_diff(self._times[1], self._times[0]) - if not 1800 < width < 3000: # 2.4ms leading mark for all valid data - raise RuntimeError(BADSTART) - width = ticks_diff(self._times[2], self._times[1]) - if not 350 < width < 1000: # 600μs space - raise RuntimeError(BADSTART) - - val = 0 # Data received, LSB 1st - x = 2 - bit = 1 - while x < nedges - 2: - if ticks_diff(self._times[x + 1], self._times[x]) > 900: - val |= bit - bit <<= 1 - x += 2 - - cmd = val & 0x7f # 7 bit command - val >>= 7 - if nedges < 42: - addr = val & 0xff # 5 or 8 bit addr - val = 0 - else: - addr = val & 0x1f # 5 bit addr - val >>= 5 # 8 bit extended - except RuntimeError as e: - cmd = e.args[0] - addr = 0 - val = 0 - self.do_callback(cmd, addr, val) - - @property - def bits(self): - return self._bits - - @bits.setter - def bits(self, value): - if value not in (12, 15, 20): - raise ValueError('bits must be 12, 15 or 20') - self._bits = value - -class RC5_IR(IR_RX): - def __init__(self, pin, callback, *args): - # Block lasts <= 30ms and has <= 28 edges - super().__init__(pin, 28, 30, callback, *args) - - def decode(self, _): - try: - nedges = self.edge # No. of edges detected - if not 14 <= nedges <= 28: - raise RuntimeError(OVERRUN if nedges > 28 else BADSTART) - # Regenerate bitstream - bits = 0 - bit = 1 - for x in range(1, nedges): - width = ticks_diff(self._times[x], self._times[x - 1]) - if not 500 < width < 2000: - raise RuntimeError(BADBLOCK) - for _ in range(1 if width < 1334 else 2): - bits <<= 1 - bits |= bit - bit ^= 1 - self.verbose and print(bin(bits)) # Matches inverted scope waveform - # Decode Manchester code - x = 30 - while not bits >> x: - x -= 1 - m0 = 1 << x # Mask MS two bits (always 01) - m1 = m0 << 1 - v = 0 # 14 bit bitstream - for _ in range(14): - v <<= 1 - b0 = (bits & m0) > 0 - b1 = (bits & m1) > 0 - if b0 == b1: - raise RuntimeError(BADBLOCK) - v |= b0 - m0 >>= 2 - m1 >>= 2 - # Split into fields (val, addr, ctrl) - val = (v & 0x3f) | (0x40 if ((v >> 12) & 1) else 0) - addr = (v >> 6) & 0x1f - ctrl = (v >> 11) & 1 - - except RuntimeError as e: - val, addr, ctrl = e.args[0], 0, 0 - # Set up for new data burst and run user callback - self.do_callback(val, addr, ctrl) - -class RC6_M0(IR_RX): - # Even on Pyboard D the 444μs nominal pulses can be recorded as up to 705μs - # Scope shows 360-520 μs (-84μs +76μs relative to nominal) - # Header nominal 2666, 889, 444, 889, 444, 444, 444, 444 carrier ON at end - hdr = ((1800, 4000), (593, 1333), (222, 750), (593, 1333), (222, 750), (222, 750), (222, 750), (222, 750)) - def __init__(self, pin, callback, *args): - # Block lasts 23ms nominal and has <=44 edges - super().__init__(pin, 44, 30, callback, *args) - - def decode(self, _): - try: - nedges = self.edge # No. of edges detected - if not 22 <= nedges <= 44: - raise RuntimeError(OVERRUN if nedges > 28 else BADSTART) - for x, lims in enumerate(self.hdr): - width = ticks_diff(self._times[x + 1], self._times[x]) - if not (lims[0] < width < lims[1]): - self.verbose and print('Bad start', x, width, lims) - raise RuntimeError(BADSTART) - x += 1 - width = ticks_diff(self._times[x + 1], self._times[x]) - # 2nd bit of last 0 is 444μs (0) or 1333μs (1) - if not 222 < width < 1555: - self.verbose and print('Bad block 1 Width', width, 'x', x) - raise RuntimeError(BADBLOCK) - short = width < 889 - v = int(not short) - bit = v - bits = 1 # Bits decoded - x += 1 + int(short) - width = ticks_diff(self._times[x + 1], self._times[x]) - if not 222 < width < 1555: - self.verbose and print('Bad block 2 Width', width, 'x', x) - raise RuntimeError(BADBLOCK) - short = width < 1111 - if not short: - bit ^= 1 - x += 1 + int(short) # If it's short, we know width of next - v <<= 1 - v |= bit # MSB of result - bits += 1 - # Decode bitstream - while bits < 17: - # -1 convert count to index, -1 because we look ahead - if x > nedges - 2: - raise RuntimeError(BADBLOCK) - # width is 444/889 nominal - width = ticks_diff(self._times[x + 1], self._times[x]) - if not 222 < width < 1111: - self.verbose and print('Bad block 3 Width', width, 'x', x) - raise RuntimeError(BADBLOCK) - short = width < 666 - if not short: - bit ^= 1 - v <<= 1 - v |= bit - bits += 1 - x += 1 + int(short) - - if self.verbose: - ss = '20-bit format {:020b} x={} nedges={} bits={}' - print(ss.format(v, x, nedges, bits)) - - val = v & 0xff - addr = (v >> 8) & 0xff - ctrl = (v >> 16) & 1 - except RuntimeError as e: - val, addr, ctrl = e.args[0], 0, 0 - # Set up for new data burst and run user callback - self.do_callback(val, addr, ctrl) diff --git a/ir_rx/__init__.py b/ir_rx/__init__.py new file mode 100644 index 0000000..4b96524 --- /dev/null +++ b/ir_rx/__init__.py @@ -0,0 +1,70 @@ +# ir_rx __init__.py Decoder for IR remote control using synchronous code +# IR_RX abstract base class for IR receivers. + +# Author: Peter Hinch +# Copyright Peter Hinch 2020 Released under the MIT license + +from sys import platform +from machine import Timer, Pin +from array import array +from utime import ticks_us + +# Save RAM +# from micropython import alloc_emergency_exception_buf +# alloc_emergency_exception_buf(100) + + +# On 1st edge start a block timer. While the timer is running, record the time +# of each edge. When the timer times out decode the data. Duration must exceed +# the worst case block transmission time, but be less than the interval between +# a block start and a repeat code start (~108ms depending on protocol) + +class IR_RX(): + # Result/error codes + # Repeat button code + REPEAT = -1 + # Error codes + BADSTART = -2 + BADBLOCK = -3 + BADREP = -4 + OVERRUN = -5 + BADDATA = -6 + BADADDR = -7 + + def __init__(self, pin, nedges, tblock, callback, *args): # Optional args for callback + self._nedges = nedges + self._tblock = tblock + self.callback = callback + self.args = args + self._errf = lambda _ : None + self.verbose = False + + self._times = array('i', (0 for _ in range(nedges + 1))) # +1 for overrun + if platform == 'esp32' or platform == 'esp32_LoBo': # ESP32 Doesn't support hard IRQ + ei = pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING)) + else: + ei = pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING), hard = True) + self._eint = ei # Keep reference?? + self.edge = 0 + self.tim = Timer(-1) # Sofware timer + self.cb = self.decode + + # Pin interrupt. Save time of each edge for later decode. + def _cb_pin(self, line): + t = ticks_us() + # On overrun ignore pulses until software timer times out + if self.edge <= self._nedges: # Allow 1 extra pulse to record overrun + if not self.edge: # First edge received + self.tim.init(period=self._tblock , mode=Timer.ONE_SHOT, callback=self.cb) + self._times[self.edge] = t + self.edge += 1 + + def do_callback(self, cmd, addr, ext, thresh=0): + self.edge = 0 + if cmd >= thresh: + self.callback(cmd, addr, ext, *self.args) + else: + self._errf(cmd) + + def error_function(self, func): + self._errf = func diff --git a/ir_rx/nec.py b/ir_rx/nec.py new file mode 100644 index 0000000..e516924 --- /dev/null +++ b/ir_rx/nec.py @@ -0,0 +1,62 @@ +# nec.py Decoder for IR remote control using synchronous code +# Supports NEC protocol. +# For a remote using NEC see https://www.adafruit.com/products/389 + +# Author: Peter Hinch +# Copyright Peter Hinch 2020 Released under the MIT license + +from utime import ticks_us, ticks_diff +from ir_rx import IR_RX + +class NEC_ABC(IR_RX): + def __init__(self, pin, extended, callback, *args): + # Block lasts <= 80ms (extended mode) and has 68 edges + super().__init__(pin, 68, 80, callback, *args) + self._extended = extended + self._addr = 0 + + def decode(self, _): + try: + if self.edge > 68: + raise RuntimeError(self.OVERRUN) + width = ticks_diff(self._times[1], self._times[0]) + if width < 4000: # 9ms leading mark for all valid data + raise RuntimeError(self.BADSTART) + width = ticks_diff(self._times[2], self._times[1]) + if width > 3000: # 4.5ms space for normal data + if self.edge < 68: # Haven't received the correct number of edges + raise RuntimeError(self.BADBLOCK) + # Time spaces only (marks are always 562.5µs) + # Space is 1.6875ms (1) or 562.5µs (0) + # Skip last bit which is always 1 + val = 0 + for edge in range(3, 68 - 2, 2): + val >>= 1 + if ticks_diff(self._times[edge + 1], self._times[edge]) > 1120: + val |= 0x80000000 + elif width > 1700: # 2.5ms space for a repeat code. Should have exactly 4 edges. + raise RuntimeError(self.REPEAT if self.edge == 4 else self.BADREP) # Treat REPEAT as error. + else: + raise RuntimeError(self.BADSTART) + addr = val & 0xff # 8 bit addr + cmd = (val >> 16) & 0xff + if cmd != (val >> 24) ^ 0xff: + raise RuntimeError(self.BADDATA) + if addr != ((val >> 8) ^ 0xff) & 0xff: # 8 bit addr doesn't match check + if not self._extended: + raise RuntimeError(self.BADADDR) + addr |= val & 0xff00 # pass assumed 16 bit address to callback + self._addr = addr + except RuntimeError as e: + cmd = e.args[0] + addr = self._addr if cmd == self.REPEAT else 0 # REPEAT uses last address + # Set up for new data burst and run user callback + self.do_callback(cmd, addr, 0, self.REPEAT) + +class NEC_8(NEC_ABC): + def __init__(self, pin, callback, *args): + super().__init__(pin, False, callback, *args) + +class NEC_16(NEC_ABC): + def __init__(self, pin, callback, *args): + super().__init__(pin, True, callback, *args) diff --git a/ir_rx/philips.py b/ir_rx/philips.py new file mode 100644 index 0000000..cbd96f8 --- /dev/null +++ b/ir_rx/philips.py @@ -0,0 +1,127 @@ +# philips.py Decoder for IR remote control using synchronous code +# Supports Philips RC-5 RC-6 mode 0 protocols. + +# Author: Peter Hinch +# Copyright Peter Hinch 2020 Released under the MIT license + +from utime import ticks_us, ticks_diff +from ir_rx import IR_RX + +class RC5_IR(IR_RX): + def __init__(self, pin, callback, *args): + # Block lasts <= 30ms and has <= 28 edges + super().__init__(pin, 28, 30, callback, *args) + + def decode(self, _): + try: + nedges = self.edge # No. of edges detected + if not 14 <= nedges <= 28: + raise RuntimeError(self.OVERRUN if nedges > 28 else self.BADSTART) + # Regenerate bitstream + bits = 0 + bit = 1 + for x in range(1, nedges): + width = ticks_diff(self._times[x], self._times[x - 1]) + if not 500 < width < 2000: + raise RuntimeError(self.BADBLOCK) + for _ in range(1 if width < 1334 else 2): + bits <<= 1 + bits |= bit + bit ^= 1 + self.verbose and print(bin(bits)) # Matches inverted scope waveform + # Decode Manchester code + x = 30 + while not bits >> x: + x -= 1 + m0 = 1 << x # Mask MS two bits (always 01) + m1 = m0 << 1 + v = 0 # 14 bit bitstream + for _ in range(14): + v <<= 1 + b0 = (bits & m0) > 0 + b1 = (bits & m1) > 0 + if b0 == b1: + raise RuntimeError(self.BADBLOCK) + v |= b0 + m0 >>= 2 + m1 >>= 2 + # Split into fields (val, addr, ctrl) + val = (v & 0x3f) | (0x40 if ((v >> 12) & 1) else 0) + addr = (v >> 6) & 0x1f + ctrl = (v >> 11) & 1 + + except RuntimeError as e: + val, addr, ctrl = e.args[0], 0, 0 + # Set up for new data burst and run user callback + self.do_callback(val, addr, ctrl) + +class RC6_M0(IR_RX): + # Even on Pyboard D the 444μs nominal pulses can be recorded as up to 705μs + # Scope shows 360-520 μs (-84μs +76μs relative to nominal) + # Header nominal 2666, 889, 444, 889, 444, 444, 444, 444 carrier ON at end + hdr = ((1800, 4000), (593, 1333), (222, 750), (593, 1333), (222, 750), (222, 750), (222, 750), (222, 750)) + def __init__(self, pin, callback, *args): + # Block lasts 23ms nominal and has <=44 edges + super().__init__(pin, 44, 30, callback, *args) + + def decode(self, _): + try: + nedges = self.edge # No. of edges detected + if not 22 <= nedges <= 44: + raise RuntimeError(self.OVERRUN if nedges > 28 else self.BADSTART) + for x, lims in enumerate(self.hdr): + width = ticks_diff(self._times[x + 1], self._times[x]) + if not (lims[0] < width < lims[1]): + self.verbose and print('Bad start', x, width, lims) + raise RuntimeError(self.BADSTART) + x += 1 + width = ticks_diff(self._times[x + 1], self._times[x]) + # 2nd bit of last 0 is 444μs (0) or 1333μs (1) + if not 222 < width < 1555: + self.verbose and print('Bad block 1 Width', width, 'x', x) + raise RuntimeError(self.BADBLOCK) + short = width < 889 + v = int(not short) + bit = v + bits = 1 # Bits decoded + x += 1 + int(short) + width = ticks_diff(self._times[x + 1], self._times[x]) + if not 222 < width < 1555: + self.verbose and print('Bad block 2 Width', width, 'x', x) + raise RuntimeError(self.BADBLOCK) + short = width < 1111 + if not short: + bit ^= 1 + x += 1 + int(short) # If it's short, we know width of next + v <<= 1 + v |= bit # MSB of result + bits += 1 + # Decode bitstream + while bits < 17: + # -1 convert count to index, -1 because we look ahead + if x > nedges - 2: + raise RuntimeError(self.BADBLOCK) + # width is 444/889 nominal + width = ticks_diff(self._times[x + 1], self._times[x]) + if not 222 < width < 1111: + self.verbose and print('Bad block 3 Width', width, 'x', x) + raise RuntimeError(self.BADBLOCK) + short = width < 666 + if not short: + bit ^= 1 + v <<= 1 + v |= bit + bits += 1 + x += 1 + int(short) + + if self.verbose: + ss = '20-bit format {:020b} x={} nedges={} bits={}' + print(ss.format(v, x, nedges, bits)) + + val = v & 0xff + addr = (v >> 8) & 0xff + ctrl = (v >> 16) & 1 + except RuntimeError as e: + val, addr, ctrl = e.args[0], 0, 0 + # Set up for new data burst and run user callback + self.do_callback(val, addr, ctrl) diff --git a/ir_rx/print_error.py b/ir_rx/print_error.py new file mode 100644 index 0000000..31ce51e --- /dev/null +++ b/ir_rx/print_error.py @@ -0,0 +1,19 @@ +# print_error.py Error print for IR receiver + +# Author: Peter Hinch +# Copyright Peter Hinch 2020 Released under the MIT license + +from ir_rx import IR_RX + +_errors = {IR_RX.BADSTART : 'Invalid start pulse', + IR_RX.BADBLOCK : 'Error: bad block', + IR_RX.BADREP : 'Error: repeat', + IR_RX.OVERRUN : 'Error: overrun', + IR_RX.BADDATA : 'Error: invalid data', + IR_RX.BADADDR : 'Error: invalid address'} + +def print_error(data): + if data in _errors: + print(_errors[data]) + else: + print('Unknown error code:', data) diff --git a/ir_rx/sony.py b/ir_rx/sony.py new file mode 100644 index 0000000..9c9147a --- /dev/null +++ b/ir_rx/sony.py @@ -0,0 +1,71 @@ +# sony.py Decoder for IR remote control using synchronous code +# Sony SIRC protocol. + +# Author: Peter Hinch +# Copyright Peter Hinch 2020 Released under the MIT license + +from utime import ticks_us, ticks_diff +from ir_rx import IR_RX + +class SONY_ABC(IR_RX): # Abstract base class + def __init__(self, pin, bits, callback, *args): + # 20 bit block has 42 edges and lasts <= 39ms nominal. Add 4ms to time + # for tolerances except in 20 bit case where timing is tight with a + # repeat period of 45ms. + t = int(3 + bits * 1.8) + (1 if bits == 20 else 4) + super().__init__(pin, 2 + bits * 2, t, callback, *args) + self._addr = 0 + self._bits = 20 + + def decode(self, _): + try: + nedges = self.edge # No. of edges detected + self.verbose and print('nedges', nedges) + if nedges > 42: + raise RuntimeError(self.OVERRUN) + bits = (nedges - 2) // 2 + if nedges not in (26, 32, 42) or bits > self._bits: + raise RuntimeError(self.BADBLOCK) + self.verbose and print('SIRC {}bit'.format(bits)) + width = ticks_diff(self._times[1], self._times[0]) + if not 1800 < width < 3000: # 2.4ms leading mark for all valid data + raise RuntimeError(self.BADSTART) + width = ticks_diff(self._times[2], self._times[1]) + if not 350 < width < 1000: # 600μs space + raise RuntimeError(self.BADSTART) + + val = 0 # Data received, LSB 1st + x = 2 + bit = 1 + while x < nedges - 2: + if ticks_diff(self._times[x + 1], self._times[x]) > 900: + val |= bit + bit <<= 1 + x += 2 + + cmd = val & 0x7f # 7 bit command + val >>= 7 + if nedges < 42: + addr = val & 0xff # 5 or 8 bit addr + val = 0 + else: + addr = val & 0x1f # 5 bit addr + val >>= 5 # 8 bit extended + except RuntimeError as e: + cmd = e.args[0] + addr = 0 + val = 0 + self.do_callback(cmd, addr, val) + +class SONY_12(SONY_ABC): + def __init__(self, pin, callback, *args): + super().__init__(pin, 12, callback, *args) + +class SONY_15(SONY_ABC): + def __init__(self, pin, callback, *args): + super().__init__(pin, 15, callback, *args) + +class SONY_20(SONY_ABC): + def __init__(self, pin, callback, *args): + super().__init__(pin, 20, callback, *args) + diff --git a/ir_rx_test.py b/ir_rx_test.py index a436082..badd2b5 100644 --- a/ir_rx_test.py +++ b/ir_rx_test.py @@ -10,16 +10,19 @@ from sys import platform import time import gc from machine import Pin, freq -from ir_rx import * - -ESP32 = platform == 'esp32' or platform == 'esp32_LoBo' +from ir_rx.print_error import print_error # Optional print of error codes +# Import all implemented classes +from ir_rx.nec import NEC_8, NEC_16 +from ir_rx.sony import SONY_12, SONY_15, SONY_20 +from ir_rx.philips import RC5_IR, RC6_M0 +# Define pin according to platform if platform == 'pyboard': p = Pin('X3', Pin.IN) elif platform == 'esp8266': freq(160000000) p = Pin(13, Pin.IN) -elif ESP32: +elif platform == 'esp32' or platform == 'esp32_LoBo': p = Pin(23, Pin.IN) # was 27 # User callback @@ -27,42 +30,30 @@ def cb(data, addr, ctrl): if data < 0: # NEC protocol sends repeat codes. print('Repeat code.') else: - print('Data {:03x} Addr {:03x} Ctrl {:01x}'.format(data, addr, ctrl)) + print('Data {:02x} Addr {:04x} Ctrl {:02x}'.format(data, addr, ctrl)) -# Optionally print debug information -def errf(data): - errors = {BADSTART : 'Invalid start pulse', BADBLOCK : 'Error: bad block', - BADREP : 'Error: repeat', OVERRUN : 'Error: overrun', - BADDATA : 'Error: invalid data', BADADDR : 'Error: invalid address'} - print(errors[data]) +def test(proto=0): + classes = (NEC_8, NEC_16, SONY_12, SONY_15, SONY_20, RC5_IR, RC6_M0) + ir = classes[proto](p, cb) # Instantiate receiver + ir.error_function(print_error) # Show debug information + #ir.verbose = True + # A real application would do something here... + while True: + print('running') + time.sleep(5) + gc.collect() +# **** DISPLAY GREETING **** s = '''Test for IR receiver. Run: from ir_rx_test import test -test() for NEC protocol, -test(1) for Sony SIRC 12 bit, -test(2) for Sony SIRC 15 bit, -test(3) for Sony SIRC 20 bit, +test() for NEC 8 bit protocol, +test(1) for NEC 16 bit, +test(2) for Sony SIRC 12 bit, +test(3) for Sony SIRC 15 bit, +test(4) for Sony SIRC 20 bit, test(5) for Philips RC-5 protocol, test(6) for RC6 mode 0. Hit ctrl-c to stop, then ctrl-d to soft reset.''' print(s) - -def test(proto=0): - if proto == 0: - ir = NEC_IR(p, cb) # Extended mode - elif proto < 4: - ir = SONY_IR(p, cb) - ir.bits = (12, 15, 20)[proto - 1] - #ir.verbose = True - elif proto == 5: - ir = RC5_IR(p, cb) - elif proto == 6: - ir = RC6_M0(p, cb) - ir.error_function(errf) # Show debug information - # A real application would do something here... - while True: - print('running') - time.sleep(5) - gc.collect() -- 2.47.3