diff --git a/docs/docs/api/adapters/Adapter.md b/docs/docs/api/adapters/Adapter.md index 7fa2fd960e..00cc4151b1 100644 --- a/docs/docs/api/adapters/Adapter.md +++ b/docs/docs/api/adapters/Adapter.md @@ -13,6 +13,7 @@ - format_demos - format_field_description - format_field_structure + - format_system_message - format_task_description - format_user_message_content - parse diff --git a/docs/docs/api/adapters/ChatAdapter.md b/docs/docs/api/adapters/ChatAdapter.md index fa5e6a4d25..2bd2b06b55 100644 --- a/docs/docs/api/adapters/ChatAdapter.md +++ b/docs/docs/api/adapters/ChatAdapter.md @@ -15,6 +15,7 @@ - format_field_structure - format_field_with_value - format_finetune_data + - format_system_message - format_task_description - format_user_message_content - parse diff --git a/docs/docs/api/adapters/JSONAdapter.md b/docs/docs/api/adapters/JSONAdapter.md index 718264b6e4..65c2741074 100644 --- a/docs/docs/api/adapters/JSONAdapter.md +++ b/docs/docs/api/adapters/JSONAdapter.md @@ -15,6 +15,7 @@ - format_field_structure - format_field_with_value - format_finetune_data + - format_system_message - format_task_description - format_user_message_content - parse diff --git a/docs/docs/api/adapters/TwoStepAdapter.md b/docs/docs/api/adapters/TwoStepAdapter.md index ec14b47115..3594c4f55b 100644 --- a/docs/docs/api/adapters/TwoStepAdapter.md +++ b/docs/docs/api/adapters/TwoStepAdapter.md @@ -13,6 +13,7 @@ - format_demos - format_field_description - format_field_structure + - format_system_message - format_task_description - format_user_message_content - parse diff --git a/docs/docs/api/evaluation/EvaluationResult.md b/docs/docs/api/evaluation/EvaluationResult.md index f0b4ff714a..3483670e25 100644 --- a/docs/docs/api/evaluation/EvaluationResult.md +++ b/docs/docs/api/evaluation/EvaluationResult.md @@ -1,6 +1,5 @@ # dspy.evaluate.EvaluationResult - ::: dspy.evaluate.EvaluationResult handler: python diff --git a/docs/docs/api/experimental/Citations.md 
b/docs/docs/api/experimental/Citations.md index 663372d29e..9650eee1ed 100644 --- a/docs/docs/api/experimental/Citations.md +++ b/docs/docs/api/experimental/Citations.md @@ -5,6 +5,7 @@ handler: python options: members: + - adapt_to_native_lm_feature - description - extract_custom_type_from_annotation - format diff --git a/docs/docs/api/experimental/Document.md b/docs/docs/api/experimental/Document.md index 40c8f9c383..885dd3d098 100644 --- a/docs/docs/api/experimental/Document.md +++ b/docs/docs/api/experimental/Document.md @@ -5,6 +5,7 @@ handler: python options: members: + - adapt_to_native_lm_feature - description - extract_custom_type_from_annotation - format diff --git a/docs/docs/api/primitives/Audio.md b/docs/docs/api/primitives/Audio.md index 21e0cdf3ba..6c766e8dcb 100644 --- a/docs/docs/api/primitives/Audio.md +++ b/docs/docs/api/primitives/Audio.md @@ -5,6 +5,7 @@ handler: python options: members: + - adapt_to_native_lm_feature - description - extract_custom_type_from_annotation - format diff --git a/docs/docs/api/primitives/Code.md b/docs/docs/api/primitives/Code.md index 5c07c67872..3b2ec341ee 100644 --- a/docs/docs/api/primitives/Code.md +++ b/docs/docs/api/primitives/Code.md @@ -5,6 +5,7 @@ handler: python options: members: + - adapt_to_native_lm_feature - description - extract_custom_type_from_annotation - format diff --git a/docs/docs/api/primitives/Image.md b/docs/docs/api/primitives/Image.md index abc161c08d..07a8eab875 100644 --- a/docs/docs/api/primitives/Image.md +++ b/docs/docs/api/primitives/Image.md @@ -5,6 +5,7 @@ handler: python options: members: + - adapt_to_native_lm_feature - description - extract_custom_type_from_annotation - format diff --git a/docs/docs/api/primitives/Tool.md b/docs/docs/api/primitives/Tool.md index 6e32d40c71..22ba090a04 100644 --- a/docs/docs/api/primitives/Tool.md +++ b/docs/docs/api/primitives/Tool.md @@ -7,6 +7,7 @@ members: - __call__ - acall + - adapt_to_native_lm_feature - description - 
extract_custom_type_from_annotation - format diff --git a/docs/docs/api/primitives/ToolCalls.md b/docs/docs/api/primitives/ToolCalls.md index 492de64ff2..7f4c31db05 100644 --- a/docs/docs/api/primitives/ToolCalls.md +++ b/docs/docs/api/primitives/ToolCalls.md @@ -5,6 +5,7 @@ handler: python options: members: + - adapt_to_native_lm_feature - description - extract_custom_type_from_annotation - format diff --git a/docs/docs/api/primitives/Video.md b/docs/docs/api/primitives/Video.md new file mode 100644 index 0000000000..3f51713c54 --- /dev/null +++ b/docs/docs/api/primitives/Video.md @@ -0,0 +1,204 @@ +# dspy.Video + +The `dspy.Video` type enables video understanding capabilities in DSPy, with native support for Google's Gemini models via LiteLLM. + +## Overview + +`dspy.Video` supports multiple input sources: + +- **Local files**: Automatically encoded to base64 (for files under 20MB) +- **Remote URLs**: HTTP(S) video URLs passed directly to the model +- **YouTube URLs**: Native Gemini support for YouTube videos +- **Pre-uploaded files**: Reference videos uploaded via Gemini's Files API +- **Raw bytes**: Video data with specified MIME type + +## Supported Formats + +MP4, MPEG, MOV, AVI, FLV, MPG, WebM, WMV, and 3GPP. 
+ +## Basic Usage + +### From a Local File + +```python +import dspy + +# Simple construction +video = dspy.Video("./my_video.mp4") + +# Or using the factory method +video = dspy.Video.from_path("./my_video.mp4") +``` + +### From a URL + +```python +# Remote video URL +video = dspy.Video("https://example.com/video.mp4") + +# Or explicitly +video = dspy.Video.from_url("https://example.com/video.mp4") +``` + +### From YouTube + +```python +# YouTube videos are natively supported by Gemini +video = dspy.Video.from_youtube("https://www.youtube.com/watch?v=dQw4w9WgXcQ") + +# Or just pass the URL directly +video = dspy.Video("https://www.youtube.com/watch?v=dQw4w9WgXcQ") +``` + +### From a Pre-uploaded File ID + +For videos larger than 20MB, upload them first using Gemini's Files API: + +```python +# Reference an already-uploaded file +video = dspy.Video.from_file_id("files/abc123", mime_type="video/mp4") +``` + +## Using Video in Signatures + +### Basic Video Question-Answering + +```python +import dspy + +class VideoQA(dspy.Signature): + """Answer questions about a video.""" + video: dspy.Video = dspy.InputField(desc="The video to analyze") + question: str = dspy.InputField(desc="Question about the video") + answer: str = dspy.OutputField(desc="Answer based on the video content") + +# Configure with a Gemini model +lm = dspy.LM("gemini/gemini-2.0-flash") +dspy.configure(lm=lm) + +# Use the signature +qa = dspy.Predict(VideoQA) +result = qa( + video=dspy.Video("./clip.mp4"), + question="What is happening in this video?" 
+) +print(result.answer) +``` + +### Video Summarization + +```python +class VideoSummary(dspy.Signature): + """Generate a summary of the video content.""" + video: dspy.Video = dspy.InputField() + summary: str = dspy.OutputField(desc="A concise summary of what happens in the video") + +summarize = dspy.ChainOfThought(VideoSummary) +result = summarize(video=dspy.Video("https://example.com/presentation.mp4")) +``` + +### Multiple Videos + +```python +class VideoComparison(dspy.Signature): + """Compare multiple videos.""" + videos: list[dspy.Video] = dspy.InputField(desc="Videos to compare") + comparison: str = dspy.OutputField(desc="Comparison of the videos") + +compare = dspy.Predict(VideoComparison) +result = compare(videos=[ + dspy.Video("./video1.mp4"), + dspy.Video("./video2.mp4"), +]) +``` + +## Handling Large Videos (>20MB) + +For videos exceeding the 20MB inline limit, use the Gemini Files API: + +```python +import os + +# Set your API key +os.environ["GEMINI_API_KEY"] = "your-api-key" + +# Create video from large file (will raise error if used directly) +video = dspy.Video.from_path("./large_video.mp4") + +# Upload to Gemini Files API +uploaded_video = video.upload() + +# Now use the uploaded video (references file_id) +result = qa(video=uploaded_video, question="What happens at the end?") +``` + +You can also pass the API key directly: + +```python +uploaded_video = video.upload(api_key="your-api-key") +``` + +## Combining with Other Media Types + +```python +class MultiModalAnalysis(dspy.Signature): + """Analyze video and image together.""" + video: dspy.Video = dspy.InputField() + thumbnail: dspy.Image = dspy.InputField() + analysis: str = dspy.OutputField() + +analyze = dspy.Predict(MultiModalAnalysis) +result = analyze( + video=dspy.Video("./clip.mp4"), + thumbnail=dspy.Image("./thumbnail.jpg") +) +``` + +## Model Compatibility + +Video understanding is currently best supported by **Google Gemini** models: + +- `gemini/gemini-2.0-flash` +- 
`gemini/gemini-2.0-pro` +- `gemini/gemini-1.5-flash` +- `gemini/gemini-1.5-pro` + +Other models may have limited or no video support. + +## Technical Details + +- **Inline size limit**: 20MB (larger files require Files API upload) +- **Frame sampling**: Gemini samples at 1 FPS by default +- **Token usage**: ~300 tokens per second of video at default resolution +- **Video duration**: Up to 2 hours at default resolution, 6 hours at low resolution +- **YouTube limits**: Free tier allows 8 hours/day; paid tiers unlimited + + +::: dspy.Video + handler: python + options: + members: + - adapt_to_native_lm_feature + - description + - extract_custom_type_from_annotation + - format + - from_bytes + - from_file_id + - from_path + - from_url + - from_youtube + - is_streamable + - parse_lm_response + - parse_stream_chunk + - serialize_model + - upload + show_source: true + show_root_heading: true + heading_level: 2 + docstring_style: google + show_root_full_path: true + show_object_full_path: false + separate_signature: false + inherited_members: true +::: + diff --git a/docs/docs/api/tools/Embeddings.md b/docs/docs/api/tools/Embeddings.md index 36b5553e71..1b8a7bdf2e 100644 --- a/docs/docs/api/tools/Embeddings.md +++ b/docs/docs/api/tools/Embeddings.md @@ -7,6 +7,9 @@ members: - __call__ - forward + - from_saved + - load + - save show_source: true show_root_heading: true heading_level: 2 diff --git a/docs/docs/api/utils/StreamListener.md b/docs/docs/api/utils/StreamListener.md index 517c398fdc..66c2db8a14 100644 --- a/docs/docs/api/utils/StreamListener.md +++ b/docs/docs/api/utils/StreamListener.md @@ -5,6 +5,7 @@ handler: python options: members: + - finalize - flush - receive show_source: true diff --git a/docs/docs/learn/programming/signatures.md b/docs/docs/learn/programming/signatures.md index ce3717b282..8384901d28 100644 --- a/docs/docs/learn/programming/signatures.md +++ b/docs/docs/learn/programming/signatures.md @@ -175,6 +175,33 @@ Prediction( ) ``` +### Example F: 
Video understanding with Gemini + +```python +class VideoQA(dspy.Signature): + """Answer questions about a video.""" + video: dspy.Video = dspy.InputField(desc="The video to analyze") + question: str = dspy.InputField(desc="Question about the video") + answer: str = dspy.OutputField(desc="Answer based on the video content") + +# Configure with a Gemini model (video understanding requires Gemini) +lm = dspy.LM("gemini/gemini-2.0-flash") +dspy.configure(lm=lm) + +qa = dspy.Predict(VideoQA) +qa(video=dspy.Video("./my_video.mp4"), question="What is happening in this video?") +``` + +**Possible Output:** + +```text +Prediction( + answer='The video shows a person walking through a park on a sunny day.' +) +``` + +Note: `dspy.Video` supports local files, URLs, YouTube links, and pre-uploaded file IDs. For videos larger than 20MB, use the `upload()` method to upload via Gemini's Files API first. + ## Type Resolution in Signatures DSPy signatures support various annotation types: @@ -183,7 +210,7 @@ DSPy signatures support various annotation types: 2. **Typing module types** like `list[str]`, `dict[str, int]`, `Optional[float]`. `Union[str, int]` 3. **Custom types** defined in your code 4. **Dot notation** for nested types with proper configuration -5. **Special data types** like `dspy.Image, dspy.History` +5. **Special data types** like `dspy.Image`, `dspy.Video`, `dspy.Audio`, `dspy.History` ### Working with Custom Types diff --git a/docs/docs/tutorials/index.md b/docs/docs/tutorials/index.md index a467e1cb48..fdd7eed048 100644 --- a/docs/docs/tutorials/index.md +++ b/docs/docs/tutorials/index.md @@ -25,6 +25,7 @@ Welcome to DSPy tutorials! 
We've organized our tutorials into three main categor - [Program Of Thought](program_of_thought/index.ipynb) - [Image Generation Prompt iteration](image_generation_prompting/index.ipynb) - [Audio](audio/index.ipynb) + - [Video Understanding with Gemini](video/index.md) - Optimize AI Programs with DSPy diff --git a/docs/docs/tutorials/video/index.md b/docs/docs/tutorials/video/index.md new file mode 100644 index 0000000000..636b25f445 --- /dev/null +++ b/docs/docs/tutorials/video/index.md @@ -0,0 +1,346 @@ +# Video Understanding with Gemini + +This tutorial demonstrates how to use `dspy.Video` to build video understanding applications with Google's Gemini models. Video understanding enables powerful use cases like video Q&A, content summarization, scene detection, and more. + +## Prerequisites + +- A Google AI Studio API key (get one at [aistudio.google.com](https://aistudio.google.com)) +- DSPy installed with LiteLLM support + +```bash +pip install dspy-ai +``` + +## Setup + +First, configure DSPy with a Gemini model: + +```python +import dspy +import os + +os.environ["GEMINI_API_KEY"] = "your-api-key-here" + +# Configure with Gemini 2.0 Flash (recommended for video) +lm = dspy.LM("gemini/gemini-2.0-flash") +dspy.configure(lm=lm) +``` + +## Basic Video Question-Answering + +The simplest use case is asking questions about a video: + +```python +class VideoQA(dspy.Signature): + """Answer questions about a video.""" + video: dspy.Video = dspy.InputField(desc="The video to analyze") + question: str = dspy.InputField(desc="Question about the video") + answer: str = dspy.OutputField(desc="Answer based on the video content") + +qa = dspy.Predict(VideoQA) + +# From a local file +result = qa( + video=dspy.Video("./my_video.mp4"), + question="What is happening in this video?" 
+) +print(result.answer) +``` + +## Different Video Sources + +`dspy.Video` supports multiple input sources: + +### Local Files + +```python +# Direct path +video = dspy.Video("./path/to/video.mp4") + +# Using factory method +video = dspy.Video.from_path("./path/to/video.mp4") +``` + +### Remote URLs + +```python +# HTTP/HTTPS URLs +video = dspy.Video("https://example.com/video.mp4") + +# Using factory method +video = dspy.Video.from_url("https://example.com/video.mp4") +``` + +### YouTube Videos + +Gemini natively supports YouTube URLs - no download required: + +```python +# YouTube watch URLs +video = dspy.Video("https://www.youtube.com/watch?v=dQw4w9WgXcQ") + +# Short URLs +video = dspy.Video("https://youtu.be/dQw4w9WgXcQ") + +# Using factory method +video = dspy.Video.from_youtube("https://www.youtube.com/watch?v=dQw4w9WgXcQ") +``` + +### Pre-uploaded Files (Gemini Files API) + +For videos you've already uploaded: + +```python +video = dspy.Video.from_file_id("files/abc123", mime_type="video/mp4") +``` + +## Video Summarization + +Create summaries of video content: + +```python +class VideoSummary(dspy.Signature): + """Generate a detailed summary of video content.""" + video: dspy.Video = dspy.InputField(desc="The video to summarize") + summary: str = dspy.OutputField(desc="Comprehensive summary of the video") + +summarize = dspy.ChainOfThought(VideoSummary) +result = summarize(video=dspy.Video("./presentation.mp4")) +print(result.summary) +``` + +## Timestamp-Based Analysis + +Ask about specific moments in a video: + +```python +class TimestampAnalysis(dspy.Signature): + """Analyze specific moments in a video.""" + video: dspy.Video = dspy.InputField() + question: str = dspy.InputField() + timestamp: str = dspy.OutputField(desc="Timestamp in MM:SS format") + description: str = dspy.OutputField(desc="What happens at this moment") + +analyze = dspy.Predict(TimestampAnalysis) +result = analyze( + video=dspy.Video("./tutorial.mp4"), + question="When does the 
speaker introduce the main topic?" +) +print(f"At {result.timestamp}: {result.description}") +``` + +## Working with Multiple Videos + +Compare or analyze multiple videos together: + +```python +class VideoComparison(dspy.Signature): + """Compare multiple videos.""" + videos: list[dspy.Video] = dspy.InputField(desc="Videos to compare") + aspect: str = dspy.InputField(desc="What aspect to compare") + comparison: str = dspy.OutputField(desc="Detailed comparison") + +compare = dspy.Predict(VideoComparison) +result = compare( + videos=[ + dspy.Video("./video1.mp4"), + dspy.Video("./video2.mp4"), + ], + aspect="presentation style and clarity" +) +print(result.comparison) +``` + +## Combining Video with Other Inputs + +### Video + Text Context + +```python +class VideoWithContext(dspy.Signature): + """Analyze video with additional context.""" + video: dspy.Video = dspy.InputField() + context: str = dspy.InputField(desc="Background information") + question: str = dspy.InputField() + answer: str = dspy.OutputField() + +analyze = dspy.Predict(VideoWithContext) +result = analyze( + video=dspy.Video("./product_demo.mp4"), + context="This is a demo of our new software product targeting enterprise customers.", + question="What features are highlighted and how well are they explained?" +) +``` + +### Video + Image (Thumbnail Analysis) + +```python +class VideoWithThumbnail(dspy.Signature): + """Analyze video and its thumbnail.""" + video: dspy.Video = dspy.InputField() + thumbnail: dspy.Image = dspy.InputField() + question: str = dspy.InputField() + answer: str = dspy.OutputField() + +analyze = dspy.Predict(VideoWithThumbnail) +result = analyze( + video=dspy.Video("./video.mp4"), + thumbnail=dspy.Image("./thumbnail.jpg"), + question="Does the thumbnail accurately represent the video content?" 
+) +``` + +## Handling Large Videos (>20MB) + +For videos larger than 20MB, you need to upload them to Gemini's Files API first: + +```python +# This will raise an error for large files +# video = dspy.Video("./large_video.mp4") + +# Instead, upload first +video = dspy.Video.from_path("./large_video.mp4") +uploaded_video = video.upload() # Uploads to Gemini Files API + +# Now use the uploaded video +result = qa(video=uploaded_video, question="What happens in this video?") +``` + +You can also pass the API key explicitly: + +```python +uploaded_video = video.upload(api_key="your-api-key") +``` + +## Building a Video Content Moderator + +Here's a practical example of a content moderation system: + +```python +from typing import Literal + +class ContentModeration(dspy.Signature): + """Analyze video for content policy violations.""" + video: dspy.Video = dspy.InputField(desc="Video to moderate") + + is_safe: bool = dspy.OutputField(desc="Whether the content is safe") + category: Literal["safe", "violence", "adult", "hate_speech", "other"] = dspy.OutputField() + confidence: Literal["high", "medium", "low"] = dspy.OutputField() + explanation: str = dspy.OutputField(desc="Brief explanation of the decision") + +moderate = dspy.ChainOfThought(ContentModeration) + +def check_video(video_path: str) -> dict: + result = moderate(video=dspy.Video(video_path)) + return { + "is_safe": result.is_safe, + "category": result.category, + "confidence": result.confidence, + "explanation": result.explanation + } + +# Usage +moderation_result = check_video("./user_upload.mp4") +if not moderation_result["is_safe"]: + print(f"Content flagged: {moderation_result['category']}") + print(f"Reason: {moderation_result['explanation']}") +``` + +## Building a Video Search System + +Create a system that finds relevant moments in videos: + +```python +import json + +class VideoSearch(dspy.Signature): + """Find moments in a video matching a query.""" + video: dspy.Video = dspy.InputField() + query: str 
= dspy.InputField(desc="What to search for in the video") + + found: bool = dspy.OutputField(desc="Whether matching content was found") + moments: str = dspy.OutputField(desc="JSON list of {timestamp, description} objects") + +search = dspy.Predict(VideoSearch) + +def find_moments(video_path: str, query: str) -> list[dict]: + result = search(video=dspy.Video(video_path), query=query) + if result.found: + return json.loads(result.moments) + return [] + +# Usage +moments = find_moments("./lecture.mp4", "when the professor explains machine learning") +for moment in moments: + print(f"{moment['timestamp']}: {moment['description']}") +``` + +## Optimizing Video Programs with DSPy + +You can optimize your video programs just like any other DSPy program: + +```python +from dspy.teleprompt import BootstrapFewShot + +# Define your signature +class VideoClassification(dspy.Signature): + """Classify video content type.""" + video: dspy.Video = dspy.InputField() + category: Literal["tutorial", "entertainment", "news", "sports", "other"] = dspy.OutputField() + +# Create training examples +trainset = [ + dspy.Example( + video=dspy.Video("./examples/tutorial1.mp4"), + category="tutorial" + ).with_inputs("video"), + dspy.Example( + video=dspy.Video("./examples/news1.mp4"), + category="news" + ).with_inputs("video"), + # ... more examples +] + +# Create and optimize +classifier = dspy.Predict(VideoClassification) +optimizer = BootstrapFewShot(metric=lambda example, pred, trace: example.category == pred.category) +optimized_classifier = optimizer.compile(classifier, trainset=trainset) +``` + +## Best Practices + +1. **Choose the right video source**: Use YouTube URLs when possible (no upload needed). Use local files for private content. + +2. **Keep videos concise**: Gemini processes videos at 1 FPS. Longer videos use more tokens (~300 tokens/second). + +3. 
**Be specific in questions**: Instead of "What's in this video?", ask "What product features are demonstrated in the first 30 seconds?" + +4. **Use ChainOfThought for complex analysis**: For nuanced tasks, `dspy.ChainOfThought` helps the model reason through the video content. + +5. **Handle large files appropriately**: Always use `upload()` for videos over 20MB to avoid errors. + +6. **Consider token limits**: A 2-minute video uses ~36,000 tokens. Plan your context budget accordingly. + +## Supported Models + +Video understanding works best with these Gemini models: + +| Model | Video Support | Notes | +|-------|--------------|-------| +| `gemini/gemini-2.0-flash` | Full | Recommended for most use cases | +| `gemini/gemini-2.0-pro` | Full | Better quality, higher latency | +| `gemini/gemini-1.5-flash` | Full | Good balance of speed/quality | +| `gemini/gemini-1.5-pro` | Full | Highest quality | + +## Limitations + +- **Inline size limit**: 20MB (use Files API for larger videos) +- **Maximum duration**: 2 hours at default resolution +- **Frame sampling**: 1 FPS (fast action may lose detail) +- **YouTube free tier**: 8 hours of video per day +- **Provider support**: Currently Gemini-only + +## Next Steps + +- Check out the [API Reference](/api/primitives/Video) for complete method documentation +- Learn about [Signatures](/learn/programming/signatures) for more complex use cases +- Explore [Optimizers](/learn/optimization/optimizers) to improve your video programs diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 74e6ebce08..9f30416700 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -43,6 +43,7 @@ nav: - Program Of Thought: tutorials/program_of_thought/index.ipynb - Image Generation Prompt iteration: tutorials/image_generation_prompting/index.ipynb - Audio: tutorials/audio/index.ipynb + - Video Understanding with Gemini: tutorials/video/index.md - Optimize AI Programs with DSPy: - Overview: tutorials/optimize_ai_program/index.md - Math Reasoning: 
tutorials/math/index.ipynb @@ -144,6 +145,7 @@ nav: - Prediction: api/primitives/Prediction.md - Tool: api/primitives/Tool.md - ToolCalls: api/primitives/ToolCalls.md + - Video: api/primitives/Video.md - Signatures: - InputField: api/signatures/InputField.md - OutputField: api/signatures/OutputField.md diff --git a/docs/scripts/generate_api_docs.py b/docs/scripts/generate_api_docs.py index 3dcf351a98..e46191ac67 100644 --- a/docs/scripts/generate_api_docs.py +++ b/docs/scripts/generate_api_docs.py @@ -20,6 +20,7 @@ dspy.Prediction, dspy.Tool, dspy.ToolCalls, + dspy.Video, ], "signatures": [ dspy.Signature, diff --git a/dspy/__init__.py b/dspy/__init__.py index 3de46932b8..57959481c7 100644 --- a/dspy/__init__.py +++ b/dspy/__init__.py @@ -6,7 +6,7 @@ from dspy.evaluate import Evaluate # isort: skip from dspy.clients import * # isort: skip -from dspy.adapters import Adapter, ChatAdapter, JSONAdapter, XMLAdapter, TwoStepAdapter, Image, Audio, File, History, Type, Tool, ToolCalls, Code, Reasoning # isort: skip +from dspy.adapters import Adapter, ChatAdapter, JSONAdapter, XMLAdapter, TwoStepAdapter, Image, Audio, File, History, Type, Tool, ToolCalls, Code, Reasoning, Video # isort: skip from dspy.utils.logging_utils import configure_dspy_loggers, disable_logging, enable_logging from dspy.utils.asyncify import asyncify from dspy.utils.syncify import syncify diff --git a/dspy/adapters/__init__.py b/dspy/adapters/__init__.py index c217d7260e..f40938b604 100644 --- a/dspy/adapters/__init__.py +++ b/dspy/adapters/__init__.py @@ -2,7 +2,7 @@ from dspy.adapters.chat_adapter import ChatAdapter from dspy.adapters.json_adapter import JSONAdapter from dspy.adapters.two_step_adapter import TwoStepAdapter -from dspy.adapters.types import Audio, Code, File, History, Image, Reasoning, Tool, ToolCalls, Type +from dspy.adapters.types import Audio, Code, File, History, Image, Reasoning, Tool, ToolCalls, Type, Video from dspy.adapters.xml_adapter import XMLAdapter __all__ = [ @@ -20,4 
+20,5 @@ "Tool", "ToolCalls", "Reasoning", + "Video", ] diff --git a/dspy/adapters/types/__init__.py b/dspy/adapters/types/__init__.py index 5ec8043021..25048e626f 100644 --- a/dspy/adapters/types/__init__.py +++ b/dspy/adapters/types/__init__.py @@ -6,5 +6,6 @@ from dspy.adapters.types.image import Image from dspy.adapters.types.reasoning import Reasoning from dspy.adapters.types.tool import Tool, ToolCalls +from dspy.adapters.types.video import Video -__all__ = ["History", "Image", "Audio", "File", "Type", "Tool", "ToolCalls", "Code", "Reasoning"] +__all__ = ["History", "Image", "Audio", "File", "Type", "Tool", "ToolCalls", "Code", "Reasoning", "Video"] diff --git a/dspy/adapters/types/video.py b/dspy/adapters/types/video.py new file mode 100644 index 0000000000..f62b9d9316 --- /dev/null +++ b/dspy/adapters/types/video.py @@ -0,0 +1,613 @@ +import base64 +import mimetypes +import os +from typing import Any, Union +from urllib.parse import urlparse + +import pydantic + +from dspy.adapters.types.base_type import Type + +# Supported video MIME types for Gemini +VIDEO_MIME_TYPES = { + "mp4": "video/mp4", + "mpeg": "video/mpeg", + "mov": "video/quicktime", + "avi": "video/x-msvideo", + "flv": "video/x-flv", + "mpg": "video/mpeg", + "webm": "video/webm", + "wmv": "video/x-ms-wmv", + "3gp": "video/3gpp", + "3gpp": "video/3gpp", +} + +# Maximum size for inline base64 encoding (20MB) +MAX_INLINE_SIZE_BYTES = 20 * 1024 * 1024 + + +def is_youtube_url(url: str) -> bool: + """Check if a URL is a YouTube video URL.""" + if not isinstance(url, str): + return False + parsed = urlparse(url) + youtube_domains = {"youtube.com", "www.youtube.com", "youtu.be", "m.youtube.com"} + return parsed.netloc in youtube_domains + + +def is_gcs_url(url: str) -> bool: + """Check if a URL is a Google Cloud Storage URL (gs://).""" + if not isinstance(url, str): + return False + return url.startswith("gs://") + + +def is_video_url(url: str) -> bool: + """Check if a string is a valid video URL 
(HTTP, HTTPS, or GCS).""" + try: + # Check for GCS URLs first + if is_gcs_url(url): + return True + result = urlparse(url) + return all([result.scheme in ("http", "https"), result.netloc]) + except ValueError: + return False + + +def get_video_mime_type(file_path_or_url: str) -> str | None: + """Get MIME type for a video file from its extension.""" + ext = os.path.splitext(urlparse(file_path_or_url).path)[1].lstrip(".").lower() + if ext in VIDEO_MIME_TYPES: + return VIDEO_MIME_TYPES[ext] + mime_type, _ = mimetypes.guess_type(file_path_or_url) + return mime_type + + +class Video(Type): + """A video input type for DSPy, with support for Gemini video understanding. + + The Video type supports multiple input sources: + - Local file paths (encoded as base64 for files <20MB) + - Remote HTTP(S) URLs + - YouTube URLs (native Gemini support) + - Google Cloud Storage URIs (gs://) + - Raw bytes + - Pre-uploaded file IDs from Gemini Files API + + For videos larger than 20MB, use the `upload()` method to upload to Gemini's + Files API first, then use the returned file_id. 
+ + Example: + ```python + import dspy + + # From local file + video = dspy.Video("./my_video.mp4") + + # From URL + video = dspy.Video("https://example.com/video.mp4") + + # From YouTube + video = dspy.Video("https://www.youtube.com/watch?v=VIDEO_ID") + + # From Google Cloud Storage + video = dspy.Video("gs://my-bucket/videos/sample.mp4") + + # From pre-uploaded file ID + video = dspy.Video(file_id="files/abc123", mime_type="video/mp4") + + # Upload large video first + video = dspy.Video("./large_video.mp4") + uploaded_video = video.upload() # Returns new Video with file_id + + # Use in a signature + class VideoQA(dspy.Signature): + video: dspy.Video = dspy.InputField() + question: str = dspy.InputField() + answer: str = dspy.OutputField() + + qa = dspy.Predict(VideoQA) + result = qa(video=dspy.Video("./clip.mp4"), question="What happens in this video?") + ``` + + Note: + Video understanding is currently best supported by Gemini models. + This uses LiteLLM's beta Files API support for Gemini. + """ + + url: str | None = None + file_id: str | None = None + filename: str | None = None + mime_type: str | None = None + + model_config = pydantic.ConfigDict( + frozen=True, + str_strip_whitespace=True, + validate_assignment=True, + extra="forbid", + ) + + def __init__(self, source: Any = None, **data): + """Create a Video. + + Parameters + ---------- + source: + The video source. Supported values include: + + - ``str``: Local file path, HTTP(S) URL, YouTube URL, ``gs://`` URI, or data URI + - ``dict`` with video fields (url, file_id, filename, mime_type), or an existing ``Video`` whose fields are copied + + Any additional keyword arguments are passed to :class:`pydantic.BaseModel`.
+ """ + if source is not None and not any(k in data for k in ("url", "file_id", "filename", "mime_type")): + # Process positional argument + if isinstance(source, Video): + data["url"] = source.url + data["file_id"] = source.file_id + data["filename"] = source.filename + data["mime_type"] = source.mime_type + elif isinstance(source, str): + # Encode string input (path, URL, data URI, YouTube) + encoded = encode_video_to_dict(source) + data.update(encoded) + elif isinstance(source, dict): + data.update(source) + else: + raise ValueError(f"Unsupported video source type: {type(source)}") + + super().__init__(**data) + + def format(self) -> list[dict[str, Any]]: + """Format the video for LLM API consumption. + + Returns OpenAI-compatible content blocks that LiteLLM translates for Gemini. + """ + try: + if self.file_id: + # Pre-uploaded file via Gemini Files API + file_dict = {"file_id": self.file_id} + if self.filename: + file_dict["filename"] = self.filename + if self.mime_type: + file_dict["format"] = self.mime_type + return [{"type": "file", "file": file_dict}] + + if self.url: + if is_youtube_url(self.url): + # YouTube URLs - pass through for Gemini native handling + # Must include format so LiteLLM uses file_uri instead of trying to download + return [{"type": "file", "file": {"file_data": self.url, "filename": self.filename or "youtube_video", "format": "video/mp4"}}] + + if is_gcs_url(self.url): + # GCS URLs (gs://) - Gemini natively supports these + # LiteLLM extracts MIME type from extension, but we pass it if available + file_dict = {"file_data": self.url} + if self.filename: + file_dict["filename"] = self.filename + if self.mime_type: + file_dict["format"] = self.mime_type + else: + # Try to detect MIME type from the GCS path extension + detected_mime = get_video_mime_type(self.url) + if detected_mime: + file_dict["format"] = detected_mime + return [{"type": "file", "file": file_dict}] + + if self.url.startswith("data:"): + # Already a data URI + file_dict = 
def __repr__(self):
    """Return a compact, log-friendly description of the video.

    Only fields that are set are listed. A data URI is summarised as
    ``<DATA_URI(mime, N chars)>`` rather than echoed, because the base64
    payload can be megabytes long; YouTube and GCS URLs get a short tag so
    the kind of source is visible at a glance.

    Returns:
        A string of the form ``Video(field=value, ...)``.
    """
    parts = []
    if self.url is not None:
        if self.url.startswith("data:"):
            # "data:<mime>[;base64],<payload>" -> keep the MIME type and the
            # payload length only, never the payload itself.
            header, _, payload = self.url.partition(",")
            mime = header[len("data:"):].split(";", 1)[0] or "unknown"
            len_data = len(payload) if payload else len(self.url)
            parts.append(f"url=<DATA_URI({mime}, {len_data} chars)>")
        elif is_youtube_url(self.url):
            parts.append(f"url='{self.url}' (YouTube)")
        elif is_gcs_url(self.url):
            parts.append(f"url='{self.url}' (GCS)")
        else:
            parts.append(f"url='{self.url}'")
    if self.file_id is not None:
        parts.append(f"file_id='{self.file_id}'")
    if self.filename is not None:
        parts.append(f"filename='{self.filename}'")
    if self.mime_type is not None:
        parts.append(f"mime_type='{self.mime_type}'")
    return f"Video({', '.join(parts)})"
@classmethod
def from_bytes(
    cls,
    video_bytes: bytes,
    mime_type: str,
    filename: str | None = None,
) -> "Video":
    """Build a Video that carries ``video_bytes`` inline as a base64 data URI.

    Args:
        video_bytes: Raw video content.
        mime_type: MIME type to embed in the data URI (e.g. 'video/mp4').
        filename: Optional filename to attach to the instance.

    Returns:
        Video instance whose ``url`` is ``data:<mime_type>;base64,<payload>``.

    Raises:
        ValueError: If the payload is larger than ``MAX_INLINE_SIZE_BYTES``.
    """
    size_in_bytes = len(video_bytes)
    if size_in_bytes > MAX_INLINE_SIZE_BYTES:
        actual_mb = size_in_bytes / 1024 / 1024
        limit_mb = MAX_INLINE_SIZE_BYTES / 1024 / 1024
        raise ValueError(
            f"Video data ({actual_mb:.1f}MB) exceeds "
            f"{limit_mb:.0f}MB inline limit. "
            f"Upload via Gemini Files API first."
        )

    payload = base64.b64encode(video_bytes).decode("utf-8")
    data_uri = f"data:{mime_type};base64,{payload}"
    return cls(url=data_uri, filename=filename, mime_type=mime_type)
+ + Args: + gcs_uri: GCS URI in format gs://bucket-name/path/to/video.mp4 + filename: Optional filename (defaults to basename of path) + mime_type: Optional MIME type (auto-detected from URI extension if not provided) + + Returns: + Video instance referencing the GCS URI + + Note: + The GCS bucket must be accessible to the Gemini API. For Vertex AI, + this typically means the bucket should be in the same project or + have appropriate IAM permissions. + + Example: + ```python + video = dspy.Video.from_gcs("gs://my-bucket/videos/sample.mp4") + result = qa(video=video, question="What is in this video?") + ``` + """ + if not is_gcs_url(gcs_uri): + raise ValueError(f"Not a valid GCS URI (must start with gs://): {gcs_uri}") + + if mime_type is None: + mime_type = get_video_mime_type(gcs_uri) + + if filename is None: + # Extract filename from GCS path + filename = os.path.basename(gcs_uri) + + return cls(url=gcs_uri, filename=filename, mime_type=mime_type) + + @classmethod + def upload_from_path(cls, file_path: str, api_key: str | None = None, mime_type: str | None = None) -> "Video": + """Upload a video file directly to Gemini Files API. + + Use this method for videos of any size, especially those larger than 20MB + that cannot be inlined. This bypasses the inline size limit entirely. + + Args: + file_path: Path to the video file to upload + api_key: Optional Gemini API key. If not provided, uses GEMINI_API_KEY + environment variable. 
def upload(self, api_key: str | None = None) -> "Video":
    """Upload video to Gemini Files API and return a new Video with file_id.

    The bytes to upload are taken from, in order of preference: a data URI
    held in ``url``, a local file whose path is held in ``url``, or a
    non-YouTube remote URL that is downloaded first. The upload goes through
    LiteLLM's ``create_file`` with the ``gemini`` provider.

    For large files (>20MB), use `Video.upload_from_path()` instead.

    Args:
        api_key: Optional Gemini API key. If not provided, uses GEMINI_API_KEY
            environment variable.

    Returns:
        New Video instance with file_id set; ``filename`` and ``mime_type``
        are copied from this instance.

    Raises:
        ValueError: If no API key is available, the data URI is malformed,
            or the video has no uploadable data.
        ImportError: If litellm doesn't support create_file

    Example:
        ```python
        video = dspy.Video("./small_video.mp4")  # Must be <20MB
        uploaded = video.upload()
        # Use uploaded.file_id in your requests
        ```
    """
    try:
        from litellm import create_file
    except ImportError as err:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "litellm.create_file is required for video uploads. "
            "Please update litellm: pip install --upgrade litellm"
        ) from err

    if api_key is None:
        api_key = os.getenv("GEMINI_API_KEY")
    if api_key is None:
        raise ValueError(
            "No API key provided. Set GEMINI_API_KEY environment variable "
            "or pass api_key parameter."
        )

    # MIME type recovered from a data URI header; used only as a fallback for
    # the upload content type when the instance has no explicit mime_type.
    inferred_mime_type = None

    # Get video data to upload
    if self.url and self.url.startswith("data:"):
        # Extract bytes from data URI: "data:<mime>[;base64],<payload>"
        header, separator, b64data = self.url.partition(",")
        if not separator:
            raise ValueError("Cannot upload: malformed data URI (missing ',' separator).")
        video_data = base64.b64decode(b64data)
        inferred_mime_type = header[len("data:"):].split(";", 1)[0] or None
    elif self.url and os.path.isfile(self.url):
        # Read from file path
        with open(self.url, "rb") as f:
            video_data = f.read()
    elif self.url and is_video_url(self.url) and not is_youtube_url(self.url):
        # Download from URL. Without a timeout requests waits indefinitely on
        # an unresponsive server; the value bounds connect/read stalls, not
        # the total transfer time.
        import requests

        download_timeout_seconds = 60
        response = requests.get(self.url, timeout=download_timeout_seconds)
        response.raise_for_status()
        video_data = response.content
    else:
        raise ValueError(
            "Cannot upload: Video must have a data URI, local file path, or remote URL. "
            "YouTube URLs and file_ids don't need uploading."
        )

    # Upload to Gemini - pass file as tuple (filename, data, content_type) to include MIME type
    upload_filename = self.filename or "video"
    upload_mime_type = self.mime_type or inferred_mime_type or "video/mp4"
    response = create_file(
        file=(upload_filename, video_data, upload_mime_type),
        purpose="user_data",
        custom_llm_provider="gemini",
        api_key=api_key,
    )

    return Video(
        file_id=response.id,
        filename=self.filename,
        mime_type=self.mime_type,
    )
@pytest.fixture
def sample_video_file():
    """Yield the path of a throwaway ``.mp4`` file and delete it afterwards.

    The content is not a playable video: a fake MP4 header followed by zero
    padding, which is enough for tests that only exercise file handling.
    """
    fake_mp4_header = b"\x00\x00\x00\x1c\x66\x74\x79\x70\x69\x73\x6f\x6d"
    zero_padding = b"\x00" * 100

    handle = tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".mp4")
    with handle:
        handle.write(fake_mp4_header)
        handle.write(zero_padding)
        video_path = handle.name

    yield video_path

    # Best-effort cleanup: a failure to remove the file must not fail the test.
    try:
        os.unlink(video_path)
    except Exception:
        pass
