Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions python/dftracer/analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
import pandas as pd
import structlog
from betterset import BetterSet as S
from dask import compute, persist
from dask.distributed import fire_and_forget, get_client, wait
from omegaconf import OmegaConf
Expand Down Expand Up @@ -45,12 +44,14 @@
ViewType,
Views,
)
from .utils.collection_utils import is_set_like_series
from .utils.dask_agg import quantile_stats, unique_set, unique_set_flatten
from .utils.dask_utils import flatten_column_names
from .utils.expr_utils import extract_numerator_and_denominators
from .utils.file_utils import ensure_dir
from .utils.json_encoders import NpEncoder
from .utils.log_utils import console_block, log_block
from .utils.pandas_utils import to_nullable_numeric


CHECKPOINT_FLAT_VIEW = "_flat_view"
Expand Down Expand Up @@ -700,19 +701,42 @@ def set_layer_metrics(
hlm = hlm.copy()
hlm_columns = list(hlm.columns)
size_derived_metric_set = set(size_derived_metrics or [])
is_size_col = {col: (col == "size" or "size_bin" in col) for col in hlm_columns}
col_kinds = {}
numeric_cols = {}

for col in hlm_columns:
series = hlm[col]
if is_size_col[col] or pd.api.types.is_numeric_dtype(series.dtype):
col_kinds[col] = "numeric"
numeric_cols[col] = to_nullable_numeric(series)
elif pd.api.types.is_string_dtype(series.dtype):
col_kinds[col] = "string"
elif is_set_like_series(series):
col_kinds[col] = "set_like"
else:
raise TypeError(
f"Unsupported data type '{series.dtype}' for column '{col}'. "
"Developer must add explicit handling for this data type in set_layer_metrics."
)

# Build derived columns in-memory and append once to avoid repeated fragmentation.
derived_cols: Dict[str, pd.Series] = {}
for metric, condition in derived_metrics.items():
metric_mask = hlm.eval(condition)
is_size_metric = metric in size_derived_metric_set
for col in hlm_columns:
is_size_col = col == "size" or "size_bin" in col
if not is_size_metric and is_size_col:
if not is_size_metric and is_size_col[col]:
continue
metric_col = f"{metric}_{col}"
hlm[metric_col] = pd.NA
if pd.api.types.is_string_dtype(hlm.dtypes[col]) and not is_size_col:
hlm[metric_col] = hlm[metric_col].map(lambda x: S())
hlm[metric_col] = hlm[metric_col].mask(hlm.eval(condition), hlm[col])
if not pd.api.types.is_string_dtype(hlm.dtypes[col]):
hlm[metric_col] = pd.to_numeric(hlm[metric_col], errors="coerce")
if col_kinds[col] in {"string", "set_like"}:
# unique_set_flatten skips None for set-like columns downstream.
derived_cols[metric_col] = hlm[col].where(metric_mask, None)
else:
derived_cols[metric_col] = numeric_cols[col].where(metric_mask)

if derived_cols:
hlm = pd.concat([hlm, pd.DataFrame(derived_cols, index=hlm.index)], axis=1)
return hlm

@staticmethod
Expand Down
8 changes: 8 additions & 0 deletions python/dftracer/analyzer/utils/collection_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pandas as pd
from typing import Iterable, List
from betterset import BetterSet as S


def deepflatten(collection, ignore_types=(bytes, str)):
Expand Down Expand Up @@ -42,3 +43,10 @@ def join_with_and(values: List[str]):
return ' and '.join(values)
else:
return ', '.join(values[:-1]) + ', and ' + values[-1]


def is_set_like_series(series: pd.Series) -> bool:
    """Return True when *series* holds BetterSet values.

    Only the first non-missing entry is inspected: a column is assumed to be
    homogeneous, so one sample decides the kind. All missing-value flavors an
    object column can carry (None, pd.NA, and float NaN) are skipped; an empty
    or all-missing series is reported as not set-like.
    """
    for value in series.array:
        # Skip every missing-value marker, not just None/pd.NA: an object
        # column produced by a masked assignment may also contain float NaN.
        if value is None or value is pd.NA:
            continue
        if isinstance(value, float) and value != value:  # float NaN
            continue
        return isinstance(value, S)
    return False
1 change: 1 addition & 0 deletions python/dftracer/analyzer/utils/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ py.install_sources(
'json_encoders.py',
'log_utils.py',
'notebook_utils.py',
'pandas_utils.py',
'warning_utils.py',
],
subdir: 'dftracer/analyzer/utils',
Expand Down
10 changes: 10 additions & 0 deletions python/dftracer/analyzer/utils/pandas_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import pandas as pd


def to_nullable_numeric(series: pd.Series) -> pd.Series:
    """Coerce *series* to numeric, upgrading to pandas nullable dtypes.

    Non-numeric entries become missing (``errors="coerce"``). Integer results
    are returned as ``Int64`` and float results as ``Float64`` so later
    masking yields ``pd.NA`` instead of silently converting to float NaN; any
    other resulting dtype is returned unchanged.
    """
    coerced = pd.to_numeric(series, errors="coerce")
    dtype = coerced.dtype
    if pd.api.types.is_integer_dtype(dtype):
        target = "Int64"
    elif pd.api.types.is_float_dtype(dtype):
        target = "Float64"
    else:
        return coerced
    return coerced.astype(target)
116 changes: 116 additions & 0 deletions tests/test_set_layer_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import numpy as np
import pandas as pd
import pytest
from betterset import BetterSet as S

from dftracer.analyzer.analyzer import Analyzer
from dftracer.analyzer.utils.dask_agg import unique_set_flatten

# Collect this module under both the 'smoke' and 'full' pytest markers.
pytestmark = [pytest.mark.smoke, pytest.mark.full]


# Derived-metric name -> pandas `eval` condition selecting the matching rows.
# io_cat codes 1/2/3 map to read/write/metadata (see _build_hlm_df's
# func_name construction below).
DERIVED_METRICS = {
    "read": "io_cat == 1",
    "write": "io_cat == 2",
    "metadata": "io_cat == 3",
}

# Only these metrics get size-derived columns; "metadata" is deliberately
# excluded (the correctness test asserts "metadata_size" is absent).
SIZE_DERIVED_METRICS = ["read", "write"]


def _build_hlm_df(n_rows: int = 30_000) -> pd.DataFrame:
io_cat = np.tile(np.array([1, 2, 3, 1, 2], dtype=np.int64), int(np.ceil(n_rows / 5)))[:n_rows]
idx = np.arange(n_rows, dtype=np.int64)
return pd.DataFrame(
{
"io_cat": io_cat,
"count": (idx % 17) + 1,
"time": ((idx % 23) + 1).astype(float),
"size": ((idx % 101) + 1) * 4096,
"size_bin_0_4kb": (idx % 2).astype(np.int64),
"func_name": np.where(io_cat == 1, "read", np.where(io_cat == 2, "write", "metadata")),
}
)


def test_set_layer_metrics_correctness() -> None:
    """Derived columns carry source values exactly where the metric condition holds."""
    source = _build_hlm_df(n_rows=2_000)
    result = Analyzer.set_layer_metrics(
        hlm=source,
        derived_metrics=DERIVED_METRICS,
        size_derived_metrics=SIZE_DERIVED_METRICS,
    )

    # Size columns exist only for metrics listed in size_derived_metrics.
    assert "read_size" in result.columns
    assert "write_size" in result.columns
    assert "metadata_size" not in result.columns

    masks = {cat: source["io_cat"] == cat for cat in (1, 2, 3)}

    expected_read = pd.to_numeric(source.loc[masks[1], "count"], errors="coerce")
    assert np.allclose(
        result.loc[masks[1], "read_count"].astype(float),
        expected_read.astype(float),
        equal_nan=True,
    )
    assert result.loc[~masks[1], "read_count"].isna().all()
    assert str(result["read_count"].dtype) == "Int64"

    expected_write = pd.to_numeric(source.loc[masks[2], "time"], errors="coerce")
    assert np.allclose(
        result.loc[masks[2], "write_time"].astype(float),
        expected_write.astype(float),
        equal_nan=True,
    )
    assert result.loc[~masks[2], "write_time"].isna().all()
    assert str(result["write_time"].dtype) == "Float64"

    # String-derived columns mirror the source on matching rows and are
    # missing elsewhere; downstream unique_set_flatten skips missing values.
    assert (result.loc[masks[1], "read_func_name"] == source.loc[masks[1], "func_name"]).all()
    assert result.loc[~masks[1], "read_func_name"].isna().all()
    assert (result.loc[masks[3], "metadata_func_name"] == source.loc[masks[3], "func_name"]).all()


def test_set_layer_metrics_preserves_betterset_columns() -> None:
    """BetterSet-valued columns survive metric derivation and still aggregate."""
    frame = pd.DataFrame(
        {
            "group": ["g0", "g0", "g1", "g1"],
            "io_cat": pd.Series([1, 2, 1, 3], dtype="Int64"),
            "count": pd.Series([1, 2, 3, 4], dtype="Int64"),
            "file_name": pd.Series(
                [S(["a"]), S(["b"]), S(["c"]), S(["d"])],
                dtype="object",
            ),
        }
    )
    result = Analyzer.set_layer_metrics(
        hlm=frame,
        derived_metrics=DERIVED_METRICS,
        size_derived_metrics=SIZE_DERIVED_METRICS,
    )

    matched = frame["io_cat"] == 1
    # Matching rows keep their original BetterSet objects; the rest are missing.
    for row in frame.index[matched]:
        assert result.at[row, "read_file_name"] == frame.at[row, "file_name"]
    assert result.loc[~matched, "read_file_name"].isna().all()

    # The derived column must still feed the chunk/agg pipeline.
    flatten_agg = unique_set_flatten()
    partial = flatten_agg.chunk(result.groupby("group")["read_file_name"])
    final = flatten_agg.agg(partial.groupby(level=0))
    assert set(final.loc["g0"]) == {"a"}
    assert set(final.loc["g1"]) == {"c"}


def test_set_layer_metrics_perf_smoke() -> None:
    """Repeated invocations on a larger frame stay functional (smoke check)."""
    frame = _build_hlm_df(n_rows=50_000)
    result = None
    for _ in range(8):
        result = Analyzer.set_layer_metrics(
            hlm=frame,
            derived_metrics=DERIVED_METRICS,
            size_derived_metrics=SIZE_DERIVED_METRICS,
        )
    assert result is not None
    assert int(result["read_count"].notna().sum()) > 0
17 changes: 17 additions & 0 deletions tests/utils/test_collection_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import pandas as pd
import pytest
from betterset import BetterSet as S

from dftracer.analyzer.utils.collection_utils import is_set_like_series

# Collect this module under both the 'smoke' and 'full' pytest markers.
pytestmark = [pytest.mark.smoke, pytest.mark.full]


def test_is_set_like_series_detects_betterset_values() -> None:
    """A column whose first non-missing entry is a BetterSet is set-like."""
    sample = pd.Series([None, S(["a"]), S(["b"])], dtype="object")
    assert is_set_like_series(sample) is True


def test_is_set_like_series_ignores_plain_strings() -> None:
    """Plain string values do not make a column set-like."""
    sample = pd.Series(["a", "b", None], dtype="object")
    assert is_set_like_series(sample) is False
20 changes: 20 additions & 0 deletions tests/utils/test_pandas_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pandas as pd
import pytest

from dftracer.analyzer.utils.pandas_utils import to_nullable_numeric

# Collect this module under both the 'smoke' and 'full' pytest markers.
pytestmark = [pytest.mark.smoke, pytest.mark.full]


def test_to_nullable_numeric_preserves_integer_nullability() -> None:
    """Integer input upgrades to Int64 so masking yields pd.NA, not float NaN."""
    masked = to_nullable_numeric(pd.Series([1, 2, 3], dtype="int64")).where(
        pd.Series([True, False, True])
    )
    assert str(masked.dtype) == "Int64"
    assert masked.tolist() == [1, pd.NA, 3]


def test_to_nullable_numeric_preserves_float_nullability() -> None:
    """Float input upgrades to Float64 so masking yields pd.NA."""
    masked = to_nullable_numeric(pd.Series([1.5, 2.5, 3.5], dtype="float64")).where(
        pd.Series([True, False, True])
    )
    assert str(masked.dtype) == "Float64"
    assert masked.tolist() == [1.5, pd.NA, 3.5]