Spaces:
Running
on
Zero
Running
on
Zero
#!/usr/bin/python3 | |
""" | |
A simplified MIDI file parsing and generation module.

This module provides functions to convert between MIDI files and two list-based
representations: "opus" (delta-time events) and "score" (absolute-time notes).
Text events are kept as raw bytes rather than decoded strings, so text in any
encoding (e.g., Shift-JIS) survives a round trip without decoding errors.

Core functions:
- midi2opus: Parse MIDI bytes to an opus structure.
- opus2midi: Convert an opus to MIDI bytes.
- score2opus: Convert a score to an opus.
- opus2score: Convert an opus to a score.
- midi2score: Parse MIDI bytes to a score.
- score2midi: Convert a score to MIDI bytes.

Event formats:
- Opus: ['event_type', delta_time, ...] (e.g., ['note_on', 5, 0, 60, 100])
- Score: ['note', start_time, duration, channel, pitch, velocity] for notes;
  other events retain their opus format with absolute times.

Dependencies: Python 3, struct module.
""" | |
import struct | |
# Version info | |
__version__ = "1.0" | |
__date__ = "2023-10-01" | |
# --- Helper Functions --- | |
def _read_vlq(data, offset):
    """Decode one MIDI variable-length quantity starting at ``data[offset]``.

    Each byte contributes its low 7 bits, most significant group first; a set
    high bit (0x80) means another byte follows.

    Args:
        data: Indexable sequence of ints (``bytes`` or ``bytearray``).
        offset: Index of the first byte of the quantity.

    Returns:
        A ``(value, next_offset)`` tuple, where ``next_offset`` is the index
        just past the last byte consumed.

    Raises:
        IndexError: If ``data`` ends before a byte with a clear high bit.
    """
    total = 0
    more = True
    while more:
        current = data[offset]
        offset += 1
        total = (total << 7) | (current & 0x7F)
        # The continuation flag lives in the top bit of every byte.
        more = bool(current & 0x80)
    return total, offset
def _write_vlq(value):
    """Encode a non-negative integer as a MIDI variable-length quantity.

    The value is split into 7-bit groups, most significant first; every byte
    except the last has its high bit (0x80) set.

    Args:
        value: Non-negative integer to encode.

    Returns:
        The encoded quantity as ``bytes`` (always at least one byte).

    Raises:
        ValueError: If ``value`` is negative. A negative value used to encode
            as an empty byte string, which silently corrupted the stream.
    """
    if value < 0:
        raise ValueError(
            f"Variable-length quantity must be non-negative, got {value}")
    if value == 0:
        return b'\x00'
    septets = []
    while value > 0:
        septets.append(value & 0x7F)
        value >>= 7
    septets.reverse()
    # Flag every group but the last as "more bytes follow".
    return bytes(s | 0x80 for s in septets[:-1]) + bytes(septets[-1:])
def _write_14_bit(value):
    """Split an integer into two 7-bit bytes, least significant first.

    Bits above the low 14 are masked off.

    Args:
        value: Integer whose low 14 bits are encoded.

    Returns:
        Two-byte ``bytes`` object: ``(bits 0-6, bits 7-13)``.
    """
    low_bits = value & 0x7F
    high_bits = (value >> 7) & 0x7F
    return bytes((low_bits, high_bits))
