diff --git a/README.rst b/README.rst index 0b339dd..2f10fad 100644 --- a/README.rst +++ b/README.rst @@ -41,9 +41,10 @@ Mopidy-Raspberry-GPIO to your Mopidy configuration file:: [raspberry-gpio] enabled = true bcm5 = play_pause,active_low,250 - bcm6 = volume_down,active_low,250 + bcm6 = prev,active_low,250 bcm16 = next,active_low,250 - bcm20 = volume_up,active_low,250 + bcm21 = volume_down,active_low,10,rotenc_id=vol,step=1 + bcm20 = volume_up,active_low,10,rotenc_id=vol,step=1 Each bcmN entry corresponds to the BCM pin of that number. diff --git a/mopidy_raspberry_gpio/frontend.py b/mopidy_raspberry_gpio/frontend.py index cf10840..215b9c3 100644 --- a/mopidy_raspberry_gpio/frontend.py +++ b/mopidy_raspberry_gpio/frontend.py @@ -3,6 +3,8 @@ import pykka from mopidy import core +from .rotencoder import RotEncoder + logger = logging.getLogger(__name__) @@ -14,6 +16,7 @@ def __init__(self, config, core): self.core = core self.config = config["raspberry-gpio"] self.pin_settings = {} + self.rot_encoders = {} GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) @@ -33,6 +36,17 @@ def __init__(self, config, core): pull = GPIO.PUD_DOWN edge = GPIO.RISING + if "rotenc_id" in settings.options: + edge = GPIO.BOTH + rotenc_id = settings.options["rotenc_id"] + encoder = None + if rotenc_id in self.rot_encoders.keys(): + encoder = self.rot_encoders[rotenc_id] + else: + encoder = RotEncoder(rotenc_id) + self.rot_encoders[rotenc_id] = encoder + encoder.add_pin(pin, settings.event) + GPIO.setup(pin, GPIO.IN, pull_up_down=pull) GPIO.add_event_detect( @@ -44,17 +58,30 @@ def __init__(self, config, core): self.pin_settings[pin] = settings + # TODO validate all self.rot_encoders have two pins + + def find_pin_rotenc(self, pin): + for encoder in self.rot_encoders.values(): + if pin in encoder.pins: + return encoder + def gpio_event(self, pin): settings = self.pin_settings[pin] - self.dispatch_input(settings) + event = settings.event + encoder = self.find_pin_rotenc(pin) + if encoder: + event = encoder.get_event() + + if event: + self.dispatch_input(event, settings.options) - def dispatch_input(self, settings): - handler_name = f"handle_{settings.event}" + def dispatch_input(self, event, options): + handler_name = f"handle_{event}" try: - getattr(self, handler_name)(settings.options) + getattr(self, handler_name)(options) except AttributeError: raise RuntimeError( - f"Could not find input handler for event: {settings.event}" + f"Could not find input handler for event: {event}" ) def handle_play_pause(self, config): diff --git a/mopidy_raspberry_gpio/rotencoder.py b/mopidy_raspberry_gpio/rotencoder.py new file mode 100644 index 0000000..e0b09ac --- /dev/null +++ b/mopidy_raspberry_gpio/rotencoder.py @@ -0,0 +1,51 @@ +import logging + +logger = logging.getLogger(__name__) + + +class RotEncoder: + def __init__(self, rot_id): + self.id = rot_id + self.pins = [] + self.events = [] + self.state = (None, None) + self.state_map = { + ((False, False), (False, True)): 0, + ((False, False), (True, False)): 1, + ((False, True), (True, True)): 0, + ((False, True), (False, False)): 1, + ((True, False), (False, False)): 0, + ((True, False), (True, True)): 1, + ((True, True), (True, False)): 0, + ((True, True), (False, True)): 1, + } + + def add_pin(self, pin, event): + if len(self.pins) == 2: + raise RuntimeError(f"Too many pins for rotary encoder {self.id}!") + self.pins.append(pin) + self.events.append(event) + + def get_state(self): + import RPi.GPIO as GPIO + + level0 = GPIO.input(self.pins[0]) + level1 = GPIO.input(self.pins[1]) + + return (level0, level1) + + def get_direction(self, current, new): + return self.state_map[(current, new)] + + def get_event(self): + next_state = self.get_state() + + event = None + try: + direction = self.get_direction(self.state, next_state) + event = self.events[direction] + except KeyError: + pass + + self.state = next_state + return event diff --git a/tests/test_frontend.py b/tests/test_frontend.py index c749c7e..568fbe7 100644 --- a/tests/test_frontend.py +++ b/tests/test_frontend.py @@ -18,6 +18,8 @@ "bcm1": deserialize("play_pause,active_low,30"), "bcm2": deserialize("volume_up,active_high,30"), "bcm3": deserialize("volume_down,active_high,30"), + "bcm4": deserialize("volume_down,active_high,250,rotenc_id=vol"), + "bcm5": deserialize("volume_up,active_high,250,rotenc_id=vol"), } } @@ -61,7 +63,7 @@ def test_frontend_handler_dispatch_play_pause(): schema = ext.get_config_schema() settings = schema["bcm1"].deserialize("play_pause,active_low,30") - frontend.dispatch_input(settings) + frontend.dispatch_input(settings.event, settings.options) stop_mopidy_core() @@ -78,7 +80,7 @@ def test_frontend_handler_dispatch_play_stop(): schema = ext.get_config_schema() settings = schema["bcm1"].deserialize("play_stop,active_low,30") - frontend.dispatch_input(settings) + frontend.dispatch_input(settings.event, settings.options) stop_mopidy_core() @@ -95,7 +97,7 @@ def test_frontend_handler_dispatch_next(): schema = ext.get_config_schema() settings = schema["bcm1"].deserialize("next,active_low,30") - frontend.dispatch_input(settings) + frontend.dispatch_input(settings.event, settings.options) stop_mopidy_core() @@ -112,7 +114,7 @@ def test_frontend_handler_dispatch_prev(): schema = ext.get_config_schema() settings = schema["bcm1"].deserialize("prev,active_low,30") - frontend.dispatch_input(settings) + frontend.dispatch_input(settings.event, settings.options) stop_mopidy_core() @@ -129,7 +131,7 @@ def test_frontend_handler_dispatch_volume_up(): schema = ext.get_config_schema() settings = schema["bcm1"].deserialize("volume_up,active_low,30") - frontend.dispatch_input(settings) + frontend.dispatch_input(settings.event, settings.options) stop_mopidy_core() @@ -146,7 +148,7 @@ def test_frontend_handler_dispatch_volume_down(): schema = ext.get_config_schema() settings = schema["bcm1"].deserialize("volume_down,active_low,30") - frontend.dispatch_input(settings) + frontend.dispatch_input(settings.event, settings.options) stop_mopidy_core() @@ -163,7 +165,7 @@ def test_frontend_handler_dispatch_volume_up_custom_step(): schema = ext.get_config_schema() settings = schema["bcm1"].deserialize("volume_up,active_low,30,step=1") - frontend.dispatch_input(settings) + frontend.dispatch_input(settings.event, settings.options) stop_mopidy_core() @@ -180,7 +182,7 @@ def test_frontend_handler_dispatch_volume_down_custom_step(): schema = ext.get_config_schema() settings = schema["bcm1"].deserialize("volume_down,active_low,30,step=1") - frontend.dispatch_input(settings) + frontend.dispatch_input(settings.event, settings.options) stop_mopidy_core() @@ -196,3 +198,31 @@ def test_frontend_gpio_event(): frontend.gpio_event(3) stop_mopidy_core() + + +@mock.patch("RPi.GPIO.input") +def test_frontend_rot_encoder_event(patched_input): + patched_input.return_value = False + + frontend = frontend_lib.RaspberryGPIOFrontend( + dummy_config, dummy_mopidy_core() + ) + + # Check that transition (False, True) -> (False, False) triggers volume_up + encoder = frontend.rot_encoders["vol"] + encoder.state = (False, True) + + dispatch_input = mock.Mock() + frontend.dispatch_input = dispatch_input + + frontend.gpio_event(4) + assert dispatch_input.call_args[0][0] == "volume_up" + assert encoder.state == (False, False) + + # Check that we do not submit an event for the invalid transition + # (False, False) -> (False, False) + dispatch_input.reset_mock() + frontend.gpio_event(4) + dispatch_input.assert_not_called() + + stop_mopidy_core() diff --git a/tests/test_rotencoder.py b/tests/test_rotencoder.py new file mode 100644 index 0000000..fd321ed --- /dev/null +++ b/tests/test_rotencoder.py @@ -0,0 +1,54 @@ +import unittest + +from unittest.mock import patch +from mopidy_raspberry_gpio.rotencoder import RotEncoder + + +class RotEncoderTests(unittest.TestCase): + def test_rotenc_init(self): + rot_enc = RotEncoder("vol") + self.assertTrue(rot_enc.id == "vol") + self.assertTrue(((False, False), (False, True)) in rot_enc.state_map) + + def test_get_direction(self): + rot_enc = RotEncoder("vol") + rot_enc.add_pin(123, "vol_up") + rot_enc.add_pin(124, "vol_down") + + dir_down = rot_enc.get_direction((False, False), (False, True)) + dir_up = rot_enc.get_direction((False, False), (True, False)) + + self.assertEqual(dir_up, 1) + self.assertEqual(dir_down, 0) + + def test_add_pin_invalid(self): + rot_enc = RotEncoder("vol") + rot_enc.add_pin(123, "vol_up") + rot_enc.add_pin(124, "vol_down") + + with self.assertRaises(RuntimeError): + rot_enc.add_pin(124, "vol_down") + + @patch("RPi.GPIO.input") + def test_get_event(self, patched_input): + # Always return False for GPIO.input + patched_input.return_value = False + + rot_enc = RotEncoder("vol") + rot_enc.add_pin(123, "vol_down") # dir 0 => vol_down + rot_enc.add_pin(124, "vol_up") # dir 1 => vol_up + + # from False,True to False,False => dir 1 + rot_enc.state = (False, True) + event = rot_enc.get_event() + self.assertEqual(event, "vol_up") + + # from True,False to False,False => dir 0 + rot_enc.state = (True, False) + event = rot_enc.get_event() + self.assertEqual(event, "vol_down") + + # from True,True to False,False => None + rot_enc.state = (True, True) + event = rot_enc.get_event() + self.assertEqual(event, None)