]> vault307.fbx.one Git - ir_remote.git/blob - primitives/tests/asyntest.py
infrared remote
[ir_remote.git] / primitives / tests / asyntest.py
1 # asyntest.py Test/demo of the 'micro' Event, Barrier and Semaphore classes
2 # Test/demo of official asyncio library and official Lock class
3
4 # Copyright (c) 2017-2022 Peter Hinch
5 # Released under the MIT License (MIT) - see LICENSE file
6
7 # CPython 3.8 compatibility
8 # (ignore RuntimeWarning: coroutine '_g' was never awaited)
9 # To run:
10 # from primitives.tests.asyntest import test
11
try:
    import uasyncio as asyncio
except ImportError:
    import asyncio
import sys

# sys.implementation._machine is a MicroPython attribute that CPython does not
# provide. Fall back to "" so that importing this module under CPython (which
# the header advertises as supported) does not raise AttributeError.
unix = "linux" in getattr(sys.implementation, "_machine", "")

from primitives import Barrier, Semaphore, BoundedSemaphore, Condition, Queue, RingbufQueue
try:
    from threadsafe import Message
except Exception:
    # Best effort: Message is not usable on every build (ack_test and msg_test
    # bail out on the Unix build). Exception rather than a bare except, so
    # that KeyboardInterrupt and SystemExit are no longer swallowed here.
    pass
24
# Print the menu of test numbers accepted by test(), wrapped in the escape
# sequences '\x1b[32m' and '\x1b[39m' (each printed on its own line).
def print_tests():
    menu = '''Available functions:
test(0) Print this list.
test(1) Test message acknowledge.
test(2) Test Message and Lock objects.
test(3) Test the Barrier class with callback.
test(4) Test the Barrier class with coroutine.
test(5) Test Semaphore
test(6) Test BoundedSemaphore.
test(7) Test the Condition class.
test(8) Test the Queue class.
test(9) Test the RingbufQueue class.
'''
    for chunk in ('\x1b[32m', menu, '\x1b[39m'):
        print(chunk)

# Show the menu as soon as the module is imported.
print_tests()
43
# Print the output a test is expected to produce, then a "Running" banner.
# exp: expected-output text.
# runtime: value shown in the banner; any falsy value prints "runtime < 1s".
def printexp(exp, runtime=0):
    for line in ('Expected output:', '\x1b[32m', exp, '\x1b[39m'):
        print(line)
    if not runtime:
        print('Running (runtime < 1s):')
        return
    print(f'Running (runtime = {runtime}s):')
53
54 # ************ Test Message class ************
55 # Demo use of acknowledge message
56
# Await `message`, print its value tagged with waiter number `n`, then set
# `ack_message` unless it is None. A CancelledError raised while waiting is
# reported with a print and not re-raised.
async def message_wait(message, ack_message, n):
    try:
        await message
        print(f'message_wait {n} got message: {message.value()}')
        if ack_message is None:
            return
        ack_message.set()
    except asyncio.CancelledError:
        print(f"message_wait {n} cancelled")
65
# Run `n` rounds of the acknowledge demo. In each round two message_wait tasks
# are started on a shared message, the message is set to the round number, and
# each waiter's acknowledge message is awaited and cleared in turn.
# n: number of rounds; 0 is allowed and does nothing.
async def run_ack(n):
    message = Message()
    acks = ((1, Message()), (2, Message()))  # (waiter number, its ack message)
    waiters = []  # Tasks of the most recent round; stays empty when n == 0
    for count in range(n):
        waiters = [
            asyncio.create_task(message_wait(message, ack, ident))
            for ident, ack in acks
        ]
        message.set(count)
        print('message was set')
        for ident, ack in acks:
            await ack
            ack.clear()
            print('Cleared ack{}'.format(ident))
        message.clear()
        print('Cleared message')
        await asyncio.sleep(1)
    # Cancel the last pair of waiters. Iterating a list (rather than naming
    # t0/t1) avoids an UnboundLocalError when the loop body never ran.
    for task in waiters:
        task.cancel()
86
# Set `msg` to each element of `items` in turn, sleeping 400ms before each set.
async def msg_send(msg, items):
    for payload in items:
        await asyncio.sleep_ms(400)
        msg.set(payload)
