]> vault307.fbx.one Git - ir_remote.git/blob - primitives/encoder.py
infrared remote
[ir_remote.git] / primitives / encoder.py
1 # encoder.py Asynchronous driver for incremental quadrature encoder.
2
3 # Copyright (c) 2021-2022 Peter Hinch
4 # Released under the MIT License (MIT) - see LICENSE file
5
6 # Thanks are due to @ilium007 for identifying the issue of tracking detents,
7 # https://github.com/peterhinch/micropython-async/issues/82.
8 # Also to Mike Teachman (@miketeachman) for design discussions and testing
9 # against a state table design
10 # https://github.com/miketeachman/micropython-rotary/blob/master/rotary.py
11
12 import uasyncio as asyncio
13 from machine import Pin
14
15 class Encoder:
16
17 def __init__(self, pin_x, pin_y, v=0, div=1, vmin=None, vmax=None,
18 mod=None, callback=lambda a, b : None, args=(), delay=100):
19 self._pin_x = pin_x
20 self._pin_y = pin_y
21 self._x = pin_x()
22 self._y = pin_y()
23 self._v = v * div # Initialise hardware value
24 self._cv = v # Current (divided) value
25 self.delay = delay # Pause (ms) for motion to stop/limit callback frequency
26
27 if ((vmin is not None) and v < vmin) or ((vmax is not None) and v > vmax):
28 raise ValueError('Incompatible args: must have vmin <= v <= vmax')
29 self._tsf = asyncio.ThreadSafeFlag()
30 trig = Pin.IRQ_RISING | Pin.IRQ_FALLING
31 try:
32 xirq = pin_x.irq(trigger=trig, handler=self._x_cb, hard=True)
33 yirq = pin_y.irq(trigger=trig, handler=self._y_cb, hard=True)
34 except TypeError: # hard arg is unsupported on some hosts
35 xirq = pin_x.irq(trigger=trig, handler=self._x_cb)
36 yirq = pin_y.irq(trigger=trig, handler=self._y_cb)
37 asyncio.create_task(self._run(vmin, vmax, div, mod, callback, args))
38
39 # Hardware IRQ's. Duration 36μs on Pyboard 1 ~50μs on ESP32.
40 # IRQ latency: 2nd edge may have occured by the time ISR runs, in
41 # which case there is no movement.
42 def _x_cb(self, pin_x):
43 if (x := pin_x()) != self._x:
44 self._x = x
45 self._v += 1 if x ^ self._pin_y() else -1
46 self._tsf.set()
47
48 def _y_cb(self, pin_y):
49 if (y := pin_y()) != self._y:
50 self._y = y
51 self._v -= 1 if y ^ self._pin_x() else -1
52 self._tsf.set()
53
54 async def _run(self, vmin, vmax, div, mod, cb, args):
55 pv = self._v # Prior hardware value
56 pcv = self._cv # Prior divided value passed to callback
57 lcv = pcv # Current value after limits applied
58 plcv = pcv # Previous value after limits applied
59 delay = self.delay
60 while True:
61 await self._tsf.wait()
62 await asyncio.sleep_ms(delay) # Wait for motion to stop.
63 hv = self._v # Sample hardware (atomic read).
64 if hv == pv: # A change happened but was negated before
65 continue # this got scheduled. Nothing to do.
66 pv = hv
67 cv = round(hv / div) # cv is divided value.
68 if not (dv := cv - pcv): # dv is change in divided value.
69 continue # No change
70 lcv += dv # lcv: divided value with limits/mod applied
71 lcv = lcv if vmax is None else min(vmax, lcv)
72 lcv = lcv if vmin is None else max(vmin, lcv)
73 lcv = lcv if mod is None else lcv % mod
74 self._cv = lcv # update ._cv for .value() before CB.
75 if lcv != plcv:
76 cb(lcv, lcv - plcv, *args) # Run user CB in uasyncio context
77 pcv = cv
78 plcv = lcv
79
80 def value(self):
81 return self._cv