]>
vault307.fbx.one Git - ir_remote.git/blob - primitives/tests/asyntest.py
1 # asyntest.py Test/demo of the 'micro' Event, Barrier and Semaphore classes
2 # Test/demo of official asyncio library and official Lock class
4 # Copyright (c) 2017-2022 Peter Hinch
5 # Released under the MIT License (MIT) - see LICENSE file
7 # CPython 3.8 compatibility
8 # (ignore RuntimeWarning: coroutine '_g' was never awaited)
10 # from primitives.tests.asyntest import test
13 import uasyncio
as asyncio
17 unix
= "linux" in sys
.implementation
._machine
19 from primitives
import Barrier
, Semaphore
, BoundedSemaphore
, Condition
, Queue
, RingbufQueue
21 from threadsafe
import Message
26 st
= '''Available functions:
27 test(0) Print this list.
28 test(1) Test message acknowledge.
29 test(2) Test Message and Lock objects.
30 test(3) Test the Barrier class with callback.
31 test(4) Test the Barrier class with coroutine.
32 test(5) Test Semaphore
33 test(6) Test BoundedSemaphore.
34 test(7) Test the Condition class.
35 test(8) Test the Queue class.
36 test(9) Test the RingbufQueue class.
44 def printexp(exp
, runtime
=0):
45 print('Expected output:')
50 print('Running (runtime = {}s):'.format(runtime
))
52 print('Running (runtime < 1s):')
54 # ************ Test Message class ************
55 # Demo use of acknowledge message
57 async def message_wait(message
, ack_message
, n
):
60 print(f
'message_wait {n} got message: {message.value()}')
61 if ack_message
is not None:
63 except asyncio
.CancelledError
:
64 print(f
"message_wait {n} cancelled")
70 for count
in range(n
):
71 t0
= asyncio
.create_task(message_wait(message
, ack1
, 1))
72 t1
= asyncio
.create_task(message_wait(message
, ack2
, 2))
74 print('message was set')
82 print('Cleared message')
83 await asyncio
.sleep(1)
87 async def msg_send(msg
, items
):
89 await asyncio
.sleep_ms(400)
92 async def msg_recv(msg
): # Receive using asynchronous iterator
93 async for data
in msg
:
98 print("Test multiple tasks waiting on a message.")
101 print("Test asynchronous iterator.")
103 asyncio
.create_task(msg_send(msg
, (1, 2, 3)))
105 await asyncio
.wait_for(msg_recv(msg
), 3)
106 except asyncio
.TimeoutError
:
108 await asyncio
.sleep(1)
110 print("Test cancellation of first waiting task.")
111 t1
= asyncio
.create_task(message_wait(msg
, None, 1))
112 t2
= asyncio
.create_task(message_wait(msg
, None, 2))
113 await asyncio
.sleep(1)
115 await asyncio
.sleep(1)
116 print("Setting message")
117 msg
.set("Test message")
118 await asyncio
.sleep(1) # Tasks have ended or been cancelled
121 print("Test cancellation of second waiting task.")
122 t1
= asyncio
.create_task(message_wait(msg
, None, 1))
123 t2
= asyncio
.create_task(message_wait(msg
, None, 2))
124 await asyncio
.sleep(1)
126 await asyncio
.sleep(1)
127 print("Setting message")
128 msg
.set("Test message")
129 await asyncio
.sleep(1)
132 print("I've seen attack ships burn on the shoulder of Orion...")
133 print("Time to die...")
137 print("Message class is incompatible with Unix build.")
139 printexp('''Running (runtime = 12s):
140 Test multiple tasks waiting on a message.
142 message_wait 1 got message: 0
143 message_wait 2 got message: 0
148 message_wait 1 got message: 1
149 message_wait 2 got message: 1
154 message_wait 1 got message: 2
155 message_wait 2 got message: 2
160 Test asynchronous iterator.
165 Test cancellation of first waiting task.
166 message_wait 1 cancelled
168 message_wait 2 got message: Test message
170 Test cancellation of second waiting task.
171 message_wait 2 cancelled
173 message_wait 1 got message: Test message
174 I've seen attack ships burn on the shoulder of Orion...
177 asyncio
.run(ack_coro())
179 # ************ Test Lock and Message classes ************
181 async def run_lock(n
, lock
):
182 print('run_lock {} waiting for lock'.format(n
))
184 print('run_lock {} acquired lock'.format(n
))
185 await asyncio
.sleep(1) # Delay to demo other coros waiting for lock
187 print('run_lock {} released lock'.format(n
))
189 async def messageset(message
):
190 print('Waiting 5 secs before setting message')
191 await asyncio
.sleep(5)
193 print('message was set')
195 async def messagewait(message
):
196 print('waiting for message')
201 async def run_message_test():
202 print('Test Lock class')
203 lock
= asyncio
.Lock()
204 asyncio
.create_task(run_lock(1, lock
))
205 asyncio
.create_task(run_lock(2, lock
))
206 asyncio
.create_task(run_lock(3, lock
))
207 print('Test Message class')
209 asyncio
.create_task(messageset(message
))
210 await messagewait(message
) # run_message_test runs fast until this point
211 print('Message status {}'.format('Incorrect' if message
.is_set() else 'OK'))
212 print('Tasks complete')
216 print("Message class is incompatible with Unix build.")
218 printexp('''Test Lock class
221 run_lock 1 waiting for lock
222 run_lock 1 acquired lock
223 run_lock 2 waiting for lock
224 run_lock 3 waiting for lock
225 Waiting 5 secs before setting message
226 run_lock 1 released lock
227 run_lock 2 acquired lock
228 run_lock 2 released lock
229 run_lock 3 acquired lock
230 run_lock 3 released lock
236 asyncio
.run(run_message_test())
238 # ************ Barrier test ************
240 async def killer(duration
):
241 await asyncio
.sleep(duration
)
246 async def report(barrier
):
248 print('{} '.format(i
), end
='')
251 async def do_barrier_test():
252 barrier
= Barrier(3, callback
, ('Synch',))
255 asyncio
.create_task(report(barrier
))
256 await asyncio
.sleep(1)
258 await asyncio
.sleep(1)
261 printexp('''Running (runtime = 3s):
273 asyncio
.run(do_barrier_test())
275 # ************ Barrier test 1 ************
277 async def my_coro(text
):
279 await asyncio
.sleep_ms(0)
281 await asyncio
.sleep(1)
283 except asyncio
.CancelledError
:
284 print('my_coro was cancelled.')
286 async def report1(barrier
, x
):
287 await asyncio
.sleep(x
)
288 print('report instance', x
, 'waiting')
290 print('report instance', x
, 'done')
293 barrier
= Barrier(4, my_coro
, ('my_coro running',))
295 asyncio
.create_task(report1(barrier
, x
))
296 await asyncio
.sleep(4)
297 assert barrier
.busy()
299 await asyncio
.sleep(0)
300 assert not barrier
.busy()
301 # Must yield before reading result(). Here we wait long enough for
302 await asyncio
.sleep_ms(1500) # coro to print
303 barrier
.result().cancel()
304 await asyncio
.sleep(2)
307 printexp('''Running (runtime = 5s):
308 report instance 0 waiting
309 report instance 1 waiting
310 report instance 2 waiting
311 report instance 2 done
312 report instance 1 done
313 report instance 0 done
315 my_coro was cancelled.
317 Exact report instance done sequence may vary, but 3 instances should report
318 done before my_coro runs.
322 # ************ Semaphore test ************
324 async def run_sema(n
, sema
, barrier
):
325 print('run_sema {} trying to access semaphore'.format(n
))
327 print('run_sema {} acquired semaphore'.format(n
))
328 # Delay demonstrates other coros waiting for semaphore
329 await asyncio
.sleep(1 + n
/10) # n/10 ensures deterministic printout
330 print('run_sema {} has released semaphore'.format(n
))
333 async def run_sema_test(bounded
):
335 barrier
= Barrier(num_coros
+ 1)
337 semaphore
= BoundedSemaphore(3)
339 semaphore
= Semaphore(3)
340 for n
in range(num_coros
):
341 asyncio
.create_task(run_sema(n
, semaphore
, barrier
))
342 await barrier
# Quit when all coros complete
346 print('Bounded semaphore exception test OK')
348 def semaphore_test(bounded
=False):
350 exp
= '''run_sema 0 trying to access semaphore
351 run_sema 0 acquired semaphore
352 run_sema 1 trying to access semaphore
353 run_sema 1 acquired semaphore
354 run_sema 2 trying to access semaphore
355 run_sema 2 acquired semaphore
356 run_sema 3 trying to access semaphore
357 run_sema 4 trying to access semaphore
358 run_sema 0 has released semaphore
359 run_sema 4 acquired semaphore
360 run_sema 1 has released semaphore
361 run_sema 3 acquired semaphore
362 run_sema 2 has released semaphore
363 run_sema 4 has released semaphore
364 run_sema 3 has released semaphore
365 Bounded semaphore exception test OK
367 Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
369 exp
= '''run_sema 0 trying to access semaphore
370 run_sema 0 acquired semaphore
371 run_sema 1 trying to access semaphore
372 run_sema 1 acquired semaphore
373 run_sema 2 trying to access semaphore
374 run_sema 2 acquired semaphore
375 run_sema 3 trying to access semaphore
376 run_sema 4 trying to access semaphore
377 run_sema 0 has released semaphore
378 run_sema 3 acquired semaphore
379 run_sema 1 has released semaphore
380 run_sema 4 acquired semaphore
381 run_sema 2 has released semaphore
382 run_sema 3 has released semaphore
383 run_sema 4 has released semaphore
385 Exact sequence of acquisition may vary when 3 and 4 compete for semaphore.'''
387 asyncio
.run(run_sema_test(bounded
))
389 # ************ Condition test ************
396 await asyncio
.sleep(2)
398 cond
.notify(2) # Notify 2 tasks
400 async def cond03(): # Maintain a count of seconds
402 await asyncio
.sleep(0.5)
404 await asyncio
.sleep(1)
407 async def cond02(n
, barrier
):
409 print('cond02', n
, 'Awaiting notification.')
411 print('cond02', n
, 'triggered. tim =', tim
)
417 async def cond04(n
, barrier
):
419 print('cond04', n
, 'Awaiting notification and predicate.')
420 await cond
.wait_for(predicate
)
421 print('cond04', n
, 'triggered. tim =', tim
)
426 barrier
= Barrier(ntasks
+ 1)
427 t1
= asyncio
.create_task(cond01())
428 t3
= asyncio
.create_task(cond03())
429 for n
in range(ntasks
):
430 asyncio
.create_task(cond02(n
, barrier
))
431 await barrier
# All instances of cond02 have completed
434 asyncio
.create_task(cond04(99, barrier
))
436 # cancel continuously running coros.
439 await asyncio
.sleep_ms(0)
442 def condition_test():
443 printexp('''cond02 0 Awaiting notification.
444 cond02 1 Awaiting notification.
445 cond02 2 Awaiting notification.
446 cond02 3 Awaiting notification.
447 cond02 4 Awaiting notification.
448 cond02 5 Awaiting notification.
449 cond02 6 Awaiting notification.
450 cond02 5 triggered. tim = 1
451 cond02 6 triggered. tim = 1
452 cond02 3 triggered. tim = 3
453 cond02 4 triggered. tim = 3
454 cond02 1 triggered. tim = 5
455 cond02 2 triggered. tim = 5
456 cond02 0 triggered. tim = 7
457 cond04 99 Awaiting notification and predicate.
458 cond04 99 triggered. tim = 9
461 asyncio
.run(cond_go())
463 # ************ Queue test ************
465 async def slow_process():
466 await asyncio
.sleep(2)
470 print('Waiting for slow process.')
471 result
= await slow_process()
472 print('Putting result onto queue')
473 await q
.put(result
) # Put result on q
476 print("Running foo()")
477 result
= await q
.get()
478 print('Result was {}'.format(result
))
480 async def q_put(n
, q
):
484 await asyncio
.sleep(0)
486 async def q_get(n
, q
):
489 await asyncio
.sleep(0)
492 # put some item, then sleep
495 await asyncio
.sleep_ms(50)
499 # checks for new items, and relies on the "blocking" of the get method
503 async def queue_go():
505 asyncio
.create_task(foo(q
))
506 asyncio
.create_task(bar(q
))
507 await asyncio
.sleep(3)
509 asyncio
.create_task(q_put(n
, q
))
510 await asyncio
.sleep(1)
511 assert q
.qsize() == 10
513 await asyncio
.sleep(0.1)
514 assert q
.qsize() == 10
517 await asyncio
.sleep(0.1)
519 print('Competing put tasks test complete')
522 asyncio
.create_task(q_get(n
, q
))
523 await asyncio
.sleep(1)
527 await asyncio
.sleep(0.3)
529 assert q
.qsize() == 10
530 print('Competing get tasks test complete')
531 await asyncio
.gather(
535 print('Queue tests complete')
536 print("I've seen attack ships burn off the shoulder of Orion...")
537 print("Time to die...")
540 printexp('''Running (runtime = 20s):
542 Waiting for slow process.
543 Putting result onto queue
545 Competing put tasks test complete
546 Competing get tasks test complete
550 I've seen attack ships burn off the shoulder of Orion...
554 asyncio
.run(queue_go())
556 # ************ RingbufQueue test ************
558 async def qread(q
, lst
, twr
):
561 await asyncio
.sleep_ms(twr
)
563 async def read(q
, t
, twr
=0):
566 await asyncio
.wait_for(qread(q
, lst
, twr
), t
)
567 except asyncio
.TimeoutError
:
571 async def put_list(q
, lst
, twp
=0):
574 await asyncio
.sleep_ms(twp
)
577 q
= RingbufQueue([0 for _
in range(10)]) # 10 elements
578 pl
= [n
for n
in range(15)]
579 print("Read waits on slow write.")
580 asyncio
.create_task(put_list(q
, pl
, 100))
581 rl
= await read(q
, 2)
584 print("Write waits on slow read.")
585 asyncio
.create_task(put_list(q
, pl
))
586 rl
= await read(q
, 2, 100)
589 print("Testing full, empty and qsize methods.")
591 assert q
.qsize() == 0
593 await put_list(q
, (1,2,3))
595 assert q
.qsize() == 3
598 print("Testing put_nowait and overruns.")
600 for x
in range(4, 15):
607 rl
= await read(q
, 2)
608 assert rl
== [6, 7, 8, 9, 10, 11, 12, 13, 14]
609 print("Testing get_nowait.")
611 assert q
.get_nowait() == 1
618 print("Tests complete.")
619 print("I've seen attack ships burn off the shoulder of Orion...")
620 print("Time to die...")
623 printexp('''Running (runtime = 6s):
624 Read waits on slow write.
626 Write waits on slow read.
628 Testing full, empty and qsize methods.
630 Testing put_nowait and overruns.
633 I've seen attack ships burn off the shoulder of Orion...
637 asyncio
.run(rbq_go())
639 # ************ ************
643 ack_test() # Test message acknowledge.
645 msg_test() # Test Message and Lock objects.
647 barrier_test() # Test the Barrier class with callback.
649 barrier_test1() # Test the Barrier class with coroutine.
651 semaphore_test(False) # Test Semaphore
653 semaphore_test(True) # Test BoundedSemaphore.
655 condition_test() # Test the Condition class.
657 queue_test() # Test the Queue class.
659 rbq_test() # Test the RingbufQueue class.
660 except KeyboardInterrupt:
663 asyncio
.new_event_loop()