]> vault307.fbx.one Git - ir_remote.git/blob - primitives/tests/event_test.py
infrared remote
[ir_remote.git] / primitives / tests / event_test.py
1 # event_test.py Test WaitAll, WaitAny, ESwitch, EButton
2
3 # Copyright (c) 2022 Peter Hinch
4 # Released under the MIT License (MIT) - see LICENSE file
5
6 # from primitives.tests.event_test import *
7
8 import uasyncio as asyncio
9 from primitives import Delay_ms, WaitAny, ESwitch, WaitAll, EButton
10 from pyb import Pin
11
# Four Event instances shared by the WaitAny / WaitAll tests; they are
# addressed by index from set_events() and reset as a group by clear().
events = [asyncio.Event() for _ in range(4)]
13
async def set_events(*ev):
    """Set the module-level events selected by index, one per second.

    Args:
        *ev: Indices into the module-level ``events`` list. For each index
            the coroutine sleeps 1s, prints ``Setting <n>`` and then sets
            ``events[n]``.
    """
    for n in ev:
        await asyncio.sleep(1)
        print("Setting", n)
        events[n].set()
19
def clear(msg):
    """Print ``msg`` and clear every Event in the module-level ``events`` list.

    Args:
        msg: Text printed before the events are cleared.
    """
    print(msg)
    for e in events:
        e.clear()
24
async def can(obj, tim):
    """Cancel ``obj`` after a delay.

    Args:
        obj: Any object exposing a ``cancel()`` method (a task in this file).
        tim: Delay in seconds before ``obj.cancel()`` is called.
    """
    await asyncio.sleep(tim)
    print("About to cancel")
    obj.cancel()
29
async def foo(tsk):
    """Print ``Waiting`` and then await ``tsk``.

    Args:
        tsk: An awaitable to wait on.
    """
    print("Waiting")
    await tsk
33
async def wait_test():
    """Exercise WaitAny and WaitAll, including nesting, cancellation and timeout.

    Prints the expected console output first, then runs each scenario using
    the module-level ``events`` list. Results are checked by eye against the
    expected output; nothing is returned.
    """
    msg = """
\x1b[32m
Expected output:
Setting 0
Tested WaitAny 0
Setting 1
Tested WaitAny 1
Setting 2
Setting 3
Tested WaitAll 2, 3
Setting 0
Setting 3
Tested WaitAny 0, 3
Cancel in 3s
Setting 0
Setting 1
About to cancel
Cancelled.
Waiting for 4s
Timeout
done
\x1b[39m
"""
    print(msg)
    # WaitAny over two plain events plus a nested WaitAll on events 2 and 3.
    wa = WaitAny((events[0], events[1], WaitAll((events[2], events[3]))))
    asyncio.create_task(set_events(0))
    await wa.wait()
    clear("Tested WaitAny 0")
    asyncio.create_task(set_events(1))
    await wa.wait()
    clear("Tested WaitAny 1")
    asyncio.create_task(set_events(2, 3))
    await wa.wait()
    clear("Tested WaitAll 2, 3")
    # WaitAll over two nested WaitAny instances: needs one event from each pair.
    wa = WaitAll((WaitAny((events[0], events[1])), WaitAny((events[2], events[3]))))
    asyncio.create_task(set_events(0, 3))
    await wa.wait()
    clear("Tested WaitAny 0, 3")
    # Cancellation: events 0 and 1 both belong to the first WaitAny, so the
    # WaitAll is still pending when can() cancels the task after 3s.
    task = asyncio.create_task(wa.wait())
    asyncio.create_task(set_events(0, 1))  # Does nothing
    asyncio.create_task(can(task, 3))
    print("Cancel in 3s")
    try:
        await task
    except asyncio.CancelledError:
        # Awaiting a task that has been cancelled re-raises CancelledError in
        # the awaiting coroutine, so it has to be trapped here.
        print("Cancelled.")
    print("Waiting for 4s")
    try:
        await asyncio.wait_for(wa.wait(), 4)
    except asyncio.TimeoutError:
        print("Timeout")
    print("done")
87
# Shared state for the switch / button tests.
val = 0  # Accumulator updated by monitor() each time an event fires
fail = False  # Set to True by expect() on any mismatch
pout = None  # Output pin driven by pulse(); assigned in switch_test()
polarity = 0  # Level pulse() returns the pin to; assigned in switch_test()
92
async def monitor(evt, v, verbose):
    """Add ``v`` to the global ``val`` every time ``evt`` is set.

    Runs forever; the caller is expected to cancel the task.

    Args:
        evt: Event-like object providing ``wait()`` and ``clear()``.
        v: Integer added to the global ``val`` on each occurrence.
        verbose: If true, print ``v`` and the running total in hex.
    """
    global val
    while True:
        await evt.wait()
        evt.clear()
        val += v
        if verbose:
            print("Got", hex(v), hex(val))
100
async def pulse(ms=100):
    """Drive the global ``pout`` to the inverse of ``polarity`` for ``ms`` ms.

    Args:
        ms: Pulse duration in milliseconds.
    """
    pout(1 ^ polarity)
    await asyncio.sleep_ms(ms)
    pout(polarity)
