From d2a0c49467a3a7832d5d350ea00a50c59359722b Mon Sep 17 00:00:00 2001 From: Peter Hinch Date: Mon, 24 Feb 2020 05:35:11 +0000 Subject: [PATCH] Initial commit. RC6 rx not working. --- ir_rx.py | 228 ++++++++++++++++++++++++++++++++++++++++++++++++++ ir_rx_test.py | 57 +++++++++++++ ir_tx.py | 158 ++++++++++++++++++++++++++++++++++ ir_tx_test.py | 79 +++++++++++++++++ 4 files changed, 522 insertions(+) create mode 100644 ir_rx.py create mode 100644 ir_rx_test.py create mode 100644 ir_tx.py create mode 100644 ir_tx_test.py diff --git a/ir_rx.py b/ir_rx.py new file mode 100644 index 0000000..6f20b2b --- /dev/null +++ b/ir_rx.py @@ -0,0 +1,228 @@ +# ir_rx.py Decoder for IR remote control using synchronous code +# Supports RC-5 RC-6 mode 0 and NEC protocols. +# For a remote using NEC see https://www.adafruit.com/products/389 + +# Author: Peter Hinch +# Copyright Peter Hinch 2020 Released under the MIT license + +from sys import platform +from micropython import const +from machine import Timer +from array import array +from utime import ticks_us, ticks_diff + +if platform == 'pyboard': + from pyb import Pin, ExtInt +else: + from machine import Pin + +ESP32 = platform == 'esp32' or platform == 'esp32_LoBo' + +# Save RAM +# from micropython import alloc_emergency_exception_buf +# alloc_emergency_exception_buf(100) + +# Result codes (accessible to application) +# Repeat button code +REPEAT = -1 +# Error codes +BADSTART = -2 +BADBLOCK = -3 +BADREP = -4 +OVERRUN = -5 +BADDATA = -6 +BADADDR = -7 + + +# On 1st edge start a block timer. While the timer is running, record the time +# of each edge. When the timer times out decode the data. Duration must exceed +# the worst case block transmission time, but be less than the interval between +# a block start and a repeat code start (~108ms depending on protocol) + +class IR_RX(): + def __init__(self, pin, nedges, tblock, callback, *args): # Optional args for callback + self._nedges = nedges + self._tblock = tblock + self.callback = callback + self.args = args + + self._times = array('i', (0 for _ in range(nedges + 1))) # +1 for overrun + if platform == 'pyboard': + ExtInt(pin, ExtInt.IRQ_RISING_FALLING, Pin.PULL_NONE, self._cb_pin) + elif ESP32: + pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING)) + else: + pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING), hard = True) + self.edge = 0 + self.tim = Timer(-1) # Sofware timer + self.cb = self._decode + + + # Pin interrupt. Save time of each edge for later decode. + def _cb_pin(self, line): + t = ticks_us() + # On overrun ignore pulses until software timer times out + if self.edge <= self._nedges: # Allow 1 extra pulse to record overrun + if not self.edge: # First edge received + self.tim.init(period=self._tblock , mode=Timer.ONE_SHOT, callback=self.cb) + self._times[self.edge] = t + self.edge += 1 + +class NEC_IR(IR_RX): + def __init__(self, pin, callback, extended, *args): + # Block lasts <= 80ms and has 68 edges + tblock = 80 if extended else 73 # Allow for some tx tolerance (?) + super().__init__(pin, 68, tblock, callback, *args) + self._extended = extended + self._addr = 0 + + def _decode(self, _): + overrun = self.edge > 68 + val = OVERRUN if overrun else BADSTART + if not overrun: + width = ticks_diff(self._times[1], self._times[0]) + if width > 4000: # 9ms leading mark for all valid data + width = ticks_diff(self._times[2], self._times[1]) + if width > 3000: # 4.5ms space for normal data + if self.edge < 68: + # Haven't received the correct number of edges + val = BADBLOCK + else: + # Time spaces only (marks are always 562.5µs) + # Space is 1.6875ms (1) or 562.5µs (0) + # Skip last bit which is always 1 + val = 0 + for edge in range(3, 68 - 2, 2): + val >>= 1 + if ticks_diff(self._times[edge + 1], self._times[edge]) > 1120: + val |= 0x80000000 + elif width > 1700: # 2.5ms space for a repeat code. Should have exactly 4 edges. + val = REPEAT if self.edge == 4 else BADREP + addr = 0 + if val >= 0: # validate. Byte layout of val ~cmd cmd ~addr addr + addr = val & 0xff + cmd = (val >> 16) & 0xff + if addr == ((val >> 8) ^ 0xff) & 0xff: # 8 bit address OK + val = cmd if cmd == (val >> 24) ^ 0xff else BADDATA + self._addr = addr + else: + addr |= val & 0xff00 # pass assumed 16 bit address to callback + if self._extended: + val = cmd if cmd == (val >> 24) ^ 0xff else BADDATA + self._addr = addr + else: + val = BADADDR + if val == REPEAT: + addr = self._addr # Last valid addresss + self.edge = 0 # Set up for new data burst and run user callback + self.callback(val, addr, *self.args) + +class RC5_IR(IR_RX): + def __init__(self, pin, callback, *args): + # Block lasts <= 30ms and has <= 28 edges + super().__init__(pin, 28, 30, callback, *args) + + def _decode(self, _): + try: + nedges = self.edge # No. of edges detected + if not 14 <= nedges <= 28: + raise RuntimeError(OVERRUN if nedges > 28 else BADSTART) + # Regenerate bitstream + bits = 0 + bit = 1 + for x in range(1, nedges): + width = ticks_diff(self._times[x], self._times[x - 1]) + if not 500 < width < 2000: + raise RuntimeError(BADBLOCK) + for _ in range(1 if width < 1334 else 2): + bits <<= 1 + bits |= bit + bit ^= 1 + #print(bin(bits)) # Matches inverted scope waveform + # Decode Manchester code + x = 30 + while not bits >> x: + x -= 1 + m0 = 1 << x # Mask MS two bits (always 01) + m1 = m0 << 1 + v = 0 # 14 bit bitstream + for _ in range(14): + v <<= 1 + b0 = (bits & m0) > 0 + b1 = (bits & m1) > 0 + if b0 == b1: + raise RuntimeError(BADBLOCK) + v |= b0 + m0 >>= 2 + m1 >>= 2 + # Split into fields (val, addr, ctrl) + val = (v & 0x3f) | (0x40 if ((v >> 12) & 1) else 0) + addr = (v >> 6) & 0x1f + ctrl = (v >> 11) & 1 + + except RuntimeError as e: + val, addr, ctrl = e.args[0], 0, 0 + self.edge = 0 # Set up for new data burst and run user callback + self.callback(val, addr, ctrl, *self.args) + +class RC6_M0(IR_RX): + # Even on Pyboard D these 444us nominal pulses can be recorded as up to 705us + hdr = ((1800, 4000), (593, 1333), (222, 750), (593, 1333), (222, 750), (222, 750), (222, 750), (222, 750)) + def __init__(self, pin, callback, *args): + # Block lasts 23ms nominal and has <=44 edges + super().__init__(pin, 44, 30, callback, *args) + + def _decode(self, _): + try: + nedges = self.edge # No. of edges detected + if not 22 <= nedges <= 44: + raise RuntimeError(OVERRUN if nedges > 28 else BADSTART) + for x, lims in enumerate(self.hdr): + width = ticks_diff(self._times[x + 1], self._times[x]) + if not (lims[0] < width < lims[1]): + print('Bad start', x, width, lims) + raise RuntimeError(BADSTART) + x += 1 + width = ticks_diff(self._times[x + 1], self._times[x]) + ctrl = width > 889 # Long bit + start = x + 2 # Skip 2nd long bit + + # Regenerate bitstream + bits = 0 + bit = 0 + for x in range(start, nedges): + width = ticks_diff(self._times[x], self._times[x - 1]) + if not 222 < width < 1333: + print('Width', width) + raise RuntimeError(BADBLOCK) + for _ in range(1 if width < 666 else 2): + bits <<= 1 + bits |= bit + bit ^= 1 + print(bin(bits), len(bin(bits)) - 2) + + # Decode Manchester code + x = 32 + while not bits >> x: + x -= 1 + m0 = 1 << (x - 1) + m1 = 1 << x # MSB of pair + v = 0 # 16 bit bitstream + for _ in range(16): + v <<= 1 + b0 = (bits & m0) > 0 + b1 = (bits & m1) > 0 + #print(int(b1), int(b0)) + if b0 == b1: + raise RuntimeError(BADBLOCK) + v |= b0 + m0 >>= 2 + m1 >>= 2 + # Split into fields (val, addr) + val = v & 0xff + addr = (v >> 8) & 0xff + + except RuntimeError as e: + val, addr, ctrl = e.args[0], 0, 0 + self.edge = 0 # Set up for new data burst and run user callback + self.callback(val, addr, ctrl, *self.args) diff --git a/ir_rx_test.py b/ir_rx_test.py new file mode 100644 index 0000000..908d23f --- /dev/null +++ b/ir_rx_test.py @@ -0,0 +1,57 @@ +# ir_rx_test.py Test program for IR remote control decoder arem.py +# Supports Pyboard and ESP8266 + +# Author: Peter Hinch +# Copyright Peter Hinch 2020 Released under the MIT license + +# Run this to characterise a remote. + +from sys import platform +import time +from machine import Pin, freq +from arem import * + +ESP32 = platform == 'esp32' or platform == 'esp32_LoBo' + +if platform == 'pyboard': + p = Pin('X3', Pin.IN) +elif platform == 'esp8266': + freq(160000000) + p = Pin(13, Pin.IN) +elif ESP32: + p = Pin(23, Pin.IN) + +errors = {BADSTART : 'Invalid start pulse', BADBLOCK : 'Error: bad block', + BADREP : 'Error: repeat', OVERRUN : 'Error: overrun', + BADDATA : 'Error: invalid data', BADADDR : 'Error: invalid address'} + +def cb(data, addr, ctrl): + if data == REPEAT: # NEC protocol sends repeat codes. + print('Repeat code.') + elif data >= 0: + print('Data {:03x} Addr {:03x} Ctrl {:01x}'.format(data, addr, ctrl)) + else: + print('{} Address: {}'.format(errors[data], hex(addr))) + + +s = '''Test for IR receiver. Run: +ir_tx_test.test() for NEC protocol, +ir_tx_test.test(5) for Philips RC-5 protocol, +ir_tx_test.test(6) for RC6 mode 0. + +Background processing means REPL prompt reappears. +Hit ctrl-D to stop (soft reset).''' + +print(s) + +def test(proto=0): + if proto == 0: + ir = NEC_IR(p, cb, True, 0) # Extended mode, dummy ctrl arg for callback + elif proto == 5: + ir = RC5_IR(p, cb) + elif proto == 6: + ir = RC6_M0(p, cb) + # A real application would do something here... + #while True: + #time.sleep(5) + #print('running') diff --git a/ir_tx.py b/ir_tx.py new file mode 100644 index 0000000..9757840 --- /dev/null +++ b/ir_tx.py @@ -0,0 +1,158 @@ +# ir_tx.py Nonblocking IR blaster +# Runs on Pyboard D or Pyboard 1.x only (not Pyboard Lite) + +# Released under the MIT License (MIT). See LICENSE. + +# Copyright (c) 2020 Peter Hinch + +from pyb import Pin, Timer +from time import sleep_us, sleep +from micropython import const +from array import array +import micropython + +# micropython.alloc_emergency_exception_buf(100) + +# Common +_SPACE = const(0) # Or 100. Depends on wiring: 0 assumes logic 0 turns IR off. +_STOP = const(0) # End of data +# NEC +_TBURST = const(563) +_T_ONE = const(1687) +# RC5 +_T_RC5 = const(889) # Time for pulse of carrier +# RC6_M0 +_T_RC6 = const(444) +_T2_RC6 = const(889) + +# IR abstract base class. Array holds periods in μs between toggling 36/38KHz +# carrier on or off. +# Subclass is responsible for populating .arr and initiating transmission. +# Operation is in two phases: .transmit populates .arr with times in μs, then +# calls .start to initiate physical transmission. +class IR: + + def __init__(self, pin, freq, asize, duty): + tim = Timer(2, freq=freq) + self._ch = tim.channel(1, Timer.PWM, pin=pin) + self._ch.pulse_width_percent(_SPACE) + self.duty = duty + self.arr = array('H', 0 for _ in range(asize)) + self._tim = Timer(5) + self._tcb = self._cb + self.pretrans() + + # Before populating array, zero pointer, set notional carrier state (off). + def pretrans(self): + self.aptr = 0 # Index into array + self.carrier = False + + def start(self): + self.aptr = 0 # Reset pointer and initiate TX. + self._cb(self._tim) + + def _cb(self, t): + t.deinit() + p = self.aptr + v = self.arr[p] + if v == _STOP: + self._ch.pulse_width_percent(_SPACE) # Turn off IR LED. + return + self._ch.pulse_width_percent(_SPACE if p & 1 else self.duty) + self._tim.init(prescaler=84, period=v, callback=self._tcb) + self.aptr += 1 + + def append(self, *times): # Append one or more time peiods to .arr + for t in times: + self.arr[self.aptr] = t + self.aptr += 1 + self.carrier = not self.carrier # Keep track of carrier state + print('append', t, 'carrier', self.carrier) + + def add(self, t): # Increase last time value + print('add', t) + self.arr[self.aptr - 1] += t # Carrier unaffected + +# NEC protocol +class NEC(IR): + + def __init__(self, pin, freq=38000): # NEC specifies 38KHz + super().__init__(pin, freq, 68, 50) + + def _bit(self, b): + self.append(_TBURST, _T_ONE if b else _TBURST) + + def transmit(self, addr, data, _=0): # Ignore toggle if passed + self.pretrans() # Set initial conditions + self.append(9000, 4500) + if addr < 256: # Short address: append complement + addr |= ((addr ^ 0xff) << 8) + for x in range(16): + self._bit(addr & 1) + addr >>= 1 + data |= ((data ^ 0xff) << 8) + for x in range(16): + self._bit(data & 1) + data >>= 1 + self.append(_TBURST, _STOP) + self.start() + + def repeat(self): + self.aptr = 0 + self.append(9000, 2250, _TBURST, _STOP) + + +# Philips RC5 protocol +class RC5(IR): + + def __init__(self, pin, freq=36000): + super().__init__(pin, freq, 28, 30) + + def transmit(self, addr, data, toggle): + self.pretrans() # Set initial conditions + d = (data & 0x3f) | ((addr & 0x1f) << 6) | ((data & 0x40) << 6) | ((toggle & 1) << 11) + print(bin(d)) + mask = 0x2000 + while mask: + if mask == 0x2000: + self.append(_T_RC5) + else: + bit = bool(d & mask) + if bit ^ self.carrier: + self.add(_T_RC5) + self.append(_T_RC5) + else: + self.append(_T_RC5, _T_RC5) + mask >>= 1 + self.append(_STOP) + self.start() + +# Philips RC6 mode 0 protocol +class RC6_M0(IR): + + def __init__(self, pin, freq=36000): + super().__init__(pin, freq, 44, 30) + + def transmit(self, addr, data, toggle): + self.pretrans() # Set initial conditions + # leader, 1, 0, 0, 0 + self.append(2666, _T2_RC6, _T_RC6, _T2_RC6, _T_RC6, _T_RC6, _T_RC6, _T_RC6, _T_RC6) + # Append a single bit of twice duration + if toggle: + self.add(_T2_RC6) + self.append(_T2_RC6) + else: + self.append(_T2_RC6, _T2_RC6) + d = (data & 0xff) | ((addr & 0xff) << 8) + mask = 0x8000 + print('toggle', toggle, self.carrier, bool(d & mask)) + while mask: + bit = bool(d & mask) + if bit ^ self.carrier: + self.append(_T_RC6, _T_RC6) + else: + self.add(_T_RC6) + self.append(_T_RC6) + mask >>= 1 + self.append(_STOP) + self.start() diff --git a/ir_tx_test.py b/ir_tx_test.py new file mode 100644 index 0000000..2d78728 --- /dev/null +++ b/ir_tx_test.py @@ -0,0 +1,79 @@ +# ir_tx_test.py Test for nonblocking NEC/RC-5/RC-6 mode 0 IR blaster. + +# Released under the MIT License (MIT). See LICENSE. + +# Copyright (c) 2020 Peter Hinch + +# Implements a 2-button remote control on a Pyboard + +from pyb import Pin, LED +import uasyncio as asyncio +from aswitch import Switch, Delay_ms +from ir_tx import NEC, RC5, RC6_M0 + +loop = asyncio.get_event_loop() + +class Rbutton: + toggle = 1 # toggle is ignored in NEC mode + def __init__(self, irb, pin, addr, data, rep_code=False): + self.irb = irb + self.sw = Switch(pin) + self.addr = addr + self.data = data + self.rep_code = rep_code + self.sw.close_func(self.cfunc) + self.sw.open_func(self.ofunc) + self.tim = Delay_ms(self.repeat) + + def cfunc(self): # Button push: send data + self.irb.transmit(self.addr, self.data, Rbutton.toggle) + # Auto repeat + self.tim.trigger(108) + + def ofunc(self): # Button release: cancel repeat timer + self.tim.stop() + Rbutton.toggle ^= 1 # Toggle control + + async def repeat(self): + await asyncio.sleep(0) # Let timer stop before retriggering + if not self.sw(): # Button is still pressed: retrigger + self.tim.trigger(108) + if self.rep_code: + self.irb.repeat() # NEC special case: send REPEAT code + else: + self.irb.transmit(self.addr, self.data, Rbutton.toggle) + +async def main(proto): + # Test uses a 38KHz carrier. Some Philips systems use 36KHz. + # If button is held down normal behaviour is to retransmit + # but most NEC models send a RPEAT code + rep_code = False # Don't care for RC-X. NEC protocol only. + pin = Pin('X1') + if not proto: + irb = NEC(pin) # Default NEC freq == 38KHz + # Option to send REPEAT code. Most remotes do this. + rep_code = True + elif proto == 5: + irb = RC5(pin, 38000) + elif proto == 6: + irb = RC6_M0(pin, 38000) + + b = [] # Rbutton instances + b.append(Rbutton(irb, Pin('X3', Pin.IN, Pin.PULL_UP), 0x1, 0x7, rep_code)) + b.append(Rbutton(irb, Pin('X4', Pin.IN, Pin.PULL_UP), 0x10, 0xb, rep_code)) + led = LED(1) + while True: + await asyncio.sleep_ms(500) # Obligatory flashing LED. + led.toggle() + +s = '''Test for IR transmitter. Run: +ir_tx_test.test() for NEC protocol +ir_tx_test.test(5) for RC-5 protocol +ir_tx_test.test(6) for RC-6 mode 0. + +Ground X3 to send addr 1 data 7 +Ground X4 to send addr 0x10 data 0x0b.''' +print(s) + +def test(proto=0): + loop.run_until_complete(main(proto)) -- 2.47.3