]> vault307.fbx.one Git - micorpython_ir.git/commitdiff
Initial commit. RC6 rx not working.
authorPeter Hinch <peter@hinch.me.uk>
Mon, 24 Feb 2020 05:35:11 +0000 (05:35 +0000)
committerPeter Hinch <peter@hinch.me.uk>
Mon, 24 Feb 2020 05:35:11 +0000 (05:35 +0000)
ir_rx.py [new file with mode: 0644]
ir_rx_test.py [new file with mode: 0644]
ir_tx.py [new file with mode: 0644]
ir_tx_test.py [new file with mode: 0644]

diff --git a/ir_rx.py b/ir_rx.py
new file mode 100644 (file)
index 0000000..6f20b2b
--- /dev/null
+++ b/ir_rx.py
@@ -0,0 +1,228 @@
+# ir_rx.py Decoder for IR remote control using synchronous code
+# Supports RC-5 RC-6 mode 0 and NEC protocols.
+# For a remote using NEC see https://www.adafruit.com/products/389
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+from sys import platform
+from micropython import const
+from machine import Timer
+from array import array
+from utime import ticks_us, ticks_diff
+
+if platform == 'pyboard':
+    from pyb import Pin, ExtInt
+else:
+    from machine import Pin
+
+ESP32 = platform == 'esp32' or platform == 'esp32_LoBo'
+
+# Save RAM
+# from micropython import alloc_emergency_exception_buf
+# alloc_emergency_exception_buf(100)
+
+# Result codes (accessible to application)
+# Repeat button code
+REPEAT = -1
+# Error codes
+BADSTART = -2
+BADBLOCK = -3
+BADREP = -4
+OVERRUN = -5
+BADDATA = -6
+BADADDR = -7
+
+
+# On 1st edge start a block timer. While the timer is running, record the time
+# of each edge. When the timer times out decode the data. Duration must exceed
+# the worst case block transmission time, but be less than the interval between
+# a block start and a repeat code start (~108ms depending on protocol)
+
+class IR_RX():
+    def __init__(self, pin, nedges, tblock, callback, *args):  # Optional args for callback
+        self._nedges = nedges
+        self._tblock = tblock
+        self.callback = callback
+        self.args = args
+
+        self._times = array('i',  (0 for _ in range(nedges + 1)))  # +1 for overrun
+        if platform == 'pyboard':
+            ExtInt(pin, ExtInt.IRQ_RISING_FALLING, Pin.PULL_NONE, self._cb_pin)
+        elif ESP32:
+            pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING))
+        else:
+            pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING), hard = True)
+        self.edge = 0
+        self.tim = Timer(-1)  # Sofware timer
+        self.cb = self._decode
+
+
+    # Pin interrupt. Save time of each edge for later decode.
+    def _cb_pin(self, line):
+        t = ticks_us()
+        # On overrun ignore pulses until software timer times out
+        if self.edge <= self._nedges:  # Allow 1 extra pulse to record overrun
+            if not self.edge:  # First edge received
+                self.tim.init(period=self._tblock , mode=Timer.ONE_SHOT, callback=self.cb)
+            self._times[self.edge] = t
+            self.edge += 1
+
+class NEC_IR(IR_RX):
+    def __init__(self, pin, callback, extended, *args):
+        # Block lasts <= 80ms and has 68 edges
+        tblock = 80 if extended else 73  # Allow for some tx tolerance (?)
+        super().__init__(pin, 68, tblock, callback, *args)
+        self._extended = extended
+        self._addr = 0
+
+    def _decode(self, _):
+        overrun = self.edge > 68
+        val = OVERRUN if overrun else BADSTART
+        if not overrun:
+            width = ticks_diff(self._times[1], self._times[0])
+            if width > 4000:  # 9ms leading mark for all valid data
+                width = ticks_diff(self._times[2], self._times[1])
+                if width > 3000: # 4.5ms space for normal data
+                    if self.edge < 68:
+                        # Haven't received the correct number of edges
+                        val = BADBLOCK
+                    else:
+                        # Time spaces only (marks are always 562.5µs)
+                        # Space is 1.6875ms (1) or 562.5µs (0)
+                        # Skip last bit which is always 1
+                        val = 0
+                        for edge in range(3, 68 - 2, 2):
+                            val >>= 1
+                            if ticks_diff(self._times[edge + 1], self._times[edge]) > 1120:
+                                val |= 0x80000000
+                elif width > 1700: # 2.5ms space for a repeat code. Should have exactly 4 edges.
+                    val = REPEAT if self.edge == 4 else BADREP
+        addr = 0
+        if val >= 0:  # validate. Byte layout of val ~cmd cmd ~addr addr
+            addr = val & 0xff
+            cmd = (val >> 16) & 0xff
+            if addr == ((val >> 8) ^ 0xff) & 0xff:  # 8 bit address OK
+                val = cmd if cmd == (val >> 24) ^ 0xff else BADDATA
+                self._addr = addr
+            else:
+                addr |= val & 0xff00  # pass assumed 16 bit address to callback
+                if self._extended:
+                    val = cmd if cmd == (val >> 24) ^ 0xff else BADDATA
+                    self._addr = addr
+                else:
+                    val = BADADDR
+        if val == REPEAT:
+            addr = self._addr  # Last valid addresss
+        self.edge = 0  # Set up for new data burst and run user callback
+        self.callback(val, addr, *self.args)
+
+class RC5_IR(IR_RX):
+    def __init__(self, pin, callback, *args):
+        # Block lasts <= 30ms and has <= 28 edges
+        super().__init__(pin, 28, 30, callback, *args)
+
+    def _decode(self, _):
+        try:
+            nedges = self.edge  # No. of edges detected
+            if not 14 <= nedges <= 28:
+                raise RuntimeError(OVERRUN if nedges > 28 else BADSTART)
+            # Regenerate bitstream
+            bits = 0
+            bit = 1
+            for x in range(1, nedges):
+                width = ticks_diff(self._times[x], self._times[x - 1])
+                if not 500 < width < 2000:
+                    raise RuntimeError(BADBLOCK)
+                for _ in range(1 if width < 1334 else 2):
+                    bits <<= 1
+                    bits |= bit
+                bit ^= 1
+            #print(bin(bits))  # Matches inverted scope waveform
+            # Decode Manchester code
+            x = 30
+            while not bits >> x:
+                x -= 1
+            m0 = 1 << x  # Mask MS two bits (always 01)
+            m1 = m0 << 1
+            v = 0  # 14 bit bitstream
+            for _ in range(14):
+                v <<= 1
+                b0 = (bits & m0) > 0
+                b1 = (bits & m1) > 0
+                if b0 == b1:
+                    raise RuntimeError(BADBLOCK)
+                v |= b0
+                m0 >>= 2
+                m1 >>= 2
+            # Split into fields (val, addr, ctrl)
+            val = (v & 0x3f) | (0x40 if ((v >> 12) & 1) else 0)
+            addr = (v >> 6) & 0x1f
+            ctrl = (v >> 11) & 1
+
+        except RuntimeError as e:
+            val, addr, ctrl = e.args[0], 0, 0
+        self.edge = 0  # Set up for new data burst and run user callback
+        self.callback(val, addr, ctrl, *self.args)
+
+class RC6_M0(IR_RX):
+    # Even on Pyboard D these 444us nominal pulses can be recorded as up to 705us
+    hdr = ((1800, 4000), (593, 1333), (222, 750), (593, 1333), (222, 750), (222, 750), (222, 750), (222, 750))
+    def __init__(self, pin, callback, *args):
+        # Block lasts 23ms nominal and has <=44 edges
+        super().__init__(pin, 44, 30, callback, *args)
+
+    def _decode(self, _):
+        try:
+            nedges = self.edge  # No. of edges detected
+            if not 22 <= nedges <= 44:
+                raise RuntimeError(OVERRUN if nedges > 28 else BADSTART)
+            for x, lims in enumerate(self.hdr):
+                width = ticks_diff(self._times[x + 1], self._times[x])
+                if not (lims[0] < width < lims[1]):
+                    print('Bad start', x, width, lims)
+                    raise RuntimeError(BADSTART)
+            x += 1
+            width = ticks_diff(self._times[x + 1], self._times[x])
+            ctrl = width > 889  # Long bit
+            start = x + 2  # Skip 2nd long bit
+
+            # Regenerate bitstream
+            bits = 0
+            bit = 0
+            for x in range(start, nedges):
+                width = ticks_diff(self._times[x], self._times[x - 1])
+                if not 222 < width < 1333:
+                    print('Width', width)
+                    raise RuntimeError(BADBLOCK)
+                for _ in range(1 if width < 666 else 2):
+                    bits <<= 1
+                    bits |= bit
+                bit ^= 1
+            print(bin(bits), len(bin(bits)) - 2)
+
+            # Decode Manchester code
+            x = 32
+            while not bits >> x:
+                x -= 1
+            m0 = 1 << (x - 1)
+            m1 = 1 << x  # MSB of pair
+            v = 0  # 16 bit bitstream
+            for _ in range(16):
+                v <<= 1
+                b0 = (bits & m0) > 0
+                b1 = (bits & m1) > 0
+                #print(int(b1), int(b0))
+                if b0 == b1:
+                    raise RuntimeError(BADBLOCK)
+                v |= b0
+                m0 >>= 2
+                m1 >>= 2
+            # Split into fields (val, addr)
+            val = v & 0xff
+            addr = (v >> 8) & 0xff
+
+        except RuntimeError as e:
+            val, addr, ctrl = e.args[0], 0, 0
+        self.edge = 0  # Set up for new data burst and run user callback
+        self.callback(val, addr, ctrl, *self.args)
diff --git a/ir_rx_test.py b/ir_rx_test.py
new file mode 100644 (file)
index 0000000..908d23f
--- /dev/null
@@ -0,0 +1,57 @@
+# ir_rx_test.py Test program for IR remote control decoder arem.py
+# Supports Pyboard and ESP8266
+
+# Author: Peter Hinch
+# Copyright Peter Hinch 2020 Released under the MIT license
+
+# Run this to characterise a remote.
+
+from sys import platform
+import time
+from machine import Pin, freq
+from arem import *
+
+ESP32 = platform == 'esp32' or platform == 'esp32_LoBo'
+
+if platform == 'pyboard':
+    p = Pin('X3', Pin.IN)
+elif platform == 'esp8266':
+    freq(160000000)
+    p = Pin(13, Pin.IN)
+elif ESP32:
+    p = Pin(23, Pin.IN)
+
+errors = {BADSTART : 'Invalid start pulse', BADBLOCK : 'Error: bad block',
+          BADREP : 'Error: repeat', OVERRUN : 'Error: overrun',
+          BADDATA : 'Error: invalid data', BADADDR : 'Error: invalid address'}
+
+def cb(data, addr, ctrl):
+    if data == REPEAT:  # NEC protocol sends repeat codes.
+        print('Repeat code.')
+    elif data >= 0:
+        print('Data {:03x} Addr {:03x} Ctrl {:01x}'.format(data, addr, ctrl))
+    else:
+        print('{} Address: {}'.format(errors[data], hex(addr)))
+
+
+s = '''Test for IR receiver. Run:
+ir_tx_test.test() for NEC protocol,
+ir_tx_test.test(5) for Philips RC-5 protocol,
+ir_tx_test.test(6) for RC6 mode 0.
+
+Background processing means REPL prompt reappears.
+Hit ctrl-D to stop (soft reset).'''
+
+print(s)
+
+def test(proto=0):
+    if proto == 0:
+        ir = NEC_IR(p, cb, True, 0)  # Extended mode, dummy ctrl arg for callback
+    elif proto == 5:
+        ir = RC5_IR(p, cb)
+    elif proto == 6:
+        ir = RC6_M0(p, cb)
+    # A real application would do something here...
+    #while True:
+        #time.sleep(5)
+        #print('running')
diff --git a/ir_tx.py b/ir_tx.py
new file mode 100644 (file)
index 0000000..9757840
--- /dev/null
+++ b/ir_tx.py
@@ -0,0 +1,158 @@
+# ir_tx.py Nonblocking IR blaster
+# Runs on Pyboard D or Pyboard 1.x only (not Pyboard Lite)
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2020 Peter Hinch
+
+from pyb import Pin, Timer
+from time import sleep_us, sleep
+from micropython import const
+from array import array
+import micropython
+
+# micropython.alloc_emergency_exception_buf(100)
+
+# Common
+_SPACE = const(0)  # Or 100. Depends on wiring: 0 assumes logic 0 turns IR off.
+_STOP = const(0)  # End of data
+# NEC
+_TBURST = const(563)
+_T_ONE = const(1687)
+# RC5
+_T_RC5 = const(889)  # Time for pulse of carrier
+# RC6_M0
+_T_RC6 = const(444)
+_T2_RC6 = const(889)
+
+# IR abstract base class. Array holds periods in μs between toggling 36/38KHz
+# carrier on or off.
+# Subclass is responsible for populating .arr and initiating transmission.
+# Operation is in two phases: .transmit populates .arr with times in μs, then
+# calls .start to initiate physical transmission.
+class IR:
+
+    def __init__(self, pin, freq, asize, duty):
+        tim = Timer(2, freq=freq)
+        self._ch = tim.channel(1, Timer.PWM, pin=pin)
+        self._ch.pulse_width_percent(_SPACE)
+        self.duty = duty
+        self.arr = array('H', 0 for _ in range(asize))
+        self._tim = Timer(5)
+        self._tcb = self._cb
+        self.pretrans()
+
+    # Before populating array, zero pointer, set notional carrier state (off).
+    def pretrans(self):
+        self.aptr = 0  # Index into array
+        self.carrier = False
+
+    def start(self):
+        self.aptr = 0  # Reset pointer and initiate TX.
+        self._cb(self._tim)
+
+    def _cb(self, t):
+        t.deinit()
+        p = self.aptr
+        v = self.arr[p]
+        if v == _STOP:
+            self._ch.pulse_width_percent(_SPACE)  # Turn off IR LED.
+            return
+        self._ch.pulse_width_percent(_SPACE if p & 1 else self.duty)
+        self._tim.init(prescaler=84, period=v, callback=self._tcb)
+        self.aptr += 1
+
+    def append(self, *times):  # Append one or more time peiods to .arr
+        for t in times:
+            self.arr[self.aptr] = t
+            self.aptr += 1
+            self.carrier = not self.carrier  # Keep track of carrier state
+            print('append', t, 'carrier', self.carrier)
+
+    def add(self, t):  # Increase last time value
+        print('add', t)
+        self.arr[self.aptr - 1] += t  # Carrier unaffected
+
+# NEC protocol
+class NEC(IR):
+
+    def __init__(self, pin, freq=38000):  # NEC specifies 38KHz
+        super().__init__(pin, freq, 68, 50)
+
+    def _bit(self, b):
+        self.append(_TBURST, _T_ONE if b else _TBURST)
+
+    def transmit(self, addr, data, _=0):  # Ignore toggle if passed
+        self.pretrans()  # Set initial conditions
+        self.append(9000, 4500)
+        if addr < 256:  # Short address: append complement
+            addr |= ((addr ^ 0xff) << 8)
+        for x in range(16):
+            self._bit(addr & 1)
+            addr >>= 1
+        data |= ((data ^ 0xff) << 8)
+        for x in range(16):
+            self._bit(data & 1)
+            data >>= 1
+        self.append(_TBURST, _STOP)
+        self.start()
+
+    def repeat(self):
+        self.aptr = 0
+        self.append(9000, 2250, _TBURST, _STOP)
+
+
+# Philips RC5 protocol
+class RC5(IR):
+
+    def __init__(self, pin, freq=36000):
+        super().__init__(pin, freq, 28, 30)
+
+    def transmit(self, addr, data, toggle):
+        self.pretrans()  # Set initial conditions
+        d = (data & 0x3f) | ((addr & 0x1f) << 6) | ((data & 0x40) << 6) | ((toggle & 1) << 11)
+        print(bin(d))
+        mask = 0x2000
+        while mask:
+            if mask == 0x2000:
+                self.append(_T_RC5)
+            else:
+                bit = bool(d & mask)
+                if bit ^ self.carrier:
+                    self.add(_T_RC5)
+                    self.append(_T_RC5)
+                else:
+                    self.append(_T_RC5, _T_RC5)
+            mask >>= 1
+        self.append(_STOP)
+        self.start()
+
+# Philips RC6 mode 0 protocol
+class RC6_M0(IR):
+
+    def __init__(self, pin, freq=36000):
+        super().__init__(pin, freq, 44, 30)
+
+    def transmit(self, addr, data, toggle):
+        self.pretrans()  # Set initial conditions
+        # leader, 1, 0, 0, 0
+        self.append(2666, _T2_RC6, _T_RC6, _T2_RC6, _T_RC6, _T_RC6, _T_RC6, _T_RC6, _T_RC6)
+        # Append a single bit of twice duration
+        if toggle:
+            self.add(_T2_RC6)
+            self.append(_T2_RC6)
+        else:
+            self.append(_T2_RC6, _T2_RC6)
+        d = (data & 0xff) | ((addr & 0xff) << 8)
+        mask = 0x8000
+        print('toggle', toggle, self.carrier, bool(d & mask))
+        while mask:
+            bit = bool(d & mask)
+            if bit ^ self.carrier:
+                self.append(_T_RC6, _T_RC6)
+            else:
+                self.add(_T_RC6)
+                self.append(_T_RC6)
+            mask >>= 1
+        self.append(_STOP)
+        self.start()
diff --git a/ir_tx_test.py b/ir_tx_test.py
new file mode 100644 (file)
index 0000000..2d78728
--- /dev/null
@@ -0,0 +1,79 @@
+# ir_tx_test.py Test for nonblocking NEC/RC-5/RC-6 mode 0 IR blaster.
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2020 Peter Hinch
+
+# Implements a 2-button remote control on a Pyboard
+
+from pyb import Pin, LED
+import uasyncio as asyncio
+from aswitch import Switch, Delay_ms
+from ir_tx import NEC, RC5, RC6_M0
+
+loop = asyncio.get_event_loop()
+
+class Rbutton:
+    toggle = 1  # toggle is ignored in NEC mode
+    def __init__(self, irb, pin, addr, data, rep_code=False):
+        self.irb = irb
+        self.sw = Switch(pin)
+        self.addr = addr
+        self.data = data
+        self.rep_code = rep_code
+        self.sw.close_func(self.cfunc)
+        self.sw.open_func(self.ofunc)
+        self.tim = Delay_ms(self.repeat)
+
+    def cfunc(self):  # Button push: send data
+        self.irb.transmit(self.addr, self.data, Rbutton.toggle)
+        # Auto repeat
+        self.tim.trigger(108)
+
+    def ofunc(self):  # Button release: cancel repeat timer
+        self.tim.stop()
+        Rbutton.toggle ^= 1  # Toggle control
+
+    async def repeat(self):
+        await asyncio.sleep(0)  # Let timer stop before retriggering
+        if not self.sw():  # Button is still pressed: retrigger
+            self.tim.trigger(108)
+            if self.rep_code:
+                self.irb.repeat()  # NEC special case: send REPEAT code
+            else:
+                self.irb.transmit(self.addr, self.data, Rbutton.toggle)
+
+async def main(proto):
+    # Test uses a 38KHz carrier. Some Philips systems use 36KHz.
+    # If button is held down normal behaviour is to retransmit
+    # but most NEC models send a RPEAT code
+    rep_code = False  # Don't care for RC-X. NEC protocol only.
+    pin = Pin('X1')
+    if not proto:
+        irb = NEC(pin)  # Default NEC freq == 38KHz
+        # Option to send REPEAT code. Most remotes do this.
+        rep_code = True
+    elif proto == 5:
+        irb = RC5(pin, 38000)
+    elif proto == 6:
+        irb = RC6_M0(pin, 38000)
+
+    b = []  # Rbutton instances
+    b.append(Rbutton(irb, Pin('X3', Pin.IN, Pin.PULL_UP), 0x1, 0x7, rep_code))
+    b.append(Rbutton(irb, Pin('X4', Pin.IN, Pin.PULL_UP), 0x10, 0xb, rep_code))
+    led = LED(1)
+    while True:
+        await asyncio.sleep_ms(500)  # Obligatory flashing LED.
+        led.toggle()
+
+s = '''Test for IR transmitter. Run:
+ir_tx_test.test() for NEC protocol
+ir_tx_test.test(5) for RC-5 protocol
+ir_tx_test.test(6) for RC-6 mode 0.
+
+Ground X3 to send addr 1 data 7
+Ground X4 to send addr 0x10 data 0x0b.'''
+print(s)
+
+def test(proto=0):
+    loop.run_until_complete(main(proto))