-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathchunking.py
More file actions
152 lines (127 loc) · 5.85 KB
/
chunking.py
File metadata and controls
152 lines (127 loc) · 5.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from langchain_core.documents import Document
import pysrt
import os
def chunk_srt_by_chars(srt_path, max_chars=70000):
    """Split an SRT file into chunks whose rendered text stays under max_chars.

    srt_path: path to the .srt file
    max_chars: character budget per chunk, measured on the numbered
        "index / start --> end / text" rendering of each subtitle entry
    Returns a list of dicts with chunk_index, start_time, end_time, srt_text.
    """
    subs = pysrt.open(srt_path)

    groups = []        # completed lists of pysrt.SubRipItem
    pending = []       # subtitles accumulated for the group being built
    pending_chars = 0  # running size of the rendered pending entries

    for sub in subs:
        rendered = f"{sub.index}\n{sub.start} --> {sub.end}\n{sub.text}\n\n"
        # Flush the current group when adding this entry would blow the budget.
        if pending and pending_chars + len(rendered) > max_chars:
            groups.append(pending)
            pending = []
            pending_chars = 0
        pending.append(sub)
        pending_chars += len(rendered)
    if pending:
        groups.append(pending)

    return [
        {
            "chunk_index": idx,
            "start_time": str(group[0].start),
            "end_time": str(group[-1].end),
            "srt_text": "\n".join(
                f"{s.index}\n{s.start} --> {s.end}\n{s.text}" for s in group
            ),
        }
        for idx, group in enumerate(groups)
    ]
def extract_srt_segment(srt_path, start_time_str, end_time_str):
    """
    Extracts subtitles between start_time and end_time from an SRT file.

    srt_path: Path to the .srt file
    start_time_str: Start timestamp as string, format -> "00:12:00,000"
    end_time_str: End timestamp as string, format -> "00:15:30,000"
    :returns -> List of pysrt.SubRipItem objects whose .start lies in the
        inclusive window [start, end]
    """
    window_start = pysrt.SubRipTime.from_string(start_time_str)
    window_end = pysrt.SubRipTime.from_string(end_time_str)

    matches = []
    for sub in pysrt.open(srt_path):
        # NOTE(review): selection keys on each subtitle's *start* time only,
        # so an entry starting inside the window but ending after it is kept.
        if window_start <= sub.start <= window_end:
            matches.append(sub)
    return matches
def save_srt(subs, output_path):
    """
    Saves a list of pysrt.SubRipItem objects to a new .srt file.

    subs: List of subtitle items
    output_path: Output path to save .srt file
    """
    out_file = pysrt.SubRipFile(items=subs)
    # Renumber entries 1..N so the written file has well-formed indexes
    # even when `subs` is an arbitrary slice of a larger file.
    out_file.clean_indexes()
    out_file.save(output_path, encoding='utf-8')
# Sample speech-to-text output used by the demo below: each entry carries the
# recognized text, its duration (seconds), its offset from the start of the
# audio (seconds), and the detected language code.
transcription_segments = [
    {'text': "Ladies and gentlemen, he's one of the", 'duration': 2.96, 'offset': 0.08, 'lang': 'en'},
    {'text': 'most successful artists in the world. Is', 'duration': 4.081, 'offset': 1.439, 'lang': 'en'},
    {'text': "my tower tower. Let's say a couple", 'duration': 4.719, 'offset': 3.04, 'lang': 'en'},
    {'text': "artists they make songs like I'm out in", 'duration': 6.159, 'offset': 5.52, 'lang': 'en'},
    {'text': "the club. I'm by They're not even out to", 'duration': 5.84, 'offset': 7.759, 'lang': 'en'},
    {'text': "the wild wild west", 'duration': 2.0, 'offset': 14.0, 'lang': 'en'},
    {'text': "where the sun always shines", 'duration': 3.5, 'offset': 16.5, 'lang': 'en'},
    {'text': "and the cowboys ride freely", 'duration': 4.0, 'offset': 20.0, 'lang': 'en'},
]
def chunk_transcription_data(segments, max_chunk_chars=200):
    """
    Chunks transcription data by combining complete original segments,
    dynamically updating metadata (offset, duration) for each chunk.

    A chunk is flushed when appending the next segment would push its
    space-joined text past max_chunk_chars, or when the segment language
    changes mid-chunk. A single over-long segment still becomes its own
    chunk (it is never split).

    Args:
        segments (list): Dicts with 'text', 'duration', 'offset', 'lang' keys.
        max_chunk_chars (int): The maximum character limit for each generated chunk.

    Returns:
        list: A list of LangChain Document objects, each representing a chunk
        with metadata recording the original audio span, language, and the
        number of source segments it combines.
    """
    chunks = []
    texts = []           # texts of the segments in the chunk being built
    text_len = 0         # == len(" ".join(texts)); kept incrementally instead
                         # of re-joining every iteration (was O(n^2))
    start_offset = None  # offset of the first segment in the chunk
    end_offset = None    # offset + duration of the last segment in the chunk
    lang = None          # language shared by all segments in the chunk

    def _flush():
        # Emit the accumulated segments as one Document and reset state.
        # Shared by the mid-loop flush and the final flush, which previously
        # duplicated this construction verbatim.
        nonlocal texts, text_len, start_offset, end_offset, lang
        if texts:
            chunks.append(
                Document(
                    page_content=" ".join(texts),
                    metadata={
                        "source_type": "audio_transcription",
                        "original_audio_start_offset": start_offset,
                        "original_audio_end_offset": end_offset,
                        "language": lang,
                        "num_original_segments": len(texts),
                    },
                )
            )
        texts = []
        text_len = 0
        start_offset = None
        end_offset = None
        lang = None

    for segment in segments:
        seg_text = segment['text']
        # Size the chunk as it would be after appending: current joined length
        # plus the new text plus one joining space (if the chunk is nonempty).
        prospective = text_len + len(seg_text) + (1 if texts else 0)
        if prospective > max_chunk_chars or (lang is not None and lang != segment['lang']):
            _flush()
        if texts:
            text_len += len(seg_text) + 1
        else:
            start_offset = segment['offset']
            text_len = len(seg_text)
        texts.append(seg_text)
        end_offset = segment['offset'] + segment['duration']
        lang = segment['lang']

    _flush()  # emit the trailing partial chunk, if any
    return chunks
if __name__ == '__main__':
    # Demo: print the first chunk of a sample subtitle file, then extract the
    # subtitles for a roughly two-minute window and save them as a new file.
    sample_srt = './ts (a.en).srt'
    first_chunk = chunk_srt_by_chars(sample_srt)[0]
    print(first_chunk)

    clip_subs = extract_srt_segment(sample_srt, "00:03:31,920", "00:05:31,039")
    save_srt(clip_subs, "clip_subs.srt")