diff --git a/CHANGELOG b/CHANGELOG index d030bb8..b857d31 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,41 @@ +2026-06-03 v1.0.23 + + Features: + - Production-friendly output for the compare + command, enabled with --output-for-production + (or OUTPUT_FOR_PRODUCTION=true in the config + file). When on, the generated migration is + shaped to minimise locking on a live + database: indexes are built with CREATE INDEX + CONCURRENTLY (the UNIQUE keyword is + preserved); indexes on partitioned parents + use CREATE INDEX ... ON ONLY {parent} inside + the transaction, then CREATE INDEX + CONCURRENTLY on each partition followed by + ALTER INDEX ... ATTACH PARTITION afterwards + (falling back to a single non-concurrent + CREATE INDEX on the parent, with an + explanatory comment, when the layout cannot + be expanded safely); indexes are dropped with + DROP INDEX CONCURRENTLY (kept non-concurrent + for indexes on partitioned tables, where + concurrent drop is illegal); and new foreign + keys are added NOT VALID inside the + transaction and validated with a separate + VALIDATE CONSTRAINT afterwards. Every + statement that cannot run inside a + transaction block (CREATE/DROP INDEX + CONCURRENTLY, VALIDATE CONSTRAINT, ALTER + INDEX ... ATTACH PARTITION) is emitted in a + clearly marked Production post-commit section + after commit;. Defaults to false; when off, + the output is byte-for-byte identical to + previous behaviour. + - Added the OUTPUT_FOR_PRODUCTION key to the + sample data/pgc.conf and documented the new + flag, configuration key, and behaviour in + README.md. + 2026-05-26 v1.0.22 Bug fixes: diff --git a/README.md b/README.md index ac267dd..69fb73d 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,8 @@ Command line arguments can be used to execute just one function in one time. `--grants-mode {ignore|addonly|full}` - controls how grants (privileges) are handled during comparison. `ignore` (default) skips grants entirely; `addonly` adds grants that exist in TO but not in FROM; `full` makes grants identical by adding missing and revoking extra. +`--output-for-production {true|false}` - set to `true` to generate a migration script that is convenient to run against a live production database (default `false` — output is unchanged). See [Production-friendly output](#production-friendly-output). + `--max-connections {number}` - maximum number of connections in the PostgreSQL connection pool. Default: `16`. Used by all concurrent introspection queries; table metadata is pulled schema-wide in one query per resource kind (columns, indexes, constraints, triggers, policies, partition info, definitions) so connection count mostly matters for the sibling queries (extensions, sequences, routines, views, etc.) running in parallel. `--use-cascade` - add `CASCADE` to every `DROP` statement in the clear script. **Warning:** `CASCADE` can silently drop dependent objects that live outside the selected schema(s) (e.g., foreign keys or views in other schemas referencing the dropped objects). Use only when you are certain no cross-schema dependencies should survive. Without this flag the generated drops rely on the explicit dependency ordering and will fail cleanly if unresolved dependencies exist. @@ -95,6 +97,23 @@ This command comparing two dumps and produce SQL script for the `FROM` database If we add `--use-drop` argument comparer will add drop scripts for all items that non exists in target database, otherwise drop scripts will be ignored. By default, comparer ignore drops. +### Production-friendly output + +```bash +pgc --command compare --from {from_dump} --to {to_dump} --output {file} --use-single-transaction --output-for-production +``` + +By default the delta script favours brevity and is meant to be applied to an idle database. With `--output-for-production true` (config key `OUTPUT_FOR_PRODUCTION=true`) the comparer instead emits a script that minimises locking on a live database: + +- **Indexes are built concurrently** — `CREATE INDEX` becomes `CREATE INDEX CONCURRENTLY` (the `UNIQUE` keyword is preserved), so building an index does not block writes. +- **Partitioned tables are handled correctly** — `CONCURRENTLY` is not allowed directly on a partitioned table, so for a partitioned parent the comparer emits `CREATE INDEX ... ON ONLY {parent}` (in the transaction), then `CREATE INDEX CONCURRENTLY` on each partition followed by `ALTER INDEX ... ATTACH PARTITION` (after the transaction). When the partition layout cannot be expanded safely (no known partitions, or multi-level/sub-partitioned children) it falls back to a single non-concurrent `CREATE INDEX` on the parent and explains why in a comment. +- **Indexes are dropped concurrently** — `DROP INDEX` becomes `DROP INDEX CONCURRENTLY` (kept non-concurrent for indexes on partitioned tables, where concurrent drop is illegal). +- **Foreign keys are validated separately** — a new foreign key is added `NOT VALID` inside the transaction (a fast, metadata-only operation) and a matching `VALIDATE CONSTRAINT` (the long, scan-heavy step) is emitted afterwards so it does not hold the lock for the whole migration. + +Because `CREATE/DROP INDEX CONCURRENTLY`, `VALIDATE CONSTRAINT` and `ALTER INDEX ... ATTACH PARTITION` **cannot run inside a transaction block**, every such statement is moved to a clearly marked `Production post-commit` section emitted **after** the `commit;`. The rest of the migration still runs inside the single transaction when `--use-single-transaction` is set. Each post-commit statement runs in its own implicit transaction, so if one fails you can re-run that section without redoing the committed part. + +`--output-for-production` defaults to `false`; when off the output is byte-for-byte identical to previous behaviour. + ### Generate a clear (drop-all) script for a database ```bash @@ -153,8 +172,11 @@ USE_SINGLE_TRANSACTION=true USE_COMMENTS=false GRANTS_MODE=ignore MAX_CONNECTIONS=16 +OUTPUT_FOR_PRODUCTION=false ``` +`OUTPUT_FOR_PRODUCTION` (default `false`) is the configuration-file equivalent of the `--output-for-production` flag described in [Production-friendly output](#production-friendly-output). + ## Choosing `MAX_CONNECTIONS` `MAX_CONNECTIONS` caps the connection pool the tool opens per dump. Independent introspection queries (extensions, sequences, routines, views, types, tables, etc.) are fired concurrently via `tokio::try_join!`, and the table-level data (columns, indexes, constraints, triggers, policies, partition info, definitions) is pulled with **schema-wide** queries — one query per sub-resource, independent of table count. So the connection count mostly bounds how many of the ~18 concurrent sibling queries run without queueing. diff --git a/app/Cargo.lock b/app/Cargo.lock index 209158b..33fa333 100644 --- a/app/Cargo.lock +++ b/app/Cargo.lock @@ -1146,7 +1146,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pgc" -version = "1.0.21" +version = "1.0.23" dependencies = [ "chrono", "clap", diff --git a/app/Cargo.toml b/app/Cargo.toml index 12d0e9e..ced8b0b 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgc" -version = "1.0.21" +version = "1.0.23" edition = "2024" license = "MIT" authors = ["nettrash "] diff --git a/app/src/comparer/core.rs b/app/src/comparer/core.rs index e14c7a4..919a959 100644 --- a/app/src/comparer/core.rs +++ b/app/src/comparer/core.rs @@ -1,6 +1,8 @@ +use crate::comparer::production::{self, ChildRef, PartitionContext}; use crate::config::grants_mode::GrantsMode; use crate::dump::acl; use crate::dump::column_dependent::ColumnDependentKind; +use crate::dump::table::IndexAlterPlan; use crate::dump::table_column::TableColumn; use crate::dump::table_constraint::TableConstraint; use crate::dump::table_index::TableIndex; @@ -28,9 +30,17 @@ pub struct Comparer { use_comments: bool, // How to handle grants (privileges) during comparison grants_mode: GrantsMode, + // Whether to emit a migration script that is safe/convenient to run on a + // live production database (concurrent index builds, partition-aware index + // creation, NOT VALID + VALIDATE for foreign keys, concurrent index drops). + output_for_production: bool, // The script that will be generated script: String, + // Statements that must run after the main transaction commits, because + // CONCURRENTLY / VALIDATE CONSTRAINT / ATTACH PARTITION cannot run inside a + // transaction block. Only populated when `output_for_production` is set. + production_post_script: String, enum_pre_script: String, enum_post_script: String, type_post_script: String, @@ -64,7 +74,9 @@ impl Comparer { use_single_transaction, use_comments, grants_mode, + output_for_production: false, script: String::new(), + production_post_script: String::new(), enum_pre_script: String::new(), enum_post_script: String::new(), type_post_script: String::new(), @@ -92,8 +104,24 @@ impl Comparer { comparer } + /// Enable or disable production-friendly output. When enabled, indexes are + /// built concurrently (partition-aware), foreign keys are added `NOT VALID` + /// then validated after commit, and indexes are dropped concurrently — all + /// post-commit statements are emitted after the main transaction. + pub fn set_output_for_production(&mut self, value: bool) -> &mut Self { + self.output_for_production = value; + self + } + // Compare dumps and generate the script pub async fn compare(&mut self) -> Result<(), Error> { + if self.output_for_production { + self.script.append_block( + "/* Output generated for production: indexes are built/dropped concurrently, \ + foreign keys are validated, and the statements that cannot run inside a \ + transaction are emitted after COMMIT. */", + ); + } if self.use_single_transaction { self.script.append_block("begin;"); } @@ -156,6 +184,19 @@ impl Comparer { self.script.push_str("\ncommit;"); } + // Concurrent index builds/drops, FK validations and partition-index + // attaches cannot run inside a transaction block, so they are emitted + // here, after COMMIT (each runs in its own implicit transaction). + if !self.production_post_script.is_empty() { + self.script.append_block( + "\n/* ---> Production post-commit (run outside a transaction): Start --------------- */", + ); + let post = std::mem::take(&mut self.production_post_script); + self.script.push_str(&post); + self.script + .append_block("/* ---> Production post-commit: End --------------- */"); + } + Ok(()) } @@ -298,6 +339,108 @@ impl Comparer { } } + /// Build the partition topology maps the production output path needs: + /// the set of partitioned parent tables, each parent's direct partitions, + /// and the set of indexes that live on a partitioned parent (which cannot + /// be dropped with `DROP INDEX CONCURRENTLY`). All keys use the dump's + /// `quote_ident`-applied `schema.name` form so they match `partition_of`. + fn build_partition_context_maps( + &self, + ) -> ( + HashSet, + HashMap>, + HashSet, + ) { + let mut partitioned_parents: HashSet = HashSet::new(); + let mut partitioned_indexes: HashSet = HashSet::new(); + for table in self.to.tables.iter().chain(self.from.tables.iter()) { + if table.partition_key.is_some() { + partitioned_parents.insert(format!("{}.{}", table.schema, table.name)); + for index in &table.indexes { + if !index.is_partition_index { + partitioned_indexes.insert(format!("{}.{}", index.schema, index.name)); + } + } + } + } + // Children come from the TO side — the schema we are migrating toward. + let mut children: HashMap> = HashMap::new(); + for table in &self.to.tables { + if let Some(parent) = &table.partition_of { + children.entry(parent.clone()).or_default().push(ChildRef { + schema: table.schema.clone(), + table: table.name.clone(), + }); + } + } + (partitioned_parents, children, partitioned_indexes) + } + + /// Emit a new table's CREATE (without triggers) plus its indexes, rewriting + /// every index for production (concurrent / partition-aware). Free function + /// over the two buffers so it can run while `self.to` is borrowed by the + /// emission loops. + fn emit_new_table_create_prod( + script: &mut String, + post_commit: &mut String, + table: &Table, + ctx: &PartitionContext, + ) { + script.push_str(&table.get_script_without_triggers_no_indexes()); + for index in table.creatable_indexes() { + let split = production::index_create_split(index, ctx); + script.push_str(&split.in_txn); + post_commit.push_str(&split.post_commit); + } + } + + /// Emit the index changes of an ALTER for production: drops (concurrent + /// unless on a partitioned table), comment-only changes (in-txn), then + /// (re)creates (concurrent / partition-aware). Mirrors the ordering of + /// `build_alter_script` (drops before creates). + fn emit_index_alter_plan_prod( + script: &mut String, + post_commit: &mut String, + plan: &IndexAlterPlan, + use_drop: bool, + ctx: &PartitionContext, + ) { + for old_index in &plan.drop { + let (stmt, post) = production::index_drop_statement(old_index, ctx); + let block = stmt.with_empty_lines(); + let target = if post { + &mut *post_commit + } else { + &mut *script + }; + if use_drop { + target.push_str(&block); + } else { + target.push_str(&format!("-- {block}")); + } + } + for index in &plan.comment_changes { + if let Some(comment) = &index.comment { + script.append_block(&format!( + "comment on index {}.{} is '{}';", + index.schema, + index.name, + comment.replace('\'', "''") + )); + } else { + script.append_block(&format!( + "comment on index {}.{} is null;", + index.schema, index.name + )); + } + } + for index in &plan.create { + let split = production::index_create_split(index, ctx); + script.push_str(&split.in_txn); + post_commit.push_str(&split.post_commit); + } + } + fn get_script(&self) -> String { if self.use_comments { return self.script.clone(); @@ -1638,6 +1781,18 @@ impl Comparer { // We will drop all tables that exists just in "from" dump (partitions first). let ordered_from_drop = Self::ordered_tables_for_drop(&self.from.tables); + // Partition topology used by the production output path to build/drop + // indexes concurrently and partition-aware. Owned maps, so they don't + // borrow `self` and the emission loops below can still push to the + // script buffers. + let (partitioned_parents, partition_children, partitioned_indexes) = + self.build_partition_context_maps(); + let partition_ctx = PartitionContext { + partitioned_parents: &partitioned_parents, + children: &partition_children, + partitioned_indexes: &partitioned_indexes, + }; + // Build lookup maps for O(1) access let to_table_map: HashMap<(&str, &str), usize> = self .to @@ -1881,12 +2036,23 @@ impl Comparer { self.script .push_str(format!("/* Table: {}.{}*/\n", table.schema, table.name).as_str()); - self.script - .push_str(table.get_script_without_triggers().as_str()); + if self.output_for_production { + Self::emit_new_table_create_prod( + &mut self.script, + &mut self.production_post_script, + table, + &partition_ctx, + ); + } else { + self.script + .push_str(table.get_script_without_triggers().as_str()); + } self.trigger_post_script .push_str(table.get_trigger_script().as_str()); } } + // NOTE: `table` above is `&&Table` (from `ordered_to.iter()`); the + // helper takes `&Table` via deref coercion at the call boundary. // We will find all existing tables in both dumps with different hashes for table in ordered_from.iter() { @@ -1932,8 +2098,17 @@ impl Comparer { // `recreated_tables` to swap in the default ACL. self.recreated_tables .insert(Self::table_key(&table.schema, &table.name)); - self.script - .push_str(to_table.get_script_without_triggers().as_str()); + if self.output_for_production { + Self::emit_new_table_create_prod( + &mut self.script, + &mut self.production_post_script, + to_table, + &partition_ctx, + ); + } else { + self.script + .push_str(to_table.get_script_without_triggers().as_str()); + } self.trigger_post_script .push_str(to_table.get_trigger_script().as_str()); } else { @@ -1949,9 +2124,6 @@ impl Comparer { // drift apart. let table_recreated = table.will_be_dropped_and_recreated(to_table); - let alter_script = - table.get_alter_script_without_triggers(to_table, self.use_drop); - if table_recreated { // Mark for grants: same reasoning as the branch // above — the dropped+recreated table inherits @@ -1965,7 +2137,30 @@ impl Comparer { } } - self.script.push_str(alter_script.as_str()); + // In production mode (and only when the table is altered + // in place — a wholesale drop+recreate re-emits the full + // CREATE with inline indexes anyway), defer index + // create/drop and re-emit them concurrently / partition- + // aware via the index alter plan. + if self.output_for_production && !table_recreated { + let alter_script = table.get_alter_script_without_triggers_no_indexes( + to_table, + self.use_drop, + ); + self.script.push_str(alter_script.as_str()); + let plan = table.index_alter_plan(to_table); + Self::emit_index_alter_plan_prod( + &mut self.script, + &mut self.production_post_script, + &plan, + self.use_drop, + &partition_ctx, + ); + } else { + let alter_script = + table.get_alter_script_without_triggers(to_table, self.use_drop); + self.script.push_str(alter_script.as_str()); + } // Issue #188 / Path B: when `get_alter_script` // routes a column through its @@ -2026,8 +2221,17 @@ impl Comparer { for table in deferred_new_children { self.script .push_str(format!("/* Table: {}.{}*/\n", table.schema, table.name).as_str()); - self.script - .push_str(table.get_script_without_triggers().as_str()); + if self.output_for_production { + Self::emit_new_table_create_prod( + &mut self.script, + &mut self.production_post_script, + table, + &partition_ctx, + ); + } else { + self.script + .push_str(table.get_script_without_triggers().as_str()); + } self.trigger_post_script .push_str(table.get_trigger_script().as_str()); } @@ -2635,11 +2839,33 @@ impl Comparer { } // Table exists in both. Check for FK changes. - self.script - .push_str(&from_table.get_foreign_key_alter_script(table)); + if self.output_for_production { + // In-place FK modifications stay as-is; brand-new foreign + // keys are added NOT VALID and validated after commit. + let (alters, new_fks) = from_table.foreign_key_alter_split(table); + self.script.push_str(&alters); + for constraint in new_fks { + let split = production::foreign_key_split(constraint); + self.script.push_str(&split.in_txn); + self.production_post_script.push_str(&split.post_commit); + } + } else { + self.script + .push_str(&from_table.get_foreign_key_alter_script(table)); + } } else { // New table. Add its FKs. - self.script.push_str(&table.get_foreign_key_script()); + if self.output_for_production { + for constraint in &table.constraints { + if constraint.constraint_type.to_lowercase() == "foreign key" { + let split = production::foreign_key_split(constraint); + self.script.push_str(&split.in_txn); + self.production_post_script.push_str(&split.post_commit); + } + } + } else { + self.script.push_str(&table.get_foreign_key_script()); + } } } diff --git a/app/src/comparer/core_tests.rs b/app/src/comparer/core_tests.rs index 05f3eab..2215d3c 100644 --- a/app/src/comparer/core_tests.rs +++ b/app/src/comparer/core_tests.rs @@ -11121,3 +11121,138 @@ async fn issue191_pr198_cycle_fk_comment_survives_round_trip() { script ); } + +#[tokio::test] +async fn output_for_production_defers_concurrent_index_and_validates_fk() { + // New table with one index and one foreign key. Production mode must build + // the index CONCURRENTLY after COMMIT, add the FK NOT VALID inside the + // transaction, and VALIDATE it after COMMIT. + let from_dump = Dump::new(DumpConfig::default()); + let mut to_dump = Dump::new(DumpConfig::default()); + + let index = TableIndex { + schema: "public".to_string(), + table: "orders".to_string(), + name: "idx_orders_total".to_string(), + catalog: None, + indexdef: "CREATE INDEX idx_orders_total ON public.orders USING btree (total)".to_string(), + is_partition_index: false, + comment: None, + }; + let fk = TableConstraint { + catalog: "postgres".to_string(), + schema: "public".to_string(), + name: "fk_orders_customer".to_string(), + table_name: "orders".to_string(), + constraint_type: "FOREIGN KEY".to_string(), + is_deferrable: false, + initially_deferred: false, + definition: Some("FOREIGN KEY (customer_id) REFERENCES public.customers (id)".to_string()), + coninhcount: 0, + is_enforced: true, + no_inherit: false, + nulls_not_distinct: false, + comment: None, + }; + let table = Table::new( + "public".to_string(), + "orders".to_string(), + "public".to_string(), + "orders".to_string(), + "postgres".to_string(), + None, + vec![], + vec![fk], + vec![index], + vec![], + None, + ); + to_dump.tables.push(table); + + let mut comparer = Comparer::new(from_dump, to_dump, false, true, true, GrantsMode::Ignore); + comparer.set_output_for_production(true); + comparer.compare().await.unwrap(); + let script = comparer.get_script(); + + let commit_pos = script.find("commit;").expect("script must contain commit;"); + let concurrent_pos = script + .find("CREATE INDEX CONCURRENTLY idx_orders_total ON public.orders USING btree (total);") + .expect("concurrent index build must be present"); + let validate_pos = script + .find("validate constraint fk_orders_customer;") + .expect("FK validation must be present"); + let not_valid_pos = script + .find("not valid;") + .expect("FK must be added NOT VALID"); + + // Concurrent build and FK validation run after COMMIT. + assert!( + concurrent_pos > commit_pos, + "concurrent index must come after commit;\n{script}" + ); + assert!( + validate_pos > commit_pos, + "validate constraint must come after commit;\n{script}" + ); + // The FK is added NOT VALID before COMMIT. + assert!( + not_valid_pos < commit_pos, + "FK NOT VALID must be added before commit;\n{script}" + ); + // The in-transaction CREATE TABLE must not carry an inline (non-concurrent) + // index build. + let in_txn = &script[..commit_pos]; + assert!( + !in_txn.contains("CREATE INDEX idx_orders_total ON public.orders"), + "index must be deferred out of the in-transaction create:\n{in_txn}" + ); +} + +#[tokio::test] +async fn output_for_production_disabled_keeps_inline_index() { + // Same shape as above but with the flag off: the index is inline in the + // CREATE TABLE and there is no post-commit / CONCURRENTLY output. + let from_dump = Dump::new(DumpConfig::default()); + let mut to_dump = Dump::new(DumpConfig::default()); + + let index = TableIndex { + schema: "public".to_string(), + table: "orders".to_string(), + name: "idx_orders_total".to_string(), + catalog: None, + indexdef: "CREATE INDEX idx_orders_total ON public.orders USING btree (total)".to_string(), + is_partition_index: false, + comment: None, + }; + let table = Table::new( + "public".to_string(), + "orders".to_string(), + "public".to_string(), + "orders".to_string(), + "postgres".to_string(), + None, + vec![], + vec![], + vec![index], + vec![], + None, + ); + to_dump.tables.push(table); + + let mut comparer = Comparer::new(from_dump, to_dump, false, true, true, GrantsMode::Ignore); + comparer.compare().await.unwrap(); + let script = comparer.get_script(); + + assert!( + script.contains("CREATE INDEX idx_orders_total ON public.orders USING btree (total);"), + "default mode keeps the inline index:\n{script}" + ); + assert!( + !script.contains("CONCURRENTLY"), + "default mode must not emit CONCURRENTLY:\n{script}" + ); + assert!( + !script.contains("Production post-commit"), + "default mode must not emit a post-commit section:\n{script}" + ); +} diff --git a/app/src/comparer/mod.rs b/app/src/comparer/mod.rs index 0703e4b..3487ef1 100644 --- a/app/src/comparer/mod.rs +++ b/app/src/comparer/mod.rs @@ -1,2 +1,3 @@ pub mod core; +pub mod production; mod scanner; diff --git a/app/src/comparer/production.rs b/app/src/comparer/production.rs new file mode 100644 index 0000000..3207023 --- /dev/null +++ b/app/src/comparer/production.rs @@ -0,0 +1,297 @@ +//! Production-mode SQL rewriting for the `output_for_production` flag. +//! +//! When the flag is enabled the comparer routes index and foreign-key DDL +//! through these helpers so the resulting migration is convenient to apply to a +//! live database: +//! +//! * indexes are built with `CREATE INDEX CONCURRENTLY` (no long write lock); +//! * indexes on partitioned parents use the only-safe pattern — +//! `CREATE INDEX ON ONLY parent`, then `CREATE INDEX CONCURRENTLY` on each +//! partition, then `ALTER INDEX ... ATTACH PARTITION` — because +//! `CONCURRENTLY` is rejected on a partitioned table directly; +//! * foreign keys are added `NOT VALID` and validated in a separate step +//! (`VALIDATE CONSTRAINT`) so the long validation scan does not hold a +//! table lock for the whole migration; +//! * indexes are dropped with `DROP INDEX CONCURRENTLY`. +//! +//! `CONCURRENTLY`, `VALIDATE CONSTRAINT` and `ATTACH PARTITION` cannot run +//! inside a transaction block, so every statement that must run after the main +//! transaction commits is collected in the `post_commit` half of a +//! [`ProdSplit`]. The comparer appends that buffer after `commit;`. + +use std::collections::{HashMap, HashSet}; + +use crate::dump::table_constraint::TableConstraint; +use crate::dump::table_index::TableIndex; +use crate::utils::string_extensions::StringExt; + +/// A pair of script fragments produced by a production rewrite: statements that +/// stay inside the migration transaction, and statements that must run after it +/// commits (each in its own autocommit statement). +#[derive(Debug, Default, PartialEq, Eq)] +pub struct ProdSplit { + pub in_txn: String, + pub post_commit: String, +} + +/// Direct child of a partitioned parent table. Identifiers are stored in the +/// same `quote_ident`-applied form as [`TableIndex::schema`] / [`TableIndex::table`]. +#[derive(Debug, Clone)] +pub struct ChildRef { + pub schema: String, + pub table: String, +} + +/// Partition topology the index rewriters need. All qualified names use the +/// `quote_ident(schema).quote_ident(name)` form already produced by the dump. +pub struct PartitionContext<'a> { + /// Qualified names of partitioned parent tables (relkind `p`). + pub partitioned_parents: &'a HashSet, + /// Parent qualified name -> its direct partitions. + pub children: &'a HashMap>, + /// Qualified index names (`schema.name`) whose underlying table is a + /// partitioned parent. `DROP INDEX CONCURRENTLY` is illegal for these. + pub partitioned_indexes: &'a HashSet, +} + +/// Insert `CONCURRENTLY` after the `CREATE [UNIQUE] INDEX` keyword. +/// `pg_get_indexdef` always emits uppercase keywords, so a literal prefix match +/// suffices; an unrecognised statement is returned unchanged. +fn insert_concurrently(indexdef: &str) -> String { + if let Some(rest) = indexdef.strip_prefix("CREATE UNIQUE INDEX ") { + return format!("CREATE UNIQUE INDEX CONCURRENTLY {rest}"); + } + if let Some(rest) = indexdef.strip_prefix("CREATE INDEX ") { + return format!("CREATE INDEX CONCURRENTLY {rest}"); + } + indexdef.to_string() +} + +/// Insert `ONLY` after the `ON` of an index definition so the index is created +/// on the partitioned parent alone (invalid until every partition's index is +/// attached). Returns `None` if the statement has no ` ON ` token. +fn make_on_only(indexdef: &str) -> Option { + let pos = indexdef.find(" ON ")?; + let mut out = String::with_capacity(indexdef.len() + 5); + out.push_str(&indexdef[..pos]); + out.push_str(" ON ONLY "); + out.push_str(&indexdef[pos + " ON ".len()..]); + Some(out) +} + +/// Split an index definition into its `CREATE [UNIQUE] INDEX` keyword and the +/// `USING ...` tail (access method, columns, predicate). Returns `None` when +/// the statement does not have the expected shape. +fn create_kw_and_tail(indexdef: &str) -> Option<(&'static str, &str)> { + let create_kw = if indexdef.starts_with("CREATE UNIQUE INDEX ") { + "CREATE UNIQUE INDEX" + } else if indexdef.starts_with("CREATE INDEX ") { + "CREATE INDEX" + } else { + return None; + }; + let using = indexdef.find(" USING ")?; + Some((create_kw, indexdef[using + 1..].trim_end())) +} + +fn unquote_ident(s: &str) -> String { + let t = s.trim(); + if t.len() >= 2 && t.starts_with('"') && t.ends_with('"') { + t[1..t.len() - 1].replace("\"\"", "\"") + } else { + t.to_string() + } +} + +fn quote_ident(s: &str) -> String { + let needs_quote = s.is_empty() + || s.chars() + .next() + .map(|c| !(c.is_ascii_lowercase() || c == '_')) + == Some(true) + || s.chars() + .any(|c| !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')); + if needs_quote { + format!("\"{}\"", s.replace('"', "\"\"")) + } else { + s.to_string() + } +} + +fn comment_on_index(index: &TableIndex, comment: &str) -> String { + format!( + "comment on index {}.{} is '{}';", + index.schema, + index.name, + comment.replace('\'', "''") + ) +} + +/// Derive a unique per-partition index name from the parent index and the +/// partition table name, e.g. parent index `idx_orders_total` on partition +/// `orders_2024` -> `orders_2024_idx_orders_total`. PostgreSQL truncates names +/// to 63 bytes; on the rare collision the migration would error rather than do +/// the wrong thing. +fn child_index_name(parent_index: &TableIndex, child: &ChildRef) -> String { + quote_ident(&format!( + "{}_{}", + unquote_ident(&child.table), + unquote_ident(&parent_index.name) + )) +} + +/// Production rewrite of a single index *creation*. +pub fn index_create_split(index: &TableIndex, ctx: &PartitionContext) -> ProdSplit { + let mut split = ProdSplit::default(); + let parent_qualified = format!("{}.{}", index.schema, index.table); + + // Plain (non-partitioned) table: a straight concurrent build, post-commit. + if !ctx.partitioned_parents.contains(&parent_qualified) { + split + .post_commit + .push_str(&insert_concurrently(&index.indexdef)); + split.post_commit.append_block(";"); + if let Some(comment) = &index.comment { + split + .post_commit + .append_block(&comment_on_index(index, comment)); + } + return split; + } + + let children = ctx + .children + .get(&parent_qualified) + .map(Vec::as_slice) + .unwrap_or(&[]); + + // Concurrent per-partition handling needs the parsed keyword/tail and a + // single level of partitioning. Sub-partitioned children (a partition that + // is itself a partitioned table) and unparseable definitions fall back to a + // plain, in-transaction CREATE INDEX on the parent (which recurses to every + // partition under a brief lock — CONCURRENTLY is illegal on a partitioned + // table, so this is the correct, if less convenient, fallback). + let has_sub_partition = children.iter().any(|c| { + ctx.partitioned_parents + .contains(&format!("{}.{}", c.schema, c.table)) + }); + let parsed = create_kw_and_tail(&index.indexdef); + + if children.is_empty() || has_sub_partition || parsed.is_none() { + let reason = if parsed.is_none() { + "unrecognised index definition" + } else if children.is_empty() { + "no partitions found" + } else { + "multi-level partitioning" + }; + split.in_txn.push_str(&format!( + "/* partitioned index {}.{}: {reason} — created non-concurrently */\n", + index.schema, index.name + )); + split.in_txn.push_str(&index.indexdef); + split.in_txn.append_block(";"); + if let Some(comment) = &index.comment { + split.in_txn.append_block(&comment_on_index(index, comment)); + } + return split; + } + + let (create_kw, tail) = parsed.unwrap(); + + // 1. Create the index on the parent only (metadata-only, fast) in-txn. + match make_on_only(&index.indexdef) { + Some(only) => split.in_txn.append_block(&format!("{only};")), + None => split.in_txn.append_block(&format!( + "{create_kw} {} ON ONLY {}.{} {tail};", + index.name, index.schema, index.table + )), + } + if let Some(comment) = &index.comment { + split.in_txn.append_block(&comment_on_index(index, comment)); + } + + // 2. Build each partition's index concurrently, then attach it. Once every + // partition is attached the parent index becomes valid automatically. + for child in children { + let child_idx = child_index_name(index, child); + split.post_commit.append_block(&format!( + "{create_kw} CONCURRENTLY {child_idx} ON {}.{} {tail};", + child.schema, child.table + )); + split.post_commit.append_block(&format!( + "alter index {}.{} attach partition {}.{};", + index.schema, index.name, child.schema, child_idx + )); + } + split +} + +/// Production rewrite of a single index *drop*. Returns the statement plus +/// whether it must run post-commit (`true`) or stay in-transaction (`false`). +/// `DROP INDEX CONCURRENTLY` is illegal for an index on a partitioned table, so +/// those stay in-transaction and non-concurrent. +pub fn index_drop_statement(index: &TableIndex, ctx: &PartitionContext) -> (String, bool) { + let qualified_index = format!("{}.{}", index.schema, index.name); + if ctx.partitioned_indexes.contains(&qualified_index) { + ( + format!("drop index if exists {}.{};", index.schema, index.name), + false, + ) + } else { + ( + format!( + "drop index concurrently if exists {}.{};", + index.schema, index.name + ), + true, + ) + } +} + +/// Production rewrite of a foreign-key constraint creation: add it `NOT VALID` +/// inside the transaction, then `VALIDATE CONSTRAINT` afterwards. Only enforced +/// FOREIGN KEY constraints are split; anything else is emitted unchanged +/// in-transaction (a `NOT ENFORCED` constraint is never validated, and other +/// constraint kinds do not support `NOT VALID` in this path). +pub fn foreign_key_split(constraint: &TableConstraint) -> ProdSplit { + let mut split = ProdSplit::default(); + let full = constraint.get_script(); + + if !constraint + .constraint_type + .eq_ignore_ascii_case("foreign key") + || !constraint.is_enforced + { + split.in_txn.push_str(&full); + return split; + } + + // The ADD CONSTRAINT statement ends at the first ';' (a FOREIGN KEY clause + // never contains one); any trailing `comment on constraint` block follows. + match full.find(';') { + Some(semi) => { + let add_stmt = full[..semi].trim_end(); + let remainder = full[semi + 1..].trim_start_matches('\n'); + + let mut in_txn = add_stmt.to_string(); + in_txn.push_str(" not valid;"); + in_txn = in_txn.with_empty_lines(); + if !remainder.trim().is_empty() { + in_txn.push_str(remainder); + } + split.in_txn = in_txn; + split.post_commit = format!( + "alter table {}.{} validate constraint {};", + constraint.schema, constraint.table_name, constraint.name + ) + .with_empty_lines(); + } + None => split.in_txn.push_str(&full), + } + split +} + +#[cfg(test)] +#[path = "production_tests.rs"] +mod tests; diff --git a/app/src/comparer/production_tests.rs b/app/src/comparer/production_tests.rs new file mode 100644 index 0000000..c1021d1 --- /dev/null +++ b/app/src/comparer/production_tests.rs @@ -0,0 +1,286 @@ +use super::*; + +fn index(schema: &str, table: &str, name: &str, indexdef: &str) -> TableIndex { + TableIndex { + schema: schema.to_string(), + table: table.to_string(), + name: name.to_string(), + catalog: None, + indexdef: indexdef.to_string(), + is_partition_index: false, + comment: None, + } +} + +fn fk(schema: &str, table: &str, name: &str, definition: &str) -> TableConstraint { + TableConstraint { + catalog: "postgres".to_string(), + schema: schema.to_string(), + name: name.to_string(), + table_name: table.to_string(), + constraint_type: "FOREIGN KEY".to_string(), + is_deferrable: false, + initially_deferred: false, + definition: Some(definition.to_string()), + coninhcount: 0, + is_enforced: true, + no_inherit: false, + nulls_not_distinct: false, + comment: None, + } +} + +fn empty_ctx() -> ( + HashSet, + HashMap>, + HashSet, +) { + (HashSet::new(), HashMap::new(), HashSet::new()) +} + +#[test] +fn plain_index_becomes_concurrent_post_commit() { + let (parents, children, part_idx) = empty_ctx(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index( + "public", + "orders", + "idx_orders_total", + "CREATE INDEX idx_orders_total ON public.orders USING btree (total)", + ); + let split = index_create_split(&idx, &ctx); + assert!(split.in_txn.is_empty()); + assert!( + split.post_commit.contains( + "CREATE INDEX CONCURRENTLY idx_orders_total ON public.orders USING btree (total);" + ), + "got: {}", + split.post_commit + ); +} + +#[test] +fn unique_index_keyword_preserved() { + let (parents, children, part_idx) = empty_ctx(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index( + "public", + "orders", + "uq_orders_ref", + "CREATE UNIQUE INDEX uq_orders_ref ON public.orders USING btree (ref)", + ); + let split = index_create_split(&idx, &ctx); + assert!( + split + .post_commit + .contains("CREATE UNIQUE INDEX CONCURRENTLY uq_orders_ref"), + "got: {}", + split.post_commit + ); +} + +#[test] +fn index_comment_follows_concurrent_build() { + let (parents, children, part_idx) = empty_ctx(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let mut idx = index( + "public", + "orders", + "idx_total", + "CREATE INDEX idx_total ON public.orders USING btree (total)", + ); + idx.comment = Some("by total".to_string()); + let split = index_create_split(&idx, &ctx); + assert!(split.post_commit.contains("CREATE INDEX CONCURRENTLY")); + assert!( + split + .post_commit + .contains("comment on index public.idx_total is 'by total';") + ); +} + +#[test] +fn partitioned_parent_expands_to_only_plus_attach() { + let mut parents = HashSet::new(); + parents.insert("public.orders".to_string()); + let mut children = HashMap::new(); + children.insert( + "public.orders".to_string(), + vec![ + ChildRef { + schema: "public".to_string(), + table: "orders_2023".to_string(), + }, + ChildRef { + schema: "public".to_string(), + table: "orders_2024".to_string(), + }, + ], + ); + let part_idx = HashSet::new(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index( + "public", + "orders", + "idx_orders_total", + "CREATE INDEX idx_orders_total ON public.orders USING btree (total)", + ); + let split = index_create_split(&idx, &ctx); + + // Parent index created ON ONLY, in-transaction. + assert!( + split + .in_txn + .contains("CREATE INDEX idx_orders_total ON ONLY public.orders USING btree (total);"), + "in_txn: {}", + split.in_txn + ); + // Each partition gets a concurrent build + attach, post-commit. + assert!(split.post_commit.contains( + "CREATE INDEX CONCURRENTLY orders_2023_idx_orders_total ON public.orders_2023 USING btree (total);" + )); + assert!(split.post_commit.contains( + "alter index public.idx_orders_total attach partition public.orders_2023_idx_orders_total;" + )); + assert!(split.post_commit.contains( + "CREATE INDEX CONCURRENTLY orders_2024_idx_orders_total ON public.orders_2024 USING btree (total);" + )); + assert!(split.post_commit.contains( + "alter index public.idx_orders_total attach partition public.orders_2024_idx_orders_total;" + )); +} + +#[test] +fn partitioned_parent_without_known_children_falls_back_in_txn() { + let mut parents = HashSet::new(); + parents.insert("public.orders".to_string()); + let children = HashMap::new(); + let part_idx = HashSet::new(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index( + "public", + "orders", + "idx_orders_total", + "CREATE INDEX idx_orders_total ON public.orders USING btree (total)", + ); + let split = index_create_split(&idx, &ctx); + assert!(split.post_commit.is_empty()); + assert!(split.in_txn.contains("no partitions found")); + assert!( + split + .in_txn + .contains("CREATE INDEX idx_orders_total ON public.orders USING btree (total);") + ); +} + +#[test] +fn drop_index_concurrent_post_commit_for_plain_table() { + let (parents, children, part_idx) = empty_ctx(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index("public", "orders", "idx_total", ""); + let (stmt, post_commit) = index_drop_statement(&idx, &ctx); + assert!(post_commit); + assert_eq!(stmt, "drop index concurrently if exists public.idx_total;"); +} + +#[test] +fn drop_partitioned_index_stays_in_txn_non_concurrent() { + let parents = HashSet::new(); + let children = HashMap::new(); + let mut part_idx = HashSet::new(); + part_idx.insert("public.idx_total".to_string()); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index("public", "orders", "idx_total", ""); + let (stmt, post_commit) = index_drop_statement(&idx, &ctx); + assert!(!post_commit); + assert_eq!(stmt, "drop index if exists public.idx_total;"); +} + +#[test] +fn foreign_key_split_into_not_valid_and_validate() { + let c = fk( + "public", + "orders", + "fk_orders_customer", + "FOREIGN KEY (customer_id) REFERENCES public.customers (id)", + ); + let split = foreign_key_split(&c); + assert!( + split.in_txn.contains( + "add constraint fk_orders_customer foreign key (customer_id) references public.customers (id) not valid;" + ), + "in_txn: {}", + split.in_txn + ); + assert_eq!( + split.post_commit.trim_end(), + "alter table public.orders validate constraint fk_orders_customer;" + ); +} + +#[test] +fn non_enforced_fk_is_not_split() { + let mut c = fk( + "public", + "orders", + "fk_orders_customer", + "FOREIGN KEY (customer_id) REFERENCES public.customers (id) NOT ENFORCED", + ); + c.is_enforced = false; + let split = foreign_key_split(&c); + assert!(split.post_commit.is_empty()); + assert!(!split.in_txn.contains("not valid")); +} + +#[test] +fn fk_comment_preserved_in_txn() { + let mut c = fk( + "public", + "orders", + "fk_orders_customer", + "FOREIGN KEY (customer_id) REFERENCES public.customers (id)", + ); + c.comment = Some("links to customer".to_string()); + let split = foreign_key_split(&c); + assert!(split.in_txn.contains("not valid;")); + assert!( + split.in_txn.contains( + "comment on constraint fk_orders_customer on public.orders is 'links to customer';" + ), + "in_txn: {}", + split.in_txn + ); + assert!( + split + .post_commit + .contains("validate constraint fk_orders_customer;") + ); +} diff --git a/app/src/config/core.rs b/app/src/config/core.rs index 4fab43a..c58f86d 100644 --- a/app/src/config/core.rs +++ b/app/src/config/core.rs @@ -20,6 +20,11 @@ pub struct Config { pub grants_mode: GrantsMode, // Maximum number of connections in the PostgreSQL connection pool pub max_connections: u32, + // Whether to emit a migration script that is safe/convenient to run on a + // live production database (concurrent index builds, partition-aware index + // creation, NOT VALID + VALIDATE for foreign keys, concurrent index drops, + // and a split transaction so the concurrent statements run outside it). + pub output_for_production: bool, } impl Config { @@ -55,6 +60,7 @@ impl Config { let mut use_comments = true; let mut grants_mode = GrantsMode::Ignore; let mut max_connections: u32 = 16; + let mut output_for_production = false; for line in &config_data { if line.trim().is_empty() || line.starts_with('#') { @@ -89,6 +95,7 @@ impl Config { && key != "USE_COMMENTS" && key != "GRANTS_MODE" && key != "MAX_CONNECTIONS" + && key != "OUTPUT_FOR_PRODUCTION" { return Err(format!("Unknown configuration key: {}", parts[0])); } @@ -119,6 +126,17 @@ impl Config { "OUTPUT" => output = raw_value.to_string(), "USE_DROP" => use_drop = value == "TRUE", "USE_SINGLE_TRANSACTION" => use_single_transaction = value == "TRUE", + "OUTPUT_FOR_PRODUCTION" => { + output_for_production = match value.as_str() { + "TRUE" => true, + "FALSE" => false, + _ => { + return Err(format!( + "Invalid value for OUTPUT_FOR_PRODUCTION: {raw_value}" + )); + } + }; + } "USE_COMMENTS" => { use_comments = match value.as_str() { "TRUE" => true, @@ -208,6 +226,7 @@ impl Config { use_comments, grants_mode, max_connections, + output_for_production, }) } diff --git a/app/src/config/core_tests.rs b/app/src/config/core_tests.rs index 83022c9..ab5ddf7 100644 --- a/app/src/config/core_tests.rs +++ b/app/src/config/core_tests.rs @@ -30,6 +30,7 @@ fn test_valid_config_parsing() { OUTPUT=result.out USE_DROP=true USE_SINGLE_TRANSACTION=true + OUTPUT_FOR_PRODUCTION=true "#; let file = write_temp_config(config_content, "test_valid_config_parsing.cfg"); let config = Config::new(file.clone()); @@ -46,6 +47,25 @@ fn test_valid_config_parsing() { assert_eq!(config.output, "result.out"); assert!(config.use_drop); assert!(config.use_single_transaction); + assert!(config.output_for_production); + let _ = std::fs::remove_file(file); +} + +#[test] +fn test_output_for_production_defaults_false() { + let config_content = "FROM_HOST=localhost\nTO_HOST=remotehost"; + let file = write_temp_config(config_content, "test_output_for_production_default.cfg"); + let config = Config::new(file.clone()); + assert!(!config.output_for_production); + let _ = std::fs::remove_file(file); +} + +#[test] +#[should_panic] +fn test_invalid_output_for_production_value_panics() { + let config_content = "FROM_HOST=localhost\nOUTPUT_FOR_PRODUCTION=maybe"; + let file = write_temp_config(config_content, "test_invalid_ofp_value.cfg"); + let _ = Config::new(file.clone()); let _ = std::fs::remove_file(file); } diff --git a/app/src/dump/table.rs b/app/src/dump/table.rs index 4c775db..79e52ac 100644 --- a/app/src/dump/table.rs +++ b/app/src/dump/table.rs @@ -175,6 +175,20 @@ pub struct Table { pub typed_table_type: Option, // OF type name for typed tables } +/// Structured result of [`Table::index_alter_plan`]: which indexes to create, +/// drop, or merely re-comment when altering a table. Borrows from both the FROM +/// and TO tables. Consumed by the production output path. +#[derive(Debug, Default)] +pub struct IndexAlterPlan<'a> { + /// Indexes to (re)create — brand-new, or whose definition changed. + pub create: Vec<&'a TableIndex>, + /// Old indexes to drop — definition changed (paired with a `create`) or + /// removed entirely. + pub drop: Vec<&'a TableIndex>, + /// Indexes whose comment changed but whose definition did not. + pub comment_changes: Vec<&'a TableIndex>, +} + impl Table { /// Creates a new Table with the given name #[allow(clippy::too_many_arguments)] // Table metadata naturally includes these fields (from pg_class and related catalogs). @@ -1100,7 +1114,7 @@ impl Table { self.hash = Some(format!("{:x}", hasher.finalize())); } - fn build_script(&self, include_triggers: bool) -> String { + fn build_script(&self, include_triggers: bool, defer_indexes: bool) -> String { let mut script = String::new(); let unlogged_prefix = if self.is_unlogged { "unlogged " } else { "" }; @@ -1373,10 +1387,16 @@ impl Table { } } - // 6. Add indexes (excluding primary key indexes and partition-inherited indexes) - for index in &self.indexes { - if !index.indexdef.to_lowercase().contains("primary key") && !index.is_partition_index { - script.push_str(&index.get_script()); + // 6. Add indexes (excluding primary key indexes and partition-inherited indexes). + // In production output mode the indexes are emitted separately by the + // comparer (concurrently / partition-aware), so they are deferred here. + if !defer_indexes { + for index in &self.indexes { + if !index.indexdef.to_lowercase().contains("primary key") + && !index.is_partition_index + { + script.push_str(&index.get_script()); + } } } @@ -1447,12 +1467,71 @@ impl Table { /// Get script for the table pub fn get_script(&self) -> String { - self.build_script(true) + self.build_script(true, false) } /// Get script for the table without triggers (for deferred trigger creation) pub fn get_script_without_triggers(&self) -> String { - self.build_script(false) + self.build_script(false, false) + } + + /// Get script for the table without triggers and without index creation. + /// Used by the production output path, which emits indexes separately so + /// they can be built concurrently / partition-aware. + pub fn get_script_without_triggers_no_indexes(&self) -> String { + self.build_script(false, true) + } + + /// Indexes that `build_script` would emit for a brand-new table: excludes + /// primary-key-backing indexes (created via the PRIMARY KEY clause) and + /// partition-inherited indexes (managed by the partitioned parent). Mirrors + /// the filter in `build_script` so the production path stays in sync. + pub fn creatable_indexes(&self) -> Vec<&TableIndex> { + self.indexes + .iter() + .filter(|index| { + !index.indexdef.to_lowercase().contains("primary key") && !index.is_partition_index + }) + .collect() + } + + /// Structured index diff between `self` (FROM) and `to_table` (TO), + /// mirroring the index handling in `build_alter_script`. The production + /// output path consumes this so each index change can be rewritten + /// concurrently / partition-aware instead of emitted inline. + pub fn index_alter_plan<'a>(&'a self, to_table: &'a Table) -> IndexAlterPlan<'a> { + let mut plan = IndexAlterPlan::default(); + + for new_index in &to_table.indexes { + if new_index.is_partition_index { + continue; + } + if let Some(old_index) = self.indexes.iter().find(|i| i.name == new_index.name) { + if old_index != new_index { + if old_index.indexdef != new_index.indexdef { + // Definition changed: drop the old, build the new. + plan.drop.push(old_index); + plan.create.push(new_index); + } else { + // Comment-only change. + plan.comment_changes.push(new_index); + } + } + } else { + plan.create.push(new_index); + } + } + + for old_index in &self.indexes { + if old_index.is_partition_index { + continue; + } + if !to_table.indexes.iter().any(|i| i.name == old_index.name) { + plan.drop.push(old_index); + } + } + + plan } /// Get trigger creation scripts only @@ -1521,11 +1600,46 @@ impl Table { script } + /// Like [`Table::get_foreign_key_alter_script`] but returns the in-place FK + /// modifications as a script and the brand-new foreign keys separately, so + /// the production output path can add the new ones `NOT VALID` and validate + /// them after the transaction commits. Mirrors the branch structure of + /// `get_foreign_key_alter_script` so the two cannot drift apart. + pub fn foreign_key_alter_split<'a>( + &self, + to_table: &'a Table, + ) -> (String, Vec<&'a TableConstraint>) { + let mut alters = String::new(); + let mut new_fks: Vec<&TableConstraint> = Vec::new(); + for new_constraint in &to_table.constraints { + if new_constraint.constraint_type.to_lowercase() != "foreign key" { + continue; + } + if let Some(old_constraint) = self + .constraints + .iter() + .find(|c| c.name == new_constraint.name) + { + if old_constraint != new_constraint { + if let Some(alter_script) = old_constraint.get_alter_script(new_constraint) { + alters.push_str(&alter_script); + } else { + new_fks.push(new_constraint); + } + } + } else { + new_fks.push(new_constraint); + } + } + (alters, new_fks) + } + fn build_alter_script( &self, to_table: &Table, use_drop: bool, include_triggers: bool, + defer_indexes: bool, ) -> String { // If partition key changes (e.g. from LIST to RANGE, or different column), we must recreate the table. // Also if table changes from partitioned to non-partitioned or vice versa. @@ -1914,14 +2028,18 @@ impl Table { script.push_str(&partition_script); script.push_str(&constraint_pre_script); script.push_str(&column_alter_script); - script.push_str(&index_drop_script); + if !defer_indexes { + script.push_str(&index_drop_script); + } if include_triggers { script.push_str(&trigger_drop_script); } script.push_str(&policy_drop_script); script.push_str(&column_drop_script); script.push_str(&constraint_post_script); - script.push_str(&index_script); + if !defer_indexes { + script.push_str(&index_script); + } if include_triggers { script.push_str(&trigger_script); } @@ -2052,12 +2170,24 @@ impl Table { /// Get script for altering the table (including triggers) pub fn get_alter_script(&self, to_table: &Table, use_drop: bool) -> String { - self.build_alter_script(to_table, use_drop, true) + self.build_alter_script(to_table, use_drop, true, false) } /// Get script for altering the table without triggers (for deferred trigger creation) pub fn get_alter_script_without_triggers(&self, to_table: &Table, use_drop: bool) -> String { - self.build_alter_script(to_table, use_drop, false) + self.build_alter_script(to_table, use_drop, false, false) + } + + /// Like [`Table::get_alter_script_without_triggers`] but omits index + /// creation/drop statements. Used by the production output path, which + /// emits index changes separately (concurrently / partition-aware) via + /// [`Table::index_alter_plan`]. + pub fn get_alter_script_without_triggers_no_indexes( + &self, + to_table: &Table, + use_drop: bool, + ) -> String { + self.build_alter_script(to_table, use_drop, false, true) } /// True when comparing `self` (FROM) against `to_table` (TO) would diff --git a/app/src/main.rs b/app/src/main.rs index af27551..06d2089 100644 --- a/app/src/main.rs +++ b/app/src/main.rs @@ -96,6 +96,15 @@ struct Args { #[arg(long, default_value = "16", value_parser = clap::value_parser!(u32).range(1..))] max_connections: u32, + /// Emit a migration script that is convenient to run against a live + /// production database: indexes are built with CREATE INDEX CONCURRENTLY + /// (partition-aware for partitioned tables), foreign keys are added + /// NOT VALID then VALIDATEd, and indexes are dropped with DROP INDEX + /// CONCURRENTLY. The concurrent statements run after the main transaction + /// commits. Default: false (output unchanged). + #[arg(long, default_value_t = false, num_args = 0..=1, default_missing_value = "true", value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)] + output_for_production: bool, + /// Use CASCADE in DROP statements for the clear command. WARNING: CASCADE can drop /// dependent objects outside the selected schema(s) (e.g., foreign keys or views in /// other schemas that reference the dropped objects). Without this flag, drops rely @@ -157,6 +166,7 @@ async fn run_main() -> Result<(), Error> { args.use_single_transaction, args.use_comments, args.grants_mode, + args.output_for_production, ) .await; } @@ -263,6 +273,7 @@ async fn run_by_config(config: String) -> Result<(), Error> { cfg.use_single_transaction, cfg.use_comments, cfg.grants_mode, + cfg.output_for_production, ) .await; @@ -311,6 +322,7 @@ async fn clear_database( Ok(()) } +#[allow(clippy::too_many_arguments)] async fn compare_dumps( from: String, to: String, @@ -319,6 +331,7 @@ async fn compare_dumps( use_single_transaction: bool, use_comments: bool, grants_mode: GrantsMode, + output_for_production: bool, ) -> Result<(), Error> { println!("Reading dumps..."); let from = Dump::read_from_file(&from).await?; @@ -334,6 +347,7 @@ async fn compare_dumps( use_comments, grants_mode, ); + comparer.set_output_for_production(output_for_production); comparer.compare().await?; comparer.save_script(&output).await?; println!("Dump compared successfully. Result script: {output}"); diff --git a/data/pgc.conf b/data/pgc.conf index 4d2f797..0257b3f 100644 --- a/data/pgc.conf +++ b/data/pgc.conf @@ -24,4 +24,13 @@ TO_DUMP=to.dump # OUTPUT OUTPUT=delta.sql -USE_DROP=false \ No newline at end of file +USE_DROP=false + +# Emit a migration script that is convenient to run against a live production +# database: indexes are built with CREATE INDEX CONCURRENTLY (and partition-aware +# ONLY parent + per-partition + ATTACH for partitioned tables), foreign keys are +# added NOT VALID and validated afterwards, and indexes are dropped with +# DROP INDEX CONCURRENTLY. The concurrent statements are emitted after the main +# transaction commits (so USE_SINGLE_TRANSACTION still wraps the rest). +# Default: false (output is identical to previous behaviour). +OUTPUT_FOR_PRODUCTION=false \ No newline at end of file