diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md
index 3356a83893..0907e2f5c3 100644
--- a/docs/source/contributor-guide/debugging.md
+++ b/docs/source/contributor-guide/debugging.md
@@ -220,3 +220,256 @@ Example log output:
 ```
 When backtraces are enabled (see earlier section) then backtraces will be included for failed allocations.
+
+### Dumping native stream output with a `DbgExec` wrapper
+
+When a native operator is suspected of producing wrong data (wrong values, wrong
+nullability, wrong row count) but the JVM-side observable output is just a DataFrame
+mismatch, it is useful to inspect the `RecordBatch`es that the operator actually
+emits. A small `ExecutionPlan` wrapper that prints every batch as it flows through
+makes this easy. Comet does not ship this wrapper in the source tree — paste it
+into a convenient module (for example `native/core/src/parquet/parquet_exec.rs`)
+for the duration of your debugging session and remove it before committing.
+
+`DbgExec` forwards `schema`, `properties`, `children`, and `with_new_children` to the
+inner plan, so slotting it in does not change operator semantics — it only adds
+printing. Because it is itself an `ExecutionPlan` it can be inserted anywhere in
+the physical plan tree built by `PhysicalPlanner::create_plan` in
+`native/core/src/execution/planner.rs`.
+
+### Dumping expression inputs and outputs with a `DbgExpr` wrapper
+
+`DbgExec` works at the operator level. When the suspect is a single
+**expression** — a cast, a binary op, a `CASE WHEN` predicate, a UDF — an
+`ExecutionPlan` wrapper is too coarse. The equivalent trick at the
+`PhysicalExpr` level is a small wrapper that forwards `evaluate()` to an inner
+expression and prints both the input `RecordBatch` and the resulting
+`ColumnarValue`. Like `DbgExec`, this wrapper is not shipped in the source tree
+— paste it in for a debugging session and remove it before committing.
+
+#### Using `DbgExpr` in `planner.rs`
+
+Paste the wrappers below into a convenient module (for example
+`native/core/src/parquet/parquet_exec.rs`). The first block is `DbgExec` (wraps
+an `ExecutionPlan`); the second is `DbgExpr` (wraps a `PhysicalExpr`). You only
+need whichever one matches the granularity you want to trace — they are
+independent.
+
+```rust
+use std::sync::Arc;
+
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+
+/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through.
+pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
+    use futures::StreamExt;
+    let schema = stream.schema();
+    let printing_stream = stream.map(|batch_result| {
+        match &batch_result {
+            Ok(batch) => {
+                dbg!(batch, batch.schema());
+                for (col_idx, column) in batch.columns().iter().enumerate() {
+                    dbg!(col_idx, column, column.nulls());
+                }
+            }
+            Err(e) => {
+                println!("batch error: {:?}", e);
+            }
+        }
+        batch_result
+    });
+    Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream))
+}
+
+/// `ExecutionPlan` wrapper that prints every batch produced by `inner`.
+
+#[derive(Debug)]
+pub struct DbgExec {
+    label: String,
+    inner: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
+}
+
+impl DbgExec {
+    pub fn new(
+        label: impl Into<String>,
+        inner: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
+    ) -> Self {
+        Self {
+            label: label.into(),
+            inner,
+        }
+    }
+}
+
+impl datafusion::physical_plan::DisplayAs for DbgExec {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "DbgExec[{}]", self.label)
+    }
+}
+
+impl datafusion::physical_plan::ExecutionPlan for DbgExec {
+    fn name(&self) -> &str { "DbgExec" }
+    fn as_any(&self) -> &dyn std::any::Any { self }
+    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+        self.inner.properties()
+    }
+    fn children(&self) -> Vec<&Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
+        vec![&self.inner]
+    }
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn datafusion::physical_plan::ExecutionPlan>>,
+    ) -> datafusion::common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
+        Ok(Arc::new(DbgExec::new(
+            self.label.clone(),
+            Arc::clone(&children[0]),
+        )))
+    }
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<datafusion::execution::TaskContext>,
+    ) -> datafusion::common::Result<SendableRecordBatchStream> {
+        eprintln!(
+            "[comet-debug] DbgExec[{}] execute(partition={})",
+            self.label, partition
+        );
+        let stream = self.inner.execute(partition, context)?;
+        Ok(dbg_batch_stream(stream))
+    }
+}
+```
+
+```rust
+use std::any::Any;
+use std::fmt;
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::{DataType, Schema};
+use datafusion::common::Result;
+use datafusion::logical_expr::ColumnarValue;
+use datafusion::physical_expr::PhysicalExpr;
+
+/// `PhysicalExpr` wrapper that prints every `evaluate()` call: input
+/// `RecordBatch` and the resulting `ColumnarValue`.
+
+#[derive(Debug)]
+pub struct DbgExpr {
+    label: String,
+    inner: Arc<dyn PhysicalExpr>,
+}
+
+impl DbgExpr {
+    pub fn new(label: impl Into<String>, inner: Arc<dyn PhysicalExpr>) -> Self {
+        Self { label: label.into(), inner }
+    }
+}
+
+impl fmt::Display for DbgExpr {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "DbgExpr[{}]({})", self.label, self.inner)
+    }
+}
+
+impl PartialEq for DbgExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.label == other.label && self.inner.eq(&other.inner)
+    }
+}
+impl Eq for DbgExpr {}
+impl Hash for DbgExpr {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.label.hash(state);
+        self.inner.hash(state);
+    }
+}
+
+impl PhysicalExpr for DbgExpr {
+    fn as_any(&self) -> &dyn Any { self }
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
+        self.inner.data_type(input_schema)
+    }
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.inner.nullable(input_schema)
+    }
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        eprintln!(
+            "[comet-debug] DbgExpr[{}].evaluate(rows={}, cols={})",
+            self.label,
+            batch.num_rows(),
+            batch.num_columns()
+        );
+        dbg!(batch, batch.schema());
+        let out = self.inner.evaluate(batch)?;
+        match &out {
+            ColumnarValue::Array(arr) => { dbg!(arr.len(), arr.nulls(), arr); }
+            ColumnarValue::Scalar(s) => { dbg!(s); }
+        }
+        Ok(out)
+    }
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        vec![&self.inner]
+    }
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        Ok(Arc::new(DbgExpr::new(
+            self.label.clone(),
+            Arc::clone(&children[0]),
+        )))
+    }
+    fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.inner.fmt_sql(f)
+    }
+}
+```
+
+`PhysicalPlanner::create_expr` returns `Arc<dyn PhysicalExpr>`, so any expression
+produced during plan building can be one-line-wrapped. For example, to dump what
+a `CASE WHEN` predicate sees and produces:
+
+```rust
+use crate::parquet::parquet_exec::DbgExpr; // or wherever you pasted it
+
+// Before:
+let predicate = self.create_expr(when_expr, Arc::clone(&input_schema))?;
+
+// After — TEMPORARY, remove before committing:
+let predicate: Arc<dyn PhysicalExpr> =
+    Arc::new(DbgExpr::new("case-when-predicate", predicate));
+```
+
+To trace every argument of a suspicious scalar function, wrap each child as it
+is built:
+
+```rust
+let args: Vec<Arc<dyn PhysicalExpr>> = expr
+    .children
+    .iter()
+    .enumerate()
+    .map(|(i, c)| {
+        let child = self.create_expr(c, Arc::clone(&input_schema))?;
+        Ok::<_, ExecutionError>(Arc::new(DbgExpr::new(
+            format!("arg{i}"),
+            child,
+        )) as Arc<dyn PhysicalExpr>)
+    })
+    .collect::<Result<Vec<_>, _>>()?;
+```
+
+Rebuild with `make core` and run the JVM test. Sample output for a `BinaryExpr`
+computing `a + b` over a three-row batch:
+
+```text
+[comet-debug] DbgExpr[add].evaluate(rows=3, cols=2)
+[core/src/execution/planner.rs:…] batch = RecordBatch { columns: [Int32[1,2,3], Int32[10,20,30]], row_count: 3 }
+[core/src/execution/planner.rs:…] arr.len() = 3
+[core/src/execution/planner.rs:…] arr.nulls() = None
+[core/src/execution/planner.rs:…] arr = PrimitiveArray [11, 22, 33]
+```
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index ef4c878b9a..dae11a9a41 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -28,11 +28,9 @@ use datafusion::datasource::physical_plan::{
 };
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::execution::object_store::ObjectStoreUrl;
-use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_expr::expressions::{BinaryExpr, Column};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_expr_adapter::PhysicalExprAdapterFactory;
-use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::prelude::SessionContext;
 use datafusion::scalar::ScalarValue;
 use datafusion_comet_spark_expr::EvalMode;
@@ -208,25 +206,3 @@ fn get_options(
     (table_parquet_options, spark_parquet_options)
 }
-
-/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through.
-/// Returns a new `SendableRecordBatchStream` that yields the same batches.
-pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
-    use futures::StreamExt;
-    let schema = stream.schema();
-    let printing_stream = stream.map(|batch_result| {
-        match &batch_result {
-            Ok(batch) => {
-                dbg!(batch, batch.schema());
-                for (col_idx, column) in batch.columns().iter().enumerate() {
-                    dbg!(col_idx, column, column.nulls());
-                }
-            }
-            Err(e) => {
-                println!("batch error: {:?}", e);
-            }
-        }
-        batch_result
-    });
-    Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream))
-}