]> vault307.fbx.one Git - micorpython_ir.git/blob - ir_rx.py
210d61767a79ef24ed0eb762de770e1b44849acd
[micorpython_ir.git] / ir_rx.py
1 # ir_rx.py Decoder for IR remote control using synchronous code
2 # Supports RC-5 RC-6 mode 0 and NEC protocols.
3 # For a remote using NEC see https://www.adafruit.com/products/389
4
5 # Author: Peter Hinch
6 # Copyright Peter Hinch 2020 Released under the MIT license
7
8 from sys import platform
9 from micropython import const
10 from machine import Timer
11 from array import array
12 from utime import ticks_us, ticks_diff
13
14 if platform == 'pyboard':
15 from pyb import Pin, ExtInt
16 else:
17 from machine import Pin
18
# True when sys.platform reports 'esp32' or 'esp32_LoBo'. IR_RX uses this flag
# to register the pin interrupt without the hard=True argument.
ESP32 = platform in ('esp32', 'esp32_LoBo')
20
21 # Save RAM
22 # from micropython import alloc_emergency_exception_buf
23 # alloc_emergency_exception_buf(100)
24
# Result codes (accessible to application). All are negative; the decoders
# pass them to the callback or error function in place of received data.
# Repeat button code (NEC repeat burst)
REPEAT = -1
# Error codes:
#   BADSTART  leading mark/space out of range, or too few edges
#   BADBLOCK  wrong edge count or a pulse width out of range in the data
#   BADREP    NEC repeat burst with the wrong number of edges
#   OVERRUN   more edges captured than the protocol allows
#   BADDATA   NEC command byte does not match its inverted check byte
#   BADADDR   NEC 8 bit address fails its check and extended mode is off
BADSTART, BADBLOCK, BADREP, OVERRUN, BADDATA, BADADDR = -2, -3, -4, -5, -6, -7
35
36
37 # On 1st edge start a block timer. While the timer is running, record the time
38 # of each edge. When the timer times out decode the data. Duration must exceed
39 # the worst case block transmission time, but be less than the interval between
40 # a block start and a repeat code start (~108ms depending on protocol)
41
class IR_RX():
    # Base class for the protocol specific receivers.
    # A pin interrupt timestamps every edge into a preallocated array. The
    # first edge of a burst starts a one-shot timer; when it expires the
    # subclass's decode() method runs and must finish by calling do_callback().
    _erstr = "'{}' object has no attribute '{}'"

    def __init__(self, pin, nedges, tblock, callback, *args):  # Optional args for callback
        # pin      Pin instance connected to the IR receiver.
        # nedges   Maximum number of edges in one valid block.
        # tblock   Block timer period, passed to Timer.init(period=...).
        # callback Called as callback(cmd, addr, ext, *args) on valid data.
        self._nedges = nedges
        self._tblock = tblock
        self.callback = callback
        self.args = args
        self._errf = lambda _ : None  # Default error handler ignores errors
        self.verbose = False

        self._times = array('i', (0 for _ in range(nedges + 1)))  # +1 for overrun
        # Set up everything the interrupt handler touches *before* the pin
        # interrupt is armed, so that an edge arriving immediately cannot find
        # a partially constructed object.
        self.edge = 0
        self.tim = Timer(-1)  # Software timer
        self.cb = self.decode  # Bound method cached here for use by _cb_pin
        if platform == 'pyboard':
            ei = ExtInt(pin, ExtInt.IRQ_RISING_FALLING, Pin.PULL_NONE, self._cb_pin)
        elif ESP32:
            ei = pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING))
        else:
            ei = pin.irq(handler = self._cb_pin, trigger = (Pin.IRQ_FALLING | Pin.IRQ_RISING), hard = True)
        self._eint = ei  # Keep reference??

    # Pin interrupt. Save time of each edge for later decode.
    def _cb_pin(self, line):
        t = ticks_us()
        # On overrun ignore pulses until software timer times out
        if self.edge <= self._nedges:  # Allow 1 extra pulse to record overrun
            if not self.edge:  # First edge received
                self.tim.init(period=self._tblock, mode=Timer.ONE_SHOT, callback=self.cb)
            self._times[self.edge] = t
            self.edge += 1

    # 'extended' and 'bits' are only meaningful on subclasses which override
    # them. On any other receiver, access raises AttributeError naming the
    # actual class of the instance.
    @property
    def extended(self):
        raise AttributeError(self._erstr.format(type(self).__name__, 'extended'))

    @property
    def bits(self):
        raise AttributeError(self._erstr.format(type(self).__name__, 'bits'))

    # Called by decode() when a burst has been processed. Resets the edge count
    # ready for the next burst, then passes the result to the user callback if
    # cmd >= thresh, otherwise passes cmd (an error code) to the error function.
    def do_callback(self, cmd, addr, ext, thresh=0):
        self.edge = 0
        if cmd >= thresh:
            self.callback(cmd, addr, ext, *self.args)
        else:
            self._errf(cmd)

    # Register func(code) to be called with the error code on a failed decode.
    def error_function(self, func):
        self._errf = func
