-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtranscoder.go
More file actions
354 lines (301 loc) · 8.46 KB
/
transcoder.go
File metadata and controls
354 lines (301 loc) · 8.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
// transcoder.go - Server-side media transcoding for web compatibility
package main
import (
"context"
"crypto/sha256"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/donomii/clusterF/syncmap"
"github.com/donomii/clusterF/types"
)
type Transcoder struct {
cacheDir string
maxCacheSize int64 // bytes
logger *log.Logger
cacheEntries *syncmap.SyncMap[string, *CacheEntry]
}
type CacheEntry struct {
Path string
Size int64
AccessTime time.Time
InProgress bool
}
type TranscodeRequest struct {
InputPath string
ContentType string
OutputFormat string // "web" for now
}
func NewTranscoder(cacheDir string, maxCacheSize int64, logger *log.Logger) *Transcoder {
if err := os.MkdirAll(cacheDir, 0755); err != nil {
logger.Printf("[TRANSCODE] Failed to create cache dir %s: %v", cacheDir, err)
}
t := &Transcoder{
cacheDir: cacheDir,
maxCacheSize: maxCacheSize,
logger: logger,
cacheEntries: syncmap.NewSyncMap[string, *CacheEntry](),
}
// Load existing cache entries
t.scanCacheDir()
return t
}
// checkFFmpegAvailable verifies ffmpeg is installed and accessible
func (t *Transcoder) checkFFmpegAvailable() bool {
cmd := exec.Command("ffmpeg", "-version")
if err := cmd.Run(); err != nil {
t.logger.Printf("[TRANSCODE] ffmpeg not available: %v", err)
return false
}
return true
}
// generateCacheKey creates a unique cache key for the transcode request
func (t *Transcoder) generateCacheKey(req TranscodeRequest) string {
h := sha256.New()
h.Write([]byte(req.InputPath))
h.Write([]byte(req.ContentType))
h.Write([]byte(req.OutputFormat))
return fmt.Sprintf("%x", h.Sum(nil))[:16]
}
// getCachedPath returns the filesystem path for a cached transcode
func (t *Transcoder) getCachedPath(cacheKey string, outputExt string) string {
return filepath.Join(t.cacheDir, cacheKey+outputExt)
}
// TranscodeToWeb transcodes media to web-compatible format with streaming support
func (t *Transcoder) TranscodeToWeb(ctx context.Context, inputReader io.Reader, req TranscodeRequest) (string, error) {
if !t.checkFFmpegAvailable() {
return "", fmt.Errorf("ffmpeg not available")
}
cacheKey := t.generateCacheKey(req)
// Determine output format and extension
var outputExt string
var ffmpegArgs []string
if strings.HasPrefix(req.ContentType, "video/") {
outputExt = ".mp4"
ffmpegArgs = []string{
"-i", "pipe:0",
"-c:v", "libx264",
"-preset", "fast", // balance speed vs size
"-crf", "23", // good quality
"-c:a", "aac",
"-movflags", "frag_keyframe+empty_moov", // streaming support
"-f", "mp4",
"-y", // overwrite
}
} else if strings.HasPrefix(req.ContentType, "audio/") {
outputExt = ".mp4" // AAC in MP4 container for better seeking
ffmpegArgs = []string{
"-i", "pipe:0",
"-c:a", "aac",
"-b:a", "128k",
"-movflags", "frag_keyframe+empty_moov",
"-f", "mp4",
"-y",
}
} else {
return "", fmt.Errorf("unsupported content type for transcoding: %s", req.ContentType)
}
outputPath := t.getCachedPath(cacheKey, outputExt)
// Check if already cached
entry, exists := t.cacheEntries.Load(cacheKey)
if exists && !entry.InProgress {
// Update access time
entry.AccessTime = time.Now()
t.cacheEntries.Store(cacheKey, entry)
// Verify file still exists
if _, err := os.Stat(outputPath); err == nil {
t.logger.Printf("[TRANSCODE] Cache hit for %s", cacheKey)
return outputPath, nil
} else {
// File missing, remove from cache map
t.cacheEntries.Delete(cacheKey)
}
}
entry = &CacheEntry{
Path: outputPath,
AccessTime: time.Now(),
InProgress: true,
}
t.cacheEntries.Store(cacheKey, entry)
// Ensure we clean up on failure
defer func() {
entry.InProgress = false
t.cacheEntries.Store(cacheKey, entry)
}()
t.logger.Printf("[TRANSCODE] Starting transcode to %s", outputPath)
start := time.Now()
// Create temporary file for input (ffmpeg needs seekable input for some operations)
tempInput, err := os.CreateTemp("", "transcode_input_*")
if err != nil {
return "", fmt.Errorf("failed to create temp input file: %v", err)
}
defer os.Remove(tempInput.Name())
defer tempInput.Close()
// Copy input to temp file
if _, err := io.Copy(tempInput, inputReader); err != nil {
return "", fmt.Errorf("failed to copy input data: %v", err)
}
tempInput.Close()
// Replace pipe:0 with temp file path
for i, arg := range ffmpegArgs {
if arg == "pipe:0" {
ffmpegArgs[i] = tempInput.Name()
break
}
}
// Add output path
ffmpegArgs = append(ffmpegArgs, outputPath)
// Run ffmpeg
cmd := exec.CommandContext(ctx, "ffmpeg", ffmpegArgs...)
// Capture stderr for debugging
stderr, err := cmd.StderrPipe()
if err != nil {
return "", fmt.Errorf("failed to get stderr pipe: %v", err)
}
if err := cmd.Start(); err != nil {
return "", fmt.Errorf("failed to start ffmpeg: %v", err)
}
// Read stderr in goroutine (ffmpeg is verbose)
go func() {
buf := make([]byte, 1024)
for {
n, err := stderr.Read(buf)
if n > 0 {
// Only log errors, not all ffmpeg output
output := string(buf[:n])
if strings.Contains(output, "error") || strings.Contains(output, "Error") {
t.logger.Printf("[TRANSCODE] ffmpeg: %s", strings.TrimSpace(output))
}
}
if err != nil {
break
}
}
}()
if err := cmd.Wait(); err != nil {
os.Remove(outputPath) // Clean up partial file
return "", fmt.Errorf("ffmpeg failed: %v", err)
}
duration := time.Since(start)
// Get file size
if stat, err := os.Stat(outputPath); err == nil {
entry.Size = stat.Size()
t.cacheEntries.Store(cacheKey, entry)
t.logger.Printf("[TRANSCODE] Completed %s in %v (size: %d bytes)",
cacheKey, duration, stat.Size())
} else {
t.logger.Printf("[TRANSCODE] Completed %s in %v", cacheKey, duration)
}
// Clean up cache if needed
go t.cleanupCache()
return outputPath, nil
}
// scanCacheDir loads existing cache entries from disk
func (t *Transcoder) scanCacheDir() {
entries, err := os.ReadDir(t.cacheDir)
if err != nil {
return
}
count := 0
for _, entry := range entries {
if entry.IsDir() {
continue
}
name := entry.Name()
if !strings.HasSuffix(name, ".mp4") {
continue
}
cacheKey := strings.TrimSuffix(name, ".mp4")
if len(cacheKey) != 16 { // Our cache keys are 16 hex chars
continue
}
fullPath := filepath.Join(t.cacheDir, name)
if stat, err := os.Stat(fullPath); err == nil {
t.cacheEntries.Store(cacheKey, &CacheEntry{
Path: fullPath,
Size: stat.Size(),
AccessTime: stat.ModTime(),
InProgress: false,
})
count++
}
}
t.logger.Printf("[TRANSCODE] Loaded %d cache entries", count)
}
// cleanupCache removes old entries if cache size exceeds limit
func (t *Transcoder) cleanupCache() {
// Calculate total cache size
var totalSize int64
var entries []struct {
key string
entry *CacheEntry
}
t.cacheEntries.Range(func(key string, entry *CacheEntry) bool {
totalSize += entry.Size
if !entry.InProgress {
entries = append(entries, struct {
key string
entry *CacheEntry
}{key, entry})
}
return true
})
if totalSize <= t.maxCacheSize {
return
}
t.logger.Printf("[TRANSCODE] Cache cleanup: %d bytes (limit: %d bytes)", totalSize, t.maxCacheSize)
// Sort by access time
for i := 0; i < len(entries)-1; i++ {
for j := i + 1; j < len(entries); j++ {
if entries[i].entry.AccessTime.After(entries[j].entry.AccessTime) {
entries[i], entries[j] = entries[j], entries[i]
}
}
}
// Remove oldest entries until under limit
removed := 0
for _, entryWithKey := range entries {
if totalSize <= t.maxCacheSize {
break
}
entry := entryWithKey.entry
size := entry.Size
path := entry.Path
if err := os.Remove(path); err != nil {
t.logger.Printf("[TRANSCODE] Failed to remove cache file %s: %v", path, err)
} else {
t.cacheEntries.Delete(entryWithKey.key)
totalSize -= size
removed++
}
}
if removed > 0 {
t.logger.Printf("[TRANSCODE] Removed %d cache entries, new size: %d bytes", removed, totalSize)
}
}
// GetCacheStats returns cache statistics
func (t *Transcoder) GetCacheStats() types.TranscodeStatistics {
var totalSize int64
var inProgress int
var totalEntries int
t.cacheEntries.Range(func(key string, entry *CacheEntry) bool {
totalSize += entry.Size
if entry.InProgress {
inProgress++
}
totalEntries++
return true
})
return types.TranscodeStatistics{
TotalEntries: totalEntries,
TotalSize: int(totalSize),
MaxSize: int(t.maxCacheSize),
InProgress: inProgress,
CacheDir: t.cacheDir,
}
}