From 67bba1473bf1b0ab4c467aca2d46baec9735836a Mon Sep 17 00:00:00 2001 From: Noemi Praml Date: Tue, 18 Mar 2025 14:12:59 +0100 Subject: [PATCH] add listeners to control max connections ADD: forceTrim zwischencommit test erweitert + fix revert logback revert pool as parameter fix test --- .../io/ebean/datasource/DataSourcePool.java | 2 + .../datasource/DataSourcePoolListener.java | 44 +++++++ ebean-datasource/pom.xml | 9 +- .../ebean/datasource/pool/ConnectionPool.java | 47 ++++++- .../datasource/pool/FreeConnectionBuffer.java | 4 +- .../datasource/pool/PooledConnection.java | 15 ++- .../pool/PooledConnectionQueue.java | 26 +++- .../ebean/datasource/test/MultipoolTest.java | 123 ++++++++++++++++++ 8 files changed, 259 insertions(+), 11 deletions(-) create mode 100644 ebean-datasource/src/test/java/io/ebean/datasource/test/MultipoolTest.java diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java index 97186c8..8bab7a2 100644 --- a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java +++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java @@ -102,6 +102,8 @@ static DataSourceBuilder builder() { */ SQLException dataSourceDownReason(); + int forceTrim(int trimCount); + /** * Set a new maximum size. *

diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePoolListener.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePoolListener.java index 5471c51..edf8b18 100644 --- a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePoolListener.java +++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePoolListener.java @@ -14,15 +14,59 @@ */ public interface DataSourcePoolListener { + /** + * Called before a connection has been created + */ + default void onBeforeCreateConnection(DataSourcePool pool) {} + + /** + * Called after a connection has been created + */ + default void onAfterCreateConnection(DataSourcePool pool, Connection connection) {} + + /** + * Called before a connection has been retrieved from the connection pool + */ + default void onBeforeBorrowConnection(DataSourcePool pool) {} + /** * Called after a connection has been retrieved from the connection pool */ + default void onAfterBorrowConnection(DataSourcePool pool, Connection connection) {onAfterBorrowConnection(connection);} + + /** + * Called after a connection has been retrieved from the connection pool. + * + * Migrate to onAfterBorrowConnection(DataSourcePool pool, Connection connection) + */ + @Deprecated default void onAfterBorrowConnection(Connection connection) {} /** * Called before a connection will be put back to the connection pool */ + default void onBeforeReturnConnection(DataSourcePool pool, Connection connection) {onBeforeReturnConnection(connection);} + + /** + * Called before a connection will be put back to the connection pool. + * + * Migrate to onBeforeReturnConnection(DataSourcePool pool, Connection connection) + */ + @Deprecated default void onBeforeReturnConnection(Connection connection) {} + /** + * Called after a connection will be put back to the connection pool + */ + default void onAfterReturnConnection(DataSourcePool pool) {} + /** + * Called before a connection has been closed + */ + default void onBeforeCloseConnection(DataSourcePool pool, Connection connection) {} + + /** + * Called after a connection had been closed + */ + default void onAfterCloseConnection(DataSourcePool pool) {} } diff --git a/ebean-datasource/pom.xml b/ebean-datasource/pom.xml index 9958cf4..84d636f 100644 --- a/ebean-datasource/pom.xml +++ b/ebean-datasource/pom.xml @@ -29,7 +29,7 @@ io.ebean ebean-test-containers - 7.3 + 7.6 test @@ -83,6 +83,13 @@ test + + org.mariadb.jdbc + mariadb-java-client + 3.5.1 + test + + diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java index 2a5ae03..88854e2 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java @@ -157,6 +157,8 @@ void pstmtCacheMetrics(PstmtCache pstmtCache) { pscRem.add(pstmtCache.removeCount()); } + + final class HeartBeatRunnable extends TimerTask { @Override public void run() { @@ -446,7 +448,20 @@ private Connection initConnection(Connection conn) throws SQLException { } private Connection createConnection() throws SQLException { - return initConnection(source.getConnection()); + + if (poolListener != null) { + poolListener.onBeforeCreateConnection(this); + } + Connection connection = initConnection(source.getConnection()); + if (poolListener != null) { + poolListener.onAfterCreateConnection(this, connection); + } + return connection; + } + + @Override + public int forceTrim(int trimCount) { + return queue.forceTrim(trimCount); } @Override @@ -559,9 +574,12 @@ void removeClosedConnection(PooledConnection pooledConnection) { */ private void returnTheConnection(PooledConnection pooledConnection, boolean forceClose) { if (poolListener != null && !forceClose) { - poolListener.onBeforeReturnConnection(pooledConnection); + poolListener.onBeforeReturnConnection(this, pooledConnection); } queue.returnPooledConnection(pooledConnection, forceClose); + if (poolListener != null && !forceClose) { + poolListener.onAfterReturnConnection(this); + } } void returnConnectionReset(PooledConnection pooledConnection) { @@ -570,6 +588,17 @@ void returnConnectionReset(PooledConnection pooledConnection) { reset(); } + void onBeforeCloseConnection(PooledConnection pooledConnection) { + if (poolListener != null) { + poolListener.onBeforeCloseConnection(this, pooledConnection); + } + } + + void onAfterCloseConnection() { + if (poolListener != null) { + poolListener.onAfterCloseConnection(this); + } + } /** * Grow the pool by creating a new connection. The connection can either be * added to the available list, or returned. @@ -608,7 +637,14 @@ private void reset() { */ @Override public Connection getConnection(String username, String password) throws SQLException { - return initConnection(source.getConnection(username, password)); + if (poolListener != null) { + poolListener.onBeforeCreateConnection(this); + } + Connection connection = initConnection(source.getConnection(username, password)); + if (poolListener != null) { + poolListener.onAfterCreateConnection(this, connection); + } + return connection; } /** @@ -626,12 +662,15 @@ public Connection getConnection() throws SQLException { * will go into a wait if the pool has hit its maximum size. */ private PooledConnection getPooledConnection() throws SQLException { + if (poolListener != null) { + poolListener.onBeforeBorrowConnection(this); + } PooledConnection c = queue.obtainConnection(); if (captureStackTrace) { c.setStackTrace(Thread.currentThread().getStackTrace()); } if (poolListener != null) { - poolListener.onAfterBorrowConnection(c); + poolListener.onAfterBorrowConnection(this, c); } return c; } diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java index a78ee39..a0e1c34 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java @@ -63,12 +63,12 @@ void closeAll(boolean logErrors) { /** * Trim any inactive connections that have not been used since usedSince. */ - int trim(int minSize, long usedSince, long createdSince) { + int trim(int minSize, long usedSince, long createdSince, boolean forced) { int trimCount = 0; ListIterator iterator = freeBuffer.listIterator(minSize); while (iterator.hasNext()) { PooledConnection pooledConnection = iterator.next(); - if (pooledConnection.shouldTrim(usedSince, createdSince)) { + if (pooledConnection.shouldTrim(usedSince, createdSince, forced)) { iterator.remove(); pooledConnection.closeConnectionFully(true); trimCount++; diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java index 88ad9cc..cddafd7 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java @@ -37,6 +37,11 @@ final class PooledConnection extends ConnectionDelegator { */ private static final String REASON_RESET = "reset"; + /** + * Marker for when connection is closed due a forced trim. + */ + private static final String REASON_FORCED = "forced"; + /** * Set when connection is idle in the pool. In general when in the pool the * connection should not be modified. @@ -237,6 +242,7 @@ void closeConnectionFully(boolean logErrors) { } if (pool != null) { pool.pstmtCacheMetrics(pstmtCache); + pool.onBeforeCloseConnection(this); } try { if (connection.isClosed()) { @@ -276,6 +282,7 @@ void closeConnectionFully(boolean logErrors) { try { connection.close(); pool.dec(); + pool.onAfterCloseConnection(); } catch (SQLException ex) { if (logErrors || Log.isLoggable(System.Logger.Level.DEBUG)) { Log.error("Error when fully closing connection [" + fullDescription() + "]", ex); @@ -363,7 +370,7 @@ private PreparedStatement prepareStatement(String sql, boolean useFlag, int flag lastStatement = sql; // try to get a matching cached PStmt from the cache. ExtendedPreparedStatement pstmt = pstmtCache.remove(cacheKey); - if (pstmt != null) { + if (pstmt != null && !pstmt.isClosed()) { return pstmt.reset(); } @@ -543,7 +550,7 @@ boolean shouldTrimOnReturn(long lastResetTime, long maxAgeMillis) { /** * Return true if the connection has been idle for too long or is too old. */ - boolean shouldTrim(long usedSince, long createdSince) { + boolean shouldTrim(long usedSince, long createdSince, boolean forced) { if (lastUseTime < usedSince) { // been idle for too long so trim it this.closeReason = REASON_IDLE; @@ -554,6 +561,10 @@ boolean shouldTrim(long usedSince, long createdSince) { this.closeReason = REASON_MAXAGE; return true; } + if (forced) { + this.closeReason = REASON_FORCED; + return true; + } return false; } diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java index 7a66c98..041f84d 100644 --- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java +++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java @@ -5,6 +5,7 @@ import io.ebean.datasource.pool.ConnectionPool.Status; import java.sql.SQLException; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -376,10 +377,10 @@ private boolean trimInactiveConnections(long maxInactiveMillis, long maxAgeMilli if (freeList.size() > minSize) { // trim on maxInactive and maxAge long usedSince = System.currentTimeMillis() - maxInactiveMillis; - trimmedCount = freeList.trim(minSize, usedSince, createdSince); + trimmedCount = freeList.trim(minSize, usedSince, createdSince, false); } else if (createdSince > 0) { // trim only on maxAge - trimmedCount = freeList.trim(0, createdSince, createdSince); + trimmedCount = freeList.trim(0, createdSince, createdSince, false); } else { trimmedCount = 0; } @@ -401,6 +402,27 @@ private void closeFreeConnections(boolean logErrors) { } } + int forceTrim(int trimCount) { + int trimmedCount = 0; + lock.lock(); + try { + int trimStart = freeList.size() - trimCount; + trimStart = Math.max(trimStart, 0); + + if (freeList.size() > trimStart) { + trimmedCount = freeList.trim(trimStart, 0, 0, true); + } + } finally { + lock.unlock(); + } + if (trimmedCount != 0) { + if (Log.isLoggable(DEBUG)) { + Log.debug("DataSource [{0}] forced trimmed [{1}] connections. New size[{2}]", name, trimmedCount, totalConnections()); + } + } + return trimmedCount; + } + /** * Close any busy connections that have not been used for some time. *

diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/test/MultipoolTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/test/MultipoolTest.java new file mode 100644 index 0000000..c796a16 --- /dev/null +++ b/ebean-datasource/src/test/java/io/ebean/datasource/test/MultipoolTest.java @@ -0,0 +1,123 @@ +package io.ebean.datasource.test; + +import io.ebean.datasource.DataSourceBuilder; +import io.ebean.datasource.DataSourcePool; +import io.ebean.datasource.DataSourcePoolListener; +import io.ebean.test.containers.MariaDBContainer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +@Disabled("run manually") +class MultipoolTest { + + private static MariaDBContainer container; + + private static ExecutorService executor; + + @BeforeAll + static void before() { + container = MariaDBContainer.builder("latest") + .dbName("unit") + .user("unit") + .password("unit") + .build(); + + container.start(); + + executor = Executors.newCachedThreadPool(); + + } + + @AfterAll + static void after() { + executor.shutdownNow(); + } + + static class PoolManager implements DataSourcePoolListener { + List pools = new ArrayList<>(); + Semaphore semaphore = new Semaphore(120); + Random random = new Random(); + + @Override + public void onBeforeCreateConnection(DataSourcePool pool) { + try { + while (!semaphore.tryAcquire(50, TimeUnit.MILLISECONDS)) { + System.out.println("trim required"); + pools.get(random.nextInt(pools.size())).forceTrim(25); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onAfterCloseConnection(DataSourcePool pool) { + semaphore.release(); + } + + } + + private static PoolManager poolManager = new PoolManager(); + + @Test + void testFalseFriendRollback() throws Exception { + + DataSourcePool pool1 = getPool(); + DataSourcePool pool2 = getPool(); + + try { + consumeConnections(pool1, 100); + //pool1.forceTrim(70); + consumeConnections(pool2, 100); + } finally { + pool1.shutdown(); + pool2.shutdown(); + } + + } + + void consumeConnections(DataSourcePool pool, int connectionsCount) throws Exception { + List> futures = new ArrayList<>(); + + for (int i = 0; i < connectionsCount; i++) { + Future submit = executor.submit(() -> { + try (Connection conn = pool.getConnection()) { + Thread.sleep(1000); + conn.rollback(); + } + return true; + }); + futures.add(submit); + } + + for (Future future : futures) { + future.get(); + } + } + + private static DataSourcePool getPool() { + DataSourcePool pool = DataSourceBuilder.create() + .url(container.jdbcUrl()) + .username("unit") + .password("unit") + .ownerUsername("unit") + .ownerPassword("unit") + .maxConnections(100) + .listener(poolManager) + .build(); + poolManager.pools.add(pool); + return pool; + } +}