Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 119 additions & 32 deletions EPCN/process_bom_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import pandas as pd
import xarray as xr
import sys
import gc

#------------------------------------------------------------------------------
### MODULES (CUSTOM) ###
Expand Down Expand Up @@ -74,13 +75,16 @@ def _get_dataframe(self, station_id):
df.columns = new_cols
df.index = pd.to_datetime(df[['year', 'month', 'day', 'hour', 'minute']])
df.index.name = 'time'
# PRI 23/01/2026 remove duplicate time steps by taking the first one
df = df[~df.index.duplicated(keep='first')]
keep_cols = [12, 14, 16, 18, 20, 22, 24, 26]
parse_cols = ['hour_local', 'minute_local'] + df.columns[keep_cols].tolist()
for var in parse_cols: df[var] = pd.to_numeric(df[var], errors='coerce')
local_time = df['hour_local'] + df['minute_local'] / 60
#local_time = df['hour_local'] + df['minute_local'] / 60
df = df.iloc[:, keep_cols]
df.columns = ['Precip_accum', 'Ta', 'Td', 'RH', 'Ws', 'Wd', 'Wg', 'ps']
df['Precip'] = get_instantaneous_precip(df.Precip_accum, local_time)
#df['Precip'] = get_instantaneous_precip(df.Precip_accum, local_time)
df = get_instantaneous_precip(df)
df.drop('Precip_accum', axis=1, inplace=True)
df.ps = df.ps / 10 # Convert hPa to kPa
T_K = met_funcs.convert_celsius_to_Kelvin(df.Ta) # Get Ta in K
Expand All @@ -89,6 +93,7 @@ def _get_dataframe(self, station_id):
_interpolate_missing(df)
if self.site_details['Time step'] == 60: df = _resample_dataframe(df)
_apply_range_limits(df)
df = df[~df.index.duplicated(keep='first')]
return df
#--------------------------------------------------------------------------

Expand Down Expand Up @@ -121,9 +126,6 @@ def get_dataset(self):
# Generate variable and global attribute dictionaries and write to xarray dataset
_set_var_attrs(ds, nearest_stations)
ds = xr.merge([ds, make_qc_flags(ds)])

# Insert old variable names temporarily
#ds = xr.merge([ds, _make_temp_vars(ds)])
_set_global_attrs(ds, self.site_details)

return ds
Expand All @@ -142,6 +144,7 @@ def write_to_netcdf(self, write_path):
site_name = self.site_details.name
print ('Writing netCDF file for site {}'.format(site_name))
dataset = self.get_dataset()
# PRI 23/01/2026 NOTE(review): comment said '_new' is appended to the file name to
# test the new precipitation code, but fname below does not include '_new' — confirm
# the intended output file name
fname = '{}_AWS.nc'.format(''.join(site_name.split(' ')))
target = os.path.join(write_path, fname)
dataset.to_netcdf(target, format='NETCDF4')
Expand Down Expand Up @@ -196,15 +199,108 @@ def _resample_dataframe(df):
#------------------------------------------------------------------------------

#------------------------------------------------------------------------------
def get_instantaneous_precip(precip_accum, local_time):

precip_accum_interp = precip_accum.interpolate(limit=4)
precip_inst = precip_accum_interp - precip_accum_interp.shift()
reset_bool = local_time / 9.5 == 1
precip_inst.where(~reset_bool, precip_accum_interp, inplace=True)
neg_bool = (local_time / 9 == 1) & (precip_inst < 0)
precip_inst.where(~neg_bool, 0, inplace=True)
return precip_inst
#def get_instantaneous_precip(precip_accum, local_time):

#precip_accum_interp = precip_accum.interpolate(limit=4)
#precip_inst = precip_accum_interp - precip_accum_interp.shift()
#reset_bool = local_time / 9.5 == 1
#precip_inst.where(~reset_bool, precip_accum_interp, inplace=True)
#neg_bool = (local_time / 9 == 1) & (precip_inst < 0)
#precip_inst.where(~neg_bool, 0, inplace=True)
#return precip_inst
#------------------------------------------------------------------------------

