diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java
new file mode 100644
index 0000000000..0a48079bf5
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.operation;
+
+import com.mongodb.MongoNamespace;
+import com.mongodb.internal.async.AsyncBatchCursor;
+import com.mongodb.internal.async.SingleResultCallback;
+import com.mongodb.internal.binding.AsyncReadWriteBinding;
+import com.mongodb.internal.connection.OperationContext;
+
+/**
+ * An async-only operation that performs a write followed by a read that returns a cursor.
+ *
+ *
Unlike {@link ReadOperationCursor}, this operation requires an {@link AsyncReadWriteBinding}
+ * so that both the write and the read portions can be executed without narrowing casts.
+ *
+ *
This class is not part of the public API and may be removed or changed at any time
+ */
+public interface AsyncWriteThenReadOperationCursor {
+
+ /**
+ * @return the command name of the write phase of this operation (e.g. "mapReduce", "aggregate")
+ */
+ String getCommandName();
+
+ /**
+ * @return the namespace the write phase targets
+ */
+ MongoNamespace getNamespace();
+
+ /**
+ * Executes the write phase followed by the read phase, yielding an {@link AsyncBatchCursor}
+ * over the results of the read.
+ *
+ * @param binding the read-write binding used by both phases
+ * @param operationContext the operation context to use
+ * @param callback receives the {@link AsyncBatchCursor} on success, or the failure of either phase
+ */
+ void executeAsync(AsyncReadWriteBinding binding, OperationContext operationContext,
+ SingleResultCallback> callback);
+}
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java
index 511f9f62c6..87b9078115 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java
@@ -27,6 +27,7 @@
import com.mongodb.internal.observability.micrometer.TracingManager;
import com.mongodb.internal.observability.micrometer.TransactionSpan;
import com.mongodb.internal.operation.AbortTransactionOperation;
+import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.CommitTransactionOperation;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteConcernHelper;
@@ -88,7 +89,7 @@ public boolean notifyMessageSent() {
@Override
public void notifyOperationInitiated(final Object operation) {
- assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation);
+ assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation || operation instanceof AsyncWriteThenReadOperationCursor);
if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) {
assertTrue(getPinnedServerAddress() == null
|| (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE));
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java
index 1e8e7fa223..86a2239f75 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java
@@ -40,6 +40,7 @@
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;
+import reactor.core.publisher.Mono;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -201,10 +202,46 @@ public ReadOperationCursor asReadOperation(final int initialBatchSize) {
if (inline) {
// initialBatchSize is ignored for map reduce operations.
return createMapReduceInlineOperation();
- } else {
- return new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(),
- createFindOperation(initialBatchSize));
}
+ throw new IllegalStateException("Non-inline map-reduce uses the write-then-read path; "
+ + "asReadOperation must not be called.");
+ }
+
+ @Override
+ public Mono> batchCursor(final int initialBatchSize) {
+ if (inline) {
+ return super.batchCursor(initialBatchSize);
+ }
+ return writeThenReadBatchCursor(initialBatchSize);
+ }
+
+ @Override
+ public Publisher first() {
+ if (inline) {
+ return super.first();
+ }
+ return writeThenReadBatchCursor(1)
+ .flatMap(batchCursor -> {
+ batchCursor.setBatchSize(1);
+ return Mono.from(batchCursor.next())
+ .doOnTerminate(batchCursor::close)
+ .flatMap(results -> {
+ if (results == null || results.isEmpty()) {
+ return Mono.empty();
+ }
+ return Mono.fromCallable(() -> results.get(0));
+ });
+ });
+ }
+
+ private Mono> writeThenReadBatchCursor(final int initialBatchSize) {
+ return getMongoOperationPublisher()
+ .createWriteThenReadOperationMono(
+ operations -> operations.createTimeoutSettings(maxTimeMS),
+ () -> new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(),
+ createFindOperation(initialBatchSize)),
+ getClientSession())
+ .map(BatchCursor::new);
}
private WrappedMapReduceReadOperation createMapReduceInlineOperation() {
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java
index 84c810f1b5..8d561859af 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java
@@ -60,6 +60,8 @@
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.bulk.WriteRequest;
+import com.mongodb.internal.async.AsyncBatchCursor;
+import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.IndexHelper;
import com.mongodb.internal.operation.Operations;
import com.mongodb.internal.operation.ReadOperation;
@@ -513,6 +515,15 @@ Mono createReadOperationMono(final Supplier timeoutSetti
.execute(readOperation, readPreference, getReadConcern(), clientSession);
}
+ Mono> createWriteThenReadOperationMono(
+ final Function, TimeoutSettings> timeoutSettingsFunction,
+ final Supplier> operationSupplier,
+ @Nullable final ClientSession clientSession) {
+ AsyncWriteThenReadOperationCursor operation = operationSupplier.get();
+ return getExecutor(timeoutSettingsFunction.apply(operations))
+ .execute(operation, getReadConcern(), clientSession);
+ }
+
Mono createWriteOperationMono(final Function, TimeoutSettings> timeoutSettingsFunction,
final Supplier> operationSupplier, @Nullable final ClientSession clientSession) {
return createWriteOperationMono(() -> timeoutSettingsFunction.apply(operations), operationSupplier, clientSession);
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java
index cd666720f3..7c61ecf547 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java
@@ -19,6 +19,8 @@
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.internal.TimeoutSettings;
+import com.mongodb.internal.async.AsyncBatchCursor;
+import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.lang.Nullable;
@@ -54,6 +56,20 @@ Mono execute(ReadOperation, T> operation, ReadPreference readPreference
*/
Mono execute(WriteOperation operation, ReadConcern readConcern, @Nullable ClientSession session);
+ /**
+ * Execute an operation that writes and then reads a cursor within a single read-write binding.
+ *
+ *
The binding is acquired once and used for both phases, avoiding the need to narrow an
+ * {@code AsyncReadBinding} to an {@code AsyncWriteBinding}.
+ *
+ * @param operation the write-then-read operation.
+ * @param readConcern the read concern
+ * @param session the session to associate this operation with
+ * @param the document type returned by the cursor.
+ */
+ Mono> execute(AsyncWriteThenReadOperationCursor operation,
+ ReadConcern readConcern, @Nullable ClientSession session);
+
/**
* Create a new OperationExecutor with a specific timeout settings
*
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java
index 62a4431cc9..9a7b0e2c59 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java
@@ -33,6 +33,8 @@
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
import com.mongodb.internal.observability.micrometer.Span;
import com.mongodb.internal.observability.micrometer.TracingManager;
+import com.mongodb.internal.async.AsyncBatchCursor;
+import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.OperationHelper;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
@@ -179,6 +181,51 @@ public Mono execute(final WriteOperation operation, final ReadConcern
);
}
+ @Override
+ public Mono> execute(final AsyncWriteThenReadOperationCursor operation, final ReadConcern readConcern,
+ @Nullable final ClientSession session) {
+ isTrue("open", !mongoClient.getCluster().isClosed());
+ notNull("operation", operation);
+ notNull("readConcern", readConcern);
+
+ if (session != null) {
+ session.notifyOperationInitiated(operation);
+ }
+
+ return Mono.from(subscriber ->
+ clientSessionHelper.withClientSession(session, this)
+ .flatMap(actualClientSession -> {
+ AsyncReadWriteBinding binding = getReadWriteBinding(primary(), actualClientSession, session == null);
+ RequestContext requestContext = getContext(subscriber);
+ OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName())
+ .withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession,
+ isImplicitSession(session), readConcern));
+ Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(),
+ operationContext, operation.getCommandName(), operation.getNamespace());
+
+ return Mono.>create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> {
+ try {
+ binding.release();
+ } finally {
+ if (t != null) {
+ Throwable exceptionToHandle = t instanceof MongoException
+ ? OperationHelper.unwrap((MongoException) t) : t;
+ labelException(session, exceptionToHandle);
+ unpinServerAddressOnTransientTransactionError(session, exceptionToHandle);
+ if (span != null) {
+ span.error(t);
+ }
+ }
+ if (span != null) {
+ span.end();
+ }
+ sinkToCallback(sink).onResult(result, t);
+ }
+ }));
+ }).subscribe(subscriber)
+ );
+ }
+
@Override
public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) {
if (Objects.equals(timeoutSettings, newTimeoutSettings)) {
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java
index a7d1191c8b..fb253dafa8 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java
@@ -19,13 +19,13 @@
import com.mongodb.MongoNamespace;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
-import com.mongodb.internal.binding.AsyncReadBinding;
-import com.mongodb.internal.binding.AsyncWriteBinding;
+import com.mongodb.internal.binding.AsyncReadWriteBinding;
import com.mongodb.internal.connection.OperationContext;
+import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.ReadOperationCursor;
import com.mongodb.internal.operation.WriteOperation;
-class VoidWriteOperationThenCursorReadOperation implements ReadOperationCursorAsyncOnly {
+class VoidWriteOperationThenCursorReadOperation implements AsyncWriteThenReadOperationCursor {
private final WriteOperation writeOperation;
private final ReadOperationCursor cursorReadOperation;
@@ -46,8 +46,9 @@ public MongoNamespace getNamespace() {
}
@Override
- public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, final SingleResultCallback> callback) {
- writeOperation.executeAsync((AsyncWriteBinding) binding, operationContext, (result, t) -> {
+ public void executeAsync(final AsyncReadWriteBinding binding, final OperationContext operationContext,
+ final SingleResultCallback> callback) {
+ writeOperation.executeAsync(binding, operationContext, (result, t) -> {
if (t != null) {
callback.onResult(null, t);
} else {
diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java
index c112395a81..36c8ac9370 100644
--- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java
+++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java
@@ -20,9 +20,13 @@
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Sorts;
+import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.MapReduceStatistics;
import com.mongodb.internal.operation.MapReduceToCollectionOperation;
import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation;
+import com.mongodb.reactivestreams.client.MapReducePublisher;
+
+
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonJavaScript;
@@ -39,6 +43,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@SuppressWarnings({"rawtypes", "deprecation"})
@@ -48,6 +53,44 @@ public class MapReducePublisherImplTest extends TestHelper {
private static final String REDUCE_FUNCTION = "reduceFunction(){}";
private static final String FINALIZE_FUNCTION = "finalizeFunction(){}";
+ @DisplayName("Non-inline MapReduce routes through the write-then-read executor path")
+ @Test
+ void shouldRouteNonInlineMapReduceThroughWriteThenReadPath() {
+ configureBatchCursor();
+ TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor()));
+
+ MapReducePublisher publisher =
+ new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor),
+ MAP_FUNCTION, REDUCE_FUNCTION)
+ .collectionName(NAMESPACE.getCollectionName());
+
+ Flux.from(publisher).blockFirst();
+
+ AsyncWriteThenReadOperationCursor> op = executor.getWriteThenReadOperation();
+ assertNotNull(op, "expected a write-then-read operation");
+ assertEquals(NAMESPACE, op.getNamespace());
+ // Must not fall through to the plain read-operation path.
+ assertNull(executor.getReadOperation());
+ }
+
+
+ @DisplayName("Inline MapReduce still routes through the read-operation path")
+ @Test
+ void shouldRouteInlineMapReduceThroughReadOperationPath() {
+ configureBatchCursor();
+ TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor()));
+
+ MapReducePublisher publisher =
+ new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor),
+ MAP_FUNCTION, REDUCE_FUNCTION); // no collectionName -> inline
+
+ Flux.from(publisher).blockFirst();
+
+ assertNotNull(executor.getReadOperation());
+ assertNull(executor.getWriteThenReadOperation());
+ }
+
+
@DisplayName("Should build the expected MapReduceWithInlineResultsOperation")
@Test
void shouldBuildTheExpectedMapReduceWithInlineResultsOperation() {
diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java
index 450536df2b..685b6361d0 100644
--- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java
+++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java
@@ -26,6 +26,7 @@
import com.mongodb.internal.bulk.IndexRequest;
import com.mongodb.internal.bulk.WriteRequest;
import com.mongodb.internal.client.model.FindOptions;
+import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.lang.NonNull;
@@ -93,7 +94,10 @@ public class TestHelper {
Mockito.lenient().doAnswer(invocation -> Mono.empty())
.when(executor)
- .execute(any(), any(), any());
+ .execute(any(WriteOperation.class), any(), any());
+ Mockito.lenient().doAnswer(invocation -> Mono.empty())
+ .when(executor)
+ .execute(any(AsyncWriteThenReadOperationCursor.class), any(), any());
Mockito.lenient().doAnswer(invocation -> Mono.empty())
.when(executor)
.execute(any(), any(), any(), any());
diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java
index 831d22b308..731eb25137 100644
--- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java
+++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java
@@ -19,6 +19,8 @@
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.internal.TimeoutSettings;
+import com.mongodb.internal.async.AsyncBatchCursor;
+import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor;
import com.mongodb.internal.operation.ReadOperation;
import com.mongodb.internal.operation.WriteOperation;
import com.mongodb.lang.Nullable;
@@ -37,6 +39,7 @@ public class TestOperationExecutor implements OperationExecutor {
private final List readOperations = new ArrayList<>();
private final List writeOperations = new ArrayList<>();
+ private final List writeThenReadOperations = new ArrayList<>();
public TestOperationExecutor(final List