91
# Receive using an asynchronous iterator: print every value that `msg` yields
# and clear `msg` after each one.
async def msg_recv(msg):
    async for received in msg:
        print("Got", received)
        msg.clear()
96
# Drive the Message demos in sequence: several tasks waiting on one message,
# the asynchronous-iterator interface, then cancellation of the first and of
# the second of two waiting tasks.
async def ack_coro():
    print("Test multiple tasks waiting on a message.")
    await run_ack(3)
    print()
    print("Test asynchronous iterator.")
    msg = Message()
    asyncio.create_task(msg_send(msg, (1, 2, 3)))
    try:
        # msg_recv never finishes by itself; the timeout ends it.
        await asyncio.wait_for(msg_recv(msg), 3)
    except asyncio.TimeoutError:
        pass
    await asyncio.sleep(1)

    # (heading, index of the waiter to cancel) for the two cancellation demos.
    scenarios = (
        ("Test cancellation of first waiting task.", 0),
        ("Test cancellation of second waiting task.", 1),
    )
    for heading, victim in scenarios:
        print()
        print(heading)
        waiters = [
            asyncio.create_task(message_wait(msg, None, ident))
            for ident in (1, 2)
        ]
        await asyncio.sleep(1)
        waiters[victim].cancel()
        await asyncio.sleep(1)
        print("Setting message")
        msg.set("Test message")
        await asyncio.sleep(1)  # Tasks have ended or been cancelled
        msg.clear()

    print("I've seen attack ships burn on the shoulder of Orion...")
    print("Time to die...")
134
# Entry point for test(1): show the expected output, then run ack_coro().
# Does nothing beyond printing a notice when the `unix` flag is set.
def ack_test():
    if unix:
        print("Message class is incompatible with Unix build.")
        return
    expected = '''Running (runtime = 12s):
Test multiple tasks waiting on a message.
message was set
message_wait 1 got message: 0
message_wait 2 got message: 0
Cleared ack1
Cleared ack2
Cleared message
message was set
message_wait 1 got message: 1
message_wait 2 got message: 1
Cleared ack1
Cleared ack2
Cleared message
message was set
message_wait 1 got message: 2
message_wait 2 got message: 2
Cleared ack1
Cleared ack2
Cleared message

Test asynchronous iterator.
Got 1
Got 2
Got 3

Test cancellation of first waiting task.
message_wait 1 cancelled
Setting message
message_wait 2 got message: Test message

Test cancellation of second waiting task.
message_wait 2 cancelled
Setting message
message_wait 1 got message: Test message
I've seen attack ships burn on the shoulder of Orion...
Time to die...
'''
    runtime_s = 12
    printexp(expected, runtime_s)
    asyncio.run(ack_coro())
178
179 # ************ Test Lock and Message classes ************
180
# Acquire `lock`, hold it for one second, then release it, printing progress
# tagged with `n` at every stage.
async def run_lock(n, lock):
    print(f'run_lock {n} waiting for lock')
    await lock.acquire()
    print(f'run_lock {n} acquired lock')
    hold_s = 1  # Delay to demo other coros waiting for lock
    await asyncio.sleep(hold_s)
    lock.release()
    print(f'run_lock {n} released lock')
188
# Set `message` after a five second delay, announcing both steps.
async def messageset(message):
    delay_s = 5
    print('Waiting 5 secs before setting message')
    await asyncio.sleep(delay_s)
    message.set()
    print('message was set')
194
# Block until `message` can be awaited, report it, then clear the message.
async def messagewait(message):
    for stage in ('waiting for message', 'got message'):
        print(stage)
        if stage == 'waiting for message':
            await message
    message.clear()
200
# Start three run_lock tasks sharing one asyncio.Lock, then start a task that
# sets a Message after a delay and wait for that Message here.
async def run_message_test():
    print('Test Lock class')
    lock = asyncio.Lock()
    for ident in (1, 2, 3):
        asyncio.create_task(run_lock(ident, lock))
    print('Test Message class')
    message = Message()
    asyncio.create_task(messageset(message))
    await messagewait(message)  # run_message_test runs fast until this point
    # messagewait() clears the message, so it must not still be set.
    if message.is_set():
        status = 'Incorrect'
    else:
        status = 'OK'
    print(f'Message status {status}')
    print('Tasks complete')
