Skip to content

Commit 382cb91

Browse files
committed
Send messages to expected external MIDI devices on pedalboard load
1 parent 67f1e78 commit 382cb91

5 files changed

Lines changed: 365 additions & 5 deletions

File tree

modalapi/external_midi.py

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
# This file is part of pi-stomp.
2+
#
3+
# pi-stomp is free software: you can redistribute it and/or modify
4+
# it under the terms of the GNU General Public License as published by
5+
# the Free Software Foundation, either version 3 of the License, or
6+
# (at your option) any later version.
7+
#
8+
# pi-stomp is distributed in the hope that it will be useful,
9+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
# GNU General Public License for more details.
12+
#
13+
# You should have received a copy of the GNU General Public License
14+
# along with pi-stomp. If not, see <https://www.gnu.org/licenses/>.
15+
16+
from __future__ import annotations
17+
18+
import fnmatch
19+
import logging
20+
import time
21+
from typing import TypedDict
22+
23+
import rtmidi
24+
25+
26+
MidiMessage = list[int]
27+
28+
29+
class PortConfig(TypedDict, total=False):
30+
auto_detect: list[str]
31+
port_index: int
32+
33+
34+
class ExternalMidiConfig(TypedDict, total=False):
35+
enabled: bool
36+
send_delay_ms: int
37+
ports: dict[str, PortConfig] # configured devices
38+
messages: dict[str, list[MidiMessage]] # port_name -> list of MIDI messages
39+
40+
41+
class ExternalMidiManager:
42+
"""
43+
Manages external MIDI device synchronization.
44+
Sends MIDI messages to external devices when pedalboards are loaded.
45+
"""
46+
47+
def __init__(self):
48+
self.midi_ports: dict[str, rtmidi.MidiOut | None] = {}
49+
self.port_configs: dict[str, PortConfig] = {}
50+
self.messages: dict[str, list[MidiMessage]] = {}
51+
self.enabled: bool = False
52+
self.send_delay_ms: int = 10
53+
54+
def update_config(self, cfg: ExternalMidiConfig | None) -> None:
55+
"""
56+
Update configuration incrementally (can be called multiple times).
57+
Only updates fields that are present.
58+
"""
59+
if cfg is None:
60+
return
61+
62+
if "enabled" in cfg:
63+
self.enabled = cfg["enabled"]
64+
if self.enabled:
65+
logging.debug("External MIDI enabled")
66+
else:
67+
logging.debug("External MIDI disabled")
68+
69+
if "send_delay_ms" in cfg:
70+
self.send_delay_ms = cfg["send_delay_ms"]
71+
72+
if "ports" in cfg:
73+
# Merge ports (port-level granularity)
74+
self.port_configs.update(cfg["ports"])
75+
76+
if "messages" in cfg:
77+
# Merge messages at port level
78+
# This allows pedalboard config to override specific ports while keeping others default
79+
self.messages.update(cfg["messages"])
80+
81+
def _get_available_ports(self) -> list[str]:
82+
try:
83+
temp_out = rtmidi.MidiOut()
84+
ports = temp_out.get_ports()
85+
del temp_out
86+
return ports
87+
except Exception as e:
88+
logging.error(f"Failed to enumerate MIDI ports: {e}")
89+
return []
90+
91+
def _find_port_by_name(self, port_config: PortConfig) -> int | None:
92+
"""
93+
Find MIDI port index matching a given config, returning its index if found.
94+
"""
95+
if "port_index" in port_config:
96+
return port_config["port_index"]
97+
98+
# Auto-detect by name patterns
99+
auto_detect = port_config.get("auto_detect", [])
100+
if not auto_detect:
101+
return None
102+
103+
available_ports = self._get_available_ports()
104+
if not available_ports:
105+
return None
106+
107+
# Search for matching ports using glob patterns
108+
matched_ports = []
109+
for pattern in auto_detect:
110+
for idx, port_name in enumerate(available_ports):
111+
# Case-insensitive glob matching
112+
if fnmatch.fnmatch(port_name.lower(), pattern.lower()):
113+
matched_ports.append((idx, port_name))
114+
115+
if not matched_ports:
116+
logging.warning(f"No MIDI ports matched patterns: {auto_detect}")
117+
return None
118+
119+
# Warn if multiple matches
120+
if len(matched_ports) > 1:
121+
port_names = [name for _, name in matched_ports]
122+
logging.warning(
123+
f"Multiple MIDI ports matched {auto_detect}: {port_names}. Using first match: {matched_ports[0][1]}"
124+
)
125+
126+
selected_idx, selected_name = matched_ports[0]
127+
logging.info(f"Auto-detected MIDI port: {selected_name} (index {selected_idx})")
128+
return selected_idx
129+
130+
def _init_port(self, port_name: str) -> rtmidi.MidiOut | None:
131+
if port_name in self.midi_ports:
132+
return self.midi_ports[port_name]
133+
134+
port_config = self.port_configs.get(port_name)
135+
if not port_config:
136+
logging.warning(f"No configuration found for MIDI port: {port_name}")
137+
self.midi_ports[port_name] = None
138+
return None
139+
140+
port_idx = self._find_port_by_name(port_config)
141+
if port_idx is None:
142+
logging.warning(f"Could not find MIDI port for: {port_name}")
143+
self.midi_ports[port_name] = None
144+
return None
145+
146+
try:
147+
midi_out = rtmidi.MidiOut()
148+
midi_out.open_port(port_idx)
149+
self.midi_ports[port_name] = midi_out
150+
logging.info(f"Opened MIDI port: {port_name}")
151+
return midi_out
152+
except Exception as e:
153+
logging.error(
154+
f"Failed to open MIDI port {port_name} (index {port_idx}): {e}"
155+
)
156+
self.midi_ports[port_name] = None
157+
return None
158+
159+
def _validate_midi_message(self, message: MidiMessage) -> bool:
160+
if not isinstance(message, list) or len(message) < 2:
161+
logging.warning(
162+
f"Invalid MIDI message format (must be list with 2+ bytes): {message}"
163+
)
164+
return False
165+
166+
# Check status byte (must be 0x80-0xFF)
167+
status = message[0]
168+
if not (0x80 <= status <= 0xFF):
169+
logging.warning(
170+
f"Invalid MIDI status byte (must be 0x80-0xFF): 0x{status:02X}"
171+
)
172+
return False
173+
174+
# Check data bytes (must be 0x00-0x7F)
175+
for i, byte in enumerate(message[1:], start=1):
176+
if not (0x00 <= byte <= 0x7F):
177+
logging.warning(
178+
f"Invalid MIDI data byte at position {i} (must be 0x00-0x7F): 0x{byte:02X}"
179+
)
180+
return False
181+
182+
return True
183+
184+
def _send_messages(
185+
self, port_name: str, messages: list[MidiMessage], delay_ms: int = 10
186+
):
187+
"""
188+
Send MIDI messages to a port.
189+
190+
Args:
191+
port_name: Name of port configuration.
192+
messages: List of MIDI messages to send.
193+
delay_ms: Delay between messages in milliseconds.
194+
"""
195+
midi_out = self._init_port(port_name)
196+
if midi_out is None:
197+
logging.warning(f"Skipping messages for unavailable port: {port_name}")
198+
return
199+
200+
for i, message in enumerate(messages):
201+
if not self._validate_midi_message(message):
202+
logging.warning(
203+
f"Skipping invalid MIDI message {i + 1}/{len(messages)}: {message}"
204+
)
205+
continue
206+
207+
try:
208+
midi_out.send_message(message)
209+
logging.debug(
210+
f"Sent MIDI message to {port_name}: {[f'0x{b:02X}' for b in message]}"
211+
)
212+
213+
# Delay between messages (except after last one)
214+
if i < len(messages) - 1 and delay_ms > 0:
215+
time.sleep(delay_ms / 1000.0)
216+
217+
except Exception as e:
218+
logging.error(f"Failed to send MIDI message to {port_name}: {e}")
219+
220+
def send_messages_for_pedalboard(self) -> bool:
221+
"""
222+
Send external MIDI messages for current pedalboard configuration.
223+
Configuration should have been set via update_config() before calling this.
224+
225+
Returns:
226+
True if messages were sent successfully, False otherwise.
227+
"""
228+
if not self.enabled:
229+
return False
230+
231+
if not self.messages:
232+
return False
233+
234+
for port_name, messages in self.messages.items():
235+
if not messages:
236+
continue
237+
238+
logging.debug(f"Sending MIDI message(s) to {port_name}: {messages.join(', ')}")
239+
self._send_messages(port_name, messages, self.send_delay_ms)
240+
241+
return True
242+
243+
def close(self):
244+
"""Close ports and clean up."""
245+
for port_name, midi_out in self.midi_ports.items():
246+
if midi_out is not None:
247+
try:
248+
midi_out.close_port()
249+
logging.debug(f"Closed MIDI port: {port_name}")
250+
except Exception as e:
251+
logging.warning(f"Error closing MIDI port {port_name}: {e}")
252+
253+
self.midi_ports.clear()
254+
logging.info("External MIDI manager closed")