# --- Decoding Functions --- | |
def _decode_track(data):
    """Decode the body of one MTrk chunk into a list of opus events.

    Every event is a list whose first two items are the event name and the
    delta time in ticks. Shapes produced:

    - ``['note_off' | 'note_on' | 'key_after_touch' | 'control_change',
      dt, channel, param1, param2]`` (a note-on with velocity 0 is reported
      as ``'note_off'``)
    - ``['pitch_wheel_change', dt, channel, value]``, value centred on 0
    - ``['patch_change' | 'channel_after_touch', dt, channel, param]``
    - ``['end_track', dt]``
    - ``['set_tempo', dt, tempo]`` (only for 3-byte tempo payloads)
    - ``['text_event_XX', dt, data]`` for meta types 0x01-0x0F, where ``XX``
      is the two-digit hex meta type and ``data`` is the undecoded payload
    - ``['raw_meta_event', dt, meta_type, data]`` for all other meta events
    - ``['sysex_f0' | 'sysex_f7', dt, data]``

    Running status is tracked for channel messages only. Meta and sysex
    events do not become the running status, and they do not clear it
    either: a data byte that follows one is interpreted with the most recent
    channel status. This is deliberately lenient, so tracks written by
    encoders that keep running status across meta events still decode.

    Args:
        data: Track chunk body as ``bytes`` or ``bytearray``.

    Returns:
        List of opus events in stream order.

    Raises:
        ValueError: If a data byte appears before any channel status byte,
            or if an unsupported status byte (0xF1-0xF6, 0xF8-0xFE) is met.
        IndexError: If the data ends in the middle of an event.
    """
    events = []
    offset = 0
    running_status = None  # Most recent channel-voice status byte, if any.
    while offset < len(data):
        delta_time, offset = _read_vlq(data, offset)
        lead = data[offset]
        if lead >= 0x80:
            status = lead
            offset += 1
        elif running_status is None:
            raise ValueError("Running status used without prior status")
        else:
            # Data byte: reuse the last channel status and leave the byte in
            # place so it is consumed as the first parameter below.
            status = running_status

        if 0x80 <= status <= 0xEF:
            # Only channel messages participate in running status.
            running_status = status
            channel = status & 0x0F
            command = status & 0xF0
            if command in (0xC0, 0xD0):  # One parameter
                param1 = data[offset]
                offset += 1
                name = 'patch_change' if command == 0xC0 else 'channel_after_touch'
                event = [name, delta_time, channel, param1]
            else:  # 0x80, 0x90, 0xA0, 0xB0, 0xE0: two parameters
                param1, param2 = data[offset], data[offset + 1]
                offset += 2
                if command == 0x80:
                    event = ['note_off', delta_time, channel, param1, param2]
                elif command == 0x90:
                    # A note-on with velocity 0 is a note-off by convention.
                    name = 'note_on' if param2 > 0 else 'note_off'
                    event = [name, delta_time, channel, param1, param2]
                elif command == 0xA0:
                    event = ['key_after_touch', delta_time, channel, param1, param2]
                elif command == 0xB0:
                    event = ['control_change', delta_time, channel, param1, param2]
                else:
                    # 14-bit value, LSB first, re-centred so 0 means no bend.
                    pitch = ((param2 << 7) | param1) - 0x2000
                    event = ['pitch_wheel_change', delta_time, channel, pitch]
        elif status == 0xFF:
            meta_type = data[offset]
            offset += 1
            length, offset = _read_vlq(data, offset)
            meta_data = data[offset:offset + length]
            offset += length
            if meta_type == 0x2F:
                event = ['end_track', delta_time]
            elif meta_type == 0x51 and length == 3:
                event = ['set_tempo', delta_time, int.from_bytes(meta_data, 'big')]
            elif 0x01 <= meta_type <= 0x0F:
                # Text is kept as raw bytes; no decoding is attempted.
                event = [f'text_event_{meta_type:02x}', delta_time, meta_data]
            else:
                event = ['raw_meta_event', delta_time, meta_type, meta_data]
        elif status in (0xF0, 0xF7):
            length, offset = _read_vlq(data, offset)
            sysex_data = data[offset:offset + length]
            offset += length
            name = 'sysex_f0' if status == 0xF0 else 'sysex_f7'
            event = [name, delta_time, sysex_data]
        else:
            raise ValueError(f"Unknown status byte: {status:02x}")
        events.append(event)
    return events