213
# Entry point for test(2): show the expected output, then run
# run_message_test(). Only prints a notice when the `unix` flag is set.
def msg_test():
    if unix:
        print("Message class is incompatible with Unix build.")
        return
    expected = '''Test Lock class
Test Message class
waiting for message
run_lock 1 waiting for lock
run_lock 1 acquired lock
run_lock 2 waiting for lock
run_lock 3 waiting for lock
Waiting 5 secs before setting message
run_lock 1 released lock
run_lock 2 acquired lock
run_lock 2 released lock
run_lock 3 acquired lock
run_lock 3 released lock
message was set
got message
Message status OK
Tasks complete
'''
    runtime_s = 5
    printexp(expected, runtime_s)
    asyncio.run(run_message_test())
237
238 # ************ Barrier test ************
239
# Sleep for `duration` seconds and return None.
async def killer(duration):
    pause = asyncio.sleep(duration)
    await pause
242
# Callback handed to the Barrier in do_barrier_test(): print `text` on a line.
def callback(text):
    print(text, end='\n')
245
# Five times over: print the step number (followed by a space, no newline),
# then await `barrier`.
async def report(barrier):
    for step in range(5):
        print(f'{step} ', end='')
        await barrier
250
# Barrier-with-callback demo: twice over, launch three report() tasks on one
# shared Barrier, wait a second and print a blank line; then wait a further
# second before returning.
async def do_barrier_test():
    rounds = 2
    members = 3
    barrier = Barrier(3, callback, ('Synch',))
    for _round in range(rounds):
        for _member in range(members):
            asyncio.create_task(report(barrier))
        await asyncio.sleep(1)
        print()
    await asyncio.sleep(1)
259
# Entry point for test(3): show the expected output, then run
# do_barrier_test().
def barrier_test():
    expected = '''Running (runtime = 3s):
0 0 0 Synch
1 1 1 Synch
2 2 2 Synch
3 3 3 Synch
4 4 4 Synch

1 1 1 Synch
2 2 2 Synch
3 3 3 Synch
4 4 4 Synch
'''
    runtime_s = 3
    printexp(expected, runtime_s)
    asyncio.run(do_barrier_test())
274
275 # ************ Barrier test 1 ************
276
# Coroutine handed to the Barrier in bart(): after an initial zero-length
# sleep, print `text` once per second until cancelled. The cancellation is
# reported with a print and not re-raised.
async def my_coro(text):
    period_s = 1
    try:
        await asyncio.sleep_ms(0)
        while True:
            await asyncio.sleep(period_s)
            print(text)
    except asyncio.CancelledError:
        print('my_coro was cancelled.')
285
# Sleep for `x` seconds, announce that instance `x` is waiting, await
# `barrier`, then announce that the instance is done.
async def report1(barrier, x):
    who = ('report instance', x)
    await asyncio.sleep(x)
    print(*who, 'waiting')
    await barrier
    print(*who, 'done')
291
# Barrier-with-coroutine demo. Three report1() tasks plus this coroutine make
# up the four participants of the Barrier, which is constructed with my_coro.
async def bart():
    barrier = Barrier(4, my_coro, ('my_coro running',))
    for delay in range(3):
        asyncio.create_task(report1(barrier, delay))
    await asyncio.sleep(4)
    # Checked before this coroutine itself awaits the barrier.
    assert barrier.busy()
    await barrier
    await asyncio.sleep(0)
    assert not barrier.busy()
    # Must yield before reading result(). Here we wait long enough for
    # the coro to print.
    await asyncio.sleep_ms(1500)
    launched = barrier.result()
    launched.cancel()
    await asyncio.sleep(2)
305
# Entry point for test(4): show the expected output, then run bart().
def barrier_test1():
    expected = '''Running (runtime = 5s):
report instance 0 waiting
report instance 1 waiting
report instance 2 waiting
report instance 2 done
report instance 1 done
report instance 0 done
my_coro running
my_coro was cancelled.

Exact report instance done sequence may vary, but 3 instances should report
done before my_coro runs.
'''
    runtime_s = 5
    printexp(expected, runtime_s)
    asyncio.run(bart())
