From f29782543880d5929ff374a36eb2d8406119d793 Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Thu, 19 Jan 2023 11:14:53 -0500 Subject: [PATCH 01/15] small improvement to glam functions --- python/mozfun_local/glam.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index 8ad2775..4b9c428 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -98,14 +98,14 @@ def _lists_from_tuples(tuples): def _find_cutoffs(buckets, cdf, percentiles): assert len(percentiles) > 0, "Must provide at least one percentile to calculate" - percentiles = sorted(percentiles) # we only need to go through once - # if values are sorted - + percentiles = sorted(percentiles) # we only need to go through once + # if values are sorted + results = {} max_iter = len(cdf) i = 0 for p in percentiles: - while i < max_iter and cdf[i] < p: + while i < max_iter and cdf[i] < p: i += 1 if i < max_iter: results[p] = buckets[i] @@ -115,7 +115,7 @@ def _find_cutoffs(buckets, cdf, percentiles): return results -def calculate_percentiles(distribution: list, percentiles: list) -> dict: +def calculate_percentiles(distribution, percentiles) -> dict: """Given a list of percentiles and a distribution, find the buckets that represent each percentile. Internal functions are numba jitted python. @@ -125,8 +125,9 @@ def calculate_percentiles(distribution: list, percentiles: list) -> dict: percentiles -- list of floating point values [0.0, 1.0] of the percentiles you wish to calculate """ + # perf improvement as we iterate later and numpy has known types if type(percentiles) != np.ndarray: - percentiles = np.array(percentiles) + percentiles = np.array(percentiles) # type: np.ndarray k, v = _lists_from_tuples(distribution) From 26b45b405a648d9ef7773e00f73d4d02befba79b Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Fri, 20 Jan 2023 09:47:50 -0500 Subject: [PATCH 02/15] added ability to run multiple columns at a time --- python/mozfun_local/glam.py | 45 ++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index 4b9c428..575b694 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -36,7 +36,7 @@ def _read_metadata(self) -> dict: def glam_style_histogram( - probe: str, + probes: list, keyed: bool, date: str, limit: int = None, @@ -55,30 +55,43 @@ def glam_style_histogram( table -- full path to the table you wish to take probes from (default mozdata.telemetry.main_1pct) """ _limit = f"LIMIT {limit}" if limit else "" - probe_location = ( - f"payload.histograms.{probe}" - if not keyed - else f"payload.keyed_histograms.{probe}" - ) + + probe_locations = [ + ( + f"payload.histograms.{probe}" + if not keyed + else f"payload.keyed_histograms.{probe}" + ) + for probe in probes + ] + + probe_string = (", \n ").join(probe_locations) + sql_query = f"""SELECT - client_id, - application.build_id, - {probe_location}, + client_id, + application.build_id, + {probe_string}, FROM {table} WHERE date(submission_timestamp) = '{date}' - AND date(submission_timestamp) > date(2022, 12, 20) - AND {probe_location} IS NOT NULL - {_limit}""" - - metadata = get_metadata(probe) +AND date(submission_timestamp) > date(2022, 12, 20) +AND sample_id < 10 +{_limit}""" project = table.split(".")[0] bq_client = bigquery.Client(project=project) dataset = bq_client.query(sql_query).result() - df = pl.from_arrow(dataset.to_arrow()) + data = pl.from_arrow(dataset.to_arrow()) # type: pl.DataFrame + + results = [] + for probe in probes: + df = data.select(["client_id", "build_id", probe]) + df = df.filter(pl.col(probe).is_not_null()) + + metadata = get_metadata(probe) - results = _glam_style_histogram(df, metadata) + result = _glam_style_histogram(df, metadata) + results.append((probe, result)) return results From bdb42868f78377194f375baa53672bb7896b8946 Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Fri, 20 Jan 2023 17:34:08 -0500 Subject: [PATCH 03/15] implemented batching for large histograms --- python/mozfun_local/glam.py | 41 +++++++++++++++++++++++++------------ src/glam.rs | 2 +- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index 575b694..dd79bec 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -40,6 +40,7 @@ def glam_style_histogram( keyed: bool, date: str, limit: int = None, + sample_rate: float = None, table: str = "mozdata.telemetry.main_1pct", ) -> list: """Calculate the GLAM style histogram transformation to a given histogram @@ -52,10 +53,18 @@ def glam_style_histogram( keyed -- bool if the histogram is keyed date -- string of date you wish to calculate the transformation for (date is a partition key) limit -- int of the number of rows from the ping to take (default None/no limit) + sample_rate -- float what percent of samples to take, starting at zero, only divisible by 10, default None table -- full path to the table you wish to take probes from (default mozdata.telemetry.main_1pct) """ + scaled_sample = sample_rate * 100 if sample_rate else 10 + if sample_rate is not None: + assert ( + (scaled_sample) % 10 == 0 and sample_rate > 0.0 and sample_rate <= 1.0 + ), "sample rate must be between zero and one, and divsible by 10 in whole number representation" + _limit = f"LIMIT {limit}" if limit else "" + results = [] probe_locations = [ ( f"payload.histograms.{probe}" @@ -67,31 +76,37 @@ def glam_style_histogram( probe_string = (", \n ").join(probe_locations) - sql_query = f"""SELECT + # 0 <= sample_id < 100 + for f in np.arange(10, scaled_sample + 10, 10): + print(f) + if sample_rate: + sample_id_string = f"AND sample_id >= {int(f - 10)} AND sample_id < {int(f)}" + else: + sample_id_string = "" + sql_query = f"""SELECT client_id, application.build_id, {probe_string}, FROM {table} WHERE date(submission_timestamp) = '{date}' AND date(submission_timestamp) > date(2022, 12, 20) -AND sample_id < 10 +{sample_id_string} {_limit}""" - project = table.split(".")[0] - bq_client = bigquery.Client(project=project) + project = table.split(".")[0] + bq_client = bigquery.Client(project=project) - dataset = bq_client.query(sql_query).result() - data = pl.from_arrow(dataset.to_arrow()) # type: pl.DataFrame + dataset = bq_client.query(sql_query).result() + data = pl.from_arrow(dataset.to_arrow()) # type: pl.DataFrame - results = [] - for probe in probes: - df = data.select(["client_id", "build_id", probe]) - df = df.filter(pl.col(probe).is_not_null()) + for probe in probes: + df = data.select(["client_id", "build_id", probe]) + df = df.filter(pl.col(probe).is_not_null()) - metadata = get_metadata(probe) + metadata = get_metadata(probe) - result = _glam_style_histogram(df, metadata) - results.append((probe, result)) + result = _glam_style_histogram(df, metadata) + results.append((probe, f, f-10, result)) return results diff --git a/src/glam.rs b/src/glam.rs index a017103..c4a88ca 100644 --- a/src/glam.rs +++ b/src/glam.rs @@ -131,7 +131,7 @@ fn hist_to_normed_sorted(hist: &HashMap) -> Vec<(usize, f64)> { let mut normalized: Vec<(usize, f64)> = hist .iter() - .map(|(k, v)| (*k, *v / total)) + .map(|(k, v)| (*k as usize, *v / total)) .collect(); normalized.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); From 9b797760fc4f73816369e28546051ef6506f84a8 Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Fri, 20 Jan 2023 17:40:10 -0500 Subject: [PATCH 04/15] renamed sample to more logical batch --- python/mozfun_local/glam.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index dd79bec..0abe42a 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -40,7 +40,7 @@ def glam_style_histogram( keyed: bool, date: str, limit: int = None, - sample_rate: float = None, + batch_size: float = None, table: str = "mozdata.telemetry.main_1pct", ) -> list: """Calculate the GLAM style histogram transformation to a given histogram @@ -53,13 +53,13 @@ def glam_style_histogram( keyed -- bool if the histogram is keyed date -- string of date you wish to calculate the transformation for (date is a partition key) limit -- int of the number of rows from the ping to take (default None/no limit) - sample_rate -- float what percent of samples to take, starting at zero, only divisible by 10, default None + batch_size -- float what percent of samples to take per batch, starting at zero, only divisible by 10, default None table -- full path to the table you wish to take probes from (default mozdata.telemetry.main_1pct) """ - scaled_sample = sample_rate * 100 if sample_rate else 10 - if sample_rate is not None: + scaled_batch_size = batch_size * 100 if batch_size else 10 + if batch_size is not None: assert ( - (scaled_sample) % 10 == 0 and sample_rate > 0.0 and sample_rate <= 1.0 + (scaled_batch_size) % 10 == 0 and batch_size > 0.0 and batch_size <= 1.0 ), "sample rate must be between zero and one, and divsible by 10 in whole number representation" _limit = f"LIMIT {limit}" if limit else "" @@ -77,9 +77,9 @@ def glam_style_histogram( probe_string = (", \n ").join(probe_locations) # 0 <= sample_id < 100 - for f in np.arange(10, scaled_sample + 10, 10): + for f in np.arange(10, scaled_batch_size + 10, 10): print(f) - if sample_rate: + if batch_size: sample_id_string = f"AND sample_id >= {int(f - 10)} AND sample_id < {int(f)}" else: sample_id_string = "" From 8b817311a311b800e9c4f1731dc4e48b7e5e5ced Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Fri, 20 Jan 2023 17:40:20 -0500 Subject: [PATCH 05/15] renamed sample to more logical batch --- python/mozfun_local/glam.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index 0abe42a..d9b14fc 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -80,7 +80,9 @@ def glam_style_histogram( for f in np.arange(10, scaled_batch_size + 10, 10): print(f) if batch_size: - sample_id_string = f"AND sample_id >= {int(f - 10)} AND sample_id < {int(f)}" + sample_id_string = ( + f"AND sample_id >= {int(f - 10)} AND sample_id < {int(f)}" + ) else: sample_id_string = "" sql_query = f"""SELECT @@ -106,7 +108,7 @@ def glam_style_histogram( metadata = get_metadata(probe) result = _glam_style_histogram(df, metadata) - results.append((probe, f, f-10, result)) + results.append((probe, f, f - 10, result)) return results From 8ee5b1b5dec2778fee9afc5494611f4da0c82a48 Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Fri, 20 Jan 2023 17:41:09 -0500 Subject: [PATCH 06/15] renamed sample to more logical batch --- src/glam.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/glam.rs b/src/glam.rs index c4a88ca..b49f514 100644 --- a/src/glam.rs +++ b/src/glam.rs @@ -129,10 +129,7 @@ fn generate_linear_buckets(min: usize, max: usize, n_buckets: usize) -> Vec) -> Vec<(usize, f64)> { let total = hist.values().sum::().round(); - let mut normalized: Vec<(usize, f64)> = hist - .iter() - .map(|(k, v)| (*k as usize, *v / total)) - .collect(); + let mut normalized: Vec<(usize, f64)> = hist.iter().map(|(k, v)| (*k, *v / total)).collect(); normalized.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); From e01cd1252692d0ee033a8e03cffa54464ab2aa8f Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Mon, 23 Jan 2023 08:40:57 -0500 Subject: [PATCH 07/15] put it back to sample rate --- python/mozfun_local/glam.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index d9b14fc..f9e73aa 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -40,7 +40,7 @@ def glam_style_histogram( keyed: bool, date: str, limit: int = None, - batch_size: float = None, + sample_rate: float = None, table: str = "mozdata.telemetry.main_1pct", ) -> list: """Calculate the GLAM style histogram transformation to a given histogram @@ -53,13 +53,13 @@ def glam_style_histogram( keyed -- bool if the histogram is keyed date -- string of date you wish to calculate the transformation for (date is a partition key) limit -- int of the number of rows from the ping to take (default None/no limit) - batch_size -- float what percent of samples to take per batch, starting at zero, only divisible by 10, default None + sample_rate -- float what percent of samples to take per histogram, starting at zero, only divisible by 10, default None table -- full path to the table you wish to take probes from (default mozdata.telemetry.main_1pct) """ - scaled_batch_size = batch_size * 100 if batch_size else 10 - if batch_size is not None: + scaled_sample_rate = sample_rate * 100 if sample_rate else 10 + if sample_rate is not None: assert ( - (scaled_batch_size) % 10 == 0 and batch_size > 0.0 and batch_size <= 1.0 + (scaled_sample_rate) % 10 == 0 and sample_rate > 0.0 and sample_rate <= 1.0 ), "sample rate must be between zero and one, and divsible by 10 in whole number representation" _limit = f"LIMIT {limit}" if limit else "" @@ -77,9 +77,9 @@ def glam_style_histogram( probe_string = (", \n ").join(probe_locations) # 0 <= sample_id < 100 - for f in np.arange(10, scaled_batch_size + 10, 10): + for f in np.arange(10, scaled_sample_rate + 10, 10): print(f) - if batch_size: + if sample_rate: sample_id_string = ( f"AND sample_id >= {int(f - 10)} AND sample_id < {int(f)}" ) From e300f8acbffb32d6583c1e16481a8e267da6430e Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Mon, 23 Jan 2023 09:48:17 -0500 Subject: [PATCH 08/15] increased sampling rate to 20 percent --- python/mozfun_local/glam.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index f9e73aa..5c825d9 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -53,14 +53,14 @@ def glam_style_histogram( keyed -- bool if the histogram is keyed date -- string of date you wish to calculate the transformation for (date is a partition key) limit -- int of the number of rows from the ping to take (default None/no limit) - sample_rate -- float what percent of samples to take per histogram, starting at zero, only divisible by 10, default None + sample_rate -- float what percent of samples to take per histogram, starting at zero, only divisible by 20, default None table -- full path to the table you wish to take probes from (default mozdata.telemetry.main_1pct) """ - scaled_sample_rate = sample_rate * 100 if sample_rate else 10 + scaled_sample_rate = sample_rate * 100 if sample_rate else 20 if sample_rate is not None: assert ( - (scaled_sample_rate) % 10 == 0 and sample_rate > 0.0 and sample_rate <= 1.0 - ), "sample rate must be between zero and one, and divsible by 10 in whole number representation" + (scaled_sample_rate) % 20 == 0 and sample_rate > 0.0 and sample_rate <= 1.0 + ), "sample rate must be between zero and one, and divsible by 20 in whole number representation" _limit = f"LIMIT {limit}" if limit else "" @@ -77,7 +77,7 @@ def glam_style_histogram( probe_string = (", \n ").join(probe_locations) # 0 <= sample_id < 100 - for f in np.arange(10, scaled_sample_rate + 10, 10): + for f in np.arange(20, scaled_sample_rate + 10, 20): print(f) if sample_rate: sample_id_string = ( From fdbaca0b9f6e00d9a17f7c28c14d30a625c45838 Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Mon, 23 Jan 2023 09:49:25 -0500 Subject: [PATCH 09/15] increased sampling rate to 20 percent --- python/mozfun_local/glam.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index 5c825d9..325722e 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -78,7 +78,6 @@ def glam_style_histogram( # 0 <= sample_id < 100 for f in np.arange(20, scaled_sample_rate + 10, 20): - print(f) if sample_rate: sample_id_string = ( f"AND sample_id >= {int(f - 10)} AND sample_id < {int(f)}" From 0d9e51a1138bb0dc2cf634cab11f19a370dbd0df Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Mon, 23 Jan 2023 12:56:37 -0500 Subject: [PATCH 10/15] added debugging print statements --- python/mozfun_local/glam.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index 325722e..b328dda 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -1,5 +1,6 @@ import os from pathlib import Path +from datetime import datetime from google.cloud import bigquery from mozfun_local.mozfun_local_rust import glam_style_histogram as _glam_style_histogram @@ -73,6 +74,7 @@ def glam_style_histogram( ) for probe in probes ] + print(f"got metadata for probes at {datetime.now()}") probe_string = (", \n ").join(probe_locations) @@ -97,7 +99,9 @@ def glam_style_histogram( project = table.split(".")[0] bq_client = bigquery.Client(project=project) + print(f"starting query at {datetime.now()}") dataset = bq_client.query(sql_query).result() + print(f"got data from bq at {datetime.now()}") data = pl.from_arrow(dataset.to_arrow()) # type: pl.DataFrame for probe in probes: @@ -105,9 +109,10 @@ def glam_style_histogram( df = df.filter(pl.col(probe).is_not_null()) metadata = get_metadata(probe) - + print(f"sending {probe} to Rust at {datetime.now()}") result = _glam_style_histogram(df, metadata) results.append((probe, f, f - 10, result)) + print(f"finished through sample_id {f} at {datetime.now()}") return results From 6876e8290a9b885964609719c637cf98ae784474 Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Mon, 23 Jan 2023 14:38:04 -0500 Subject: [PATCH 11/15] don't move data around in memory unecessarily --- python/mozfun_local/glam.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index b328dda..5099515 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -102,11 +102,13 @@ def glam_style_histogram( print(f"starting query at {datetime.now()}") dataset = bq_client.query(sql_query).result() print(f"got data from bq at {datetime.now()}") - data = pl.from_arrow(dataset.to_arrow()) # type: pl.DataFrame + data = pl.from_arrow(dataset.to_arrow(), rechunk=False) # type: pl.DataFrame + print(f"data moved to arrow and loaded in polars at {datetime.now()}") for probe in probes: df = data.select(["client_id", "build_id", probe]) df = df.filter(pl.col(probe).is_not_null()) + print(f"nulls filtered at {datetime.now()}") metadata = get_metadata(probe) print(f"sending {probe} to Rust at {datetime.now()}") From cccce39ad312d7dba763ed492c59784f5e683f9e Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Wed, 25 Jan 2023 16:49:42 -0500 Subject: [PATCH 12/15] re-added concurrency --- src/glam.rs | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/glam.rs b/src/glam.rs index b49f514..eeb52a9 100644 --- a/src/glam.rs +++ b/src/glam.rs @@ -2,6 +2,7 @@ use crate::hist::{parse_main_histograms, parse_metadata_json}; use polars::prelude::*; use pyo3::prelude::*; use pyo3_polars::PyDataFrame; +use rayon::prelude::*; use std::hash::Hash; use std::ops::AddAssign; use std::{ @@ -248,23 +249,28 @@ pub fn glam_style_histogram( .unwrap() .to_string(); let client_level_dfs = df.partition_by(["client_id"]).unwrap(); - let mut client_levels = Vec::new(); - - for d in client_level_dfs { - let metric_column = d.select_series([probe]).unwrap(); - - let histograms_raw = metric_column[0] - .utf8() - .unwrap() - .into_iter() - .collect::>(); - let histograms_parsed = parse_main_histograms(histograms_raw); - let client_aggregatted = map_sum(histograms_parsed); - let client_normed = normalize_histogram_glam(client_aggregatted); + let mut client_levels = Vec::new(); - client_levels.push(client_normed); - } + client_level_dfs + .par_iter() + .map(|d| { + let metric_column = d.select_series([probe]).unwrap(); + + let histograms_raw = metric_column[0] + .utf8() + .unwrap() + .into_iter() + .collect::>(); + let histograms_parsed = parse_main_histograms(histograms_raw); + + let client_aggregatted = map_sum(histograms_parsed); + let client_normed = normalize_histogram_glam(client_aggregatted); + + // client_levels.push(client_normed); + client_normed + }) + .collect_into_vec(&mut client_levels); let build_histograms = map_sum(client_levels); // this is necessary to stop weird floating point behavior From a253aac6ae37282ff5c419962ce8b74e5e01af3b Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Wed, 1 Feb 2023 11:42:15 -0500 Subject: [PATCH 13/15] changed organization a bit --- python/mozfun_local/glam.py | 81 ++++++++++++++++++++++++------------- 1 file changed, 53 insertions(+), 28 deletions(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index 5099515..4a014dc 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -1,6 +1,6 @@ +from datetime import datetime import os from pathlib import Path -from datetime import datetime from google.cloud import bigquery from mozfun_local.mozfun_local_rust import glam_style_histogram as _glam_style_histogram @@ -43,6 +43,7 @@ def glam_style_histogram( limit: int = None, sample_rate: float = None, table: str = "mozdata.telemetry.main_1pct", + step: float = 0.2, ) -> list: """Calculate the GLAM style histogram transformation to a given histogram metric. The result is a list of sorted key-value pairs of bucket and the @@ -56,33 +57,49 @@ def glam_style_histogram( limit -- int of the number of rows from the ping to take (default None/no limit) sample_rate -- float what percent of samples to take per histogram, starting at zero, only divisible by 20, default None table -- full path to the table you wish to take probes from (default mozdata.telemetry.main_1pct) + step -- float to increment sample by """ - scaled_sample_rate = sample_rate * 100 if sample_rate else 20 + return [ + _sql_runner(probe, keyed, date, limit, sample_rate, table, step) + for probe in probes + ] + + +def _sql_runner( + probe: str, + keyed: bool, + date: str, + limit: int = None, + sample_rate: float = None, + table: str = "mozdata.telemetry.main_1pct", + step: float = 0.2, +) -> list: + + scaled_sample_rate = sample_rate * 100 if sample_rate else 10 + step = int(step * 100) if sample_rate is not None: assert ( - (scaled_sample_rate) % 20 == 0 and sample_rate > 0.0 and sample_rate <= 1.0 - ), "sample rate must be between zero and one, and divsible by 20 in whole number representation" + sample_rate > 0.0 and sample_rate <= 1.0 # and step <= sample_rate + ), "sample rate must be between zero and one, step must be <= sample rate, and note that cleanly dividing choices will probably result in more predictable behavior, though you may ignore and allow numpy to do whatever it likes." _limit = f"LIMIT {limit}" if limit else "" results = [] - probe_locations = [ - ( - f"payload.histograms.{probe}" - if not keyed - else f"payload.keyed_histograms.{probe}" - ) - for probe in probes - ] - print(f"got metadata for probes at {datetime.now()}") + probe_string = ( + f"payload.histograms.{probe}" + if not keyed + else f"payload.keyed_histograms.{probe}" + ) + + print(f"got metadata for probe {probe} at {datetime.now()}") - probe_string = (", \n ").join(probe_locations) + # probe_string = (", \n ").join(probe_locations) # 0 <= sample_id < 100 - for f in np.arange(20, scaled_sample_rate + 10, 20): + for f in np.arange(step, scaled_sample_rate + step, step): if sample_rate: sample_id_string = ( - f"AND sample_id >= {int(f - 10)} AND sample_id < {int(f)}" + f"AND sample_id >= {int(f - step)} AND sample_id < {int(f)}" ) else: sample_id_string = "" @@ -90,31 +107,39 @@ def glam_style_histogram( client_id, application.build_id, {probe_string}, -FROM {table} +FROM {table} as t WHERE date(submission_timestamp) = '{date}' AND date(submission_timestamp) > date(2022, 12, 20) +AND {probe_string} is not null {sample_id_string} {_limit}""" project = table.split(".")[0] bq_client = bigquery.Client(project=project) + job_config = bigquery.QueryJobConfig( + allow_large_results=True, priority=bigquery.QueryPriority.BATCH + ) + print(f"starting query at {datetime.now()}") - dataset = bq_client.query(sql_query).result() + dataset = bq_client.query(sql_query, job_config=job_config).result() print(f"got data from bq at {datetime.now()}") - data = pl.from_arrow(dataset.to_arrow(), rechunk=False) # type: pl.DataFrame + data = pl.from_arrow( + dataset.to_arrow( + progress_bar_type="tqdm", + ).combine_chunks(), + rechunk=False, + ) # type: pl.DataFrame print(f"data moved to arrow and loaded in polars at {datetime.now()}") - for probe in probes: - df = data.select(["client_id", "build_id", probe]) - df = df.filter(pl.col(probe).is_not_null()) - print(f"nulls filtered at {datetime.now()}") + df = data.select(["client_id", "build_id", probe]) + print(f"nulls filtered at {datetime.now()}") - metadata = get_metadata(probe) - print(f"sending {probe} to Rust at {datetime.now()}") - result = _glam_style_histogram(df, metadata) - results.append((probe, f, f - 10, result)) - print(f"finished through sample_id {f} at {datetime.now()}") + metadata = get_metadata(probe) + print(f"sending {probe} to Rust at {datetime.now()}") + result = _glam_style_histogram(df, metadata) + results.append((probe, f, f - 10, result)) + print(f"finished through sample_id {f} at {datetime.now()}") return results From e5a5cf99a01a7decb054bd1c2aa3fc313abc446a Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Wed, 1 Feb 2023 16:49:42 -0500 Subject: [PATCH 14/15] improve concurrency a bit --- src/glam.rs | 116 ++++++++++++++++++++++++++++------------------------ 1 file changed, 63 insertions(+), 53 deletions(-) diff --git a/src/glam.rs b/src/glam.rs index eeb52a9..0d4667e 100644 --- a/src/glam.rs +++ b/src/glam.rs @@ -169,7 +169,7 @@ fn fill_buckets(hist: &mut HashMap, buckets: &[usize]) { /// These are distinct sets, and the Glean set are predefined based on the type /// of histogram as defined in Glean fn calculate_dirichlet_distribution( - histogram_vector: HashMap, + histogram_vector: &HashMap, histogram_type: String, n_reporting: f64, positional_zero: usize, @@ -195,8 +195,8 @@ fn calculate_dirichlet_distribution( // 6.generate the array of all bucket values we need to fill in and // add the dirichlet transfromed value (5) to the appropriate bucket // - let mut hist = histogram_vector; // when we take the dictionary in from Python, - // we probably cannot take it as a reference + let mut hist = histogram_vector.to_owned(); // when we take the dictionary in from Python, + // we probably cannot take it as a reference // assuming at this point I have aggregated, normalized histograms let histogram_type = Distribution::from_str(histogram_type.as_str()); @@ -237,59 +237,69 @@ pub fn glam_style_histogram( let probe = histogram_metadata.probe.as_str(); let data: DataFrame = pydf.into(); - let partitioned_data = data.partition_by(["build_id"]).unwrap(); + let partitioned_data = data.partition_by(["build_id"]).unwrap().to_owned(); + + let builds: Vec<(String, HashMap)> = partitioned_data + .iter() + .map(|df| { + let build_id = df + .column("build_id") + .unwrap() + .str_value(0) + .unwrap() + .to_string(); + let client_level_dfs = df.partition_by(["client_id"]).unwrap(); + + let mut client_levels = Vec::new(); + client_level_dfs + .par_iter() + .map(|d| { + let metric_column = d.select_series([probe]).unwrap(); + + let histograms_raw = metric_column[0] + .utf8() + .unwrap() + .into_iter() + .collect::>(); + let histograms_parsed = parse_main_histograms(histograms_raw); + + let client_aggregated = map_sum(histograms_parsed); + let client_normed = normalize_histogram_glam(client_aggregated); + + // client_levels.push(client_normed); + client_normed + }) + .collect_into_vec(&mut client_levels); + + let build_histograms = map_sum(client_levels); + (build_id, build_histograms.clone()) + }) + .collect(); let mut results = Vec::new(); + builds + .par_iter() + .map(|(build_id, build_histograms)| { + // this is necessary to stop weird floating point behavior + + let n_reporting = build_histograms.clone().values().sum::().round(); + + let dirichlet_transformed_hists = calculate_dirichlet_distribution( + build_histograms, + histogram_metadata.histogram_type.clone(), + n_reporting, + histogram_metadata.buckets_for_probe[0], + histogram_metadata.buckets_for_probe[1], + histogram_metadata.buckets_for_probe[2], + ) + .unwrap(); + + let result = hist_to_normed_sorted(&dirichlet_transformed_hists); + + (build_id.to_owned(), result) + }) + .collect_into_vec(&mut results); - for df in partitioned_data { - let build_id = df - .column("build_id") - .unwrap() - .str_value(0) - .unwrap() - .to_string(); - let client_level_dfs = df.partition_by(["client_id"]).unwrap(); - - let mut client_levels = Vec::new(); - - client_level_dfs - .par_iter() - .map(|d| { - let metric_column = d.select_series([probe]).unwrap(); - - let histograms_raw = metric_column[0] - .utf8() - .unwrap() - .into_iter() - .collect::>(); - let histograms_parsed = parse_main_histograms(histograms_raw); - - let client_aggregatted = map_sum(histograms_parsed); - let client_normed = normalize_histogram_glam(client_aggregatted); - - // client_levels.push(client_normed); - client_normed - }) - .collect_into_vec(&mut client_levels); - - let build_histograms = map_sum(client_levels); - // this is necessary to stop weird floating point behavior - let n_reporting = build_histograms.clone().values().sum::().round(); - - let dirichlet_transformed_hists = calculate_dirichlet_distribution( - build_histograms, - histogram_metadata.histogram_type.clone(), - n_reporting, - histogram_metadata.buckets_for_probe[0], - histogram_metadata.buckets_for_probe[1], - histogram_metadata.buckets_for_probe[2], - ) - .unwrap(); - - let result = hist_to_normed_sorted(&dirichlet_transformed_hists); - - results.push((build_id, result)); - } Ok(results) } From cdc20f8fa1bdef78e2ae7649aa11e52b24898033 Mon Sep 17 00:00:00 2001 From: Perry McManis Date: Fri, 3 Feb 2023 10:13:58 -0500 Subject: [PATCH 15/15] concurrency improvements --- python/mozfun_local/glam.py | 31 ++++++++++++++++++------------- src/glam.rs | 7 ++----- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/python/mozfun_local/glam.py b/python/mozfun_local/glam.py index 4a014dc..4eb1b3f 100644 --- a/python/mozfun_local/glam.py +++ b/python/mozfun_local/glam.py @@ -3,8 +3,10 @@ from pathlib import Path from google.cloud import bigquery -from mozfun_local.mozfun_local_rust import glam_style_histogram as _glam_style_histogram import numpy as np + +from google.cloud.bigquery_storage import BigQueryReadClient +from mozfun_local.mozfun_local_rust import glam_style_histogram as _glam_style_histogram import polars as pl @@ -79,7 +81,7 @@ def _sql_runner( step = int(step * 100) if sample_rate is not None: assert ( - sample_rate > 0.0 and sample_rate <= 1.0 # and step <= sample_rate + sample_rate > 0.0 and sample_rate <= 1.0 ), "sample rate must be between zero and one, step must be <= sample rate, and note that cleanly dividing choices will probably result in more predictable behavior, though you may ignore and allow numpy to do whatever it likes." _limit = f"LIMIT {limit}" if limit else "" @@ -93,9 +95,6 @@ def _sql_runner( print(f"got metadata for probe {probe} at {datetime.now()}") - # probe_string = (", \n ").join(probe_locations) - - # 0 <= sample_id < 100 for f in np.arange(step, scaled_sample_rate + step, step): if sample_rate: sample_id_string = ( @@ -108,6 +107,8 @@ def _sql_runner( application.build_id, {probe_string}, FROM {table} as t +INNER JOIN + build_ids b on t.application.build_id = b.build_id and b.channel = t.normalized_channel WHERE date(submission_timestamp) = '{date}' AND date(submission_timestamp) > date(2022, 12, 20) AND {probe_string} is not null @@ -117,23 +118,27 @@ def _sql_runner( project = table.split(".")[0] bq_client = bigquery.Client(project=project) - job_config = bigquery.QueryJobConfig( - allow_large_results=True, priority=bigquery.QueryPriority.BATCH - ) + job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.BATCH) + + storage_client = BigQueryReadClient() print(f"starting query at {datetime.now()}") dataset = bq_client.query(sql_query, job_config=job_config).result() print(f"got data from bq at {datetime.now()}") + dataset = dataset.to_arrow( + progress_bar_type="tqdm", bqstorage_client=storage_client + ).combine_chunks() + print(f"data moved to arrow at {datetime.now()}") data = pl.from_arrow( - dataset.to_arrow( - progress_bar_type="tqdm", - ).combine_chunks(), + dataset, rechunk=False, ) # type: pl.DataFrame - print(f"data moved to arrow and loaded in polars at {datetime.now()}") + print( + f"data moved to arrow at {datetime.now()} and loaded in polars at {datetime.now()}" + ) df = data.select(["client_id", "build_id", probe]) - print(f"nulls filtered at {datetime.now()}") + print(f"columns selected at {datetime.now()}") metadata = get_metadata(probe) print(f"sending {probe} to Rust at {datetime.now()}") diff --git a/src/glam.rs b/src/glam.rs index 0d4667e..ed0ccf3 100644 --- a/src/glam.rs +++ b/src/glam.rs @@ -45,11 +45,11 @@ fn normalize_histogram_glam(hist: HashMap) -> HashMap { fn map_sum(maps: Vec>) -> HashMap { let mut result_map = HashMap::new(); - for m in maps { + maps.into_iter().for_each(|m| { for (k, v) in m { result_map.entry(k).and_modify(|y| *y += v).or_insert(v); } - } + }); result_map } @@ -121,7 +121,6 @@ fn generate_linear_buckets(min: usize, max: usize, n_buckets: usize) -> Vec().round(); let dirichlet_transformed_hists = calculate_dirichlet_distribution(