diff --git a/README.md b/README.md index 0c2866c..4b0ef8a 100644 --- a/README.md +++ b/README.md @@ -1,329 +1,27 @@ -# MediaFlow - Configurable Media Processing API +# MediaFlow -A lightweight Go service for processing and serving images with YAML-configurable storage options and S3 integration. +A lightweight Go service that handles media uploads, validation, and serving. Backed by S3/R2 by default; can route video uploads directly to Cloudflare Stream. -## Features +## What it does -- **Presigned Uploads**: `/v1/uploads/presign` - secure direct-to-S3 uploads with validation -- **Original Image Serving**: `/originals/{type}/{image_id}` - serve original images directly from storage -- **Thumbnail Generation**: `/thumb/{type}/{image_id}` - on-demand thumbnail generation -- **Unified Configuration**: Profile-based YAML config combining upload and processing rules -- **Multiple Formats**: Convert images to WebP, JPEG, PNG with configurable quality -- **Video Support**: Ready for video upload and processing (processing features coming soon) -- **S3 Integration**: Direct S3 uploads with multipart support for large files -- **CDN-Optimized**: Cache-Control and ETag headers for optimal CDN performance -- **Graceful Shutdown**: Production-ready server lifecycle management +- **Presigned uploads** — secure direct-to-S3 (or direct-to-Stream) uploads with profile-driven validation +- **Image processing** — on-demand thumbnails with WebP/JPEG/PNG output +- **Video probe** — validates duration, dimensions, and codec against profile constraints (ffprobe for R2, Stream API for Stream) +- **Cloudflare Stream delivery** — opt-in per profile, bytes go straight to Stream with no R2 round-trip - -## Future Features -- Other media support (currently on images) -- Server level caching of high frequency media - -## API Endpoints - -### Presigned Uploads -``` -POST /v1/uploads/presign -``` -Generates presigned URLs for secure direct-to-S3 uploads. 
- -**Request Body:** -```json -{ - "key_base": "unique-file-id", - "ext": "jpg", - "mime": "image/jpeg", - "size_bytes": 1024000, - "kind": "image", - "profile": "avatar", - "multipart": "auto" -} -``` - -**Response for Single Upload:** -```json -{ - "object_key": "originals/avatars/ab/unique-file-id.jpg", - "upload": { - "single": { - "method": "PUT", - "url": "https://presigned-s3-url", - "headers": { - "Content-Type": "image/jpeg", - "If-None-Match": "*" - }, - "expires_at": "2024-01-01T12:00:00Z" - } - } -} -``` - -**Response for Multipart Upload:** -```json -{ - "object_key": "originals/avatars/ab/unique-file-id.jpg", - "upload": { - "multipart": { - "upload_id": "abc123xyz", - "part_size": 8388608, - "parts": [ - { - "part_number": 1, - "method": "PUT", - "url": "https://presigned-s3-part-url-1", - "headers": {"Content-Type": "image/jpeg"}, - "expires_at": "2024-01-01T12:00:00Z" - } - ], - "complete": { - "method": "POST", - "url": "https://your-api/v1/uploads/originals%2Favatars%2Fab%2Funique-file-id.jpg/complete/abc123xyz", - "headers": {"Content-Type": "application/json"}, - "expires_at": "2024-01-01T12:00:00Z" - }, - "abort": { - "method": "DELETE", - "url": "https://your-api/v1/uploads/originals%2Favatars%2Fab%2Funique-file-id.jpg/abort/abc123xyz", - "headers": {}, - "expires_at": "2024-01-01T12:00:00Z" - } - } - } -} -``` - -**Parameters:** -- `key_base`: Unique identifier for the file -- `ext`: File extension (optional, for backward compatibility) -- `mime`: MIME type of the file -- `size_bytes`: File size in bytes -- `kind`: Media type (`image` or `video`) -- `profile`: Configuration profile to use (`avatar`, `photo`, `video`, etc.) -- `multipart`: Upload strategy (`auto`, `force`, or `off`) - -### Multipart Upload Completion -``` -POST /v1/uploads/{object_key}/complete/{upload_id} -``` -Completes a multipart upload by providing the ETags for all uploaded parts. 
- -**Request Body:** -```json -{ - "parts": [ - { - "part_number": 1, - "etag": "\"d41d8cd98f00b204e9800998ecf8427e\"" - }, - { - "part_number": 2, - "etag": "\"098f6bcd4621d373cade4e832627b4f6\"" - } - ] -} -``` - -**Response:** -```json -{ - "status": "completed", - "object_key": "originals/avatars/ab/unique-file-id.jpg" -} -``` - -### Multipart Upload Abort -``` -DELETE /v1/uploads/{object_key}/abort/{upload_id} -``` -Aborts a multipart upload and cleans up any uploaded parts. - -**Response:** -```json -{ - "status": "aborted", - "upload_id": "abc123xyz" -} -``` - -### Thumbnails -``` -GET /thumb/{type}/{image_id}?width=512 -POST /thumb/{type}/{image_id} -``` -Generates and serves thumbnails with configurable dimensions. POST requests require authentication. - -**GET Parameters:** -- `type`: Image category (avatar, photo, banner, or any configured type) -- `image_id`: Unique identifier for the image -- `width`: Image width in pixels (optional, defaults to the type's `default_size` from storage config) - -**POST Parameters:** -- Requires authentication (API key) -- Request body should contain the image data -- Used for uploading images to be processed - -### Original Images -``` -GET /originals/{type}/{image_id} -``` -Serves original images directly from storage. - -**Parameters:** -- `type`: Image category (avatar, photo, banner, or any configured type) -- `image_id`: Unique identifier for the image - -### Health Check -``` -GET /health -``` -Returns service health status. 
- -## Configuration - -### Storage Configuration (storage-config.yaml) - -MediaFlow uses YAML configuration to define profiles that combine upload settings and processing rules: - -```yaml -profiles: - avatar: - # Upload configuration - kind: "image" - allowed_mimes: ["image/jpeg", "image/png", "image/webp"] - size_max_bytes: 5242880 # 5MB - multipart_threshold_mb: 15 - part_size_mb: 8 - token_ttl_seconds: 900 # 15 minutes - storage_path: "originals/avatars/{shard?}/{key_base}" - enable_sharding: true - - # Processing configuration - thumb_folder: "thumbnails/avatars" - sizes: ["128", "256"] - default_size: "256" - quality: 90 - convert_to: "webp" - - photo: - kind: "image" - allowed_mimes: ["image/jpeg", "image/png", "image/webp"] - size_max_bytes: 20971520 # 20MB - multipart_threshold_mb: 15 - part_size_mb: 8 - token_ttl_seconds: 900 - storage_path: "originals/photos/{shard?}/{key_base}" - enable_sharding: true - - thumb_folder: "thumbnails/photos" - sizes: ["256", "512", "1024"] - default_size: "256" - quality: 90 - convert_to: "webp" - - video: - kind: "video" - allowed_mimes: ["video/mp4", "video/quicktime", "video/webm"] - size_max_bytes: 104857600 # 100MB - multipart_threshold_mb: 15 - part_size_mb: 8 - token_ttl_seconds: 1800 # 30 minutes - storage_path: "originals/videos/{shard?}/{key_base}" - enable_sharding: true - - thumb_folder: "posters/videos" # Video thumbnails - proxy_folder: "proxies/videos" # Compressed versions - formats: ["mp4", "webm"] - quality: 80 - - default: - kind: "image" - allowed_mimes: ["image/jpeg", "image/png"] - size_max_bytes: 10485760 # 10MB - multipart_threshold_mb: 15 - part_size_mb: 8 - token_ttl_seconds: 900 - storage_path: "originals/{shard?}/{key_base}" - enable_sharding: true - - thumb_folder: "thumbnails" - sizes: ["256", "512"] - default_size: "256" - quality: 90 - convert_to: "webp" -``` - -### Configuration Fields - -#### Upload Configuration -- `kind`: Media type (`image` or `video`) -- `allowed_mimes`: Array of 
allowed MIME types -- `size_max_bytes`: Maximum file size in bytes -- `multipart_threshold_mb`: Size threshold for multipart uploads -- `part_size_mb`: Size of each multipart chunk -- `token_ttl_seconds`: Presigned URL expiration time -- `storage_path`: Template for where files are stored in S3 (supports `{key_base}`, `{ext}`, `{shard}`, `{shard?}`) -- `enable_sharding`: Whether to use sharding for load distribution - -#### Processing Configuration -- `thumb_folder`: Folder for storing thumbnails -- `sizes`: Available thumbnail sizes -- `default_size`: Default thumbnail size if none specified -- `quality`: Image compression quality (1-100) -- `convert_to`: Format to convert images to (`webp`, `jpeg`, etc.) - -#### Storage Path Templates -The `storage_path` field uses a template system to define where files are stored: -- `{key_base}`: The unique file identifier -- `{ext}`: File extension -- `{shard}`: Shard value (only when `enable_sharding: true`) -- `{shard?}`: Optional shard (removed when `enable_sharding: false`) - -**Sharding Modes:** - -**Auto-sharding** (`enable_sharding: true`): -- `"originals/{shard?}/{key_base}"` → `originals/ab/my-file.jpg` -- Shards auto-generated from key_base hash -- Clients can optionally provide custom shard in request - -**Fixed organization** (`enable_sharding: false`): -- `"originals/user123/{key_base}"` → `originals/user123/my-file.jpg` -- `"uploads/{year}/{month}/{key_base}"` → Custom organization -- Any `{shard}` placeholders are removed -- Custom shards in requests are ignored - -Examples: -- `"originals/{key_base}"` → `originals/my-file.jpg` -- `"uploads/{shard?}/{key_base}"` → `uploads/ab/my-file.jpg` (with sharding) -- `"users/team-marketing/{key_base}"` → Fixed custom prefix - -### Environment Variables - -Create a `.env` file for local development: +## Quick start ```bash -# Required -S3_BUCKET=your-bucket-name - -# AWS Credentials (use one of the following methods) -# Method 1: Direct credentials (local development) 
-AWS_ACCESS_KEY_ID=your-access-key -AWS_SECRET_ACCESS_KEY=your-secret-key - -# Method 2: IAM Role (recommended for ECS/EC2) -# No credentials needed - uses IAM role attached to ECS task/EC2 instance - -# Optional -S3_REGION=us-east-1 -PORT=8080 -CACHE_MAX_AGE=86400 -STORAGE_CONFIG_PATH=storage-config.yaml +git clone https://github.com/syntaxsdev/mediaflow +cd mediaflow +cp .env.example .env # set S3_BUCKET + AWS / R2 credentials +go mod download +make run # or `make run-air` for hot reload ``` -## Docker Deployment +Or via Docker: -### Using Pre-built Image ```bash -# Pull and run the latest image docker run -p 8080:8080 \ -e S3_BUCKET=your-bucket-name \ -e AWS_ACCESS_KEY_ID=your-key \ @@ -332,103 +30,33 @@ docker run -p 8080:8080 \ syntaxsdev/mediaflow:latest ``` -### Building from Source -```bash -make build-image -``` +## Documentation -## Production Deployment +- [API reference](docs/api.md) — endpoints, request/response shapes, error codes +- [Configuration](docs/configuration.md) — `storage-config.yaml` profiles, fields, env vars +- [Cloudflare Stream delivery](docs/stream.md) — opt-in flow for video profiles +- [Deployment](docs/deployment.md) — Docker, ECS, IAM, R2, lifecycle rules -### ECS Deployment -1. Create an IAM role with S3 permissions (see AWS S3 IAM Integration section) -2. Create an ECS task definition using `syntaxsdev/mediaflow:latest` -3. Attach the IAM role to your ECS task -4. Set environment variables: `S3_BUCKET`, `S3_REGION`, `PORT` -5. Mount or include your `storage-config.yaml` file -6. Configure your Application Load Balancer to forward requests to the ECS service +## Make targets -### General Deployment Steps -1. Set environment variables for your deployment platform -2. Ensure `storage-config.yaml` is available to the container -3. Configure your load balancer to forward requests to the service -4. Set up appropriate S3 bucket policies and IAM roles -5. 
Consider using a CDN (CloudFront) for better performance and caching +- `make run` — build and run +- `make run-air` — hot reload via [Air](https://github.com/cosmtrek/air) +- `make build` — build the binary +- `make build-image` — build the Docker image +- `make clean` — remove artifacts -#### For EC2 Deployments -1. Attach the same IAM role to your EC2 instance -2. MediaFlow will use the instance's IAM role credentials automatically +## Prerequisites -### AWS S3 IAM Integration - -MediaFlow supports automatic AWS S3 authentication through IAM roles, making it ideal for ECS deployments: - -#### For ECS Deployments (Recommended) -1. Create an IAM role with S3 permissions: -```json -{ - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Action": [ - "s3:GetObject", - "s3:PutObject", - "s3:DeleteObject", - "s3:ListBucket" - ], - "Resource": [ - "arn:aws:s3:::your-bucket-name", - "arn:aws:s3:::your-bucket-name/*" - ] - } - ] -} -``` - -2. Attach the IAM role to your ECS task definition -3. Remove `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` from environment variables -4. MediaFlow will automatically use the ECS task's IAM role credentials - - -## Development - -### Prerequisites - Go 1.24.5+ -- [Air](https://github.com/cosmtrek/air) for hot reloading (optional) - -### Local Development - -```bash -# Clone and setup -git clone https://github.com/syntaxsdev/mediaflow -cd mediaflow -cp .env.example .env # Edit with your AWS credentials - -# Install dependencies -go mod download - -# Development with hot reload -make run-air - -# Or build and run -make run -``` - -### Make Targets - -- `make run` - Build and run the server -- `make run-air` - Run with Air for hot reloading during development -- `make build` - Build the binary -- `make clean` - Remove built binary +- ffmpeg (only if any profile is `kind: video` and `delivery` is *not* `stream` — the runtime image already includes it) ## Contributing -1. Fork the repository +1. Fork 2. 
Create a feature branch -3. Make your changes -4. Add tests if applicable -5. Submit a pull request +3. Make changes (add tests where applicable) +4. Open a PR ## License -MIT License \ No newline at end of file +MIT diff --git a/docs/api.md b/docs/api.md new file mode 100644 index 0000000..8361470 --- /dev/null +++ b/docs/api.md @@ -0,0 +1,211 @@ +# API Reference + +All write endpoints (`POST` / `PUT` / `PATCH` / `DELETE`) require the `API_KEY` env var when set; pass it as `Authorization: Bearer <API_KEY>`. + +## Presigned Uploads + +``` +POST /v1/uploads/presign +``` + +Generates a presigned URL for direct upload — to S3/R2 by default, or directly to Cloudflare Stream when the profile has `delivery: stream`. + +**Request body:** +```json +{ + "key_base": "unique-file-id", + "ext": "jpg", + "mime": "image/jpeg", + "size_bytes": 1024000, + "kind": "image", + "profile": "avatar", + "multipart": "auto" +} +``` + +| Field | Notes | +|---|---| +| `key_base` | Unique identifier. Ignored for `delivery: stream` profiles — Stream assigns the UID. | +| `ext` | File extension (used in `storage_path` template). | +| `mime` | Validated against profile `allowed_mimes`. | +| `size_bytes` | Validated against profile `size_max_bytes`. For Stream, also chooses POST (≤200MB) vs TUS. | +| `kind` | `image` or `video`. Must match the profile's `kind`. | +| `profile` | Name from `storage-config.yaml`. | +| `multipart` | `auto` / `force` / `off`. Ignored for Stream delivery. 
| + +**Response — single PUT (R2):** +```json +{ + "object_key": "originals/avatars/ab/unique-file-id.jpg", + "upload": { + "single": { + "method": "PUT", + "url": "https://presigned-s3-url", + "headers": { "Content-Type": "image/jpeg", "If-None-Match": "*" }, + "expires_at": "2024-01-01T12:00:00Z" + } + } +} +``` + +**Response — multipart (R2):** +```json +{ + "object_key": "originals/avatars/ab/unique-file-id.jpg", + "upload": { + "multipart": { + "upload_id": "abc123xyz", + "part_size": 8388608, + "parts": [ + { + "part_number": 1, + "method": "PUT", + "url": "https://presigned-s3-part-url-1", + "headers": { "Content-Type": "image/jpeg" }, + "expires_at": "2024-01-01T12:00:00Z" + } + ], + "complete": { + "method": "POST", + "url": "https://your-api/v1/uploads/.../complete/abc123xyz", + "headers": { "Content-Type": "application/json" }, + "expires_at": "2024-01-01T12:00:00Z" + }, + "abort": { + "method": "DELETE", + "url": "https://your-api/v1/uploads/.../abort/abc123xyz", + "headers": {}, + "expires_at": "2024-01-01T12:00:00Z" + } + } + } +} +``` + +**Response — Cloudflare Stream:** +```json +{ + "object_key": "8f2a3c4d5e6f7890abcdef1234567890", + "upload": { + "stream": { + "method": "POST", + "url": "https://upload.videodelivery.net/...", + "uid": "8f2a3c4d5e6f7890abcdef1234567890", + "expires_at": "2024-01-01T12:00:00Z" + } + } +} +``` + +For `delivery: stream`, `object_key` is the Stream video UID — use it as the asset's stable handle for probe + delete. `method` is `"POST"` for ≤200MB, `"TUS"` for larger. + +## Multipart Upload Completion + +``` +POST /v1/uploads/{object_key}/complete/{upload_id} +``` + +```json +{ + "parts": [ + { "part_number": 1, "etag": "\"d41d8cd98f00b204e9800998ecf8427e\"" }, + { "part_number": 2, "etag": "\"098f6bcd4621d373cade4e832627b4f6\"" } + ] +} +``` + +Response: `{ "status": "completed", "object_key": "..." 
}` + +## Multipart Upload Abort + +``` +DELETE /v1/uploads/{object_key}/abort/{upload_id} +``` + +Response: `{ "status": "aborted", "upload_id": "..." }` + +## Probe Asset (Video) + +``` +POST /v1/assets/{profile}/{key_base}/probe +``` + +Validates an uploaded video against the profile's constraint fields. Required: profile must be `kind: video`. Returns 200 for both pass and fail — `ok` is the gate, not the status code. + +- **R2 profiles** — mediaflow runs `ffprobe` over a presigned GET URL. +- **Stream profiles** — `key_base` is the Stream UID; mediaflow reads metadata from `GET /accounts/{id}/stream/{uid}`. Returns `202` with `{ok: false, ready: false, state: "queued"|"inprogress"}` if Stream is still encoding. + +**Pass:** +```json +{ + "ok": true, + "ready": true, + "video": { "duration_seconds": 42.18, "width": 1920, "height": 1080, "codec": "h264" }, + "reasons": [] +} +``` + +**Fail:** +```json +{ + "ok": false, + "video": { "duration_seconds": 67.4, "width": 854, "height": 480 }, + "reasons": [ + { "code": "duration_exceeded", "limit": 45, "actual": 67.4 }, + { "code": "width_too_low", "limit": 1280, "actual": 854 } + ] +} +``` + +**Reason codes:** `duration_exceeded`, `width_too_low`, `height_too_low`, `width_too_high`, `height_too_high`, `codec_not_allowed`, `no_video_stream`. + +**Other status codes:** `404` (asset/UID not found), `422` (profile is not `kind: video`), `502` (ffprobe crash or Stream API error). + +## Delete Asset + +``` +DELETE /v1/assets/{profile}/{key_base} +``` + +R2 profiles: deletes the original + every object under `thumb_folder/{key_base}*`. +Stream profiles: `key_base` is the UID; mediaflow calls Stream's `DELETE /stream/{uid}`. + +**R2 response:** +```json +{ "status": "deleted", "profile": "avatar", "key_base": "abc123", "objects_deleted": 4 } +``` + +**Stream response:** +```json +{ "status": "deleted", "profile": "trailer", "uid": "8f2a3c4d5e6f7890..." 
} +``` + +## Thumbnails + +``` +GET /thumb/{type}/{image_id}?width=512 +POST /thumb/{type}/{image_id} +``` + +Generates and serves thumbnails. POST requires `API_KEY` auth. + +**GET parameters:** +- `type`: profile name (avatar, photo, banner, …) +- `image_id`: unique identifier +- `width`: pixels — defaults to the profile's `default_size` + +## Original Images + +``` +GET /originals/{type}/{image_id} +``` + +Serves the original from storage. + +## Health Check + +``` +GET /health +``` + +Returns `OK`. diff --git a/docs/configuration.md b/docs/configuration.md new file mode 100644 index 0000000..5f6051f --- /dev/null +++ b/docs/configuration.md @@ -0,0 +1,134 @@ +# Configuration + +## storage-config.yaml + +Profile-based config combining upload, processing, probe, and delivery rules. Loaded from the path in `STORAGE_CONFIG_PATH` (defaults to `examples/storage-config.yaml`). Can also be loaded from S3 with an `s3://bucket/key` path. + +```yaml +profiles: + avatar: + kind: "image" + allowed_mimes: ["image/jpeg", "image/png", "image/webp"] + size_max_bytes: 5242880 # 5MB + multipart_threshold_mb: 15 + part_size_mb: 8 + token_ttl_seconds: 900 + storage_path: "originals/avatars/{shard?}/{key_base}" + enable_sharding: true + thumb_folder: "thumbnails/avatars" + sizes: ["128", "256"] + default_size: "256" + quality: 90 + convert_to: "webp" + + video: + # R2-delivered video — mediaflow runs ffprobe at attach + kind: "video" + allowed_mimes: ["video/mp4", "video/quicktime", "video/webm"] + size_max_bytes: 104857600 # 100MB + multipart_threshold_mb: 15 + part_size_mb: 8 + token_ttl_seconds: 1800 + storage_path: "originals/videos/{shard?}/{key_base}" + enable_sharding: true + max_duration_seconds: 600 + min_width: 640 + min_height: 360 + allowed_codecs: ["h264", "hevc"] + + trailer: + # Stream-delivered video — bytes go directly to Cloudflare Stream + kind: "video" + delivery: "stream" + allowed_mimes: ["video/mp4", "video/quicktime"] + size_max_bytes: 78643200 # 75MB; Stream 
caps plain POST at 200MB, TUS at 30GB + token_ttl_seconds: 1800 + max_duration_seconds: 45 + min_width: 1280 + min_height: 720 +``` + +## Field reference + +### Upload + +| Field | Notes | +|---|---| +| `kind` | `image` or `video`. | +| `delivery` | `""` / `"r2"` (default — presigned R2 PUT) or `"stream"` (Cloudflare Stream Direct Creator Upload). Stream requires `STREAM_ACCOUNT_ID` + `STREAM_API_TOKEN`. | +| `allowed_mimes` | Whitelist of MIME types accepted at presign. | +| `size_max_bytes` | Hard cap on upload size, enforced at presign. | +| `multipart_threshold_mb` | Files above this trigger multipart (R2 only). | +| `part_size_mb` | Multipart chunk size (R2 only). | +| `token_ttl_seconds` | Presigned URL expiration. | +| `storage_path` | Template for R2 key. Required for R2 profiles, omitted for `delivery: stream`. Supports `{key_base}`, `{ext}`, `{shard}`, `{shard?}`. | +| `enable_sharding` | When true, mediaflow generates a 2-char shard from `key_base` hash. | + +### Image processing + +| Field | Notes | +|---|---| +| `thumb_folder` | Where thumbnails are written. | +| `sizes` | Available widths (e.g. `["128", "256"]`). | +| `default_size` | Width used when client doesn't specify. | +| `quality` | 1–100. | +| `convert_to` | Output format (`webp`, `jpeg`, etc.). | + +### Video probe constraints + +All optional. Unset fields are skipped during probe; only set fields are enforced. + +| Field | Notes | +|---|---| +| `max_duration_seconds` | Reject longer videos. For `delivery: stream`, also passed to Stream's `direct_upload` API as `maxDurationSeconds`. | +| `min_width` / `min_height` | Resolution floors. | +| `max_width` / `max_height` | Resolution ceilings. | +| `allowed_codecs` | Whitelist (e.g. `["h264", "hevc"]`). Stream profiles ignore this — Stream re-encodes to H.264 regardless. | + +## Storage path templates + +`storage_path` is a string template: + +| Placeholder | Meaning | +|---|---| +| `{key_base}` | Unique file identifier from the request. 
| +| `{ext}` | File extension. | +| `{shard}` | Always replaced. Use when `enable_sharding: true`. | +| `{shard?}` | Removed (along with surrounding `/`) when `enable_sharding: false`. | + +**Auto-sharding** (`enable_sharding: true`): +- `"originals/{shard?}/{key_base}"` → `originals/ab/my-file` +- Shards auto-generated from a SHA1 prefix of `key_base`. +- Clients can optionally pass `shard` in the request to override. + +**Fixed organization** (`enable_sharding: false`): +- `"originals/user123/{key_base}"` → `originals/user123/my-file` +- `{shard?}` placeholders are stripped. +- `shard` field in requests is ignored. + +## Environment variables + +```bash +# Required +S3_BUCKET=your-bucket-name + +# AWS / R2 credentials — pick one +AWS_ACCESS_KEY_ID=your-access-key +AWS_SECRET_ACCESS_KEY=your-secret-key +# or use IAM role (ECS/EC2) + +# Optional +S3_REGION=us-east-1 +S3_ENDPOINT= # Override for non-AWS S3 (R2, MinIO, etc.) +PUBLIC_S3_ENDPOINT= # Host used in presigned URLs (defaults to S3_ENDPOINT) +PORT=8080 +CACHE_MAX_AGE=86400 +STORAGE_CONFIG_PATH=storage-config.yaml # Or s3://bucket/path/storage-config.yaml + +# API authentication (write endpoints reject if unset on a write) +API_KEY= + +# Cloudflare Stream — required if any profile uses delivery: stream +STREAM_ACCOUNT_ID= +STREAM_API_TOKEN= # Token with Stream:Edit on the account +``` diff --git a/docs/deployment.md b/docs/deployment.md new file mode 100644 index 0000000..400c913 --- /dev/null +++ b/docs/deployment.md @@ -0,0 +1,88 @@ +# Deployment + +## Docker + +### Pre-built image + +```bash +docker run -p 8080:8080 \ + -e S3_BUCKET=your-bucket-name \ + -e AWS_ACCESS_KEY_ID=your-key \ + -e AWS_SECRET_ACCESS_KEY=your-secret \ + -v $(pwd)/storage-config.yaml:/app/storage-config.yaml \ + syntaxsdev/mediaflow:latest +``` + +### Build from source + +```bash +make build-image +``` + +The runtime image ships with `ffmpeg` (for `ffprobe`) plus `libwebp` and `vips` for image processing. 
ARM64 builds fall back gracefully if `libwebp`/`vips` aren't available. + +## ECS + +1. Create an IAM role with S3 permissions (see below). +2. Create an ECS task definition using `syntaxsdev/mediaflow:latest`. +3. Attach the IAM role to the task. +4. Set env vars: `S3_BUCKET`, `S3_REGION`, `PORT`, plus `STREAM_ACCOUNT_ID` / `STREAM_API_TOKEN` if any profile uses Stream delivery. +5. Mount `storage-config.yaml` into the container, or set `STORAGE_CONFIG_PATH=s3://bucket/path/storage-config.yaml` to load from S3. +6. Configure your ALB to forward to the ECS service. + +### EC2 + +Same as ECS but attach the IAM role directly to the instance. mediaflow will pick up instance role credentials automatically. + +## AWS S3 IAM policy + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "s3:GetObject", + "s3:PutObject", + "s3:DeleteObject", + "s3:ListBucket" + ], + "Resource": [ + "arn:aws:s3:::your-bucket-name", + "arn:aws:s3:::your-bucket-name/*" + ] + } + ] +} +``` + +When using IAM role auth (ECS or EC2), omit `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` from env — the SDK picks up credentials from instance metadata automatically. + +## R2 / non-AWS S3 + +Set `S3_ENDPOINT` to your provider's endpoint: + +```bash +S3_ENDPOINT=https://<account_id>.r2.cloudflarestorage.com +PUBLIC_S3_ENDPOINT=https://media.your-domain.com # what clients see in presigned URLs +S3_REGION=auto +``` + +`PUBLIC_S3_ENDPOINT` is the host stamped into presigned URLs the client receives — set it to your custom CDN domain if you want clients to upload to `media.example.com` rather than the raw R2 endpoint. mediaflow itself uses `S3_ENDPOINT` for internal calls. + +## R2 bucket lifecycle rules + +For R2-delivered profiles, add these in the Cloudflare dashboard (R2 → bucket → Settings → Object lifecycle rules): + +- **`abort-stale-multipart`** — action: *Abort incomplete multipart uploads* after 1 day. 
Reaps abandoned multipart sessions that never called `/complete`. + +For "completed object, never attached" orphans, the right defense is application-side cleanup, not a bucket lifecycle rule (which can't distinguish attached from unattached). Implement a periodic job in your orchestrator that walks unattached records past a TTL and calls `DELETE /v1/assets/...` for each. + +Stream-delivered profiles don't need R2 lifecycle rules — bytes never land in R2. + +## CDN + +For R2-delivered images, put a CDN (CloudFront, Cloudflare) in front of the `/originals/` and `/thumb/` paths. mediaflow sets `Cache-Control` and `ETag` headers appropriately. + +For Stream-delivered videos, Stream's edge handles caching — no CDN setup needed. diff --git a/docs/stream.md b/docs/stream.md new file mode 100644 index 0000000..63e5bf3 --- /dev/null +++ b/docs/stream.md @@ -0,0 +1,76 @@ +# Cloudflare Stream Delivery + +Profiles with `delivery: stream` route uploads directly to Cloudflare Stream. R2 is bypassed entirely for those profiles — no double storage, no transcoding pipeline to operate, native HEVC and MOV support. + +## Why Stream + +For video profiles, the alternatives are: + +| Approach | Trade | +|---|---| +| **R2 + ffprobe + DIY transcode** | Full control, but you operate ffmpeg, store proxies, and handle codec rescue (HEVC, MOV) yourself. | +| **R2 + Cloudflare Media Transformations** | JIT transformation at the edge, but inputs must be H.264 mp4. Anything else needs server-side rescue. | +| **Cloudflare Stream** (this) | One-time transcode at ingest, free encoding, native support for any codec/container clients produce. ~$5/1000 min stored, ~$1/1000 min delivered. | + +For short-form creator content (trailers, clips), Stream is the lowest-engineering path that handles iPhone HEVC/MOV uploads without any DIY transcoding. + +## Flow + +``` +client → orchestrator (e.g. barta-api) + └─ POST /v1/uploads/presign { profile: "trailer", ... 
} + mediaflow → Stream: POST /accounts/{id}/stream/direct_upload + { maxDurationSeconds: <profile max_duration_seconds> } + mediaflow ← Stream: { uploadURL, uid } + ← { object_key: <uid>, upload: { stream: { url, uid, method, expires_at } } } + +client → Stream: direct upload to uploadURL (POST ≤200MB, TUS otherwise) + +orchestrator → mediaflow: POST /v1/assets/trailer/{uid}/probe + mediaflow → Stream: GET /stream/{uid} (read duration/dimensions) + returns 202 if still encoding, 200 with {ok, reasons[]} once ready + +orchestrator → mediaflow: DELETE /v1/assets/trailer/{uid} (if probe failed or asset removed) + mediaflow → Stream: DELETE /stream/{uid} +``` + +## Caps & gates + +| Constraint | Where it's enforced | +|---|---| +| `size_max_bytes` | mediaflow at presign time, before calling Stream. | +| `max_duration_seconds` | Passed to Stream as `maxDurationSeconds`; Stream rejects longer uploads at ingest. | +| `min_width` / `min_height` (and others) | Validated post-ingest by the probe endpoint — dimensions aren't known until Stream finishes encoding. | + +`allowed_codecs` is **ignored** for Stream profiles. Stream re-encodes everything to H.264 for delivery, so input codec is irrelevant. + +## Probe states + +The probe endpoint returns: + +- **`200` with `ok: true, ready: true`** — video is ready and meets all profile constraints. +- **`200` with `ok: false, ready: true, reasons: [...]`** — video is ready but violates a constraint. Caller should `DELETE` the asset and surface a 422 to the client. +- **`202` with `ok: false, ready: false, state: "queued"|"inprogress"`** — Stream is still encoding. Caller should retry, or use a [Stream webhook](https://developers.cloudflare.com/stream/manage-video-library/using-webhooks/) to get notified. +- **`404`** — UID not found in Stream. +- **`502`** — Stream API error. + +## Serving + +mediaflow does not serve Stream-delivered videos. 
Use Stream's URLs directly from your frontend: + +| URL | Purpose | +|---|---| +| `https://customer-{code}.cloudflarestream.com/{uid}/iframe` | Stream's bundled player (iframe embed). | +| `https://customer-{code}.cloudflarestream.com/{uid}/manifest/video.m3u8` | HLS manifest — feed to your own player (hls.js, AVPlayer, ExoPlayer). | +| `https://customer-{code}.cloudflarestream.com/{uid}/thumbnails/thumbnail.jpg?time=2s` | Auto-generated poster at any timestamp. | + +Custom domains for VOD playback are not supported by Stream. Use your own player against the HLS manifest if branding the URL bar matters; end-users don't typically see CDN hostnames in playback flows. + +## Required environment + +```bash +STREAM_ACCOUNT_ID= # Cloudflare account ID +STREAM_API_TOKEN= # API token with Stream:Edit permission +``` + +mediaflow only reads these when a profile sets `delivery: stream`. Other profiles work without them. diff --git a/examples/storage-config.yaml b/examples/storage-config.yaml index 61ace4b..bc0e530 100644 --- a/examples/storage-config.yaml +++ b/examples/storage-config.yaml @@ -71,20 +71,13 @@ profiles: trailer: kind: "video" + delivery: "stream" # bytes go to Cloudflare Stream, not R2 allowed_mimes: ["video/mp4", "video/quicktime"] - size_max_bytes: 78643200 # 75MB - multipart_threshold_mb: 15 - part_size_mb: 8 + size_max_bytes: 78643200 # 75MB; Stream caps plain POST at 200MB and TUS at 30GB token_ttl_seconds: 1800 - storage_path: "originals/trailers/{key_base}" - enable_sharding: false - proxy_folder: "proxies/trailers" - formats: ["mp4", "webm"] - quality: 80 - max_duration_seconds: 45 + max_duration_seconds: 45 # also passed to Stream as upload-time cap min_width: 1280 min_height: 720 - allowed_codecs: ["h264", "hevc"] default: # Upload configuration diff --git a/internal/config/config.go b/internal/config/config.go index 7703a46..0d605ea 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -21,6 +21,9 @@ type Config struct { 
CacheMaxAge string // API authentication APIKey string + // Cloudflare Stream (only required for profiles with delivery: stream) + StreamAccountID string + StreamAPIToken string } func Load() *Config { @@ -43,6 +46,9 @@ func Load() *Config { CacheMaxAge: getEnv("CACHE_MAX_AGE", "86400"), // API authentication APIKey: getEnv("API_KEY", ""), + // Cloudflare Stream + StreamAccountID: getEnv("STREAM_ACCOUNT_ID", ""), + StreamAPIToken: getEnv("STREAM_API_TOKEN", ""), } } @@ -57,6 +63,9 @@ type Profile struct { TokenTTLSeconds int64 `yaml:"token_ttl_seconds"` StoragePath string `yaml:"storage_path"` EnableSharding bool `yaml:"enable_sharding"` + // Delivery selects where uploads land. "" or "r2" → presigned R2 PUT (default). + // "stream" → Cloudflare Stream Direct Creator Upload; storage_path is ignored. + Delivery string `yaml:"delivery,omitempty"` // Processing configuration (shared) ThumbFolder string `yaml:"thumb_folder,omitempty"` @@ -125,9 +134,14 @@ func LoadStorageConfig(s3 *s3.Client, config *Config) (*StorageConfig, error) { return &storageConfig, nil } -// validateStorageConfig ensures all profiles have required fields +// validateStorageConfig ensures all profiles have required fields. +// storage_path is only required for R2-backed profiles; stream-delivered +// profiles store bytes in Cloudflare Stream and have no R2 key. 
func validateStorageConfig(config *StorageConfig) error { for profileName, profile := range config.Profiles { + if profile.Delivery == "stream" { + continue + } if profile.StoragePath == "" { return fmt.Errorf("profile '%s' is missing required 'storage_path' field", profileName) } diff --git a/internal/stream/client.go b/internal/stream/client.go new file mode 100644 index 0000000..28608ad --- /dev/null +++ b/internal/stream/client.go @@ -0,0 +1,191 @@ +// Package stream wraps the small slice of the Cloudflare Stream API that +// mediaflow needs: provisioning Direct Creator Upload URLs, fetching +// post-ingest metadata, and deleting videos. +package stream + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +const apiBase = "https://api.cloudflare.com/client/v4" + +type Client struct { + accountID string + apiToken string + http *http.Client +} + +func NewClient(accountID, apiToken string) *Client { + return &Client{ + accountID: accountID, + apiToken: apiToken, + http: &http.Client{Timeout: 30 * time.Second}, + } +} + +// Configured reports whether the credentials needed for any Stream call are present. +func (c *Client) Configured() bool { + return c != nil && c.accountID != "" && c.apiToken != "" +} + +type DirectUploadRequest struct { + MaxDurationSeconds int `json:"maxDurationSeconds"` + Expiry string `json:"expiry,omitempty"` + Meta map[string]string `json:"meta,omitempty"` + RequireSignedURLs bool `json:"requireSignedURLs,omitempty"` + AllowedOrigins []string `json:"allowedOrigins,omitempty"` +} + +type DirectUploadResult struct { + UploadURL string + UID string +} + +// CreateDirectUpload provisions a one-time upload URL plus a stable video UID. +// The URL accepts plain POST for ≤200MB or TUS for larger files. 
+func (c *Client) CreateDirectUpload(ctx context.Context, req DirectUploadRequest) (*DirectUploadResult, error) { + if !c.Configured() { + return nil, fmt.Errorf("stream client not configured (missing STREAM_ACCOUNT_ID or STREAM_API_TOKEN)") + } + + body, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal direct_upload request: %w", err) + } + + url := fmt.Sprintf("%s/accounts/%s/stream/direct_upload", apiBase, c.accountID) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + if err != nil { + return nil, err + } + httpReq.Header.Set("Authorization", "Bearer "+c.apiToken) + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := c.http.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("stream direct_upload: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + raw, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("stream direct_upload status %d: %s", resp.StatusCode, string(raw)) + } + + var parsed struct { + Result struct { + UploadURL string `json:"uploadURL"` + UID string `json:"uid"` + } `json:"result"` + Success bool `json:"success"` + } + if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil { + return nil, fmt.Errorf("decode direct_upload response: %w", err) + } + if !parsed.Success || parsed.Result.UploadURL == "" || parsed.Result.UID == "" { + return nil, fmt.Errorf("stream direct_upload returned empty result") + } + return &DirectUploadResult{ + UploadURL: parsed.Result.UploadURL, + UID: parsed.Result.UID, + }, nil +} + +// VideoDetails is the slice of GET /stream/{uid} we need for post-upload validation. 
+type VideoDetails struct { + UID string + ReadyToStream bool + StatusState string + DurationSec float64 + Width int + Height int + InputCodec string +} + +func (c *Client) GetVideo(ctx context.Context, uid string) (*VideoDetails, error) { + if !c.Configured() { + return nil, fmt.Errorf("stream client not configured") + } + + url := fmt.Sprintf("%s/accounts/%s/stream/%s", apiBase, c.accountID, uid) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + httpReq.Header.Set("Authorization", "Bearer "+c.apiToken) + + resp, err := c.http.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("stream get video: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + return nil, ErrVideoNotFound + } + if resp.StatusCode >= 400 { + raw, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("stream get video status %d: %s", resp.StatusCode, string(raw)) + } + + var parsed struct { + Result struct { + UID string `json:"uid"` + ReadyToStream bool `json:"readyToStream"` + Duration float64 `json:"duration"` + Status struct { + State string `json:"state"` + } `json:"status"` + Input struct { + Width int `json:"width"` + Height int `json:"height"` + } `json:"input"` + Meta map[string]string `json:"meta"` + } `json:"result"` + } + if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil { + return nil, fmt.Errorf("decode get video response: %w", err) + } + + return &VideoDetails{ + UID: parsed.Result.UID, + ReadyToStream: parsed.Result.ReadyToStream, + StatusState: parsed.Result.Status.State, + DurationSec: parsed.Result.Duration, + Width: parsed.Result.Input.Width, + Height: parsed.Result.Input.Height, + }, nil +} + +func (c *Client) DeleteVideo(ctx context.Context, uid string) error { + if !c.Configured() { + return fmt.Errorf("stream client not configured") + } + + url := fmt.Sprintf("%s/accounts/%s/stream/%s", apiBase, c.accountID, uid) + httpReq, err := 
http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) + if err != nil { + return err + } + httpReq.Header.Set("Authorization", "Bearer "+c.apiToken) + + resp, err := c.http.Do(httpReq) + if err != nil { + return fmt.Errorf("stream delete video: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 && resp.StatusCode != http.StatusNotFound { + raw, _ := io.ReadAll(resp.Body) + return fmt.Errorf("stream delete video status %d: %s", resp.StatusCode, string(raw)) + } + return nil +} + +var ErrVideoNotFound = fmt.Errorf("stream video not found") diff --git a/internal/upload/handlers.go b/internal/upload/handlers.go index 620271a..a6aa5b2 100644 --- a/internal/upload/handlers.go +++ b/internal/upload/handlers.go @@ -3,6 +3,7 @@ package upload import ( "context" "encoding/json" + "errors" "fmt" "net/http" "strings" @@ -10,6 +11,7 @@ import ( "mediaflow/internal/config" "mediaflow/internal/probe" + "mediaflow/internal/stream" ) type Handler struct { @@ -222,6 +224,27 @@ func (h *Handler) HandleDeleteAsset(w http.ResponseWriter, r *http.Request) { return } + if profile.Delivery == "stream" { + sc := h.uploadService.StreamClient() + if !sc.Configured() { + h.writeError(w, http.StatusInternalServerError, ErrBadRequest, "Stream not configured", "") + return + } + if err := sc.DeleteVideo(r.Context(), keyBase); err != nil { + fmt.Printf("Stream delete error uid=%s: %v\n", keyBase, err) + h.writeError(w, http.StatusBadGateway, ErrUpstream, "Stream delete failed", err.Error()) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]any{ + "status": "deleted", + "profile": profileName, + "uid": keyBase, + }) + return + } + // Delete the original + thumbnails deleted, err := h.uploadService.DeleteAsset(h.ctx, profile, keyBase) if err != nil { @@ -265,6 +288,11 @@ func (h *Handler) HandleProbeAsset(w http.ResponseWriter, r *http.Request) { return } + if profile.Delivery == 
"stream" { + h.handleProbeStream(w, r, profile, keyBase) + return + } + objectKey := h.uploadService.ResolveAssetKey(profile, keyBase) if err := h.uploadService.AssetExists(r.Context(), objectKey); err != nil { @@ -291,6 +319,79 @@ func (h *Handler) HandleProbeAsset(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(result) } +// handleProbeStream validates a Stream-delivered video by reading metadata +// from the Stream API. key_base is the Stream UID. +func (h *Handler) handleProbeStream(w http.ResponseWriter, r *http.Request, profile *config.Profile, uid string) { + sc := h.uploadService.StreamClient() + if !sc.Configured() { + h.writeError(w, http.StatusInternalServerError, ErrBadRequest, "Stream not configured", "") + return + } + + details, err := sc.GetVideo(r.Context(), uid) + if err != nil { + if errors.Is(err, stream.ErrVideoNotFound) { + h.writeError(w, http.StatusNotFound, ErrBadRequest, "Stream video not found", uid) + return + } + fmt.Printf("Stream probe error uid=%s: %v\n", uid, err) + h.writeError(w, http.StatusBadGateway, ErrUpstream, "Stream API error", err.Error()) + return + } + + if !details.ReadyToStream { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) + _ = json.NewEncoder(w).Encode(map[string]any{ + "ok": false, + "ready": false, + "state": details.StatusState, + "reason": "video still processing", + }) + return + } + + reasons := []probe.Reason{} + if profile.MaxDurationSeconds > 0 && details.DurationSec > float64(profile.MaxDurationSeconds) { + reasons = append(reasons, probe.Reason{ + Code: "duration_exceeded", Limit: profile.MaxDurationSeconds, Actual: details.DurationSec, + }) + } + if profile.MinWidth > 0 && details.Width > 0 && details.Width < profile.MinWidth { + reasons = append(reasons, probe.Reason{ + Code: "width_too_low", Limit: profile.MinWidth, Actual: details.Width, + }) + } + if profile.MinHeight > 0 && details.Height > 0 && details.Height < profile.MinHeight { 
+ reasons = append(reasons, probe.Reason{ + Code: "height_too_low", Limit: profile.MinHeight, Actual: details.Height, + }) + } + if profile.MaxWidth > 0 && details.Width > profile.MaxWidth { + reasons = append(reasons, probe.Reason{ + Code: "width_too_high", Limit: profile.MaxWidth, Actual: details.Width, + }) + } + if profile.MaxHeight > 0 && details.Height > profile.MaxHeight { + reasons = append(reasons, probe.Reason{ + Code: "height_too_high", Limit: profile.MaxHeight, Actual: details.Height, + }) + } + + resp := map[string]any{ + "ok": len(reasons) == 0, + "ready": true, + "state": details.StatusState, + "uid": details.UID, + "video": map[string]any{"duration_seconds": details.DurationSec, "width": details.Width, "height": details.Height}, + "reasons": reasons, + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(resp) +} + // parseAssetPath extracts {profile} and {key_base} from /v1/assets/{profile}/{key_base}{suffix}. func parseAssetPath(urlPath, suffix string) (profile, keyBase string, ok bool) { path := strings.TrimPrefix(urlPath, "/v1/assets/") diff --git a/internal/upload/service.go b/internal/upload/service.go index a0ceb78..f55795b 100644 --- a/internal/upload/service.go +++ b/internal/upload/service.go @@ -10,20 +10,28 @@ import ( "mediaflow/internal/config" "mediaflow/internal/s3" + "mediaflow/internal/stream" ) type Service struct { - s3Client S3Client - config *config.Config + s3Client S3Client + streamClient *stream.Client + config *config.Config } func NewService(s3Client S3Client, config *config.Config) *Service { return &Service{ - s3Client: s3Client, - config: config, + s3Client: s3Client, + streamClient: stream.NewClient(config.StreamAccountID, config.StreamAPIToken), + config: config, } } +// StreamClient exposes the Stream API wrapper to handlers (probe, delete). 
+func (s *Service) StreamClient() *stream.Client { + return s.streamClient +} + // PresignUpload generates presigned URLs for upload based on the request func (s *Service) PresignUpload(ctx context.Context, req *PresignRequest, profile *config.Profile, baseURL string) (*PresignResponse, error) { // Validate MIME type @@ -36,6 +44,10 @@ func (s *Service) PresignUpload(ctx context.Context, req *PresignRequest, profil return nil, fmt.Errorf("file size exceeds maximum: %d > %d", req.SizeBytes, profile.SizeMaxBytes) } + if profile.Delivery == "stream" { + return s.presignStream(ctx, req, profile) + } + // Generate shard only if auto-sharding is enabled shard := "" if profile.EnableSharding { @@ -81,11 +93,11 @@ func (s *Service) isMimeAllowed(mime string, allowedMimes []string) bool { func (s *Service) buildObjectKey(template, keyBase, ext, shard string) string { objectKey := template - + // Replace placeholders in template objectKey = strings.ReplaceAll(objectKey, "{key_base}", keyBase) objectKey = strings.ReplaceAll(objectKey, "{ext}", ext) - + // Handle optional shard if shard != "" { objectKey = strings.ReplaceAll(objectKey, "{shard?}", shard) @@ -96,13 +108,13 @@ func (s *Service) buildObjectKey(template, keyBase, ext, shard string) string { objectKey = strings.ReplaceAll(objectKey, "{shard?}/", "") objectKey = strings.ReplaceAll(objectKey, "{shard?}", "") } - + return objectKey } func (s *Service) determineStrategy(multipart string, sizeBytes int64, thresholdMB int64) string { thresholdBytes := thresholdMB * 1024 * 1024 - + switch multipart { case "force": return "multipart" @@ -122,16 +134,16 @@ func (s *Service) buildRequiredHeaders(mime string) map[string]string { headers := map[string]string{ "Content-Type": mime, } - + // Note: Server-side encryption disabled for MinIO compatibility // In production, configure proper SSE based on your storage backend - + return headers } func (s *Service) createUploadDetails(ctx context.Context, strategy, objectKey string, 
headers map[string]string, expiresAt time.Time, partSizeMB int64, totalSizeBytes int64, baseURL string) (*UploadDetails, error) { expires := time.Until(expiresAt) - + if strategy == "single" { // Add If-None-Match header for overwrite prevention singleHeaders := make(map[string]string) @@ -139,12 +151,12 @@ func (s *Service) createUploadDetails(ctx context.Context, strategy, objectKey s singleHeaders[k] = v } singleHeaders["If-None-Match"] = "*" - + url, err := s.s3Client.PresignPutObject(ctx, objectKey, expires, singleHeaders) if err != nil { return nil, err } - + return &UploadDetails{ Single: &SingleUpload{ Method: "PUT", @@ -154,23 +166,23 @@ func (s *Service) createUploadDetails(ctx context.Context, strategy, objectKey s }, }, nil } - + // For multipart uploads, create the multipart upload and generate part URLs uploadID, err := s.s3Client.CreateMultipartUpload(ctx, objectKey, headers) if err != nil { return nil, fmt.Errorf("failed to create multipart upload: %w", err) } - + // Calculate number of parts needed partSizeBytes := partSizeMB * 1024 * 1024 numParts := int(math.Ceil(float64(totalSizeBytes) / float64(partSizeBytes))) - + // Generate presigned URLs for each part (limit to reasonable number) maxParts := 100 // Reasonable limit for batch presigning if numParts > maxParts { numParts = maxParts } - + parts := make([]PartUpload, numParts) for i := 0; i < numParts; i++ { partNumber := i + 1 @@ -178,7 +190,7 @@ func (s *Service) createUploadDetails(ctx context.Context, strategy, objectKey s if err != nil { return nil, fmt.Errorf("failed to presign part %d: %w", partNumber, err) } - + parts[i] = PartUpload{ PartNumber: partNumber, Method: "PUT", @@ -187,15 +199,15 @@ func (s *Service) createUploadDetails(ctx context.Context, strategy, objectKey s ExpiresAt: expiresAt, } } - + // Generate server-side URLs for complete and abort operations if baseURL == "" { baseURL = "http://localhost:8080" // Default fallback } - + completeURL := 
fmt.Sprintf("%s/v1/uploads/%s/complete/%s", baseURL, objectKey, uploadID) abortURL := fmt.Sprintf("%s/v1/uploads/%s/abort/%s", baseURL, objectKey, uploadID) - + return &UploadDetails{ Multipart: &MultipartUpload{ UploadID: uploadID, @@ -227,7 +239,7 @@ func (s *Service) CompleteMultipartUpload(ctx context.Context, objectKey, upload ETag: part.ETag, } } - + return s.s3Client.CompleteMultipartUpload(ctx, objectKey, uploadID, parts) } @@ -236,6 +248,49 @@ func (s *Service) AbortMultipartUpload(ctx context.Context, objectKey, uploadID return s.s3Client.AbortMultipartUpload(ctx, objectKey, uploadID) } +// presignStream provisions a Cloudflare Stream Direct Creator Upload. +func (s *Service) presignStream(ctx context.Context, req *PresignRequest, profile *config.Profile) (*PresignResponse, error) { + if !s.streamClient.Configured() { + return nil, fmt.Errorf("stream delivery requested but STREAM_ACCOUNT_ID/STREAM_API_TOKEN not set") + } + + maxDur := profile.MaxDurationSeconds + if maxDur <= 0 { + // Stream requires a positive maxDurationSeconds for direct uploads; + // fall back to a generous cap so misconfigured profiles still work. 
+ maxDur = 600 + } + + result, err := s.streamClient.CreateDirectUpload(ctx, stream.DirectUploadRequest{ + MaxDurationSeconds: maxDur, + Meta: map[string]string{ + "key_base": req.KeyBase, + "profile": req.Profile, + }, + }) + if err != nil { + return nil, err + } + + expiresAt := time.Now().Add(time.Duration(profile.TokenTTLSeconds) * time.Second) + method := "POST" + if req.SizeBytes > 200*1024*1024 { + method = "TUS" + } + + return &PresignResponse{ + ObjectKey: result.UID, + Upload: &UploadDetails{ + Stream: &StreamUpload{ + Method: method, + URL: result.UploadURL, + UID: result.UID, + ExpiresAt: expiresAt, + }, + }, + }, nil +} + func (s *Service) ResolveAssetKey(profile *config.Profile, keyBase string) string { shard := "" if profile.EnableSharding { @@ -287,4 +342,4 @@ func (s *Service) DeleteAsset(ctx context.Context, profile *config.Profile, keyB func GenerateShard(keyBase string) string { hash := sha1.Sum([]byte(keyBase)) return fmt.Sprintf("%02x", hash[:1]) // First 2 hex characters -} \ No newline at end of file +} diff --git a/internal/upload/types.go b/internal/upload/types.go index d8a0ac0..5ae48a6 100644 --- a/internal/upload/types.go +++ b/internal/upload/types.go @@ -24,6 +24,16 @@ type PresignResponse struct { type UploadDetails struct { Single *SingleUpload `json:"single,omitempty"` Multipart *MultipartUpload `json:"multipart,omitempty"` + Stream *StreamUpload `json:"stream,omitempty"` +} + +// StreamUpload is a one-time direct-creator-upload URL for Cloudflare Stream. +// Client uses TUS for files >200MB, plain POST otherwise. +type StreamUpload struct { + Method string `json:"method"` + URL string `json:"url"` + UID string `json:"uid"` + ExpiresAt time.Time `json:"expires_at"` } // SingleUpload contains details for single PUT upload