Schema-Driven SQL Normalization & Event Publishing for Python
daplug-sql wraps psycopg2 / mysql-connector with optimistic CRUD helpers and SNS event fan-out so your Postgres and MySQL services stay DRY and event-driven.
Agents – a dedicated playbook lives in `.agents/AGENTS.md`.
- Single adapter factory – `daplug_sql.adapter(**kwargs)` returns a ready-to-go adapter configured for Postgres or MySQL based on the `engine` parameter.
- Optimistic CRUD – identifier-aware `insert`, `update`, `upsert`, and `delete` guard against duplicates and emit SNS events automatically.
- Connection reuse – a thread-safe cache reuses connections per endpoint/database/user/port/engine and lazily closes them.
- Integration-tested – `pipenv run integration` spins up both Postgres and MySQL via docker-compose and runs the real test suite.
```bash
pip install daplug-sql
# pipenv install daplug-sql
# poetry add daplug-sql
# uv pip install daplug-sql
```

```python
from daplug_sql import adapter

sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",  # "mysql" also supported
)
sql.connect()
sql.insert(
    data={"customer_id": "abc123", "name": "Ada"},
    table="customers",
    identifier="customer_id",
)
record = sql.get("abc123", table="customers", identifier="customer_id")
print(record)
sql.close()
```

| Parameter | Type | Required | Description |
|---|---|---|---|
| `endpoint` | `str` | Yes | Host/IP of the Postgres/MySQL server. |
| `database` | `str` | Yes | Database/schema name. |
| `user` | `str` | Yes | Database username. |
| `password` | `str` | Yes | Database password. |
| `engine` | `str` | No | `'postgres'` (default) or `'mysql'`. |
| `autocommit` | `bool` | No | Defaults to `True`; set `False` for manual transaction control. |
| `sns_arn` | `str` | No | SNS topic ARN used when publishing CRUD events. |
| `sns_endpoint` | `str` | No | Optional SNS endpoint URL (e.g., LocalStack). |
| `sns_attributes` | `dict` | No | Default SNS message attributes merged into every publish. |
Every CRUD/query helper expects the target table and identifier column at call time so one adapter can manage multiple tables:
| Argument | Description |
|---|---|
| `table` | Table to operate on (`customers`, `orders`, etc.). |
| `identifier` | Column that uniquely identifies rows (e.g., `customer_id`). |
| `commit` | Override autocommit per call (`True`/`False`). |
| `debug` | Log SQL statements via the adapter logger when `True`. |
| `sns_attributes` | Per-call attributes merged with defaults before publish. |
| `fifo_group_id` / `fifo_duplication_id` | Optional FIFO metadata passed straight to SNS. |
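For example, a single call can target a specific table while overriding transaction handling and logging for that call only. A minimal sketch (the `orders` payload is illustrative and assumes an already-connected adapter `sql`):

```python
# Hypothetical payload; commit/debug override the adapter defaults for this call only.
sql.insert(
    data={"order_id": "O-42", "status": "pending"},
    table="orders",
    identifier="order_id",
    commit=False,  # keep the transaction open for now
    debug=True,    # log the generated SQL via the adapter logger
)
sql.commit(True)   # commit explicitly once the batch is ready
```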
`SQLAdapter` inherits daplug-core's SNS publisher. Provide the topic details when constructing the adapter:
```python
sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",
    sns_arn="arn:aws:sns:us-east-1:123456789012:sql-events",
    sns_endpoint="http://localhost:4566",  # optional (LocalStack)
    sns_attributes={"service": "billing"},
)
```

- `sns_attributes` passed to `adapter(...)` become defaults for every publish.
- Each CRUD helper accepts its own `sns_attributes` to overlay call-specific metadata.
- FIFO topics are supported via the `fifo_group_id` and `fifo_duplication_id` kwargs on individual calls.
Example:
```python
sql.insert(
    data={"customer_id": "abc123", "name": "Ada"},
    table="customers",
    identifier="customer_id",
    sns_attributes={"event": "customer-created"},
    fifo_group_id="customers",
)
```

If `sns_arn` is omitted, publish calls are skipped automatically.
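When a topic is configured, per-call attributes overlay the adapter defaults, so a single call can tag its event without repeating the shared metadata. A sketch (payload values are illustrative):

```python
# Adapter default attributes ({"service": "billing"}) are merged with the
# per-call {"event": "customer-updated"} before the SNS publish.
sql.update(
    data={"customer_id": "abc123", "name": "Ada Lovelace"},
    table="customers",
    identifier="customer_id",
    sns_attributes={"event": "customer-updated"},
)
```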
| Method | Description |
|---|---|
| `connect()` | Opens a connection + cursor using the engine-specific connector. |
| `close()` | Closes the cursor/connection and evicts the cached connector. |
| `commit(commit=True)` | Commits the underlying DB connection when `commit` is truthy. |
| `insert(data, table, identifier, **kwargs)` | Validates `data`, enforces uniqueness on the provided identifier, inserts the row, and publishes SNS. |
| `update(data, table, identifier, **kwargs)` | Fetches the existing row, merges via `dict_merger`, runs `UPDATE`, publishes SNS. |
| `upsert(data, table, identifier, **kwargs)` | Calls `update` when the row exists; falls back to `insert`. |
| `get(identifier_value, table, identifier, **kwargs)` | Returns the first matching row or `None`. |
| `read(identifier_value, table, identifier, **kwargs)` | Alias of `get`. |
| `query(query, params, table, identifier, **kwargs)` | Executes a read-only statement (`SELECT`) and returns all rows as dictionaries. |
| `delete(identifier_value, table, identifier, **kwargs)` | Deletes the row, publishes SNS, and ignores missing rows. |
| `create_index(table_name, index_columns)` | Issues `CREATE INDEX index_col1_col2 ON table_name (col1, col2)` using safe identifiers. |
All identifier-based helpers sanitize names with `SAFE_IDENTIFIER` to prevent SQL injection through table/column inputs.
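To see how these helpers compose, here is a brief sketch that reuses the `inventory` table from the example below and assumes a connected adapter `sql`:

```python
# upsert inserts when the sku is new, otherwise updates the existing row.
sql.upsert(data={"sku": "W-1000", "name": "Widget"}, table="inventory", identifier="sku")
row = sql.get("W-1000", table="inventory", identifier="sku")    # first match or None
same = sql.read("W-1000", table="inventory", identifier="sku")  # read is an alias of get
sql.delete("W-1000", table="inventory", identifier="sku")       # missing rows are ignored
```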
```python
sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",
)
sql.connect()
sql.insert(data={"sku": "W-1000", "name": "Widget", "cost": 99}, table="inventory", identifier="sku")
rows = sql.query(
    query="SELECT sku, name FROM inventory WHERE cost >= %(min_cost)s",
    params={"min_cost": 50},
    table="inventory",
    identifier="sku",
)
print(rows)
sql.close()
```

```python
sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="mysql",
    autocommit=False,
)
sql.connect()
try:
    sql.insert(data={"order_id": "O-1", "status": "pending"}, table="orders", identifier="order_id", commit=False)
    sql.update(data={"order_id": "O-1", "status": "shipped"}, table="orders", identifier="order_id", commit=False)
    sql.commit(True)
finally:
    sql.close()
```

```python
# Share one adapter across multiple tables by overriding table + identifier per call
sql.insert(data=payload, table="orders", identifier="order_id")
sql.create_index("orders", ["status", "created_at"])
```

| Command | Description |
|---|---|
| `pipenv run lint` | Runs pylint and exports HTML/JSON to `coverage/lint`. |
| `pipenv run type-check` | Runs mypy using the new Protocol types. |
| `pipenv run test` | Executes the unit suite (mocks only). |
| `pipenv run integration` | Starts Postgres + MySQL via docker-compose and runs `tests/integration`. |
| `pipenv run test_ci` | Runs unit tests and integration tests sequentially (no Docker management). |
| `pipenv run coverage` | Full coverage run producing HTML, XML, JUnit, and pretty reports. |
Integration tests rely on `tests/integration/docker-compose.yml`. The CircleCI pipeline mirrors this by launching Postgres and MySQL sidecars, waiting for them to be reachable, and then executing `pipenv run coverage` so artifacts are published automatically.
```
daplug-sql/
├── daplug_sql/
│   ├── adapter.py            # SQLAdapter implementation
│   ├── exception.py          # Adapter-specific exceptions
│   ├── sql_connector.py      # Engine-aware connector wrapper
│   ├── sql_connection.py     # Connection caching decorators
│   ├── types/__init__.py     # Shared typing helpers (Protocols, aliases)
│   └── __init__.py           # Adapter factory export
├── tests/
│   ├── unit/                 # Pure unit tests (mocks only)
│   └── integration/          # Integration tests (Postgres + MySQL)
├── tests/integration/docker-compose.yml
├── Pipfile / Pipfile.lock    # Runtime + dev dependencies
├── setup.py                  # Packaging metadata
├── README.md
└── .agents/AGENTS.md         # Automation/Triage playbook for agents
```

- Fork / branch (`git checkout -b feature/amazing`)
- `pipenv install --dev`
- Add/change code + tests
- Run `pipenv run lint && pipenv run type-check && pipenv run test && pipenv run integration`
- Open a pull request and tag @dual
Apache License 2.0 – see `LICENSE`.
Built to keep SQL integrations event-driven and zero-boilerplate.