From c79bc7277a2f396dc2f074ce6fa55f9bb106e963 Mon Sep 17 00:00:00 2001 From: Almas Abdrazak Date: Tue, 28 Apr 2026 17:07:37 -0700 Subject: [PATCH 1/4] JAVA-2673 --- .../AsyncWriteThenReadOperationCursor.java | 54 +++++++++++++++++++ .../internal/MapReducePublisherImpl.java | 44 +++++++++++++-- .../internal/MongoOperationPublisher.java | 11 ++++ .../client/internal/OperationExecutor.java | 16 ++++++ .../internal/OperationExecutorImpl.java | 43 +++++++++++++++ ...WriteOperationThenCursorReadOperation.java | 11 ++-- .../internal/MapReducePublisherImplTest.java | 22 ++++++++ .../client/internal/TestHelper.java | 6 ++- .../internal/TestOperationExecutor.java | 16 ++++++ 9 files changed, 214 insertions(+), 9 deletions(-) create mode 100644 driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java new file mode 100644 index 00000000000..9c9505f42bd --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java @@ -0,0 +1,54 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncReadWriteBinding; +import com.mongodb.internal.connection.OperationContext; + +/** + * An async-only operation that performs a write followed by a read that returns a cursor. + * + *

Unlike {@link ReadOperationCursor}, this operation requires an {@link AsyncReadWriteBinding} + * so that both the write and the read portions can be executed without narrowing casts. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public interface AsyncWriteThenReadOperationCursor { + + /** + * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + */ + String getCommandName(); + + /** + * @return the namespace of the operation + */ + MongoNamespace getNamespace(); + + /** + * General execute which can return anything of type T + * + * @param binding the binding to execute in the context of + * @param operationContext the operation context to use + * @param callback the callback to be called when the operation has been executed + */ + void executeAsync(AsyncReadWriteBinding binding, OperationContext operationContext, + SingleResultCallback> callback); +} diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java index 1e8e7fa223b..4ac7f503254 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java @@ -28,6 +28,7 @@ import com.mongodb.internal.binding.WriteBinding; import com.mongodb.internal.client.model.FindOptions; import com.mongodb.internal.connection.OperationContext; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.MapReduceAsyncBatchCursor; import com.mongodb.internal.operation.MapReduceBatchCursor; import com.mongodb.internal.operation.MapReduceStatistics; @@ -40,6 +41,7 @@ import org.bson.BsonDocument; import org.bson.conversions.Bson; import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -201,10 +203,46 @@ public ReadOperationCursor asReadOperation(final int initialBatchSize) { if (inline) { // initialBatchSize is ignored for map reduce operations. return createMapReduceInlineOperation(); - } else { - return new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(), - createFindOperation(initialBatchSize)); } + throw new IllegalStateException("Non-inline map-reduce uses the write-then-read path; " + + "asReadOperation must not be called."); + } + + @Override + public Mono> batchCursor(final int initialBatchSize) { + if (inline) { + return super.batchCursor(initialBatchSize); + } + return writeThenReadBatchCursor(initialBatchSize); + } + + @Override + public Publisher first() { + if (inline) { + return super.first(); + } + return writeThenReadBatchCursor(1) + .flatMap(batchCursor -> { + batchCursor.setBatchSize(1); + return Mono.from(batchCursor.next()) + .doOnTerminate(batchCursor::close) + .flatMap(results -> { + if (results == null || results.isEmpty()) { + return Mono.empty(); + } + return Mono.fromCallable(() -> results.get(0)); + }); + }); + } + + private Mono> writeThenReadBatchCursor(final int initialBatchSize) { + return getMongoOperationPublisher() + .createWriteThenReadOperationMono( + operations -> operations.createTimeoutSettings(maxTimeMS), + () -> new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(), + createFindOperation(initialBatchSize)), + getClientSession()) + .map(BatchCursor::new); } private WrappedMapReduceReadOperation createMapReduceInlineOperation() { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java index 84c810f1b5e..8d561859af4 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java @@ -60,6 +60,8 @@ import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.bulk.WriteRequest; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.IndexHelper; import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperation; @@ -513,6 +515,15 @@ Mono createReadOperationMono(final Supplier timeoutSetti .execute(readOperation, readPreference, getReadConcern(), clientSession); } + Mono> createWriteThenReadOperationMono( + final Function, TimeoutSettings> timeoutSettingsFunction, + final Supplier> operationSupplier, + @Nullable final ClientSession clientSession) { + AsyncWriteThenReadOperationCursor operation = operationSupplier.get(); + return getExecutor(timeoutSettingsFunction.apply(operations)) + .execute(operation, getReadConcern(), clientSession); + } + Mono createWriteOperationMono(final Function, TimeoutSettings> timeoutSettingsFunction, final Supplier> operationSupplier, @Nullable final ClientSession clientSession) { return createWriteOperationMono(() -> timeoutSettingsFunction.apply(operations), operationSupplier, clientSession); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java index cd666720f33..7c61ecf547e 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java @@ -19,6 +19,8 @@ import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; @@ -54,6 +56,20 @@ Mono execute(ReadOperation operation, ReadPreference readPreference */ Mono execute(WriteOperation operation, ReadConcern readConcern, @Nullable ClientSession session); + /** + * Execute an operation that writes and then reads a cursor within a single read-write binding. + * + *

The binding is acquired once and used for both phases, avoiding the need to narrow an + * {@code AsyncReadBinding} to an {@code AsyncWriteBinding}. + * + * @param operation the write-then-read operation. + * @param readConcern the read concern + * @param session the session to associate this operation with + * @param the document type returned by the cursor. + */ + Mono> execute(AsyncWriteThenReadOperationCursor operation, + ReadConcern readConcern, @Nullable ClientSession session); + /** * Create a new OperationExecutor with a specific timeout settings * diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java index 62a4431cc9a..a29f15ad55b 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java @@ -33,6 +33,8 @@ import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext; import com.mongodb.internal.observability.micrometer.Span; import com.mongodb.internal.observability.micrometer.TracingManager; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.OperationHelper; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; @@ -179,6 +181,47 @@ public Mono execute(final WriteOperation operation, final ReadConcern ); } + @Override + public Mono> execute(final AsyncWriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + isTrue("open", !mongoClient.getCluster().isClosed()); + notNull("operation", operation); + notNull("readConcern", readConcern); + + return Mono.from(subscriber -> + clientSessionHelper.withClientSession(session, this) + .flatMap(actualClientSession -> { + AsyncReadWriteBinding binding = getReadWriteBinding(primary(), actualClientSession, session == null); + RequestContext requestContext = getContext(subscriber); + OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName()) + .withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession, + isImplicitSession(session), readConcern)); + Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(), + operationContext, operation.getCommandName(), operation.getNamespace()); + + return Mono.>create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> { + try { + binding.release(); + } finally { + if (t != null) { + Throwable exceptionToHandle = t instanceof MongoException + ? OperationHelper.unwrap((MongoException) t) : t; + labelException(session, exceptionToHandle); + unpinServerAddressOnTransientTransactionError(session, exceptionToHandle); + if (span != null) { + span.error(t); + } + } + if (span != null) { + span.end(); + } + sinkToCallback(sink).onResult(result, t); + } + })); + }).subscribe(subscriber) + ); + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) { if (Objects.equals(timeoutSettings, newTimeoutSettings)) { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java index a7d1191c8bf..fb253dafa88 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java @@ -19,13 +19,13 @@ import com.mongodb.MongoNamespace; import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; -import com.mongodb.internal.binding.AsyncReadBinding; -import com.mongodb.internal.binding.AsyncWriteBinding; +import com.mongodb.internal.binding.AsyncReadWriteBinding; import com.mongodb.internal.connection.OperationContext; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperationCursor; import com.mongodb.internal.operation.WriteOperation; -class VoidWriteOperationThenCursorReadOperation implements ReadOperationCursorAsyncOnly { +class VoidWriteOperationThenCursorReadOperation implements AsyncWriteThenReadOperationCursor { private final WriteOperation writeOperation; private final ReadOperationCursor cursorReadOperation; @@ -46,8 +46,9 @@ public MongoNamespace getNamespace() { } @Override - public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, final SingleResultCallback> callback) { - writeOperation.executeAsync((AsyncWriteBinding) binding, operationContext, (result, t) -> { + public void executeAsync(final AsyncReadWriteBinding binding, final OperationContext operationContext, + final SingleResultCallback> callback) { + writeOperation.executeAsync(binding, operationContext, (result, t) -> { if (t != null) { callback.onResult(null, t); } else { diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java index c112395a818..953e5e0fc13 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java @@ -23,6 +23,10 @@ import com.mongodb.internal.operation.MapReduceStatistics; import com.mongodb.internal.operation.MapReduceToCollectionOperation; import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation; + +import com.mongodb.reactivestreams.client.MapReducePublisher; + + import org.bson.BsonDocument; import org.bson.BsonInt32; import org.bson.BsonJavaScript; @@ -39,6 +43,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @SuppressWarnings({"rawtypes", "deprecation"}) @@ -48,6 +53,23 @@ public class MapReducePublisherImplTest extends TestHelper { private static final String REDUCE_FUNCTION = "reduceFunction(){}"; private static final String FINALIZE_FUNCTION = "finalizeFunction(){}"; + @DisplayName("Inline MapReduce still routes through the read-operation path") + @Test + void shouldRouteInlineMapReduceThroughReadOperationPath() { + configureBatchCursor(); + TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor())); + + MapReducePublisher publisher = + new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor), + MAP_FUNCTION, REDUCE_FUNCTION); // no collectionName -> inline + + Flux.from(publisher).blockFirst(); + + assertNotNull(executor.getReadOperation()); + assertNull(executor.getWriteThenReadOperation()); + } + + @DisplayName("Should build the expected MapReduceWithInlineResultsOperation") @Test void shouldBuildTheExpectedMapReduceWithInlineResultsOperation() { diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java index 450536df2b8..685b6361d04 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java @@ -26,6 +26,7 @@ import com.mongodb.internal.bulk.IndexRequest; import com.mongodb.internal.bulk.WriteRequest; import com.mongodb.internal.client.model.FindOptions; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.NonNull; @@ -93,7 +94,10 @@ public class TestHelper { Mockito.lenient().doAnswer(invocation -> Mono.empty()) .when(executor) - .execute(any(), any(), any()); + .execute(any(WriteOperation.class), any(), any()); + Mockito.lenient().doAnswer(invocation -> Mono.empty()) + .when(executor) + .execute(any(AsyncWriteThenReadOperationCursor.class), any(), any()); Mockito.lenient().doAnswer(invocation -> Mono.empty()) .when(executor) .execute(any(), any(), any(), any()); diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java index 831d22b3080..731eb251374 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java @@ -19,6 +19,8 @@ import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; @@ -37,6 +39,7 @@ public class TestOperationExecutor implements OperationExecutor { private final List readOperations = new ArrayList<>(); private final List writeOperations = new ArrayList<>(); + private final List writeThenReadOperations = new ArrayList<>(); public TestOperationExecutor(final List responses) { this.responses = new ArrayList<>(responses); @@ -60,6 +63,14 @@ public Mono execute(final WriteOperation operation, final ReadConcern return createMono(); } + @Override + public Mono> execute(final AsyncWriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + clientSessions.add(session); + writeThenReadOperations.add(operation); + return createMono(); + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings timeoutSettings) { return this; @@ -106,4 +117,9 @@ WriteOperation getWriteOperation() { return writeOperations.isEmpty() ? null : writeOperations.remove(0); } + @Nullable + AsyncWriteThenReadOperationCursor getWriteThenReadOperation() { + return writeThenReadOperations.isEmpty() ? null : writeThenReadOperations.remove(0); + } + } From 71e84a5c5aa3be9da5775aa76ed4acf62fb0aa74 Mon Sep 17 00:00:00 2001 From: Almas Abdrazak Date: Wed, 29 Apr 2026 09:54:54 -0700 Subject: [PATCH 2/4] remove unused import --- .../reactivestreams/client/internal/MapReducePublisherImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java index 4ac7f503254..86a2239f759 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java @@ -28,7 +28,6 @@ import com.mongodb.internal.binding.WriteBinding; import com.mongodb.internal.client.model.FindOptions; import com.mongodb.internal.connection.OperationContext; -import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.MapReduceAsyncBatchCursor; import com.mongodb.internal.operation.MapReduceBatchCursor; import com.mongodb.internal.operation.MapReduceStatistics; From 2f1a51725549f5d031d5136f161c42445dc5b5d1 Mon Sep 17 00:00:00 2001 From: Almas Abdrazak Date: Wed, 29 Apr 2026 12:13:25 -0700 Subject: [PATCH 3/4] address copilot feedback --- .../internal/ClientSessionPublisherImpl.java | 3 ++- .../internal/OperationExecutorImpl.java | 4 ++++ .../internal/MapReducePublisherImplTest.java | 23 ++++++++++++++++++- 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 511f9f62c6b..87b90781150 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -27,6 +27,7 @@ import com.mongodb.internal.observability.micrometer.TracingManager; import com.mongodb.internal.observability.micrometer.TransactionSpan; import com.mongodb.internal.operation.AbortTransactionOperation; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.CommitTransactionOperation; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteConcernHelper; @@ -88,7 +89,7 @@ public boolean notifyMessageSent() { @Override public void notifyOperationInitiated(final Object operation) { - assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation); + assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation || operation instanceof AsyncWriteThenReadOperationCursor); if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) { assertTrue(getPinnedServerAddress() == null || (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE)); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java index a29f15ad55b..9a7b0e2c592 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java @@ -188,6 +188,10 @@ public Mono> execute(final AsyncWriteThenReadOperationCu notNull("operation", operation); notNull("readConcern", readConcern); + if (session != null) { + session.notifyOperationInitiated(operation); + } + return Mono.from(subscriber -> clientSessionHelper.withClientSession(session, this) .flatMap(actualClientSession -> { diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java index 953e5e0fc13..36c8ac93705 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java @@ -20,10 +20,10 @@ import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; import com.mongodb.client.model.Sorts; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.MapReduceStatistics; import com.mongodb.internal.operation.MapReduceToCollectionOperation; import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation; - import com.mongodb.reactivestreams.client.MapReducePublisher; @@ -53,6 +53,27 @@ public class MapReducePublisherImplTest extends TestHelper { private static final String REDUCE_FUNCTION = "reduceFunction(){}"; private static final String FINALIZE_FUNCTION = "finalizeFunction(){}"; + @DisplayName("Non-inline MapReduce routes through the write-then-read executor path") + @Test + void shouldRouteNonInlineMapReduceThroughWriteThenReadPath() { + configureBatchCursor(); + TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor())); + + MapReducePublisher publisher = + new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor), + MAP_FUNCTION, REDUCE_FUNCTION) + .collectionName(NAMESPACE.getCollectionName()); + + Flux.from(publisher).blockFirst(); + + AsyncWriteThenReadOperationCursor op = executor.getWriteThenReadOperation(); + assertNotNull(op, "expected a write-then-read operation"); + assertEquals(NAMESPACE, op.getNamespace()); + // Must not fall through to the plain read-operation path. + assertNull(executor.getReadOperation()); + } + + @DisplayName("Inline MapReduce still routes through the read-operation path") @Test void shouldRouteInlineMapReduceThroughReadOperationPath() { From fa700ae908a8444fb4e80d84868a3fa5c165f480 Mon Sep 17 00:00:00 2001 From: Almas Abdrazak Date: Wed, 29 Apr 2026 16:56:23 -0700 Subject: [PATCH 4/4] update the docs --- .../operation/AsyncWriteThenReadOperationCursor.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java index 9c9505f42bd..0a48079bf5f 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java @@ -33,21 +33,22 @@ public interface AsyncWriteThenReadOperationCursor { /** - * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + * @return the command name of the write phase of this operation (e.g. "mapReduce", "aggregate") */ String getCommandName(); /** - * @return the namespace of the operation + * @return the namespace the write phase targets */ MongoNamespace getNamespace(); /** - * General execute which can return anything of type T + * Executes the write phase followed by the read phase, yielding an {@link AsyncBatchCursor} + * over the results of the read. * - * @param binding the binding to execute in the context of + * @param binding the read-write binding used by both phases * @param operationContext the operation context to use - * @param callback the callback to be called when the operation has been executed + * @param callback receives the {@link AsyncBatchCursor} on success, or the failure of either phase */ void executeAsync(AsyncReadWriteBinding binding, OperationContext operationContext, SingleResultCallback> callback);