Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ quarkus.datasource.jdbc.driver=io.roastedroot.pglite4j.jdbc.PgLiteDriver
quarkus.datasource.username=postgres
quarkus.datasource.password=password
quarkus.datasource.jdbc.min-size=1
quarkus.datasource.jdbc.max-size=1
quarkus.datasource.jdbc.max-size=5
quarkus.devservices.enabled=false
quarkus.hibernate-orm.dialect=org.hibernate.dialect.PostgreSQLDialect
quarkus.hibernate-orm.unsupported-properties."hibernate.boot.allow_jdbc_metadata_access"=false
Expand All @@ -82,18 +82,18 @@ spring.datasource.url=jdbc:pglite:memory://
spring.datasource.driver-class-name=io.roastedroot.pglite4j.jdbc.PgLiteDriver
spring.datasource.username=postgres
spring.datasource.password=password
spring.datasource.hikari.maximum-pool-size=1
spring.datasource.hikari.maximum-pool-size=5
spring.jpa.hibernate.ddl-auto=create-drop
spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect
spring.jpa.properties.hibernate.boot.allow_jdbc_metadata_access=false
```

### HikariCP - NOT TESTED
### HikariCP

```java
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:pglite:memory://");
config.setMaximumPoolSize(1);
config.setMaximumPoolSize(5);
DataSource ds = new HikariDataSource(config);
```

Expand All @@ -109,12 +109,14 @@ pglite4j/

## Status and known limitations

- [ ] **Only `memory://` is supported** — no persistent / file-backed databases yet
- [ ] **Single connection only** — PGlite is single-threaded; connection pool max size must be 1
- [x] ~~**Only `memory://` is supported**~~ — persistent / file-backed databases are not planned; the WASM backend uses an in-memory virtual filesystem (ZeroFS) with no disk I/O, which is fundamental to the architecture
- [x] ~~**Single connection only**~~ — multiple JDBC connections are now supported per database instance; requests are serialized through a single PGLite backend via a lock, so connection pools with `max-size > 1` work correctly (queries execute one at a time, not in parallel)
- [x] ~~**Error recovery**~~ — both simple and extended query protocol errors are handled correctly; PostgreSQL errors trap the WASM instance and are caught by the Java side, which resets the backend state and drains stale protocol buffers so subsequent queries work cleanly
- [ ] **No connection isolation** — PostgreSQL runs in single-user mode with one session; all connections share the same session state (transactions, session variables). Queries are serialized, so there is no data corruption, but concurrent transactions are not isolated from each other. This is fine for connection pools that use connections sequentially (borrow, use, return).
- [ ] **Server-side prepared statements disabled** — because all connections share a single PostgreSQL backend, named prepared statements (`S_1`, `S_2`, …) would collide across connections. The driver sets `prepareThreshold=0` so pgjdbc always uses the unnamed prepared statement. This has no functional impact but means PostgreSQL cannot cache query plans across executions.
- [ ] **Limited extensions** — only `plpgsql` and `dict_snowball` are bundled; adding more requires rebuilding the WASM binary
- [ ] **Startup time** — first connection has some overhead it can be optimized more
- [ ] **Startup time** — first connection has some overhead that can be optimized further
- [ ] **Binary size** — the WASM binary + pgdata resources add several MBs to the classpath
- [ ] **Error recovery** — `clear_error()` integration for automatic transaction recovery is not yet wired up

### CMA (Contiguous Memory Allocator)