class TestIsYoutubeUrl:
    """Accepted and rejected inputs for ``is_youtube_url``."""

    @pytest.mark.parametrize(
        "url",
        [
            "https://www.youtube.com/watch?v=dQw4w9WgXcQ",  # standard watch URL
            "https://youtu.be/dQw4w9WgXcQ",  # short link
            "https://m.youtube.com/watch?v=dQw4w9WgXcQ",  # mobile host
            "https://youtube.com/watch?v=dQw4w9WgXcQ",  # no "www."
        ],
    )
    def test_youtube_urls_are_accepted(self, url):
        assert is_youtube_url(url)

    @pytest.mark.parametrize(
        "candidate",
        [
            "https://vimeo.com/123456",  # different video site
            "not a url",  # plain text
            "",  # empty string
            123,  # non-string input
        ],
    )
    def test_other_inputs_are_rejected(self, candidate):
        assert not is_youtube_url(candidate)
class TestGetVideoMimeType:
    """Extension-to-MIME-type lookups performed by ``get_video_mime_type``."""

    # Input path or URL -> MIME type the helper is expected to report.
    KNOWN_CASES = {
        "video.mp4": "video/mp4",
        "video.webm": "video/webm",
        "video.mov": "video/quicktime",
        "video.avi": "video/x-msvideo",
        "video.3gp": "video/3gpp",
        "video.mpeg": "video/mpeg",
        "video.mpg": "video/mpeg",
        "https://example.com/video.mp4": "video/mp4",
    }

    @pytest.mark.parametrize("path,expected", list(KNOWN_CASES.items()))
    def test_known_extensions(self, path, expected):
        detected = get_video_mime_type(path)
        assert detected == expected

    def test_unknown_extension(self):
        # An unrecognised extension may resolve to nothing or to whatever
        # string the lookup produces; either outcome is accepted here.
        detected = get_video_mime_type("video.xyz")
        assert detected is None or isinstance(detected, str)
