diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index 8ad2775..4eb1b3f 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -1,9 +1,12 @@ +from datetime import datetime import os from pathlib import Path from google.cloud import bigquery -from mozfun_local.mozfun_local_rust import glam_style_histogram as _glam_style_histogram import numpy as np + +from google.cloud.bigquery_storage import BigQueryReadClient +from mozfun_local.mozfun_local_rust import glam_style_histogram as _glam_style_histogram import polars as pl @@ -36,11 +39,13 @@ def _read_metadata(self) -> dict: def glam_style_histogram( - probe: str, + probes: list, keyed: bool, date: str, limit: int = None, + sample_rate: float = None, table: str = "mozdata.telemetry.main_1pct", + step: float = 0.2, ) -> list: """Calculate the GLAM style histogram transformation to a given histogram metric. The result is a list of sorted key-value pairs of bucket and the @@ -52,33 +57,94 @@ def glam_style_histogram( keyed -- bool if the histogram is keyed date -- string of date you wish to calculate the transformation for (date is a partition key) limit -- int of the number of rows from the ping to take (default None/no limit) + sample_rate -- float what percent of samples to take per histogram, starting at zero, only divisible by 20, default None table -- full path to the table you wish to take probes from (default mozdata.telemetry.main_1pct) + step -- float to increment sample by """ + return [ + _sql_runner(probe, keyed, date, limit, sample_rate, table, step) + for probe in probes + ] + + +def _sql_runner( + probe: str, + keyed: bool, + date: str, + limit: int = None, + sample_rate: float = None, + table: str = "mozdata.telemetry.main_1pct", + step: float = 0.2, +) -> list: + + scaled_sample_rate = sample_rate * 100 if sample_rate else 10 + step = int(step * 100) + if sample_rate is not None: + assert ( + sample_rate > 0.0 and sample_rate <= 1.0 + ), "sample rate must be between zero and one, step must be <= sample rate, and note that cleanly dividing choices will probably result in more predictable behavior, though you may ignore and allow numpy to do whatever it likes." + _limit = f"LIMIT {limit}" if limit else "" - probe_location = ( + + results = [] + probe_string = ( f"payload.histograms.{probe}" if not keyed else f"payload.keyed_histograms.{probe}" ) - sql_query = f"""SELECT - client_id, - application.build_id, - {probe_location}, -FROM {table} -WHERE date(submission_timestamp) = '{date}' - AND date(submission_timestamp) > date(2022, 12, 20) - AND {probe_location} IS NOT NULL - {_limit}""" - metadata = get_metadata(probe) + print(f"got metadata for probe {probe} at {datetime.now()}") - project = table.split(".")[0] - bq_client = bigquery.Client(project=project) - - dataset = bq_client.query(sql_query).result() - df = pl.from_arrow(dataset.to_arrow()) - - results = _glam_style_histogram(df, metadata) + for f in np.arange(step, scaled_sample_rate + step, step): + if sample_rate: + sample_id_string = ( + f"AND sample_id >= {int(f - step)} AND sample_id < {int(f)}" + ) + else: + sample_id_string = "" + sql_query = f"""SELECT + client_id, + application.build_id, + {probe_string}, +FROM {table} as t +INNER JOIN + build_ids b on t.application.build_id = b.build_id and b.channel = t.normalized_channel +WHERE date(submission_timestamp) = '{date}' +AND date(submission_timestamp) > date(2022, 12, 20) +AND {probe_string} is not null +{sample_id_string} +{_limit}""" + + project = table.split(".")[0] + bq_client = bigquery.Client(project=project) + + job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.BATCH) + + storage_client = BigQueryReadClient() + + print(f"starting query at {datetime.now()}") + dataset = bq_client.query(sql_query, job_config=job_config).result() + print(f"got data from bq at {datetime.now()}") + dataset = dataset.to_arrow( + progress_bar_type="tqdm", bqstorage_client=storage_client + ).combine_chunks() + print(f"data moved to arrow at {datetime.now()}") + data = pl.from_arrow( + dataset, + rechunk=False, + ) # type: pl.DataFrame + print( + f"data moved to arrow at {datetime.now()} and loaded in polars at {datetime.now()}" + ) + + df = data.select(["client_id", "build_id", probe]) + print(f"columns selected at {datetime.now()}") + + metadata = get_metadata(probe) + print(f"sending {probe} to Rust at {datetime.now()}") + result = _glam_style_histogram(df, metadata) + results.append((probe, f, f - 10, result)) + print(f"finished through sample_id {f} at {datetime.now()}") return results @@ -98,14 +164,14 @@ def _lists_from_tuples(tuples): def _find_cutoffs(buckets, cdf, percentiles): assert len(percentiles) > 0, "Must provide at least one percentile to calculate" - percentiles = sorted(percentiles) # we only need to go through once - # if values are sorted - + percentiles = sorted(percentiles) # we only need to go through once + # if values are sorted + results = {} max_iter = len(cdf) i = 0 for p in percentiles: - while i < max_iter and cdf[i] < p: + while i < max_iter and cdf[i] < p: i += 1 if i < max_iter: results[p] = buckets[i] @@ -115,7 +181,7 @@ def _find_cutoffs(buckets, cdf, percentiles): return results -def calculate_percentiles(distribution: list, percentiles: list) -> dict: +def calculate_percentiles(distribution, percentiles) -> dict: """Given a list of percentiles and a distribution, find the buckets that represent each percentile. Internal functions are numba jitted python. @@ -125,8 +191,9 @@ def calculate_percentiles(distribution: list, percentiles: list) -> dict: percentiles -- list of floating point values [0.0, 1.0] of the percentiles you wish to calculate """ + # perf improvement as we iterate later and numpy has known types if type(percentiles) != np.ndarray: - percentiles = np.array(percentiles) + percentiles = np.array(percentiles) # type: np.ndarray k, v = _lists_from_tuples(distribution) diff --git a/src/glam.rs b/src/glam.rs index a017103..ed0ccf3 100644 --- a/src/glam.rs +++ b/src/glam.rs @@ -2,6 +2,7 @@ use crate::hist::{parse_main_histograms, parse_metadata_json}; use polars::prelude::*; use pyo3::prelude::*; use pyo3_polars::PyDataFrame; +use rayon::prelude::*; use std::hash::Hash; use std::ops::AddAssign; use std::{ @@ -44,11 +45,11 @@ fn normalize_histogram_glam(hist: HashMap) -> HashMap { fn map_sum(maps: Vec>) -> HashMap { let mut result_map = HashMap::new(); - for m in maps { + maps.into_iter().for_each(|m| { for (k, v) in m { result_map.entry(k).and_modify(|y| *y += v).or_insert(v); } - } + }); result_map } @@ -120,7 +121,6 @@ fn generate_linear_buckets(min: usize, max: usize, n_buckets: usize) -> Vec Vec) -> Vec<(usize, f64)> { let total = hist.values().sum::().round(); - let mut normalized: Vec<(usize, f64)> = hist - .iter() - .map(|(k, v)| (*k, *v / total)) - .collect(); + let mut normalized: Vec<(usize, f64)> = hist.iter().map(|(k, v)| (*k, *v / total)).collect(); normalized.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); @@ -171,7 +168,7 @@ fn fill_buckets(hist: &mut HashMap, buckets: &[usize]) { /// These are distinct sets, and the Glean set are predefined based on the type /// of histogram as defined in Glean fn calculate_dirichlet_distribution( - histogram_vector: HashMap, + histogram_vector: &HashMap, histogram_type: String, n_reporting: f64, positional_zero: usize, @@ -197,8 +194,8 @@ fn calculate_dirichlet_distribution( // 6.generate the array of all bucket values we need to fill in and // add the dirichlet transfromed value (5) to the appropriate bucket // - let mut hist = histogram_vector; // when we take the dictionary in from Python, - // we probably cannot take it as a reference + let mut hist = histogram_vector.to_owned(); // when we take the dictionary in from Python, + // we probably cannot take it as a reference // assuming at this point I have aggregated, normalized histograms let histogram_type = Distribution::from_str(histogram_type.as_str()); @@ -239,54 +236,67 @@ pub fn glam_style_histogram( let probe = histogram_metadata.probe.as_str(); let data: DataFrame = pydf.into(); - let partitioned_data = data.partition_by(["build_id"]).unwrap(); + let partitioned_data = data.partition_by(["build_id"]).unwrap().to_owned(); - let mut results = Vec::new(); - - for df in partitioned_data { - let build_id = df - .column("build_id") - .unwrap() - .str_value(0) - .unwrap() - .to_string(); - let client_level_dfs = df.partition_by(["client_id"]).unwrap(); - let mut client_levels = Vec::new(); - - for d in client_level_dfs { - let metric_column = d.select_series([probe]).unwrap(); - - let histograms_raw = metric_column[0] - .utf8() + let builds: Vec<(String, HashMap)> = partitioned_data + .iter() + .map(|df| { + let build_id = df + .column("build_id") .unwrap() - .into_iter() - .collect::>(); - let histograms_parsed = parse_main_histograms(histograms_raw); - - let client_aggregatted = map_sum(histograms_parsed); - let client_normed = normalize_histogram_glam(client_aggregatted); - - client_levels.push(client_normed); - } - - let build_histograms = map_sum(client_levels); - // this is necessary to stop weird floating point behavior - let n_reporting = build_histograms.clone().values().sum::().round(); - - let dirichlet_transformed_hists = calculate_dirichlet_distribution( - build_histograms, - histogram_metadata.histogram_type.clone(), - n_reporting, - histogram_metadata.buckets_for_probe[0], - histogram_metadata.buckets_for_probe[1], - histogram_metadata.buckets_for_probe[2], - ) - .unwrap(); + .str_value(0) + .unwrap() + .to_string(); + let client_level_dfs = df.partition_by(["client_id"]).unwrap(); + + let mut client_levels = Vec::new(); + client_level_dfs + .par_iter() + .map(|d| { + let metric_column = d.select_series([probe]).unwrap(); + + let histograms_raw = metric_column[0] + .utf8() + .unwrap() + .into_iter() + .collect::>(); + let histograms_parsed = parse_main_histograms(histograms_raw); + + let client_aggregated = map_sum(histograms_parsed); + let client_normed = normalize_histogram_glam(client_aggregated); + + client_normed + }) + .collect_into_vec(&mut client_levels); + + let build_histograms = map_sum(client_levels); + (build_id, build_histograms.clone()) + }) + .collect(); - let result = hist_to_normed_sorted(&dirichlet_transformed_hists); + let mut results = Vec::new(); + builds + .par_iter() + .map(|(build_id, build_histograms)| { + // this is necessary to stop weird floating point behavior + let n_reporting = build_histograms.clone().values().sum::().round(); + + let dirichlet_transformed_hists = calculate_dirichlet_distribution( + build_histograms, + histogram_metadata.histogram_type.clone(), + n_reporting, + histogram_metadata.buckets_for_probe[0], + histogram_metadata.buckets_for_probe[1], + histogram_metadata.buckets_for_probe[2], + ) + .unwrap(); + + let result = hist_to_normed_sorted(&dirichlet_transformed_hists); + + (build_id.to_owned(), result) + }) + .collect_into_vec(&mut results); - results.push((build_id, result)); - } Ok(results) }