diff --git a/vortex-cuda/src/pooled_read_at.rs b/vortex-cuda/src/pooled_read_at.rs index 1b5ec4fb31a..e06eb7529de 100644 --- a/vortex-cuda/src/pooled_read_at.rs +++ b/vortex-cuda/src/pooled_read_at.rs @@ -106,7 +106,7 @@ impl VortexReadAt for PooledFileReadAt { async move { let mut target = pool.get(length)?; let target = handle - .spawn_blocking(move || { + .spawn_blocking_io(move || { read_exact_at(&file, target.as_mut_slice(), offset)?; Ok::<_, io::Error>(target) }) @@ -234,7 +234,7 @@ impl VortexReadAt for PooledObjectStoreReadAt { #[cfg(not(target_arch = "wasm32"))] GetResultPayload::File(file, _) => { target = handle - .spawn_blocking(move || { + .spawn_blocking_io(move || { read_exact_at(&file, target.as_mut_slice(), range.start)?; Ok::<_, io::Error>(target) }) diff --git a/vortex-duckdb/src/duckdb/file_system.rs b/vortex-duckdb/src/duckdb/file_system.rs index a3c76af633d..40f238625ca 100644 --- a/vortex-duckdb/src/duckdb/file_system.rs +++ b/vortex-duckdb/src/duckdb/file_system.rs @@ -118,7 +118,7 @@ impl VortexWrite for DuckDbFsWriter { let runtime = RUNTIME.handle(); let buffer = runtime - .spawn_blocking(move || { + .spawn_blocking_io(move || { let mut err: cpp::duckdb_vx_error = ptr::null_mut(); let mut out_len: cpp::idx_t = 0; let status = unsafe { @@ -149,7 +149,7 @@ impl VortexWrite for DuckDbFsWriter { let runtime = RUNTIME.handle(); runtime - .spawn_blocking(move || { + .spawn_blocking_io(move || { let mut err: cpp::duckdb_vx_error = ptr::null_mut(); let status = unsafe { cpp::duckdb_vx_fs_sync(handle.as_ptr(), &raw mut err) }; if status != cpp::duckdb_state::DuckDBSuccess { diff --git a/vortex-duckdb/src/filesystem.rs b/vortex-duckdb/src/filesystem.rs index e6d5c7c9750..3b22990bf27 100644 --- a/vortex-duckdb/src/filesystem.rs +++ b/vortex-duckdb/src/filesystem.rs @@ -133,7 +133,7 @@ impl FileSystem for DuckDbFileSystem { stream::once(async move { RUNTIME .handle() - .spawn_blocking(move || list_recursive(ctx, &directory_url, &base_url)) + .spawn_blocking_io(move || list_recursive(ctx, &directory_url, &base_url)) .await }) .flat_map(|result| match result { @@ -271,7 +271,7 @@ impl VortexReadAt for DuckDbFsReader { let runtime = RUNTIME.handle(); let size = runtime - .spawn_blocking(move || { + .spawn_blocking_io(move || { let mut err: cpp::duckdb_vx_error = ptr::null_mut(); let mut size_out: cpp::idx_t = 0; let status = unsafe { @@ -301,7 +301,7 @@ impl VortexReadAt for DuckDbFsReader { async move { let runtime = RUNTIME.handle(); let result: VortexResult = runtime - .spawn_blocking(move || -> VortexResult { + .spawn_blocking_io(move || -> VortexResult { let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); unsafe { buffer.set_len(length) }; diff --git a/vortex-io/src/object_store/read_at.rs b/vortex-io/src/object_store/read_at.rs index fd0f979c187..5016d3ac829 100644 --- a/vortex-io/src/object_store/read_at.rs +++ b/vortex-io/src/object_store/read_at.rs @@ -134,7 +134,7 @@ impl VortexReadAt for ObjectStoreReadAt { #[cfg(not(target_arch = "wasm32"))] GetResultPayload::File(file, _) => { handle - .spawn_blocking(move || { + .spawn_blocking_io(move || { read_exact_at(&file, buffer.as_mut_slice(), range.start)?; Ok::<_, io::Error>(buffer) }) diff --git a/vortex-io/src/runtime/handle.rs b/vortex-io/src/runtime/handle.rs index dfbdc7fa77f..4aa1b0fb953 100644 --- a/vortex-io/src/runtime/handle.rs +++ b/vortex-io/src/runtime/handle.rs @@ -122,7 +122,7 @@ impl Handle { } /// Spawn a blocking I/O task for execution on the runtime. - pub fn spawn_blocking(&self, f: F) -> Task + pub fn spawn_blocking_io(&self, f: F) -> Task where F: FnOnce() -> R + Send + 'static, R: Send + 'static, diff --git a/vortex-io/src/std_file/read_at.rs b/vortex-io/src/std_file/read_at.rs index 3d59a595f70..848cab51d14 100644 --- a/vortex-io/src/std_file/read_at.rs +++ b/vortex-io/src/std_file/read_at.rs @@ -126,7 +126,7 @@ impl VortexReadAt for FileReadAt { let allocator = Arc::clone(&self.allocator); async move { handle - .spawn_blocking(move || { + .spawn_blocking_io(move || { let mut buffer = allocator.allocate(length, alignment)?; read_exact_at(&file, buffer.as_mut_slice(), offset)?; Ok(BufferHandle::new_host(buffer.freeze())) diff --git a/vortex-layout/src/scan/scan_builder.rs b/vortex-layout/src/scan/scan_builder.rs index 84dfbdd93ce..ff3aee265f1 100644 --- a/vortex-layout/src/scan/scan_builder.rs +++ b/vortex-layout/src/scan/scan_builder.rs @@ -372,9 +372,8 @@ impl Stream for LazyScanStream { .unwrap_or(1); let concurrency = builder.concurrency * num_workers; let handle = builder.session.handle(); - let task = handle.spawn_blocking(move || { - builder.prepare().and_then(|scan| scan.execute(None)) - }); + let task = handle + .spawn_cpu(move || builder.prepare().and_then(|scan| scan.execute(None))); self.state = LazyScanState::Preparing(PreparingScan { ordered, concurrency,