class TestVideoFromBytes:
    """Construction of ``dspy.Video`` from raw bytes."""

    def test_from_bytes_basic(self):
        payload = bytes(100)  # 100 zero bytes
        clip = dspy.Video.from_bytes(payload, mime_type="video/mp4")
        assert clip.url is not None
        assert clip.url.startswith("data:video/mp4;base64,")
        assert clip.mime_type == "video/mp4"

    def test_from_bytes_with_filename(self):
        payload = bytes(100)
        clip = dspy.Video.from_bytes(payload, mime_type="video/mp4", filename="test.mp4")
        assert clip.filename == "test.mp4"

    def test_from_bytes_without_mime_type_fails(self):
        # Bare bytes carry no MIME type, so encode_video_to_dict rejects them.
        payload = bytes(100)
        with pytest.raises(ValueError, match="Cannot create Video from bytes"):
            encode_video_to_dict(payload)
class TestVideoFromFileId:
    """Construction of ``dspy.Video`` from a pre-uploaded file ID."""

    FILE_ID = "files/abc123"

    def test_from_file_id_basic(self):
        uploaded = dspy.Video.from_file_id(self.FILE_ID)
        assert uploaded.file_id == "files/abc123"
        assert uploaded.url is None

    def test_from_file_id_with_filename(self):
        uploaded = dspy.Video.from_file_id(self.FILE_ID, filename="video.mp4")
        assert uploaded.file_id == "files/abc123"
        assert uploaded.filename == "video.mp4"

    def test_from_file_id_with_mime_type(self):
        uploaded = dspy.Video.from_file_id(self.FILE_ID, mime_type="video/mp4")
        assert uploaded.file_id == "files/abc123"
        assert uploaded.mime_type == "video/mp4"
