Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 93 additions & 26 deletions python/mozfun_local/glam.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from datetime import datetime
import os
from pathlib import Path

from google.cloud import bigquery
from mozfun_local.mozfun_local_rust import glam_style_histogram as _glam_style_histogram
import numpy as np

from google.cloud.bigquery_storage import BigQueryReadClient
from mozfun_local.mozfun_local_rust import glam_style_histogram as _glam_style_histogram
import polars as pl


Expand Down Expand Up @@ -36,11 +39,13 @@ def _read_metadata(self) -> dict:


def glam_style_histogram(
probe: str,
probes: list,
keyed: bool,
date: str,
limit: int = None,
sample_rate: float = None,
table: str = "mozdata.telemetry.main_1pct",
step: float = 0.2,
) -> list:
"""Calculate the GLAM style histogram transformation to a given histogram
metric. The result is a list of sorted key-value pairs of bucket and the
Expand All @@ -52,33 +57,94 @@ def glam_style_histogram(
keyed -- bool if the histogram is keyed
date -- string of date you wish to calculate the transformation for (date is a partition key)
limit -- int of the number of rows from the ping to take (default None/no limit)
sample_rate -- float what percent of samples to take per histogram, starting at zero, only divisible by 20, default None
table -- full path to the table you wish to take probes from (default mozdata.telemetry.main_1pct)
step -- float to increment sample by
"""
return [
_sql_runner(probe, keyed, date, limit, sample_rate, table, step)
for probe in probes
]


def _sql_runner(
probe: str,
keyed: bool,
date: str,
limit: int = None,
sample_rate: float = None,
table: str = "mozdata.telemetry.main_1pct",
step: float = 0.2,
) -> list:

scaled_sample_rate = sample_rate * 100 if sample_rate else 10
step = int(step * 100)
if sample_rate is not None:
assert (
sample_rate > 0.0 and sample_rate <= 1.0
), "sample rate must be between zero and one, step must be <= sample rate, and note that cleanly dividing choices will probably result in more predictable behavior, though you may ignore and allow numpy to do whatever it likes."

_limit = f"LIMIT {limit}" if limit else ""
probe_location = (

results = []
probe_string = (
f"payload.histograms.{probe}"
if not keyed
else f"payload.keyed_histograms.{probe}"
)
sql_query = f"""SELECT
client_id,
application.build_id,
{probe_location},
FROM {table}
WHERE date(submission_timestamp) = '{date}'
AND date(submission_timestamp) > date(2022, 12, 20)
AND {probe_location} IS NOT NULL
{_limit}"""

metadata = get_metadata(probe)
print(f"got metadata for probe {probe} at {datetime.now()}")

project = table.split(".")[0]
bq_client = bigquery.Client(project=project)

dataset = bq_client.query(sql_query).result()
df = pl.from_arrow(dataset.to_arrow())

results = _glam_style_histogram(df, metadata)
for f in np.arange(step, scaled_sample_rate + step, step):
if sample_rate:
sample_id_string = (
f"AND sample_id >= {int(f - step)} AND sample_id < {int(f)}"
)
else:
sample_id_string = ""
sql_query = f"""SELECT
client_id,
application.build_id,
{probe_string},
FROM {table} as t
INNER JOIN
build_ids b on t.application.build_id = b.build_id and b.channel = t.normalized_channel
WHERE date(submission_timestamp) = '{date}'
AND date(submission_timestamp) > date(2022, 12, 20)
AND {probe_string} is not null
{sample_id_string}
{_limit}"""

project = table.split(".")[0]
bq_client = bigquery.Client(project=project)

job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.BATCH)

storage_client = BigQueryReadClient()

print(f"starting query at {datetime.now()}")
dataset = bq_client.query(sql_query, job_config=job_config).result()
print(f"got data from bq at {datetime.now()}")
dataset = dataset.to_arrow(
progress_bar_type="tqdm", bqstorage_client=storage_client
).combine_chunks()
print(f"data moved to arrow at {datetime.now()}")
data = pl.from_arrow(
dataset,
rechunk=False,
) # type: pl.DataFrame
print(
f"data moved to arrow at {datetime.now()} and loaded in polars at {datetime.now()}"
)

