File size: 4,062 Bytes
3c428bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# Root folder handed to batch_abc_pipeline() when this file is run as a script.
input_dir = "<path_to_your_abc_files>"  # Replace with the path to your folder containing standard ABC (.abc) files

import os
import re
import random
from multiprocessing import Pool
from tqdm import tqdm
from abctoolkit.utils import (
    find_all_abc, 
    remove_information_field, 
    remove_bar_no_annotations, 
    Quote_re, 
    Barlines, 
    strip_empty_bars
)
from abctoolkit.rotate import rotate_abc
from abctoolkit.check import check_alignment_unrotated

def abc_pipeline(abc_path, input_dir, output_dir):
    """
    Convert one standard ABC file to interleaved ABC notation.

    The file is read and cleaned (blank lines, selected information fields
    and bar-number annotations are removed; escaped quotes and quoted text
    containing a barline are dropped from music lines), passed through
    `strip_empty_bars`, checked with `check_alignment_unrotated`, and finally
    handed to `rotate_abc`. The result is written under `output_dir` at the
    same relative location the source file has under `input_dir`.

    Failures while stripping or rotating are reported with `print` and the
    file is skipped without writing anything. Alignment problems are only
    reported; processing continues.

    Args:
        abc_path: Path of the ABC file to convert.
        input_dir: Root directory that `abc_path` is located under.
        output_dir: Root directory that receives the converted file.

    Returns:
        None.
    """
    with open(abc_path, 'r', encoding='utf-8') as f:
        abc_lines = [line for line in f if line.strip() != '']

    abc_lines = remove_information_field(
        abc_lines=abc_lines,
        info_fields=['X:', 'T:', 'C:', 'W:', 'w:', 'Z:', '%%MIDI']
    )
    abc_lines = remove_bar_no_annotations(abc_lines)

    # Remove escaped quotes and clean up barlines inside quotes.
    for i, line in enumerate(abc_lines):
        # Information fields ("K:", "V:", ...) and comment/directive lines
        # are left untouched.
        if re.search(r'^[A-Za-z]:', line) or line.startswith('%'):
            continue

        # Strip escaped quotes first and keep working on the same string, so
        # that the quote search sees the cleaned text and the removal is not
        # overwritten by the barline clean-up below.
        line = line.replace(r'\"', '')
        for quote_content in re.findall(Quote_re, line):
            if any(barline in quote_content for barline in Barlines):
                line = line.replace(quote_content, '')
        abc_lines[i] = line

    try:
        stripped_abc_lines, _ = strip_empty_bars(abc_lines)
    except Exception as e:
        print(abc_path, 'Error in stripping empty bars:', e)
        return

    if stripped_abc_lines is None:
        print(abc_path, 'Failed to strip')
        return

    # Check alignment (diagnostic only: the file is still converted).
    _, bar_no_equal_flag, bar_dur_equal_flag = check_alignment_unrotated(stripped_abc_lines)
    if not bar_no_equal_flag:
        print(abc_path, 'Unequal bar number')
    if not bar_dur_equal_flag:
        print(abc_path, 'Unequal bar duration (unaligned)')

    try:
        rotated_abc_lines = rotate_abc(stripped_abc_lines)
    except Exception as e:
        print(abc_path, 'Error in rotating:', e)
        return

    if rotated_abc_lines is None:
        print(abc_path, 'Failed to rotate')
        return

    # Construct the output path, maintaining the input folder structure.
    # The folder is created only once there is something to write, so files
    # that fail above do not leave empty directories behind.
    relative_path = os.path.relpath(abc_path, input_dir)
    output_file_path = os.path.join(output_dir, relative_path)
    output_folder = os.path.dirname(output_file_path)
    if output_folder:  # os.makedirs('') would raise
        os.makedirs(output_folder, exist_ok=True)

    with open(output_file_path, 'w', encoding='utf-8') as w:
        w.writelines(rotated_abc_lines)

def abc_pipeline_list(abc_path_list, input_dir, output_dir):
    """
    Run `abc_pipeline` on every path in `abc_path_list` behind a tqdm bar.

    An exception raised for one file is printed together with that file's
    path, and the loop moves on to the next file.
    """
    progress = tqdm(abc_path_list)
    for current_path in progress:
        try:
            abc_pipeline(current_path, input_dir, output_dir)
        except Exception as err:
            # Best effort: report and continue with the remaining files.
            print(current_path, err)

def batch_abc_pipeline(input_dir):
    """
    Batch process all ABC files from `input_dir`, converting them to interleaved notation.

    Every non-empty file returned by `find_all_abc(input_dir)` is processed.
    The files are shuffled, dealt round-robin into one list per worker, and
    each list is run through `abc_pipeline_list` in a separate process.
    Output goes to a directory named `input_dir` + "_interleaved" (trailing
    path separators on `input_dir` are ignored when building that name).

    Args:
        input_dir: Root directory to search for ABC files.

    Returns:
        None.
    """
    # Drop trailing separators so "data/" maps to "data_interleaved" instead
    # of "data/_interleaved" nested inside the input tree. Fall back to the
    # raw value when stripping would leave nothing (e.g. the root directory).
    separators = os.sep + (os.altsep or '')
    output_dir = (input_dir.rstrip(separators) or input_dir) + "_interleaved"
    os.makedirs(output_dir, exist_ok=True)

    abc_path_list = [
        abc_path
        for abc_path in find_all_abc(input_dir)
        if os.path.getsize(abc_path) > 0
    ]
    random.shuffle(abc_path_list)
    print(f"Found {len(abc_path_list)} ABC files.")

    if not abc_path_list:
        # Nothing to convert; avoid spawning worker processes.
        return

    # os.cpu_count() may return None; never start more workers than files.
    num_workers = min(os.cpu_count() or 1, len(abc_path_list))

    # Round-robin split: worker k gets items k, k + num_workers, ...
    split_lists = [abc_path_list[k::num_workers] for k in range(num_workers)]

    # The context manager releases the worker processes once the blocking
    # starmap call has returned.
    with Pool(processes=num_workers) as pool:
        pool.starmap(
            abc_pipeline_list,
            [(split, input_dir, output_dir) for split in split_lists]
        )

# Script entry point: process the folder named by the module-level `input_dir`.
if __name__ == '__main__':
    batch_abc_pipeline(input_dir)