# Schema-Driven Cypher Normalization & Event Publishing for Python

daplug-cypher brings the ergonomics of an adapter pattern to graph databases. It bundles Cypher-friendly schema mapping, optimistic concurrency, and SNS event fan-out so your graph services stay DRY, version-safe, and event-driven, whether you deploy to Neo4j or AWS Neptune (openCypher).
- **Unified factory** – `daplug_cypher.adapter(**kwargs)` returns a ready-to-go adapter with SNS support, just like `daplug_ddb`.
- **Schema mapping** – Reuse OpenAPI/JSON schemas to validate and normalize payloads before writing nodes or relationships.
- **Optimistic concurrency** – Guard updates with identifier + version keys; the adapter enforces atomic Cypher `SET` semantics.
- **Relationship helpers** – Convenience methods that enforce safe Cypher patterns for creating/deleting relationships.
- **Backend flexibility** – Supply `bolt={...}` for Neo4j, `neptune={...}` for Neptune, or both; the adapter chooses the right driver config automatically.
- **Per-operation targeting** – Pass `node`, `identifier`, and `idempotence_key` to each call so shared adapters can manage multiple labels safely.
- **Per-call SNS metadata** – Supply `sns_attributes` when writing to annotate events with request-specific context.
```shell
pip install daplug-cypher
# pipenv install daplug-cypher
# poetry add daplug-cypher
# uv pip install daplug-cypher
```

```python
from daplug_cypher import adapter

graph = adapter(
    bolt={
        "url": "bolt://localhost:7687",
        "user": "neo4j",
        "password": "password",
    },
    schema_file="openapi.yml",
    schema="CustomerModel",
)

payload = {
    "customer_id": "abc123",
    "name": "Ada",
    "version": 1,
}

graph.create(data=payload, node="Customer")

result = graph.read(
    query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
    placeholder={"id": "abc123"},
    node="Customer",
)
print(result["Customer"][0]["name"])

graph.create(
    data=payload,
    node="Customer",
    sns_attributes={"source": "api"},
)
```

Because the adapter is schema-aware, every write can opt into mapping by passing `schema`. Skip it when you want to persist the payload exactly as provided. Select the node label (and identifiers) per call so a single adapter can service multiple models, and add `sns_attributes` when you want to decorate published events with request-specific context.
```python
from daplug_cypher import adapter

graph = adapter(bolt={"url": "bolt://localhost:7687", "user": "neo4j", "password": "password"})

# CREATE ---------------------------------------------------------------
graph.create(
    data={"customer_id": "abc123", "name": "Ada", "version": 1},
    node="Customer",
    sns_attributes={"event": "customer-created"},
)

# READ / MATCH ---------------------------------------------------------
graph.read(
    query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
    placeholder={"id": "abc123"},
    node="Customer",
)

# QUERY (raw parameterized Cypher) -------------------------------------
graph.query(
    query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
    placeholder={"id": "abc123"},
    sns_attributes={"source": "reporting"},
)

# UPDATE (optimistic) --------------------------------------------------
graph.update(
    data={"status": "vip"},
    query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
    placeholder={"id": "abc123"},
    original_idempotence_value=1,
    node="Customer",
    identifier="customer_id",
    idempotence_key="version",
    sns_attributes={"event": "customer-updated"},
)

# DELETE ---------------------------------------------------------------
graph.delete(
    delete_identifier="abc123",
    node="Customer",
    identifier="customer_id",
    sns_attributes={"event": "customer-deleted"},
)

# RELATIONSHIP HELPERS -------------------------------------------------
graph.create_relationship(
    query="""
        MATCH (c:Customer), (o:Order)
        WHERE c.customer_id = $customer AND o.order_id = $order
        CREATE (c)-[:PLACED]->(o)
        RETURN c, o
    """,
    placeholder={"customer": "abc123", "order": "o-789"},
    sns_attributes={"event": "relationship-created"},
)

graph.delete_relationship(
    query="""
        MATCH (c:Customer)-[r:PLACED]->(o:Order)
        WHERE c.customer_id = $customer AND o.order_id = $order
        DETACH DELETE r
    """,
    placeholder={"customer": "abc123", "order": "o-789"},
    sns_attributes={"event": "relationship-deleted"},
)
```

Each method mirrors the DynamoDB adapter API: provide per-call metadata, and the adapter handles schema normalization, optimistic locking, driver orchestration, and optional SNS fan-out.
```python
graph = adapter(
    bolt={"url": "bolt://localhost:7687", "user": "neo4j", "password": "password"},
    neptune={"url": "bolt://neptune-endpoint:8182", "user": "user", "password": "secret"},
)
```

Provide both dictionaries to allow local Neo4j development alongside a production Neptune endpoint. When `neptune` is supplied it wins; otherwise `bolt` is used.
Use the same adapter instance for different node types by passing the appropriate label to each call (e.g., graph.create(..., node="Order")).
```python
graph.update(
    data={"order_id": "abc123", "updated_at": 2, "status": "shipped"},
    query="MATCH (o:Order) WHERE o.order_id = $id RETURN o",
    placeholder={"id": "abc123"},
    original_idempotence_value=1,  # the previous value of updated_at
    node="Order",
    identifier="order_id",
    idempotence_key="updated_at",
    sns_attributes={"event": "status-change"},
)
```

If another session updates the node first, the adapter raises `ValueError("ATOMIC ERROR...")` rather than overwriting silently.
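The guarded update can be understood as a single Cypher statement that only matches when the stored idempotence value is unchanged; an empty match then signals a conflict instead of a silent overwrite. The query builder below is an illustrative sketch, not the adapter's actual generated Cypher.

```python
def build_guarded_update(node: str, identifier: str, idempotence_key: str) -> str:
    """Sketch of an atomic update: MATCH on both the id and the expected version, then SET."""
    return (
        f"MATCH (n:{node}) "
        f"WHERE n.{identifier} = $id AND n.{idempotence_key} = $expected "
        f"SET n += $data RETURN n"
    )

query = build_guarded_update("Order", "order_id", "updated_at")
# If another session already bumped updated_at, the MATCH finds nothing and
# the adapter can surface its ATOMIC ERROR instead of overwriting.
```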
```python
graph.create(data={"customer_id": "abc123", "version": 1}, node="Customer")
graph.create(data={"order_id": "o-789", "version": 1}, node="Order")

graph.create_relationship(
    query="""
        MATCH (c:Customer), (o:Order)
        WHERE c.customer_id = $customer AND o.order_id = $order
        CREATE (c)-[:PLACED]->(o)
        RETURN c, o
    """,
    placeholder={"customer": "abc123", "order": "o-789"},
)

graph.delete_relationship(
    query="""
        MATCH (c:Customer)-[r:PLACED]->(o:Order)
        WHERE c.customer_id = $customer AND o.order_id = $order
        DETACH DELETE r
    """,
    placeholder={"customer": "abc123", "order": "o-789"},
)
```

Validation ensures relationship queries include edge notation and that destructive operations actually delete nodes/relationships.
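The validation described above can be approximated with simple string checks; `validate_relationship_query` is a hypothetical illustration of the idea, not the library's validator.

```python
def validate_relationship_query(query: str, destructive: bool = False) -> None:
    """Reject queries lacking edge notation, and destructive ones that never DELETE anything."""
    if "-[" not in query:
        raise ValueError("relationship queries must contain edge notation, e.g. (a)-[:REL]->(b)")
    if destructive and "DELETE" not in query.upper():
        raise ValueError("destructive relationship queries must actually delete")

# Edge notation present and a DELETE clause included: passes silently
validate_relationship_query(
    "MATCH (c:Customer)-[r:PLACED]->(o:Order) DETACH DELETE r",
    destructive=True,
)
```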
```python
graph = adapter(
    bolt={...},
    sns_arn="arn:aws:sns:us-east-2:123456789012:customers",
    sns_attributes={"service": "crm"},
)

graph.delete(delete_identifier="abc123", node="Customer", identifier="customer_id")
```

Each CRUD helper automatically publishes an SNS message when `sns_arn` is set. Provide default metadata through `sns_attributes` at adapter construction (for example `{"service": "crm"}`) and add request-specific context per call: `graph.create(..., sns_attributes={"source": "api"})`. Per-call keys override adapter defaults, `operation` is injected automatically, and `None` values are stripped so events remain clean. Non-string values are sent using the appropriate SNS `Number` type.
We split fast unit tests from integration suites targeting Neo4j and Neptune-compatible endpoints.
```shell
# Unit tests (pure Python, heavy mocking)
pipenv run test

# Integration suites
pipenv run test_neo4j    # requires Neo4j Bolt endpoint (defaults to bolt://localhost:7687)
pipenv run test_neptune  # reuses Bolt settings, can point at Neptune or LocalStack

# Coverage (Neo4j suite under coverage)
pipenv run coverage
```

Environment variables to override defaults:
| Variable | Purpose | Default |
|---|---|---|
| `NEO4J_BOLT_URL` | Neo4j Bolt connection URI | `bolt://localhost:7687` |
| `NEO4J_USER` / `_PASSWORD` | Neo4j credentials | `neo4j` / `password` |
| `NEPTUNE_BOLT_URL` | Neptune Bolt-compatible endpoint | falls back to Neo4j |
| `NEPTUNE_USER` / `_PASSWORD` | Neptune credentials | falls back to Neo4j |
`.circleci/config.yml` mirrors the DynamoDB project:

- `install-build` installs dependencies and persists the workspace.
- `lint` and `type-check` run `pipenv run lint` and `pipenv run mypy`.
- `test-neo4j` and `test-neptune` run pytest markers in parallel; the Neptune job provisions a LocalStack container for compatibility checks.
- `install-build-publish` retains the token-based PyPI workflow.
- Python 3.10+
- Pipenv
- Docker (for running Neo4j or LocalStack locally)
```shell
git clone https://github.com/paulcruse3/daplug-cypher.git
cd daplug-cypher
pipenv install --dev
```

```shell
pipenv run lint          # pylint (JSON + HTML report)
pipenv run mypy          # static typing (post-phase polish)
pipenv run test          # unit tests
pipenv run test_neo4j    # integration suite (requires Bolt endpoint)
pipenv run test_neptune  # integration suite (LocalStack/Neptune)
```

When running Neo4j via Docker, set `NEO4J_AUTH=neo4j/password` before `docker run` so the tests can authenticate automatically.
```
daplug-cypher/
├── daplug_cypher/
│   ├── adapter.py            # Cypher adapter implementation
│   ├── common/               # Shared schema, merge, logging, publisher helpers
│   ├── cypher/               # Parameter + serialization utilities
│   ├── types/                # Shared TypedDict/type aliases (reused by common)
│   └── __init__.py           # Public adapter factory & exports
├── tests/
│   ├── integration/          # Neo4j & Neptune pytest suites
│   └── unit/                 # Mock-based unit coverage for every module
├── .circleci/config.yml      # CI pipeline
├── Pipfile                   # Runtime & dev dependencies + scripts
├── setup.py / setup.cfg      # Packaging metadata & pytest config
└── README.md                 # You are here
```

We welcome issues and pull requests! Please ensure linting, typing, and both integration suites pass before submitting.
```shell
git checkout -b feature/amazing-cypher
# make your changes
pipenv run lint
pipenv run mypy
pipenv run test
pipenv run test_neo4j
pipenv run test_neptune
git commit -am "feat: amazing cypher enhancement"
git push origin feature/amazing-cypher
```

Apache License 2.0 – see LICENSE for the full text.
Built to keep Cypher integrations clean and predictable.