]> vault307.fbx.one Git - micorpython_ir.git/blob - ir_rx.py
README update.
[micorpython_ir.git] / ir_rx.py
1 # ir_rx.py Decoder for IR remote control using synchronous code
2 # Supports RC-5 RC-6 mode 0 and NEC protocols.
3 # For a remote using NEC see https://www.adafruit.com/products/389
4
5 # Author: Peter Hinch
6 # Copyright Peter Hinch 2020 Released under the MIT license
7
8 from sys import platform
9 from micropython import const
10 from machine import Timer
11 from array import array
12 from utime import ticks_us, ticks_diff
13
14 if platform == 'pyboard':
15 from pyb import Pin, ExtInt
16 else:
17 from machine import Pin
18
# True on either ESP32 port name; selects the pin.irq() call without hard=True
ESP32 = platform in ('esp32', 'esp32_LoBo')
20
21 # Save RAM
22 # from micropython import alloc_emergency_exception_buf
23 # alloc_emergency_exception_buf(100)
24
# Result codes (accessible to application). All are negative; the decoders in
# this file pass them to the user callback in place of the data value.
REPEAT = -1  # Repeat button code (NEC repeat burst with exactly 4 edges)

# Error codes:
#   BADSTART  leader/header timing wrong, or too few edges for a block
#   BADBLOCK  wrong number of edges or a pulse width out of range in the data
#   BADREP    NEC repeat-length space seen but edge count is not 4
#   OVERRUN   more edges recorded than the protocol allows
#   BADDATA   NEC command byte does not match its complement
#   BADADDR   NEC address byte does not match its complement (non-extended mode)
(BADSTART, BADBLOCK, BADREP,
 OVERRUN, BADDATA, BADADDR) = (-2, -3, -4, -5, -6, -7)
35
36
37 # On 1st edge start a block timer. While the timer is running, record the time
38 # of each edge. When the timer times out decode the data. Duration must exceed
39 # the worst case block transmission time, but be less than the interval between
40 # a block start and a repeat code start (~108ms depending on protocol)
41
class IR_RX():
    """Base class for the IR decoders: records edge times and schedules decode.

    The first edge of a burst starts a one-shot timer of ``tblock`` ms. Each
    edge seen while the timer runs has its ``ticks_us()`` value stored in
    ``_times``. When the timer expires it calls ``self.decode``, which this
    class does not define: subclasses must provide ``decode(self, _)``. The
    decoders in this file finish by resetting ``edge`` to 0 and calling
    ``callback(data, addr, ctrl, *args)``.
    """
    verbose = False

    def __init__(self, pin, nedges, tblock, callback, *args):  # Optional args for callback
        """Store the configuration, then attach the edge interrupt to ``pin``.

        pin      -- pin object the receiver is connected to
        nedges   -- maximum number of edges in one valid block
        tblock   -- timer period in ms from first edge to decode
        callback -- user function run by the subclass decoder
        args     -- extra positional arguments appended to the callback call
        """
        self._nedges = nedges
        self._tblock = tblock
        self.callback = callback
        self.args = args

        self._times = array('i', (0 for _ in range(nedges + 1)))  # +1 for overrun
        # Everything _cb_pin reads must exist BEFORE the interrupt is enabled:
        # an edge arriving during construction would otherwise raise
        # AttributeError in the handler. Looking up self.decode first also
        # means a class without decode() fails before any IRQ is attached.
        self.edge = 0  # Index of next edge to record; 0 means idle
        self.tim = Timer(-1)  # Software timer
        # Bound method stored once; presumably so the interrupt handler does
        # not have to create one on each first edge -- TODO confirm
        self.cb = self.decode
        if platform == 'pyboard':
            ExtInt(pin, ExtInt.IRQ_RISING_FALLING, Pin.PULL_NONE, self._cb_pin)
        elif ESP32:
            pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING))
        else:
            pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING), hard = True)

    # Pin interrupt. Save time of each edge for later decode.
    def _cb_pin(self, line):
        """Interrupt handler: timestamp this edge and start the timer on edge 0.

        ``line`` is the argument supplied by the interrupt source; it is unused.
        """
        t = ticks_us()  # Read the clock first so later work does not skew the time
        # On overrun ignore pulses until software timer times out
        if self.edge <= self._nedges:  # Allow 1 extra pulse to record overrun
            if not self.edge:  # First edge received
                self.tim.init(period=self._tblock , mode=Timer.ONE_SHOT, callback=self.cb)
            self._times[self.edge] = t
            self.edge += 1
