Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 27 additions & 165 deletions tests/test_import_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@

import paramiko
import pytest
import yaml
from testcontainers.sftp import SFTPContainer, SFTPUser
from typer.testing import CliRunner

from datacontract.cli import app
from datacontract.data_contract import DataContract

sftp_dir = "/sftp/data"

Expand Down Expand Up @@ -44,18 +42,21 @@
# ODCS fixture: local source file and the remote path it is addressed by.
odcs_file_path = f"fixtures/odcs_v3/{odcs_file_name}.odcs.yaml"
odcs_sftp_path = f"{sftp_dir}/{odcs_file_name}.odcs.yaml"

# Protobuf fixture: local source file and the remote path it is addressed by.
protobuf_file_name = "sample_data"
protobuf_file_path = f"fixtures/protobuf/data/{protobuf_file_name}.proto3.data"
protobuf_sftp_path = f"{sftp_dir}/{protobuf_file_name}.proto3.data"


# Account used to build the SFTPUser below (for emberstack).
username = "demo"
password = "demo"
user = SFTPUser(name=username, password=password)


@pytest.fixture
def sftp_container():
@pytest.fixture(params=[{'filepath': csv_file_path, 'sftp_path': csv_sftp_path,'filetype': 'csv'},
{'filepath': parquet_file_path, 'sftp_path': parquet_sftp_path, 'filetype': 'parquet'},
{'filepath': avro_file_path, 'sftp_path': avro_sftp_path, 'filetype': 'avro'},
{'filepath': dbml_file_path, 'sftp_path': dbml_sftp_path, 'filetype': 'dbml'},
{'filepath': dbt_file_path, 'sftp_path': dbt_sftp_path,'filetype': 'dbt'},
{'filepath': iceberg_file_path, 'sftp_path': iceberg_sftp_path, 'filetype': 'iceberg'},
{'filepath': json_file_path, 'sftp_path': json_sftp_path, 'filetype': 'jsonschema'},
{'filepath': odcs_file_path, 'sftp_path': odcs_sftp_path, 'filetype': 'odcs'},
])
def sftp_container(request):
"""
Initialize and provide an SFTP container for all tests in this module.
Sets up the container, uploads the test file, and provides connection details.
Expand All @@ -79,165 +80,26 @@ def sftp_container():
sftp = ssh.open_sftp()
try:
sftp.mkdir(sftp_dir)
sftp.put(avro_file_path, avro_sftp_path)
sftp.put(dbml_file_path, dbml_sftp_path)
sftp.put(dbt_file_path, dbt_sftp_path)
sftp.put(csv_file_path, csv_sftp_path)
sftp.put(iceberg_file_path, iceberg_sftp_path)
sftp.put(json_file_path, json_sftp_path)
sftp.put(odcs_file_path, odcs_sftp_path)
sftp.put(parquet_file_path, parquet_sftp_path)
sftp.put(protobuf_file_path, protobuf_sftp_path)
sftp.put(request.param['filepath'], request.param['sftp_path'])
source = f"sftp://{host_ip}:{host_port}{csv_sftp_path}"

runner = CliRunner()
result = runner.invoke(
app,
[
"import",
"--format",
request.param['filetype'],
"--source",
source,
],
)
assert result.exit_code == 0
finally:
sftp.close()
ssh.close()


yield {
"host_ip": host_ip,
"host_port": host_port,
"container": container
}


def test_cli(sftp_container):
    """Run the ``import`` CLI command against the CSV SFTP path and expect exit code 0."""
    host_ip, host_port = sftp_container["host_ip"], sftp_container["host_port"]
    source = f"sftp://{host_ip}:{host_port}{csv_sftp_path}"

    cli_args = ["import", "--format", "csv", "--source", source]
    outcome = CliRunner().invoke(app, cli_args)

    assert outcome.exit_code == 0

def test_import_sftp_csv(sftp_container):
    """Import the CSV file over SFTP and compare the result with the expected contract.

    Checks that examples were collected for every field, then clears them so
    the exported YAML can be compared against a fixed expected document.
    """
    host_ip = sftp_container["host_ip"]
    host_port = sftp_container["host_port"]
    source = f"sftp://{host_ip}:{host_port}{csv_sftp_path}"

    result = DataContract.import_from_source("csv", source)
    model = result.models[csv_file_name]
    assert model is not None
    assert len(model.fields["field_one"].examples) == 5
    assert len(model.fields["field_two"].examples) > 0
    assert len(model.fields["field_three"].examples) > 0

    # Examples are not part of the expected document below, so drop them
    # before comparing.
    for field in model.fields.values():
        field.examples = None

    # The nesting matters: a flat mapping would repeat the `type`, `required`
    # and `unique` keys and could never equal the exported contract.
    expected = f"""
dataContractSpecification: 1.2.1
id: my-data-contract-id
info:
  title: My Data Contract
  version: 0.0.1