321
322 # ************ Semaphore test ************
323
# Enter `sema`, hold it for a little over a second, leave it, then trigger
# `barrier`. Progress is printed tagged with `n`.
async def run_sema(n, sema, barrier):
    print(f'run_sema {n} trying to access semaphore')
    async with sema:
        print(f'run_sema {n} acquired semaphore')
        # Delay demonstrates other coros waiting for semaphore
        hold_s = 1 + n/10  # n/10 ensures deterministic printout
        await asyncio.sleep(hold_s)
    print(f'run_sema {n} has released semaphore')
    barrier.trigger()
332
# Run five run_sema() tasks against a semaphore created with a value of 3,
# wait for them all via a Barrier, then release the semaphore one extra time.
# bounded: truthy selects BoundedSemaphore, otherwise Semaphore is used.
async def run_sema_test(bounded):
    num_coros = 5
    barrier = Barrier(num_coros + 1)
    sema_cls = BoundedSemaphore if bounded else Semaphore
    semaphore = sema_cls(3)
    for ident in range(num_coros):
        asyncio.create_task(run_sema(ident, semaphore, barrier))
    await barrier  # Quit when all coros complete
    try:
        # One release too many; only a ValueError is handled here.
        semaphore.release()
    except ValueError:
        print('Bounded semaphore exception test OK')
347
# Entry point for test(5) and test(6): show the expected output for the
# chosen semaphore type, then run run_sema_test(bounded).
def semaphore_test(bounded=False):
    if not bounded:
        exp = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 3 acquired semaphore
run_sema 1 has released semaphore
run_sema 4 acquired semaphore
run_sema 2 has released semaphore
run_sema 3 has released semaphore
run_sema 4 has released semaphore

Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
    else:
        exp = '''run_sema 0 trying to access semaphore
run_sema 0 acquired semaphore
run_sema 1 trying to access semaphore
run_sema 1 acquired semaphore
run_sema 2 trying to access semaphore
run_sema 2 acquired semaphore
run_sema 3 trying to access semaphore
run_sema 4 trying to access semaphore
run_sema 0 has released semaphore
run_sema 4 acquired semaphore
run_sema 1 has released semaphore
run_sema 3 acquired semaphore
run_sema 2 has released semaphore
run_sema 4 has released semaphore
run_sema 3 has released semaphore
Bounded semaphore exception test OK

Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
    runtime_s = 3
    printexp(exp, runtime_s)
    asyncio.run(run_sema_test(bounded))
388
389 # ************ Condition test ************
390
# Module-level state shared by the cond0x coroutines below.
tim = 0  # Count of seconds, incremented by cond03()
cond = Condition()  # The Condition instance under test
393
# Runs until cancelled: every two seconds acquire the shared `cond` and
# notify two waiting tasks.
async def cond01():
    period_s = 2
    while True:
        await asyncio.sleep(period_s)
        with await cond:
            cond.notify(2)  # Notify 2 tasks
399
# Maintain a count of seconds in the global `tim`: after an initial 0.5s
# offset, add one every second. Runs until cancelled.
async def cond03():
    global tim
    await asyncio.sleep(0.5)
    while True:
        await asyncio.sleep(1)
        tim = tim + 1
406
# Acquire the shared `cond`, wait on it for a notification, report the value
# of `tim` at that moment and trigger `barrier`. `n` tags the printout.
async def cond02(n, barrier):
    label = ('cond02', n)
    with await cond:
        print(*label, 'Awaiting notification.')
        await cond.wait()
        print(*label, 'triggered. tim =', tim)
        barrier.trigger()
413
# Predicate passed to cond.wait_for() in cond04(): true once the global
# seconds counter `tim` has reached 8.
def predicate():
    threshold = 8  # 12
    return tim >= threshold
416
# Acquire the shared `cond`, wait on it with predicate() via wait_for, report
# the value of `tim` and trigger `barrier`. `n` tags the printout.
async def cond04(n, barrier):
    label = ('cond04', n)
    with await cond:
        print(*label, 'Awaiting notification and predicate.')
        await cond.wait_for(predicate)
        print(*label, 'triggered. tim =', tim)
        barrier.trigger()
423
# Condition demo: start the notifier (cond01) and the seconds counter
# (cond03), wait for seven cond02 tasks to finish, then for one cond04 task,
# and finally cancel the two background tasks.
async def cond_go():
    ntasks = 7
    all_waiters_done = Barrier(ntasks + 1)
    background = [
        asyncio.create_task(cond01()),
        asyncio.create_task(cond03()),
    ]
    for ident in range(ntasks):
        asyncio.create_task(cond02(ident, all_waiters_done))
    await all_waiters_done  # All instances of cond02 have completed
    # Test wait_for
    predicate_done = Barrier(2)
    asyncio.create_task(cond04(99, predicate_done))
    await predicate_done
    # cancel continuously running coros.
    for task in background:
        task.cancel()
    await asyncio.sleep_ms(0)
    print('Done.')
441
# Entry point for test(7): show the expected output, then run cond_go().
def condition_test():
    expected = '''cond02 0 Awaiting notification.