71
class NEC_IR(IR_RX):
    """NEC protocol decoder.

    The callback receives ``(cmd, addr, 0, *args)``. On success ``cmd`` is the
    8 bit command and ``addr`` the address (8 bit, or 16 bit when ``extended``
    is true and the address check byte does not match). On a repeat code
    ``cmd`` is REPEAT and ``addr`` is the last good address. On any other
    failure ``cmd`` is a negative error code and ``addr`` is 0.
    """

    def __init__(self, pin, callback, extended=True, *args):
        # Block lasts <= 80ms and has 68 edges
        # Allow for some tx tolerance (?)
        if extended:
            tblock = 80
        else:
            tblock = 73
        super().__init__(pin, 68, tblock, callback, *args)
        self._extended = extended
        self._addr = 0  # Last successfully decoded address, reused for REPEAT

    def decode(self, _):
        """Timer callback: decode the recorded edges and run the user callback."""
        times = self._times
        try:
            if self.edge > 68:
                raise RuntimeError(OVERRUN)
            # 9ms leading mark for all valid data
            if ticks_diff(times[1], times[0]) < 4000:
                raise RuntimeError(BADSTART)
            space = ticks_diff(times[2], times[1])
            if space <= 3000:
                # Not the 4.5ms space that introduces normal data
                if space > 1700:
                    # 2.5ms space for a repeat code. Should have exactly 4 edges.
                    # REPEAT is reported through the same path as the errors.
                    raise RuntimeError(REPEAT if self.edge == 4 else BADREP)
                raise RuntimeError(BADSTART)
            if self.edge < 68:  # Haven't received the correct number of edges
                raise RuntimeError(BADBLOCK)
            # Time spaces only (marks are always 562.5µs)
            # Space is 1.6875ms (1) or 562.5µs (0)
            # Skip last bit which is always 1
            val = 0
            for idx in range(3, 66, 2):
                val >>= 1  # Bits arrive LSB first: shift in from the top
                if ticks_diff(times[idx + 1], times[idx]) > 1120:
                    val |= 0x80000000
            addr = val & 0xff  # 8 bit addr
            addr_check = ((val >> 8) ^ 0xff) & 0xff
            cmd = (val >> 16) & 0xff
            cmd_check = (val >> 24) ^ 0xff
            if cmd != cmd_check:
                raise RuntimeError(BADDATA)
            if addr != addr_check:  # 8 bit addr doesn't match check
                if not self._extended:
                    raise RuntimeError(BADADDR)
                addr |= val & 0xff00  # pass assumed 16 bit address to callback
            self._addr = addr
        except RuntimeError as exc:
            cmd = exc.args[0]
            if cmd == REPEAT:
                addr = self._addr  # REPEAT uses last address
            else:
                addr = 0
        self.edge = 0  # Set up for new data burst and run user callback
        self.callback(cmd, addr, 0, *self.args)
117
class RC5_IR(IR_RX):
    """RC-5 decoder.

    The callback receives ``(val, addr, ctrl, *args)``. On success ``val`` is
    the command (6 bits plus a seventh bit taken from the inverted field bit),
    ``addr`` the 5 bit address and ``ctrl`` the toggle bit. On failure ``val``
    is a negative error code and ``addr`` and ``ctrl`` are 0.
    """

    def __init__(self, pin, callback, *args):
        # Block lasts <= 30ms and has <= 28 edges
        super().__init__(pin, 28, 30, callback, *args)

    def decode(self, _):
        """Timer callback: decode the recorded edges and run the user callback."""
        try:
            nedges = self.edge  # No. of edges detected
            if not 14 <= nedges <= 28:
                raise RuntimeError(OVERRUN if nedges > 28 else BADSTART)
            # Regenerate bitstream: one bit per half-bit period, so an interval
            # of a single period adds one bit and a double period adds two.
            bits = 0
            bit = 1
            for x in range(1, nedges):
                width = ticks_diff(self._times[x], self._times[x - 1])
                if not 500 < width < 2000:
                    raise RuntimeError(BADBLOCK)
                for _ in range(1 if width < 1334 else 2):
                    bits <<= 1
                    bits |= bit
                bit ^= 1  # Level alternates on every edge
            if self.verbose:
                print(bin(bits))  # Matches inverted scope waveform
            # Decode Manchester code
            # Find the most significant set bit (bits is never 0 here because
            # the loop above runs at least once with bit == 1).
            x = 30
            while not bits >> x:
                x -= 1
            m0 = 1 << x  # Mask MS two bits (always 01)
            m1 = m0 << 1
            v = 0  # 14 bit bitstream
            for _ in range(14):
                v <<= 1
                b0 = (bits & m0) > 0
                b1 = (bits & m1) > 0
                if b0 == b1:  # Both halves equal is not valid Manchester code
                    raise RuntimeError(BADBLOCK)
                v |= b0
                m0 >>= 2
                m1 >>= 2
            # Split into fields (val, addr, ctrl)
            # v bit 12 is the second start (field) bit. In RC5X the seventh
            # command bit is the INVERSE of that bit, so a standard RC5 frame
            # (field bit 1) yields commands 0-63 and field bit 0 yields 64-127.
            val = (v & 0x3f) | (0 if ((v >> 12) & 1) else 0x40)
            addr = (v >> 6) & 0x1f
            ctrl = (v >> 11) & 1

        except RuntimeError as e:
            val, addr, ctrl = e.args[0], 0, 0
        self.edge = 0  # Set up for new data burst and run user callback
        self.callback(val, addr, ctrl, *self.args)
