diff --git a/driver-core/src/main/com/mongodb/internal/connection/BackpressureErrorLabeler.java b/driver-core/src/main/com/mongodb/internal/connection/BackpressureErrorLabeler.java
new file mode 100644
index 0000000000..aae7cb05dd
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/connection/BackpressureErrorLabeler.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.connection;
+
+import com.mongodb.MongoException;
+import com.mongodb.MongoSocketException;
+
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLProtocolException;
+import java.net.UnknownHostException;
+import java.security.cert.CertPathBuilderException;
+import java.security.cert.CertPathValidatorException;
+import java.security.cert.CertificateException;
+import java.util.Locale;
+
+/**
+ * Attaches {@link MongoException#SYSTEM_OVERLOADED_ERROR_LABEL} and
+ * {@link MongoException#RETRYABLE_ERROR_LABEL} to network errors encountered during connection
+ * establishment or the hello message, per the CMAP specification.
+ *
+ *
This is topology-agnostic: it must be invoked from the connection-establishment path so that
+ * both default SDAM and load-balanced modes are covered.
+ */
+final class BackpressureErrorLabeler {
+
+ private BackpressureErrorLabeler() {
+ }
+
+ static void applyLabelsIfEligible(final Throwable t) {
+ if (!(t instanceof MongoSocketException)) {
+ return;
+ }
+ MongoSocketException socketException = (MongoSocketException) t;
+ if (isDnsLookupFailure(socketException)) {
+ return;
+ }
+ if (isTlsConfigurationError(socketException)) {
+ return;
+ }
+ // TODO-BACKPRESSURE Nabil - SOCKS5 Revisit alongside JAVA-5205 (SOCKS5 in async) so both sync and
+ // async proxy error surfaces can be handled together — likely via a dedicated internal
+ // exception thrown from the proxy code path.
+ socketException.addLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL);
+ socketException.addLabel(MongoException.RETRYABLE_ERROR_LABEL);
+ }
+
+ private static boolean isDnsLookupFailure(final MongoSocketException t) {
+ Throwable cause = t.getCause();
+ while (cause != null) {
+ if (cause instanceof UnknownHostException) {
+ return true;
+ }
+ cause = cause.getCause();
+ }
+ return false;
+ }
+
+ private static boolean isTlsConfigurationError(final MongoSocketException t) {
+ Throwable cause = t.getCause();
+ while (cause != null) {
+ if (cause instanceof CertificateException
+ || cause instanceof CertPathBuilderException
+ || cause instanceof CertPathValidatorException
+ || cause instanceof SSLPeerUnverifiedException
+ || cause instanceof SSLProtocolException) {
+ return true;
+ }
+ if (cause instanceof SSLHandshakeException) {
+ String message = cause.getMessage();
+ if (message != null) {
+ String lowerMessage = message.toLowerCase(Locale.ROOT);
+ if (lowerMessage.contains("certificate")
+ || lowerMessage.contains("verify")
+ || lowerMessage.contains("trust")
+ || lowerMessage.contains("hostname")
+ || lowerMessage.contains("protocol")
+ || lowerMessage.contains("cipher")
+ // PKIX path building/validation failures surface as SSLHandshakeException
+ // when the underlying CertPath* cause is not in the chain.
+ || lowerMessage.contains("pkix")
+ // Any "Received fatal alert: X" from OpenJDK's JSSE provider means the
+ // server actively answered with a TLS protocol error — not an overload
+ // signal. Catches all 25 RFC handshake alert descriptions in one rule.
+ || lowerMessage.contains("received fatal alert")) {
+ return true;
+ }
+ }
+ }
+ cause = cause.getCause();
+ }
+ return false;
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java
index af4acd8c03..371e709d80 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java
@@ -137,6 +137,9 @@ private void handleException(final SdamIssue sdamIssue, final boolean beforeHand
serverMonitor.connect();
} else if (sdamIssue.relatedToNetworkNotTimeout()
|| (beforeHandshake && (sdamIssue.relatedToNetworkTimeout() || sdamIssue.relatedToAuth()))) {
+ if (sdamIssue.hasSystemOverloadedLabel()) {
+ return;
+ }
updateDescription(sdamIssue.serverDescription());
connectionPool.invalidate(sdamIssue.exception().orElse(null));
serverMonitor.cancelCurrentCheck();
diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java
index 6b20c46719..09e28d7c84 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java
@@ -230,9 +230,11 @@ public void open(final OperationContext originalOperationContext) {
isTrue("Open already called", stream == null);
stream = streamFactory.create(serverId.getAddress());
OperationContext operationContext = originalOperationContext;
+ boolean beforeHandshake = true;
try {
stream.open(operationContext);
InternalConnectionInitializationDescription initializationDescription = connectionInitializer.startHandshake(this, operationContext);
+ beforeHandshake = false;
operationContext = operationContext.withOverride(TimeoutContext::withNewlyStartedMaintenanceTimeout);
initAfterHandshakeStart(initializationDescription);
@@ -241,6 +243,9 @@ public void open(final OperationContext originalOperationContext) {
initAfterHandshakeFinish(initializationDescription);
} catch (Throwable t) {
close();
+ if (beforeHandshake) {
+ BackpressureErrorLabeler.applyLabelsIfEligible(t);
+ }
if (t instanceof MongoException) {
throw (MongoException) t;
} else {
@@ -263,6 +268,7 @@ public void completed(@Nullable final Void aVoid) {
(initialResult, initialException) -> {
if (initialException != null) {
close();
+ BackpressureErrorLabeler.applyLabelsIfEligible(initialException);
callback.onResult(null, initialException);
} else {
assertNotNull(initialResult);
@@ -278,11 +284,13 @@ public void completed(@Nullable final Void aVoid) {
@Override
public void failed(final Throwable t) {
close();
+ BackpressureErrorLabeler.applyLabelsIfEligible(t);
callback.onResult(null, t);
}
});
} catch (Throwable t) {
close();
+ BackpressureErrorLabeler.applyLabelsIfEligible(t);
callback.onResult(null, t);
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java b/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java
index 7f014d7ede..4b989193c0 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java
@@ -17,6 +17,7 @@
package com.mongodb.internal.connection;
import com.mongodb.MongoCommandException;
+import com.mongodb.MongoException;
import com.mongodb.MongoNodeIsRecoveringException;
import com.mongodb.MongoNotPrimaryException;
import com.mongodb.MongoSecurityException;
@@ -162,6 +163,11 @@ boolean relatedToWriteConcern() {
return exception instanceof MongoWriteConcernWithResponseException;
}
+ boolean hasSystemOverloadedLabel() {
+ return exception instanceof MongoException
+ && ((MongoException) exception).hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL);
+ }
+
private static boolean stale(@Nullable final Throwable t, final ServerDescription currentServerDescription) {
return TopologyVersionHelper.topologyVersion(t)
.map(candidateTopologyVersion -> TopologyVersionHelper.newerOrEqual(
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java
index d1b1f27a90..52287a23d1 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java
@@ -113,6 +113,7 @@ protected void applyApplicationError(final BsonDocument applicationError) {
switch (when) {
case "beforeHandshakeCompletes":
+ BackpressureErrorLabeler.applyLabelsIfEligible(exception);
server.sdamServerDescriptionManager().handleExceptionBeforeHandshake(
SdamIssue.of(exception, new SdamIssue.Context(server.serverId(), errorGeneration, maxWireVersion)));
break;
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/BackpressureErrorLabelerTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/BackpressureErrorLabelerTest.java
new file mode 100644
index 0000000000..b07a4240cd
--- /dev/null
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/BackpressureErrorLabelerTest.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.connection;
+
+import com.mongodb.MongoCredential;
+import com.mongodb.MongoException;
+import com.mongodb.MongoSecurityException;
+import com.mongodb.MongoSocketException;
+import com.mongodb.MongoSocketOpenException;
+import com.mongodb.MongoSocketReadTimeoutException;
+import com.mongodb.ServerAddress;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLProtocolException;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.security.cert.CertPathBuilderException;
+import java.security.cert.CertPathValidatorException;
+import java.security.cert.CertificateException;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class BackpressureErrorLabelerTest {
+
+ private static final ServerAddress ADDRESS = new ServerAddress();
+
+ static Stream> networkErrorShouldBeLabeled() {
+ return Stream.of(
+ named(new MongoSocketException("boom", ADDRESS)),
+ named(new MongoSocketReadTimeoutException("slow", ADDRESS, new IOException("read timed out"))),
+ named(new MongoSocketOpenException("open failed", ADDRESS, new IOException("connection refused"))),
+ // FIN-during-handshake: server closed the TCP connection while the client was mid-handshake
+ // (no protocol-level alert). I/O failure → must be labeled per CMAP "I/O error during TLS handshake".
+ named(new MongoSocketException("tls", ADDRESS, initCause(
+ new SSLHandshakeException("Remote host terminated the handshake"),
+ new EOFException("SSL peer shut down incorrectly"))))
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void networkErrorShouldBeLabeled(final MongoSocketException e) {
+ BackpressureErrorLabeler.applyLabelsIfEligible(e);
+ assertHasBackpressureLabels(e);
+ }
+
+ static Stream> dnsFailureShouldNotBeLabeled() {
+ return Stream.of(
+ named(new MongoSocketException("lookup failed", ADDRESS, new UnknownHostException("nope"))),
+ named(new MongoSocketException("wrap", ADDRESS, new IOException("wrap", new UnknownHostException("nope"))))
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void dnsFailureShouldNotBeLabeled(final MongoSocketException e) {
+ BackpressureErrorLabeler.applyLabelsIfEligible(e);
+ assertLacksBackpressureLabels(e);
+ }
+
+ static Stream> localTlsConfigErrorShouldNotBeLabeled() {
+ return Stream.of(
+ named(new CertificateException("bad cert")),
+ named(new CertPathBuilderException("path build failed")),
+ named(new CertPathValidatorException("validation failed")),
+ named(new SSLPeerUnverifiedException("peer not verified")),
+ named(new SSLProtocolException("protocol error")),
+ named(new SSLHandshakeException("PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: "
+ + "unable to find valid certification path to requested target"))
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void localTlsConfigErrorShouldNotBeLabeled(final Throwable cause) {
+ MongoSocketException e = new MongoSocketException("tls", ADDRESS, cause);
+ BackpressureErrorLabeler.applyLabelsIfEligible(e);
+ assertLacksBackpressureLabels(e);
+ }
+
+ /**
+ * "Received fatal alert: " means the peer actively answered with a TLS protocol
+ * error — definitively a config/protocol issue, not an overload signal. Covers all 25
+ * handshake-only RFC alert descriptions emitted by OpenJDK's JSSE provider.
+ */
+ @ParameterizedTest(name = "Received fatal alert: {0}")
+ @ValueSource(strings = {
+ "handshake_failure",
+ "no_certificate",
+ "bad_certificate",
+ "unsupported_certificate",
+ "certificate_revoked",
+ "certificate_expired",
+ "certificate_unknown",
+ "illegal_parameter",
+ "unknown_ca",
+ "access_denied",
+ "decode_error",
+ "decrypt_error",
+ "export_restriction",
+ "protocol_version",
+ "insufficient_security",
+ "no_renegotiation",
+ "missing_extension",
+ "unsupported_extension",
+ "certificate_unobtainable",
+ "unrecognized_name",
+ "bad_certificate_status_response",
+ "bad_certificate_hash_value",
+ "unknown_psk_identity",
+ "certificate_required",
+ "no_application_protocol"
+ })
+ void receivedTlsAlertShouldNotBeLabeled(final String alertDescription) {
+ SSLHandshakeException tls = new SSLHandshakeException("Received fatal alert: " + alertDescription);
+ MongoSocketException e = new MongoSocketException("tls", ADDRESS, tls);
+ BackpressureErrorLabeler.applyLabelsIfEligible(e);
+ assertLacksBackpressureLabels(e);
+ }
+
+ static Stream> nonSocketErrorShouldNotBeLabeled() {
+ return Stream.of(
+ named(new MongoSecurityException(
+ MongoCredential.createCredential("user", "db", "pwd".toCharArray()), "auth failed")),
+ named(new MongoException(42, "some command error")),
+ named(new IOException("raw"))
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void nonSocketErrorShouldNotBeLabeled(final Throwable e) {
+ BackpressureErrorLabeler.applyLabelsIfEligible(e);
+ if (e instanceof MongoException) {
+ assertLacksBackpressureLabels((MongoException) e);
+ }
+ }
+
+ private static Named named(final T e) {
+ return Named.of(e.getClass().getSimpleName(), e);
+ }
+
+ private static T initCause(final T exception, final Throwable cause) {
+ exception.initCause(cause);
+ return exception;
+ }
+
+ private static void assertHasBackpressureLabels(final MongoException e) {
+ assertTrue(e.hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL),
+ "expected SystemOverloadedError label");
+ assertTrue(e.hasErrorLabel(MongoException.RETRYABLE_ERROR_LABEL),
+ "expected RetryableError label");
+ }
+
+ private static void assertLacksBackpressureLabels(final MongoException e) {
+ assertFalse(e.hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL),
+ "unexpected SystemOverloadedError label");
+ assertFalse(e.hasErrorLabel(MongoException.RETRYABLE_ERROR_LABEL),
+ "unexpected RetryableError label");
+ }
+}
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy
index 230301e903..e2a6043226 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy
@@ -51,6 +51,7 @@ import org.bson.BsonInt32
import org.bson.codecs.BsonDocumentCodec
import spock.lang.Specification
+import java.security.cert.CertificateException
import java.util.concurrent.CountDownLatch
import static com.mongodb.ClusterFixture.CLIENT_METADATA
@@ -258,6 +259,56 @@ class DefaultServerSpecification extends Specification {
]
}
+ def 'should invalidate the pool when the exception without system overloaded label'() {
+ given:
+ assert !exceptionToThrow.hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL)
+ def connectionPool = Mock(ConnectionPool)
+ connectionPool.get(_) >> { throw exceptionToThrow }
+ def serverMonitor = Mock(ServerMonitor)
+ def server = defaultServer(connectionPool, serverMonitor)
+
+ when:
+ server.getConnection(createOperationContext())
+
+ then:
+ def e = thrown(MongoException)
+ e.is(exceptionToThrow)
+ 1 * connectionPool.invalidate(exceptionToThrow)
+ 1 * serverMonitor.cancelCurrentCheck()
+
+ where:
+ exceptionToThrow << [
+ new MongoSocketException('establishment failed', new ServerAddress()),
+ new MongoSocketOpenException('open failed', new ServerAddress(), new IOException()),
+ new MongoSocketReadTimeoutException('Read timed out', new ServerAddress(), new IOException()),
+ new MongoSocketException('DNS lookup failed', new ServerAddress(),
+ new UnknownHostException('no such host')),
+ new MongoSocketException('TLS config error', new ServerAddress(),
+ new CertificateException('bad cert')),
+ ]
+ }
+
+ def 'should not invalidate the pool when the exception carries SystemOverloadedError'() {
+ given:
+ def exceptionToThrow = new MongoSocketException('rate-limited establishment', new ServerAddress())
+ exceptionToThrow.addLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL)
+
+ def connectionPool = Mock(ConnectionPool)
+ connectionPool.get(_) >> { throw exceptionToThrow }
+ def serverMonitor = Mock(ServerMonitor)
+ def server = defaultServer(connectionPool, serverMonitor)
+
+ when:
+ server.getConnection(createOperationContext())
+
+ then:
+ def e = thrown(MongoException)
+ e.is(exceptionToThrow)
+ e.hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL)
+ 0 * connectionPool.invalidate(_)
+ 0 * serverMonitor.cancelCurrentCheck()
+ }
+
def 'failed authentication should invalidate the connection pool'() {
given:
def connectionPool = Mock(ConnectionPool)
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java
index 095372a6ce..0406a8a9bb 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java
@@ -54,9 +54,6 @@ public class ServerDiscoveryAndMonitoringTest extends AbstractServerDiscoveryAnd
public ServerDiscoveryAndMonitoringTest(final String description, final BsonDocument definition) {
super(definition);
- assumeFalse("https://jira.mongodb.org/browse/JAVA-5949",
- description.equals("error_handling_handshake.json: Network timeouts before and after the handshake completes"));
-
this.description = description;
init(serverAddress -> NO_OP_SERVER_LISTENER, NO_OP_CLUSTER_LISTENER);
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java
index 18b3b3f4fc..8e20fdf211 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java
@@ -18,6 +18,7 @@
import com.mongodb.ClusterFixture;
import com.mongodb.MongoClientSettings;
+import com.mongodb.event.ConnectionCheckOutFailedEvent;
import com.mongodb.event.ConnectionPoolClearedEvent;
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.event.ConnectionPoolReadyEvent;
@@ -26,6 +27,7 @@
import com.mongodb.event.ServerHeartbeatSucceededEvent;
import com.mongodb.event.ServerListener;
import com.mongodb.event.ServerMonitorListener;
+import com.mongodb.internal.connection.TestConnectionPoolListener;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.time.TimePointTest;
@@ -47,6 +49,8 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import static com.mongodb.ClusterFixture.configureFailPoint;
@@ -268,6 +272,72 @@ public void shouldEmitHeartbeatStartedBeforeSocketIsConnected() {
// As it requires mocking and package access to `com.mongodb.internal.connection`
}
+ /**
+ * See
+ * Connection Pool Backpressure.
+ */
+ @Test
+ public void testConnectionPoolBackpressure() throws InterruptedException {
+ assumeTrue(serverVersionAtLeast(7, 0));
+
+ TestConnectionPoolListener connectionPoolListener = new TestConnectionPoolListener();
+
+ MongoClientSettings clientSettings = getMongoClientSettingsBuilder()
+ .applyToConnectionPoolSettings(builder -> builder
+ .maxConnecting(100)
+ .addConnectionPoolListener(connectionPoolListener))
+ .build();
+
+ try (MongoClient adminClient = MongoClients.create(getMongoClientSettingsBuilder().build());
+ MongoClient client = MongoClients.create(clientSettings)) {
+
+ MongoDatabase adminDatabase = adminClient.getDatabase("admin");
+ MongoDatabase database = client.getDatabase(getDefaultDatabaseName());
+ MongoCollection collection = database.getCollection("testCollection");
+
+ try {
+ adminDatabase.runCommand(new Document("setParameter", 1)
+ .append("ingressConnectionEstablishmentRateLimiterEnabled", true));
+ adminDatabase.runCommand(new Document("setParameter", 1)
+ .append("ingressConnectionEstablishmentRatePerSec", 20));
+ adminDatabase.runCommand(new Document("setParameter", 1)
+ .append("ingressConnectionEstablishmentBurstCapacitySecs", 1));
+ adminDatabase.runCommand(new Document("setParameter", 1)
+ .append("ingressConnectionEstablishmentMaxQueueDepth", 1));
+
+ collection.insertOne(Document.parse("{}"));
+
+ ExecutorService executor = Executors.newFixedThreadPool(100);
+ try {
+ for (int i = 0; i < 100; i++) {
+ executor.submit(() ->
+ collection.find(new Document("$where", "function() { sleep(2000); return true; }")).first());
+ }
+ executor.shutdown();
+ assertTrue("Executor did not terminate within timeout",
+ executor.awaitTermination(20, SECONDS));
+ } finally {
+ if (!executor.isTerminated()) {
+ executor.shutdownNow();
+ }
+ }
+
+ int failedCheckOutCount = connectionPoolListener.countEvents(ConnectionCheckOutFailedEvent.class);
+ assertTrue("Expected at least 10 ConnectionCheckOutFailedEvents, but got " + failedCheckOutCount,
+ failedCheckOutCount >= 10);
+ assertEquals(0, connectionPoolListener.countEvents(ConnectionPoolClearedEvent.class));
+ } finally {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ adminDatabase.runCommand(new Document("setParameter", 1)
+ .append("ingressConnectionEstablishmentRateLimiterEnabled", false));
+ }
+ }
+ }
+
private static void assertPoll(final BlockingQueue> queue, @Nullable final Class> allowed, final Set> required)
throws InterruptedException {
assertPoll(queue, allowed, required, Timeout.expiresIn(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS, ZERO_DURATION_MEANS_EXPIRED));
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
index b2718b4b2d..1bfe2e19a6 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java
@@ -26,6 +26,7 @@
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.event.CommandSucceededEvent;
import com.mongodb.event.ConnectionCheckOutFailedEvent;
+import com.mongodb.event.ConnectionCheckedInEvent;
import com.mongodb.event.ConnectionClosedEvent;
import com.mongodb.event.ConnectionCreatedEvent;
import com.mongodb.event.ConnectionPoolClearedEvent;
@@ -208,6 +209,12 @@ public void waitForConnectionPoolEvents(final String client, final BsonDocument
case "connectionReadyEvent":
eventClass = ConnectionReadyEvent.class;
break;
+ case "connectionClosedEvent":
+ eventClass = ConnectionClosedEvent.class;
+ break;
+ case "connectionCheckedInEvent":
+ eventClass = ConnectionCheckedInEvent.class;
+ break;
default:
throw new UnsupportedOperationException("Unsupported event: " + event.getFirstKey());
}
@@ -436,11 +443,18 @@ private static boolean serverDescriptionChangedEventMatches(final BsonDocument e
switch (newType) {
case "Unknown":
return event.getNewDescription().getType() == ServerType.UNKNOWN;
- case "LoadBalancer": {
+ case "LoadBalancer":
return event.getNewDescription().getType() == ServerType.LOAD_BALANCER;
- }
+ case "Mongos":
+ return event.getNewDescription().getType() == ServerType.SHARD_ROUTER;
+ case "Standalone":
+ return event.getNewDescription().getType() == ServerType.STANDALONE;
+ case "RSPrimary":
+ return event.getNewDescription().getType() == ServerType.REPLICA_SET_PRIMARY;
+ case "RSSecondary":
+ return event.getNewDescription().getType() == ServerType.REPLICA_SET_SECONDARY;
default:
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Unsupported server type " + newType);
}
}
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
index cf003078f0..a6f14688b4 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java
@@ -800,6 +800,8 @@ private OperationResult executeWaitForEvent(final UnifiedTestContext context, fi
case "poolReadyEvent":
case "connectionCreatedEvent":
case "connectionReadyEvent":
+ case "connectionClosedEvent":
+ case "connectionCheckedInEvent":
context.getEventMatcher().waitForConnectionPoolEvents(clientId, event, count, entities.getConnectionPoolListener(clientId));
break;
case "serverHeartbeatStartedEvent":
diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java
index 328c8298b6..06d72002fe 100644
--- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java
+++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java
@@ -439,15 +439,8 @@ public static void applyCustomizations(final TestDef def) {
.file("server-discovery-and-monitoring", "pool-clear-on-error-checkout");
def.skipJira("https://jira.mongodb.org/browse/JAVA-5664")
.file("server-discovery-and-monitoring", "pool-cleared-on-min-pool-size-population-error");
- def.skipJira("https://jira.mongodb.org/browse/JAVA-5949")
- .file("server-discovery-and-monitoring", "backpressure-network-error-fail-single");
- def.skipJira("https://jira.mongodb.org/browse/JAVA-5949")
- .file("server-discovery-and-monitoring", "backpressure-network-timeout-error-single");
- def.skipJira("https://jira.mongodb.org/browse/JAVA-5949")
- .file("server-discovery-and-monitoring", "backpressure-network-error-fail-replicaset");
- def.skipJira("https://jira.mongodb.org/browse/JAVA-5949")
- .file("server-discovery-and-monitoring", "backpressure-network-timeout-error-replicaset");
- def.skipJira("https://jira.mongodb.org/browse/JAVA-5949")
+ // TODO-BACKPRESSURE Nabil - This issue is unrelated to backpressure but consider fixing it before merging to main
+ def.skipJira("https://jira.mongodb.org/browse/JAVA-6174")
.file("server-discovery-and-monitoring", "backpressure-server-description-unchanged-on-min-pool-size-population-error");
// session tests