Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for rotary encoder #12

Merged
merged 7 commits into from
Mar 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ Mopidy-Raspberry-GPIO to your Mopidy configuration file::
[raspberry-gpio]
enabled = true
bcm5 = play_pause,active_low,250
bcm6 = volume_down,active_low,250
bcm6 = prev,active_low,250
bcm16 = next,active_low,250
bcm20 = volume_up,active_low,250
bcm21 = volume_down,active_low,10,rotenc_id=vol,step=1
bcm20 = volume_up,active_low,10,rotenc_id=vol,step=1

Each bcmN entry corresponds to the BCM pin of that number.

Expand Down
37 changes: 32 additions & 5 deletions mopidy_raspberry_gpio/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import pykka
from mopidy import core

from .rotencoder import RotEncoder

logger = logging.getLogger(__name__)


Expand All @@ -14,6 +16,7 @@ def __init__(self, config, core):
self.core = core
self.config = config["raspberry-gpio"]
self.pin_settings = {}
self.rot_encoders = {}

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
Expand All @@ -33,6 +36,17 @@ def __init__(self, config, core):
pull = GPIO.PUD_DOWN
edge = GPIO.RISING

if "rotenc_id" in settings.options:
edge = GPIO.BOTH
rotenc_id = settings.options["rotenc_id"]
encoder = None
if rotenc_id in self.rot_encoders.keys():
encoder = self.rot_encoders[rotenc_id]
else:
encoder = RotEncoder(rotenc_id)
self.rot_encoders[rotenc_id] = encoder
encoder.add_pin(pin, settings.event)

GPIO.setup(pin, GPIO.IN, pull_up_down=pull)

GPIO.add_event_detect(
Expand All @@ -44,17 +58,30 @@ def __init__(self, config, core):

self.pin_settings[pin] = settings

# TODO validate all self.rot_encoders have two pins

def find_pin_rotenc(self, pin):
for encoder in self.rot_encoders.values():
if pin in encoder.pins:
return encoder

def gpio_event(self, pin):
settings = self.pin_settings[pin]
self.dispatch_input(settings)
event = settings.event
encoder = self.find_pin_rotenc(pin)
if encoder:
event = encoder.get_event()

if event:
self.dispatch_input(event, settings.options)

def dispatch_input(self, settings):
handler_name = f"handle_{settings.event}"
def dispatch_input(self, event, options):
handler_name = f"handle_{event}"
try:
getattr(self, handler_name)(settings.options)
getattr(self, handler_name)(options)
except AttributeError:
raise RuntimeError(
f"Could not find input handler for event: {settings.event}"
f"Could not find input handler for event: {event}"
)

def handle_play_pause(self, config):
Expand Down
51 changes: 51 additions & 0 deletions mopidy_raspberry_gpio/rotencoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import logging

logger = logging.getLogger(__name__)


class RotEncoder:
def __init__(self, rot_id):
self.id = rot_id
self.pins = []
self.events = []
self.state = (None, None)
self.state_map = {
((False, False), (False, True)): 0,
((False, False), (True, False)): 1,
((False, True), (True, True)): 0,
((False, True), (False, False)): 1,
((True, False), (False, False)): 0,
((True, False), (True, True)): 1,
((True, True), (True, False)): 0,
((True, True), (False, True)): 1,
}

def add_pin(self, pin, event):
if len(self.pins) == 2:
raise RuntimeError(f"Too many pins for rotary encoder {self.id}!")
self.pins.append(pin)
self.events.append(event)

def get_state(self):
import RPi.GPIO as GPIO

level0 = GPIO.input(self.pins[0])
level1 = GPIO.input(self.pins[1])

return (level0, level1)

def get_direction(self, current, new):
return self.state_map[(current, new)]

def get_event(self):
next_state = self.get_state()

event = None
try:
direction = self.get_direction(self.state, next_state)
event = self.events[direction]
except KeyError:
pass

self.state = next_state
return event
46 changes: 38 additions & 8 deletions tests/test_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
"bcm1": deserialize("play_pause,active_low,30"),
"bcm2": deserialize("volume_up,active_high,30"),
"bcm3": deserialize("volume_down,active_high,30"),
"bcm4": deserialize("volume_down,active_high,250,rotenc_id=vol"),
"bcm5": deserialize("volume_up,active_high,250,rotenc_id=vol"),
}
}

Expand Down Expand Up @@ -61,7 +63,7 @@ def test_frontend_handler_dispatch_play_pause():
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("play_pause,active_low,30")

frontend.dispatch_input(settings)
frontend.dispatch_input(settings.event, settings.options)

stop_mopidy_core()

