]> vault307.fbx.one Git - ir_remote.git/blob - primitives/ringbuf_queue.py
infrared remote
[ir_remote.git] / primitives / ringbuf_queue.py
1 # ringbuf_queue.py Provides RingbufQueue class
2
3 # Copyright (c) 2022 Peter Hinch
4 # Released under the MIT License (MIT) - see LICENSE file
5
6 # API differs from CPython
7 # Uses pre-allocated ring buffer: can use list or array
8 # Asynchronous iterator allowing consumer to use async for
9 # put_nowait raises IndexError (not QueueFull) when the queue is full; this can be ignored, allowing the oldest data to be discarded.
10
11 import uasyncio as asyncio
12
13
class RingbufQueue:  # MicroPython optimised
    # Fixed-capacity FIFO queue built on a caller-supplied, pre-allocated buffer
    # (anything supporting len() and indexed read/write, e.g. a list or array).
    # One slot is always kept free so that "full" can be told apart from
    # "empty": a buffer of length N therefore holds at most N - 1 items.
    # All index arithmetic is modulo len(buf), so a zero-length buffer is
    # unusable and a one-element buffer can never hold an item.
    # The instance is its own asynchronous iterator: "async for item in queue"
    # waits for each item in turn and never terminates by itself.

    def __init__(self, buf):
        # buf: the pre-allocated storage; it is used in place, not copied.
        self._q = buf
        self._size = len(buf)
        self._wi = 0  # Index of the next slot to be written
        self._ri = 0  # Index of the oldest unread item
        self._evput = asyncio.Event()  # Triggered by put, tested by get
        self._evget = asyncio.Event()  # Triggered by get, tested by put

    def full(self):
        # True when one more put_nowait would overwrite the oldest item.
        return ((self._wi + 1) % self._size) == self._ri

    def empty(self):
        # True when there is nothing to read.
        return self._ri == self._wi

    def qsize(self):
        # Number of items currently waiting to be read.
        return (self._wi - self._ri) % self._size

    def get_nowait(self):  # Remove and return an item from the queue.
        # Return the oldest item if one is immediately available,
        # else raise IndexError.
        if self.empty():
            raise IndexError
        r = self._q[self._ri]
        self._ri = (self._ri + 1) % self._size
        self._evget.set()  # Schedule all tasks waiting on ._evget
        self._evget.clear()
        return r

    def put_nowait(self, v):
        # Store v immediately. If the queue was already full the oldest item
        # is discarded to make room and IndexError is raised *after* v has
        # been stored, so the caller can ignore it if overwrites are OK.
        self._q[self._wi] = v
        self._evput.set()  # Schedule any tasks waiting on get
        self._evput.clear()
        self._wi = (self._wi + 1) % self._size
        if self._wi == self._ri:  # Would indicate empty
            self._ri = (self._ri + 1) % self._size  # Discard a message
            raise IndexError  # Caller can ignore if overwrites are OK

    async def put(self, val):  # Usage: await queue.put(item)
        # Wait until there is a free slot, then store val without overwriting.
        while self.full():  # Queue full
            await self._evget.wait()  # May be >1 task waiting on ._evget
        # The loop re-tests full() after every wake-up because another
        # waiting task may have taken the free slot first.
        self.put_nowait(val)

    async def get(self):  # Usage: item = await queue.get()
        # Awaitable counterpart of put(): wait until an item is available,
        # then remove and return the oldest one.
        while self.empty():  # May be more than one task waiting on ._evput
            await self._evput.wait()
        # Queue is known to be non-empty here, so get_nowait cannot raise.
        return self.get_nowait()

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Each iteration of "async for" waits for and returns the next item.
        return await self.get()