]> vault307.fbx.one Git - micorpython_ir.git/blobdiff - ir_tx/rp2_rmt.py
Make compatible with Raspberry Pi Pico (RP2 arch).
[micorpython_ir.git] / ir_tx / rp2_rmt.py
diff --git a/ir_tx/rp2_rmt.py b/ir_tx/rp2_rmt.py
new file mode 100644 (file)
index 0000000..7b42fa8
--- /dev/null
@@ -0,0 +1,104 @@
+# rp2_rmt.py A RMT-like class for the RP2.
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2021 Peter Hinch
+
+from machine import Pin, PWM
+import rp2
+
+@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, autopull=True, pull_thresh=32)
+def pulsetrain():
+    wrap_target()
+    out(x, 32)  # No of 1MHz ticks. Block if FIFO MT at end.
+    irq(rel(0))
+    set(pins, 1)  # Set pin high
+    label('loop')
+    jmp(x_dec,'loop')
+    irq(rel(0))
+    set(pins, 0)  # Set pin low
+    out(y, 32)  # Low time.
+    label('loop_lo')
+    jmp(y_dec,'loop_lo')
+    wrap()
+
+@rp2.asm_pio(autopull=True, pull_thresh=32)
+def irqtrain():
+    wrap_target()
+    out(x, 32)  # No of 1MHz ticks. Block if FIFO MT at end.
+    irq(rel(0))
+    label('loop')
+    jmp(x_dec,'loop')
+    wrap()
+
+class DummyPWM:
+    def duty_u16(self, _):
+        pass
+
+class RP2_RMT:
+
+    def __init__(self, pin_pulse=None, carrier=None, sm_no=0, sm_freq=1_000_000):
+        if carrier is None:
+            self.pwm = DummyPWM()
+            self.duty = (0, 0)
+        else:
+            pin_car, freq, duty = carrier
+            self.pwm = PWM(pin_car)  # Set up PWM with carrier off.
+            self.pwm.freq(freq)
+            self.pwm.duty_u16(0)
+            self.duty = (int(0xffff * duty // 100), 0)
+        if pin_pulse is None:
+            self.sm = rp2.StateMachine(sm_no, irqtrain, freq=sm_freq)
+        else:
+            self.sm = rp2.StateMachine(sm_no, pulsetrain, freq=sm_freq, set_base=pin_pulse)
+        self.apt = 0  # Array index
+        self.arr = None  # Array
+        self.ict = None  # Current IRQ count
+        self.icm = 0  # End IRQ count
+        self.reps = 0  # 0 == forever n == no. of reps
+        rp2.PIO(0).irq(self._cb)
+
+    # IRQ callback. Because of FIFO IRQ's keep arriving after STOP.
+    def _cb(self, pio):
+        self.pwm.duty_u16(self.duty[self.ict & 1])
+        self.ict += 1
+        if d := self.arr[self.apt]:  # If data available feed FIFO
+            self.sm.put(d)
+            self.apt += 1
+        else:
+            if r := self.reps != 1:  # All done if reps == 1
+                if r:  # 0 == run forever
+                    self.reps -= 1
+                self.sm.put(self.arr[0])
+                self.apt = 1  # Set pointer and count to state
+                self.ict = 1  # after 1st IRQ
+
+    # Arg is an array of times in μs terminated by 0. 
+    def send(self, ar, reps=1, check=True):
+        self.sm.active(0)
+        self.reps = reps
+        ar[-1] = 0  # Ensure at least one STOP
+        for x, d in enumerate(ar):  # Find 1st STOP
+            if d == 0:
+                break
+        if check:
+            # Discard any trailing mark which would leave carrier on.
+            if (x & 1):
+                x -= 1
+                ar[x] = 0
+        self.icm = x  # index of 1st STOP
+        mv = memoryview(ar)
+        n = min(x, 4)  # Fill FIFO if there are enough data points.
+        self.sm.put(mv[0 : n])
+        self.arr = ar  # Initial conditions for ISR
+        self.apt = n  # Point to next data value
+        self.ict = 0  # IRQ count
+        self.sm.active(1)
+
+    def busy(self):
+        if self.ict is None:
+            return False  # Just instantiated
+        return self.ict < self.icm
+
+    def cancel(self):
+        self.reps = 1