diff --git a/README.md b/README.md index 550104c..9c22e94 100644 --- a/README.md +++ b/README.md @@ -90,8 +90,19 @@ All configuration is via environment variables. ## API - `GET /health` -- `POST /jobs` with `{ "url": "..." }` -- `GET /jobs/{id}` +- `POST /jobs` accepts: + - `url` (required) + - `options.mode` (`video` or `audio`) + - `options.sponsorBlock` (`true`/`false`) + - `options.sponsorBlockCategories` (`sponsor,selfpromo,interaction,intro,outro,preview,filler,music_offtopic`) + - `options.thumbnailOnly` (`true`/`false`) + - `options.subtitles` (`enabled`, `auto`, `embed`, `languages`, `format`) + and returns `{ "id": "...", "cached": false|true }` +- `GET /jobs/{id}` returns one of `queued|running|done|failed` and includes a signed `artifactUrl` when available +- `GET /jobs/{id}/artifact` redirects to signed Garage artifact URL when ready + +Wrapper URLs are resolved automatically. For example, frontend watch wrappers such as +`https://watch.example/watch?v=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3D...` are normalized to the underlying source URL before processing. ## License diff --git a/build.gradle.kts b/build.gradle.kts index d24912b..397e190 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -22,8 +22,10 @@ dependencies { implementation("org.postgresql:postgresql:42.7.8") implementation("com.zaxxer:HikariCP:6.3.0") implementation("redis.clients:jedis:6.1.0") + implementation("software.amazon.awssdk:s3:2.31.69") implementation("ch.qos.logback:logback-classic:1.5.20") testImplementation(kotlin("test")) + testImplementation("io.mockk:mockk:1.13.12") } application { diff --git a/docker-compose.yml b/docker-compose.yml index 037cbfd..cb494f1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,7 +20,6 @@ services: garage: image: dxflrs/garage:v2.2.0 - command: ["server"] ports: - "3900:3900" - "3901:3901" diff --git a/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt b/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt index 6374826..c554b00 100644 --- a/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt +++ b/src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt @@ -7,6 +7,9 @@ import dev.typetype.downloader.routes.healthRoutes import dev.typetype.downloader.routes.jobRoutes import dev.typetype.downloader.services.JobService import dev.typetype.downloader.services.JobWorker +import dev.typetype.downloader.services.QueueSaturatedException +import dev.typetype.downloader.services.GarageStorageService +import dev.typetype.downloader.services.TokenServiceClient import dev.typetype.downloader.services.YtDlpService import io.ktor.http.HttpStatusCode import io.ktor.serialization.kotlinx.json.json @@ -24,15 +27,22 @@ fun Application.module() { val config = AppConfigLoader.load() Database.init(config) val redis = JedisPooled(config.redisHost, config.redisPort) + val storage = GarageStorageService(config) + storage.ensureBucket() val jobsRepository = JobsRepository() val ytDlpService = YtDlpService(config) - val jobService = JobService(jobsRepository, redis, config) - val worker = JobWorker(jobsRepository, redis, ytDlpService, config) + val tokenServiceClient = TokenServiceClient(config) + val jobService = JobService(jobsRepository, redis, storage, config) + val worker = JobWorker(jobsRepository, redis, ytDlpService, tokenServiceClient, storage, config) + jobService.recoverPendingJobs() worker.start() install(CallLogging) install(ContentNegotiation) { json() } install(StatusPages) { + exception { call, cause -> + call.respond(HttpStatusCode.TooManyRequests, mapOf("error" to (cause.message ?: "queue saturated"))) + } exception { call, cause -> call.respond(HttpStatusCode.BadRequest, mapOf("error" to (cause.message ?: "bad request"))) } @@ -44,6 +54,7 @@ fun Application.module() { } monitor.subscribe(ApplicationStopping) { + storage.close() redis.close() Database.close() } diff --git a/src/main/kotlin/dev/typetype/downloader/db/Database.kt b/src/main/kotlin/dev/typetype/downloader/db/Database.kt index db89696..221814d 100644 --- a/src/main/kotlin/dev/typetype/downloader/db/Database.kt +++ b/src/main/kotlin/dev/typetype/downloader/db/Database.kt @@ -25,17 +25,31 @@ object Database { CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, source_url TEXT NOT NULL, + cache_key TEXT NOT NULL, + options_json TEXT NOT NULL DEFAULT '{}', status TEXT NOT NULL, duration_ms BIGINT NOT NULL, title TEXT NOT NULL, error TEXT, + artifact_key TEXT, + artifact_expires_at TIMESTAMPTZ, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), started_at TIMESTAMPTZ, finished_at TIMESTAMPTZ ) """.trimIndent() ) + statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS cache_key TEXT") + statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS options_json TEXT") + statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS artifact_key TEXT") + statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS artifact_expires_at TIMESTAMPTZ") + statement.execute("UPDATE jobs SET cache_key = id WHERE cache_key IS NULL") + statement.execute("UPDATE jobs SET options_json = '{}' WHERE options_json IS NULL") + statement.execute("ALTER TABLE jobs ALTER COLUMN cache_key SET NOT NULL") + statement.execute("ALTER TABLE jobs ALTER COLUMN options_json SET NOT NULL") statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)") + statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_cache_key ON jobs(cache_key)") + statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_artifact_expiry ON jobs(artifact_expires_at)") } } } diff --git a/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt b/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt index 7fbf0b8..e793c98 100644 --- a/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt +++ b/src/main/kotlin/dev/typetype/downloader/db/JobRow.kt @@ -1,12 +1,17 @@ package dev.typetype.downloader.db import dev.typetype.downloader.models.JobStatus +import java.time.Instant data class JobRow( val id: String, val url: String, + val cacheKey: String, + val optionsJson: String, val status: JobStatus, val durationMs: Long, val title: String, val error: String?, + val artifactKey: String?, + val artifactExpiresAt: Instant?, ) diff --git a/src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt b/src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt new file mode 100644 index 0000000..c2d6adf --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt @@ -0,0 +1,17 @@ +package dev.typetype.downloader.db + +import dev.typetype.downloader.models.JobStatus +import java.sql.ResultSet + +fun rowFrom(rs: ResultSet): JobRow = JobRow( + id = rs.getString("id"), + url = rs.getString("source_url"), + cacheKey = rs.getString("cache_key"), + optionsJson = rs.getString("options_json"), + status = JobStatus.valueOf(rs.getString("status")), + durationMs = rs.getLong("duration_ms"), + title = rs.getString("title"), + error = rs.getString("error"), + artifactKey = rs.getString("artifact_key"), + artifactExpiresAt = rs.getTimestamp("artifact_expires_at")?.toInstant(), +) diff --git a/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt b/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt index 94c91a2..f066820 100644 --- a/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt +++ b/src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt @@ -1,67 +1,92 @@ package dev.typetype.downloader.db import dev.typetype.downloader.models.JobStatus +import java.sql.Timestamp +import java.sql.Types +import java.time.Instant class JobsRepository { - fun insertQueued(id: String, url: String) { + private val columns = "id, source_url, cache_key, options_json, status, duration_ms, title, error, artifact_key, artifact_expires_at" + + fun insertQueued(id: String, url: String, cacheKey: String, optionsJson: String) { + val sql = "INSERT INTO jobs (id, source_url, cache_key, options_json, status, duration_ms, title, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" Database.withConnection { connection -> - connection.prepareStatement( - "INSERT INTO jobs (id, source_url, status, duration_ms, title, error) VALUES (?, ?, ?, ?, ?, ?)" - ).use { statement -> - statement.setString(1, id) - statement.setString(2, url) - statement.setString(3, JobStatus.QUEUED.name) - statement.setLong(4, 0L) - statement.setString(5, "") - statement.setString(6, null) - statement.executeUpdate() + connection.prepareStatement(sql).use { + it.setString(1, id); it.setString(2, url); it.setString(3, cacheKey); it.setString(4, optionsJson) + it.setString(5, JobStatus.QUEUED.name); it.setLong(6, 0L); it.setString(7, ""); it.setString(8, null) + it.executeUpdate() } } } - fun getById(id: String): JobRow? = Database.withConnection { connection -> - connection.prepareStatement( - "SELECT id, source_url, status, duration_ms, title, error FROM jobs WHERE id = ?" - ).use { statement -> - statement.setString(1, id) - statement.executeQuery().use { rs -> - if (!rs.next()) return@use null - JobRow( - id = rs.getString("id"), - url = rs.getString("source_url"), - status = JobStatus.valueOf(rs.getString("status")), - durationMs = rs.getLong("duration_ms"), - title = rs.getString("title"), - error = rs.getString("error"), - ) + fun insertDoneFromCache(id: String, url: String, optionsJson: String, cached: JobRow) { + val sql = "INSERT INTO jobs (id, source_url, cache_key, options_json, status, duration_ms, title, error, artifact_key, artifact_expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + Database.withConnection { connection -> + connection.prepareStatement(sql).use { + it.setString(1, id); it.setString(2, url); it.setString(3, cached.cacheKey); it.setString(4, optionsJson) + it.setString(5, JobStatus.DONE.name); it.setLong(6, 0L); it.setString(7, cached.title); it.setString(8, null) + it.setString(9, cached.artifactKey); setInstant(it, 10, cached.artifactExpiresAt); it.executeUpdate() } } } - fun markRunning(id: String) { - Database.withConnection { connection -> - connection.prepareStatement( - "UPDATE jobs SET status = ?, started_at = NOW() WHERE id = ?" - ).use { statement -> - statement.setString(1, JobStatus.RUNNING.name) - statement.setString(2, id) - statement.executeUpdate() + fun findReusableByCacheKey(cacheKey: String): JobRow? = selectOne( + "SELECT $columns FROM jobs WHERE cache_key = ? AND status = ? AND artifact_key IS NOT NULL AND artifact_expires_at > NOW() ORDER BY finished_at DESC NULLS LAST LIMIT 1", + ) { it.setString(1, cacheKey); it.setString(2, JobStatus.DONE.name) } + + fun getById(id: String): JobRow? = selectOne("SELECT $columns FROM jobs WHERE id = ?") { it.setString(1, id) } + + fun listQueuedOrRunning(): List = selectMany( + "SELECT $columns FROM jobs WHERE status = ? OR status = ? ORDER BY created_at ASC", + ) { it.setString(1, JobStatus.QUEUED.name); it.setString(2, JobStatus.RUNNING.name) } + + fun markRunningIfQueued(id: String): Boolean = update( + "UPDATE jobs SET status = ?, started_at = NOW(), error = NULL WHERE id = ? AND status = ?", + ) { it.setString(1, JobStatus.RUNNING.name); it.setString(2, id); it.setString(3, JobStatus.QUEUED.name) } > 0 + + fun markCancelled(id: String): Boolean = update( + "UPDATE jobs SET status = ?, error = ?, finished_at = NOW() WHERE id = ? AND status IN (?, ?)", + ) { it.setString(1, JobStatus.FAILED.name); it.setString(2, "job cancelled"); it.setString(3, id); it.setString(4, JobStatus.QUEUED.name); it.setString(5, JobStatus.RUNNING.name) } > 0 + + fun deleteIfNotRunning(id: String): JobRow? = selectOne( + "DELETE FROM jobs WHERE id = ? AND status <> ? RETURNING $columns", + ) { it.setString(1, id); it.setString(2, JobStatus.RUNNING.name) } + + fun markFinishedIfRunning(id: String, status: JobStatus, durationMs: Long, title: String, error: String?, artifactKey: String?, artifactExpiresAt: Instant?): Boolean = update( + "UPDATE jobs SET status = ?, duration_ms = ?, title = ?, error = ?, artifact_key = ?, artifact_expires_at = ?, finished_at = NOW() WHERE id = ? AND status = ?", + ) { + it.setString(1, status.name); it.setLong(2, durationMs); it.setString(3, title); it.setString(4, error) + it.setString(5, artifactKey); setInstant(it, 6, artifactExpiresAt); it.setString(7, id); it.setString(8, JobStatus.RUNNING.name) + } > 0 + + fun resetRunningToQueued(): Int = update( + "UPDATE jobs SET status = ?, error = NULL WHERE status = ?", + ) { it.setString(1, JobStatus.QUEUED.name); it.setString(2, JobStatus.RUNNING.name) } + + private fun update(sql: String, bind: (java.sql.PreparedStatement) -> Unit): Int = Database.withConnection { connection -> + connection.prepareStatement(sql).use { bind(it); it.executeUpdate() } + } + + private fun selectOne(sql: String, bind: (java.sql.PreparedStatement) -> Unit): JobRow? = Database.withConnection { connection -> + connection.prepareStatement(sql).use { bind(it); it.executeQuery().use { rs -> if (rs.next()) rowFrom(rs) else null } } + } + + private fun selectMany(sql: String, bind: (java.sql.PreparedStatement) -> Unit): List = Database.withConnection { connection -> + connection.prepareStatement(sql).use { + bind(it) + it.executeQuery().use { rs -> + val rows = mutableListOf() + while (rs.next()) rows += rowFrom(rs) + rows } } } - fun markFinished(id: String, status: JobStatus, durationMs: Long, title: String, error: String?) { - Database.withConnection { connection -> - connection.prepareStatement( - "UPDATE jobs SET status = ?, duration_ms = ?, title = ?, error = ?, finished_at = NOW() WHERE id = ?" - ).use { statement -> - statement.setString(1, status.name) - statement.setLong(2, durationMs) - statement.setString(3, title) - statement.setString(4, error) - statement.setString(5, id) - statement.executeUpdate() - } + private fun setInstant(statement: java.sql.PreparedStatement, index: Int, value: Instant?) { + if (value == null) { + statement.setNull(index, Types.TIMESTAMP_WITH_TIMEZONE) + return } + statement.setTimestamp(index, Timestamp.from(value)) } } diff --git a/src/main/kotlin/dev/typetype/downloader/models/CreateJobRequest.kt b/src/main/kotlin/dev/typetype/downloader/models/CreateJobRequest.kt index e672e91..b17d2e0 100644 --- a/src/main/kotlin/dev/typetype/downloader/models/CreateJobRequest.kt +++ b/src/main/kotlin/dev/typetype/downloader/models/CreateJobRequest.kt @@ -3,4 +3,7 @@ package dev.typetype.downloader.models import kotlinx.serialization.Serializable @Serializable -data class CreateJobRequest(val url: String) +data class CreateJobRequest( + val url: String, + val options: JobOptions = JobOptions(), +) diff --git a/src/main/kotlin/dev/typetype/downloader/models/CreateJobResponse.kt b/src/main/kotlin/dev/typetype/downloader/models/CreateJobResponse.kt index c5a0586..22c382f 100644 --- a/src/main/kotlin/dev/typetype/downloader/models/CreateJobResponse.kt +++ b/src/main/kotlin/dev/typetype/downloader/models/CreateJobResponse.kt @@ -3,4 +3,7 @@ package dev.typetype.downloader.models import kotlinx.serialization.Serializable @Serializable -data class CreateJobResponse(val id: String) +data class CreateJobResponse( + val id: String, + val cached: Boolean, +) diff --git a/src/main/kotlin/dev/typetype/downloader/models/DownloadMode.kt b/src/main/kotlin/dev/typetype/downloader/models/DownloadMode.kt new file mode 100644 index 0000000..ba2fa36 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/models/DownloadMode.kt @@ -0,0 +1,13 @@ +package dev.typetype.downloader.models + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Serializable +enum class DownloadMode { + @SerialName("video") + VIDEO, + + @SerialName("audio") + AUDIO, +} diff --git a/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt b/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt new file mode 100644 index 0000000..d84285c --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt @@ -0,0 +1,14 @@ +package dev.typetype.downloader.models + +import kotlinx.serialization.Serializable + +@Serializable +data class JobOptions( + val mode: DownloadMode = DownloadMode.VIDEO, + val quality: String = "best", + val format: String = "", + val sponsorBlock: Boolean = false, + val sponsorBlockCategories: List = emptyList(), + val thumbnailOnly: Boolean = false, + val subtitles: SubtitlesOptions = SubtitlesOptions(), +) diff --git a/src/main/kotlin/dev/typetype/downloader/models/JobResponse.kt b/src/main/kotlin/dev/typetype/downloader/models/JobResponse.kt index 72672ca..574179e 100644 --- a/src/main/kotlin/dev/typetype/downloader/models/JobResponse.kt +++ b/src/main/kotlin/dev/typetype/downloader/models/JobResponse.kt @@ -10,4 +10,6 @@ data class JobResponse( val durationMs: Long, val title: String, val error: String? = null, + val artifactUrl: String? = null, + val artifactExpiresAt: String? = null, ) diff --git a/src/main/kotlin/dev/typetype/downloader/models/SubtitlesOptions.kt b/src/main/kotlin/dev/typetype/downloader/models/SubtitlesOptions.kt new file mode 100644 index 0000000..ee67347 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/models/SubtitlesOptions.kt @@ -0,0 +1,12 @@ +package dev.typetype.downloader.models + +import kotlinx.serialization.Serializable + +@Serializable +data class SubtitlesOptions( + val enabled: Boolean = false, + val auto: Boolean = false, + val embed: Boolean = false, + val languages: List = listOf("en"), + val format: String = "srt", +) diff --git a/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt b/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt index 51b85ed..80331c0 100644 --- a/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt +++ b/src/main/kotlin/dev/typetype/downloader/routes/JobRoutes.kt @@ -1,10 +1,15 @@ package dev.typetype.downloader.routes import dev.typetype.downloader.models.CreateJobRequest +import dev.typetype.downloader.services.CancelJobResult +import dev.typetype.downloader.services.DeleteJobResult import dev.typetype.downloader.services.JobService import io.ktor.http.HttpStatusCode import io.ktor.server.request.receive +import io.ktor.server.response.respondText import io.ktor.server.response.respond +import io.ktor.server.response.respondRedirect +import io.ktor.server.routing.delete import io.ktor.server.routing.Route import io.ktor.server.routing.get import io.ktor.server.routing.post @@ -15,7 +20,7 @@ fun Route.jobRoutes(jobService: JobService) { if (body.url.isBlank()) { return@post call.respond(HttpStatusCode.BadRequest, mapOf("error" to "url is required")) } - val created = jobService.enqueue(body.url) + val created = jobService.enqueue(body.url, body.options) call.respond(HttpStatusCode.Created, created) } @@ -30,4 +35,38 @@ fun Route.jobRoutes(jobService: JobService) { ) call.respond(HttpStatusCode.OK, job) } + + get("/jobs/{id}/artifact") { + val id = call.parameters["id"] ?: return@get call.respond( + HttpStatusCode.BadRequest, + mapOf("error" to "id is required"), + ) + val job = jobService.get(id) ?: return@get call.respond( + HttpStatusCode.NotFound, + mapOf("error" to "job not found"), + ) + val artifactUrl = job.artifactUrl ?: return@get call.respond( + HttpStatusCode.Conflict, + mapOf("error" to "artifact not ready"), + ) + call.respondRedirect(artifactUrl, permanent = false) + } + + post("/jobs/{id}/cancel") { + val id = call.parameters["id"] ?: return@post call.respond(HttpStatusCode.BadRequest, mapOf("error" to "id is required")) + when (jobService.cancel(id)) { + CancelJobResult.CANCELLED -> call.respond(HttpStatusCode.Accepted, mapOf("status" to "cancelled")) + CancelJobResult.NOT_CANCELLABLE -> call.respond(HttpStatusCode.Conflict, mapOf("error" to "job is not cancellable")) + CancelJobResult.NOT_FOUND -> call.respond(HttpStatusCode.NotFound, mapOf("error" to "job not found")) + } + } + + delete("/jobs/{id}") { + val id = call.parameters["id"] ?: return@delete call.respond(HttpStatusCode.BadRequest, mapOf("error" to "id is required")) + when (jobService.delete(id)) { + DeleteJobResult.DELETED -> call.respondText("", status = HttpStatusCode.NoContent) + DeleteJobResult.CONFLICT_RUNNING -> call.respond(HttpStatusCode.Conflict, mapOf("error" to "job is running")) + DeleteJobResult.NOT_FOUND -> call.respond(HttpStatusCode.NotFound, mapOf("error" to "job not found")) + } + } } diff --git a/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt b/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt new file mode 100644 index 0000000..474a02a --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/GarageStorageService.kt @@ -0,0 +1,93 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.config.AppConfig +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider +import software.amazon.awssdk.core.sync.RequestBody +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.S3Configuration +import software.amazon.awssdk.services.s3.model.CreateBucketRequest +import software.amazon.awssdk.services.s3.model.GetObjectRequest +import software.amazon.awssdk.services.s3.model.HeadBucketRequest +import software.amazon.awssdk.services.s3.model.PutObjectRequest +import software.amazon.awssdk.services.s3.model.S3Exception +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest +import software.amazon.awssdk.services.s3.presigner.S3Presigner +import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest +import java.net.URI +import java.nio.file.Path +import java.time.Duration + +class GarageStorageService(config: AppConfig) { + private val endpoint = URI(config.s3Endpoint) + private val region = Region.of(config.s3Region) + private val bucket = config.s3Bucket + private val credentials = StaticCredentialsProvider.create( + AwsBasicCredentials.create(config.s3AccessKey, config.s3SecretKey) + ) + + private val s3: S3Client = S3Client.builder() + .endpointOverride(endpoint) + .region(region) + .credentialsProvider(credentials) + .serviceConfiguration( + S3Configuration.builder() + .pathStyleAccessEnabled(true) + .chunkedEncodingEnabled(false) + .checksumValidationEnabled(false) + .build(), + ) + .build() + + private val presigner: S3Presigner = S3Presigner.builder() + .endpointOverride(endpoint) + .region(region) + .credentialsProvider(credentials) + .serviceConfiguration( + S3Configuration.builder() + .pathStyleAccessEnabled(true) + .chunkedEncodingEnabled(false) + .checksumValidationEnabled(false) + .build(), + ) + .build() + + fun ensureBucket() { + try { + s3.headBucket(HeadBucketRequest.builder().bucket(bucket).build()) + } catch (error: S3Exception) { + if (error.statusCode() == 404) { + s3.createBucket(CreateBucketRequest.builder().bucket(bucket).build()) + } else { + throw error + } + } + } + + fun putBytes(objectKey: String, body: ByteArray, contentType: String) { + val request = PutObjectRequest.builder().bucket(bucket).key(objectKey).contentType(contentType).build() + s3.putObject(request, RequestBody.fromBytes(body)) + } + + fun putFile(objectKey: String, filePath: Path, contentType: String) { + val request = PutObjectRequest.builder().bucket(bucket).key(objectKey).contentType(contentType).build() + s3.putObject(request, RequestBody.fromFile(filePath)) + } + + fun presignGet(objectKey: String, duration: Duration): String { + val getRequest = GetObjectRequest.builder().bucket(bucket).key(objectKey).build() + val presignRequest = GetObjectPresignRequest.builder().signatureDuration(duration).getObjectRequest(getRequest).build() + return presigner.presignGetObject(presignRequest).url().toString() + } + + fun deleteObject(objectKey: String) { + val request = DeleteObjectRequest.builder().bucket(bucket).key(objectKey).build() + s3.deleteObject(request) + } + + fun close() { + presigner.close() + s3.close() + } +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobCacheKey.kt b/src/main/kotlin/dev/typetype/downloader/services/JobCacheKey.kt new file mode 100644 index 0000000..14c9277 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/JobCacheKey.kt @@ -0,0 +1,12 @@ +package dev.typetype.downloader.services + +import java.security.MessageDigest + +object JobCacheKey { + fun from(url: String, optionsJson: String): String { + val digest = MessageDigest.getInstance("SHA-256") + val payload = "${url.trim()}|$optionsJson" + val bytes = digest.digest(payload.toByteArray()) + return bytes.joinToString("") { "%02x".format(it) } + } +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobOptionsCodec.kt b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsCodec.kt new file mode 100644 index 0000000..fef0a5a --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsCodec.kt @@ -0,0 +1,25 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.models.JobOptions +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json + +object JobOptionsCodec { + private val json = Json { ignoreUnknownKeys = true; encodeDefaults = true } + + fun encode(options: JobOptions): String = json.encodeToString(JobOptions.serializer(), options) + + fun decode(raw: String): JobOptions = json.decodeFromString(JobOptions.serializer(), raw) + + fun encodeQueue(payload: QueuePayload): String = json.encodeToString(QueuePayload.serializer(), payload) + + fun decodeQueue(raw: String): QueuePayload? = runCatching { + json.decodeFromString(QueuePayload.serializer(), raw) + }.getOrNull() + + @Serializable + data class QueuePayload( + val id: String, + val options: JobOptions, + ) +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt new file mode 100644 index 0000000..d4e3c21 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/JobOptionsNormalizer.kt @@ -0,0 +1,72 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.models.DownloadMode +import dev.typetype.downloader.models.JobOptions +import dev.typetype.downloader.models.SubtitlesOptions + +object JobOptionsNormalizer { + private val allowedVideoQualities = setOf("best", "1080p", "720p", "480p", "worst") + private val allowedAudioQualities = setOf("best", "worst") + private val allowedVideoFormats = setOf("mp4", "webm", "mkv", "mov") + private val allowedAudioFormats = setOf("mp3", "m4a", "aac", "opus", "flac", "wav") + + private val defaultSponsorBlockCategories = listOf( + "sponsor", + "selfpromo", + "interaction", + "intro", + "outro", + "preview", + "filler", + "music_offtopic", + ) + + private val allowedSponsorBlockCategories = defaultSponsorBlockCategories.toSet() + + fun normalize(options: JobOptions): JobOptions { + val quality = normalizeQuality(options) + val format = normalizeFormat(options) + val subtitles = normalizeSubtitles(options.subtitles) + val sponsorBlockCategories = normalizeSponsorBlockCategories(options) + return options.copy( + quality = quality, + format = format, + subtitles = subtitles, + sponsorBlockCategories = sponsorBlockCategories, + ) + } + + private fun normalizeQuality(options: JobOptions): String { + if (options.thumbnailOnly) return "best" + val raw = options.quality.trim().lowercase().ifBlank { "best" } + val allowed = if (options.mode == DownloadMode.AUDIO) allowedAudioQualities else allowedVideoQualities + return if (raw in allowed) raw else "best" + } + + private fun normalizeFormat(options: JobOptions): String { + if (options.thumbnailOnly) return "" + val raw = options.format.trim().lowercase() + if (options.mode == DownloadMode.AUDIO) { + if (raw.isBlank()) return "mp3" + return if (raw in allowedAudioFormats) raw else "mp3" + } + if (raw.isBlank()) return "mp4" + return if (raw in allowedVideoFormats) raw else "mp4" + } + + private fun normalizeSubtitles(input: SubtitlesOptions): SubtitlesOptions { + if (!input.enabled) return SubtitlesOptions() + val langs = input.languages.map { it.trim() }.filter { it.isNotBlank() }.ifEmpty { listOf("en") } + val format = input.format.trim().ifBlank { "srt" }.lowercase() + return input.copy(languages = langs, format = format) + } + + private fun normalizeSponsorBlockCategories(options: JobOptions): List { + if (!options.sponsorBlock) return emptyList() + val custom = options.sponsorBlockCategories + .map { it.trim().lowercase() } + .filter { it in allowedSponsorBlockCategories } + .distinct() + return if (custom.isEmpty()) defaultSponsorBlockCategories else custom + } +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt index dc842f8..99bb46a 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobService.kt @@ -4,28 +4,85 @@ import dev.typetype.downloader.config.AppConfig import dev.typetype.downloader.db.JobRow import dev.typetype.downloader.db.JobsRepository import dev.typetype.downloader.models.CreateJobResponse +import dev.typetype.downloader.models.JobOptions import dev.typetype.downloader.models.JobResponse +import dev.typetype.downloader.models.JobStatus import redis.clients.jedis.JedisPooled import java.net.URI +import java.time.Duration +import java.time.Instant import java.util.UUID +enum class CancelJobResult { NOT_FOUND, NOT_CANCELLABLE, CANCELLED } + +enum class DeleteJobResult { NOT_FOUND, CONFLICT_RUNNING, DELETED } + class JobService( private val jobsRepository: JobsRepository, private val redis: JedisPooled, + private val storageService: GarageStorageService, private val config: AppConfig, ) { - fun enqueue(url: String): CreateJobResponse { - validateUrl(url) + fun enqueue(url: String, requestedOptions: JobOptions): CreateJobResponse { + val resolvedUrl = SourceUrlResolver.resolve(url) + validateUrl(resolvedUrl) + val options = JobOptionsNormalizer.normalize(requestedOptions) + val optionsJson = JobOptionsCodec.encode(options) + val cacheKey = JobCacheKey.from(resolvedUrl, optionsJson) val id = UUID.randomUUID().toString() - jobsRepository.insertQueued(id = id, url = url) - redis.rpush(config.redisQueueKey, id) - redis.setex(redisJobKey(id), 600, "queued") - return CreateJobResponse(id = id) + val reusable = jobsRepository.findReusableByCacheKey(cacheKey) + if (reusable != null) { + jobsRepository.insertDoneFromCache(id = id, url = resolvedUrl, optionsJson = optionsJson, cached = reusable) + redis.setex(redisJobKey(id), config.jobTtlSeconds, "done:cached") + return CreateJobResponse(id = id, cached = true) + } + val queueSize = redis.llen(config.redisQueueKey) + if (queueSize >= config.maxQueueSize) { + throw QueueSaturatedException("Queue is full") + } + jobsRepository.insertQueued(id = id, url = resolvedUrl, cacheKey = cacheKey, optionsJson = optionsJson) + val payload = JobOptionsCodec.encodeQueue(JobOptionsCodec.QueuePayload(id = id, options = options)) + redis.rpush(config.redisQueueKey, payload) + redis.setex(redisJobKey(id), config.jobTtlSeconds, "queued") + return CreateJobResponse(id = id, cached = false) } fun get(id: String): JobResponse? { val row = jobsRepository.getById(id) ?: return null - return row.toResponse() + return row.toResponse(presignUrl(row), row.artifactExpiresAt?.toString()) + } + + fun cancel(id: String): CancelJobResult { + val row = jobsRepository.getById(id) ?: return CancelJobResult.NOT_FOUND + if (row.status == JobStatus.DONE || row.status == JobStatus.FAILED) { + return CancelJobResult.NOT_CANCELLABLE + } + return if (jobsRepository.markCancelled(id)) { + redis.setex(redisJobKey(id), config.jobTtlSeconds, "failed:0") + CancelJobResult.CANCELLED + } else { + CancelJobResult.NOT_CANCELLABLE + } + } + + fun delete(id: String): DeleteJobResult { + val existing = jobsRepository.getById(id) ?: return DeleteJobResult.NOT_FOUND + if (existing.status == JobStatus.RUNNING) return DeleteJobResult.CONFLICT_RUNNING + val deleted = jobsRepository.deleteIfNotRunning(id) ?: return DeleteJobResult.NOT_FOUND + deleted.artifactKey?.let { storageService.deleteObject(it) } + redis.del(redisJobKey(id)) + return DeleteJobResult.DELETED + } + + fun recoverPendingJobs() { + jobsRepository.resetRunningToQueued() + redis.del(config.redisQueueKey) + jobsRepository.listQueuedOrRunning().forEach { row -> + val options = runCatching { JobOptionsCodec.decode(row.optionsJson) }.map(JobOptionsNormalizer::normalize).getOrElse { JobOptions() } + val payload = JobOptionsCodec.encodeQueue(JobOptionsCodec.QueuePayload(id = row.id, options = options)) + redis.rpush(config.redisQueueKey, payload) + redis.setex(redisJobKey(row.id), config.jobTtlSeconds, "queued") + } } private fun validateUrl(url: String) { @@ -41,12 +98,23 @@ class JobService( private fun redisJobKey(id: String): String = "downloader:job:$id" - private fun JobRow.toResponse(): JobResponse = JobResponse( + private fun presignUrl(row: JobRow): String? { + val key = row.artifactKey ?: return null + val expiresAt = row.artifactExpiresAt ?: return null + val now = Instant.now() + if (expiresAt <= now) return null + val seconds = Duration.between(now, expiresAt).seconds.coerceIn(1, 900) + return storageService.presignGet(key, Duration.ofSeconds(seconds)) + } + + private fun JobRow.toResponse(artifactUrl: String?, artifactExpiresAt: String?): JobResponse = JobResponse( id = id, url = url, status = status, durationMs = durationMs, title = title, error = error, + artifactUrl = artifactUrl, + artifactExpiresAt = artifactExpiresAt, ) } diff --git a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt index fa65337..ed54340 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/JobWorker.kt @@ -2,42 +2,108 @@ package dev.typetype.downloader.services import dev.typetype.downloader.config.AppConfig import dev.typetype.downloader.db.JobsRepository +import dev.typetype.downloader.models.JobOptions import dev.typetype.downloader.models.JobStatus import redis.clients.jedis.JedisPooled +import java.nio.file.Files +import java.time.Instant import kotlin.concurrent.thread class JobWorker( private val jobsRepository: JobsRepository, private val redis: JedisPooled, private val ytDlpService: YtDlpService, + private val tokenServiceClient: TokenServiceClient, + private val storageService: GarageStorageService, private val config: AppConfig, ) { fun start() { - thread(name = "job-worker", isDaemon = true) { - while (true) { - val item = redis.blpop(0, config.redisQueueKey) ?: continue - val id = item.getOrNull(1) ?: continue - process(id) + repeat(config.maxConcurrentWorkers) { index -> + thread(name = "job-worker-$index", isDaemon = true) { + while (true) { + val item = redis.blpop(0, config.redisQueueKey) ?: continue + val raw = item.getOrNull(1) ?: continue + val payload = JobOptionsCodec.decodeQueue(raw) + val id = payload?.id ?: raw + val options = payload?.options?.let(JobOptionsNormalizer::normalize) ?: decodeStoredOptions(id) + process(id, options) + } } } } - private fun process(id: String) { + private fun process(id: String, options: JobOptions) { val job = jobsRepository.getById(id) ?: return - jobsRepository.markRunning(id) - redis.setex(redisJobKey(id), 600, "running") - val startedAt = System.nanoTime() - val result = ytDlpService.extractTitle(job.url) - val durationMs = (System.nanoTime() - startedAt) / 1_000_000 - val status = if (result.error == null) JobStatus.DONE else JobStatus.FAILED - jobsRepository.markFinished( - id = id, - status = status, - durationMs = durationMs, - title = result.title, - error = result.error, - ) - redis.setex(redisJobKey(id), 600, "${status.name.lowercase()}:$durationMs") + if (!jobsRepository.markRunningIfQueued(id)) return + redis.setex(redisJobKey(id), config.jobTtlSeconds, "running") + try { + val startedAt = System.nanoTime() + val token = tokenServiceClient.fetchForUrl(job.url) + val result = ytDlpService.download(job.url, token, options) { + jobsRepository.getById(id)?.status != JobStatus.RUNNING + } + val durationMs = (System.nanoTime() - startedAt) / 1_000_000 + val status = if (result.error == null) JobStatus.DONE else JobStatus.FAILED + val artifact = if (status == JobStatus.DONE && result.filePath != null) { + uploadArtifact(job.cacheKey, result.filePath) + } else { + null + } + val updated = jobsRepository.markFinishedIfRunning( + id = id, + status = status, + durationMs = durationMs, + title = result.title, + error = result.error, + artifactKey = artifact?.objectKey, + artifactExpiresAt = artifact?.expiresAt, + ) + if (!updated && artifact != null) { + storageService.deleteObject(artifact.objectKey) + } + if (updated) { + redis.setex(redisJobKey(id), config.jobTtlSeconds, "${status.name.lowercase()}:$durationMs") + } + result.filePath?.parent?.let { deleteDirectory(it) } + } catch (error: Throwable) { + jobsRepository.markFinishedIfRunning( + id = id, + status = JobStatus.FAILED, + durationMs = 0, + title = "", + error = error.message ?: "worker failed", + artifactKey = null, + artifactExpiresAt = null, + ) + redis.setex(redisJobKey(id), config.jobTtlSeconds, "failed:0") + } + } + + private fun decodeStoredOptions(id: String): JobOptions { + val row = jobsRepository.getById(id) ?: return JobOptions() + return runCatching { JobOptionsCodec.decode(row.optionsJson) }.map(JobOptionsNormalizer::normalize).getOrElse { JobOptions() } + } + + private fun uploadArtifact(cacheKey: String, filePath: java.nio.file.Path): StorageArtifact { + val expiresAt = Instant.now().plusSeconds(config.s3ArtifactTtlSeconds) + val extension = filePath.fileName.toString().substringAfterLast('.', "bin") + val objectKey = "cache/$cacheKey.$extension" + storageService.putFile(objectKey, filePath, contentType(extension)) + return StorageArtifact(objectKey = objectKey, expiresAt = expiresAt) + } + + private fun contentType(extension: String): String = when (extension.lowercase()) { + "mp4" -> "video/mp4" + "webm" -> "video/webm" + "mkv" -> "video/x-matroska" + "m4a" -> "audio/mp4" + "mp3" -> "audio/mpeg" + else -> "application/octet-stream" + } + + private fun deleteDirectory(dir: java.nio.file.Path) { + if (!Files.exists(dir)) return + Files.walk(dir).sorted(Comparator.reverseOrder()).forEach { Files.deleteIfExists(it) } } private fun redisJobKey(id: String): String = "downloader:job:$id" diff --git a/src/main/kotlin/dev/typetype/downloader/services/QueueSaturatedException.kt b/src/main/kotlin/dev/typetype/downloader/services/QueueSaturatedException.kt new file mode 100644 index 0000000..2bd0d06 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/QueueSaturatedException.kt @@ -0,0 +1,3 @@ +package dev.typetype.downloader.services + +class QueueSaturatedException(message: String) : RuntimeException(message) diff --git a/src/main/kotlin/dev/typetype/downloader/services/SourceUrlResolver.kt b/src/main/kotlin/dev/typetype/downloader/services/SourceUrlResolver.kt new file mode 100644 index 0000000..052792d --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/SourceUrlResolver.kt @@ -0,0 +1,63 @@ +package dev.typetype.downloader.services + +import java.net.URI +import java.net.URLDecoder +import java.nio.charset.StandardCharsets + +object SourceUrlResolver { + private val PRIORITY_KEYS = listOf("url", "v", "u", "target", "video", "href", "q") + private val YOUTUBE_ID_REGEX = Regex("^[A-Za-z0-9_-]{11}$") + + fun resolve(rawUrl: String): String { + var current = rawUrl.trim() + repeat(5) { + val next = unwrapOnce(current) ?: return current + if (next == current) return current + current = next + } + return current + } + + private fun unwrapOnce(url: String): String? { + val uri = runCatching { URI(url) }.getOrNull() ?: return null + val query = uri.rawQuery ?: return null + val params = parseQuery(query) + val ordered = PRIORITY_KEYS.flatMap { key -> params.filter { it.key.equals(key, ignoreCase = true) } } + + params.filter { p -> PRIORITY_KEYS.none { it.equals(p.key, ignoreCase = true) } } + for (param in ordered) { + val candidate = candidateFrom(param.key, decodeRepeated(param.value), uri.host.orEmpty()) ?: continue + if (candidate != url) return candidate + } + return null + } + + private fun candidateFrom(key: String, value: String, host: String): String? { + val normalized = value.trim() + if (normalized.isBlank()) return null + if (normalized.startsWith("http://") || normalized.startsWith("https://")) return normalized + if (normalized.startsWith("www.")) return "https://$normalized" + val isWrappedYoutubeId = key.equals("v", ignoreCase = true) && YOUTUBE_ID_REGEX.matches(normalized) + val fromYoutubeHost = host.contains("youtube.com") || host == "youtu.be" + if (isWrappedYoutubeId && !fromYoutubeHost) return "https://www.youtube.com/watch?v=$normalized" + return null + } + + private fun decodeRepeated(raw: String): String { + var current = raw + repeat(3) { + val decoded = runCatching { URLDecoder.decode(current, StandardCharsets.UTF_8) }.getOrElse { return current } + if (decoded == current) return current + current = decoded + } + return current + } + + private fun parseQuery(query: String): List = query.split('&').filter { it.isNotBlank() }.map { + val parts = it.split('=', limit = 2) + val key = parts[0] + val value = if (parts.size == 2) parts[1] else "" + QueryParam(key = key, value = value) + } + + private data class QueryParam(val key: String, val value: String) +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/StorageArtifact.kt b/src/main/kotlin/dev/typetype/downloader/services/StorageArtifact.kt new file mode 100644 index 0000000..c0fe649 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/StorageArtifact.kt @@ -0,0 +1,8 @@ +package dev.typetype.downloader.services + +import java.time.Instant + +data class StorageArtifact( + val objectKey: String, + val expiresAt: Instant, +) diff --git a/src/main/kotlin/dev/typetype/downloader/services/TokenPayload.kt b/src/main/kotlin/dev/typetype/downloader/services/TokenPayload.kt new file mode 100644 index 0000000..b03daeb --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/TokenPayload.kt @@ -0,0 +1,6 @@ +package dev.typetype.downloader.services + +data class TokenPayload( + val visitorData: String, + val streamingPot: String, +) diff --git a/src/main/kotlin/dev/typetype/downloader/services/TokenServiceClient.kt b/src/main/kotlin/dev/typetype/downloader/services/TokenServiceClient.kt new file mode 100644 index 0000000..e757e67 --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/TokenServiceClient.kt @@ -0,0 +1,48 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.config.AppConfig +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import java.net.URI +import java.net.URLEncoder +import java.net.http.HttpClient +import java.net.http.HttpRequest +import java.net.http.HttpResponse +import java.nio.charset.StandardCharsets +import java.time.Duration + +class TokenServiceClient(private val config: AppConfig) { + private val client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(3)).build() + private val json = Json { ignoreUnknownKeys = true } + + fun fetchForUrl(url: String): TokenPayload? { + val videoId = extractYoutubeVideoId(SourceUrlResolver.resolve(url)) ?: return null + val encoded = URLEncoder.encode(videoId, StandardCharsets.UTF_8) + val uri = URI("${config.tokenServiceUrl}/potoken?videoId=$encoded") + val request = HttpRequest.newBuilder(uri).timeout(Duration.ofSeconds(10)).GET().build() + val response = runCatching { client.send(request, HttpResponse.BodyHandlers.ofString()) }.getOrNull() ?: return null + if (response.statusCode() != 200) return null + val body = runCatching { json.decodeFromString(TokenResponse.serializer(), response.body()) }.getOrNull() ?: return null + if (body.visitorData.isBlank() || body.streamingPot.isBlank()) return null + return TokenPayload(visitorData = body.visitorData, streamingPot = body.streamingPot) + } + + private fun extractYoutubeVideoId(url: String): String? { + val uri = runCatching { URI(url) }.getOrNull() ?: return null + val host = uri.host?.lowercase() ?: return null + val query = uri.query.orEmpty() + if (host.endsWith("youtube.com")) { + return query.split("&").mapNotNull { + val parts = it.split("=", limit = 2) + if (parts.size == 2 && parts[0] == "v") parts[1] else null + }.firstOrNull { it.isNotBlank() } + } + if (host == "youtu.be") { + return uri.path.trim('/').ifBlank { null } + } + return null + } + + @Serializable + private data class TokenResponse(val visitorData: String = "", val streamingPot: String = "") +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/YtDlpOptionResolver.kt b/src/main/kotlin/dev/typetype/downloader/services/YtDlpOptionResolver.kt new file mode 100644 index 0000000..604120f --- /dev/null +++ b/src/main/kotlin/dev/typetype/downloader/services/YtDlpOptionResolver.kt @@ -0,0 +1,41 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.models.DownloadMode +import dev.typetype.downloader.models.JobOptions + +object YtDlpOptionResolver { + private val audioFormats = setOf("mp3", "m4a", "aac", "opus", "flac", "wav") + private val videoFormats = setOf("mp4", "webm", "mkv", "mov") + + fun audioSelector(quality: String): String = if (quality.lowercase() == "worst") "worstaudio/worst" else "bestaudio/best" + + fun videoSelector(quality: String): String = when (quality.lowercase()) { + "1080p" -> "bv*[height<=1080]+ba/b[height<=1080]" + "720p" -> "bv*[height<=720]+ba/b[height<=720]" + "480p" -> "bv*[height<=480]+ba/b[height<=480]" + "worst" -> "worst" + else -> "bv*+ba/b" + } + + fun audioFormat(raw: String): String { + val value = raw.trim().lowercase() + return if (value in audioFormats) value else "mp3" + } + + fun videoFormat(raw: String): String { + val value = raw.trim().lowercase() + return if (value in videoFormats) value else "mp4" + } + + fun preferredExtensions(options: JobOptions): List = when { + options.thumbnailOnly -> listOf("jpg", "jpeg", "png", "webp") + options.mode == DownloadMode.AUDIO -> { + val requested = audioFormat(options.format) + listOf(requested, "mp3", "m4a", "opus", "aac", "flac", "wav", "webm").distinct() + } + else -> { + val requested = videoFormat(options.format) + listOf(requested, "mp4", "mkv", "webm", "mov").distinct() + } + } +} diff --git a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt index b183448..047ccaa 100644 --- a/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt +++ b/src/main/kotlin/dev/typetype/downloader/services/YtDlpService.kt @@ -1,32 +1,116 @@ package dev.typetype.downloader.services import dev.typetype.downloader.config.AppConfig +import dev.typetype.downloader.models.DownloadMode +import dev.typetype.downloader.models.JobOptions +import java.nio.file.Files +import java.nio.file.Path import java.util.concurrent.TimeUnit -data class YtDlpResult(val title: String, val error: String?) +data class YtDlpResult( + val title: String, + val filePath: Path?, + val error: String?, +) class YtDlpService(private val config: AppConfig) { - fun extractTitle(url: String): YtDlpResult { - val process = ProcessBuilder( + fun download(url: String, token: TokenPayload?, options: JobOptions, shouldCancel: () -> Boolean = { false }): YtDlpResult { + val workDir = Files.createTempDirectory("typetype-download-") + val process = ProcessBuilder(buildCommand(url, workDir, token, options)) + .directory(workDir.toFile()) + .redirectErrorStream(true) + .start() + val finished = waitFor(process, config.ytdlpTimeoutSeconds, shouldCancel) + if (!finished) { + process.destroyForcibly() + deleteDirectory(workDir) + return YtDlpResult(title = "", filePath = null, error = "yt-dlp timeout") + } + if (shouldCancel()) { + process.destroyForcibly() + deleteDirectory(workDir) + return YtDlpResult(title = "", filePath = null, error = "job cancelled") + } + val output = process.inputStream.bufferedReader().readLines() + if (process.exitValue() != 0) { + deleteDirectory(workDir) + val error = output.lastOrNull { it.isNotBlank() } ?: "yt-dlp failed" + return YtDlpResult(title = "", filePath = null, error = error) + } + val filePath = selectOutputFile(workDir, options) + if (filePath == null) { + deleteDirectory(workDir) + return YtDlpResult(title = "", filePath = null, error = "yt-dlp output file missing") + } + val title = output.firstOrNull { isTitleLine(it) } ?: filePath.fileName.toString() + return YtDlpResult(title = title, filePath = filePath, error = null) + } + + private fun buildCommand(url: String, workDir: Path, token: TokenPayload?, options: JobOptions): List { + val command = mutableListOf( config.ytdlpBin, - "--skip-download", + "--no-simulate", + "--no-warnings", + "--no-playlist", + "--no-progress", "--print", "title", - "--no-warnings", - url, - ).start() - val finished = process.waitFor(config.ytdlpTimeoutSeconds, TimeUnit.SECONDS) - if (!finished) { - process.destroyForcibly() - return YtDlpResult(title = "", error = "yt-dlp timeout") + "-o", + "${workDir.toAbsolutePath()}/%(id)s.%(ext)s", + ) + when { + options.thumbnailOnly -> command.addAll(listOf("--skip-download", "--write-thumbnail")) + options.mode == DownloadMode.AUDIO -> { + val selector = YtDlpOptionResolver.audioSelector(options.quality) + val audioFormat = YtDlpOptionResolver.audioFormat(options.format) + command.addAll(listOf("-f", selector, "--extract-audio", "--audio-format", audioFormat)) + } + else -> { + val selector = YtDlpOptionResolver.videoSelector(options.quality) + val videoFormat = YtDlpOptionResolver.videoFormat(options.format) + command.addAll(listOf("-f", selector, "--merge-output-format", videoFormat)) + } + } + if (options.sponsorBlock && !options.thumbnailOnly) { + command.addAll(listOf("--sponsorblock-remove", options.sponsorBlockCategories.joinToString(","))) } - val stdout = process.inputStream.bufferedReader().readText().trim() - val stderr = process.errorStream.bufferedReader().readText().trim() - return if (process.exitValue() == 0) { - YtDlpResult(title = stdout, error = null) - } else { - val error = if (stderr.isNotBlank()) stderr else "yt-dlp failed" - YtDlpResult(title = "", error = error) + if (options.subtitles.enabled) { + command.add("--write-subs") + if (options.subtitles.auto) command.add("--write-auto-subs") + command.addAll(listOf("--sub-langs", options.subtitles.languages.joinToString(","))) + command.addAll(listOf("--sub-format", options.subtitles.format)) + if (options.subtitles.embed && options.mode == DownloadMode.VIDEO && !options.thumbnailOnly) command.add("--embed-subs") } + if (token != null) { + command.add("--extractor-args") + command.add("youtube:player_client=web;po_token=web.gvs+${token.streamingPot};visitor_data=${token.visitorData}") + } + command.add(url) + return command + } + + private fun selectOutputFile(workDir: Path, options: JobOptions): Path? { + val files = Files.list(workDir).use { stream -> stream.filter { Files.isRegularFile(it) }.toList() } + if (files.isEmpty()) return null + val preferred = YtDlpOptionResolver.preferredExtensions(options) + preferred.forEach { wanted -> + files.firstOrNull { ext(it) == wanted }?.let { return it } + } + return files.maxByOrNull { Files.size(it) } + } + + private fun isTitleLine(value: String): Boolean = value.isNotBlank() && !value.startsWith("[") + private fun waitFor(process: Process, timeoutSeconds: Long, shouldCancel: () -> Boolean): Boolean { + val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds) + while (System.nanoTime() < deadline) { + if (shouldCancel()) return true + if (process.waitFor(1, TimeUnit.SECONDS)) return true + } + return false + } + private fun ext(path: Path): String = path.fileName.toString().substringAfterLast('.', "").lowercase() + private fun deleteDirectory(dir: Path) { + if (!Files.exists(dir)) return + Files.walk(dir).sorted(Comparator.reverseOrder()).forEach { Files.deleteIfExists(it) } } } diff --git a/src/test/kotlin/dev/typetype/downloader/config/AppConfigLoaderTest.kt b/src/test/kotlin/dev/typetype/downloader/config/AppConfigLoaderTest.kt new file mode 100644 index 0000000..2044b21 --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/config/AppConfigLoaderTest.kt @@ -0,0 +1,27 @@ +package dev.typetype.downloader.config + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class AppConfigLoaderTest { + @Test + fun `loads non-empty default config`() { + val config = AppConfigLoader.load() + assertTrue(config.httpPort > 0) + assertTrue(config.dbUrl.isNotBlank()) + assertTrue(config.redisQueueKey.isNotBlank()) + assertTrue(config.ytdlpBin.isNotBlank()) + assertTrue(config.s3Endpoint.isNotBlank()) + assertTrue(config.maxConcurrentWorkers >= 1) + assertTrue(config.maxQueueSize >= 1) + assertTrue(config.jobTtlSeconds >= 1) + } + + @Test + fun `default region and bucket stay stable`() { + val config = AppConfigLoader.load() + assertEquals("garage", config.s3Region) + assertEquals("typetype-downloads", config.s3Bucket) + } +} diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobCacheKeyTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobCacheKeyTest.kt new file mode 100644 index 0000000..04b44cf --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/JobCacheKeyTest.kt @@ -0,0 +1,34 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.models.DownloadMode +import dev.typetype.downloader.models.JobOptions +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotEquals + +class JobCacheKeyTest { + @Test + fun `same URL yields same key`() { + val url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ" + val options = JobOptionsCodec.encode(JobOptions()) + val first = JobCacheKey.from(url, options) + val second = JobCacheKey.from(url, options) + assertEquals(first, second) + } + + @Test + fun `trimmed URL keeps same key`() { + val options = JobOptionsCodec.encode(JobOptions()) + val clean = JobCacheKey.from("https://www.youtube.com/watch?v=dQw4w9WgXcQ", options) + val padded = JobCacheKey.from(" https://www.youtube.com/watch?v=dQw4w9WgXcQ ", options) + assertEquals(clean, padded) + } + + @Test + fun `different options yield different keys`() { + val url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ" + val first = JobCacheKey.from(url, JobOptionsCodec.encode(JobOptions(mode = DownloadMode.VIDEO))) + val second = JobCacheKey.from(url, JobOptionsCodec.encode(JobOptions(mode = DownloadMode.AUDIO))) + assertNotEquals(first, second) + } +} diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt new file mode 100644 index 0000000..d4ad818 --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/JobOptionsNormalizerTest.kt @@ -0,0 +1,77 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.models.DownloadMode +import dev.typetype.downloader.models.JobOptions +import dev.typetype.downloader.models.SubtitlesOptions +import kotlin.test.Test +import kotlin.test.assertEquals + +class JobOptionsNormalizerTest { + @Test + fun `normalizes subtitle languages and format`() { + val input = JobOptions( + subtitles = SubtitlesOptions( + enabled = true, + languages = listOf(" en ", "", "fr"), + format = " VTT ", + ) + ) + val normalized = JobOptionsNormalizer.normalize(input) + assertEquals(listOf("en", "fr"), normalized.subtitles.languages) + assertEquals("vtt", normalized.subtitles.format) + } + + @Test + fun `fills default sponsorblock categories when enabled and empty`() { + val normalized = JobOptionsNormalizer.normalize(JobOptions(sponsorBlock = true)) + assertEquals( + listOf("sponsor", "selfpromo", "interaction", "intro", "outro", "preview", "filler", "music_offtopic"), + normalized.sponsorBlockCategories, + ) + } + + @Test + fun `filters and normalizes custom sponsorblock categories`() { + val input = JobOptions( + sponsorBlock = true, + sponsorBlockCategories = listOf(" Sponsor ", "intro", "invalid", "INTRO"), + ) + val normalized = JobOptionsNormalizer.normalize(input) + assertEquals(listOf("sponsor", "intro"), normalized.sponsorBlockCategories) + } + + @Test + fun `normalizes video quality and format to allowed values`() { + val normalized = JobOptionsNormalizer.normalize(JobOptions(quality = " 2160P ", format = "avi")) + assertEquals("best", normalized.quality) + assertEquals("mp4", normalized.format) + } + + @Test + fun `normalizes audio defaults and allowed audio format`() { + val normalized = JobOptionsNormalizer.normalize( + JobOptions(mode = DownloadMode.AUDIO, quality = "", format = " M4A "), + ) + assertEquals("best", normalized.quality) + assertEquals("m4a", normalized.format) + } + + @Test + fun `thumbnail only ignores quality and format`() { + val normalized = JobOptionsNormalizer.normalize( + JobOptions(thumbnailOnly = true, quality = "1080p", format = "webm"), + ) + assertEquals("best", normalized.quality) + assertEquals("", normalized.format) + } + + @Test + fun `disabled subtitles reset to stable defaults`() { + val normalized = JobOptionsNormalizer.normalize( + JobOptions( + subtitles = SubtitlesOptions(enabled = false, auto = true, embed = true, languages = listOf("fr"), format = "vtt"), + ) + ) + assertEquals(SubtitlesOptions(), normalized.subtitles) + } +} diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobServiceControlTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobServiceControlTest.kt new file mode 100644 index 0000000..60301f2 --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/JobServiceControlTest.kt @@ -0,0 +1,93 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.config.AppConfig +import dev.typetype.downloader.db.JobRow +import dev.typetype.downloader.db.JobsRepository +import dev.typetype.downloader.models.JobStatus +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import java.time.Instant +import kotlin.test.Test +import kotlin.test.assertEquals + +class JobServiceControlTest { + private val jobsRepository = mockk() + private val redis = mockk(relaxed = true) + private val storage = mockk(relaxed = true) + private val service = JobService(jobsRepository, redis, storage, config()) + + @Test + fun `cancel returns not found when job does not exist`() { + every { jobsRepository.getById("missing") } returns null + assertEquals(CancelJobResult.NOT_FOUND, service.cancel("missing")) + } + + @Test + fun `cancel returns conflict for done job`() { + every { jobsRepository.getById("done") } returns row("done", JobStatus.DONE) + assertEquals(CancelJobResult.NOT_CANCELLABLE, service.cancel("done")) + verify(exactly = 0) { jobsRepository.markCancelled(any()) } + } + + @Test + fun `cancel marks queued job as cancelled`() { + every { jobsRepository.getById("queued") } returns row("queued", JobStatus.QUEUED) + every { jobsRepository.markCancelled("queued") } returns true + assertEquals(CancelJobResult.CANCELLED, service.cancel("queued")) + verify { redis.setex("downloader:job:queued", 600L, "failed:0") } + } + + @Test + fun `delete rejects running job`() { + every { jobsRepository.getById("run") } returns row("run", JobStatus.RUNNING) + assertEquals(DeleteJobResult.CONFLICT_RUNNING, service.delete("run")) + verify(exactly = 0) { jobsRepository.deleteIfNotRunning(any()) } + } + + @Test + fun `delete removes artifact and redis state`() { + val existing = row("x", JobStatus.FAILED, artifactKey = "cache/x.mp4") + every { jobsRepository.getById("x") } returns existing + every { jobsRepository.deleteIfNotRunning("x") } returns existing + assertEquals(DeleteJobResult.DELETED, service.delete("x")) + verify { storage.deleteObject("cache/x.mp4") } + verify { redis.del("downloader:job:x") } + } + + private fun row(id: String, status: JobStatus, artifactKey: String? = null): JobRow = JobRow( + id = id, + url = "https://www.youtube.com/watch?v=test", + cacheKey = "cache-$id", + optionsJson = "{}", + status = status, + durationMs = 0, + title = "", + error = null, + artifactKey = artifactKey, + artifactExpiresAt = Instant.now().plusSeconds(300), + ) + + private fun config(): AppConfig = AppConfig( + httpPort = 18093, + dbUrl = "jdbc:postgresql://localhost:55432/typetype_downloader", + dbUser = "typetype", + dbPassword = "typetype", + redisHost = "localhost", + redisPort = 56379, + redisQueueKey = "downloader:queue", + maxConcurrentWorkers = 2, + maxQueueSize = 100, + jobTtlSeconds = 600, + ytdlpBin = "yt-dlp", + ytdlpTimeoutSeconds = 60, + enableTranscode = false, + s3Endpoint = "http://localhost:3900", + s3Region = "garage", + s3Bucket = "typetype-downloads", + s3AccessKey = "k", + s3SecretKey = "s", + s3ArtifactTtlSeconds = 7200, + tokenServiceUrl = "http://localhost:8081", + ) +} diff --git a/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt b/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt new file mode 100644 index 0000000..4d71f2e --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/JobServiceRecoveryTest.kt @@ -0,0 +1,96 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.config.AppConfig +import dev.typetype.downloader.db.JobRow +import dev.typetype.downloader.db.JobsRepository +import dev.typetype.downloader.models.JobOptions +import dev.typetype.downloader.models.JobStatus +import io.mockk.every +import io.mockk.mockk +import io.mockk.slot +import io.mockk.verify +import java.time.Instant +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class JobServiceRecoveryTest { + private val jobsRepository = mockk(relaxed = true) + private val redis = mockk(relaxed = true) + private val storage = mockk(relaxed = true) + private val service = JobService(jobsRepository, redis, storage, config()) + + @Test + fun `enqueue stores normalized options json`() { + val optionsSlot = slot() + every { jobsRepository.findReusableByCacheKey(any()) } returns null + every { redis.llen("downloader:queue") } returns 0L + every { jobsRepository.insertQueued(any(), any(), any(), capture(optionsSlot)) } returns Unit + val options = JobOptions(quality = "720p", format = "webm", sponsorBlock = true) + val created = service.enqueue("https://www.youtube.com/watch?v=dQw4w9WgXcQ", options) + val decoded = JobOptionsCodec.decode(optionsSlot.captured) + assertFalse(created.cached) + assertEquals("720p", decoded.quality) + assertEquals("webm", decoded.format) + assertTrue(decoded.sponsorBlockCategories.isNotEmpty()) + } + + @Test + fun `recovery requeues pending jobs with stored options`() { + val one = row("a", JobStatus.QUEUED, JobOptionsCodec.encode(JobOptions(quality = "1080p"))) + val two = row( + "b", + JobStatus.RUNNING, + JobOptionsCodec.encode(JobOptions(mode = dev.typetype.downloader.models.DownloadMode.AUDIO, quality = "1080p", format = "avi")), + ) + val payloads = mutableListOf() + every { jobsRepository.listQueuedOrRunning() } returns listOf(one, two) + every { redis.rpush("downloader:queue", capture(payloads)) } returns 1L + service.recoverPendingJobs() + verify { jobsRepository.resetRunningToQueued() } + verify { redis.del("downloader:queue") } + verify(exactly = 2) { redis.setex(any(), 600L, "queued") } + val queued = payloads.mapNotNull { JobOptionsCodec.decodeQueue(it) } + assertEquals(setOf("a", "b"), queued.map { it.id }.toSet()) + val recoveredAudio = queued.first { it.id == "b" }.options + assertEquals("best", recoveredAudio.quality) + assertEquals("mp3", recoveredAudio.format) + } + + private fun row(id: String, status: JobStatus, optionsJson: String): JobRow = JobRow( + id = id, + url = "https://www.youtube.com/watch?v=$id", + cacheKey = "cache-$id", + optionsJson = optionsJson, + status = status, + durationMs = 0, + title = "", + error = null, + artifactKey = null, + artifactExpiresAt = Instant.now().plusSeconds(300), + ) + + private fun config(): AppConfig = AppConfig( + httpPort = 18093, + dbUrl = "jdbc:postgresql://localhost:55432/typetype_downloader", + dbUser = "typetype", + dbPassword = "typetype", + redisHost = "localhost", + redisPort = 56379, + redisQueueKey = "downloader:queue", + maxConcurrentWorkers = 2, + maxQueueSize = 100, + jobTtlSeconds = 600, + ytdlpBin = "yt-dlp", + ytdlpTimeoutSeconds = 60, + enableTranscode = false, + s3Endpoint = "http://localhost:3900", + s3Region = "garage", + s3Bucket = "typetype-downloads", + s3AccessKey = "k", + s3SecretKey = "s", + s3ArtifactTtlSeconds = 7200, + tokenServiceUrl = "http://localhost:8081", + ) +} diff --git a/src/test/kotlin/dev/typetype/downloader/services/SourceUrlResolverTest.kt b/src/test/kotlin/dev/typetype/downloader/services/SourceUrlResolverTest.kt new file mode 100644 index 0000000..0c6621c --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/SourceUrlResolverTest.kt @@ -0,0 +1,26 @@ +package dev.typetype.downloader.services + +import kotlin.test.Test +import kotlin.test.assertEquals + +class SourceUrlResolverTest { + @Test + fun `keeps plain youtube url unchanged`() { + val url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ" + assertEquals(url, SourceUrlResolver.resolve(url)) + } + + @Test + fun `unwraps encoded watch wrapper`() { + val wrapped = "https://watch.invalid/watch?v=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3Dg3AI_ZkMv1I" + val expected = "https://www.youtube.com/watch?v=g3AI_ZkMv1I" + assertEquals(expected, SourceUrlResolver.resolve(wrapped)) + } + + @Test + fun `unwraps encoded player wrapper`() { + val wrapped = "https://player.invalid/watch?v=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3D7fqWBwErkxA" + val expected = "https://www.youtube.com/watch?v=7fqWBwErkxA" + assertEquals(expected, SourceUrlResolver.resolve(wrapped)) + } +} diff --git a/src/test/kotlin/dev/typetype/downloader/services/YtDlpOptionResolverTest.kt b/src/test/kotlin/dev/typetype/downloader/services/YtDlpOptionResolverTest.kt new file mode 100644 index 0000000..703f686 --- /dev/null +++ b/src/test/kotlin/dev/typetype/downloader/services/YtDlpOptionResolverTest.kt @@ -0,0 +1,39 @@ +package dev.typetype.downloader.services + +import dev.typetype.downloader.models.DownloadMode +import dev.typetype.downloader.models.JobOptions +import kotlin.test.Test +import kotlin.test.assertEquals + +class YtDlpOptionResolverTest { + @Test + fun `resolves audio selector and format defaults`() { + assertEquals("worstaudio/worst", YtDlpOptionResolver.audioSelector("worst")) + assertEquals("bestaudio/best", YtDlpOptionResolver.audioSelector("1080p")) + assertEquals("mp3", YtDlpOptionResolver.audioFormat("avi")) + assertEquals("m4a", YtDlpOptionResolver.audioFormat(" M4A ")) + } + + @Test + fun `resolves video selector and format defaults`() { + assertEquals("bv*[height<=720]+ba/b[height<=720]", YtDlpOptionResolver.videoSelector("720p")) + assertEquals("bv*+ba/b", YtDlpOptionResolver.videoSelector("best")) + assertEquals("mp4", YtDlpOptionResolver.videoFormat("unknown")) + assertEquals("webm", YtDlpOptionResolver.videoFormat(" WEBM ")) + } + + @Test + fun `preferred extensions prioritize requested normalized format`() { + val audio = YtDlpOptionResolver.preferredExtensions( + JobOptions(mode = DownloadMode.AUDIO, quality = "best", format = "opus"), + ) + val video = YtDlpOptionResolver.preferredExtensions( + JobOptions(mode = DownloadMode.VIDEO, quality = "best", format = "mkv"), + ) + val thumbnail = YtDlpOptionResolver.preferredExtensions(JobOptions(thumbnailOnly = true)) + + assertEquals("opus", audio.first()) + assertEquals("mkv", video.first()) + assertEquals(listOf("jpg", "jpeg", "png", "webp"), thumbnail) + } +}