--- /dev/null
+# rp2_rmt.py A RMT-like class for the RP2.
+
+# Released under the MIT License (MIT). See LICENSE.
+
+# Copyright (c) 2021 Peter Hinch
+
+from machine import Pin, PWM
+import rp2
+
+@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, autopull=True, pull_thresh=32)
+def pulsetrain():
+ wrap_target()
+ out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end.
+ irq(rel(0))
+ set(pins, 1) # Set pin high
+ label('loop')
+ jmp(x_dec,'loop')
+ irq(rel(0))
+ set(pins, 0) # Set pin low
+ out(y, 32) # Low time.
+ label('loop_lo')
+ jmp(y_dec,'loop_lo')
+ wrap()
+
+@rp2.asm_pio(autopull=True, pull_thresh=32)
+def irqtrain():
+ wrap_target()
+ out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end.
+ irq(rel(0))
+ label('loop')
+ jmp(x_dec,'loop')
+ wrap()
+
+class DummyPWM:
+ def duty_u16(self, _):
+ pass
+
+class RP2_RMT:
+
+ def __init__(self, pin_pulse=None, carrier=None, sm_no=0, sm_freq=1_000_000):
+ if carrier is None:
+ self.pwm = DummyPWM()
+ self.duty = (0, 0)
+ else:
+ pin_car, freq, duty = carrier
+ self.pwm = PWM(pin_car) # Set up PWM with carrier off.
+ self.pwm.freq(freq)
+ self.pwm.duty_u16(0)
+ self.duty = (int(0xffff * duty // 100), 0)
+ if pin_pulse is None:
+ self.sm = rp2.StateMachine(sm_no, irqtrain, freq=sm_freq)
+ else:
+ self.sm = rp2.StateMachine(sm_no, pulsetrain, freq=sm_freq, set_base=pin_pulse)
+ self.apt = 0 # Array index
+ self.arr = None # Array
+ self.ict = None # Current IRQ count
+ self.icm = 0 # End IRQ count
+ self.reps = 0 # 0 == forever n == no. of reps
+ rp2.PIO(0).irq(self._cb)
+
+ # IRQ callback. Because of FIFO IRQ's keep arriving after STOP.
+ def _cb(self, pio):
+ self.pwm.duty_u16(self.duty[self.ict & 1])
+ self.ict += 1
+ if d := self.arr[self.apt]: # If data available feed FIFO
+ self.sm.put(d)
+ self.apt += 1
+ else:
+ if r := self.reps != 1: # All done if reps == 1
+ if r: # 0 == run forever
+ self.reps -= 1
+ self.sm.put(self.arr[0])
+ self.apt = 1 # Set pointer and count to state
+ self.ict = 1 # after 1st IRQ
+
+ # Arg is an array of times in μs terminated by 0.
+ def send(self, ar, reps=1, check=True):
+ self.sm.active(0)
+ self.reps = reps
+ ar[-1] = 0 # Ensure at least one STOP
+ for x, d in enumerate(ar): # Find 1st STOP
+ if d == 0:
+ break
+ if check:
+ # Discard any trailing mark which would leave carrier on.
+ if (x & 1):
+ x -= 1
+ ar[x] = 0
+ self.icm = x # index of 1st STOP
+ mv = memoryview(ar)
+ n = min(x, 4) # Fill FIFO if there are enough data points.
+ self.sm.put(mv[0 : n])
+ self.arr = ar # Initial conditions for ISR
+ self.apt = n # Point to next data value
+ self.ict = 0 # IRQ count
+ self.sm.active(1)
+
+ def busy(self):
+ if self.ict is None:
+ return False # Just instantiated
+ return self.ict < self.icm
+
+ def cancel(self):
+ self.reps = 1