Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import click
from commands.cognito import cognito
from commands.dlq import dlq
from commands.dynamodb import dynamodb
from commands.handle import handle
from commands.users import users
from commands.customers import customers
Expand All @@ -20,6 +21,7 @@ def cli():

cli.add_command(cognito)
cli.add_command(dlq)
cli.add_command(dynamodb)
cli.add_command(handle)
cli.add_command(users)
cli.add_command(customers)
Expand Down
90 changes: 90 additions & 0 deletions commands/dynamodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import click
from commands.services.dynamodb_service import DynamoDBService


@click.group()
def dynamodb():
"""Utility methods for working with DynamoDB tables."""
pass


@dynamodb.command(help="Purge all items from a DynamoDB table.")
@click.argument("table")
@click.option(
"--profile",
envvar="AWS_PROFILE",
default="default",
help="The AWS profile to use. e.g. sikt-nva-sandbox, configure your profiles in ~/.aws/config",
)
def purge(table: str, profile: str) -> None:
"""
Purge all items from a DynamoDB table.

This command will:
1. Find the table matching the provided name or partial name
2. Show table information including approximate item count
3. Request confirmation before deleting
4. Delete all items from the table
"""
service = DynamoDBService(profile)

# Find the table
result, _ = service.find_table(table)

if result is None:
click.echo(f"No table found matching '{table}'")
return

# Handle multiple matches
if isinstance(result, list):
click.echo(f"Multiple tables found matching '{table}':")
for idx, table_name in enumerate(result, 1):
click.echo(f" {idx}. {table_name}")
click.echo("\nPlease be more specific with the table name.")
return

table_name = result

# Get table information
try:
table_info = service.get_table_info(table_name)
except Exception as e:
click.echo(f"Error retrieving table information: {str(e)}")
return

# Display table information
click.echo("\nTable Information:")
click.echo(f" Profile: {profile}")
click.echo(f" Table Name: {table_info['table_name']}")
click.echo(f" Status: {table_info['table_status']}")
click.echo(f" Approximate Item Count: {table_info['item_count']}")

# Get key schema
partition_key, sort_key = service.get_key_names(table_name)
key_schema_str = partition_key
if sort_key:
key_schema_str += f", {sort_key}"
click.echo(f" Key Schema: {key_schema_str}")

# Confirmation
click.echo(f"\nWARNING: This will delete ALL items from table '{table_name}'!")
confirmation = click.prompt(
"Type the full table name to confirm deletion",
type=str,
)

if confirmation != table_name:
click.echo("Deletion cancelled - table name did not match.")
return

# Perform deletion
click.echo(f"\nStarting deletion of all items from '{table_name}'...")

try:
total_deleted = service.purge_table(table_name)
click.echo(
f"\n✓ Successfully deleted {total_deleted} items from '{table_name}'"
)
except Exception as e:
click.echo(f"\n✗ Error during deletion: {str(e)}")
raise
139 changes: 139 additions & 0 deletions commands/services/dynamodb_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import boto3
import click


class DynamoDBService:
def __init__(self, profile):
self.profile = profile
self.session = boto3.Session(profile_name=self.profile)
self.dynamodb_client = self.session.client("dynamodb")
self.dynamodb_resource = self.session.resource("dynamodb")

def find_table(self, table_pattern):
"""
Find a table matching the given pattern.

Args:
table_pattern (str): A pattern to match DynamoDB table name.

Returns:
tuple: (table_name, table_resource) or (None, None) if not found
"""
response = self.dynamodb_client.list_tables()
table_names = response["TableNames"]

# Try exact match first
if table_pattern in table_names:
return table_pattern, self.dynamodb_resource.Table(table_pattern)

# Try partial match
matching_tables = [
name for name in table_names if table_pattern.lower() in name.lower()
]

if not matching_tables:
return None, None

if len(matching_tables) == 1:
table_name = matching_tables[0]
return table_name, self.dynamodb_resource.Table(table_name)

# Multiple matches - return them for user to choose
return matching_tables, None

def get_table_info(self, table_name):
"""
Get table information including item count.

Args:
table_name (str): The DynamoDB table name.

Returns:
dict: Table information including ItemCount
"""
table = self.dynamodb_resource.Table(table_name)
table.reload() # Refresh table metadata
return {
"table_name": table.name,
"item_count": table.item_count,
"table_status": table.table_status,
"key_schema": table.key_schema,
}

def get_key_names(self, table_name):
"""
Get the primary key and sort key names for a table.

Args:
table_name (str): The DynamoDB table name.

Returns:
tuple: (partition_key_name, sort_key_name) or (partition_key_name, None)
"""
table = self.dynamodb_resource.Table(table_name)
key_schema = table.key_schema

partition_key = next(
(k["AttributeName"] for k in key_schema if k["KeyType"] == "HASH"), None
)
sort_key = next(
(k["AttributeName"] for k in key_schema if k["KeyType"] == "RANGE"), None
)

return partition_key, sort_key

def purge_table(self, table_name):
"""
Delete all items from a DynamoDB table.

Args:
table_name (str): The DynamoDB table name.

Returns:
int: Total number of items deleted
"""
table = self.dynamodb_resource.Table(table_name)
partition_key, sort_key = self.get_key_names(table_name)

total_deleted = 0

# Initial scan
response = table.scan()
total_deleted += self._delete_items(
table, response["Items"], partition_key, sort_key
)

# Continue scanning if there are more items
while "LastEvaluatedKey" in response:
response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
total_deleted += self._delete_items(
table, response["Items"], partition_key, sort_key
)
click.echo(f"Deletion in progress... {total_deleted} items deleted")

return total_deleted

def _delete_items(self, table, items, partition_key, sort_key):
"""
Delete a batch of items from the table.

Args:
table: DynamoDB table resource
items: List of items to delete
partition_key: Name of the partition key
sort_key: Name of the sort key (can be None)

Returns:
int: Number of items deleted
"""
if not items:
return 0

with table.batch_writer() as batch:
for item in items:
key = {partition_key: item[partition_key]}
if sort_key:
key[sort_key] = item[sort_key]
batch.delete_item(Key=key)

return len(items)
Loading