diff --git a/CHANGELOG b/CHANGELOG index d030bb8..af10f34 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,59 @@ +2026-06-03 v1.0.23 + + Features: + - Production-friendly output for the compare + command, enabled with --output-for-production + (or OUTPUT_FOR_PRODUCTION=true in the config + file). When on, the generated migration is + shaped to minimise locking on a live + database: indexes are built with CREATE INDEX + CONCURRENTLY (the UNIQUE keyword is + preserved); indexes on partitioned parents + use CREATE INDEX ... ON ONLY {parent} inside + the transaction, then CREATE INDEX + CONCURRENTLY on each partition followed by + ALTER INDEX ... ATTACH PARTITION afterwards + (falling back to a single non-concurrent + CREATE INDEX on the parent, with an + explanatory comment, when the layout cannot + be expanded safely); indexes are dropped with + DROP INDEX CONCURRENTLY (kept non-concurrent + for indexes on partitioned tables, where + concurrent drop is illegal); and new foreign + keys are added NOT VALID inside the + transaction and validated with a separate + VALIDATE CONSTRAINT afterwards. Every + statement that cannot run inside a + transaction block (CREATE/DROP INDEX + CONCURRENTLY, VALIDATE CONSTRAINT, ALTER + INDEX ... ATTACH PARTITION) is emitted in a + clearly marked Production post-commit section + after commit. Defaults to false; when off, + the output is byte-for-byte identical to + previous behaviour. + - Added the OUTPUT_FOR_PRODUCTION key to the + sample data/pgc.conf and documented the new + flag, configuration key, and behaviour in + README.md. + - Production output is now re-runnable: every + DDL form PostgreSQL supports is emitted defensively + with an idempotency guard where available, so a + partially-applied migration can be replayed without failing. + CREATE [UNLOGGED] TABLE, CREATE [UNLOGGED] + SEQUENCE, CREATE MATERIALIZED VIEW and CREATE + [UNIQUE] INDEX [CONCURRENTLY] gain IF NOT + EXISTS; CREATE VIEW becomes CREATE OR REPLACE + VIEW; ALTER TABLE ... ADD COLUMN gains IF NOT + EXISTS; and ALTER TABLE ... DROP COLUMN / + DROP CONSTRAINT gain IF EXISTS. CREATE TYPE + and ALTER TABLE ... ADD CONSTRAINT are left + as-is (PostgreSQL has no idempotency guard for + them). The rewrite is literal-, comment- and + dollar-quote-aware, so quoted text, commented + drops and function bodies are untouched, and + it only applies when + --output-for-production is on. + 2026-05-26 v1.0.22 Bug fixes: diff --git a/README.md b/README.md index ac267dd..58c110b 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,8 @@ Command line arguments can be used to execute just one function in one time. `--grants-mode {ignore|addonly|full}` - controls how grants (privileges) are handled during comparison. `ignore` (default) skips grants entirely; `addonly` adds grants that exist in TO but not in FROM; `full` makes grants identical by adding missing and revoking extra. +`--output-for-production {true|false}` - set to `true` to generate a migration script that is convenient to run against a live production database (default `false` — output is unchanged). See [Production-friendly output](#production-friendly-output). + `--max-connections {number}` - maximum number of connections in the PostgreSQL connection pool. Default: `16`. Used by all concurrent introspection queries; table metadata is pulled schema-wide in one query per resource kind (columns, indexes, constraints, triggers, policies, partition info, definitions) so connection count mostly matters for the sibling queries (extensions, sequences, routines, views, etc.) running in parallel. `--use-cascade` - add `CASCADE` to every `DROP` statement in the clear script. **Warning:** `CASCADE` can silently drop dependent objects that live outside the selected schema(s) (e.g., foreign keys or views in other schemas referencing the dropped objects). Use only when you are certain no cross-schema dependencies should survive. Without this flag the generated drops rely on the explicit dependency ordering and will fail cleanly if unresolved dependencies exist. @@ -95,6 +97,24 @@ This command comparing two dumps and produce SQL script for the `FROM` database If we add `--use-drop` argument comparer will add drop scripts for all items that non exists in target database, otherwise drop scripts will be ignored. By default, comparer ignore drops. +### Production-friendly output + +```bash +pgc --command compare --from {from_dump} --to {to_dump} --output {file} --use-single-transaction --output-for-production +``` + +By default the delta script favours brevity and is meant to be applied to an idle database. With `--output-for-production true` (config key `OUTPUT_FOR_PRODUCTION=true`) the comparer instead emits a script that minimises locking on a live database: + +- **Indexes are built concurrently** — `CREATE INDEX` becomes `CREATE INDEX CONCURRENTLY` (the `UNIQUE` keyword is preserved), so building an index does not block writes. +- **Partitioned tables are handled correctly** — `CONCURRENTLY` is not allowed directly on a partitioned table, so for a partitioned parent the comparer emits `CREATE INDEX ... ON ONLY {parent}` (in the transaction), then `CREATE INDEX CONCURRENTLY` on each partition followed by `ALTER INDEX ... ATTACH PARTITION` (after the transaction). When the partition layout cannot be expanded safely (no known partitions, or multi-level/sub-partitioned children) it falls back to a single non-concurrent `CREATE INDEX` on the parent and explains why in a comment. +- **Indexes are dropped concurrently** — `DROP INDEX` becomes `DROP INDEX CONCURRENTLY` (kept non-concurrent for indexes on partitioned tables, where concurrent drop is illegal). +- **Foreign keys are validated separately** — a new foreign key is added `NOT VALID` inside the transaction (a fast, metadata-only operation) and a matching `VALIDATE CONSTRAINT` (the long, scan-heavy step) is emitted afterwards so it does not hold the lock for the whole migration. +- **DDL is emitted defensively so the migration is re-runnable** — every statement PostgreSQL supports a guard for is generated idempotently, so a migration that was applied partially (e.g. a post-commit step failed) can be replayed without errors. `CREATE [UNLOGGED] TABLE`, `CREATE [UNLOGGED] SEQUENCE`, `CREATE MATERIALIZED VIEW` and `CREATE [UNIQUE] INDEX [CONCURRENTLY]` gain `IF NOT EXISTS`; `CREATE VIEW` becomes `CREATE OR REPLACE VIEW`; `ALTER TABLE ... ADD COLUMN` gains `IF NOT EXISTS`; and `ALTER TABLE ... DROP COLUMN` / `DROP CONSTRAINT` gain `IF EXISTS`. `CREATE TYPE` and `ALTER TABLE ... ADD CONSTRAINT` are left unguarded because PostgreSQL has no idempotency clause for them. The rewrite is SQL-aware (it skips string literals, quoted identifiers, comments — including commented-out drops — and dollar-quoted function bodies), so only the structural keyword of each statement is touched. + +Because `CREATE/DROP INDEX CONCURRENTLY`, `VALIDATE CONSTRAINT` and `ALTER INDEX ... ATTACH PARTITION` **cannot run inside a transaction block**, every such statement is moved to a clearly marked `Production post-commit` section emitted **after** the `commit;`. The rest of the migration still runs inside the single transaction when `--use-single-transaction` is set. Each post-commit statement runs in its own implicit transaction; most are safe to re-run thanks to idempotency guards, but `ALTER INDEX ... ATTACH PARTITION` has no built-in guard, so already-successful ATTACH steps may need to be skipped when replaying the section. + +`--output-for-production` defaults to `false`; when off the output is byte-for-byte identical to previous behaviour. + ### Generate a clear (drop-all) script for a database ```bash @@ -153,8 +173,11 @@ USE_SINGLE_TRANSACTION=true USE_COMMENTS=false GRANTS_MODE=ignore MAX_CONNECTIONS=16 +OUTPUT_FOR_PRODUCTION=false ``` +`OUTPUT_FOR_PRODUCTION` (default `false`) is the configuration-file equivalent of the `--output-for-production` flag described in [Production-friendly output](#production-friendly-output). + ## Choosing `MAX_CONNECTIONS` `MAX_CONNECTIONS` caps the connection pool the tool opens per dump. Independent introspection queries (extensions, sequences, routines, views, types, tables, etc.) are fired concurrently via `tokio::try_join!`, and the table-level data (columns, indexes, constraints, triggers, policies, partition info, definitions) is pulled with **schema-wide** queries — one query per sub-resource, independent of table count. So the connection count mostly bounds how many of the ~18 concurrent sibling queries run without queueing. diff --git a/app/Cargo.lock b/app/Cargo.lock index 209158b..33fa333 100644 --- a/app/Cargo.lock +++ b/app/Cargo.lock @@ -1146,7 +1146,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pgc" -version = "1.0.21" +version = "1.0.23" dependencies = [ "chrono", "clap", diff --git a/app/Cargo.toml b/app/Cargo.toml index 12d0e9e..ced8b0b 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgc" -version = "1.0.21" +version = "1.0.23" edition = "2024" license = "MIT" authors = ["nettrash "] diff --git a/app/src/comparer/core.rs b/app/src/comparer/core.rs index e14c7a4..51c61d4 100644 --- a/app/src/comparer/core.rs +++ b/app/src/comparer/core.rs @@ -1,6 +1,8 @@ +use crate::comparer::production::{self, ChildRef, PartitionContext}; use crate::config::grants_mode::GrantsMode; use crate::dump::acl; use crate::dump::column_dependent::ColumnDependentKind; +use crate::dump::table::IndexAlterPlan; use crate::dump::table_column::TableColumn; use crate::dump::table_constraint::TableConstraint; use crate::dump::table_index::TableIndex; @@ -28,9 +30,17 @@ pub struct Comparer { use_comments: bool, // How to handle grants (privileges) during comparison grants_mode: GrantsMode, + // Whether to emit a migration script that is safe/convenient to run on a + // live production database (concurrent index builds, partition-aware index + // creation, NOT VALID + VALIDATE for foreign keys, concurrent index drops). + output_for_production: bool, // The script that will be generated script: String, + // Statements that must run after the main transaction commits, because + // CONCURRENTLY / VALIDATE CONSTRAINT / ATTACH PARTITION cannot run inside a + // transaction block. Only populated when `output_for_production` is set. + production_post_script: String, enum_pre_script: String, enum_post_script: String, type_post_script: String, @@ -64,7 +74,9 @@ impl Comparer { use_single_transaction, use_comments, grants_mode, + output_for_production: false, script: String::new(), + production_post_script: String::new(), enum_pre_script: String::new(), enum_post_script: String::new(), type_post_script: String::new(), @@ -92,8 +104,35 @@ impl Comparer { comparer } + /// Enable or disable production-friendly output. When enabled, indexes are + /// built concurrently (partition-aware), foreign keys are added `NOT VALID` + /// then validated after commit, and indexes are dropped concurrently — all + /// post-commit statements are emitted after the main transaction. + pub fn set_output_for_production(&mut self, value: bool) -> &mut Self { + self.output_for_production = value; + self + } + // Compare dumps and generate the script pub async fn compare(&mut self) -> Result<(), Error> { + if self.output_for_production { + // The statements that cannot run inside a transaction block are + // emitted in a trailing section. With --use-single-transaction the + // rest of the migration is wrapped in begin/commit, so that section + // lands after COMMIT; without it no transaction is opened and the + // section is simply the tail of the script (each statement still + // runs in its own implicit transaction). + let tail = if self.use_single_transaction { + "are emitted after COMMIT" + } else { + "are emitted in a separate section at the end" + }; + self.script.append_block(&format!( + "/* Output generated for production: indexes are built/dropped concurrently, \ + foreign keys are validated, and the statements that cannot run inside a \ + transaction {tail}. */" + )); + } if self.use_single_transaction { self.script.append_block("begin;"); } @@ -156,6 +195,31 @@ impl Comparer { self.script.push_str("\ncommit;"); } + // Concurrent index builds/drops, FK validations and partition-index + // attaches cannot run inside a transaction block, so they are emitted + // here in a trailing section — after COMMIT when --use-single-transaction + // wrapped the migration, otherwise just at the end. Either way each runs + // in its own implicit transaction. + if !self.production_post_script.is_empty() { + self.script.append_block( + "\n/* ---> Production post-commit (run outside a transaction): Start --------------- */", + ); + let post = std::mem::take(&mut self.production_post_script); + self.script.push_str(&post); + self.script + .append_block("/* ---> Production post-commit: End --------------- */"); + } + + // Production migrations are meant to be re-runnable against a live, + // possibly partially-migrated database: inject `if [not] exists` / + // `or replace` guards into every DDL form PostgreSQL supports them for. + // The pass is literal-/comment-aware so commented-out drops and quoted + // text are left untouched. Done last so it also covers the post-commit + // (concurrent index) statements appended above. + if self.output_for_production { + self.script = production::make_idempotent(&self.script); + } + Ok(()) } @@ -298,6 +362,108 @@ impl Comparer { } } + /// Build the partition topology maps the production output path needs: + /// the set of partitioned parent tables, each parent's direct partitions, + /// and the set of indexes that live on a partitioned parent (which cannot + /// be dropped with `DROP INDEX CONCURRENTLY`). All keys use the dump's + /// `quote_ident`-applied `schema.name` form so they match `partition_of`. + fn build_partition_context_maps( + &self, + ) -> ( + HashSet, + HashMap>, + HashSet, + ) { + let mut partitioned_parents: HashSet = HashSet::new(); + let mut partitioned_indexes: HashSet = HashSet::new(); + for table in self.to.tables.iter().chain(self.from.tables.iter()) { + if table.partition_key.is_some() { + partitioned_parents.insert(format!("{}.{}", table.schema, table.name)); + for index in &table.indexes { + if !index.is_partition_index { + partitioned_indexes.insert(format!("{}.{}", index.schema, index.name)); + } + } + } + } + // Children come from the TO side — the schema we are migrating toward. + let mut children: HashMap> = HashMap::new(); + for table in &self.to.tables { + if let Some(parent) = &table.partition_of { + children.entry(parent.clone()).or_default().push(ChildRef { + schema: table.schema.clone(), + table: table.name.clone(), + }); + } + } + (partitioned_parents, children, partitioned_indexes) + } + + /// Emit a new table's CREATE (without triggers) plus its indexes, rewriting + /// every index for production (concurrent / partition-aware). Free function + /// over the two buffers so it can run while `self.to` is borrowed by the + /// emission loops. + fn emit_new_table_create_prod( + script: &mut String, + post_commit: &mut String, + table: &Table, + ctx: &PartitionContext, + ) { + script.push_str(&table.get_script_without_triggers_no_indexes()); + for index in table.creatable_indexes() { + let split = production::index_create_split(index, ctx, true); + script.push_str(&split.in_txn); + post_commit.push_str(&split.post_commit); + } + } + + /// Emit the index changes of an ALTER for production: drops (concurrent + /// unless on a partitioned table), comment-only changes (in-txn), then + /// (re)creates (concurrent / partition-aware). Mirrors the ordering of + /// `build_alter_script` (drops before creates). + fn emit_index_alter_plan_prod( + script: &mut String, + post_commit: &mut String, + plan: &IndexAlterPlan, + use_drop: bool, + ctx: &PartitionContext, + ) { + for old_index in &plan.drop { + let (stmt, post) = production::index_drop_statement(old_index, ctx); + let block = stmt.with_empty_lines(); + let target = if post { + &mut *post_commit + } else { + &mut *script + }; + if use_drop { + target.push_str(&block); + } else { + target.push_str(&format!("-- {block}")); + } + } + for index in &plan.comment_changes { + if let Some(comment) = &index.comment { + script.append_block(&format!( + "comment on index {}.{} is '{}';", + index.schema, + index.name, + comment.replace('\'', "''") + )); + } else { + script.append_block(&format!( + "comment on index {}.{} is null;", + index.schema, index.name + )); + } + } + for index in &plan.create { + let split = production::index_create_split(index, ctx, false); + script.push_str(&split.in_txn); + post_commit.push_str(&split.post_commit); + } + } + fn get_script(&self) -> String { if self.use_comments { return self.script.clone(); @@ -1638,6 +1804,18 @@ impl Comparer { // We will drop all tables that exists just in "from" dump (partitions first). let ordered_from_drop = Self::ordered_tables_for_drop(&self.from.tables); + // Partition topology used by the production output path to build/drop + // indexes concurrently and partition-aware. Owned maps, so they don't + // borrow `self` and the emission loops below can still push to the + // script buffers. + let (partitioned_parents, partition_children, partitioned_indexes) = + self.build_partition_context_maps(); + let partition_ctx = PartitionContext { + partitioned_parents: &partitioned_parents, + children: &partition_children, + partitioned_indexes: &partitioned_indexes, + }; + // Build lookup maps for O(1) access let to_table_map: HashMap<(&str, &str), usize> = self .to @@ -1881,12 +2059,23 @@ impl Comparer { self.script .push_str(format!("/* Table: {}.{}*/\n", table.schema, table.name).as_str()); - self.script - .push_str(table.get_script_without_triggers().as_str()); + if self.output_for_production { + Self::emit_new_table_create_prod( + &mut self.script, + &mut self.production_post_script, + table, + &partition_ctx, + ); + } else { + self.script + .push_str(table.get_script_without_triggers().as_str()); + } self.trigger_post_script .push_str(table.get_trigger_script().as_str()); } } + // NOTE: `table` above is `&&Table` (from `ordered_to.iter()`); the + // helper takes `&Table` via deref coercion at the call boundary. // We will find all existing tables in both dumps with different hashes for table in ordered_from.iter() { @@ -1932,8 +2121,17 @@ impl Comparer { // `recreated_tables` to swap in the default ACL. self.recreated_tables .insert(Self::table_key(&table.schema, &table.name)); - self.script - .push_str(to_table.get_script_without_triggers().as_str()); + if self.output_for_production { + Self::emit_new_table_create_prod( + &mut self.script, + &mut self.production_post_script, + to_table, + &partition_ctx, + ); + } else { + self.script + .push_str(to_table.get_script_without_triggers().as_str()); + } self.trigger_post_script .push_str(to_table.get_trigger_script().as_str()); } else { @@ -1949,9 +2147,6 @@ impl Comparer { // drift apart. let table_recreated = table.will_be_dropped_and_recreated(to_table); - let alter_script = - table.get_alter_script_without_triggers(to_table, self.use_drop); - if table_recreated { // Mark for grants: same reasoning as the branch // above — the dropped+recreated table inherits @@ -1965,7 +2160,30 @@ impl Comparer { } } - self.script.push_str(alter_script.as_str()); + // In production mode (and only when the table is altered + // in place — a wholesale drop+recreate re-emits the full + // CREATE with inline indexes anyway), defer index + // create/drop and re-emit them concurrently / partition- + // aware via the index alter plan. + if self.output_for_production && !table_recreated { + let alter_script = table.get_alter_script_without_triggers_no_indexes( + to_table, + self.use_drop, + ); + self.script.push_str(alter_script.as_str()); + let plan = table.index_alter_plan(to_table); + Self::emit_index_alter_plan_prod( + &mut self.script, + &mut self.production_post_script, + &plan, + self.use_drop, + &partition_ctx, + ); + } else { + let alter_script = + table.get_alter_script_without_triggers(to_table, self.use_drop); + self.script.push_str(alter_script.as_str()); + } // Issue #188 / Path B: when `get_alter_script` // routes a column through its @@ -2026,8 +2244,17 @@ impl Comparer { for table in deferred_new_children { self.script .push_str(format!("/* Table: {}.{}*/\n", table.schema, table.name).as_str()); - self.script - .push_str(table.get_script_without_triggers().as_str()); + if self.output_for_production { + Self::emit_new_table_create_prod( + &mut self.script, + &mut self.production_post_script, + table, + &partition_ctx, + ); + } else { + self.script + .push_str(table.get_script_without_triggers().as_str()); + } self.trigger_post_script .push_str(table.get_trigger_script().as_str()); } @@ -2635,11 +2862,33 @@ impl Comparer { } // Table exists in both. Check for FK changes. - self.script - .push_str(&from_table.get_foreign_key_alter_script(table)); + if self.output_for_production { + // In-place FK modifications stay as-is; brand-new foreign + // keys are added NOT VALID and validated after commit. + let (alters, new_fks) = from_table.foreign_key_alter_split(table); + self.script.push_str(&alters); + for constraint in new_fks { + let split = production::foreign_key_split(constraint); + self.script.push_str(&split.in_txn); + self.production_post_script.push_str(&split.post_commit); + } + } else { + self.script + .push_str(&from_table.get_foreign_key_alter_script(table)); + } } else { // New table. Add its FKs. - self.script.push_str(&table.get_foreign_key_script()); + if self.output_for_production { + for constraint in &table.constraints { + if constraint.constraint_type.to_lowercase() == "foreign key" { + let split = production::foreign_key_split(constraint); + self.script.push_str(&split.in_txn); + self.production_post_script.push_str(&split.post_commit); + } + } + } else { + self.script.push_str(&table.get_foreign_key_script()); + } } } diff --git a/app/src/comparer/core_tests.rs b/app/src/comparer/core_tests.rs index 05f3eab..7ff81ff 100644 --- a/app/src/comparer/core_tests.rs +++ b/app/src/comparer/core_tests.rs @@ -11121,3 +11121,195 @@ async fn issue191_pr198_cycle_fk_comment_survives_round_trip() { script ); } + +#[tokio::test] +async fn output_for_production_defers_concurrent_index_and_validates_fk() { + // New table with one index and one foreign key. Production mode must build + // the index CONCURRENTLY after COMMIT, add the FK NOT VALID inside the + // transaction, and VALIDATE it after COMMIT. + let from_dump = Dump::new(DumpConfig::default()); + let mut to_dump = Dump::new(DumpConfig::default()); + + let index = TableIndex { + schema: "public".to_string(), + table: "orders".to_string(), + name: "idx_orders_total".to_string(), + catalog: None, + indexdef: "CREATE INDEX idx_orders_total ON public.orders USING btree (total)".to_string(), + is_partition_index: false, + comment: None, + }; + let fk = TableConstraint { + catalog: "postgres".to_string(), + schema: "public".to_string(), + name: "fk_orders_customer".to_string(), + table_name: "orders".to_string(), + constraint_type: "FOREIGN KEY".to_string(), + is_deferrable: false, + initially_deferred: false, + definition: Some("FOREIGN KEY (customer_id) REFERENCES public.customers (id)".to_string()), + coninhcount: 0, + is_enforced: true, + no_inherit: false, + nulls_not_distinct: false, + comment: None, + }; + let table = Table::new( + "public".to_string(), + "orders".to_string(), + "public".to_string(), + "orders".to_string(), + "postgres".to_string(), + None, + vec![], + vec![fk], + vec![index], + vec![], + None, + ); + to_dump.tables.push(table); + + let mut comparer = Comparer::new(from_dump, to_dump, false, true, true, GrantsMode::Ignore); + comparer.set_output_for_production(true); + comparer.compare().await.unwrap(); + let script = comparer.get_script(); + + let commit_pos = script.find("commit;").expect("script must contain commit;"); + let concurrent_pos = script + .find( + "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_orders_total ON public.orders USING btree (total);", + ) + .expect("concurrent index build must be present"); + let validate_pos = script + .find("validate constraint fk_orders_customer;") + .expect("FK validation must be present"); + let not_valid_pos = script + .find("not valid;") + .expect("FK must be added NOT VALID"); + + // Concurrent build and FK validation run after COMMIT. + assert!( + concurrent_pos > commit_pos, + "concurrent index must come after commit;\n{script}" + ); + assert!( + validate_pos > commit_pos, + "validate constraint must come after commit;\n{script}" + ); + // The FK is added NOT VALID before COMMIT. + assert!( + not_valid_pos < commit_pos, + "FK NOT VALID must be added before commit;\n{script}" + ); + // The in-transaction CREATE TABLE must not carry an inline (non-concurrent) + // index build. + let in_txn = &script[..commit_pos]; + assert!( + !in_txn.contains("CREATE INDEX idx_orders_total ON public.orders"), + "index must be deferred out of the in-transaction create:\n{in_txn}" + ); +} + +#[tokio::test] +async fn output_for_production_disabled_keeps_inline_index() { + // Same shape as above but with the flag off: the index is inline in the + // CREATE TABLE and there is no post-commit / CONCURRENTLY output. + let from_dump = Dump::new(DumpConfig::default()); + let mut to_dump = Dump::new(DumpConfig::default()); + + let index = TableIndex { + schema: "public".to_string(), + table: "orders".to_string(), + name: "idx_orders_total".to_string(), + catalog: None, + indexdef: "CREATE INDEX idx_orders_total ON public.orders USING btree (total)".to_string(), + is_partition_index: false, + comment: None, + }; + let table = Table::new( + "public".to_string(), + "orders".to_string(), + "public".to_string(), + "orders".to_string(), + "postgres".to_string(), + None, + vec![], + vec![], + vec![index], + vec![], + None, + ); + to_dump.tables.push(table); + + let mut comparer = Comparer::new(from_dump, to_dump, false, true, true, GrantsMode::Ignore); + comparer.compare().await.unwrap(); + let script = comparer.get_script(); + + assert!( + script.contains("CREATE INDEX idx_orders_total ON public.orders USING btree (total);"), + "default mode keeps the inline index:\n{script}" + ); + assert!( + !script.contains("CONCURRENTLY"), + "default mode must not emit CONCURRENTLY:\n{script}" + ); + assert!( + !script.contains("Production post-commit"), + "default mode must not emit a post-commit section:\n{script}" + ); +} + +#[tokio::test] +async fn production_header_mentions_commit_only_with_single_transaction() { + // The production header comment must not claim post-transaction statements + // run "after COMMIT" when --use-single-transaction is off (no COMMIT is + // emitted in that valid configuration). + let build = || { + let mut to_dump = Dump::new(DumpConfig::default()); + to_dump.tables.push(Table::new( + "public".to_string(), + "orders".to_string(), + "public".to_string(), + "orders".to_string(), + "postgres".to_string(), + None, + vec![], + vec![], + vec![], + vec![], + None, + )); + (Dump::new(DumpConfig::default()), to_dump) + }; + + // With single transaction: header says "after COMMIT" and a commit; exists. + let (from_txn, to_txn) = build(); + let mut with_txn = Comparer::new(from_txn, to_txn, false, true, true, GrantsMode::Ignore); + with_txn.set_output_for_production(true); + with_txn.compare().await.unwrap(); + let txn_script = with_txn.get_script(); + assert!( + txn_script.contains("are emitted after COMMIT"), + "single-transaction header must mention COMMIT:\n{txn_script}" + ); + assert!(txn_script.contains("commit;"), "{txn_script}"); + + // Without single transaction: no COMMIT, so the header must not claim one. + let (from_no, to_no) = build(); + let mut no_txn = Comparer::new(from_no, to_no, false, false, true, GrantsMode::Ignore); + no_txn.set_output_for_production(true); + no_txn.compare().await.unwrap(); + let no_txn_script = no_txn.get_script(); + assert!( + !no_txn_script.contains("commit;"), + "no transaction must be opened without --use-single-transaction:\n{no_txn_script}" + ); + assert!( + !no_txn_script.contains("after COMMIT"), + "header must not claim 'after COMMIT' without a transaction:\n{no_txn_script}" + ); + assert!( + no_txn_script.contains("are emitted in a separate section at the end"), + "header must describe the trailing section accurately:\n{no_txn_script}" + ); +} diff --git a/app/src/comparer/mod.rs b/app/src/comparer/mod.rs index 0703e4b..3487ef1 100644 --- a/app/src/comparer/mod.rs +++ b/app/src/comparer/mod.rs @@ -1,2 +1,3 @@ pub mod core; +pub mod production; mod scanner; diff --git a/app/src/comparer/production.rs b/app/src/comparer/production.rs new file mode 100644 index 0000000..2b48073 --- /dev/null +++ b/app/src/comparer/production.rs @@ -0,0 +1,619 @@ +//! Production-mode SQL rewriting for the `output_for_production` flag. +//! +//! When the flag is enabled the comparer routes index and foreign-key DDL +//! through these helpers so the resulting migration is convenient to apply to a +//! live database: +//! +//! * indexes are built with `CREATE INDEX CONCURRENTLY` (no long write lock); +//! * indexes on partitioned parents use the only-safe pattern — +//! `CREATE INDEX ON ONLY parent`, then `CREATE INDEX CONCURRENTLY` on each +//! partition, then `ALTER INDEX ... ATTACH PARTITION` — because +//! `CONCURRENTLY` is rejected on a partitioned table directly; +//! * foreign keys are added `NOT VALID` and validated in a separate step +//! (`VALIDATE CONSTRAINT`) so the long validation scan does not hold a +//! table lock for the whole migration; +//! * indexes are dropped with `DROP INDEX CONCURRENTLY`. +//! +//! `CONCURRENTLY`, `VALIDATE CONSTRAINT` and `ATTACH PARTITION` cannot run +//! inside a transaction block, so every statement that must run after the main +//! transaction commits is collected in the `post_commit` half of a +//! [`ProdSplit`]. The comparer appends that buffer after `commit;`. + +use std::collections::{HashMap, HashSet}; + +use crate::comparer::scanner::{copy_quoted_literal, dollar_tag_at}; +use crate::dump::table_constraint::TableConstraint; +use crate::dump::table_index::TableIndex; +use crate::utils::string_extensions::StringExt; + +/// A pair of script fragments produced by a production rewrite: statements that +/// stay inside the migration transaction, and statements that must run after it +/// commits (each in its own autocommit statement). +#[derive(Debug, Default, PartialEq, Eq)] +pub struct ProdSplit { + pub in_txn: String, + pub post_commit: String, +} + +/// Direct child of a partitioned parent table. Identifiers are stored in the +/// same `quote_ident`-applied form as [`TableIndex::schema`] / [`TableIndex::table`]. +#[derive(Debug, Clone)] +pub struct ChildRef { + pub schema: String, + pub table: String, +} + +/// Partition topology the index rewriters need. All qualified names use the +/// `quote_ident(schema).quote_ident(name)` form already produced by the dump. +pub struct PartitionContext<'a> { + /// Qualified names of partitioned parent tables (relkind `p`). + pub partitioned_parents: &'a HashSet, + /// Parent qualified name -> its direct partitions. + pub children: &'a HashMap>, + /// Qualified index names (`schema.name`) whose underlying table is a + /// partitioned parent. `DROP INDEX CONCURRENTLY` is illegal for these. + pub partitioned_indexes: &'a HashSet, +} + +/// Insert `CONCURRENTLY` after the `CREATE [UNIQUE] INDEX` keyword. +/// `pg_get_indexdef` always emits uppercase keywords, so a literal prefix match +/// suffices; an unrecognised statement is returned unchanged. +fn insert_concurrently(indexdef: &str) -> String { + if let Some(rest) = indexdef.strip_prefix("CREATE UNIQUE INDEX ") { + return format!("CREATE UNIQUE INDEX CONCURRENTLY {rest}"); + } + if let Some(rest) = indexdef.strip_prefix("CREATE INDEX ") { + return format!("CREATE INDEX CONCURRENTLY {rest}"); + } + indexdef.to_string() +} + +/// Insert `ONLY` after the `ON` of an index definition so the index is created +/// on the partitioned parent alone (invalid until every partition's index is +/// attached). Returns `None` if the statement has no ` ON ` token. +/// +/// `pg_get_indexdef` already emits `ON ONLY` for an index on a partitioned +/// parent, so the rewrite is idempotent: a definition that is already +/// `ON ONLY` is returned unchanged rather than producing `ON ONLY ONLY`. +fn make_on_only(indexdef: &str) -> Option { + let pos = indexdef.find(" ON ")?; + let rest = &indexdef[pos + " ON ".len()..]; + if rest.starts_with("ONLY ") { + return Some(indexdef.to_string()); + } + let mut out = String::with_capacity(indexdef.len() + 5); + out.push_str(&indexdef[..pos]); + out.push_str(" ON ONLY "); + out.push_str(rest); + Some(out) +} + +/// Split an index definition into its `CREATE [UNIQUE] INDEX` keyword and the +/// `USING ...` tail (access method, columns, predicate). Returns `None` when +/// the statement does not have the expected shape. +fn create_kw_and_tail(indexdef: &str) -> Option<(&'static str, &str)> { + let create_kw = if indexdef.starts_with("CREATE UNIQUE INDEX ") { + "CREATE UNIQUE INDEX" + } else if indexdef.starts_with("CREATE INDEX ") { + "CREATE INDEX" + } else { + return None; + }; + let using = indexdef.find(" USING ")?; + Some((create_kw, indexdef[using + 1..].trim_end())) +} + +fn unquote_ident(s: &str) -> String { + let t = s.trim(); + if t.len() >= 2 && t.starts_with('"') && t.ends_with('"') { + t[1..t.len() - 1].replace("\"\"", "\"") + } else { + t.to_string() + } +} + +fn quote_ident(s: &str) -> String { + let needs_quote = s.is_empty() + || s.chars() + .next() + .map(|c| !(c.is_ascii_lowercase() || c == '_')) + == Some(true) + || s.chars() + .any(|c| !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')); + if needs_quote { + format!("\"{}\"", s.replace('"', "\"\"")) + } else { + s.to_string() + } +} + +fn comment_on_index(index: &TableIndex, comment: &str) -> String { + format!( + "comment on index {}.{} is '{}';", + index.schema, + index.name, + comment.replace('\'', "''") + ) +} + +/// Derive a unique per-partition index name from the parent index and the +/// partition table name, e.g. parent index `idx_orders_total` on partition +/// `orders_2024` -> `orders_2024_idx_orders_total`. PostgreSQL truncates names +/// to 63 bytes; on the rare collision the migration would error rather than do +/// the wrong thing. +fn child_index_name(parent_index: &TableIndex, child: &ChildRef) -> String { + quote_ident(&format!( + "{}_{}", + unquote_ident(&child.table), + unquote_ident(&parent_index.name) + )) +} + +/// Production rewrite of a single index *creation*. +/// +/// `parent_is_new` is true when the index's table is being created from scratch +/// in this same migration (the new-table path). For a brand-new *partitioned* +/// parent every partition is also new and empty: each is created later via +/// `CREATE TABLE ... PARTITION OF`, at which point PostgreSQL automatically +/// creates and attaches the matching partition index. Running the manual +/// per-partition `CREATE INDEX CONCURRENTLY ... ; ALTER INDEX ... ATTACH +/// PARTITION` dance on top of that collides with the auto-attached index +/// (`another index is already attached for partition ...`), so a new +/// partitioned parent gets a plain in-transaction `CREATE INDEX` — correct and +/// cheap, since the table holds no rows yet. +pub fn index_create_split( + index: &TableIndex, + ctx: &PartitionContext, + parent_is_new: bool, +) -> ProdSplit { + let mut split = ProdSplit::default(); + let parent_qualified = format!("{}.{}", index.schema, index.table); + let is_partitioned_parent = ctx.partitioned_parents.contains(&parent_qualified); + + // New partitioned parent: let PostgreSQL manage the partition indexes when + // the (also new, empty) partitions are created. A plain in-txn build avoids + // the double-attach conflict. + if is_partitioned_parent && parent_is_new { + split.in_txn.push_str(&index.indexdef); + split.in_txn.append_block(";"); + if let Some(comment) = &index.comment { + split.in_txn.append_block(&comment_on_index(index, comment)); + } + return split; + } + + // Plain (non-partitioned) table: a straight concurrent build, post-commit. + if !is_partitioned_parent { + split + .post_commit + .push_str(&insert_concurrently(&index.indexdef)); + split.post_commit.append_block(";"); + if let Some(comment) = &index.comment { + split + .post_commit + .append_block(&comment_on_index(index, comment)); + } + return split; + } + + let children = ctx + .children + .get(&parent_qualified) + .map(Vec::as_slice) + .unwrap_or(&[]); + + // Concurrent per-partition handling needs the parsed keyword/tail and a + // single level of partitioning. Sub-partitioned children (a partition that + // is itself a partitioned table) and unparseable definitions fall back to a + // plain, in-transaction CREATE INDEX on the parent (which recurses to every + // partition under a brief lock — CONCURRENTLY is illegal on a partitioned + // table, so this is the correct, if less convenient, fallback). + let has_sub_partition = children.iter().any(|c| { + ctx.partitioned_parents + .contains(&format!("{}.{}", c.schema, c.table)) + }); + let parsed = create_kw_and_tail(&index.indexdef); + + if children.is_empty() || has_sub_partition || parsed.is_none() { + let reason = if parsed.is_none() { + "unrecognised index definition" + } else if children.is_empty() { + "no partitions found" + } else { + "multi-level partitioning" + }; + split.in_txn.push_str(&format!( + "/* partitioned index {}.{}: {reason} — created non-concurrently */\n", + index.schema, index.name + )); + split.in_txn.push_str(&index.indexdef); + split.in_txn.append_block(";"); + if let Some(comment) = &index.comment { + split.in_txn.append_block(&comment_on_index(index, comment)); + } + return split; + } + + let (create_kw, tail) = parsed.unwrap(); + + // 1. Create the index on the parent only (metadata-only, fast) in-txn. + match make_on_only(&index.indexdef) { + Some(only) => split.in_txn.append_block(&format!("{only};")), + None => split.in_txn.append_block(&format!( + "{create_kw} {} ON ONLY {}.{} {tail};", + index.name, index.schema, index.table + )), + } + if let Some(comment) = &index.comment { + split.in_txn.append_block(&comment_on_index(index, comment)); + } + + // 2. Build each partition's index concurrently, then attach it. Once every + // partition is attached the parent index becomes valid automatically. + for child in children { + let child_idx = child_index_name(index, child); + split.post_commit.append_block(&format!( + "{create_kw} CONCURRENTLY {child_idx} ON {}.{} {tail};", + child.schema, child.table + )); + split.post_commit.append_block(&format!( + "alter index {}.{} attach partition {}.{};", + index.schema, index.name, child.schema, child_idx + )); + } + split +} + +/// Production rewrite of a single index *drop*. Returns the statement plus +/// whether it must run post-commit (`true`) or stay in-transaction (`false`). +/// `DROP INDEX CONCURRENTLY` is illegal for an index on a partitioned table, so +/// those stay in-transaction and non-concurrent. +pub fn index_drop_statement(index: &TableIndex, ctx: &PartitionContext) -> (String, bool) { + let qualified_index = format!("{}.{}", index.schema, index.name); + if ctx.partitioned_indexes.contains(&qualified_index) { + ( + format!("drop index if exists {}.{};", index.schema, index.name), + false, + ) + } else { + ( + format!( + "drop index concurrently if exists {}.{};", + index.schema, index.name + ), + true, + ) + } +} + +/// Production rewrite of a foreign-key constraint creation: add it `NOT VALID` +/// inside the transaction, then `VALIDATE CONSTRAINT` afterwards. Only enforced +/// FOREIGN KEY constraints are split; anything else is emitted unchanged +/// in-transaction (a `NOT ENFORCED` constraint is never validated, and other +/// constraint kinds do not support `NOT VALID` in this path). +pub fn foreign_key_split(constraint: &TableConstraint) -> ProdSplit { + let mut split = ProdSplit::default(); + let full = constraint.get_script(); + + if !constraint + .constraint_type + .eq_ignore_ascii_case("foreign key") + || !constraint.is_enforced + { + split.in_txn.push_str(&full); + return split; + } + + // The ADD CONSTRAINT statement ends at the first ';' (a FOREIGN KEY clause + // never contains one); any trailing `comment on constraint` block follows. + match full.find(';') { + Some(semi) => { + let add_stmt = full[..semi].trim_end(); + let remainder = full[semi + 1..].trim_start_matches('\n'); + + let mut in_txn = add_stmt.to_string(); + in_txn.push_str(" not valid;"); + in_txn = in_txn.with_empty_lines(); + if !remainder.trim().is_empty() { + in_txn.push_str(remainder); + } + split.in_txn = in_txn; + split.post_commit = format!( + "alter table {}.{} validate constraint {};", + constraint.schema, constraint.table_name, constraint.name + ) + .with_empty_lines(); + } + None => split.in_txn.push_str(&full), + } + split +} + +/// Case-insensitive ASCII check that `src[pos..]` begins with `pat`. +fn matches_ci(src: &[u8], pos: usize, pat: &[u8]) -> bool { + pos + pat.len() <= src.len() && src[pos..pos + pat.len()].eq_ignore_ascii_case(pat) +} + +/// A `create …` statement form that gains an `if not exists` guard. `prefix` +/// is the leading keyword (including its trailing space) the guard is inserted +/// after; `guard` is the exact text injected (its casing matches the +/// surrounding keyword family). Longer prefixes must precede their own +/// sub-prefixes so the most specific form wins. +struct CreateGuard { + prefix: &'static [u8], + guard: &'static [u8], +} + +/// `create …` forms that take `if not exists`, most-specific first. `create +/// view` (regular, non-materialized) is handled separately — PostgreSQL has no +/// `IF NOT EXISTS` for it, so it is rewritten to `create or replace view`. +/// `create type` is intentionally absent: PostgreSQL supports no idempotency +/// guard for it, and a drop-then-create rewrite would cascade dependents. +const CREATE_GUARDS: &[CreateGuard] = &[ + CreateGuard { + prefix: b"create unlogged table ", + guard: b"if not exists ", + }, + CreateGuard { + prefix: b"create table ", + guard: b"if not exists ", + }, + CreateGuard { + prefix: b"create unlogged sequence ", + guard: b"if not exists ", + }, + CreateGuard { + prefix: b"create sequence ", + guard: b"if not exists ", + }, + CreateGuard { + prefix: b"create materialized view ", + guard: b"if not exists ", + }, + // Index forms use uppercase keywords (`pg_get_indexdef` output, optionally + // routed through `CREATE INDEX CONCURRENTLY` for production). The guard is + // injected after `concurrently ` when present so the access-method tail is + // untouched. Concurrently variants precede their plain counterparts. + CreateGuard { + prefix: b"create unique index concurrently ", + guard: b"IF NOT EXISTS ", + }, + CreateGuard { + prefix: b"create index concurrently ", + guard: b"IF NOT EXISTS ", + }, + CreateGuard { + prefix: b"create unique index ", + guard: b"IF NOT EXISTS ", + }, + CreateGuard { + prefix: b"create index ", + guard: b"IF NOT EXISTS ", + }, +]; + +/// `alter table … ` forms whose object reference gains an idempotency +/// guard. `op` is the operation keyword as generated (lowercase, no leading +/// space); `guard` is inserted directly after it. `add constraint` is +/// intentionally absent: PostgreSQL has no `IF NOT EXISTS` for it. +struct AlterGuard { + op: &'static [u8], + guard: &'static [u8], +} + +const ALTER_GUARDS: &[AlterGuard] = &[ + AlterGuard { + op: b"add column ", + guard: b"if not exists ", + }, + AlterGuard { + op: b"drop column ", + guard: b"if exists ", + }, + AlterGuard { + op: b"drop constraint ", + guard: b"if exists ", + }, +]; + +/// Make a production migration script re-runnable by injecting idempotency +/// guards into the DDL forms PostgreSQL supports them for. Applied once, at the +/// end of [`Comparer::compare`], only when `output_for_production` is set. +/// +/// The scan is literal-, comment- and dollar-quote-aware (mirroring +/// [`crate::comparer::scanner::strip_comments_and_collapse`]) so a keyword that +/// appears inside a string literal, quoted identifier, or comment is never +/// mistaken for a statement to rewrite — including the `-- ` line-commented +/// drops emitted when `use_drop` is off, which must stay untouched. +/// +/// Guards injected (each a no-op when already present): +/// * `create [unlogged] table` → `… if not exists` +/// * `create [unlogged] sequence` → `… if not exists` +/// * `create materialized view` → `… if not exists` +/// * `create view` → `create or replace view` +/// * `create [unique] index [concurrently]` → `… if not exists` +/// * `alter table … add column` → `… add column if not exists` +/// * `alter table … drop column` → `… drop column if exists` +/// * `alter table … drop constraint` → `… drop constraint if exists` +/// +/// `create type` and `alter table … add constraint` are deliberately left +/// unguarded: PostgreSQL has no `IF NOT EXISTS` for them, and a +/// drop-then-create rewrite would risk cascading dependent objects. +pub fn make_idempotent(script: &str) -> String { + let src = script.as_bytes(); + let len = src.len(); + let mut out: Vec = Vec::with_capacity(len + 64); + let mut i = 0; + // True at the very start and immediately after each top-level `;`, through + // the leading run of whitespace/comments, until the first code byte. + let mut at_stmt_start = true; + // Set when an `alter table` statement is open and its operation keyword has + // not yet been seen; cleared when the op is found or the statement ends. + let mut pending_alter = false; + + while i < len { + let b = src[i]; + + // Dollar-quoted string ($$…$$ / $tag$…$tag$) — copy verbatim. + if b == b'$' + && let Some(tag_len) = dollar_tag_at(src, i) + { + let tag = &src[i..i + tag_len]; + out.extend_from_slice(tag); + i += tag_len; + loop { + if i >= len { + break; + } + if src[i] == b'$' + && let Some(close_len) = dollar_tag_at(src, i) + && close_len == tag_len + && &src[i..i + close_len] == tag + { + out.extend_from_slice(&src[i..i + close_len]); + i += close_len; + break; + } + out.push(src[i]); + i += 1; + } + at_stmt_start = false; + continue; + } + // E-string literal E'…' / e'…' — copy verbatim. + if (b == b'E' || b == b'e') && i + 1 < len && src[i + 1] == b'\'' { + out.push(b); + out.push(b'\''); + i += 2; + copy_quoted_literal(src, &mut out, &mut i, b'\'', true); + at_stmt_start = false; + continue; + } + // Single-quoted string — copy verbatim. + if b == b'\'' { + out.push(b'\''); + i += 1; + copy_quoted_literal(src, &mut out, &mut i, b'\'', false); + at_stmt_start = false; + continue; + } + // Double-quoted identifier — copy verbatim. + if b == b'"' { + out.push(b'"'); + i += 1; + copy_quoted_literal(src, &mut out, &mut i, b'"', false); + at_stmt_start = false; + continue; + } + // Block comment /* … */ (PostgreSQL allows nesting) — copy verbatim. + // Trivia: does not end the leading-trivia run before a statement. + if i + 1 < len && b == b'/' && src[i + 1] == b'*' { + out.push(b'/'); + out.push(b'*'); + i += 2; + let mut depth: usize = 1; + while i + 1 < len && depth > 0 { + if src[i] == b'/' && src[i + 1] == b'*' { + depth += 1; + out.push(b'/'); + out.push(b'*'); + i += 2; + } else if src[i] == b'*' && src[i + 1] == b'/' { + depth -= 1; + out.push(b'*'); + out.push(b'/'); + i += 2; + } else { + out.push(src[i]); + i += 1; + } + } + continue; + } + // Line comment -- … — copy verbatim. Trivia (see block comment). + if i + 1 < len && b == b'-' && src[i + 1] == b'-' { + while i < len && src[i] != b'\n' { + out.push(src[i]); + i += 1; + } + continue; + } + + // Open `alter table` statement: look for its operation keyword at a + // word boundary (previous output byte is ASCII whitespace). The first + // such match is the structural operation — a later occurrence inside a + // default expression or check clause sits in a literal/quoted form + // already handled above. + if pending_alter + && out.last().is_some_and(u8::is_ascii_whitespace) + && let Some(g) = ALTER_GUARDS.iter().find(|g| matches_ci(src, i, g.op)) + { + out.extend_from_slice(&src[i..i + g.op.len()]); + i += g.op.len(); + if !matches_ci(src, i, g.guard) { + out.extend_from_slice(g.guard); + } + pending_alter = false; + continue; + } + + // Top-level statement terminator. + if b == b';' { + out.push(b';'); + i += 1; + at_stmt_start = true; + pending_alter = false; + continue; + } + + // Whitespace inside the leading-trivia run keeps `at_stmt_start` set. + if b.is_ascii_whitespace() { + out.push(b); + i += 1; + continue; + } + + // First code byte of a statement: classify and inject. + if at_stmt_start { + at_stmt_start = false; + if matches_ci(src, i, b"alter table ") { + out.extend_from_slice(&src[i..i + b"alter table ".len()]); + i += b"alter table ".len(); + pending_alter = true; + continue; + } + if matches_ci(src, i, b"create view ") { + // No IF NOT EXISTS for regular views — use OR REPLACE instead. + out.extend_from_slice(b"create or replace view "); + i += b"create view ".len(); + continue; + } + if matches_ci(src, i, b"create or replace ") { + // Already idempotent (view/function) — leave untouched. + out.push(b); + i += 1; + continue; + } + if let Some(g) = CREATE_GUARDS.iter().find(|g| matches_ci(src, i, g.prefix)) { + out.extend_from_slice(&src[i..i + g.prefix.len()]); + i += g.prefix.len(); + if !matches_ci(src, i, g.guard) { + out.extend_from_slice(g.guard); + } + continue; + } + } + + // Ordinary code byte. + out.push(b); + i += 1; + } + + // Safety: `out` is built entirely from slices of `script` (valid UTF-8) and + // ASCII guard literals, so it is guaranteed to be valid UTF-8. + String::from_utf8(out).expect("output must be valid UTF-8") +} + +#[cfg(test)] +#[path = "production_tests.rs"] +mod tests; diff --git a/app/src/comparer/production_tests.rs b/app/src/comparer/production_tests.rs new file mode 100644 index 0000000..f2ec768 --- /dev/null +++ b/app/src/comparer/production_tests.rs @@ -0,0 +1,599 @@ +use super::*; + +fn index(schema: &str, table: &str, name: &str, indexdef: &str) -> TableIndex { + TableIndex { + schema: schema.to_string(), + table: table.to_string(), + name: name.to_string(), + catalog: None, + indexdef: indexdef.to_string(), + is_partition_index: false, + comment: None, + } +} + +fn fk(schema: &str, table: &str, name: &str, definition: &str) -> TableConstraint { + TableConstraint { + catalog: "postgres".to_string(), + schema: schema.to_string(), + name: name.to_string(), + table_name: table.to_string(), + constraint_type: "FOREIGN KEY".to_string(), + is_deferrable: false, + initially_deferred: false, + definition: Some(definition.to_string()), + coninhcount: 0, + is_enforced: true, + no_inherit: false, + nulls_not_distinct: false, + comment: None, + } +} + +fn empty_ctx() -> ( + HashSet, + HashMap>, + HashSet, +) { + (HashSet::new(), HashMap::new(), HashSet::new()) +} + +#[test] +fn plain_index_becomes_concurrent_post_commit() { + let (parents, children, part_idx) = empty_ctx(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index( + "public", + "orders", + "idx_orders_total", + "CREATE INDEX idx_orders_total ON public.orders USING btree (total)", + ); + let split = index_create_split(&idx, &ctx, false); + assert!(split.in_txn.is_empty()); + assert!( + split.post_commit.contains( + "CREATE INDEX CONCURRENTLY idx_orders_total ON public.orders USING btree (total);" + ), + "got: {}", + split.post_commit + ); +} + +#[test] +fn unique_index_keyword_preserved() { + let (parents, children, part_idx) = empty_ctx(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index( + "public", + "orders", + "uq_orders_ref", + "CREATE UNIQUE INDEX uq_orders_ref ON public.orders USING btree (ref)", + ); + let split = index_create_split(&idx, &ctx, false); + assert!( + split + .post_commit + .contains("CREATE UNIQUE INDEX CONCURRENTLY uq_orders_ref"), + "got: {}", + split.post_commit + ); +} + +#[test] +fn index_comment_follows_concurrent_build() { + let (parents, children, part_idx) = empty_ctx(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let mut idx = index( + "public", + "orders", + "idx_total", + "CREATE INDEX idx_total ON public.orders USING btree (total)", + ); + idx.comment = Some("by total".to_string()); + let split = index_create_split(&idx, &ctx, false); + assert!(split.post_commit.contains("CREATE INDEX CONCURRENTLY")); + assert!( + split + .post_commit + .contains("comment on index public.idx_total is 'by total';") + ); +} + +#[test] +fn partitioned_parent_expands_to_only_plus_attach() { + let mut parents = HashSet::new(); + parents.insert("public.orders".to_string()); + let mut children = HashMap::new(); + children.insert( + "public.orders".to_string(), + vec![ + ChildRef { + schema: "public".to_string(), + table: "orders_2023".to_string(), + }, + ChildRef { + schema: "public".to_string(), + table: "orders_2024".to_string(), + }, + ], + ); + let part_idx = HashSet::new(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index( + "public", + "orders", + "idx_orders_total", + "CREATE INDEX idx_orders_total ON public.orders USING btree (total)", + ); + let split = index_create_split(&idx, &ctx, false); + + // Parent index created ON ONLY, in-transaction. + assert!( + split + .in_txn + .contains("CREATE INDEX idx_orders_total ON ONLY public.orders USING btree (total);"), + "in_txn: {}", + split.in_txn + ); + // Each partition gets a concurrent build + attach, post-commit. + assert!(split.post_commit.contains( + "CREATE INDEX CONCURRENTLY orders_2023_idx_orders_total ON public.orders_2023 USING btree (total);" + )); + assert!(split.post_commit.contains( + "alter index public.idx_orders_total attach partition public.orders_2023_idx_orders_total;" + )); + assert!(split.post_commit.contains( + "CREATE INDEX CONCURRENTLY orders_2024_idx_orders_total ON public.orders_2024 USING btree (total);" + )); + assert!(split.post_commit.contains( + "alter index public.idx_orders_total attach partition public.orders_2024_idx_orders_total;" + )); +} + +#[test] +fn new_partitioned_parent_emits_plain_in_txn_no_attach() { + // When the partitioned parent is itself created in this migration, its + // partitions are new and empty: PostgreSQL auto-creates and attaches each + // partition index at `PARTITION OF` time. The rewrite must NOT emit the + // manual concurrent build + attach (that double-attaches), just a plain + // in-transaction CREATE INDEX. + let mut parents = HashSet::new(); + parents.insert("public.orders".to_string()); + let mut children = HashMap::new(); + children.insert( + "public.orders".to_string(), + vec![ChildRef { + schema: "public".to_string(), + table: "orders_2024".to_string(), + }], + ); + let part_idx = HashSet::new(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index( + "public", + "orders", + "idx_orders_total", + "CREATE INDEX idx_orders_total ON ONLY public.orders USING btree (total)", + ); + let split = index_create_split(&idx, &ctx, true); + + assert!( + split + .in_txn + .contains("CREATE INDEX idx_orders_total ON ONLY public.orders USING btree (total);"), + "in_txn: {}", + split.in_txn + ); + assert!( + split.post_commit.is_empty(), + "new partitioned parent must not emit post-commit attach: {}", + split.post_commit + ); + assert!( + !split.in_txn.contains("attach partition"), + "must not attach partitions manually: {}", + split.in_txn + ); +} + +#[test] +fn partitioned_parent_indexdef_already_on_only_is_not_doubled() { + // `pg_get_indexdef` emits `ON ONLY` for an index on a partitioned parent. + // The rewrite must be idempotent and not produce `ON ONLY ONLY`. + let mut parents = HashSet::new(); + parents.insert("data.tagged_items".to_string()); + let mut children = HashMap::new(); + children.insert( + "data.tagged_items".to_string(), + vec![ChildRef { + schema: "data".to_string(), + table: "tagged_items_p0".to_string(), + }], + ); + let part_idx = HashSet::new(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index( + "data", + "tagged_items", + "idx_tagged_items_detail", + "CREATE INDEX idx_tagged_items_detail ON ONLY data.tagged_items USING btree (detail)", + ); + let split = index_create_split(&idx, &ctx, false); + + assert!( + split.in_txn.contains( + "CREATE INDEX idx_tagged_items_detail ON ONLY data.tagged_items USING btree (detail);" + ), + "in_txn: {}", + split.in_txn + ); + assert!( + !split.in_txn.contains("ON ONLY ONLY"), + "doubled ONLY: {}", + split.in_txn + ); +} + +#[test] +fn partitioned_parent_without_known_children_falls_back_in_txn() { + let mut parents = HashSet::new(); + parents.insert("public.orders".to_string()); + let children = HashMap::new(); + let part_idx = HashSet::new(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index( + "public", + "orders", + "idx_orders_total", + "CREATE INDEX idx_orders_total ON public.orders USING btree (total)", + ); + let split = index_create_split(&idx, &ctx, false); + assert!(split.post_commit.is_empty()); + assert!(split.in_txn.contains("no partitions found")); + assert!( + split + .in_txn + .contains("CREATE INDEX idx_orders_total ON public.orders USING btree (total);") + ); +} + +#[test] +fn drop_index_concurrent_post_commit_for_plain_table() { + let (parents, children, part_idx) = empty_ctx(); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index("public", "orders", "idx_total", ""); + let (stmt, post_commit) = index_drop_statement(&idx, &ctx); + assert!(post_commit); + assert_eq!(stmt, "drop index concurrently if exists public.idx_total;"); +} + +#[test] +fn drop_partitioned_index_stays_in_txn_non_concurrent() { + let parents = HashSet::new(); + let children = HashMap::new(); + let mut part_idx = HashSet::new(); + part_idx.insert("public.idx_total".to_string()); + let ctx = PartitionContext { + partitioned_parents: &parents, + children: &children, + partitioned_indexes: &part_idx, + }; + let idx = index("public", "orders", "idx_total", ""); + let (stmt, post_commit) = index_drop_statement(&idx, &ctx); + assert!(!post_commit); + assert_eq!(stmt, "drop index if exists public.idx_total;"); +} + +#[test] +fn foreign_key_split_into_not_valid_and_validate() { + let c = fk( + "public", + "orders", + "fk_orders_customer", + "FOREIGN KEY (customer_id) REFERENCES public.customers (id)", + ); + let split = foreign_key_split(&c); + assert!( + split.in_txn.contains( + "add constraint fk_orders_customer foreign key (customer_id) references public.customers (id) not valid;" + ), + "in_txn: {}", + split.in_txn + ); + assert_eq!( + split.post_commit.trim_end(), + "alter table public.orders validate constraint fk_orders_customer;" + ); +} + +#[test] +fn non_enforced_fk_is_not_split() { + let mut c = fk( + "public", + "orders", + "fk_orders_customer", + "FOREIGN KEY (customer_id) REFERENCES public.customers (id) NOT ENFORCED", + ); + c.is_enforced = false; + let split = foreign_key_split(&c); + assert!(split.post_commit.is_empty()); + assert!(!split.in_txn.contains("not valid")); +} + +#[test] +fn fk_comment_preserved_in_txn() { + let mut c = fk( + "public", + "orders", + "fk_orders_customer", + "FOREIGN KEY (customer_id) REFERENCES public.customers (id)", + ); + c.comment = Some("links to customer".to_string()); + let split = foreign_key_split(&c); + assert!(split.in_txn.contains("not valid;")); + assert!( + split.in_txn.contains( + "comment on constraint fk_orders_customer on public.orders is 'links to customer';" + ), + "in_txn: {}", + split.in_txn + ); + assert!( + split + .post_commit + .contains("validate constraint fk_orders_customer;") + ); +} + +// ---- make_idempotent ---- + +#[test] +fn create_table_gets_if_not_exists() { + assert_eq!( + make_idempotent("create table public.orders (id int);"), + "create table if not exists public.orders (id int);" + ); +} + +#[test] +fn create_unlogged_table_gets_if_not_exists() { + assert_eq!( + make_idempotent("create unlogged table public.t (id int);"), + "create unlogged table if not exists public.t (id int);" + ); +} + +#[test] +fn create_table_partition_of_gets_if_not_exists() { + assert_eq!( + make_idempotent("create table public.p1 partition of public.p for values in (1);"), + "create table if not exists public.p1 partition of public.p for values in (1);" + ); +} + +#[test] +fn create_table_already_guarded_is_not_doubled() { + let s = "create table if not exists public.orders (id int);"; + assert_eq!(make_idempotent(s), s); +} + +#[test] +fn create_sequence_gets_if_not_exists() { + assert_eq!( + make_idempotent("create sequence public.s;"), + "create sequence if not exists public.s;" + ); + assert_eq!( + make_idempotent("create unlogged sequence public.s;"), + "create unlogged sequence if not exists public.s;" + ); +} + +#[test] +fn create_materialized_view_gets_if_not_exists() { + assert_eq!( + make_idempotent("create materialized view public.mv as\nselect 1;"), + "create materialized view if not exists public.mv as\nselect 1;" + ); +} + +#[test] +fn create_view_becomes_create_or_replace() { + assert_eq!( + make_idempotent("create view public.v as\nselect 1;"), + "create or replace view public.v as\nselect 1;" + ); +} + +#[test] +fn create_or_replace_view_is_left_untouched() { + let s = "CREATE OR REPLACE VIEW public.v as\nselect 1;"; + assert_eq!(make_idempotent(s), s); +} + +#[test] +fn create_index_gets_if_not_exists() { + assert_eq!( + make_idempotent("CREATE INDEX idx ON public.t USING btree (a);"), + "CREATE INDEX IF NOT EXISTS idx ON public.t USING btree (a);" + ); + assert_eq!( + make_idempotent("CREATE UNIQUE INDEX idx ON public.t USING btree (a);"), + "CREATE UNIQUE INDEX IF NOT EXISTS idx ON public.t USING btree (a);" + ); +} + +#[test] +fn create_index_concurrently_gets_if_not_exists_after_concurrently() { + assert_eq!( + make_idempotent("CREATE INDEX CONCURRENTLY idx ON public.t USING btree (a);"), + "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx ON public.t USING btree (a);" + ); + assert_eq!( + make_idempotent("CREATE UNIQUE INDEX CONCURRENTLY idx ON public.t USING btree (a);"), + "CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx ON public.t USING btree (a);" + ); +} + +#[test] +fn create_index_already_guarded_is_not_doubled() { + let s = "CREATE INDEX IF NOT EXISTS idx ON public.t USING btree (a);"; + assert_eq!(make_idempotent(s), s); +} + +#[test] +fn add_column_gets_if_not_exists() { + assert_eq!( + make_idempotent("alter table public.t add column c int;"), + "alter table public.t add column if not exists c int;" + ); +} + +#[test] +fn drop_column_gets_if_exists() { + assert_eq!( + make_idempotent("alter table public.t drop column c;"), + "alter table public.t drop column if exists c;" + ); +} + +#[test] +fn drop_constraint_gets_if_exists() { + assert_eq!( + make_idempotent("alter table public.t drop constraint c;"), + "alter table public.t drop constraint if exists c;" + ); +} + +#[test] +fn add_constraint_is_left_unguarded() { + let s = "alter table public.t add constraint c check (x > 0);"; + assert_eq!(make_idempotent(s), s); +} + +#[test] +fn alter_guards_not_doubled() { + let s = "alter table public.t add column if not exists c int;"; + assert_eq!(make_idempotent(s), s); + let d = "alter table public.t drop column if exists c;"; + assert_eq!(make_idempotent(d), d); +} + +#[test] +fn create_type_is_left_unguarded() { + let s = "create type public.mood as enum ('a', 'b');"; + assert_eq!(make_idempotent(s), s); +} + +#[test] +fn drop_table_if_exists_is_left_untouched() { + let s = "drop table if exists public.t;"; + assert_eq!(make_idempotent(s), s); +} + +#[test] +fn keyword_inside_string_literal_is_not_rewritten() { + // A default literal that contains the text `drop column` must be left + // verbatim; only the structural `add column` is guarded. + let s = "alter table public.t add column c text default 'x drop column y';"; + assert_eq!( + make_idempotent(s), + "alter table public.t add column if not exists c text default 'x drop column y';" + ); +} + +#[test] +fn keyword_inside_quoted_identifier_is_not_rewritten() { + let s = "alter table public.t drop column \"add column\";"; + assert_eq!( + make_idempotent(s), + "alter table public.t drop column if exists \"add column\";" + ); +} + +#[test] +fn commented_out_drop_is_left_untouched() { + // With `use_drop` off, drops are line-commented and must not be guarded. + let s = "-- alter table public.t drop column c;\ncreate table public.u (id int);"; + assert_eq!( + make_idempotent(s), + "-- alter table public.t drop column c;\ncreate table if not exists public.u (id int);" + ); +} + +#[test] +fn block_comment_before_statement_is_preserved() { + let s = "/* Table: public.t */\ncreate table public.t (id int);"; + assert_eq!( + make_idempotent(s), + "/* Table: public.t */\ncreate table if not exists public.t (id int);" + ); +} + +#[test] +fn dollar_quoted_body_is_left_untouched() { + // A function body that mentions DDL keywords must pass through verbatim. + let s = "create or replace function f() returns void language plpgsql as $$\nbegin\n -- create table x\nend;\n$$;"; + assert_eq!(make_idempotent(s), s); +} + +#[test] +fn multiple_statements_each_guarded_independently() { + let s = "create table public.a (id int);\n\nalter table public.a add column b int;\n\nalter table public.a drop constraint old;"; + assert_eq!( + make_idempotent(s), + "create table if not exists public.a (id int);\n\nalter table public.a add column if not exists b int;\n\nalter table public.a drop constraint if exists old;" + ); +} + +#[test] +fn begin_commit_wrapper_is_untouched() { + let s = "begin;\n\ncreate table public.t (id int);\n\ncommit;"; + assert_eq!( + make_idempotent(s), + "begin;\n\ncreate table if not exists public.t (id int);\n\ncommit;" + ); +} + +#[test] +fn alter_column_set_default_is_not_mistaken_for_op() { + let s = "alter table public.t alter column c set default 'add column z';"; + assert_eq!(make_idempotent(s), s); +} + +#[test] +fn empty_input_is_empty() { + assert_eq!(make_idempotent(""), ""); +} diff --git a/app/src/comparer/scanner.rs b/app/src/comparer/scanner.rs index 444ac42..c794b98 100644 --- a/app/src/comparer/scanner.rs +++ b/app/src/comparer/scanner.rs @@ -132,7 +132,7 @@ pub(crate) fn strip_comments_and_collapse(script: &str) -> String { /// In both modes every byte is copied verbatim — the scanner does not /// interpret the content, only tracks enough structure to know when the /// literal ends. -fn copy_quoted_literal( +pub(crate) fn copy_quoted_literal( src: &[u8], result: &mut Vec, i: &mut usize, @@ -169,7 +169,7 @@ fn copy_quoted_literal( /// Checks if a dollar-quote tag starts at position `pos` in `src`. /// A tag is `$` followed by zero or more alphanumeric/underscore chars, followed by `$`. /// Returns the total length of the tag (including both `$` signs) or None. -fn dollar_tag_at(src: &[u8], pos: usize) -> Option { +pub(crate) fn dollar_tag_at(src: &[u8], pos: usize) -> Option { if pos >= src.len() || src[pos] != b'$' { return None; } diff --git a/app/src/config/core.rs b/app/src/config/core.rs index 4fab43a..c58f86d 100644 --- a/app/src/config/core.rs +++ b/app/src/config/core.rs @@ -20,6 +20,11 @@ pub struct Config { pub grants_mode: GrantsMode, // Maximum number of connections in the PostgreSQL connection pool pub max_connections: u32, + // Whether to emit a migration script that is safe/convenient to run on a + // live production database (concurrent index builds, partition-aware index + // creation, NOT VALID + VALIDATE for foreign keys, concurrent index drops, + // and a split transaction so the concurrent statements run outside it). + pub output_for_production: bool, } impl Config { @@ -55,6 +60,7 @@ impl Config { let mut use_comments = true; let mut grants_mode = GrantsMode::Ignore; let mut max_connections: u32 = 16; + let mut output_for_production = false; for line in &config_data { if line.trim().is_empty() || line.starts_with('#') { @@ -89,6 +95,7 @@ impl Config { && key != "USE_COMMENTS" && key != "GRANTS_MODE" && key != "MAX_CONNECTIONS" + && key != "OUTPUT_FOR_PRODUCTION" { return Err(format!("Unknown configuration key: {}", parts[0])); } @@ -119,6 +126,17 @@ impl Config { "OUTPUT" => output = raw_value.to_string(), "USE_DROP" => use_drop = value == "TRUE", "USE_SINGLE_TRANSACTION" => use_single_transaction = value == "TRUE", + "OUTPUT_FOR_PRODUCTION" => { + output_for_production = match value.as_str() { + "TRUE" => true, + "FALSE" => false, + _ => { + return Err(format!( + "Invalid value for OUTPUT_FOR_PRODUCTION: {raw_value}" + )); + } + }; + } "USE_COMMENTS" => { use_comments = match value.as_str() { "TRUE" => true, @@ -208,6 +226,7 @@ impl Config { use_comments, grants_mode, max_connections, + output_for_production, }) } diff --git a/app/src/config/core_tests.rs b/app/src/config/core_tests.rs index 83022c9..ab5ddf7 100644 --- a/app/src/config/core_tests.rs +++ b/app/src/config/core_tests.rs @@ -30,6 +30,7 @@ fn test_valid_config_parsing() { OUTPUT=result.out USE_DROP=true USE_SINGLE_TRANSACTION=true + OUTPUT_FOR_PRODUCTION=true "#; let file = write_temp_config(config_content, "test_valid_config_parsing.cfg"); let config = Config::new(file.clone()); @@ -46,6 +47,25 @@ fn test_valid_config_parsing() { assert_eq!(config.output, "result.out"); assert!(config.use_drop); assert!(config.use_single_transaction); + assert!(config.output_for_production); + let _ = std::fs::remove_file(file); +} + +#[test] +fn test_output_for_production_defaults_false() { + let config_content = "FROM_HOST=localhost\nTO_HOST=remotehost"; + let file = write_temp_config(config_content, "test_output_for_production_default.cfg"); + let config = Config::new(file.clone()); + assert!(!config.output_for_production); + let _ = std::fs::remove_file(file); +} + +#[test] +#[should_panic] +fn test_invalid_output_for_production_value_panics() { + let config_content = "FROM_HOST=localhost\nOUTPUT_FOR_PRODUCTION=maybe"; + let file = write_temp_config(config_content, "test_invalid_ofp_value.cfg"); + let _ = Config::new(file.clone()); let _ = std::fs::remove_file(file); } diff --git a/app/src/dump/table.rs b/app/src/dump/table.rs index 4c775db..79e52ac 100644 --- a/app/src/dump/table.rs +++ b/app/src/dump/table.rs @@ -175,6 +175,20 @@ pub struct Table { pub typed_table_type: Option, // OF type name for typed tables } +/// Structured result of [`Table::index_alter_plan`]: which indexes to create, +/// drop, or merely re-comment when altering a table. Borrows from both the FROM +/// and TO tables. Consumed by the production output path. +#[derive(Debug, Default)] +pub struct IndexAlterPlan<'a> { + /// Indexes to (re)create — brand-new, or whose definition changed. + pub create: Vec<&'a TableIndex>, + /// Old indexes to drop — definition changed (paired with a `create`) or + /// removed entirely. + pub drop: Vec<&'a TableIndex>, + /// Indexes whose comment changed but whose definition did not. + pub comment_changes: Vec<&'a TableIndex>, +} + impl Table { /// Creates a new Table with the given name #[allow(clippy::too_many_arguments)] // Table metadata naturally includes these fields (from pg_class and related catalogs). @@ -1100,7 +1114,7 @@ impl Table { self.hash = Some(format!("{:x}", hasher.finalize())); } - fn build_script(&self, include_triggers: bool) -> String { + fn build_script(&self, include_triggers: bool, defer_indexes: bool) -> String { let mut script = String::new(); let unlogged_prefix = if self.is_unlogged { "unlogged " } else { "" }; @@ -1373,10 +1387,16 @@ impl Table { } } - // 6. Add indexes (excluding primary key indexes and partition-inherited indexes) - for index in &self.indexes { - if !index.indexdef.to_lowercase().contains("primary key") && !index.is_partition_index { - script.push_str(&index.get_script()); + // 6. Add indexes (excluding primary key indexes and partition-inherited indexes). + // In production output mode the indexes are emitted separately by the + // comparer (concurrently / partition-aware), so they are deferred here. + if !defer_indexes { + for index in &self.indexes { + if !index.indexdef.to_lowercase().contains("primary key") + && !index.is_partition_index + { + script.push_str(&index.get_script()); + } } } @@ -1447,12 +1467,71 @@ impl Table { /// Get script for the table pub fn get_script(&self) -> String { - self.build_script(true) + self.build_script(true, false) } /// Get script for the table without triggers (for deferred trigger creation) pub fn get_script_without_triggers(&self) -> String { - self.build_script(false) + self.build_script(false, false) + } + + /// Get script for the table without triggers and without index creation. + /// Used by the production output path, which emits indexes separately so + /// they can be built concurrently / partition-aware. + pub fn get_script_without_triggers_no_indexes(&self) -> String { + self.build_script(false, true) + } + + /// Indexes that `build_script` would emit for a brand-new table: excludes + /// primary-key-backing indexes (created via the PRIMARY KEY clause) and + /// partition-inherited indexes (managed by the partitioned parent). Mirrors + /// the filter in `build_script` so the production path stays in sync. + pub fn creatable_indexes(&self) -> Vec<&TableIndex> { + self.indexes + .iter() + .filter(|index| { + !index.indexdef.to_lowercase().contains("primary key") && !index.is_partition_index + }) + .collect() + } + + /// Structured index diff between `self` (FROM) and `to_table` (TO), + /// mirroring the index handling in `build_alter_script`. The production + /// output path consumes this so each index change can be rewritten + /// concurrently / partition-aware instead of emitted inline. + pub fn index_alter_plan<'a>(&'a self, to_table: &'a Table) -> IndexAlterPlan<'a> { + let mut plan = IndexAlterPlan::default(); + + for new_index in &to_table.indexes { + if new_index.is_partition_index { + continue; + } + if let Some(old_index) = self.indexes.iter().find(|i| i.name == new_index.name) { + if old_index != new_index { + if old_index.indexdef != new_index.indexdef { + // Definition changed: drop the old, build the new. + plan.drop.push(old_index); + plan.create.push(new_index); + } else { + // Comment-only change. + plan.comment_changes.push(new_index); + } + } + } else { + plan.create.push(new_index); + } + } + + for old_index in &self.indexes { + if old_index.is_partition_index { + continue; + } + if !to_table.indexes.iter().any(|i| i.name == old_index.name) { + plan.drop.push(old_index); + } + } + + plan } /// Get trigger creation scripts only @@ -1521,11 +1600,46 @@ impl Table { script } + /// Like [`Table::get_foreign_key_alter_script`] but returns the in-place FK + /// modifications as a script and the brand-new foreign keys separately, so + /// the production output path can add the new ones `NOT VALID` and validate + /// them after the transaction commits. Mirrors the branch structure of + /// `get_foreign_key_alter_script` so the two cannot drift apart. + pub fn foreign_key_alter_split<'a>( + &self, + to_table: &'a Table, + ) -> (String, Vec<&'a TableConstraint>) { + let mut alters = String::new(); + let mut new_fks: Vec<&TableConstraint> = Vec::new(); + for new_constraint in &to_table.constraints { + if new_constraint.constraint_type.to_lowercase() != "foreign key" { + continue; + } + if let Some(old_constraint) = self + .constraints + .iter() + .find(|c| c.name == new_constraint.name) + { + if old_constraint != new_constraint { + if let Some(alter_script) = old_constraint.get_alter_script(new_constraint) { + alters.push_str(&alter_script); + } else { + new_fks.push(new_constraint); + } + } + } else { + new_fks.push(new_constraint); + } + } + (alters, new_fks) + } + fn build_alter_script( &self, to_table: &Table, use_drop: bool, include_triggers: bool, + defer_indexes: bool, ) -> String { // If partition key changes (e.g. from LIST to RANGE, or different column), we must recreate the table. // Also if table changes from partitioned to non-partitioned or vice versa. @@ -1914,14 +2028,18 @@ impl Table { script.push_str(&partition_script); script.push_str(&constraint_pre_script); script.push_str(&column_alter_script); - script.push_str(&index_drop_script); + if !defer_indexes { + script.push_str(&index_drop_script); + } if include_triggers { script.push_str(&trigger_drop_script); } script.push_str(&policy_drop_script); script.push_str(&column_drop_script); script.push_str(&constraint_post_script); - script.push_str(&index_script); + if !defer_indexes { + script.push_str(&index_script); + } if include_triggers { script.push_str(&trigger_script); } @@ -2052,12 +2170,24 @@ impl Table { /// Get script for altering the table (including triggers) pub fn get_alter_script(&self, to_table: &Table, use_drop: bool) -> String { - self.build_alter_script(to_table, use_drop, true) + self.build_alter_script(to_table, use_drop, true, false) } /// Get script for altering the table without triggers (for deferred trigger creation) pub fn get_alter_script_without_triggers(&self, to_table: &Table, use_drop: bool) -> String { - self.build_alter_script(to_table, use_drop, false) + self.build_alter_script(to_table, use_drop, false, false) + } + + /// Like [`Table::get_alter_script_without_triggers`] but omits index + /// creation/drop statements. Used by the production output path, which + /// emits index changes separately (concurrently / partition-aware) via + /// [`Table::index_alter_plan`]. + pub fn get_alter_script_without_triggers_no_indexes( + &self, + to_table: &Table, + use_drop: bool, + ) -> String { + self.build_alter_script(to_table, use_drop, false, true) } /// True when comparing `self` (FROM) against `to_table` (TO) would diff --git a/app/src/main.rs b/app/src/main.rs index af27551..06d2089 100644 --- a/app/src/main.rs +++ b/app/src/main.rs @@ -96,6 +96,15 @@ struct Args { #[arg(long, default_value = "16", value_parser = clap::value_parser!(u32).range(1..))] max_connections: u32, + /// Emit a migration script that is convenient to run against a live + /// production database: indexes are built with CREATE INDEX CONCURRENTLY + /// (partition-aware for partitioned tables), foreign keys are added + /// NOT VALID then VALIDATEd, and indexes are dropped with DROP INDEX + /// CONCURRENTLY. The concurrent statements run after the main transaction + /// commits. Default: false (output unchanged). + #[arg(long, default_value_t = false, num_args = 0..=1, default_missing_value = "true", value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)] + output_for_production: bool, + /// Use CASCADE in DROP statements for the clear command. WARNING: CASCADE can drop /// dependent objects outside the selected schema(s) (e.g., foreign keys or views in /// other schemas that reference the dropped objects). Without this flag, drops rely @@ -157,6 +166,7 @@ async fn run_main() -> Result<(), Error> { args.use_single_transaction, args.use_comments, args.grants_mode, + args.output_for_production, ) .await; } @@ -263,6 +273,7 @@ async fn run_by_config(config: String) -> Result<(), Error> { cfg.use_single_transaction, cfg.use_comments, cfg.grants_mode, + cfg.output_for_production, ) .await; @@ -311,6 +322,7 @@ async fn clear_database( Ok(()) } +#[allow(clippy::too_many_arguments)] async fn compare_dumps( from: String, to: String, @@ -319,6 +331,7 @@ async fn compare_dumps( use_single_transaction: bool, use_comments: bool, grants_mode: GrantsMode, + output_for_production: bool, ) -> Result<(), Error> { println!("Reading dumps..."); let from = Dump::read_from_file(&from).await?; @@ -334,6 +347,7 @@ async fn compare_dumps( use_comments, grants_mode, ); + comparer.set_output_for_production(output_for_production); comparer.compare().await?; comparer.save_script(&output).await?; println!("Dump compared successfully. Result script: {output}"); diff --git a/data/pgc.conf b/data/pgc.conf index 4d2f797..0257b3f 100644 --- a/data/pgc.conf +++ b/data/pgc.conf @@ -24,4 +24,13 @@ TO_DUMP=to.dump # OUTPUT OUTPUT=delta.sql -USE_DROP=false \ No newline at end of file +USE_DROP=false + +# Emit a migration script that is convenient to run against a live production +# database: indexes are built with CREATE INDEX CONCURRENTLY (and partition-aware +# ONLY parent + per-partition + ATTACH for partitioned tables), foreign keys are +# added NOT VALID and validated afterwards, and indexes are dropped with +# DROP INDEX CONCURRENTLY. The concurrent statements are emitted after the main +# transaction commits (so USE_SINGLE_TRANSACTION still wraps the rest). +# Default: false (output is identical to previous behaviour). +OUTPUT_FOR_PRODUCTION=false \ No newline at end of file