class TestVideoFormat:
    """Content blocks produced by ``Video.format()`` for each kind of source."""

    def test_format_with_file_id(self):
        clip = dspy.Video(file_id="files/abc123", filename="video.mp4", mime_type="video/mp4")
        blocks = clip.format()
        assert isinstance(blocks, list)
        assert len(blocks) == 1
        block = blocks[0]
        assert block["type"] == "file"
        payload = block["file"]
        assert payload["file_id"] == "files/abc123"
        assert payload["filename"] == "video.mp4"
        assert payload["format"] == "video/mp4"

    def test_format_with_data_uri(self):
        data_uri = "data:video/mp4;base64,AAAA"
        block = dspy.Video(url=data_uri, filename="test.mp4").format()[0]
        assert block["type"] == "file"
        assert block["file"]["file_data"] == data_uri
        assert block["file"]["filename"] == "test.mp4"

    def test_format_with_remote_url(self):
        remote = "https://example.com/video.mp4"
        block = dspy.Video(url=remote, mime_type="video/mp4").format()[0]
        assert block["type"] == "file"
        assert block["file"]["file_data"] == "https://example.com/video.mp4"
        assert block["file"]["format"] == "video/mp4"

    def test_format_with_youtube_url(self):
        watch_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
        block = dspy.Video.from_youtube(watch_url).format()[0]
        assert block["type"] == "file"
        assert block["file"]["file_data"] == "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
        assert block["file"]["filename"] == "youtube_video"

    def test_format_with_gcs_url(self):
        gcs_uri = "gs://my-bucket/videos/sample.mp4"
        block = dspy.Video.from_gcs(gcs_uri).format()[0]
        assert block["type"] == "file"
        payload = block["file"]
        assert payload["file_data"] == "gs://my-bucket/videos/sample.mp4"
        assert payload["filename"] == "sample.mp4"
        assert payload["format"] == "video/mp4"

    def test_format_with_gcs_url_no_extension(self):
        # With no extension to inspect, an explicitly supplied mime_type must be honoured.
        clip = dspy.Video.from_gcs("gs://bucket/video", mime_type="video/webm")
        payload = clip.format()[0]["file"]
        assert payload["file_data"] == "gs://bucket/video"
        assert payload["format"] == "video/webm"

    def test_format_no_url_or_file_id_raises(self):
        # Build an instance without running __init__ and blank every field
        # directly, to reach the "nothing to format" branch of format().
        hollow = dspy.Video.__new__(dspy.Video)
        for field_name in ("url", "file_id", "filename", "mime_type"):
            object.__setattr__(hollow, field_name, None)
        with pytest.raises(ValueError, match="must have either url or file_id"):
            hollow.format()
