From d519549c4495ba9f459189b746cd55ba92e17a13 Mon Sep 17 00:00:00 2001 From: dwr-psandhu Date: Thu, 2 Apr 2026 12:04:10 -0700 Subject: [PATCH 1/3] Enhance error handling and logging across download and reformat modules - Added exception handling in `cdec_download`, `des_download`, `nwis_download`, `noaa_download`, and `ncro_download` functions to log failures and return structured failure information. - Updated `populate_repo` to collect and return station failures, writing them to a CSV file for better traceability. - Implemented tests to ensure that downloaders and reformat functions continue processing despite individual station failures, capturing errors in a structured format. - Enhanced `reformat` and `populate` functions to write failure logs even when no errors occur, ensuring a consistent output format. - Added new test cases to validate the handling of failures in the auto-screening and downloading processes, ensuring robustness in error scenarios. --- dms_datastore/auto_screen.py | 154 +++++++------ dms_datastore/download_cdec.py | 24 ++- dms_datastore/download_des.py | 19 +- dms_datastore/download_ncro.py | 10 +- dms_datastore/download_noaa.py | 13 ++ dms_datastore/download_nwis.py | 27 +++ dms_datastore/populate_repo.py | 61 ++++-- dms_datastore/reformat.py | 35 ++- tests/test_auto_screen_continue_on_error.py | 198 +++++++++++++++++ tests/test_downloader_continue_on_error.py | 226 ++++++++++++++++++++ tests/test_format_usgs.py | 3 +- tests/test_populate_repo_failures.py | 104 +++++++++ tests/test_reformat_continue_on_error.py | 146 +++++++++++++ 13 files changed, 935 insertions(+), 85 deletions(-) create mode 100644 tests/test_auto_screen_continue_on_error.py create mode 100644 tests/test_downloader_continue_on_error.py create mode 100644 tests/test_populate_repo_failures.py create mode 100644 tests/test_reformat_continue_on_error.py diff --git a/dms_datastore/auto_screen.py b/dms_datastore/auto_screen.py index d514f28..4ea3af0 100644 --- 
a/dms_datastore/auto_screen.py +++ b/dms_datastore/auto_screen.py @@ -172,6 +172,8 @@ def auto_screen( params=None, plot_dest=None, start_station=None, + failures_file=None, + logdir="logs", ): """Auto screen all data in directory Parameters @@ -212,7 +214,7 @@ def auto_screen( actual_fpath = fpath if fpath is not None else repo_root(source_repo) inventory = repo_data_inventory(repo="formatted",in_path=actual_fpath) # repo is the config repo, in_path is the data storage location inventory = filter_inventory_(inventory, stations, params) - failed_read = [] + failures = [] for index, row in inventory.iterrows(): station_id = row["station_id"] @@ -236,70 +238,92 @@ def auto_screen( # Now we have most information, but the time series may be split between sources # with low and high priority fetcher = custom_fetcher(agency) - # these may be lists + step = "read" try: - # logger.debug(f"fetching {fpath},{station_id},{param}") meta_ts = fetcher(source_repo, station_id, param, subloc=subloc, data_path=actual_fpath) + if meta_ts is None: + logger.debug(f"No data found for {station_id} {subloc} {param}") + failures.append({ + "station_id": station_id, "subloc": subloc, "param": param, + "step": step, "exc_type": "NoData", "message": "fetcher returned None", + }) + continue + metas, ts = meta_ts + meta = metas[0] + subloc_actual = ( + meta["sublocation"] + if "sublocation" in meta + else meta["subloc"] if "subloc" in meta else "default" + ) + step = "screen" + proto = context_config(screen_config, station_id, subloc, param) + do_plot = plot_dest is not None + subloc_label = "" if subloc == "default" else subloc + plot_label = f"{station_info['name']}_{station_id}@{subloc_label}_{param}" + screened = screener( + ts, + station_id, + subloc_actual, + param, + proto, + do_plot, + plot_label, + plot_dest=plot_dest, + ) + logger.debug(f"screening complete for {station_id} {subloc} {param}") + if "value" in screened.columns: + screened = screened[["value", "user_flag"]] + 
meta["screen"] = proto + + # Build output filename using configured naming spec for screened repo + output_meta = { + "agency": agency, + "station_id": station_id, + "subloc": subloc_actual if subloc_actual != "default" else None, + "param": param, + "agency_id": row.agency_id, + } + # Add year info if available from metadata + if "year" in meta: + output_meta["year"] = meta["year"] + elif "syear" in meta and "eyear" in meta: + output_meta["syear"] = meta["syear"] + output_meta["eyear"] = meta["eyear"] + + # Get output without shard so that chunk_years will not append one and have it be redundant + output_fname = meta_to_filename(output_meta, repo="screened",include_shard=False) + output_fpath = os.path.join(dest, output_fname) + step = "write" + logger.debug(f"start write for {output_fpath} with meta {meta}") + write_ts_csv(screened, output_fpath, meta, chunk_years=True) + logger.debug("end write") except Exception as e: - logger.warning(f"Read failed for {actual_fpath}, {station_id}, {param}, {subloc}, storage loc = {actual_fpath}") + logger.warning( + f"Failed at step={step} for {station_id}, {subloc}, {param}: {e}" + ) logger.exception(e) - print(e) - meta_ts = None - - if meta_ts is None: - logger.debug(f"No data found for {station_id} {subloc} {param}") - failed_read.append((station_id, subloc, param)) - logger.debug("Cumulative fails:") - for fr in failed_read: - logger.debug(fr) + failures.append({ + "station_id": station_id, + "subloc": subloc, + "param": param, + "step": step, + "exc_type": type(e).__name__, + "message": str(e), + }) continue - metas, ts = meta_ts - meta = metas[0] - subloc_actual = ( - meta["sublocation"] - if "sublocation" in meta - else meta["subloc"] if "subloc" in meta else "default" - ) - proto = context_config(screen_config, station_id, subloc, param) - do_plot = plot_dest is not None - subloc_label = "" if subloc == "default" else subloc - plot_label = f"{station_info['name']}_{station_id}@{subloc_label}_{param}" - screened = 
screener( - ts, - station_id, - subloc_actual, - param, - proto, - do_plot, - plot_label, - plot_dest=plot_dest, - ) - logger.debug(f"screening complete for {station_id} {subloc} {param}") - if "value" in screened.columns: - screened = screened[["value", "user_flag"]] - meta["screen"] = proto - - # Build output filename using configured naming spec for screened repo - output_meta = { - "agency": agency, - "station_id": station_id, - "subloc": subloc_actual if subloc_actual != "default" else None, - "param": param, - "agency_id": row.agency_id, - } - # Add year info if available from metadata - if "year" in meta: - output_meta["year"] = meta["year"] - elif "syear" in meta and "eyear" in meta: - output_meta["syear"] = meta["syear"] - output_meta["eyear"] = meta["eyear"] - - # Get output without shard so that chunk_years will not append one and have it be redundant - output_fname = meta_to_filename(output_meta, repo="screened",include_shard=False) - output_fpath = os.path.join(dest, output_fname) - logger.debug(f"start write for {output_fpath} with meta {meta}") - write_ts_csv(screened, output_fpath, meta, chunk_years=True) - logger.debug("end write") + + # Write failures CSV + if failures_file is None: + logdir_path = Path(logdir) + logdir_path.mkdir(exist_ok=True) + failures_file = logdir_path / "auto_screen_failures.csv" + failures_file = Path(failures_file) + failures_file.parent.mkdir(parents=True, exist_ok=True) + pd.DataFrame( + failures, + columns=["station_id", "subloc", "param", "step", "exc_type", "message"], + ).to_csv(failures_file, index=False) + logger.info(f"Failures written to {failures_file} ({len(failures)} entries)") def update_steps(proto, x): @@ -582,9 +606,15 @@ def test_single(fname): # not maintained @click.option("--logdir", type=click.Path(path_type=Path), default="logs") @click.option("--debug", is_flag=True) @click.option("--quiet", is_flag=True) +@click.option( + "--failures-file", + type=click.Path(path_type=Path), + default=None, + 
help="Path for the failures CSV. Defaults to {logdir}/auto_screen_failures.csv.", +) @click.help_option("-h", "--help") def auto_screen_cli(config, fpath, dest, stations, params, plot_dest, start_station, - logdir=None, debug=False, quiet=False): + logdir=None, debug=False, quiet=False, failures_file=None): """Auto-screen individual files or whole repos.""" level, console = resolve_loglevel( debug=debug, @@ -615,6 +645,8 @@ def auto_screen_cli(config, fpath, dest, stations, params, plot_dest, start_stat params=params_list, plot_dest=plot_dest, start_station=start_station, + failures_file=failures_file if failures_file is not None else Path(logdir) / "auto_screen_failures.csv", + logdir=logdir, ) diff --git a/dms_datastore/download_cdec.py b/dms_datastore/download_cdec.py index 3b862e7..09d33ce 100644 --- a/dms_datastore/download_cdec.py +++ b/dms_datastore/download_cdec.py @@ -141,9 +141,13 @@ def cdec_download( ) stations = stations.loc[~subloc_inconsist, :] for index, row in stations.iterrows(): - download_station_data( - row, dest_dir, start, end, endfile, param, overwrite, freq, failures, skips - ) + try: + download_station_data( + row, dest_dir, start, end, endfile, param, overwrite, freq, failures, skips + ) + except Exception as e: + logger.error(f"Unhandled exception for station {row.station_id} param {row.param}: {e}") + failures.append((row.station_id, row.param)) # # Use ThreadPoolExecutor # with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor: # # Schedule the download tasks and handle them asynchronously @@ -178,6 +182,20 @@ def cdec_download( for failure in failures: logger.info(failure) + failures_dicts = [] + for f in failures: + station_id, param_name = (f[0], f[1]) if len(f) >= 2 else (f[0], None) + failures_dicts.append({ + "agency": "cdec", + "station_id": station_id, + "agency_id": None, + "param": param_name, + "subloc": None, + "exc_type": "DownloadError", + "message": f"Download failed for station {station_id} param 
{param_name}", + }) + return failures_dicts + diff --git a/dms_datastore/download_des.py b/dms_datastore/download_des.py index 90f4566..fba8b34 100644 --- a/dms_datastore/download_des.py +++ b/dms_datastore/download_des.py @@ -354,7 +354,10 @@ def des_download(stations, dest_dir, start, end=None, param=None, overwrite=Fals itry = itry + 1 sleeptime = 4.0 if itry > 5 else 2.0 if itry >= max_retry: - raise + fmessage = f"ReadingDates failed for station {station}, subloc {subloc}, param {paramname} after {max_retry} retries" + logger.info(fmessage) + failures.append((station, paramname)) + break time.sleep(sleeptime) fstart = rid.start_date @@ -422,6 +425,20 @@ def des_download(stations, dest_dir, start, end=None, param=None, overwrite=Fals for failure in failures: logger.info(failure) + failures_dicts = [] + for f in failures: + station_id, param_name = (f[0], f[1]) if len(f) >= 2 else (f[0], None) + failures_dicts.append({ + "agency": "dwr_des", + "station_id": station_id, + "agency_id": None, + "param": param_name, + "subloc": None, + "exc_type": "DownloadError", + "message": f"Download failed for station {station_id} param {param_name}", + }) + return failures_dicts + diff --git a/dms_datastore/download_ncro.py b/dms_datastore/download_ncro.py index 6217258..fb66740 100644 --- a/dms_datastore/download_ncro.py +++ b/dms_datastore/download_ncro.py @@ -632,7 +632,15 @@ async def _ncro_download_async(stations, dest_dir, stime, etime, overwrite, upda logger.info( f"Exception occurred during download: station={station_id} site={site} trace={trace} err={result}" ) - failures.append((station_id, site, trace, str(result))) + failures.append({ + "agency": "ncro", + "station_id": station_id, + "agency_id": site, + "param": trace, + "subloc": None, + "exc_type": type(result).__name__, + "message": str(result), + }) return failures diff --git a/dms_datastore/download_noaa.py b/dms_datastore/download_noaa.py index b4eb447..2765f63 100644 --- a/dms_datastore/download_noaa.py +++ 
b/dms_datastore/download_noaa.py @@ -283,6 +283,7 @@ def noaa_download(stations, dest_dir, start, end=None, param=None, overwrite=Fal if not os.path.exists(dest_dir): os.mkdir(dest_dir) skips = [] + failures = [] # This is an attempt to short-circuit the download of water levels for non-tidal stations # The correctness of this remains to be checked. @@ -316,6 +317,18 @@ def noaa_download(stations, dest_dir, start, end=None, param=None, overwrite=Fal future.result() # This line can be used to handle results or exceptions from the tasks except Exception as e: logger.error(f"Exception occurred during download: {e}") + # Identify station if possible from future metadata + failures.append({ + "agency": "noaa", + "station_id": None, + "agency_id": None, + "param": None, + "subloc": None, + "exc_type": type(e).__name__, + "message": str(e), + }) + + return failures def list_stations(): diff --git a/dms_datastore/download_nwis.py b/dms_datastore/download_nwis.py index 6cc9e3a..0e68beb 100644 --- a/dms_datastore/download_nwis.py +++ b/dms_datastore/download_nwis.py @@ -362,6 +362,15 @@ def nwis_download(stations, dest_dir, start, end=None, param=None, overwrite=Fal except Exception as e: logger.debug(traceback.print_tb(e.__traceback__)) logger.error(f"Exception occurred during download: {e}") + failures.append({ + "agency": "usgs", + "station_id": None, + "agency_id": None, + "param": None, + "subloc": None, + "exc_type": type(e).__name__, + "message": str(e), + }) if len(failures) == 0: logger.info("No failed stations") @@ -370,6 +379,24 @@ def nwis_download(stations, dest_dir, start, end=None, param=None, overwrite=Fal for failure in failures: logger.info(failure) + failures_dicts = [] + for f in failures: + if isinstance(f, dict): + failures_dicts.append(f) + else: + # Legacy tuple format: (station, paramname) + station_id, param_name = (f[0], f[1]) if len(f) >= 2 else (f[0], None) + failures_dicts.append({ + "agency": "usgs", + "station_id": station_id, + "agency_id": 
None, + "param": param_name, + "subloc": None, + "exc_type": "DownloadError", + "message": f"Download failed for station {station_id} param {param_name}", + }) + return failures_dicts + def parse_start_year(txt): date_re = re.compile( diff --git a/dms_datastore/populate_repo.py b/dms_datastore/populate_repo.py index 9112c00..beae1c7 100644 --- a/dms_datastore/populate_repo.py +++ b/dms_datastore/populate_repo.py @@ -251,7 +251,8 @@ def populate_repo( sl2["subloc"] = "lower" stationlist = pd.concat([stationlist, sl1, sl2], axis=0) - downloaders[agency](stationlist, dest_dir, start, end, param, overwrite) + result = downloaders[agency](stationlist, dest_dir, start, end, param, overwrite) + return result if result is not None else [] def _write_renames(renames, outfile): @@ -307,6 +308,7 @@ def populate_repo2(df, dest, start, overwrite=False, ignore_existing=None): def populate(dest, all_agencies=None, varlist=None, partial_update=False): logger.info(f"dest: {dest} agencies: {all_agencies}") doneagency = [] + station_failures = [] purge = False ignore_existing = None @@ -338,7 +340,7 @@ def populate(dest, all_agencies=None, varlist=None, partial_update=False): for var in varlist: logger.info(f"Calling populate_repo with agency {agency} variable: {var}") if not partial_update: - populate_repo( + station_failures += populate_repo( agency, var, dest, @@ -346,7 +348,7 @@ def populate(dest, all_agencies=None, varlist=None, partial_update=False): pd.Timestamp(1999, 12, 31, 23, 59), ignore_existing=ignore_existing, ) - populate_repo( + station_failures += populate_repo( agency, var, dest, @@ -354,7 +356,7 @@ def populate(dest, all_agencies=None, varlist=None, partial_update=False): pd.Timestamp(2019, 12, 31, 23, 59), ignore_existing=ignore_existing, ) - populate_repo( + station_failures += populate_repo( agency, var, dest, pd.Timestamp(2020, 1, 1), None, overwrite=True ) ext = "rdb" if agency == "usgs" else ".csv" @@ -367,7 +369,7 @@ def populate(dest, all_agencies=None, 
varlist=None, partial_update=False): logger.info( f"Calling populate_repo (1) with agency {agency} variable: {var} start: 1980-01-01" ) - populate_repo( + station_failures += populate_repo( agency, var, dest, @@ -378,7 +380,7 @@ def populate(dest, all_agencies=None, varlist=None, partial_update=False): logger.info( f"Calling populate_repo (2) with agency {agency} variable: {var} start: 2000-01-01" ) - populate_repo( + station_failures += populate_repo( agency, var, dest, @@ -395,7 +397,7 @@ def populate(dest, all_agencies=None, varlist=None, partial_update=False): else None ) - populate_repo( + station_failures += populate_repo( agency, var, dest, @@ -411,6 +413,7 @@ def populate(dest, all_agencies=None, varlist=None, partial_update=False): logger.info("Completed population for these agencies: ") for agent in doneagency: logger.info(agent) + return station_failures def purge(dest): @@ -439,7 +442,7 @@ def ncro_only(dest): revise_filename_syear_eyear(os.path.join(dest, f"cdec_*.csv")) -def populate_main(dest, agencies=None, varlist=None, partial_update=False): +def populate_main(dest, agencies=None, varlist=None, partial_update=False, failures_file=None): do_purge = False if not os.path.exists(dest): raise ValueError(f"Destination directory {os.path.abspath(dest)} does not exist. 
Please create it before running populate.") @@ -447,7 +450,8 @@ def populate_main(dest, agencies=None, varlist=None, partial_update=False): if do_purge: purge(dest) - failures = [] + agency_failures = [] + station_failures = [] if agencies is None or len(agencies) == 0: all_agencies = ["usgs", "dwr_des", "usbr", "noaa", "dwr_ncro", "dwr"] else: @@ -464,11 +468,22 @@ def populate_main(dest, agencies=None, varlist=None, partial_update=False): for future in concurrent.futures.as_completed(future_to_agency): agency = future_to_agency[future] try: - future.result() + result = future.result() + if result: + station_failures.extend(result) except Exception as exc: - failures.append(agency) + agency_failures.append(agency) trace = traceback.format_exc() logger.info(f"{agency} generated an exception: {exc} with trace:\n{trace}") + station_failures.append({ + "agency": agency, + "station_id": None, + "agency_id": None, + "param": None, + "subloc": None, + "exc_type": type(exc).__name__, + "message": str(exc), + }) if "ncro" in agency: populate_ncro_realtime(dest) @@ -486,6 +501,19 @@ def populate_main(dest, agencies=None, varlist=None, partial_update=False): revise_filename_syear_eyear(os.path.join(dest, f"cdec_*.csv")) logger.info("These agency queries failed") + # Write failures CSV + if failures_file is None: + logdir = Path("logs") + logdir.mkdir(exist_ok=True) + failures_file = logdir / "populate_repo_failures.csv" + failures_file = Path(failures_file) + failures_file.parent.mkdir(parents=True, exist_ok=True) + pd.DataFrame( + station_failures, + columns=["agency", "station_id", "agency_id", "param", "subloc", "exc_type", "message"], + ).to_csv(failures_file, index=False) + logger.info(f"Failures written to {failures_file} ({len(station_failures)} entries)") + def populate_debug_ncro_rename(dest, agencies=None, varlist=None): do_purge = False @@ -533,8 +561,14 @@ def populate_debug_ncro_rename(dest, agencies=None, varlist=None): @click.option("--logdir", 
type=click.Path(path_type=Path), default="logs") @click.option("--debug", is_flag=True) @click.option("--quiet", is_flag=True) +@click.option( + "--failures-file", + type=click.Path(path_type=Path), + default=None, + help="Path for the failures CSV. Defaults to {logdir}/populate_repo_failures.csv.", +) @click.help_option("-h", "--help") -def populate_main_cli(dest, agencies, variables, partial, logdir="logs", debug=False, quiet=False): +def populate_main_cli(dest, agencies, variables, partial, logdir="logs", debug=False, quiet=False, failures_file=None): """Populate repository with data from various agencies.""" level, console = resolve_loglevel( @@ -551,7 +585,8 @@ def populate_main_cli(dest, agencies, variables, partial, logdir="logs", debug=F varlist = list(variables) if variables else None agencies_list = list(agencies) if agencies else None logger.info(f"dest: {dest}, agencies: {agencies_list}, varlist:{varlist}") - populate_main(dest, agencies_list, varlist=varlist, partial_update=partial) + effective_failures_file = failures_file if failures_file is not None else Path(logdir) / "populate_repo_failures.csv" + populate_main(dest, agencies_list, varlist=varlist, partial_update=partial, failures_file=effective_failures_file) if __name__ == "__main__": diff --git a/dms_datastore/reformat.py b/dms_datastore/reformat.py index 8a93ae8..b1e1b16 100644 --- a/dms_datastore/reformat.py +++ b/dms_datastore/reformat.py @@ -525,10 +525,12 @@ def reformat(inpath, outpath, pattern): print(f"Reformatting complete for {label}. Reformatting failed on these files:") for srcfail in failures: print(srcfail) + return failures def reformat_main( - inpath="raw", outpath="formatted", agencies=["usgs", "des", "cdec", "noaa", "ncro"] + inpath="raw", outpath="formatted", agencies=["usgs", "des", "cdec", "noaa", "ncro"], + failures_file=None, ): if not os.path.exists(outpath): raise ValueError(f"Destination directory {os.path.abspath(outpath)} does not exist. 
Please create it before running reformat.") @@ -541,6 +543,7 @@ def reformat_main( exts = known_ext[agency] if agency in known_ext else [".csv"] pattern[agency] = [f"{agency}*{ext}" for ext in exts] + all_failed_files = [] with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor: future_to_agency = { executor.submit(reformat, inpath, outpath, pattern[agency]): agency @@ -552,6 +555,8 @@ def reformat_main( try: data = future.result() print("Data", data) + if data: + all_failed_files.extend(data) except Exception as exc: trace = traceback.format_exc() print( @@ -560,6 +565,16 @@ def reformat_main( sys.stdout.flush() print("Exiting reformat_main") + # Write failures CSV + if failures_file is None: + logdir = Path("logs") + logdir.mkdir(exist_ok=True) + failures_file = logdir / "reformat_failures.csv" + failures_file = Path(failures_file) + failures_file.parent.mkdir(parents=True, exist_ok=True) + pd.DataFrame({"filepath": all_failed_files}).to_csv(failures_file, index=False) + print(f"Failures written to {failures_file} ({len(all_failed_files)} entries)") + @click.command() @click.option( @@ -587,8 +602,14 @@ def reformat_main( @click.option("--logdir", type=click.Path(path_type=Path), default="logs") @click.option("--debug", is_flag=True) @click.option("--quiet", is_flag=True) +@click.option( + "--failures-file", + type=click.Path(path_type=Path), + default=None, + help="Path for the failures CSV. 
Defaults to {logdir}/reformat_failures.csv.", +) @click.help_option("-h", "--help") -def reformat_cli(inpath, outpath, pattern, agencies, logdir=None, debug=False, quiet=False): +def reformat_cli(inpath, outpath, pattern, agencies, logdir=None, debug=False, quiet=False, failures_file=None): """Reformat files from raw to standard format and add metadata.""" in_dir = inpath out_dir = outpath @@ -619,10 +640,16 @@ def reformat_cli(inpath, outpath, pattern, agencies, logdir=None, debug=False, q if pattern_list is None: # Send to multithreaded driver - reformat_main(inpath=in_dir, outpath=out_dir, agencies=agencies_list) + effective_failures_file = failures_file if failures_file is not None else Path(logdir or "logs") / "reformat_failures.csv" + reformat_main(inpath=in_dir, outpath=out_dir, agencies=agencies_list, failures_file=effective_failures_file) else: # Send to simple python with pattern - reformat(inpath=in_dir, outpath=out_dir, pattern=pattern_list) + failed = reformat(inpath=in_dir, outpath=out_dir, pattern=pattern_list) + effective_failures_file = failures_file if failures_file is not None else Path(logdir or "logs") / "reformat_failures.csv" + effective_failures_file = Path(effective_failures_file) + effective_failures_file.parent.mkdir(parents=True, exist_ok=True) + pd.DataFrame({"filepath": failed}).to_csv(effective_failures_file, index=False) + print(f"Failures written to {effective_failures_file} ({len(failed)} entries)") if __name__ == "__main__": diff --git a/tests/test_auto_screen_continue_on_error.py b/tests/test_auto_screen_continue_on_error.py new file mode 100644 index 0000000..6dd5c9a --- /dev/null +++ b/tests/test_auto_screen_continue_on_error.py @@ -0,0 +1,198 @@ +"""Tests that auto_screen() catches per-station failures and writes a CSV.""" + +import os +import yaml +import pandas as pd +import pytest +from pathlib import Path + +import dms_datastore.auto_screen as auto_screen_mod + + +# 
--------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_FAILURE_KEYS = {"station_id", "subloc", "param", "step", "exc_type", "message"} + +# Minimal protocol dict that screener/context_config would return. +_MINIMAL_PROTO = {"inherits_global": False, "steps": []} + +# Minimal YAML config written to a file; load_config() parses it from disk. +_MINIMAL_SCREEN_YAML = """ +global: + inherits_global: false + steps: [] +""" + + +def _write_config(path: Path) -> str: + """Write a minimal screen config YAML and return the path as string.""" + path.write_text(_MINIMAL_SCREEN_YAML) + return str(path) + + +def _fake_inventory(): + return pd.DataFrame([ + { + "station_id": "sta1", + "subloc": "default", + "param": "flow", + "agency": "usgs", + "agency_id": "11455420", + } + ]) + + +def _fake_station_db(): + return pd.DataFrame( + {"name": ["Station One"]}, + index=pd.Index(["sta1"], name="station_id"), + ) + + +def _minimal_ts(): + idx = pd.date_range("2020-01-01", periods=100, freq="15min") + return pd.DataFrame({"value": range(100), "user_flag": 0}, index=idx) + + +def _setup_common_patches(monkeypatch, *, fetcher_fn=None): + """Apply monkeypatches that are common to all auto_screen tests.""" + monkeypatch.setattr(auto_screen_mod, "repo_data_inventory", + lambda repo=None, in_path=None: _fake_inventory()) + monkeypatch.setattr(auto_screen_mod, "station_dbase", _fake_station_db) + # Bypass the complex context_config logic (requires region files etc.). + monkeypatch.setattr(auto_screen_mod, "context_config", + lambda cfg, station_id, subloc, param: _MINIMAL_PROTO) + # meta_to_filename needs the screened repo config file on disk; stub it out. 
+ monkeypatch.setattr(auto_screen_mod, "meta_to_filename", + lambda meta, **kw: "usgs_sta1_11455420_flow.csv") + + if fetcher_fn is None: + # Default fetcher: returns a valid (metas, ts) tuple + def _default_fetcher(source_repo, station_id, param, subloc=None, data_path=None): + meta = { + "agency": "usgs", + "station_id": station_id, + "subloc": subloc or "default", + "sublocation": subloc or "default", + "param": param, + } + return ([meta], _minimal_ts()) + + fetcher_fn = _default_fetcher + + monkeypatch.setattr( + auto_screen_mod, "custom_fetcher", lambda agency: fetcher_fn + ) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +def test_auto_screen_catches_screener_failure(tmp_path, monkeypatch): + """When screener() raises, the failure is recorded and processing does not abort.""" + failures_file = tmp_path / "screen_failures.csv" + config_file = _write_config(tmp_path / "screen.yaml") + + _setup_common_patches(monkeypatch) + + def _bad_screener(ts, station_id, subloc, param, protocol, *args, **kwargs): + raise RuntimeError("screener exploded") + + monkeypatch.setattr(auto_screen_mod, "screener", _bad_screener) + monkeypatch.setattr(auto_screen_mod, "write_ts_csv", lambda *a, **kw: None) + + auto_screen_mod.auto_screen( + fpath=str(tmp_path), + config=config_file, + dest=str(tmp_path / "screened"), + failures_file=str(failures_file), + ) + + assert failures_file.exists() + df = pd.read_csv(failures_file) + assert len(df) == 1 + row = df.iloc[0] + assert row["station_id"] == "sta1" + assert row["step"] == "screen" + assert row["exc_type"] == "RuntimeError" + + +def test_auto_screen_catches_write_failure(tmp_path, monkeypatch): + """When write_ts_csv() raises, the failure is recorded (step='write').""" + failures_file = tmp_path / "write_failures.csv" + config_file = _write_config(tmp_path / "screen.yaml") + + 
_setup_common_patches(monkeypatch) + + # screener returns the input ts unchanged + monkeypatch.setattr( + auto_screen_mod, "screener", + lambda ts, *a, **kw: ts[["value", "user_flag"]] + ) + + def _bad_write(*args, **kwargs): + raise IOError("disk full") + + monkeypatch.setattr(auto_screen_mod, "write_ts_csv", _bad_write) + + auto_screen_mod.auto_screen( + fpath=str(tmp_path), + config=config_file, + dest=str(tmp_path / "screened"), + failures_file=str(failures_file), + ) + + assert failures_file.exists() + df = pd.read_csv(failures_file) + assert len(df) == 1 + assert df.iloc[0]["step"] == "write" + + +def test_auto_screen_writes_failures_csv(tmp_path, monkeypatch): + """Regardless of failure source, auto_screen always writes a CSV.""" + failures_file = tmp_path / "any_failures.csv" + config_file = _write_config(tmp_path / "screen.yaml") + + _setup_common_patches(monkeypatch) + + monkeypatch.setattr(auto_screen_mod, "screener", + lambda *a, **kw: (_ for _ in ()).throw(ValueError("bad proto"))) + monkeypatch.setattr(auto_screen_mod, "write_ts_csv", lambda *a, **kw: None) + + auto_screen_mod.auto_screen( + fpath=str(tmp_path), + config=config_file, + dest=str(tmp_path / "screened"), + failures_file=str(failures_file), + ) + + assert failures_file.exists() + + +def test_auto_screen_writes_empty_csv_on_clean_run(tmp_path, monkeypatch): + """When every station processes cleanly the CSV still exists (header-only).""" + failures_file = tmp_path / "empty_failures.csv" + config_file = _write_config(tmp_path / "screen.yaml") + + _setup_common_patches(monkeypatch) + + monkeypatch.setattr( + auto_screen_mod, "screener", + lambda ts, *a, **kw: ts[["value", "user_flag"]] + ) + monkeypatch.setattr(auto_screen_mod, "write_ts_csv", lambda *a, **kw: None) + + auto_screen_mod.auto_screen( + fpath=str(tmp_path), + config=config_file, + dest=str(tmp_path / "screened"), + failures_file=str(failures_file), + ) + + assert failures_file.exists() + df = pd.read_csv(failures_file) + assert 
len(df) == 0 + assert list(df.columns) == ["station_id", "subloc", "param", "step", "exc_type", "message"] diff --git a/tests/test_downloader_continue_on_error.py b/tests/test_downloader_continue_on_error.py new file mode 100644 index 0000000..18128ef --- /dev/null +++ b/tests/test_downloader_continue_on_error.py @@ -0,0 +1,226 @@ +"""Tests that each active downloader continues past per-station failures and +returns a properly-formatted failures list instead of raising.""" + +import asyncio +import pandas as pd +import pytest +from pathlib import Path + +import dms_datastore.download_nwis as download_nwis +import dms_datastore.download_cdec as download_cdec +import dms_datastore.download_des as download_des +import dms_datastore.download_noaa as download_noaa +import dms_datastore.download_ncro as download_ncro + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + +_FAILURE_KEYS = {"agency", "station_id", "agency_id", "param", "subloc", "exc_type", "message"} + + +def _two_row_stationlist(**extra): + """Two-row stationlist: row 0 = 'bad', row 1 = 'good'.""" + bad = { + "station_id": "bad", + "agency_id": "00000", + "src_var_id": "00060", + "param": "flow", + "subloc": "default", + } + good = { + "station_id": "good", + "agency_id": "99999", + "src_var_id": "00060", + "param": "flow", + "subloc": "default", + } + bad.update(extra) + good.update(extra) + return pd.DataFrame([bad, good]) + + +# --------------------------------------------------------------------------- +# NWIS +# --------------------------------------------------------------------------- + +def test_nwis_continues_past_station_failure(tmp_path, monkeypatch): + """nwis_download returns failures list; does not raise on per-station error. + + download_station() is called in a ThreadPoolExecutor. 
When it raises, + future.result() re-raises and the outer except block converts the exception + into a failure dict rather than propagating it. + """ + calls = [] + + def _fake_download_station(row, dest_dir, start, end, param, overwrite, endfile, + successes, failures, skips): + calls.append(row.station_id) + if row.station_id == "bad": + raise RuntimeError("simulated network failure") + # good station: do nothing (no files written) + + monkeypatch.setattr(download_nwis, "download_station", _fake_download_station) + + stations = _two_row_stationlist() + result = download_nwis.nwis_download( + stations, str(tmp_path), pd.Timestamp(2020, 1, 1) + ) + + assert isinstance(result, list) + assert len(result) == 1 + f = result[0] + assert _FAILURE_KEYS.issubset(f.keys()), f"Missing keys in failure dict: {f}" + assert "bad" in calls and "good" in calls # both stations were attempted + + +# --------------------------------------------------------------------------- +# CDEC +# --------------------------------------------------------------------------- + +def test_cdec_continues_past_station_failure(tmp_path, monkeypatch): + """cdec_download wraps each download_station_data call; a raised exception + is caught, the station recorded as a failure, and processing continues.""" + calls = [] + + def _fake_download_station_data(row, dest_dir, start, end, endfile, param, + overwrite, freq, failures, skips): + calls.append(row.station_id) + if row.station_id == "bad": + raise RuntimeError("simulated CDEC error") + + monkeypatch.setattr(download_cdec, "download_station_data", _fake_download_station_data) + + stations = _two_row_stationlist() # src_var_id="00060", subloc="default" + result = download_cdec.cdec_download( + stations, str(tmp_path), pd.Timestamp(2020, 1, 1) + ) + + assert isinstance(result, list) + # The outer try/except appends (row.station_id, row.param) as a tuple that + # gets normalised to a dict before being returned. 
+ assert len(result) == 1 + assert _FAILURE_KEYS.issubset(result[0].keys()) + # Both rows must have been attempted + assert "good" in calls + + +# --------------------------------------------------------------------------- +# DES — non-integer agency_id triggers a graceful per-station failure +# --------------------------------------------------------------------------- + +def test_des_invalid_agency_id_does_not_raise(tmp_path, monkeypatch): + """A station whose agency_id cannot be converted to an integer is recorded + as a failure immediately without raising or making network calls.""" + + # Build a minimal inventory DataFrame so that des_download never calls the + # real inventory() network endpoint. + fake_inventory = pd.DataFrame( + columns=[ + "result_id", "station_id", "station_name", "station_active", + "analyte_name", "unit_name", "equipment_name", "aggregate_name", + "interval_name", "cdec_code", "probe_depth", "start_date", + "end_date", "program_id", "rank_name", + ] + ) + + monkeypatch.setattr(download_des, "inventory", lambda *a, **kw: fake_inventory) + + stations = pd.DataFrame([{ + "station_id": "tst", + "agency_id": "NOT-AN-INT", # triggers the int() failure path + "src_var_id": "flow", + "param": "flow", + "subloc": "default", + }]) + + result = download_des.des_download( + stations, str(tmp_path), pd.Timestamp(2020, 1, 1) + ) + + assert isinstance(result, list) + assert len(result) == 1 + assert _FAILURE_KEYS.issubset(result[0].keys()) + + +# --------------------------------------------------------------------------- +# NOAA +# --------------------------------------------------------------------------- + +def test_noaa_continues_past_station_failure(tmp_path, monkeypatch): + """noaa_download collects failures from future.result() exceptions + and returns them instead of raising.""" + calls = [] + + def _fake_download_station_data(row, dest_dir, start, end, param, overwrite, + endfile, skips, verbose): + calls.append(row.station_id) + if 
row.station_id == "bad": + raise RuntimeError("simulated NOAA error") + + monkeypatch.setattr(download_noaa, "download_station_data", _fake_download_station_data) + # Patch subprogram so the station-type filter passes for both rows. + monkeypatch.setattr( + download_noaa, "subprogram", + lambda df: pd.Series(["tidecurrent"] * len(df), index=df.index) + ) + + stations = _two_row_stationlist(name="test station") + + result = download_noaa.noaa_download( + stations, str(tmp_path), pd.Timestamp(2020, 1, 1), param="elev" + ) + + assert isinstance(result, list) + assert len(result) == 1 + assert _FAILURE_KEYS.issubset(result[0].keys()) + + +# --------------------------------------------------------------------------- +# NCRO +# --------------------------------------------------------------------------- + +def test_ncro_continues_past_trace_failure(tmp_path, monkeypatch): + """ncro_download records exceptions returned by asyncio.gather + (return_exceptions=True) as failure dicts rather than raising.""" + + async def _fake_one_trace_to_csv( + client, semaphore, station_id, agency_id, paramname, + site, trace, dest_dir, stime, etime, overwrite, + ): + raise RuntimeError("simulated NCRO trace failure") + + monkeypatch.setattr( + download_ncro, "_async_download_one_trace_to_csv", _fake_one_trace_to_csv + ) + + fake_inventory = pd.DataFrame({ + "site": ["BADSIT"], + "trace": ["T1"], + "param": ["flow"], + "start_time": [pd.Timestamp(2019, 1, 1)], + "end_time": [pd.Timestamp(2025, 1, 1)], + }) + monkeypatch.setattr(download_ncro, "load_inventory", lambda **kw: fake_inventory) + monkeypatch.setattr( + download_ncro, "similar_ncro_station_names", lambda x: ["BADSIT"] + ) + monkeypatch.setattr( + download_ncro.dstore_config, "station_dbase", lambda: pd.DataFrame() + ) + + stations = pd.DataFrame([{ + "station_id": "tst", + "agency_id": "BADSIT", + "src_var_id": "flow", + "param": "flow", + }]) + + result = download_ncro.ncro_download( + stations, str(tmp_path), pd.Timestamp(2020, 
1, 1) + ) + + assert isinstance(result, list) + assert len(result) == 1 + assert _FAILURE_KEYS.issubset(result[0].keys()) diff --git a/tests/test_format_usgs.py b/tests/test_format_usgs.py index e17a24a..e8ca229 100644 --- a/tests/test_format_usgs.py +++ b/tests/test_format_usgs.py @@ -9,8 +9,7 @@ normalize_station_request, stationfile_or_stations, ) -pytestmark = pytest.mark.skip(reason="Temporarily disabled while fixing test data packaging in CI") - +@pytest.mark.skip(reason="Requires live NWIS network access") def test_nwis_download(): stations = ["mok"] dest_dir = "data" diff --git a/tests/test_populate_repo_failures.py b/tests/test_populate_repo_failures.py new file mode 100644 index 0000000..55d2792 --- /dev/null +++ b/tests/test_populate_repo_failures.py @@ -0,0 +1,104 @@ +"""Tests that populate_main collects per-agency and per-station failures +and writes them to a CSV file without propagating exceptions.""" + +import pandas as pd +import pytest +from pathlib import Path + +import dms_datastore.populate_repo as populate_repo_mod + + +_FAILURE_KEYS = {"agency", "station_id", "agency_id", "param", "subloc", "exc_type", "message"} + +_SAMPLE_FAILURE = { + "agency": "usgs", + "station_id": "bad_sta", + "agency_id": "11111111", + "param": "flow", + "subloc": None, + "exc_type": "RuntimeError", + "message": "Simulated download failure", +} + + +def test_populate_main_collects_station_failures(tmp_path, monkeypatch): + """Station-level failures returned by populate() must appear in the + failures CSV written by populate_main().""" + + dest = tmp_path / "raw" + dest.mkdir() + failures_file = tmp_path / "failures.csv" + + def _fake_populate(dest_arg, agency=None, varlist=None, partial_update=False): + return [_SAMPLE_FAILURE] + + # Patch all the post-processing calls inside populate_main that would + # fail with no real repository. 
+ monkeypatch.setattr(populate_repo_mod, "populate", _fake_populate) + monkeypatch.setattr(populate_repo_mod, "rationalize_time_partitions", lambda *a, **kw: None) + monkeypatch.setattr(populate_repo_mod, "revise_filename_syear_eyear", lambda *a, **kw: None) + + populate_repo_mod.populate_main( + str(dest), + agencies=["usgs"], + failures_file=str(failures_file), + ) + + assert failures_file.exists() + df = pd.read_csv(failures_file) + assert len(df) == 1 + row = df.iloc[0] + assert row["station_id"] == "bad_sta" + assert row["agency"] == "usgs" + + +def test_populate_main_writes_empty_csv_on_clean_run(tmp_path, monkeypatch): + """When no failures occur, populate_main must still write a valid (header- + only) CSV at the designated path.""" + + dest = tmp_path / "raw" + dest.mkdir() + failures_file = tmp_path / "clean_failures.csv" + + monkeypatch.setattr(populate_repo_mod, "populate", lambda *a, **kw: []) + monkeypatch.setattr(populate_repo_mod, "rationalize_time_partitions", lambda *a, **kw: None) + monkeypatch.setattr(populate_repo_mod, "revise_filename_syear_eyear", lambda *a, **kw: None) + + populate_repo_mod.populate_main( + str(dest), + agencies=["usgs"], + failures_file=str(failures_file), + ) + + assert failures_file.exists() + df = pd.read_csv(failures_file) + assert len(df) == 0 + assert list(df.columns) == ["agency", "station_id", "agency_id", "param", "subloc", "exc_type", "message"] + + +def test_populate_main_captures_agency_level_exception(tmp_path, monkeypatch): + """If populate() raises an exception rather than returning a list, the + exception must be caught and recorded as a failure row in the CSV.""" + + dest = tmp_path / "raw" + dest.mkdir() + failures_file = tmp_path / "agency_fail.csv" + + def _exploding_populate(dest_arg, agency=None, varlist=None, partial_update=False): + raise RuntimeError("whole agency exploded") + + monkeypatch.setattr(populate_repo_mod, "populate", _exploding_populate) + monkeypatch.setattr(populate_repo_mod, 
"rationalize_time_partitions", lambda *a, **kw: None) + monkeypatch.setattr(populate_repo_mod, "revise_filename_syear_eyear", lambda *a, **kw: None) + + # Should NOT raise + populate_repo_mod.populate_main( + str(dest), + agencies=["usgs"], + failures_file=str(failures_file), + ) + + assert failures_file.exists() + df = pd.read_csv(failures_file) + assert len(df) == 1 + assert df.iloc[0]["exc_type"] == "RuntimeError" diff --git a/tests/test_reformat_continue_on_error.py b/tests/test_reformat_continue_on_error.py new file mode 100644 index 0000000..40d3838 --- /dev/null +++ b/tests/test_reformat_continue_on_error.py @@ -0,0 +1,146 @@ +"""Tests that reformat() continues past bad files and reformat_main() writes a CSV.""" + +import os +import pandas as pd +import pytest +from pathlib import Path + +import dms_datastore.reformat as reformat_mod + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_VALID_CSV_CONTENT = """\ +# format: dwr-dms-1.0 +# agency: usgs +# station_id: anh +# param: flow +# subloc: default +# agency_id: 11455420 +datetime,value,user_flag +2020-01-01 00:00,1.0,0 +2020-01-02 00:00,2.0,0 +2020-01-03 00:00,3.0,0 +""" + + +def _write_valid_file(path: Path): + path.write_text(_VALID_CSV_CONTENT) + + +def _write_bad_file(path: Path): + path.write_text("this is not a parseable CSV file\n!!garbage!!\n") + + +# --------------------------------------------------------------------------- +# reformat() unit tests (no ProcessPoolExecutor involved) +# --------------------------------------------------------------------------- + +def test_reformat_returns_failure_for_bad_file(tmp_path, monkeypatch): + """reformat() should return a list containing the path of any file it + cannot parse rather than raising an exception.""" + indir = tmp_path / "raw" + indir.mkdir() + outdir = tmp_path / "formatted" + outdir.mkdir() + + bad = indir / 
"usgs_bad_99999_flow_2020.csv" + _write_bad_file(bad) + + # Patch infer_internal_meta_for_file to raise so the bad file fails during + # meta inference, exercising reformat()'s except clause. + def _fake_meta(fpath): + raise ValueError("cannot infer meta from garbage file") + + monkeypatch.setattr(reformat_mod, "infer_internal_meta_for_file", _fake_meta) + + failures = reformat_mod.reformat(str(indir), str(outdir), ["usgs*.csv"]) + + assert isinstance(failures, list) + assert len(failures) == 1 + assert str(bad) in failures[0] + + +def test_reformat_continues_past_bad_file(tmp_path, monkeypatch): + """After a bad file fails, reformat() continues to process subsequent files.""" + indir = tmp_path / "raw" + indir.mkdir() + outdir = tmp_path / "formatted" + outdir.mkdir() + + bad = indir / "usgs_bad_99998_flow_2020.csv" + _write_bad_file(bad) + + call_count = {"count": 0} + original_infer = reformat_mod.infer_internal_meta_for_file + + def _selective_meta(fpath): + call_count["count"] += 1 + if "bad" in fpath: + raise ValueError(f"Simulated failure for {fpath}") + return original_infer(fpath) + + monkeypatch.setattr(reformat_mod, "infer_internal_meta_for_file", _selective_meta) + + failures = reformat_mod.reformat(str(indir), str(outdir), ["usgs*.csv"]) + + # Only the bad file should be in failures + assert len(failures) == 1 + # infer was called (proves we entered the loop) + assert call_count["count"] >= 1 + + +# --------------------------------------------------------------------------- +# reformat_main() tests — ProcessPoolExecutor calls the REAL reformat(); +# we use empty or broken indir to control what it does without lambdas. 
+# --------------------------------------------------------------------------- + +def test_reformat_main_writes_csv_on_empty_dir(tmp_path): + """With no input files, reformat_main() still writes a valid (header-only) + failures CSV at the supplied path.""" + indir = tmp_path / "raw" + indir.mkdir() + outdir = tmp_path / "formatted" + outdir.mkdir() + failures_file = tmp_path / "reformat_failures.csv" + + reformat_mod.reformat_main( + inpath=str(indir), + outpath=str(outdir), + agencies=["usgs"], + failures_file=str(failures_file), + ) + + assert failures_file.exists() + df = pd.read_csv(failures_file) + assert "filepath" in df.columns + assert len(df) == 0 + + +def test_reformat_main_records_bad_files(tmp_path): + """Files whose names don't match the expected naming convention are + recorded in the failures CSV (no monkeypatching / pickling needed).""" + indir = tmp_path / "raw" + indir.mkdir() + outdir = tmp_path / "formatted" + outdir.mkdir() + failures_file = tmp_path / "bad_files.csv" + + # A file whose name cannot be parsed by interpret_fname will raise in + # infer_internal_meta_for_file and be caught by reformat()'s except clause. 
+ bad = indir / "usgs_garbage.csv" + bad.write_text("invalid content\n") + + reformat_mod.reformat_main( + inpath=str(indir), + outpath=str(outdir), + agencies=["usgs"], + failures_file=str(failures_file), + ) + + assert failures_file.exists() + df = pd.read_csv(failures_file) + assert len(df) == 1 + assert "usgs_garbage" in df.iloc[0]["filepath"] From 8f535872d68fab435f5b8b7eb0f356aca4199a7c Mon Sep 17 00:00:00 2001 From: dwr-psandhu Date: Thu, 2 Apr 2026 14:44:22 -0700 Subject: [PATCH 2/3] Remove unused import from auto_screen.py --- dms_datastore/auto_screen.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dms_datastore/auto_screen.py b/dms_datastore/auto_screen.py index 4ea3af0..4d9d769 100644 --- a/dms_datastore/auto_screen.py +++ b/dms_datastore/auto_screen.py @@ -19,7 +19,6 @@ from dms_datastore.inventory import * from dms_datastore.write_ts import * from dms_datastore.filename import meta_to_filename -from schimpy.station import * import geopandas as gpd import numpy as np import seaborn as sns From a08380ac6843bbb2640cc3c880205b57031d1623 Mon Sep 17 00:00:00 2001 From: dwr-psandhu Date: Thu, 2 Apr 2026 15:06:55 -0700 Subject: [PATCH 3/3] Add additional dependencies for enhanced functionality and documentation --- pyproject.toml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7b1deb8..c41a83d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,9 @@ classifiers = [ ] dependencies = [ + # cadwr-dms conda channel only (not on PyPI) — install from GitHub for pip usage: + # vtools3: https://github.com/CADWRDeltaModeling/vtools3 + # schimpy: https://github.com/CADWRDeltaModeling/schimpy "vtools3", "pyyaml", "beautifulsoup4", @@ -45,10 +48,14 @@ dependencies = [ "dask", "scikit-learn", "matplotlib", + "geopandas", + "scipy", + "seaborn", + "click", + "tabula-py", + "schimpy", "cfgrib", "diskcache", - "pytest", - "pytest-runner", ] [project.optional-dependencies]