JAVA-2673#1950
Conversation
| * | ||
| * <p>This class is not part of the public API and may be removed or changed at any time</p> | ||
| */ | ||
| public interface AsyncWriteThenReadOperationCursor<T> { |
There was a problem hiding this comment.
this is a new interface that will be used by OperationExecutorImpl
It adds a new override to execute that accepts this interface instead of ReadOperation
All methods here were copied from ReadOperation interface
There was a problem hiding this comment.
Pull request overview
This PR introduces a new internal “write-then-read cursor” operation shape and routes non-inline mapReduce execution through it, avoiding binding downcasts and separating inline vs non-inline execution paths.
Changes:
- Add
AsyncWriteThenReadOperationCursor(driver-core) and a correspondingOperationExecutor#executeoverload. - Update reactive-streams execution plumbing (
MongoOperationPublisher,OperationExecutorImpl,VoidWriteOperationThenCursorReadOperation) to support write-then-read cursor operations. - Change
MapReducePublisherImplso non-inline mapReduce uses the write-then-read path, and add a unit test asserting inline mapReduce stays on the read-operation path.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutor.java | Adds executor overload for write-then-read cursor operations. |
| driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java | Implements new execute path for write-then-read cursor operations. |
| driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java | Adds helper to create Monos for write-then-read cursor operations. |
| driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java | Converts to the new write-then-read cursor operation interface using AsyncReadWriteBinding. |
| driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java | Routes non-inline mapReduce cursor reads through the new write-then-read path. |
| driver-core/src/main/com/mongodb/internal/operation/AsyncWriteThenReadOperationCursor.java | Introduces new internal operation interface for write-then-read returning a cursor. |
| driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestOperationExecutor.java | Extends test executor to record/return write-then-read operations. |
| driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/TestHelper.java | Updates Mockito stubbing for overloaded execute methods. |
| driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MapReducePublisherImplTest.java | Adds inline routing assertion test (and related assertions/import changes). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @Override | ||
| public <T> Mono<AsyncBatchCursor<T>> execute(final AsyncWriteThenReadOperationCursor<T> operation, final ReadConcern readConcern, | ||
| @Nullable final ClientSession session) { | ||
| isTrue("open", !mongoClient.getCluster().isClosed()); | ||
| notNull("operation", operation); | ||
| notNull("readConcern", readConcern); | ||
|
|
There was a problem hiding this comment.
I added notifyOperationInitiated call , also implementationf of notifyOperationInitiated now also checked instance of AsyncWriteThenRead
| public Mono<BatchCursor<T>> batchCursor(final int initialBatchSize) { | ||
| if (inline) { | ||
| return super.batchCursor(initialBatchSize); | ||
| } | ||
| return writeThenReadBatchCursor(initialBatchSize); | ||
| } |
|
|
||
| import com.mongodb.reactivestreams.client.MapReducePublisher; | ||
|
|
||
|
|
There was a problem hiding this comment.
removed empty line
| if (inline) { | ||
| return super.first(); | ||
| } | ||
| return writeThenReadBatchCursor(1) |
There was a problem hiding this comment.
this number 1 comes from BatchCursorPublisher
There was a problem hiding this comment.
I think both async and sync paths should follow the same logic.
I also think that as this is for the deprecated MapReduce functionality I'm not sure its worth changing eg: I can live with the code smell.
Update: Seems Aggregation with $out also has this code smell.
JAVA-2673
I added a new interface for ReadThenWrite so that we don't have to cast read operation to write operation