From 28b6715316acc4057a04533a2e6ac419e2c292c3 Mon Sep 17 00:00:00 2001 From: yanggu Date: Thu, 16 Apr 2026 22:27:34 -0400 Subject: [PATCH] data prep for bias_correction --- .../assim.sequential/inst/bias_correction.R | 747 ++++++++++++++++++ modules/assim.sequential/inst/data_prep.R | 633 +++++++++++++++ 2 files changed, 1380 insertions(+) create mode 100644 modules/assim.sequential/inst/bias_correction.R create mode 100644 modules/assim.sequential/inst/data_prep.R diff --git a/modules/assim.sequential/inst/bias_correction.R b/modules/assim.sequential/inst/bias_correction.R new file mode 100644 index 00000000000..f714ba87098 --- /dev/null +++ b/modules/assim.sequential/inst/bias_correction.R @@ -0,0 +1,747 @@ +## Load packages +library(data.table) +library(dplyr) +library(xgboost) +library(parallel) +library(doParallel) +library(caret) + +##' @title parse_bcm_args +##' @name parse_bcm_args +##' @author Yang Gu +##' +##' @param args Character vector from commandArgs(trailingOnly = TRUE). +##' +##' @description +##' Parse command-line arguments for flux type and response variable. +##' The first argument is the flux type ("LE" or "NEE"), and the second +##' argument is the raw ensemble variable name (ens01). +##' +##' @return +##' A list containing `flux_type`, `raw_response_var`, and `response_var`. +parse_bcm_args <- function(args) { + flux_type <- if (length(args) >= 1) toupper(args[1]) else "LE" + raw_response_var <- if (length(args) >= 2) args[2] else "ens01" + + if (!flux_type %in% c("LE", "NEE")) { + stop("flux_type must be either 'LE' or 'NEE'.") + } + + response_var <- paste0(raw_response_var, "_residual") + + return(list( + flux_type = flux_type, + raw_response_var = raw_response_var, + response_var = response_var + )) +} + +##' @title get_flux_config +##' @name get_flux_config +##' @author Yang Gu +##' +##' @param flux_type Character string, either "LE" or "NEE". +##' +##' @description +##' Return flux-specific configuration, including observation column names, +##' model mean column names, columns to remove, and output suffixes. +##' +##' @return +##' A list of flux-specific settings. +get_flux_config <- function(flux_type) { + if (flux_type == "LE") { + return(list( + flux_type = "LE", + mean_col = "Qle_mean", + mean_std_col = "flux_mean", + obs_col = "LE_F_MDS", + obs_qc_col = "LE_F_MDS_QC", + residual_col = "LE_residual", + corrected_col = "LE_corrected", + pred_residual_col = "LE_resi_pred", + gfed_file = "/projectnb/dietzelab/guYANG/Gap_fill/results/le_ens_ec_3h.csv", + output_tag = "le" + )) + } + + if (flux_type == "NEE") { + return(list( + flux_type = "NEE", + mean_col = "NEE_mean", + mean_std_col = "flux_mean", + obs_col = "NEE_CUT_USTAR50", + obs_qc_col = "NEE_CUT_USTAR50_QC", + residual_col = "NEE_residual", + corrected_col = "NEE_corrected", + pred_residual_col = "NEE_resi_pred", + gfed_file = "/projectnb/dietzelab/guYANG/Gap_fill/results/ens_ec_3h.csv", + output_tag = "nee" + )) + } + + stop("Unsupported flux_type.") +} + +##' @title read_close_points_and_envres +##' @name read_close_points_and_envres +##' @author Yang Gu +##' +##' @param matched_file Path to matched point-site file. +##' @param envres_file Path to .RData file containing `envres`. +##' +##' @description +##' Read the nearest matched site-index table and load the integrated envres +##' dataset from .RData. +##' +##' @return +##' A list containing `close_points_df` and `envres`. +read_close_points_and_envres <- function(matched_file, envres_file) { + close_points_df <- fread(matched_file) + setDT(close_points_df) + close_points_df <- close_points_df[order(min_dist_m), .SD[1], by = index] + + load(envres_file) + + return(list( + close_points_df = close_points_df, + envres = envres + )) +} + +##' @title filter_envres_for_flux +##' @name filter_envres_for_flux +##' @author Yang Gu +##' +##' @param envres Environmental result table. +##' @param flux_cfg Flux-specific configuration list. +##' +##' @description +##' Remove flux-specific observed columns, filter land-cover classes, +##' and restrict the date range used in BCM. +##' +##' @return +##' A filtered data.table. +filter_envres_for_flux <- function(envres, flux_cfg) { + cols_to_drop <- intersect( + c(flux_cfg$obs_col, flux_cfg$obs_qc_col, flux_cfg$residual_col), + names(envres) + ) + if (length(cols_to_drop) > 0) { + envres[, (cols_to_drop) := NULL] + } + + envres <- envres[as.numeric(as.character(LC)) <= 4] + + cutoff_date <- as.POSIXct("2012-07-15 23:59:59", tz = "UTC") + envres <- envres[utc > cutoff_date] + + return(envres) +} + +##' @title add_gapfill_and_predictors +##' @name add_gapfill_and_predictors +##' @author Yang Gu +##' +##' @param envres Environmental result table. +##' @param gfed Gap-fill ensemble result table. +##' @param close_points_df Nearest site-index table. +##' @param raw_response_var Raw ensemble variable name, e.g. "ens01". +##' @param flux_cfg Flux-specific configuration list. +##' +##' @description +##' Merge a selected ensemble gap-fill column into `envres`, add predictor +##' transformations, standardize the model mean column to `flux_mean`, +##' and construct the response residual. +##' +##' @return +##' A processed data.table ready for BCM modeling. +add_gapfill_and_predictors <- function(envres, + gfed, + close_points_df, + raw_response_var, + flux_cfg) { + cols_to_merge <- c(raw_response_var) + gfed_sub <- gfed[, c("Site_ID", "utc", cols_to_merge), with = FALSE] + + setkey(envres, Site_ID, utc) + setkey(gfed_sub, Site_ID, utc) + envres <- gfed_sub[envres] + + envres[, c("LC", "is_day", "KGC") := lapply(.SD, as.factor), + .SDcols = c("LC", "is_day", "KGC")] + + envres[, t2m2 := t2m^2] + envres[, PPFD2 := PPFD^2] + envres[, EVI2 := EVI^2] + envres[, VPD_t2m := VPD * t2m] + envres[, sincos := sin_doy * cos_doy] + + envres[, Site_ID := NULL] + + setkey(envres, index) + setkey(close_points_df, index) + envres[close_points_df, Site_ID := i.Site_ID] + + envres[EVI == -3000, EVI := NA_real_] + envres[NDVI == -3000, NDVI := NA_real_] + + if (flux_cfg$mean_col %in% names(envres) && + flux_cfg$mean_col != flux_cfg$mean_std_col) { + setnames(envres, old = flux_cfg$mean_col, new = flux_cfg$mean_std_col) + } + + cols_ens <- grep("ens", names(envres), value = TRUE) + cols_to_keep <- raw_response_var + cols_to_remove <- setdiff(cols_ens, cols_to_keep) + if (length(cols_to_remove) > 0) { + envres[, (cols_to_remove) := NULL] + } + + response_residual_col <- paste0(raw_response_var, "_residual") + envres[, (response_residual_col) := get(flux_cfg$mean_std_col) - get(raw_response_var)] + + return(envres) +} + +##' @title split_bcm_data +##' @name split_bcm_data +##' @author Yang Gu +##' +##' @param envres Environmental result table. +##' @param response_var Response column name. +##' +##' @description +##' Split the data into observed-site training data and unobserved-site +##' prediction data. +##' +##' @return +##' A list containing `tenvres` and `pdat`. +split_bcm_data <- function(envres, response_var) { + indices_to_keep <- envres[!is.na(Site_ID), unique(index)] + + tenvres <- envres[index %in% indices_to_keep] + setDT(tenvres) + tenvres[, LC := factor(as.character(LC))] + tenvres <- tenvres[!is.na(get(response_var))] + tenvres[, `:=`( + LC = as.factor(LC), + is_day = as.factor(is_day), + KGC = as.factor(KGC), + index = as.character(index) + )] + + envres <- envres[!(index %in% indices_to_keep)] + pdat <- envres[is.na(get(response_var))] + + return(list( + tenvres = tenvres, + pdat = pdat + )) +} + +##' @title get_bcm_model_components +##' @name get_bcm_model_components +##' @author Yang Gu +##' +##' @description +##' Return the predictor list, formula, and XGBoost hyperparameter settings +##' used in the BCM workflow. +##' +##' @return +##' A list of modeling components. +get_bcm_model_components <- function() { + predictors_all <- c( + "t2m", "sp", "d2m", "tp", "strd", "KGC", "lon", + "sin_doy", "cos_doy", "NDVI", "EVI", "twi", "PH", "Sand", "lat", + "agb", "SOC", "N", "year_since_disturb", "PPFD", "WindSpeed", + "t2m2", "PPFD2", "EVI2", "VPD_t2m", "sincos", + "sm_0_10", "sm_10_40", "sm_40_100", "sm_100_200", + "VPD", "is_day" + ) + + pred_formula <- as.formula( + paste0("~ ", paste(predictors_all, collapse = " + "), " - 1") + ) + + ### TBD: set the specific settings for different LC type + lc_param_list <- list( + `10` = list(objective = "reg:squarederror", eta = 0.02, max_depth = 4, + subsample = 0.5, colsample_bytree = 0.5, + min_child_weight = 20, gamma = 2, lambda = 4, alpha = 3), + `40` = list(objective = "reg:squarederror", eta = 0.025, max_depth = 5, + subsample = 0.55, colsample_bytree = 0.6, + min_child_weight = 15, gamma = 1, lambda = 4, alpha = 2), + `50` = list(objective = "reg:squarederror", eta = 0.015, max_depth = 3, + subsample = 0.35, colsample_bytree = 0.4, + min_child_weight = 40, gamma = 4, lambda = 6, alpha = 5), + `60` = list(objective = "reg:squarederror", eta = 0.02, max_depth = 3, + subsample = 0.4, colsample_bytree = 0.5, + min_child_weight = 25, gamma = 3, lambda = 4, alpha = 3), + `70` = list(objective = "reg:squarederror", eta = 0.02, max_depth = 3, + subsample = 0.4, colsample_bytree = 0.4, + min_child_weight = 30, gamma = 4, lambda = 5, alpha = 4) + ) + + default_params <- list( + objective = "reg:squarederror", eta = 0.1, max_depth = 6, + subsample = 0.8, colsample_bytree = 0.8, + min_child_weight = 10, gamma = 0.5, lambda = 3, alpha = 2 + ) + + return(list( + predictors_all = predictors_all, + pred_formula = pred_formula, + lc_param_list = lc_param_list, + default_params = default_params, + nrounds_fixed = 200 + )) +} + +##' @title get_valid_lc_tasks +##' @name get_valid_lc_tasks +##' @author Yang Gu +##' +##' @param tenvres Training data.table. +##' +##' @description +##' Construct leave-one-site-out CV tasks for LC classes with more than one site. +##' +##' @return +##' A data.frame of CV tasks. +get_valid_lc_tasks <- function(tenvres) { + lc_counts <- tenvres %>% + distinct(LC, index) %>% + group_by(LC) %>% + summarise(n_sites = n(), .groups = "drop") + + valid_lcs <- lc_counts %>% + filter(n_sites > 1) %>% + pull(LC) + + tasks <- tenvres %>% + filter(LC %in% valid_lcs) %>% + distinct(LC, index) %>% + arrange(LC, index) + + return(tasks) +} + +##' @title run_bcm_cv +##' @name run_bcm_cv +##' @author Yang Gu +##' +##' @param tenvres Training data.table. +##' @param tasks CV task table. +##' @param predictors_all Predictor column names. +##' @param response_var Response column name. +##' @param nrounds_fixed Fixed number of boosting rounds. +##' @param lc_param_list LC-specific XGBoost parameter list. +##' @param default_params Default XGBoost parameter list. +##' @param flux_cfg Flux-specific configuration list. +##' +##' @description +##' Run leave-one-site-out cross validation within land-cover classes. +##' +##' @return +##' A data.frame of CV predictions and diagnostics. +run_bcm_cv <- function(tenvres, + tasks, + predictors_all, + response_var, + nrounds_fixed, + lc_param_list, + default_params, + flux_cfg) { + ncores <- max(detectCores() - 1, 1) + cl <- makeCluster(ncores) + registerDoParallel(cl) + + cv_results <- foreach( + i = seq_len(nrow(tasks)), + .packages = c("dplyr", "xgboost", "data.table"), + .combine = rbind, + .export = c("tenvres", "predictors_all", "response_var", + "nrounds_fixed", "tasks", "lc_param_list", + "default_params", "flux_cfg") + ) %dopar% { + lc <- tasks$LC[i] + site <- tasks$index[i] + + df_lc <- tenvres %>% filter(LC == lc) + train_df <- df_lc %>% filter(index != site) + test_df <- df_lc %>% filter(index == site) + + if (nrow(train_df) == 0 || nrow(test_df) == 0) { + return(NULL) + } + + train_x <- data.matrix(train_df[, predictors_all, with = FALSE]) + test_x <- data.matrix(test_df[, predictors_all, with = FALSE]) + + train_y <- train_df[[response_var]] + test_y <- test_df[[response_var]] + + dtrain <- xgb.DMatrix(data = train_x, label = train_y, missing = NA) + dtest <- xgb.DMatrix(data = test_x, label = test_y, missing = NA) + watchlist <- list(train = dtrain, eval = dtest) + + params <- if (as.character(lc) %in% names(lc_param_list)) { + lc_param_list[[as.character(lc)]] + } else { + default_params + } + + model_i <- xgb.train( + params = params, + data = dtrain, + nrounds = nrounds_fixed, + watchlist = watchlist, + eval_metric = "rmse", + early_stopping_rounds = 20, + nthread = 1, + verbose = 0 + ) + + preds_i <- predict(model_i, test_x) + ev <- as.data.frame(model_i$evaluation_log) + + final_train_rmse <- ev$train_rmse[nrow(ev)] + final_eval_rmse <- ev$eval_rmse[nrow(ev)] + best_iter <- which.min(ev$eval_rmse) + + n_check <- 5 + if (nrow(ev) >= n_check + 1) { + train_tail <- tail(ev$train_rmse, n_check + 1) + eval_tail <- tail(ev$eval_rmse, n_check + 1) + train_trend <- diff(train_tail) + eval_trend <- diff(eval_tail) + has_overfit <- all(train_trend < 0) && all(eval_trend > 0) + } else { + has_overfit <- FALSE + } + + rmse_gap <- final_eval_rmse - final_train_rmse + rmse_ratio <- final_eval_rmse / final_train_rmse + + n <- nrow(test_df) + + data.frame( + LC = rep(lc, n), + index = rep(site, n), + utc = test_df$utc, + flux_mean = test_df[[flux_cfg$mean_std_col]], + observed = test_df[[response_var]], + predicted = preds_i, + train_rmse = rep(final_train_rmse, n), + eval_rmse = rep(final_eval_rmse, n), + rmse_gap = rep(rmse_gap, n), + rmse_ratio = rep(rmse_ratio, n), + best_iter = rep(best_iter, n), + has_overfit = rep(has_overfit, n), + stringsAsFactors = FALSE + ) + } + + stopCluster(cl) + registerDoSEQ() + + cv_results <- cv_results %>% + mutate( + flux_pred = flux_mean - predicted, + flux_obs = flux_mean - observed + ) + + return(cv_results) +} + +##' @title predict_bcm +##' @name predict_bcm +##' @author Yang Gu +##' +##' @param tenvres Training data.table. +##' @param pdat Prediction data.table. +##' @param response_var Response column name. +##' @param pred_formula Formula used for model.matrix construction. +##' @param predictors_all Predictor column names. +##' @param nrounds_fixed Fixed number of boosting rounds. +##' @param lc_param_list LC-specific XGBoost parameter list. +##' @param default_params Default XGBoost parameter list. +##' @param flux_cfg Flux-specific configuration list. +##' +##' @description +##' Fit one model per LC using all observed sites and predict residual +##' corrections for unobserved points. +##' +##' @return +##' A prediction data.table with flux-corrected outputs. +predict_bcm <- function(tenvres, + pdat, + response_var, + pred_formula, + predictors_all, + nrounds_fixed, + lc_param_list, + default_params, + flux_cfg) { + pdat[, flux_resi_pred := NA_real_] + + for (lc in levels(tenvres$LC)) { + df_lc <- tenvres[LC == lc] + if (nrow(df_lc) < 10) next + + mf_train_lc <- model.frame(pred_formula, data = df_lc, na.action = na.pass) + mat_train_lc <- model.matrix(pred_formula, data = mf_train_lc) + label_train_lc <- df_lc[[response_var]] + dtrain_lc <- xgb.DMatrix(data = mat_train_lc, label = label_train_lc, missing = NA) + + params_lc <- if (lc %in% names(lc_param_list)) lc_param_list[[lc]] else default_params + + bst_lc <- xgb.train( + params = params_lc, + data = dtrain_lc, + nrounds = nrounds_fixed, + verbose = 0 + ) + + pdat_lc <- pdat[LC == lc] + if (nrow(pdat_lc) == 0) next + + mf_p_lc <- model.frame(pred_formula, data = pdat_lc, na.action = na.pass) + mat_p_lc <- model.matrix(pred_formula, data = mf_p_lc) + dtest_lc <- xgb.DMatrix(data = mat_p_lc, missing = NA) + preds_lc <- predict(bst_lc, dtest_lc) + + pdat[LC == lc, flux_resi_pred := preds_lc] + } + + pdat[, flux_corrected := get(flux_cfg$mean_std_col) - flux_resi_pred] + + if (flux_cfg$pred_residual_col != "flux_resi_pred") { + pdat[, (flux_cfg$pred_residual_col) := flux_resi_pred] + } + if (flux_cfg$corrected_col != "flux_corrected") { + pdat[, (flux_cfg$corrected_col) := flux_corrected] + } + + return(pdat) +} + +##' @title compute_bcm_metrics +##' @name compute_bcm_metrics +##' @author Yang Gu +##' +##' @param cv_results Cross-validation result table. +##' +##' @description +##' Compute site-level, LC-level, and corrected-vs-baseline R2 summaries, +##' as well as overfitting diagnostics. +##' +##' @return +##' A list of summary tables. +compute_bcm_metrics <- function(cv_results) { + r2_by_site <- cv_results %>% + group_by(LC, index) %>% + summarise( + R2 = 1 - sum((observed - predicted)^2) / + sum((observed - mean(observed))^2), + .groups = "drop" + ) + + r2_by_lc <- r2_by_site %>% + group_by(LC) %>% + summarise( + R2 = mean(R2), + .groups = "drop" + ) + + overfit_table <- cv_results %>% + group_by(LC, index) %>% + summarise( + train_rmse = first(train_rmse), + eval_rmse = first(eval_rmse), + rmse_gap = first(rmse_gap), + rmse_ratio = first(rmse_ratio), + has_overfit = first(has_overfit), + .groups = "drop" + ) + + r2_by_index <- cv_results %>% + group_by(LC, index) %>% + summarise( + R2_fused = cor(flux_pred, flux_obs, use = "complete.obs")^2, + R2_baseline = cor(flux_mean, flux_obs, use = "complete.obs")^2, + .groups = "drop" + ) + + return(list( + r2_by_site = r2_by_site, + r2_by_lc = r2_by_lc, + overfit_table = overfit_table, + r2_by_index = r2_by_index + )) +} + +##' @title write_bcm_outputs +##' @name write_bcm_outputs +##' @author Yang Gu +##' +##' @param file_prefix Output file prefix. +##' @param flux_cfg Flux-specific configuration list. +##' @param overfit_table Overfitting summary table. +##' @param pdat Prediction output table. +##' @param r2_by_site Site-level R2 table. +##' @param r2_by_lc LC-level R2 table. +##' @param cv_results Cross-validation output table. +##' @param r2_by_index Index-level corrected-vs-baseline R2 table. +##' +##' @description +##' Write BCM outputs to disk, using flux-specific file suffixes. +##' +##' @return +##' No return value. +write_bcm_outputs <- function(file_prefix, + flux_cfg, + overfit_table, + pdat, + r2_by_site, + r2_by_lc, + cv_results, + r2_by_index) { + fwrite(overfit_table, paste0(file_prefix, "_lc1_", flux_cfg$output_tag, "_overfit.csv")) + fwrite(pdat, paste0(file_prefix, "_pred_lc1_", flux_cfg$output_tag, ".csv")) + + write.csv( + r2_by_site, + paste0(file_prefix, "_lc1_", flux_cfg$output_tag, "_site.csv"), + row.names = FALSE, quote = TRUE + ) + write.csv( + r2_by_lc, + paste0(file_prefix, "_lc1_", flux_cfg$output_tag, "_r2.csv"), + row.names = FALSE, quote = TRUE + ) + write.csv( + cv_results, + paste0(file_prefix, "_lc1_", flux_cfg$output_tag, "_cv.csv"), + row.names = FALSE, quote = TRUE + ) + write.csv( + r2_by_index, + paste0(file_prefix, "_lc1_", flux_cfg$output_tag, "_index_improve_r2.csv"), + row.names = FALSE + ) +} + +##' @title run_bcm_pipeline +##' @name run_bcm_pipeline +##' @author Yang Gu +##' +##' @param flux_type Flux type, either "LE" or "NEE". +##' @param raw_response_var Raw ensemble response variable, e.g. "ens01". +##' @param envres_file Path to envres .RData file. +##' @param matched_file Path to matched point-site CSV file. +##' @param output_dir Directory for writing outputs. +##' +##' @description +##' Run the full BCM workflow for a selected flux type using the integrated +##' `envres` dataset and ensemble gap-fill predictions. +##' +##' @return +##' A list containing prediction outputs and evaluation summaries. +run_bcm_pipeline <- function(flux_type, + raw_response_var, + envres_file, + matched_file, + output_dir) { + flux_cfg <- get_flux_config(flux_type) + response_var <- paste0(raw_response_var, "_residual") + + data_obj <- read_close_points_and_envres( + matched_file = matched_file, + envres_file = envres_file + ) + close_points_df <- data_obj$close_points_df + envres <- data_obj$envres + + gfed <- fread(flux_cfg$gfed_file) + + envres <- filter_envres_for_flux(envres, flux_cfg) + envres <- add_gapfill_and_predictors( + envres = envres, + gfed = gfed, + close_points_df = close_points_df, + raw_response_var = raw_response_var, + flux_cfg = flux_cfg + ) + + split_obj <- split_bcm_data(envres, response_var) + tenvres <- split_obj$tenvres + pdat <- split_obj$pdat + rm(envres) + gc() + + model_obj <- get_bcm_model_components() + predictors_all <- model_obj$predictors_all + pred_formula <- model_obj$pred_formula + lc_param_list <- model_obj$lc_param_list + default_params <- model_obj$default_params + nrounds_fixed <- model_obj$nrounds_fixed + + tasks <- get_valid_lc_tasks(tenvres) + + cv_results <- run_bcm_cv( + tenvres = tenvres, + tasks = tasks, + predictors_all = predictors_all, + response_var = response_var, + nrounds_fixed = nrounds_fixed, + lc_param_list = lc_param_list, + default_params = default_params, + flux_cfg = flux_cfg + ) + + pdat <- predict_bcm( + tenvres = tenvres, + pdat = pdat, + response_var = response_var, + pred_formula = pred_formula, + predictors_all = predictors_all, + nrounds_fixed = nrounds_fixed, + lc_param_list = lc_param_list, + default_params = default_params, + flux_cfg = flux_cfg + ) + + cols_to_keep <- c( + "utc", "index", + "flux_resi_pred", "flux_corrected", + flux_cfg$pred_residual_col, + flux_cfg$corrected_col + ) + cols_to_keep <- intersect(cols_to_keep, names(pdat)) + pdat <- pdat[, ..cols_to_keep] + + metric_obj <- compute_bcm_metrics(cv_results) + + file_prefix <- paste0(output_dir, "/", response_var) + + write_bcm_outputs( + file_prefix = file_prefix, + flux_cfg = flux_cfg, + overfit_table = metric_obj$overfit_table, + pdat = pdat, + r2_by_site = metric_obj$r2_by_site, + r2_by_lc = metric_obj$r2_by_lc, + cv_results = cv_results, + r2_by_index = metric_obj$r2_by_index + ) + + return(list( + pdat = pdat, + cv_results = cv_results, + r2_by_site = metric_obj$r2_by_site, + r2_by_lc = metric_obj$r2_by_lc, + overfit_table = metric_obj$overfit_table, + r2_by_index = metric_obj$r2_by_index + )) +} \ No newline at end of file diff --git a/modules/assim.sequential/inst/data_prep.R b/modules/assim.sequential/inst/data_prep.R new file mode 100644 index 00000000000..0f8b362cd7b --- /dev/null +++ b/modules/assim.sequential/inst/data_prep.R @@ -0,0 +1,633 @@ +## Load packages +library(ncdf4) +library(data.table) +library(dplyr) +library(future.apply) +library(tidyr) +library(lubridate) +library(purrr) +library(kgc) +library(terra) +library(lutz) + +##' @title read_annual_csv_series +##' @name read_annual_csv_series +##' @author Yang Gu +##' +##' @param years Integer vector of years. +##' @param path_template A character string template containing one `%d` +##' placeholder for year, e.g. "/path/to/file_%d.csv". +##' +##' @description +##' Read a series of annual CSV files and return them as a list of data.tables. +##' +##' @return +##' A named list of data.tables, one per year. +read_annual_csv_series <- function(years, path_template) { + out <- lapply(years, function(y) { + fread(sprintf(path_template, y)) + }) + names(out) <- as.character(years) + return(out) +} + +##' @title bind_annual_tables +##' @name bind_annual_tables +##' @author Yang Gu +##' +##' @param dt_list A list of data.tables. +##' @param drop_year Logical; if TRUE and a `year` column exists, remove it. +##' +##' @description +##' Standardize and row-bind annual tables into one combined data.table. +##' +##' @return +##' A single combined data.table. +bind_annual_tables <- function(dt_list, drop_year = TRUE) { + cleaned <- lapply(dt_list, function(df) { + df <- as.data.table(df) + if (drop_year && "year" %in% names(df)) { + df[, year := NULL] + } + return(df) + }) + out <- rbindlist(cleaned, use.names = TRUE, fill = TRUE) + return(out) +} + +##' @title extract_kgc_for_points +##' @name extract_kgc_for_points +##' @author Yang Gu +##' +##' @param coords_dt A data.table containing columns `lon`, `lat`, and `Y`. +##' @param kg_file Path to Köppen-Geiger raster. +##' @param kg_lookup Character vector mapping raster values to KGC classes. +##' +##' @description +##' Extract Köppen-Geiger climate classes for each coordinate point. +##' +##' @return +##' A data.table with columns `index`, `lon`, `lat`, and `KGC`. +extract_kgc_for_points <- function(coords_dt, kg_file, kg_lookup) { + kg_raster <- rast(kg_file) + coords_vect <- vect(coords_dt, geom = c("lon", "lat"), crs = "EPSG:4326") + ex <- terra::extract(kg_raster, coords_vect, ID = FALSE) + + vals <- as.integer(ex[[1]]) + valid <- !is.na(vals) & vals >= 1 & vals <= length(kg_lookup) + + KGC <- rep(NA_character_, length(vals)) + KGC[valid] <- kg_lookup[vals[valid]] + + points_meta <- data.table( + index = coords_dt$Y, + lon = coords_dt$lon, + lat = coords_dt$lat, + KGC = KGC + ) + points_meta[, KGC := factor(KGC, levels = kg_lookup)] + return(points_meta) +} + +##' @title add_point_metadata_to_era +##' @name add_point_metadata_to_era +##' @author Yang Gu +##' +##' @param era_dt ERA5 data.table with `index`. +##' @param coords_file Path to coordinate CSV file. +##' @param kg_file Path to Köppen-Geiger raster. +##' +##' @description +##' Add point-level metadata, including lon/lat and KGC, to ERA5 records. +##' +##' @return +##' A list with: +##' \itemize{ +##' \item `era_all`: ERA data.table merged with metadata +##' \item `points_meta`: point metadata table +##' } +add_point_metadata_to_era <- function(era_dt, coords_file, kg_file) { + test_coords <- fread(coords_file)[, .(lon, lat, Y)] + + kg_lookup <- c( + "Af","Am","Aw","BWh","BWk","BSh","BSk","Csa","Csb","Csc", + "Cwa","Cwb","Cwc","Cfa","Cfb","Cfc","Dsa","Dsb","Dsc", + "Dsd","Dwa","Dwb","Dwc","Dwd","Dfa","Dfb","Dfc","Dfd","ET","EF" + ) + + points_meta <- extract_kgc_for_points( + coords_dt = test_coords, + kg_file = kg_file, + kg_lookup = kg_lookup + ) + + era_dt <- merge( + era_dt, + points_meta, + by = "index", + all.x = TRUE, + allow.cartesian = TRUE + ) + + return(list( + era_all = era_dt, + points_meta = points_meta + )) +} + +##' @title expand_tiff_daily +##' @name expand_tiff_daily +##' @author Yang Gu +##' +##' @param tiff_file Path to TIFF-derived annual covariate CSV. +##' +##' @description +##' Expand annual/static TIFF-based covariates to daily resolution according +##' to the study year definition. +##' +##' @return +##' A daily-resolution data.table keyed by `index` and `Date`. +expand_tiff_daily <- function(tiff_file) { + tiff <- fread(tiff_file) + tiff <- as.data.table(tiff) + + tiff[, `:=`( + start_date = fifelse( + Year == 2012, + as.Date("2012-01-01"), + as.Date(paste0(Year - 1, "-07-16")) + ), + end_date = fifelse( + Year == 2012, + as.Date("2012-07-15"), + as.Date(paste0(Year, "-07-15")) + ) + )] + + tiff_daily <- tiff[, .( + Date = seq(start_date[1], end_date[1], by = "day") + ), by = .( + Year, index, cell, cell_lon, cell_lat, diff_lon, diff_lat, distance, + LC, twi, PH, Sand, agb, SOC, N, year_since_disturb + )] + + return(tiff_daily) +} + +##' @title read_soil_moisture_data +##' @name read_soil_moisture_data +##' @author Yang Gu +##' +##' @param soilm_file Path to soil moisture CSV. +##' +##' @description +##' Read and standardize soil moisture data. +##' +##' @return +##' A data.table with standardized columns `index` and `utc`. +read_soil_moisture_data <- function(soilm_file) { + soilm <- fread(soilm_file) + setnames(soilm, c("Y", "datetime"), c("index", "utc")) + return(soilm) +} + +##' @title add_daily_tiff_covariates +##' @name add_daily_tiff_covariates +##' @author Yang Gu +##' +##' @param envres Environmental data.table with `index` and `utc`. +##' @param tiff_daily Daily TIFF covariate data.table. +##' +##' @description +##' Merge daily TIFF covariates into the environmental table. +##' +##' @return +##' A merged data.table. +add_daily_tiff_covariates <- function(envres, tiff_daily) { + envres[, Date := as.Date(utc)] + setDT(envres) + setDT(tiff_daily) + setkey(envres, index, Date) + setkey(tiff_daily, index, Date) + + out <- merge( + envres, + tiff_daily, + by = c("index", "Date"), + all.x = TRUE, + sort = FALSE + ) + return(out) +} + +##' @title add_meteorological_derived_variables +##' @name add_meteorological_derived_variables +##' @author Yang Gu +##' +##' @param dt A data.table containing ERA5 meteorological variables. +##' +##' @description +##' Add derived meteorological variables such as PPFD, wind speed, +##' temperature in Celsius, relative humidity, VPD, and day/night indicator. +##' +##' @return +##' A data.table with derived variables appended. +add_meteorological_derived_variables <- function(dt) { + dt[, `:=`( + PPFD = 0.45 * ssrd / 4.57 / (3 * 3600), + WindSpeed = sqrt(u10^2 + v10^2), + t_air_C = t2m - 273.15, + d_air_C = d2m - 273.15 + )] + + dt[, `:=`( + RH = 100 * exp((17.625 * d_air_C) / (243.04 + d_air_C) - + (17.625 * t_air_C) / (243.04 + t_air_C)), + e_s = 0.6108 * exp((17.27 * t_air_C) / (t_air_C + 237.3)), + e_a = 0.6108 * exp((17.27 * d_air_C) / (d_air_C + 237.3)), + is_day = as.integer(ssrd > 10) + )] + + dt[, VPD := pmax(e_s - e_a, 0)] + dt[, c("t_air_C", "d_air_C", "e_s", "e_a") := NULL] + + return(dt) +} + +##' @title add_local_time_variables +##' @name add_local_time_variables +##' @author Yang Gu +##' +##' @param dt A data.table containing `index`, `lat.x`, `lon.x`, and `utc`. +##' +##' @description +##' Add local timezone, local datetime, and seasonal harmonic variables. +##' +##' @return +##' A data.table with local time variables appended. +add_local_time_variables <- function(dt) { + site_coords <- unique(dt[, .(index, lat.x, lon.x)]) + site_coords[, tz := lutz::tz_lookup_coords(lat.x, lon.x, method = "accurate")] + + dt[site_coords, tz := i.tz, on = "index"] + dt[, tlocal := with_tz(utc, tz = unique(tz)), by = tz] + + dt[, year_local := year(tlocal)] + dt[, days_in_year := ifelse(leap_year(year_local), 366, 365)] + dt[, doy := yday(tlocal) + + hour(tlocal) / 24 + + minute(tlocal) / 1440 + + second(tlocal) / 86400] + + dt[, `:=`( + sin_doy = sin(2 * pi * doy / days_in_year), + cos_doy = cos(2 * pi * doy / days_in_year) + )] + + cols_to_drop <- intersect( + c("cell", "cell_lon", "cell_lat", "diff_lon", "diff_lat", + "distance", "tz", "lat.y", "lon.y", "lon_index", + "lat_index", "year_local", "days_in_year"), + names(dt) + ) + if (length(cols_to_drop) > 0) { + dt[, (cols_to_drop) := NULL] + } + + return(dt) +} + +##' @title read_and_fill_modis_daily +##' @name read_and_fill_modis_daily +##' @author Yang Gu +##' +##' @param modis_files Character vector of MODIS CSV file paths. +##' @param start_date Start date. +##' @param end_date End date. +##' +##' @description +##' Read MODIS NDVI/EVI data and fill to daily resolution using nearest +##' available observations. +##' +##' @return +##' A data.table with columns `index`, `Date`, `EVI`, and `NDVI`. +read_and_fill_modis_daily <- function(modis_files, + start_date = as.Date("2012-01-01"), + end_date = as.Date("2024-12-31")) { + dat <- rbindlist(lapply(modis_files, fread), use.names = TRUE, fill = TRUE) + + setnames( + dat, + old = c("ID", "Latitude", "Longitude", + "MYD13Q1_061__250m_16_days_EVI", + "MYD13Q1_061__250m_16_days_NDVI"), + new = c("index", "lat", "lon", "EVI", "NDVI") + ) + + dat <- dat[, .(index, Date, EVI, NDVI)] + + unique_idx <- unique(dat$index) + all_days <- data.table( + index = rep(unique_idx, each = as.integer(end_date - start_date + 1)), + Date = rep(seq(start_date, end_date, by = "day"), times = length(unique_idx)), + lat = NA_real_, + lon = NA_real_, + EVI = NA_real_, + NDVI = NA_real_ + ) + + setkey(dat, index, Date) + setkey(all_days, index, Date) + + all_days[, Date_cal := Date] + filled <- dat[all_days, on = .(index, Date), roll = "nearest"] + final_df <- filled[, .(index, Date = Date_cal, EVI, NDVI)] + + return(final_df) +} + +##' @title add_modis_covariates +##' @name add_modis_covariates +##' @author Yang Gu +##' +##' @param envres Environmental result table. +##' @param modis_dt Daily MODIS table. +##' +##' @description +##' Merge daily MODIS EVI/NDVI data into the environmental result table. +##' +##' @return +##' A merged data.table. +add_modis_covariates <- function(envres, modis_dt) { + out <- merge( + envres, + modis_dt, + by = c("index", "Date"), + all.x = TRUE + ) + return(out) +} + +##' @title read_sipnet_flux_outputs +##' @name read_sipnet_flux_outputs +##' @author Yang Gu +##' +##' @param years Integer vector of years. +##' @param path_template File path template for SIPNET annual CSV files. +##' +##' @description +##' Read and combine annual SIPNET flux output files. +##' +##' @return +##' A data.table with columns `utc`, `index`, `NEE_mean`, `Qle_mean`, `GPP_mean`. +read_sipnet_flux_outputs <- function(years, path_template) { + sipnet_list <- read_annual_csv_series(years, path_template) + sipnet_all <- bind_annual_tables(sipnet_list, drop_year = TRUE) + + setnames(sipnet_all, "time", "utc") + output.df <- sipnet_all[, .(utc, index, NEE_mean, Qle_mean, GPP_mean)] + + return(output.df) +} + +##' @title add_sipnet_outputs +##' @name add_sipnet_outputs +##' @author Yang Gu +##' +##' @param envres Environmental result table. +##' @param sipnet_dt SIPNET output table. +##' +##' @description +##' Merge SIPNET outputs into the environmental result table and convert +##' NEE units to observation-compatible units. +##' +##' @return +##' A merged data.table. +add_sipnet_outputs <- function(envres, sipnet_dt) { + out <- merge( + envres, + sipnet_dt[, .(utc, index, NEE_mean, Qle_mean, GPP_mean)], + by = c("utc", "index"), + all.x = TRUE, + sort = FALSE + ) + + out[, NEE_mean := NEE_mean * 1e8 / 1.157407] + return(out) +} + +##' @title read_and_match_ec_data +##' @name read_and_match_ec_data +##' @author Yang Gu +##' +##' @param ec_file Path to eddy covariance data CSV. +##' @param matched_file Path to matched point-site CSV. +##' +##' @description +##' Read eddy covariance data, fill missing flux variables using fallback +##' columns, and match site IDs to spatial indices. +##' +##' @return +##' A data.table with columns matched to `utc` and `index`. +read_and_match_ec_data <- function(ec_file, matched_file) { + resimet.df <- fread(ec_file) + close_points_df <- fread(matched_file) + + setDT(close_points_df) + close_points_df <- close_points_df[order(min_dist_m), .SD[1], by = index] + + resimet.df[is.na(NEE_CUT_USTAR50) & !is.na(NEE_VUT_USTAR50), + NEE_CUT_USTAR50 := NEE_VUT_USTAR50] + resimet.df[is.na(GPP_NT_CUT_USTAR50) & !is.na(GPP_NT_VUT_USTAR50), + GPP_NT_CUT_USTAR50 := GPP_NT_VUT_USTAR50] + + resimet.df <- resimet.df[, .( + utc, NEE_CUT_USTAR50, LE_F_MDS, GPP_NT_CUT_USTAR50, Site_ID + )] + + site_index_map <- close_points_df[, .(Site_ID, index)] + + resimet.df <- merge( + resimet.df, + site_index_map, + by = "Site_ID", + all.x = TRUE + ) + + resimet.df <- resimet.df[, .SD[1], by = .(utc, index)] + return(resimet.df) +} + +##' @title add_ec_observations_and_residuals +##' @name add_ec_observations_and_residuals +##' @author Yang Gu +##' +##' @param envres Environmental result table. +##' @param ec_dt Eddy covariance table. +##' +##' @description +##' Merge EC observations into the environmental result table and compute +##' residuals between SIPNET outputs and EC observations. +##' +##' @return +##' A data.table with residual columns added. +add_ec_observations_and_residuals <- function(envres, ec_dt) { + out <- merge( + envres, + ec_dt, + by = c("utc", "index"), + all.x = TRUE, + sort = FALSE + ) + + out[, `:=`( + NEE_residual = NEE_mean - NEE_CUT_USTAR50, + LE_residual = Qle_mean - LE_F_MDS, + GPP_residual = GPP_mean - GPP_NT_CUT_USTAR50 + )] + + out <- out[, .SD[1], by = .(utc, index)] + return(out) +} + +##' @title finalize_envres_columns +##' @name finalize_envres_columns +##' @author Yang Gu +##' +##' @param dt Final environmental result table. +##' +##' @description +##' Clean up duplicated coordinate columns and standardize final names. +##' +##' @return +##' A cleaned data.table ready for saving. +finalize_envres_columns <- function(dt) { + drop_cols <- intersect(c("lat.y", "lon.y"), names(dt)) + if (length(drop_cols) > 0) { + dt[, (drop_cols) := NULL] + } + + rename_old <- intersect(c("lat.x", "lon.x"), names(dt)) + rename_new <- c("lat", "lon")[c("lat.x", "lon.x") %in% rename_old] + + if (length(rename_old) > 0) { + setnames(dt, rename_old, rename_new) + } + + return(dt) +} + +##' @title build_envres_dataset +##' @name build_envres_dataset +##' @author Yang Gu +##' +##' @param years Integer vector of years to process. +##' @param era_template File path template for ERA annual CSV files. +##' @param coords_file Path to coordinate CSV. +##' @param kg_file Path to Köppen-Geiger raster. +##' @param tiff_file Path to TIFF annual covariate CSV. +##' @param soilm_file Path to soil moisture CSV. +##' @param modis_files Character vector of MODIS CSV files. +##' @param sipnet_template File path template for SIPNET annual CSV files. +##' @param ec_file Path to eddy covariance CSV. +##' @param matched_file Path to matched spatial index CSV. +##' +##' @description +##' Build the full environmental modeling dataset by combining ERA5, +##' soil moisture, TIFF-derived covariates, MODIS, SIPNET outputs, and +##' eddy covariance observations. +##' +##' @return +##' A final integrated data.table. +build_envres_dataset <- function( + years, + era_template, + coords_file, + kg_file, + tiff_file, + soilm_file, + modis_files, + sipnet_template, + ec_file, + matched_file +) { + ## ERA5 + era_list <- read_annual_csv_series(years, era_template) + era_all <- bind_annual_tables(era_list, drop_year = TRUE) + rm(era_list) + gc() + + era_meta <- add_point_metadata_to_era( + era_dt = era_all, + coords_file = coords_file, + kg_file = kg_file + ) + era_all <- era_meta$era_all + points_meta <- era_meta$points_meta + gc() + + ## TIFF + tiff_daily <- expand_tiff_daily(tiff_file) + gc() + + ## Soil moisture + soilm <- read_soil_moisture_data(soilm_file) + + ## Merge ERA + soil moisture + envres <- merge(era_all, soilm, by = c("index", "utc"), all.x = TRUE) + rm(era_all, soilm) + gc() + + ## Add daily TIFF covariates + envres <- add_daily_tiff_covariates(envres, tiff_daily) + rm(tiff_daily) + gc() + + ## Derived meteorological variables + envres <- add_meteorological_derived_variables(envres) + gc() + + ## Local time and seasonal harmonics + envres <- add_local_time_variables(envres) + gc() + + ## MODIS + modis_dt <- read_and_fill_modis_daily(modis_files) + envres <- add_modis_covariates(envres, modis_dt) + rm(modis_dt) + gc() + + ## SIPNET + sipnet_dt <- read_sipnet_flux_outputs(years, sipnet_template) + envres <- add_sipnet_outputs(envres, sipnet_dt) + rm(sipnet_dt) + gc() + + ## Eddy covariance + ec_dt <- read_and_match_ec_data(ec_file, matched_file) + envres <- add_ec_observations_and_residuals(envres, ec_dt) + rm(ec_dt, points_meta) + gc() + + ## Final cleanup + envres <- finalize_envres_columns(envres) + gc() + + return(envres) +} + +##' @title save_envres_dataset +##' @name save_envres_dataset +##' @author Yang Gu +##' +##' @param envres Final environmental result data.table. +##' @param output_file Output `.RData` file path. +##' +##' @description +##' Save the final dataset to an `.RData` file. +##' +##' @return +##' No return value. Saves `envres` to disk. +save_envres_dataset <- function(envres, output_file) { + save(envres, file = output_file) +} \ No newline at end of file