diff --git a/driver-core/src/main/com/mongodb/internal/connection/BackpressureErrorLabeler.java b/driver-core/src/main/com/mongodb/internal/connection/BackpressureErrorLabeler.java new file mode 100644 index 0000000000..aae7cb05dd --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/connection/BackpressureErrorLabeler.java @@ -0,0 +1,108 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.connection; + +import com.mongodb.MongoException; +import com.mongodb.MongoSocketException; + +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLProtocolException; +import java.net.UnknownHostException; +import java.security.cert.CertPathBuilderException; +import java.security.cert.CertPathValidatorException; +import java.security.cert.CertificateException; +import java.util.Locale; + +/** + * Attaches {@link MongoException#SYSTEM_OVERLOADED_ERROR_LABEL} and + * {@link MongoException#RETRYABLE_ERROR_LABEL} to network errors encountered during connection + * establishment or the hello message, per the CMAP specification. + * + *

This is topology-agnostic: it must be invoked from the connection-establishment path so that + * both default SDAM and load-balanced modes are covered. + */ +final class BackpressureErrorLabeler { + + private BackpressureErrorLabeler() { + } + + static void applyLabelsIfEligible(final Throwable t) { + if (!(t instanceof MongoSocketException)) { + return; + } + MongoSocketException socketException = (MongoSocketException) t; + if (isDnsLookupFailure(socketException)) { + return; + } + if (isTlsConfigurationError(socketException)) { + return; + } + // TODO-BACKPRESSURE Nabil - SOCKS5 Revisit alongside JAVA-5205 (SOCKS5 in async) so both sync and + // async proxy error surfaces can be handled together — likely via a dedicated internal + // exception thrown from the proxy code path. + socketException.addLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL); + socketException.addLabel(MongoException.RETRYABLE_ERROR_LABEL); + } + + private static boolean isDnsLookupFailure(final MongoSocketException t) { + Throwable cause = t.getCause(); + while (cause != null) { + if (cause instanceof UnknownHostException) { + return true; + } + cause = cause.getCause(); + } + return false; + } + + private static boolean isTlsConfigurationError(final MongoSocketException t) { + Throwable cause = t.getCause(); + while (cause != null) { + if (cause instanceof CertificateException + || cause instanceof CertPathBuilderException + || cause instanceof CertPathValidatorException + || cause instanceof SSLPeerUnverifiedException + || cause instanceof SSLProtocolException) { + return true; + } + if (cause instanceof SSLHandshakeException) { + String message = cause.getMessage(); + if (message != null) { + String lowerMessage = message.toLowerCase(Locale.ROOT); + if (lowerMessage.contains("certificate") + || lowerMessage.contains("verify") + || lowerMessage.contains("trust") + || lowerMessage.contains("hostname") + || lowerMessage.contains("protocol") + || lowerMessage.contains("cipher") + // PKIX path building/validation failures surface as SSLHandshakeException + // when the underlying CertPath* cause is not in the chain. + || lowerMessage.contains("pkix") + // Any "Received fatal alert: X" from OpenJDK's JSSE provider means the + // server actively answered with a TLS protocol error — not an overload + // signal. Catches all 25 RFC handshake alert descriptions in one rule. + || lowerMessage.contains("received fatal alert")) { + return true; + } + } + } + cause = cause.getCause(); + } + return false; + } +} diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java index af4acd8c03..371e709d80 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java @@ -137,6 +137,9 @@ private void handleException(final SdamIssue sdamIssue, final boolean beforeHand serverMonitor.connect(); } else if (sdamIssue.relatedToNetworkNotTimeout() || (beforeHandshake && (sdamIssue.relatedToNetworkTimeout() || sdamIssue.relatedToAuth()))) { + if (sdamIssue.hasSystemOverloadedLabel()) { + return; + } updateDescription(sdamIssue.serverDescription()); connectionPool.invalidate(sdamIssue.exception().orElse(null)); serverMonitor.cancelCurrentCheck(); diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 6b20c46719..09e28d7c84 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -230,9 +230,11 @@ public void open(final OperationContext originalOperationContext) { isTrue("Open already called", stream == null); stream = streamFactory.create(serverId.getAddress()); OperationContext operationContext = originalOperationContext; + boolean beforeHandshake = true; try { stream.open(operationContext); InternalConnectionInitializationDescription initializationDescription = connectionInitializer.startHandshake(this, operationContext); + beforeHandshake = false; operationContext = operationContext.withOverride(TimeoutContext::withNewlyStartedMaintenanceTimeout); initAfterHandshakeStart(initializationDescription); @@ -241,6 +243,9 @@ public void open(final OperationContext originalOperationContext) { initAfterHandshakeFinish(initializationDescription); } catch (Throwable t) { close(); + if (beforeHandshake) { + BackpressureErrorLabeler.applyLabelsIfEligible(t); + } if (t instanceof MongoException) { throw (MongoException) t; } else { @@ -263,6 +268,7 @@ public void completed(@Nullable final Void aVoid) { (initialResult, initialException) -> { if (initialException != null) { close(); + BackpressureErrorLabeler.applyLabelsIfEligible(initialException); callback.onResult(null, initialException); } else { assertNotNull(initialResult); @@ -278,11 +284,13 @@ public void completed(@Nullable final Void aVoid) { @Override public void failed(final Throwable t) { close(); + BackpressureErrorLabeler.applyLabelsIfEligible(t); callback.onResult(null, t); } }); } catch (Throwable t) { close(); + BackpressureErrorLabeler.applyLabelsIfEligible(t); callback.onResult(null, t); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java b/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java index 7f014d7ede..4b989193c0 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SdamServerDescriptionManager.java @@ -17,6 +17,7 @@ package com.mongodb.internal.connection; import com.mongodb.MongoCommandException; +import com.mongodb.MongoException; import com.mongodb.MongoNodeIsRecoveringException; import com.mongodb.MongoNotPrimaryException; import com.mongodb.MongoSecurityException; @@ -162,6 +163,11 @@ boolean relatedToWriteConcern() { return exception instanceof MongoWriteConcernWithResponseException; } + boolean hasSystemOverloadedLabel() { + return exception instanceof MongoException + && ((MongoException) exception).hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL); + } + private static boolean stale(@Nullable final Throwable t, final ServerDescription currentServerDescription) { return TopologyVersionHelper.topologyVersion(t) .map(candidateTopologyVersion -> TopologyVersionHelper.newerOrEqual( diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java index d1b1f27a90..52287a23d1 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java @@ -113,6 +113,7 @@ protected void applyApplicationError(final BsonDocument applicationError) { switch (when) { case "beforeHandshakeCompletes": + BackpressureErrorLabeler.applyLabelsIfEligible(exception); server.sdamServerDescriptionManager().handleExceptionBeforeHandshake( SdamIssue.of(exception, new SdamIssue.Context(server.serverId(), errorGeneration, maxWireVersion))); break; diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/BackpressureErrorLabelerTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/BackpressureErrorLabelerTest.java new file mode 100644 index 0000000000..b07a4240cd --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/BackpressureErrorLabelerTest.java @@ -0,0 +1,183 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.connection; + +import com.mongodb.MongoCredential; +import com.mongodb.MongoException; +import com.mongodb.MongoSecurityException; +import com.mongodb.MongoSocketException; +import com.mongodb.MongoSocketOpenException; +import com.mongodb.MongoSocketReadTimeoutException; +import com.mongodb.ServerAddress; +import org.junit.jupiter.api.Named; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLProtocolException; +import java.io.EOFException; +import java.io.IOException; +import java.net.UnknownHostException; +import java.security.cert.CertPathBuilderException; +import java.security.cert.CertPathValidatorException; +import java.security.cert.CertificateException; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class BackpressureErrorLabelerTest { + + private static final ServerAddress ADDRESS = new ServerAddress(); + + static Stream> networkErrorShouldBeLabeled() { + return Stream.of( + named(new MongoSocketException("boom", ADDRESS)), + named(new MongoSocketReadTimeoutException("slow", ADDRESS, new IOException("read timed out"))), + named(new MongoSocketOpenException("open failed", ADDRESS, new IOException("connection refused"))), + // FIN-during-handshake: server closed the TCP connection while the client was mid-handshake + // (no protocol-level alert). I/O failure → must be labeled per CMAP "I/O error during TLS handshake". + named(new MongoSocketException("tls", ADDRESS, initCause( + new SSLHandshakeException("Remote host terminated the handshake"), + new EOFException("SSL peer shut down incorrectly")))) + ); + } + + @ParameterizedTest + @MethodSource + void networkErrorShouldBeLabeled(final MongoSocketException e) { + BackpressureErrorLabeler.applyLabelsIfEligible(e); + assertHasBackpressureLabels(e); + } + + static Stream> dnsFailureShouldNotBeLabeled() { + return Stream.of( + named(new MongoSocketException("lookup failed", ADDRESS, new UnknownHostException("nope"))), + named(new MongoSocketException("wrap", ADDRESS, new IOException("wrap", new UnknownHostException("nope")))) + ); + } + + @ParameterizedTest + @MethodSource + void dnsFailureShouldNotBeLabeled(final MongoSocketException e) { + BackpressureErrorLabeler.applyLabelsIfEligible(e); + assertLacksBackpressureLabels(e); + } + + static Stream> localTlsConfigErrorShouldNotBeLabeled() { + return Stream.of( + named(new CertificateException("bad cert")), + named(new CertPathBuilderException("path build failed")), + named(new CertPathValidatorException("validation failed")), + named(new SSLPeerUnverifiedException("peer not verified")), + named(new SSLProtocolException("protocol error")), + named(new SSLHandshakeException("PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: " + + "unable to find valid certification path to requested target")) + ); + } + + @ParameterizedTest + @MethodSource + void localTlsConfigErrorShouldNotBeLabeled(final Throwable cause) { + MongoSocketException e = new MongoSocketException("tls", ADDRESS, cause); + BackpressureErrorLabeler.applyLabelsIfEligible(e); + assertLacksBackpressureLabels(e); + } + + /** + * "Received fatal alert: " means the peer actively answered with a TLS protocol + * error — definitively a config/protocol issue, not an overload signal. Covers all 25 + * handshake-only RFC alert descriptions emitted by OpenJDK's JSSE provider. + */ + @ParameterizedTest(name = "Received fatal alert: {0}") + @ValueSource(strings = { + "handshake_failure", + "no_certificate", + "bad_certificate", + "unsupported_certificate", + "certificate_revoked", + "certificate_expired", + "certificate_unknown", + "illegal_parameter", + "unknown_ca", + "access_denied", + "decode_error", + "decrypt_error", + "export_restriction", + "protocol_version", + "insufficient_security", + "no_renegotiation", + "missing_extension", + "unsupported_extension", + "certificate_unobtainable", + "unrecognized_name", + "bad_certificate_status_response", + "bad_certificate_hash_value", + "unknown_psk_identity", + "certificate_required", + "no_application_protocol" + }) + void receivedTlsAlertShouldNotBeLabeled(final String alertDescription) { + SSLHandshakeException tls = new SSLHandshakeException("Received fatal alert: " + alertDescription); + MongoSocketException e = new MongoSocketException("tls", ADDRESS, tls); + BackpressureErrorLabeler.applyLabelsIfEligible(e); + assertLacksBackpressureLabels(e); + } + + static Stream> nonSocketErrorShouldNotBeLabeled() { + return Stream.of( + named(new MongoSecurityException( + MongoCredential.createCredential("user", "db", "pwd".toCharArray()), "auth failed")), + named(new MongoException(42, "some command error")), + named(new IOException("raw")) + ); + } + + @ParameterizedTest + @MethodSource + void nonSocketErrorShouldNotBeLabeled(final Throwable e) { + BackpressureErrorLabeler.applyLabelsIfEligible(e); + if (e instanceof MongoException) { + assertLacksBackpressureLabels((MongoException) e); + } + } + + private static Named named(final T e) { + return Named.of(e.getClass().getSimpleName(), e); + } + + private static T initCause(final T exception, final Throwable cause) { + exception.initCause(cause); + return exception; + } + + private static void assertHasBackpressureLabels(final MongoException e) { + assertTrue(e.hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL), + "expected SystemOverloadedError label"); + assertTrue(e.hasErrorLabel(MongoException.RETRYABLE_ERROR_LABEL), + "expected RetryableError label"); + } + + private static void assertLacksBackpressureLabels(final MongoException e) { + assertFalse(e.hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL), + "unexpected SystemOverloadedError label"); + assertFalse(e.hasErrorLabel(MongoException.RETRYABLE_ERROR_LABEL), + "unexpected RetryableError label"); + } +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy index 230301e903..e2a6043226 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy @@ -51,6 +51,7 @@ import org.bson.BsonInt32 import org.bson.codecs.BsonDocumentCodec import spock.lang.Specification +import java.security.cert.CertificateException import java.util.concurrent.CountDownLatch import static com.mongodb.ClusterFixture.CLIENT_METADATA @@ -258,6 +259,56 @@ class DefaultServerSpecification extends Specification { ] } + def 'should invalidate the pool when the exception without system overloaded label'() { + given: + assert !exceptionToThrow.hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL) + def connectionPool = Mock(ConnectionPool) + connectionPool.get(_) >> { throw exceptionToThrow } + def serverMonitor = Mock(ServerMonitor) + def server = defaultServer(connectionPool, serverMonitor) + + when: + server.getConnection(createOperationContext()) + + then: + def e = thrown(MongoException) + e.is(exceptionToThrow) + 1 * connectionPool.invalidate(exceptionToThrow) + 1 * serverMonitor.cancelCurrentCheck() + + where: + exceptionToThrow << [ + new MongoSocketException('establishment failed', new ServerAddress()), + new MongoSocketOpenException('open failed', new ServerAddress(), new IOException()), + new MongoSocketReadTimeoutException('Read timed out', new ServerAddress(), new IOException()), + new MongoSocketException('DNS lookup failed', new ServerAddress(), + new UnknownHostException('no such host')), + new MongoSocketException('TLS config error', new ServerAddress(), + new CertificateException('bad cert')), + ] + } + + def 'should not invalidate the pool when the exception carries SystemOverloadedError'() { + given: + def exceptionToThrow = new MongoSocketException('rate-limited establishment', new ServerAddress()) + exceptionToThrow.addLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL) + + def connectionPool = Mock(ConnectionPool) + connectionPool.get(_) >> { throw exceptionToThrow } + def serverMonitor = Mock(ServerMonitor) + def server = defaultServer(connectionPool, serverMonitor) + + when: + server.getConnection(createOperationContext()) + + then: + def e = thrown(MongoException) + e.is(exceptionToThrow) + e.hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL) + 0 * connectionPool.invalidate(_) + 0 * serverMonitor.cancelCurrentCheck() + } + def 'failed authentication should invalidate the connection pool'() { given: def connectionPool = Mock(ConnectionPool) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java index 095372a6ce..0406a8a9bb 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java @@ -54,9 +54,6 @@ public class ServerDiscoveryAndMonitoringTest extends AbstractServerDiscoveryAnd public ServerDiscoveryAndMonitoringTest(final String description, final BsonDocument definition) { super(definition); - assumeFalse("https://jira.mongodb.org/browse/JAVA-5949", - description.equals("error_handling_handshake.json: Network timeouts before and after the handshake completes")); - this.description = description; init(serverAddress -> NO_OP_SERVER_LISTENER, NO_OP_CLUSTER_LISTENER); } diff --git a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java index 18b3b3f4fc..8e20fdf211 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java @@ -18,6 +18,7 @@ import com.mongodb.ClusterFixture; import com.mongodb.MongoClientSettings; +import com.mongodb.event.ConnectionCheckOutFailedEvent; import com.mongodb.event.ConnectionPoolClearedEvent; import com.mongodb.event.ConnectionPoolListener; import com.mongodb.event.ConnectionPoolReadyEvent; @@ -26,6 +27,7 @@ import com.mongodb.event.ServerHeartbeatSucceededEvent; import com.mongodb.event.ServerListener; import com.mongodb.event.ServerMonitorListener; +import com.mongodb.internal.connection.TestConnectionPoolListener; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; import com.mongodb.internal.time.TimePointTest; @@ -47,6 +49,8 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import static com.mongodb.ClusterFixture.configureFailPoint; @@ -268,6 +272,72 @@ public void shouldEmitHeartbeatStartedBeforeSocketIsConnected() { // As it requires mocking and package access to `com.mongodb.internal.connection` } + /** + * See + * Connection Pool Backpressure. + */ + @Test + public void testConnectionPoolBackpressure() throws InterruptedException { + assumeTrue(serverVersionAtLeast(7, 0)); + + TestConnectionPoolListener connectionPoolListener = new TestConnectionPoolListener(); + + MongoClientSettings clientSettings = getMongoClientSettingsBuilder() + .applyToConnectionPoolSettings(builder -> builder + .maxConnecting(100) + .addConnectionPoolListener(connectionPoolListener)) + .build(); + + try (MongoClient adminClient = MongoClients.create(getMongoClientSettingsBuilder().build()); + MongoClient client = MongoClients.create(clientSettings)) { + + MongoDatabase adminDatabase = adminClient.getDatabase("admin"); + MongoDatabase database = client.getDatabase(getDefaultDatabaseName()); + MongoCollection collection = database.getCollection("testCollection"); + + try { + adminDatabase.runCommand(new Document("setParameter", 1) + .append("ingressConnectionEstablishmentRateLimiterEnabled", true)); + adminDatabase.runCommand(new Document("setParameter", 1) + .append("ingressConnectionEstablishmentRatePerSec", 20)); + adminDatabase.runCommand(new Document("setParameter", 1) + .append("ingressConnectionEstablishmentBurstCapacitySecs", 1)); + adminDatabase.runCommand(new Document("setParameter", 1) + .append("ingressConnectionEstablishmentMaxQueueDepth", 1)); + + collection.insertOne(Document.parse("{}")); + + ExecutorService executor = Executors.newFixedThreadPool(100); + try { + for (int i = 0; i < 100; i++) { + executor.submit(() -> + collection.find(new Document("$where", "function() { sleep(2000); return true; }")).first()); + } + executor.shutdown(); + assertTrue("Executor did not terminate within timeout", + executor.awaitTermination(20, SECONDS)); + } finally { + if (!executor.isTerminated()) { + executor.shutdownNow(); + } + } + + int failedCheckOutCount = connectionPoolListener.countEvents(ConnectionCheckOutFailedEvent.class); + assertTrue("Expected at least 10 ConnectionCheckOutFailedEvents, but got " + failedCheckOutCount, + failedCheckOutCount >= 10); + assertEquals(0, connectionPoolListener.countEvents(ConnectionPoolClearedEvent.class)); + } finally { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + adminDatabase.runCommand(new Document("setParameter", 1) + .append("ingressConnectionEstablishmentRateLimiterEnabled", false)); + } + } + } + private static void assertPoll(final BlockingQueue queue, @Nullable final Class allowed, final Set> required) throws InterruptedException { assertPoll(queue, allowed, required, Timeout.expiresIn(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS, ZERO_DURATION_MEANS_EXPIRED)); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java index b2718b4b2d..1bfe2e19a6 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java @@ -26,6 +26,7 @@ import com.mongodb.event.CommandStartedEvent; import com.mongodb.event.CommandSucceededEvent; import com.mongodb.event.ConnectionCheckOutFailedEvent; +import com.mongodb.event.ConnectionCheckedInEvent; import com.mongodb.event.ConnectionClosedEvent; import com.mongodb.event.ConnectionCreatedEvent; import com.mongodb.event.ConnectionPoolClearedEvent; @@ -208,6 +209,12 @@ public void waitForConnectionPoolEvents(final String client, final BsonDocument case "connectionReadyEvent": eventClass = ConnectionReadyEvent.class; break; + case "connectionClosedEvent": + eventClass = ConnectionClosedEvent.class; + break; + case "connectionCheckedInEvent": + eventClass = ConnectionCheckedInEvent.class; + break; default: throw new UnsupportedOperationException("Unsupported event: " + event.getFirstKey()); } @@ -436,11 +443,18 @@ private static boolean serverDescriptionChangedEventMatches(final BsonDocument e switch (newType) { case "Unknown": return event.getNewDescription().getType() == ServerType.UNKNOWN; - case "LoadBalancer": { + case "LoadBalancer": return event.getNewDescription().getType() == ServerType.LOAD_BALANCER; - } + case "Mongos": + return event.getNewDescription().getType() == ServerType.SHARD_ROUTER; + case "Standalone": + return event.getNewDescription().getType() == ServerType.STANDALONE; + case "RSPrimary": + return event.getNewDescription().getType() == ServerType.REPLICA_SET_PRIMARY; + case "RSSecondary": + return event.getNewDescription().getType() == ServerType.REPLICA_SET_SECONDARY; default: - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Unsupported server type " + newType); } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index cf003078f0..a6f14688b4 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -800,6 +800,8 @@ private OperationResult executeWaitForEvent(final UnifiedTestContext context, fi case "poolReadyEvent": case "connectionCreatedEvent": case "connectionReadyEvent": + case "connectionClosedEvent": + case "connectionCheckedInEvent": context.getEventMatcher().waitForConnectionPoolEvents(clientId, event, count, entities.getConnectionPoolListener(clientId)); break; case "serverHeartbeatStartedEvent": diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 328c8298b6..06d72002fe 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -439,15 +439,8 @@ public static void applyCustomizations(final TestDef def) { .file("server-discovery-and-monitoring", "pool-clear-on-error-checkout"); def.skipJira("https://jira.mongodb.org/browse/JAVA-5664") .file("server-discovery-and-monitoring", "pool-cleared-on-min-pool-size-population-error"); - def.skipJira("https://jira.mongodb.org/browse/JAVA-5949") - .file("server-discovery-and-monitoring", "backpressure-network-error-fail-single"); - def.skipJira("https://jira.mongodb.org/browse/JAVA-5949") - .file("server-discovery-and-monitoring", "backpressure-network-timeout-error-single"); - def.skipJira("https://jira.mongodb.org/browse/JAVA-5949") - .file("server-discovery-and-monitoring", "backpressure-network-error-fail-replicaset"); - def.skipJira("https://jira.mongodb.org/browse/JAVA-5949") - .file("server-discovery-and-monitoring", "backpressure-network-timeout-error-replicaset"); - def.skipJira("https://jira.mongodb.org/browse/JAVA-5949") + // TODO-BACKPRESSURE Nabil - This issue is unrelated to backpressure but consider fixing it before merging to main + def.skipJira("https://jira.mongodb.org/browse/JAVA-6174") .file("server-discovery-and-monitoring", "backpressure-server-description-unchanged-on-min-pool-size-population-error"); // session tests