diff --git a/python/dftracer/analyzer/analyzer.py b/python/dftracer/analyzer/analyzer.py index 93b28eb..f43bce9 100644 --- a/python/dftracer/analyzer/analyzer.py +++ b/python/dftracer/analyzer/analyzer.py @@ -7,7 +7,6 @@ import os import pandas as pd import structlog -from betterset import BetterSet as S from dask import compute, persist from dask.distributed import fire_and_forget, get_client, wait from omegaconf import OmegaConf @@ -45,12 +44,14 @@ ViewType, Views, ) +from .utils.collection_utils import is_set_like_series from .utils.dask_agg import quantile_stats, unique_set, unique_set_flatten from .utils.dask_utils import flatten_column_names from .utils.expr_utils import extract_numerator_and_denominators from .utils.file_utils import ensure_dir from .utils.json_encoders import NpEncoder from .utils.log_utils import console_block, log_block +from .utils.pandas_utils import to_nullable_numeric CHECKPOINT_FLAT_VIEW = "_flat_view" @@ -700,19 +701,42 @@ def set_layer_metrics( hlm = hlm.copy() hlm_columns = list(hlm.columns) size_derived_metric_set = set(size_derived_metrics or []) + is_size_col = {col: (col == "size" or "size_bin" in col) for col in hlm_columns} + col_kinds = {} + numeric_cols = {} + + for col in hlm_columns: + series = hlm[col] + if is_size_col[col] or pd.api.types.is_numeric_dtype(series.dtype): + col_kinds[col] = "numeric" + numeric_cols[col] = to_nullable_numeric(series) + elif pd.api.types.is_string_dtype(series.dtype): + col_kinds[col] = "string" + elif is_set_like_series(series): + col_kinds[col] = "set_like" + else: + raise TypeError( + f"Unsupported data type '{series.dtype}' for column '{col}'. " + "Developer must add explicit handling for this data type in set_layer_metrics." + ) + + # Build derived columns in-memory and append once to avoid repeated fragmentation. + derived_cols: Dict[str, pd.Series] = {} for metric, condition in derived_metrics.items(): + metric_mask = hlm.eval(condition) is_size_metric = metric in size_derived_metric_set for col in hlm_columns: - is_size_col = col == "size" or "size_bin" in col - if not is_size_metric and is_size_col: + if not is_size_metric and is_size_col[col]: continue metric_col = f"{metric}_{col}" - hlm[metric_col] = pd.NA - if pd.api.types.is_string_dtype(hlm.dtypes[col]) and not is_size_col: - hlm[metric_col] = hlm[metric_col].map(lambda x: S()) - hlm[metric_col] = hlm[metric_col].mask(hlm.eval(condition), hlm[col]) - if not pd.api.types.is_string_dtype(hlm.dtypes[col]): - hlm[metric_col] = pd.to_numeric(hlm[metric_col], errors="coerce") + if col_kinds[col] in {"string", "set_like"}: + # unique_set_flatten skips None for set-like columns downstream. + derived_cols[metric_col] = hlm[col].where(metric_mask, None) + else: + derived_cols[metric_col] = numeric_cols[col].where(metric_mask) + + if derived_cols: + hlm = pd.concat([hlm, pd.DataFrame(derived_cols, index=hlm.index)], axis=1) return hlm @staticmethod diff --git a/python/dftracer/analyzer/utils/collection_utils.py b/python/dftracer/analyzer/utils/collection_utils.py index 64dd1c3..9085ded 100644 --- a/python/dftracer/analyzer/utils/collection_utils.py +++ b/python/dftracer/analyzer/utils/collection_utils.py @@ -1,5 +1,6 @@ import pandas as pd from typing import Iterable, List +from betterset import BetterSet as S def deepflatten(collection, ignore_types=(bytes, str)): @@ -42,3 +43,10 @@ def join_with_and(values: List[str]): return ' and '.join(values) else: return ', '.join(values[:-1]) + ', and ' + values[-1] + + +def is_set_like_series(series: pd.Series) -> bool: + for value in series.array: + if value is not None and value is not pd.NA: + return isinstance(value, S) + return False diff --git a/python/dftracer/analyzer/utils/meson.build b/python/dftracer/analyzer/utils/meson.build index 179be24..95e66b9 100644 --- a/python/dftracer/analyzer/utils/meson.build +++ b/python/dftracer/analyzer/utils/meson.build @@ -12,6 +12,7 @@ py.install_sources( 'json_encoders.py', 'log_utils.py', 'notebook_utils.py', + 'pandas_utils.py', 'warning_utils.py', ], subdir: 'dftracer/analyzer/utils', diff --git a/python/dftracer/analyzer/utils/pandas_utils.py b/python/dftracer/analyzer/utils/pandas_utils.py new file mode 100644 index 0000000..c30e944 --- /dev/null +++ b/python/dftracer/analyzer/utils/pandas_utils.py @@ -0,0 +1,10 @@ +import pandas as pd + + +def to_nullable_numeric(series: pd.Series) -> pd.Series: + numeric = pd.to_numeric(series, errors="coerce") + if pd.api.types.is_integer_dtype(numeric.dtype): + return numeric.astype("Int64") + if pd.api.types.is_float_dtype(numeric.dtype): + return numeric.astype("Float64") + return numeric diff --git a/tests/test_set_layer_metrics.py b/tests/test_set_layer_metrics.py new file mode 100644 index 0000000..f23c344 --- /dev/null +++ b/tests/test_set_layer_metrics.py @@ -0,0 +1,116 @@ +import numpy as np +import pandas as pd +import pytest +from betterset import BetterSet as S + +from dftracer.analyzer.analyzer import Analyzer +from dftracer.analyzer.utils.dask_agg import unique_set_flatten + +pytestmark = [pytest.mark.smoke, pytest.mark.full] + + +DERIVED_METRICS = { + "read": "io_cat == 1", + "write": "io_cat == 2", + "metadata": "io_cat == 3", +} + +SIZE_DERIVED_METRICS = ["read", "write"] + + +def _build_hlm_df(n_rows: int = 30_000) -> pd.DataFrame: + io_cat = np.tile(np.array([1, 2, 3, 1, 2], dtype=np.int64), int(np.ceil(n_rows / 5)))[:n_rows] + idx = np.arange(n_rows, dtype=np.int64) + return pd.DataFrame( + { + "io_cat": io_cat, + "count": (idx % 17) + 1, + "time": ((idx % 23) + 1).astype(float), + "size": ((idx % 101) + 1) * 4096, + "size_bin_0_4kb": (idx % 2).astype(np.int64), + "func_name": np.where(io_cat == 1, "read", np.where(io_cat == 2, "write", "metadata")), + } + ) + + +def test_set_layer_metrics_correctness() -> None: + hlm = _build_hlm_df(n_rows=2_000) + out = Analyzer.set_layer_metrics( + hlm=hlm, + derived_metrics=DERIVED_METRICS, + size_derived_metrics=SIZE_DERIVED_METRICS, + ) + + # Size columns should only be created for metrics explicitly listed in size_derived_metrics. + assert "read_size" in out.columns + assert "write_size" in out.columns + assert "metadata_size" not in out.columns + + read_mask = hlm["io_cat"] == 1 + write_mask = hlm["io_cat"] == 2 + metadata_mask = hlm["io_cat"] == 3 + + assert np.allclose( + out.loc[read_mask, "read_count"].astype(float), + pd.to_numeric(hlm.loc[read_mask, "count"], errors="coerce").astype(float), + equal_nan=True, + ) + assert out.loc[~read_mask, "read_count"].isna().all() + assert str(out["read_count"].dtype) == "Int64" + + assert np.allclose( + out.loc[write_mask, "write_time"].astype(float), + pd.to_numeric(hlm.loc[write_mask, "time"], errors="coerce").astype(float), + equal_nan=True, + ) + assert out.loc[~write_mask, "write_time"].isna().all() + assert str(out["write_time"].dtype) == "Float64" + + # String-derived columns carry original values for matching rows and missing values otherwise. + # Downstream unique_set_flatten skips missing values. + assert (out.loc[read_mask, "read_func_name"] == hlm.loc[read_mask, "func_name"]).all() + assert out.loc[~read_mask, "read_func_name"].isna().all() + assert (out.loc[metadata_mask, "metadata_func_name"] == hlm.loc[metadata_mask, "func_name"]).all() + + +def test_set_layer_metrics_preserves_betterset_columns() -> None: + hlm = pd.DataFrame( + { + "group": ["g0", "g0", "g1", "g1"], + "io_cat": pd.Series([1, 2, 1, 3], dtype="Int64"), + "count": pd.Series([1, 2, 3, 4], dtype="Int64"), + "file_name": pd.Series( + [S(["a"]), S(["b"]), S(["c"]), S(["d"])], + dtype="object", + ), + } + ) + out = Analyzer.set_layer_metrics( + hlm=hlm, + derived_metrics=DERIVED_METRICS, + size_derived_metrics=SIZE_DERIVED_METRICS, + ) + + read_mask = hlm["io_cat"] == 1 + for idx in hlm.index[read_mask]: + assert out.at[idx, "read_file_name"] == hlm.at[idx, "file_name"] + assert out.loc[~read_mask, "read_file_name"].isna().all() + + flatten_agg = unique_set_flatten() + chunked = flatten_agg.chunk(out.groupby("group")["read_file_name"]) + aggregated = flatten_agg.agg(chunked.groupby(level=0)) + assert set(aggregated.loc["g0"]) == {"a"} + assert set(aggregated.loc["g1"]) == {"c"} + + +def test_set_layer_metrics_perf_smoke() -> None: + hlm = _build_hlm_df(n_rows=50_000) + out = None + for _ in range(8): + out = Analyzer.set_layer_metrics( + hlm=hlm, + derived_metrics=DERIVED_METRICS, + size_derived_metrics=SIZE_DERIVED_METRICS, + ) + assert out is not None + assert int(out["read_count"].notna().sum()) > 0 diff --git a/tests/utils/test_collection_utils.py b/tests/utils/test_collection_utils.py new file mode 100644 index 0000000..d623344 --- /dev/null +++ b/tests/utils/test_collection_utils.py @@ -0,0 +1,17 @@ +import pandas as pd +import pytest +from betterset import BetterSet as S + +from dftracer.analyzer.utils.collection_utils import is_set_like_series + +pytestmark = [pytest.mark.smoke, pytest.mark.full] + + +def test_is_set_like_series_detects_betterset_values() -> None: + series = pd.Series([None, S(["a"]), S(["b"])], dtype="object") + assert is_set_like_series(series) is True + + +def test_is_set_like_series_ignores_plain_strings() -> None: + series = pd.Series(["a", "b", None], dtype="object") + assert is_set_like_series(series) is False diff --git a/tests/utils/test_pandas_utils.py b/tests/utils/test_pandas_utils.py new file mode 100644 index 0000000..d2072ee --- /dev/null +++ b/tests/utils/test_pandas_utils.py @@ -0,0 +1,20 @@ +import pandas as pd +import pytest + +from dftracer.analyzer.utils.pandas_utils import to_nullable_numeric + +pytestmark = [pytest.mark.smoke, pytest.mark.full] + + +def test_to_nullable_numeric_preserves_integer_nullability() -> None: + series = pd.Series([1, 2, 3], dtype="int64") + out = to_nullable_numeric(series).where(pd.Series([True, False, True])) + assert str(out.dtype) == "Int64" + assert out.tolist() == [1, pd.NA, 3] + + +def test_to_nullable_numeric_preserves_float_nullability() -> None: + series = pd.Series([1.5, 2.5, 3.5], dtype="float64") + out = to_nullable_numeric(series).where(pd.Series([True, False, True])) + assert str(out.dtype) == "Float64" + assert out.tolist() == [1.5, pd.NA, 3.5]