]> vault307.fbx.one Git - ir_remote.git/blob - primitives/events.py
infrared remote
[ir_remote.git] / primitives / events.py
1 # events.py Event based primitives
2
3 # Copyright (c) 2022 Peter Hinch
4 # Released under the MIT License (MIT) - see LICENSE file
5
6 import uasyncio as asyncio
7 from . import Delay_ms
8
9 # An Event-like class that can wait on an iterable of Event-like instances.
10 # .wait pauses until any passed event is set.
11 class WaitAny:
12 def __init__(self, events):
13 self.events = events
14 self.trig_event = None
15 self.evt = asyncio.Event()
16
17 async def wait(self):
18 tasks = [asyncio.create_task(self.wt(event)) for event in self.events]
19 try:
20 await self.evt.wait()
21 finally:
22 self.evt.clear()
23 for task in tasks:
24 task.cancel()
25 return self.trig_event
26
27 async def wt(self, event):
28 await event.wait()
29 self.evt.set()
30 self.trig_event = event
31
32 def event(self):
33 return self.trig_event
34
35 def clear(self):
36 for evt in (x for x in self.events if hasattr(x, 'clear')):
37 evt.clear()
38
39 # An Event-like class that can wait on an iterable of Event-like instances,
40 # .wait pauses until all passed events have been set.
41 class WaitAll:
42 def __init__(self, events):
43 self.events = events
44
45 async def wait(self):
46 async def wt(event):
47 await event.wait()
48 tasks = (asyncio.create_task(wt(event)) for event in self.events)
49 try:
50 await asyncio.gather(*tasks)
51 finally: # May be subject to timeout or cancellation
52 for task in tasks:
53 task.cancel()
54
55 def clear(self):
56 for evt in (x for x in self.events if hasattr(x, 'clear')):
57 evt.clear()
58
59 # Minimal switch class having an Event based interface
60 class ESwitch:
61 debounce_ms = 50
62
63 def __init__(self, pin, lopen=1): # Default is n/o switch returned to gnd
64 self._pin = pin # Should be initialised for input with pullup
65 self._lopen = lopen # Logic level in "open" state
66 self.open = asyncio.Event()
67 self.close = asyncio.Event()
68 self._state = self._pin() ^ self._lopen # Get initial state
69 asyncio.create_task(self._poll(ESwitch.debounce_ms))
70
71 async def _poll(self, dt): # Poll the button
72 while True:
73 if (s := self._pin() ^ self._lopen) != self._state: # 15μs
74 self._state = s
75 self._cf() if s else self._of()
76 await asyncio.sleep_ms(dt) # Wait out bounce
77
78 def _of(self):
79 self.open.set()
80
81 def _cf(self):
82 self.close.set()
83
84 # ***** API *****
85 # Return current state of switch (0 = pressed)
86 def __call__(self):
87 return self._state
88
89 def deinit(self):
90 self._poll.cancel()
91 self.open.clear()
92 self.close.clear()
93
94 # Minimal pushbutton class having an Event based interface
95 class EButton:
96 debounce_ms = 50 # Attributes can be varied by user
97 long_press_ms = 1000
98 double_click_ms = 400
99
100 def __init__(self, pin, suppress=False, sense=None):
101 self._pin = pin # Initialise for input
102 self._supp = suppress
103 self._sense = pin() if sense is None else sense
104 self._state = self.rawstate() # Initial logical state
105 self._ltim = Delay_ms(duration = EButton.long_press_ms)
106 self._dtim = Delay_ms(duration = EButton.double_click_ms)
107 self.press = asyncio.Event() # *** API ***
108 self.double = asyncio.Event()
109 self.long = asyncio.Event()
110 self.release = asyncio.Event() # *** END API ***
111 self._tasks = [asyncio.create_task(self._poll(EButton.debounce_ms))] # Tasks run forever. Poll contacts
112 self._tasks.append(asyncio.create_task(self._ltf())) # Handle long press
113 if suppress:
114 self._tasks.append(asyncio.create_task(self._dtf())) # Double timer
115
116 async def _poll(self, dt): # Poll the button
117 while True:
118 if (s := self.rawstate()) != self._state:
119 self._state = s
120 self._pf() if s else self._rf()
121 await asyncio.sleep_ms(dt) # Wait out bounce
122
123 def _pf(self): # Button press
124 if not self._supp:
125 self.press.set() # User event
126 if not self._ltim(): # Don't retrigger long timer if already running
127 self._ltim.trigger()
128 if self._dtim(): # Press occurred while _dtim is running
129 self.double.set() # User event
130 self._dtim.stop() # _dtim's Event is only used if suppress
131 else:
132 self._dtim.trigger()
133
134 def _rf(self): # Button release
135 self._ltim.stop()
136 if not self._supp or not self._dtim(): # If dtim running postpone release otherwise it
137 self.release.set() # is set before press
138
139 async def _ltf(self): # Long timeout
140 while True:
141 await self._ltim.wait()
142 self._ltim.clear() # Clear the event
143 self.long.set() # User event
144
145 async def _dtf(self): # Double timeout (runs if suppress is set)
146 while True:
147 await self._dtim.wait()
148 self._dtim.clear() # Clear the event
149 if not self._ltim(): # Button was released
150 self.press.set() # User events
151 self.release.set()
152
153 # ****** API ******
154 # Current non-debounced logical button state: True == pressed
155 def rawstate(self):
156 return bool(self._pin() ^ self._sense)
157
158 # Current debounced state of button (True == pressed)
159 def __call__(self):
160 return self._state
161
162 def deinit(self):
163 for task in self._tasks:
164 task.cancel()
165 for evt in (self.press, self.double, self.long, self.release):
166 evt.clear()