From b141ff4dffcc7e1678d3be3139f78057f1b39e4d Mon Sep 17 00:00:00 2001 From: Izzet Yildirim Date: Thu, 5 Mar 2026 19:52:04 -0600 Subject: [PATCH 1/3] Optimize set_layer_metrics batching --- python/dftracer/analyzer/analyzer.py | 30 ++++++++--- tests/test_set_layer_metrics.py | 79 ++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 8 deletions(-) create mode 100644 tests/test_set_layer_metrics.py diff --git a/python/dftracer/analyzer/analyzer.py b/python/dftracer/analyzer/analyzer.py index 93b28eb..4d0e4b3 100644 --- a/python/dftracer/analyzer/analyzer.py +++ b/python/dftracer/analyzer/analyzer.py @@ -700,19 +700,33 @@ def set_layer_metrics( hlm = hlm.copy() hlm_columns = list(hlm.columns) size_derived_metric_set = set(size_derived_metrics or []) + is_size_col = {col: (col == "size" or "size_bin" in col) for col in hlm_columns} + is_string_col = {col: pd.api.types.is_string_dtype(hlm.dtypes[col]) for col in hlm_columns} + + # Precompute numeric representations once per source column. + numeric_cols = { + col: pd.to_numeric(hlm[col], errors="coerce") + for col in hlm_columns + if is_size_col[col] or not is_string_col[col] + } + + # Build derived columns in-memory and append once to avoid repeated fragmentation. + derived_cols: Dict[str, pd.Series] = {} for metric, condition in derived_metrics.items(): + metric_mask = hlm.eval(condition) is_size_metric = metric in size_derived_metric_set for col in hlm_columns: - is_size_col = col == "size" or "size_bin" in col - if not is_size_metric and is_size_col: + if not is_size_metric and is_size_col[col]: continue metric_col = f"{metric}_{col}" - hlm[metric_col] = pd.NA - if pd.api.types.is_string_dtype(hlm.dtypes[col]) and not is_size_col: - hlm[metric_col] = hlm[metric_col].map(lambda x: S()) - hlm[metric_col] = hlm[metric_col].mask(hlm.eval(condition), hlm[col]) - if not pd.api.types.is_string_dtype(hlm.dtypes[col]): - hlm[metric_col] = pd.to_numeric(hlm[metric_col], errors="coerce") + if is_string_col[col] and not is_size_col[col]: + # Use None for non-matching rows; unique_set_flatten skips None downstream. + derived_cols[metric_col] = hlm[col].where(metric_mask, None) + else: + derived_cols[metric_col] = numeric_cols[col].mask(~metric_mask, pd.NA) + + if derived_cols: + hlm = pd.concat([hlm, pd.DataFrame(derived_cols, index=hlm.index)], axis=1) return hlm @staticmethod diff --git a/tests/test_set_layer_metrics.py b/tests/test_set_layer_metrics.py new file mode 100644 index 0000000..c9ea9b1 --- /dev/null +++ b/tests/test_set_layer_metrics.py @@ -0,0 +1,79 @@ +import numpy as np +import pandas as pd + +from dftracer.analyzer.analyzer import Analyzer + + +DERIVED_METRICS = { + "read": "io_cat == 1", + "write": "io_cat == 2", + "metadata": "io_cat == 3", +} + +SIZE_DERIVED_METRICS = ["read", "write"] + + +def _build_hlm_df(n_rows: int = 30_000) -> pd.DataFrame: + io_cat = np.tile(np.array([1, 2, 3, 1, 2], dtype=np.int64), int(np.ceil(n_rows / 5)))[:n_rows] + idx = np.arange(n_rows, dtype=np.int64) + return pd.DataFrame( + { + "io_cat": io_cat, + "count": (idx % 17) + 1, + "time": ((idx % 23) + 1).astype(float), + "size": ((idx % 101) + 1) * 4096, + "size_bin_0_4kb": (idx % 2).astype(np.int64), + "func_name": np.where(io_cat == 1, "read", np.where(io_cat == 2, "write", "metadata")), + } + ) + + +def test_set_layer_metrics_correctness() -> None: + hlm = _build_hlm_df(n_rows=2_000) + out = Analyzer.set_layer_metrics( + hlm=hlm, + derived_metrics=DERIVED_METRICS, + size_derived_metrics=SIZE_DERIVED_METRICS, + ) + + # Size columns should only be created for metrics explicitly listed in size_derived_metrics. + assert "read_size" in out.columns + assert "write_size" in out.columns + assert "metadata_size" not in out.columns + + read_mask = hlm["io_cat"] == 1 + write_mask = hlm["io_cat"] == 2 + metadata_mask = hlm["io_cat"] == 3 + + assert np.allclose( + out.loc[read_mask, "read_count"].astype(float), + pd.to_numeric(hlm.loc[read_mask, "count"], errors="coerce").astype(float), + equal_nan=True, + ) + assert out.loc[~read_mask, "read_count"].isna().all() + + assert np.allclose( + out.loc[write_mask, "write_time"].astype(float), + pd.to_numeric(hlm.loc[write_mask, "time"], errors="coerce").astype(float), + equal_nan=True, + ) + assert out.loc[~write_mask, "write_time"].isna().all() + + # String-derived columns carry original values for matching rows and missing values otherwise. + # Downstream unique_set_flatten skips missing values. + assert (out.loc[read_mask, "read_func_name"] == hlm.loc[read_mask, "func_name"]).all() + assert out.loc[~read_mask, "read_func_name"].isna().all() + assert (out.loc[metadata_mask, "metadata_func_name"] == hlm.loc[metadata_mask, "func_name"]).all() + + +def test_set_layer_metrics_perf_smoke() -> None: + hlm = _build_hlm_df(n_rows=50_000) + out = None + for _ in range(8): + out = Analyzer.set_layer_metrics( + hlm=hlm, + derived_metrics=DERIVED_METRICS, + size_derived_metrics=SIZE_DERIVED_METRICS, + ) + assert out is not None + assert int(out["read_count"].notna().sum()) > 0 From 221f70eabb26207ae68adcde66eb8163c603a888 Mon Sep 17 00:00:00 2001 From: Izzet Yildirim Date: Fri, 6 Mar 2026 12:04:28 -0600 Subject: [PATCH 2/3] Handle set-like derived metrics safely --- python/dftracer/analyzer/analyzer.py | 34 ++++++++++++------- .../analyzer/utils/collection_utils.py | 8 +++++ python/dftracer/analyzer/utils/meson.build | 1 + .../dftracer/analyzer/utils/pandas_utils.py | 10 ++++++ tests/test_set_layer_metrics.py | 34 +++++++++++++++++++ tests/utils/test_collection_utils.py | 14 ++++++++ tests/utils/test_pandas_utils.py | 17 ++++++++++ 7 files changed, 106 insertions(+), 12 deletions(-) create mode 100644 python/dftracer/analyzer/utils/pandas_utils.py create mode 100644 tests/utils/test_collection_utils.py create mode 100644 tests/utils/test_pandas_utils.py diff --git a/python/dftracer/analyzer/analyzer.py b/python/dftracer/analyzer/analyzer.py index 4d0e4b3..f43bce9 100644 --- a/python/dftracer/analyzer/analyzer.py +++ b/python/dftracer/analyzer/analyzer.py @@ -7,7 +7,6 @@ import os import pandas as pd import structlog -from betterset import BetterSet as S from dask import compute, persist from dask.distributed import fire_and_forget, get_client, wait from omegaconf import OmegaConf @@ -45,12 +44,14 @@ ViewType, Views, ) +from .utils.collection_utils import is_set_like_series from .utils.dask_agg import quantile_stats, unique_set, unique_set_flatten from .utils.dask_utils import flatten_column_names from .utils.expr_utils import extract_numerator_and_denominators from .utils.file_utils import ensure_dir from .utils.json_encoders import NpEncoder from .utils.log_utils import console_block, log_block +from .utils.pandas_utils import to_nullable_numeric CHECKPOINT_FLAT_VIEW = "_flat_view" @@ -701,14 +702,23 @@ def set_layer_metrics( hlm_columns = list(hlm.columns) size_derived_metric_set = set(size_derived_metrics or []) is_size_col = {col: (col == "size" or "size_bin" in col) for col in hlm_columns} - is_string_col = {col: pd.api.types.is_string_dtype(hlm.dtypes[col]) for col in hlm_columns} - - # Precompute numeric representations once per source column. - numeric_cols = { - col: pd.to_numeric(hlm[col], errors="coerce") - for col in hlm_columns - if is_size_col[col] or not is_string_col[col] - } + col_kinds = {} + numeric_cols = {} + + for col in hlm_columns: + series = hlm[col] + if is_size_col[col] or pd.api.types.is_numeric_dtype(series.dtype): + col_kinds[col] = "numeric" + numeric_cols[col] = to_nullable_numeric(series) + elif pd.api.types.is_string_dtype(series.dtype): + col_kinds[col] = "string" + elif is_set_like_series(series): + col_kinds[col] = "set_like" + else: + raise TypeError( + f"Unsupported data type '{series.dtype}' for column '{col}'. " + "Developer must add explicit handling for this data type in set_layer_metrics." + ) # Build derived columns in-memory and append once to avoid repeated fragmentation. derived_cols: Dict[str, pd.Series] = {} @@ -719,11 +729,11 @@ def set_layer_metrics( if not is_size_metric and is_size_col[col]: continue metric_col = f"{metric}_{col}" - if is_string_col[col] and not is_size_col[col]: - # Use None for non-matching rows; unique_set_flatten skips None downstream. + if col_kinds[col] in {"string", "set_like"}: + # unique_set_flatten skips None for set-like columns downstream. derived_cols[metric_col] = hlm[col].where(metric_mask, None) else: - derived_cols[metric_col] = numeric_cols[col].mask(~metric_mask, pd.NA) + derived_cols[metric_col] = numeric_cols[col].where(metric_mask) if derived_cols: hlm = pd.concat([hlm, pd.DataFrame(derived_cols, index=hlm.index)], axis=1) diff --git a/python/dftracer/analyzer/utils/collection_utils.py b/python/dftracer/analyzer/utils/collection_utils.py index 64dd1c3..9085ded 100644 --- a/python/dftracer/analyzer/utils/collection_utils.py +++ b/python/dftracer/analyzer/utils/collection_utils.py @@ -1,5 +1,6 @@ import pandas as pd from typing import Iterable, List +from betterset import BetterSet as S def deepflatten(collection, ignore_types=(bytes, str)): @@ -42,3 +43,10 @@ def join_with_and(values: List[str]): return ' and '.join(values) else: return ', '.join(values[:-1]) + ', and ' + values[-1] + + +def is_set_like_series(series: pd.Series) -> bool: + for value in series.array: + if value is not None and value is not pd.NA: + return isinstance(value, S) + return False diff --git a/python/dftracer/analyzer/utils/meson.build b/python/dftracer/analyzer/utils/meson.build index 179be24..95e66b9 100644 --- a/python/dftracer/analyzer/utils/meson.build +++ b/python/dftracer/analyzer/utils/meson.build @@ -12,6 +12,7 @@ py.install_sources( 'json_encoders.py', 'log_utils.py', 'notebook_utils.py', + 'pandas_utils.py', 'warning_utils.py', ], subdir: 'dftracer/analyzer/utils', diff --git a/python/dftracer/analyzer/utils/pandas_utils.py b/python/dftracer/analyzer/utils/pandas_utils.py new file mode 100644 index 0000000..c30e944 --- /dev/null +++ b/python/dftracer/analyzer/utils/pandas_utils.py @@ -0,0 +1,10 @@ +import pandas as pd + + +def to_nullable_numeric(series: pd.Series) -> pd.Series: + numeric = pd.to_numeric(series, errors="coerce") + if pd.api.types.is_integer_dtype(numeric.dtype): + return numeric.astype("Int64") + if pd.api.types.is_float_dtype(numeric.dtype): + return numeric.astype("Float64") + return numeric diff --git a/tests/test_set_layer_metrics.py b/tests/test_set_layer_metrics.py index c9ea9b1..df95ab2 100644 --- a/tests/test_set_layer_metrics.py +++ b/tests/test_set_layer_metrics.py @@ -1,7 +1,9 @@ import numpy as np import pandas as pd +from betterset import BetterSet as S from dftracer.analyzer.analyzer import Analyzer +from dftracer.analyzer.utils.dask_agg import unique_set_flatten DERIVED_METRICS = { @@ -51,6 +53,7 @@ def test_set_layer_metrics_correctness() -> None: equal_nan=True, ) assert out.loc[~read_mask, "read_count"].isna().all() + assert str(out["read_count"].dtype) == "Int64" assert np.allclose( out.loc[write_mask, "write_time"].astype(float), @@ -58,6 +61,7 @@ def test_set_layer_metrics_correctness() -> None: equal_nan=True, ) assert out.loc[~write_mask, "write_time"].isna().all() + assert str(out["write_time"].dtype) == "Float64" # String-derived columns carry original values for matching rows and missing values otherwise. # Downstream unique_set_flatten skips missing values. @@ -66,6 +70,36 @@ def test_set_layer_metrics_correctness() -> None: assert (out.loc[metadata_mask, "metadata_func_name"] == hlm.loc[metadata_mask, "func_name"]).all() +def test_set_layer_metrics_preserves_betterset_columns() -> None: + hlm = pd.DataFrame( + { + "group": ["g0", "g0", "g1", "g1"], + "io_cat": pd.Series([1, 2, 1, 3], dtype="Int64"), + "count": pd.Series([1, 2, 3, 4], dtype="Int64"), + "file_name": pd.Series( + [S(["a"]), S(["b"]), S(["c"]), S(["d"])], + dtype="object", + ), + } + ) + out = Analyzer.set_layer_metrics( + hlm=hlm, + derived_metrics=DERIVED_METRICS, + size_derived_metrics=SIZE_DERIVED_METRICS, + ) + + read_mask = hlm["io_cat"] == 1 + for idx in hlm.index[read_mask]: + assert out.at[idx, "read_file_name"] == hlm.at[idx, "file_name"] + assert out.loc[~read_mask, "read_file_name"].isna().all() + + flatten_agg = unique_set_flatten() + chunked = flatten_agg.chunk(out.groupby("group")["read_file_name"]) + aggregated = flatten_agg.agg(chunked.groupby(level=0)) + assert set(aggregated.loc["g0"]) == {"a"} + assert set(aggregated.loc["g1"]) == {"c"} + + def test_set_layer_metrics_perf_smoke() -> None: hlm = _build_hlm_df(n_rows=50_000) out = None diff --git a/tests/utils/test_collection_utils.py b/tests/utils/test_collection_utils.py new file mode 100644 index 0000000..e7337d4 --- /dev/null +++ b/tests/utils/test_collection_utils.py @@ -0,0 +1,14 @@ +import pandas as pd +from betterset import BetterSet as S + +from dftracer.analyzer.utils.collection_utils import is_set_like_series + + +def test_is_set_like_series_detects_betterset_values() -> None: + series = pd.Series([None, S(["a"]), S(["b"])], dtype="object") + assert is_set_like_series(series) is True + + +def test_is_set_like_series_ignores_plain_strings() -> None: + series = pd.Series(["a", "b", None], dtype="object") + assert is_set_like_series(series) is False diff --git a/tests/utils/test_pandas_utils.py b/tests/utils/test_pandas_utils.py new file mode 100644 index 0000000..2a68c59 --- /dev/null +++ b/tests/utils/test_pandas_utils.py @@ -0,0 +1,17 @@ +import pandas as pd + +from dftracer.analyzer.utils.pandas_utils import to_nullable_numeric + + +def test_to_nullable_numeric_preserves_integer_nullability() -> None: + series = pd.Series([1, 2, 3], dtype="int64") + out = to_nullable_numeric(series).where(pd.Series([True, False, True])) + assert str(out.dtype) == "Int64" + assert out.tolist() == [1, pd.NA, 3] + + +def test_to_nullable_numeric_preserves_float_nullability() -> None: + series = pd.Series([1.5, 2.5, 3.5], dtype="float64") + out = to_nullable_numeric(series).where(pd.Series([True, False, True])) + assert str(out.dtype) == "Float64" + assert out.tolist() == [1.5, pd.NA, 3.5] From 8ccc85db250cce7acf35baccca35d01f864829be Mon Sep 17 00:00:00 2001 From: Izzet Yildirim Date: Fri, 6 Mar 2026 12:15:02 -0600 Subject: [PATCH 3/3] Mark derived metric tests for smoke and full --- tests/test_set_layer_metrics.py | 3 +++ tests/utils/test_collection_utils.py | 3 +++ tests/utils/test_pandas_utils.py | 3 +++ 3 files changed, 9 insertions(+) diff --git a/tests/test_set_layer_metrics.py b/tests/test_set_layer_metrics.py index df95ab2..f23c344 100644 --- a/tests/test_set_layer_metrics.py +++ b/tests/test_set_layer_metrics.py @@ -1,10 +1,13 @@ import numpy as np import pandas as pd +import pytest from betterset import BetterSet as S from dftracer.analyzer.analyzer import Analyzer from dftracer.analyzer.utils.dask_agg import unique_set_flatten +pytestmark = [pytest.mark.smoke, pytest.mark.full] + DERIVED_METRICS = { "read": "io_cat == 1", diff --git a/tests/utils/test_collection_utils.py b/tests/utils/test_collection_utils.py index e7337d4..d623344 100644 --- a/tests/utils/test_collection_utils.py +++ b/tests/utils/test_collection_utils.py @@ -1,8 +1,11 @@ import pandas as pd +import pytest from betterset import BetterSet as S from dftracer.analyzer.utils.collection_utils import is_set_like_series +pytestmark = [pytest.mark.smoke, pytest.mark.full] + def test_is_set_like_series_detects_betterset_values() -> None: series = pd.Series([None, S(["a"]), S(["b"])], dtype="object") diff --git a/tests/utils/test_pandas_utils.py b/tests/utils/test_pandas_utils.py index 2a68c59..d2072ee 100644 --- a/tests/utils/test_pandas_utils.py +++ b/tests/utils/test_pandas_utils.py @@ -1,7 +1,10 @@ import pandas as pd +import pytest from dftracer.analyzer.utils.pandas_utils import to_nullable_numeric +pytestmark = [pytest.mark.smoke, pytest.mark.full] + def test_to_nullable_numeric_preserves_integer_nullability() -> None: series = pd.Series([1, 2, 3], dtype="int64")