28 changes: 13 additions & 15 deletions data-pipeline/README.md
@@ -179,23 +179,21 @@ Input:
Once deployed, trigger the pipeline via the Render API or SDK:

```python
from render_sdk.client import Client
from render_sdk import Render

client = Client(api_key="your_render_api_key")
# Uses RENDER_API_KEY environment variable automatically
render = Render()

# Run the complete pipeline
task_run = client.workflows.run_task(
    workflow_service_slug="data-pipeline-workflows",
    task_name="run_data_pipeline",
    input={
        "user_ids": ["user_1", "user_2", "user_3", "user_4"]
    }
task_run = await render.workflows.run_task(
    "data-pipeline-workflows/run_data_pipeline",
    {"user_ids": ["user_1", "user_2", "user_3", "user_4"]}
)

result = await task_run
print(f"Pipeline status: {result['status']}")
print(f"Total revenue: ${result['insights']['revenue']['total']}")
print(f"Segment distribution: {result['insights']['segment_distribution']}")
print(f"Pipeline status: {result.results['status']}")
print(f"Total revenue: ${result.results['insights']['revenue']['total']}")
print(f"Segment distribution: {result.results['segment_distribution']}")
```
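
Because the snippet above uses `await`, wrap it in an async entrypoint when running it as a standalone script. A minimal sketch, assuming the same SDK calls as above and `RENDER_API_KEY` set in the environment:

```python
import asyncio

from render_sdk import Render


async def main() -> None:
    # Same calls as the snippet above; RENDER_API_KEY is read from the environment
    render = Render()
    task_run = await render.workflows.run_task(
        "data-pipeline-workflows/run_data_pipeline",
        {"user_ids": ["user_1", "user_2", "user_3", "user_4"]},
    )
    result = await task_run
    print(f"Pipeline status: {result.results['status']}")


if __name__ == "__main__":
    asyncio.run(main())
```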

## Pipeline Stages
@@ -328,7 +326,7 @@ Business logic classifies users into segments:

**Add Real APIs**:
```python
@task
@app.task
async def fetch_user_data_from_api(user_ids: list[str]) -> dict:
    client = get_http_client()
    response = await client.post(
@@ -340,7 +338,7 @@ async def fetch_user_data_from_api(user_ids: list[str]) -> dict:

**Add Database Integration**:
```python
@task
@app.task
async def load_to_warehouse(insights: dict) -> dict:
    # Connect to data warehouse (Snowflake, BigQuery, etc.)
    # Insert aggregated insights
@@ -350,7 +348,7 @@ async def load_to_warehouse(insights: dict) -> dict:

**Add Caching**:
```python
@task
@app.task
async def fetch_with_cache(source: str, key: str) -> dict:
    # Check Redis/Memcached
    # If miss, fetch from source and cache
@@ -360,7 +358,7 @@ async def fetch_with_cache(source: str, key: str) -> dict:

**Add Notifications**:
```python
@task
@app.task
async def send_pipeline_notification(result: dict) -> dict:
    # Send to Slack, email, etc.
    # Notify stakeholders of pipeline completion
32 changes: 17 additions & 15 deletions data-pipeline/main.py
@@ -16,9 +16,8 @@
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Any

from render_sdk.workflows import Options, Retry, start, task
from render_sdk import Retry, Workflows

# Configure logging
logging.basicConfig(
@@ -44,11 +43,19 @@ def get_http_client():
return _http_client


# Initialize Workflows app with defaults
app = Workflows(
    default_retry=Retry(max_retries=3, wait_duration_ms=2000, backoff_scaling=1.5),
    default_timeout=300,
    auto_start=True,
)


# ============================================================================
# Data Source Tasks - Extract from multiple sources
# ============================================================================

@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=2000)))
@app.task
async def fetch_user_data(user_ids: list[str]) -> dict:
"""
Fetch user profile data from user service.
@@ -83,7 +90,7 @@ async def fetch_user_data(user_ids: list[str]) -> dict:
}


@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=2000)))
@app.task
async def fetch_transaction_data(user_ids: list[str], days: int = 30) -> dict:
"""
Fetch transaction history for users.
@@ -123,7 +130,7 @@ async def fetch_transaction_data(user_ids: list[str], days: int = 30) -> dict:
}


@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=2000)))
@app.task
async def fetch_engagement_data(user_ids: list[str]) -> dict:
"""
Fetch user engagement metrics.
@@ -167,7 +174,7 @@ async def fetch_engagement_data(user_ids: list[str]) -> dict:
# Enrichment Tasks - Add additional context
# ============================================================================

@task
@app.task
async def enrich_with_geo_data(user_email: str) -> dict:
"""
Enrich user data with geographic information.
@@ -192,7 +199,7 @@ async def enrich_with_geo_data(user_email: str) -> dict:
return geo_data


@task
@app.task
async def calculate_user_metrics(
user: dict,
transactions: list[dict],
@@ -260,7 +267,7 @@ async def calculate_user_metrics(
# Transformation Tasks - Process and combine data
# ============================================================================

@task
@app.task
async def transform_user_data(
user_data: dict,
transaction_data: dict,
@@ -319,7 +326,7 @@ async def transform_user_data(
# Aggregation Tasks - Generate insights
# ============================================================================

@task
@app.task
def aggregate_insights(enriched_data: dict) -> dict:
"""
Generate aggregate insights from enriched user data.
@@ -390,7 +397,7 @@ def aggregate_insights(enriched_data: dict) -> dict:
# Main Pipeline Orchestrator
# ============================================================================

@task
@app.task
async def run_data_pipeline(user_ids: list[str]) -> dict:
"""
Execute the complete data pipeline.
@@ -486,8 +493,3 @@ async def run_data_pipeline(user_ids: list[str]) -> dict:
"error": str(e),
"failed_at": datetime.now().isoformat()
}


if __name__ == "__main__":
logger.info("Starting Data Pipeline Workflow Service")
start()
2 changes: 1 addition & 1 deletion data-pipeline/requirements.txt
@@ -1,2 +1,2 @@
render-sdk>=0.1.0
render-sdk>=0.2.0
httpx>=0.27.0
22 changes: 11 additions & 11 deletions etl-job/README.md
@@ -126,21 +126,21 @@ Input:
Once deployed, trigger the ETL pipeline via the Render API or SDK:

```python
from render_sdk.client import Client
from render_sdk import Render

client = Client(api_key="your_render_api_key")
# Uses RENDER_API_KEY environment variable automatically
render = Render()

# Run the ETL pipeline
task_run = client.workflows.run_task(
    workflow_service_slug="etl-job-workflows",
    task_name="run_etl_pipeline",
    input="sample_data.csv"
task_run = await render.workflows.run_task(
    "etl-job-workflows/run_etl_pipeline",
    {"source_file": "sample_data.csv"}
)

# Wait for completion
result = await task_run
print(f"Pipeline status: {result['status']}")
print(f"Valid records: {result['transform']['valid_count']}")
print(f"Pipeline status: {result.results['status']}")
print(f"Valid records: {result.results['transform']['valid_count']}")
```
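
These calls are also awaited, so drive them from an async entrypoint when triggering the pipeline from a standalone script. A minimal sketch, assuming the SDK usage shown above:

```python
import asyncio

from render_sdk import Render


async def main() -> None:
    # Same calls as the snippet above; RENDER_API_KEY is read from the environment
    render = Render()
    task_run = await render.workflows.run_task(
        "etl-job-workflows/run_etl_pipeline",
        {"source_file": "sample_data.csv"},
    )
    result = await task_run
    print(f"Valid records: {result.results['transform']['valid_count']}")


if __name__ == "__main__":
    asyncio.run(main())
```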

## Sample Data
@@ -187,7 +187,7 @@ This demonstrates **sequential subtask orchestration** for multi-stage pipelines

**Add Database Loading**:
```python
@task
@app.task
async def load_to_database(records: list[dict]) -> dict:
    # Connect to database
    # Insert records
@@ -197,7 +197,7 @@ async def extract_from_api(api_url: str) -> list[dict]:

**Add API Data Source**:
```python
@task
@app.task
async def extract_from_api(api_url: str) -> list[dict]:
    # Fetch from REST API
    # Parse JSON response
@@ -209,7 +209,7 @@
```python
import asyncio

@task
@app.task
async def transform_batch_parallel(records: list[dict]) -> dict:
    # Validate all records in parallel
    tasks = [validate_record(record) for record in records]
29 changes: 15 additions & 14 deletions etl-job/main.py
@@ -16,9 +16,8 @@
import logging
from datetime import datetime
from pathlib import Path
from typing import Any

from render_sdk.workflows import Options, Retry, start, task
from render_sdk import Retry, Workflows

# Configure logging
logging.basicConfig(
@@ -27,12 +26,19 @@
)
logger = logging.getLogger(__name__)

# Initialize Workflows app with defaults
app = Workflows(
    default_retry=Retry(max_retries=3, wait_duration_ms=1000, backoff_scaling=1.5),
    default_timeout=300,
    auto_start=True,
)


# ============================================================================
# EXTRACT Tasks
# ============================================================================

@task(options=Options(retry=Retry(max_retries=3, wait_duration_ms=1000)))
@app.task
def extract_csv_data(file_path: str) -> list[dict]:
"""
Extract data from a CSV file.
@@ -51,7 +57,7 @@ def extract_csv_data(file_path: str) -> list[dict]:
try:
path = Path(file_path)
if not path.exists():
logger.warning(f"[EXTRACT] File not found, using sample data")
logger.warning("[EXTRACT] File not found, using sample data")
# In production, this would read from cloud storage or database
return [
{"id": "1", "name": "Alice", "email": "alice@example.com", "age": "28", "country": "USA"},
@@ -76,7 +82,7 @@ def extract_csv_data(file_path: str) -> list[dict]:
# TRANSFORM Tasks
# ============================================================================

@task
@app.task
def validate_record(record: dict) -> dict:
"""
Validate and clean a single data record.
@@ -137,7 +143,7 @@ def validate_record(record: dict) -> dict:
return cleaned_record


@task
@app.task
async def transform_batch(records: list[dict]) -> dict:
"""
Transform a batch of records by validating each one.
@@ -187,7 +193,7 @@ async def transform_batch(records: list[dict]) -> dict:
# LOAD Tasks
# ============================================================================

@task
@app.task
def compute_statistics(valid_records: list[dict]) -> dict:
"""
Compute statistical insights from validated records.
@@ -237,7 +243,7 @@ def compute_statistics(valid_records: list[dict]) -> dict:
'timestamp': datetime.now().isoformat()
}

logger.info(f"[LOAD] Statistics computed successfully")
logger.info("[LOAD] Statistics computed successfully")
logger.info(f"[LOAD] Countries: {list(country_counts.keys())}")
if age_stats:
logger.info(f"[LOAD] Age range: {age_stats['min']}-{age_stats['max']}")
@@ -249,7 +255,7 @@ def compute_statistics(valid_records: list[dict]) -> dict:
# MAIN ETL Pipeline
# ============================================================================

@task
@app.task
async def run_etl_pipeline(source_file: str) -> dict:
"""
Complete ETL pipeline orchestrating extract, transform, and load operations.
@@ -328,8 +334,3 @@ async def run_etl_pipeline(source_file: str) -> dict:
'error': str(e),
'failed_at': datetime.now().isoformat()
}


if __name__ == "__main__":
logger.info("Starting ETL Job Workflow Service")
start()
2 changes: 1 addition & 1 deletion etl-job/requirements.txt
@@ -1 +1 @@
render-sdk>=0.1.0
render-sdk>=0.2.0