Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,19 @@ All configuration is via environment variables.
## API

- `GET /health`
- `POST /jobs` with `{ "url": "..." }`
- `GET /jobs/{id}`
- `POST /jobs` accepts:
- `url` (required)
- `options.mode` (`video` or `audio`)
- `options.sponsorBlock` (`true`/`false`)
- `options.sponsorBlockCategories` (`sponsor,selfpromo,interaction,intro,outro,preview,filler,music_offtopic`)
- `options.thumbnailOnly` (`true`/`false`)
- `options.subtitles` (`enabled`, `auto`, `embed`, `languages`, `format`)
and returns `{ "id": "...", "cached": false|true }`
- `GET /jobs/{id}` returns one of `queued|running|done|failed` and includes a signed `artifactUrl` when available
- `GET /jobs/{id}/artifact` redirects to signed Garage artifact URL when ready

Wrapper URLs are resolved automatically. For example, frontend watch wrappers such as
`https://watch.example/watch?v=https%3A%2F%2Fwww.youtube.com%2Fwatch%3Fv%3D...` are normalized to the underlying source URL before processing.

## License

Expand Down
2 changes: 2 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ dependencies {
implementation("org.postgresql:postgresql:42.7.8")
implementation("com.zaxxer:HikariCP:6.3.0")
implementation("redis.clients:jedis:6.1.0")
implementation("software.amazon.awssdk:s3:2.31.69")
implementation("ch.qos.logback:logback-classic:1.5.20")
testImplementation(kotlin("test"))
testImplementation("io.mockk:mockk:1.13.12")
}

