When backtraces are enabled (see the earlier section), backtraces are included for failed allocations.

### Dumping native stream output with a `DbgExec` wrapper

When a native operator is suspected of producing wrong data (wrong values, wrong
nullability, wrong row count) but the only symptom visible on the JVM side is a
mismatched DataFrame, it is useful to inspect the `RecordBatch`es that the operator actually
emits. A small `ExecutionPlan` wrapper that prints every batch as it flows through
makes this easy. Comet does not ship this wrapper in the source tree — paste it
into a convenient module (for example `native/core/src/parquet/parquet_exec.rs`)
for the duration of your debugging session and remove it before committing.

`DbgExec` forwards `schema`, `properties`, `children`, and `with_new_children` to the
inner plan, so slotting it in does not change operator semantics — it only adds
printing. Because it is itself an `ExecutionPlan` it can be inserted anywhere in
the physical plan tree built by `PhysicalPlanner::create_plan` in
`native/core/src/execution/planner.rs`.
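
Once the wrapper code (shown in the next section) is pasted in, slotting it in is a
one-line change. A minimal sketch: `child_plan` and the label are placeholders for
whatever node you want to trace, and the import path assumes you pasted the wrapper
into `parquet_exec.rs` as suggested above.

```rust
use crate::parquet::parquet_exec::DbgExec; // or wherever you pasted it

// TEMPORARY, remove before committing. `child_plan` is a placeholder for
// whatever Arc<dyn ExecutionPlan> node you want to trace.
let child_plan: Arc<dyn datafusion::physical_plan::ExecutionPlan> =
    Arc::new(DbgExec::new("suspect-operator", child_plan));
```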

### Dumping expression inputs and outputs with a `DbgExpr` wrapper

`DbgExec` works at the operator level. When the suspect is a single
**expression** — a cast, a binary op, a `CASE WHEN` predicate, a UDF — an
`ExecutionPlan` wrapper is too coarse. The equivalent trick at the
`PhysicalExpr` level is a small wrapper that forwards `evaluate()` to an inner
expression and prints both the input `RecordBatch` and the resulting
`ColumnarValue`. Like `DbgExec`, this wrapper is not shipped in the source tree
— paste it in for a debugging session and remove it before committing.

#### Using `DbgExpr` in `planner.rs`

Paste the wrappers below into a convenient module (for example
`native/core/src/parquet/parquet_exec.rs`). The first block is `DbgExec` (wraps
an `ExecutionPlan`); the second is `DbgExpr` (wraps a `PhysicalExpr`). You only
need whichever one matches the granularity you want to trace — they are
independent.

```rust
use std::sync::Arc;

use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;

/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through.
pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
use futures::StreamExt;
let schema = stream.schema();
let printing_stream = stream.map(|batch_result| {
match &batch_result {
Ok(batch) => {
dbg!(batch, batch.schema());
for (col_idx, column) in batch.columns().iter().enumerate() {
dbg!(col_idx, column, column.nulls());
}
}
Err(e) => {
println!("batch error: {:?}", e);
}
}
batch_result
});
Box::pin(RecordBatchStreamAdapter::new(schema, printing_stream))
}

/// `ExecutionPlan` wrapper that prints every batch produced by `inner`.
#[derive(Debug)]
pub struct DbgExec {
label: String,
inner: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
}

impl DbgExec {
pub fn new(
label: impl Into<String>,
inner: Arc<dyn datafusion::physical_plan::ExecutionPlan>,
) -> Self {
Self {
label: label.into(),
inner,
}
}
}

impl datafusion::physical_plan::DisplayAs for DbgExec {
fn fmt_as(
&self,
_t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "DbgExec[{}]", self.label)
}
}

impl datafusion::physical_plan::ExecutionPlan for DbgExec {
fn name(&self) -> &str { "DbgExec" }
fn as_any(&self) -> &dyn std::any::Any { self }
    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        self.inner.properties()
    }
fn children(&self) -> Vec<&Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
vec![&self.inner]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn datafusion::physical_plan::ExecutionPlan>>,
) -> datafusion::common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
Ok(Arc::new(DbgExec::new(
self.label.clone(),
Arc::clone(&children[0]),
)))
}
fn execute(
&self,
partition: usize,
context: Arc<datafusion::execution::TaskContext>,
) -> datafusion::common::Result<SendableRecordBatchStream> {
eprintln!(
"[comet-debug] DbgExec[{}] execute(partition={})",
self.label, partition
);
let stream = self.inner.execute(partition, context)?;
Ok(dbg_batch_stream(stream))
}
}
```
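
`dbg_batch_stream` is also useful on its own when you already have a stream in hand
and do not want to change the plan shape. A minimal sketch, assuming a hypothetical
operator with an `inner` plan whose `execute` you are editing:

```rust
// Inside a hypothetical execute(); TEMPORARY, remove before committing:
let stream = self.inner.execute(partition, context)?;
Ok(dbg_batch_stream(stream))
```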

```rust
use std::any::Any;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Schema};
use datafusion::common::Result;
use datafusion::logical_expr::ColumnarValue;
use datafusion::physical_expr::PhysicalExpr;

/// `PhysicalExpr` wrapper that prints every `evaluate()` call: input
/// `RecordBatch` and the resulting `ColumnarValue`.
#[derive(Debug)]
pub struct DbgExpr {
label: String,
inner: Arc<dyn PhysicalExpr>,
}

impl DbgExpr {
pub fn new(label: impl Into<String>, inner: Arc<dyn PhysicalExpr>) -> Self {
Self { label: label.into(), inner }
}
}

impl fmt::Display for DbgExpr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DbgExpr[{}]({})", self.label, self.inner)
}
}

impl PartialEq for DbgExpr {
fn eq(&self, other: &Self) -> bool {
self.label == other.label && self.inner.eq(&other.inner)
}
}
impl Eq for DbgExpr {}
impl Hash for DbgExpr {
fn hash<H: Hasher>(&self, state: &mut H) {
self.label.hash(state);
self.inner.hash(state);
}
}

impl PhysicalExpr for DbgExpr {
fn as_any(&self) -> &dyn Any { self }
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
self.inner.data_type(input_schema)
}
fn nullable(&self, input_schema: &Schema) -> Result<bool> {
self.inner.nullable(input_schema)
}
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
eprintln!(
"[comet-debug] DbgExpr[{}].evaluate(rows={}, cols={})",
self.label,
batch.num_rows(),
batch.num_columns()
);
dbg!(batch, batch.schema());
let out = self.inner.evaluate(batch)?;
match &out {
ColumnarValue::Array(arr) => { dbg!(arr.len(), arr.nulls(), arr); }
ColumnarValue::Scalar(s) => { dbg!(s); }
}
Ok(out)
}
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.inner]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(DbgExpr::new(
self.label.clone(),
Arc::clone(&children[0]),
)))
}
fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt_sql(f)
}
}
```

`PhysicalPlanner::create_expr` returns `Arc<dyn PhysicalExpr>`, so any expression
produced during plan building can be one-line-wrapped. For example, to dump what
a `CASE WHEN` predicate sees and produces:

```rust
use crate::parquet::parquet_exec::DbgExpr; // or wherever you pasted it

// Before:
let predicate = self.create_expr(when_expr, Arc::clone(&input_schema))?;

// After — TEMPORARY, remove before committing:
let predicate: Arc<dyn PhysicalExpr> =
Arc::new(DbgExpr::new("case-when-predicate", predicate));
```

To trace every argument of a suspicious scalar function, wrap each child as it
is built:

```rust
let args: Vec<Arc<dyn PhysicalExpr>> = expr
.children
.iter()
.enumerate()
.map(|(i, c)| {
let child = self.create_expr(c, Arc::clone(&input_schema))?;
Ok::<_, ExecutionError>(Arc::new(DbgExpr::new(
format!("arg{i}"),
child,
)) as Arc<dyn PhysicalExpr>)
})
.collect::<Result<_, _>>()?;
```

Rebuild with `make core` and run the JVM test. Sample output for a `BinaryExpr`
computing `a + b` over a three-row batch:

```text
[comet-debug] DbgExpr[add].evaluate(rows=3, cols=2)
[core/src/execution/planner.rs:…] batch = RecordBatch { columns: [Int32[1,2,3], Int32[10,20,30]], row_count: 3 }
[core/src/execution/planner.rs:…] arr.len() = 3
[core/src/execution/planner.rs:…] arr.nulls() = None
[core/src/execution/planner.rs:…] arr = PrimitiveArray<Int32> [11, 22, 33]
```