From f845249c8ab105194ed25993db60f7481283e2c8 Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 14:55:58 +0200 Subject: [PATCH 01/16] feat: add Garage artifact cache and queue backpressure --- README.md | 4 +- build.gradle.kts | 1 + .../typetype/downloader/ApplicationModule.kt | 12 ++- .../dev/typetype/downloader/db/Database.kt | 10 ++ .../dev/typetype/downloader/db/JobRow.kt | 4 + .../typetype/downloader/db/JobRowMapper.kt | 16 ++++ .../typetype/downloader/db/JobsRepository.kt | 94 ++++++++++++++----- .../downloader/models/CreateJobResponse.kt | 5 +- .../typetype/downloader/models/JobResponse.kt | 2 + .../services/GarageStorageService.kt | 69 ++++++++++++++ .../downloader/services/JobCacheKey.kt | 11 +++ .../downloader/services/JobService.kt | 35 ++++++- .../typetype/downloader/services/JobWorker.kt | 64 +++++++++---- .../services/QueueSaturatedException.kt | 3 + .../downloader/services/StorageArtifact.kt | 8 ++ 15 files changed, 289 insertions(+), 49 deletions(-) create mode 100644 src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt create mode 100644 src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt create mode 100644 src/main/kotlin/dev/typetype/downloader/services/JobCacheKey.kt create mode 100644 src/main/kotlin/dev/typetype/downloader/services/QueueSaturatedException.kt create mode 100644 src/main/kotlin/dev/typetype/downloader/services/StorageArtifact.kt diff --git a/README.md b/README.md index 550104c..3f07e76 100644 --- a/README.md +++ b/README.md @@ -90,8 +90,8 @@ All configuration is via environment variables. ## API - `GET /health` -- `POST /jobs` with `{ "url": "..." }` -- `GET /jobs/{id}` +- `POST /jobs` with `{ "url": "..." }` returns `{ "id": "...", "cached": false|true }` +- `GET /jobs/{id}` returns one of `queued|running|done|failed` and includes a signed `artifactUrl` when available ## License diff --git a/build.gradle.kts b/build.gradle.kts index d24912b..9d0bcaa 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -22,6 +22,7 @@ dependencies { implementation("org.postgresql:postgresql:42.7.8") implementation("com.zaxxer:HikariCP:6.3.0") implementation("redis.clients:jedis:6.1.0") + implementation("software.amazon.awssdk:s3:2.31.69") implementation("ch.qos.logback:logback-classic:1.5.20") testImplementation(kotlin("test")) } diff --git a/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt b/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt index 6374826..ce6ebda 100644 --- a/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt +++ b/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt @@ -7,6 +7,8 @@ import dev.typetype.downloader.routes.healthRoutes import dev.typetype.downloader.routes.jobRoutes import dev.typetype.downloader.services.JobService import dev.typetype.downloader.services.JobWorker +import dev.typetype.downloader.services.QueueSaturatedException +import dev.typetype.downloader.services.GarageStorageService import dev.typetype.downloader.services.YtDlpService import io.ktor.http.HttpStatusCode import io.ktor.serialization.kotlinx.json.json @@ -24,15 +26,20 @@ fun Application.module() { val config = AppConfigLoader.load() Database.init(config) val redis = JedisPooled(config.redisHost, config.redisPort) + val storage = GarageStorageService(config) + storage.ensureBucket() val jobsRepository = JobsRepository() val ytDlpService = YtDlpService(config) - val jobService = JobService(jobsRepository, redis, config) - val worker = JobWorker(jobsRepository, redis, ytDlpService, config) + val jobService = JobService(jobsRepository, redis, storage, config) + val worker = JobWorker(jobsRepository, redis, ytDlpService, storage, config) worker.start() install(CallLogging) install(ContentNegotiation) { json() } install(StatusPages) { + exception { call, cause -> + call.respond(HttpStatusCode.TooManyRequests, mapOf("error" to (cause.message ?: "queue saturated"))) + } exception { call, cause -> call.respond(HttpStatusCode.BadRequest, mapOf("error" to (cause.message ?: "bad request"))) } @@ -44,6 +51,7 @@ fun Application.module() { } monitor.subscribe(ApplicationStopping) { + storage.close() redis.close() Database.close() } diff --git a/src/main/kotlin/dev/typetype/downloader/db/Database.kt b/src/main/kotlin/dev/typetype/downloader/db/Database.kt index db89696..12e760c 100644 --- a/src/main/kotlin/dev/typetype/downloader/db/Database.kt +++ b/src/main/kotlin/dev/typetype/downloader/db/Database.kt @@ -25,17 +25,27 @@ object Database { CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, source_url TEXT NOT NULL, + cache_key TEXT NOT NULL, status TEXT NOT NULL, duration_ms BIGINT NOT NULL, title TEXT NOT NULL, error TEXT, + artifact_key TEXT, + artifact_expires_at TIMESTAMPTZ, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), started_at TIMESTAMPTZ, finished_at TIMESTAMPTZ ) """.trimIndent() ) + statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS cache_key TEXT") + statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS artifact_key TEXT") + statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS artifact_expires_at TIMESTAMPTZ") + statement.execute("UPDATE jobs SET cache_key = id WHERE cache_key IS NULL") + statement.execute("ALTER TABLE jobs ALTER COLUMN cache_key SET NOT NULL") statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)") + statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_cache_key ON jobs(cache_key)") + statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_artifact_expiry ON jobs(artifact_expires_at)") } } } diff --git a/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt b/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt index 7fbf0b8..eb28c61 100644 --- a/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt +++ b/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt @@ -1,12 +1,16 @@ package dev.typetype.downloader.db import dev.typetype.downloader.models.JobStatus +import java.time.Instant data class JobRow( val id: String, val url: String, + val cacheKey: String, val status: JobStatus, val durationMs: Long, val title: String, val error: String?, + val artifactKey: String?, + val artifactExpiresAt: Instant?, ) diff --git a/src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt b/src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt new file mode 100644 index 0000000..54a5b61 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt @@ -0,0 +1,16 @@ +package dev.typetype.downloader.db + +import dev.typetype.downloader.models.JobStatus +import java.sql.ResultSet + +fun rowFrom(rs: ResultSet): JobRow = JobRow( + id = rs.getString("id"), + url = rs.getString("source_url"), + cacheKey = rs.getString("cache_key"), + status = JobStatus.valueOf(rs.getString("status")), + durationMs = rs.getLong("duration_ms"), + title = rs.getString("title"), + error = rs.getString("error"), + artifactKey = rs.getString("artifact_key"), + artifactExpiresAt = rs.getTimestamp("artifact_expires_at")?.toInstant(), +) diff --git a/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt b/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt index 94c91a2..b5f3683 100644 --- a/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt +++ b/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt @@ -1,43 +1,82 @@ package dev.typetype.downloader.db import dev.typetype.downloader.models.JobStatus +import java.time.Instant class JobsRepository { - fun insertQueued(id: String, url: String) { + fun insertQueued(id: String, url: String, cacheKey: String) { Database.withConnection { connection -> connection.prepareStatement( - "INSERT INTO jobs (id, source_url, status, duration_ms, title, error) VALUES (?, ?, ?, ?, ?, ?)" + """ + INSERT INTO jobs (id, source_url, cache_key, status, duration_ms, title, error) + VALUES (?, ?, ?, ?, ?, ?, ?) + """.trimIndent() ).use { statement -> statement.setString(1, id) statement.setString(2, url) - statement.setString(3, JobStatus.QUEUED.name) - statement.setLong(4, 0L) - statement.setString(5, "") - statement.setString(6, null) + statement.setString(3, cacheKey) + statement.setString(4, JobStatus.QUEUED.name) + statement.setLong(5, 0L) + statement.setString(6, "") + statement.setString(7, null) statement.executeUpdate() } } } - + fun insertDoneFromCache(id: String, url: String, cached: JobRow) { + Database.withConnection { connection -> + connection.prepareStatement( + """ + INSERT INTO jobs (id, source_url, cache_key, status, duration_ms, title, error, artifact_key, artifact_expires_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """.trimIndent() + ).use { statement -> + statement.setString(1, id) + statement.setString(2, url) + statement.setString(3, cached.cacheKey) + statement.setString(4, JobStatus.DONE.name) + statement.setLong(5, 0L) + statement.setString(6, cached.title) + statement.setString(7, null) + statement.setString(8, cached.artifactKey) + statement.setObject(9, cached.artifactExpiresAt) + statement.executeUpdate() + } + } + } + fun findReusableByCacheKey(cacheKey: String): JobRow? = Database.withConnection { connection -> + connection.prepareStatement( + """ + SELECT id, source_url, cache_key, status, duration_ms, title, error, artifact_key, artifact_expires_at + FROM jobs + WHERE cache_key = ? AND status = ? AND artifact_key IS NOT NULL AND artifact_expires_at > NOW() + ORDER BY finished_at DESC NULLS LAST + LIMIT 1 + """.trimIndent() + ).use { statement -> + statement.setString(1, cacheKey) + statement.setString(2, JobStatus.DONE.name) + statement.executeQuery().use { rs -> + if (!rs.next()) return@use null + rowFrom(rs) + } + } + } fun getById(id: String): JobRow? = Database.withConnection { connection -> connection.prepareStatement( - "SELECT id, source_url, status, duration_ms, title, error FROM jobs WHERE id = ?" + """ + SELECT id, source_url, cache_key, status, duration_ms, title, error, artifact_key, artifact_expires_at + FROM jobs + WHERE id = ? + """.trimIndent() ).use { statement -> statement.setString(1, id) statement.executeQuery().use { rs -> if (!rs.next()) return@use null - JobRow( - id = rs.getString("id"), - url = rs.getString("source_url"), - status = JobStatus.valueOf(rs.getString("status")), - durationMs = rs.getLong("duration_ms"), - title = rs.getString("title"), - error = rs.getString("error"), - ) + rowFrom(rs) } } } - fun markRunning(id: String) { Database.withConnection { connection -> connection.prepareStatement( @@ -49,17 +88,30 @@ class JobsRepository { } } } - - fun markFinished(id: String, status: JobStatus, durationMs: Long, title: String, error: String?) { + fun markFinished( + id: String, + status: JobStatus, + durationMs: Long, + title: String, + error: String?, + artifactKey: String?, + artifactExpiresAt: Instant?, + ) { Database.withConnection { connection -> connection.prepareStatement( - "UPDATE jobs SET status = ?, duration_ms = ?, title = ?, error = ?, finished_at = NOW() WHERE id = ?" + """ + UPDATE jobs + SET status = ?, duration_ms = ?, title = ?, error = ?, artifact_key = ?, artifact_expires_at = ?, finished_at = NOW() + WHERE id = ? + """.trimIndent() ).use { statement -> statement.setString(1, status.name) statement.setLong(2, durationMs) statement.setString(3, title) statement.setString(4, error) - statement.setString(5, id) + statement.setString(5, artifactKey) + statement.setObject(6, artifactExpiresAt) + statement.setString(7, id) statement.executeUpdate() } } diff --git a/src/main/kotlin/dev/typetype/downloader/models/CreateJobResponse.kt b/src/main/kotlin/dev/typetype/downloader/models/CreateJobResponse.kt index c5a0586..22c382f 100644 --- a/src/main/kotlin/dev/typetype/downloader/models/CreateJobResponse.kt +++ b/src/main/kotlin/dev/typetype/downloader/models/CreateJobResponse.kt @@ -3,4 +3,7 @@ package dev.typetype.downloader.models import kotlinx.serialization.Serializable @Serializable -data class CreateJobResponse(val id: String) +data class CreateJobResponse( + val id: String, + val cached: Boolean, +) diff --git a/src/main/kotlin/dev/typetype/downloader/models/JobResponse.kt b/src/main/kotlin/dev/typetype/downloader/models/JobResponse.kt index 72672ca..574179e 100644 --- a/src/main/kotlin/dev/typetype/downloader/models/JobResponse.kt +++ b/src/main/kotlin/dev/typetype/downloader/models/JobResponse.kt @@ -10,4 +10,6 @@ data class JobResponse( val durationMs: Long, val title: String, val error: String? = null, + val artifactUrl: String? = null, + val artifactExpiresAt: String? = null, ) diff --git a/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt b/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt new file mode 100644 index 0000000..110232f --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt @@ -0,0 +1,69 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.config.AppConfig +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider +import software.amazon.awssdk.core.sync.RequestBody +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.S3Configuration +import software.amazon.awssdk.services.s3.model.CreateBucketRequest +import software.amazon.awssdk.services.s3.model.GetObjectRequest +import software.amazon.awssdk.services.s3.model.HeadBucketRequest +import software.amazon.awssdk.services.s3.model.PutObjectRequest +import software.amazon.awssdk.services.s3.model.S3Exception +import software.amazon.awssdk.services.s3.presigner.S3Presigner +import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest +import java.net.URI +import java.time.Duration + +class GarageStorageService(config: AppConfig) { + private val endpoint = URI(config.s3Endpoint) + private val region = Region.of(config.s3Region) + private val bucket = config.s3Bucket + private val credentials = StaticCredentialsProvider.create( + AwsBasicCredentials.create(config.s3AccessKey, config.s3SecretKey) + ) + + private val s3: S3Client = S3Client.builder() + .endpointOverride(endpoint) + .region(region) + .credentialsProvider(credentials) + .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .build() + + private val presigner: S3Presigner = S3Presigner.builder() + .endpointOverride(endpoint) + .region(region) + .credentialsProvider(credentials) + .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .build() + + fun ensureBucket() { + try { + s3.headBucket(HeadBucketRequest.builder().bucket(bucket).build()) + } catch (error: S3Exception) { + if (error.statusCode() == 404) { + s3.createBucket(CreateBucketRequest.builder().bucket(bucket).build()) + } else { + throw error + } + } + } + + fun putBytes(objectKey: String, body: ByteArray, contentType: String) { + val request = PutObjectRequest.builder().bucket(bucket).key(objectKey).contentType(contentType).build() + s3.putObject(request, RequestBody.fromBytes(body)) + } + + fun presignGet(objectKey: String, duration: Duration): String { + val getRequest = GetObjectRequest.builder().bucket(bucket).key(objectKey).build() + val presignRequest = GetObjectPresignRequest.builder().signatureDuration(duration).getObjectRequest(getRequest).build() + return presigner.presignGetObject(presignRequest).url().toString() + } + + fun close() { + presigner.close() + s3.close() + } +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobCacheKey.kt b/src/main/kotlin/dev/typetype/downloader/services/JobCacheKey.kt new file mode 100644 index 0000000..2f80c1c --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/JobCacheKey.kt @@ -0,0 +1,11 @@ +package dev.typetype.downloader.services + +import java.security.MessageDigest + +object JobCacheKey { + fun fromUrl(url: String): String { + val digest = MessageDigest.getInstance("SHA-256") + val bytes = digest.digest(url.trim().toByteArray()) + return bytes.joinToString("") { "%02x".format(it) } + } +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt index dc842f8..fdf82a1 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt @@ -7,25 +7,39 @@ import dev.typetype.downloader.models.CreateJobResponse import dev.typetype.downloader.models.JobResponse import redis.clients.jedis.JedisPooled import java.net.URI +import java.time.Duration +import java.time.Instant import java.util.UUID class JobService( private val jobsRepository: JobsRepository, private val redis: JedisPooled, + private val storageService: GarageStorageService, private val config: AppConfig, ) { fun enqueue(url: String): CreateJobResponse { validateUrl(url) + val cacheKey = JobCacheKey.fromUrl(url) val id = UUID.randomUUID().toString() - jobsRepository.insertQueued(id = id, url = url) + val reusable = jobsRepository.findReusableByCacheKey(cacheKey) + if (reusable != null) { + jobsRepository.insertDoneFromCache(id = id, url = url, cached = reusable) + redis.setex(redisJobKey(id), config.jobTtlSeconds, "done:cached") + return CreateJobResponse(id = id, cached = true) + } + val queueSize = redis.llen(config.redisQueueKey) + if (queueSize >= config.maxQueueSize) { + throw QueueSaturatedException("Queue is full") + } + jobsRepository.insertQueued(id = id, url = url, cacheKey = cacheKey) redis.rpush(config.redisQueueKey, id) - redis.setex(redisJobKey(id), 600, "queued") - return CreateJobResponse(id = id) + redis.setex(redisJobKey(id), config.jobTtlSeconds, "queued") + return CreateJobResponse(id = id, cached = false) } fun get(id: String): JobResponse? { val row = jobsRepository.getById(id) ?: return null - return row.toResponse() + return row.toResponse(presignUrl(row), row.artifactExpiresAt?.toString()) } private fun validateUrl(url: String) { @@ -41,12 +55,23 @@ class JobService( private fun redisJobKey(id: String): String = "downloader:job:$id" - private fun JobRow.toResponse(): JobResponse = JobResponse( + private fun presignUrl(row: JobRow): String? { + val key = row.artifactKey ?: return null + val expiresAt = row.artifactExpiresAt ?: return null + val now = Instant.now() + if (expiresAt <= now) return null + val seconds = Duration.between(now, expiresAt).seconds.coerceIn(1, 900) + return storageService.presignGet(key, Duration.ofSeconds(seconds)) + } + + private fun JobRow.toResponse(artifactUrl: String?, artifactExpiresAt: String?): JobResponse = JobResponse( id = id, url = url, status = status, durationMs = durationMs, title = title, error = error, + artifactUrl = artifactUrl, + artifactExpiresAt = artifactExpiresAt, ) } diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt index fa65337..fa75f95 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt @@ -4,20 +4,24 @@ import dev.typetype.downloader.config.AppConfig import dev.typetype.downloader.db.JobsRepository import dev.typetype.downloader.models.JobStatus import redis.clients.jedis.JedisPooled +import java.time.Instant import kotlin.concurrent.thread class JobWorker( private val jobsRepository: JobsRepository, private val redis: JedisPooled, private val ytDlpService: YtDlpService, + private val storageService: GarageStorageService, private val config: AppConfig, ) { fun start() { - thread(name = "job-worker", isDaemon = true) { - while (true) { - val item = redis.blpop(0, config.redisQueueKey) ?: continue - val id = item.getOrNull(1) ?: continue - process(id) + repeat(config.maxConcurrentWorkers) { index -> + thread(name = "job-worker-$index", isDaemon = true) { + while (true) { + val item = redis.blpop(0, config.redisQueueKey) ?: continue + val id = item.getOrNull(1) ?: continue + process(id) + } } } } @@ -25,19 +29,43 @@ class JobWorker( private fun process(id: String) { val job = jobsRepository.getById(id) ?: return jobsRepository.markRunning(id) - redis.setex(redisJobKey(id), 600, "running") - val startedAt = System.nanoTime() - val result = ytDlpService.extractTitle(job.url) - val durationMs = (System.nanoTime() - startedAt) / 1_000_000 - val status = if (result.error == null) JobStatus.DONE else JobStatus.FAILED - jobsRepository.markFinished( - id = id, - status = status, - durationMs = durationMs, - title = result.title, - error = result.error, - ) - redis.setex(redisJobKey(id), 600, "${status.name.lowercase()}:$durationMs") + redis.setex(redisJobKey(id), config.jobTtlSeconds, "running") + try { + val startedAt = System.nanoTime() + val result = ytDlpService.extractTitle(job.url) + val durationMs = (System.nanoTime() - startedAt) / 1_000_000 + val status = if (result.error == null) JobStatus.DONE else JobStatus.FAILED + val artifact = if (status == JobStatus.DONE) uploadArtifact(job.id, job.url, job.cacheKey, result.title) else null + jobsRepository.markFinished( + id = id, + status = status, + durationMs = durationMs, + title = result.title, + error = result.error, + artifactKey = artifact?.objectKey, + artifactExpiresAt = artifact?.expiresAt, + ) + redis.setex(redisJobKey(id), config.jobTtlSeconds, "${status.name.lowercase()}:$durationMs") + } catch (error: Throwable) { + jobsRepository.markFinished( + id = id, + status = JobStatus.FAILED, + durationMs = 0, + title = "", + error = error.message ?: "worker failed", + artifactKey = null, + artifactExpiresAt = null, + ) + redis.setex(redisJobKey(id), config.jobTtlSeconds, "failed:0") + } + } + + private fun uploadArtifact(id: String, url: String, cacheKey: String, title: String): StorageArtifact { + val expiresAt = Instant.now().plusSeconds(config.s3ArtifactTtlSeconds) + val objectKey = "cache/$cacheKey.txt" + val payload = "id=$id\nurl=$url\ntitle=$title\n" + storageService.putBytes(objectKey, payload.toByteArray(), "text/plain") + return StorageArtifact(objectKey = objectKey, expiresAt = expiresAt) } private fun redisJobKey(id: String): String = "downloader:job:$id" diff --git a/src/main/kotlin/dev/typetype/downloader/services/QueueSaturatedException.kt b/src/main/kotlin/dev/typetype/downloader/services/QueueSaturatedException.kt new file mode 100644 index 0000000..2bd0d06 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/QueueSaturatedException.kt @@ -0,0 +1,3 @@ +package dev.typetype.downloader.services + +class QueueSaturatedException(message: String) : RuntimeException(message) diff --git a/src/main/kotlin/dev/typetype/downloader/services/StorageArtifact.kt b/src/main/kotlin/dev/typetype/downloader/services/StorageArtifact.kt new file mode 100644 index 0000000..c0fe649 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/StorageArtifact.kt @@ -0,0 +1,8 @@ +package dev.typetype.downloader.services + +import java.time.Instant + +data class StorageArtifact( + val objectKey: String, + val expiresAt: Instant, +) From 8d6c6809bea4f7ebb9f78b9c43a790a94fe7d4b9 Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 14:56:08 +0200 Subject: [PATCH 02/16] test: add config and cache key coverage --- .../downloader/config/AppConfigLoaderTest.kt | 27 +++++++++++++++++ .../downloader/services/JobCacheKeyTest.kt | 29 +++++++++++++++++++ 2 files changed, 56 insertions(+) create mode 100644 src/test/kotlin/dev/typetype/downloader/config/AppConfigLoaderTest.kt create mode 100644 src/test/kotlin/dev/typetype/downloader/services/JobCacheKeyTest.kt diff --git a/src/test/kotlin/dev/typetype/downloader/config/AppConfigLoaderTest.kt b/src/test/kotlin/dev/typetype/downloader/config/AppConfigLoaderTest.kt new file mode 100644 index 0000000..2044b21 --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/config/AppConfigLoaderTest.kt @@ -0,0 +1,27 @@ +package dev.typetype.downloader.config + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class AppConfigLoaderTest { + @Test + fun `loads non-empty default config`() { + val config = AppConfigLoader.load() + assertTrue(config.httpPort > 0) + assertTrue(config.dbUrl.isNotBlank()) + assertTrue(config.redisQueueKey.isNotBlank()) + assertTrue(config.ytdlpBin.isNotBlank()) + assertTrue(config.s3Endpoint.isNotBlank()) + assertTrue(config.maxConcurrentWorkers >= 1) + assertTrue(config.maxQueueSize >= 1) + assertTrue(config.jobTtlSeconds >= 1) + } + + @Test + fun `default region and bucket stay stable`() { + val config = AppConfigLoader.load() + assertEquals("garage", config.s3Region) + assertEquals("typetype-downloads", config.s3Bucket) + } +} diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobCacheKeyTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobCacheKeyTest.kt new file mode 100644 index 0000000..28f4be8 --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/JobCacheKeyTest.kt @@ -0,0 +1,29 @@ +package dev.typetype.downloader.services + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotEquals + +class JobCacheKeyTest { + @Test + fun `same URL yields same key`() { + val url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ" + val first = JobCacheKey.fromUrl(url) + val second = JobCacheKey.fromUrl(url) + assertEquals(first, second) + } + + @Test + fun `trimmed URL keeps same key`() { + val clean = JobCacheKey.fromUrl("https://www.youtube.com/watch?v=dQw4w9WgXcQ") + val padded = JobCacheKey.fromUrl(" https://www.youtube.com/watch?v=dQw4w9WgXcQ ") + assertEquals(clean, padded) + } + + @Test + fun `different URLs yield different keys`() { + val first = JobCacheKey.fromUrl("https://www.youtube.com/watch?v=dQw4w9WgXcQ") + val second = JobCacheKey.fromUrl("https://www.youtube.com/watch?v=oHg5SJYRHA0") + assertNotEquals(first, second) + } +} From 8aa74fbe4fb73b99a79bafff78d4d247d3aa6c7b Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 15:26:03 +0200 Subject: [PATCH 03/16] feat: normalize wrapped URLs and integrate tokenized downloads --- .../typetype/downloader/ApplicationModule.kt | 4 +- .../typetype/downloader/routes/JobRoutes.kt | 17 +++++ .../services/GarageStorageService.kt | 6 ++ .../downloader/services/JobService.kt | 9 ++- .../typetype/downloader/services/JobWorker.kt | 34 +++++++-- .../downloader/services/SourceUrlResolver.kt | 63 ++++++++++++++++ .../downloader/services/TokenPayload.kt | 6 ++ .../downloader/services/TokenServiceClient.kt | 48 ++++++++++++ .../downloader/services/YtDlpService.kt | 74 +++++++++++++++---- 9 files changed, 234 insertions(+), 27 deletions(-) create mode 100644 src/main/kotlin/dev/typetype/downloader/services/SourceUrlResolver.kt create mode 100644 src/main/kotlin/dev/typetype/downloader/services/TokenPayload.kt create mode 100644 src/main/kotlin/dev/typetype/downloader/services/TokenServiceClient.kt diff --git a/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt b/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt index ce6ebda..15b8203 100644 --- a/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt +++ b/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt @@ -9,6 +9,7 @@ import dev.typetype.downloader.services.JobService import dev.typetype.downloader.services.JobWorker import dev.typetype.downloader.services.QueueSaturatedException import dev.typetype.downloader.services.GarageStorageService +import dev.typetype.downloader.services.TokenServiceClient import dev.typetype.downloader.services.YtDlpService import io.ktor.http.HttpStatusCode import io.ktor.serialization.kotlinx.json.json @@ -30,8 +31,9 @@ fun Application.module() { storage.ensureBucket() val jobsRepository = JobsRepository() val ytDlpService = YtDlpService(config) + val tokenServiceClient = TokenServiceClient(config) val jobService = JobService(jobsRepository, redis, storage, config) - val worker = JobWorker(jobsRepository, redis, ytDlpService, storage, config) + val worker = JobWorker(jobsRepository, redis, ytDlpService, tokenServiceClient, storage, config) worker.start() install(CallLogging) diff --git a/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt b/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt index 51b85ed..85b7941 100644 --- a/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt +++ b/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt @@ -5,6 +5,7 @@ import dev.typetype.downloader.services.JobService import io.ktor.http.HttpStatusCode import io.ktor.server.request.receive import io.ktor.server.response.respond +import io.ktor.server.response.respondRedirect import io.ktor.server.routing.Route import io.ktor.server.routing.get import io.ktor.server.routing.post @@ -30,4 +31,20 @@ fun Route.jobRoutes(jobService: JobService) { ) call.respond(HttpStatusCode.OK, job) } + + get("/jobs/{id}/artifact") { + val id = call.parameters["id"] ?: return@get call.respond( + HttpStatusCode.BadRequest, + mapOf("error" to "id is required"), + ) + val job = jobService.get(id) ?: return@get call.respond( + HttpStatusCode.NotFound, + mapOf("error" to "job not found"), + ) + val artifactUrl = job.artifactUrl ?: return@get call.respond( + HttpStatusCode.Conflict, + mapOf("error" to "artifact not ready"), + ) + call.respondRedirect(artifactUrl, permanent = false) + } } diff --git a/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt b/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt index 110232f..87176da 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt @@ -15,6 +15,7 @@ import software.amazon.awssdk.services.s3.model.S3Exception import software.amazon.awssdk.services.s3.presigner.S3Presigner import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest import java.net.URI +import java.nio.file.Path import java.time.Duration class GarageStorageService(config: AppConfig) { @@ -56,6 +57,11 @@ class GarageStorageService(config: AppConfig) { s3.putObject(request, RequestBody.fromBytes(body)) } + fun putFile(objectKey: String, filePath: Path, contentType: String) { + val request = PutObjectRequest.builder().bucket(bucket).key(objectKey).contentType(contentType).build() + s3.putObject(request, RequestBody.fromFile(filePath)) + } + fun presignGet(objectKey: String, duration: Duration): String { val getRequest = GetObjectRequest.builder().bucket(bucket).key(objectKey).build() val presignRequest = GetObjectPresignRequest.builder().signatureDuration(duration).getObjectRequest(getRequest).build() diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt index fdf82a1..7500747 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt @@ -18,12 +18,13 @@ class JobService( private val config: AppConfig, ) { fun enqueue(url: String): CreateJobResponse { - validateUrl(url) - val cacheKey = JobCacheKey.fromUrl(url) + val resolvedUrl = SourceUrlResolver.resolve(url) + validateUrl(resolvedUrl) + val cacheKey = JobCacheKey.fromUrl(resolvedUrl) val id = UUID.randomUUID().toString() val reusable = jobsRepository.findReusableByCacheKey(cacheKey) if (reusable != null) { - jobsRepository.insertDoneFromCache(id = id, url = url, cached = reusable) + jobsRepository.insertDoneFromCache(id = id, url = resolvedUrl, cached = reusable) redis.setex(redisJobKey(id), config.jobTtlSeconds, "done:cached") return CreateJobResponse(id = id, cached = true) } @@ -31,7 +32,7 @@ class JobService( if (queueSize >= config.maxQueueSize) { throw QueueSaturatedException("Queue is full") } - jobsRepository.insertQueued(id = id, url = url, cacheKey = cacheKey) + jobsRepository.insertQueued(id = id, url = resolvedUrl, cacheKey = cacheKey) redis.rpush(config.redisQueueKey, id) redis.setex(redisJobKey(id), config.jobTtlSeconds, "queued") return CreateJobResponse(id = id, cached = false) diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt index fa75f95..5fce4f7 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt @@ -4,6 +4,7 @@ import dev.typetype.downloader.config.AppConfig import dev.typetype.downloader.db.JobsRepository import dev.typetype.downloader.models.JobStatus import redis.clients.jedis.JedisPooled +import java.nio.file.Files import java.time.Instant import kotlin.concurrent.thread @@ -11,6 +12,7 @@ class JobWorker( private val jobsRepository: JobsRepository, private val redis: JedisPooled, private val ytDlpService: YtDlpService, + private val tokenServiceClient: TokenServiceClient, private val storageService: GarageStorageService, private val config: AppConfig, ) { @@ -32,10 +34,15 @@ class JobWorker( redis.setex(redisJobKey(id), config.jobTtlSeconds, "running") try { val startedAt = System.nanoTime() - val result = ytDlpService.extractTitle(job.url) + val token = tokenServiceClient.fetchForUrl(job.url) + val result = ytDlpService.download(job.url, token) val durationMs = (System.nanoTime() - startedAt) / 1_000_000 val status = if (result.error == null) JobStatus.DONE else JobStatus.FAILED - val artifact = if (status == JobStatus.DONE) uploadArtifact(job.id, job.url, job.cacheKey, result.title) else null + val artifact = if (status == JobStatus.DONE && result.filePath != null) { + uploadArtifact(job.cacheKey, result.filePath) + } else { + null + } jobsRepository.markFinished( id = id, status = status, @@ -46,6 +53,7 @@ class JobWorker( artifactExpiresAt = artifact?.expiresAt, ) redis.setex(redisJobKey(id), config.jobTtlSeconds, "${status.name.lowercase()}:$durationMs") + result.filePath?.parent?.let { deleteDirectory(it) } } catch (error: Throwable) { jobsRepository.markFinished( id = id, @@ -60,13 +68,27 @@ class JobWorker( } } - private fun uploadArtifact(id: String, url: String, cacheKey: String, title: String): StorageArtifact { + private fun uploadArtifact(cacheKey: String, filePath: java.nio.file.Path): StorageArtifact { val expiresAt = Instant.now().plusSeconds(config.s3ArtifactTtlSeconds) - val objectKey = "cache/$cacheKey.txt" - val payload = "id=$id\nurl=$url\ntitle=$title\n" - storageService.putBytes(objectKey, payload.toByteArray(), "text/plain") + val extension = filePath.fileName.toString().substringAfterLast('.', "bin") + val objectKey = "cache/$cacheKey.$extension" + storageService.putFile(objectKey, filePath, contentType(extension)) return StorageArtifact(objectKey = objectKey, expiresAt = expiresAt) } + private fun contentType(extension: String): String = when (extension.lowercase()) { + "mp4" -> "video/mp4" + "webm" -> "video/webm" + "mkv" -> "video/x-matroska" + "m4a" -> "audio/mp4" + "mp3" -> "audio/mpeg" + else -> "application/octet-stream" + } + + private fun deleteDirectory(dir: java.nio.file.Path) { + if (!Files.exists(dir)) return + Files.walk(dir).sorted(Comparator.reverseOrder()).forEach { Files.deleteIfExists(it) } + } + private fun redisJobKey(id: String): String = "downloader:job:$id" } diff --git a/src/main/kotlin/dev/typetype/downloader/services/SourceUrlResolver.kt b/src/main/kotlin/dev/typetype/downloader/services/SourceUrlResolver.kt new file mode 100644 index 0000000..052792d --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/SourceUrlResolver.kt @@ -0,0 +1,63 @@ +package dev.typetype.downloader.services + +import java.net.URI +import java.net.URLDecoder +import java.nio.charset.StandardCharsets + +object SourceUrlResolver { + private val PRIORITY_KEYS = listOf("url", "v", "u", "target", "video", "href", "q") + private val YOUTUBE_ID_REGEX = Regex("^[A-Za-z0-9_-]{11}$") + + fun resolve(rawUrl: String): String { + var current = rawUrl.trim() + repeat(5) { + val next = unwrapOnce(current) ?: return current + if (next == current) return current + current = next + } + return current + } + + private fun unwrapOnce(url: String): String? { + val uri = runCatching { URI(url) }.getOrNull() ?: return null + val query = uri.rawQuery ?: return null + val params = parseQuery(query) + val ordered = PRIORITY_KEYS.flatMap { key -> params.filter { it.key.equals(key, ignoreCase = true) } } + + params.filter { p -> PRIORITY_KEYS.none { it.equals(p.key, ignoreCase = true) } } + for (param in ordered) { + val candidate = candidateFrom(param.key, decodeRepeated(param.value), uri.host.orEmpty()) ?: continue + if (candidate != url) return candidate + } + return null + } + + private fun candidateFrom(key: String, value: String, host: String): String? { + val normalized = value.trim() + if (normalized.isBlank()) return null + if (normalized.startsWith("http://") || normalized.startsWith("https://")) return normalized + if (normalized.startsWith("www.")) return "https://$normalized" + val isWrappedYoutubeId = key.equals("v", ignoreCase = true) && YOUTUBE_ID_REGEX.matches(normalized) + val fromYoutubeHost = host.contains("youtube.com") || host == "youtu.be" + if (isWrappedYoutubeId && !fromYoutubeHost) return "https://www.youtube.com/watch?v=$normalized" + return null + } + + private fun decodeRepeated(raw: String): String { + var current = raw + repeat(3) { + val decoded = runCatching { URLDecoder.decode(current, StandardCharsets.UTF_8) }.getOrElse { return current } + if (decoded == current) return current + current = decoded + } + return current + } + + private fun parseQuery(query: String): List = query.split('&').filter { it.isNotBlank() }.map { + val parts = it.split('=', limit = 2) + val key = parts[0] + val value = if (parts.size == 2) parts[1] else "" + QueryParam(key = key, value = value) + } + + private data class QueryParam(val key: String, val value: String) +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/TokenPayload.kt b/src/main/kotlin/dev/typetype/downloader/services/TokenPayload.kt new file mode 100644 index 0000000..b03daeb --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/TokenPayload.kt @@ -0,0 +1,6 @@ +package dev.typetype.downloader.services + +data class TokenPayload( + val visitorData: String, + val streamingPot: String, +) diff --git a/src/main/kotlin/dev/typetype/downloader/services/TokenServiceClient.kt b/src/main/kotlin/dev/typetype/downloader/services/TokenServiceClient.kt new file mode 100644 index 0000000..e757e67 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/TokenServiceClient.kt @@ -0,0 +1,48 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.config.AppConfig +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import java.net.URI +import java.net.URLEncoder +import java.net.http.HttpClient +import java.net.http.HttpRequest +import java.net.http.HttpResponse +import java.nio.charset.StandardCharsets +import java.time.Duration + +class TokenServiceClient(private val config: AppConfig) { + private val client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(3)).build() + private val json = Json { ignoreUnknownKeys = true } + + fun fetchForUrl(url: String): TokenPayload? { + val videoId = extractYoutubeVideoId(SourceUrlResolver.resolve(url)) ?: return null + val encoded = URLEncoder.encode(videoId, StandardCharsets.UTF_8) + val uri = URI("${config.tokenServiceUrl}/potoken?videoId=$encoded") + val request = HttpRequest.newBuilder(uri).timeout(Duration.ofSeconds(10)).GET().build() + val response = runCatching { client.send(request, HttpResponse.BodyHandlers.ofString()) }.getOrNull() ?: return null + if (response.statusCode() != 200) return null + val body = runCatching { json.decodeFromString(TokenResponse.serializer(), response.body()) }.getOrNull() ?: return null + if (body.visitorData.isBlank() || body.streamingPot.isBlank()) return null + return TokenPayload(visitorData = body.visitorData, streamingPot = body.streamingPot) + } + + private fun extractYoutubeVideoId(url: String): String? { + val uri = runCatching { URI(url) }.getOrNull() ?: return null + val host = uri.host?.lowercase() ?: return null + val query = uri.query.orEmpty() + if (host.endsWith("youtube.com")) { + return query.split("&").mapNotNull { + val parts = it.split("=", limit = 2) + if (parts.size == 2 && parts[0] == "v") parts[1] else null + }.firstOrNull { it.isNotBlank() } + } + if (host == "youtu.be") { + return uri.path.trim('/').ifBlank { null } + } + return null + } + + @Serializable + private data class TokenResponse(val visitorData: String = "", val streamingPot: String = "") +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt index b183448..09976b8 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt @@ -1,32 +1,74 @@ package dev.typetype.downloader.services import dev.typetype.downloader.config.AppConfig +import java.nio.file.Files +import java.nio.file.Path import java.util.concurrent.TimeUnit -data class YtDlpResult(val title: String, val error: String?) +data class YtDlpResult( + val title: String, + val filePath: Path?, + val error: String?, +) class YtDlpService(private val config: AppConfig) { - fun extractTitle(url: String): YtDlpResult { - val process = ProcessBuilder( - config.ytdlpBin, - "--skip-download", - "--print", - "title", - "--no-warnings", - url, - ).start() + fun download(url: String, token: TokenPayload?): YtDlpResult { + val workDir = Files.createTempDirectory("typetype-download-") + val process = ProcessBuilder(buildCommand(url, workDir, token)) + .directory(workDir.toFile()) + .redirectErrorStream(true) + .start() val finished = process.waitFor(config.ytdlpTimeoutSeconds, TimeUnit.SECONDS) if (!finished) { process.destroyForcibly() - return YtDlpResult(title = "", error = "yt-dlp timeout") + deleteDirectory(workDir) + return YtDlpResult(title = "", filePath = null, error = "yt-dlp timeout") } - val stdout = process.inputStream.bufferedReader().readText().trim() - val stderr = process.errorStream.bufferedReader().readText().trim() + val output = process.inputStream.bufferedReader().readLines() return if (process.exitValue() == 0) { - YtDlpResult(title = stdout, error = null) + val filePath = Files.list(workDir).use { stream -> + stream.filter { Files.isRegularFile(it) }.findFirst().orElse(null) + } + if (filePath == null) { + deleteDirectory(workDir) + YtDlpResult(title = "", filePath = null, error = "yt-dlp output file missing") + } else { + val title = output.firstOrNull { it.isNotBlank() } ?: filePath.fileName.toString() + YtDlpResult(title = title, filePath = filePath, error = null) + } } else { - val error = if (stderr.isNotBlank()) stderr else "yt-dlp failed" - YtDlpResult(title = "", error = error) + deleteDirectory(workDir) + val error = output.lastOrNull { it.isNotBlank() } ?: "yt-dlp failed" + YtDlpResult(title = "", filePath = null, error = error) } } + + private fun buildCommand(url: String, workDir: Path, token: TokenPayload?): List { + val command = mutableListOf( + config.ytdlpBin, + "--no-warnings", + "--no-playlist", + "--print", + "title", + "-f", + "bv*+ba/b", + "--merge-output-format", + "mp4", + "-o", + "${workDir.toAbsolutePath()}/%(id)s.%(ext)s", + ) + if (token != null) { + command.add("--extractor-args") + command.add( + "youtube:player_client=web;po_token=web.gvs+${token.streamingPot};visitor_data=${token.visitorData}" + ) + } + command.add(url) + return command + } + + private fun deleteDirectory(dir: Path) { + if (!Files.exists(dir)) return + Files.walk(dir).sorted(Comparator.reverseOrder()).forEach { Files.deleteIfExists(it) } + } } From e88264eee6249f18f2db17a55f25122a5504cc88 Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 15:26:15 +0200 Subject: [PATCH 04/16] test: cover wrapped source URL normalization --- README.md | 4 +++ .../services/SourceUrlResolverTest.kt | 26 +++++++++++++++++++ 2 files changed, 30 insertions(+) create mode 100644 src/test/kotlin/dev/typetype/downloader/services/SourceUrlResolverTest.kt diff --git a/README.md b/README.md index 3f07e76..5071a5b 100644 --- a/README.md +++ b/README.md @@ -92,6 +92,10 @@ All configuration is via environment variables. - `GET /health` - `POST /jobs` with `{ "url": "..." }` returns `{ "id": "...", "cached": false|true }` - `GET /jobs/{id}` returns one of `queued|running|done|failed` and includes a signed `artifactUrl` when available +- `GET /jobs/{id}/artifact` redirects to signed Garage artifact URL when ready + +Wrapper URLs are resolved automatically. For example, frontend watch wrappers such as +`https://watch.example/watch?v=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3D...` are normalized to the underlying source URL before processing. ## License diff --git a/src/test/kotlin/dev/typetype/downloader/services/SourceUrlResolverTest.kt b/src/test/kotlin/dev/typetype/downloader/services/SourceUrlResolverTest.kt new file mode 100644 index 0000000..0c6621c --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/SourceUrlResolverTest.kt @@ -0,0 +1,26 @@ +package dev.typetype.downloader.services + +import kotlin.test.Test +import kotlin.test.assertEquals + +class SourceUrlResolverTest { + @Test + fun `keeps plain youtube url unchanged`() { + val url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ" + assertEquals(url, SourceUrlResolver.resolve(url)) + } + + @Test + fun `unwraps encoded watch wrapper`() { + val wrapped = "https://watch.invalid/watch?v=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3Dg3AI_ZkMv1I" + val expected = "https://www.youtube.com/watch?v=g3AI_ZkMv1I" + assertEquals(expected, SourceUrlResolver.resolve(wrapped)) + } + + @Test + fun `unwraps encoded player wrapper`() { + val wrapped = "https://player.invalid/watch?v=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3D7fqWBwErkxA" + val expected = "https://www.youtube.com/watch?v=7fqWBwErkxA" + assertEquals(expected, SourceUrlResolver.resolve(wrapped)) + } +} From 7e2dd973cf7bfafe1c698ee06a61434603066a92 Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 16:12:21 +0200 Subject: [PATCH 05/16] feat: add download options for mode, sponsorblock, and subtitles --- README.md | 8 ++- .../downloader/models/CreateJobRequest.kt | 5 +- .../downloader/models/DownloadMode.kt | 13 ++++ .../typetype/downloader/models/JobOptions.kt | 11 +++ .../downloader/models/SubtitlesOptions.kt | 12 ++++ .../typetype/downloader/routes/JobRoutes.kt | 2 +- .../downloader/services/JobCacheKey.kt | 5 +- .../downloader/services/JobOptionsCodec.kt | 25 +++++++ .../services/JobOptionsNormalizer.kt | 17 +++++ .../downloader/services/JobService.kt | 10 ++- .../typetype/downloader/services/JobWorker.kt | 12 ++-- .../downloader/services/YtDlpService.kt | 69 ++++++++++++------- .../downloader/services/JobCacheKeyTest.kt | 19 +++-- .../services/JobOptionsNormalizerTest.kt | 22 ++++++ 14 files changed, 188 insertions(+), 42 deletions(-) create mode 100644 src/main/kotlin/dev/typetype/downloader/models/DownloadMode.kt create mode 100644 src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt create mode 100644 src/main/kotlin/dev/typetype/downloader/models/SubtitlesOptions.kt create mode 100644 src/main/kotlin/dev/typetype/downloader/services/JobOptionsCodec.kt create mode 100644 src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt create mode 100644 src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt diff --git a/README.md b/README.md index 5071a5b..41bbad6 100644 --- a/README.md +++ b/README.md @@ -90,7 +90,13 @@ All configuration is via environment variables. ## API - `GET /health` -- `POST /jobs` with `{ "url": "..." }` returns `{ "id": "...", "cached": false|true }` +- `POST /jobs` accepts: + - `url` (required) + - `options.mode` (`video` or `audio`) + - `options.sponsorBlock` (`true`/`false`) + - `options.thumbnailOnly` (`true`/`false`) + - `options.subtitles` (`enabled`, `auto`, `embed`, `languages`, `format`) + and returns `{ "id": "...", "cached": false|true }` - `GET /jobs/{id}` returns one of `queued|running|done|failed` and includes a signed `artifactUrl` when available - `GET /jobs/{id}/artifact` redirects to signed Garage artifact URL when ready diff --git a/src/main/kotlin/dev/typetype/downloader/models/CreateJobRequest.kt b/src/main/kotlin/dev/typetype/downloader/models/CreateJobRequest.kt index e672e91..b17d2e0 100644 --- a/src/main/kotlin/dev/typetype/downloader/models/CreateJobRequest.kt +++ b/src/main/kotlin/dev/typetype/downloader/models/CreateJobRequest.kt @@ -3,4 +3,7 @@ package dev.typetype.downloader.models import kotlinx.serialization.Serializable @Serializable -data class CreateJobRequest(val url: String) +data class CreateJobRequest( + val url: String, + val options: JobOptions = JobOptions(), +) diff --git a/src/main/kotlin/dev/typetype/downloader/models/DownloadMode.kt b/src/main/kotlin/dev/typetype/downloader/models/DownloadMode.kt new file mode 100644 index 0000000..ba2fa36 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/models/DownloadMode.kt @@ -0,0 +1,13 @@ +package dev.typetype.downloader.models + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Serializable +enum class DownloadMode { + @SerialName("video") + VIDEO, + + @SerialName("audio") + AUDIO, +} diff --git a/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt b/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt new file mode 100644 index 0000000..e421612 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt @@ -0,0 +1,11 @@ +package dev.typetype.downloader.models + +import kotlinx.serialization.Serializable + +@Serializable +data class JobOptions( + val mode: DownloadMode = DownloadMode.VIDEO, + val sponsorBlock: Boolean = false, + val thumbnailOnly: Boolean = false, + val subtitles: SubtitlesOptions = SubtitlesOptions(), +) diff --git a/src/main/kotlin/dev/typetype/downloader/models/SubtitlesOptions.kt b/src/main/kotlin/dev/typetype/downloader/models/SubtitlesOptions.kt new file mode 100644 index 0000000..ee67347 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/models/SubtitlesOptions.kt @@ -0,0 +1,12 @@ +package dev.typetype.downloader.models + +import kotlinx.serialization.Serializable + +@Serializable +data class SubtitlesOptions( + val enabled: Boolean = false, + val auto: Boolean = false, + val embed: Boolean = false, + val languages: List = listOf("en"), + val format: String = "srt", +) diff --git a/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt b/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt index 85b7941..c02f6fd 100644 --- a/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt +++ b/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt @@ -16,7 +16,7 @@ fun Route.jobRoutes(jobService: JobService) { if (body.url.isBlank()) { return@post call.respond(HttpStatusCode.BadRequest, mapOf("error" to "url is required")) } - val created = jobService.enqueue(body.url) + val created = jobService.enqueue(body.url, body.options) call.respond(HttpStatusCode.Created, created) } diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobCacheKey.kt b/src/main/kotlin/dev/typetype/downloader/services/JobCacheKey.kt index 2f80c1c..14c9277 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobCacheKey.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobCacheKey.kt @@ -3,9 +3,10 @@ package dev.typetype.downloader.services import java.security.MessageDigest object JobCacheKey { - fun fromUrl(url: String): String { + fun from(url: String, optionsJson: String): String { val digest = MessageDigest.getInstance("SHA-256") - val bytes = digest.digest(url.trim().toByteArray()) + val payload = "${url.trim()}|$optionsJson" + val bytes = digest.digest(payload.toByteArray()) return bytes.joinToString("") { "%02x".format(it) } } } diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobOptionsCodec.kt b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsCodec.kt new file mode 100644 index 0000000..fef0a5a --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsCodec.kt @@ -0,0 +1,25 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.models.JobOptions +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json + +object JobOptionsCodec { + private val json = Json { ignoreUnknownKeys = true; encodeDefaults = true } + + fun encode(options: JobOptions): String = json.encodeToString(JobOptions.serializer(), options) + + fun decode(raw: String): JobOptions = json.decodeFromString(JobOptions.serializer(), raw) + + fun encodeQueue(payload: QueuePayload): String = json.encodeToString(QueuePayload.serializer(), payload) + + fun decodeQueue(raw: String): QueuePayload? = runCatching { + json.decodeFromString(QueuePayload.serializer(), raw) + }.getOrNull() + + @Serializable + data class QueuePayload( + val id: String, + val options: JobOptions, + ) +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt new file mode 100644 index 0000000..9370790 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt @@ -0,0 +1,17 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.models.JobOptions +import dev.typetype.downloader.models.SubtitlesOptions + +object JobOptionsNormalizer { + fun normalize(options: JobOptions): JobOptions { + val subtitles = normalizeSubtitles(options.subtitles) + return options.copy(subtitles = subtitles) + } + + private fun normalizeSubtitles(input: SubtitlesOptions): SubtitlesOptions { + val langs = input.languages.map { it.trim() }.filter { it.isNotBlank() }.ifEmpty { listOf("en") } + val format = input.format.trim().ifBlank { "srt" }.lowercase() + return input.copy(languages = langs, format = format) + } +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt index 7500747..818ac6b 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt @@ -4,6 +4,7 @@ import dev.typetype.downloader.config.AppConfig import dev.typetype.downloader.db.JobRow import dev.typetype.downloader.db.JobsRepository import dev.typetype.downloader.models.CreateJobResponse +import dev.typetype.downloader.models.JobOptions import dev.typetype.downloader.models.JobResponse import redis.clients.jedis.JedisPooled import java.net.URI @@ -17,10 +18,12 @@ class JobService( private val storageService: GarageStorageService, private val config: AppConfig, ) { - fun enqueue(url: String): CreateJobResponse { + fun enqueue(url: String, requestedOptions: JobOptions): CreateJobResponse { val resolvedUrl = SourceUrlResolver.resolve(url) validateUrl(resolvedUrl) - val cacheKey = JobCacheKey.fromUrl(resolvedUrl) + val options = JobOptionsNormalizer.normalize(requestedOptions) + val optionsJson = JobOptionsCodec.encode(options) + val cacheKey = JobCacheKey.from(resolvedUrl, optionsJson) val id = UUID.randomUUID().toString() val reusable = jobsRepository.findReusableByCacheKey(cacheKey) if (reusable != null) { @@ -33,7 +36,8 @@ class JobService( throw QueueSaturatedException("Queue is full") } jobsRepository.insertQueued(id = id, url = resolvedUrl, cacheKey = cacheKey) - redis.rpush(config.redisQueueKey, id) + val payload = JobOptionsCodec.encodeQueue(JobOptionsCodec.QueuePayload(id = id, options = options)) + redis.rpush(config.redisQueueKey, payload) redis.setex(redisJobKey(id), config.jobTtlSeconds, "queued") return CreateJobResponse(id = id, cached = false) } diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt index 5fce4f7..6e2e17e 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt @@ -2,6 +2,7 @@ package dev.typetype.downloader.services import dev.typetype.downloader.config.AppConfig import dev.typetype.downloader.db.JobsRepository +import dev.typetype.downloader.models.JobOptions import dev.typetype.downloader.models.JobStatus import redis.clients.jedis.JedisPooled import java.nio.file.Files @@ -21,21 +22,24 @@ class JobWorker( thread(name = "job-worker-$index", isDaemon = true) { while (true) { val item = redis.blpop(0, config.redisQueueKey) ?: continue - val id = item.getOrNull(1) ?: continue - process(id) + val raw = item.getOrNull(1) ?: continue + val payload = JobOptionsCodec.decodeQueue(raw) + val id = payload?.id ?: raw + val options = payload?.options ?: JobOptions() + process(id, options) } } } } - private fun process(id: String) { + private fun process(id: String, options: JobOptions) { val job = jobsRepository.getById(id) ?: return jobsRepository.markRunning(id) redis.setex(redisJobKey(id), config.jobTtlSeconds, "running") try { val startedAt = System.nanoTime() val token = tokenServiceClient.fetchForUrl(job.url) - val result = ytDlpService.download(job.url, token) + val result = ytDlpService.download(job.url, token, options) val durationMs = (System.nanoTime() - startedAt) / 1_000_000 val status = if (result.error == null) JobStatus.DONE else JobStatus.FAILED val artifact = if (status == JobStatus.DONE && result.filePath != null) { diff --git a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt index 09976b8..b69e583 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt @@ -1,6 +1,8 @@ package dev.typetype.downloader.services import dev.typetype.downloader.config.AppConfig +import dev.typetype.downloader.models.DownloadMode +import dev.typetype.downloader.models.JobOptions import java.nio.file.Files import java.nio.file.Path import java.util.concurrent.TimeUnit @@ -12,9 +14,9 @@ data class YtDlpResult( ) class YtDlpService(private val config: AppConfig) { - fun download(url: String, token: TokenPayload?): YtDlpResult { + fun download(url: String, token: TokenPayload?, options: JobOptions): YtDlpResult { val workDir = Files.createTempDirectory("typetype-download-") - val process = ProcessBuilder(buildCommand(url, workDir, token)) + val process = ProcessBuilder(buildCommand(url, workDir, token, options)) .directory(workDir.toFile()) .redirectErrorStream(true) .start() @@ -25,48 +27,69 @@ class YtDlpService(private val config: AppConfig) { return YtDlpResult(title = "", filePath = null, error = "yt-dlp timeout") } val output = process.inputStream.bufferedReader().readLines() - return if (process.exitValue() == 0) { - val filePath = Files.list(workDir).use { stream -> - stream.filter { Files.isRegularFile(it) }.findFirst().orElse(null) - } - if (filePath == null) { - deleteDirectory(workDir) - YtDlpResult(title = "", filePath = null, error = "yt-dlp output file missing") - } else { - val title = output.firstOrNull { it.isNotBlank() } ?: filePath.fileName.toString() - YtDlpResult(title = title, filePath = filePath, error = null) - } - } else { + if (process.exitValue() != 0) { deleteDirectory(workDir) val error = output.lastOrNull { it.isNotBlank() } ?: "yt-dlp failed" - YtDlpResult(title = "", filePath = null, error = error) + return YtDlpResult(title = "", filePath = null, error = error) } + val filePath = selectOutputFile(workDir, options) + if (filePath == null) { + deleteDirectory(workDir) + return YtDlpResult(title = "", filePath = null, error = "yt-dlp output file missing") + } + val title = output.firstOrNull { isTitleLine(it) } ?: filePath.fileName.toString() + return YtDlpResult(title = title, filePath = filePath, error = null) } - private fun buildCommand(url: String, workDir: Path, token: TokenPayload?): List { + private fun buildCommand(url: String, workDir: Path, token: TokenPayload?, options: JobOptions): List { val command = mutableListOf( config.ytdlpBin, "--no-warnings", "--no-playlist", + "--no-progress", "--print", "title", - "-f", - "bv*+ba/b", - "--merge-output-format", - "mp4", "-o", "${workDir.toAbsolutePath()}/%(id)s.%(ext)s", ) + when { + options.thumbnailOnly -> command.addAll(listOf("--skip-download", "--write-thumbnail")) + options.mode == DownloadMode.AUDIO -> command.addAll(listOf("-f", "bestaudio/best", "--extract-audio", "--audio-format", "mp3")) + else -> command.addAll(listOf("-f", "bv*+ba/b", "--merge-output-format", "mp4")) + } + if (options.sponsorBlock && !options.thumbnailOnly) { + command.addAll(listOf("--sponsorblock-remove", "sponsor,selfpromo,interaction,intro,outro,preview,filler,music_offtopic")) + } + if (options.subtitles.enabled) { + command.add("--write-subs") + if (options.subtitles.auto) command.add("--write-auto-subs") + command.addAll(listOf("--sub-langs", options.subtitles.languages.joinToString(","))) + command.addAll(listOf("--sub-format", options.subtitles.format)) + if (options.subtitles.embed && options.mode == DownloadMode.VIDEO && !options.thumbnailOnly) command.add("--embed-subs") + } if (token != null) { command.add("--extractor-args") - command.add( - "youtube:player_client=web;po_token=web.gvs+${token.streamingPot};visitor_data=${token.visitorData}" - ) + command.add("youtube:player_client=web;po_token=web.gvs+${token.streamingPot};visitor_data=${token.visitorData}") } command.add(url) return command } + private fun selectOutputFile(workDir: Path, options: JobOptions): Path? { + val files = Files.list(workDir).use { stream -> stream.filter { Files.isRegularFile(it) }.toList() } + if (files.isEmpty()) return null + val preferred = when { + options.thumbnailOnly -> setOf("jpg", "jpeg", "png", "webp") + options.mode == DownloadMode.AUDIO -> setOf("mp3", "m4a", "opus", "aac", "flac", "wav", "webm") + else -> setOf("mp4", "mkv", "webm", "mov") + } + return files.firstOrNull { ext(it) in preferred } ?: files.maxByOrNull { Files.size(it) } + } + + private fun isTitleLine(value: String): Boolean = value.isNotBlank() && !value.startsWith("[") + + private fun ext(path: Path): String = path.fileName.toString().substringAfterLast('.', "").lowercase() + private fun deleteDirectory(dir: Path) { if (!Files.exists(dir)) return Files.walk(dir).sorted(Comparator.reverseOrder()).forEach { Files.deleteIfExists(it) } diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobCacheKeyTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobCacheKeyTest.kt index 28f4be8..04b44cf 100644 --- a/src/test/kotlin/dev/typetype/downloader/services/JobCacheKeyTest.kt +++ b/src/test/kotlin/dev/typetype/downloader/services/JobCacheKeyTest.kt @@ -1,5 +1,7 @@ package dev.typetype.downloader.services +import dev.typetype.downloader.models.DownloadMode +import dev.typetype.downloader.models.JobOptions import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertNotEquals @@ -8,22 +10,25 @@ class JobCacheKeyTest { @Test fun `same URL yields same key`() { val url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ" - val first = JobCacheKey.fromUrl(url) - val second = JobCacheKey.fromUrl(url) + val options = JobOptionsCodec.encode(JobOptions()) + val first = JobCacheKey.from(url, options) + val second = JobCacheKey.from(url, options) assertEquals(first, second) } @Test fun `trimmed URL keeps same key`() { - val clean = JobCacheKey.fromUrl("https://www.youtube.com/watch?v=dQw4w9WgXcQ") - val padded = JobCacheKey.fromUrl(" https://www.youtube.com/watch?v=dQw4w9WgXcQ ") + val options = JobOptionsCodec.encode(JobOptions()) + val clean = JobCacheKey.from("https://www.youtube.com/watch?v=dQw4w9WgXcQ", options) + val padded = JobCacheKey.from(" https://www.youtube.com/watch?v=dQw4w9WgXcQ ", options) assertEquals(clean, padded) } @Test - fun `different URLs yield different keys`() { - val first = JobCacheKey.fromUrl("https://www.youtube.com/watch?v=dQw4w9WgXcQ") - val second = JobCacheKey.fromUrl("https://www.youtube.com/watch?v=oHg5SJYRHA0") + fun `different options yield different keys`() { + val url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ" + val first = JobCacheKey.from(url, JobOptionsCodec.encode(JobOptions(mode = DownloadMode.VIDEO))) + val second = JobCacheKey.from(url, JobOptionsCodec.encode(JobOptions(mode = DownloadMode.AUDIO))) assertNotEquals(first, second) } } diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt new file mode 100644 index 0000000..e536e35 --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt @@ -0,0 +1,22 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.models.JobOptions +import dev.typetype.downloader.models.SubtitlesOptions +import kotlin.test.Test +import kotlin.test.assertEquals + +class JobOptionsNormalizerTest { + @Test + fun `normalizes subtitle languages and format`() { + val input = JobOptions( + subtitles = SubtitlesOptions( + enabled = true, + languages = listOf(" en ", "", "fr"), + format = " VTT ", + ) + ) + val normalized = JobOptionsNormalizer.normalize(input) + assertEquals(listOf("en", "fr"), normalized.subtitles.languages) + assertEquals("vtt", normalized.subtitles.format) + } +} From 3758349487f650ce95d2abb2b5fc8b8289df0439 Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 16:18:28 +0200 Subject: [PATCH 06/16] feat: allow custom sponsorblock category selection --- README.md | 1 + .../typetype/downloader/models/JobOptions.kt | 1 + .../services/JobOptionsNormalizer.kt | 25 ++++++++++++++++++- .../downloader/services/YtDlpService.kt | 2 +- .../services/JobOptionsNormalizerTest.kt | 19 ++++++++++++++ 5 files changed, 46 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 41bbad6..9c22e94 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,7 @@ All configuration is via environment variables. - `url` (required) - `options.mode` (`video` or `audio`) - `options.sponsorBlock` (`true`/`false`) + - `options.sponsorBlockCategories` (`sponsor,selfpromo,interaction,intro,outro,preview,filler,music_offtopic`) - `options.thumbnailOnly` (`true`/`false`) - `options.subtitles` (`enabled`, `auto`, `embed`, `languages`, `format`) and returns `{ "id": "...", "cached": false|true }` diff --git a/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt b/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt index e421612..5421c7c 100644 --- a/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt +++ b/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt @@ -6,6 +6,7 @@ import kotlinx.serialization.Serializable data class JobOptions( val mode: DownloadMode = DownloadMode.VIDEO, val sponsorBlock: Boolean = false, + val sponsorBlockCategories: List = emptyList(), val thumbnailOnly: Boolean = false, val subtitles: SubtitlesOptions = SubtitlesOptions(), ) diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt index 9370790..8052374 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt @@ -4,9 +4,23 @@ import dev.typetype.downloader.models.JobOptions import dev.typetype.downloader.models.SubtitlesOptions object JobOptionsNormalizer { + private val defaultSponsorBlockCategories = listOf( + "sponsor", + "selfpromo", + "interaction", + "intro", + "outro", + "preview", + "filler", + "music_offtopic", + ) + + private val allowedSponsorBlockCategories = defaultSponsorBlockCategories.toSet() + fun normalize(options: JobOptions): JobOptions { val subtitles = normalizeSubtitles(options.subtitles) - return options.copy(subtitles = subtitles) + val sponsorBlockCategories = normalizeSponsorBlockCategories(options) + return options.copy(subtitles = subtitles, sponsorBlockCategories = sponsorBlockCategories) } private fun normalizeSubtitles(input: SubtitlesOptions): SubtitlesOptions { @@ -14,4 +28,13 @@ object JobOptionsNormalizer { val format = input.format.trim().ifBlank { "srt" }.lowercase() return input.copy(languages = langs, format = format) } + + private fun normalizeSponsorBlockCategories(options: JobOptions): List { + if (!options.sponsorBlock) return emptyList() + val custom = options.sponsorBlockCategories + .map { it.trim().lowercase() } + .filter { it in allowedSponsorBlockCategories } + .distinct() + return if (custom.isEmpty()) defaultSponsorBlockCategories else custom + } } diff --git a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt index b69e583..df640b6 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt @@ -58,7 +58,7 @@ class YtDlpService(private val config: AppConfig) { else -> command.addAll(listOf("-f", "bv*+ba/b", "--merge-output-format", "mp4")) } if (options.sponsorBlock && !options.thumbnailOnly) { - command.addAll(listOf("--sponsorblock-remove", "sponsor,selfpromo,interaction,intro,outro,preview,filler,music_offtopic")) + command.addAll(listOf("--sponsorblock-remove", options.sponsorBlockCategories.joinToString(","))) } if (options.subtitles.enabled) { command.add("--write-subs") diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt index e536e35..7ec417e 100644 --- a/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt +++ b/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt @@ -19,4 +19,23 @@ class JobOptionsNormalizerTest { assertEquals(listOf("en", "fr"), normalized.subtitles.languages) assertEquals("vtt", normalized.subtitles.format) } + + @Test + fun `fills default sponsorblock categories when enabled and empty`() { + val normalized = JobOptionsNormalizer.normalize(JobOptions(sponsorBlock = true)) + assertEquals( + listOf("sponsor", "selfpromo", "interaction", "intro", "outro", "preview", "filler", "music_offtopic"), + normalized.sponsorBlockCategories, + ) + } + + @Test + fun `filters and normalizes custom sponsorblock categories`() { + val input = JobOptions( + sponsorBlock = true, + sponsorBlockCategories = listOf(" Sponsor ", "intro", "invalid", "INTRO"), + ) + val normalized = JobOptionsNormalizer.normalize(input) + assertEquals(listOf("sponsor", "intro"), normalized.sponsorBlockCategories) + } } From 3d0b34d4340c312c9d26e62ae024e60922f130c1 Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 19:13:51 +0200 Subject: [PATCH 07/16] feat: add cancellable job lifecycle and startup recovery --- .../typetype/downloader/ApplicationModule.kt | 1 + .../dev/typetype/downloader/db/Database.kt | 4 + .../dev/typetype/downloader/db/JobRow.kt | 1 + .../typetype/downloader/db/JobRowMapper.kt | 1 + .../typetype/downloader/db/JobsRepository.kt | 161 +++++++----------- .../typetype/downloader/models/JobOptions.kt | 2 + .../typetype/downloader/routes/JobRoutes.kt | 22 +++ .../services/GarageStorageService.kt | 6 + .../downloader/services/JobService.kt | 41 ++++- .../typetype/downloader/services/JobWorker.kt | 24 ++- .../downloader/services/YtDlpService.kt | 35 +++- 11 files changed, 185 insertions(+), 113 deletions(-) diff --git a/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt b/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt index 15b8203..c554b00 100644 --- a/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt +++ b/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt @@ -34,6 +34,7 @@ fun Application.module() { val tokenServiceClient = TokenServiceClient(config) val jobService = JobService(jobsRepository, redis, storage, config) val worker = JobWorker(jobsRepository, redis, ytDlpService, tokenServiceClient, storage, config) + jobService.recoverPendingJobs() worker.start() install(CallLogging) diff --git a/src/main/kotlin/dev/typetype/downloader/db/Database.kt b/src/main/kotlin/dev/typetype/downloader/db/Database.kt index 12e760c..221814d 100644 --- a/src/main/kotlin/dev/typetype/downloader/db/Database.kt +++ b/src/main/kotlin/dev/typetype/downloader/db/Database.kt @@ -26,6 +26,7 @@ object Database { id TEXT PRIMARY KEY, source_url TEXT NOT NULL, cache_key TEXT NOT NULL, + options_json TEXT NOT NULL DEFAULT '{}', status TEXT NOT NULL, duration_ms BIGINT NOT NULL, title TEXT NOT NULL, @@ -39,10 +40,13 @@ object Database { """.trimIndent() ) statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS cache_key TEXT") + statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS options_json TEXT") statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS artifact_key TEXT") statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS artifact_expires_at TIMESTAMPTZ") statement.execute("UPDATE jobs SET cache_key = id WHERE cache_key IS NULL") + statement.execute("UPDATE jobs SET options_json = '{}' WHERE options_json IS NULL") statement.execute("ALTER TABLE jobs ALTER COLUMN cache_key SET NOT NULL") + statement.execute("ALTER TABLE jobs ALTER COLUMN options_json SET NOT NULL") statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)") statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_cache_key ON jobs(cache_key)") statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_artifact_expiry ON jobs(artifact_expires_at)") diff --git a/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt b/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt index eb28c61..e793c98 100644 --- a/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt +++ b/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt @@ -7,6 +7,7 @@ data class JobRow( val id: String, val url: String, val cacheKey: String, + val optionsJson: String, val status: JobStatus, val durationMs: Long, val title: String, diff --git a/src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt b/src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt index 54a5b61..c2d6adf 100644 --- a/src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt +++ b/src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt @@ -7,6 +7,7 @@ fun rowFrom(rs: ResultSet): JobRow = JobRow( id = rs.getString("id"), url = rs.getString("source_url"), cacheKey = rs.getString("cache_key"), + optionsJson = rs.getString("options_json"), status = JobStatus.valueOf(rs.getString("status")), durationMs = rs.getLong("duration_ms"), title = rs.getString("title"), diff --git a/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt b/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt index b5f3683..5d979bc 100644 --- a/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt +++ b/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt @@ -4,115 +4,78 @@ import dev.typetype.downloader.models.JobStatus import java.time.Instant class JobsRepository { - fun insertQueued(id: String, url: String, cacheKey: String) { + private val columns = "id, source_url, cache_key, options_json, status, duration_ms, title, error, artifact_key, artifact_expires_at" + + fun insertQueued(id: String, url: String, cacheKey: String, optionsJson: String) { + val sql = "INSERT INTO jobs (id, source_url, cache_key, options_json, status, duration_ms, title, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" Database.withConnection { connection -> - connection.prepareStatement( - """ - INSERT INTO jobs (id, source_url, cache_key, status, duration_ms, title, error) - VALUES (?, ?, ?, ?, ?, ?, ?) - """.trimIndent() - ).use { statement -> - statement.setString(1, id) - statement.setString(2, url) - statement.setString(3, cacheKey) - statement.setString(4, JobStatus.QUEUED.name) - statement.setLong(5, 0L) - statement.setString(6, "") - statement.setString(7, null) - statement.executeUpdate() + connection.prepareStatement(sql).use { + it.setString(1, id); it.setString(2, url); it.setString(3, cacheKey); it.setString(4, optionsJson) + it.setString(5, JobStatus.QUEUED.name); it.setLong(6, 0L); it.setString(7, ""); it.setString(8, null) + it.executeUpdate() } } } - fun insertDoneFromCache(id: String, url: String, cached: JobRow) { + + fun insertDoneFromCache(id: String, url: String, optionsJson: String, cached: JobRow) { + val sql = "INSERT INTO jobs (id, source_url, cache_key, options_json, status, duration_ms, title, error, artifact_key, artifact_expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" Database.withConnection { connection -> - connection.prepareStatement( - """ - INSERT INTO jobs (id, source_url, cache_key, status, duration_ms, title, error, artifact_key, artifact_expires_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - """.trimIndent() - ).use { statement -> - statement.setString(1, id) - statement.setString(2, url) - statement.setString(3, cached.cacheKey) - statement.setString(4, JobStatus.DONE.name) - statement.setLong(5, 0L) - statement.setString(6, cached.title) - statement.setString(7, null) - statement.setString(8, cached.artifactKey) - statement.setObject(9, cached.artifactExpiresAt) - statement.executeUpdate() - } - } - } - fun findReusableByCacheKey(cacheKey: String): JobRow? = Database.withConnection { connection -> - connection.prepareStatement( - """ - SELECT id, source_url, cache_key, status, duration_ms, title, error, artifact_key, artifact_expires_at - FROM jobs - WHERE cache_key = ? AND status = ? AND artifact_key IS NOT NULL AND artifact_expires_at > NOW() - ORDER BY finished_at DESC NULLS LAST - LIMIT 1 - """.trimIndent() - ).use { statement -> - statement.setString(1, cacheKey) - statement.setString(2, JobStatus.DONE.name) - statement.executeQuery().use { rs -> - if (!rs.next()) return@use null - rowFrom(rs) + connection.prepareStatement(sql).use { + it.setString(1, id); it.setString(2, url); it.setString(3, cached.cacheKey); it.setString(4, optionsJson) + it.setString(5, JobStatus.DONE.name); it.setLong(6, 0L); it.setString(7, cached.title); it.setString(8, null) + it.setString(9, cached.artifactKey); it.setObject(10, cached.artifactExpiresAt); it.executeUpdate() } } } - fun getById(id: String): JobRow? = Database.withConnection { connection -> - connection.prepareStatement( - """ - SELECT id, source_url, cache_key, status, duration_ms, title, error, artifact_key, artifact_expires_at - FROM jobs - WHERE id = ? - """.trimIndent() - ).use { statement -> - statement.setString(1, id) - statement.executeQuery().use { rs -> - if (!rs.next()) return@use null - rowFrom(rs) - } - } + + fun findReusableByCacheKey(cacheKey: String): JobRow? = selectOne( + "SELECT $columns FROM jobs WHERE cache_key = ? AND status = ? AND artifact_key IS NOT NULL AND artifact_expires_at > NOW() ORDER BY finished_at DESC NULLS LAST LIMIT 1", + ) { it.setString(1, cacheKey); it.setString(2, JobStatus.DONE.name) } + + fun getById(id: String): JobRow? = selectOne("SELECT $columns FROM jobs WHERE id = ?") { it.setString(1, id) } + + fun listQueuedOrRunning(): List = selectMany( + "SELECT $columns FROM jobs WHERE status = ? OR status = ? ORDER BY created_at ASC", + ) { it.setString(1, JobStatus.QUEUED.name); it.setString(2, JobStatus.RUNNING.name) } + + fun markRunningIfQueued(id: String): Boolean = update( + "UPDATE jobs SET status = ?, started_at = NOW(), error = NULL WHERE id = ? AND status = ?", + ) { it.setString(1, JobStatus.RUNNING.name); it.setString(2, id); it.setString(3, JobStatus.QUEUED.name) } > 0 + + fun markCancelled(id: String): Boolean = update( + "UPDATE jobs SET status = ?, error = ?, finished_at = NOW() WHERE id = ? AND status IN (?, ?)", + ) { it.setString(1, JobStatus.FAILED.name); it.setString(2, "job cancelled"); it.setString(3, id); it.setString(4, JobStatus.QUEUED.name); it.setString(5, JobStatus.RUNNING.name) } > 0 + + fun deleteIfNotRunning(id: String): JobRow? = selectOne( + "DELETE FROM jobs WHERE id = ? AND status <> ? RETURNING $columns", + ) { it.setString(1, id); it.setString(2, JobStatus.RUNNING.name) } + + fun markFinishedIfRunning(id: String, status: JobStatus, durationMs: Long, title: String, error: String?, artifactKey: String?, artifactExpiresAt: Instant?): Boolean = update( + "UPDATE jobs SET status = ?, duration_ms = ?, title = ?, error = ?, artifact_key = ?, artifact_expires_at = ?, finished_at = NOW() WHERE id = ? AND status = ?", + ) { + it.setString(1, status.name); it.setLong(2, durationMs); it.setString(3, title); it.setString(4, error) + it.setString(5, artifactKey); it.setObject(6, artifactExpiresAt); it.setString(7, id); it.setString(8, JobStatus.RUNNING.name) + } > 0 + + fun resetRunningToQueued(): Int = update( + "UPDATE jobs SET status = ?, error = NULL WHERE status = ?", + ) { it.setString(1, JobStatus.QUEUED.name); it.setString(2, JobStatus.RUNNING.name) } + + private fun update(sql: String, bind: (java.sql.PreparedStatement) -> Unit): Int = Database.withConnection { connection -> + connection.prepareStatement(sql).use { bind(it); it.executeUpdate() } } - fun markRunning(id: String) { - Database.withConnection { connection -> - connection.prepareStatement( - "UPDATE jobs SET status = ?, started_at = NOW() WHERE id = ?" - ).use { statement -> - statement.setString(1, JobStatus.RUNNING.name) - statement.setString(2, id) - statement.executeUpdate() - } - } + + private fun selectOne(sql: String, bind: (java.sql.PreparedStatement) -> Unit): JobRow? = Database.withConnection { connection -> + connection.prepareStatement(sql).use { bind(it); it.executeQuery().use { rs -> if (rs.next()) rowFrom(rs) else null } } } - fun markFinished( - id: String, - status: JobStatus, - durationMs: Long, - title: String, - error: String?, - artifactKey: String?, - artifactExpiresAt: Instant?, - ) { - Database.withConnection { connection -> - connection.prepareStatement( - """ - UPDATE jobs - SET status = ?, duration_ms = ?, title = ?, error = ?, artifact_key = ?, artifact_expires_at = ?, finished_at = NOW() - WHERE id = ? - """.trimIndent() - ).use { statement -> - statement.setString(1, status.name) - statement.setLong(2, durationMs) - statement.setString(3, title) - statement.setString(4, error) - statement.setString(5, artifactKey) - statement.setObject(6, artifactExpiresAt) - statement.setString(7, id) - statement.executeUpdate() + + private fun selectMany(sql: String, bind: (java.sql.PreparedStatement) -> Unit): List = Database.withConnection { connection -> + connection.prepareStatement(sql).use { + bind(it) + it.executeQuery().use { rs -> + val rows = mutableListOf() + while (rs.next()) rows += rowFrom(rs) + rows } } } diff --git a/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt b/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt index 5421c7c..d84285c 100644 --- a/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt +++ b/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt @@ -5,6 +5,8 @@ import kotlinx.serialization.Serializable @Serializable data class JobOptions( val mode: DownloadMode = DownloadMode.VIDEO, + val quality: String = "best", + val format: String = "", val sponsorBlock: Boolean = false, val sponsorBlockCategories: List = emptyList(), val thumbnailOnly: Boolean = false, diff --git a/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt b/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt index c02f6fd..80331c0 100644 --- a/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt +++ b/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt @@ -1,11 +1,15 @@ package dev.typetype.downloader.routes import dev.typetype.downloader.models.CreateJobRequest +import dev.typetype.downloader.services.CancelJobResult +import dev.typetype.downloader.services.DeleteJobResult import dev.typetype.downloader.services.JobService import io.ktor.http.HttpStatusCode import io.ktor.server.request.receive +import io.ktor.server.response.respondText import io.ktor.server.response.respond import io.ktor.server.response.respondRedirect +import io.ktor.server.routing.delete import io.ktor.server.routing.Route import io.ktor.server.routing.get import io.ktor.server.routing.post @@ -47,4 +51,22 @@ fun Route.jobRoutes(jobService: JobService) { ) call.respondRedirect(artifactUrl, permanent = false) } + + post("/jobs/{id}/cancel") { + val id = call.parameters["id"] ?: return@post call.respond(HttpStatusCode.BadRequest, mapOf("error" to "id is required")) + when (jobService.cancel(id)) { + CancelJobResult.CANCELLED -> call.respond(HttpStatusCode.Accepted, mapOf("status" to "cancelled")) + CancelJobResult.NOT_CANCELLABLE -> call.respond(HttpStatusCode.Conflict, mapOf("error" to "job is not cancellable")) + CancelJobResult.NOT_FOUND -> call.respond(HttpStatusCode.NotFound, mapOf("error" to "job not found")) + } + } + + delete("/jobs/{id}") { + val id = call.parameters["id"] ?: return@delete call.respond(HttpStatusCode.BadRequest, mapOf("error" to "id is required")) + when (jobService.delete(id)) { + DeleteJobResult.DELETED -> call.respondText("", status = HttpStatusCode.NoContent) + DeleteJobResult.CONFLICT_RUNNING -> call.respond(HttpStatusCode.Conflict, mapOf("error" to "job is running")) + DeleteJobResult.NOT_FOUND -> call.respond(HttpStatusCode.NotFound, mapOf("error" to "job not found")) + } + } } diff --git a/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt b/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt index 87176da..e99f69b 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt @@ -12,6 +12,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectRequest import software.amazon.awssdk.services.s3.model.HeadBucketRequest import software.amazon.awssdk.services.s3.model.PutObjectRequest import software.amazon.awssdk.services.s3.model.S3Exception +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest import software.amazon.awssdk.services.s3.presigner.S3Presigner import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest import java.net.URI @@ -68,6 +69,11 @@ class GarageStorageService(config: AppConfig) { return presigner.presignGetObject(presignRequest).url().toString() } + fun deleteObject(objectKey: String) { + val request = DeleteObjectRequest.builder().bucket(bucket).key(objectKey).build() + s3.deleteObject(request) + } + fun close() { presigner.close() s3.close() diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt index 818ac6b..e3f5ea2 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt @@ -6,12 +6,17 @@ import dev.typetype.downloader.db.JobsRepository import dev.typetype.downloader.models.CreateJobResponse import dev.typetype.downloader.models.JobOptions import dev.typetype.downloader.models.JobResponse +import dev.typetype.downloader.models.JobStatus import redis.clients.jedis.JedisPooled import java.net.URI import java.time.Duration import java.time.Instant import java.util.UUID +enum class CancelJobResult { NOT_FOUND, NOT_CANCELLABLE, CANCELLED } + +enum class DeleteJobResult { NOT_FOUND, CONFLICT_RUNNING, DELETED } + class JobService( private val jobsRepository: JobsRepository, private val redis: JedisPooled, @@ -27,7 +32,7 @@ class JobService( val id = UUID.randomUUID().toString() val reusable = jobsRepository.findReusableByCacheKey(cacheKey) if (reusable != null) { - jobsRepository.insertDoneFromCache(id = id, url = resolvedUrl, cached = reusable) + jobsRepository.insertDoneFromCache(id = id, url = resolvedUrl, optionsJson = optionsJson, cached = reusable) redis.setex(redisJobKey(id), config.jobTtlSeconds, "done:cached") return CreateJobResponse(id = id, cached = true) } @@ -35,7 +40,7 @@ class JobService( if (queueSize >= config.maxQueueSize) { throw QueueSaturatedException("Queue is full") } - jobsRepository.insertQueued(id = id, url = resolvedUrl, cacheKey = cacheKey) + jobsRepository.insertQueued(id = id, url = resolvedUrl, cacheKey = cacheKey, optionsJson = optionsJson) val payload = JobOptionsCodec.encodeQueue(JobOptionsCodec.QueuePayload(id = id, options = options)) redis.rpush(config.redisQueueKey, payload) redis.setex(redisJobKey(id), config.jobTtlSeconds, "queued") @@ -47,6 +52,38 @@ class JobService( return row.toResponse(presignUrl(row), row.artifactExpiresAt?.toString()) } + fun cancel(id: String): CancelJobResult { + val row = jobsRepository.getById(id) ?: return CancelJobResult.NOT_FOUND + if (row.status == JobStatus.DONE || row.status == JobStatus.FAILED) { + return CancelJobResult.NOT_CANCELLABLE + } + return if (jobsRepository.markCancelled(id)) { + redis.setex(redisJobKey(id), config.jobTtlSeconds, "failed:0") + CancelJobResult.CANCELLED + } else { + CancelJobResult.NOT_CANCELLABLE + } + } + + fun delete(id: String): DeleteJobResult { + val existing = jobsRepository.getById(id) ?: return DeleteJobResult.NOT_FOUND + if (existing.status == JobStatus.RUNNING) return DeleteJobResult.CONFLICT_RUNNING + val deleted = jobsRepository.deleteIfNotRunning(id) ?: return DeleteJobResult.NOT_FOUND + deleted.artifactKey?.let { storageService.deleteObject(it) } + redis.del(redisJobKey(id)) + return DeleteJobResult.DELETED + } + + fun recoverPendingJobs() { + jobsRepository.resetRunningToQueued() + jobsRepository.listQueuedOrRunning().forEach { row -> + val options = runCatching { JobOptionsCodec.decode(row.optionsJson) }.getOrElse { JobOptions() } + val payload = JobOptionsCodec.encodeQueue(JobOptionsCodec.QueuePayload(id = row.id, options = options)) + redis.rpush(config.redisQueueKey, payload) + redis.setex(redisJobKey(row.id), config.jobTtlSeconds, "queued") + } + } + private fun validateUrl(url: String) { val parsed = runCatching { URI(url) }.getOrElse { throw IllegalArgumentException("Invalid URL") } val scheme = parsed.scheme?.lowercase() ?: throw IllegalArgumentException("Invalid URL") diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt index 6e2e17e..7de2e70 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt @@ -25,7 +25,7 @@ class JobWorker( val raw = item.getOrNull(1) ?: continue val payload = JobOptionsCodec.decodeQueue(raw) val id = payload?.id ?: raw - val options = payload?.options ?: JobOptions() + val options = payload?.options ?: decodeStoredOptions(id) process(id, options) } } @@ -34,12 +34,14 @@ class JobWorker( private fun process(id: String, options: JobOptions) { val job = jobsRepository.getById(id) ?: return - jobsRepository.markRunning(id) + if (!jobsRepository.markRunningIfQueued(id)) return redis.setex(redisJobKey(id), config.jobTtlSeconds, "running") try { val startedAt = System.nanoTime() val token = tokenServiceClient.fetchForUrl(job.url) - val result = ytDlpService.download(job.url, token, options) + val result = ytDlpService.download(job.url, token, options) { + jobsRepository.getById(id)?.status != JobStatus.RUNNING + } val durationMs = (System.nanoTime() - startedAt) / 1_000_000 val status = if (result.error == null) JobStatus.DONE else JobStatus.FAILED val artifact = if (status == JobStatus.DONE && result.filePath != null) { @@ -47,7 +49,7 @@ class JobWorker( } else { null } - jobsRepository.markFinished( + val updated = jobsRepository.markFinishedIfRunning( id = id, status = status, durationMs = durationMs, @@ -56,10 +58,15 @@ class JobWorker( artifactKey = artifact?.objectKey, artifactExpiresAt = artifact?.expiresAt, ) - redis.setex(redisJobKey(id), config.jobTtlSeconds, "${status.name.lowercase()}:$durationMs") + if (!updated && artifact != null) { + storageService.deleteObject(artifact.objectKey) + } + if (updated) { + redis.setex(redisJobKey(id), config.jobTtlSeconds, "${status.name.lowercase()}:$durationMs") + } result.filePath?.parent?.let { deleteDirectory(it) } } catch (error: Throwable) { - jobsRepository.markFinished( + jobsRepository.markFinishedIfRunning( id = id, status = JobStatus.FAILED, durationMs = 0, @@ -72,6 +79,11 @@ class JobWorker( } } + private fun decodeStoredOptions(id: String): JobOptions { + val row = jobsRepository.getById(id) ?: return JobOptions() + return runCatching { JobOptionsCodec.decode(row.optionsJson) }.getOrElse { JobOptions() } + } + private fun uploadArtifact(cacheKey: String, filePath: java.nio.file.Path): StorageArtifact { val expiresAt = Instant.now().plusSeconds(config.s3ArtifactTtlSeconds) val extension = filePath.fileName.toString().substringAfterLast('.', "bin") diff --git a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt index df640b6..b387060 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt @@ -14,18 +14,23 @@ data class YtDlpResult( ) class YtDlpService(private val config: AppConfig) { - fun download(url: String, token: TokenPayload?, options: JobOptions): YtDlpResult { + fun download(url: String, token: TokenPayload?, options: JobOptions, shouldCancel: () -> Boolean = { false }): YtDlpResult { val workDir = Files.createTempDirectory("typetype-download-") val process = ProcessBuilder(buildCommand(url, workDir, token, options)) .directory(workDir.toFile()) .redirectErrorStream(true) .start() - val finished = process.waitFor(config.ytdlpTimeoutSeconds, TimeUnit.SECONDS) + val finished = waitFor(process, config.ytdlpTimeoutSeconds, shouldCancel) if (!finished) { process.destroyForcibly() deleteDirectory(workDir) return YtDlpResult(title = "", filePath = null, error = "yt-dlp timeout") } + if (shouldCancel()) { + process.destroyForcibly() + deleteDirectory(workDir) + return YtDlpResult(title = "", filePath = null, error = "job cancelled") + } val output = process.inputStream.bufferedReader().readLines() if (process.exitValue() != 0) { deleteDirectory(workDir) @@ -54,8 +59,20 @@ class YtDlpService(private val config: AppConfig) { ) when { options.thumbnailOnly -> command.addAll(listOf("--skip-download", "--write-thumbnail")) - options.mode == DownloadMode.AUDIO -> command.addAll(listOf("-f", "bestaudio/best", "--extract-audio", "--audio-format", "mp3")) - else -> command.addAll(listOf("-f", "bv*+ba/b", "--merge-output-format", "mp4")) + options.mode == DownloadMode.AUDIO -> { + val selector = if (options.quality.lowercase() == "worst") "worstaudio/worst" else "bestaudio/best" + command.addAll(listOf("-f", selector, "--extract-audio", "--audio-format", options.format.trim().ifBlank { "mp3" })) + } + else -> { + val selector = when (options.quality.lowercase()) { + "1080p" -> "bv*[height<=1080]+ba/b[height<=1080]" + "720p" -> "bv*[height<=720]+ba/b[height<=720]" + "480p" -> "bv*[height<=480]+ba/b[height<=480]" + "worst" -> "worst" + else -> "bv*+ba/b" + } + command.addAll(listOf("-f", selector, "--merge-output-format", options.format.trim().ifBlank { "mp4" })) + } } if (options.sponsorBlock && !options.thumbnailOnly) { command.addAll(listOf("--sponsorblock-remove", options.sponsorBlockCategories.joinToString(","))) @@ -87,9 +104,15 @@ class YtDlpService(private val config: AppConfig) { } private fun isTitleLine(value: String): Boolean = value.isNotBlank() && !value.startsWith("[") - + private fun waitFor(process: Process, timeoutSeconds: Long, shouldCancel: () -> Boolean): Boolean { + val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds) + while (System.nanoTime() < deadline) { + if (shouldCancel()) return true + if (process.waitFor(1, TimeUnit.SECONDS)) return true + } + return false + } private fun ext(path: Path): String = path.fileName.toString().substringAfterLast('.', "").lowercase() - private fun deleteDirectory(dir: Path) { if (!Files.exists(dir)) return Files.walk(dir).sorted(Comparator.reverseOrder()).forEach { Files.deleteIfExists(it) } From 92f32fa7aded0825deec1e2cb34f427bdd0ff56c Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 19:14:14 +0200 Subject: [PATCH 08/16] test: cover job cancellation deletion and recovery flows --- build.gradle.kts | 1 + .../services/JobServiceControlTest.kt | 93 +++++++++++++++++++ .../services/JobServiceRecoveryTest.kt | 87 +++++++++++++++++ 3 files changed, 181 insertions(+) create mode 100644 src/test/kotlin/dev/typetype/downloader/services/JobServiceControlTest.kt create mode 100644 src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt diff --git a/build.gradle.kts b/build.gradle.kts index 9d0bcaa..397e190 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -25,6 +25,7 @@ dependencies { implementation("software.amazon.awssdk:s3:2.31.69") implementation("ch.qos.logback:logback-classic:1.5.20") testImplementation(kotlin("test")) + testImplementation("io.mockk:mockk:1.13.12") } application { diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobServiceControlTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobServiceControlTest.kt new file mode 100644 index 0000000..60301f2 --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/JobServiceControlTest.kt @@ -0,0 +1,93 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.config.AppConfig +import dev.typetype.downloader.db.JobRow +import dev.typetype.downloader.db.JobsRepository +import dev.typetype.downloader.models.JobStatus +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import java.time.Instant +import kotlin.test.Test +import kotlin.test.assertEquals + +class JobServiceControlTest { + private val jobsRepository = mockk() + private val redis = mockk(relaxed = true) + private val storage = mockk(relaxed = true) + private val service = JobService(jobsRepository, redis, storage, config()) + + @Test + fun `cancel returns not found when job does not exist`() { + every { jobsRepository.getById("missing") } returns null + assertEquals(CancelJobResult.NOT_FOUND, service.cancel("missing")) + } + + @Test + fun `cancel returns conflict for done job`() { + every { jobsRepository.getById("done") } returns row("done", JobStatus.DONE) + assertEquals(CancelJobResult.NOT_CANCELLABLE, service.cancel("done")) + verify(exactly = 0) { jobsRepository.markCancelled(any()) } + } + + @Test + fun `cancel marks queued job as cancelled`() { + every { jobsRepository.getById("queued") } returns row("queued", JobStatus.QUEUED) + every { jobsRepository.markCancelled("queued") } returns true + assertEquals(CancelJobResult.CANCELLED, service.cancel("queued")) + verify { redis.setex("downloader:job:queued", 600L, "failed:0") } + } + + @Test + fun `delete rejects running job`() { + every { jobsRepository.getById("run") } returns row("run", JobStatus.RUNNING) + assertEquals(DeleteJobResult.CONFLICT_RUNNING, service.delete("run")) + verify(exactly = 0) { jobsRepository.deleteIfNotRunning(any()) } + } + + @Test + fun `delete removes artifact and redis state`() { + val existing = row("x", JobStatus.FAILED, artifactKey = "cache/x.mp4") + every { jobsRepository.getById("x") } returns existing + every { jobsRepository.deleteIfNotRunning("x") } returns existing + assertEquals(DeleteJobResult.DELETED, service.delete("x")) + verify { storage.deleteObject("cache/x.mp4") } + verify { redis.del("downloader:job:x") } + } + + private fun row(id: String, status: JobStatus, artifactKey: String? = null): JobRow = JobRow( + id = id, + url = "https://www.youtube.com/watch?v=test", + cacheKey = "cache-$id", + optionsJson = "{}", + status = status, + durationMs = 0, + title = "", + error = null, + artifactKey = artifactKey, + artifactExpiresAt = Instant.now().plusSeconds(300), + ) + + private fun config(): AppConfig = AppConfig( + httpPort = 18093, + dbUrl = "jdbc:postgresql://localhost:55432/typetype_downloader", + dbUser = "typetype", + dbPassword = "typetype", + redisHost = "localhost", + redisPort = 56379, + redisQueueKey = "downloader:queue", + maxConcurrentWorkers = 2, + maxQueueSize = 100, + jobTtlSeconds = 600, + ytdlpBin = "yt-dlp", + ytdlpTimeoutSeconds = 60, + enableTranscode = false, + s3Endpoint = "http://localhost:3900", + s3Region = "garage", + s3Bucket = "typetype-downloads", + s3AccessKey = "k", + s3SecretKey = "s", + s3ArtifactTtlSeconds = 7200, + tokenServiceUrl = "http://localhost:8081", + ) +} diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt new file mode 100644 index 0000000..32821df --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt @@ -0,0 +1,87 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.config.AppConfig +import dev.typetype.downloader.db.JobRow +import dev.typetype.downloader.db.JobsRepository +import dev.typetype.downloader.models.JobOptions +import dev.typetype.downloader.models.JobStatus +import io.mockk.every +import io.mockk.mockk +import io.mockk.slot +import io.mockk.verify +import java.time.Instant +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class JobServiceRecoveryTest { + private val jobsRepository = mockk(relaxed = true) + private val redis = mockk(relaxed = true) + private val storage = mockk(relaxed = true) + private val service = JobService(jobsRepository, redis, storage, config()) + + @Test + fun `enqueue stores normalized options json`() { + val optionsSlot = slot() + every { jobsRepository.findReusableByCacheKey(any()) } returns null + every { redis.llen("downloader:queue") } returns 0L + every { jobsRepository.insertQueued(any(), any(), any(), capture(optionsSlot)) } returns Unit + val options = JobOptions(quality = "720p", format = "webm", sponsorBlock = true) + val created = service.enqueue("https://www.youtube.com/watch?v=dQw4w9WgXcQ", options) + val decoded = JobOptionsCodec.decode(optionsSlot.captured) + assertFalse(created.cached) + assertEquals("720p", decoded.quality) + assertEquals("webm", decoded.format) + assertTrue(decoded.sponsorBlockCategories.isNotEmpty()) + } + + @Test + fun `recovery requeues pending jobs with stored options`() { + val one = row("a", JobStatus.QUEUED, JobOptionsCodec.encode(JobOptions(quality = "1080p"))) + val two = row("b", JobStatus.RUNNING, JobOptionsCodec.encode(JobOptions(mode = dev.typetype.downloader.models.DownloadMode.AUDIO))) + val payloads = mutableListOf() + every { jobsRepository.listQueuedOrRunning() } returns listOf(one, two) + every { redis.rpush("downloader:queue", capture(payloads)) } returns 1L + service.recoverPendingJobs() + verify { jobsRepository.resetRunningToQueued() } + verify(exactly = 2) { redis.setex(any(), 600L, "queued") } + assertEquals(setOf("a", "b"), payloads.mapNotNull { JobOptionsCodec.decodeQueue(it)?.id }.toSet()) + } + + private fun row(id: String, status: JobStatus, optionsJson: String): JobRow = JobRow( + id = id, + url = "https://www.youtube.com/watch?v=$id", + cacheKey = "cache-$id", + optionsJson = optionsJson, + status = status, + durationMs = 0, + title = "", + error = null, + artifactKey = null, + artifactExpiresAt = Instant.now().plusSeconds(300), + ) + + private fun config(): AppConfig = AppConfig( + httpPort = 18093, + dbUrl = "jdbc:postgresql://localhost:55432/typetype_downloader", + dbUser = "typetype", + dbPassword = "typetype", + redisHost = "localhost", + redisPort = 56379, + redisQueueKey = "downloader:queue", + maxConcurrentWorkers = 2, + maxQueueSize = 100, + jobTtlSeconds = 600, + ytdlpBin = "yt-dlp", + ytdlpTimeoutSeconds = 60, + enableTranscode = false, + s3Endpoint = "http://localhost:3900", + s3Region = "garage", + s3Bucket = "typetype-downloads", + s3AccessKey = "k", + s3SecretKey = "s", + s3ArtifactTtlSeconds = 7200, + tokenServiceUrl = "http://localhost:8081", + ) +} From e4e30b5ce7acf0bf0821824d507d74907bd1006e Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 19:48:40 +0200 Subject: [PATCH 09/16] fix: normalize quality and format option defaults --- .../services/JobOptionsNormalizer.kt | 34 +++++++++++++++++- .../services/JobOptionsNormalizerTest.kt | 36 +++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt index 8052374..d4e3c21 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt @@ -1,9 +1,15 @@ package dev.typetype.downloader.services +import dev.typetype.downloader.models.DownloadMode import dev.typetype.downloader.models.JobOptions import dev.typetype.downloader.models.SubtitlesOptions object JobOptionsNormalizer { + private val allowedVideoQualities = setOf("best", "1080p", "720p", "480p", "worst") + private val allowedAudioQualities = setOf("best", "worst") + private val allowedVideoFormats = setOf("mp4", "webm", "mkv", "mov") + private val allowedAudioFormats = setOf("mp3", "m4a", "aac", "opus", "flac", "wav") + private val defaultSponsorBlockCategories = listOf( "sponsor", "selfpromo", @@ -18,12 +24,38 @@ object JobOptionsNormalizer { private val allowedSponsorBlockCategories = defaultSponsorBlockCategories.toSet() fun normalize(options: JobOptions): JobOptions { + val quality = normalizeQuality(options) + val format = normalizeFormat(options) val subtitles = normalizeSubtitles(options.subtitles) val sponsorBlockCategories = normalizeSponsorBlockCategories(options) - return options.copy(subtitles = subtitles, sponsorBlockCategories = sponsorBlockCategories) + return options.copy( + quality = quality, + format = format, + subtitles = subtitles, + sponsorBlockCategories = sponsorBlockCategories, + ) + } + + private fun normalizeQuality(options: JobOptions): String { + if (options.thumbnailOnly) return "best" + val raw = options.quality.trim().lowercase().ifBlank { "best" } + val allowed = if (options.mode == DownloadMode.AUDIO) allowedAudioQualities else allowedVideoQualities + return if (raw in allowed) raw else "best" + } + + private fun normalizeFormat(options: JobOptions): String { + if (options.thumbnailOnly) return "" + val raw = options.format.trim().lowercase() + if (options.mode == DownloadMode.AUDIO) { + if (raw.isBlank()) return "mp3" + return if (raw in allowedAudioFormats) raw else "mp3" + } + if (raw.isBlank()) return "mp4" + return if (raw in allowedVideoFormats) raw else "mp4" } private fun normalizeSubtitles(input: SubtitlesOptions): SubtitlesOptions { + if (!input.enabled) return SubtitlesOptions() val langs = input.languages.map { it.trim() }.filter { it.isNotBlank() }.ifEmpty { listOf("en") } val format = input.format.trim().ifBlank { "srt" }.lowercase() return input.copy(languages = langs, format = format) diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt index 7ec417e..d4ad818 100644 --- a/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt +++ b/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt @@ -1,5 +1,6 @@ package dev.typetype.downloader.services +import dev.typetype.downloader.models.DownloadMode import dev.typetype.downloader.models.JobOptions import dev.typetype.downloader.models.SubtitlesOptions import kotlin.test.Test @@ -38,4 +39,39 @@ class JobOptionsNormalizerTest { val normalized = JobOptionsNormalizer.normalize(input) assertEquals(listOf("sponsor", "intro"), normalized.sponsorBlockCategories) } + + @Test + fun `normalizes video quality and format to allowed values`() { + val normalized = JobOptionsNormalizer.normalize(JobOptions(quality = " 2160P ", format = "avi")) + assertEquals("best", normalized.quality) + assertEquals("mp4", normalized.format) + } + + @Test + fun `normalizes audio defaults and allowed audio format`() { + val normalized = JobOptionsNormalizer.normalize( + JobOptions(mode = DownloadMode.AUDIO, quality = "", format = " M4A "), + ) + assertEquals("best", normalized.quality) + assertEquals("m4a", normalized.format) + } + + @Test + fun `thumbnail only ignores quality and format`() { + val normalized = JobOptionsNormalizer.normalize( + JobOptions(thumbnailOnly = true, quality = "1080p", format = "webm"), + ) + assertEquals("best", normalized.quality) + assertEquals("", normalized.format) + } + + @Test + fun `disabled subtitles reset to stable defaults`() { + val normalized = JobOptionsNormalizer.normalize( + JobOptions( + subtitles = SubtitlesOptions(enabled = false, auto = true, embed = true, languages = listOf("fr"), format = "vtt"), + ) + ) + assertEquals(SubtitlesOptions(), normalized.subtitles) + } } From 92e375529aa3a5a8c16c140975b433ffdbf80ed2 Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 19:48:50 +0200 Subject: [PATCH 10/16] ops: rely on Garage image default entrypoint --- docker-compose.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 037cbfd..cb494f1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,7 +20,6 @@ services: garage: image: dxflrs/garage:v2.2.0 - command: ["server"] ports: - "3900:3900" - "3901:3901" From 566a9bac80de9581c8c611f0c4ea81052ad6078f Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 19:57:21 +0200 Subject: [PATCH 11/16] fix: clear redis queue before startup recovery requeue --- src/main/kotlin/dev/typetype/downloader/services/JobService.kt | 1 + .../dev/typetype/downloader/services/JobServiceRecoveryTest.kt | 1 + 2 files changed, 2 insertions(+) diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt index e3f5ea2..bf2dbe8 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt @@ -76,6 +76,7 @@ class JobService( fun recoverPendingJobs() { jobsRepository.resetRunningToQueued() + redis.del(config.redisQueueKey) jobsRepository.listQueuedOrRunning().forEach { row -> val options = runCatching { JobOptionsCodec.decode(row.optionsJson) }.getOrElse { JobOptions() } val payload = JobOptionsCodec.encodeQueue(JobOptionsCodec.QueuePayload(id = row.id, options = options)) diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt index 32821df..e72e247 100644 --- a/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt +++ b/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt @@ -45,6 +45,7 @@ class JobServiceRecoveryTest { every { redis.rpush("downloader:queue", capture(payloads)) } returns 1L service.recoverPendingJobs() verify { jobsRepository.resetRunningToQueued() } + verify { redis.del("downloader:queue") } verify(exactly = 2) { redis.setex(any(), 600L, "queued") } assertEquals(setOf("a", "b"), payloads.mapNotNull { JobOptionsCodec.decodeQueue(it)?.id }.toSet()) } From 7ebd292ece38108ccdd207214732925bb3d15e50 Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 20:05:02 +0200 Subject: [PATCH 12/16] fix: normalize legacy options during recovery and dequeue --- .../dev/typetype/downloader/services/JobService.kt | 2 +- .../dev/typetype/downloader/services/JobWorker.kt | 4 ++-- .../downloader/services/JobServiceRecoveryTest.kt | 12 ++++++++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt index bf2dbe8..99bb46a 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt @@ -78,7 +78,7 @@ class JobService( jobsRepository.resetRunningToQueued() redis.del(config.redisQueueKey) jobsRepository.listQueuedOrRunning().forEach { row -> - val options = runCatching { JobOptionsCodec.decode(row.optionsJson) }.getOrElse { JobOptions() } + val options = runCatching { JobOptionsCodec.decode(row.optionsJson) }.map(JobOptionsNormalizer::normalize).getOrElse { JobOptions() } val payload = JobOptionsCodec.encodeQueue(JobOptionsCodec.QueuePayload(id = row.id, options = options)) redis.rpush(config.redisQueueKey, payload) redis.setex(redisJobKey(row.id), config.jobTtlSeconds, "queued") diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt index 7de2e70..ed54340 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt @@ -25,7 +25,7 @@ class JobWorker( val raw = item.getOrNull(1) ?: continue val payload = JobOptionsCodec.decodeQueue(raw) val id = payload?.id ?: raw - val options = payload?.options ?: decodeStoredOptions(id) + val options = payload?.options?.let(JobOptionsNormalizer::normalize) ?: decodeStoredOptions(id) process(id, options) } } @@ -81,7 +81,7 @@ class JobWorker( private fun decodeStoredOptions(id: String): JobOptions { val row = jobsRepository.getById(id) ?: return JobOptions() - return runCatching { JobOptionsCodec.decode(row.optionsJson) }.getOrElse { JobOptions() } + return runCatching { JobOptionsCodec.decode(row.optionsJson) }.map(JobOptionsNormalizer::normalize).getOrElse { JobOptions() } } private fun uploadArtifact(cacheKey: String, filePath: java.nio.file.Path): StorageArtifact { diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt index e72e247..4d71f2e 100644 --- a/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt +++ b/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt @@ -39,7 +39,11 @@ class JobServiceRecoveryTest { @Test fun `recovery requeues pending jobs with stored options`() { val one = row("a", JobStatus.QUEUED, JobOptionsCodec.encode(JobOptions(quality = "1080p"))) - val two = row("b", JobStatus.RUNNING, JobOptionsCodec.encode(JobOptions(mode = dev.typetype.downloader.models.DownloadMode.AUDIO))) + val two = row( + "b", + JobStatus.RUNNING, + JobOptionsCodec.encode(JobOptions(mode = dev.typetype.downloader.models.DownloadMode.AUDIO, quality = "1080p", format = "avi")), + ) val payloads = mutableListOf() every { jobsRepository.listQueuedOrRunning() } returns listOf(one, two) every { redis.rpush("downloader:queue", capture(payloads)) } returns 1L @@ -47,7 +51,11 @@ class JobServiceRecoveryTest { verify { jobsRepository.resetRunningToQueued() } verify { redis.del("downloader:queue") } verify(exactly = 2) { redis.setex(any(), 600L, "queued") } - assertEquals(setOf("a", "b"), payloads.mapNotNull { JobOptionsCodec.decodeQueue(it)?.id }.toSet()) + val queued = payloads.mapNotNull { JobOptionsCodec.decodeQueue(it) } + assertEquals(setOf("a", "b"), queued.map { it.id }.toSet()) + val recoveredAudio = queued.first { it.id == "b" }.options + assertEquals("best", recoveredAudio.quality) + assertEquals("mp3", recoveredAudio.format) } private fun row(id: String, status: JobStatus, optionsJson: String): JobRow = JobRow( From 03b92b43a7b0699b18184d66597debe92876d06f Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 20:11:12 +0200 Subject: [PATCH 13/16] fix: tighten yt-dlp selector and extension resolution --- .../services/YtDlpOptionResolver.kt | 41 +++++++++++++++++++ .../downloader/services/YtDlpService.kt | 25 +++++------ .../services/YtDlpOptionResolverTest.kt | 39 ++++++++++++++++++ 3 files changed, 90 insertions(+), 15 deletions(-) create mode 100644 src/main/kotlin/dev/typetype/downloader/services/YtDlpOptionResolver.kt create mode 100644 src/test/kotlin/dev/typetype/downloader/services/YtDlpOptionResolverTest.kt diff --git a/src/main/kotlin/dev/typetype/downloader/services/YtDlpOptionResolver.kt b/src/main/kotlin/dev/typetype/downloader/services/YtDlpOptionResolver.kt new file mode 100644 index 0000000..604120f --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/YtDlpOptionResolver.kt @@ -0,0 +1,41 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.models.DownloadMode +import dev.typetype.downloader.models.JobOptions + +object YtDlpOptionResolver { + private val audioFormats = setOf("mp3", "m4a", "aac", "opus", "flac", "wav") + private val videoFormats = setOf("mp4", "webm", "mkv", "mov") + + fun audioSelector(quality: String): String = if (quality.lowercase() == "worst") "worstaudio/worst" else "bestaudio/best" + + fun videoSelector(quality: String): String = when (quality.lowercase()) { + "1080p" -> "bv*[height<=1080]+ba/b[height<=1080]" + "720p" -> "bv*[height<=720]+ba/b[height<=720]" + "480p" -> "bv*[height<=480]+ba/b[height<=480]" + "worst" -> "worst" + else -> "bv*+ba/b" + } + + fun audioFormat(raw: String): String { + val value = raw.trim().lowercase() + return if (value in audioFormats) value else "mp3" + } + + fun videoFormat(raw: String): String { + val value = raw.trim().lowercase() + return if (value in videoFormats) value else "mp4" + } + + fun preferredExtensions(options: JobOptions): List = when { + options.thumbnailOnly -> listOf("jpg", "jpeg", "png", "webp") + options.mode == DownloadMode.AUDIO -> { + val requested = audioFormat(options.format) + listOf(requested, "mp3", "m4a", "opus", "aac", "flac", "wav", "webm").distinct() + } + else -> { + val requested = videoFormat(options.format) + listOf(requested, "mp4", "mkv", "webm", "mov").distinct() + } + } +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt index b387060..ea322a2 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt @@ -60,18 +60,14 @@ class YtDlpService(private val config: AppConfig) { when { options.thumbnailOnly -> command.addAll(listOf("--skip-download", "--write-thumbnail")) options.mode == DownloadMode.AUDIO -> { - val selector = if (options.quality.lowercase() == "worst") "worstaudio/worst" else "bestaudio/best" - command.addAll(listOf("-f", selector, "--extract-audio", "--audio-format", options.format.trim().ifBlank { "mp3" })) + val selector = YtDlpOptionResolver.audioSelector(options.quality) + val audioFormat = YtDlpOptionResolver.audioFormat(options.format) + command.addAll(listOf("-f", selector, "--extract-audio", "--audio-format", audioFormat)) } else -> { - val selector = when (options.quality.lowercase()) { - "1080p" -> "bv*[height<=1080]+ba/b[height<=1080]" - "720p" -> "bv*[height<=720]+ba/b[height<=720]" - "480p" -> "bv*[height<=480]+ba/b[height<=480]" - "worst" -> "worst" - else -> "bv*+ba/b" - } - command.addAll(listOf("-f", selector, "--merge-output-format", options.format.trim().ifBlank { "mp4" })) + val selector = YtDlpOptionResolver.videoSelector(options.quality) + val videoFormat = YtDlpOptionResolver.videoFormat(options.format) + command.addAll(listOf("-f", selector, "--merge-output-format", videoFormat)) } } if (options.sponsorBlock && !options.thumbnailOnly) { @@ -95,12 +91,11 @@ class YtDlpService(private val config: AppConfig) { private fun selectOutputFile(workDir: Path, options: JobOptions): Path? { val files = Files.list(workDir).use { stream -> stream.filter { Files.isRegularFile(it) }.toList() } if (files.isEmpty()) return null - val preferred = when { - options.thumbnailOnly -> setOf("jpg", "jpeg", "png", "webp") - options.mode == DownloadMode.AUDIO -> setOf("mp3", "m4a", "opus", "aac", "flac", "wav", "webm") - else -> setOf("mp4", "mkv", "webm", "mov") + val preferred = YtDlpOptionResolver.preferredExtensions(options) + preferred.forEach { wanted -> + files.firstOrNull { ext(it) == wanted }?.let { return it } } - return files.firstOrNull { ext(it) in preferred } ?: files.maxByOrNull { Files.size(it) } + return files.maxByOrNull { Files.size(it) } } private fun isTitleLine(value: String): Boolean = value.isNotBlank() && !value.startsWith("[") diff --git a/src/test/kotlin/dev/typetype/downloader/services/YtDlpOptionResolverTest.kt b/src/test/kotlin/dev/typetype/downloader/services/YtDlpOptionResolverTest.kt new file mode 100644 index 0000000..703f686 --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/YtDlpOptionResolverTest.kt @@ -0,0 +1,39 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.models.DownloadMode +import dev.typetype.downloader.models.JobOptions +import kotlin.test.Test +import kotlin.test.assertEquals + +class YtDlpOptionResolverTest { + @Test + fun `resolves audio selector and format defaults`() { + assertEquals("worstaudio/worst", YtDlpOptionResolver.audioSelector("worst")) + assertEquals("bestaudio/best", YtDlpOptionResolver.audioSelector("1080p")) + assertEquals("mp3", YtDlpOptionResolver.audioFormat("avi")) + assertEquals("m4a", YtDlpOptionResolver.audioFormat(" M4A ")) + } + + @Test + fun `resolves video selector and format defaults`() { + assertEquals("bv*[height<=720]+ba/b[height<=720]", YtDlpOptionResolver.videoSelector("720p")) + assertEquals("bv*+ba/b", YtDlpOptionResolver.videoSelector("best")) + assertEquals("mp4", YtDlpOptionResolver.videoFormat("unknown")) + assertEquals("webm", YtDlpOptionResolver.videoFormat(" WEBM ")) + } + + @Test + fun `preferred extensions prioritize requested normalized format`() { + val audio = YtDlpOptionResolver.preferredExtensions( + JobOptions(mode = DownloadMode.AUDIO, quality = "best", format = "opus"), + ) + val video = YtDlpOptionResolver.preferredExtensions( + JobOptions(mode = DownloadMode.VIDEO, quality = "best", format = "mkv"), + ) + val thumbnail = YtDlpOptionResolver.preferredExtensions(JobOptions(thumbnailOnly = true)) + + assertEquals("opus", audio.first()) + assertEquals("mkv", video.first()) + assertEquals(listOf("jpg", "jpeg", "png", "webp"), thumbnail) + } +} From f10b1f5fc09ee9771ffa73ff63a5f441908d875e Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 21:13:33 +0200 Subject: [PATCH 14/16] fix: disable yt-dlp simulate mode in worker downloads --- src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt index ea322a2..047ccaa 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt @@ -49,6 +49,7 @@ class YtDlpService(private val config: AppConfig) { private fun buildCommand(url: String, workDir: Path, token: TokenPayload?, options: JobOptions): List { val command = mutableListOf( config.ytdlpBin, + "--no-simulate", "--no-warnings", "--no-playlist", "--no-progress", From 0c71223950e1d1c3c6d2ca3022ddb7fa6075f08b Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 21:19:13 +0200 Subject: [PATCH 15/16] fix: use non-chunked s3 requests for Garage compatibility --- .../downloader/services/GarageStorageService.kt | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt b/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt index e99f69b..474a02a 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt @@ -31,14 +31,26 @@ class GarageStorageService(config: AppConfig) { .endpointOverride(endpoint) .region(region) .credentialsProvider(credentials) - .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .serviceConfiguration( + S3Configuration.builder() + .pathStyleAccessEnabled(true) + .chunkedEncodingEnabled(false) + .checksumValidationEnabled(false) + .build(), + ) .build() private val presigner: S3Presigner = S3Presigner.builder() .endpointOverride(endpoint) .region(region) .credentialsProvider(credentials) - .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .serviceConfiguration( + S3Configuration.builder() + .pathStyleAccessEnabled(true) + .chunkedEncodingEnabled(false) + .checksumValidationEnabled(false) + .build(), + ) .build() fun ensureBucket() { From c0c495e2e2b1304be04b955bdbaffb81ecda9b99 Mon Sep 17 00:00:00 2001 From: Priveetee Date: Wed, 8 Apr 2026 21:23:25 +0200 Subject: [PATCH 16/16] fix: persist artifact expiry as sql timestamp --- .../dev/typetype/downloader/db/JobsRepository.kt | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt b/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt index 5d979bc..f066820 100644 --- a/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt +++ b/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt @@ -1,6 +1,8 @@ package dev.typetype.downloader.db import dev.typetype.downloader.models.JobStatus +import java.sql.Timestamp +import java.sql.Types import java.time.Instant class JobsRepository { @@ -23,7 +25,7 @@ class JobsRepository { connection.prepareStatement(sql).use { it.setString(1, id); it.setString(2, url); it.setString(3, cached.cacheKey); it.setString(4, optionsJson) it.setString(5, JobStatus.DONE.name); it.setLong(6, 0L); it.setString(7, cached.title); it.setString(8, null) - it.setString(9, cached.artifactKey); it.setObject(10, cached.artifactExpiresAt); it.executeUpdate() + it.setString(9, cached.artifactKey); setInstant(it, 10, cached.artifactExpiresAt); it.executeUpdate() } } } @@ -54,7 +56,7 @@ class JobsRepository { "UPDATE jobs SET status = ?, duration_ms = ?, title = ?, error = ?, artifact_key = ?, artifact_expires_at = ?, finished_at = NOW() WHERE id = ? AND status = ?", ) { it.setString(1, status.name); it.setLong(2, durationMs); it.setString(3, title); it.setString(4, error) - it.setString(5, artifactKey); it.setObject(6, artifactExpiresAt); it.setString(7, id); it.setString(8, JobStatus.RUNNING.name) + it.setString(5, artifactKey); setInstant(it, 6, artifactExpiresAt); it.setString(7, id); it.setString(8, JobStatus.RUNNING.name) } > 0 fun resetRunningToQueued(): Int = update( @@ -79,4 +81,12 @@ class JobsRepository { } } } + + private fun setInstant(statement: java.sql.PreparedStatement, index: Int, value: Instant?) { + if (value == null) { + statement.setNull(index, Types.TIMESTAMP_WITH_TIMEZONE) + return + } + statement.setTimestamp(index, Timestamp.from(value)) + } }