df = data.select(["client_id", "build_id", probe])
print(f"columns selected at {datetime.now()}")

metadata = get_metadata(probe)
print(f"sending {probe} to Rust at {datetime.now()}")
result = _glam_style_histogram(df, metadata)
results.append((probe, f, f - 10, result))
print(f"finished through sample_id {f} at {datetime.now()}")

return results

Expand All @@ -98,14 +164,14 @@ def _lists_from_tuples(tuples):

def _find_cutoffs(buckets, cdf, percentiles):
assert len(percentiles) > 0, "Must provide at least one percentile to calculate"
percentiles = sorted(percentiles) # we only need to go through once
# if values are sorted
percentiles = sorted(percentiles) # we only need to go through once
# if values are sorted

results = {}
max_iter = len(cdf)
i = 0
for p in percentiles:
while i < max_iter and cdf[i] < p:
while i < max_iter and cdf[i] < p:
i += 1
if i < max_iter:
results[p] = buckets[i]
Expand All @@ -115,7 +181,7 @@ def _find_cutoffs(buckets, cdf, percentiles):
return results


def calculate_percentiles(distribution: list, percentiles: list) -> dict:
def calculate_percentiles(distribution, percentiles) -> dict:
"""Given a list of percentiles and a distribution, find the buckets that
represent each percentile. Internal functions are numba jitted python.

Expand All @@ -125,8 +191,9 @@ def calculate_percentiles(distribution: list, percentiles: list) -> dict:
percentiles -- list of floating point values [0.0, 1.0] of the percentiles
you wish to calculate
"""
# perf improvement as we iterate later and numpy has known types
if type(percentiles) != np.ndarray:
percentiles = np.array(percentiles)
percentiles = np.array(percentiles) # type: np.ndarray

k, v = _lists_from_tuples(distribution)

Expand Down
118 changes: 64 additions & 54 deletions src/glam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::hist::{parse_main_histograms, parse_metadata_json};
use polars::prelude::*;
use pyo3::prelude::*;
use pyo3_polars::PyDataFrame;
use rayon::prelude::*;
use std::hash::Hash;
use std::ops::AddAssign;
use std::{
Expand Down Expand Up @@ -44,11 +45,11 @@ fn normalize_histogram_glam(hist: HashMap<i64, i64>) -> HashMap<usize, f64> {
fn map_sum<T: Hash + Eq + Copy, U: AddAssign + Copy>(maps: Vec<HashMap<T, U>>) -> HashMap<T, U> {
let mut result_map = HashMap::new();

for m in maps {
maps.into_iter().for_each(|m| {
for (k, v) in m {
result_map.entry(k).and_modify(|y| *y += v).or_insert(v);
}
}
});

result_map
}
Expand Down Expand Up @@ -120,7 +121,6 @@ fn generate_linear_buckets(min: usize, max: usize, n_buckets: usize) -> Vec<usiz

result.push(linear_range);
}

result
}

Expand All @@ -129,10 +129,7 @@ fn generate_linear_buckets(min: usize, max: usize, n_buckets: usize) -> Vec<usiz
fn hist_to_normed_sorted(hist: &HashMap<usize, f64>) -> Vec<(usize, f64)> {
let total = hist.values().sum::<f64>().round();

let mut normalized: Vec<(usize, f64)> = hist
.iter()
.map(|(k, v)| (*k, *v / total))
.collect();
let mut normalized: Vec<(usize, f64)> = hist.iter().map(|(k, v)| (*k, *v / total)).collect();

normalized.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());