servers:
  production:
    type: local
    format: csv
    path: sftp://{host_ip}:{host_port}{csv_sftp_path}
    delimiter: ','
models:
  {csv_file_name}:
    description: Generated model of sftp://{host_ip}:{host_port}{csv_sftp_path}
    type: table
    fields:
      field_one:
        type: string
        required: true
        unique: true
      field_two:
        type: integer
        required: true
        unique: true
        minimum: 14.0
        maximum: 89.0
      field_three:
        type: timestamp
        required: true
        unique: true
"""
    assert yaml.safe_load(result.to_yaml()) == yaml.safe_load(expected)
    # The expected contract must itself pass linting.
    assert DataContract(data_contract_str=expected).lint().has_passed()




def test_import_sftp_parquet(sftp_container):
    """Import the Parquet file over SFTP; check selected field types and linting."""
    host_ip, host_port = sftp_container["host_ip"], sftp_container["host_port"]
    source = f"sftp://{host_ip}:{host_port}{parquet_sftp_path}"

    imported = DataContract.import_from_source("parquet", source)
    model = imported.models[parquet_file_name]
    assert model is not None

    # Field name -> type the importer is expected to report.
    expected_types = {
        "string_field": "string",
        "blob_field": "bytes",
        "boolean_field": "boolean",
    }
    for field_name, field_type in expected_types.items():
        assert model.fields[field_name].type == field_type

    assert DataContract(data_contract=imported).lint().has_passed()

def test_import_sftp_avro(sftp_container):
    """Import the Avro file over SFTP; check field types, field count and linting."""
    host_ip, host_port = sftp_container["host_ip"], sftp_container["host_port"]
    source = f"sftp://{host_ip}:{host_port}{avro_sftp_path}"

    imported = DataContract.import_from_source("avro", source)
    model = imported.models[avro_file_name]
    assert model is not None

    # Field name -> type the importer is expected to report.
    expected_types = {
        "ordertime": "long",
        "orderid": "int",
        "itemid": "string",
    }
    for field_name, field_type in expected_types.items():
        assert model.fields[field_name].type == field_type

    field_count = len(model.fields.keys())
    assert field_count == 9
    assert DataContract(data_contract=imported).lint().has_passed()

def test_import_sftp_dbml(sftp_container):
    """Import the DBML file over SFTP; expect exactly two models and clean linting."""
    host_ip, host_port = sftp_container["host_ip"], sftp_container["host_port"]
    source = f"sftp://{host_ip}:{host_port}{dbml_sftp_path}"

    imported = DataContract.import_from_source("dbml", source)

    model_count = len(imported.models.keys())
    assert model_count == 2
    assert DataContract(data_contract=imported).lint().has_passed()

def test_import_sftp_dbt(sftp_container):
    """Import the dbt file over SFTP; check the model names and linting."""
    host_ip, host_port = sftp_container["host_ip"], sftp_container["host_port"]
    source = f"sftp://{host_ip}:{host_port}{dbt_sftp_path}"

    imported = DataContract.import_from_source("dbt", source)

    expected_models = {'orders', 'stg_customers', 'stg_orders', 'stg_payments', 'customers'}
    assert set(imported.models.keys()) == expected_models
    assert DataContract(data_contract=imported).lint().has_passed()

def test_import_sftp_iceberg(sftp_container):
    """Import the Iceberg file over SFTP; expect at least one model and clean linting."""
    host_ip, host_port = sftp_container["host_ip"], sftp_container["host_port"]
    source = f"sftp://{host_ip}:{host_port}{iceberg_sftp_path}"

    imported = DataContract.import_from_source("iceberg", source)

    model_count = len(imported.models.keys())
    assert model_count > 0
    assert DataContract(data_contract=imported).lint().has_passed()

def test_import_sftp_jsonschema(sftp_container):
    """Import the JSON Schema file over SFTP; expect at least one model and clean linting."""
    host_ip, host_port = sftp_container["host_ip"], sftp_container["host_port"]
    source = f"sftp://{host_ip}:{host_port}{json_sftp_path}"

    imported = DataContract.import_from_source("jsonschema", source)

    model_count = len(imported.models.keys())
    assert model_count > 0
    assert DataContract(data_contract=imported).lint().has_passed()

def test_import_sftp_odcs(sftp_container):
    """Import the ODCS file over SFTP; expect the ``tbl_1`` model and clean linting."""
    host_ip, host_port = sftp_container["host_ip"], sftp_container["host_port"]
    source = f"sftp://{host_ip}:{host_port}{odcs_sftp_path}"

    imported = DataContract.import_from_source("odcs", source)

    table_model = imported.models["tbl_1"]
    assert table_model is not None
    assert DataContract(data_contract=imported).lint().has_passed()
# The actual test is performed in the fixture. Each parameter set will run this test.
pass
Loading