Expand All @@ -78,7 +80,7 @@ def test_frontend_handler_dispatch_play_stop():
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("play_stop,active_low,30")

frontend.dispatch_input(settings)
frontend.dispatch_input(settings.event, settings.options)

stop_mopidy_core()

Expand All @@ -95,7 +97,7 @@ def test_frontend_handler_dispatch_next():
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("next,active_low,30")

frontend.dispatch_input(settings)
frontend.dispatch_input(settings.event, settings.options)

stop_mopidy_core()

Expand All @@ -112,7 +114,7 @@ def test_frontend_handler_dispatch_prev():
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("prev,active_low,30")

frontend.dispatch_input(settings)
frontend.dispatch_input(settings.event, settings.options)

stop_mopidy_core()

Expand All @@ -129,7 +131,7 @@ def test_frontend_handler_dispatch_volume_up():
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("volume_up,active_low,30")

frontend.dispatch_input(settings)
frontend.dispatch_input(settings.event, settings.options)

stop_mopidy_core()

Expand All @@ -146,7 +148,7 @@ def test_frontend_handler_dispatch_volume_down():
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("volume_down,active_low,30")

frontend.dispatch_input(settings)
frontend.dispatch_input(settings.event, settings.options)

stop_mopidy_core()

Expand All @@ -163,7 +165,7 @@ def test_frontend_handler_dispatch_volume_up_custom_step():
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("volume_up,active_low,30,step=1")

frontend.dispatch_input(settings)
frontend.dispatch_input(settings.event, settings.options)

stop_mopidy_core()

Expand All @@ -180,7 +182,7 @@ def test_frontend_handler_dispatch_volume_down_custom_step():
schema = ext.get_config_schema()
settings = schema["bcm1"].deserialize("volume_down,active_low,30,step=1")

frontend.dispatch_input(settings)
frontend.dispatch_input(settings.event, settings.options)

stop_mopidy_core()

Expand All @@ -196,3 +198,31 @@ def test_frontend_gpio_event():
frontend.gpio_event(3)

stop_mopidy_core()


@mock.patch("RPi.GPIO.input")
def test_frontend_rot_encoder_event(patched_input):
patched_input.return_value = False

frontend = frontend_lib.RaspberryGPIOFrontend(
dummy_config, dummy_mopidy_core()
)

# Check that transition (False, True) -> (False, False) triggers volume_up
encoder = frontend.rot_encoders["vol"]
encoder.state = (False, True)

dispatch_input = mock.Mock()
frontend.dispatch_input = dispatch_input

frontend.gpio_event(4)
assert dispatch_input.call_args[0][0] == "volume_up"
assert encoder.state == (False, False)

# Check that we do not submit an event for the invalid transition
# (False, False) -> (False, False)
dispatch_input.reset_mock()
frontend.gpio_event(4)
dispatch_input.assert_not_called()

stop_mopidy_core()
54 changes: 54 additions & 0 deletions tests/test_rotencoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import unittest

from unittest.mock import patch
from mopidy_raspberry_gpio.rotencoder import RotEncoder


class RotEncoderTests(unittest.TestCase):
def test_rotenc_init(self):
rot_enc = RotEncoder("vol")
self.assertTrue(rot_enc.id == "vol")
self.assertTrue(((False, False), (False, True)) in rot_enc.state_map)

def test_get_direction(self):
rot_enc = RotEncoder("vol")
rot_enc.add_pin(123, "vol_up")
rot_enc.add_pin(124, "vol_down")

dir_down = rot_enc.get_direction((False, False), (False, True))
dir_up = rot_enc.get_direction((False, False), (True, False))

self.assertEqual(dir_up, 1)
self.assertEqual(dir_down, 0)

def test_add_pin_invalid(self):
rot_enc = RotEncoder("vol")
rot_enc.add_pin(123, "vol_up")
rot_enc.add_pin(124, "vol_down")

with self.assertRaises(RuntimeError):
rot_enc.add_pin(124, "vol_down")

@patch("RPi.GPIO.input")
def test_get_event(self, patched_input):
# Always return False for GPIO.input
patched_input.return_value = False

rot_enc = RotEncoder("vol")
rot_enc.add_pin(123, "vol_down") # dir 0 => vol_down
rot_enc.add_pin(124, "vol_up") # dir 1 => vol_up

# from False,True to False,False => dir 1
rot_enc.state = (False, True)
event = rot_enc.get_event()
self.assertEqual(event, "vol_up")

# from True,False to False,False => dir 0
rot_enc.state = (True, False)
event = rot_enc.get_event()
self.assertEqual(event, "vol_down")

# from True,True to False,False => None
rot_enc.state = (True, True)
event = rot_enc.get_event()
self.assertEqual(event, None)