modalapi/mod.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,20 @@
2121
import sys
2222
import yaml
2323

24+
from enum import Enum
25+
from rtmidi.midiconstants import CONTROL_CHANGE
26+
2427
import common.token as Token
2528
import common.util as util
2629
import pistomp.switchstate as switchstate
2730
import modalapi.pedalboard as Pedalboard
2831
import modalapi.parameter as Parameter
2932
import modalapi.wifi as Wifi
33+
import modalapi.external_midi as ExternalMidi
3034

3135
from pistomp.analogmidicontrol import AnalogMidiControl
3236
from pistomp.footswitch import Footswitch
3337
from pistomp.handler import Handler
34-
from enum import Enum
3538
from pathlib import Path
3639

3740
#sys.path.append('/usr/lib/python3.5/site-packages') # TODO possibly /usr/local/modep/mod-ui
@@ -136,12 +139,20 @@ def __init__(self, audiocard, homedir):
136139
self.current_menu = MenuType.MENU_NONE
137140

138141
# This file is modified when the pedalboard is changed via MOD UI
139-
self.pedalboard_modification_file = "/home/pistomp/data/last.json"
142+
self.data_dir = "/home/pistomp/data"
143+
self.pedalboard_modification_file = os.path.join(self.data_dir, "last.json")
140144
self.pedalboard_change_timestamp = os.path.getmtime(self.pedalboard_modification_file)\
141145
if Path(self.pedalboard_modification_file).exists() else 0
142146

