Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions R/frs_break.R
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
#' @param label_map Named character vector or `NULL`. Maps values in
#' `label_col` to output labels (e.g. `c("BARRIER" = "blocked")`).
#' Only used with `label_col`.
#' @param col_blk Character. Column name for the stream identifier in
#' `points_table`. Default `"blue_line_key"`.
#' @param col_measure Character. Column name for the route measure in
#' `points_table`. Default `"downstream_route_measure"`.
#' @param overwrite Logical. If `TRUE`, drop `to` before creating.
#' Default `TRUE`.
#' @param append Logical. If `TRUE`, INSERT INTO existing `to` table
Expand Down Expand Up @@ -97,6 +101,8 @@ frs_break_find <- function(conn, table, to = "working.breaks",
where = NULL, aoi = NULL,
label = NULL, label_col = NULL,
label_map = NULL,
col_blk = "blue_line_key",
col_measure = "downstream_route_measure",
overwrite = TRUE, append = FALSE) {
.frs_validate_identifier(table, "source table")
.frs_validate_identifier(to, "destination table")
Expand Down Expand Up @@ -124,8 +130,9 @@ frs_break_find <- function(conn, table, to = "working.breaks",
.frs_break_find_attribute(conn, table, to, attribute, threshold,
interval, distance)
} else if (has_table) {
.frs_break_find_table(conn, table, to, points_table, where, aoi,
label, label_col, label_map, append)
.frs_break_find_table(conn, table, to, points_table, where,
label, label_col, label_map,
col_blk, col_measure, append)
} else {
.frs_break_find_points(conn, table, to, points)
}
Expand Down Expand Up @@ -196,32 +203,36 @@ frs_break_find <- function(conn, table, to = "working.breaks",
#'
#' @noRd
.frs_break_find_table <- function(conn, table, to, points_table,
where = NULL, aoi = NULL,
where = NULL,
label = NULL, label_col = NULL,
label_map = NULL,
col_blk = "blue_line_key",
col_measure = "downstream_route_measure",
append = FALSE) {
.frs_validate_identifier(points_table, "points table")
.frs_validate_identifier(col_blk, "col_blk")
.frs_validate_identifier(col_measure, "col_measure")

clauses <- character(0)

# Scope to BLKs present in the working streams table
clauses <- c(clauses, sprintf(
"%s IN (SELECT DISTINCT blue_line_key FROM %s)",
col_blk, table))

if (!is.null(where)) {
clauses <- c(clauses, where)
}
aoi_pred <- .frs_resolve_aoi(aoi, conn = conn)
if (nzchar(aoi_pred)) {
clauses <- c(clauses, aoi_pred)
}

where_clause <- if (length(clauses) > 0) {
paste(" WHERE", paste(clauses, collapse = " AND "))
} else {
""
}
where_clause <- paste(" WHERE", paste(clauses, collapse = " AND "))

# Build label expression
label_expr <- .frs_label_expr(label, label_col, label_map)

# Alias source columns to standard names
select_sql <- sprintf(
"SELECT DISTINCT blue_line_key, downstream_route_measure, %s, %s AS source FROM %s%s",
"SELECT DISTINCT %s AS blue_line_key, %s AS downstream_route_measure, %s, %s AS source FROM %s%s",
col_blk, col_measure,
label_expr,
.frs_quote_string(points_table),
points_table, where_clause
Expand Down
83 changes: 49 additions & 34 deletions R/frs_habitat.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
#' [frs_habitat_access()], or `NULL` for gradient-only. Each spec is a
#' list with `table`, and optionally `where`, `label`, `label_col`,
#' `label_map`. See [frs_habitat_access()] for details.
#' @param to_prefix Character or `NULL`. When provided, persist species
#' output tables with this prefix (e.g. `"fresh.streams"` creates
#' `fresh.streams_co`, `fresh.streams_bt`). Existing rows for the
#' same WSG are replaced (delete + insert). Default `NULL` (no
#' persistence, working tables only).
#' @param password Character. Database password for parallel workers.
#' Required when `workers > 1` and the database uses password auth.
#' Not needed for trust auth or `.pgpass`.
Expand Down Expand Up @@ -55,13 +60,22 @@
#' label_map = c("BARRIER" = "blocked", "POTENTIAL" = "potential"))
#' ))
#'
#' # Persist to output tables (accumulate across runs)
#' result <- frs_habitat(conn, "BULK",
#' to_prefix = "fresh.streams",
#' break_sources = list(
#' list(table = "working.falls", label = "blocked")))
#' # Creates: fresh.streams_co, fresh.streams_bt, etc.
#' # Re-run with "MORR" — appends to same tables
#'
#' # Gradient-only (no external break sources)
#' result <- frs_habitat(conn, "ADMS")
#'
#' DBI::dbDisconnect(conn)
#' }
frs_habitat <- function(conn, wsg, workers = 1L,
break_sources = NULL,
to_prefix = NULL,
password = "",
cleanup = TRUE, verbose = TRUE) {
stopifnot(is.character(wsg), length(wsg) > 0)
Expand Down Expand Up @@ -240,7 +254,38 @@ frs_habitat <- function(conn, wsg, workers = 1L,
}

# ==========================================================================
# Phase 3: Cleanup
# Phase 3: Persist to output tables
# ==========================================================================
if (!is.null(to_prefix)) {
.frs_validate_identifier(to_prefix, "to_prefix")
for (i in seq_len(nrow(results))) {
src <- results$table_name[i]
sp <- tolower(results$species_code[i])
dest <- paste0(to_prefix, "_", sp)
wsg_code <- results$partition[i]

# Create if not exists (match source schema, no generated columns)
.frs_db_execute(conn, sprintf(
"CREATE TABLE IF NOT EXISTS %s AS SELECT * FROM %s LIMIT 0", dest, src))

# Delete existing rows for this WSG, then insert
.frs_db_execute(conn, sprintf(
"DELETE FROM %s WHERE watershed_group_code = %s",
dest, .frs_quote_string(toupper(wsg_code))))
.frs_db_execute(conn, sprintf(
"INSERT INTO %s SELECT * FROM %s", dest, src))

if (verbose) {
n <- DBI::dbGetQuery(conn,
sprintf("SELECT count(*)::int AS n FROM %s WHERE watershed_group_code = %s",
dest, .frs_quote_string(toupper(wsg_code))))$n
cat(" -> ", dest, " (", n, " rows)\n", sep = "")
}
}
}

# ==========================================================================
# Phase 4: Cleanup
# ==========================================================================
if (cleanup) {
for (tbl in cleanup_tables) {
Expand Down Expand Up @@ -371,11 +416,8 @@ frs_habitat_partition <- function(conn, aoi, label, species,
cleanup_tables <- c(cleanup_tables, breaks_tbl)

t0 <- proc.time()
# For WSG code AOIs, add watershed_group_code filter to each break source
# (avoids needing geometry on break source tables)
src <- .frs_scope_break_sources(break_sources, aoi)
frs_habitat_access(conn, base_tbl, threshold = thr,
to = breaks_tbl, break_sources = src)
to = breaks_tbl, break_sources = break_sources)

if (verbose) {
spp <- species$species_code[species$access_gradient == thr]
Expand Down Expand Up @@ -501,6 +543,8 @@ frs_habitat_access <- function(conn, table, threshold,
label = src$label,
label_col = src$label_col,
label_map = src$label_map,
col_blk = if (is.null(src$col_blk)) "blue_line_key" else src$col_blk,
col_measure = if (is.null(src$col_measure)) "downstream_route_measure" else src$col_measure,
to = to, overwrite = FALSE, append = TRUE)
}
}
Expand Down Expand Up @@ -542,35 +586,6 @@ frs_habitat_access <- function(conn, table, threshold,
}


#' Scope break sources to a WSG code
#'
#' When AOI is a 4-letter WSG code, appends a `watershed_group_code`
#' filter to each break source's `where` clause. This avoids needing
#' geometry on break source tables (e.g. CSV-loaded falls).
#'
#' @param break_sources List of break source specs, or NULL.
#' @param aoi AOI specification.
#' @return Modified break_sources list, or NULL.
#' @noRd
.frs_scope_break_sources <- function(break_sources, aoi) {
if (is.null(break_sources)) return(NULL)
if (!(is.character(aoi) && length(aoi) == 1 && grepl("^[A-Z]{4}$", aoi))) {
return(break_sources)
}

wsg_pred <- paste0("watershed_group_code = ", .frs_quote_string(aoi))
lapply(break_sources, function(src) {
w <- src$where
src$where <- if (!is.null(w) && nzchar(w)) {
paste(w, "AND", wsg_pred)
} else {
wsg_pred
}
src
})
}


#' Classify Habitat for One Species
#'
#' Copy a base stream network, apply pre-computed access barriers, then
Expand Down
8 changes: 8 additions & 0 deletions man/frs_break_find.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions man/frs_habitat.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions tests/testthat/test-frs_break.R
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ test_that("frs_break_find table mode adds AOI filter", {
)

frs_break_find("mock", "working.streams",
points_table = "bcfishpass.falls_events_sp",
aoi = "BULK")
points_table = "bcfishpass.falls_events_sp")

expect_match(sql_log[2], "WHERE.*BULK")
expect_match(sql_log[2], "WHERE.*blue_line_key IN")
})

test_that("frs_break_find table mode adds where filter", {
Expand Down Expand Up @@ -100,11 +99,10 @@ test_that("frs_break_find table mode combines where and aoi", {

frs_break_find("mock", "working.streams",
points_table = "bcfishpass.falls_vw",
where = "barrier_ind = TRUE",
aoi = "BULK")
where = "barrier_ind = TRUE")

expect_match(sql_log[2], "blue_line_key IN")
expect_match(sql_log[2], "barrier_ind = TRUE")
expect_match(sql_log[2], "BULK")
expect_match(sql_log[2], "AND")
})

Expand Down
Loading