Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 203 additions & 42 deletions src/commands/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ use crate::core::config::{get_full_context, render_globals, render_string_value}
use crate::core::env::load_env_vars;
use crate::core::templating::{self, ParsedQuery};
use crate::core::utils::{
catch_error_and_exit, check_exports_as_statecheck_proxy, export_vars, perform_retries,
pull_providers, run_ext_script, run_stackql_command, run_stackql_query, show_query,
catch_error_and_exit, check_exports_as_statecheck_proxy, check_short_circuit, export_vars,
flatten_returning_row, has_returning_clause, perform_retries, pull_providers,
run_callback_poll, run_ext_script, run_stackql_command, run_stackql_dml_returning,
run_stackql_query, show_query,
};
use crate::resource::manifest::{Manifest, Resource};
use crate::resource::validation::validate_manifest;
Expand Down Expand Up @@ -291,6 +293,9 @@ impl CommandRunner {
}

/// Create a resource.
///
/// Returns `(created, returning_row)` where `returning_row` is `Some` when
/// the create query included `RETURNING *` and the provider returned data.
#[allow(clippy::too_many_arguments)]
pub fn create_resource(
&mut self,
Expand All @@ -301,30 +306,53 @@ impl CommandRunner {
dry_run: bool,
show_queries: bool,
ignore_errors: bool,
) -> bool {
) -> (bool, Option<HashMap<String, String>>) {
if dry_run {
info!(
"dry run create for [{}]:\n\n/* insert (create) query */\n{}\n",
resource.name, create_query
);
return false;
if has_returning_clause(create_query) {
info!(
"dry run create for [{}]:\n\n/* insert (create) query with RETURNING */\n{}\n\
[dry run: RETURNING * capture skipped]\n",
resource.name, create_query
);
} else {
info!(
"dry run create for [{}]:\n\n/* insert (create) query */\n{}\n",
resource.name, create_query
);
}
return (false, None);
}

info!("[{}] does not exist, creating...", resource.name);
show_query(show_queries, create_query);

let msg = run_stackql_command(
create_query,
&mut self.client,
ignore_errors,
retries,
retry_delay,
);
debug!("Create response: {}", msg);
true
if has_returning_clause(create_query) {
let (msg, returning_row) = run_stackql_dml_returning(
create_query,
&mut self.client,
ignore_errors,
retries,
retry_delay,
);
debug!("Create response: {}", msg);
(true, returning_row)
} else {
let msg = run_stackql_command(
create_query,
&mut self.client,
ignore_errors,
retries,
retry_delay,
);
debug!("Create response: {}", msg);
(true, None)
}
}

/// Update a resource.
///
/// Returns `(updated, returning_row)` where `returning_row` is `Some` when
/// the update query included `RETURNING *` and the provider returned data.
#[allow(clippy::too_many_arguments)]
pub fn update_resource(
&mut self,
Expand All @@ -335,41 +363,64 @@ impl CommandRunner {
dry_run: bool,
show_queries: bool,
ignore_errors: bool,
) -> bool {
) -> (bool, Option<HashMap<String, String>>) {
match update_query {
Some(query) => {
if dry_run {
info!(
"dry run update for [{}]:\n\n/* update query */\n{}\n",
resource.name, query
);
return false;
if has_returning_clause(query) {
info!(
"dry run update for [{}]:\n\n/* update query with RETURNING */\n{}\n\
[dry run: RETURNING * capture skipped]\n",
resource.name, query
);
} else {
info!(
"dry run update for [{}]:\n\n/* update query */\n{}\n",
resource.name, query
);
}
return (false, None);
}

info!("updating [{}]...", resource.name);
show_query(show_queries, query);

let msg = run_stackql_command(
query,
&mut self.client,
ignore_errors,
retries,
retry_delay,
);
debug!("Update response: {}", msg);
true
if has_returning_clause(query) {
let (msg, returning_row) = run_stackql_dml_returning(
query,
&mut self.client,
ignore_errors,
retries,
retry_delay,
);
debug!("Update response: {}", msg);
(true, returning_row)
} else {
let msg = run_stackql_command(
query,
&mut self.client,
ignore_errors,
retries,
retry_delay,
);
debug!("Update response: {}", msg);
(true, None)
}
}
None => {
info!(
"Update query not configured for [{}], skipping update...",
resource.name
);
false
(false, None)
}
}
}

/// Delete a resource.
///
/// Returns `Some(first_row)` when the delete query included `RETURNING *`
/// and the provider returned data; `None` otherwise.
#[allow(clippy::too_many_arguments)]
pub fn delete_resource(
&mut self,
Expand All @@ -380,26 +431,136 @@ impl CommandRunner {
dry_run: bool,
show_queries: bool,
ignore_errors: bool,
) -> Option<HashMap<String, String>> {
if dry_run {
if has_returning_clause(delete_query) {
info!(
"dry run delete for [{}]:\n\n{}\n[dry run: RETURNING * capture skipped]\n",
resource.name, delete_query
);
} else {
info!(
"dry run delete for [{}]:\n\n{}\n",
resource.name, delete_query
);
}
return None;
}

info!("deleting [{}]...", resource.name);
show_query(show_queries, delete_query);

if has_returning_clause(delete_query) {
let (msg, returning_row) = run_stackql_dml_returning(
delete_query,
&mut self.client,
ignore_errors,
retries,
retry_delay,
);
debug!("Delete response: {}", msg);
returning_row
} else {
let msg = run_stackql_command(
delete_query,
&mut self.client,
ignore_errors,
retries,
retry_delay,
);
debug!("Delete response: {}", msg);
None
}
}

// -----------------------------------------------------------------------
// RETURNING * capture and callback support
// -----------------------------------------------------------------------

/// Store a RETURNING * row for `resource_name` in the global context.
///
/// Flat keys (`callback.{field}`, `{resource_name}.callback.{field}`) are
/// inserted so they are accessible from subsequent template renders:
/// - The unscoped `callback.*` form is available to the resource's own
/// `.iql` templates (and is overwritten by the next DML that has
/// RETURNING *).
/// - The scoped `{resource_name}.callback.*` form is available to any
/// downstream resource (written once, never overwritten).
pub fn store_callback_data(
&mut self,
resource_name: &str,
returning_row: &HashMap<String, String>,
) {
info!(
"storing RETURNING * result for [{}] in callback context",
resource_name
);
flatten_returning_row(returning_row, resource_name, &mut self.global_context);
}

/// Execute a callback block associated with a DML operation.
///
/// 1. If `short_circuit_field` is set and the field in the current context
/// equals `short_circuit_value`, skip polling.
/// 2. Otherwise poll the callback query up to `retries` times.
/// 3. On exhaustion, call `catch_error_and_exit`.
///
/// `operation` is used only for log messages (e.g. `"create"`).
#[allow(clippy::too_many_arguments)]
pub fn run_callback(
&mut self,
resource: &Resource,
callback_query: &str,
retries: u32,
retry_delay: u32,
short_circuit_field: Option<&str>,
short_circuit_value: Option<&str>,
operation: &str,
dry_run: bool,
show_queries: bool,
) {
if dry_run {
info!(
"dry run delete for [{}]:\n\n{}\n",
resource.name, delete_query
"dry run callback ({}) for [{}]:\n\n/* callback query */\n{}\n\
[dry run: callback polling skipped]\n",
operation, resource.name, callback_query
);
return;
}

info!("deleting [{}]...", resource.name);
show_query(show_queries, delete_query);
// Short-circuit check.
if let (Some(field), Some(expected)) = (short_circuit_field, short_circuit_value) {
if check_short_circuit(&self.global_context, field, expected) {
info!(
"[{}] {} callback short-circuited (field '{}' = '{}')",
resource.name, operation, field, expected
);
return;
}
}

let msg = run_stackql_command(
delete_query,
&mut self.client,
ignore_errors,
info!("running {} callback for [{}]...", operation, resource.name);
show_query(show_queries, callback_query);

let succeeded = run_callback_poll(
&resource.name,
callback_query,
retries,
retry_delay,
&mut self.client,
);

if !succeeded {
catch_error_and_exit(&format!(
"callback timeout for [{}] {} operation after {} retries",
resource.name, operation, retries
));
}

info!(
"[{}] {} callback completed successfully",
resource.name, operation
);
debug!("Delete response: {}", msg);
}

/// Run a command-type query.
Expand Down
Loading