# daplug-s3: Schema-Driven S3 Normalization & Event Publishing for Python
daplug-s3 is an adapter that wraps boto3 S3 primitives with schema-friendly helpers, SNS publishing, and convenience utilities for streaming, multipart uploads, and presigned URL generation.
Need deeper operational context? See `.agents/.AGENTS.md` for the full consumer guide and `.agents/CODEX.md` for contributor workflow expectations.
## Features

- **Schema-aware writes** – Convert Python dictionaries into JSON payloads automatically.
- **Event-driven** – Every write publishes presigned URLs via the `daplug_core` publisher so downstream services can react in real time.
- **Complete S3 toolkit** – Simple helpers for CRUD, streaming uploads, multipart chunking, directory listings, and rename/delete patterns.
- **Local-first** – Works seamlessly with LocalStack using the `endpoint` parameter.
## Installation

```bash
pip install daplug-s3
# or with Pipenv
pipenv install daplug-s3
```

The library targets Python 3.10+.
## Quick start

```python
from daplug_s3 import adapter

s3 = adapter(
    endpoint="https://s3.us-east-1.amazonaws.com",  # optional override
    bucket="my-team-bucket",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="secret",
    region="us-east-1",
    sns_arn="arn:aws:sns:us-east-1:123456789012:my-topic",
    sns_endpoint="https://sns.us-east-1.amazonaws.com",
)
```

| Kwarg | Type | Required | Description |
|---|---|---|---|
| `endpoint` | `str`, `None` | No | Custom S3/LocalStack endpoint. Leave `None` for the AWS default. |
| `bucket` | `str` | Yes | Default bucket applied to every request. |
| `aws_access_key_id` | `str` | Yes | IAM access key. |
| `aws_secret_access_key` | `str` | Yes | IAM secret key. |
| `region` | `str` | Yes | AWS region name. |
| `sns_arn` | `str` | No | SNS topic ARN used by `BaseAdapter.publish`. |
| `sns_endpoint` | `str` | No | SNS endpoint override (LocalStack). |
| `sns_attributes` | `dict` | No | Default SNS message attributes (`str`, `int`, `float`, or `bool` values). |
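For local development, the same constructor can point at LocalStack with throwaway credentials. A minimal sketch; the bucket name and `test` credentials are placeholders, and the port assumes LocalStack's default edge endpoint:

```python
from daplug_s3 import adapter

local_s3 = adapter(
    endpoint="http://localhost:4566",      # LocalStack edge endpoint
    bucket="local-bucket",                 # placeholder bucket name
    aws_access_key_id="test",              # LocalStack accepts fake credentials
    aws_secret_access_key="test",
    region="us-east-1",
    sns_endpoint="http://localhost:4566",  # route SNS publishes to LocalStack too
)
```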
## API

All public methods below accept keyword arguments only.
### put

Store data in S3 with optional JSON encoding.

```python
payload = {"type": "invoice", "id": 256}
s3.put(s3_path="docs/invoice-256.json", data=payload, json=True)
```

| Kwarg | Type | Required | Default | Description |
|---|---|---|---|---|
| `s3_path` | `str` | Yes | – | Object key. |
| `data` | `bytes`, `str`, `dict` | Yes | – | Content to write. |
| `json` | `bool` | No | `False` | Encode via jsonpickle. |
| `encode` | `bool` | No | `True` | Convert strings to UTF-8 bytes. |
| `public_read` | `bool` | No | `False` | Applies the public-read ACL. |

Always triggers `BaseAdapter.publish` with a presigned URL payload.
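The same call handles non-JSON content. For example, a plain-text object written with a public ACL and then exposed via `create_public_url` (the key and body below are illustrative):

```python
# Write a UTF-8 string (encode=True is the default) with a public-read ACL.
s3.put(s3_path="docs/readme.txt", data="hello, world", public_read=True)

# Because the object is public, an unsigned URL will resolve.
url = s3.create_public_url(s3_path="docs/readme.txt")
print(url)
```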
### upload_stream

Upload streamed or buffered data.

```python
from pathlib import Path

with Path("brochure.pdf").open("rb") as fh:
    s3.upload_stream(s3_path="docs/brochure.pdf", io=fh, public_read=True)
```

| Kwarg | Type | Required | Default | Description |
|---|---|---|---|---|
| `s3_path` | `str` | Yes | – | Object key. |
| `io` | binary file object | One of | – | File-like object to stream from. |
| `data` | `bytes` | One of | – | Raw bytes used when `io` is missing. |
| `threshold` | `int` | No | `10000` | Multipart threshold for uploads. |
| `concurrency` | `int` | No | `4` | Parallel worker count. |
| `public_read` | `bool` | No | `False` | Public ACL toggle. |

Either `io` or `data` must be supplied.
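When the payload already lives in memory, `data` replaces `io`. A sketch that also tunes the multipart threshold and worker count (the values here are arbitrary, not recommendations):

```python
blob = b"\x00" * (50 * 1024 * 1024)  # 50 MiB of example bytes

s3.upload_stream(
    s3_path="large/blob.bin",
    data=blob,                   # in-memory bytes instead of a file object
    threshold=8 * 1024 * 1024,   # switch to multipart above 8 MiB
    concurrency=8,               # more parallel part uploads
)
```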
### get

Retrieve and optionally decode objects.

```python
config = s3.get(s3_path="docs/invoice-256.json", json=True)
```

| Kwarg | Type | Required | Default | Description |
|---|---|---|---|---|
| `s3_path` | `str` | Yes | – | Object key. |
| `json` | `bool` | No | `False` | Decode JSON via jsonpickle. |
| `decode` | `bool` | No | `True` | When `False`, returns the raw boto3 response. |

`read` simply calls `get`.
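With `decode=False` you get the raw boto3 response back. Assuming it mirrors boto3's standard `get_object` shape (an assumption, since daplug-s3 does not document the exact keys), the body is a stream you read yourself:

```python
raw = s3.get(s3_path="docs/invoice-256.json", decode=False)

# Standard boto3 get_object responses expose the payload as a streaming body.
body = raw["Body"].read()
content_type = raw.get("ContentType")
```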
### download

Save S3 content locally.

```python
target = s3.download(s3_path="reports/weekly.csv", download_path="/tmp/weekly.csv")
```

| Kwarg | Type | Required | Description |
|---|---|---|---|
| `s3_path` | `str` | Yes | Remote key. |
| `download_path` | `str` | Yes | Destination path; directories are created automatically. |

Returns the `download_path` string.
### multipart_upload

Manual chunk uploads.

```python
chunks = [b"chunk-1", b"chunk-2", b"chunk-3"]
s3.multipart_upload(s3_path="large/data.bin", chunks=chunks)
```

| Kwarg | Type | Required | Description |
|---|---|---|---|
| `s3_path` | `str` | Yes | Target key. |
| `chunks` | `list[bytes]` | Yes | Ordered byte chunks uploaded sequentially. |

Publishes a presigned URL when complete.
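Chunks often come from reading a large file in fixed-size pieces. A minimal sketch; the 5 MiB chunk size and file name are placeholders:

```python
from pathlib import Path

CHUNK_SIZE = 5 * 1024 * 1024  # 5 MiB per part (placeholder size)

chunks = []
with Path("export.tar").open("rb") as fh:
    while piece := fh.read(CHUNK_SIZE):
        chunks.append(piece)

s3.multipart_upload(s3_path="large/export.tar", chunks=chunks)
```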
### create_public_url

Generate unsigned URLs for public objects.

```python
public_url = s3.create_public_url(s3_path="docs/brochure.pdf")
```

| Kwarg | Type | Required | Description |
|---|---|---|---|
| `s3_path` | `str` | Yes | Object key. |

Only works for objects uploaded with `public_read=True`.
### create_presigned_read_url

Time-limited access.

```python
signed_url = s3.create_presigned_read_url(s3_path="docs/invoice-256.json", expiration=900)
```

| Kwarg | Type | Required | Default | Description |
|---|---|---|---|---|
| `s3_path` | `str` | Yes | – | Object key. |
| `expiration` | `int` | No | `3600` | Seconds before the URL expires. |
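Any HTTP client can consume the signed URL until it expires; no extra auth headers are needed because the signature lives in the query string. Continuing from the example above, with the standard library:

```python
from urllib.request import urlopen

# The presigned URL embeds the credentials, so a plain GET works.
with urlopen(signed_url) as resp:
    invoice_bytes = resp.read()
```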
### create_presigned_post_url

Generate POST policies for browser uploads.

```python
post_config = s3.create_presigned_post_url(
    s3_path="uploads/raw.txt",
    required_fields={"acl": "private"},
    required_conditions=[["content-length-range", 0, 1048576]],
    expiration=600,
)
```

| Kwarg | Type | Required | Default | Description |
|---|---|---|---|---|
| `s3_path` | `str` | Yes | – | Destination key. |
| `required_fields` | `dict[str, str]` | No | – | Pre-populated form fields. |
| `required_conditions` | `list[list[str]]` | No | – | Additional policy conditions. |
| `expiration` | `int` | No | `3600` | Policy lifetime (seconds). |
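If the returned mapping follows the shape of boto3's `generate_presigned_post` (a `url` plus the form `fields` to echo back, which is an assumption here, not something daplug-s3 documents), a client-side upload looks like:

```python
import requests  # third-party HTTP client, used here for brevity

with open("raw.txt", "rb") as fh:
    resp = requests.post(
        post_config["url"],          # assumed key, mirroring boto3's shape
        data=post_config["fields"],  # policy fields must accompany the file
        files={"file": fh},          # the file part must come last
    )
resp.raise_for_status()
```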
### object_exist

Check for object existence.

```python
if not s3.object_exist(s3_path="docs/invoice-999.json"):
    raise LookupError("missing doc")
```

| Kwarg | Type | Required | Description |
|---|---|---|---|
| `s3_path` | `str` | Yes | Object key. |

Returns `True` when the object exists, `False` on 404, and otherwise raises `ClientError`.
### list_dir_subfolders

```python
folders = s3.list_dir_subfolders(dir_name="reports/")
```

| Kwarg | Type | Required | Description |
|---|---|---|---|
| `dir_name` | `str` | Yes | Prefix ending with `/`. |

Returns prefixes for child folders.
### list_dir_files

```python
from datetime import datetime

recent = s3.list_dir_files(dir_name="reports/", date=datetime.utcnow())
```

| Kwarg | Type | Required | Default | Description |
|---|---|---|---|---|
| `dir_name` | `str` | Yes | – | Prefix to list. |
| `date` | `datetime` | No | `None` | Only return objects newer than this timestamp. |

Outputs a list of object keys.
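The two listing helpers compose naturally, e.g. to collect every key under each child folder that changed in the last day. A sketch, assuming the returned prefixes can be fed back in as `dir_name`; the cutoff is illustrative:

```python
from datetime import datetime, timedelta

cutoff = datetime.utcnow() - timedelta(days=1)

recent_keys = []
for prefix in s3.list_dir_subfolders(dir_name="reports/"):
    # Each prefix is itself a directory-style key ending with "/".
    recent_keys.extend(s3.list_dir_files(dir_name=prefix, date=cutoff))
```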
### rename_object

Copy + delete convenience.

```python
s3.rename_object(old_file_name="logs/old.txt", new_file_name="logs/new.txt")
```

| Kwarg | Type | Required | Description |
|---|---|---|---|
| `old_file_name` | `str` | Yes | Existing key. |
| `new_file_name` | `str` | Yes | New key. |
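Paired with `list_dir_files`, the same call supports bulk moves. A hedged sketch of relocating a prefix, assuming the listed keys can be rewritten with plain string replacement:

```python
# Move every object under logs/2023/ to archive/2023/.
for key in s3.list_dir_files(dir_name="logs/2023/"):
    s3.rename_object(
        old_file_name=key,
        new_file_name=key.replace("logs/", "archive/", 1),
    )
```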
### delete

Remove an object.

```python
s3.delete(s3_path="archives/data.bin")
```

| Kwarg | Type | Required | Description |
|---|---|---|---|
| `s3_path` | `str` | Yes | Key to delete. |

Returns the boto3 `delete_object` response.
## Development

```bash
pipenv install --dev

# lint & type check
pipenv run lint
pipenv run type-check

# unit tests (moto-backed)
pipenv run test

# integration tests (LocalStack S3 via docker-compose)
pipenv run integrations
```

Set `S3_ENDPOINT=http://localhost:4566` plus fake AWS credentials when using LocalStack manually.
## Documentation

- Consumer-facing playbook: `.agents/.AGENTS.md`
- Contributor guide & automation notes: `.agents/CODEX.md`
## Contributing

Issues and pull requests are welcome! Please run the full quality gate before opening a PR:

```bash
pipenv run lint
pipenv run type-check
pipenv run test
pipenv run integrations
```

## License

Apache License 2.0 – see `LICENSE` for details.

Built to keep S3 integrations predictable, event-driven, and schema-friendly.