diff --git a/src/commands/base.rs b/src/commands/base.rs index 6feb76e..6690d72 100644 --- a/src/commands/base.rs +++ b/src/commands/base.rs @@ -17,8 +17,10 @@ use crate::core::config::{get_full_context, render_globals, render_string_value} use crate::core::env::load_env_vars; use crate::core::templating::{self, ParsedQuery}; use crate::core::utils::{ - catch_error_and_exit, check_exports_as_statecheck_proxy, export_vars, perform_retries, - pull_providers, run_ext_script, run_stackql_command, run_stackql_query, show_query, + catch_error_and_exit, check_exports_as_statecheck_proxy, check_short_circuit, export_vars, + flatten_returning_row, has_returning_clause, perform_retries, pull_providers, + run_callback_poll, run_ext_script, run_stackql_command, run_stackql_dml_returning, + run_stackql_query, show_query, }; use crate::resource::manifest::{Manifest, Resource}; use crate::resource::validation::validate_manifest; @@ -291,6 +293,9 @@ impl CommandRunner { } /// Create a resource. + /// + /// Returns `(created, returning_row)` where `returning_row` is `Some` when + /// the create query included `RETURNING *` and the provider returned data. 
#[allow(clippy::too_many_arguments)] pub fn create_resource( &mut self, @@ -301,30 +306,53 @@ impl CommandRunner { dry_run: bool, show_queries: bool, ignore_errors: bool, - ) -> bool { + ) -> (bool, Option>) { if dry_run { - info!( - "dry run create for [{}]:\n\n/* insert (create) query */\n{}\n", - resource.name, create_query - ); - return false; + if has_returning_clause(create_query) { + info!( + "dry run create for [{}]:\n\n/* insert (create) query with RETURNING */\n{}\n\ + [dry run: RETURNING * capture skipped]\n", + resource.name, create_query + ); + } else { + info!( + "dry run create for [{}]:\n\n/* insert (create) query */\n{}\n", + resource.name, create_query + ); + } + return (false, None); } info!("[{}] does not exist, creating...", resource.name); show_query(show_queries, create_query); - let msg = run_stackql_command( - create_query, - &mut self.client, - ignore_errors, - retries, - retry_delay, - ); - debug!("Create response: {}", msg); - true + if has_returning_clause(create_query) { + let (msg, returning_row) = run_stackql_dml_returning( + create_query, + &mut self.client, + ignore_errors, + retries, + retry_delay, + ); + debug!("Create response: {}", msg); + (true, returning_row) + } else { + let msg = run_stackql_command( + create_query, + &mut self.client, + ignore_errors, + retries, + retry_delay, + ); + debug!("Create response: {}", msg); + (true, None) + } } /// Update a resource. + /// + /// Returns `(updated, returning_row)` where `returning_row` is `Some` when + /// the update query included `RETURNING *` and the provider returned data. 
#[allow(clippy::too_many_arguments)] pub fn update_resource( &mut self, @@ -335,41 +363,64 @@ impl CommandRunner { dry_run: bool, show_queries: bool, ignore_errors: bool, - ) -> bool { + ) -> (bool, Option>) { match update_query { Some(query) => { if dry_run { - info!( - "dry run update for [{}]:\n\n/* update query */\n{}\n", - resource.name, query - ); - return false; + if has_returning_clause(query) { + info!( + "dry run update for [{}]:\n\n/* update query with RETURNING */\n{}\n\ + [dry run: RETURNING * capture skipped]\n", + resource.name, query + ); + } else { + info!( + "dry run update for [{}]:\n\n/* update query */\n{}\n", + resource.name, query + ); + } + return (false, None); } info!("updating [{}]...", resource.name); show_query(show_queries, query); - let msg = run_stackql_command( - query, - &mut self.client, - ignore_errors, - retries, - retry_delay, - ); - debug!("Update response: {}", msg); - true + if has_returning_clause(query) { + let (msg, returning_row) = run_stackql_dml_returning( + query, + &mut self.client, + ignore_errors, + retries, + retry_delay, + ); + debug!("Update response: {}", msg); + (true, returning_row) + } else { + let msg = run_stackql_command( + query, + &mut self.client, + ignore_errors, + retries, + retry_delay, + ); + debug!("Update response: {}", msg); + (true, None) + } } None => { info!( "Update query not configured for [{}], skipping update...", resource.name ); - false + (false, None) } } } /// Delete a resource. + /// + /// Returns `Some(first_row)` when the delete query included `RETURNING *` + /// and the provider returned data; `None` otherwise. 
#[allow(clippy::too_many_arguments)] pub fn delete_resource( &mut self, @@ -380,26 +431,136 @@ impl CommandRunner { dry_run: bool, show_queries: bool, ignore_errors: bool, + ) -> Option> { + if dry_run { + if has_returning_clause(delete_query) { + info!( + "dry run delete for [{}]:\n\n{}\n[dry run: RETURNING * capture skipped]\n", + resource.name, delete_query + ); + } else { + info!( + "dry run delete for [{}]:\n\n{}\n", + resource.name, delete_query + ); + } + return None; + } + + info!("deleting [{}]...", resource.name); + show_query(show_queries, delete_query); + + if has_returning_clause(delete_query) { + let (msg, returning_row) = run_stackql_dml_returning( + delete_query, + &mut self.client, + ignore_errors, + retries, + retry_delay, + ); + debug!("Delete response: {}", msg); + returning_row + } else { + let msg = run_stackql_command( + delete_query, + &mut self.client, + ignore_errors, + retries, + retry_delay, + ); + debug!("Delete response: {}", msg); + None + } + } + + // ----------------------------------------------------------------------- + // RETURNING * capture and callback support + // ----------------------------------------------------------------------- + + /// Store a RETURNING * row for `resource_name` in the global context. + /// + /// Flat keys (`callback.{field}`, `{resource_name}.callback.{field}`) are + /// inserted so they are accessible from subsequent template renders: + /// - The unscoped `callback.*` form is available to the resource's own + /// `.iql` templates (and is overwritten by the next DML that has + /// RETURNING *). + /// - The scoped `{resource_name}.callback.*` form is available to any + /// downstream resource (written once, never overwritten). 
+ pub fn store_callback_data( + &mut self, + resource_name: &str, + returning_row: &HashMap, + ) { + info!( + "storing RETURNING * result for [{}] in callback context", + resource_name + ); + flatten_returning_row(returning_row, resource_name, &mut self.global_context); + } + + /// Execute a callback block associated with a DML operation. + /// + /// 1. If `short_circuit_field` is set and the field in the current context + /// equals `short_circuit_value`, skip polling. + /// 2. Otherwise poll the callback query up to `retries` times. + /// 3. On exhaustion, call `catch_error_and_exit`. + /// + /// `operation` is used only for log messages (e.g. `"create"`). + #[allow(clippy::too_many_arguments)] + pub fn run_callback( + &mut self, + resource: &Resource, + callback_query: &str, + retries: u32, + retry_delay: u32, + short_circuit_field: Option<&str>, + short_circuit_value: Option<&str>, + operation: &str, + dry_run: bool, + show_queries: bool, ) { if dry_run { info!( - "dry run delete for [{}]:\n\n{}\n", - resource.name, delete_query + "dry run callback ({}) for [{}]:\n\n/* callback query */\n{}\n\ + [dry run: callback polling skipped]\n", + operation, resource.name, callback_query ); return; } - info!("deleting [{}]...", resource.name); - show_query(show_queries, delete_query); + // Short-circuit check. 
+ if let (Some(field), Some(expected)) = (short_circuit_field, short_circuit_value) { + if check_short_circuit(&self.global_context, field, expected) { + info!( + "[{}] {} callback short-circuited (field '{}' = '{}')", + resource.name, operation, field, expected + ); + return; + } + } - let msg = run_stackql_command( - delete_query, - &mut self.client, - ignore_errors, + info!("running {} callback for [{}]...", operation, resource.name); + show_query(show_queries, callback_query); + + let succeeded = run_callback_poll( + &resource.name, + callback_query, retries, retry_delay, + &mut self.client, + ); + + if !succeeded { + catch_error_and_exit(&format!( + "callback timeout for [{}] {} operation after {} retries", + resource.name, operation, retries + )); + } + + info!( + "[{}] {} callback completed successfully", + resource.name, operation ); - debug!("Delete response: {}", msg); } /// Run a command-type query. diff --git a/src/commands/build.rs b/src/commands/build.rs index 427c3c0..996fd19 100644 --- a/src/commands/build.rs +++ b/src/commands/build.rs @@ -361,7 +361,7 @@ fn run_build( let mut is_created_or_updated = false; if !resource_exists { - is_created_or_updated = runner.create_resource( + let (created, returning_row) = runner.create_resource( resource, create_query.as_ref().unwrap(), create_retries, @@ -370,10 +370,51 @@ fn run_build( show_queries, ignore_errors, ); + is_created_or_updated = created; + + // Capture RETURNING * result. + if let Some(ref row) = returning_row { + runner.store_callback_data(&resource.name, row); + } + + // Run callback:create block if present. + if is_created_or_updated { + let cb_anchor = if resource_queries.contains_key("callback:create") { + Some("callback:create") + } else if resource_queries.contains_key("callback") { + Some("callback") + } else { + None + }; + if let Some(anchor) = cb_anchor { + // Pre-extract before the mutable borrow of runner. 
+ if let Some(q) = resource_queries.get(anchor) { + let cb_template = q.template.clone(); + let cb_retries = q.options.retries; + let cb_delay = q.options.retry_delay; + let cb_sc_field = q.options.short_circuit_field.clone(); + let cb_sc_value = q.options.short_circuit_value.clone(); + let cb_ctx = runner.get_full_context(resource); + let rendered_cb = + runner.render_query(&resource.name, anchor, &cb_template, &cb_ctx); + runner.run_callback( + resource, + &rendered_cb, + cb_retries, + cb_delay, + cb_sc_field.as_deref(), + cb_sc_value.as_deref(), + "create", + dry_run, + show_queries, + ); + } + } + } } if resource_exists && !is_correct_state { - is_created_or_updated = runner.update_resource( + let (updated, returning_row) = runner.update_resource( resource, update_query.as_deref(), update_retries, @@ -382,6 +423,46 @@ fn run_build( show_queries, ignore_errors, ); + is_created_or_updated = updated; + + // Capture RETURNING * result. + if let Some(ref row) = returning_row { + runner.store_callback_data(&resource.name, row); + } + + // Run callback:update block if present. 
+ if is_created_or_updated { + let cb_anchor = if resource_queries.contains_key("callback:update") { + Some("callback:update") + } else if resource_queries.contains_key("callback") { + Some("callback") + } else { + None + }; + if let Some(anchor) = cb_anchor { + if let Some(q) = resource_queries.get(anchor) { + let cb_template = q.template.clone(); + let cb_retries = q.options.retries; + let cb_delay = q.options.retry_delay; + let cb_sc_field = q.options.short_circuit_field.clone(); + let cb_sc_value = q.options.short_circuit_value.clone(); + let cb_ctx = runner.get_full_context(resource); + let rendered_cb = + runner.render_query(&resource.name, anchor, &cb_template, &cb_ctx); + runner.run_callback( + resource, + &rendered_cb, + cb_retries, + cb_delay, + cb_sc_field.as_deref(), + cb_sc_value.as_deref(), + "update", + dry_run, + show_queries, + ); + } + } + } } // Post-deploy state check diff --git a/src/commands/teardown.rs b/src/commands/teardown.rs index b756b34..0e2d536 100644 --- a/src/commands/teardown.rs +++ b/src/commands/teardown.rs @@ -268,7 +268,7 @@ fn run_teardown(runner: &mut CommandRunner, dry_run: bool, show_queries: bool, _ // Delete if resource_exists { - runner.delete_resource( + let returning_row = runner.delete_resource( resource, &delete_query, delete_retries, @@ -277,6 +277,43 @@ fn run_teardown(runner: &mut CommandRunner, dry_run: bool, show_queries: bool, _ show_queries, ignore_errors, ); + + // Capture RETURNING * result. + if let Some(ref row) = returning_row { + runner.store_callback_data(&resource.name, row); + } + + // Run callback:delete block if present. 
+ let cb_anchor = if resource_queries.contains_key("callback:delete") { + Some("callback:delete") + } else if resource_queries.contains_key("callback") { + Some("callback") + } else { + None + }; + if let Some(anchor) = cb_anchor { + if let Some(q) = resource_queries.get(anchor) { + let cb_template = q.template.clone(); + let cb_retries = q.options.retries; + let cb_delay = q.options.retry_delay; + let cb_sc_field = q.options.short_circuit_field.clone(); + let cb_sc_value = q.options.short_circuit_value.clone(); + let cb_ctx = runner.get_full_context(resource); + let rendered_cb = + runner.render_query(&resource.name, anchor, &cb_template, &cb_ctx); + runner.run_callback( + resource, + &rendered_cb, + cb_retries, + cb_delay, + cb_sc_field.as_deref(), + cb_sc_value.as_deref(), + "delete", + dry_run, + show_queries, + ); + } + } } else { info!( "resource [{}] does not exist, skipping delete", diff --git a/src/core/templating.rs b/src/core/templating.rs index d7a33c1..39ac506 100644 --- a/src/core/templating.rs +++ b/src/core/templating.rs @@ -37,34 +37,52 @@ pub struct QueryOptions { pub retry_delay: u32, pub postdelete_retries: u32, pub postdelete_retry_delay: u32, + /// Dot-path into the RETURNING * result to check before polling + /// (e.g. `"ProgressEvent.OperationStatus"`). Only used on `callback` + /// anchors. + pub short_circuit_field: Option, + /// Value of `short_circuit_field` that means polling can be skipped. + /// Only used on `callback` anchors. + pub short_circuit_value: Option, } -/// Parse an anchor line to extract key and options. -/// Matches Python's `parse_anchor`. -fn parse_anchor(anchor: &str) -> (String, HashMap) { +/// Parse an anchor line to extract key, numeric options, and string options. +/// Matches Python's `parse_anchor`, extended for callback string params. +/// +/// Returns `(key, uint_options, str_options)`. Numeric-valued params go into +/// `uint_options`; all other params (e.g. 
`short_circuit_field`, +/// `short_circuit_value`) go into `str_options`. +fn parse_anchor(anchor: &str) -> (String, HashMap, HashMap) { let parts: Vec<&str> = anchor.split(',').collect(); let key = parts[0].trim().to_lowercase(); - let mut options = HashMap::new(); + let mut uint_options: HashMap = HashMap::new(); + let mut str_options: HashMap = HashMap::new(); for part in &parts[1..] { if let Some((option_key, option_value)) = part.split_once('=') { - if let Ok(value) = option_value.trim().parse::() { - options.insert(option_key.trim().to_string(), value); + let k = option_key.trim().to_string(); + let v = option_value.trim().to_string(); + if let Ok(uint_val) = v.parse::() { + uint_options.insert(k, uint_val); + } else { + str_options.insert(k, v); } } } - (key, options) + (key, uint_options, str_options) } -/// Load SQL queries from a .iql file, split by anchors. -/// Matches Python's `load_sql_queries`. -fn load_sql_queries( - file_path: &Path, -) -> ( +/// Return type of `load_sql_queries`: (templates, uint_options, str_options). +type SqlQueriesResult = ( HashMap, HashMap>, -) { + HashMap>, +); + +/// Load SQL queries from a .iql file, split by anchors. +/// Matches Python's `load_sql_queries`. 
+fn load_sql_queries(file_path: &Path) -> SqlQueriesResult { let content = match fs::read_to_string(file_path) { Ok(c) => c, Err(e) => { @@ -74,7 +92,8 @@ fn load_sql_queries( }; let mut queries: HashMap = HashMap::new(); - let mut options: HashMap> = HashMap::new(); + let mut uint_options: HashMap> = HashMap::new(); + let mut str_options: HashMap> = HashMap::new(); let mut current_anchor: Option = None; let mut query_buffer: Vec = Vec::new(); @@ -83,12 +102,13 @@ fn load_sql_queries( // Store the current query under the last anchor if let Some(ref anchor) = current_anchor { if !query_buffer.is_empty() { - let (anchor_key, anchor_options) = parse_anchor(anchor); + let (anchor_key, anchor_uint_opts, anchor_str_opts) = parse_anchor(anchor); queries.insert( anchor_key.clone(), query_buffer.join("\n").trim().to_string(), ); - options.insert(anchor_key, anchor_options); + uint_options.insert(anchor_key.clone(), anchor_uint_opts); + str_options.insert(anchor_key, anchor_str_opts); query_buffer.clear(); } } @@ -104,16 +124,17 @@ fn load_sql_queries( // Store the last query if let Some(ref anchor) = current_anchor { if !query_buffer.is_empty() { - let (anchor_key, anchor_options) = parse_anchor(anchor); + let (anchor_key, anchor_uint_opts, anchor_str_opts) = parse_anchor(anchor); queries.insert( anchor_key.clone(), query_buffer.join("\n").trim().to_string(), ); - options.insert(anchor_key, anchor_options); + uint_options.insert(anchor_key.clone(), anchor_uint_opts); + str_options.insert(anchor_key, anchor_str_opts); } } - (queries, options) + (queries, uint_options, str_options) } /// Pre-process Jinja2 inline dict expressions that Tera doesn't support. 
@@ -270,8 +291,16 @@ pub fn render_query( res_name, anchor, template ); + let expanded = match preprocess_this_prefix(template, res_name) { + Ok(t) => t, + Err(e) => { + error!("[{}] [{}] {}", res_name, anchor, e); + process::exit(1); + } + }; + let mut ctx = temp_context; - let compat_query = preprocess_jinja2_compat(template); + let compat_query = preprocess_jinja2_compat(&expanded); let processed_query = preprocess_inline_dicts(&compat_query, &mut ctx); let template_name = format!("{}__{}", res_name, anchor); @@ -328,6 +357,10 @@ pub fn render_query( /// Templates are NOT rendered here — rendering is deferred to when /// each query is actually needed (JIT rendering). /// Matches Python's `get_queries`. +/// +/// Callback anchors (e.g. `callback:create`, `callback:delete`) are stored +/// under the key `"callback:create"`, `"callback:delete"`, etc. A bare +/// `callback` anchor (no operation qualifier) is stored under `"callback"`. pub fn get_queries( _engine: &TemplateEngine, stack_dir: &str, @@ -349,27 +382,32 @@ pub fn get_queries( process::exit(1); } - let (query_templates, query_options) = load_sql_queries(&template_path); + let (query_templates, query_uint_options, query_str_options) = load_sql_queries(&template_path); for (anchor, template) in &query_templates { - // Fix backward compatibility for preflight and postdeploy + // Fix backward compatibility for preflight and postdeploy. + // Callback anchors (callback:create, callback:delete, callback:update, + // callback) are passed through unchanged. 
let normalized_anchor = match anchor.as_str() { "preflight" => "exists".to_string(), "postdeploy" => "statecheck".to_string(), other => other.to_string(), }; - let opts = query_options.get(anchor).cloned().unwrap_or_default(); + let uint_opts = query_uint_options.get(anchor).cloned().unwrap_or_default(); + let str_opts = query_str_options.get(anchor).cloned().unwrap_or_default(); result.insert( normalized_anchor.clone(), ParsedQuery { template: template.clone(), options: QueryOptions { - retries: *opts.get("retries").unwrap_or(&1), - retry_delay: *opts.get("retry_delay").unwrap_or(&0), - postdelete_retries: *opts.get("postdelete_retries").unwrap_or(&10), - postdelete_retry_delay: *opts.get("postdelete_retry_delay").unwrap_or(&5), + retries: *uint_opts.get("retries").unwrap_or(&1), + retry_delay: *uint_opts.get("retry_delay").unwrap_or(&0), + postdelete_retries: *uint_opts.get("postdelete_retries").unwrap_or(&10), + postdelete_retry_delay: *uint_opts.get("postdelete_retry_delay").unwrap_or(&5), + short_circuit_field: str_opts.get("short_circuit_field").cloned(), + short_circuit_value: str_opts.get("short_circuit_value").cloned(), }, }, ); @@ -383,6 +421,46 @@ pub fn get_queries( result } +/// Pre-process `this.` prefix inside Tera template blocks. +/// +/// Within every `{{ ... }}` and `{% ... %}` block, replaces `this.` with +/// `{resource_name}.`, allowing resource-scoped variables to be referenced +/// unambiguously inside a resource's own `.iql` file. +/// +/// Returns `Err` with a diagnostic if `this.` appears but `resource_name` +/// is empty (i.e. no active resource context, such as a global template). +pub fn preprocess_this_prefix(template: &str, resource_name: &str) -> Result { + if !template.contains("this.") { + return Ok(template.to_string()); + } + + if resource_name.is_empty() { + return Err( + "Template uses 'this.' prefix but no resource context is active; \ + 'this.' is only valid inside a resource's .iql file." 
+ .to_string(), + ); + } + + let replacement = format!("{}.", resource_name); + + // Replace 'this.' with '{resource_name}.' inside {{ ... }} blocks. + let var_re = Regex::new(r"(?s)\{\{(.*?)\}\}").unwrap(); + let with_vars = var_re.replace_all(template, |caps: &regex::Captures| { + let inner = caps[1].replace("this.", &replacement); + format!("{{{{{}}}}}", inner) + }); + + // Also handle {% ... %} tag blocks (conditionals, loops). + let tag_re = Regex::new(r"(?s)\{%(.*?)%\}").unwrap(); + let with_tags = tag_re.replace_all(&with_vars, |caps: &regex::Captures| { + let inner = caps[1].replace("this.", &replacement); + format!("{{%{}%}}", inner) + }); + + Ok(with_tags.to_string()) +} + /// Render an inline SQL template string. /// Matches Python's `render_inline_template`. pub fn render_inline_template( @@ -397,7 +475,16 @@ ); let mut temp_context = prepare_query_context(full_context); - let compat = preprocess_jinja2_compat(template_string); + + let expanded = match preprocess_this_prefix(template_string, resource_name) { + Ok(t) => t, + Err(e) => { + error!("[{}] inline template: {}", resource_name, e); + process::exit(1); + } + }; + + let compat = preprocess_jinja2_compat(&expanded); let processed = preprocess_inline_dicts(&compat, &mut temp_context); let template_name = format!("{}__inline", resource_name); @@ -447,3 +534,176 @@ } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::template::engine::TemplateEngine; + + // ── preprocess_this_prefix unit tests ───────────────────────────────── + + #[test] + fn test_preprocess_this_prefix_basic_rewrite() { + let result = preprocess_this_prefix("{{ this.fred }}", "resource_name_x").unwrap(); + assert_eq!(result, "{{ resource_name_x.fred }}"); + } + + #[test] + fn test_preprocess_this_prefix_noop_when_no_this() { + let template = "{{ fred }}"; + let result = preprocess_this_prefix(template, "resource_name_x").unwrap(); + assert_eq!( + result, template, +
"template without 'this.' should be unchanged" + ); + } + + #[test] + fn test_preprocess_this_prefix_error_when_no_resource_name() { + let result = preprocess_this_prefix("{{ this.fred }}", ""); + assert!(result.is_err(), "empty resource_name should return Err"); + let msg = result.unwrap_err(); + assert!( + msg.contains("this.") || msg.contains("resource context"), + "error message should mention 'this.' or resource context, got: {}", + msg + ); + } + + #[test] + fn test_preprocess_this_prefix_multiple_occurrences() { + let template = "{{ this.a }} and {{ this.b }}"; + let result = preprocess_this_prefix(template, "my_res").unwrap(); + assert_eq!(result, "{{ my_res.a }} and {{ my_res.b }}"); + } + + #[test] + fn test_preprocess_this_prefix_deep_path() { + let template = "{{ this.callback.ProgressEvent.RequestToken }}"; + let result = preprocess_this_prefix(template, "resource_name_x").unwrap(); + assert_eq!( + result, + "{{ resource_name_x.callback.ProgressEvent.RequestToken }}" + ); + } + + #[test] + fn test_preprocess_this_prefix_in_tag_block() { + let template = "{% if this.flag %}yes{% endif %}"; + let result = preprocess_this_prefix(template, "res").unwrap(); + assert_eq!(result, "{% if res.flag %}yes{% endif %}"); + } + + #[test] + fn test_preprocess_this_prefix_with_filter() { + let template = "{{ this.tags | from_json }}"; + let result = preprocess_this_prefix(template, "my_vpc").unwrap(); + assert_eq!(result, "{{ my_vpc.tags | from_json }}"); + } + + // ── End-to-end rendering tests via TemplateEngine ───────────────────── + + #[test] + fn test_this_resolves_resource_scoped_over_global() { + // When both a global 'fred' and a resource-scoped 'resource_name_x.fred' + // exist, {{ this.fred }} must resolve to the resource-scoped value. 
+ let engine = TemplateEngine::new(); + let mut context = std::collections::HashMap::new(); + context.insert("fred".to_string(), "global_fred".to_string()); + context.insert( + "resource_name_x.fred".to_string(), + "scoped_fred".to_string(), + ); + + let expanded = preprocess_this_prefix("{{ this.fred }}", "resource_name_x").unwrap(); + let result = engine + .render_with_filters("t", &expanded, &context) + .unwrap(); + assert_eq!( + result, "scoped_fred", + "this.fred should resolve to the resource-scoped value, not the global" + ); + } + + #[test] + fn test_this_resolves_when_only_resource_scoped_exists() { + // No global 'fred' - only the resource-scoped one. + let engine = TemplateEngine::new(); + let mut context = std::collections::HashMap::new(); + context.insert( + "resource_name_x.fred".to_string(), + "scoped_only".to_string(), + ); + + let expanded = preprocess_this_prefix("{{ this.fred }}", "resource_name_x").unwrap(); + let result = engine + .render_with_filters("t", &expanded, &context) + .unwrap(); + assert_eq!(result, "scoped_only"); + } + + #[test] + fn test_this_errors_when_only_global_exists_not_resource_scoped() { + // this.fred expands to resource_name_x.fred; if only a global 'fred' + // exists the render should fail rather than silently using the global. 
+ let engine = TemplateEngine::new(); + let mut context = std::collections::HashMap::new(); + context.insert("fred".to_string(), "global_fred".to_string()); + // No resource_name_x.fred in context + + let expanded = preprocess_this_prefix("{{ this.fred }}", "resource_name_x").unwrap(); + let result = engine.render_with_filters("t", &expanded, &context); + assert!( + result.is_err(), + "this.fred should error when resource_name_x.fred is not in context" + ); + } + + #[test] + fn test_this_callback_resolves_same_as_scoped_and_shorthand() { + // {{ this.callback.ProgressEvent.RequestToken }} inside resource_name_x + // should resolve identically to: + // {{ resource_name_x.callback.ProgressEvent.RequestToken }} (explicit) + // {{ callback.ProgressEvent.RequestToken }} (shorthand) + let engine = TemplateEngine::new(); + let mut context = std::collections::HashMap::new(); + context.insert( + "resource_name_x.callback.ProgressEvent.RequestToken".to_string(), + "token-abc".to_string(), + ); + context.insert( + "callback.ProgressEvent.RequestToken".to_string(), + "token-abc".to_string(), + ); + + let expanded = preprocess_this_prefix( + "{{ this.callback.ProgressEvent.RequestToken }}", + "resource_name_x", + ) + .unwrap(); + + let via_this = engine + .render_with_filters("t1", &expanded, &context) + .unwrap(); + let via_explicit = engine + .render_with_filters( + "t2", + "{{ resource_name_x.callback.ProgressEvent.RequestToken }}", + &context, + ) + .unwrap(); + let via_shorthand = engine + .render_with_filters("t3", "{{ callback.ProgressEvent.RequestToken }}", &context) + .unwrap(); + + assert_eq!(via_this, "token-abc"); + assert_eq!( + via_this, via_explicit, + "this.callback should equal resource_name_x.callback" + ); + assert_eq!( + via_this, via_shorthand, + "this.callback should equal shorthand callback" + ); + } +} diff --git a/src/core/utils.rs b/src/core/utils.rs index 69065dd..0e2d6f0 100644 --- a/src/core/utils.rs +++ b/src/core/utils.rs @@ -606,6 +606,297 @@ pub 
fn run_ext_script(
     }
 }
 
+// ---------------------------------------------------------------------------
+// RETURNING * capture helpers
+// ---------------------------------------------------------------------------
+
+/// Return `true` if the rendered query string contains a `RETURNING` clause.
+/// Case-insensitive match; used to decide whether to capture a DML result.
+pub fn has_returning_clause(query: &str) -> bool {
+    query.to_uppercase().contains("RETURNING")
+}
+
+/// Execute a DML command (INSERT / UPDATE / DELETE), optionally capturing
+/// the `RETURNING *` result as the first row.
+///
+/// Returns `(command_message, Option<HashMap<String, String>>)`. When the DML
+/// includes `RETURNING *` and the provider returns rows, the first row is
+/// captured. If no rows are returned (provider returned no body), `None` is
+/// returned – this is **not** an error.
+pub fn run_stackql_dml_returning(
+    command: &str,
+    client: &mut PgwireLite,
+    ignore_errors: bool,
+    retries: u32,
+    retry_delay: u32,
+) -> (String, Option<HashMap<String, String>>) {
+    let mut attempt = 0u32;
+
+    while attempt <= retries {
+        debug!(
+            "Executing stackql DML (attempt {}):\n\n{}\n",
+            attempt + 1,
+            command
+        );
+
+        match execute_query(command, client) {
+            Ok(result) => match result {
+                QueryResult::Data {
+                    columns,
+                    rows,
+                    notices,
+                } => {
+                    // Check for errors in notices before accepting the result.
+                    let mut error_noticed = false;
+                    for notice in &notices {
+                        if error_detected_in_notice(notice) && !ignore_errors {
+                            if attempt < retries {
+                                warn!(
+                                    "DML error in notice, retrying in {} seconds (attempt {} of {})...",
+                                    retry_delay, attempt + 1, retries + 1
+                                );
+                                thread::sleep(Duration::from_secs(retry_delay as u64));
+                                attempt += 1;
+                                error_noticed = true;
+                                break;
+                            } else {
+                                catch_error_and_exit(&format!(
+                                    "Error during stackql DML execution:\n\n{}\n",
+                                    notice
+                                ));
+                            }
+                        }
+                    }
+                    if error_noticed {
+                        continue;
+                    }
+
+                    // Capture RETURNING * first row (if any).
+                    let first_row = if !rows.is_empty() {
+                        let col_names: Vec<String> =
+                            columns.iter().map(|c| c.name.clone()).collect();
+                        let row = &rows[0];
+                        let mut map = HashMap::new();
+                        for (i, col_name) in col_names.iter().enumerate() {
+                            let value = row.values.get(i).cloned().unwrap_or_default();
+                            map.insert(col_name.clone(), value);
+                        }
+                        Some(map)
+                    } else {
+                        None
+                    };
+
+                    let msg = notices.join("\n");
+                    return (msg, first_row);
+                }
+                QueryResult::Command(msg) => {
+                    return (msg, None);
+                }
+                QueryResult::Empty => {
+                    return (String::new(), None);
+                }
+            },
+            Err(e) => {
+                if !ignore_errors {
+                    if attempt < retries {
+                        warn!(
+                            "DML failed, retrying in {} seconds (attempt {} of {})...",
+                            retry_delay,
+                            attempt + 1,
+                            retries + 1
+                        );
+                        thread::sleep(Duration::from_secs(retry_delay as u64));
+                        attempt += 1;
+                        continue;
+                    }
+                    catch_error_and_exit(&format!(
+                        "Exception during stackql DML execution:\n\n{}\n",
+                        e
+                    ));
+                } else {
+                    debug!("DML failed (ignored): {}", e);
+                    return (String::new(), None);
+                }
+            }
+        }
+    }
+
+    (String::new(), None)
+}
+
+/// Flatten a single RETURNING * row into dotted context keys and insert them
+/// into `context`.
+///
+/// For each column `col` in `row`:
+/// - `callback.{col}` is set (shorthand for the current resource's own `.iql`
+///   templates).
+/// - `{resource_name}.callback.{col}` is set (fully-qualified key accessible
+///   by downstream resources).
+///
+/// If a column value is a JSON object it is recursively expanded:
+/// `"ProgressEvent" = {"OperationStatus":"SUCCESS","RequestToken":"abc"}`
+/// produces:
+/// ```text
+/// callback.ProgressEvent.OperationStatus = SUCCESS
+/// callback.ProgressEvent.RequestToken = abc
+/// ```
+pub fn flatten_returning_row(
+    row: &HashMap<String, String>,
+    resource_name: &str,
+    context: &mut HashMap<String, String>,
+) {
+    for (col, val) in row {
+        let short_prefix = format!("callback.{}", col);
+        let full_prefix = format!("{}.callback.{}", resource_name, col);
+        flatten_value_into_context(&short_prefix, &full_prefix, val, context);
+    }
+}
+
+/// Recursively expand a string value (possibly JSON) into dotted context keys.
+fn flatten_value_into_context(
+    short_prefix: &str,
+    full_prefix: &str,
+    value: &str,
+    context: &mut HashMap<String, String>,
+) {
+    if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
+        if json.is_object() {
+            flatten_json_into_context(short_prefix, full_prefix, &json, context);
+            return;
+        }
+    }
+    context.insert(short_prefix.to_string(), value.to_string());
+    context.insert(full_prefix.to_string(), value.to_string());
+}
+
+fn flatten_json_into_context(
+    short_prefix: &str,
+    full_prefix: &str,
+    value: &serde_json::Value,
+    context: &mut HashMap<String, String>,
+) {
+    match value {
+        serde_json::Value::Object(map) => {
+            for (k, v) in map {
+                let new_short = format!("{}.{}", short_prefix, k);
+                let new_full = format!("{}.{}", full_prefix, k);
+                flatten_json_into_context(&new_short, &new_full, v, context);
+            }
+        }
+        serde_json::Value::String(s) => {
+            context.insert(short_prefix.to_string(), s.clone());
+            context.insert(full_prefix.to_string(), s.clone());
+        }
+        other => {
+            let s = other.to_string();
+            context.insert(short_prefix.to_string(), s.clone());
+            context.insert(full_prefix.to_string(), s);
+        }
+    }
+}
+
+/// Check whether a short-circuit condition is met using already-captured
+/// callback data.
+///
+/// `field` is a dot-path into the captured result (e.g.
+/// `"ProgressEvent.OperationStatus"`), looked up as `callback.{field}` in +/// `context`. Returns `true` if the value equals `expected_value`. +/// Returns `false` (no short-circuit) if the field is absent. +pub fn check_short_circuit( + context: &HashMap, + field: &str, + expected_value: &str, +) -> bool { + let lookup_key = format!("callback.{}", field); + match context.get(&lookup_key) { + Some(val) => { + let result = val == expected_value; + if result { + info!( + "short-circuit condition met: {} = {} (skipping callback poll)", + lookup_key, expected_value + ); + } + result + } + None => { + debug!( + "short-circuit field '{}' not found in context, proceeding with callback poll", + lookup_key + ); + false + } + } +} + +/// Poll a callback query until the `success` (or `count`) column returns a +/// truthy value, or `retries` are exhausted. +/// +/// Returns `true` on success, `false` when retries are exhausted (the caller +/// is responsible for treating exhaustion as an error). +pub fn run_callback_poll( + resource_name: &str, + query: &str, + retries: u32, + retry_delay: u32, + client: &mut PgwireLite, +) -> bool { + let mut attempt = 0u32; + + while attempt <= retries { + debug!( + "Callback poll for [{}] attempt {}:\n\n{}\n", + resource_name, + attempt + 1, + query + ); + + let result = run_stackql_query(query, client, true, 0, 0); + + if !result.is_empty() { + let row = &result[0]; + + // Check `success` column (primary). + if let Some(success_val) = row.get("success") { + if success_val == "1" || success_val.to_lowercase() == "true" { + info!( + "[{}] callback poll succeeded on attempt {}", + resource_name, + attempt + 1 + ); + return true; + } + } + + // Check `count` column (alternative). 
+ if let Some(count_val) = row.get("count") { + if count_val == "1" { + info!( + "[{}] callback poll succeeded (count=1) on attempt {}", + resource_name, + attempt + 1 + ); + return true; + } + } + } + + if attempt < retries { + info!( + "[{}] callback poll attempt {}/{}: retrying in {} seconds...", + resource_name, + attempt + 1, + retries + 1, + retry_delay + ); + thread::sleep(Duration::from_secs(retry_delay as u64)); + } + attempt += 1; + } + + false +} + #[cfg(test)] mod tests { use super::*; @@ -698,4 +989,143 @@ mod tests { Some("super-secret"), ); } + + // ------------------------------------------------------------------ + // has_returning_clause + // ------------------------------------------------------------------ + + #[test] + fn test_has_returning_clause_positive() { + assert!(has_returning_clause( + "INSERT INTO awscc.s3.buckets(BucketName, region) SELECT 'my-bucket', 'us-east-1' RETURNING *" + )); + } + + #[test] + fn test_has_returning_clause_case_insensitive() { + assert!(has_returning_clause("DELETE FROM t WHERE id=1 returning *")); + } + + #[test] + fn test_has_returning_clause_negative() { + assert!(!has_returning_clause("INSERT INTO t(col) SELECT 'val'")); + } + + // ------------------------------------------------------------------ + // flatten_returning_row + // ------------------------------------------------------------------ + + #[test] + fn test_flatten_returning_row_simple_string_values() { + let mut row = HashMap::new(); + row.insert("RequestToken".to_string(), "tok-123".to_string()); + row.insert("OperationStatus".to_string(), "SUCCESS".to_string()); + + let mut ctx: HashMap = HashMap::new(); + flatten_returning_row(&row, "my_resource", &mut ctx); + + assert_eq!( + ctx.get("callback.RequestToken").map(|s| s.as_str()), + Some("tok-123") + ); + assert_eq!( + ctx.get("my_resource.callback.RequestToken") + .map(|s| s.as_str()), + Some("tok-123") + ); + assert_eq!( + ctx.get("callback.OperationStatus").map(|s| s.as_str()), + 
Some("SUCCESS") + ); + assert_eq!( + ctx.get("my_resource.callback.OperationStatus") + .map(|s| s.as_str()), + Some("SUCCESS") + ); + } + + #[test] + fn test_flatten_returning_row_nested_json() { + // Provider returns ProgressEvent as a JSON object string. + let mut row = HashMap::new(); + row.insert( + "ProgressEvent".to_string(), + r#"{"OperationStatus":"SUCCESS","RequestToken":"abc"}"#.to_string(), + ); + + let mut ctx: HashMap = HashMap::new(); + flatten_returning_row(&row, "aws_s3_bucket", &mut ctx); + + assert_eq!( + ctx.get("callback.ProgressEvent.OperationStatus") + .map(|s| s.as_str()), + Some("SUCCESS") + ); + assert_eq!( + ctx.get("callback.ProgressEvent.RequestToken") + .map(|s| s.as_str()), + Some("abc") + ); + assert_eq!( + ctx.get("aws_s3_bucket.callback.ProgressEvent.OperationStatus") + .map(|s| s.as_str()), + Some("SUCCESS") + ); + assert_eq!( + ctx.get("aws_s3_bucket.callback.ProgressEvent.RequestToken") + .map(|s| s.as_str()), + Some("abc") + ); + } + + #[test] + fn test_flatten_returning_row_empty_row_is_noop() { + let row: HashMap = HashMap::new(); + let mut ctx: HashMap = HashMap::new(); + flatten_returning_row(&row, "res", &mut ctx); + assert!(ctx.is_empty()); + } + + // ------------------------------------------------------------------ + // check_short_circuit + // ------------------------------------------------------------------ + + #[test] + fn test_check_short_circuit_matches() { + let mut ctx: HashMap = HashMap::new(); + ctx.insert( + "callback.ProgressEvent.OperationStatus".to_string(), + "SUCCESS".to_string(), + ); + assert!(check_short_circuit( + &ctx, + "ProgressEvent.OperationStatus", + "SUCCESS" + )); + } + + #[test] + fn test_check_short_circuit_no_match() { + let mut ctx: HashMap = HashMap::new(); + ctx.insert( + "callback.ProgressEvent.OperationStatus".to_string(), + "IN_PROGRESS".to_string(), + ); + assert!(!check_short_circuit( + &ctx, + "ProgressEvent.OperationStatus", + "SUCCESS" + )); + } + + #[test] + fn 
test_check_short_circuit_missing_field() {
+        let ctx: HashMap<String, String> = HashMap::new();
+        // Field not present in context → no short-circuit.
+        assert!(!check_short_circuit(
+            &ctx,
+            "ProgressEvent.OperationStatus",
+            "SUCCESS"
+        ));
+    }
+}
diff --git a/src/template/engine.rs b/src/template/engine.rs
index 6aa660e..7c30aa3 100644
--- a/src/template/engine.rs
+++ b/src/template/engine.rs
@@ -159,42 +159,66 @@ fn full_error_chain(err: &dyn StdError) -> String {
 /// Build a Tera context from a flat `HashMap`.
 ///
-/// Keys that contain a `.` (e.g. `"resource_name.var"`) are grouped into
-/// nested objects so that Tera's property-access syntax works:
+/// Keys that contain `.` (e.g. `"resource_name.var"` or
+/// `"resource.callback.Field.SubField"`) are recursively expanded into
+/// nested JSON objects so that Tera's property-access syntax works at any
+/// depth:
 ///
 /// ```text
-/// context["my_vpc.vpc_id"] = "vpc-123"
+/// context["my_vpc.vpc_id"]                           = "vpc-123"
+/// context["res.callback.ProgressEvent.RequestToken"] = "abc"
 /// ↓
-/// Tera context: { my_vpc: { vpc_id: "vpc-123" }, ... }
-/// → {{ my_vpc.vpc_id }} renders as "vpc-123"
+/// Tera context: {
+///   my_vpc: { vpc_id: "vpc-123" },
+///   res: { callback: { ProgressEvent: { RequestToken: "abc" } } }
+/// }
+/// → {{ my_vpc.vpc_id }} renders as "vpc-123"
+/// → {{ res.callback.ProgressEvent.RequestToken }} renders as "abc"
 /// ```
 ///
-/// Non-dotted keys are inserted as top-level strings as before.
+/// Non-dotted keys are inserted as top-level strings.
 fn build_tera_context(context: &HashMap<String, String>) -> TeraContext {
     let mut tera_context = TeraContext::new();
-
-    // Collect dotted keys grouped by prefix
-    let mut nested: HashMap<String, HashMap<String, String>> = HashMap::new();
+    let mut root: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
 
     for (key, value) in context {
-        if let Some((prefix, suffix)) = key.split_once('.') {
-            nested
-                .entry(prefix.to_string())
-                .or_default()
-                .insert(suffix.to_string(), value.clone());
-        } else {
-            tera_context.insert(key, value);
-        }
+        insert_nested_key(&mut root, key, value);
     }
 
-    // Insert each prefix group as a nested object
-    for (prefix, map) in &nested {
-        tera_context.insert(prefix, map);
+    for (key, val) in &root {
+        tera_context.insert(key, val);
     }
 
     tera_context
 }
 
+/// Recursively insert a dotted key into a JSON object tree.
+///
+/// `"a.b.c"` with value `"v"` produces `{ a: { b: { c: "v" } } }`.
+/// If an intermediate node already exists as a string (leaf), it is
+/// left unchanged and the deeper insertion is skipped to avoid
+/// overwriting with an object.
+fn insert_nested_key(map: &mut serde_json::Map<String, serde_json::Value>, key: &str, value: &str) {
+    match key.split_once('.') {
+        None => {
+            // Leaf – only write if the slot is not already a nested object.
+            // This prevents a dotted key from being overwritten by a later
+            // scalar with the same prefix.
+            map.entry(key.to_string())
+                .or_insert_with(|| serde_json::Value::String(value.to_string()));
+        }
+        Some((prefix, suffix)) => {
+            let entry = map
+                .entry(prefix.to_string())
+                .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
+            if let serde_json::Value::Object(nested) = entry {
+                insert_nested_key(nested, suffix, value);
+            }
+            // If the entry is already a scalar, we cannot nest further – skip.
+        }
+    }
+}
+
 /// Register all custom Jinja2 filters matching the Python implementation.
fn register_custom_filters(tera: &mut Tera) { tera.register_filter("from_json", filter_from_json); @@ -510,4 +534,73 @@ mod tests { .unwrap(); assert_eq!(result, "val_a-val_b"); } + + #[test] + fn test_three_level_dotted_key() { + // Validates that callback.ProgressEvent.RequestToken style keys work. + let engine = TemplateEngine::new(); + let mut context = HashMap::new(); + context.insert( + "callback.ProgressEvent.RequestToken".to_string(), + "token-123".to_string(), + ); + context.insert( + "callback.ProgressEvent.OperationStatus".to_string(), + "SUCCESS".to_string(), + ); + + let result = engine + .render_with_filters("t", "{{ callback.ProgressEvent.RequestToken }}", &context) + .unwrap(); + assert_eq!(result, "token-123"); + + let result2 = engine + .render_with_filters( + "t2", + "{{ callback.ProgressEvent.OperationStatus }}", + &context, + ) + .unwrap(); + assert_eq!(result2, "SUCCESS"); + } + + #[test] + fn test_four_level_dotted_key_with_resource_prefix() { + // Validates resource-scoped callback access: + // {{ aws_s3_bucket.callback.ProgressEvent.RequestToken }} + let engine = TemplateEngine::new(); + let mut context = HashMap::new(); + context.insert( + "aws_s3_bucket.callback.ProgressEvent.RequestToken".to_string(), + "req-abc".to_string(), + ); + + let result = engine + .render_with_filters( + "t", + "{{ aws_s3_bucket.callback.ProgressEvent.RequestToken }}", + &context, + ) + .unwrap(); + assert_eq!(result, "req-abc"); + } + + #[test] + fn test_mixed_depth_keys_same_prefix() { + // A prefix can have both shallow and deep dotted children. 
+ let engine = TemplateEngine::new(); + let mut context = HashMap::new(); + context.insert("res.export_var".to_string(), "exported".to_string()); + context.insert("res.callback.Field.Sub".to_string(), "deep_val".to_string()); + + let result = engine + .render_with_filters("t", "{{ res.export_var }}", &context) + .unwrap(); + assert_eq!(result, "exported"); + + let result2 = engine + .render_with_filters("t2", "{{ res.callback.Field.Sub }}", &context) + .unwrap(); + assert_eq!(result2, "deep_val"); + } } diff --git a/src/utils/server.rs b/src/utils/server.rs index 1fb15ac..db5d38e 100644 --- a/src/utils/server.rs +++ b/src/utils/server.rs @@ -74,7 +74,15 @@ pub struct RunningServer { /// Check if the stackql server is running on a specific port pub fn is_server_running(port: u16) -> bool { let servers = find_all_running_servers(); - debug!("is_server_running({}): found {} candidate server(s): {:?}", port, servers.len(), servers.iter().map(|s| format!("pid={} port={}", s.pid, s.port)).collect::>()); + debug!( + "is_server_running({}): found {} candidate server(s): {:?}", + port, + servers.len(), + servers + .iter() + .map(|s| format!("pid={} port={}", s.pid, s.port)) + .collect::>() + ); let result = servers.iter().any(|server| server.port == port); debug!("is_server_running({}) -> {}", port, result); result @@ -110,20 +118,37 @@ pub fn find_all_running_servers() -> Vec { if !output.stdout.is_empty() { let pids_str = String::from_utf8_lossy(&output.stdout).to_string(); let pids = pids_str.trim().split('\n').collect::>(); - debug!("find_all_running_servers: pgrep found {} PID(s): {:?}", pids.len(), pids); + debug!( + "find_all_running_servers: pgrep found {} PID(s): {:?}", + pids.len(), + pids + ); for pid_str in pids { if let Ok(pid) = pid_str.trim().parse::() { // Log the full command line of this PID before attempting port extraction - if let Ok(ps_out) = ProcessCommand::new("ps").arg("-p").arg(pid.to_string()).arg("-o").arg("args").output() { + if let Ok(ps_out) = 
ProcessCommand::new("ps") + .arg("-p") + .arg(pid.to_string()) + .arg("-o") + .arg("args") + .output() + { let cmdline = String::from_utf8_lossy(&ps_out.stdout); - debug!("find_all_running_servers: PID {} cmdline: {}", pid, cmdline.trim()); + debug!( + "find_all_running_servers: PID {} cmdline: {}", + pid, + cmdline.trim() + ); } if let Some(port) = extract_port_from_ps(pid_str) { debug!("find_all_running_servers: PID {} -> port {}", pid, port); running_servers.push(RunningServer { pid, port }); } else { - debug!("find_all_running_servers: PID {} -> no --pgsrv.port found, skipping", pid); + debug!( + "find_all_running_servers: PID {} -> no --pgsrv.port found, skipping", + pid + ); } } } @@ -217,7 +242,10 @@ pub fn get_server_pid(port: u16) -> Option { /// Start the stackql server with the given options pub fn start_server(options: &StartServerOptions) -> Result { - debug!("start_server called: host={}, port={}", options.host, options.port); + debug!( + "start_server called: host={}, port={}", + options.host, options.port + ); let binary_path = match get_binary_path() { Some(path) => { @@ -227,12 +255,18 @@ pub fn start_server(options: &StartServerOptions) -> Result { _none => return Err("stackql binary not found".to_string()), }; - debug!("Checking if server is already running on port {}...", options.port); + debug!( + "Checking if server is already running on port {}...", + options.port + ); if is_server_running(options.port) { info!("Server is already running on port {}", options.port); return Ok(get_server_pid(options.port).unwrap_or(0)); } - debug!("Server not running on port {}; proceeding to start.", options.port); + debug!( + "Server not running on port {}; proceeding to start.", + options.port + ); let mut cmd = ProcessCommand::new(&binary_path); cmd.arg("srv"); @@ -276,10 +310,16 @@ pub fn start_server(options: &StartServerOptions) -> Result { let pid = child.id(); info!("Starting stackql server with PID: {}", pid); - debug!("Waiting 5 seconds for server 
on port {} to become ready...", options.port); + debug!( + "Waiting 5 seconds for server on port {} to become ready...", + options.port + ); thread::sleep(Duration::from_secs(5)); - debug!("Re-checking if server is running on port {}...", options.port); + debug!( + "Re-checking if server is running on port {}...", + options.port + ); if is_server_running(options.port) { info!("Server started successfully on port {}", options.port); Ok(pid) @@ -341,11 +381,17 @@ pub fn check_and_start_server() { debug!("check_and_start_server: host={}, port={}", host, port); if LOCAL_SERVER_ADDRESSES.contains(&host) { - debug!("Host '{}' is local; checking if server is running on port {}...", host, port); + debug!( + "Host '{}' is local; checking if server is running on port {}...", + host, port + ); if is_server_running(port) { info!("Local server is already running on port {}.", port); } else { - debug!("Server not detected on port {}; will attempt to start it.", port); + debug!( + "Server not detected on port {}; will attempt to start it.", + port + ); info!("Server not running. Starting server..."); let options = StartServerOptions { @@ -354,7 +400,10 @@ pub fn check_and_start_server() { ..Default::default() }; - debug!("StartServerOptions: host={}, port={}", options.host, options.port); + debug!( + "StartServerOptions: host={}, port={}", + options.host, options.port + ); if let Err(e) = start_server(&options) { error!("Failed to start server: {}", e); diff --git a/website/docs/manifest-file.md b/website/docs/manifest-file.md index 1dea06c..257d046 100644 --- a/website/docs/manifest-file.md +++ b/website/docs/manifest-file.md @@ -135,6 +135,14 @@ the fields within the __`stackql_manifest.yml`__ file are described in further d *** +### `resource.callback` + +There is no `callback` section in the manifest schema. 
Callback behaviour — including what to poll, retry counts, retry delays, and short-circuit conditions — is configured entirely within the resource's `.iql` file using the `/*+ callback */` anchor. This prevents confusion for users who might expect a manifest entry by analogy with `props` and `exports`. + +See [Resource Query Files - callback](resource-query-files#callback) for the full reference. + +*** + ### `resource.props` diff --git a/website/docs/resource-query-files.md b/website/docs/resource-query-files.md index bba5b1a..721fb59 100644 --- a/website/docs/resource-query-files.md +++ b/website/docs/resource-query-files.md @@ -169,7 +169,7 @@ Useful functions for testing the desired state of a resource include [`JSON_EQUA ```sql /*+ exports */ -SELECT +SELECT '{{ vpc_name }}' as vpc_name, selfLink as vpc_link FROM google.compute.networks @@ -177,6 +177,79 @@ WHERE name = '{{ vpc_name }}' AND project = '{{ project }}' ``` +### `callback` + +`callback` blocks are optional polling queries that run **after** a `create`, `update`, or `delete` DML statement to track the outcome of a long-running asynchronous operation. They are only used when the preceding DML statement includes a `RETURNING *` clause that returns a tracking handle from the provider. + +The canonical use-case is the AWS Cloud Control API (`awscc`), where every mutation returns a `ProgressEvent` object containing an `OperationStatus` and a `RequestToken` that must be polled until the operation completes. + +```sql +/*+ callback:create, retries=10, retry_delay=15, short_circuit_field=ProgressEvent.OperationStatus, short_circuit_value=SUCCESS */ +SELECT OperationStatus = 'SUCCESS' as success +FROM awscc.cloudcontrol.resource_request_statuses +WHERE region = '{{ region }}' +AND RequestToken = '{{ callback.ProgressEvent.RequestToken }}' +``` + +The operation qualifier (`:create`, `:update`, `:delete`) associates the callback with the matching DML anchor. 
A plain `/*+ callback */` with no qualifier runs after **any** DML operation on the resource that used `RETURNING *`. + +#### Callback options + +| Option | Required | Default | Description | +|---|---|---|---| +| `retries` | no | 3 | Maximum number of poll attempts | +| `retry_delay` | no | 5 | Seconds to wait between attempts | +| `short_circuit_field` | no | — | Dot-path into the `RETURNING *` result checked before polling | +| `short_circuit_value` | no | — | Value of `short_circuit_field` that means polling can be skipped | + +#### Success condition + +The callback query must return a column named **`success`** (or `count`). The operation is considered complete when the query returns a row where `success` equals `1` or `true`. If the query returns no rows, or the value is not truthy, the runner waits `retry_delay` seconds and retries. If all retries are exhausted the run fails with a timeout error. + +#### RETURNING * capture and the `callback.*` namespace + +When a DML statement includes `RETURNING *` the first row of the provider response is automatically captured and stored in the template context: + +- **`callback.{field}`** — shorthand form, available within the current resource's own `.iql` file (overwritten by the next DML with `RETURNING *` on any resource). +- **`{resource_name}.callback.{field}`** — fully-qualified form, available to any downstream resource for the rest of the stack run. + +For example, after a `create` on `aws_s3_workspace_bucket` that returns: + +```json +{ + "ProgressEvent": { + "OperationStatus": "SUCCESS", + "RequestToken": "a0088b9e-db47-4507-b93e-345b77979626" + } +} +``` + +The following keys become available: + +``` +callback.ProgressEvent.OperationStatus → SUCCESS +callback.ProgressEvent.RequestToken → a0088b9e-... +aws_s3_workspace_bucket.callback.ProgressEvent.OperationStatus → SUCCESS +aws_s3_workspace_bucket.callback.ProgressEvent.RequestToken → a0088b9e-... 
+``` + +`RETURNING *` without a `callback` block is valid — the result is captured and no polling occurs. + +#### Short-circuit + +Some providers return the final operation status synchronously in the `RETURNING *` response. When `short_circuit_field` and `short_circuit_value` are set, the runner checks the named field in the already-captured result **before** making any poll attempt. If the value matches, the callback is skipped entirely. + +#### Scope boundaries + +- Callbacks only run during `build` and `teardown`. The `test` command runs no DML and is unaffected. +- In dry-run mode, neither the `RETURNING *` capture nor the callback query executes. Intent is logged instead. + +#### Known limitations + +- There is no mechanism to short-circuit retries on a terminal failure state (e.g. `OperationStatus = 'FAILED'`). The runner retries until success or exhaustion. +- `RETURNING *` only captures the **first** row of the response. +- The `callback.*` shorthand is implicitly scoped to the current resource's `.iql` file. Use the fully-qualified `{resource_name}.callback.*` form in downstream resources. + ## Query options Query options are used with query anchors to provide options for the execution of the query. @@ -266,6 +339,68 @@ AND region = '{{ region }}' +### AWS Cloud Control API example with `callback` + +This example shows a `resource` file for an S3 bucket using the AWS Cloud Control API (`awscc`), which uses an asynchronous mutation model. The `RETURNING *` clause captures the `ProgressEvent` tracking handle and the `callback:create` / `callback:delete` blocks poll until the operation completes. 
+ + + +```sql +/*+ exists */ +SELECT COUNT(*) as count +FROM awscc.s3.buckets +WHERE region = '{{ region }}' +AND Identifier = '{{ bucket_name }}' + +/*+ create */ +INSERT INTO awscc.s3.buckets(BucketName, region) +SELECT '{{ bucket_name }}', '{{ region }}' +RETURNING * + +/*+ callback:create, retries=10, retry_delay=15, short_circuit_field=ProgressEvent.OperationStatus, short_circuit_value=SUCCESS */ +SELECT OperationStatus = 'SUCCESS' as success +FROM awscc.cloudcontrol.resource_request_statuses +WHERE region = '{{ region }}' +AND RequestToken = '{{ callback.ProgressEvent.RequestToken }}' + +/*+ statecheck, retries=3, retry_delay=5 */ +SELECT COUNT(*) as count +FROM awscc.s3.buckets +WHERE region = '{{ region }}' +AND Identifier = '{{ bucket_name }}' + +/*+ exports */ +SELECT '{{ bucket_name }}' as bucket_name + +/*+ delete */ +DELETE FROM awscc.s3.buckets +WHERE region = '{{ region }}' +AND Identifier = '{{ bucket_name }}' +RETURNING * + +/*+ callback:delete, retries=10, retry_delay=15, short_circuit_field=ProgressEvent.OperationStatus, short_circuit_value=SUCCESS */ +SELECT OperationStatus = 'SUCCESS' as success +FROM awscc.cloudcontrol.resource_request_statuses +WHERE region = '{{ region }}' +AND RequestToken = '{{ callback.ProgressEvent.RequestToken }}' +``` + + + +The corresponding manifest entry requires **no** `callback` section — callback behaviour is configured entirely in the `.iql` file: + +```yaml + - name: aws_s3_workspace_bucket + file: aws/s3/buckets.iql + props: + - name: bucket_name + value: "{{ stack_name }}-{{ stack_env }}-root-bucket" + - name: region + value: "ap-southeast-2" + exports: + - bucket_name +``` + ### `query` type example This `query` example demonstrates retrieving the KMS key id for a given key alias in AWS.