diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java
index 97186c8..8bab7a2 100644
--- a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java
+++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePool.java
@@ -102,6 +102,8 @@ static DataSourceBuilder builder() {
*/
SQLException dataSourceDownReason();
+ int forceTrim(int trimCount);
+
/**
* Set a new maximum size.
*
diff --git a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePoolListener.java b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePoolListener.java
index 5471c51..edf8b18 100644
--- a/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePoolListener.java
+++ b/ebean-datasource-api/src/main/java/io/ebean/datasource/DataSourcePoolListener.java
@@ -14,15 +14,59 @@
*/
public interface DataSourcePoolListener {
+ /**
+ * Called before a connection has been created
+ */
+ default void onBeforeCreateConnection(DataSourcePool pool) {}
+
+ /**
+ * Called after a connection has been created
+ */
+ default void onAfterCreateConnection(DataSourcePool pool, Connection connection) {}
+
+ /**
+ * Called before a connection has been retrieved from the connection pool
+ */
+ default void onBeforeBorrowConnection(DataSourcePool pool) {}
+
/**
* Called after a connection has been retrieved from the connection pool
*/
+ default void onAfterBorrowConnection(DataSourcePool pool, Connection connection) {onAfterBorrowConnection(connection);}
+
+ /**
+ * Called after a connection has been retrieved from the connection pool.
+ *
+ * Migrate to onAfterBorrowConnection(DataSourcePool pool, Connection connection)
+ */
+ @Deprecated
default void onAfterBorrowConnection(Connection connection) {}
/**
* Called before a connection will be put back to the connection pool
*/
+ default void onBeforeReturnConnection(DataSourcePool pool, Connection connection) {onBeforeReturnConnection(connection);}
+
+ /**
+ * Called before a connection will be put back to the connection pool.
+ *
+ * Migrate to onBeforeReturnConnection(DataSourcePool pool, Connection connection)
+ */
+ @Deprecated
default void onBeforeReturnConnection(Connection connection) {}
+ /**
+ * Called after a connection will be put back to the connection pool
+ */
+ default void onAfterReturnConnection(DataSourcePool pool) {}
+ /**
+ * Called before a connection has been closed
+ */
+ default void onBeforeCloseConnection(DataSourcePool pool, Connection connection) {}
+
+ /**
+ * Called after a connection had been closed
+ */
+ default void onAfterCloseConnection(DataSourcePool pool) {}
}
diff --git a/ebean-datasource/pom.xml b/ebean-datasource/pom.xml
index 9958cf4..84d636f 100644
--- a/ebean-datasource/pom.xml
+++ b/ebean-datasource/pom.xml
@@ -29,7 +29,7 @@
io.ebean
ebean-test-containers
- 7.3
+ 7.6
test
@@ -83,6 +83,13 @@
test
+
+ org.mariadb.jdbc
+ mariadb-java-client
+ 3.5.1
+ test
+
+
diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java
index 2a5ae03..88854e2 100644
--- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java
+++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/ConnectionPool.java
@@ -157,6 +157,8 @@ void pstmtCacheMetrics(PstmtCache pstmtCache) {
pscRem.add(pstmtCache.removeCount());
}
+
+
final class HeartBeatRunnable extends TimerTask {
@Override
public void run() {
@@ -446,7 +448,20 @@ private Connection initConnection(Connection conn) throws SQLException {
}
private Connection createConnection() throws SQLException {
- return initConnection(source.getConnection());
+
+ if (poolListener != null) {
+ poolListener.onBeforeCreateConnection(this);
+ }
+ Connection connection = initConnection(source.getConnection());
+ if (poolListener != null) {
+ poolListener.onAfterCreateConnection(this, connection);
+ }
+ return connection;
+ }
+
+ @Override
+ public int forceTrim(int trimCount) {
+ return queue.forceTrim(trimCount);
}
@Override
@@ -559,9 +574,12 @@ void removeClosedConnection(PooledConnection pooledConnection) {
*/
private void returnTheConnection(PooledConnection pooledConnection, boolean forceClose) {
if (poolListener != null && !forceClose) {
- poolListener.onBeforeReturnConnection(pooledConnection);
+ poolListener.onBeforeReturnConnection(this, pooledConnection);
}
queue.returnPooledConnection(pooledConnection, forceClose);
+ if (poolListener != null && !forceClose) {
+ poolListener.onAfterReturnConnection(this);
+ }
}
void returnConnectionReset(PooledConnection pooledConnection) {
@@ -570,6 +588,17 @@ void returnConnectionReset(PooledConnection pooledConnection) {
reset();
}
+ void onBeforeCloseConnection(PooledConnection pooledConnection) {
+ if (poolListener != null) {
+ poolListener.onBeforeCloseConnection(this, pooledConnection);
+ }
+ }
+
+ void onAfterCloseConnection() {
+ if (poolListener != null) {
+ poolListener.onAfterCloseConnection(this);
+ }
+ }
/**
* Grow the pool by creating a new connection. The connection can either be
* added to the available list, or returned.
@@ -608,7 +637,14 @@ private void reset() {
*/
@Override
public Connection getConnection(String username, String password) throws SQLException {
- return initConnection(source.getConnection(username, password));
+ if (poolListener != null) {
+ poolListener.onBeforeCreateConnection(this);
+ }
+ Connection connection = initConnection(source.getConnection(username, password));
+ if (poolListener != null) {
+ poolListener.onAfterCreateConnection(this, connection);
+ }
+ return connection;
}
/**
@@ -626,12 +662,15 @@ public Connection getConnection() throws SQLException {
* will go into a wait if the pool has hit its maximum size.
*/
private PooledConnection getPooledConnection() throws SQLException {
+ if (poolListener != null) {
+ poolListener.onBeforeBorrowConnection(this);
+ }
PooledConnection c = queue.obtainConnection();
if (captureStackTrace) {
c.setStackTrace(Thread.currentThread().getStackTrace());
}
if (poolListener != null) {
- poolListener.onAfterBorrowConnection(c);
+ poolListener.onAfterBorrowConnection(this, c);
}
return c;
}
diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java
index a78ee39..a0e1c34 100644
--- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java
+++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/FreeConnectionBuffer.java
@@ -63,12 +63,12 @@ void closeAll(boolean logErrors) {
/**
* Trim any inactive connections that have not been used since usedSince.
*/
- int trim(int minSize, long usedSince, long createdSince) {
+ int trim(int minSize, long usedSince, long createdSince, boolean forced) {
int trimCount = 0;
ListIterator iterator = freeBuffer.listIterator(minSize);
while (iterator.hasNext()) {
PooledConnection pooledConnection = iterator.next();
- if (pooledConnection.shouldTrim(usedSince, createdSince)) {
+ if (pooledConnection.shouldTrim(usedSince, createdSince, forced)) {
iterator.remove();
pooledConnection.closeConnectionFully(true);
trimCount++;
diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java
index 88ad9cc..cddafd7 100644
--- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java
+++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnection.java
@@ -37,6 +37,11 @@ final class PooledConnection extends ConnectionDelegator {
*/
private static final String REASON_RESET = "reset";
+ /**
+ * Marker for when connection is closed due a forced trim.
+ */
+ private static final String REASON_FORCED = "forced";
+
/**
* Set when connection is idle in the pool. In general when in the pool the
* connection should not be modified.
@@ -237,6 +242,7 @@ void closeConnectionFully(boolean logErrors) {
}
if (pool != null) {
pool.pstmtCacheMetrics(pstmtCache);
+ pool.onBeforeCloseConnection(this);
}
try {
if (connection.isClosed()) {
@@ -276,6 +282,7 @@ void closeConnectionFully(boolean logErrors) {
try {
connection.close();
pool.dec();
+ pool.onAfterCloseConnection();
} catch (SQLException ex) {
if (logErrors || Log.isLoggable(System.Logger.Level.DEBUG)) {
Log.error("Error when fully closing connection [" + fullDescription() + "]", ex);
@@ -363,7 +370,7 @@ private PreparedStatement prepareStatement(String sql, boolean useFlag, int flag
lastStatement = sql;
// try to get a matching cached PStmt from the cache.
ExtendedPreparedStatement pstmt = pstmtCache.remove(cacheKey);
- if (pstmt != null) {
+ if (pstmt != null && !pstmt.isClosed()) {
return pstmt.reset();
}
@@ -543,7 +550,7 @@ boolean shouldTrimOnReturn(long lastResetTime, long maxAgeMillis) {
/**
* Return true if the connection has been idle for too long or is too old.
*/
- boolean shouldTrim(long usedSince, long createdSince) {
+ boolean shouldTrim(long usedSince, long createdSince, boolean forced) {
if (lastUseTime < usedSince) {
// been idle for too long so trim it
this.closeReason = REASON_IDLE;
@@ -554,6 +561,10 @@ boolean shouldTrim(long usedSince, long createdSince) {
this.closeReason = REASON_MAXAGE;
return true;
}
+ if (forced) {
+ this.closeReason = REASON_FORCED;
+ return true;
+ }
return false;
}
diff --git a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java
index 7a66c98..041f84d 100644
--- a/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java
+++ b/ebean-datasource/src/main/java/io/ebean/datasource/pool/PooledConnectionQueue.java
@@ -5,6 +5,7 @@
import io.ebean.datasource.pool.ConnectionPool.Status;
import java.sql.SQLException;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -376,10 +377,10 @@ private boolean trimInactiveConnections(long maxInactiveMillis, long maxAgeMilli
if (freeList.size() > minSize) {
// trim on maxInactive and maxAge
long usedSince = System.currentTimeMillis() - maxInactiveMillis;
- trimmedCount = freeList.trim(minSize, usedSince, createdSince);
+ trimmedCount = freeList.trim(minSize, usedSince, createdSince, false);
} else if (createdSince > 0) {
// trim only on maxAge
- trimmedCount = freeList.trim(0, createdSince, createdSince);
+ trimmedCount = freeList.trim(0, createdSince, createdSince, false);
} else {
trimmedCount = 0;
}
@@ -401,6 +402,27 @@ private void closeFreeConnections(boolean logErrors) {
}
}
+ int forceTrim(int trimCount) {
+ int trimmedCount = 0;
+ lock.lock();
+ try {
+ int trimStart = freeList.size() - trimCount;
+ trimStart = Math.max(trimStart, 0);
+
+ if (freeList.size() > trimStart) {
+ trimmedCount = freeList.trim(trimStart, 0, 0, true);
+ }
+ } finally {
+ lock.unlock();
+ }
+ if (trimmedCount != 0) {
+ if (Log.isLoggable(DEBUG)) {
+ Log.debug("DataSource [{0}] forced trimmed [{1}] connections. New size[{2}]", name, trimmedCount, totalConnections());
+ }
+ }
+ return trimmedCount;
+ }
+
/**
* Close any busy connections that have not been used for some time.
*
diff --git a/ebean-datasource/src/test/java/io/ebean/datasource/test/MultipoolTest.java b/ebean-datasource/src/test/java/io/ebean/datasource/test/MultipoolTest.java
new file mode 100644
index 0000000..c796a16
--- /dev/null
+++ b/ebean-datasource/src/test/java/io/ebean/datasource/test/MultipoolTest.java
@@ -0,0 +1,123 @@
+package io.ebean.datasource.test;
+
+import io.ebean.datasource.DataSourceBuilder;
+import io.ebean.datasource.DataSourcePool;
+import io.ebean.datasource.DataSourcePoolListener;
+import io.ebean.test.containers.MariaDBContainer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+@Disabled("run manually")
+class MultipoolTest {
+
+ private static MariaDBContainer container;
+
+ private static ExecutorService executor;
+
+ @BeforeAll
+ static void before() {
+ container = MariaDBContainer.builder("latest")
+ .dbName("unit")
+ .user("unit")
+ .password("unit")
+ .build();
+
+ container.start();
+
+ executor = Executors.newCachedThreadPool();
+
+ }
+
+ @AfterAll
+ static void after() {
+ executor.shutdownNow();
+ }
+
+ static class PoolManager implements DataSourcePoolListener {
+ List pools = new ArrayList<>();
+ Semaphore semaphore = new Semaphore(120);
+ Random random = new Random();
+
+ @Override
+ public void onBeforeCreateConnection(DataSourcePool pool) {
+ try {
+ while (!semaphore.tryAcquire(50, TimeUnit.MILLISECONDS)) {
+ System.out.println("trim required");
+ pools.get(random.nextInt(pools.size())).forceTrim(25);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onAfterCloseConnection(DataSourcePool pool) {
+ semaphore.release();
+ }
+
+ }
+
+ private static PoolManager poolManager = new PoolManager();
+
+ @Test
+ void testFalseFriendRollback() throws Exception {
+
+ DataSourcePool pool1 = getPool();
+ DataSourcePool pool2 = getPool();
+
+ try {
+ consumeConnections(pool1, 100);
+ //pool1.forceTrim(70);
+ consumeConnections(pool2, 100);
+ } finally {
+ pool1.shutdown();
+ pool2.shutdown();
+ }
+
+ }
+
+ void consumeConnections(DataSourcePool pool, int connectionsCount) throws Exception {
+ List> futures = new ArrayList<>();
+
+ for (int i = 0; i < connectionsCount; i++) {
+ Future submit = executor.submit(() -> {
+ try (Connection conn = pool.getConnection()) {
+ Thread.sleep(1000);
+ conn.rollback();
+ }
+ return true;
+ });
+ futures.add(submit);
+ }
+
+ for (Future> future : futures) {
+ future.get();
+ }
+ }
+
+ private static DataSourcePool getPool() {
+ DataSourcePool pool = DataSourceBuilder.create()
+ .url(container.jdbcUrl())
+ .username("unit")
+ .password("unit")
+ .ownerUsername("unit")
+ .ownerPassword("unit")
+ .maxConnections(100)
+ .listener(poolManager)
+ .build();
+ poolManager.pools.add(pool);
+ return pool;
+ }
+}