def midi2opus(midi_bytes):
    """Parse Standard MIDI File bytes into an opus structure.

    The result is ``[ticks_per_quarter, track_events, ...]`` with one event
    list per decoded MTrk chunk.

    The header chunk's declared length is honoured, so headers longer than
    the usual 6 bytes are accepted and the extra bytes skipped. Chunks whose
    type is not ``MTrk`` are skipped rather than ending the parse. Parsing
    stops once the number of tracks announced in the header has been decoded
    or the data runs out.

    Args:
        midi_bytes: Complete MIDI file contents as ``bytes``/``bytearray``.

    Returns:
        The opus list. If the input is not bytes-like, is shorter than 14
        bytes, or does not start with ``MThd``, the default empty opus
        ``[1000, []]`` is returned instead.

    Raises:
        ValueError: If the header declares a length smaller than 6, or if a
            track body is malformed (raised by the track decoder).
    """
    if (not isinstance(midi_bytes, (bytes, bytearray))
            or len(midi_bytes) < 14
            or midi_bytes[:4] != b'MThd'):
        return [1000, []]  # Default empty opus
    header_size = int.from_bytes(midi_bytes[4:8], 'big')
    if header_size < 6:
        raise ValueError("Invalid MIDI header size")
    num_tracks = int.from_bytes(midi_bytes[10:12], 'big')
    ticks_per_quarter = int.from_bytes(midi_bytes[12:14], 'big')
    opus = [ticks_per_quarter]

    # First chunk after the header: 8 bytes of chunk preamble plus however
    # many header bytes the file declares.
    offset = 8 + header_size
    total = len(midi_bytes)
    tracks_found = 0
    while tracks_found < num_tracks and offset + 8 <= total:
        chunk_type = midi_bytes[offset:offset + 4]
        chunk_size = int.from_bytes(midi_bytes[offset + 4:offset + 8], 'big')
        body_start = offset + 8
        offset = body_start + chunk_size
        if chunk_type != b'MTrk':
            continue  # Unknown chunk type: step over it.
        opus.append(_decode_track(midi_bytes[body_start:offset]))
        tracks_found += 1
    return opus
# --- Encoding Functions --- | |
def _encode_track(events):
    """Encode a list of opus events into the body of an MTrk chunk.

    Supported events (``dt`` is the delta time in ticks):

    - ``['note_off' | 'note_on' | 'key_after_touch' | 'control_change',
      dt, channel, param1, param2]``
    - ``['patch_change' | 'channel_after_touch', dt, channel, param]``
    - ``['pitch_wheel_change', dt, channel, value]``, value centred on 0
    - ``['text_event_XX', dt, data]`` with ``XX`` the hex meta type
    - ``['set_tempo', dt, tempo]``
    - ``['end_track', dt]``
    - ``['raw_meta_event', dt, meta_type, data]``
    - ``['sysex_f0' | 'sysex_f7', dt, data]``

    Events of any other type are left out of the output. Their delta time is
    added to the next encoded event so later events keep their absolute
    position, and nothing is written for them (writing a delta time without
    an event would corrupt the stream).

    Running status is used for consecutive channel messages with the same
    status byte and is reset after every meta or sysex event, so the next
    channel message always carries an explicit status byte.

    Args:
        events: Iterable of opus events.

    Returns:
        ``bytearray`` holding the encoded track body (no ``MTrk`` header).
    """
    channel_commands = {
        'note_off': 0x80,
        'note_on': 0x90,
        'key_after_touch': 0xA0,
        'control_change': 0xB0,
        'patch_change': 0xC0,
        'channel_after_touch': 0xD0,
        'pitch_wheel_change': 0xE0,
    }
    track_data = bytearray()
    running_status = None
    carried_delta = 0  # Ticks inherited from skipped (unsupported) events.
    for event in events:
        event_type = event[0]
        delta_time = event[1] + carried_delta
        carried_delta = 0

        command = channel_commands.get(event_type)
        if command is not None:
            status = command | event[2]
            if command == 0xE0:
                params = _write_14_bit(event[3] + 0x2000)
            elif command in (0xC0, 0xD0):
                params = bytes([event[3]])
            else:
                params = bytes([event[3], event[4]])
            track_data.extend(_write_vlq(delta_time))
            if status != running_status:
                track_data.append(status)
                running_status = status
            track_data.extend(params)
            continue

        if event_type.startswith('text_event_'):
            meta_type = int(event_type.split('_')[-1], 16)
            text_data = event[2]
            payload = (b'\xFF' + bytes([meta_type])
                       + _write_vlq(len(text_data)) + text_data)
        elif event_type == 'set_tempo':
            # Tempo is a 24-bit big-endian value: drop the top byte of '>I'.
            payload = b'\xFF\x51\x03' + struct.pack('>I', event[2])[1:]
        elif event_type == 'end_track':
            payload = b'\xFF\x2F\x00'
        elif event_type == 'raw_meta_event':
            meta_data = event[3]
            payload = (b'\xFF' + bytes([event[2]])
                       + _write_vlq(len(meta_data)) + meta_data)
        elif event_type in ('sysex_f0', 'sysex_f7'):
            sysex_data = event[2]
            lead = b'\xF0' if event_type == 'sysex_f0' else b'\xF7'
            payload = lead + _write_vlq(len(sysex_data)) + sysex_data
        else:
            # Unsupported event: keep its time for the next event, emit nothing.
            carried_delta = delta_time
            continue

        track_data.extend(_write_vlq(delta_time))
        track_data.extend(payload)
        # A meta or sysex event ends any running status.
        running_status = None
    return track_data
