Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 93 additions & 62 deletions dms_datastore/auto_screen.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from dms_datastore.inventory import *
from dms_datastore.write_ts import *
from dms_datastore.filename import meta_to_filename
from schimpy.station import *
import geopandas as gpd
import numpy as np
import seaborn as sns
Expand Down Expand Up @@ -172,6 +171,8 @@ def auto_screen(
params=None,
plot_dest=None,
start_station=None,
failures_file=None,
logdir="logs",
):
"""Auto screen all data in directory
Parameters
Expand Down Expand Up @@ -212,7 +213,7 @@ def auto_screen(
actual_fpath = fpath if fpath is not None else repo_root(source_repo)
inventory = repo_data_inventory(repo="formatted",in_path=actual_fpath) # repo is the config repo, in_path is the data storage location
inventory = filter_inventory_(inventory, stations, params)
failed_read = []
failures = []

for index, row in inventory.iterrows():
station_id = row["station_id"]
Expand All @@ -236,70 +237,92 @@ def auto_screen(
# Now we have most information, but the time series may be split between sources
# with low and high priority
fetcher = custom_fetcher(agency)
# these may be lists
step = "read"
try:
# logger.debug(f"fetching {fpath},{station_id},{param}")
meta_ts = fetcher(source_repo, station_id, param, subloc=subloc, data_path=actual_fpath)
if meta_ts is None:
logger.debug(f"No data found for {station_id} {subloc} {param}")
failures.append({
"station_id": station_id, "subloc": subloc, "param": param,
"step": step, "exc_type": "NoData", "message": "fetcher returned None",
})
continue
metas, ts = meta_ts
meta = metas[0]
subloc_actual = (
meta["sublocation"]
if "sublocation" in meta
else meta["subloc"] if "subloc" in meta else "default"
)
step = "screen"
proto = context_config(screen_config, station_id, subloc, param)
do_plot = plot_dest is not None
subloc_label = "" if subloc == "default" else subloc
plot_label = f"{station_info['name']}_{station_id}@{subloc_label}_{param}"
screened = screener(
ts,
station_id,
subloc_actual,
param,
proto,
do_plot,
plot_label,
plot_dest=plot_dest,
)
logger.debug(f"screening complete for {station_id} {subloc} {param}")
if "value" in screened.columns:
screened = screened[["value", "user_flag"]]
meta["screen"] = proto

# Build output filename using configured naming spec for screened repo
output_meta = {
"agency": agency,
"station_id": station_id,
"subloc": subloc_actual if subloc_actual != "default" else None,
"param": param,
"agency_id": row.agency_id,
}
# Add year info if available from metadata
if "year" in meta:
output_meta["year"] = meta["year"]
elif "syear" in meta and "eyear" in meta:
output_meta["syear"] = meta["syear"]
output_meta["eyear"] = meta["eyear"]

# Get output without shard so that chunk_years will not append one and have it be redundant
output_fname = meta_to_filename(output_meta, repo="screened",include_shard=False)
output_fpath = os.path.join(dest, output_fname)
step = "write"
logger.debug(f"start write for {output_fpath} with meta {meta}")
write_ts_csv(screened, output_fpath, meta, chunk_years=True)
logger.debug("end write")
except Exception as e:
logger.warning(f"Read failed for {actual_fpath}, {station_id}, {param}, {subloc}, storage loc = {actual_fpath}")
logger.warning(
f"Failed at step={step} for {station_id}, {subloc}, {param}: {e}"
)
logger.exception(e)
print(e)
meta_ts = None

if meta_ts is None:
logger.debug(f"No data found for {station_id} {subloc} {param}")
failed_read.append((station_id, subloc, param))
logger.debug("Cumulative fails:")
for fr in failed_read:
logger.debug(fr)
failures.append({
"station_id": station_id,
"subloc": subloc,
"param": param,
"step": step,
"exc_type": type(e).__name__,
"message": str(e),
})
continue
metas, ts = meta_ts
meta = metas[0]
subloc_actual = (
meta["sublocation"]
if "sublocation" in meta
else meta["subloc"] if "subloc" in meta else "default"
)
proto = context_config(screen_config, station_id, subloc, param)
do_plot = plot_dest is not None
subloc_label = "" if subloc == "default" else subloc
plot_label = f"{station_info['name']}_{station_id}@{subloc_label}_{param}"
screened = screener(
ts,
station_id,
subloc_actual,
param,
proto,
do_plot,
plot_label,
plot_dest=plot_dest,
)
logger.debug(f"screening complete for {station_id} {subloc} {param}")
if "value" in screened.columns:
screened = screened[["value", "user_flag"]]
meta["screen"] = proto

# Build output filename using configured naming spec for screened repo
output_meta = {
"agency": agency,
"station_id": station_id,
"subloc": subloc_actual if subloc_actual != "default" else None,
"param": param,
"agency_id": row.agency_id,
}
# Add year info if available from metadata
if "year" in meta:
output_meta["year"] = meta["year"]
elif "syear" in meta and "eyear" in meta:
output_meta["syear"] = meta["syear"]
output_meta["eyear"] = meta["eyear"]

# Get output without shard so that chunk_years will not append one and have it be redundant
output_fname = meta_to_filename(output_meta, repo="screened",include_shard=False)
output_fpath = os.path.join(dest, output_fname)
logger.debug(f"start write for {output_fpath} with meta {meta}")
write_ts_csv(screened, output_fpath, meta, chunk_years=True)
logger.debug("end write")

# Write failures CSV
if failures_file is None:
logdir_path = Path(logdir)
logdir_path.mkdir(exist_ok=True)
failures_file = logdir_path / "auto_screen_failures.csv"
failures_file = Path(failures_file)
failures_file.parent.mkdir(parents=True, exist_ok=True)
pd.DataFrame(
failures,
columns=["station_id", "subloc", "param", "step", "exc_type", "message"],
).to_csv(failures_file, index=False)
logger.info(f"Failures written to {failures_file} ({len(failures)} entries)")


def update_steps(proto, x):
Expand Down Expand Up @@ -582,9 +605,15 @@ def test_single(fname): # not maintained
@click.option("--logdir", type=click.Path(path_type=Path), default="logs")
@click.option("--debug", is_flag=True)
@click.option("--quiet", is_flag=True)
@click.option(
"--failures-file",
type=click.Path(path_type=Path),
default=None,
help="Path for the failures CSV. Defaults to {logdir}/auto_screen_failures.csv.",
)
@click.help_option("-h", "--help")
def auto_screen_cli(config, fpath, dest, stations, params, plot_dest, start_station,
logdir=None, debug=False, quiet=False):
logdir=None, debug=False, quiet=False, failures_file=None):
"""Auto-screen individual files or whole repos."""
level, console = resolve_loglevel(
debug=debug,
Expand Down Expand Up @@ -615,6 +644,8 @@ def auto_screen_cli(config, fpath, dest, stations, params, plot_dest, start_stat
params=params_list,
plot_dest=plot_dest,
start_station=start_station,
failures_file=failures_file if failures_file is not None else Path(logdir) / "auto_screen_failures.csv",
logdir=logdir,
)


Expand Down
24 changes: 21 additions & 3 deletions dms_datastore/download_cdec.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,13 @@ def cdec_download(
)
stations = stations.loc[~subloc_inconsist, :]
for index, row in stations.iterrows():
download_station_data(
row, dest_dir, start, end, endfile, param, overwrite, freq, failures, skips
)
try:
download_station_data(
row, dest_dir, start, end, endfile, param, overwrite, freq, failures, skips
)
except Exception as e:
logger.error(f"Unhandled exception for station {row.station_id} param {row.param}: {e}")
failures.append((row.station_id, row.param))
# # Use ThreadPoolExecutor
# with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
# # Schedule the download tasks and handle them asynchronously
Expand Down Expand Up @@ -178,6 +182,20 @@ def cdec_download(
for failure in failures:
logger.info(failure)

failures_dicts = []
for f in failures:
station_id, param_name = (f[0], f[1]) if len(f) >= 2 else (f[0], None)
failures_dicts.append({
"agency": "cdec",
"station_id": station_id,
"agency_id": None,
"param": param_name,
"subloc": None,
"exc_type": "DownloadError",
"message": f"Download failed for station {station_id} param {param_name}",
})
return failures_dicts




Expand Down
19 changes: 18 additions & 1 deletion dms_datastore/download_des.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,10 @@ def des_download(stations, dest_dir, start, end=None, param=None, overwrite=Fals
itry = itry + 1
sleeptime = 4.0 if itry > 5 else 2.0
if itry >= max_retry:
raise
fmessage = f"ReadingDates failed for station {station}, subloc {subloc}, param {paramname} after {max_retry} retries"
logger.info(fmessage)
failures.append((station, paramname))
break
time.sleep(sleeptime)

fstart = rid.start_date
Expand Down Expand Up @@ -422,6 +425,20 @@ def des_download(stations, dest_dir, start, end=None, param=None, overwrite=Fals
for failure in failures:
logger.info(failure)

failures_dicts = []
for f in failures:
station_id, param_name = (f[0], f[1]) if len(f) >= 2 else (f[0], None)
failures_dicts.append({
"agency": "dwr_des",
"station_id": station_id,
"agency_id": None,
"param": param_name,
"subloc": None,
"exc_type": "DownloadError",
"message": f"Download failed for station {station_id} param {param_name}",
})
return failures_dicts




Expand Down
10 changes: 9 additions & 1 deletion dms_datastore/download_ncro.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,15 @@ async def _ncro_download_async(stations, dest_dir, stime, etime, overwrite, upda
logger.info(
f"Exception occurred during download: station={station_id} site={site} trace={trace} err={result}"
)
failures.append((station_id, site, trace, str(result)))
failures.append({
"agency": "ncro",
"station_id": station_id,
"agency_id": site,
"param": trace,
"subloc": None,
"exc_type": type(result).__name__,
"message": str(result),
})

return failures

Expand Down
13 changes: 13 additions & 0 deletions dms_datastore/download_noaa.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ def noaa_download(stations, dest_dir, start, end=None, param=None, overwrite=Fal
if not os.path.exists(dest_dir):
os.mkdir(dest_dir)
skips = []
failures = []

# This is an attempt to short-circuit the download of water levels for non-tidal stations
# The correctness of this remains to be checked.
Expand Down Expand Up @@ -316,6 +317,18 @@ def noaa_download(stations, dest_dir, start, end=None, param=None, overwrite=Fal
future.result() # This line can be used to handle results or exceptions from the tasks
except Exception as e:
logger.error(f"Exception occurred during download: {e}")
# Identify station if possible from future metadata
failures.append({
"agency": "noaa",
"station_id": None,
"agency_id": None,
"param": None,
"subloc": None,
"exc_type": type(e).__name__,
"message": str(e),
})

return failures


def list_stations():
Expand Down
27 changes: 27 additions & 0 deletions dms_datastore/download_nwis.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,15 @@ def nwis_download(stations, dest_dir, start, end=None, param=None, overwrite=Fal
except Exception as e:
logger.debug(traceback.print_tb(e.__traceback__))
logger.error(f"Exception occurred during download: {e}")
failures.append({
"agency": "usgs",
"station_id": None,
"agency_id": None,
"param": None,
"subloc": None,
"exc_type": type(e).__name__,
"message": str(e),
})

if len(failures) == 0:
logger.info("No failed stations")
Expand All @@ -370,6 +379,24 @@ def nwis_download(stations, dest_dir, start, end=None, param=None, overwrite=Fal
for failure in failures:
logger.info(failure)

failures_dicts = []
for f in failures:
if isinstance(f, dict):
failures_dicts.append(f)
else:
# Legacy tuple format: (station, paramname)
station_id, param_name = (f[0], f[1]) if len(f) >= 2 else (f[0], None)
failures_dicts.append({
"agency": "usgs",
"station_id": station_id,
"agency_id": None,
"param": param_name,
"subloc": None,
"exc_type": "DownloadError",
"message": f"Download failed for station {station_id} param {param_name}",
})
return failures_dicts


def parse_start_year(txt):
date_re = re.compile(
Expand Down
Loading
Loading