-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patheffects.py
More file actions
235 lines (219 loc) · 11.9 KB
/
effects.py
File metadata and controls
235 lines (219 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import os
import random
from pathlib import Path
from ffmpeg_wrapper import run_ffmpeg
from utils import random_bool_with_prob, scale_level, unique_temp_path
# Registry of effects. Each effect function receives
# (input_path, tmpdir, level) and returns the path of a newly written
# temporary media file containing the processed clip.
#
# An earlier design had effects return a tuple
# (audio_filters_list, video_filters_list, extra_inputs, post_func) to be merged
# into a single ffmpeg -filter_complex invocation. For simplicity every effect
# below instead runs its own small ffmpeg command (or several, when separate
# steps are needed, e.g. stutter loops or sentence mixing) and hands back the
# resulting file.
def _simple_run(input_path, output_path, args):
    """Invoke ``run_ffmpeg`` for a one-input, one-output conversion.

    The argument list passed on is ``-y -i <input_path> <args...> <output_path>``.

    Args:
        input_path: Media file given to the ``-i`` option.
        output_path: Destination file (``-y`` is always supplied).
        args: List of extra ffmpeg arguments placed between input and output.
    """
    command = ["-y", "-i", input_path] + args
    command.append(output_path)
    run_ffmpeg(command)
# Below are a selection of effect implementations. Many effects are approximated using ffmpeg filters.
def effect_reverse(input_path, tmpdir, level):
    """Write a copy of the clip with both its video and its audio reversed.

    Args:
        input_path: Source media file.
        tmpdir: Directory handed to ``unique_temp_path`` for the output file.
        level: Accepted so all effects share one signature; unused here.

    Returns:
        The output path obtained from ``unique_temp_path``.
    """
    reversed_path = unique_temp_path(tmpdir, "rev.mp4")
    graph = "[0:v]reverse[v];[0:a]areverse[a]"
    _simple_run(
        input_path,
        reversed_path,
        ["-filter_complex", graph, "-map", "[v]", "-map", "[a]"],
    )
    return reversed_path
def effect_speed_change(input_path, tmpdir, level):
    """Speed the clip up or slow it down, re-timing video and audio together.

    Args:
        input_path: Source media file. The filter graph references both
            ``[0:v]`` and ``[0:a]``.
        tmpdir: Directory handed to ``unique_temp_path`` for the output file.
        level: Effect strength, converted to a speed factor with
            ``scale_level(level, 0.25, 4.0)`` (presumably 0-100 maps onto
            0.25x-4x; confirm against ``utils.scale_level``).

    Returns:
        The output path obtained from ``unique_temp_path``.
    """
    factor = scale_level(level, 0.25, 4.0)
    # Sanitise the factor BEFORE deriving any filter from it, so the video
    # and audio filters always use the same value and 1/factor cannot divide
    # by zero.
    if factor <= 0:
        factor = 0.5
    out = unique_temp_path(tmpdir, "speed.mp4")

    # Video: shrinking/stretching presentation timestamps changes the speed.
    v_filter = f"setpts={1.0 / factor}*PTS"

    # Audio: build a chain of atempo stages, each kept within 0.5-2.0, whose
    # product equals the requested factor.
    atempo_chain = []
    remaining = factor
    while remaining > 2.0:
        atempo_chain.append("atempo=2.0")
        remaining /= 2.0
    while remaining < 0.5:
        atempo_chain.append("atempo=0.5")
        remaining *= 2.0
    atempo_chain.append(f"atempo={remaining:.3f}")
    a_filter = ",".join(atempo_chain)

    args = [
        "-filter_complex", f"[0:v]{v_filter}[v];[0:a]{a_filter}[a]",
        "-map", "[v]",
        "-map", "[a]",
    ]
    _simple_run(input_path, out, args)
    return out
def effect_chorus(input_path, tmpdir, level):
    """Layer two echoes over the audio as a chorus approximation.

    The video stream is passed through the ``copy`` filter unchanged; the
    audio goes through ``aecho`` with two taps, the second at twice the delay
    and half the decay of the first.

    Args:
        input_path: Source media file.
        tmpdir: Directory handed to ``unique_temp_path`` for the output file.
        level: Effect strength; scaled to the first tap's delay (25-120, in
            the units ``aecho`` expects) and decay (0.2-0.8).

    Returns:
        The output path obtained from ``unique_temp_path``.
    """
    chorus_path = unique_temp_path(tmpdir, "chorus.mp4")
    first_delay = int(scale_level(level, 25, 120))
    first_decay = scale_level(level, 0.2, 0.8)

    delays = f"{first_delay}|{first_delay*2}"
    decays = f"{first_decay}|{first_decay*0.5}"
    echo = f"aecho=0.8:0.9:{delays}:{decays}"

    graph = f"[0:v]copy[v];[0:a]{echo}[a]"
    _simple_run(
        input_path,
        chorus_path,
        ["-filter_complex", graph, "-map", "[v]", "-map", "[a]"],
    )
    return chorus_path
def effect_pitch_shift(input_path, tmpdir, level):
    """Shift the audio pitch by up to +/-12 semitones without changing length.

    The shift is approximated with ``asetrate`` (which changes pitch and
    speed together) followed by ``atempo`` (which undoes the speed change).

    Args:
        input_path: Source media file. The filter graph references both
            ``[0:v]`` and ``[0:a]``.
        tmpdir: Directory handed to ``unique_temp_path`` for the output file.
        level: Effect strength; ``scale_level(level, -12, 12)`` truncated to
            an integer gives the shift in semitones.

    Returns:
        The output path obtained from ``unique_temp_path``.
    """
    out = unique_temp_path(tmpdir, "pitch.mp4")
    semitones = int(scale_level(level, -12, 12))  # +/- 12 semitones
    rate_factor = 2 ** (semitones / 12.0)
    # asetrate only relabels the sample rate, so the "44100*factor" trick is
    # correct only when the stream really is 44.1 kHz. Resample first so that
    # inputs at other rates (e.g. 48 kHz) get the intended pitch and keep
    # their duration in sync with the video.
    a_filter = (
        "aresample=44100,"
        f"asetrate=44100*{rate_factor},"
        "aresample=44100,"
        f"atempo={1.0 / rate_factor}"
    )
    # Mild saturation boost on the video for style.
    v_filter = "eq=saturation=1.2"
    args = [
        "-filter_complex", f"[0:v]{v_filter}[v];[0:a]{a_filter}[a]",
        "-map", "[v]",
        "-map", "[a]",
    ]
    _simple_run(input_path, out, args)
    return out
def _concat_list_entry(path):
    """Return one ``file '...'`` line for an ffmpeg concat-demuxer list."""
    # The concat demuxer resolves relative paths against the list file's
    # directory, not the working directory, so always write absolute paths.
    absolute = os.path.abspath(os.fspath(path))
    # Inside single quotes a literal quote is written as '\'' .
    escaped = absolute.replace("'", "'\\''")
    return f"file '{escaped}'\n"


def effect_stutter(input_path, tmpdir, level):
    """Prefix the clip with a short opening segment repeated several times.

    Steps: cut the first few milliseconds into their own file, concatenate
    that file ``loop_count`` times, then concatenate the looped piece with
    the complete original clip.

    Args:
        input_path: Source media file.
        tmpdir: Directory in which all intermediate files and the result are
            created.
        level: Effect strength; scaled to the segment length (40-500 ms) and
            to the number of repetitions (2-30).

    Returns:
        Path of the final concatenated clip, obtained from
        ``unique_temp_path``.
    """
    # Segment length (ms) and repetition count both grow with the level.
    seg_ms = int(scale_level(level, 40, 500))
    loop_count = int(scale_level(level, 2, 30))

    # Extract the first seg_ms milliseconds. The duration is passed to -t as
    # plain seconds, e.g. "0.040000".
    seg_file = unique_temp_path(tmpdir, "stg_seg.mp4")
    _simple_run(input_path, seg_file, ["-ss", "0", "-t", f"{seg_ms / 1000.0:.6f}"])

    # Concat list repeating the segment loop_count times.
    list_path = Path(tmpdir) / "stutter_list.txt"
    seg_entry = _concat_list_entry(seg_file)
    with open(list_path, "w", encoding="utf-8") as f:
        f.writelines(seg_entry for _ in range(loop_count))
    concat_out = unique_temp_path(tmpdir, "stutter_looped.mp4")
    run_ffmpeg(["-y", "-f", "concat", "-safe", "0", "-i", str(list_path), "-c", "copy", concat_out])

    # Append the whole original after the stutter loop.
    # NOTE(review): "-c copy" requires the re-encoded segment and the original
    # to share codec parameters; confirm this holds for the expected inputs.
    final_out = unique_temp_path(tmpdir, "stutter_final.mp4")
    list2 = Path(tmpdir) / "stutter_list2.txt"
    with open(list2, "w", encoding="utf-8") as f:
        f.write(_concat_list_entry(concat_out))
        f.write(_concat_list_entry(input_path))
    run_ffmpeg(["-y", "-f", "concat", "-safe", "0", "-i", str(list2), "-c", "copy", final_out])
    return final_out
def effect_earrape(input_path, tmpdir, level):
    """Boost the audio volume heavily and punch up video contrast/saturation.

    Args:
        input_path: Source media file.
        tmpdir: Directory handed to ``unique_temp_path`` for the output file.
        level: Effect strength; scaled to a gain between 6 and 30 dB.

    Returns:
        The output path obtained from ``unique_temp_path``.
    """
    loud_path = unique_temp_path(tmpdir, "earrape.mp4")
    boost_db = scale_level(level, 6, 30)
    # Distortion is approximated purely through the large volume gain.
    audio_chain = f"volume={boost_db}dB"
    video_chain = "eq=contrast=1.5:saturation=1.3"
    graph = f"[0:v]{video_chain}[v];[0:a]{audio_chain}[a]"
    _simple_run(
        input_path,
        loud_path,
        ["-filter_complex", graph, "-map", "[v]", "-map", "[a]"],
    )
    return loud_path
def effect_invert_colors(input_path, tmpdir, level):
    """Blend the clip with a colour-inverted copy of itself.

    Args:
        input_path: Source media file; audio, if present, is mapped through
            (``0:a?``).
        tmpdir: Directory handed to ``unique_temp_path`` for the output file.
        level: Effect strength; scaled to a blend weight between 0.0 (no
            inversion) and 1.0 (fully inverted).

    Returns:
        The output path obtained from ``unique_temp_path``.
    """
    out = unique_temp_path(tmpdir, "invert.mp4")
    alpha = scale_level(level, 0.0, 1.0)
    # Format the weight once, to three decimals. (Previously a no-op
    # str.replace ran after the f-string had already interpolated the raw
    # float, so the rounding never took effect.)
    weight = f"{alpha:.3f}"
    # Branch 1 inverts every RGB channel; branch 2 is the untouched source.
    # blend then mixes them: A is the original, B the inverted frame.
    graph = (
        "[0:v]lutrgb=r=negval:g=negval:b=negval,format=yuv420p[inv];"
        f"[0:v][inv]blend=all_expr='A*(1-{weight})+B*{weight}'[v]"
    )
    args = ["-filter_complex", graph, "-map", "[v]", "-map", "0:a?"]
    _simple_run(input_path, out, args)
    return out
def effect_mirror(input_path, tmpdir, level):
    """Flip the video horizontally; audio, if present, is mapped through.

    Args:
        input_path: Source media file.
        tmpdir: Directory handed to ``unique_temp_path`` for the output file.
        level: Accepted so all effects share one signature; the flip is
            all-or-nothing, so the value is unused.

    Returns:
        The output path obtained from ``unique_temp_path``.
    """
    mirrored_path = unique_temp_path(tmpdir, "mirror.mp4")
    _simple_run(
        input_path,
        mirrored_path,
        ["-filter_complex", "[0:v]hflip[v]", "-map", "[v]", "-map", "0:a?"],
    )
    return mirrored_path
def effect_frame_shuffle(input_path, tmpdir, level):
    """Approximate a frame shuffle with frame blending plus frame skipping.

    A true shuffle would need frames to be extracted and reassembled; for
    performance this uses a ``tblend`` average followed by ``framestep`` to
    produce a jittery result instead.

    Args:
        input_path: Source media file.
        tmpdir: Directory handed to ``unique_temp_path`` for the output file.
        level: Effect strength; scaled to 1-10 and truncated to an integer
            for the ``framestep`` value.

    Returns:
        The output path obtained from ``unique_temp_path``.
    """
    shuffled_path = unique_temp_path(tmpdir, "frameshuf.mp4")
    step = int(scale_level(level, 1, 10))
    video_chain = f"tblend=all_mode=average,framestep={step}"
    _simple_run(input_path, shuffled_path, ["-vf", video_chain])
    return shuffled_path
def effect_rainbow_overlay(input_path, tmpdir, level):
    """Overlay a colour tint that cycles through the rainbow over time.

    Args:
        input_path: Source media file; audio, if present, is mapped through
            (``0:a?``).
        tmpdir: Directory handed to ``unique_temp_path`` for the output file.
        level: Effect strength; scaled to an overlay opacity of 0.1-0.9.

    Returns:
        The output path obtained from ``unique_temp_path``.
    """
    out = unique_temp_path(tmpdir, "rainbow.mp4")
    alpha = scale_level(level, 0.1, 0.9)
    # Build the tint with geq, whose expressions can read the frame time T.
    # Three phase-shifted sinusoids drive R, G and B so the colour cycles.
    # (The former "lutparam" filter does not exist in ffmpeg, and the lut
    # family has no time variable.) geq evaluates per pixel, so this is slow
    # on large frames.
    tint = (
        "geq="
        "r='128+127*sin(T*3)':"
        "g='128+127*sin(T*3+2)':"
        "b='128+127*sin(T*3+4)'"
    )
    # Work in planar RGB so both blend inputs share a format, then convert
    # back to yuv420p for broad player compatibility.
    graph = (
        "[0:v]format=gbrp,split[v0][v1];"
        f"[v1]{tint}[t];"
        f"[v0][t]blend=all_mode=overlay:all_opacity={alpha:.3f},format=yuv420p[v]"
    )
    args = ["-filter_complex", graph, "-map", "[v]", "-map", "0:a?"]
    _simple_run(input_path, out, args)
    return out
def effect_random_sound_injection(input_path, tmpdir, level):
    """Mix a generated 2-second sine tone into the clip's audio.

    The tone starts at a random offset within the first 0.7 seconds.

    Args:
        input_path: Source media file. The command maps ``0:v`` and mixes
            ``[0:a]``.
        tmpdir: Directory in which the tone file and the output are created.
        level: Effect strength; scaled to a tone frequency of 400-2000 Hz.

    Returns:
        The output path obtained from ``unique_temp_path``.
    """
    out = unique_temp_path(tmpdir, "rand_sound.mp4")
    dur = 2.0
    freq = int(scale_level(level, 400, 2000))

    # Generate the tone with ffmpeg's lavfi "sine" source.
    tone = unique_temp_path(tmpdir, "tone.wav")
    run_ffmpeg(["-y", "-f", "lavfi", "-i", f"sine=frequency={freq}:duration={dur}", tone])

    # Random start offset, expressed in milliseconds for adelay.
    start = random.uniform(0, 0.7)
    delay_ms = int(start * 1000)

    # duration=first pins the mixed track to the original audio's length;
    # amix's default ("longest") would let the delayed tone extend the audio
    # past the end of short clips.
    graph = (
        f"[1:a]adelay={delay_ms}|{delay_ms}[snd];"
        "[0:a][snd]amix=inputs=2:duration=first:dropout_transition=0[a]"
    )
    args = ["-filter_complex", graph, "-map", "0:v", "-map", "[a]"]
    run_ffmpeg(["-y", "-i", input_path, "-i", tone] + args + [out])
    return out
# Registry mapping keys to functions and display names
# Maps each effect key to {"func": <effect function>, "display_name": <label>}.
# Several keys reuse an existing function as an approximation; their labels
# are marked "(approx)".
effects_registry = {
    key: {"func": func, "display_name": display_name}
    for key, func, display_name in (
        ("random_sound", effect_random_sound_injection, "Random Sound Injection"),
        ("reverse", effect_reverse, "Reverse Clip"),
        ("speed", effect_speed_change, "Speed Up / Slow Down"),
        ("chorus", effect_chorus, "Chorus / Echo"),
        ("pitch", effect_pitch_shift, "Pitch Shift / Vibrato"),
        ("stutter", effect_stutter, "Stutter Loop"),
        ("earrape", effect_earrape, "Earrape Mode"),
        ("invert", effect_invert_colors, "Invert Colors"),
        ("mirror", effect_mirror, "Mirror Mode"),
        ("frameshuffle", effect_frame_shuffle, "Frame Shuffle"),
        ("rainbow", effect_rainbow_overlay, "Rainbow Overlay"),
        # ... add stubs for many other named features to keep GUI comprehensive
        ("random_clip_shuffle", effect_frame_shuffle, "Random Clip Shuffle (approx)"),
        ("green_screen_portal", effect_mirror, "Green Screen Portal (approx)"),
        ("speed_warp", effect_speed_change, "Speed Warp"),
        ("dance_squidward", effect_pitch_shift, "Dance & Squidward Mode (approx)"),
        ("auto_tune_chaos", effect_pitch_shift, "Auto-Tune Chaos (approx)"),
        ("sonic_spam", effect_earrape, "Explosion Spam / Sonic Spam (approx)"),
        ("deep_fry", effect_invert_colors, "Deep Fry Vision (approx)"),
        ("meme_injection", effect_random_sound_injection, "Meme Injection (approx)"),
        # Many more keys can be mapped to either new functions or existing approximations.
    )
}
# Orchestration function: sequentially apply selected effects with probability and level scaling
def apply_effects_with_pipeline(input_path, output_path, selected_keys, global_prob, global_max_level, tmpdir):
    """Apply the selected effects one after another and write the result.

    Each effect consumes the previous effect's output file. Keys missing from
    ``effects_registry`` are skipped, as are effects that lose the
    probability roll or return a falsy path.

    Args:
        input_path: Path of the source media file.
        output_path: Path where the final result must end up.
        selected_keys: Iterable of effect keys to attempt, in order.
        global_prob: Percentage probability, passed to
            ``random_bool_with_prob``, that each selected effect is applied.
        global_max_level: Upper bound (0-100) for each effect's randomly
            chosen level.
        tmpdir: Temporary directory in which effects create their files.

    Returns:
        ``output_path``.
    """
    # Function-scope import, as the original did for copyfile.
    import shutil

    cur = input_path
    steps = []
    for key in selected_keys:
        if key not in effects_registry:
            continue
        # Roll the dice: the effect only triggers with probability global_prob.
        if not random_bool_with_prob(global_prob):
            continue
        func = effects_registry[key]["func"]
        # Each effect gets its own random level, capped by global_max_level.
        level = random.randint(0, max(1, global_max_level))
        print(f"Applying {key} at level {level} to {cur}")
        new_file = func(cur, tmpdir, level)
        if not new_file:
            continue
        # The next effect works on this effect's output.
        cur = new_file
        steps.append((key, new_file))

    if not steps:
        # No effect ran: the output is a plain copy of the input. When both
        # paths name the same file there is nothing to do.
        try:
            shutil.copyfile(input_path, output_path)
        except shutil.SameFileError:
            pass
    else:
        try:
            os.replace(cur, output_path)
        except OSError:
            # os.replace cannot cross filesystems (e.g. a temp dir on another
            # device than the output); fall back to copy + delete. Genuine
            # failures such as permission errors re-raise from copyfile.
            shutil.copyfile(cur, output_path)
            os.remove(cur)
    return output_path