#------------------------------------------------------------------------------
def get_instantaneous_precip(df):
    """
    Convert precipitation from a cumulative total over a day to a total
    per timestep (nominally 30 minutes).

    Subtleties:
        The BoM reports precipitation at the AWS sites as a cumulative total
        over 24 hours from 09:00 to 09:00 local time. The nominal reset time is
        09:01. For both 30 and 60 minute data, the precipitation at 09:00 is the
        total over the last 24 hours. For 30 minute data, the value at 09:30 is
        the precipitation from 09:01 to 09:30 and is the required total over that
        30 minute timestep. For 60 minute data, the value at 09:30 will be missing
        and the value at 10:00 is the required total over the 60 minute timestep
        from 09:01 to 10:00.
        Some sites start with a 60 minute timestep and change to a 30 minute
        timestep part way through the data record.
        The situation is further complicated by missing data occurring around the
        09:01 reset time. In this case, the first valid precipitation value after
        the reset time represents the cumulative total from the reset time to
        the time of the first valid value.
    Args:
        df: DataFrame with a datetime index and a 'Precip_accum' column
            holding the cumulative precipitation.
    Returns:
        A copy of df with a 'Precip' column added (precipitation per
        timestep); 'Precip_accum' is retained (the caller drops it).
    Author: PRI
    Date: January 2026
    """
    df = df.copy()
    df["Precip"] = np.nan
    df["P_diff"] = np.nan
    # work on a subset containing only the valid cumulative values
    df_precip = df[["Precip_accum"]].dropna()
    # decimal hour-of-day, used to locate the nominal reset timesteps
    hm = pd.Series(df_precip.index.hour + df_precip.index.minute / 60,
                   index=df_precip.index)
    # get the per-timestep precipitation from the cumulative total;
    # this is the easy part, dealing with the reset time is complicated
    df_precip["P_diff"] = df_precip["Precip_accum"].diff()
    # reset occurs at 09:01, 09:30 value present (30 minute data): the 09:30
    # cumulative total IS the amount for the 09:01 to 09:30 timestep
    h9p5_mask = (hm == 9.5)
    df_precip.loc[h9p5_mask, "P_diff"] = df_precip["Precip_accum"][h9p5_mask]
    # reset occurs at 09:01, 09:30 value missing, 10:00 value present (60 minute
    # data): use the 10:00 cumulative total for the timestep, but NOT if the
    # 09:30 value was already used; detect that by shifting the 09:30 mask
    # forward 1 row so its values align with the 10:00 rows
    # (fill_value=False keeps the mask boolean and handles the first row)
    h10p0_mask = (hm == 10.0) & ~h9p5_mask.shift(periods=1, fill_value=False)
    df_precip.loc[h10p0_mask, "P_diff"] = df_precip["Precip_accum"][h10p0_mask]
    # a small number of negative per-timestep values remain after handling the
    # nominal reset times of 09:30 (30 minute data) and 10:00 (60 minute data),
    # due to mistakes in timing (daylight saving?) and data gaps around the
    # nominal reset times; assume these are all caused by gaps spanning the
    # 09:01 reset and use the cumulative total as the timestep amount
    neg_mask = (df_precip["P_diff"] < 0)
    df_precip.loc[neg_mask, "P_diff"] = df_precip["Precip_accum"][neg_mask]
    # write the per-timestep values back into the full dataframe
    # (update only overwrites where df_precip has valid values)
    df.update(df_precip[["P_diff"]])
    # spread hourly precipitation across the halfhour and the hour
    # NOTE: this assumes a timestep of 30 minutes with NaN on the halfhour and
    # valid values on the hour
    is_nan = df["P_diff"].isna()
    # mask of source values (row immediately after a NaN) to be spread over NaNs
    is_source = is_nan.shift(periods=1, fill_value=False)
    # spread the valid values on the hour to NaNs on the halfhour ...
    df["Precip"] = df["P_diff"].fillna(df["P_diff"].shift(-1))
    df = df.drop(columns=["P_diff"])
    # ... and halve both members of each spread pair
    spread_mask = is_nan | is_source
    df.loc[spread_mask, "Precip"] = df.loc[spread_mask, "Precip"] / 2
    return df
#------------------------------------------------------------------------------

#------------------------------------------------------------------------------
Expand All @@ -220,22 +316,6 @@ def make_qc_flags(ds):
da_list.append(da)
return xr.merge(da_list)
#------------------------------------------------------------------------------

#------------------------------------------------------------------------------
def _make_temp_vars(ds):

    """Temporarily duplicate dataset variables under their old (legacy) names."""

    # map of current substring -> legacy substring used in the old variable names
    legacy_map = {'AH': 'Ah', 'SH': 'q'}
    renamed = []
    for new_str, old_str in legacy_map.items():
        # all variables whose name contains the current substring
        matches = [name for name in ds.variables if new_str in name]
        mapper = {name: name.replace(new_str, old_str) for name in matches}
        renamed.append(ds[matches].rename(mapper))
    return xr.merge(renamed)
#------------------------------------------------------------------------------

#------------------------------------------------------------------------------
def _set_global_attrs(ds, site_details):
Expand All @@ -259,8 +339,9 @@ def _set_global_attrs(ds, site_details):
#'nc_rundatetime': dt.datetime.strftime(dt.datetime.now(),
# '%Y-%m-%d %H:%M:%S'),
#'xl_datemode': '0'}

# PRI 23/01/2026 set dtype for time to float64 to avoid xarray warning
ds.time.encoding = {'units': 'days since 1800-01-01',
'dtype': 'float64',
'_FillValue': None}
#------------------------------------------------------------------------------

Expand Down Expand Up @@ -355,9 +436,15 @@ def get_bom_dict(idx):
if __name__ == "__main__":

    # Get conversion class and write text data to nc file for each site
    # (single call; a superseded duplicate assignment was removed)
    sites = utils.get_ozflux_site_list()
    for site in sites.index:
        # per-site output directory; site name with spaces removed
        specific_file_path = nc_file_path.format(site.replace(' ', ''))
        # make sure the output directory exists before writing
        os.makedirs(specific_file_path, exist_ok=True)
        conv_class = bom_data_converter(sites.loc[site])
        conv_class.write_to_netcdf(specific_file_path)
        # release the converter and force a collection to cap memory use
        # when looping over many sites
        del conv_class
        gc.collect()

#------------------------------------------------------------------------------
print("AWS data processing finished")