class TestVideoStr:
    """``str(video)`` must yield the serialized form wrapped in custom-type markers."""

    def test_str_contains_markers(self):
        video = dspy.Video(url="https://example.com/video.mp4")
        str_repr = str(video)
        # Check the start and end markers separately; asserting the same
        # literal twice would not verify that both are present.
        # NOTE(review): marker literals follow DSPy's custom-type serialization
        # identifiers - confirm against the constants in the base type module.
        assert "<<CUSTOM-TYPE-START-IDENTIFIER>>" in str_repr
        assert "<<CUSTOM-TYPE-END-IDENTIFIER>>" in str_repr
def count_messages_with_file_pattern(messages):
    """Count the file content blocks found anywhere inside ``messages``.

    A file content block is a dict whose ``"type"`` equals ``"file"`` and whose
    ``"file"`` value is itself a dict. The whole structure is searched: dict
    values and list/tuple items are visited at every depth, and a dict that
    matches is still descended into.
    """

    def is_file_block(node):
        return (
            isinstance(node, dict)
            and "type" in node
            and node["type"] == "file"
            and "file" in node
            and isinstance(node["file"], dict)
        )

    total = 0
    pending = [messages]  # explicit stack instead of recursion
    while pending:
        node = pending.pop()
        if is_file_block(node):
            total += 1
        if isinstance(node, dict):
            pending.extend(node.values())
        elif isinstance(node, (list, tuple)):
            pending.extend(node)
    return total
result.answer == "The video shows a person walking" + assert count_messages_with_file_pattern(lm.history[-1]["messages"]) == 1 + + def test_video_from_url_in_signature(self): + """Test video from URL in signature.""" + + class VideoSummary(dspy.Signature): + video: dspy.Video = dspy.InputField() + summary: str = dspy.OutputField() + + expected = {"summary": "Summary of video content"} + predictor, lm = setup_predictor(VideoSummary, expected) + + video = dspy.Video.from_url("https://example.com/video.mp4") + result = predictor(video=video) + + assert result.summary == "Summary of video content" + assert count_messages_with_file_pattern(lm.history[-1]["messages"]) == 1 + + def test_video_from_youtube_in_signature(self): + """Test YouTube video in signature.""" + + class YouTubeAnalysis(dspy.Signature): + video: dspy.Video = dspy.InputField() + analysis: str = dspy.OutputField() + + expected = {"analysis": "This YouTube video discusses..."} + predictor, lm = setup_predictor(YouTubeAnalysis, expected) + + video = dspy.Video.from_youtube("https://www.youtube.com/watch?v=dQw4w9WgXcQ") + result = predictor(video=video) + + assert result.analysis == "This YouTube video discusses..." 
+ assert count_messages_with_file_pattern(lm.history[-1]["messages"]) == 1 + + def test_video_from_file_id_in_signature(self): + """Test video from pre-uploaded file_id in signature.""" + + class VideoCaption(dspy.Signature): + video: dspy.Video = dspy.InputField() + caption: str = dspy.OutputField() + + expected = {"caption": "A beautiful sunset"} + predictor, lm = setup_predictor(VideoCaption, expected) + + video = dspy.Video.from_file_id("files/abc123", mime_type="video/mp4") + result = predictor(video=video) + + assert result.caption == "A beautiful sunset" + assert count_messages_with_file_pattern(lm.history[-1]["messages"]) == 1 + + +# ============================================================================ +# List of videos tests +# ============================================================================ + + +class TestVideoListInSignature: + def test_video_list_basic(self, sample_video_file): + """Test list of videos in signature.""" + + class MultiVideoSignature(dspy.Signature): + videos: list[dspy.Video] = dspy.InputField() + comparison: str = dspy.OutputField() + + expected = {"comparison": "The videos show different scenes"} + predictor, lm = setup_predictor(MultiVideoSignature, expected) + + videos = [ + dspy.Video.from_path(sample_video_file), + dspy.Video.from_url("https://example.com/video2.mp4"), + ] + result = predictor(videos=videos) + + assert result.comparison == "The videos show different scenes" + assert count_messages_with_file_pattern(lm.history[-1]["messages"]) == 2 + + def test_mixed_video_sources_list(self, sample_video_file): + """Test list with videos from different sources.""" + + class MixedVideoSignature(dspy.Signature): + videos: list[dspy.Video] = dspy.InputField() + summary: str = dspy.OutputField() + + expected = {"summary": "Combined video analysis"} + predictor, lm = setup_predictor(MixedVideoSignature, expected) + + videos = [ + dspy.Video.from_path(sample_video_file), + 
dspy.Video.from_youtube("https://www.youtube.com/watch?v=test"), + dspy.Video.from_file_id("files/xyz789"), + ] + result = predictor(videos=videos) + + assert result.summary == "Combined video analysis" + assert count_messages_with_file_pattern(lm.history[-1]["messages"]) == 3 + + +# ============================================================================ +# Optional video tests +# ============================================================================ + + +class TestOptionalVideoInSignature: + def test_optional_video_with_value(self, sample_video_file): + """Test optional video field with a value.""" + + class OptionalVideoSignature(dspy.Signature): + video: dspy.Video | None = dspy.InputField() + output: str = dspy.OutputField() + + expected = {"output": "Video processed"} + predictor, lm = setup_predictor(OptionalVideoSignature, expected) + + video = dspy.Video.from_path(sample_video_file) + result = predictor(video=video) + + assert result.output == "Video processed" + assert count_messages_with_file_pattern(lm.history[-1]["messages"]) == 1 + + def test_optional_video_with_none(self): + """Test optional video field with None.""" + + class OptionalVideoSignature(dspy.Signature): + video: dspy.Video | None = dspy.InputField() + output: str = dspy.OutputField() + + expected = {"output": "No video provided"} + predictor, lm = setup_predictor(OptionalVideoSignature, expected) + + result = predictor(video=None) + + assert result.output == "No video provided" + assert count_messages_with_file_pattern(lm.history[-1]["messages"]) == 0 + + +# ============================================================================ +# Video with other media types tests +# ============================================================================ + + +class TestVideoWithOtherMedia: + def test_video_with_image(self, sample_video_file): + """Test video combined with image in same signature.""" + + class MultiMediaSignature(dspy.Signature): + video: dspy.Video = 
dspy.InputField() + image: dspy.Image = dspy.InputField() + analysis: str = dspy.OutputField() + + expected = {"analysis": "Combined media analysis"} + predictor, lm = setup_predictor(MultiMediaSignature, expected) + + video = dspy.Video.from_path(sample_video_file) + # Use a simple image URL + image = dspy.Image("https://example.com/image.jpg") + + result = predictor(video=video, image=image) + + assert result.analysis == "Combined media analysis" + # Check both video (file type) and image (image_url type) are present + messages = lm.history[-1]["messages"] + assert count_messages_with_file_pattern(messages) == 1 # Video + + # Count image_url patterns + def count_image_patterns(obj): + if isinstance(obj, dict): + if obj.get("type") == "image_url": + return 1 + return sum(count_image_patterns(v) for v in obj.values()) + if isinstance(obj, list | tuple): + return sum(count_image_patterns(v) for v in obj) + return 0 + + assert count_image_patterns(messages) == 1 # Image + + +# ============================================================================ +# Save/load tests +# ============================================================================ + + +class TestVideoSaveLoad: + def test_save_load_video_predictor(self, sample_video_file): + """Test saving and loading a predictor with video examples.""" + signature = "video: dspy.Video -> description: str" + video = dspy.Video.from_path(sample_video_file) + examples = [dspy.Example(video=video, description="Test description")] + + predictor, lm = setup_predictor(signature, {"description": "A description"}) + optimizer = dspy.teleprompt.LabeledFewShot(k=1) + compiled_predictor = optimizer.compile(student=predictor, trainset=examples, sample=False) + + with tempfile.NamedTemporaryFile(mode="w+", delete=True, suffix=".json") as temp_file: + compiled_predictor.save(temp_file.name) + loaded_predictor = dspy.Predict(signature) + loaded_predictor.load(temp_file.name) + + # Run prediction with loaded predictor + 
loaded_predictor(video=dspy.Video.from_file_id("files/test")) + + # Should have 2 videos: one from few-shot example, one from input + assert count_messages_with_file_pattern(lm.history[-1]["messages"]) == 2 + + +# ============================================================================ +# String-based signature tests +# ============================================================================ + + +class TestVideoStringSignature: + def test_string_signature_with_video_type(self, sample_video_file): + """Test video type in string-based signature.""" + signature = "video: dspy.Video, prompt: str -> response: str" + expected = {"response": "Here is the analysis"} + predictor, lm = setup_predictor(signature, expected) + + video = dspy.Video.from_path(sample_video_file) + result = predictor(video=video, prompt="Analyze this video") + + assert result.response == "Here is the analysis" + assert count_messages_with_file_pattern(lm.history[-1]["messages"]) == 1 + + +# ============================================================================ +# Chain of Thought with video tests +# ============================================================================ + + +class TestVideoChainOfThought: + def test_cot_with_video(self, sample_video_file): + """Test ChainOfThought module with video input.""" + + class VideoAnalysis(dspy.Signature): + """Analyze a video and provide detailed description.""" + + video: dspy.Video = dspy.InputField(desc="The video to analyze") + description: str = dspy.OutputField(desc="Detailed description of the video") + + expected = { + "reasoning": "Looking at the video frames...", + "description": "The video shows a scenic landscape", + } + lm = DummyLM([expected]) + dspy.settings.configure(lm=lm) + + cot = dspy.ChainOfThought(VideoAnalysis) + video = dspy.Video.from_path(sample_video_file) + result = cot(video=video) + + assert result.description == "The video shows a scenic landscape" + assert 
count_messages_with_file_pattern(lm.history[-1]["messages"]) == 1