93
class NEC_IR(IR_RX):
    # Receiver for the NEC protocol. By default an address which fails its
    # 8 bit check is reported as a 16 bit (extended) address; set the
    # 'extended' property False to reject such frames with BADADDR.
    def __init__(self, pin, callback, *args):
        # Block lasts <= 80ms (extended mode) and has 68 edges
        super().__init__(pin, 68, 80, callback, *args)
        self._extended = True
        self._addr = 0  # Last good address, reported again on REPEAT

    # Timer callback: decode the captured burst and report via do_callback.
    def decode(self, _):
        times = self._times
        try:
            if self.edge > 68:
                raise RuntimeError(OVERRUN)
            # 9ms leading mark for all valid data
            if ticks_diff(times[1], times[0]) < 4000:
                raise RuntimeError(BADSTART)
            gap = ticks_diff(times[2], times[1])
            if gap <= 1700:  # Too short for either a data or a repeat burst
                raise RuntimeError(BADSTART)
            if gap <= 3000:
                # 2.5ms space for a repeat code. Should have exactly 4 edges.
                # REPEAT is raised like an error but is passed to the callback.
                raise RuntimeError(REPEAT if self.edge == 4 else BADREP)
            # 4.5ms space for normal data
            if self.edge < 68:  # Haven't received the correct number of edges
                raise RuntimeError(BADBLOCK)
            # Time spaces only (marks are always 562.5µs)
            # Space is 1.6875ms (1) or 562.5µs (0)
            # Skip last bit which is always 1
            val = 0
            for n in range(32):  # Data arrives LSB first
                start = 3 + 2 * n
                if ticks_diff(times[start + 1], times[start]) > 1120:
                    val |= 1 << n
            addr = val & 0xff  # 8 bit addr
            cmd = (val >> 16) & 0xff
            if ((val >> 24) ^ 0xff) != cmd:  # Command and its complement differ
                raise RuntimeError(BADDATA)
            if (((val >> 8) ^ 0xff) & 0xff) != addr:  # 8 bit addr doesn't match check
                if not self._extended:
                    raise RuntimeError(BADADDR)
                addr |= val & 0xff00  # pass assumed 16 bit address to callback
            self._addr = addr
        except RuntimeError as err:
            cmd = err.args[0]
            addr = self._addr if cmd == REPEAT else 0  # REPEAT uses last address
        # Set up for new data burst and run user callback
        self.do_callback(cmd, addr, 0, REPEAT)

    @property
    def extended(self):
        return self._extended

    @extended.setter
    def extended(self, value):
        self._extended = bool(value)
