Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Unreleased

- Updated documentation.

- Added support for DynamoDB data jobs.

1.15.0 - 2025/10/02
===================

Expand Down
97 changes: 78 additions & 19 deletions croud/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
get_scheduled_job_log,
get_scheduled_jobs,
import_jobs_create_from_azure_blob_storage,
import_jobs_create_from_dynamodb,
import_jobs_create_from_file,
import_jobs_create_from_s3,
import_jobs_create_from_url,
Expand Down Expand Up @@ -139,20 +140,6 @@
required=True,
help="The cluster the data will be imported into.",
),
Argument(
"--file-format",
type=str,
required=True,
choices=["csv", "json", "parquet"],
help="The format of the structured data in the file.",
),
Argument(
"--compression",
type=str,
required=False,
choices=["gzip"],
help="The compression method the file uses.",
),
Argument(
"--table",
type=str,
Expand All @@ -168,6 +155,23 @@
" if it does not exist. If true new columns will also be added when the data"
" requires them.",
),
]

import_job_create_common_file_args = [
Argument(
"--file-format",
type=str,
required=True,
choices=["csv", "json", "parquet"],
help="The format of the structured data in the file.",
),
Argument(
"--compression",
type=str,
required=False,
choices=["gzip", "none"],
help="The compression method the file uses.",
),
Argument(
"--transformations",
type=str,
Expand Down Expand Up @@ -828,7 +832,8 @@
help="The URL the import file will be read "
"from."
),
] + import_job_create_common_args,
] + import_job_create_common_args
+ import_job_create_common_file_args,
"resolver": import_jobs_create_from_url,
},
"from-file": {
Expand All @@ -852,7 +857,8 @@
"Please note the file will become visible "
"under ``croud organizations files list``."
),
] + import_job_create_common_args,
] + import_job_create_common_args
+ import_job_create_common_file_args,
"resolver": import_jobs_create_from_file,
},
"from-s3": {
Expand Down Expand Up @@ -882,9 +888,61 @@
"--endpoint", type=str, required=False,
help="An Amazon S3 compatible endpoint."
),
] + import_job_create_common_args,
] + import_job_create_common_args
+ import_job_create_common_file_args,
"resolver": import_jobs_create_from_s3,
},
"from-dynamodb": {
"help": "Create a data import job on the specified "
"cluster from an Amazon DynamoDB compatible "
"location.",
"extra_args": [
# Type S3 params
Argument(
"--ingestion-type", type=str,
choices=[
"IMPORT_ONLY",
"IMPORT_AND_CDC",
"CDC_ONLY"
],
required=True,
help="Determines how to ingest the data. "
"IMPORT_ONLY will just ingest the data "
"and finish. CDC_ONLY will continuously "
"read CDC (Change Data Capture) events. "
"IMPORT_AND_CDC will first import the "
"data and then start listening for CDC "
"events."
),
Argument(
"--aws-region", type=str, required=True,
help="The name of the AWS region where the "
"DynamoDB table is located."
),
Argument(
"--dynamodb-table", type=str, required=True,
help="The name of the DynamoDB table."
),
Argument(
"--kinesis-stream-name", type=str,
required=False,
help="The name of the Kinesis Stream that will "
"be used to read CDC events from. Only for"
" CDC mode."
),
Argument(
"--secret-id", type=str, required=True,
help="The secret that contains the access key "
"and secret key needed to access the "
"table to be imported."
),
Argument(
"--endpoint", type=str, required=False,
help="An AWS DynamoDB compatible endpoint."
),
] + import_job_create_common_args,
"resolver": import_jobs_create_from_dynamodb,
},
"from-azure-blob-storage": {
"help": "Create a data import job on the specified "
"cluster from an Azure blob storage location.",
Expand All @@ -908,7 +966,8 @@
"and secret key needed to access the file "
"to be imported."
),
] + import_job_create_common_args,
] + import_job_create_common_args
+ import_job_create_common_file_args,
"resolver": import_jobs_create_from_azure_blob_storage,
},
},
Expand Down Expand Up @@ -972,7 +1031,7 @@
"--compression",
type=str,
required=False,
choices=["gzip"],
choices=["gzip", "none"],
help="The compression method of the exported file.",
),
Argument(
Expand Down
39 changes: 36 additions & 3 deletions croud/clusters/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from croud.clusters.exceptions import AsyncOperationNotFound
from croud.config import CONFIG, get_output_format
from croud.organizations.commands import op_upload_file_to_org
from croud.parser import CroudCliArgumentParser
from croud.printer import print_error, print_info, print_response, print_success
from croud.tools.spinner import HALO
from croud.util import grand_central_jwt_token, require_confirmation
Expand Down Expand Up @@ -200,6 +201,41 @@ def import_jobs_create_from_s3(args: Namespace) -> None:
import_jobs_create(args, extra_payload=extra_body)


def import_jobs_create_from_dynamodb(args: Namespace) -> None:
    """Create a data import job that reads from an Amazon DynamoDB table.

    DynamoDB imports carry no user-selectable file format or compression, so
    those attributes are pinned before delegating to ``import_jobs_create``.
    Also enforces that ``--kinesis-stream-name`` is provided exactly when a
    CDC ingestion type is requested.

    :param args: parsed CLI arguments for ``from-dynamodb``.
    """
    # Fixed pseudo-format: the API identifies DynamoDB jobs by these values.
    args.type = "dynamodb"
    args.file_format = "dynamodb"
    args.compression = "none"

    # Source description for the DynamoDB table being imported.
    source = {
        "region": args.aws_region,
        "table_name": args.dynamodb_table,
        "secret_id": args.secret_id,
    }
    if args.endpoint:
        source["endpoint"] = args.endpoint

    extra_body = {"dynamodb": source}

    ingestion_type = args.ingestion_type
    if ingestion_type:
        extra_body["ingestion_type"] = ingestion_type

    # CDC modes (CDC_ONLY / IMPORT_AND_CDC) require a Kinesis stream to read
    # change events from; plain imports must not specify one.
    if "CDC" in (ingestion_type or ""):
        if args.kinesis_stream_name:
            source["kinesis_stream_name"] = args.kinesis_stream_name
        else:
            # .error() prints usage and exits the process.
            CroudCliArgumentParser().error(
                "\nError: --kinesis-stream-name must be set when using CDC ingestion "
                "types."
            )
    elif args.kinesis_stream_name:
        CroudCliArgumentParser().error(
            "\nError: --kinesis-stream-name must not be set when using IMPORT "
            "ingestion type."
        )

    import_jobs_create(args, extra_payload=extra_body)


def import_jobs_create_from_azure_blob_storage(args: Namespace) -> None:
extra_body = {
"azureblob": {
Expand Down Expand Up @@ -270,9 +306,6 @@ def import_jobs_create(args: Namespace, extra_payload: Dict[str, Any]) -> None:
if args.create_table is not None:
body["destination"]["create_table"] = args.create_table

if args.transformations:
body["schema"] = {"select": args.transformations}

if extra_payload:
body.update(extra_payload)

Expand Down
30 changes: 30 additions & 0 deletions docs/commands/data-import-export.rst
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,36 @@ Example
==> Info: Done importing 14.73K records
==> Success: Operation completed.

``clusters import-jobs create from-dynamodb``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


.. note::

   For ``IMPORT_ONLY``, this command will wait for the operation to finish or fail.
   When ``--ingestion-type`` is set to ``CDC_ONLY`` or ``IMPORT_AND_CDC``, the command
   does not finish: even after the last CDC event has been processed, it keeps waiting
   for new CDC events to arrive.

.. argparse::
:module: croud.__main__
:func: get_parser
:prog: croud
:path: clusters import-jobs create from-dynamodb

.. code-block:: console

sh$ croud clusters import-jobs create from-dynamodb --cluster-id e1e38d92-a650-48f1-8a70-8133f2d5c400 \
--secret-id 71e7c5da-51fa-44f2-b178-d95052cbe620 --aws-region eu-west-1 \
--dynamodb-table my_dynamodb_table_name --kinesis-stream-name my_kinesis_stream_name \
--table my_table_name --ingestion-type IMPORT_AND_CDC
+--------------------------------------+--------------------------------------+------------+
| id | cluster_id | status |
|--------------------------------------+--------------------------------------+------------|
| f29fdc02-edd0-4ad9-8839-9616fccf752b | e1e38d92-a650-48f1-8a70-8133f2d5c400 | REGISTERED |
+--------------------------------------+--------------------------------------+------------+
==> Info: Status: REGISTERED (Your import job was received and is pending processing.)


``clusters import-jobs list``
-----------------------------

Expand Down
68 changes: 68 additions & 0 deletions tests/commands/test_clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,74 @@ def test_import_job_create_from_s3(mock_request):
)


@mock.patch.object(
    Client,
    "request",
    return_value=(
        {
            "id": "1",
            "status": "SUCCEEDED",
            "progress": {"records": 27, "bytes": 4096, "message": "success"},
        },
        None,
    ),
)
def test_import_job_create_from_dynamodb(mock_request):
    """``from-dynamodb`` should POST a correctly shaped import-job payload."""
    cluster_id = gen_uuid()
    secret_id = gen_uuid()

    # CLI flag -> value mapping; iteration order matches insertion order.
    options = {
        "--cluster-id": cluster_id,
        "--ingestion-type": "IMPORT_AND_CDC",
        "--aws-region": "us-east-1",
        "--dynamodb-table": "my_dynamodb_table",
        "--kinesis-stream-name": "my_kinesis_stream_name",
        "--secret-id": secret_id,
        "--endpoint": "https://my-dynamodb-compatible-endpoint",
        "--table": "my-table",
        "--create-table": "false",
    }
    argv = ["croud", "clusters", "import-jobs", "create", "from-dynamodb"]
    for flag, value in options.items():
        argv += [flag, value]
    call_command(*argv)

    expected_body = {
        "type": "dynamodb",
        "dynamodb": {
            "region": "us-east-1",
            "table_name": "my_dynamodb_table",
            "kinesis_stream_name": "my_kinesis_stream_name",
            "secret_id": secret_id,
            "endpoint": "https://my-dynamodb-compatible-endpoint",
        },
        "format": "dynamodb",
        "destination": {"table": "my-table", "create_table": False},
        "compression": "none",
        "ingestion_type": "IMPORT_AND_CDC",
    }
    assert_rest(
        mock_request,
        RequestMethod.POST,
        f"/api/v2/clusters/{cluster_id}/import-jobs/",
        body=expected_body,
        any_times=True,
    )


@mock.patch.object(
Client, "request", return_value=({"id": "1", "status": "SUCCEEDED"}, None)
)
Expand Down
Loading