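"""Audio preprocessing utilities: pydub <-> numpy conversion, clip filename
parsing, mel-spectrogram extraction and normalization, and JSON persistence.
"""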
import torch
import torchaudio
from pydub import AudioSegment
import numpy as np
import pickle
import json
import glob
import random
import re
import os
import config
def format_feed_audio(audio):
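    """Convert an AudioSegment to the feed format defined in config
    (sample rate, channel count and sample width)."""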
audio = audio.set_frame_rate(config.sample_rate)
audio = audio.set_channels(config.channels)
audio = audio.set_sample_width(config.sample_width)
return audio
def load_audio(filepath):
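    """Load an audio file via pydub/ffmpeg and convert it to feed format."""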
return format_feed_audio(AudioSegment.from_file(filepath))
def random_file(path, times=1):
    """Pick `times` paths (with replacement) from the files matching a glob pattern."""
    paths = glob.glob(path)
    return random.choices(paths, k=times)
def filename_valuable(filename):
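    """Parse a clip filename of the form "<start>s-<end>s-SPEAKER_<id>-<source>",
    e.g. "12.5s-17.0s-SPEAKER_01-interview.wav" -> (12.5, 17.0, 1, "interview.wav").
    """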
file_valuable = re.match(r"([0-9.]+)s-([0-9.]+)s-SPEAKER_([0-9]+)-([a-zA-Z0-9_.]+)", filename).groups()
start_time = float(file_valuable[0])
end_time = float(file_valuable[1])
speaker_id = int(file_valuable[2])
filename = file_valuable[3]
return start_time, end_time, speaker_id, filename
"""
統計自動分類,單個檔名中有幾個說話者
"""
def statistics_speakers(filepath):
name_speakers = {}
for path in glob.glob(filepath):
start_time, end_time, speaker_id, filename = filename_valuable(os.path.basename(path))
        name_speakers[filename] = max(name_speakers.get(filename, 0), speaker_id)
return name_speakers
def array_to_audio(array):
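    """Convert a float array in [-1, 1] with shape (samples, channels) into an
    AudioSegment. The samples are written as int16, so this assumes
    config.sample_width == 2."""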
array = (array * 2 ** 15).astype(np.int16)
audio_bytes = array.tobytes()
return AudioSegment(
audio_bytes,
frame_rate=config.sample_rate,
sample_width=config.sample_width,
channels=array.shape[1]
).set_channels(config.channels)
def audio_to_array(audio):
    """Inverse of array_to_audio: AudioSegment -> float array in [-1, 1]."""
    array = np.frombuffer(audio.raw_data, np.int16)
    array = array.reshape(-1, audio.channels)
    return array / 2 ** 15
def join_with_fixed_position(audio_paths, split_time=60 * 1000):
    """Overlay each clip at the start of its own `split_time`-ms slot on a silent
    canvas and return the canvas plus a mapping describing each slot.
    Assumes every clip is shorter than `split_time`."""
    file_paths = audio_paths
    if isinstance(audio_paths, str):
        file_paths = glob.glob(audio_paths)
    # keep the original one-hour canvas, but grow it if more clips need slots,
    # since overlays past the end of the canvas are silently dropped
    output_audio = AudioSegment.silent(duration=max(len(file_paths) * split_time, 60 * 60 * 1000))
    data_mapping = []
    for join_index, path in enumerate(file_paths):
        start_time, end_time, speaker_id, filename = filename_valuable(os.path.basename(path))
        join_audio = load_audio(path)
        output_audio = output_audio.overlay(join_audio, position=join_index * split_time)
        data_mapping.append({
            "name": filename,
            "speaker": speaker_id,
            "total_time": end_time - start_time
        })
    return output_audio, data_mapping
"""
單個檔案中,每個說話者個隨機選取檔案
"""
def files_all_speaker_random_audio(filepath, pick_times = 1):
mapping = statistics_speakers(filepath)
file_names = mapping.keys()
base_folder = os.path.dirname(filepath)
select_paths = []
for name in file_names:
for speaker_id in range(mapping[name] + 1):
select_file_name = "*s-*s-SPEAKER_" + str(speaker_id).zfill(2) + "-" + name
select_paths.extend(random_file(os.path.join(base_folder, select_file_name), pick_times))
return select_paths
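# Illustrative usage (the glob pattern and base folder are assumptions; the
# pattern just has to match the clip naming scheme parsed by filename_valuable):
# clip_paths = files_all_speaker_random_audio(
#     os.path.join(config.train_format_audio_path, "*s-*s-SPEAKER_*"), pick_times=2)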
def convert_audio_format():
    """Re-export every raw wav into the training feed format, keeping the stem
    (everything before the final extension)."""
    for path in glob.glob(os.path.join(config.raw_audio_path, "*.wav")):
        filename = os.path.splitext(os.path.basename(path))[0]
        load_audio(path).export(os.path.join(config.train_format_audio_path, filename + ".wav"), format="wav")
def get_mels(paths):
    """Compute mel spectrograms for `paths` and normalize them with global
    per-bin statistics. Clips shorter than config.fft_frames frames are skipped."""
    vocoder = torch.hub.load('LewisGet/melgan-neurips', 'load_melgan')
    mels = []
    for path in paths:
        wav, sample_rate = torchaudio.load(path)
        mel = vocoder(wav)
        if mel.shape[-1] < config.fft_frames:
            continue
        mels.append(mel.detach().cpu().numpy())
    # statistics are taken over the time axis (the last one), across all clips
    mel_join = np.concatenate(mels, axis=-1)
    mel_mean = np.mean(mel_join, axis=-1, keepdims=True)
    mel_std = np.std(mel_join, axis=-1, keepdims=True) + 1e-9
    mel_normalized = [(mel - mel_mean) / mel_std for mel in mels]
    return mel_normalized, mel_mean, mel_std
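# Sketch of the intended preprocessing flow (prefix name and glob are illustrative):
# mels, mean, std = get_mels(glob.glob(os.path.join(config.train_format_audio_path, "*.wav")))
# save_mels("train", mels, mean, std)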
def save_mels(prefix_name, mels, mean, std):
    """Persist the mel list (pickle) and its statistics (npy) under config.fft_preprocess_path."""
    os.makedirs(config.fft_preprocess_path, exist_ok=True)
    with open(os.path.join(config.fft_preprocess_path, prefix_name + "_mels.pkl"), "wb") as f:
        pickle.dump(mels, f)
    np.save(os.path.join(config.fft_preprocess_path, prefix_name + "_mean.npy"), mean)
    np.save(os.path.join(config.fft_preprocess_path, prefix_name + "_std.npy"), std)
def load_mels(prefix_name):
    with open(os.path.join(config.fft_preprocess_path, prefix_name + "_mels.pkl"), "rb") as f:
        mels = pickle.load(f)
    mean = np.load(os.path.join(config.fft_preprocess_path, prefix_name + "_mean.npy"))
    std = np.load(os.path.join(config.fft_preprocess_path, prefix_name + "_std.npy"))
    return mels, mean, std
def fixed_wav_length(wav, size=config.fft_frames * 250):
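    """Truncate or zero-pad `wav` (channels, samples) to exactly `size` samples."""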
wav = wav[:, :size]
if wav.shape[1] < size:
wav = torch.nn.functional.pad(wav, (0, size - wav.shape[1]))
return wav
def save_json(data, path):
    # dirname is empty for bare filenames, and makedirs('') would fail
    if os.path.dirname(path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
def load_json(path):
    if not os.path.exists(path):
        return None
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)
def split_long_wav(path):
    """Split a long recording into config.max_audio_length_ms chunks, exporting
    each chunk both raw and in feed format."""
    audio = AudioSegment.from_file(path)
    filename = os.path.basename(path)
    for split_parts, start in enumerate(range(0, len(audio), config.max_audio_length_ms)):
        org_clip = audio[start:start + config.max_audio_length_ms]
        feed_clip = format_feed_audio(org_clip)
        split_name = config.format_long_audio_split_name(filename, split_parts)
        org_clip.export(os.path.join(config.raw_audio_path, split_name), format="wav")
        feed_clip.export(os.path.join(config.train_format_audio_path, split_name), format="wav")
def classify_overlap_dicts(speaker_dicts):
    """Split interval dicts (with 's' start and 'e' end keys) into those that
    overlap some other interval and those that stand alone."""
    sorted_intervals = sorted(speaker_dicts, key=lambda x: x['s'])
    if not sorted_intervals:
        return [], []
    overlapping_groups = []
    non_overlapping_intervals = []
    current_group = [sorted_intervals[0]]
    # track the furthest end time seen in the group, not just the last interval's
    # end, so a long early interval still captures later overlaps
    current_group_end_time = sorted_intervals[0]['e']
    for next_interval in sorted_intervals[1:]:
        if next_interval['s'] < current_group_end_time:
            current_group.append(next_interval)
            current_group_end_time = max(current_group_end_time, next_interval['e'])
        else:
            if len(current_group) == 1:
                non_overlapping_intervals.extend(current_group)
            else:
                overlapping_groups.append(current_group)
            current_group = [next_interval]
            current_group_end_time = next_interval['e']
    if len(current_group) == 1:
        non_overlapping_intervals.extend(current_group)
    else:
        overlapping_groups.append(current_group)
    final_overlapping_list = [item for sublist in overlapping_groups for item in sublist]
    return final_overlapping_list, non_overlapping_intervals
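# Example with made-up intervals: the first two overlap, the third stands alone.
# overlapping, clean = classify_overlap_dicts(
#     [{'s': 0, 'e': 1000}, {'s': 500, 'e': 1500}, {'s': 2000, 'e': 2500}])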
def split_audio_clips(dicts):
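    """Cut each labeled section (dicts with 'path', 's', 'e' keys) out of the raw
    audio and export both a raw copy and a feed-format copy."""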
for clip_section in dicts:
filename = os.path.basename(clip_section['path'])
org_file = os.path.join(config.raw_audio_path, filename)
audio = AudioSegment.from_file(org_file)
        start_time, stop_time = clip_section['s'], clip_section['e']
        clip_filename = config.format_clip_name(filename, start_time, stop_time)
        org_clip = audio[start_time:stop_time]
        org_clip.export(os.path.join(config.raw_audio_path, clip_filename), format="wav")
        format_clip = format_feed_audio(org_clip)
        format_clip.export(os.path.join(config.train_format_audio_path, clip_filename), format="wav")