165
class RC6_M0(IR_RX):
    """RC-6 mode 0 decoder.

    The callback receives ``(val, addr, ctrl, *args)``. On success ``val`` is
    the low 8 bits of the decoded 17 bit value, ``addr`` the next 8 bits and
    ``ctrl`` bit 16. On failure ``val`` is a negative error code and ``addr``
    and ``ctrl`` are 0.
    """
    # Even on Pyboard D the 444μs nominal pulses can be recorded as up to 705μs
    # Scope shows 360-520 μs (-84μs +76μs relative to nominal)
    # Header nominal 2666, 889, 444, 889, 444, 444, 444, 444 carrier ON at end
    # Each entry is the (min, max) accepted width in μs of one header interval
    hdr = ((1800, 4000), (593, 1333), (222, 750), (593, 1333), (222, 750), (222, 750), (222, 750), (222, 750))

    def __init__(self, pin, callback, *args):
        # Block lasts 23ms nominal and has <=44 edges
        super().__init__(pin, 44, 30, callback, *args)

    def decode(self, _):
        """Timer callback: decode the recorded edges and run the user callback."""
        try:
            nedges = self.edge  # No. of edges detected
            if not 22 <= nedges <= 44:
                # Compare against this class's own limit (44), not the RC5 value
                raise RuntimeError(OVERRUN if nedges > 44 else BADSTART)
            for x, lims in enumerate(self.hdr):
                width = ticks_diff(self._times[x + 1], self._times[x])
                if not (lims[0] < width < lims[1]):
                    if self.verbose:
                        print('Bad start', x, width, lims)
                    raise RuntimeError(BADSTART)
            x += 1  # Index of the first interval after the header
            width = ticks_diff(self._times[x + 1], self._times[x])
            # 2nd bit of last 0 is 444μs (0) or 1333μs (1)
            if not 222 < width < 1555:
                if self.verbose:
                    print('Bad block 1 Width', width, 'x', x)
                raise RuntimeError(BADBLOCK)
            short = width < 889
            v = int(not short)
            bit = v
            bits = 1  # Bits decoded
            x += 1 + int(short)
            width = ticks_diff(self._times[x + 1], self._times[x])
            if not 222 < width < 1555:
                if self.verbose:
                    print('Bad block 2 Width', width, 'x', x)
                raise RuntimeError(BADBLOCK)
            short = width < 1111
            if not short:
                bit ^= 1
            x += 1 + int(short)  # If it's short, we know width of next
            v <<= 1
            v |= bit  # MSB of result
            bits += 1
            # Decode bitstream
            while bits < 17:
                # -1 convert count to index, -1 because we look ahead
                if x > nedges - 2:
                    raise RuntimeError(BADBLOCK)
                # width is 444/889 nominal
                width = ticks_diff(self._times[x + 1], self._times[x])
                if not 222 < width < 1111:
                    if self.verbose:
                        print('Bad block 3 Width', width, 'x', x)
                    raise RuntimeError(BADBLOCK)
                short = width < 666
                if not short:  # A long interval means the bit value changed
                    bit ^= 1
                v <<= 1
                v |= bit
                bits += 1
                x += 1 + int(short)

            if self.verbose:
                ss = '20-bit format {:020b} x={} nedges={} bits={}'
                print(ss.format(v, x, nedges, bits))

            val = v & 0xff
            addr = (v >> 8) & 0xff
            ctrl = (v >> 16) & 1
        except RuntimeError as e:
            val, addr, ctrl = e.args[0], 0, 0
        self.edge = 0  # Set up for new data burst and run user callback
        self.callback(val, addr, ctrl, *self.args)