Expand Down
6 changes: 3 additions & 3 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@
<configuration>
<name>io.roastedroot.pglite4j.core.PGLiteModule</name>
<wasmFile>../wasm-build/output/pglite.wasi</wasmFile>
<!-- <interpreterFallback>WARN</interpreterFallback> -->
<interpretedFunctions>
<interpreterFallback>WARN</interpreterFallback>
<!-- <interpretedFunctions>
<function>2918</function>
<function>4335</function>
<function>4336</function>
Expand All @@ -81,7 +81,7 @@
<function>6394</function>
<function>6397</function>
<function>11038</function>
</interpretedFunctions>
</interpretedFunctions> -->
</configuration>
</execution>
</executions>
Expand Down
19 changes: 17 additions & 2 deletions core/src/main/java/io/roastedroot/pglite4j/core/PGLite.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ private PGLite() {

int channel = exports.getChannel();
this.bufferAddr = exports.getBufferAddr(channel);
// System.err.println("PGLite: channel=" + channel + " bufferAddr=" + bufferAddr);
} catch (IOException e) {
throw new RuntimeException("Failed to initialize PGLite", e);
}
Expand All @@ -115,7 +114,23 @@ public byte[] execProtocolRaw(byte[] message) {

for (int tick = 0; tick < 256; tick++) {
boolean producedBefore = collectReply(replies);
exports.interactiveOne();
try {
exports.interactiveOne();
} catch (RuntimeException e) {
if (exports.pglCheckError() != 0) {
// PostgreSQL hit an ERROR (e.g. relation not found).
// pgl_on_error() set the WASM-side flag and the
// instance trapped via __builtin_unreachable().
// Recover: clean up PG error state and flush the
// ErrorResponse + ReadyForQuery back through the wire.
exports.clearError();
exports.interactiveWrite(-1);
exports.interactiveOne();
collectReply(replies);
break;
}
throw e;
}
boolean producedAfter = collectReply(replies);
if (!producedBefore && !producedAfter) {
break;
Expand Down
74 changes: 53 additions & 21 deletions core/src/test/java/io/roastedroot/pglite4j/core/PGLiteTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public void selectOne() {
assertTrue(result.length > 0);

String data = PgWireCodec.parseDataRows(result);
System.out.println("SELECT 1 => " + data);
assertTrue(data.contains("1"));
}
}
Expand All @@ -27,11 +26,9 @@ public void handshake() {
try (PGLite pg = PGLite.builder().build()) {
doHandshake(pg);

// After handshake, queries should work
byte[] result = pg.execProtocolRaw(PgWireCodec.queryMessage("SELECT 42 AS answer;"));
assertNotNull(result);
String data = PgWireCodec.parseDataRows(result);
System.out.println("After handshake: SELECT 42 => " + data);
assertTrue(data.contains("42"));
}
}
Expand All @@ -41,67 +38,102 @@ public void createTableAndInsert() {
try (PGLite pg = PGLite.builder().build()) {
doHandshake(pg);

// DDL via simple query protocol
byte[] r1 =
pg.execProtocolRaw(
PgWireCodec.queryMessage("CREATE TABLE test (id INTEGER, name TEXT);"));
System.out.println("CREATE TABLE: " + r1.length + " bytes");
assertNotNull(r1);

// SERIAL column
byte[] r2 =
pg.execProtocolRaw(
PgWireCodec.queryMessage(
"CREATE TABLE test_serial (id SERIAL PRIMARY KEY, val TEXT);"));
System.out.println("CREATE TABLE SERIAL: " + r2.length + " bytes");
assertNotNull(r2);

// INSERT
byte[] r3 =
pg.execProtocolRaw(
PgWireCodec.queryMessage("INSERT INTO test VALUES (1, 'hello');"));
System.out.println("INSERT: " + r3.length + " bytes");
assertNotNull(r3);

// SELECT
byte[] r4 = pg.execProtocolRaw(PgWireCodec.queryMessage("SELECT * FROM test;"));
String data = PgWireCodec.parseDataRows(r4);
System.out.println("SELECT: " + data);
assertTrue(data.contains("hello"));
}
}

@Test
public void errorRecovery() {
    try (PGLite pg = PGLite.builder().build()) {
        doHandshake(pg);

        // Baseline: a valid query succeeds before any error occurs.
        byte[] before = pg.execProtocolRaw(PgWireCodec.queryMessage("SELECT 1;"));
        assertNotNull(before);
        assertTrue(PgWireCodec.parseDataRows(before).contains("1"));

        // Provoke a SQL ERROR through the simple query protocol.
        byte[] failure =
                pg.execProtocolRaw(
                        PgWireCodec.queryMessage("SELECT * FROM nonexistent_table_xyz;"));
        assertNotNull(failure);
        assertTrue(failure.length > 0, "Expected non-empty error response");

        // The same instance must keep answering queries afterwards.
        byte[] after = pg.execProtocolRaw(PgWireCodec.queryMessage("SELECT 2;"));
        assertNotNull(after);
        assertTrue(
                PgWireCodec.parseDataRows(after).contains("2"),
                "Instance should be reusable after SQL error");
    }
}

@Test
public void cmaBufferOverflow() {
    try (PGLite pg = PGLite.builder().build()) {
        doHandshake(pg);

        int bufSize = pg.getBufferSize();
        System.out.println(
                "CMA buffer size: " + bufSize + " bytes (" + (bufSize / 1024) + " KB)");

        // Ask for a DataRow payload strictly larger than the CMA buffer so the
        // reply cannot fit in a single buffer pass.
        int repeatLen = bufSize + 1000;
        String sql = "SELECT repeat('x', " + repeatLen + ");";
        System.out.println("Query: SELECT repeat('x', " + repeatLen + ")");

        byte[] overflowReply = pg.execProtocolRaw(PgWireCodec.queryMessage(sql));
        System.out.println("Response length: " + overflowReply.length + " bytes");
        assertNotNull(overflowReply);
        // Full string plus wire-protocol framing must all have come through.
        assertTrue(
                overflowReply.length > repeatLen,
                "Expected response > " + repeatLen + " but got " + overflowReply.length);
        assertTrue(
                PgWireCodec.hasReadyForQuery(overflowReply),
                "Expected ReadyForQuery in overflow response");

        // A plain query must still work once the oversized reply has drained.
        byte[] followUp = pg.execProtocolRaw(PgWireCodec.queryMessage("SELECT 42;"));
        String data = PgWireCodec.parseDataRows(followUp);
        System.out.println("Post-overflow query: SELECT 42 => " + data);
        assertTrue(data.contains("42"), "Normal query should work after CMA overflow");
    }
}

@Test
public void extendedProtocolErrorRecovery() {
    try (PGLite pg = PGLite.builder().build()) {
        doHandshake(pg);

        // Sanity check before the failure: simple protocol works.
        byte[] before = pg.execProtocolRaw(PgWireCodec.queryMessage("SELECT 1;"));
        assertNotNull(before);
        assertTrue(PgWireCodec.parseDataRows(before).contains("1"));

        // Fail inside the extended protocol (Parse/Bind/Describe/Execute/Sync).
        byte[] failure =
                pg.execProtocolRaw(
                        PgWireCodec.extendedQueryBatch("SELECT * FROM nonexistent_table_xyz"));
        assertNotNull(failure);
        assertTrue(failure.length > 0, "Expected non-empty error response");
        assertTrue(
                PgWireCodec.hasReadyForQuery(failure),
                "Expected ReadyForQuery after extended protocol error");

        // The backend must have recovered for subsequent simple queries.
        byte[] after = pg.execProtocolRaw(PgWireCodec.queryMessage("SELECT 2;"));
        assertNotNull(after);
        assertTrue(
                PgWireCodec.parseDataRows(after).contains("2"),
                "Instance should be reusable after extended protocol error");
    }
}

static void doHandshake(PGLite pg) {
byte[] startup = PgWireCodec.startupMessage("postgres", "template1");
byte[] resp1 = pg.execProtocolRaw(startup);
Expand Down
122 changes: 122 additions & 0 deletions core/src/test/java/io/roastedroot/pglite4j/core/PgWireCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,128 @@ static String parseDataRows(byte[] data) {
return sb.toString();
}

// === Extended Query Protocol ===

/**
 * Build a Parse ('P') message.
 *
 * <p>Layout: tag byte, Int32 length (includes itself, excludes the tag),
 * NUL-terminated statement name, NUL-terminated SQL, Int16 count of
 * pre-specified parameter types (always 0 here).
 */
static byte[] parseMessage(String stmtName, String sql) {
    byte[] name = (stmtName + "\0").getBytes(StandardCharsets.UTF_8);
    byte[] query = (sql + "\0").getBytes(StandardCharsets.UTF_8);
    int bodyLen = name.length + query.length + 2; // +2 for numParamTypes
    // ByteBuffer defaults to big-endian, matching the wire protocol.
    java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1 + 4 + bodyLen);
    buf.put((byte) 'P');
    buf.putInt(4 + bodyLen);
    buf.put(name);
    buf.put(query);
    buf.putShort((short) 0); // 0 parameter types
    return buf.array();
}

/**
 * Build a Bind ('B') message with no parameters.
 *
 * <p>Layout: tag byte, Int32 length, NUL-terminated portal name,
 * NUL-terminated statement name, then three Int16 zeros: parameter format
 * codes, parameter values, and result-column format codes.
 */
static byte[] bindMessage(String portal, String stmtName) {
    byte[] portalBytes = (portal + "\0").getBytes(StandardCharsets.UTF_8);
    byte[] stmtBytes = (stmtName + "\0").getBytes(StandardCharsets.UTF_8);
    int bodyLen = portalBytes.length + stmtBytes.length + 2 + 2 + 2;
    // ByteBuffer defaults to big-endian, matching the wire protocol.
    java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1 + 4 + bodyLen);
    buf.put((byte) 'B');
    buf.putInt(4 + bodyLen);
    buf.put(portalBytes);
    buf.put(stmtBytes);
    buf.putShort((short) 0); // no parameter format codes
    buf.putShort((short) 0); // no parameter values
    buf.putShort((short) 0); // no result-column format codes
    return buf.array();
}

/**
 * Build a Describe ('D') message.
 *
 * @param type what to describe: 'S' for a prepared statement, 'P' for a portal
 * @param name the statement or portal name ("" for the unnamed one)
 */
static byte[] describeMessage(char type, String name) {
    byte[] target = (name + "\0").getBytes(StandardCharsets.UTF_8);
    int bodyLen = 1 + target.length;
    // ByteBuffer defaults to big-endian, matching the wire protocol.
    java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1 + 4 + bodyLen);
    buf.put((byte) 'D');
    buf.putInt(4 + bodyLen);
    buf.put((byte) type);
    buf.put(target);
    return buf.array();
}

