From b676195011a3f30eddd7ed6bc9ebaa747cf30ca2 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 26 Feb 2026 05:04:07 +0000 Subject: [PATCH 1/5] refactor: decouple download architecture from HTTP-specific assumptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove HTTP-specific fields (etag, lastModified, acceptRanges) from TaskRecord; all source-specific state now lives in sourceResumeState - Add buildResumeState(ResolvedSource, Long) to DownloadSource interface so each source manages its own resume state - Remove all HttpDownloadSource references from DownloadExecution (protocol-agnostic) - Rename headers→properties in DownloadSource.resolve() and KetchApi.resolve() - Update FileNameResolver to accept ResolvedSource instead of ServerInfo - Update SQLite schema to persist sourceType and sourceResumeState instead of HTTP-specific columns - Implement buildResumeState() in HttpDownloadSource, FtpDownloadSource, and TorrentDownloadSource https://claude.ai/code/session_016RRifFe1AwMyjU8NYHrEqC --- .../kotlin/com/linroid/ketch/api/KetchApi.kt | 6 ++- .../kotlin/com/linroid/ketch/core/Ketch.kt | 4 +- .../ketch/core/engine/DownloadExecution.kt | 41 ++++++------------- .../ketch/core/engine/DownloadSource.kt | 26 ++++++++++-- .../ketch/core/engine/HttpDownloadSource.kt | 19 ++++++++- .../core/file/DefaultFileNameResolver.kt | 10 +++-- .../ketch/core/file/FileNameResolver.kt | 12 +++--- .../com/linroid/ketch/core/task/TaskRecord.kt | 3 -- .../linroid/ketch/ftp/FtpDownloadSource.kt | 12 +++++- .../com/linroid/ketch/remote/RemoteKetch.kt | 4 +- .../linroid/ketch/sqlite/SqliteTaskStore.kt | 30 +++++++++----- .../com/linroid/ketch/sqlite/TaskRecords.sq | 14 +++---- .../ketch/torrent/TorrentDownloadSource.kt | 20 ++++++++- 13 files changed, 128 insertions(+), 73 deletions(-) diff --git a/library/api/src/commonMain/kotlin/com/linroid/ketch/api/KetchApi.kt b/library/api/src/commonMain/kotlin/com/linroid/ketch/api/KetchApi.kt index 547aef13..8ef10073 100644 --- a/library/api/src/commonMain/kotlin/com/linroid/ketch/api/KetchApi.kt +++ b/library/api/src/commonMain/kotlin/com/linroid/ketch/api/KetchApi.kt @@ -27,11 +27,13 @@ interface KetchApi { * during [download]. * * @param url the URL to resolve - * @param headers optional HTTP headers to include in the probe + * @param properties source-specific key-value pairs. For HTTP + * sources this contains HTTP headers; other sources may + * interpret them differently or ignore them. */ suspend fun resolve( url: String, - headers: Map = emptyMap(), + properties: Map = emptyMap(), ): ResolvedSource /** diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/Ketch.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/Ketch.kt index 803c23ac..f0c8eb4a 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/Ketch.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/Ketch.kt @@ -181,11 +181,11 @@ class Ketch( override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { log.i { "Resolving URL: $url" } val source = sourceResolver.resolve(url) - return source.resolve(url, headers) + return source.resolve(url, properties) } override suspend fun start() { diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt index ef593e94..caeb1725 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt @@ -139,9 +139,7 @@ internal class DownloadExecution( totalBytes = total val fileName = resolvedUrl.suggestedFileName - ?: fileNameResolver.resolve( - request, toServerInfo(resolvedUrl), - ) + ?: fileNameResolver.resolve(request, resolvedUrl) val outputPath = resolveDestPath( destination = request.destination, defaultDir = config.defaultDirectory ?: "downloads", @@ -161,12 +159,10 @@ internal class DownloadExecution( outputPath = outputPath, state = TaskState.DOWNLOADING, totalBytes = total, - acceptRanges = resolvedUrl.supportsResume, - etag = resolvedUrl.metadata[HttpDownloadSource.META_ETAG], - lastModified = resolvedUrl.metadata[ - HttpDownloadSource.META_LAST_MODIFIED, - ], sourceType = source.type, + sourceResumeState = source.buildResumeState( + resolvedUrl, total, + ), updatedAt = now, ) } @@ -184,7 +180,12 @@ internal class DownloadExecution( private suspend fun executeResume(info: ResumeInfo) { val taskRecord = info.record - val sourceType = taskRecord.sourceType ?: HttpDownloadSource.TYPE + val sourceType = taskRecord.sourceType + ?: throw KetchError.Unknown( + IllegalStateException( + "No sourceType for taskId=${taskRecord.taskId}", + ), + ) val source = sourceResolver.resolveByType(sourceType) log.i { "Resuming download for taskId=$taskId via " + @@ -201,10 +202,8 @@ internal class DownloadExecution( taskLimiter.delegate = createLimiter(taskRecord.request.speedLimit) val resumeState = taskRecord.sourceResumeState - ?: HttpDownloadSource.buildResumeState( - etag = taskRecord.etag, - lastModified = taskRecord.lastModified, - totalBytes = taskRecord.totalBytes, + ?: throw KetchError.CorruptResumeState( + "No resume state for taskId=${taskRecord.taskId}", ) runDownload( @@ -277,11 +276,6 @@ internal class DownloadExecution( it.copy( state = TaskState.COMPLETED, segments = null, - sourceResumeState = HttpDownloadSource.buildResumeState( - etag = it.etag, - lastModified = it.lastModified, - totalBytes = it.totalBytes, - ), updatedAt = Clock.System.now(), ) } @@ -478,17 +472,6 @@ internal class DownloadExecution( } } - private fun toServerInfo(resolved: ResolvedSource): ServerInfo { - return ServerInfo( - contentLength = resolved.totalBytes, - acceptRanges = resolved.supportsResume, - etag = resolved.metadata[HttpDownloadSource.META_ETAG], - lastModified = resolved.metadata[ - HttpDownloadSource.META_LAST_MODIFIED, - ], - ) - } - private fun resolveDestPath( destination: com.linroid.ketch.api.Destination?, defaultDir: String, diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt index 17d33eb3..c456c00d 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt @@ -7,8 +7,7 @@ import com.linroid.ketch.api.ResolvedSource * * Each source handles a specific protocol or download mechanism. * The default implementation is [HttpDownloadSource] for HTTP/HTTPS - * downloads. Future implementations may include torrent, media - * extraction, or other protocols. + * downloads. Other implementations include FTP, BitTorrent, etc. * * Sources are registered with [SourceResolver] which routes * download requests to the appropriate source based on URL matching. @@ -32,10 +31,15 @@ interface DownloadSource { * Resolves source metadata for the given URL without downloading. * This is analogous to an HTTP HEAD request but generalized for * any source type. + * + * @param url the URL to resolve + * @param properties source-specific key-value pairs. For HTTP + * sources this contains HTTP headers; other sources may + * interpret them differently or ignore them. */ suspend fun resolve( url: String, - headers: Map = emptyMap(), + properties: Map = emptyMap(), ): ResolvedSource /** @@ -56,4 +60,20 @@ interface DownloadSource { * [com.linroid.ketch.api.KetchError.Unsupported]. */ suspend fun resume(context: DownloadContext, resumeState: SourceResumeState) + + /** + * Builds an opaque [SourceResumeState] from resolved metadata. + * + * Called after [resolve] completes to persist source-specific + * state needed for resume validation (e.g., HTTP ETag/Last-Modified, + * FTP MDTM, torrent info hash). The returned state is stored in + * the task record and passed back to [resume] on restart. + * + * @param resolved the metadata returned by [resolve] + * @param totalBytes total download size in bytes + */ + fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ): SourceResumeState } diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt index 1fb96d4f..fad8022e 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt @@ -47,10 +47,10 @@ internal class HttpDownloadSource( override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { val detector = RangeSupportDetector(httpEngine) - val serverInfo = detector.detect(url, headers) + val serverInfo = detector.detect(url, properties) val fileName = serverInfo.contentDisposition?.let { extractDispositionFileName(it) } ?: DefaultFileNameResolver.fromUrl(url) @@ -65,6 +65,9 @@ internal class HttpDownloadSource( serverInfo.etag?.let { put(META_ETAG, it) } serverInfo.lastModified?.let { put(META_LAST_MODIFIED, it) } if (serverInfo.acceptRanges) put(META_ACCEPT_RANGES, "true") + serverInfo.contentDisposition?.let { + put(META_CONTENT_DISPOSITION, it) + } serverInfo.rateLimitRemaining?.let { put(META_RATE_LIMIT_REMAINING, it.toString()) } @@ -454,6 +457,17 @@ internal class HttpDownloadSource( return connections } + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ): SourceResumeState { + return buildResumeState( + etag = resolved.metadata[META_ETAG], + lastModified = resolved.metadata[META_LAST_MODIFIED], + totalBytes = totalBytes, + ) + } + private fun extractDispositionFileName( contentDisposition: String, ): String? { @@ -466,6 +480,7 @@ internal class HttpDownloadSource( internal const val META_ETAG = "etag" internal const val META_LAST_MODIFIED = "lastModified" internal const val META_ACCEPT_RANGES = "acceptRanges" + internal const val META_CONTENT_DISPOSITION = "contentDisposition" internal const val META_RATE_LIMIT_REMAINING = "rateLimitRemaining" internal const val META_RATE_LIMIT_RESET = "rateLimitReset" diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/DefaultFileNameResolver.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/DefaultFileNameResolver.kt index 54c58e43..b45db748 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/DefaultFileNameResolver.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/DefaultFileNameResolver.kt @@ -1,13 +1,13 @@ package com.linroid.ketch.core.file import com.linroid.ketch.api.DownloadRequest +import com.linroid.ketch.api.ResolvedSource import com.linroid.ketch.api.log.KetchLogger -import com.linroid.ketch.core.engine.ServerInfo /** * Default strategy for resolving file names: * 1. Content-Disposition header (`filename*=UTF-8''...`, `filename="..."`, - * or `filename=...`) + * or `filename=...`) — extracted from `resolved.metadata["contentDisposition"]` * 2. Last non-empty URL path segment (percent-decoded, query/fragment stripped) * 3. Fallback: `"download"` * @@ -20,9 +20,10 @@ internal class DefaultFileNameResolver : FileNameResolver { override fun resolve( request: DownloadRequest, - serverInfo: ServerInfo, + resolved: ResolvedSource, ): String { - val name = fromContentDisposition(serverInfo.contentDisposition) + val contentDisposition = resolved.metadata[META_CONTENT_DISPOSITION] + val name = fromContentDisposition(contentDisposition) ?: fromUrl(request.url) ?: FALLBACK log.d { "Resolved filename: \"$name\" for url: ${request.url}" } @@ -31,6 +32,7 @@ internal class DefaultFileNameResolver : FileNameResolver { companion object { internal const val FALLBACK = "download" + internal const val META_CONTENT_DISPOSITION = "contentDisposition" internal fun fromContentDisposition(header: String?): String? { if (header.isNullOrBlank()) return null diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/FileNameResolver.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/FileNameResolver.kt index cba30134..4415a519 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/FileNameResolver.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/file/FileNameResolver.kt @@ -1,22 +1,22 @@ package com.linroid.ketch.core.file import com.linroid.ketch.api.DownloadRequest -import com.linroid.ketch.core.engine.ServerInfo +import com.linroid.ketch.api.ResolvedSource /** - * Resolves a file name for a download from server metadata. + * Resolves a file name for a download from source metadata. * * Explicit names set via [DownloadRequest.destination] are handled by * the coordinator before this resolver is called. Implementations only - * need to derive a name from the server response. + * need to derive a name from the source response. */ fun interface FileNameResolver { /** - * Resolves a file name from the given [request] and [serverInfo]. + * Resolves a file name from the given [request] and [resolved] metadata. * * @param request the download request - * @param serverInfo server metadata from the HEAD response + * @param resolved source metadata from the resolve/probe step * @return a non-blank file name to save the download as */ - fun resolve(request: DownloadRequest, serverInfo: ServerInfo): String + fun resolve(request: DownloadRequest, resolved: ResolvedSource): String } diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/task/TaskRecord.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/task/TaskRecord.kt index a8e32b2e..65940f31 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/task/TaskRecord.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/task/TaskRecord.kt @@ -22,9 +22,6 @@ data class TaskRecord( val state: TaskState = TaskState.QUEUED, val totalBytes: Long = -1, val error: KetchError? = null, - val acceptRanges: Boolean? = null, - val etag: String? = null, - val lastModified: String? = null, val segments: List? = null, val sourceType: String? = null, val sourceResumeState: SourceResumeState? = null, diff --git a/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt b/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt index 8397520f..8bf90c8d 100644 --- a/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt +++ b/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt @@ -54,9 +54,19 @@ class FtpDownloadSource( return lower.startsWith("ftp://") || lower.startsWith("ftps://") } + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ): SourceResumeState { + return buildResumeState( + totalBytes = totalBytes, + mdtm = resolved.metadata[META_MDTM], + ) + } + override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { val ftpUrl = try { FtpUrl.parse(url) diff --git a/library/remote/src/commonMain/kotlin/com/linroid/ketch/remote/RemoteKetch.kt b/library/remote/src/commonMain/kotlin/com/linroid/ketch/remote/RemoteKetch.kt index 04b4449f..273442fc 100644 --- a/library/remote/src/commonMain/kotlin/com/linroid/ketch/remote/RemoteKetch.kt +++ b/library/remote/src/commonMain/kotlin/com/linroid/ketch/remote/RemoteKetch.kt @@ -133,11 +133,11 @@ class RemoteKetch( override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { val response = httpClient.post(Api.Resolve()) { contentType(ContentType.Application.Json) - setBody(ResolveUrlRequest(url, headers)) + setBody(ResolveUrlRequest(url, properties)) } checkSuccess(response) return response.body() diff --git a/library/sqlite/src/commonMain/kotlin/com/linroid/ketch/sqlite/SqliteTaskStore.kt b/library/sqlite/src/commonMain/kotlin/com/linroid/ketch/sqlite/SqliteTaskStore.kt index 8a3aa384..34f5e772 100644 --- a/library/sqlite/src/commonMain/kotlin/com/linroid/ketch/sqlite/SqliteTaskStore.kt +++ b/library/sqlite/src/commonMain/kotlin/com/linroid/ketch/sqlite/SqliteTaskStore.kt @@ -5,6 +5,7 @@ import com.linroid.ketch.api.DownloadRequest import com.linroid.ketch.api.KetchError import com.linroid.ketch.api.Segment import com.linroid.ketch.api.log.KetchLogger +import com.linroid.ketch.core.engine.SourceResumeState import com.linroid.ketch.core.task.TaskRecord import com.linroid.ketch.core.task.TaskState import com.linroid.ketch.core.task.TaskStore @@ -28,6 +29,7 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { private val json = Json { ignoreUnknownKeys = true } private val errorSerializer = KetchError.serializer() private val segmentListSerializer = ListSerializer(Segment.serializer()) + private val resumeStateSerializer = SourceResumeState.serializer() /** * Saves a [TaskRecord] to the SQLite database. If a record with the same @@ -38,13 +40,15 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { val requestJson = json.encodeToString( DownloadRequest.serializer(), record.request ) - val acceptRanges = record.acceptRanges?.let { if (it) 1L else 0L } val segmentsJson = record.segments?.let { json.encodeToString(segmentListSerializer, it) } val errorJson = record.error?.let { json.encodeToString(errorSerializer, it) } + val resumeStateJson = record.sourceResumeState?.let { + json.encodeToString(resumeStateSerializer, it) + } queries.transaction { queries.insertOrIgnore( task_id = record.taskId, @@ -52,9 +56,8 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { output_path = record.outputPath, state = record.state.name, total_bytes = record.totalBytes, - accept_ranges = acceptRanges, - etag = record.etag, - last_modified = record.lastModified, + source_type = record.sourceType, + source_resume_state_json = resumeStateJson, segments_json = segmentsJson, error_json = errorJson, ) @@ -64,9 +67,8 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { output_path = record.outputPath, state = record.state.name, total_bytes = record.totalBytes, - accept_ranges = acceptRanges, - etag = record.etag, - last_modified = record.lastModified, + source_type = record.sourceType, + source_resume_state_json = resumeStateJson, segments_json = segmentsJson, error_json = errorJson, ) @@ -116,9 +118,6 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { null } }, - acceptRanges = accept_ranges?.let { it != 0L }, - etag = etag, - lastModified = last_modified, segments = segments_json?.let { try { json.decodeFromString(segmentListSerializer, it) @@ -127,6 +126,17 @@ class SqliteTaskStore(driver: SqlDriver) : TaskStore { null } }, + sourceType = source_type, + sourceResumeState = source_resume_state_json?.let { + try { + json.decodeFromString(resumeStateSerializer, it) + } catch (e: Exception) { + log.w(e) { + "Failed to deserialize resume state for task: $task_id" + } + null + } + }, createdAt = Instant.fromEpochMilliseconds(created_at), updatedAt = Instant.fromEpochMilliseconds(updated_at), ) diff --git a/library/sqlite/src/commonMain/sqldelight/com/linroid/ketch/sqlite/TaskRecords.sq b/library/sqlite/src/commonMain/sqldelight/com/linroid/ketch/sqlite/TaskRecords.sq index c27bfed5..fcdf19e9 100644 --- a/library/sqlite/src/commonMain/sqldelight/com/linroid/ketch/sqlite/TaskRecords.sq +++ b/library/sqlite/src/commonMain/sqldelight/com/linroid/ketch/sqlite/TaskRecords.sq @@ -4,9 +4,8 @@ CREATE TABLE IF NOT EXISTS task_records ( output_path TEXT, state TEXT NOT NULL DEFAULT 'PENDING', total_bytes INTEGER NOT NULL DEFAULT -1, - accept_ranges INTEGER, - etag TEXT, - last_modified TEXT, + source_type TEXT, + source_resume_state_json TEXT, segments_json TEXT, created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s', 'now') AS INTEGER) * 1000), updated_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s', 'now') AS INTEGER) * 1000), @@ -16,9 +15,9 @@ CREATE TABLE IF NOT EXISTS task_records ( insertOrIgnore: INSERT OR IGNORE INTO task_records( task_id, request_json, output_path, state, - total_bytes, accept_ranges, etag, last_modified, + total_bytes, source_type, source_resume_state_json, segments_json, error_json -) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?); +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?); update: UPDATE task_records SET @@ -26,9 +25,8 @@ UPDATE task_records SET output_path = ?, state = ?, total_bytes = ?, - accept_ranges = ?, - etag = ?, - last_modified = ?, + source_type = ?, + source_resume_state_json = ?, segments_json = ?, error_json = ?, updated_at = CAST(strftime('%s', 'now') AS INTEGER) * 1000 diff --git a/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt b/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt index 75f2c6e5..95dd5a75 100644 --- a/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt +++ b/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt @@ -61,9 +61,27 @@ class TorrentDownloadSource( lower.contains(".torrent?") } + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ): SourceResumeState { + val infoHash = resolved.metadata[META_INFO_HASH] ?: "" + val state = TorrentResumeState( + infoHash = infoHash, + totalBytes = totalBytes, + resumeData = "", + selectedFileIds = resolved.files.map { it.id }.toSet(), + savePath = "", + ) + return SourceResumeState( + sourceType = TYPE, + data = Json.encodeToString(state), + ) + } + override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { val metadata = try { resolveMetadata(url) From be526e39cafe663ee451ddb145ee227f8e1ea64d Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 26 Feb 2026 05:11:12 +0000 Subject: [PATCH 2/5] refactor: extract SegmentedDownloadHelper and fix all broken references MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract shared download loop (downloadSegments/downloadBatch) into SegmentedDownloadHelper, eliminating ~200 lines of duplicated code between HttpDownloadSource and FtpDownloadSource - Both sources now delegate to SegmentedDownloadHelper.downloadAll() with a protocol-specific segment download lambda - Fix headers→properties rename across all KetchApi implementations: RemoteKetch, InstanceManager, FakeKetchApi, ResolveUrlRequest, ServerRoutes - Update all test fake DownloadSource objects: rename headers→properties, add buildResumeState() implementation - Update DefaultFileNameResolverTest to use ResolvedSource instead of ServerInfo - Remove obsolete TaskRecordTest for removed HTTP-specific fields - Update DownloadContext KDoc to be protocol-agnostic https://claude.ai/code/session_016RRifFe1AwMyjU8NYHrEqC --- .../ketch/app/instance/InstanceManager.kt | 2 +- .../com/linroid/ketch/app/FakeKetchApi.kt | 2 +- .../ketch/core/engine/DownloadContext.kt | 11 +- .../ketch/core/engine/HttpDownloadSource.kt | 192 ++-------------- .../core/segment/SegmentedDownloadHelper.kt | 209 ++++++++++++++++++ .../linroid/ketch/engine/ResolveUrlTest.kt | 13 +- .../ketch/engine/SourceResolverTest.kt | 6 +- .../ketch/file/DefaultFileNameResolverTest.kt | 50 +++-- .../com/linroid/ketch/task/TaskRecordTest.kt | 27 --- .../endpoints/model/ResolveUrlRequest.kt | 4 +- .../linroid/ketch/ftp/FtpDownloadSource.kt | 162 ++------------ .../linroid/ketch/server/api/ServerRoutes.kt | 2 +- 12 files changed, 299 insertions(+), 381 deletions(-) create mode 100644 library/core/src/commonMain/kotlin/com/linroid/ketch/core/segment/SegmentedDownloadHelper.kt diff --git a/app/shared/src/commonMain/kotlin/com/linroid/ketch/app/instance/InstanceManager.kt b/app/shared/src/commonMain/kotlin/com/linroid/ketch/app/instance/InstanceManager.kt index dc294631..adeb5335 100644 --- a/app/shared/src/commonMain/kotlin/com/linroid/ketch/app/instance/InstanceManager.kt +++ b/app/shared/src/commonMain/kotlin/com/linroid/ketch/app/instance/InstanceManager.kt @@ -244,7 +244,7 @@ private object DisconnectedApi : KetchApi { override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { throw IllegalStateException( "No instance connected. Add a remote server first.", diff --git a/app/shared/src/commonTest/kotlin/com/linroid/ketch/app/FakeKetchApi.kt b/app/shared/src/commonTest/kotlin/com/linroid/ketch/app/FakeKetchApi.kt index 50ab3b23..365e9c77 100644 --- a/app/shared/src/commonTest/kotlin/com/linroid/ketch/app/FakeKetchApi.kt +++ b/app/shared/src/commonTest/kotlin/com/linroid/ketch/app/FakeKetchApi.kt @@ -41,7 +41,7 @@ class FakeKetchApi( override suspend fun resolve( url: String, - headers: Map, + properties: Map, ): ResolvedSource { throw UnsupportedOperationException( "FakeKetchApi does not support resolve" diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadContext.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadContext.kt index 6cb306d3..5527ab55 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadContext.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadContext.kt @@ -21,18 +21,19 @@ import kotlinx.coroutines.flow.MutableStateFlow * call this with the number of bytes before writing each chunk. * This replaces direct [SpeedLimiter] access to avoid cross-module * visibility issues with internal types. - * @property headers HTTP headers or source-specific metadata headers + * @property headers request headers from [DownloadRequest.headers]. + * Used by HTTP sources for custom headers; other sources may ignore. * @property preResolved pre-resolved URL metadata, allowing the * download source to skip its own probe/HEAD request * @property maxConnections observable override for the number of * concurrent segment connections. When positive, takes precedence * over [DownloadRequest.connections]. Emitting a new value triggers - * live resegmentation in [HttpDownloadSource]. Reduced automatically - * on HTTP 429 (Too Many Requests) responses. + * live resegmentation in sources that support it. Reduced + * automatically on HTTP 429 (Too Many Requests) responses. * @property pendingResegment target connection count for a pending * resegmentation. Set by the connection-change watcher before - * canceling the download batch scope. Read by [HttpDownloadSource] - * to distinguish resegment-cancel from external cancel. + * canceling the download batch scope. Read by sources to + * distinguish resegment-cancel from external cancel. */ class DownloadContext( val taskId: String, diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt index fad8022e..4c20cf05 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/HttpDownloadSource.kt @@ -3,26 +3,15 @@ package com.linroid.ketch.core.engine import com.linroid.ketch.api.KetchError import com.linroid.ketch.api.ResolvedSource import com.linroid.ketch.api.Segment -import com.linroid.ketch.api.DownloadRequest import com.linroid.ketch.core.file.DefaultFileNameResolver import com.linroid.ketch.api.log.KetchLogger import com.linroid.ketch.core.segment.SegmentCalculator import com.linroid.ketch.core.segment.SegmentDownloader +import com.linroid.ketch.core.segment.SegmentedDownloadHelper import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.withContext import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json -import kotlin.time.Clock -import kotlin.time.Duration.Companion.milliseconds /** * HTTP/HTTPS download source using the existing [HttpEngine] pipeline. @@ -234,173 +223,36 @@ internal class HttpDownloadSource( return segments } + private val segmentHelper = SegmentedDownloadHelper( + progressIntervalMs = progressIntervalMs, + tag = "HttpSource", + ) + /** - * Downloads segments with dynamic resegmentation support. - * - * Uses a while loop that repeatedly calls [downloadBatch] for the - * current set of incomplete segments. When the connection count - * changes (via [DownloadContext.maxConnections]), the batch is - * canceled, progress is snapshotted, segments are merged/split - * via [SegmentCalculator.resegment], and a new batch starts. + * Downloads segments via HTTP Range requests with dynamic + * resegmentation support. Delegates the concurrent batch loop + * to [SegmentedDownloadHelper]. */ private suspend fun downloadSegments( context: DownloadContext, segments: List, totalBytes: Long, ) { - var currentSegments = segments - - while (true) { - val incomplete = currentSegments.filter { !it.isComplete } - if (incomplete.isEmpty()) break - - val batchCompleted = downloadBatch( - context, currentSegments, incomplete, totalBytes - ) - - if (batchCompleted) break - - // Resegment with the new connection count - val newCount = context.pendingResegment - context.pendingResegment = 0 - currentSegments = SegmentCalculator.resegment( - context.segments.value, newCount - ) - context.segments.value = currentSegments - log.i { - "Resegmented to $newCount connections for " + - "taskId=${context.taskId}" - } - } - - context.segments.value = currentSegments - context.onProgress(totalBytes, totalBytes) - } - - /** - * Downloads one batch of incomplete segments concurrently. - * - * A watcher coroutine monitors [DownloadContext.maxConnections] - * for changes. When the connection count changes, it sets - * [DownloadContext.pendingResegment] and cancels the scope, - * causing all segment coroutines to stop. Progress is - * snapshotted before returning. - * - * @return `true` if all segments completed, `false` if - * interrupted for resegmentation - */ - private suspend fun downloadBatch( - context: DownloadContext, - allSegments: List, - incompleteSegments: List, - totalBytes: Long, - ): Boolean { - val segmentProgress = - allSegments.map { it.downloadedBytes }.toMutableList() - val segmentMutex = Mutex() - val updatedSegments = allSegments.toMutableList() - - var lastProgressUpdate = Clock.System.now() - val progressMutex = Mutex() - - suspend fun currentSegments(): List { - return segmentMutex.withLock { - updatedSegments.mapIndexed { i, seg -> - seg.copy(downloadedBytes = segmentProgress[i]) - } - } - } - - suspend fun updateProgress() { - val now = Clock.System.now() - progressMutex.withLock { - if (now - lastProgressUpdate >= - progressIntervalMs.milliseconds - ) { - val snapshot = currentSegments() - val downloaded = snapshot.sumOf { it.downloadedBytes } - context.onProgress(downloaded, totalBytes) - context.segments.value = snapshot - lastProgressUpdate = now - } - } - } - - val downloadedBytes = allSegments.sumOf { it.downloadedBytes } - context.onProgress(downloadedBytes, totalBytes) - context.segments.value = allSegments - - return try { - coroutineScope { - // Watcher: detect connection count changes and trigger - // resegmentation by canceling the scope. Compares against the - // last-seen flow value (not segment count) to avoid an infinite - // loop when fewer segments can be created than requested. - val watcherJob = launch { - val lastSeen = context.maxConnections.value - context.maxConnections.first { count -> - count > 0 && count != lastSeen - } - context.pendingResegment = - context.maxConnections.value - log.i { - "Connection change detected for " + - "taskId=${context.taskId}: " + - "$lastSeen -> ${context.pendingResegment}" - } - throw CancellationException("Resegmenting") + segmentHelper.downloadAll( + context, segments, totalBytes, + ) { segment, onProgress -> + val throttleLimiter = object : SpeedLimiter { + override suspend fun acquire(bytes: Int) { + context.throttle(bytes) } - - try { - val results = incompleteSegments.map { segment -> - async { - val throttleLimiter = object : SpeedLimiter { - override suspend fun acquire(bytes: Int) { - context.throttle(bytes) - } - } - val downloader = SegmentDownloader( - httpEngine, context.fileAccessor, - throttleLimiter, SpeedLimiter.Unlimited - ) - val completed = downloader.download( - context.url, segment, context.headers - ) { bytesDownloaded -> - segmentMutex.withLock { - segmentProgress[segment.index] = - bytesDownloaded - } - updateProgress() - } - segmentMutex.withLock { - updatedSegments[completed.index] = completed - } - context.segments.value = currentSegments() - log.d { - "Segment ${completed.index} completed for " + - "taskId=${context.taskId}" - } - completed - } - } - results.awaitAll() - } finally { - watcherJob.cancel() - } - - context.segments.value = currentSegments() - true // All segments completed - } - } catch (e: CancellationException) { - if (context.pendingResegment > 0) { - // Snapshot progress before resegmentation - withContext(NonCancellable) { - context.segments.value = currentSegments() - } - false // Signal outer loop to resegment - } else { - throw e // External cancellation — propagate } + val downloader = SegmentDownloader( + httpEngine, context.fileAccessor, + throttleLimiter, SpeedLimiter.Unlimited, + ) + downloader.download( + context.url, segment, context.headers, onProgress, + ) } } diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/segment/SegmentedDownloadHelper.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/segment/SegmentedDownloadHelper.kt new file mode 100644 index 00000000..b196d0ad --- /dev/null +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/segment/SegmentedDownloadHelper.kt @@ -0,0 +1,209 @@ +package com.linroid.ketch.core.segment + +import com.linroid.ketch.api.Segment +import com.linroid.ketch.api.log.KetchLogger +import com.linroid.ketch.core.engine.DownloadContext +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext +import kotlin.time.Clock +import kotlin.time.Duration.Companion.milliseconds + +/** + * Shared loop logic for segmented parallel downloads with dynamic + * resegmentation support. + * + * Both [com.linroid.ketch.core.engine.HttpDownloadSource] and + * FTP/other segmented sources can delegate to this helper instead + * of reimplementing the download loop, progress aggregation, and + * connection-change watcher logic independently. + * + * @param progressIntervalMs minimum interval between progress + * reports in milliseconds + * @param tag log tag for this helper instance + */ +internal class SegmentedDownloadHelper( + private val progressIntervalMs: Long = 200, + tag: String = "SegmentHelper", +) { + private val log = KetchLogger(tag) + + /** + * Downloads all incomplete segments concurrently with dynamic + * resegmentation support. + * + * Manages a while loop that calls [downloadBatch] for incomplete + * segments. When [DownloadContext.maxConnections] changes, the + * batch is canceled, progress is snapshotted, segments are + * merged/split via [SegmentCalculator.resegment], and a new + * batch starts. + * + * @param context the download context with progress callbacks + * @param segments initial list of segments (some may already + * have progress) + * @param totalBytes total download size + * @param downloadSegment protocol-specific download function for + * a single segment. Receives the segment and a progress + * callback (bytesDownloaded so far for this segment). Must + * return the completed segment with final downloadedBytes. + */ + suspend fun downloadAll( + context: DownloadContext, + segments: List, + totalBytes: Long, + downloadSegment: suspend ( + segment: Segment, + onProgress: suspend (bytesDownloaded: Long) -> Unit, + ) -> Segment, + ) { + var currentSegments = segments + + while (true) { + val incomplete = currentSegments.filter { !it.isComplete } + if (incomplete.isEmpty()) break + + val batchCompleted = downloadBatch( + context, currentSegments, incomplete, totalBytes, + downloadSegment, + ) + + if (batchCompleted) break + + val newCount = context.pendingResegment + context.pendingResegment = 0 + currentSegments = SegmentCalculator.resegment( + context.segments.value, newCount, + ) + context.segments.value = currentSegments + log.i { + "Resegmented to $newCount connections for " + + "taskId=${context.taskId}" + } + } + + context.segments.value = currentSegments + context.onProgress(totalBytes, totalBytes) + } + + /** + * Downloads one batch of incomplete segments concurrently. + * + * A watcher coroutine monitors [DownloadContext.maxConnections] + * for changes. When the connection count changes, it sets + * [DownloadContext.pendingResegment] and cancels the scope. + * + * @return `true` if all segments completed, `false` if + * interrupted for resegmentation + */ + private suspend fun downloadBatch( + context: DownloadContext, + allSegments: List, + incompleteSegments: List, + totalBytes: Long, + downloadSegment: suspend ( + segment: Segment, + onProgress: suspend (bytesDownloaded: Long) -> Unit, + ) -> Segment, + ): Boolean { + val segmentProgress = + allSegments.map { it.downloadedBytes }.toMutableList() + val segmentMutex = Mutex() + val updatedSegments = allSegments.toMutableList() + + var lastProgressUpdate = Clock.System.now() + val progressMutex = Mutex() + + suspend fun currentSegments(): List { + return segmentMutex.withLock { + updatedSegments.mapIndexed { i, seg -> + seg.copy(downloadedBytes = segmentProgress[i]) + } + } + } + + suspend fun updateProgress() { + val now = Clock.System.now() + progressMutex.withLock { + if (now - lastProgressUpdate >= + progressIntervalMs.milliseconds + ) { + val snapshot = currentSegments() + val downloaded = snapshot.sumOf { it.downloadedBytes } + context.onProgress(downloaded, totalBytes) + context.segments.value = snapshot + lastProgressUpdate = now + } + } + } + + val downloadedBytes = allSegments.sumOf { it.downloadedBytes } + context.onProgress(downloadedBytes, totalBytes) + context.segments.value = allSegments + + return try { + coroutineScope { + val watcherJob = launch { + val lastSeen = context.maxConnections.value + context.maxConnections.first { count -> + count > 0 && count != lastSeen + } + context.pendingResegment = + context.maxConnections.value + log.i { + "Connection change detected for " + + "taskId=${context.taskId}: " + + "$lastSeen -> ${context.pendingResegment}" + } + throw CancellationException("Resegmenting") + } + + try { + val results = incompleteSegments.map { segment -> + async { + val completed = downloadSegment( + segment, + ) { bytesDownloaded -> + segmentMutex.withLock { + segmentProgress[segment.index] = + bytesDownloaded + } + updateProgress() + } + segmentMutex.withLock { + updatedSegments[completed.index] = completed + } + context.segments.value = currentSegments() + log.d { + "Segment ${completed.index} completed for " + + "taskId=${context.taskId}" + } + completed + } + } + results.awaitAll() + } finally { + watcherJob.cancel() + } + + context.segments.value = currentSegments() + true + } + } catch (e: CancellationException) { + if (context.pendingResegment > 0) { + withContext(NonCancellable) { + context.segments.value = currentSegments() + } + false + } else { + throw e + } + } + } +} diff --git a/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/ResolveUrlTest.kt b/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/ResolveUrlTest.kt index 56f0f276..23732c9f 100644 --- a/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/ResolveUrlTest.kt +++ b/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/ResolveUrlTest.kt @@ -5,7 +5,6 @@ import com.linroid.ketch.api.ResolvedSource import com.linroid.ketch.core.engine.DownloadContext import com.linroid.ketch.core.engine.DownloadSource import com.linroid.ketch.core.engine.HttpDownloadSource -import com.linroid.ketch.core.engine.ServerInfo import com.linroid.ketch.core.engine.SourceResolver import com.linroid.ketch.core.engine.SourceResumeState import kotlinx.coroutines.test.runTest @@ -265,7 +264,7 @@ class ResolveUrlTest { url.startsWith("magnet:") override suspend fun resolve( url: String, - headers: Map, + properties: Map, ) = ResolvedSource( url = url, sourceType = "magnet", @@ -279,6 +278,10 @@ class ResolveUrlTest { context: DownloadContext, resumeState: SourceResumeState, ) {} + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ) = SourceResumeState(sourceType = "magnet", data = "{}") } val resolver = SourceResolver(listOf(fakeSource)) assertFailsWith { @@ -296,7 +299,7 @@ class ResolveUrlTest { url.startsWith("magnet:") override suspend fun resolve( url: String, - headers: Map, + properties: Map, ) = ResolvedSource( url = url, sourceType = "magnet", @@ -310,6 +313,10 @@ class ResolveUrlTest { context: DownloadContext, resumeState: SourceResumeState, ) {} + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ) = SourceResumeState(sourceType = "magnet", data = "{}") } val engine = FakeHttpEngine() diff --git a/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/SourceResolverTest.kt b/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/SourceResolverTest.kt index 41a921e5..840166c6 100644 --- a/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/SourceResolverTest.kt +++ b/library/core/src/commonTest/kotlin/com/linroid/ketch/engine/SourceResolverTest.kt @@ -25,7 +25,7 @@ class SourceResolverTest { url.startsWith("magnet:") override suspend fun resolve( url: String, - headers: Map, + properties: Map, ) = ResolvedSource( url = url, sourceType = "magnet", @@ -39,6 +39,10 @@ class SourceResolverTest { context: DownloadContext, resumeState: SourceResumeState, ) {} + override fun buildResumeState( + resolved: ResolvedSource, + totalBytes: Long, + ) = SourceResumeState(sourceType = "magnet", data = "{}") } @Test diff --git a/library/core/src/commonTest/kotlin/com/linroid/ketch/file/DefaultFileNameResolverTest.kt b/library/core/src/commonTest/kotlin/com/linroid/ketch/file/DefaultFileNameResolverTest.kt index 72358886..7a89900d 100644 --- a/library/core/src/commonTest/kotlin/com/linroid/ketch/file/DefaultFileNameResolverTest.kt +++ b/library/core/src/commonTest/kotlin/com/linroid/ketch/file/DefaultFileNameResolverTest.kt @@ -1,7 +1,7 @@ package com.linroid.ketch.file import com.linroid.ketch.api.DownloadRequest -import com.linroid.ketch.core.engine.ServerInfo +import com.linroid.ketch.api.ResolvedSource import com.linroid.ketch.core.file.DefaultFileNameResolver import kotlin.test.Test import kotlin.test.assertEquals @@ -11,12 +11,20 @@ class DefaultFileNameResolverTest { private val resolver = DefaultFileNameResolver() private val dir = "/tmp" - private fun serverInfo(contentDisposition: String? = null) = ServerInfo( - contentLength = 1000, - acceptRanges = true, - etag = null, - lastModified = null, - contentDisposition = contentDisposition, + private fun resolved( + contentDisposition: String? = null, + ) = ResolvedSource( + url = "https://example.com", + sourceType = "http", + totalBytes = 1000, + supportsResume = true, + suggestedFileName = null, + maxSegments = 4, + metadata = buildMap { + contentDisposition?.let { + put(DefaultFileNameResolver.META_CONTENT_DISPOSITION, it) + } + }, ) private fun request(url: String) = DownloadRequest( @@ -28,7 +36,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_filenameStarUtf8() { - val info = serverInfo("attachment; filename*=UTF-8''my%20file.zip") + val info = resolved("attachment; filename*=UTF-8''my%20file.zip") assertEquals( "my file.zip", resolver.resolve(request("https://example.com"), info) @@ -37,7 +45,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_filenameStarUtf8_caseInsensitive() { - val info = serverInfo("attachment; Filename*=utf-8''report.pdf") + val info = resolved("attachment; Filename*=utf-8''report.pdf") assertEquals( "report.pdf", resolver.resolve(request("https://example.com"), info) @@ -48,7 +56,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_filenameQuoted() { - val info = serverInfo("attachment; filename=\"document.pdf\"") + val info = resolved("attachment; filename=\"document.pdf\"") assertEquals( "document.pdf", resolver.resolve(request("https://example.com"), info) @@ -57,7 +65,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_filenameQuoted_withSpaces() { - val info = serverInfo("attachment; filename=\"my document.pdf\"") + val info = resolved("attachment; filename=\"my document.pdf\"") assertEquals( "my document.pdf", resolver.resolve(request("https://example.com"), info) @@ -68,7 +76,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_filenameUnquoted() { - val info = serverInfo("attachment; filename=report.csv") + val info = resolved("attachment; filename=report.csv") assertEquals( "report.csv", resolver.resolve(request("https://example.com"), info) @@ -79,7 +87,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fromUrlPath() { - val info = serverInfo() + val info = resolved() assertEquals( "archive.tar.gz", resolver.resolve( @@ -91,7 +99,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fromUrlPath_withQuery() { - val info = serverInfo() + val info = resolved() assertEquals( "file.zip", resolver.resolve( @@ -103,7 +111,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fromUrlPath_withFragment() { - val info = serverInfo() + val info = resolved() assertEquals( "file.zip", resolver.resolve( @@ -115,7 +123,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fromUrlPath_percentEncoded() { - val info = serverInfo() + val info = resolved() assertEquals( "my file.zip", resolver.resolve( @@ -127,7 +135,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fromUrlPath_trailingSlash() { - val info = serverInfo() + val info = resolved() assertEquals( "dir", resolver.resolve(request("https://example.com/dir/"), info) @@ -138,7 +146,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fallback_noPathNoDisposition() { - val info = serverInfo() + val info = resolved() assertEquals( "download", resolver.resolve(request("https://example.com/"), info) @@ -147,7 +155,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_fallback_rootUrl() { - val info = serverInfo() + val info = resolved() assertEquals( "download", resolver.resolve(request("https://example.com"), info) @@ -161,7 +169,7 @@ class DefaultFileNameResolverTest { // Explicit file names via Destination are now handled by the // coordinator, not the resolver. The resolver always returns // the server-derived name. - val info = serverInfo("attachment; filename=\"server-name.zip\"") + val info = resolved("attachment; filename=\"server-name.zip\"") val req = DownloadRequest( url = "https://example.com/url-name.zip", destination = com.linroid.ketch.api.Destination("explicit.zip"), @@ -171,7 +179,7 @@ class DefaultFileNameResolverTest { @Test fun resolve_dispositionTakesPriorityOverUrl() { - val info = serverInfo("attachment; filename=\"server-name.zip\"") + val info = resolved("attachment; filename=\"server-name.zip\"") assertEquals( "server-name.zip", resolver.resolve( diff --git a/library/core/src/commonTest/kotlin/com/linroid/ketch/task/TaskRecordTest.kt b/library/core/src/commonTest/kotlin/com/linroid/ketch/task/TaskRecordTest.kt index bbe891ea..929a40ba 100644 --- a/library/core/src/commonTest/kotlin/com/linroid/ketch/task/TaskRecordTest.kt +++ b/library/core/src/commonTest/kotlin/com/linroid/ketch/task/TaskRecordTest.kt @@ -107,31 +107,4 @@ class TaskRecordTest { assertNull(record.sourceResumeState) } - @Test - fun deserialization_withoutServerInfoFields_defaultsToNull() { - val epoch = Instant.fromEpochMilliseconds(0) - val jsonStr = """ - { - "taskId": "t1", - "request": { - "url": "https://example.com/f", - "destination": "/tmp/", - "connections": 4, - "headers": {}, - "properties": {} - }, - "outputPath": "/tmp/f", - "state": "QUEUED", - "totalBytes": 100, - "downloadedBytes": 0, - "createdAt": "$epoch", - "updatedAt": "$epoch" - } - """.trimIndent() - val record = json.decodeFromString(jsonStr) - assertNull(record.acceptRanges) - assertNull(record.etag) - assertNull(record.lastModified) - } - } diff --git a/library/endpoints/src/commonMain/kotlin/com/linroid/ketch/endpoints/model/ResolveUrlRequest.kt b/library/endpoints/src/commonMain/kotlin/com/linroid/ketch/endpoints/model/ResolveUrlRequest.kt index 6ed623fa..e0a19890 100644 --- a/library/endpoints/src/commonMain/kotlin/com/linroid/ketch/endpoints/model/ResolveUrlRequest.kt +++ b/library/endpoints/src/commonMain/kotlin/com/linroid/ketch/endpoints/model/ResolveUrlRequest.kt @@ -6,10 +6,10 @@ import kotlinx.serialization.Serializable * Request body for resolving a URL without downloading. * * @property url the URL to resolve - * @property headers optional HTTP headers to include in the probe + * @property properties source-specific key-value pairs (e.g., HTTP headers) */ @Serializable data class ResolveUrlRequest( val url: String, - val headers: Map = emptyMap(), + val properties: Map = emptyMap(), ) diff --git a/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt b/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt index 8bf90c8d..86816b08 100644 --- a/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt +++ b/library/ftp/src/commonMain/kotlin/com/linroid/ketch/ftp/FtpDownloadSource.kt @@ -8,20 +8,11 @@ import com.linroid.ketch.core.engine.DownloadContext import com.linroid.ketch.core.engine.DownloadSource import com.linroid.ketch.core.engine.SourceResumeState import com.linroid.ketch.core.segment.SegmentCalculator +import com.linroid.ketch.core.segment.SegmentedDownloadHelper import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.async -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.ensureActive -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.withContext import kotlinx.serialization.json.Json -import kotlin.time.Clock -import kotlin.time.Duration.Companion.milliseconds /** * FTP/FTPS download source using the [FtpClient] protocol layer. @@ -252,152 +243,25 @@ class FtpDownloadSource( return segments } + private val segmentHelper = SegmentedDownloadHelper( + progressIntervalMs = progressIntervalMs, + tag = "FtpSource", + ) + /** - * Downloads segments with dynamic resegmentation support. + * Downloads segments via FTP connections with dynamic + * resegmentation support. Delegates the concurrent batch loop + * to [SegmentedDownloadHelper]. */ private suspend fun downloadSegments( context: DownloadContext, segments: List, totalBytes: Long, ) { - var currentSegments = segments - - while (true) { - val incomplete = currentSegments.filter { !it.isComplete } - if (incomplete.isEmpty()) break - - val batchCompleted = downloadBatch( - context, currentSegments, incomplete, totalBytes, - ) - - if (batchCompleted) break - - val newCount = context.pendingResegment - context.pendingResegment = 0 - currentSegments = SegmentCalculator.resegment( - context.segments.value, newCount, - ) - context.segments.value = currentSegments - log.i { - "Resegmented to $newCount connections for " + - "taskId=${context.taskId}" - } - } - - context.segments.value = currentSegments - context.onProgress(totalBytes, totalBytes) - } - - /** - * Downloads one batch of incomplete segments concurrently. - * - * Each segment gets its own FTP connection. The watcher coroutine - * monitors connection count changes for live resegmentation. - * - * @return true if all segments completed, false if interrupted - * for resegmentation - */ - private suspend fun downloadBatch( - context: DownloadContext, - allSegments: List, - incompleteSegments: List, - totalBytes: Long, - ): Boolean { - val segmentProgress = - allSegments.map { it.downloadedBytes }.toMutableList() - val segmentMutex = Mutex() - val updatedSegments = allSegments.toMutableList() - - var lastProgressUpdate = Clock.System.now() - val progressMutex = Mutex() - - suspend fun currentSegments(): List { - return segmentMutex.withLock { - updatedSegments.mapIndexed { i, seg -> - seg.copy(downloadedBytes = segmentProgress[i]) - } - } - } - - suspend fun updateProgress() { - val now = Clock.System.now() - progressMutex.withLock { - if (now - lastProgressUpdate >= - progressIntervalMs.milliseconds - ) { - val snapshot = currentSegments() - val downloaded = snapshot.sumOf { it.downloadedBytes } - context.onProgress(downloaded, totalBytes) - context.segments.value = snapshot - lastProgressUpdate = now - } - } - } - - val downloadedBytes = allSegments.sumOf { it.downloadedBytes } - context.onProgress(downloadedBytes, totalBytes) - context.segments.value = allSegments - - return try { - coroutineScope { - // Watcher for connection count changes - val watcherJob = launch { - val lastSeen = context.maxConnections.value - context.maxConnections.first { count -> - count > 0 && count != lastSeen - } - context.pendingResegment = - context.maxConnections.value - log.i { - "Connection change detected for " + - "taskId=${context.taskId}: " + - "$lastSeen -> ${context.pendingResegment}" - } - throw CancellationException("Resegmenting") - } - - try { - val results = incompleteSegments.map { segment -> - async { - downloadSegment( - context, segment, - ) { bytesDownloaded -> - segmentMutex.withLock { - segmentProgress[segment.index] = - bytesDownloaded - } - updateProgress() - } - } - } - - for ((i, deferred) in results.withIndex()) { - val completed = deferred.await() - segmentMutex.withLock { - updatedSegments[completed.index] = completed - } - context.segments.value = currentSegments() - log.d { - "Segment ${completed.index} completed for " + - "taskId=${context.taskId}" - } - } - } finally { - watcherJob.cancel() - } - - context.segments.value = currentSegments() - true - } - } catch (e: CancellationException) { - if (context.pendingResegment > 0) { - withContext(NonCancellable) { - context.segments.value = currentSegments() - } - false - } else { - throw e - } + segmentHelper.downloadAll( + context, segments, totalBytes, + ) { segment, onProgress -> + downloadSegment(context, segment, onProgress) } } diff --git a/library/server/src/main/kotlin/com/linroid/ketch/server/api/ServerRoutes.kt b/library/server/src/main/kotlin/com/linroid/ketch/server/api/ServerRoutes.kt index dd605e4a..b3ab3796 100644 --- a/library/server/src/main/kotlin/com/linroid/ketch/server/api/ServerRoutes.kt +++ b/library/server/src/main/kotlin/com/linroid/ketch/server/api/ServerRoutes.kt @@ -34,7 +34,7 @@ internal fun Route.serverRoutes(ketch: KetchApi) { post { val body = call.receive() log.i { "POST /api/resolve url=${body.url}" } - val resolved = ketch.resolve(body.url, body.headers) + val resolved = ketch.resolve(body.url, body.properties) call.respond(resolved) } } From 9695ce24337e5af59a20d5d066a9a45c685e89e7 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 26 Feb 2026 05:16:56 +0000 Subject: [PATCH 3/5] refactor: add periodic resume state updates via updateResumeState() Add updateResumeState() to DownloadSource interface so sources can persist resume state incrementally during download. This completes the source-managed resume state lifecycle (Step 7 of the refactoring plan). - DownloadSource: new suspend fun updateResumeState() with default null - DownloadExecution: save job now calls updateResumeState() and persists the result alongside segment snapshots - TorrentDownloadSource: implements updateResumeState() to persist bitfield/resume data via activeSession, replacing the dead code in monitorProgress() that built resume state but never persisted it https://claude.ai/code/session_016RRifFe1AwMyjU8NYHrEqC --- .../ketch/core/engine/DownloadExecution.kt | 9 +++- .../ketch/core/engine/DownloadSource.kt | 16 +++++++ .../ketch/torrent/TorrentDownloadSource.kt | 48 +++++++++++-------- 3 files changed, 53 insertions(+), 20 deletions(-) diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt index caeb1725..855dd4e9 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadExecution.kt @@ -172,6 +172,7 @@ internal class DownloadExecution( val preResolved = if (resolved != null) resolvedUrl else null runDownload( outputPath, total, source.managesOwnFileIo, preResolved, + source = source, ) { ctx -> source.download(ctx) } @@ -208,6 +209,7 @@ internal class DownloadExecution( runDownload( outputPath, taskRecord.totalBytes, source.managesOwnFileIo, + source = source, ) { ctx -> context = ctx source.resume(ctx, resumeState) @@ -227,6 +229,7 @@ internal class DownloadExecution( total: Long, selfManagedIo: Boolean = false, preResolved: ResolvedSource? = null, + source: DownloadSource? = null, downloadBlock: suspend (DownloadContext) -> Unit, ) { val fa = if (selfManagedIo) { @@ -246,10 +249,14 @@ internal class DownloadExecution( while (true) { delay(config.saveIntervalMs) val snapshot = handle.mutableSegments.value - val downloaded = snapshot.sumOf { it.downloadedBytes } + val updatedResume = context?.let { + source?.updateResumeState(it) + } handle.record.update { it.copy( segments = snapshot, + sourceResumeState = updatedResume + ?: it.sourceResumeState, updatedAt = Clock.System.now(), ) } diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt index c456c00d..ee950001 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/engine/DownloadSource.kt @@ -76,4 +76,20 @@ interface DownloadSource { resolved: ResolvedSource, totalBytes: Long, ): SourceResumeState + + /** + * Called periodically during download to let the source update + * its resume state. The returned state replaces the current + * resume state in the task record. + * + * Default implementation returns `null` (no update needed). + * Sources like BitTorrent override this to persist bitfield + * progress incrementally. + * + * @param context the active download context + * @return updated resume state, or `null` to keep the current one + */ + suspend fun updateResumeState( + context: DownloadContext, + ): SourceResumeState? = null } diff --git a/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt b/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt index 95dd5a75..9369e58a 100644 --- a/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt +++ b/library/torrent/src/commonMain/kotlin/com/linroid/ketch/torrent/TorrentDownloadSource.kt @@ -44,6 +44,7 @@ class TorrentDownloadSource( override val managesOwnFileIo: Boolean = true private var engine: TorrentEngine? = null + private var activeSession: TorrentSession? = null private suspend fun getEngine(): TorrentEngine { val existing = engine @@ -79,6 +80,28 @@ class TorrentDownloadSource( ) } + override suspend fun updateResumeState( + context: DownloadContext, + ): SourceResumeState? { + val session = activeSession ?: return null + val resumeData = session.saveResumeData() ?: return null + val selectedIds = context.request.selectedFileIds.ifEmpty { + context.segments.value.map { it.index.toString() }.toSet() + } + return SourceResumeState( + sourceType = TYPE, + data = Json.encodeToString( + TorrentResumeState( + infoHash = session.infoHash, + totalBytes = context.segments.value.sumOf { it.totalBytes }, + resumeData = encodeBase64(resumeData), + selectedFileIds = selectedIds, + savePath = extractSavePath(context), + ), + ), + ) + } + override suspend fun resolve( url: String, properties: Map, @@ -204,6 +227,7 @@ class TorrentDownloadSource( session.setDownloadRateLimit(speedLimit.bytesPerSecond) } + activeSession = session try { monitorProgress(context, session, segments, totalBytes) } catch (e: CancellationException) { @@ -212,6 +236,8 @@ class TorrentDownloadSource( } catch (e: Exception) { if (e is KetchError) throw e throw KetchError.SourceError(TYPE, e) + } finally { + activeSession = null } } @@ -271,6 +297,7 @@ class TorrentDownloadSource( val segments = context.segments.value val totalBytes = state.totalBytes + activeSession = session try { monitorProgress(context, session, segments, totalBytes) } catch (e: CancellationException) { @@ -279,6 +306,8 @@ class TorrentDownloadSource( } catch (e: Exception) { if (e is KetchError) throw e throw KetchError.SourceError(TYPE, e) + } finally { + activeSession = null } } @@ -318,25 +347,6 @@ class TorrentDownloadSource( // Final progress update updateSegmentProgress(context, segments, totalBytes, totalBytes) context.onProgress(totalBytes, totalBytes) - - // Save resume data for potential re-seeding - val resumeData = session.saveResumeData() - if (resumeData != null) { - val savePath = extractSavePath(context) - val selectedIds = context.request.selectedFileIds.ifEmpty { - segments.map { it.index.toString() }.toSet() - } - val sourceState = TorrentResumeState( - infoHash = session.infoHash, - totalBytes = totalBytes, - resumeData = encodeBase64(resumeData), - selectedFileIds = selectedIds, - savePath = savePath, - ) - // Store resume state by updating the context's segments - // The DownloadExecution will persist this through TaskRecord - log.d { "Saved torrent resume data for ${session.infoHash}" } - } } /** From 38f94624dc6b8da8763d50e625ee6f07f166b8fe Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 26 Feb 2026 05:17:29 +0000 Subject: [PATCH 4/5] docs: add architecture refactoring plan Documents the 7-step plan for decoupling the download architecture from HTTP-specific assumptions, enabling clean support for BitTorrent, HLS, and future protocol sources. https://claude.ai/code/session_016RRifFe1AwMyjU8NYHrEqC --- plan.md | 348 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 348 insertions(+) create mode 100644 plan.md diff --git a/plan.md b/plan.md new file mode 100644 index 00000000..f497f330 --- /dev/null +++ b/plan.md @@ -0,0 +1,348 @@ +# Refactoring Plan: Protocol-Agnostic Download Architecture + +## Problem Statement + +The core download engine was designed around HTTP Range requests. As we add more +protocols (FTP is already in, BitTorrent and HLS are planned), several HTTP-specific +assumptions leak through the architecture, causing: + +1. **Duplicated logic** — FTP reimplements the entire segment download loop + (`downloadSegments`/`downloadBatch`) that HttpDownloadSource also has +2. **Hardcoded HTTP references in core** — `DownloadExecution` directly references + `HttpDownloadSource.META_ETAG`, `HttpDownloadSource.buildResumeState()`, and + `HttpDownloadSource.TYPE` as fallbacks +3. **HTTP fields in TaskRecord** — `etag`, `lastModified`, `acceptRanges` are + HTTP-specific but baked into the persistent record shared by all sources +4. **SegmentDownloader is HTTP-only** — Takes `HttpEngine` directly; FTP can't + reuse it and had to build its own segment download implementation +5. **Protocol mismatch in DownloadSource interface** — `resolve()` takes `headers` + parameter that means nothing for FTP, BitTorrent, etc. + +### What breaks with BitTorrent? + +BitTorrent has fundamentally different semantics: +- **Pieces** not byte-range segments (pieces can span files) +- **Peer connections** not HTTP connections to a single server +- **Multi-file** by default (a torrent contains a file tree) +- **Own file I/O** — the piece-to-file mapping doesn't match FileAccessor's + `writeAt(offset, data)` model +- **Resume** via bitfield (which pieces are complete), not byte offsets +- **No server identity** — no ETag/Last-Modified equivalent; info hash IS the identity +- **Progress** is piece-count based, not byte-offset based + +The current architecture would force BitTorrent to fight `Segment`, ignore +`headers`, fake `etag`/`lastModified` fields in TaskRecord, and reimplement +the entire download loop (as FTP already did). + +--- + +## Refactoring Steps + +### Step 1: Remove HTTP-specific fields from `TaskRecord` + +**Files:** `library/core/.../task/TaskRecord.kt` + +Remove `etag`, `lastModified`, and `acceptRanges` from `TaskRecord`. All +source-specific metadata should live in `sourceResumeState: SourceResumeState?` +which is already the proper abstraction (opaque JSON blob per source type). + +```kotlin +// BEFORE +data class TaskRecord( + val taskId: String, + val request: DownloadRequest, + val outputPath: String? = null, + val state: TaskState = TaskState.QUEUED, + val totalBytes: Long = -1, + val error: KetchError? = null, + val acceptRanges: Boolean? = null, // ← HTTP-specific + val etag: String? = null, // ← HTTP-specific + val lastModified: String? = null, // ← HTTP-specific + val segments: List? = null, + val sourceType: String? = null, + val sourceResumeState: SourceResumeState? = null, + val createdAt: Instant, + val updatedAt: Instant, +) + +// AFTER +data class TaskRecord( + val taskId: String, + val request: DownloadRequest, + val outputPath: String? = null, + val state: TaskState = TaskState.QUEUED, + val totalBytes: Long = -1, + val error: KetchError? = null, + val segments: List? = null, + val sourceType: String? = null, + val sourceResumeState: SourceResumeState? = null, + val createdAt: Instant, + val updatedAt: Instant, +) +``` + +**Migration:** HTTP source stores its etag/lastModified/acceptRanges in +`HttpResumeState` (it already does this for resume — just make it the +sole storage location from the start). + +--- + +### Step 2: Remove `HttpDownloadSource` references from `DownloadExecution` + +**Files:** `library/core/.../engine/DownloadExecution.kt` + +Currently `DownloadExecution` has three direct references to `HttpDownloadSource`: +- Line 165-168: Reads `HttpDownloadSource.META_ETAG` / `META_LAST_MODIFIED` from + resolved metadata to store in TaskRecord +- Line 204: Calls `HttpDownloadSource.buildResumeState()` as fallback when + `taskRecord.sourceResumeState` is null +- Line 280: Calls `HttpDownloadSource.buildResumeState()` on completion + +**Refactoring approach:** Add a `buildResumeState()` method to `DownloadSource` so +each source can build its own resume state from the resolved metadata. Make the +source responsible for persisting and restoring its state. + +```kotlin +// Add to DownloadSource interface: +interface DownloadSource { + // ... existing methods ... + + /** + * Builds a resume state from resolved metadata after a successful + * download or before pausing. Sources use this to persist any + * source-specific state needed for resume validation. + */ + fun buildResumeState(resolved: ResolvedSource, totalBytes: Long): SourceResumeState +} +``` + +Then `DownloadExecution.executeFresh()` becomes: + +```kotlin +// BEFORE (HTTP-specific) +handle.record.update { + it.copy( + etag = resolvedUrl.metadata[HttpDownloadSource.META_ETAG], + lastModified = resolvedUrl.metadata[HttpDownloadSource.META_LAST_MODIFIED], + sourceType = source.type, + ... + ) +} + +// AFTER (protocol-agnostic) +handle.record.update { + it.copy( + sourceType = source.type, + sourceResumeState = source.buildResumeState(resolvedUrl, total), + ... + ) +} +``` + +And `executeResume()` no longer needs `HttpDownloadSource.buildResumeState()` +fallback since `sourceResumeState` is always populated: + +```kotlin +// BEFORE +val resumeState = taskRecord.sourceResumeState + ?: HttpDownloadSource.buildResumeState( + etag = taskRecord.etag, + lastModified = taskRecord.lastModified, + totalBytes = taskRecord.totalBytes, + ) + +// AFTER +val resumeState = taskRecord.sourceResumeState + ?: throw KetchError.CorruptResumeState("No resume state for taskId=$taskId") +``` + +--- + +### Step 3: Extract shared segmented download logic into `SegmentedDownloadHelper` + +**Files:** New `library/core/.../segment/SegmentedDownloadHelper.kt` + +Both `HttpDownloadSource` and `FtpDownloadSource` implement nearly identical +download loop logic: +- `downloadSegments()` — outer loop with resegmentation +- `downloadBatch()` — concurrent segment execution with connection-change watcher +- Progress aggregation with throttled updates +- Mutex-protected segment state + +Extract this into a reusable helper that both sources can delegate to: + +```kotlin +internal class SegmentedDownloadHelper( + private val progressIntervalMs: Long = 200, +) { + /** + * Downloads segments concurrently with dynamic resegmentation support. + * + * @param downloadSegment called for each individual segment; + * the source provides the protocol-specific download logic + */ + suspend fun downloadAll( + context: DownloadContext, + segments: List, + totalBytes: Long, + downloadSegment: suspend (Segment, onProgress: suspend (Long) -> Unit) -> Segment, + ) { /* shared loop logic */ } +} +``` + +`HttpDownloadSource` passes a lambda that creates `SegmentDownloader` per segment. +`FtpDownloadSource` passes a lambda that opens an FTP connection per segment. +Future sources (HLS) pass their own segment download logic. + +This eliminates ~200 lines of duplicated code between HTTP and FTP sources. + +--- + +### Step 4: Decouple `SegmentDownloader` from `HttpEngine` + +**Files:** `library/core/.../segment/SegmentDownloader.kt` + +Currently `SegmentDownloader` takes `HttpEngine` directly and calls +`httpEngine.download(url, range, headers)`. It should instead accept a +generic download function: + +```kotlin +// BEFORE +internal class SegmentDownloader( + private val httpEngine: HttpEngine, + private val fileAccessor: FileAccessor, + private val taskLimiter: SpeedLimiter, + private val globalLimiter: SpeedLimiter, +) + +// AFTER — simplified since SegmentedDownloadHelper handles the loop +// SegmentDownloader remains HTTP-specific but is clearly scoped to +// HttpDownloadSource only (not used by other sources). +``` + +With Step 3's `SegmentedDownloadHelper`, `SegmentDownloader` becomes an +internal detail of `HttpDownloadSource` rather than a shared component that +other sources are expected to (but can't) use. + +--- + +### Step 5: Remove `headers` from `DownloadSource.resolve()` signature + +**Files:** `library/core/.../engine/DownloadSource.kt`, all implementations + +The `headers` parameter on `resolve()` is HTTP-specific. Non-HTTP sources ignore it. +Instead, source-specific configuration should come through the URL or constructor: + +```kotlin +// BEFORE +suspend fun resolve( + url: String, + headers: Map = emptyMap(), +): ResolvedSource + +// AFTER +suspend fun resolve( + url: String, + properties: Map = emptyMap(), +): ResolvedSource +``` + +Rename `headers` to `properties` — a generic key-value map. HTTP source reads +HTTP headers from it. FTP source ignores it (or reads credentials). BitTorrent +could read tracker preferences. The semantics are source-defined. + +This also aligns with `DownloadRequest.properties` which already exists for +arbitrary extension data. + +**Alternative (simpler):** Just pass the entire `DownloadRequest` to `resolve()` +so each source can pick what it needs: + +```kotlin +suspend fun resolve(request: DownloadRequest): ResolvedSource +``` + +This is cleaner because `DownloadRequest` already has `url`, `headers`, +`properties`, `connections`, and `selectedFileIds` — everything a source +might need. + +--- + +### Step 6: Remove `ServerInfo` dependency from `FileNameResolver` + +**Files:** `library/core/.../file/FileNameResolver.kt`, `DownloadExecution.kt` + +`DownloadExecution.toServerInfo()` converts `ResolvedSource` to `ServerInfo` +just to pass to `FileNameResolver`. This couples filename resolution to +HTTP-specific `ServerInfo`. Instead, `FileNameResolver` should work with +`ResolvedSource` directly: + +```kotlin +// BEFORE +interface FileNameResolver { + fun resolve(request: DownloadRequest, serverInfo: ServerInfo): String? +} + +// AFTER +interface FileNameResolver { + fun resolve(request: DownloadRequest, resolved: ResolvedSource): String? +} +``` + +`ServerInfo` then becomes purely internal to `HttpDownloadSource` / +`RangeSupportDetector` — it's an HTTP-layer detail, not a core model. + +--- + +### Step 7: Source-managed resume state lifecycle + +**Files:** `DownloadSource.kt`, `DownloadExecution.kt`, `DownloadCoordinator.kt` + +Add a `saveResumeState()` to `DownloadSource` so sources can update their resume +state periodically (not just at download end): + +```kotlin +interface DownloadSource { + // ... existing ... + + /** + * Called periodically during download to let the source update its + * resume state. Called by the save-interval job in DownloadExecution. + * Default implementation returns null (no update needed). + */ + fun updateResumeState(context: DownloadContext): SourceResumeState? = null +} +``` + +This lets BitTorrent persist its bitfield incrementally, while HTTP sources +can return null (their resume state doesn't change mid-download). + +--- + +## Summary of Changes + +| # | Change | Files Touched | Risk | +|---|--------|---------------|------| +| 1 | Remove HTTP fields from TaskRecord | TaskRecord, DownloadExecution, HttpDownloadSource | Medium — migration of persisted data | +| 2 | Remove HttpDownloadSource refs from DownloadExecution | DownloadExecution, DownloadSource | Low | +| 3 | Extract SegmentedDownloadHelper | New file + HttpDownloadSource + FtpDownloadSource | Medium — refactoring shared logic | +| 4 | Scope SegmentDownloader to HTTP | SegmentDownloader, HttpDownloadSource | Low | +| 5 | Rename headers→properties in resolve() | DownloadSource, all impls, DownloadExecution | Low | +| 6 | FileNameResolver uses ResolvedSource | FileNameResolver, DownloadExecution | Low | +| 7 | Source-managed resume state lifecycle | DownloadSource, DownloadExecution | Low | + +### Execution Order + +Steps 1-2 should be done together (they're tightly coupled). Step 3 is the +largest change but is mostly mechanical. Steps 4-7 are independent and can +be done in any order after 1-2. + +### What stays the same + +- `KetchApi` interface — no public API changes +- `DownloadTask` interface — no changes +- `Segment` model — still used for byte-range protocols (HTTP, FTP); BitTorrent + maps pieces to segments for progress reporting +- `DownloadQueue`, `DownloadScheduler` — unchanged (protocol-agnostic already) +- `SpeedLimiter`, `TokenBucket` — unchanged +- `DownloadConfig` — unchanged +- `DownloadState`, `DownloadProgress` — unchanged From 9a4bd6c7b18291983380ce16728d31da14c9075e Mon Sep 17 00:00:00 2001 From: Lin Zhang Date: Thu, 26 Feb 2026 18:13:22 +0800 Subject: [PATCH 5/5] Add db migration --- .../core/segment/SegmentedDownloadHelper.kt | 2 +- .../src/commonMain/sqldelight/databases/2.db | Bin 0 -> 12288 bytes .../src/commonMain/sqldelight/databases/3.db | Bin 0 -> 12288 bytes .../commonMain/sqldelight/migrations/2.sqm | 26 ++++++++++++++++++ 4 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 library/sqlite/src/commonMain/sqldelight/databases/2.db create mode 100644 library/sqlite/src/commonMain/sqldelight/databases/3.db create mode 100644 library/sqlite/src/commonMain/sqldelight/migrations/2.sqm diff --git a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/segment/SegmentedDownloadHelper.kt b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/segment/SegmentedDownloadHelper.kt index b196d0ad..31bad94a 100644 --- a/library/core/src/commonMain/kotlin/com/linroid/ketch/core/segment/SegmentedDownloadHelper.kt +++ b/library/core/src/commonMain/kotlin/com/linroid/ketch/core/segment/SegmentedDownloadHelper.kt @@ -29,7 +29,7 @@ import kotlin.time.Duration.Companion.milliseconds * reports in milliseconds * @param tag log tag for this helper instance */ -internal class SegmentedDownloadHelper( +class SegmentedDownloadHelper( private val progressIntervalMs: Long = 200, tag: String = "SegmentHelper", ) { diff --git a/library/sqlite/src/commonMain/sqldelight/databases/2.db b/library/sqlite/src/commonMain/sqldelight/databases/2.db new file mode 100644 index 0000000000000000000000000000000000000000..975363890a97aecafc3f6970812f4de016452a6d GIT binary patch literal 12288 zcmeI#O;6h}7zc2tn>JLbf*aT8Ql*{P)M|%amw`bQp<`{3;AEM`?XtCTu^q9UcG!Uv z9|`fbIL!xS+o!C-H0i{F2Kv{E`+<(O8?Abo8EWu_$66i5O#%uP=OEme78= zZ1{U3GmPN(N_i z$j^5CB%>}WWu5qDb@^~+7}b@VS5C~O`??qmef|hOv3$7c9^X$+(v0F`-rwu>58^P< zX(y-R{HIfvdi{(a@WJXB%N1|CVX7RB)Z7Up%gJhT81S)A{=RuC{Xu6sFUEzs4{LEA zHJPc>*;NRkcVv`5W-fg4``>rBc9UV&vd)ZMF;lI$Y2zJ=OMMx)DH$%;wdoCYJDpBD zY+KFqr`T2*qs=OSdiz7|)%IaCVnvy&ANFGEb9o^vrx$Ip{;5TGy%qBMjqpb`!euL5 zP#^#S2tWV=5P$##AOHafKmY;|c&-BV5E%0R=lXdO7X%;x0SG_<0uX=z1Rwwb2tZ&V F@CV2*xL5!H literal 0 HcmV?d00001 diff --git a/library/sqlite/src/commonMain/sqldelight/databases/3.db b/library/sqlite/src/commonMain/sqldelight/databases/3.db new file mode 100644 index 0000000000000000000000000000000000000000..bb338e25df8fa97da6d61e79113c8a6b80a91828 GIT binary patch literal 12288 zcmeI#Jx}8>7zc2tcY;%;3O3efsnQ}2wPLuH00vcr0&S3BvO;5^(ALFvbTBh8@tIio zP>g2>RZR!PbPM`ha@xo5{w2%PKaGc0(v{NV)Y2APWkJBUi5O#nt3OqQ!;BJso>2GL)w4LJlV0a2>Rr$wzu0ht zw7k!yvEtUK$>K=gt8zQdt(d0vrs(u-?g%cwTwHXI??-2GLg5MT?sd8cp&w|KO-JJL z(Mm&|Zo&_EZ+?u~jNk2IO_l3`6!vlYv2v+ROOubKD0ZEnvKSz*$8uuLV(UOlcfL%x z~Jg_Vcmoq6i(FNut`yWHm^zR)NZv}O}|+_&Hl%-QfsB>7pOM> zRF>8b>j4{1GWlTcM=qBaX>L^!i}E`b?Q-Pv`d|Kws`