cond02 1 Awaiting notification.
cond02 2 Awaiting notification.
cond02 3 Awaiting notification.
cond02 4 Awaiting notification.
cond02 5 Awaiting notification.
cond02 6 Awaiting notification.
cond02 5 triggered. tim = 1
cond02 6 triggered. tim = 1
cond02 3 triggered. tim = 3
cond02 4 triggered. tim = 3
cond02 1 triggered. tim = 5
cond02 2 triggered. tim = 5
cond02 0 triggered. tim = 7
cond04 99 Awaiting notification and predicate.
cond04 99 triggered. tim = 9
Done.
'''
    runtime_s = 13
    printexp(expected, runtime_s)
    asyncio.run(cond_go())
462
463 # ************ Queue test ************
464
# Stand-in for a slow operation: sleep two seconds, then return 42.
async def slow_process():
    answer = 42
    await asyncio.sleep(2)
    return answer
468
# Producer: await slow_process() and put its result onto queue `q`.
async def bar(q):
    print('Waiting for slow process.')
    outcome = await slow_process()
    print('Putting result onto queue')
    await q.put(outcome)  # Put result on q
474
# Consumer: take one item from queue `q` and print it.
async def foo(q):
    print("Running foo()")
    item = await q.get()
    print(f'Result was {item}')
479
# Put the eight tuples (n, 0) .. (n, 7) onto `q`, yielding to the scheduler
# after each put.
async def q_put(n, q):
    for seq in range(8):
        await q.put((n, seq))
        await asyncio.sleep(0)
485
# Take eight items from `q` and discard them, yielding to the scheduler after
# each get. `n` is accepted but not used.
async def q_get(n, q):
    remaining = 8
    while remaining:
        await q.get()
        await asyncio.sleep(0)
        remaining -= 1
490
# Put the value 1 onto `q` twenty times, sleeping 50ms after each put.
async def putter(q):
    count = 20
    for _tick in range(count):
        await q.put(1)
        await asyncio.sleep_ms(50)
496
497
# Take twenty items from `q`, checking for new items by relying on the
# "blocking" behaviour of the get method rather than on any sleep.
async def getter(q):
    outstanding = 20
    while outstanding > 0:
        await q.get()
        outstanding -= 1
502
# Queue demo in four stages: a single producer/consumer pair, four competing
# q_put tasks, four competing q_get tasks, then putter() and getter() run
# together.
async def queue_go():
    q = Queue(10)
    asyncio.create_task(foo(q))
    asyncio.create_task(bar(q))
    await asyncio.sleep(3)

    # Four tasks each try to put 8 items; the queue was created with 10.
    for ident in range(4):
        asyncio.create_task(q_put(ident, q))
    await asyncio.sleep(1)
    assert q.qsize() == 10
    await q.get()
    await asyncio.sleep(0.1)
    # The size is expected to be back at 10 after removing one item.
    assert q.qsize() == 10
    while not q.empty():
        await q.get()
        await asyncio.sleep(0.1)
    assert q.empty()
    print('Competing put tasks test complete')

    for ident in range(4):
        asyncio.create_task(q_get(ident, q))
    await asyncio.sleep(1)
    value = 0
    while not q.full():
        await q.put(value)
        await asyncio.sleep(0.3)
        value += 1
    assert q.qsize() == 10
    print('Competing get tasks test complete')

    await asyncio.gather(putter(q), getter(q))
    print('Queue tests complete')
    print("I've seen attack ships burn off the shoulder of Orion...")
    print("Time to die...")
538
# Entry point for test(8): show the expected output, then run queue_go().
def queue_test():
    expected = '''Running (runtime = 20s):
Running foo()
Waiting for slow process.
Putting result onto queue
Result was 42
Competing put tasks test complete
Competing get tasks test complete
Queue tests complete