/**
 * Build an Execute ('E') message.
 *
 * @param portal portal name ("" for the unnamed portal)
 * @param maxRows row limit for this execution; 0 means "no limit"
 */
static byte[] executeMessage(String portal, int maxRows) {
    byte[] portalBytes = (portal + "\0").getBytes(StandardCharsets.UTF_8);
    int bodyLen = portalBytes.length + 4;
    // ByteBuffer defaults to big-endian, matching the wire protocol.
    java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(1 + 4 + bodyLen);
    buf.put((byte) 'E');
    buf.putInt(4 + bodyLen);
    buf.put(portalBytes);
    buf.putInt(maxRows);
    return buf.array();
}

/** Build a Sync ('S') message: tag plus an Int32 length of 4 (empty body). */
static byte[] syncMessage() {
    java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(5);
    buf.put((byte) 'S');
    buf.putInt(4); // length counts only itself
    return buf.array();
}

/**
 * Build a complete extended query batch: Parse + Bind + Describe + Execute + Sync.
 *
 * <p>All sub-messages use the unnamed statement and unnamed portal, and
 * Execute requests an unlimited number of rows (maxRows = 0).
 */
static byte[] extendedQueryBatch(String sql) {
    byte[][] messages = {
        parseMessage("", sql),
        bindMessage("", ""),
        describeMessage('S', ""),
        executeMessage("", 0),
        syncMessage()
    };
    java.io.ByteArrayOutputStream batch = new java.io.ByteArrayOutputStream();
    for (byte[] msg : messages) {
        batch.write(msg, 0, msg.length);
    }
    return batch.toByteArray();
}

private static String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ quarkus.hibernate-orm.database.generation=drop-and-create
quarkus.devservices.enabled=false

quarkus.hibernate-orm.dialect=org.hibernate.dialect.PostgreSQLDialect
quarkus.hibernate-orm.unsupported-properties."hibernate.boot.allow_jdbc_metadata_access"=false
quarkus.hibernate-orm.unsupported-properties."hibernate.boot.allow_jdbc_metadata_access"=false
Loading
Loading