application {
Expand Down
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ services:

garage:
image: dxflrs/garage:v2.2.0
command: ["server"]
ports:
- "3900:3900"
- "3901:3901"
Expand Down
15 changes: 13 additions & 2 deletions src/main/kotlin/dev/typetype/downloader/ApplicationModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import dev.typetype.downloader.routes.healthRoutes
import dev.typetype.downloader.routes.jobRoutes
import dev.typetype.downloader.services.JobService
import dev.typetype.downloader.services.JobWorker
import dev.typetype.downloader.services.QueueSaturatedException
import dev.typetype.downloader.services.GarageStorageService
import dev.typetype.downloader.services.TokenServiceClient
import dev.typetype.downloader.services.YtDlpService
import io.ktor.http.HttpStatusCode
import io.ktor.serialization.kotlinx.json.json
Expand All @@ -24,15 +27,22 @@ fun Application.module() {
val config = AppConfigLoader.load()
Database.init(config)
val redis = JedisPooled(config.redisHost, config.redisPort)
val storage = GarageStorageService(config)
storage.ensureBucket()
val jobsRepository = JobsRepository()
val ytDlpService = YtDlpService(config)
val jobService = JobService(jobsRepository, redis, config)
val worker = JobWorker(jobsRepository, redis, ytDlpService, config)
val tokenServiceClient = TokenServiceClient(config)
val jobService = JobService(jobsRepository, redis, storage, config)
val worker = JobWorker(jobsRepository, redis, ytDlpService, tokenServiceClient, storage, config)
jobService.recoverPendingJobs()
worker.start()

install(CallLogging)
install(ContentNegotiation) { json() }
install(StatusPages) {
exception<QueueSaturatedException> { call, cause ->
call.respond(HttpStatusCode.TooManyRequests, mapOf("error" to (cause.message ?: "queue saturated")))
}
exception<IllegalArgumentException> { call, cause ->
call.respond(HttpStatusCode.BadRequest, mapOf("error" to (cause.message ?: "bad request")))
}
Expand All @@ -44,6 +54,7 @@ fun Application.module() {
}

monitor.subscribe(ApplicationStopping) {
storage.close()
redis.close()
Database.close()
}
Expand Down
14 changes: 14 additions & 0 deletions src/main/kotlin/dev/typetype/downloader/db/Database.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,31 @@ object Database {
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
source_url TEXT NOT NULL,
cache_key TEXT NOT NULL,
options_json TEXT NOT NULL DEFAULT '{}',
status TEXT NOT NULL,
duration_ms BIGINT NOT NULL,
title TEXT NOT NULL,
error TEXT,
artifact_key TEXT,
artifact_expires_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ
)
""".trimIndent()
)
statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS cache_key TEXT")
statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS options_json TEXT")
statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS artifact_key TEXT")
statement.execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS artifact_expires_at TIMESTAMPTZ")
statement.execute("UPDATE jobs SET cache_key = id WHERE cache_key IS NULL")
statement.execute("UPDATE jobs SET options_json = '{}' WHERE options_json IS NULL")
statement.execute("ALTER TABLE jobs ALTER COLUMN cache_key SET NOT NULL")
statement.execute("ALTER TABLE jobs ALTER COLUMN options_json SET NOT NULL")
statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)")
statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_cache_key ON jobs(cache_key)")
statement.execute("CREATE INDEX IF NOT EXISTS idx_jobs_artifact_expiry ON jobs(artifact_expires_at)")
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/kotlin/dev/typetype/downloader/db/JobRow.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package dev.typetype.downloader.db

import dev.typetype.downloader.models.JobStatus
import java.time.Instant

data class JobRow(
val id: String,
val url: String,
val cacheKey: String,
val optionsJson: String,
val status: JobStatus,
val durationMs: Long,
val title: String,
val error: String?,
val artifactKey: String?,
val artifactExpiresAt: Instant?,
)
17 changes: 17 additions & 0 deletions src/main/kotlin/dev/typetype/downloader/db/JobRowMapper.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package dev.typetype.downloader.db

import dev.typetype.downloader.models.JobStatus
import java.sql.ResultSet

fun rowFrom(rs: ResultSet): JobRow = JobRow(
id = rs.getString("id"),
url = rs.getString("source_url"),
cacheKey = rs.getString("cache_key"),
optionsJson = rs.getString("options_json"),
status = JobStatus.valueOf(rs.getString("status")),
durationMs = rs.getLong("duration_ms"),
title = rs.getString("title"),
error = rs.getString("error"),
artifactKey = rs.getString("artifact_key"),
artifactExpiresAt = rs.getTimestamp("artifact_expires_at")?.toInstant(),
)
117 changes: 71 additions & 46 deletions src/main/kotlin/dev/typetype/downloader/db/JobsRepository.kt
Original file line number Diff line number Diff line change
@@ -1,67 +1,92 @@
package dev.typetype.downloader.db

import dev.typetype.downloader.models.JobStatus
import java.sql.Timestamp
import java.sql.Types
import java.time.Instant

class JobsRepository {
fun insertQueued(id: String, url: String) {
private val columns = "id, source_url, cache_key, options_json, status, duration_ms, title, error, artifact_key, artifact_expires_at"

fun insertQueued(id: String, url: String, cacheKey: String, optionsJson: String) {
val sql = "INSERT INTO jobs (id, source_url, cache_key, options_json, status, duration_ms, title, error) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
Database.withConnection { connection ->
connection.prepareStatement(
"INSERT INTO jobs (id, source_url, status, duration_ms, title, error) VALUES (?, ?, ?, ?, ?, ?)"
).use { statement ->
statement.setString(1, id)
statement.setString(2, url)
statement.setString(3, JobStatus.QUEUED.name)
statement.setLong(4, 0L)
statement.setString(5, "")
statement.setString(6, null)
statement.executeUpdate()
connection.prepareStatement(sql).use {
it.setString(1, id); it.setString(2, url); it.setString(3, cacheKey); it.setString(4, optionsJson)
it.setString(5, JobStatus.QUEUED.name); it.setLong(6, 0L); it.setString(7, ""); it.setString(8, null)
it.executeUpdate()
}
}
}

fun getById(id: String): JobRow? = Database.withConnection { connection ->
connection.prepareStatement(
"SELECT id, source_url, status, duration_ms, title, error FROM jobs WHERE id = ?"
).use { statement ->
statement.setString(1, id)
statement.executeQuery().use { rs ->
if (!rs.next()) return@use null
JobRow(
id = rs.getString("id"),
url = rs.getString("source_url"),
status = JobStatus.valueOf(rs.getString("status")),
durationMs = rs.getLong("duration_ms"),
title = rs.getString("title"),
error = rs.getString("error"),
)
fun insertDoneFromCache(id: String, url: String, optionsJson: String, cached: JobRow) {
val sql = "INSERT INTO jobs (id, source_url, cache_key, options_json, status, duration_ms, title, error, artifact_key, artifact_expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
Database.withConnection { connection ->
connection.prepareStatement(sql).use {
it.setString(1, id); it.setString(2, url); it.setString(3, cached.cacheKey); it.setString(4, optionsJson)
it.setString(5, JobStatus.DONE.name); it.setLong(6, 0L); it.setString(7, cached.title); it.setString(8, null)
it.setString(9, cached.artifactKey); setInstant(it, 10, cached.artifactExpiresAt); it.executeUpdate()
}
}
}

fun markRunning(id: String) {
Database.withConnection { connection ->
connection.prepareStatement(
"UPDATE jobs SET status = ?, started_at = NOW() WHERE id = ?"
).use { statement ->
statement.setString(1, JobStatus.RUNNING.name)
statement.setString(2, id)
statement.executeUpdate()
fun findReusableByCacheKey(cacheKey: String): JobRow? = selectOne(
"SELECT $columns FROM jobs WHERE cache_key = ? AND status = ? AND artifact_key IS NOT NULL AND artifact_expires_at > NOW() ORDER BY finished_at DESC NULLS LAST LIMIT 1",
) { it.setString(1, cacheKey); it.setString(2, JobStatus.DONE.name) }

fun getById(id: String): JobRow? = selectOne("SELECT $columns FROM jobs WHERE id = ?") { it.setString(1, id) }

fun listQueuedOrRunning(): List<JobRow> = selectMany(
"SELECT $columns FROM jobs WHERE status = ? OR status = ? ORDER BY created_at ASC",
) { it.setString(1, JobStatus.QUEUED.name); it.setString(2, JobStatus.RUNNING.name) }

fun markRunningIfQueued(id: String): Boolean = update(
"UPDATE jobs SET status = ?, started_at = NOW(), error = NULL WHERE id = ? AND status = ?",
) { it.setString(1, JobStatus.RUNNING.name); it.setString(2, id); it.setString(3, JobStatus.QUEUED.name) } > 0

fun markCancelled(id: String): Boolean = update(
"UPDATE jobs SET status = ?, error = ?, finished_at = NOW() WHERE id = ? AND status IN (?, ?)",
) { it.setString(1, JobStatus.FAILED.name); it.setString(2, "job cancelled"); it.setString(3, id); it.setString(4, JobStatus.QUEUED.name); it.setString(5, JobStatus.RUNNING.name) } > 0

fun deleteIfNotRunning(id: String): JobRow? = selectOne(
"DELETE FROM jobs WHERE id = ? AND status <> ? RETURNING $columns",
) { it.setString(1, id); it.setString(2, JobStatus.RUNNING.name) }

fun markFinishedIfRunning(id: String, status: JobStatus, durationMs: Long, title: String, error: String?, artifactKey: String?, artifactExpiresAt: Instant?): Boolean = update(
"UPDATE jobs SET status = ?, duration_ms = ?, title = ?, error = ?, artifact_key = ?, artifact_expires_at = ?, finished_at = NOW() WHERE id = ? AND status = ?",
) {
it.setString(1, status.name); it.setLong(2, durationMs); it.setString(3, title); it.setString(4, error)
it.setString(5, artifactKey); setInstant(it, 6, artifactExpiresAt); it.setString(7, id); it.setString(8, JobStatus.RUNNING.name)
} > 0

fun resetRunningToQueued(): Int = update(
"UPDATE jobs SET status = ?, error = NULL WHERE status = ?",
) { it.setString(1, JobStatus.QUEUED.name); it.setString(2, JobStatus.RUNNING.name) }

private fun update(sql: String, bind: (java.sql.PreparedStatement) -> Unit): Int = Database.withConnection { connection ->
connection.prepareStatement(sql).use { bind(it); it.executeUpdate() }
}

private fun selectOne(sql: String, bind: (java.sql.PreparedStatement) -> Unit): JobRow? = Database.withConnection { connection ->
connection.prepareStatement(sql).use { bind(it); it.executeQuery().use { rs -> if (rs.next()) rowFrom(rs) else null } }
}

private fun selectMany(sql: String, bind: (java.sql.PreparedStatement) -> Unit): List<JobRow> = Database.withConnection { connection ->
connection.prepareStatement(sql).use {
bind(it)
it.executeQuery().use { rs ->
val rows = mutableListOf<JobRow>()
while (rs.next()) rows += rowFrom(rs)
rows
}
}
}

fun markFinished(id: String, status: JobStatus, durationMs: Long, title: String, error: String?) {
Database.withConnection { connection ->
connection.prepareStatement(
"UPDATE jobs SET status = ?, duration_ms = ?, title = ?, error = ?, finished_at = NOW() WHERE id = ?"
).use { statement ->
statement.setString(1, status.name)
statement.setLong(2, durationMs)
statement.setString(3, title)
statement.setString(4, error)
statement.setString(5, id)
statement.executeUpdate()
}
private fun setInstant(statement: java.sql.PreparedStatement, index: Int, value: Instant?) {
if (value == null) {
statement.setNull(index, Types.TIMESTAMP_WITH_TIMEZONE)
return
}
statement.setTimestamp(index, Timestamp.from(value))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ package dev.typetype.downloader.models
import kotlinx.serialization.Serializable

@Serializable
data class CreateJobRequest(val url: String)
data class CreateJobRequest(
val url: String,
val options: JobOptions = JobOptions(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ package dev.typetype.downloader.models
import kotlinx.serialization.Serializable

@Serializable
data class CreateJobResponse(val id: String)
data class CreateJobResponse(
val id: String,
val cached: Boolean,
)
13 changes: 13 additions & 0 deletions src/main/kotlin/dev/typetype/downloader/models/DownloadMode.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package dev.typetype.downloader.models

import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable

@Serializable
enum class DownloadMode {
@SerialName("video")
VIDEO,

@SerialName("audio")
AUDIO,
}
14 changes: 14 additions & 0 deletions src/main/kotlin/dev/typetype/downloader/models/JobOptions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package dev.typetype.downloader.models

import kotlinx.serialization.Serializable

@Serializable
data class JobOptions(
val mode: DownloadMode = DownloadMode.VIDEO,
val quality: String = "best",
val format: String = "",
val sponsorBlock: Boolean = false,
val sponsorBlockCategories: List<String> = emptyList(),
val thumbnailOnly: Boolean = false,
val subtitles: SubtitlesOptions = SubtitlesOptions(),
)
2 changes: 2 additions & 0 deletions src/main/kotlin/dev/typetype/downloader/models/JobResponse.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ data class JobResponse(
val durationMs: Long,
val title: String,
val error: String? = null,
val artifactUrl: String? = null,
val artifactExpiresAt: String? = null,
)
12 changes: 12 additions & 0 deletions src/main/kotlin/dev/typetype/downloader/models/SubtitlesOptions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package dev.typetype.downloader.models

import kotlinx.serialization.Serializable

@Serializable
data class SubtitlesOptions(
val enabled: Boolean = false,
val auto: Boolean = false,
val embed: Boolean = false,
val languages: List<String> = listOf("en"),
val format: String = "srt",
)
Loading