I've seen attack ships burn off the shoulder of Orion...
Time to die...

'''
    runtime_s = 20
    printexp(expected, runtime_s)
    asyncio.run(queue_go())
555
556 # ************ RingbufQueue test ************
557
# Append every item yielded by asynchronously iterating `q` to `lst`,
# sleeping `twr` milliseconds after each one.
async def qread(q, lst, twr):
    async for element in q:
        lst.append(element)
        await asyncio.sleep_ms(twr)
562
# Run qread() on `q` under a timeout of `t` and return the list of items it
# collected. A TimeoutError from wait_for is swallowed.
# twr: per-item delay in ms, passed through to qread().
async def read(q, t, twr=0):
    collected = []
    reader = qread(q, collected, twr)
    try:
        await asyncio.wait_for(reader, t)
    except asyncio.TimeoutError:
        pass
    return collected
570
# Put each element of `lst` onto `q` in order, sleeping `twp` milliseconds
# after each put.
async def put_list(q, lst, twp=0):
    for element in lst:
        await q.put(element)
        await asyncio.sleep_ms(twp)
575
# RingbufQueue demo: slow writer with fast reader, fast writer with slow
# reader, the full/empty/qsize methods, put_nowait overruns and get_nowait on
# an empty queue.
async def rbq_go():
    q = RingbufQueue([0] * 10)  # 10 elements
    source = list(range(15))

    print("Read waits on slow write.")
    asyncio.create_task(put_list(q, source, 100))
    received = await read(q, 2)
    assert source == received
    print('done')

    print("Write waits on slow read.")
    asyncio.create_task(put_list(q, source))
    received = await read(q, 2, 100)
    assert source == received
    print('done')

    print("Testing full, empty and qsize methods.")
    assert q.empty()
    assert q.qsize() == 0
    assert not q.full()
    await put_list(q, (1, 2, 3))
    assert not q.empty()
    assert q.qsize() == 3
    assert not q.full()
    print("Done")

    print("Testing put_nowait and overruns.")
    overruns = 0
    for value in range(4, 15):
        try:
            q.put_nowait(value)
        except IndexError:
            overruns += 1
    # Five of the eleven put_nowait calls are expected to raise IndexError.
    assert overruns == 5
    assert q.full()
    received = await read(q, 2)
    assert received == [6, 7, 8, 9, 10, 11, 12, 13, 14]

    print("Testing get_nowait.")
    await q.put(1)
    assert q.get_nowait() == 1
    # A second get_nowait on the now-empty queue must raise IndexError.
    try:
        q.get_nowait()
    except IndexError:
        underrun = True
    else:
        underrun = False
    assert underrun
    print("Tests complete.")
    print("I've seen attack ships burn off the shoulder of Orion...")
    print("Time to die...")
621
# Entry point for test(9): show the expected output, then run rbq_go().
def rbq_test():
    expected = '''Running (runtime = 6s):
Read waits on slow write.
done
Write waits on slow read.
done
Testing full, empty and qsize methods.
Done
Testing put_nowait and overruns.
Testing get_nowait.
Tests complete.
I've seen attack ships burn off the shoulder of Orion...
Time to die...

'''
    runtime_s = 6
    printexp(expected, runtime_s)
    asyncio.run(rbq_go())
638
639 # ************ ************
# Run the test selected by `n` (see print_tests() for the numbering). Any
# other value, including 0, runs no test. In every case a new event loop is
# requested from asyncio afterwards and the menu is printed again.
def test(n):
    try:
        # Ordered (number, runner) pairs, compared with == like the if/elif
        # chain this replaces; the first match wins.
        runners = (
            (1, ack_test),  # Test message acknowledge.
            (2, msg_test),  # Test Message and Lock objects.
            (3, barrier_test),  # Test the Barrier class with callback.
            (4, barrier_test1),  # Test the Barrier class with coroutine.
            (5, lambda: semaphore_test(False)),  # Test Semaphore
            (6, lambda: semaphore_test(True)),  # Test BoundedSemaphore.
            (7, condition_test),  # Test the Condition class.
            (8, queue_test),  # Test the Queue class.
            (9, rbq_test),  # Test the RingbufQueue class.
        )
        for number, runner in runners:
            if n == number:
                runner()
                break
    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        asyncio.new_event_loop()
        print_tests()