]>
vault307.fbx.one Git - ir_remote.git/blob - primitives/ringbuf_queue.py
1 # ringbuf_queue.py Provides RingbufQueue class
3 # Copyright (c) 2022 Peter Hinch
4 # Released under the MIT License (MIT) - see LICENSE file
6 # API differs from CPython
7 # Uses pre-allocated ring buffer: can use list or array
8 # Asynchronous iterator allowing consumer to use async for
9 # put_nowait QueueFull exception can be ignored allowing oldest data to be discarded.
11 import uasyncio
as asyncio
14 class RingbufQueue
: # MicroPython optimised
15 def __init__(self
, buf
):
20 self
._evput
= asyncio
.Event() # Triggered by put, tested by get
21 self
._evget
= asyncio
.Event() # Triggered by get, tested by put
24 return ((self
._wi
+ 1) % self
._size
) == self
._ri
27 return self
._ri
== self
._wi
30 return (self
._wi
- self
._ri
) % self
._size
32 def get_nowait(self
): # Remove and return an item from the queue.
33 # Return an item if one is immediately available, else raise QueueEmpty.
37 self
._ri
= (self
._ri
+ 1) % self
._size
38 self
._evget
.set() # Schedule all tasks waiting on ._evget
42 def put_nowait(self
, v
):
44 self
._evput
.set() # Schedule any tasks waiting on get
46 self
._wi
= (self
._wi
+ 1) % self
._size
47 if self
._wi
== self
._ri
: # Would indicate empty
48 self
._ri
= (self
._ri
+ 1) % self
._size
# Discard a message
49 raise IndexError # Caller can ignore if overwrites are OK
51 async def put(self
, val
): # Usage: await queue.put(item)
52 while self
.full(): # Queue full
53 await self
._evget
.wait() # May be >1 task waiting on ._evget
54 # Task(s) waiting to get from queue, schedule first Task
60 async def __anext__(self
):
61 while self
.empty(): # Empty. May be more than one task waiting on ._evput
62 await self
._evput
.wait()
64 self
._ri
= (self
._ri
+ 1) % self
._size
65 self
._evget
.set() # Schedule all tasks waiting on ._evget