diff --git a/Cargo.lock b/Cargo.lock
index f9fd5e6e960..fd401f2ec2c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3892,6 +3892,12 @@ version = "2.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"

+[[package]]
+name = "fax"
+version = "0.2.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "caf1079563223d5d59d83c85886a56e586cfd5c1a26292e971a0fa266531ac5a"
+
 [[package]]
 name = "filetime"
 version = "0.2.27"
@@ -7306,6 +7312,12 @@ version = "0.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40"

+[[package]]
+name = "quick-error"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3"
+
 [[package]]
 name = "quick-xml"
 version = "0.39.2"
@@ -9363,6 +9375,20 @@ dependencies = [
  "ordered-float 2.10.1",
 ]

+[[package]]
+name = "tiff"
+version = "0.11.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b63feaf3343d35b6ca4d50483f94843803b0f51634937cc2ec519fc32232bc52"
+dependencies = [
+ "fax",
+ "flate2",
+ "half",
+ "quick-error",
+ "weezl",
+ "zune-jpeg",
+]
+
 [[package]]
 name = "time"
 version = "0.3.47"
@@ -10003,15 +10029,18 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "arrow-array 58.1.0",
+ "clap",
  "codspeed-divan-compat",
  "fastlanes",
  "futures",
  "mimalloc",
+ "ndarray",
  "parquet 58.1.0",
  "paste",
  "rand 0.10.1",
  "rand_distr 0.6.0",
  "serde_json",
+ "tiff",
  "tokio",
  "tracing",
  "tracing-subscriber",
@@ -11225,6 +11254,12 @@ dependencies = [
  "rustls-pki-types",
 ]

+[[package]]
+name = "weezl"
+version = "0.1.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88"
+
 [[package]]
 name = "which"
 version = "8.0.2"
@@ -11980,3 +12015,18 @@ dependencies = [
  "cc",
  "pkg-config",
 ]
+
+[[package]]
+name = "zune-core"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb8a0807f7c01457d0379ba880ba6322660448ddebc890ce29bb64da71fb40f9"
+
+[[package]]
+name = "zune-jpeg"
+version = "0.5.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "27bc9d5b815bc103f142aa054f561d9187d191692ec7c2d1e2b4737f8dbd7296"
+dependencies = [
+ "zune-core",
+]
diff --git a/vortex/Cargo.toml b/vortex/Cargo.toml
index 982127a4035..f19595ab8a5 100644
--- a/vortex/Cargo.toml
+++ b/vortex/Cargo.toml
@@ -52,15 +52,18 @@ vortex-zstd = { workspace = true, optional = true }

 [dev-dependencies]
 anyhow = { workspace = true }
 arrow-array = { workspace = true }
+clap = { workspace = true, features = ["derive"] }
 divan = { workspace = true }
 fastlanes = { workspace = true }
 futures = { workspace = true }
 mimalloc = { workspace = true }
+ndarray = "0.16"
 parquet = { workspace = true }
 paste = { workspace = true }
 rand = { workspace = true }
 rand_distr = { workspace = true }
 serde_json = { workspace = true }
+tiff = "0.11"
 tokio = { workspace = true, features = ["full"] }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
@@ -89,6 +92,14 @@ unstable_encodings = [
 name = "turboquant_vector_search"
 required-features = ["files", "tokio", "unstable_encodings"]

+[[example]]
+name = "geotiff_to_vortex"
+required-features = ["files", "tokio"]
+
+[[example]]
+name = "raster_bench"
"tokio"] + [[bench]] name = "single_encoding_throughput" harness = false diff --git a/vortex/examples/geotiff_to_vortex.rs b/vortex/examples/geotiff_to_vortex.rs new file mode 100644 index 00000000000..bc91512abb7 --- /dev/null +++ b/vortex/examples/geotiff_to_vortex.rs @@ -0,0 +1,400 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#![allow(clippy::cast_possible_truncation)] + +//! Convert a tiled GeoTIFF into a Vortex file with one row per tile. +//! +//! The output schema is: +//! +//! ```text +//! struct { +//! minx: f64, +//! miny: f64, +//! maxx: f64, +//! maxy: f64, +//! band_0: list, +//! band_1: list, +//! ... +//! } +//! ``` +//! +//! Each row holds one tile's footprint (in CRS coordinates derived from the GeoTIFF +//! geotransform) and the raw bytes of every band, separated. `List` is used over +//! `ListView` because tile bytes are written contiguously and we never reorder elements. +//! +//! Run with: +//! +//! ```sh +//! cargo run --example geotiff_to_vortex -p vortex --release -- \ +//! --input m_3211428_sw_11_030_20230617_20240228.tif \ +//! --output naip.vortex \ +//! --compression compact \ +//! --block-size-bytes 4194304 +//! ``` + +use std::fs::File; +use std::path::PathBuf; + +use anyhow::Context; +use anyhow::Result; +use anyhow::bail; +use clap::Parser; +use clap::ValueEnum; +use tiff::decoder::Decoder; +use tiff::decoder::DecodingResult; +use tiff::tags::Tag; +use vortex::VortexSessionDefault; +use vortex::array::ArrayRef; +use vortex::array::IntoArray; +use vortex::array::arrays::ListArray; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::StructArray; +use vortex::array::validity::Validity; +use vortex::buffer::Buffer; +use vortex::compressor::BtrBlocksCompressorBuilder; +use vortex::file::WriteOptionsSessionExt; +use vortex::file::WriteStrategyBuilder; +use vortex::session::VortexSession; + +#[derive(Copy, Clone, Debug, ValueEnum)] +enum CompressionMode { + /// Default BtrBlocks cascade (no PCO, no Zstd). + Btrblocks, + /// BtrBlocks with the "compact" extras: PCO (numerics) and Zstd (strings/buffers). + Compact, +} + +#[derive(Parser, Debug)] +#[command(about = "Convert a tiled GeoTIFF into a Vortex file (one row per tile).")] +struct Args { + /// Path to the source tiled GeoTIFF. + #[arg(long, short)] + input: PathBuf, + + /// Output Vortex file path. + #[arg(long, short)] + output: PathBuf, + + /// Compression strategy for the file write pipeline. + #[arg(long, value_enum, default_value_t = CompressionMode::Btrblocks)] + compression: CompressionMode, + + /// Target row-group block size in bytes. The default of 2 MiB matches Vortex's + /// `BufferedStrategy` default. The example translates this byte budget into a row count + /// using the per-tile size, since `WriteStrategyBuilder::with_row_block_size` is + /// expressed in rows. 
+
+    let compressor = match args.compression {
+        CompressionMode::Btrblocks => BtrBlocksCompressorBuilder::default(),
+        CompressionMode::Compact => BtrBlocksCompressorBuilder::default().with_compact(),
+    };
+
+    let strategy = WriteStrategyBuilder::default()
+        .with_btrblocks_builder(compressor)
+        .with_row_block_size(rows_per_block)
+        .build();
+
+    let session = VortexSession::default();
+    let len = array.len();
+    let stream = array.into_array().to_array_stream();
+
+    let summary = session
+        .write_options()
+        .with_strategy(strategy)
+        .write(tokio::fs::File::create(&args.output).await?, stream)
+        .await?;
+
+    let vortex_size = summary.size();
+    println!(
+        "wrote {} rows to {} ({} bytes)",
+        len,
+        args.output.display(),
+        vortex_size,
+    );
+    println!(
+        "geotiff: {tiff} bytes vortex: {vx} bytes ratio: {ratio:.3}x ({delta:+.1}%)",
+        tiff = tiff_size,
+        vx = vortex_size,
+        ratio = tiff_size as f64 / vortex_size as f64,
+        delta = (vortex_size as f64 - tiff_size as f64) * 100.0 / tiff_size as f64,
+    );
+
+    Ok(())
+}
+
+/// All the per-raster information the converter needs in one pass: tile dimensions, bands,
+/// geotransform, and the raw bytes for every (band, tile) pair.
+struct Raster {
+    image_w: u32,
+    image_h: u32,
+    tile_w: u32,
+    tile_h: u32,
+    bands: usize,
+    origin_x: f64,
+    origin_y: f64,
+    scale_x: f64,
+    scale_y: f64,
+    /// `tile_bands[band][tile]` = raw bytes of that band's tile (always `tile_w * tile_h` bytes).
+    tile_bands: Vec<Vec<Vec<u8>>>,
+}
+
+impl Raster {
+    fn num_tiles(&self) -> usize {
+        self.tile_bands.first().map(Vec::len).unwrap_or(0)
+    }
+
+    fn tiles_per_row(&self) -> u32 {
+        self.image_w.div_ceil(self.tile_w)
+    }
+}
+
+fn read_geotiff(path: &PathBuf) -> Result<Raster> {
+    let mut decoder = Decoder::new(File::open(path).with_context(|| {
+        format!("failed to open GeoTIFF at {}", path.display())
+    })?)?;
+
+    // Tile sizes; bail if the file is striped instead of tiled.
+    let tile_w = decoder
+        .get_tag_u32(Tag::TileWidth)
+        .context("input is not a tiled GeoTIFF (missing TileWidth tag)")?;
+    let tile_h = decoder
+        .get_tag_u32(Tag::TileLength)
+        .context("input is not a tiled GeoTIFF (missing TileLength tag)")?;
+
+    let (image_w, image_h) = decoder.dimensions()?;
+
+    // Probe the first chunk to determine how many bands the tiff crate is actually decoding.
+    // `SamplesPerPixel` may exceed this when the file declares extra samples (e.g. NIR alongside
+    // RGB) that get dropped to fit the photometric `ColorType` reported by the decoder.
+    let first_chunk = read_chunk_u8(&mut decoder, 0)?;
+    let tile_pixels = (tile_w as usize) * (tile_h as usize);
+    if first_chunk.is_empty() || first_chunk.len() % tile_pixels != 0 {
+        bail!(
+            "first chunk size {} is not a multiple of tile pixel count {}",
+            first_chunk.len(),
+            tile_pixels
+        );
+    }
+
+    // Default planar configuration is 1 (chunky / interleaved). Both 1 and 2 occur in the
+    // wild, but the deinterleave path differs, so we handle them separately. In planar (2)
+    // files each chunk holds a single band, so the probe above always sees one band; fall
+    // back to `SamplesPerPixel` for the real count there.
+    let planar_cfg = decoder.get_tag_u32(Tag::PlanarConfiguration).unwrap_or(1);
+    let bands = if planar_cfg == 2 {
+        decoder.get_tag_u32(Tag::SamplesPerPixel)? as usize
+    } else {
+        first_chunk.len() / tile_pixels
+    };
+
+    // Geotransform via ModelTiepoint + ModelPixelScale. The full ModelTransformation tag is rarer
+    // and not handled here; users with rotated/skewed CRS transforms can extend this branch.
+    let tiepoint = decoder.get_tag_f64_vec(Tag::ModelTiepointTag)?;
+    let scale = decoder.get_tag_f64_vec(Tag::ModelPixelScaleTag)?;
+    if tiepoint.len() < 6 || scale.len() < 2 {
+        bail!(
+            "GeoTIFF is missing a usable geotransform (tiepoint len {}, scale len {})",
+            tiepoint.len(),
+            scale.len()
+        );
+    }
+    // Tiepoint format: (i, j, k, x, y, z). Standard north-up convention; `scale[1]` is the
+    // *positive* pixel height in CRS units, so y decreases as j increases.
+    let origin_x = tiepoint[3] - tiepoint[0] * scale[0];
+    let origin_y = tiepoint[4] + tiepoint[1] * scale[1];
+    let scale_x = scale[0];
+    let scale_y = scale[1];
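+    // Worked example (hypothetical numbers): tiepoint (0, 0, 0, 500000.0, 4200000.0, 0)
+    // with scale (0.6, 0.6, 0) pins pixel (0, 0) at CRS (500000, 4200000), so
+    // origin_x = 500000 - 0 * 0.6 and origin_y = 4200000 + 0 * 0.6; pixel (i, j) then
+    // maps to (origin_x + i * 0.6, origin_y - j * 0.6).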
+
+    let tiles_x = image_w.div_ceil(tile_w);
+    let tiles_y = image_h.div_ceil(tile_h);
+    let tile_count = (tiles_x as usize) * (tiles_y as usize);
+    let tile_bytes = (tile_w as usize) * (tile_h as usize);
+
+    // Eagerly decode every tile. With a few hundred tiles this is fine; for very large rasters
+    // the converter would want to stream instead.
+    let mut tile_bands: Vec<Vec<Vec<u8>>> = (0..bands)
+        .map(|_| Vec::with_capacity(tile_count))
+        .collect();
+
+    let mut first_chunk_opt = Some(first_chunk);
+    for tile_idx in 0..tile_count {
+        let tile_idx_u32 =
+            u32::try_from(tile_idx).context("tile count exceeds u32 range")?;
+        // Edge tiles are clipped by the tiff crate's `read_chunk`, so the chunk's actual
+        // data dimensions can be smaller than the nominal tile dimensions. We always copy
+        // into a full tile_w x tile_h buffer per band, leaving the padded region as zeros.
+        let (data_w, data_h) = decoder.chunk_data_dimensions(tile_idx_u32);
+        let data_w = data_w as usize;
+        let data_h = data_h as usize;
+        match planar_cfg {
+            1 => {
+                let bytes = if tile_idx == 0 {
+                    first_chunk_opt
+                        .take()
+                        .unwrap_or_else(Vec::new)
+                } else {
+                    read_chunk_u8(&mut decoder, tile_idx_u32)?
+                };
+                let expected = data_w * data_h * bands;
+                if bytes.len() != expected {
+                    bail!(
+                        "unexpected chunk size at tile {}: got {} bytes, expected {} ({}x{}x{})",
+                        tile_idx,
+                        bytes.len(),
+                        expected,
+                        data_w,
+                        data_h,
+                        bands
+                    );
+                }
+                for band in 0..bands {
+                    let mut out = vec![0u8; tile_bytes];
+                    for y in 0..data_h {
+                        for x in 0..data_w {
+                            out[y * (tile_w as usize) + x] = bytes[(y * data_w + x) * bands + band];
+                        }
+                    }
+                    tile_bands[band].push(out);
+                }
+            }
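+            // Chunky layout interleaves samples per pixel: with 4 bands the bytes run
+            // R0 G0 B0 N0 R1 G1 B1 N1 ..., so band 1 of pixel (x=2, y=0) sits at byte
+            // (0 * data_w + 2) * 4 + 1 (band names here are illustrative). Planar layout
+            // below instead stores each band as its own chunk.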
+            2 => {
+                for band in 0..bands {
+                    let band_u32 =
+                        u32::try_from(band).context("band index exceeds u32 range")?;
+                    let chunk_idx = band_u32
+                        .checked_mul(u32::try_from(tile_count).context("tile count too large")?)
+                        .and_then(|v| v.checked_add(tile_idx_u32))
+                        .context("planar chunk index overflowed u32")?;
+                    let bytes = read_chunk_u8(&mut decoder, chunk_idx)?;
+                    let expected = data_w * data_h;
+                    if bytes.len() != expected {
+                        bail!(
+                            "planar chunk size mismatch: got {} bytes, expected {}",
+                            bytes.len(),
+                            expected
+                        );
+                    }
+                    let mut out = vec![0u8; tile_bytes];
+                    for y in 0..data_h {
+                        for x in 0..data_w {
+                            out[y * (tile_w as usize) + x] = bytes[y * data_w + x];
+                        }
+                    }
+                    tile_bands[band].push(out);
+                }
+            }
+            other => bail!("unsupported PlanarConfiguration tag value {}", other),
+        }
+    }
+
+    Ok(Raster {
+        image_w,
+        image_h,
+        tile_w,
+        tile_h,
+        bands,
+        origin_x,
+        origin_y,
+        scale_x,
+        scale_y,
+        tile_bands,
+    })
+}
+
+fn read_chunk_u8(decoder: &mut Decoder<File>, chunk_idx: u32) -> Result<Vec<u8>> {
+    match decoder.read_chunk(chunk_idx)? {
+        DecodingResult::U8(bytes) => Ok(bytes),
+        _ => bail!("only 8-bit GeoTIFFs are supported"),
+    }
+}
+
+fn build_struct_array(raster: &Raster) -> Result<StructArray> {
+    let tile_count = raster.num_tiles();
+    let tiles_per_row = raster.tiles_per_row();
+
+    // Bounds: one f64 per tile per axis.
+    let mut minx = Vec::with_capacity(tile_count);
+    let mut miny = Vec::with_capacity(tile_count);
+    let mut maxx = Vec::with_capacity(tile_count);
+    let mut maxy = Vec::with_capacity(tile_count);
+
+    for tile_idx in 0..tile_count {
+        let tx = (tile_idx as u32) % tiles_per_row;
+        let ty = (tile_idx as u32) / tiles_per_row;
+        let x0 = raster.origin_x + (tx * raster.tile_w) as f64 * raster.scale_x;
+        let x1 = raster.origin_x + ((tx + 1) * raster.tile_w) as f64 * raster.scale_x;
+        let y0 = raster.origin_y - ((ty + 1) * raster.tile_h) as f64 * raster.scale_y;
+        let y1 = raster.origin_y - (ty * raster.tile_h) as f64 * raster.scale_y;
+        minx.push(x0);
+        maxx.push(x1);
+        miny.push(y0);
+        maxy.push(y1);
+    }
+
+    let mut fields: Vec<(String, ArrayRef)> = Vec::with_capacity(4 + raster.bands);
+    fields.push(("minx".to_string(), f64_column(minx)));
+    fields.push(("miny".to_string(), f64_column(miny)));
+    fields.push(("maxx".to_string(), f64_column(maxx)));
+    fields.push(("maxy".to_string(), f64_column(maxy)));
+
+    for (band_idx, tiles) in raster.tile_bands.iter().enumerate() {
+        let list = build_list_u8(tiles)?;
+        fields.push((format!("band_{band_idx}"), list));
+    }
+
+    let items: Vec<(&str, ArrayRef)> =
+        fields.iter().map(|(n, a)| (n.as_str(), a.clone())).collect();
+    StructArray::from_fields(&items).map_err(Into::into)
+}
+
+fn f64_column(values: Vec<f64>) -> ArrayRef {
+    PrimitiveArray::new(Buffer::<f64>::from(values), Validity::NonNullable).into_array()
+}
+
+fn build_list_u8(tiles: &[Vec<u8>]) -> Result<ArrayRef> {
+    // Concatenate all tile bytes into a single contiguous buffer; offsets carry the per-tile
+    // boundaries. u64 offsets handle rasters where total bytes exceed 2^32.
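+    // Example: three 4-byte tiles concatenate into a 12-byte elements buffer with
+    // offsets [0, 4, 8, 12]; tile i spans elements[offsets[i]..offsets[i + 1]].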
+    let total: usize = tiles.iter().map(Vec::len).sum();
+    let mut elements: Vec<u8> = Vec::with_capacity(total);
+    let mut offsets: Vec<u64> = Vec::with_capacity(tiles.len() + 1);
+    offsets.push(0);
+    for tile in tiles {
+        elements.extend_from_slice(tile);
+        offsets.push(elements.len() as u64);
+    }
+
+    let elements_array =
+        PrimitiveArray::new(Buffer::<u8>::from(elements), Validity::NonNullable).into_array();
+    let offsets_array =
+        PrimitiveArray::new(Buffer::<u64>::from(offsets), Validity::NonNullable).into_array();
+    Ok(ListArray::try_new(elements_array, offsets_array, Validity::NonNullable)?.into_array())
+}
diff --git a/vortex/examples/raster_bench.rs b/vortex/examples/raster_bench.rs
new file mode 100644
index 00000000000..42854c0e044
--- /dev/null
+++ b/vortex/examples/raster_bench.rs
@@ -0,0 +1,607 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+#![allow(clippy::cast_possible_truncation, clippy::type_complexity)]
+
+//! Benchmark raster access between a tiled GeoTIFF and the matching Vortex file produced by
+//! the `geotiff_to_vortex` example.
+//!
+//! Two access patterns are measured on both backends:
+//!
+//! 1. **Single-tile decode** — pick a random tile and decode it into an `ndarray::Array3<u8>`
+//!    with shape `(bands, tile_h, tile_w)`. On the GeoTIFF side this is one `read_chunk` call.
+//!    On the Vortex side this is a row-index `Selection` pushed into the file scan; the result
+//!    arrives as `List` chunks that are reshaped into the ndarray.
+//! 2. **Random crop batch** — sample `--num-crops` random crops at `--crop-size` pixels and
+//!    decode every tile that overlaps each crop. On the GeoTIFF side this is a per-crop loop of
+//!    `read_chunk` calls. On the Vortex side this is a single OR-of-bbox filter expression
+//!    pushed through `with_filter`, so tile rows are pruned by the file's zone-map statistics.
+//!
+//! Run with:
+//!
+//! ```sh
+//! cargo run --example raster_bench -p vortex --release -- \
+//!     --geotiff m_3211428_sw_11_030_20230617_20240228.tif \
+//!     --vortex naip.vortex \
+//!     --num-crops 16 \
+//!     --crop-size 1024 \
+//!     --iterations 5
+//! ```
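+//!
+//! The per-crop predicate pushed into the Vortex scan is the bbox-overlap test built by
+//! `crop_filter` below (column names come from the converter's schema):
+//!
+//! ```text
+//! (minx < crop.maxx AND maxx > crop.minx) AND (miny < crop.maxy AND maxy > crop.miny)
+//! ```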
+
+use std::fs::File;
+use std::future::Future;
+use std::path::PathBuf;
+use std::time::Duration;
+use std::time::Instant;
+
+use anyhow::Context;
+use anyhow::Result;
+use anyhow::bail;
+use clap::Parser;
+use futures::TryStreamExt;
+use ndarray::Array3;
+use rand::Rng;
+use rand::RngExt;
+use rand::SeedableRng;
+use rand::rngs::StdRng;
+use tiff::decoder::Decoder;
+use tiff::decoder::DecodingResult;
+use tiff::tags::Tag;
+use vortex::VortexSessionDefault;
+use vortex::array::ArrayRef;
+use vortex::array::IntoArray;
+use vortex::array::VortexSessionExecute;
+use vortex::array::arrays::ChunkedArray;
+use vortex::array::arrays::List;
+use vortex::array::arrays::ListView;
+use vortex::array::arrays::PrimitiveArray;
+use vortex::array::arrays::StructArray;
+use vortex::array::arrays::list::ListArrayExt;
+use vortex::array::arrays::listview::ListViewArrayExt;
+use vortex::array::arrays::struct_::StructArrayExt;
+use vortex::array::expr::Expression;
+use vortex::array::expr::and;
+use vortex::array::expr::col;
+use vortex::array::expr::gt;
+use vortex::array::expr::lit;
+use vortex::array::expr::lt;
+use vortex::array::expr::or;
+use vortex::buffer::Buffer;
+use vortex::file::OpenOptionsSessionExt;
+use vortex::session::VortexSession;
+use vortex_array::ExecutionCtx;
+
+#[derive(Parser, Debug)]
+#[command(about = "Benchmark single-tile and random-crop reads against GeoTIFF and Vortex.")]
+struct Args {
+    /// Path to the source tiled GeoTIFF.
+    #[arg(long)]
+    geotiff: PathBuf,
+
+    /// Path to the matching Vortex file (one row per tile).
+    #[arg(long)]
+    vortex: PathBuf,
+
+    /// How many random crops to sample in the batch benchmark.
+    #[arg(long, default_value_t = 16)]
+    num_crops: usize,
+
+    /// Crop edge length in pixels.
+    #[arg(long, default_value_t = 1024)]
+    crop_size: u32,
+
+    /// Iterations per benchmark to average over.
+    #[arg(long, default_value_t = 5)]
+    iterations: usize,
+
+    /// PRNG seed for reproducible tile / crop selection.
+    #[arg(long, default_value_t = 0xa10c_u64)]
+    seed: u64,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let args = Args::parse();
+    let session = VortexSession::default();
+
+    let meta = read_tiff_metadata(&args.geotiff)?;
+    println!(
+        "geotiff: {w}x{h}, {bands} bands, tiles {tw}x{th} ({tx}x{ty} grid)",
+        w = meta.image_w,
+        h = meta.image_h,
+        bands = meta.bands,
+        tw = meta.tile_w,
+        th = meta.tile_h,
+        tx = meta.tiles_x(),
+        ty = meta.tiles_y(),
+    );
+
+    let mut rng = StdRng::seed_from_u64(args.seed);
+
+    // ---- Single-tile decode ------------------------------------------------------------------
+    let tile_count = meta.num_tiles();
+    let tile_indices: Vec<u32> = (0..args.iterations)
+        .map(|_| rng.random_range(0..tile_count) as u32)
+        .collect();
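+    // The tile indices are sampled once, outside the timed closures, so the GeoTIFF and
+    // Vortex passes below decode exactly the same set of tiles.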
+
+    let geotiff_single = bench(args.iterations, || {
+        for &tile_idx in &tile_indices {
+            let _arr = decode_tiff_tile(&args.geotiff, tile_idx, &meta)?;
+        }
+        Ok(())
+    })?;
+
+    let vortex_single = bench_async(args.iterations, || async {
+        for &tile_idx in &tile_indices {
+            let _arr = decode_vortex_tile(&session, &args.vortex, tile_idx, &meta).await?;
+        }
+        Ok(())
+    })
+    .await?;
+
+    println!(
+        "\nsingle-tile decode ({} iterations, {} tiles each):\n geotiff: {:>8.3} ms/iter\n vortex: {:>8.3} ms/iter ({:.2}x)",
+        args.iterations,
+        tile_indices.len(),
+        geotiff_single.as_secs_f64() * 1e3,
+        vortex_single.as_secs_f64() * 1e3,
+        geotiff_single.as_secs_f64() / vortex_single.as_secs_f64(),
+    );
+
+    // ---- Random crop batch -------------------------------------------------------------------
+    let crops: Vec<Crop> = (0..args.num_crops)
+        .map(|_| Crop::random(&mut rng, &meta, args.crop_size))
+        .collect();
+
+    let geotiff_crops = bench(args.iterations, || {
+        for crop in &crops {
+            drop(decode_tiff_crop(&args.geotiff, crop, &meta)?);
+        }
+        Ok(())
+    })?;
+
+    let vortex_crops = bench_async(args.iterations, || async {
+        for crop in &crops {
+            drop(decode_vortex_crop(&session, &args.vortex, crop, &meta).await?);
+        }
+        Ok(())
+    })
+    .await?;
+
+    let vortex_crops_combined = bench_async(args.iterations, || async {
+        drop(decode_vortex_crops_combined(&session, &args.vortex, &crops, &meta).await?);
+        Ok(())
+    })
+    .await?;
+
+    println!(
+        "\nrandom crop batch ({} crops at {}px, {} iterations):\n \
+         geotiff (per-crop loop): {:>8.3} ms/iter\n \
+         vortex (per-crop scan): {:>8.3} ms/iter\n \
+         vortex (single OR scan): {:>8.3} ms/iter",
+        args.num_crops,
+        args.crop_size,
+        args.iterations,
+        geotiff_crops.as_secs_f64() * 1e3,
+        vortex_crops.as_secs_f64() * 1e3,
+        vortex_crops_combined.as_secs_f64() * 1e3,
+    );
+
+    Ok(())
+}
+
+// =================================================================================================
+// Shared metadata
+// =================================================================================================
+
+struct RasterMeta {
+    image_w: u32,
+    image_h: u32,
+    tile_w: u32,
+    tile_h: u32,
+    bands: usize,
+    origin_x: f64,
+    origin_y: f64,
+    scale_x: f64,
+    scale_y: f64,
+    planar_cfg: u32,
+}
+
+impl RasterMeta {
+    fn tiles_x(&self) -> u32 {
+        self.image_w.div_ceil(self.tile_w)
+    }
+
+    fn tiles_y(&self) -> u32 {
+        self.image_h.div_ceil(self.tile_h)
+    }
+
+    fn num_tiles(&self) -> usize {
+        (self.tiles_x() as usize) * (self.tiles_y() as usize)
+    }
+
+    /// Map a pixel coordinate to CRS coordinates using the image-origin geotransform.
+    fn pixel_to_world(&self, px: u32, py: u32) -> (f64, f64) {
+        let x = self.origin_x + (px as f64) * self.scale_x;
+        let y = self.origin_y - (py as f64) * self.scale_y;
+        (x, y)
+    }
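+    // Worked example (hypothetical numbers): with origin (500000, 4200000) and a 0.6 m
+    // pixel, pixel (100, 200) maps to (500000 + 60.0, 4200000 - 120.0).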
+}
+
+fn read_tiff_metadata(path: &PathBuf) -> Result<RasterMeta> {
+    let mut decoder = Decoder::new(File::open(path)?)?;
+    let (image_w, image_h) = decoder.dimensions()?;
+    let tile_w = decoder.get_tag_u32(Tag::TileWidth)?;
+    let tile_h = decoder.get_tag_u32(Tag::TileLength)?;
+    let planar_cfg = decoder.get_tag_u32(Tag::PlanarConfiguration).unwrap_or(1);
+
+    let tiepoint = decoder.get_tag_f64_vec(Tag::ModelTiepointTag)?;
+    let scale = decoder.get_tag_f64_vec(Tag::ModelPixelScaleTag)?;
+    if tiepoint.len() < 6 || scale.len() < 2 {
+        bail!("GeoTIFF is missing a usable geotransform");
+    }
+    let origin_x = tiepoint[3] - tiepoint[0] * scale[0];
+    let origin_y = tiepoint[4] + tiepoint[1] * scale[1];
+
+    // Probe the first chunk to discover how many bands the decoder actually returns. The
+    // converter does the same probe, so the resulting Vortex schema and the GeoTIFF decode
+    // path agree on band count even when `SamplesPerPixel` includes extra samples that get
+    // dropped to fit the photometric `ColorType`. In planar files each chunk holds a single
+    // band, so `SamplesPerPixel` is used directly there.
+    let probe = match decoder.read_chunk(0)? {
+        DecodingResult::U8(v) => v,
+        _ => bail!("only 8-bit GeoTIFFs are supported"),
+    };
+    let (data_w, data_h) = decoder.chunk_data_dimensions(0);
+    let probe_pixels = (data_w as usize) * (data_h as usize);
+    if probe_pixels == 0 || probe.len() % probe_pixels != 0 {
+        bail!(
+            "first chunk size {} not divisible by chunk pixel count {}",
+            probe.len(),
+            probe_pixels
+        );
+    }
+    let bands = if planar_cfg == 2 {
+        decoder.get_tag_u32(Tag::SamplesPerPixel)? as usize
+    } else {
+        probe.len() / probe_pixels
+    };
+
+    Ok(RasterMeta {
+        image_w,
+        image_h,
+        tile_w,
+        tile_h,
+        bands,
+        origin_x,
+        origin_y,
+        scale_x: scale[0],
+        scale_y: scale[1],
+        planar_cfg,
+    })
+}
+
+// =================================================================================================
+// GeoTIFF decode paths
+// =================================================================================================
+
+fn decode_tiff_tile(path: &PathBuf, tile_idx: u32, meta: &RasterMeta) -> Result<Array3<u8>> {
+    let mut decoder = Decoder::new(File::open(path)?)?;
+    let tw = meta.tile_w as usize;
+    let th = meta.tile_h as usize;
+    let bands = meta.bands;
+    // Edge tiles are clipped: data dims may be smaller than (tile_w, tile_h). We always emit a
+    // full-size ndarray and zero-fill the padded region to keep crop/tile shapes uniform.
+    let (data_w, data_h) = decoder.chunk_data_dimensions(tile_idx);
+    let dw = data_w as usize;
+    let dh = data_h as usize;
+
+    let mut out = Array3::<u8>::zeros((bands, th, tw));
+    match meta.planar_cfg {
+        1 => {
+            let bytes = match decoder.read_chunk(tile_idx)? {
+                DecodingResult::U8(v) => v,
+                _ => bail!("only 8-bit GeoTIFFs are supported"),
+            };
+            for b in 0..bands {
+                for y in 0..dh {
+                    for x in 0..dw {
+                        out[(b, y, x)] = bytes[(y * dw + x) * bands + b];
+                    }
+                }
+            }
+        }
+        2 => {
+            for b in 0..bands {
+                let chunk_idx =
+                    u32::try_from(b)?.saturating_mul(u32::try_from(meta.num_tiles())?) + tile_idx;
+                let bytes = match decoder.read_chunk(chunk_idx)? {
+                    DecodingResult::U8(v) => v,
+                    _ => bail!("only 8-bit GeoTIFFs are supported"),
+                };
+                for y in 0..dh {
+                    for x in 0..dw {
+                        out[(b, y, x)] = bytes[y * dw + x];
+                    }
+                }
+            }
+        }
+        other => bail!("unsupported PlanarConfiguration {}", other),
+    }
+    Ok(out)
+}
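+
+// Note: `decode_tiff_tile` reopens the file and builds a fresh `Decoder` per call, just as the
+// Vortex paths below reopen the Vortex file per call, so both backends pay the per-request
+// open cost inside the timed region.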
+
+fn decode_tiff_crop(path: &PathBuf, crop: &Crop, meta: &RasterMeta) -> Result<Vec<Array3<u8>>> {
+    // Read every tile that overlaps the crop. We don't bother clipping into a final crop array,
+    // since the access cost we want to measure is "fetch all required tile bytes".
+    let tile_indices = tiles_overlapping_crop(crop, meta);
+    let mut out = Vec::with_capacity(tile_indices.len());
+    for tile_idx in tile_indices {
+        out.push(decode_tiff_tile(path, tile_idx, meta)?);
+    }
+    Ok(out)
+}
+
+// =================================================================================================
+// Vortex decode paths
+// =================================================================================================
+
+async fn decode_vortex_tile(
+    session: &VortexSession,
+    path: &PathBuf,
+    tile_idx: u32,
+    meta: &RasterMeta,
+) -> Result<Array3<u8>> {
+    let file = session.open_options().open_path(path).await?;
+    let chunks: Vec<ArrayRef> = file
+        .scan()?
+        .with_row_indices(Buffer::<u64>::from(vec![tile_idx as u64]))
+        .into_array_stream()?
+        .try_collect()
+        .await?;
+
+    let mut ctx = session.create_execution_ctx();
+    let tiles = decode_vortex_chunks(chunks, meta, &mut ctx)?;
+    tiles
+        .into_iter()
+        .next()
+        .context("scan produced no tiles for the requested row index")
+}
+
+async fn decode_vortex_crop(
+    session: &VortexSession,
+    path: &PathBuf,
+    crop: &Crop,
+    meta: &RasterMeta,
+) -> Result<Vec<Array3<u8>>> {
+    let file = session.open_options().open_path(path).await?;
+    let chunks: Vec<ArrayRef> = file
+        .scan()?
+        .with_filter(crop_filter(crop))
+        .into_array_stream()?
+        .try_collect()
+        .await?;
+
+    let mut ctx = session.create_execution_ctx();
+    decode_vortex_chunks(chunks, meta, &mut ctx)
+}
+
+async fn decode_vortex_crops_combined(
+    session: &VortexSession,
+    path: &PathBuf,
+    crops: &[Crop],
+    meta: &RasterMeta,
+) -> Result<Vec<Array3<u8>>> {
+    // Combine all crop predicates into one OR-tree so we can read every overlapping tile
+    // through a single scan and let the file's zone map prune the rest.
+    let Some(filter) = crops
+        .iter()
+        .map(crop_filter)
+        .reduce(or)
+    else {
+        return Ok(vec![]);
+    };
+
+    let file = session.open_options().open_path(path).await?;
+    let chunks: Vec<ArrayRef> = file
+        .scan()?
+        .with_filter(filter)
+        .into_array_stream()?
+        .try_collect()
+        .await?;
+
+    let mut ctx = session.create_execution_ctx();
+    decode_vortex_chunks(chunks, meta, &mut ctx)
+}
+
+fn decode_vortex_chunks(
+    chunks: Vec<ArrayRef>,
+    meta: &RasterMeta,
+    ctx: &mut ExecutionCtx,
+) -> Result<Vec<Array3<u8>>> {
+    let total_rows: usize = chunks.iter().map(|c| c.len()).sum();
+    if total_rows == 0 || chunks.is_empty() {
+        return Ok(vec![]);
+    }
+
+    let dtype = chunks[0].dtype().clone();
+    let chunked: StructArray = ChunkedArray::try_new(chunks, dtype)?
+        .into_array()
+        .execute(ctx)?;
+
+    let tile_pixels = (meta.tile_w as usize) * (meta.tile_h as usize);
+
+    // For each band column, materialize the elements + per-row [start, end) offsets. Lists may
+    // come back canonicalized to a `ListView` (`ListView` is the canonical form for list dtypes),
+    // or they may stay as `List` if the file kept that encoding — handle both.
+    let mut band_buffers: Vec<(Vec<u8>, Vec<(u64, u64)>)> = Vec::with_capacity(meta.bands);
+    for b in 0..meta.bands {
+        let name = format!("band_{b}");
+        let field: ArrayRef = chunked.unmasked_field_by_name(&name)?.clone().execute(ctx)?;
+        let (elements, ranges) = if field.is::<List>() {
+            list_ranges_from_list(&field, ctx)?
+        } else if field.is::<ListView>() {
+            list_ranges_from_view(&field, ctx)?
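+    // Example of the range bookkeeping: a List column with offsets [0, 3, 6] yields
+    // ranges [(0, 3), (3, 6)], while a ListView with offsets [0, 3] and sizes [3, 3]
+    // yields the same ranges; either way row i's tile bytes are elements[start..end].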
+        } else {
+            bail!(
+                "expected band column to canonicalize to List or ListView, got {}",
+                field.encoding_id()
+            );
+        };
+        band_buffers.push((elements, ranges));
+    }
+
+    let mut out = Vec::with_capacity(total_rows);
+    for row in 0..total_rows {
+        let mut arr = Array3::<u8>::zeros((meta.bands, meta.tile_h as usize, meta.tile_w as usize));
+        for (b, (elements, ranges)) in band_buffers.iter().enumerate() {
+            let (start, end) = ranges[row];
+            let start = start as usize;
+            let end = end as usize;
+            if end - start != tile_pixels {
+                bail!(
+                    "band {b} tile {row} has {} bytes, expected {tile_pixels}",
+                    end - start
+                );
+            }
+            let tile = &elements[start..end];
+            for y in 0..(meta.tile_h as usize) {
+                for x in 0..(meta.tile_w as usize) {
+                    arr[(b, y, x)] = tile[y * (meta.tile_w as usize) + x];
+                }
+            }
+        }
+        out.push(arr);
+    }
+    Ok(out)
+}
+
+fn list_ranges_from_list(
+    field: &ArrayRef,
+    ctx: &mut ExecutionCtx,
+) -> Result<(Vec<u8>, Vec<(u64, u64)>)> {
+    let list = field.as_::<List>();
+    let elements: PrimitiveArray = list.elements().clone().execute(ctx)?;
+    let offsets: PrimitiveArray = list.offsets().clone().execute(ctx)?;
+    let elem_bytes = elements.as_slice::<u8>().to_vec();
+    let offsets_u64 = offsets_to_u64(&offsets)?;
+    let ranges: Vec<(u64, u64)> = offsets_u64
+        .windows(2)
+        .map(|w| (w[0], w[1]))
+        .collect();
+    Ok((elem_bytes, ranges))
+}
+
+fn list_ranges_from_view(
+    field: &ArrayRef,
+    ctx: &mut ExecutionCtx,
+) -> Result<(Vec<u8>, Vec<(u64, u64)>)> {
+    let view = field.as_::<ListView>();
+    let elements: PrimitiveArray = view.elements().clone().execute(ctx)?;
+    let offsets: PrimitiveArray = view.offsets().clone().execute(ctx)?;
+    let sizes: PrimitiveArray = view.sizes().clone().execute(ctx)?;
+    let elem_bytes = elements.as_slice::<u8>().to_vec();
+    let offsets_u64 = offsets_to_u64(&offsets)?;
+    let sizes_u64 = offsets_to_u64(&sizes)?;
+    let ranges: Vec<(u64, u64)> = offsets_u64
+        .iter()
+        .zip(sizes_u64.iter())
+        .map(|(&o, &s)| (o, o + s))
+        .collect();
+    Ok((elem_bytes, ranges))
+}
+
+fn offsets_to_u64(offsets: &PrimitiveArray) -> Result<Vec<u64>> {
+    use vortex::dtype::PType;
+    let result = match offsets.ptype() {
+        PType::I32 => offsets.as_slice::<i32>().iter().map(|&v| v as u64).collect(),
+        PType::U32 => offsets.as_slice::<u32>().iter().map(|&v| v as u64).collect(),
+        PType::I64 => offsets.as_slice::<i64>().iter().map(|&v| v as u64).collect(),
+        PType::U64 => offsets.as_slice::<u64>().to_vec(),
+        other => bail!("unsupported list offset ptype: {other:?}"),
+    };
+    Ok(result)
+}
+
+// =================================================================================================
+// Crops + filter expressions
+// =================================================================================================
+
+#[derive(Clone, Debug)]
+struct Crop {
+    pixel_x: u32,
+    pixel_y: u32,
+    size: u32,
+    minx: f64,
+    miny: f64,
+    maxx: f64,
+    maxy: f64,
+}
+
+impl Crop {
+    fn random<R: Rng>(rng: &mut R, meta: &RasterMeta, size: u32) -> Self {
+        let max_x = meta.image_w.saturating_sub(size);
+        let max_y = meta.image_h.saturating_sub(size);
+        let pixel_x = if max_x > 0 { rng.random_range(0..max_x) } else { 0 };
+        let pixel_y = if max_y > 0 { rng.random_range(0..max_y) } else { 0 };
+        let (x0, y1) = meta.pixel_to_world(pixel_x, pixel_y);
+        let (x1, y0) = meta.pixel_to_world(pixel_x + size, pixel_y + size);
+        Crop {
+            pixel_x,
+            pixel_y,
+            size,
+            minx: x0.min(x1),
+            maxx: x0.max(x1),
+            miny: y0.min(y1),
+            maxy: y0.max(y1),
+        }
+    }
+}
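+
+// With two crops A and B, `decode_vortex_crops_combined` above reduces their predicates to
+// or(crop_filter(A), crop_filter(B)); each `crop_filter` call builds the four-comparison
+// bbox-overlap test that the reduction combines.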
+fn crop_filter(crop: &Crop) -> Expression {
+    // A tile overlaps the crop when its bounds are not strictly to one side. With the schema
+    // produced by the converter (`minx <= maxx`, `miny <= maxy` per row) the overlap check is:
+    //   tile.minx < crop.maxx AND tile.maxx > crop.minx AND
+    //   tile.miny < crop.maxy AND tile.maxy > crop.miny
+    let x_overlap = and(lt(col("minx"), lit(crop.maxx)), gt(col("maxx"), lit(crop.minx)));
+    let y_overlap = and(lt(col("miny"), lit(crop.maxy)), gt(col("maxy"), lit(crop.miny)));
+    and(x_overlap, y_overlap)
+}
+
+fn tiles_overlapping_crop(crop: &Crop, meta: &RasterMeta) -> Vec<u32> {
+    let tx_start = crop.pixel_x / meta.tile_w;
+    let tx_end = (crop.pixel_x + crop.size - 1) / meta.tile_w;
+    let ty_start = crop.pixel_y / meta.tile_h;
+    let ty_end = (crop.pixel_y + crop.size - 1) / meta.tile_h;
+    let tiles_x = meta.tiles_x();
+
+    let mut out = Vec::new();
+    for ty in ty_start..=ty_end.min(meta.tiles_y().saturating_sub(1)) {
+        for tx in tx_start..=tx_end.min(tiles_x.saturating_sub(1)) {
+            out.push(ty * tiles_x + tx);
+        }
+    }
+    out
+}
+
+// =================================================================================================
+// Timing helpers
+// =================================================================================================
+
+fn bench<F: FnMut() -> Result<()>>(iterations: usize, mut f: F) -> Result<Duration> {
+    // Warm up once so the first run doesn't inflate the average with cold caches.
+    f()?;
+    let start = Instant::now();
+    for _ in 0..iterations {
+        f()?;
+    }
+    Ok(start.elapsed() / iterations.max(1) as u32)
+}
+
+async fn bench_async<F, Fut>(iterations: usize, mut f: F) -> Result<Duration>
+where
+    F: FnMut() -> Fut,
+    Fut: Future<Output = Result<()>>,
+{
+    f().await?;
+    let start = Instant::now();
+    for _ in 0..iterations {
+        f().await?;
+    }
+    Ok(start.elapsed() / iterations.max(1) as u32)
+}