]>
vault307.fbx.one Git - ir_remote.git/blob - primitives/events.py
1 # events.py Event based primitives
3 # Copyright (c) 2022 Peter Hinch
4 # Released under the MIT License (MIT) - see LICENSE file
6 import uasyncio
as asyncio
9 # An Event-like class that can wait on an iterable of Event-like instances.
10 # .wait pauses until any passed event is set.
12 def __init__(self
, events
):
14 self
.trig_event
= None
15 self
.evt
= asyncio
.Event()
18 tasks
= [asyncio
.create_task(self
.wt(event
)) for event
in self
.events
]
25 return self
.trig_event
27 async def wt(self
, event
):
30 self
.trig_event
= event
33 return self
.trig_event
36 for evt
in (x
for x
in self
.events
if hasattr(x
, 'clear')):
39 # An Event-like class that can wait on an iterable of Event-like instances,
40 # .wait pauses until all passed events have been set.
42 def __init__(self
, events
):
48 tasks
= (asyncio
.create_task(wt(event
)) for event
in self
.events
)
50 await asyncio
.gather(*tasks
)
51 finally: # May be subject to timeout or cancellation
56 for evt
in (x
for x
in self
.events
if hasattr(x
, 'clear')):
59 # Minimal switch class having an Event based interface
def __init__(self, pin, lopen=1):  # Default is n/o switch returned to gnd
    # Store the pin and its "open" logic level, create the two Events
    # exposed as attributes, read the initial state and start polling.
    #
    # pin:   called with no arguments to read the current level.
    #        Should be initialised for input with pullup.
    # lopen: logic level in "open" state.
    self._pin = pin
    self._lopen = lopen
    self.open = asyncio.Event()
    self.close = asyncio.Event()
    # Get initial state: pin reading XORed with the "open" level.
    self._state = self._pin() ^ self._lopen
    # Launch the poll task; the class-level debounce_ms is handed to
    # _poll, which sleeps for that many ms to wait out contact bounce.
    asyncio.create_task(self._poll(ESwitch.debounce_ms))
71 async def _poll(self
, dt
): # Poll the button
73 if (s
:= self
._pin
() ^ self
._lopen
) != self
._state
: # 15μs
75 self
._cf
() if s
else self
._of
()
76 await asyncio
.sleep_ms(dt
) # Wait out bounce
85 # Return current state of switch (0 = pressed)
94 # Minimal pushbutton class having an Event based interface
96 debounce_ms
= 50 # Attributes can be varied by user
100 def __init__(self
, pin
, suppress
=False, sense
=None):
101 self
._pin
= pin
# Initialise for input
102 self
._supp
= suppress
103 self
._sense
= pin() if sense
is None else sense
104 self
._state
= self
.rawstate() # Initial logical state
105 self
._ltim
= Delay_ms(duration
= EButton
.long_press_ms
)
106 self
._dtim
= Delay_ms(duration
= EButton
.double_click_ms
)
107 self
.press
= asyncio
.Event() # *** API ***
108 self
.double
= asyncio
.Event()
109 self
.long = asyncio
.Event()
110 self
.release
= asyncio
.Event() # *** END API ***
111 self
._tasks
= [asyncio
.create_task(self
._poll
(EButton
.debounce_ms
))] # Tasks run forever. Poll contacts
112 self
._tasks
.append(asyncio
.create_task(self
._ltf
())) # Handle long press
114 self
._tasks
.append(asyncio
.create_task(self
._dtf
())) # Double timer
116 async def _poll(self
, dt
): # Poll the button
118 if (s
:= self
.rawstate()) != self
._state
:
120 self
._pf
() if s
else self
._rf
()
121 await asyncio
.sleep_ms(dt
) # Wait out bounce
123 def _pf(self
): # Button press
125 self
.press
.set() # User event
126 if not self
._ltim
(): # Don't retrigger long timer if already running
128 if self
._dtim
(): # Press occurred while _dtim is running
129 self
.double
.set() # User event
130 self
._dtim
.stop() # _dtim's Event is only used if suppress
134 def _rf(self
): # Button release
136 if not self
._supp
or not self
._dtim
(): # If dtim running postpone release otherwise it
137 self
.release
.set() # is set before press
139 async def _ltf(self
): # Long timeout
141 await self
._ltim
.wait()
142 self
._ltim
.clear() # Clear the event
143 self
.long.set() # User event
145 async def _dtf(self
): # Double timeout (runs if suppress is set)
147 await self
._dtim
.wait()
148 self
._dtim
.clear() # Clear the event
149 if not self
._ltim
(): # Button was released
150 self
.press
.set() # User events
154 # Current non-debounced logical button state: True == pressed
156 return bool(self
._pin
() ^ self
._sense
)
158 # Current debounced state of button (True == pressed)
163 for task
in self
._tasks
:
165 for evt
in (self
.press
, self
.double
, self
.long, self
.release
):