diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index b4630e1..b2b5296 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -19,7 +19,7 @@ jobs:
       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip
-          pip install pytest pytest-cov requests
+          pip install pytest pytest-cov requests python-dotenv
       - name: Run tests
         run: pytest -q
diff --git a/.github/workflows/migrate_pipelines.yml b/.github/workflows/migrate_pipelines.yml
new file mode 100644
index 0000000..c5dca2a
--- /dev/null
+++ b/.github/workflows/migrate_pipelines.yml
@@ -0,0 +1,119 @@
+name: pipelines_migration
+
+on:
+  workflow_dispatch:
+    inputs:
+      env_url:
+        description: "Select the source and target environments"
+        type: choice
+        default: "Source: SANDBOX, Target: PRODUCTION"
+        options:
+          - "Source: SANDBOX, Target: PRODUCTION"
+          - "Source: SANDBOX, Target: SANDBOX"
+          - "Source: PRODUCTION, Target: PRODUCTION"
+          - "Source: PRODUCTION, Target: SANDBOX"
+      source_datastore_config:
+        description: "Source datastore JSON. Provide either an ftpServer or s3Bucket object."
+        required: true
+        default: |
+          {
+            "ftpServer": {
+              "transferProtocol": "FTPS",
+              "plainText": {
+                "hostname": "",
+                "port": "",
+                "username": "",
+                "password": ""
+              },
+              "skyflowHosted": false
+            }
+          }
+      target_datastore_config:
+        description: "Destination datastore JSON. Provide either an ftpServer or s3Bucket object."
+        required: true
+        default: |
+          {
+            "s3Bucket": {
+              "name": "",
+              "region": "",
+              "assumedRoleARN": ""
+            }
+          }
+      source_vault_id:
+        description: "Source Vault ID."
+        required: false
+      pipeline_id:
+        description: "Pipeline ID to be migrated."
+        required: false
+        default: ""
+      target_vault_id:
+        description: "Target Vault ID."
+        required: true
+      source_account_access_token:
+        description: "Access token of the Source Account. (Not required if a config file is selected.)"
+        required: false
+      target_account_access_token:
+        description: "Access token of the Target Account."
+        required: true
+      source_account_id:
+        description: "Source Account ID. If not provided, the repository variable is used."
+        required: false
+      target_account_id:
+        description: "Target Account ID. If not provided, the repository variable is used."
+        required: false
+
+
+jobs:
+  execute-pipelines-migration-script:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.x"
+
+      - name: Install dependencies
+        run: pip install requests
+
+      - name: Parse and map environment URLs
+        id: map_envs
+        shell: bash
+        run: |
+          input="${{ github.event.inputs.env_url }}"
+
+          source_name=$(echo "$input" | sed -n 's/Source: \([^,]*\),.*/\1/p' | xargs)
+          target_name=$(echo "$input" | sed -n 's/.*Target: \(.*\)/\1/p' | xargs)
+
+          get_env_url() {
+            case "$1" in
+              SANDBOX) echo "https://manage.skyflowapis-preview.com" ;;
+              PRODUCTION) echo "https://manage.skyflowapis.com" ;;
+              *) echo "Invalid environment: $1" >&2; exit 1 ;;
+            esac
+          }
+
+          # Resolve URLs
+          source_url=$(get_env_url "$source_name")
+          target_url=$(get_env_url "$target_name")
+
+          echo "source_url=$source_url" >> $GITHUB_OUTPUT
+          echo "target_url=$target_url" >> $GITHUB_OUTPUT
+
+      - name: Run Python script
+        env:
+          PIPELINE_ID: ${{ github.event.inputs.pipeline_id }}
+          SOURCE_DATASTORE_CONFIG: ${{ github.event.inputs.source_datastore_config }}
+          TARGET_DATASTORE_CONFIG: ${{ github.event.inputs.target_datastore_config }}
+          SOURCE_VAULT_ID: ${{ github.event.inputs.source_vault_id }}
+          TARGET_VAULT_ID: ${{ github.event.inputs.target_vault_id }}
+          SOURCE_ACCOUNT_AUTH: ${{ github.event.inputs.source_account_access_token }}
+          TARGET_ACCOUNT_AUTH: ${{ github.event.inputs.target_account_access_token }}
+          SOURCE_ACCOUNT_ID: ${{ github.event.inputs.source_account_id != '' && github.event.inputs.source_account_id || vars.SOURCE_ACCOUNT_ID }}
+          TARGET_ACCOUNT_ID: ${{ github.event.inputs.target_account_id != '' && github.event.inputs.target_account_id || vars.TARGET_ACCOUNT_ID }}
+          SOURCE_ENV_URL: ${{ steps.map_envs.outputs.source_url }}
+          TARGET_ENV_URL: ${{ steps.map_envs.outputs.target_url }}
+        run: python3 migrate_pipelines.py
diff --git a/README.md b/README.md
index c4a49df..7dc9584 100644
--- a/README.md
+++ b/README.md
@@ -115,6 +115,49 @@ Note: Please note that if all values are provided `config_file` will take the pr
 - The script doesn't migrate service accounts related to connection, this has to be done from Studio.
 - Migration of connections associated with functions is not supported.
 
+### Pipelines Migration
+
+Migrates a pipeline definition from the source vault to the target vault.
+
+##### Parameters:
+- **`env_url`**: Source and target environments (for example, `Source: SANDBOX, Target: PRODUCTION`).
+- **`pipeline_id`**: Pipeline ID to migrate. Get the pipeline ID from Studio.
+- **`source_datastore_config`**: JSON object that replaces the source datastore configuration. Provide either an `ftpServer` or `s3Bucket` object with the required credentials.
+- **`target_datastore_config`**: JSON object that replaces the destination datastore configuration. Provide either an `ftpServer` or `s3Bucket` object with the required credentials.
+- **`source_account_access_token`**: Access token of the source account.
+- **`target_account_access_token`**: Access token of the target account.
+
+##### Notes:
+- Datastore overrides accept exactly one of `ftpServer` or `s3Bucket`. FTP datastores require `transferProtocol` plus either `plainText` or `encrypted` credentials. S3 datastores must include `name`, `region`, and `assumedRoleARN`.
+- The script rejects incompatible overrides (for example, replacing an S3 datastore with FTP).
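+
+##### Triggering the workflow:
+
+The workflow can be dispatched from the Actions tab or from the command line. The snippet below is an illustrative sketch, assuming the GitHub CLI (`gh`) is installed and authenticated against this repository; the input names match `migrate_pipelines.yml`, while all values are placeholders.
+
+```bash
+gh workflow run migrate_pipelines.yml \
+  -f env_url="Source: SANDBOX, Target: PRODUCTION" \
+  -f pipeline_id="<source-pipeline-id>" \
+  -f target_vault_id="<target-vault-id>" \
+  -f source_account_access_token="<source-access-token>" \
+  -f target_account_access_token="<target-access-token>" \
+  -f source_datastore_config='{"ftpServer": {"transferProtocol": "FTPS", "plainText": {"hostname": "ftp.example.com", "port": "21", "username": "user", "password": "secret"}, "skyflowHosted": false}}' \
+  -f target_datastore_config='{"s3Bucket": {"name": "pipeline-export-bucket", "region": "us-west-2", "assumedRoleARN": "arn:aws:iam::123456789012:role/pipeline-export-role"}}'
+```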
+
+##### Sample datastore configurations:
+
+```jsonc
+{
+  "ftpServer": {
+    "transferProtocol": "SFTP",
+    "plainText": {
+      "hostname": "sftp.example.com",
+      "port": "22",
+      "username": "pipeline-user",
+      "password": "secret"
+    },
+    "skyflowHosted": false
+  }
+}
+```
+
+```jsonc
+{
+  "s3Bucket": {
+    "name": "pipeline-export-bucket",
+    "region": "us-west-2",
+    "assumedRoleARN": "arn:aws:iam::123456789012:role/pipeline-export-role"
+  }
+}
+```
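+
+##### Running the script locally:
+
+For debugging, `migrate_pipelines.py` can also be run outside of GitHub Actions. This is a minimal sketch, assuming Python 3 with `requests` installed; the environment variable names are the ones the script reads, and the values shown are placeholders.
+
+```bash
+export SOURCE_ENV_URL="https://manage.skyflowapis-preview.com"
+export TARGET_ENV_URL="https://manage.skyflowapis.com"
+export SOURCE_ACCOUNT_ID="<source-account-id>"
+export TARGET_ACCOUNT_ID="<target-account-id>"
+export SOURCE_ACCOUNT_AUTH="<source-access-token>"
+export TARGET_ACCOUNT_AUTH="<target-access-token>"
+export TARGET_VAULT_ID="<target-vault-id>"
+export PIPELINE_ID="<source-pipeline-id>"
+# Optional datastore overrides; leave unset to keep the source definitions.
+export TARGET_DATASTORE_CONFIG='{"s3Bucket": {"name": "pipeline-export-bucket", "region": "us-west-2", "assumedRoleARN": "arn:aws:iam::123456789012:role/pipeline-export-role"}}'
+python3 migrate_pipelines.py
+```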
+
 ## Steps to run the workflows
 
 ### Prerequisites
diff --git a/migrate_pipelines.py b/migrate_pipelines.py
new file mode 100644
index 0000000..092f5a5
--- /dev/null
+++ b/migrate_pipelines.py
@@ -0,0 +1,230 @@
+import copy
+import json
+import os
+import requests
+from typing import Any, Dict, List, Optional
+
+
+PIPELINE_ID = os.getenv("PIPELINE_ID")
+SOURCE_VAULT_ID = os.getenv("SOURCE_VAULT_ID")
+TARGET_VAULT_ID = os.getenv("TARGET_VAULT_ID")
+SOURCE_ACCOUNT_ID = os.getenv("SOURCE_ACCOUNT_ID")
+TARGET_ACCOUNT_ID = os.getenv("TARGET_ACCOUNT_ID")
+SOURCE_ACCOUNT_AUTH = os.getenv("SOURCE_ACCOUNT_AUTH")
+TARGET_ACCOUNT_AUTH = os.getenv("TARGET_ACCOUNT_AUTH")
+SOURCE_ENV_URL = os.getenv("SOURCE_ENV_URL")
+TARGET_ENV_URL = os.getenv("TARGET_ENV_URL")
+SOURCE_DATASTORE_CONFIG = os.getenv("SOURCE_DATASTORE_CONFIG")
+TARGET_DATASTORE_CONFIG = os.getenv("TARGET_DATASTORE_CONFIG")
+
+FTP_ALLOWED_KEYS = {"transferProtocol", "plainText", "encrypted", "skyflowHosted"}
+S3_ALLOWED_KEYS = {"name", "region", "assumedRoleARN"}
+PIPELINE = "pipeline"
+PIPELINES = "pipelines"
+
+SOURCE_ACCOUNT_HEADERS = {
+    "X-SKYFLOW-ACCOUNT-ID": SOURCE_ACCOUNT_ID,
+    "Authorization": f"Bearer {SOURCE_ACCOUNT_AUTH}",
+    "Content-Type": "application/json",
+}
+
+TARGET_ACCOUNT_HEADERS = {
+    "X-SKYFLOW-ACCOUNT-ID": TARGET_ACCOUNT_ID,
+    "Authorization": f"Bearer {TARGET_ACCOUNT_AUTH}",
+    "Content-Type": "application/json",
+}
+
+def list_pipelines(vault_id: str) -> List[Dict[str, Any]]:
+    """Return all pipelines in the supplied vault."""
+    pipelines = []
+    response = requests.get(
+        f"{SOURCE_ENV_URL}/v1/pipelines?vaultID={vault_id}",
+        headers=SOURCE_ACCOUNT_HEADERS,
+    )
+    response.raise_for_status()
+    pipelines.extend(response.json()[PIPELINES])
+    return pipelines
+
+def get_pipeline(pipeline_id: str) -> Dict[str, Any]:
+    """Fetch a single pipeline definition from the source environment."""
+    response = requests.get(
+        f"{SOURCE_ENV_URL}/v1/pipelines/{pipeline_id}",
+        headers=SOURCE_ACCOUNT_HEADERS,
+    )
+    response.raise_for_status()
+    return response.json()[PIPELINE]
+
+def create_pipeline(pipeline: Dict[str, Any]) -> requests.Response:
+    """Create a pipeline in the target environment."""
+    response = requests.post(
+        f"{TARGET_ENV_URL}/v1/pipelines",
+        json=pipeline,
+        headers=TARGET_ACCOUNT_HEADERS,
+    )
+    response.raise_for_status()
+    return response
+
+
+def strip_empty_values(value: Any) -> Any:
+    """Recursively drop values that are empty strings or None."""
+    if isinstance(value, dict):
+        cleaned = {}
+        for key, val in value.items():
+            cleaned_val = strip_empty_values(val)
+            if cleaned_val is None:
+                continue
+            cleaned[key] = cleaned_val
+        return cleaned
+    if isinstance(value, list):
+        cleaned_list = [strip_empty_values(item) for item in value]
+        return [item for item in cleaned_list if item is not None]
+    if value == "" or value is None:
+        return None
+    return value
+
+
+def validate_ftp_server(config: Dict[str, Any], label: str) -> Dict[str, Any]:
+    """Return an FTP server configuration with only supported fields."""
+    if not isinstance(config, dict):
+        raise ValueError(f"-- {label} datastore ftpServer must be an object. --")
+    sanitised = {key: config[key] for key in config if key in FTP_ALLOWED_KEYS}
+    if "plainText" in sanitised:
+        if not isinstance(sanitised["plainText"], dict):
+            raise ValueError(f"-- {label} datastore ftpServer.plainText must be an object. --")
+        sanitised["plainText"] = strip_empty_values(sanitised["plainText"])
+    if "encrypted" in sanitised:
+        if not isinstance(sanitised["encrypted"], dict):
+            raise ValueError(f"-- {label} datastore ftpServer.encrypted must be an object. --")
+        sanitised["encrypted"] = strip_empty_values(sanitised["encrypted"])
+    sanitised = strip_empty_values(sanitised)
+    if not sanitised:
+        raise ValueError(f"-- {label} datastore ftpServer must include non-empty credentials. --")
+    if "transferProtocol" not in sanitised:
+        raise ValueError(f"-- {label} datastore ftpServer.transferProtocol is required. --")
+    has_plain = "plainText" in sanitised and sanitised["plainText"]
+    has_encrypted = "encrypted" in sanitised and sanitised["encrypted"]
+    if not (has_plain or has_encrypted):
+        raise ValueError(
+            f"-- {label} datastore ftpServer must include plainText or encrypted credentials. --"
+        )
+    return sanitised
+
+
+def validate_s3_bucket(config: Dict[str, Any], label: str) -> Dict[str, Any]:
+    """Return an S3 bucket configuration with only supported fields."""
+    if not isinstance(config, dict):
+        raise ValueError(f"-- {label} datastore s3Bucket must be an object. --")
+    sanitised = {key: config[key] for key in config if key in S3_ALLOWED_KEYS}
+    sanitised = strip_empty_values(sanitised)
+    if not sanitised:
+        raise ValueError(f"-- {label} datastore s3Bucket must include non-empty configuration. --")
+    missing = sorted(S3_ALLOWED_KEYS - set(sanitised.keys()))
+    if missing:
+        raise ValueError(
+            f"-- {label} datastore s3Bucket is missing required fields: {', '.join(missing)}. --"
+        )
+    return sanitised
+
+
+def load_datastore_input(raw_config: Optional[str], label: str) -> Optional[Dict[str, Any]]:
+    """Return a sanitized datastore override dict or None if config is empty."""
+    if raw_config is None or raw_config.strip() == "":
+        return None
+    try:
+        parsed = json.loads(raw_config)
+    except json.JSONDecodeError as exc:
+        raise ValueError(f"-- Invalid JSON for {label} datastore config: {exc} --") from exc
+    if not isinstance(parsed, dict):
+        raise ValueError(f"-- {label} datastore config must be a JSON object. --")
+    datastore_keys = [key for key in ("ftpServer", "s3Bucket") if key in parsed and parsed[key] is not None]
+    if len(datastore_keys) != 1:
+        raise ValueError(
+            f"-- {label} datastore config must contain exactly one of ftpServer or s3Bucket. --"
+        )
+    datastore_key = datastore_keys[0]
+    if datastore_key == "ftpServer":
+        return {"ftpServer": validate_ftp_server(parsed["ftpServer"], label)}
+    return {"s3Bucket": validate_s3_bucket(parsed["s3Bucket"], label)}
+
+
+def replace_datastore_input(
+    existing_section: Optional[Dict[str, Any]], override: Dict[str, Any]
+) -> Dict[str, Any]:
+    """Replace the datastore section while preserving other configuration."""
+    section = copy.deepcopy(existing_section or {})
+    existing_datastore_keys = [
+        key for key in ("ftpServer", "s3Bucket") if key in section and section[key] is not None
+    ]
+    datastore_key, datastore_value = next(iter(override.items()))
+    if datastore_key == "s3Bucket" and "ftpServer" in existing_datastore_keys:
+        raise ValueError("-- Cannot override FTP datastore with an S3 override. --")
+    if datastore_key == "ftpServer" and "s3Bucket" in existing_datastore_keys:
+        raise ValueError("-- Cannot override S3 datastore with an FTP override. --")
+    section.pop(datastore_key, None)
+    section[datastore_key] = copy.deepcopy(datastore_value)
+    return section
+
+
+def transform_pipeline_payload(
+    source_resource: Dict[str, Any],
+    source_datastore_input: Optional[Dict[str, Any]] = None,
+    target_datastore_input: Optional[Dict[str, Any]] = None,
+) -> Dict[str, Any]:
+    """Prepare the payload for the target pipeline."""
+    transformed_resource = copy.deepcopy(source_resource)
+    if "ID" in transformed_resource:
+        del transformed_resource["ID"]  # remove pipeline ID
+    transformed_resource["vaultID"] = TARGET_VAULT_ID
+    if source_datastore_input:
+        transformed_resource["source"] = replace_datastore_input(
+            transformed_resource.get("source"), source_datastore_input
+        )
+    if target_datastore_input:
+        transformed_resource["destination"] = replace_datastore_input(
+            transformed_resource.get("destination"), target_datastore_input
+        )
+    return transformed_resource
+
+
+def main(pipeline_id: str) -> None:
+    """pipeline migration"""
+    try:
+        print("-- Initiating Pipelines migration --")
+        source_datastore_input = load_datastore_input(SOURCE_DATASTORE_CONFIG, "source")
+        target_datastore_input = load_datastore_input(TARGET_DATASTORE_CONFIG, "destination")
+        pipeline = get_pipeline(pipeline_id)
+        pipeline_name = pipeline.get("name", "Pipeline")
+        print(f"-- Working on pipeline: {pipeline_name} --")
+
+        pipeline_payload = transform_pipeline_payload(
+            pipeline, source_datastore_input, target_datastore_input
+        )
+        create_pipeline_response = create_pipeline(pipeline_payload)
+
+        if create_pipeline_response.status_code == 200:
+            created_pipeline = create_pipeline_response.json()
+            print(
+                f"-- Pipeline migrated successfully: {pipeline_name}. "
+                f"Source PIPELINE_ID: {pipeline.get('ID')}, "
+                f"Target PIPELINE_ID: {created_pipeline.get('ID')} --"
+            )
+        else:
+            print(
+                f"-- Pipeline migration failed: {create_pipeline_response.status_code}. "
+                f"{create_pipeline_response.content}"
+            )
+        print("-- Pipelines migration script executed successfully. --")
+    except requests.exceptions.HTTPError as http_err:
+        print(
+            f"-- migrate_pipelines HTTP error: {http_err.response.content.decode()} --"
+        )
+        raise http_err
+    except Exception as err:
+        print(f"-- migrate_pipelines other error: {err} --")
+        raise err
+
+
+if __name__ == "__main__":
+    if not PIPELINE_ID:
+        raise ValueError("-- PIPELINE_ID is required to migrate a pipeline. --")
+    main(PIPELINE_ID)
diff --git a/tests/test_migrate_pipelines.py b/tests/test_migrate_pipelines.py
new file mode 100644
index 0000000..6628aca
--- /dev/null
+++ b/tests/test_migrate_pipelines.py
@@ -0,0 +1,456 @@
+import importlib
+import sys
+from types import SimpleNamespace
+
+import dotenv
+import pytest
+
+
+BASE_ENV = {
+    "PIPELINE_ID": "pipeline-from-env",
+    "SOURCE_VAULT_ID": "source-vault",
+    "TARGET_VAULT_ID": "target-vault",
+    "SOURCE_ACCOUNT_ID": "account-source",
+    "TARGET_ACCOUNT_ID": "account-target",
+    "SOURCE_ACCOUNT_AUTH": "token-source",
+    "TARGET_ACCOUNT_AUTH": "token-target",
+    "SOURCE_ENV_URL": "https://source.example.com",
+    "TARGET_ENV_URL": "https://target.example.com",
+    "SOURCE_DATASTORE_CONFIG": "",
+    "TARGET_DATASTORE_CONFIG": "",
+}
+
+
+def load_module(monkeypatch, **env_overrides):
+    complete_env = {**BASE_ENV, **env_overrides}
+    for key, value in complete_env.items():
+        monkeypatch.setenv(key, value)
+    if "migrate_pipelines" in sys.modules:
+        del sys.modules["migrate_pipelines"]
+    return importlib.import_module("migrate_pipelines")
+
+
+def test_strip_empty_values(monkeypatch):
+    module = load_module(monkeypatch)
+    source = {"a": "", "b": None, "c": {"d": "value", "e": ""}, "f": [1, "", None]}
+    assert module.strip_empty_values(source) == {"c": {"d": "value"}, "f": [1]}
+
+
+def test_validate_ftp_server_success(monkeypatch):
+    module = load_module(monkeypatch)
+    config = {
+        "transferProtocol": "SFTP",
+        "plainText": {
+            "hostname": "host",
+            "port": "",
+            "username": "user",
+            "password": "",
+            "sshKeyID": "key",
+        },
+        "encrypted": {"encryptedCredentials": "cipher"},
+        "skyflowHosted": False,
+        "extra": "ignored",
+    }
+    result = module.validate_ftp_server(config, "source")
+    assert result == {
+        "transferProtocol": "SFTP",
+        "plainText": {
+            "hostname": "host",
+            "username": "user",
+            "sshKeyID": "key",
+        },
+        "encrypted": {"encryptedCredentials": "cipher"},
+        "skyflowHosted": False,
+    }
+
+
+@pytest.mark.parametrize(
+    "config, expected_message",
+    [
+        ("not-a-dict", "ftpServer must be an object"),
+        ({"plainText": "oops"}, "ftpServer.plainText must be an object"),
+        ({"extra": "value"}, "must include non-empty credentials"),
+        (
+            {"transferProtocol": "SFTP", "encrypted": "oops"},
+            "ftpServer.encrypted must be an object",
+        ),
+        ({"plainText": {}}, "transferProtocol is required"),
+        ({"transferProtocol": "SFTP"}, "plainText or encrypted credentials"),
+    ],
+)
+def test_validate_ftp_server_errors(monkeypatch, config, expected_message):
+    module = load_module(monkeypatch)
+    with pytest.raises(ValueError, match=expected_message):
+        module.validate_ftp_server(config, "source")
+
+
+def test_validate_s3_bucket_success(monkeypatch):
+    module = load_module(monkeypatch)
+    config = {
+        "name": "bucket",
+        "region": "us-west-2",
+        "assumedRoleARN": "arn:aws:iam::role/example",
+        "ignored": "value",
+    }
+    assert module.validate_s3_bucket(config, "target") == {
+        "name": "bucket",
+        "region": "us-west-2",
+        "assumedRoleARN": "arn:aws:iam::role/example",
+    }
+
+
+@pytest.mark.parametrize(
+    "config, expected_message",
+    [
+        ("not-a-dict", "s3Bucket must be an object"),
+        ({}, "must include non-empty configuration"),
+        ({"name": "bucket", "region": "us-west-2"}, "missing required fields"),
+    ],
+)
+def test_validate_s3_bucket_errors(monkeypatch, config, expected_message):
+    module = load_module(monkeypatch)
+    with pytest.raises(ValueError, match=expected_message):
+        module.validate_s3_bucket(config, "target")
+
+
+def test_load_datastore_input_variants(monkeypatch):
+    module = load_module(monkeypatch)
+    assert module.load_datastore_input("", "source") is None
+    assert module.load_datastore_input(None, "source") is None
+
+    with pytest.raises(ValueError, match="Invalid JSON"):
+        module.load_datastore_input("{invalid}", "source")
+    with pytest.raises(ValueError, match="must be a JSON object"):
+        module.load_datastore_input("[]", "source")
+    with pytest.raises(ValueError, match="exactly one"):
+        module.load_datastore_input('{"ftpServer": {}, "s3Bucket": {}}', "source")
+
+    ftp_override = module.load_datastore_input(
+        '{"ftpServer": {"transferProtocol": "FTPS", "plainText": {"hostname": "h", "username": "u"}}}',
+        "source",
+    )
+    assert ftp_override == {
+        "ftpServer": {
+            "transferProtocol": "FTPS",
+            "plainText": {"hostname": "h", "username": "u"},
+        }
+    }
+
+    s3_override = module.load_datastore_input(
+        '{"s3Bucket": {"name": "bucket", "region": "us", "assumedRoleARN": "arn"}}',
+        "target",
+    )
+    assert s3_override == {
+        "s3Bucket": {"name": "bucket", "region": "us", "assumedRoleARN": "arn"}
+    }
+
+
+def test_replace_datastore_input_replaces_only_datastore(monkeypatch):
+    module = load_module(monkeypatch)
+    existing = {
+        "dataFormat": "CSV",
+        "ftpServer": {"transferProtocol": "FTPS"},
+        "other": "value",
+    }
+    override = {"ftpServer": {"transferProtocol": "SFTP"}}
+    result = module.replace_datastore_input(existing, override)
+    assert result == {
+        "dataFormat": "CSV",
+        "other": "value",
+        "ftpServer": {"transferProtocol": "SFTP"},
+    }
+    assert existing["ftpServer"]["transferProtocol"] == "FTPS"
+
+
+def test_replace_datastore_input_fails_on_ftp_to_s3(monkeypatch):
+    module = load_module(monkeypatch)
+    existing = {
+        "dataFormat": "CSV",
+        "ftpServer": {"transferProtocol": "FTPS"},
+    }
+    override = {"s3Bucket": {"name": "bucket"}}
+    with pytest.raises(ValueError, match="Cannot override FTP datastore"):
+        module.replace_datastore_input(existing, override)
+
+
+def test_replace_datastore_input_fails_on_s3_to_ftp(monkeypatch):
+    module = load_module(monkeypatch)
+    existing = {
+        "dataFormat": "CSV",
+        "s3Bucket": {"name": "bucket", "region": "us", "assumedRoleARN": "arn"},
+    }
+    override = {
+        "ftpServer": {
+            "transferProtocol": "SFTP",
+            "plainText": {"hostname": "h", "username": "u"},
+        }
+    }
+    with pytest.raises(ValueError, match="Cannot override S3 datastore"):
+        module.replace_datastore_input(existing, override)
+
+
+def test_transform_pipeline_payload(monkeypatch):
+    module = load_module(monkeypatch)
+    pipeline = {
+        "vaultID": "original-vault",
+        "source": {"ftpServer": {"transferProtocol": "FTPS"}},
+        "destination": {"s3Bucket": {"name": "existing"}},
+    }
+    source_override = {"ftpServer": {"transferProtocol": "SFTP"}}
+    target_override = {"s3Bucket": {"name": "new", "region": "us-east-1"}}
+
+    result = module.transform_pipeline_payload(
+        pipeline,
+        source_datastore_input=source_override,
+        target_datastore_input=target_override,
+    )
+
+    assert result["vaultID"] == BASE_ENV["TARGET_VAULT_ID"]
+    assert result["source"]["ftpServer"] == {"transferProtocol": "SFTP"}
+    assert result["destination"]["s3Bucket"] == {
+        "name": "new",
+        "region": "us-east-1",
+    }
+    # Ensure original object is untouched
+    assert pipeline["source"]["ftpServer"]["transferProtocol"] == "FTPS"
+
+
+def test_list_pipelines(monkeypatch):
+    module = load_module(monkeypatch)
+    calls = {}
+
+    def fake_get(url, headers):
+        calls["url"] = url
+        calls["headers"] = headers
+        return SimpleNamespace(
+            raise_for_status=lambda: None,
+            json=lambda: {"pipelines": [{"ID": "pipeline-1"}]},
+        )
+
+    monkeypatch.setattr(module.requests, "get", fake_get)
+    result = module.list_pipelines("vault-123")
+    assert result == [{"ID": "pipeline-1"}]
+    assert calls["url"].endswith("vaultID=vault-123")
+    assert calls["headers"]["Authorization"].startswith("Bearer")
+
+
+def test_get_pipeline(monkeypatch):
+    module = load_module(monkeypatch)
+    captured = {}
+
+    def fake_get(url, headers):
+        captured["url"] = url
+        captured["headers"] = headers
+        return SimpleNamespace(
+            raise_for_status=lambda: None,
+            json=lambda: {"pipeline": {"ID": "pipeline-1", "name": "Example"}},
+        )
+
+    monkeypatch.setattr(module.requests, "get", fake_get)
+    pipeline = module.get_pipeline("pipeline-1")
+    assert pipeline["name"] == "Example"
+    assert captured["url"].endswith("/pipeline-1")
+
+
+def test_create_pipeline(monkeypatch):
+    module = load_module(monkeypatch)
+    captured = {}
+
+    def fake_post(url, json, headers):
+        captured["url"] = url
+        captured["json"] = json
+        captured["headers"] = headers
+
+        class DummyResponse:
+            status_code = 200
+
+            def raise_for_status(self):
+                return None
+
+        return DummyResponse()
+
+    monkeypatch.setattr(module.requests, "post", fake_post)
+    payload = {"name": "new-pipeline"}
+    response = module.create_pipeline(payload)
+    assert response.status_code == 200
+    assert captured["json"] == payload
+    assert captured["url"].endswith("/v1/pipelines")
+
+
+def test_main_success(monkeypatch, capsys):
+    source_config = """
+    {
+        "ftpServer": {
+            "transferProtocol": "FTPS",
+            "plainText": {"hostname": "host", "username": "user"}
+        }
+    }
+    """
+    target_config = """
+    {
+        "s3Bucket": {
+            "name": "bucket",
+            "region": "us-west-2",
+            "assumedRoleARN": "test"
+        }
+    }
+    """
+    module = load_module(
+        monkeypatch,
+        SOURCE_DATASTORE_CONFIG=source_config,
+        TARGET_DATASTORE_CONFIG=target_config,
+    )
+
+    pipeline = {
+        "ID": "pipeline-1",
+        "name": "Sample Pipeline",
+        "vaultID": "old-vault",
+        "source": {"ftpServer": {"transferProtocol": "SFTP"}},
+        "destination": {"s3Bucket": {"name": "old"}},
+    }
+    monkeypatch.setattr(module, "get_pipeline", lambda pipeline_id: pipeline)
+
+    captured_payload = {}
+
+    class SuccessResponse:
+        status_code = 200
+
+        def json(self):
+            return {"ID": "new-id"}
+
+    def fake_create_pipeline(payload):
+        captured_payload.update(payload)
+        return SuccessResponse()
+
+    monkeypatch.setattr(module, "create_pipeline", fake_create_pipeline)
+    module.main("pipeline-1")
+    stdout = capsys.readouterr().out
+    assert "Pipeline migrated successfully" in stdout
+    assert captured_payload["vaultID"] == BASE_ENV["TARGET_VAULT_ID"]
+    assert "ftpServer" in captured_payload["source"]
+    assert "s3Bucket" in captured_payload["destination"]
+
+
+def test_main_failure(monkeypatch, capsys):
+    module = load_module(monkeypatch)
+    pipeline = {"ID": "pipeline-2", "name": "Failing Pipeline"}
+    monkeypatch.setattr(module, "get_pipeline", lambda pipeline_id: pipeline)
+
+    class FailureResponse:
+        status_code = 500
+        content = b"problem"
+
+    monkeypatch.setattr(module, "create_pipeline", lambda payload: FailureResponse())
+    module.main("pipeline-2")
+    stdout = capsys.readouterr().out
+    assert "Pipeline migration failed" in stdout
+
+
+def test_main_http_error_branch(monkeypatch, capsys):
+    import requests
+
+    module = load_module(monkeypatch)
+
+    class Resp:
+        content = b"http boom"
+
+    http_err = requests.exceptions.HTTPError(response=Resp())
+
+    def raise_http_error(_):
+        raise http_err
+
+    monkeypatch.setattr(module, "get_pipeline", raise_http_error)
+
+    with pytest.raises(requests.exceptions.HTTPError):
+        module.main("pipeline-http-error")
+
+    stdout = capsys.readouterr().out
+    assert "HTTP error" in stdout
+
+
+def test_main_other_exception_branch(monkeypatch, capsys):
+    module = load_module(monkeypatch)
+    pipeline = {"ID": "pipeline-3", "name": "Other Error"}
+    monkeypatch.setattr(module, "get_pipeline", lambda pipeline_id: pipeline)
+
+    def boom(_payload):
+        raise RuntimeError("explode")
+
+    monkeypatch.setattr(module, "create_pipeline", boom)
+
+    with pytest.raises(RuntimeError):
+        module.main("pipeline-other-error")
+
+    stdout = capsys.readouterr().out
+    assert "other error" in stdout
+
+
+def test_run_as_script_requires_pipeline_id(monkeypatch):
+    import runpy
+    import sys
+    import dotenv
+
+    if "migrate_pipelines" in sys.modules:
+        del sys.modules["migrate_pipelines"]
+
+    for key, value in BASE_ENV.items():
+        if key != "PIPELINE_ID":
+            monkeypatch.setenv(key, value)
+    monkeypatch.setenv("PIPELINE_ID", "")
+    monkeypatch.setattr(dotenv, "load_dotenv", lambda *args, **kwargs: None)
+
+    with pytest.raises(ValueError, match="PIPELINE_ID is required"):
+        runpy.run_module("migrate_pipelines", run_name="__main__")
+
+
+def test_run_as_script_executes_main(monkeypatch):
+    import runpy
+    import requests
+
+    for key, value in BASE_ENV.items():
+        monkeypatch.setenv(key, value)
+
+    monkeypatch.setattr(dotenv, "load_dotenv", lambda *args, **kwargs: None)
+
+    captured = {}
+
+    class FakeGetResponse:
+        def raise_for_status(self):
+            return None
+
+        def json(self):
+            return {
+                "pipeline": {
+                    "ID": BASE_ENV["PIPELINE_ID"],
+                    "name": "Script Pipeline",
+                    "vaultID": "source-vault",
+                    "source": {},
+                    "destination": {},
+                }
+            }
+
+    class FakePostResponse:
+        status_code = 200
+
+        def raise_for_status(self):
+            return None
+
+        def json(self):
+            return {"ID": "new-pipeline"}
+
+    def fake_get(url, headers):
+        captured["get_url"] = url
+        return FakeGetResponse()
+
+    def fake_post(url, json, headers):
+        captured["post_url"] = url
+        captured["payload"] = json
+        return FakePostResponse()
+
+    monkeypatch.setattr(requests, "get", fake_get)
+    monkeypatch.setattr(requests, "post", fake_post)
+
+    runpy.run_module("migrate_pipelines", run_name="__main__")
+
+    assert captured["post_url"].endswith("/v1/pipelines")
+    assert captured["payload"]["vaultID"] == BASE_ENV["TARGET_VAULT_ID"]