Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,9 @@ void heartbeat() {
private void testConnection() {
PooledConnection conn = null;
try {
// Get a connection from the pool and test it
conn = getPooledConnection();
// Get a connection from the pool and test it (heartbeat obtain is
// excluded from the pool usage metrics - it is not application use)
conn = getPooledConnection(true);
heartbeatPoolExhaustedCount = 0;
if (testConnection(conn)) {
notifyDataSourceIsUp();
Expand Down Expand Up @@ -596,7 +597,7 @@ void removeClosedConnection(PooledConnection pooledConnection) {
* must be removed and closed fully.
*/
private void returnTheConnection(PooledConnection pooledConnection, boolean forceClose) {
if (poolListener != null && !forceClose) {
if (poolListener != null && !forceClose && !pooledConnection.heartbeat()) {
poolListener.onBeforeReturnConnection(pooledConnection);
}
queue.returnPooledConnection(pooledConnection, forceClose);
Expand Down Expand Up @@ -664,7 +665,16 @@ public Connection getConnection() throws SQLException {
* will go into a wait if the pool has hit its maximum size.
*/
private PooledConnection getPooledConnection() throws SQLException {
PooledConnection c = queue.obtainConnection();
return getPooledConnection(false);
}

private PooledConnection getPooledConnection(boolean heartbeat) throws SQLException {
PooledConnection c = queue.obtainConnection(heartbeat);
c.setHeartbeat(heartbeat);
if (heartbeat) {
// not application use so skip poolListener etc
return c;
}
if (captureStackTrace) {
c.setStackTrace(Thread.currentThread().getStackTrace());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ final class PooledConnection extends ConnectionDelegator {
private boolean failoverToReadOnly;
private boolean autoCommit;
private boolean readOnly;
private boolean heartbeat;
private int transactionIsolation;
private int schemaState = SCHEMA_CATALOG_UNKNOWN;
private int catalogState = SCHEMA_CATALOG_UNKNOWN;
Expand Down Expand Up @@ -976,6 +977,21 @@ void setStackTrace(StackTraceElement[] stackTrace) {
this.stackTrace = stackTrace;
}

/**
* Mark this connection as borrowed for internal heartbeat validation
* (rather than application use).
*/
void setHeartbeat(boolean heartbeat) {
this.heartbeat = heartbeat;
}

/**
* Return true if this connection was borrowed for internal heartbeat validation.
*/
boolean heartbeat() {
return heartbeat;
}

/**
* Return the stackTrace as a String for logging purposes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,12 @@ private boolean stale(PooledConnection c) {
}

PooledConnection obtainConnection() throws SQLException {
return obtainConnection(false);
}

PooledConnection obtainConnection(boolean heartbeat) throws SQLException {
try {
PooledConnection pc = _obtainConnection();
PooledConnection pc = _obtainConnection(heartbeat);
pc.resetForUse();
return pc;

Expand All @@ -214,16 +218,17 @@ private int registerBusyConnection(PooledConnection connection) {
return busySize;
}

private PooledConnection _obtainConnection() throws InterruptedException, SQLException {
private PooledConnection _obtainConnection(boolean heartbeat) throws InterruptedException, SQLException {
var start = System.nanoTime();
lock.lockInterruptibly();
try {
if (doingShutdown) {
throw new SQLException("Trying to access the Connection Pool when it is shutting down");
}
// this includes attempts that fail with InterruptedException
// or SQLException but that is ok as its only an indicator
hitCount++;
// exclude heartbeat from application metrics
if (!heartbeat) {
hitCount++;
}
// are other threads already waiting? (they get priority)
if (waitingThreads == 0) {
PooledConnection connection = extractFromFreeList();
Expand All @@ -238,17 +243,23 @@ private PooledConnection _obtainConnection() throws InterruptedException, SQLExc
try {
// The pool is at maximum size. We are going to go into
// a wait loop until connections are returned into the pool.
waitCount++;
if (!heartbeat) {
waitCount++;
}
waitingThreads++;
return _obtainConnectionWaitLoop();
} finally {
waitingThreads--;
totalWaitNanos += (System.nanoTime() - start);
if (!heartbeat) {
totalWaitNanos += (System.nanoTime() - start);
}
}
} finally {
final var elapsed = System.nanoTime() - start;
totalAcquireNanos += elapsed;
maxAcquireNanos = Math.max(maxAcquireNanos, elapsed);
if (!heartbeat) {
final var elapsed = System.nanoTime() - start;
totalAcquireNanos += elapsed;
maxAcquireNanos = Math.max(maxAcquireNanos, elapsed);
}
lock.unlock();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ void testPoolFullWithHeartbeat() throws Exception {
assertThat(down).isEqualTo(1);

PoolStatus status = pool.status(true);
assertThat(status.waitCount()).isGreaterThan(0);
assertThat(status.totalWaitMicros()).isBetween(0L, 9_000_000L);
// heartbeat validation waits on the full pool are no longer counted as
// application use, and this single-threaded app never waits, so the wait
// metrics are now zero (previously the heartbeat contention inflated them)
assertThat(status.waitCount()).isEqualTo(0);
assertThat(status.totalWaitMicros()).isEqualTo(0);
assertThat(status.totalAcquireMicros()).isBetween(0L, 20_000_000L);
assertThat(status.maxAcquireMicros()).isBetween(0L, 3_000_000L);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.ebean.datasource.pool;

import io.ebean.datasource.DataSourceConfig;
import io.ebean.datasource.DataSourcePoolListener;
import io.ebean.datasource.PoolStatus;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;

class ConnectionPoolHeartbeatMetricsTest {

private final AtomicInteger borrowed = new AtomicInteger();
private final AtomicInteger returned = new AtomicInteger();

private ConnectionPool createPool() {
DataSourceConfig config = new DataSourceConfig();
config.setUrl("jdbc:h2:mem:heartbeatMetrics");
config.setUsername("sa");
config.setPassword("");
config.setMinConnections(2);
config.setMaxConnections(4);
config.setListener(new DataSourcePoolListener() {
@Override
public void onAfterBorrowConnection(Connection connection) {
borrowed.incrementAndGet();
}

@Override
public void onBeforeReturnConnection(Connection connection) {
returned.incrementAndGet();
}
});
return new ConnectionPool("heartbeatMetrics", config);
}

@Test
void heartbeat_doesNotAffectUsageMetricsOrListener() throws SQLException {
ConnectionPool pool = createPool();
try {
// application use - establishes the baseline usage metrics + listener counts
try (Connection c1 = pool.getConnection(); Connection c2 = pool.getConnection()) {
c1.rollback();
c2.rollback();
}

PoolStatus before = pool.status(false);
assertThat(before.hitCount()).isEqualTo(2);
assertThat(borrowed.get()).isEqualTo(2);
assertThat(returned.get()).isEqualTo(2);

// heartbeat validation obtains/validates a connection multiple times
for (int i = 0; i < 5; i++) {
pool.heartbeat();
}

PoolStatus after = pool.status(false);
// heartbeat validation must NOT be counted as application use
assertThat(after.hitCount()).isEqualTo(before.hitCount());
assertThat(after.waitCount()).isEqualTo(before.waitCount());
assertThat(after.totalWaitMicros()).isEqualTo(before.totalWaitMicros());
assertThat(after.maxAcquireMicros()).isEqualTo(before.maxAcquireMicros());
// and must NOT fire the borrow/return listener callbacks
assertThat(borrowed.get()).isEqualTo(2);
assertThat(returned.get()).isEqualTo(2);
} finally {
pool.shutdown();
}
}
}
Loading