143147
self.wifi_manager = Wifi.WifiManager()
144148

149+
# External MIDI device synchronization
150+
self.external_midi = None
151+
try:
152+
self.external_midi = ExternalMidi.ExternalMidiManager()
153+
except Exception as e:
154+
logging.warning(f"Failed to initialize external MIDI manager: {e}")
155+
145156
# Callback function map. Key is the user specified name, value is function from this handler
146157
# Used for calling handler callbacks pointed to by names which may be user set in the config file
147158
self.callbacks = {"set_mod_tap_tempo": self.set_mod_tap_tempo,
@@ -153,10 +164,14 @@ def __del__(self):
153164
logging.info("Handler cleanup")
154165
if self.wifi_manager:
155166
del self.wifi_manager
167+
if self.external_midi is not None:
168+
self.external_midi.close()
156169

157170
def cleanup(self):
158171
if self.lcd is not None:
159172
self.lcd.cleanup()
173+
if self.external_midi is not None:
174+
self.external_midi.close()
160175

161176
# Container for dynamic data which is unique to the "current" pedalboard
162177
# The self.current pointed above will point to this object which gets
@@ -183,6 +198,7 @@ def __init__(self, plugin):
183198

184199
def add_hardware(self, hardware):
185200
self.hardware = hardware
201+
hardware.external_midi = self.external_midi
186202

187203
def add_lcd(self, lcd):
188204
self.lcd = lcd
@@ -534,6 +550,14 @@ def set_current_pedalboard(self, pedalboard):
534550
self.load_current_presets()
535551
self.update_lcd()
536552

553+
# Send external MIDI messages for this pedalboard
554+
# Config was already updated by hardware.reinit(cfg) above
555+
if self.external_midi is not None:
556+
try:
557+
self.external_midi.send_messages_for_pedalboard()
558+
except Exception as e:
559+
logging.warning(f"Failed to send external MIDI messages: {e}")
560+
537561
# Selection info
538562
self.selectable_items.clear()
539563
self.selectable_items.append((SelectedType.PEDALBOARD, None))

0 commit comments

Comments
 (0)