def opus2midi(opus):
    """Serialise an opus structure into Standard MIDI File bytes.

    Args:
        opus: ``[ticks_per_quarter, track_events, ...]``. An opus with fewer
            than two items is replaced by the default ``[1000, []]``.

    Returns:
        ``bytes`` containing the ``MThd`` header followed by one ``MTrk``
        chunk per track. The header declares format 1 when there is more
        than one track and format 0 otherwise.
    """
    if len(opus) < 2:
        opus = [1000, []]
    division = opus[0]
    track_lists = opus[1:]
    track_count = len(track_lists)
    smf_format = 1 if track_count > 1 else 0

    chunks = [b'MThd', struct.pack('>IHHH', 6, smf_format, track_count, division)]
    for track_events in track_lists:
        body = _encode_track(track_events)
        chunks.append(b'MTrk')
        chunks.append(struct.pack('>I', len(body)))
        chunks.append(body)
    return b''.join(chunks)
# --- Score Conversion Functions --- | |
def score2opus(score):
    """Turn a score (absolute times, whole notes) into an opus (delta times).

    Each ``['note', start, duration, channel, pitch, velocity]`` becomes a
    ``note_on`` at ``start`` and a ``note_off`` with velocity 0 at
    ``start + duration``. Every other event is copied with its absolute time
    replaced by a delta. Events sharing a tick keep the order in which they
    were encountered while scanning the track.

    Args:
        score: ``[ticks_per_quarter, track, ...]``.

    Returns:
        The opus list, or ``[1000, []]`` when ``score`` has fewer than two
        items. Input events are not modified.
    """
    if len(score) < 2:
        return [1000, []]
    result = [score[0]]
    for score_track in score[1:]:
        # Bucket events by absolute tick.
        buckets = {}
        for item in score_track:
            if item[0] != 'note':
                buckets.setdefault(item[1], []).append(item)
                continue
            start = item[1]
            stop = item[1] + item[2]
            channel, pitch, velocity = item[3], item[4], item[5]
            buckets.setdefault(start, []).append(
                ['note_on', start, channel, pitch, velocity])
            buckets.setdefault(stop, []).append(
                ['note_off', stop, channel, pitch, 0])

        converted = []
        previous_tick = 0
        for tick in sorted(buckets):
            for position, queued in enumerate(buckets[tick]):
                out = queued.copy()
                # Only the first event at a tick carries the elapsed time.
                out[1] = tick - previous_tick if position == 0 else 0
                converted.append(out)
            previous_tick = tick
        result.append(converted)
    return result
