diff --git a/R/frs_break.R b/R/frs_break.R index 8fd45a1..a68d3c9 100644 --- a/R/frs_break.R +++ b/R/frs_break.R @@ -43,6 +43,10 @@ #' @param label_map Named character vector or `NULL`. Maps values in #' `label_col` to output labels (e.g. `c("BARRIER" = "blocked")`). #' Only used with `label_col`. +#' @param col_blk Character. Column name for the stream identifier in +#' `points_table`. Default `"blue_line_key"`. +#' @param col_measure Character. Column name for the route measure in +#' `points_table`. Default `"downstream_route_measure"`. #' @param overwrite Logical. If `TRUE`, drop `to` before creating. #' Default `TRUE`. #' @param append Logical. If `TRUE`, INSERT INTO existing `to` table @@ -97,6 +101,8 @@ frs_break_find <- function(conn, table, to = "working.breaks", where = NULL, aoi = NULL, label = NULL, label_col = NULL, label_map = NULL, + col_blk = "blue_line_key", + col_measure = "downstream_route_measure", overwrite = TRUE, append = FALSE) { .frs_validate_identifier(table, "source table") .frs_validate_identifier(to, "destination table") @@ -124,8 +130,9 @@ frs_break_find <- function(conn, table, to = "working.breaks", .frs_break_find_attribute(conn, table, to, attribute, threshold, interval, distance) } else if (has_table) { - .frs_break_find_table(conn, table, to, points_table, where, aoi, - label, label_col, label_map, append) + .frs_break_find_table(conn, table, to, points_table, where, + label, label_col, label_map, + col_blk, col_measure, append) } else { .frs_break_find_points(conn, table, to, points) } @@ -196,32 +203,36 @@ frs_break_find <- function(conn, table, to = "working.breaks", #' #' @noRd .frs_break_find_table <- function(conn, table, to, points_table, - where = NULL, aoi = NULL, + where = NULL, label = NULL, label_col = NULL, label_map = NULL, + col_blk = "blue_line_key", + col_measure = "downstream_route_measure", append = FALSE) { .frs_validate_identifier(points_table, "points table") + .frs_validate_identifier(col_blk, "col_blk") + .frs_validate_identifier(col_measure, "col_measure") clauses <- character(0) + + # Scope to BLKs present in the working streams table + clauses <- c(clauses, sprintf( + "%s IN (SELECT DISTINCT blue_line_key FROM %s)", + col_blk, table)) + if (!is.null(where)) { clauses <- c(clauses, where) } - aoi_pred <- .frs_resolve_aoi(aoi, conn = conn) - if (nzchar(aoi_pred)) { - clauses <- c(clauses, aoi_pred) - } - where_clause <- if (length(clauses) > 0) { - paste(" WHERE", paste(clauses, collapse = " AND ")) - } else { - "" - } + where_clause <- paste(" WHERE", paste(clauses, collapse = " AND ")) # Build label expression label_expr <- .frs_label_expr(label, label_col, label_map) + # Alias source columns to standard names select_sql <- sprintf( - "SELECT DISTINCT blue_line_key, downstream_route_measure, %s, %s AS source FROM %s%s", + "SELECT DISTINCT %s AS blue_line_key, %s AS downstream_route_measure, %s, %s AS source FROM %s%s", + col_blk, col_measure, label_expr, .frs_quote_string(points_table), points_table, where_clause diff --git a/R/frs_habitat.R b/R/frs_habitat.R index 75540b1..ab83c8d 100644 --- a/R/frs_habitat.R +++ b/R/frs_habitat.R @@ -21,6 +21,11 @@ #' [frs_habitat_access()], or `NULL` for gradient-only. Each spec is a #' list with `table`, and optionally `where`, `label`, `label_col`, #' `label_map`. See [frs_habitat_access()] for details. +#' @param to_prefix Character or `NULL`. When provided, persist species +#' output tables with this prefix (e.g. `"fresh.streams"` creates +#' `fresh.streams_co`, `fresh.streams_bt`). Existing rows for the +#' same WSG are replaced (delete + insert). Default `NULL` (no +#' persistence, working tables only). #' @param password Character. Database password for parallel workers. #' Required when `workers > 1` and the database uses password auth. #' Not needed for trust auth or `.pgpass`. @@ -55,6 +60,14 @@ #' label_map = c("BARRIER" = "blocked", "POTENTIAL" = "potential")) #' )) #' +#' # Persist to output tables (accumulate across runs) +#' result <- frs_habitat(conn, "BULK", +#' to_prefix = "fresh.streams", +#' break_sources = list( +#' list(table = "working.falls", label = "blocked"))) +#' # Creates: fresh.streams_co, fresh.streams_bt, etc. +#' # Re-run with "MORR" — appends to same tables +#' #' # Gradient-only (no external break sources) #' result <- frs_habitat(conn, "ADMS") #' @@ -62,6 +75,7 @@ #' } frs_habitat <- function(conn, wsg, workers = 1L, break_sources = NULL, + to_prefix = NULL, password = "", cleanup = TRUE, verbose = TRUE) { stopifnot(is.character(wsg), length(wsg) > 0) @@ -240,7 +254,38 @@ frs_habitat <- function(conn, wsg, workers = 1L, } # ========================================================================== - # Phase 3: Cleanup + # Phase 3: Persist to output tables + # ========================================================================== + if (!is.null(to_prefix)) { + .frs_validate_identifier(to_prefix, "to_prefix") + for (i in seq_len(nrow(results))) { + src <- results$table_name[i] + sp <- tolower(results$species_code[i]) + dest <- paste0(to_prefix, "_", sp) + wsg_code <- results$partition[i] + + # Create if not exists (match source schema, no generated columns) + .frs_db_execute(conn, sprintf( + "CREATE TABLE IF NOT EXISTS %s AS SELECT * FROM %s LIMIT 0", dest, src)) + + # Delete existing rows for this WSG, then insert + .frs_db_execute(conn, sprintf( + "DELETE FROM %s WHERE watershed_group_code = %s", + dest, .frs_quote_string(toupper(wsg_code)))) + .frs_db_execute(conn, sprintf( + "INSERT INTO %s SELECT * FROM %s", dest, src)) + + if (verbose) { + n <- DBI::dbGetQuery(conn, + sprintf("SELECT count(*)::int AS n FROM %s WHERE watershed_group_code = %s", + dest, .frs_quote_string(toupper(wsg_code))))$n + cat(" -> ", dest, " (", n, " rows)\n", sep = "") + } + } + } + + # ========================================================================== + # Phase 4: Cleanup # ========================================================================== if (cleanup) { for (tbl in cleanup_tables) { @@ -371,11 +416,8 @@ frs_habitat_partition <- function(conn, aoi, label, species, cleanup_tables <- c(cleanup_tables, breaks_tbl) t0 <- proc.time() - # For WSG code AOIs, add watershed_group_code filter to each break source - # (avoids needing geometry on break source tables) - src <- .frs_scope_break_sources(break_sources, aoi) frs_habitat_access(conn, base_tbl, threshold = thr, - to = breaks_tbl, break_sources = src) + to = breaks_tbl, break_sources = break_sources) if (verbose) { spp <- species$species_code[species$access_gradient == thr] @@ -501,6 +543,8 @@ frs_habitat_access <- function(conn, table, threshold, label = src$label, label_col = src$label_col, label_map = src$label_map, + col_blk = if (is.null(src$col_blk)) "blue_line_key" else src$col_blk, + col_measure = if (is.null(src$col_measure)) "downstream_route_measure" else src$col_measure, to = to, overwrite = FALSE, append = TRUE) } } @@ -542,35 +586,6 @@ frs_habitat_access <- function(conn, table, threshold, } -#' Scope break sources to a WSG code -#' -#' When AOI is a 4-letter WSG code, appends a `watershed_group_code` -#' filter to each break source's `where` clause. This avoids needing -#' geometry on break source tables (e.g. CSV-loaded falls). -#' -#' @param break_sources List of break source specs, or NULL. -#' @param aoi AOI specification. -#' @return Modified break_sources list, or NULL. -#' @noRd -.frs_scope_break_sources <- function(break_sources, aoi) { - if (is.null(break_sources)) return(NULL) - if (!(is.character(aoi) && length(aoi) == 1 && grepl("^[A-Z]{4}$", aoi))) { - return(break_sources) - } - - wsg_pred <- paste0("watershed_group_code = ", .frs_quote_string(aoi)) - lapply(break_sources, function(src) { - w <- src$where - src$where <- if (!is.null(w) && nzchar(w)) { - paste(w, "AND", wsg_pred) - } else { - wsg_pred - } - src - }) -} - - #' Classify Habitat for One Species #' #' Copy a base stream network, apply pre-computed access barriers, then diff --git a/man/frs_break_find.Rd b/man/frs_break_find.Rd index c61daf7..2326805 100644 --- a/man/frs_break_find.Rd +++ b/man/frs_break_find.Rd @@ -19,6 +19,8 @@ frs_break_find( label = NULL, label_col = NULL, label_map = NULL, + col_blk = "blue_line_key", + col_measure = "downstream_route_measure", overwrite = TRUE, append = FALSE ) @@ -73,6 +75,12 @@ read labels from. Values are passed through as-is, or remapped via \code{label_col} to output labels (e.g. \code{c("BARRIER" = "blocked")}). Only used with \code{label_col}.} +\item{col_blk}{Character. Column name for the stream identifier in +\code{points_table}. Default \code{"blue_line_key"}.} + +\item{col_measure}{Character. Column name for the route measure in +\code{points_table}. Default \code{"downstream_route_measure"}.} + \item{overwrite}{Logical. If \code{TRUE}, drop \code{to} before creating. Default \code{TRUE}.} diff --git a/man/frs_habitat.Rd b/man/frs_habitat.Rd index f57ab43..dbd0883 100644 --- a/man/frs_habitat.Rd +++ b/man/frs_habitat.Rd @@ -9,6 +9,7 @@ frs_habitat( wsg, workers = 1L, break_sources = NULL, + to_prefix = NULL, password = "", cleanup = TRUE, verbose = TRUE @@ -31,6 +32,12 @@ Used for both Phase 1 (partition prep across WSGs) and Phase 2 list with \code{table}, and optionally \code{where}, \code{label}, \code{label_col}, \code{label_map}. See \code{\link[=frs_habitat_access]{frs_habitat_access()}} for details.} +\item{to_prefix}{Character or \code{NULL}. When provided, persist species +output tables with this prefix (e.g. \code{"fresh.streams"} creates +\code{fresh.streams_co}, \code{fresh.streams_bt}). Existing rows for the +same WSG are replaced (delete + insert). Default \code{NULL} (no +persistence, working tables only).} + \item{password}{Character. Database password for parallel workers. Required when \code{workers > 1} and the database uses password auth. Not needed for trust auth or \code{.pgpass}.} @@ -75,6 +82,14 @@ result <- frs_habitat(conn, "ADMS", break_sources = list( label_map = c("BARRIER" = "blocked", "POTENTIAL" = "potential")) )) +# Persist to output tables (accumulate across runs) +result <- frs_habitat(conn, "BULK", + to_prefix = "fresh.streams", + break_sources = list( + list(table = "working.falls", label = "blocked"))) +# Creates: fresh.streams_co, fresh.streams_bt, etc. +# Re-run with "MORR" — appends to same tables + # Gradient-only (no external break sources) result <- frs_habitat(conn, "ADMS") diff --git a/tests/testthat/test-frs_break.R b/tests/testthat/test-frs_break.R index e212b91..adf929c 100644 --- a/tests/testthat/test-frs_break.R +++ b/tests/testthat/test-frs_break.R @@ -67,10 +67,9 @@ test_that("frs_break_find table mode adds AOI filter", { ) frs_break_find("mock", "working.streams", - points_table = "bcfishpass.falls_events_sp", - aoi = "BULK") + points_table = "bcfishpass.falls_events_sp") - expect_match(sql_log[2], "WHERE.*BULK") + expect_match(sql_log[2], "WHERE.*blue_line_key IN") }) test_that("frs_break_find table mode adds where filter", { @@ -100,11 +99,10 @@ test_that("frs_break_find table mode combines where and aoi", { frs_break_find("mock", "working.streams", points_table = "bcfishpass.falls_vw", - where = "barrier_ind = TRUE", - aoi = "BULK") + where = "barrier_ind = TRUE") + expect_match(sql_log[2], "blue_line_key IN") expect_match(sql_log[2], "barrier_ind = TRUE") - expect_match(sql_log[2], "BULK") expect_match(sql_log[2], "AND") })