146
class SONY_IR(IR_RX):
    # Receiver for Sony SIRC in its 12, 15 and 20 bit variants. The 'bits'
    # property sets the longest variant accepted (default 20).
    def __init__(self, pin, callback, *args):
        # 20 bit block has 42 edges and lasts <= 39ms nominal. Add 4ms to time
        # for tolerances except in 20 bit case where timing is tight with a
        # repeat period of 45ms.
        # The buffer and block timer are always sized for the longest (20 bit)
        # variant so any setting of the 'bits' property can be decoded.
        bits = 20
        t = int(3 + bits * 1.8) + (1 if bits == 20 else 4)
        super().__init__(pin, 2 + bits * 2, t, callback, *args)
        self._addr = 0
        self._bits = bits

    # Timer callback: decode the captured burst and report via do_callback
    # as (cmd, addr, ext) where ext is only nonzero for 20 bit frames.
    def decode(self, _):
        try:
            nedges = self.edge  # No. of edges detected
            self.verbose and print('nedges', nedges)
            if nedges > 42:
                raise RuntimeError(OVERRUN)
            bits = (nedges - 2) // 2
            if nedges not in (26, 32, 42) or bits > self._bits:
                raise RuntimeError(BADBLOCK)
            self.verbose and print('SIRC {}bit'.format(bits))
            width = ticks_diff(self._times[1], self._times[0])
            if not 1800 < width < 3000:  # 2.4ms leading mark for all valid data
                raise RuntimeError(BADSTART)
            width = ticks_diff(self._times[2], self._times[1])
            if not 350 < width < 1000:  # 600μs space
                raise RuntimeError(BADSTART)

            # Data received, LSB 1st. Bit n is the mark between edges 2n+2 and
            # 2n+3; a mark longer than 900μs is a 1. Every one of the 'bits'
            # marks must be sampled, including the last (edges nedges-2 and
            # nedges-1), otherwise the most significant bit is lost.
            val = 0
            for n in range(bits):
                x = 2 + 2 * n
                if ticks_diff(self._times[x + 1], self._times[x]) > 900:
                    val |= 1 << n

            cmd = val & 0x7f  # 7 bit command
            val >>= 7
            if nedges < 42:
                addr = val & 0xff  # 5 or 8 bit addr
                val = 0
            else:
                addr = val & 0x1f  # 5 bit addr
                val >>= 5  # 8 bit extended
        except RuntimeError as e:
            cmd = e.args[0]
            addr = 0
            val = 0
        # Set up for new data burst and run user callback
        self.do_callback(cmd, addr, val)

    @property
    def bits(self):
        return self._bits

    @bits.setter
    def bits(self, value):
        if value not in (12, 15, 20):
            raise ValueError('bits must be 12, 15 or 20')
        self._bits = value
206
class RC5_IR(IR_RX):
    # Receiver for the Philips RC-5 protocol (Manchester encoded, 14 bits).
    def __init__(self, pin, callback, *args):
        # Block lasts <= 30ms and has <= 28 edges
        super().__init__(pin, 28, 30, callback, *args)

    # Timer callback: decode the captured burst and report via do_callback
    # as (command, address, toggle bit).
    def decode(self, _):
        try:
            count = self.edge  # No. of edges detected
            if count > 28:
                raise RuntimeError(OVERRUN)
            if count < 14:
                raise RuntimeError(BADSTART)
            # Regenerate bitstream: each interval contributes one or two
            # half-bit periods at the current level, which toggles every edge.
            stream = 0
            level = 1
            for idx in range(1, count):
                width = ticks_diff(self._times[idx], self._times[idx - 1])
                if width <= 500 or width >= 2000:
                    raise RuntimeError(BADBLOCK)
                halves = 1 if width < 1334 else 2
                for _ in range(halves):
                    stream = (stream << 1) | level
                level ^= 1
            if self.verbose:
                print(bin(stream))  # Matches inverted scope waveform
            # Decode Manchester code
            # Locate the highest set bit at or below bit 30
            top = 30
            while not stream >> top:
                top -= 1
            lo_mask = 1 << top  # Mask MS two bits (always 01)
            hi_mask = lo_mask << 1
            v = 0  # 14 bit bitstream
            for _ in range(14):
                lo_set = (stream & lo_mask) > 0
                hi_set = (stream & hi_mask) > 0
                if lo_set == hi_set:  # A valid Manchester pair always differs
                    raise RuntimeError(BADBLOCK)
                v <<= 1
                if lo_set:
                    v |= 1
                lo_mask >>= 2
                hi_mask >>= 2
            # Split into fields (val, addr, ctrl)
            val = v & 0x3f
            if (v >> 12) & 1:
                val |= 0x40
            addr = (v >> 6) & 0x1f
            ctrl = (v >> 11) & 1

        except RuntimeError as err:
            val, addr, ctrl = err.args[0], 0, 0
        # Set up for new data burst and run user callback
        self.do_callback(val, addr, ctrl)