Expand Down Expand Up @@ -171,7 +168,7 @@ fn fill_buckets(hist: &mut HashMap<usize, f64>, buckets: &[usize]) {
/// These are distinct sets, and the Glean set are predefined based on the type
/// of histogram as defined in Glean
fn calculate_dirichlet_distribution(
histogram_vector: HashMap<usize, f64>,
histogram_vector: &HashMap<usize, f64>,
histogram_type: String,
n_reporting: f64,
positional_zero: usize,
Expand All @@ -197,8 +194,8 @@ fn calculate_dirichlet_distribution(
// 6.generate the array of all bucket values we need to fill in and
// add the dirichlet transfromed value (5) to the appropriate bucket
//
let mut hist = histogram_vector; // when we take the dictionary in from Python,
// we probably cannot take it as a reference
let mut hist = histogram_vector.to_owned(); // when we take the dictionary in from Python,
// we probably cannot take it as a reference

// assuming at this point I have aggregated, normalized histograms
let histogram_type = Distribution::from_str(histogram_type.as_str());
Expand Down Expand Up @@ -239,54 +236,67 @@ pub fn glam_style_histogram(
let probe = histogram_metadata.probe.as_str();
let data: DataFrame = pydf.into();

let partitioned_data = data.partition_by(["build_id"]).unwrap();
let partitioned_data = data.partition_by(["build_id"]).unwrap().to_owned();

let mut results = Vec::new();

for df in partitioned_data {
let build_id = df
.column("build_id")
.unwrap()
.str_value(0)
.unwrap()
.to_string();
let client_level_dfs = df.partition_by(["client_id"]).unwrap();
let mut client_levels = Vec::new();

for d in client_level_dfs {
let metric_column = d.select_series([probe]).unwrap();

let histograms_raw = metric_column[0]
.utf8()
let builds: Vec<(String, HashMap<usize, f64>)> = partitioned_data
.iter()
.map(|df| {
let build_id = df
.column("build_id")
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let histograms_parsed = parse_main_histograms(histograms_raw);

let client_aggregatted = map_sum(histograms_parsed);
let client_normed = normalize_histogram_glam(client_aggregatted);

client_levels.push(client_normed);
}

let build_histograms = map_sum(client_levels);
// this is necessary to stop weird floating point behavior
let n_reporting = build_histograms.clone().values().sum::<f64>().round();

let dirichlet_transformed_hists = calculate_dirichlet_distribution(
build_histograms,
histogram_metadata.histogram_type.clone(),
n_reporting,
histogram_metadata.buckets_for_probe[0],
histogram_metadata.buckets_for_probe[1],
histogram_metadata.buckets_for_probe[2],
)
.unwrap();
.str_value(0)
.unwrap()
.to_string();
let client_level_dfs = df.partition_by(["client_id"]).unwrap();

let mut client_levels = Vec::new();
client_level_dfs
.par_iter()
.map(|d| {
let metric_column = d.select_series([probe]).unwrap();

let histograms_raw = metric_column[0]
.utf8()
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let histograms_parsed = parse_main_histograms(histograms_raw);

let client_aggregated = map_sum(histograms_parsed);
let client_normed = normalize_histogram_glam(client_aggregated);

client_normed
})
.collect_into_vec(&mut client_levels);

let build_histograms = map_sum(client_levels);
(build_id, build_histograms.clone())
})
.collect();

let result = hist_to_normed_sorted(&dirichlet_transformed_hists);
let mut results = Vec::new();
builds
.par_iter()
.map(|(build_id, build_histograms)| {
// this is necessary to stop weird floating point behavior
let n_reporting = build_histograms.clone().values().sum::<f64>().round();

let dirichlet_transformed_hists = calculate_dirichlet_distribution(
build_histograms,
histogram_metadata.histogram_type.clone(),
n_reporting,
histogram_metadata.buckets_for_probe[0],
histogram_metadata.buckets_for_probe[1],
histogram_metadata.buckets_for_probe[2],
)
.unwrap();

let result = hist_to_normed_sorted(&dirichlet_transformed_hists);

(build_id.to_owned(), result)
})
.collect_into_vec(&mut results);

results.push((build_id, result));
}
Ok(results)
}

Expand Down