From 03008ce1996c56a652a12cb94813cd1e768bde1e Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 5 May 2026 17:06:52 -0700 Subject: [PATCH 1/5] docs: add DbgExec helper for dumping native stream output Introduce a small ExecutionPlan wrapper, DbgExec, that applies the existing dbg_batch_stream helper to whatever its inner plan emits. This lets contributors drop a one-line wrap around any native operator to see the RecordBatches it actually produces (values, schema, and per-column null buffers), which is hard to infer from a failing DataFrame assertion alone. Document it in the contributor debugging guide with a worked LEAD IGNORE NULLS example. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/source/contributor-guide/debugging.md | 86 ++++++++++++++++++++++ native/core/src/parquet/parquet_exec.rs | 74 +++++++++++++++++++ 2 files changed, 160 insertions(+) diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md index 3356a83893..18766ef76b 100644 --- a/docs/source/contributor-guide/debugging.md +++ b/docs/source/contributor-guide/debugging.md @@ -220,3 +220,89 @@ Example log output: ``` When backtraces are enabled (see earlier section) then backtraces will be included for failed allocations. + +### Dumping native stream output with `DbgExec` + +When a native operator is suspected of producing wrong data (wrong values, wrong +nullability, wrong row count) but the JVM-side observable output is just a DataFrame +mismatch, it is useful to inspect the `RecordBatch`es that the operator actually +emits. Comet ships two small helpers in `native/core/src/parquet/parquet_exec.rs`: + +- `dbg_batch_stream(stream)` — wraps an existing `SendableRecordBatchStream` so that + every batch, schema, and per-column null buffer is dumped to stderr as the stream + is polled. +- `DbgExec::new(label, inner)` — an `ExecutionPlan` wrapper that applies + `dbg_batch_stream` to whatever its inner plan produces on `execute()`. Because it + is itself an `ExecutionPlan` it can be inserted anywhere in the physical plan + tree built by `PhysicalPlanner::create_plan` in `native/core/src/execution/planner.rs`. + +`DbgExec` forwards `schema`, `properties`, `children`, and `with_new_children` to the +inner plan, so slotting it in does not change operator semantics — it only adds +printing. + +#### Using `DbgExec` to inspect an operator's output + +Import the wrapper in `planner.rs`: + +```rust +use crate::parquet::parquet_exec::DbgExec; +``` + +Then wrap whichever `Arc` you want to see the batches of. For +example, to dump the output of the window operator: + +```rust +let window_agg: Arc = Arc::new(WindowAggExec::try_new( + window_expr?, + Arc::clone(&child.native_plan), + !partition_exprs.is_empty(), +)?); + +// TEMPORARY: print every batch emitted by the window operator. +let window_agg: Arc = + Arc::new(DbgExec::new("window", window_agg)); +``` + +Rebuild with `make core` and run the JVM test that exercises the operator. Each +emitted batch is printed to stderr with `dbg!`. The output looks like this for a +`LEAD(c) IGNORE NULLS OVER (PARTITION BY a ORDER BY b)` query over five rows: + +```text +[comet-debug] DbgExec[window] execute(partition=0) +[core/src/parquet/parquet_exec.rs:225:17] batch = RecordBatch { + schema: Schema { fields: [ ... "col_0" Int32, "col_1" Int32, "col_2" Int32, "lead" Int32 ] }, + columns: [ + PrimitiveArray [1, 1, 1, 2, 2], // partition key a + PrimitiveArray [1, 2, 3, 1, 2], // order key b + PrimitiveArray [10, null, 30, null, 20], // value c + PrimitiveArray [null, null, null, null, null], // lead(c) + ], + row_count: 5, +} +[core/src/parquet/parquet_exec.rs:227:21] col_idx = 3 +[core/src/parquet/parquet_exec.rs:227:21] column.nulls() = Some( + NullBuffer { ..., null_count: 5 }, +) +``` + +From the dump you can see immediately that the native `lead` column is all-null +(`null_count: 5` out of 5 rows) while the inputs are correctly sorted — pinpointing +the bug to the native operator rather than to the scan, the plan shape, or the +JVM-side comparison. + +#### Where to place `DbgExec` + +Good insertion points when debugging: + +- **Right after the operator under test**, to see exactly what that operator emits. +- **Right before the operator under test**, to confirm what its input looked like + (rules out upstream issues). +- **Both**, with different labels (`DbgExec::new("window-in", ...)` / + `DbgExec::new("window-out", ...)`), to diff input vs output. + +Because `DbgExec` is a one-line wrap, it is fine to scatter several throughout the +plan during a debugging session and remove them afterwards. The `label` is part of +every line it emits, so multiple wrappers stay easy to tell apart. + +`DbgExec` is a debugging aid — remove the wrap (and, if you added one, the +`use crate::parquet::parquet_exec::DbgExec;` import) before committing. diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index ef4c878b9a..8b0e613f63 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -230,3 +230,77 @@ pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatc }); Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream)) } + +/// Execution plan wrapper that prints each batch produced by `inner` at +/// runtime. Wrap an operator (e.g. `WindowAggExec`) with this to see what +/// its output stream actually contains during JVM test execution. +/// +/// See `docs/source/contributor-guide/debugging.md` for a walkthrough. +#[derive(Debug)] +pub struct DbgExec { + label: String, + inner: Arc, +} + +impl DbgExec { + pub fn new( + label: impl Into, + inner: Arc, + ) -> Self { + Self { + label: label.into(), + inner, + } + } +} + +impl datafusion::physical_plan::DisplayAs for DbgExec { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "DbgExec[{}]", self.label) + } +} + +impl datafusion::physical_plan::ExecutionPlan for DbgExec { + fn name(&self) -> &str { + "DbgExec" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn properties(&self) -> &Arc { + self.inner.properties() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.inner] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion::common::Result> { + Ok(Arc::new(DbgExec::new( + self.label.clone(), + Arc::clone(&children[0]), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> datafusion::common::Result { + eprintln!( + "[comet-debug] DbgExec[{}] execute(partition={})", + self.label, partition + ); + let stream = self.inner.execute(partition, context)?; + Ok(dbg_batch_stream(stream)) + } +} From 1589c916d351a928998640d3639375afe674e4bc Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 5 May 2026 17:17:43 -0700 Subject: [PATCH 2/5] doc: Comet query runtime data debugging --- docs/source/contributor-guide/debugging.md | 127 ++++++++++++++++++--- native/core/src/parquet/parquet_exec.rs | 98 ---------------- 2 files changed, 114 insertions(+), 111 deletions(-) diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md index 18766ef76b..20ab603eb8 100644 --- a/docs/source/contributor-guide/debugging.md +++ b/docs/source/contributor-guide/debugging.md @@ -221,28 +221,129 @@ Example log output: When backtraces are enabled (see earlier section) then backtraces will be included for failed allocations. -### Dumping native stream output with `DbgExec` +### Dumping native stream output with a `DbgExec` wrapper When a native operator is suspected of producing wrong data (wrong values, wrong nullability, wrong row count) but the JVM-side observable output is just a DataFrame mismatch, it is useful to inspect the `RecordBatch`es that the operator actually -emits. Comet ships two small helpers in `native/core/src/parquet/parquet_exec.rs`: +emits. A small `ExecutionPlan` wrapper that prints every batch as it flows through +makes this easy. Comet does not ship this wrapper in the source tree — paste it +into a convenient module (for example `native/core/src/parquet/parquet_exec.rs`) +for the duration of your debugging session and remove it before committing. -- `dbg_batch_stream(stream)` — wraps an existing `SendableRecordBatchStream` so that - every batch, schema, and per-column null buffer is dumped to stderr as the stream - is polled. -- `DbgExec::new(label, inner)` — an `ExecutionPlan` wrapper that applies - `dbg_batch_stream` to whatever its inner plan produces on `execute()`. Because it - is itself an `ExecutionPlan` it can be inserted anywhere in the physical plan - tree built by `PhysicalPlanner::create_plan` in `native/core/src/execution/planner.rs`. +#### Reference implementation + +```rust +use std::sync::Arc; + +use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; + +/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through. +/// Returns a new `SendableRecordBatchStream` that yields the same batches. +pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream { + use futures::StreamExt; + let schema = stream.schema(); + let printing_stream = stream.map(|batch_result| { + match &batch_result { + Ok(batch) => { + dbg!(batch, batch.schema()); + for (col_idx, column) in batch.columns().iter().enumerate() { + dbg!(col_idx, column, column.nulls()); + } + } + Err(e) => { + println!("batch error: {:?}", e); + } + } + batch_result + }); + Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream)) +} + +/// Execution plan wrapper that prints each batch produced by `inner` at +/// runtime. Wrap an operator (e.g. `WindowAggExec`) with this to see what +/// its output stream actually contains during JVM test execution. +#[derive(Debug)] +pub struct DbgExec { + label: String, + inner: Arc, +} + +impl DbgExec { + pub fn new( + label: impl Into, + inner: Arc, + ) -> Self { + Self { + label: label.into(), + inner, + } + } +} + +impl datafusion::physical_plan::DisplayAs for DbgExec { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "DbgExec[{}]", self.label) + } +} + +impl datafusion::physical_plan::ExecutionPlan for DbgExec { + fn name(&self) -> &str { + "DbgExec" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn properties(&self) -> &Arc { + self.inner.properties() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.inner] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion::common::Result> { + Ok(Arc::new(DbgExec::new( + self.label.clone(), + Arc::clone(&children[0]), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> datafusion::common::Result { + eprintln!( + "[comet-debug] DbgExec[{}] execute(partition={})", + self.label, partition + ); + let stream = self.inner.execute(partition, context)?; + Ok(dbg_batch_stream(stream)) + } +} +``` `DbgExec` forwards `schema`, `properties`, `children`, and `with_new_children` to the inner plan, so slotting it in does not change operator semantics — it only adds -printing. +printing. Because it is itself an `ExecutionPlan` it can be inserted anywhere in +the physical plan tree built by `PhysicalPlanner::create_plan` in +`native/core/src/execution/planner.rs`. #### Using `DbgExec` to inspect an operator's output -Import the wrapper in `planner.rs`: +Import the wrapper in `planner.rs` (assuming you pasted it into +`parquet_exec.rs`): ```rust use crate::parquet::parquet_exec::DbgExec; @@ -304,5 +405,5 @@ Because `DbgExec` is a one-line wrap, it is fine to scatter several throughout t plan during a debugging session and remove them afterwards. The `label` is part of every line it emits, so multiple wrappers stay easy to tell apart. -`DbgExec` is a debugging aid — remove the wrap (and, if you added one, the -`use crate::parquet::parquet_exec::DbgExec;` import) before committing. +`DbgExec` is a debugging aid — remove the wrapper definition, any wraps, and the +`use crate::parquet::parquet_exec::DbgExec;` import before committing. diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index 8b0e613f63..dae11a9a41 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -28,11 +28,9 @@ use datafusion::datasource::physical_plan::{ }; use datafusion::datasource::source::DataSourceExec; use datafusion::execution::object_store::ObjectStoreUrl; -use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_expr::expressions::{BinaryExpr, Column}; use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_expr_adapter::PhysicalExprAdapterFactory; -use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::prelude::SessionContext; use datafusion::scalar::ScalarValue; use datafusion_comet_spark_expr::EvalMode; @@ -208,99 +206,3 @@ fn get_options( (table_parquet_options, spark_parquet_options) } - -/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through. -/// Returns a new `SendableRecordBatchStream` that yields the same batches. -pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream { - use futures::StreamExt; - let schema = stream.schema(); - let printing_stream = stream.map(|batch_result| { - match &batch_result { - Ok(batch) => { - dbg!(batch, batch.schema()); - for (col_idx, column) in batch.columns().iter().enumerate() { - dbg!(col_idx, column, column.nulls()); - } - } - Err(e) => { - println!("batch error: {:?}", e); - } - } - batch_result - }); - Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream)) -} - -/// Execution plan wrapper that prints each batch produced by `inner` at -/// runtime. Wrap an operator (e.g. `WindowAggExec`) with this to see what -/// its output stream actually contains during JVM test execution. -/// -/// See `docs/source/contributor-guide/debugging.md` for a walkthrough. -#[derive(Debug)] -pub struct DbgExec { - label: String, - inner: Arc, -} - -impl DbgExec { - pub fn new( - label: impl Into, - inner: Arc, - ) -> Self { - Self { - label: label.into(), - inner, - } - } -} - -impl datafusion::physical_plan::DisplayAs for DbgExec { - fn fmt_as( - &self, - _t: datafusion::physical_plan::DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - write!(f, "DbgExec[{}]", self.label) - } -} - -impl datafusion::physical_plan::ExecutionPlan for DbgExec { - fn name(&self) -> &str { - "DbgExec" - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn properties(&self) -> &Arc { - self.inner.properties() - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.inner] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> datafusion::common::Result> { - Ok(Arc::new(DbgExec::new( - self.label.clone(), - Arc::clone(&children[0]), - ))) - } - - fn execute( - &self, - partition: usize, - context: Arc, - ) -> datafusion::common::Result { - eprintln!( - "[comet-debug] DbgExec[{}] execute(partition={})", - self.label, partition - ); - let stream = self.inner.execute(partition, context)?; - Ok(dbg_batch_stream(stream)) - } -} From 3d5e688b939e7f2aae8bb72d9a7ab7b2b0ce61ca Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 5 May 2026 17:36:14 -0700 Subject: [PATCH 3/5] doc: Comet query runtime data debugging --- docs/source/contributor-guide/debugging.md | 181 +++++++++++++++++++++ 1 file changed, 181 insertions(+) diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md index 20ab603eb8..54652586e9 100644 --- a/docs/source/contributor-guide/debugging.md +++ b/docs/source/contributor-guide/debugging.md @@ -407,3 +407,184 @@ every line it emits, so multiple wrappers stay easy to tell apart. `DbgExec` is a debugging aid — remove the wrapper definition, any wraps, and the `use crate::parquet::parquet_exec::DbgExec;` import before committing. + +### Dumping expression inputs and outputs with a `DbgExpr` wrapper + +`DbgExec` works at the operator level. When the suspect is a single +**expression** — a cast, a binary op, a `CASE WHEN` predicate, a UDF — an +`ExecutionPlan` wrapper is too coarse. The equivalent trick at the +`PhysicalExpr` level is a small wrapper that forwards `evaluate()` to an inner +expression and prints both the input `RecordBatch` and the resulting +`ColumnarValue`. Like `DbgExec`, this wrapper is not shipped in the source tree +— paste it in for a debugging session and remove it before committing. + +#### Reference implementation + +```rust +use std::any::Any; +use std::fmt; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use arrow::array::RecordBatch; +use arrow::datatypes::{DataType, Schema}; +use datafusion::common::Result; +use datafusion::logical_expr::ColumnarValue; +use datafusion::physical_expr::PhysicalExpr; + +/// `PhysicalExpr` wrapper that prints every `evaluate()` call: +/// - the input `RecordBatch` (rows / columns / schema), +/// - the resulting `ColumnarValue` (array + null buffer, or scalar). +/// +/// Wrap any `Arc` produced by `PhysicalPlanner::create_expr` +/// in `planner.rs` to see exactly what it receives and returns at runtime. +#[derive(Debug)] +pub struct DbgExpr { + label: String, + inner: Arc, +} + +impl DbgExpr { + pub fn new(label: impl Into, inner: Arc) -> Self { + Self { + label: label.into(), + inner, + } + } +} + +impl fmt::Display for DbgExpr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "DbgExpr[{}]({})", self.label, self.inner) + } +} + +impl PartialEq for DbgExpr { + fn eq(&self, other: &Self) -> bool { + self.label == other.label && self.inner.eq(&other.inner) + } +} +impl Eq for DbgExpr {} +impl Hash for DbgExpr { + fn hash(&self, state: &mut H) { + self.label.hash(state); + self.inner.hash(state); + } +} + +impl PhysicalExpr for DbgExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type(&self, input_schema: &Schema) -> Result { + self.inner.data_type(input_schema) + } + + fn nullable(&self, input_schema: &Schema) -> Result { + self.inner.nullable(input_schema) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + eprintln!( + "[comet-debug] DbgExpr[{}].evaluate(rows={}, cols={})", + self.label, + batch.num_rows(), + batch.num_columns() + ); + dbg!(batch, batch.schema()); + let out = self.inner.evaluate(batch)?; + match &out { + ColumnarValue::Array(arr) => { + dbg!(arr.len(), arr.nulls(), arr); + } + ColumnarValue::Scalar(s) => { + dbg!(s); + } + } + Ok(out) + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.inner] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(DbgExpr::new( + self.label.clone(), + Arc::clone(&children[0]), + ))) + } + + fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt_sql(f) + } +} +``` + +`DbgExpr` forwards `data_type`, `nullable`, `children`, and `with_new_children` +to the inner expression, so wrapping it does not change semantics — it only adds +printing on every `evaluate()` call. + +#### Using `DbgExpr` in `planner.rs` + +`PhysicalPlanner::create_expr` returns `Arc`, so any expression +produced during plan building can be one-line-wrapped. For example, to dump what +a `CASE WHEN` predicate sees and produces: + +```rust +use crate::parquet::parquet_exec::DbgExpr; // or wherever you pasted it + +// Before: +let predicate = self.create_expr(when_expr, Arc::clone(&input_schema))?; + +// After — TEMPORARY, remove before committing: +let predicate: Arc = + Arc::new(DbgExpr::new("case-when-predicate", predicate)); +``` + +To trace every argument of a suspicious scalar function, wrap each child as it +is built: + +```rust +let args: Vec> = expr + .children + .iter() + .enumerate() + .map(|(i, c)| { + let child = self.create_expr(c, Arc::clone(&input_schema))?; + Ok::<_, ExecutionError>(Arc::new(DbgExpr::new( + format!("arg{i}"), + child, + )) as Arc) + }) + .collect::>()?; +``` + +Rebuild with `make core` and run the JVM test. Sample output for a `BinaryExpr` +computing `a + b` over a three-row batch: + +```text +[comet-debug] DbgExpr[add].evaluate(rows=3, cols=2) +[core/src/execution/planner.rs:…] batch = RecordBatch { columns: [Int32[1,2,3], Int32[10,20,30]], row_count: 3 } +[core/src/execution/planner.rs:…] arr.len() = 3 +[core/src/execution/planner.rs:…] arr.nulls() = None +[core/src/execution/planner.rs:…] arr = PrimitiveArray [11, 22, 33] +``` + +#### When to reach for `DbgExpr` vs `DbgExec` + +- Use **`DbgExec`** when you suspect an *operator* (scan, window, sort, aggregate) + is emitting wrong batches — you want to see what crosses operator boundaries. +- Use **`DbgExpr`** when the operator looks fine but a specific *expression* + inside a projection, filter, or window function is returning wrong values or + nullability — you want to see what one expression receives and computes. +- They compose: wrap the suspect expression with `DbgExpr`, and wrap the + operator that evaluates it with `DbgExec`, to correlate per-expression + behavior with the batches the operator is producing overall. + +`DbgExpr` is a debugging aid — remove the wrapper definition, any wraps, and the +`use crate::parquet::parquet_exec::DbgExpr;` import before committing. From 4b1bc6e707675b8a7ea5f6f5ee1017c9e59acc08 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 5 May 2026 17:44:11 -0700 Subject: [PATCH 4/5] doc: Comet query runtime data debugging --- docs/source/contributor-guide/debugging.md | 296 --------------------- 1 file changed, 296 deletions(-) diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md index 54652586e9..995cabd89e 100644 --- a/docs/source/contributor-guide/debugging.md +++ b/docs/source/contributor-guide/debugging.md @@ -231,183 +231,12 @@ makes this easy. Comet does not ship this wrapper in the source tree — paste i into a convenient module (for example `native/core/src/parquet/parquet_exec.rs`) for the duration of your debugging session and remove it before committing. -#### Reference implementation - -```rust -use std::sync::Arc; - -use datafusion::execution::SendableRecordBatchStream; -use datafusion::physical_plan::stream::RecordBatchStreamAdapter; - -/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through. -/// Returns a new `SendableRecordBatchStream` that yields the same batches. -pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream { - use futures::StreamExt; - let schema = stream.schema(); - let printing_stream = stream.map(|batch_result| { - match &batch_result { - Ok(batch) => { - dbg!(batch, batch.schema()); - for (col_idx, column) in batch.columns().iter().enumerate() { - dbg!(col_idx, column, column.nulls()); - } - } - Err(e) => { - println!("batch error: {:?}", e); - } - } - batch_result - }); - Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream)) -} - -/// Execution plan wrapper that prints each batch produced by `inner` at -/// runtime. Wrap an operator (e.g. `WindowAggExec`) with this to see what -/// its output stream actually contains during JVM test execution. -#[derive(Debug)] -pub struct DbgExec { - label: String, - inner: Arc, -} - -impl DbgExec { - pub fn new( - label: impl Into, - inner: Arc, - ) -> Self { - Self { - label: label.into(), - inner, - } - } -} - -impl datafusion::physical_plan::DisplayAs for DbgExec { - fn fmt_as( - &self, - _t: datafusion::physical_plan::DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - write!(f, "DbgExec[{}]", self.label) - } -} - -impl datafusion::physical_plan::ExecutionPlan for DbgExec { - fn name(&self) -> &str { - "DbgExec" - } - - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn properties(&self) -> &Arc { - self.inner.properties() - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.inner] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> datafusion::common::Result> { - Ok(Arc::new(DbgExec::new( - self.label.clone(), - Arc::clone(&children[0]), - ))) - } - - fn execute( - &self, - partition: usize, - context: Arc, - ) -> datafusion::common::Result { - eprintln!( - "[comet-debug] DbgExec[{}] execute(partition={})", - self.label, partition - ); - let stream = self.inner.execute(partition, context)?; - Ok(dbg_batch_stream(stream)) - } -} -``` - `DbgExec` forwards `schema`, `properties`, `children`, and `with_new_children` to the inner plan, so slotting it in does not change operator semantics — it only adds printing. Because it is itself an `ExecutionPlan` it can be inserted anywhere in the physical plan tree built by `PhysicalPlanner::create_plan` in `native/core/src/execution/planner.rs`. -#### Using `DbgExec` to inspect an operator's output - -Import the wrapper in `planner.rs` (assuming you pasted it into -`parquet_exec.rs`): - -```rust -use crate::parquet::parquet_exec::DbgExec; -``` - -Then wrap whichever `Arc` you want to see the batches of. For -example, to dump the output of the window operator: - -```rust -let window_agg: Arc = Arc::new(WindowAggExec::try_new( - window_expr?, - Arc::clone(&child.native_plan), - !partition_exprs.is_empty(), -)?); - -// TEMPORARY: print every batch emitted by the window operator. -let window_agg: Arc = - Arc::new(DbgExec::new("window", window_agg)); -``` - -Rebuild with `make core` and run the JVM test that exercises the operator. Each -emitted batch is printed to stderr with `dbg!`. The output looks like this for a -`LEAD(c) IGNORE NULLS OVER (PARTITION BY a ORDER BY b)` query over five rows: - -```text -[comet-debug] DbgExec[window] execute(partition=0) -[core/src/parquet/parquet_exec.rs:225:17] batch = RecordBatch { - schema: Schema { fields: [ ... "col_0" Int32, "col_1" Int32, "col_2" Int32, "lead" Int32 ] }, - columns: [ - PrimitiveArray [1, 1, 1, 2, 2], // partition key a - PrimitiveArray [1, 2, 3, 1, 2], // order key b - PrimitiveArray [10, null, 30, null, 20], // value c - PrimitiveArray [null, null, null, null, null], // lead(c) - ], - row_count: 5, -} -[core/src/parquet/parquet_exec.rs:227:21] col_idx = 3 -[core/src/parquet/parquet_exec.rs:227:21] column.nulls() = Some( - NullBuffer { ..., null_count: 5 }, -) -``` - -From the dump you can see immediately that the native `lead` column is all-null -(`null_count: 5` out of 5 rows) while the inputs are correctly sorted — pinpointing -the bug to the native operator rather than to the scan, the plan shape, or the -JVM-side comparison. - -#### Where to place `DbgExec` - -Good insertion points when debugging: - -- **Right after the operator under test**, to see exactly what that operator emits. -- **Right before the operator under test**, to confirm what its input looked like - (rules out upstream issues). -- **Both**, with different labels (`DbgExec::new("window-in", ...)` / - `DbgExec::new("window-out", ...)`), to diff input vs output. - -Because `DbgExec` is a one-line wrap, it is fine to scatter several throughout the -plan during a debugging session and remove them afterwards. The `label` is part of -every line it emits, so multiple wrappers stay easy to tell apart. - -`DbgExec` is a debugging aid — remove the wrapper definition, any wraps, and the -`use crate::parquet::parquet_exec::DbgExec;` import before committing. - ### Dumping expression inputs and outputs with a `DbgExpr` wrapper `DbgExec` works at the operator level. When the suspect is a single @@ -418,117 +247,6 @@ expression and prints both the input `RecordBatch` and the resulting `ColumnarValue`. Like `DbgExec`, this wrapper is not shipped in the source tree — paste it in for a debugging session and remove it before committing. -#### Reference implementation - -```rust -use std::any::Any; -use std::fmt; -use std::hash::{Hash, Hasher}; -use std::sync::Arc; - -use arrow::array::RecordBatch; -use arrow::datatypes::{DataType, Schema}; -use datafusion::common::Result; -use datafusion::logical_expr::ColumnarValue; -use datafusion::physical_expr::PhysicalExpr; - -/// `PhysicalExpr` wrapper that prints every `evaluate()` call: -/// - the input `RecordBatch` (rows / columns / schema), -/// - the resulting `ColumnarValue` (array + null buffer, or scalar). -/// -/// Wrap any `Arc` produced by `PhysicalPlanner::create_expr` -/// in `planner.rs` to see exactly what it receives and returns at runtime. -#[derive(Debug)] -pub struct DbgExpr { - label: String, - inner: Arc, -} - -impl DbgExpr { - pub fn new(label: impl Into, inner: Arc) -> Self { - Self { - label: label.into(), - inner, - } - } -} - -impl fmt::Display for DbgExpr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "DbgExpr[{}]({})", self.label, self.inner) - } -} - -impl PartialEq for DbgExpr { - fn eq(&self, other: &Self) -> bool { - self.label == other.label && self.inner.eq(&other.inner) - } -} -impl Eq for DbgExpr {} -impl Hash for DbgExpr { - fn hash(&self, state: &mut H) { - self.label.hash(state); - self.inner.hash(state); - } -} - -impl PhysicalExpr for DbgExpr { - fn as_any(&self) -> &dyn Any { - self - } - - fn data_type(&self, input_schema: &Schema) -> Result { - self.inner.data_type(input_schema) - } - - fn nullable(&self, input_schema: &Schema) -> Result { - self.inner.nullable(input_schema) - } - - fn evaluate(&self, batch: &RecordBatch) -> Result { - eprintln!( - "[comet-debug] DbgExpr[{}].evaluate(rows={}, cols={})", - self.label, - batch.num_rows(), - batch.num_columns() - ); - dbg!(batch, batch.schema()); - let out = self.inner.evaluate(batch)?; - match &out { - ColumnarValue::Array(arr) => { - dbg!(arr.len(), arr.nulls(), arr); - } - ColumnarValue::Scalar(s) => { - dbg!(s); - } - } - Ok(out) - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.inner] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> Result> { - Ok(Arc::new(DbgExpr::new( - self.label.clone(), - Arc::clone(&children[0]), - ))) - } - - fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.inner.fmt_sql(f) - } -} -``` - -`DbgExpr` forwards `data_type`, `nullable`, `children`, and `with_new_children` -to the inner expression, so wrapping it does not change semantics — it only adds -printing on every `evaluate()` call. - #### Using `DbgExpr` in `planner.rs` `PhysicalPlanner::create_expr` returns `Arc`, so any expression @@ -574,17 +292,3 @@ computing `a + b` over a three-row batch: [core/src/execution/planner.rs:…] arr.nulls() = None [core/src/execution/planner.rs:…] arr = PrimitiveArray [11, 22, 33] ``` - -#### When to reach for `DbgExpr` vs `DbgExec` - -- Use **`DbgExec`** when you suspect an *operator* (scan, window, sort, aggregate) - is emitting wrong batches — you want to see what crosses operator boundaries. -- Use **`DbgExpr`** when the operator looks fine but a specific *expression* - inside a projection, filter, or window function is returning wrong values or - nullability — you want to see what one expression receives and computes. -- They compose: wrap the suspect expression with `DbgExpr`, and wrap the - operator that evaluates it with `DbgExec`, to correlate per-expression - behavior with the batches the operator is producing overall. - -`DbgExpr` is a debugging aid — remove the wrapper definition, any wraps, and the -`use crate::parquet::parquet_exec::DbgExpr;` import before committing. From 1c3240fb049965ff27094af0afec99350b509b72 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 5 May 2026 17:55:34 -0700 Subject: [PATCH 5/5] doc: Comet query runtime data debugging --- docs/source/contributor-guide/debugging.md | 181 +++++++++++++++++++++ 1 file changed, 181 insertions(+) diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md index 995cabd89e..0907e2f5c3 100644 --- a/docs/source/contributor-guide/debugging.md +++ b/docs/source/contributor-guide/debugging.md @@ -249,6 +249,187 @@ expression and prints both the input `RecordBatch` and the resulting #### Using `DbgExpr` in `planner.rs` +Paste the wrappers below into a convenient module (for example +`native/core/src/parquet/parquet_exec.rs`). The first block is `DbgExec` (wraps +an `ExecutionPlan`); the second is `DbgExpr` (wraps a `PhysicalExpr`). You only +need whichever one matches the granularity you want to trace — they are +independent. + +```rust +use std::sync::Arc; + +use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; + +/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through. +pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream { + use futures::StreamExt; + let schema = stream.schema(); + let printing_stream = stream.map(|batch_result| { + match &batch_result { + Ok(batch) => { + dbg!(batch, batch.schema()); + for (col_idx, column) in batch.columns().iter().enumerate() { + dbg!(col_idx, column, column.nulls()); + } + } + Err(e) => { + println!("batch error: {:?}", e); + } + } + batch_result + }); + Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream)) +} + +/// `ExecutionPlan` wrapper that prints every batch produced by `inner`. +#[derive(Debug)] +pub struct DbgExec { + label: String, + inner: Arc, +} + +impl DbgExec { + pub fn new( + label: impl Into, + inner: Arc, + ) -> Self { + Self { + label: label.into(), + inner, + } + } +} + +impl datafusion::physical_plan::DisplayAs for DbgExec { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "DbgExec[{}]", self.label) + } +} + +impl datafusion::physical_plan::ExecutionPlan for DbgExec { + fn name(&self) -> &str { "DbgExec" } + fn as_any(&self) -> &dyn std::any::Any { self } + fn properties(&self) -> &Arc { + self.inner.properties() + } + fn children(&self) -> Vec<&Arc> { + vec![&self.inner] + } + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion::common::Result> { + Ok(Arc::new(DbgExec::new( + self.label.clone(), + Arc::clone(&children[0]), + ))) + } + fn execute( + &self, + partition: usize, + context: Arc, + ) -> datafusion::common::Result { + eprintln!( + "[comet-debug] DbgExec[{}] execute(partition={})", + self.label, partition + ); + let stream = self.inner.execute(partition, context)?; + Ok(dbg_batch_stream(stream)) + } +} +``` + +```rust +use std::any::Any; +use std::fmt; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use arrow::array::RecordBatch; +use arrow::datatypes::{DataType, Schema}; +use datafusion::common::Result; +use datafusion::logical_expr::ColumnarValue; +use datafusion::physical_expr::PhysicalExpr; + +/// `PhysicalExpr` wrapper that prints every `evaluate()` call: input +/// `RecordBatch` and the resulting `ColumnarValue`. +#[derive(Debug)] +pub struct DbgExpr { + label: String, + inner: Arc, +} + +impl DbgExpr { + pub fn new(label: impl Into, inner: Arc) -> Self { + Self { label: label.into(), inner } + } +} + +impl fmt::Display for DbgExpr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "DbgExpr[{}]({})", self.label, self.inner) + } +} + +impl PartialEq for DbgExpr { + fn eq(&self, other: &Self) -> bool { + self.label == other.label && self.inner.eq(&other.inner) + } +} +impl Eq for DbgExpr {} +impl Hash for DbgExpr { + fn hash(&self, state: &mut H) { + self.label.hash(state); + self.inner.hash(state); + } +} + +impl PhysicalExpr for DbgExpr { + fn as_any(&self) -> &dyn Any { self } + fn data_type(&self, input_schema: &Schema) -> Result { + self.inner.data_type(input_schema) + } + fn nullable(&self, input_schema: &Schema) -> Result { + self.inner.nullable(input_schema) + } + fn evaluate(&self, batch: &RecordBatch) -> Result { + eprintln!( + "[comet-debug] DbgExpr[{}].evaluate(rows={}, cols={})", + self.label, + batch.num_rows(), + batch.num_columns() + ); + dbg!(batch, batch.schema()); + let out = self.inner.evaluate(batch)?; + match &out { + ColumnarValue::Array(arr) => { dbg!(arr.len(), arr.nulls(), arr); } + ColumnarValue::Scalar(s) => { dbg!(s); } + } + Ok(out) + } + fn children(&self) -> Vec<&Arc> { + vec![&self.inner] + } + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(DbgExpr::new( + self.label.clone(), + Arc::clone(&children[0]), + ))) + } + fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt_sql(f) + } +} +``` + `PhysicalPlanner::create_expr` returns `Arc`, so any expression produced during plan building can be one-line-wrapped. For example, to dump what a `CASE WHEN` predicate sees and produces: