Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private object DisconnectedApi : KetchApi {

override suspend fun resolve(
url: String,
headers: Map<String, String>,
properties: Map<String, String>,
): ResolvedSource {
throw IllegalStateException(
"No instance connected. Add a remote server first.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class FakeKetchApi(

override suspend fun resolve(
url: String,
headers: Map<String, String>,
properties: Map<String, String>,
): ResolvedSource {
throw UnsupportedOperationException(
"FakeKetchApi does not support resolve"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ interface KetchApi {
* during [download].
*
* @param url the URL to resolve
* @param headers optional HTTP headers to include in the probe
* @param properties source-specific key-value pairs. For HTTP
* sources this contains HTTP headers; other sources may
* interpret them differently or ignore them.
*/
suspend fun resolve(
url: String,
headers: Map<String, String> = emptyMap(),
properties: Map<String, String> = emptyMap(),
): ResolvedSource

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ class Ketch(

override suspend fun resolve(
url: String,
headers: Map<String, String>,
properties: Map<String, String>,
): ResolvedSource {
log.i { "Resolving URL: $url" }
val source = sourceResolver.resolve(url)
return source.resolve(url, headers)
return source.resolve(url, properties)
}

override suspend fun start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ import kotlinx.coroutines.flow.MutableStateFlow
* call this with the number of bytes before writing each chunk.
* This replaces direct [SpeedLimiter] access to avoid cross-module
* visibility issues with internal types.
* @property headers HTTP headers or source-specific metadata headers
* @property headers request headers from [DownloadRequest.headers].
* Used by HTTP sources for custom headers; other sources may ignore.
* @property preResolved pre-resolved URL metadata, allowing the
* download source to skip its own probe/HEAD request
* @property maxConnections observable override for the number of
* concurrent segment connections. When positive, takes precedence
* over [DownloadRequest.connections]. Emitting a new value triggers
* live resegmentation in [HttpDownloadSource]. Reduced automatically
* on HTTP 429 (Too Many Requests) responses.
* live resegmentation in sources that support it. Reduced
* automatically on HTTP 429 (Too Many Requests) responses.
* @property pendingResegment target connection count for a pending
* resegmentation. Set by the connection-change watcher before
* canceling the download batch scope. Read by [HttpDownloadSource]
* to distinguish resegment-cancel from external cancel.
* canceling the download batch scope. Read by sources to
* distinguish resegment-cancel from external cancel.
*/
class DownloadContext(
val taskId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ internal class DownloadExecution(
totalBytes = total

val fileName = resolvedUrl.suggestedFileName
?: fileNameResolver.resolve(
request, toServerInfo(resolvedUrl),
)
?: fileNameResolver.resolve(request, resolvedUrl)
val outputPath = resolveDestPath(
destination = request.destination,
defaultDir = config.defaultDirectory ?: "downloads",
Expand All @@ -162,12 +160,10 @@ internal class DownloadExecution(
outputPath = outputPath,
state = TaskState.DOWNLOADING,
totalBytes = total,
acceptRanges = resolvedUrl.supportsResume,
etag = resolvedUrl.metadata[HttpDownloadSource.META_ETAG],
lastModified = resolvedUrl.metadata[
HttpDownloadSource.META_LAST_MODIFIED,
],
sourceType = source.type,
sourceResumeState = source.buildResumeState(
resolvedUrl, total,
),
updatedAt = now,
)
}
Expand All @@ -177,6 +173,7 @@ internal class DownloadExecution(
val preResolved = if (resolved != null) resolvedUrl else null
runDownload(
outputPath, total, source.managesOwnFileIo, preResolved,
source = source,
) { ctx ->
source.download(ctx)
}
Expand All @@ -185,7 +182,12 @@ internal class DownloadExecution(
private suspend fun executeResume(info: ResumeInfo) {
val taskRecord = info.record

val sourceType = taskRecord.sourceType ?: HttpDownloadSource.TYPE
val sourceType = taskRecord.sourceType
?: throw KetchError.Unknown(
IllegalStateException(
"No sourceType for taskId=${taskRecord.taskId}",
),
)
val source = sourceResolver.resolveByType(sourceType)
log.i {
"Resuming download for taskId=$taskId via " +
Expand All @@ -202,14 +204,13 @@ internal class DownloadExecution(
taskLimiter.delegate = createLimiter(taskRecord.request.speedLimit)

val resumeState = taskRecord.sourceResumeState
?: HttpDownloadSource.buildResumeState(
etag = taskRecord.etag,
lastModified = taskRecord.lastModified,
totalBytes = taskRecord.totalBytes,
?: throw KetchError.CorruptResumeState(
"No resume state for taskId=${taskRecord.taskId}",
)

runDownload(
outputPath, taskRecord.totalBytes, source.managesOwnFileIo,
source = source,
) { ctx ->
context = ctx
source.resume(ctx, resumeState)
Expand All @@ -229,6 +230,7 @@ internal class DownloadExecution(
total: Long,
selfManagedIo: Boolean = false,
preResolved: ResolvedSource? = null,
source: DownloadSource? = null,
downloadBlock: suspend (DownloadContext) -> Unit,
) {
val fa = if (selfManagedIo) {
Expand All @@ -248,10 +250,14 @@ internal class DownloadExecution(
while (true) {
delay(config.saveIntervalMs)
val snapshot = handle.mutableSegments.value
val downloaded = snapshot.sumOf { it.downloadedBytes }
val updatedResume = context?.let {
source?.updateResumeState(it)
}
handle.record.update {
it.copy(
segments = snapshot,
sourceResumeState = updatedResume
?: it.sourceResumeState,
updatedAt = Clock.System.now(),
)
}
Expand All @@ -278,11 +284,6 @@ internal class DownloadExecution(
it.copy(
state = TaskState.COMPLETED,
segments = null,
sourceResumeState = HttpDownloadSource.buildResumeState(
etag = it.etag,
lastModified = it.lastModified,
totalBytes = it.totalBytes,
),
updatedAt = Clock.System.now(),
)
}
Expand Down Expand Up @@ -479,17 +480,6 @@ internal class DownloadExecution(
}
}

private fun toServerInfo(resolved: ResolvedSource): ServerInfo {
return ServerInfo(
contentLength = resolved.totalBytes,
acceptRanges = resolved.supportsResume,
etag = resolved.metadata[HttpDownloadSource.META_ETAG],
lastModified = resolved.metadata[
HttpDownloadSource.META_LAST_MODIFIED,
],
)
}

private fun resolveDestPath(
destination: com.linroid.ketch.api.Destination?,
defaultDir: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import com.linroid.ketch.api.ResolvedSource
*
* Each source handles a specific protocol or download mechanism.
* The default implementation is [HttpDownloadSource] for HTTP/HTTPS
* downloads. Future implementations may include torrent, media
* extraction, or other protocols.
* downloads. Other implementations include FTP, BitTorrent, etc.
*
* Sources are registered with [SourceResolver] which routes
* download requests to the appropriate source based on URL matching.
Expand All @@ -32,10 +31,15 @@ interface DownloadSource {
* Resolves source metadata for the given URL without downloading.
* This is analogous to an HTTP HEAD request but generalized for
* any source type.
*
* @param url the URL to resolve
* @param properties source-specific key-value pairs. For HTTP
* sources this contains HTTP headers; other sources may
* interpret them differently or ignore them.
*/
suspend fun resolve(
url: String,
headers: Map<String, String> = emptyMap(),
properties: Map<String, String> = emptyMap(),
): ResolvedSource

/**
Expand All @@ -56,4 +60,36 @@ interface DownloadSource {
* [com.linroid.ketch.api.KetchError.Unsupported].
*/
suspend fun resume(context: DownloadContext, resumeState: SourceResumeState)

/**
* Builds an opaque [SourceResumeState] from resolved metadata.
*
* Called after [resolve] completes to persist source-specific
* state needed for resume validation (e.g., HTTP ETag/Last-Modified,
* FTP MDTM, torrent info hash). The returned state is stored in
* the task record and passed back to [resume] on restart.
*
* @param resolved the metadata returned by [resolve]
* @param totalBytes total download size in bytes
*/
fun buildResumeState(
resolved: ResolvedSource,
totalBytes: Long,
): SourceResumeState

/**
* Called periodically during download to let the source update
* its resume state. The returned state replaces the current
* resume state in the task record.
*
* Default implementation returns `null` (no update needed).
* Sources like BitTorrent override this to persist bitfield
* progress incrementally.
*
* @param context the active download context
* @return updated resume state, or `null` to keep the current one
*/
suspend fun updateResumeState(
context: DownloadContext,
): SourceResumeState? = null
}
Loading
Loading