Back to index

📘 DML Statements — SQL Gateway for Elasticsearch


Introduction

The SQL Gateway supports the following Data Manipulation Language (DML) operations:

  • INSERT INTO ... VALUES
  • INSERT INTO ... AS SELECT ... [ON CONFLICT ...]
  • UPDATE ... SET ... [WHERE]
  • DELETE FROM ... [WHERE]
  • COPY INTO ... (bulk ingestion)

The DML engine is:

  • schema-aware (PK, defaults, scripts, STRUCT, ARRAY)
  • pipeline-aware (default pipeline + user pipelines)
  • partition-aware (date-based routing)
  • primary-key-aware (upsert semantics)
  • version-aware (ES6 → ES9)

All DML operations are translated into Elasticsearch write APIs:

  • INSERT → index API (bulk)
  • INSERT ... AS SELECT → search + bulk index
  • UPDATE → update-by-query
  • DELETE → delete-by-query
  • COPY INTO → bulk from file

DELETE without a WHERE clause is allowed and behaves like a TRUNCATE TABLE, removing all documents from the index.
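As a sketch of this translation (the helper name and query representation here are hypothetical, not the Gateway's actual internals), a DELETE maps onto a delete-by-query body, falling back to match_all when the WHERE clause is omitted:

```python
# Sketch: translate a DELETE statement into an Elasticsearch
# delete-by-query request body. `where_filter` stands for a
# pre-translated ES query DSL fragment (hypothetical representation).
def delete_by_query_body(where_filter=None):
    # No WHERE clause -> match_all, i.e. TRUNCATE TABLE semantics
    query = where_filter if where_filter is not None else {"match_all": {}}
    return {"query": query}

# DELETE FROM users WHERE age > 40;
body = delete_by_query_body({"range": {"age": {"gt": 40}}})
# DELETE FROM users;  -- removes all documents
truncate_body = delete_by_query_body()
```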

Each DML statement returns:

  case class DmlResult(
    inserted: Long = 0L,
    updated: Long = 0L,
    deleted: Long = 0L,
    rejected: Long = 0L
  ) extends QueryResult

INSERT

INSERT INTO ... VALUES

INSERT INTO table_name (col1, col2, ...)
VALUES (v1, v2, ...), (v3, v4, ...), ...;

INSERT operations:

  • use the table’s primary key to generate _id
  • pass documents through the table pipeline
  • support:
    • STRUCT
    • ARRAY
    • DEFAULT values
    • SCRIPT AS generated columns
    • NOT NULL constraints
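The pipeline steps above can be sketched in plain Python (a hypothetical helper, not the Gateway's actual pipeline code; the SCRIPT AS logic shown is the age computation from the dql_users example below):

```python
from datetime import date

# Sketch: pipeline-style row preparation. Applies DEFAULT values,
# a SCRIPT AS generated column, and NOT NULL checks before indexing.
def prepare_row(row, defaults=None, not_null=()):
    doc = dict(defaults or {})                       # DEFAULT values first
    doc.update({k: v for k, v in row.items() if v is not None})
    # SCRIPT AS (YEAR(CURRENT_DATE) - YEAR(birthdate))
    if "birthdate" in doc:
        doc["age"] = date.today().year - int(doc["birthdate"][:4])
    for col in not_null:                             # NOT NULL constraints
        if col not in doc:
            raise ValueError(f"NOT NULL violation: {col}")
    return doc

doc = prepare_row({"id": 1, "name": "Alice", "birthdate": "1994-01-01"},
                  not_null=["id"])
```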

Full Example: INSERT INTO with STRUCT

CREATE TABLE IF NOT EXISTS dql_users (
  id INT NOT NULL,
  name VARCHAR FIELDS(
    raw KEYWORD
  ) OPTIONS (fielddata = true),
  age INT SCRIPT AS (YEAR(CURRENT_DATE) - YEAR(birthdate)),
  birthdate DATE,
  profile STRUCT FIELDS(
    city VARCHAR OPTIONS (fielddata = true),
    followers INT
  )
);

INSERT INTO dql_users (id, name, birthdate, profile) VALUES
  (1, 'Alice', '1994-01-01', {city = "Paris", followers = 100}),
  (2, 'Bob',   '1984-05-10', {city = "Lyon",  followers = 50}),
  (3, 'Chloe', '1999-07-20', {city = "Paris", followers = 200}),
  (4, 'David', '1974-03-15', {city = "Marseille", followers = 10});

Behavior

  • profile → Elasticsearch object
  • name.raw → multi-field
  • DEFAULT, SCRIPT, NOT NULL → applied via pipeline
  • PK → _id generated automatically

Notes:

  • Fields not provided in the VALUES clause are simply omitted from the document (Elasticsearch does not store NULL).
  • Unknown fields are rejected if the index mapping is strict.
  • All values are validated against the Elasticsearch mapping (type, format, date patterns, etc.).
  • Nested structures (STRUCT, ARRAY<STRUCT>) must match the expected JSON shape.

INSERT INTO ... AS SELECT ... [ON CONFLICT ...]

INSERT INTO orders (order_id, customer_id, order_date, total)
AS SELECT
  id AS order_id,
  cust AS customer_id,
  date AS order_date,
  amount AS total
FROM staging_orders;

Behavior

  • The SELECT is executed by the Gateway
  • Results are inserted using the Bulk API
  • Table pipelines are applied
  • PK ensures upsert semantics

INSERT INTO ... AS SELECT ... [ON CONFLICT ...] — Validation Workflow

Before executing an INSERT ... AS SELECT, the Gateway performs a full validation pipeline to ensure schema correctness and safe upsert behavior.

1. Index Validation

Ensures the target index name is valid.
Invalid names return a 400 error.

2. SQL Parsing

Extracts:

  • target columns
  • SELECT statement
  • ON CONFLICT clause
  • DO UPDATE flag
  • conflict target columns

3. Load Index Metadata

Loads the real Elasticsearch schema:

  • primary key
  • partitioning
  • mapping
  • settings

3.b Determine Effective Insert Columns

If the INSERT column list is omitted, the Gateway derives it from the SELECT output.

3.c Validate ON CONFLICT Rules

  • If the index has a primary key:
    • conflict target must match the PK exactly
    • INSERT must include all PK columns
  • If the index has no primary key:
    • ON CONFLICT requires an explicit conflict target
    • all conflict target columns must be included in INSERT
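These rules can be sketched as a validation function (hypothetical helper mirroring the bullets above):

```python
# Sketch of the ON CONFLICT validation rules. Returns None when the
# statement is valid, otherwise the 400-error message.
def validate_on_conflict(pk, conflict_target, insert_columns):
    if pk:  # index has a primary key
        if conflict_target and set(conflict_target) != set(pk):
            return "conflict target must match the primary key exactly"
        if not set(pk) <= set(insert_columns):
            return "INSERT must include all primary-key columns"
    else:   # index has no primary key
        if not conflict_target:
            return "ON CONFLICT requires an explicit conflict target"
        if not set(conflict_target) <= set(insert_columns):
            return "conflict target columns must be included in INSERT"
    return None

assert validate_on_conflict(["id"], ["id"], ["id", "name"]) is None
```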

3.d Validate SELECT Output Columns

Ensures:

  • every INSERT column exists in the SELECT output
  • aliases are resolved
  • SELECT is valid

Otherwise, a 400 error is returned.

4. Derive Bulk Options

Determines:

  • _id generation (PK or composite PK)
  • partitioning suffix
  • upsert behavior (update = DO UPDATE)
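A sketch of the two derivations (hypothetical helpers; the `-` separator for composite keys and the date-suffix format are assumptions, not documented behavior):

```python
from datetime import date

# Sketch: derive bulk options from the table metadata.
def build_id(doc, pk):
    # Composite PK -> concatenated _id (separator is an assumption)
    return "-".join(str(doc[c]) for c in pk)

def partition_suffix(doc, partition_col, fmt="%Y.%m.%d"):
    # Date-based routing: append the partition date to the index name
    return "-" + doc[partition_col].strftime(fmt)

doc = {"order_id": 7, "customer_id": 3, "order_date": date(2024, 5, 1)}
_id = build_id(doc, ["order_id", "customer_id"])            # "7-3"
index_name = "orders" + partition_suffix(doc, "order_date") # "orders-2024.05.01"
```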

5. Build Document Source

  • For VALUES: convert to JSON array
  • For SELECT: scroll the query and convert rows to JSON

6. Bulk Insert

Uses the Bulk API with:

  • PK-based _id
  • partitioning
  • pipeline execution
  • upsert if DO UPDATE

Returns:

DmlResult(inserted = N, rejected = M)
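The counts can be sketched as a fold over Bulk item responses (the response shape here is a simplified, hypothetical stand-in):

```python
# Sketch: fold Bulk API item responses into DmlResult-style counts.
# An item with an "error" entry is a rejected document.
def count_bulk(items):
    rejected = sum(1 for it in items if it.get("error"))
    return {"inserted": len(items) - rejected, "rejected": rejected}

result = count_bulk([{}, {"error": "mapper_parsing_exception"}, {}])
# {"inserted": 2, "rejected": 1}
```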

Notes

  • All columns listed in the INSERT target must be present in the SELECT list.
  • The order of columns does not matter; matching is done by column name.
  • Extra columns in the SELECT that are not part of the target table are ignored.
  • Nested structures (STRUCT, ARRAY<STRUCT>) are supported as long as the selected expressions produce valid JSON structures for the target fields.
  • No type validation is performed at the SQL layer; type errors are reported by Elasticsearch at indexing time.
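The name-based matching can be sketched as a projection (hypothetical helper): order is irrelevant, extra SELECT columns are dropped, and a missing column is an error.

```python
# Sketch: project a SELECT output row onto the INSERT column list.
def project_row(select_row, insert_columns):
    missing = [c for c in insert_columns if c not in select_row]
    if missing:
        raise ValueError(f"columns missing from SELECT output: {missing}")
    return {c: select_row[c] for c in insert_columns}

row = {"order_id": 1, "total": 9.5, "debug_flag": True}
projected = project_row(row, ["order_id", "total"])
# {"order_id": 1, "total": 9.5}  -- debug_flag is ignored
```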

UPDATE ... SET ... [WHERE]

Syntax

UPDATE table_name
SET col1 = expr1, col2 = expr2, ...
WHERE condition;

Behavior

  • UPDATE is implemented using Elasticsearch update-by-query.
  • Only top-level scalar fields or entire nested objects can be replaced.
  • Updating a specific element inside an ARRAY<STRUCT> is not supported.
  • SET field = NULL removes the field from the document (Elasticsearch does not store SQL NULL).
  • Expressions such as SET x = x + 1 are not supported (no script-based incremental updates).
  • Fields not present in the mapping cannot be added unless dynamic mapping is enabled.
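The SET semantics above can be sketched document-side (hypothetical helper; the real work happens inside update-by-query):

```python
# Sketch of the UPDATE semantics: assigning NULL removes the field,
# and only literal values are allowed (no x = x + 1 expressions).
def apply_set(doc, assignments):
    updated = dict(doc)
    for field, value in assignments.items():
        if value is None:
            updated.pop(field, None)   # SET field = NULL -> remove field
        else:
            updated[field] = value     # replace scalar or whole object
    return updated

doc = {"id": 1, "value": 10, "tag": "old"}
updated = apply_set(doc, {"value": 50, "tag": None})
# {"id": 1, "value": 50}
```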

DELETE FROM ... [WHERE]

Syntax

DELETE FROM table_name
WHERE condition;

Behavior

  • DELETE is implemented using Elasticsearch delete-by-query.
  • WHERE is optional.
    • If provided, only matching documents are deleted.
    • If omitted, all documents in the index are deleted (equivalent to TRUNCATE TABLE).
  • Deleting a nested field or an element inside an array is not supported; only whole documents can be removed.

COPY INTO ...

COPY INTO is a DML operator that loads documents from external files into an Elasticsearch index.
It uses only the Bulk API, not the SQL engine.

Syntax

COPY INTO table_name
FROM 'path/to/file'
[FILE_FORMAT = 'JSON' | 'JSON_ARRAY' | 'PARQUET' | 'DELTA_LAKE']
[ON CONFLICT (pk_column) DO UPDATE];

Behavior

COPY INTO performs:

  1. Index name validation
  2. Loading of the real Elasticsearch schema
    • mapping
    • primary key
    • partitioning
  3. Primary key extraction
    • PK → _id
    • composite PK → concatenated _id
  4. Partitioning extraction
    • suffix index name based on date
  5. Bulk ingestion via bulkFromFile
  6. Pipeline execution
  7. Return of DmlResult
    • inserted = successfully indexed docs
    • rejected = Bulk failures

There are no strategies like insertAfter, updateAfter, or deleteAfter.
COPY INTO does not perform SQL-level operations — everything is Bulk.
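For the FILE_FORMAT = 'JSON' (NDJSON) case, steps 3–5 can be sketched as follows (hypothetical helper; the real ingestion goes through bulkFromFile, and the composite-PK separator is an assumption):

```python
import json

# Sketch: parse NDJSON lines into bulk actions with a PK-derived _id.
def ndjson_to_actions(lines, index, pk):
    for line in lines:
        if not line.strip():
            continue
        doc = json.loads(line)
        _id = "-".join(str(doc[c]) for c in pk)
        yield {"_index": index, "_id": _id, "_source": doc}

lines = ['{"uuid": "A12", "name": "Homer Simpson"}']
actions = list(ndjson_to_actions(lines, "copy_into_test", ["uuid"]))
```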

Full Example

  • Table Definition

CREATE TABLE IF NOT EXISTS copy_into_test (
  uuid KEYWORD NOT NULL,
  name VARCHAR,
  birthDate DATE,
  childrenCount INT,
  PRIMARY KEY (uuid)
);

  • Data File (example_data.json)

{"uuid": "A12", "name": "Homer Simpson", "birthDate": "1967-11-21", "childrenCount": 0}
{"uuid": "A14", "name": "Moe Szyslak", "birthDate": "1967-11-21", "childrenCount": 0}
{"uuid": "A16", "name": "Barney Gumble", "birthDate": "1969-05-09", "childrenCount": 2}

  • COPY INTO Statement

COPY INTO copy_into_test
FROM 's3://my-bucket/path/to/example_data.json'
FILE_FORMAT = 'JSON'
ON CONFLICT (uuid) DO UPDATE;

Behavior

  • PK = uuid → _id = uuid
  • ON CONFLICT DO UPDATE → Bulk upsert
  • Table pipeline is applied
  • Partitioning is applied if defined
  • Returns DmlResult(inserted = N, rejected = 0)

Remote File System Support

COPY INTO transparently supports remote file systems by auto-detecting the URI scheme in the FROM path. No SQL syntax change is required — simply use the appropriate URI scheme.

URI scheme                              File system                       Required JAR
s3a:// or s3://                         AWS S3                            hadoop-aws
abfs://, abfss://, wasb://, wasbs://    Azure ADLS Gen2 / Blob Storage    hadoop-azure
gs://                                   Google Cloud Storage              gcs-connector-hadoop3
hdfs://                                 HDFS                              (bundled with hadoop-client)
(no scheme / local path)                Local filesystem                  (no extra JAR needed)
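The auto-detection can be sketched as a simple dispatch on the URI scheme (hypothetical helper mirroring the table above):

```python
# Sketch: URI-scheme auto-detection for COPY INTO source paths.
def detect_filesystem(path):
    scheme = path.split("://", 1)[0] if "://" in path else ""
    if scheme in ("s3", "s3a"):
        return "AWS S3"
    if scheme in ("abfs", "abfss", "wasb", "wasbs"):
        return "Azure ADLS Gen2 / Blob Storage"
    if scheme == "gs":
        return "Google Cloud Storage"
    if scheme == "hdfs":
        return "HDFS"
    return "Local filesystem"   # no scheme -> local path

assert detect_filesystem("s3://bucket/data.json") == "AWS S3"
assert detect_filesystem("/tmp/data.json") == "Local filesystem"
```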

Important: Cloud connector JARs are declared as provided dependencies and are not bundled in the library. They must be present in the runtime classpath (e.g. added to the CLI assembly or the application's fat-jar).

Credentials configuration

Authentication is resolved automatically from standard environment variables:

AWS S3

AWS_ACCESS_KEY_ID         # access key (falls back to DefaultAWSCredentialsProviderChain)
AWS_SECRET_ACCESS_KEY     # secret key
AWS_SESSION_TOKEN         # session token (optional)
AWS_REGION                # region (or AWS_DEFAULT_REGION)
AWS_ENDPOINT_URL          # custom endpoint for S3-compatible stores (MinIO, LocalStack, …)

Azure ADLS Gen2 / Blob Storage

AZURE_STORAGE_ACCOUNT_NAME   # storage account name
AZURE_STORAGE_ACCOUNT_KEY    # shared key (Option 1)
AZURE_CLIENT_ID              # service principal client ID   (Option 2 — OAuth2)
AZURE_CLIENT_SECRET          # service principal secret      (Option 2 — OAuth2)
AZURE_TENANT_ID              # Azure tenant ID               (Option 2 — OAuth2)
AZURE_STORAGE_SAS_TOKEN      # SAS token                     (Option 3)

Google Cloud Storage

GOOGLE_APPLICATION_CREDENTIALS   # path to service-account JSON key file
GOOGLE_CLOUD_PROJECT             # GCS project ID (optional)

Falls back to Application Default Credentials (Workload Identity, gcloud auth, …) when the variable is absent.

HDFS

HADOOP_CONF_DIR    # directory containing core-site.xml and hdfs-site.xml
HADOOP_USER_NAME   # Hadoop user name (optional)

Per-user Hadoop overrides

Any *.xml file placed in ~/.softclient4es/ is loaded on top of the auto-detected configuration. This allows fine-grained property overrides without changing environment variables.

<!-- ~/.softclient4es/s3a-override.xml -->
<configuration>
  <property>
    <name>fs.s3a.connection.maximum</name>
    <value>200</value>
  </property>
</configuration>
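The override merge can be sketched with the standard XML parser (hypothetical helper; the Gateway loads every *.xml file under ~/.softclient4es/ this way):

```python
import xml.etree.ElementTree as ET

# Sketch: apply a Hadoop-style <configuration> override on top of the
# auto-detected base properties. Later values win.
def merge_overrides(base, xml_text):
    merged = dict(base)
    for prop in ET.fromstring(xml_text).iter("property"):
        merged[prop.findtext("name")] = prop.findtext("value")
    return merged

xml_text = """<configuration>
  <property>
    <name>fs.s3a.connection.maximum</name>
    <value>200</value>
  </property>
</configuration>"""
conf = merge_overrides({"fs.s3a.connection.maximum": "96"}, xml_text)
# conf["fs.s3a.connection.maximum"] == "200"
```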

DML Lifecycle Example

INSERT INTO dml_chain (id, value) VALUES
  (1, 10),
  (2, 20),
  (3, 30);

UPDATE dml_chain
SET value = 50
WHERE id IN (1, 3);

DELETE FROM dml_chain
WHERE value > 40;

SELECT * FROM dml_chain ORDER BY id ASC;
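The chain above, simulated in plain Python to make the expected final state explicit:

```python
# Sketch: the DML lifecycle, step by step.
rows = {1: 10, 2: 20, 3: 30}                       # INSERT
for k in (1, 3):                                   # UPDATE ... WHERE id IN (1, 3)
    rows[k] = 50
rows = {k: v for k, v in rows.items() if v <= 40}  # DELETE ... WHERE value > 40
# SELECT * ... ORDER BY id -> only (2, 20) remains
assert rows == {2: 20}
```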

Version Compatibility

Feature ES6 ES7 ES8 ES9
INSERT
INSERT AS SELECT
UPDATE
DELETE
COPY INTO
JSON
JSON_ARRAY
PARQUET
DELTA_LAKE

Back to index