]> vault307.fbx.one Git - ir_remote.git/blob - primitives/semaphore.py
infrared remote
[ir_remote.git] / primitives / semaphore.py
1 # semaphore.py
2
3 # Copyright (c) 2018-2020 Peter Hinch
4 # Released under the MIT License (MIT) - see LICENSE file
5
6 try:
7 import uasyncio as asyncio
8 except ImportError:
9 import asyncio
10
11 # A Semaphore is typically used to limit the number of coros running a
12 # particular piece of code at once. The number is defined in the constructor.
class Semaphore:
    """Counting semaphore for cooperating asyncio tasks.

    The internal counter starts at ``value``. Each successful ``acquire``
    decrements it and each ``release`` increments it; ``acquire`` waits
    while the counter is zero. Usable as an asynchronous context manager
    (``async with sem:``).
    """

    def __init__(self, value=1):
        """Create the semaphore.

        Args:
            value: initial counter, i.e. how many acquisitions can succeed
                before ``acquire`` starts to wait. Must be >= 0.

        Raises:
            ValueError: if ``value`` is negative. ``acquire`` only waits
                while the counter equals zero, so a negative counter would
                never block and the semaphore would limit nothing.
        """
        if value < 0:
            raise ValueError('Semaphore initial value must be >= 0')
        self._count = value
        self._event = asyncio.Event()  # Set by release() to wake waiters

    async def __aenter__(self):
        """Acquire the semaphore and return it as the context value."""
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        """Release the semaphore, then yield once to the scheduler.

        Exception details passed in ``args`` are ignored and nothing is
        returned, so an exception raised in the ``async with`` body
        propagates to the caller.
        """
        self.release()
        await asyncio.sleep(0)

    async def acquire(self):
        """Wait until the counter is non-zero, then decrement it."""
        self._event.clear()
        while self._count == 0:  # Multiple tasks may be waiting for
            await self._event.wait()  # a release
            self._event.clear()
            # When we yield, another task may succeed. In this case
            await asyncio.sleep(0)  # the loop repeats
        self._count -= 1

    def release(self):
        """Increment the counter and set the event to wake waiting tasks."""
        self._event.set()
        self._count += 1
38
class BoundedSemaphore(Semaphore):
    """Semaphore whose counter may not be released above its initial value."""

    def __init__(self, value=1):
        """Create the semaphore and remember ``value`` as the upper bound."""
        super().__init__(value)
        self._initial_value = value

    def release(self):
        """Release the semaphore.

        Raises:
            ValueError: if the counter is already at (or above) the initial
                value, i.e. there is no outstanding acquisition to release.
        """
        # Guard first: refuse a release that would exceed the bound.
        if self._count >= self._initial_value:
            raise ValueError('Semaphore released more than acquired')
        super().release()