105
def expect(v, e):
    """Compare an actual value with its expected value and report the result.

    Prints ``Pass`` when they are equal; otherwise prints a failure message
    and sets the global ``fail`` flag to True.

    Args:
        v: Actual value.
        e: Expected value.
    """
    global fail
    if v == e:
        print("Pass")
    else:
        print(f"Fail: expected {e} got {v}")
        fail = True
113
async def btest(btn, verbose, supp):
    """Run short, long and double press tests against a button object.

    One monitor() task is started per button event. Event ``n`` contributes
    ``1 << (3 * n)`` to the global ``val`` each time it fires, i.e.
    press = 0x1, release = 0x8, double = 0x40, long = 0x200, so ``val``
    encodes how many times each event occurred.

    Args:
        btn: Object exposing ``press``, ``release``, ``double`` and ``long``
            event attributes.
        verbose: If true, print intermediate values.
        supp: Selects which expected totals are used for the long and double
            press tests (the caller passes the button's ``suppress`` setting).
    """
    global val
    val = 0
    btn_events = btn.press, btn.release, btn.double, btn.long
    tasks = [
        asyncio.create_task(monitor(evt, 1 << (3 * n), verbose))
        for n, evt in enumerate(btn_events)
    ]
    await asyncio.sleep(1)
    print("Start short press test")
    await pulse()
    await asyncio.sleep(1)
    if verbose:
        print("Test of short press", hex(val))
    expect(val, 0x09)
    val = 0
    await asyncio.sleep(1)
    print("Start long press test")
    await pulse(2000)
    await asyncio.sleep(4)
    if verbose:
        print("Long press", hex(val))
    exp = 0x208 if supp else 0x209
    expect(val, exp)
    val = 0
    await asyncio.sleep(1)
    print("Start double press test")
    await pulse()
    await asyncio.sleep_ms(100)
    await pulse()
    await asyncio.sleep(4)
    if verbose:
        print("Double press", hex(val))
    exp = 0x48 if supp else 0x52
    expect(val, exp)
    for task in tasks:
        task.cancel()
147
async def stest(sw, verbose):
    """Run a close/open test against a switch object.

    One monitor() task is started per switch event; ``sw.open`` contributes
    0x1 and ``sw.close`` contributes 0x8 to the global ``val`` each time
    they fire.

    Args:
        sw: Object exposing ``open`` and ``close`` event attributes.
        verbose: If true, print the final value.
    """
    global val
    val = 0
    sw_events = sw.open, sw.close
    tasks = [
        asyncio.create_task(monitor(evt, 1 << (3 * n), verbose))
        for n, evt in enumerate(sw_events)
    ]
    asyncio.create_task(pulse(2000))
    await asyncio.sleep(1)
    expect(val, 0x08)  # Checked 1s into the 2s pulse: only close expected
    await asyncio.sleep(4)  # Wait for any spurious events
    if verbose:
        print("Switch close and open", hex(val))
    expect(val, 0x09)
    for task in tasks:
        task.cancel()
163
async def switch_test(pol, verbose):
    """Test EButton (with and without suppress) and ESwitch on pins Y1/Y2.

    Y2 is configured as an output and Y1 as an input. Prints a summary line
    based on the global ``fail`` flag when done.

    Args:
        pol: Initial level of the output pin; also stored in the global
            ``polarity`` used by pulse() and passed to ESwitch.
        verbose: If true, the sub-tests print intermediate values.
    """
    global fail, pout, polarity
    # Reset the failure flag so that a failure in an earlier run is not
    # reported again by this one.
    fail = False
    polarity = pol
    pin = Pin('Y1', Pin.IN)
    pout = Pin('Y2', Pin.OUT, value=pol)
    print("Testing EButton.")
    print("suppress == False")
    btn = EButton(pin)
    await btest(btn, verbose, False)
    print("suppress == True")
    # NOTE(review): the first EButton is still referenced on the same pin at
    # this point; if EButton offers a way to shut it down, consider calling it.
    btn = EButton(pin, suppress=True)
    await btest(btn, verbose, True)
    print("Testing ESwitch")
    sw = ESwitch(pin, pol)
    await stest(sw, verbose)
    print("Failures occurred." if fail else "All tests passed.")
180
def tests():
    """Print the menu of available tests."""
    txt="""
\x1b[32m
Available tests:
1. test_switches(polarity=1, verbose=False) Test the ESwitch and Ebutton classe.
2. test_wait() Test the WaitAny and WaitAll primitives.

Switch tests assume a Pyboard with a link between Y1 and Y2.
\x1b[39m
"""
    print(txt)

# Show the menu as soon as the module is imported.
tests()
def test_switches(polarity=1, verbose=False):
    """Run the EButton / ESwitch tests, then print the test menu again.

    Args:
        polarity: Passed to switch_test() as ``pol``.
        verbose: If true, the tests print intermediate values.
    """
    try:
        asyncio.run(switch_test(polarity, verbose))  # polarity 1/0 is normal (off) electrical state.
    finally:
        # Runs even if the test raised, so the next test gets a new event loop.
        asyncio.new_event_loop()
        tests()
200
def test_wait():
    """Run the WaitAny / WaitAll tests, then print the test menu again."""
    try:
        asyncio.run(wait_test())
    finally:
        # Runs even if the test raised, so the next test gets a new event loop.
        asyncio.new_event_loop()
        tests()