perf: move request submission off event loop thread in execute_concurrent (#827)
mykaul wants to merge 2 commits into scylladb:master
Conversation
Force-pushed from c678f3b to fd9be81
ConcurrentExecutorListResults now uses a dedicated submitter thread instead of calling _execute_next inline from the event loop callback. This decouples I/O completion processing from new request serialization and enqueuing, yielding ~6-9% higher write throughput. The callback now just signals a threading.Event; the submitter thread drains a deque and calls session.execute_async in batches. This avoids blocking the libev event loop thread with request preparation work (query plan, serialization, tablet lookup) that takes ~27us per request.

Bug fixes included:
- Start the submitter thread after the initial batch (avoids a race on _exec_count)
- Write _exec_count under the _condition lock (avoids a race with _current)
- Use an exhausted flag to avoid repeated StopIteration on the iterator
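The callback/submitter split described above can be sketched as a small standalone class. This is a minimal sketch under assumptions: `SubmitterSketch`, `on_response`, and `execute_async_fn` are illustrative stand-ins, not the driver's actual classes or signatures.

```python
import threading
from collections import deque


class SubmitterSketch:
    """Sketch of the submitter-thread pattern: the event-loop callback does
    O(1) work, while a dedicated thread drains a deque and submits requests."""

    _SENTINEL = None  # pushed into the deque to tell the thread to exit

    def __init__(self, execute_async_fn, statements):
        self._execute_async = execute_async_fn   # stand-in for session.execute_async
        self._statements = iter(statements)
        self._exhausted = False                  # avoid repeated StopIteration
        self._pending = deque()                  # atomic append/popleft in CPython
        self._wakeup = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def on_response(self):
        """Called from the event-loop callback: append + signal, nothing else."""
        self._pending.append(1)
        self._wakeup.set()

    def shutdown(self):
        """Ask the submitter thread to exit once all pending tokens are drained."""
        self._pending.append(self._SENTINEL)
        self._wakeup.set()
        self._thread.join()

    def _run(self):
        while True:
            self._wakeup.wait()
            self._wakeup.clear()
            # Drain the deque and submit the next requests in a batch.
            while self._pending:
                token = self._pending.popleft()
                if token is self._SENTINEL:
                    return
                self._submit_next()

    def _submit_next(self):
        if self._exhausted:
            return
        try:
            stmt = next(self._statements)
        except StopIteration:
            self._exhausted = True
            return
        self._execute_async(stmt)
```

The request-preparation cost (serialization, query plan, tablet lookup) lands inside `_submit_next` on the submitter thread, so the event-loop callback never blocks on it.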
Force-pushed from fd9be81 to b759d4a
- Lazy Event: defer Event() creation until result() is actually called. For execute_concurrent (which never calls result()), this eliminates ~620ns of Event construction + Event.set() per request.
- Merged add_callbacks: register both callback and errback under a single _callback_lock acquisition instead of two separate ones (~80ns saved).
- _set_final_result/_set_final_exception: capture the _event reference under _callback_lock for free-threaded Python safety; skip .set() when the Event was never created.
- _wait_for_result: check result availability under _callback_lock before creating the Event, avoiding Event creation entirely when the result arrived before the caller waits.
- _on_speculative_execute: check _final_result/_final_exception directly instead of relying on Event.is_set(), since the Event may be None.

All changes are safe under both GIL and free-threaded (PEP 703) Python.
v2 changes: reduce per-request lock overhead in ResponseFuture
Design notes
Test results
Correction on test failures: the 6 "pre-existing failures" reported in the v2 comment were all caused by a stale Cython build. Rerun: 642 passed, 0 failures, 8 skipped. No pre-existing failures; clean test run.
Summary
- Moves execute_async() calls in execute_concurrent from the event-loop callback thread to a dedicated submitter thread
- execute_async() includes serialization, so this keeps that CPU work off the event loop
- v2: reduce per-request lock overhead in ResponseFuture
Second commit (7cae6a14e) reduces lock/synchronization cost per request in the execute_concurrent hot path:

- Lazy Event creation: ResponseFuture._event starts as None instead of Event(). The Event is only materialized in result() (the synchronous path). For execute_concurrent, which never calls result() on individual futures, this eliminates ~620ns per request (351ns Event construction + 267ns Event.set()).
- Merged add_callbacks(): registers both callback and errback under a single _callback_lock acquisition instead of two separate lock/unlock cycles. Saves ~80ns per request.
- _set_final_result/_set_final_exception: capture the _event reference under _callback_lock before calling .set() outside the lock. Skip .set() when the Event was never created. Null-check the callback/errback lists before building the to_call tuple.
- _wait_for_result(): checks result availability under _callback_lock before creating the Event, avoiding Event creation entirely when the result arrived before the caller waits.
- _on_speculative_execute: checks _final_result/_final_exception directly instead of Event.is_set(), since the Event may be None with lazy creation.

All changes are safe under both GIL and free-threaded (PEP 703) Python. No GIL assumptions.
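The lazy-Event and single-lock-acquisition ideas can be sketched in a toy future. This is illustrative only, assuming simplified names (`LazyEventFuture`, a single callback slot instead of lists); it is not the driver's actual ResponseFuture.

```python
import threading

_NOT_SET = object()  # distinguishes "no result yet" from a legitimate None result


class LazyEventFuture:
    """Sketch: Event is created only when a caller actually blocks in result()."""

    def __init__(self):
        self._callback_lock = threading.Lock()
        self._event = None            # lazy: materialized only on wait
        self._final_result = _NOT_SET
        self._callback = None
        self._errback = None

    def add_callbacks(self, callback, errback):
        # Register both under a single lock acquisition (one lock/unlock
        # cycle instead of two separate ones).
        with self._callback_lock:
            self._callback = callback
            self._errback = errback
            result = self._final_result
        if result is not _NOT_SET:
            callback(result)          # result already arrived: run outside lock

    def _set_final_result(self, value):
        with self._callback_lock:
            self._final_result = value
            event = self._event       # capture the reference under the lock
            callback = self._callback
        if event is not None:         # skip set() if nobody ever waited
            event.set()
        if callback is not None:
            callback(value)

    def result(self, timeout=None):
        # Check availability under the lock before creating an Event, so no
        # Event is built when the result arrived before the caller waits.
        with self._callback_lock:
            if self._final_result is not _NOT_SET:
                return self._final_result
            if self._event is None:
                self._event = threading.Event()
            event = self._event
        event.wait(timeout)
        return self._final_result
```

On the execute_concurrent path only `add_callbacks` and `_set_final_result` run, so `self._event` stays `None` and neither Event construction nor `Event.set()` is ever paid.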
Benchmark Results
On our vector ingestion benchmark (100K rows, 768-dim float32 vectors, ScyllaDB 2026.1.1):
How It Works
- _ConcurrentExecutorBase spawns a daemon submitter thread alongside the existing callback mechanism
- The event-loop callback does only deque.append(1); event.set(), keeping work on the hot path minimal
- The submitter thread drains the deque and calls _execute_next() in a batch
- Synchronization: collections.deque (atomic append/popleft in CPython) + threading.Event
- A None in the deque signals the thread to exit; join() happens in wait()

Testing
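A minimal single-producer/single-consumer demonstration of why `collections.deque` plus `threading.Event` suffices here, including the `None` sentinel and the final `join()`. This relies on CPython's documented thread-safety of `deque.append`/`popleft`; all names are illustrative, not the driver's.

```python
import threading
from collections import deque

# In CPython, deque.append and deque.popleft are atomic, so a
# single-producer/single-consumer queue needs no extra lock; the Event
# only provides wakeup, not mutual exclusion.
items = deque()
wakeup = threading.Event()
consumed = []


def consumer():
    while True:
        # Drain everything currently queued, then sleep until signaled.
        while items:
            x = items.popleft()
            if x is None:          # None sentinel: exit the thread
                return
            consumed.append(x)
        wakeup.wait()
        wakeup.clear()


t = threading.Thread(target=consumer, daemon=True)
t.start()

for i in range(1000):              # producer: append, then signal
    items.append(i)
    wakeup.set()

items.append(None)                 # ask the consumer to exit...
wakeup.set()
t.join()                           # ...and join it, as wait() does
assert consumed == list(range(1000))
```

Because the sentinel is appended by the same producer thread after all real items, FIFO ordering of the deque guarantees the consumer drains everything before exiting.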
All test_concurrent.py unit tests pass