diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java new file mode 100644 index 0000000000..0a48079bf5 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java @@ -0,0 +1,55 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoNamespace; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.binding.AsyncReadWriteBinding; +import com.mongodb.internal.connection.OperationContext; + +/** + * An async-only operation that performs a write followed by a read that returns a cursor. + * + *

Unlike {@link ReadOperationCursor}, this operation requires an {@link AsyncReadWriteBinding} + * so that both the write and the read portions can be executed without narrowing casts. + * + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public interface AsyncWriteThenReadOperationCursor { + + /** + * @return the command name of the write phase of this operation (e.g. "mapReduce", "aggregate") + */ + String getCommandName(); + + /** + * @return the namespace the write phase targets + */ + MongoNamespace getNamespace(); + + /** + * Executes the write phase followed by the read phase, yielding an {@link AsyncBatchCursor} + * over the results of the read. + * + * @param binding the read-write binding used by both phases + * @param operationContext the operation context to use + * @param callback receives the {@link AsyncBatchCursor} on success, or the failure of either phase + */ + void executeAsync(AsyncReadWriteBinding binding, OperationContext operationContext, + SingleResultCallback> callback); +} diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index 511f9f62c6..87b9078115 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -27,6 +27,7 @@ import com.mongodb.internal.observability.micrometer.TracingManager; import com.mongodb.internal.observability.micrometer.TransactionSpan; import com.mongodb.internal.operation.AbortTransactionOperation; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.CommitTransactionOperation; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteConcernHelper; @@ -88,7 +89,7 @@ public boolean notifyMessageSent() { @Override public void notifyOperationInitiated(final Object operation) { - assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation); + assertTrue(operation instanceof ReadOperation || operation instanceof WriteOperation || operation instanceof AsyncWriteThenReadOperationCursor); if (!(hasActiveTransaction() || operation instanceof CommitTransactionOperation)) { assertTrue(getPinnedServerAddress() == null || (transactionState != TransactionState.ABORTED && transactionState != TransactionState.NONE)); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java index 1e8e7fa223..86a2239f75 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java @@ -40,6 +40,7 @@ import org.bson.BsonDocument; import org.bson.conversions.Bson; import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -201,10 +202,46 @@ public ReadOperationCursor asReadOperation(final int initialBatchSize) { if (inline) { // initialBatchSize is ignored for map reduce operations. return createMapReduceInlineOperation(); - } else { - return new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(), - createFindOperation(initialBatchSize)); } + throw new IllegalStateException("Non-inline map-reduce uses the write-then-read path; " + + "asReadOperation must not be called."); + } + + @Override + public Mono> batchCursor(final int initialBatchSize) { + if (inline) { + return super.batchCursor(initialBatchSize); + } + return writeThenReadBatchCursor(initialBatchSize); + } + + @Override + public Publisher first() { + if (inline) { + return super.first(); + } + return writeThenReadBatchCursor(1) + .flatMap(batchCursor -> { + batchCursor.setBatchSize(1); + return Mono.from(batchCursor.next()) + .doOnTerminate(batchCursor::close) + .flatMap(results -> { + if (results == null || results.isEmpty()) { + return Mono.empty(); + } + return Mono.fromCallable(() -> results.get(0)); + }); + }); + } + + private Mono> writeThenReadBatchCursor(final int initialBatchSize) { + return getMongoOperationPublisher() + .createWriteThenReadOperationMono( + operations -> operations.createTimeoutSettings(maxTimeMS), + () -> new VoidWriteOperationThenCursorReadOperation<>(createMapReduceToCollectionOperation(), + createFindOperation(initialBatchSize)), + getClientSession()) + .map(BatchCursor::new); } private WrappedMapReduceReadOperation createMapReduceInlineOperation() { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java index 84c810f1b5..8d561859af 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java @@ -60,6 +60,8 @@ import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.bulk.WriteRequest; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.IndexHelper; import com.mongodb.internal.operation.Operations; import com.mongodb.internal.operation.ReadOperation; @@ -513,6 +515,15 @@ Mono createReadOperationMono(final Supplier timeoutSetti .execute(readOperation, readPreference, getReadConcern(), clientSession); } + Mono> createWriteThenReadOperationMono( + final Function, TimeoutSettings> timeoutSettingsFunction, + final Supplier> operationSupplier, + @Nullable final ClientSession clientSession) { + AsyncWriteThenReadOperationCursor operation = operationSupplier.get(); + return getExecutor(timeoutSettingsFunction.apply(operations)) + .execute(operation, getReadConcern(), clientSession); + } + Mono createWriteOperationMono(final Function, TimeoutSettings> timeoutSettingsFunction, final Supplier> operationSupplier, @Nullable final ClientSession clientSession) { return createWriteOperationMono(() -> timeoutSettingsFunction.apply(operations), operationSupplier, clientSession); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java index cd666720f3..7c61ecf547 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java @@ -19,6 +19,8 @@ import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; @@ -54,6 +56,20 @@ Mono execute(ReadOperation operation, ReadPreference readPreference */ Mono execute(WriteOperation operation, ReadConcern readConcern, @Nullable ClientSession session); + /** + * Execute an operation that writes and then reads a cursor within a single read-write binding. + * + *

The binding is acquired once and used for both phases, avoiding the need to narrow an + * {@code AsyncReadBinding} to an {@code AsyncWriteBinding}. + * + * @param operation the write-then-read operation. + * @param readConcern the read concern + * @param session the session to associate this operation with + * @param the document type returned by the cursor. + */ + Mono> execute(AsyncWriteThenReadOperationCursor operation, + ReadConcern readConcern, @Nullable ClientSession session); + /** * Create a new OperationExecutor with a specific timeout settings * diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java index 62a4431cc9..9a7b0e2c59 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java @@ -33,6 +33,8 @@ import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext; import com.mongodb.internal.observability.micrometer.Span; import com.mongodb.internal.observability.micrometer.TracingManager; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.OperationHelper; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; @@ -179,6 +181,51 @@ public Mono execute(final WriteOperation operation, final ReadConcern ); } + @Override + public Mono> execute(final AsyncWriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + isTrue("open", !mongoClient.getCluster().isClosed()); + notNull("operation", operation); + notNull("readConcern", readConcern); + + if (session != null) { + session.notifyOperationInitiated(operation); + } + + return Mono.from(subscriber -> + clientSessionHelper.withClientSession(session, this) + .flatMap(actualClientSession -> { + AsyncReadWriteBinding binding = getReadWriteBinding(primary(), actualClientSession, session == null); + RequestContext requestContext = getContext(subscriber); + OperationContext operationContext = getOperationContext(requestContext, actualClientSession, readConcern, operation.getCommandName()) + .withSessionContext(new ClientSessionBinding.AsyncClientSessionContext(actualClientSession, + isImplicitSession(session), readConcern)); + Span span = tracingManager.createOperationSpan(actualClientSession.getTransactionSpan(), + operationContext, operation.getCommandName(), operation.getNamespace()); + + return Mono.>create(sink -> operation.executeAsync(binding, operationContext, (result, t) -> { + try { + binding.release(); + } finally { + if (t != null) { + Throwable exceptionToHandle = t instanceof MongoException + ? OperationHelper.unwrap((MongoException) t) : t; + labelException(session, exceptionToHandle); + unpinServerAddressOnTransientTransactionError(session, exceptionToHandle); + if (span != null) { + span.error(t); + } + } + if (span != null) { + span.end(); + } + sinkToCallback(sink).onResult(result, t); + } + })); + }).subscribe(subscriber) + ); + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings newTimeoutSettings) { if (Objects.equals(timeoutSettings, newTimeoutSettings)) { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java index a7d1191c8b..fb253dafa8 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java @@ -19,13 +19,13 @@ import com.mongodb.MongoNamespace; import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; -import com.mongodb.internal.binding.AsyncReadBinding; -import com.mongodb.internal.binding.AsyncWriteBinding; +import com.mongodb.internal.binding.AsyncReadWriteBinding; import com.mongodb.internal.connection.OperationContext; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperationCursor; import com.mongodb.internal.operation.WriteOperation; -class VoidWriteOperationThenCursorReadOperation implements ReadOperationCursorAsyncOnly { +class VoidWriteOperationThenCursorReadOperation implements AsyncWriteThenReadOperationCursor { private final WriteOperation writeOperation; private final ReadOperationCursor cursorReadOperation; @@ -46,8 +46,9 @@ public MongoNamespace getNamespace() { } @Override - public void executeAsync(final AsyncReadBinding binding, final OperationContext operationContext, final SingleResultCallback> callback) { - writeOperation.executeAsync((AsyncWriteBinding) binding, operationContext, (result, t) -> { + public void executeAsync(final AsyncReadWriteBinding binding, final OperationContext operationContext, + final SingleResultCallback> callback) { + writeOperation.executeAsync(binding, operationContext, (result, t) -> { if (t != null) { callback.onResult(null, t); } else { diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java index c112395a81..36c8ac9370 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java @@ -20,9 +20,13 @@ import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; import com.mongodb.client.model.Sorts; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.MapReduceStatistics; import com.mongodb.internal.operation.MapReduceToCollectionOperation; import com.mongodb.internal.operation.MapReduceWithInlineResultsOperation; +import com.mongodb.reactivestreams.client.MapReducePublisher; + + import org.bson.BsonDocument; import org.bson.BsonInt32; import org.bson.BsonJavaScript; @@ -39,6 +43,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @SuppressWarnings({"rawtypes", "deprecation"}) @@ -48,6 +53,44 @@ public class MapReducePublisherImplTest extends TestHelper { private static final String REDUCE_FUNCTION = "reduceFunction(){}"; private static final String FINALIZE_FUNCTION = "finalizeFunction(){}"; + @DisplayName("Non-inline MapReduce routes through the write-then-read executor path") + @Test + void shouldRouteNonInlineMapReduceThroughWriteThenReadPath() { + configureBatchCursor(); + TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor())); + + MapReducePublisher publisher = + new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor), + MAP_FUNCTION, REDUCE_FUNCTION) + .collectionName(NAMESPACE.getCollectionName()); + + Flux.from(publisher).blockFirst(); + + AsyncWriteThenReadOperationCursor op = executor.getWriteThenReadOperation(); + assertNotNull(op, "expected a write-then-read operation"); + assertEquals(NAMESPACE, op.getNamespace()); + // Must not fall through to the plain read-operation path. + assertNull(executor.getReadOperation()); + } + + + @DisplayName("Inline MapReduce still routes through the read-operation path") + @Test + void shouldRouteInlineMapReduceThroughReadOperationPath() { + configureBatchCursor(); + TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor())); + + MapReducePublisher publisher = + new MapReducePublisherImpl<>(null, createMongoOperationPublisher(executor), + MAP_FUNCTION, REDUCE_FUNCTION); // no collectionName -> inline + + Flux.from(publisher).blockFirst(); + + assertNotNull(executor.getReadOperation()); + assertNull(executor.getWriteThenReadOperation()); + } + + @DisplayName("Should build the expected MapReduceWithInlineResultsOperation") @Test void shouldBuildTheExpectedMapReduceWithInlineResultsOperation() { diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java index 450536df2b..685b6361d0 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java @@ -26,6 +26,7 @@ import com.mongodb.internal.bulk.IndexRequest; import com.mongodb.internal.bulk.WriteRequest; import com.mongodb.internal.client.model.FindOptions; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.NonNull; @@ -93,7 +94,10 @@ public class TestHelper { Mockito.lenient().doAnswer(invocation -> Mono.empty()) .when(executor) - .execute(any(), any(), any()); + .execute(any(WriteOperation.class), any(), any()); + Mockito.lenient().doAnswer(invocation -> Mono.empty()) + .when(executor) + .execute(any(AsyncWriteThenReadOperationCursor.class), any(), any()); Mockito.lenient().doAnswer(invocation -> Mono.empty()) .when(executor) .execute(any(), any(), any(), any()); diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java index 831d22b308..731eb25137 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java @@ -19,6 +19,8 @@ import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; import com.mongodb.internal.TimeoutSettings; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.operation.AsyncWriteThenReadOperationCursor; import com.mongodb.internal.operation.ReadOperation; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.lang.Nullable; @@ -37,6 +39,7 @@ public class TestOperationExecutor implements OperationExecutor { private final List readOperations = new ArrayList<>(); private final List writeOperations = new ArrayList<>(); + private final List writeThenReadOperations = new ArrayList<>(); public TestOperationExecutor(final List responses) { this.responses = new ArrayList<>(responses); @@ -60,6 +63,14 @@ public Mono execute(final WriteOperation operation, final ReadConcern return createMono(); } + @Override + public Mono> execute(final AsyncWriteThenReadOperationCursor operation, final ReadConcern readConcern, + @Nullable final ClientSession session) { + clientSessions.add(session); + writeThenReadOperations.add(operation); + return createMono(); + } + @Override public OperationExecutor withTimeoutSettings(final TimeoutSettings timeoutSettings) { return this; @@ -106,4 +117,9 @@ WriteOperation getWriteOperation() { return writeOperations.isEmpty() ? null : writeOperations.remove(0); } + @Nullable + AsyncWriteThenReadOperationCursor getWriteThenReadOperation() { + return writeThenReadOperations.isEmpty() ? null : writeThenReadOperations.remove(0); + } + }