def opus2score(opus):
    """Turn an opus (delta times) into a score (absolute times, whole notes).

    A ``note_on`` with velocity > 0 opens a note. A ``note_off``, or a
    ``note_on`` with velocity 0, closes the oldest note still open on the
    same (channel, pitch) and appends
    ``['note', start, duration, channel, pitch, velocity]`` to the track.
    Overlapping notes on the same pitch are therefore all kept and paired
    first-in, first-out, rather than a later note-on replacing an earlier
    one. A closing event with no open note is ignored. All other events are
    copied with their delta time replaced by the absolute time.

    Notes still open when the track ends are closed at the time of the last
    event and appended after everything else.

    Args:
        opus: ``[ticks_per_quarter, track_events, ...]``.

    Returns:
        The score list, or ``[1000, []]`` when ``opus`` has fewer than two
        items. Input events are not modified.
    """
    if len(opus) < 2:
        return [1000, []]
    score = [opus[0]]
    for track in opus[1:]:
        score_track = []
        abs_time = 0
        pending_notes = {}  # (channel, pitch) -> open notes, oldest first
        for event in track:
            abs_time += event[1]
            kind = event[0]
            if kind == 'note_on' and event[4] > 0:
                key = (event[2], event[3])
                pending_notes.setdefault(key, []).append(
                    ['note', abs_time, 0, event[2], event[3], event[4]])
            elif kind == 'note_off' or (kind == 'note_on' and event[4] == 0):
                key = (event[2], event[3])
                open_notes = pending_notes.get(key)
                if open_notes:
                    note = open_notes.pop(0)
                    note[2] = abs_time - note[1]
                    score_track.append(note)
                    if not open_notes:
                        del pending_notes[key]
            else:
                event_copy = list(event)
                event_copy[1] = abs_time
                score_track.append(event_copy)
        # Close notes that never received a note-off.
        for open_notes in pending_notes.values():
            for note in open_notes:
                note[2] = abs_time - note[1]
                score_track.append(note)
        score.append(score_track)
    return score
# --- Direct MIDI-Score Conversions --- | |
def midi2score(midi_bytes):
    """Parse MIDI file bytes straight into a score.

    Equivalent to running ``midi2opus`` and feeding its result to
    ``opus2score``.

    Args:
        midi_bytes: Complete MIDI file contents.

    Returns:
        The score structure produced by ``opus2score``.
    """
    parsed_opus = midi2opus(midi_bytes)
    return opus2score(parsed_opus)
def score2midi(score):
    """Serialise a score straight into MIDI file bytes.

    Equivalent to running ``score2opus`` and feeding its result to
    ``opus2midi``.

    Args:
        score: ``[ticks_per_quarter, track, ...]`` score structure.

    Returns:
        The MIDI file contents produced by ``opus2midi``.
    """
    intermediate_opus = score2opus(score)
    return opus2midi(intermediate_opus)
# --- Example Usage --- | |
if __name__ == "__main__":
    # Demo: build a one-track opus, write it to disk as a MIDI file, then
    # parse the same bytes back and print the resulting score.
    demo_track = [
        ['patch_change', 0, 1, 8],
        ['note_on', 5, 1, 60, 96],
        ['note_off', 96, 1, 60, 0],
        # Text payload stays as raw bytes; the tail is Shift-JIS for "テスト" (Test).
        ['text_event_01', 0, b'Shift-JIS: \x83e\x83X\x83g'],
        ['end_track', 0],
    ]
    test_opus = [96, demo_track]

    midi_data = opus2midi(test_opus)
    with open("test.mid", "wb") as midi_file:
        midi_file.write(midi_data)

    # Round trip: bytes -> opus -> score.
    parsed_opus = midi2opus(midi_data)
    score = opus2score(parsed_opus)
    print("Parsed Score:", score)