254
class RC6_M0(IR_RX):
    # Receiver for the Philips RC-6 mode 0 protocol.
    # Even on Pyboard D the 444μs nominal pulses can be recorded as up to 705μs
    # Scope shows 360-520 μs (-84μs +76μs relative to nominal)
    # Header nominal 2666, 889, 444, 889, 444, 444, 444, 444 carrier ON at end
    # Each entry is the (min, max) acceptable width in μs of one header interval.
    hdr = ((1800, 4000), (593, 1333), (222, 750), (593, 1333), (222, 750), (222, 750), (222, 750), (222, 750))

    def __init__(self, pin, callback, *args):
        # Block lasts 23ms nominal and has <=44 edges
        super().__init__(pin, 44, 30, callback, *args)

    # Timer callback: decode the captured burst and report via do_callback
    # as (command, address, toggle bit).
    def decode(self, _):
        times = self._times
        try:
            nedges = self.edge  # No. of edges detected
            if nedges > 44:
                raise RuntimeError(OVERRUN)
            if nedges < 22:
                raise RuntimeError(BADSTART)
            # Validate each header interval against its limits
            for x, (lo, hi) in enumerate(self.hdr):
                width = ticks_diff(times[x + 1], times[x])
                if width <= lo or width >= hi:
                    if self.verbose:
                        print('Bad start', x, width, (lo, hi))
                    raise RuntimeError(BADSTART)
            x += 1
            width = ticks_diff(times[x + 1], times[x])
            # 2nd bit of last 0 is 444μs (0) or 1333μs (1)
            if width <= 222 or width >= 1555:
                if self.verbose:
                    print('Bad block 1 Width', width, 'x', x)
                raise RuntimeError(BADBLOCK)
            short = width < 889
            bit = 0 if short else 1
            v = bit
            bits = 1  # Bits decoded
            x += 2 if short else 1
            width = ticks_diff(times[x + 1], times[x])
            if width <= 222 or width >= 1555:
                if self.verbose:
                    print('Bad block 2 Width', width, 'x', x)
                raise RuntimeError(BADBLOCK)
            short = width < 1111
            if not short:  # A long interval means the bit value changed
                bit ^= 1
            v = (v << 1) | bit  # MSB of result
            bits += 1
            x += 2 if short else 1  # If it's short, we know width of next
            # Decode bitstream
            while bits < 17:
                # -1 convert count to index, -1 because we look ahead
                if x > nedges - 2:
                    raise RuntimeError(BADBLOCK)
                # width is 444/889 nominal
                width = ticks_diff(times[x + 1], times[x])
                if width <= 222 or width >= 1111:
                    if self.verbose:
                        print('Bad block 3 Width', width, 'x', x)
                    raise RuntimeError(BADBLOCK)
                short = width < 666
                if not short:
                    bit ^= 1
                v = (v << 1) | bit
                bits += 1
                x += 2 if short else 1

            if self.verbose:
                fmt = '20-bit format {:020b} x={} nedges={} bits={}'
                print(fmt.format(v, x, nedges, bits))

            # Split the 17 decoded bits into fields
            val = v & 0xff
            addr = (v >> 8) & 0xff
            ctrl = (v >> 16) & 1
        except RuntimeError as err:
            val, addr, ctrl = err.args[0], 0, 0
        # Set up for new data burst and run user callback
        self.do_callback(val, addr, ctrl)