From 7909e55667b96576b6d620cbf5c6e7ba7e1350fc Mon Sep 17 00:00:00 2001 From: suryaiyer95 Date: Thu, 26 Mar 2026 17:39:50 -0700 Subject: [PATCH 1/4] feat: add data-parity cross-database table comparison - Add DataParity engine integration via native Rust bindings - Add data-diff tool for LLM agent (profile, joindiff, hashdiff, cascade, auto) - Add ClickHouse driver support - Add data-parity skill: profile-first workflow, algorithm selection guide, CRITICAL warning that joindiff cannot run cross-database (always returns 0 diffs), output style rules (facts only, no editorializing) - Gitignore .altimate-code/ (credentials) and *.node (platform binaries) --- .gitignore | 6 + .opencode/skills/data-parity/SKILL.md | 290 ++++++++++++++++++ packages/drivers/src/clickhouse.ts | 120 ++++++++ packages/drivers/src/index.ts | 1 + .../altimate/native/connections/data-diff.ts | 268 ++++++++++++++++ .../altimate/native/connections/register.ts | 8 + .../altimate/native/connections/registry.ts | 4 + .../opencode/src/altimate/native/types.ts | 34 ++ .../opencode/src/altimate/tools/data-diff.ts | 174 +++++++++++ packages/opencode/src/tool/registry.ts | 2 + 10 files changed, 907 insertions(+) create mode 100644 .opencode/skills/data-parity/SKILL.md create mode 100644 packages/drivers/src/clickhouse.ts create mode 100644 packages/opencode/src/altimate/native/connections/data-diff.ts create mode 100644 packages/opencode/src/altimate/tools/data-diff.ts diff --git a/.gitignore b/.gitignore index b10c1bb043..4dfe62f9ee 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,12 @@ target # Commit message scratch files .github/meta/ +# Local connections config (may contain credentials) +.altimate-code/ + +# Pre-built native binaries (platform-specific, not for source control) +packages/opencode/*.node + # Local dev files opencode-dev logs/ diff --git a/.opencode/skills/data-parity/SKILL.md b/.opencode/skills/data-parity/SKILL.md new file mode 100644 index 0000000000..4d7b7460c9 --- /dev/null +++ 
b/.opencode/skills/data-parity/SKILL.md @@ -0,0 +1,290 @@ +--- +name: data-parity +description: Validate that two tables or query results are identical — or diagnose exactly how they differ. Discover schema, identify keys, profile cheaply, then diff. Use for migration validation, ETL regression, and query refactor verification. +--- + +# Data Parity (Table Diff) + +## Output Style + +**Report facts only. No editorializing.** +- Show counts, changed values, missing rows, new rows — that's it. +- Do NOT explain why row-level diffing is valuable, why COUNT(*) is insufficient, or pitch the tool. +- Do NOT add "the dangerous one", "this is exactly why", "this matters" style commentary. +- The user asked for a diff result, not a lecture. + +## Requirements +**Agent:** any +**Tools used:** `sql_query` (for schema discovery), `data_diff` + +## When to Use This Skill + +**Use when the user wants to:** +- Confirm two tables contain the same data after a migration +- Find rows added, deleted, or modified between source and target +- Validate that a dbt model produces the same output as the old query +- Run regression checks after a pipeline change + +**Do NOT use for:** +- Schema comparison (column names, types) — check DDL instead +- Performance benchmarking — this runs SELECT queries + +--- + +## The `data_diff` Tool + +`data_diff` takes table names and key columns. It generates SQL, routes it through the specified warehouse connections, and reports differences. It **does not discover schema** — you must provide key columns and relevant comparison columns. 
+ +**Key parameters:** +- `source` — table name (`orders`, `db.schema.orders`) or full SELECT/WITH query +- `target` — table name or SELECT query +- `key_columns` — primary key(s) uniquely identifying each row (required) +- `source_warehouse` — connection name for source +- `target_warehouse` — connection name for target (omit = same as source) +- `extra_columns` — columns to compare beyond keys (omit = compare all) +- `algorithm` — `auto`, `joindiff`, `hashdiff`, `profile`, `cascade` +- `where_clause` — filter applied to both tables + +> **CRITICAL — Algorithm choice:** +> - If `source_warehouse` ≠ `target_warehouse` → **always use `hashdiff`** (or `auto`). +> - `joindiff` runs a single SQL JOIN on ONE connection — it physically cannot see the other table. +> Using `joindiff` across different servers always reports 0 differences (both sides look identical). +> - When in doubt, use `algorithm="auto"` — it picks `joindiff` for same-warehouse and `hashdiff` for cross-warehouse automatically. 
+ +--- + +## Workflow + +The key principle: **the LLM does the identification work using SQL tools first, then calls data_diff with informed parameters.** + +### Step 1: Inspect the tables + +Before calling `data_diff`, use `sql_query` to understand what you're comparing: + +```sql +-- Get columns and types +SELECT column_name, data_type, is_nullable +FROM information_schema.columns +WHERE table_schema = 'public' AND table_name = 'orders' +ORDER BY ordinal_position +``` + +For ClickHouse: +```sql +DESCRIBE TABLE source_db.events +``` + +For Snowflake: +```sql +SHOW COLUMNS IN TABLE orders +``` + +**Look for:** +- Columns that look like primary keys (named `id`, `*_id`, `*_key`, `uuid`) +- Columns with `NOT NULL` constraints +- Whether there are composite keys + +### Step 2: Identify the key columns + +If the primary key isn't obvious from the schema, run a cardinality check: + +```sql +SELECT + COUNT(*) AS total_rows, + COUNT(DISTINCT order_id) AS distinct_order_id, + COUNT(DISTINCT customer_id) AS distinct_customer_id, + COUNT(DISTINCT created_at) AS distinct_created_at +FROM orders +``` + +**A good key column:** `distinct_count = total_rows` (fully unique) and `null_count = 0`. + +If no single column is unique, find a composite key: +```sql +SELECT order_id, line_item_id, COUNT(*) as cnt +FROM order_lines +GROUP BY order_id, line_item_id +HAVING COUNT(*) > 1 +LIMIT 5 +``` +If this returns 0 rows, `(order_id, line_item_id)` is a valid composite key. 
+ +### Step 3: Estimate table size + +```sql +SELECT COUNT(*) FROM orders +``` + +Use this to choose the algorithm: +- **< 1M rows**: `joindiff` (same DB) or `hashdiff` (cross-DB) — either is fine +- **1M–100M rows**: `hashdiff` or `cascade` +- **> 100M rows**: `hashdiff` with a `where_clause` date filter to validate a recent window first + +### Step 4: Profile first for unknown tables + +If you don't know what to expect (first-time validation, unfamiliar pipeline), start cheap: + +``` +data_diff( + source="orders", + target="orders_migrated", + key_columns=["order_id"], + source_warehouse="postgres_prod", + target_warehouse="snowflake_dw", + algorithm="profile" +) +``` + +Profile output tells you: +- Row count on each side (mismatch = load completeness problem) +- Which columns have null count differences (mismatch = NULL handling bug) +- Min/max divergence per column (mismatch = value transformation bug) +- Which columns have matching stats (usually safe to skip in row-level diff; matching stats do not guarantee identical rows) + +**Interpret profile to narrow the diff:** +``` +Column Profile Comparison + + ✓ order_id: match + ✓ customer_id: match + ✗ amount: DIFFER ← source min=10.00, target min=10.01 — rounding issue? + ✗ status: DIFFER ← source nulls=0, target nulls=47 — NULL mapping bug? + ✓ created_at: match +``` +→ Only diff `amount` and `status` in the next step. + +### Step 5: Run targeted row-level diff + +``` +data_diff( + source="orders", + target="orders_migrated", + key_columns=["order_id"], + extra_columns=["amount", "status"], // only the columns profile said differ + source_warehouse="postgres_prod", + target_warehouse="snowflake_dw", + algorithm="hashdiff" +) +``` + +--- + +## Algorithm Selection + +| Algorithm | When to use | +|-----------|-------------| +| `profile` | First pass — column stats (count, min, max, nulls). No row-level comparison. | +| `joindiff` | Same database — single FULL OUTER JOIN query. Fast. | +| `hashdiff` | Cross-database, or large tables — bisection with checksums. Scales.
| +| `cascade` | Auto-escalate: profile → hashdiff on diverging columns. | +| `auto` | JoinDiff if same warehouse, HashDiff if cross-database. | + +**JoinDiff constraint:** Both tables must be on the **same database connection**. If source and target are on different servers, JoinDiff will always report 0 diffs (it only sees one side). Use `hashdiff` or `auto` for cross-database. + +--- + +## Output Interpretation + +### IDENTICAL +``` +✓ Tables are IDENTICAL + Rows checked: 1,000,000 +``` +→ Migration validated. Data is identical. + +### DIFFER — Diagnose by pattern + +``` +✗ Tables DIFFER + + Only in source: 2 → rows deleted in target (ETL missed deletes) + Only in target: 2 → rows added to target (dedup issue or new data) + Updated rows: 3 → values changed (transform bug, type casting, rounding) + Identical rows: 15 +``` + +| Pattern | Root cause hypothesis | +|---------|----------------------| +| `only_in_source > 0`, `only_in_target = 0` | ETL dropped rows — check filters, incremental logic | +| `only_in_source = 0`, `only_in_target > 0` | Target has extra rows — check dedup or wrong join | +| `updated_rows > 0`, row counts match | Silent value corruption — check transforms, type casts | +| Row count differs | Load completeness issue — check ETL watermarks | + +Sample diffs point to the specific key + column + old→new value: +``` +key={"order_id":"4"} col=amount: 300.00 → 305.00 +``` +Use this to query the source systems directly and trace the discrepancy. + +--- + +## Usage Examples + +### Full workflow: unknown migration +``` +// 1. Discover schema +sql_query("SELECT column_name, data_type FROM information_schema.columns WHERE table_name='orders'", warehouse="postgres_prod") + +// 2. Check row count +sql_query("SELECT COUNT(*), COUNT(DISTINCT order_id) FROM orders", warehouse="postgres_prod") + +// 3. 
Profile to find which columns differ +data_diff(source="orders", target="orders", key_columns=["order_id"], + source_warehouse="postgres_prod", target_warehouse="snowflake_dw", algorithm="profile") + +// 4. Row-level diff on diverging columns only +data_diff(source="orders", target="orders", key_columns=["order_id"], + extra_columns=["amount", "status"], + source_warehouse="postgres_prod", target_warehouse="snowflake_dw", algorithm="hashdiff") +``` + +### Same-database query refactor +``` +data_diff( + source="SELECT id, amount, status FROM orders WHERE region = 'us-east'", + target="SELECT id, amount, status FROM orders_v2 WHERE region = 'us-east'", + key_columns=["id"] +) +``` + +### Large table — filter to recent window first +``` +data_diff( + source="fact_events", + target="fact_events_v2", + key_columns=["event_id"], + where_clause="event_date >= '2024-01-01'", + algorithm="hashdiff" +) +``` + +### ClickHouse — always qualify with database.table +``` +data_diff( + source="source_db.events", + target="target_db.events", + key_columns=["event_id"], + source_warehouse="clickhouse_source", + target_warehouse="clickhouse_target", + algorithm="hashdiff" +) +``` + +--- + +## Common Mistakes + +**Calling data_diff without knowing the key** +→ Run `sql_query` to check cardinality first. A bad key gives meaningless results. + +**Using joindiff for cross-database tables** +→ JoinDiff runs one SQL query on one connection. It can't see the other table. Use `hashdiff` or `auto`. + +**Diffing a 1B row table without a date filter** +→ Add `where_clause` to scope to recent data. Validate a window first, then expand. + +**Ignoring profile output and jumping to full diff** +→ Profile is cheap. It tells you which columns actually differ so you can avoid scanning all columns across all rows. + +**Forgetting to check row counts before diffing** +→ If source has 1M rows and target has 900K, row-level diff is misleading. Fix the load completeness issue first.
diff --git a/packages/drivers/src/clickhouse.ts b/packages/drivers/src/clickhouse.ts new file mode 100644 index 0000000000..7da7794513 --- /dev/null +++ b/packages/drivers/src/clickhouse.ts @@ -0,0 +1,120 @@ +/** + * ClickHouse driver using the HTTP API (no external package required). + * + * Uses ClickHouse's native HTTP interface on port 8123 with JSONCompactEachRow + * format for efficient row streaming. + */ + +import type { ConnectionConfig, Connector, ConnectorResult, SchemaColumn } from "./types" + +export async function connect(config: ConnectionConfig): Promise { + const host = (config.host as string) ?? "127.0.0.1" + const port = (config.port as number) ?? 8123 + const database = (config.database as string) ?? "default" + const user = (config.user as string) ?? "default" + const password = (config.password as string) ?? "" + + const baseUrl = `http://${host}:${port}/` + + async function httpQuery(sql: string): Promise<{ columns: string[]; rows: unknown[][] }> { + // Use JSONCompactEachRowWithNamesAndTypes to get column names + typed rows + const params = new URLSearchParams({ + database, + default_format: "JSONCompactEachRowWithNamesAndTypes", + output_format_json_quote_64bit_integers: "0", + }) + + const headers: Record = { + "Content-Type": "text/plain", + "X-ClickHouse-Database": database, + } + + if (user !== "default" || password) { + // Basic auth via headers + headers["X-ClickHouse-User"] = user + if (password) headers["X-ClickHouse-Key"] = password + } + + const response = await fetch(`${baseUrl}?${params}`, { + method: "POST", + headers, + body: sql, + }) + + if (!response.ok) { + const body = await response.text() + throw new Error(`ClickHouse HTTP ${response.status}: ${body.slice(0, 500)}`) + } + + const text = await response.text() + const lines = text.trim().split("\n").filter(Boolean) + + if (lines.length < 2) { + // No rows (DDL or empty result) + return { columns: [], rows: [] } + } + + // First line: column names, Second line: column 
types, Rest: data rows + const columns: string[] = JSON.parse(lines[0]) + // lines[1] = types (ignored for now) + const rows: unknown[][] = lines.slice(2).map((line) => JSON.parse(line)) + + return { columns, rows } + } + + const connector: Connector = { + async connect() { + // Verify connection with a lightweight query + await httpQuery("SELECT 1") + }, + + async execute(sql: string, limit?: number): Promise { + const effectiveLimit = limit ?? 1000 + const isSelectLike = /^\s*(SELECT|WITH)\b/i.test(sql) + + let query = sql + if (isSelectLike && effectiveLimit && !/\bLIMIT\b/i.test(sql)) { + query = `${sql.replace(/;\s*$/, "")} LIMIT ${effectiveLimit + 1}` + } + + const { columns, rows } = await httpQuery(query) + + const truncated = isSelectLike && rows.length > effectiveLimit + const finalRows = truncated ? rows.slice(0, effectiveLimit) : rows + + return { + columns, + rows: finalRows as unknown[][], + row_count: finalRows.length, + truncated, + } + }, + + async getSchema(database_?: string): Promise { + const db = database_ ?? 
database + const { rows } = await httpQuery(` + SELECT + table_name, + column_name, + data_type, + is_nullable + FROM information_schema.columns + WHERE table_schema = '${db}' + ORDER BY table_name, ordinal_position + `) + + return rows.map((row) => ({ + table_name: String(row[0]), + name: String(row[1]), + data_type: String(row[2]), + nullable: row[3] === "YES" || row[3] === "1", + })) + }, + + async close() { + // HTTP client — no persistent connection to close + }, + } + + return connector +} diff --git a/packages/drivers/src/index.ts b/packages/drivers/src/index.ts index 73a8d7c2c1..8102d6e275 100644 --- a/packages/drivers/src/index.ts +++ b/packages/drivers/src/index.ts @@ -16,3 +16,4 @@ export { connect as connectOracle } from "./oracle" export { connect as connectDuckdb } from "./duckdb" export { connect as connectSqlite } from "./sqlite" export { connect as connectMongodb } from "./mongodb" +export { connect as connectClickhouse } from "./clickhouse" diff --git a/packages/opencode/src/altimate/native/connections/data-diff.ts b/packages/opencode/src/altimate/native/connections/data-diff.ts new file mode 100644 index 0000000000..035df6b4ca --- /dev/null +++ b/packages/opencode/src/altimate/native/connections/data-diff.ts @@ -0,0 +1,268 @@ +/** + * DataParity orchestrator — runs the cooperative Rust state machine against + * live database connections. + * + * The Rust engine (DataParitySession) never touches databases — it emits SQL + * for us to execute, we feed results back, and it decides the next step. + * This file is the bridge between that engine and altimate-code's drivers. 
+ */ + +import type { DataDiffParams, DataDiffResult } from "../types" +import * as Registry from "./registry" + +// --------------------------------------------------------------------------- +// Query-source detection +// --------------------------------------------------------------------------- + +const SQL_KEYWORDS = /^\s*(SELECT|WITH|VALUES)\b/i + +/** + * Detect whether a string is an arbitrary SQL query (vs a plain table name). + * Plain table names may contain dots (schema.table, db.schema.table) but not spaces. + */ +function isQuery(input: string): boolean { + return SQL_KEYWORDS.test(input) +} + +/** + * If either source or target is an arbitrary query, wrap them in CTEs so the + * DataParity engine can treat them as tables named `__diff_source` / `__diff_target`. + * + * Returns `{ table1Name, table2Name, ctePrefix | null }`. + * + * When a CTE prefix is returned, it must be prepended to every SQL task emitted + * by the engine before execution. + */ +export function resolveTableSources( + source: string, + target: string, +): { table1Name: string; table2Name: string; ctePrefix: string | null } { + const source_is_query = isQuery(source) + const target_is_query = isQuery(target) + + if (!source_is_query && !target_is_query) { + // Both are plain table names — pass through unchanged + return { table1Name: source, table2Name: target, ctePrefix: null } + } + + // At least one is a query — wrap both in CTEs + const srcExpr = source_is_query ? source : `SELECT * FROM ${source}` + const tgtExpr = target_is_query ? target : `SELECT * FROM ${target}` + + const ctePrefix = `WITH __diff_source AS (\n${srcExpr}\n), __diff_target AS (\n${tgtExpr}\n)` + return { + table1Name: "__diff_source", + table2Name: "__diff_target", + ctePrefix, + } +} + +/** + * Inject a CTE prefix into a SQL statement from the engine. + * + * The engine emits standalone SELECT statements. We need to prepend our CTE + * definitions so `__diff_source`/`__diff_target` resolve correctly. 
+ * + * Handles the case where the engine itself emits CTEs (starts with WITH …): + * WITH engine_cte AS (…) SELECT … FROM __diff_source + * becomes: + * WITH __diff_source AS (…), __diff_target AS (…), engine_cte AS (…) SELECT … + */ +export function injectCte(sql: string, ctePrefix: string): string { + const trimmed = sql.trimStart() + const withMatch = trimmed.match(/^WITH\s+/i) + + if (withMatch) { + // Engine also has CTEs — merge them: our CTEs first, then engine CTEs + const afterWith = trimmed.slice(withMatch[0].length) + // ctePrefix already starts with "WITH …" — strip "WITH " and append ", " + const ourDefs = ctePrefix.replace(/^WITH\s+/i, "") + return `WITH ${ourDefs},\n${afterWith}` + } + + // Plain SELECT — just prepend our CTE block + return `${ctePrefix}\n${trimmed}` +} + +// --------------------------------------------------------------------------- +// Executor +// --------------------------------------------------------------------------- + +type Rows = (string | null)[][] + +/** + * Execute a SQL statement against a named warehouse and return rows as string[][]. + */ +async function executeQuery(sql: string, warehouseName: string | undefined): Promise { + let connector + if (warehouseName) { + connector = await Registry.get(warehouseName) + } else { + const warehouses = Registry.list().warehouses + if (warehouses.length === 0) { + throw new Error("No default warehouse configured.") + } + connector = await Registry.get(warehouses[0].name) + } + + const result = await connector.execute(sql) + + // Normalise to string[][] — drivers return mixed types + return result.rows.map((row: unknown[]) => + row.map((v) => (v === null || v === undefined ? 
null : String(v))), + ) +} + +// --------------------------------------------------------------------------- +// Main orchestrator +// --------------------------------------------------------------------------- + +const MAX_STEPS = 200 + +export async function runDataDiff(params: DataDiffParams): Promise { + // Dynamically import NAPI module (not available in test environments without the binary) + let DataParitySession: new (specJson: string) => { + start(): string + step(responsesJson: string): string + } + + try { + const core = await import("@altimateai/altimate-core") + DataParitySession = (core as any).DataParitySession + if (!DataParitySession) throw new Error("DataParitySession not exported from @altimateai/altimate-core") + } catch (e) { + return { + success: false, + error: `altimate-core NAPI module unavailable: ${e}`, + steps: 0, + } + } + + // Resolve sources (plain table names vs arbitrary queries) + const { table1Name, table2Name, ctePrefix } = resolveTableSources( + params.source, + params.target, + ) + + // Parse optional qualified names: "db.schema.table" → { database, schema, table } + const parseQualified = (name: string) => { + const parts = name.split(".") + if (parts.length === 3) return { database: parts[0], schema: parts[1], table: parts[2] } + if (parts.length === 2) return { schema: parts[0], table: parts[1] } + return { table: name } + } + + const table1Ref = parseQualified(table1Name) + const table2Ref = parseQualified(table2Name) + + // Resolve dialect from warehouse config + const resolveDialect = (warehouse: string | undefined): string => { + if (warehouse) { + const cfg = Registry.getConfig(warehouse) + return cfg?.type ?? "generic" + } + const warehouses = Registry.list().warehouses + return warehouses[0]?.type ?? "generic" + } + + const dialect1 = resolveDialect(params.source_warehouse) + const dialect2 = resolveDialect(params.target_warehouse ?? 
params.source_warehouse) + + // Build session spec + const spec = { + table1: table1Ref, + table2: table2Ref, + dialect1, + dialect2, + config: { + algorithm: params.algorithm ?? "auto", + key_columns: params.key_columns, + extra_columns: params.extra_columns ?? [], + ...(params.where_clause ? { where_clause: params.where_clause } : {}), + ...(params.numeric_tolerance != null ? { numeric_tolerance: params.numeric_tolerance } : {}), + ...(params.timestamp_tolerance_ms != null + ? { timestamp_tolerance_ms: params.timestamp_tolerance_ms } + : {}), + }, + } + + // Create session + let session: InstanceType + try { + session = new DataParitySession(JSON.stringify(spec)) + } catch (e) { + return { + success: false, + error: `Failed to create DataParitySession: ${e}`, + steps: 0, + } + } + + // Route SQL tasks to the correct warehouse + const warehouseFor = (tableSide: string): string | undefined => + tableSide === "Table2" ? (params.target_warehouse ?? params.source_warehouse) : params.source_warehouse + + // Cooperative loop + let actionJson = session.start() + let stepCount = 0 + + while (stepCount < MAX_STEPS) { + const action = JSON.parse(actionJson) as { + type: string + tasks?: Array<{ id: string; table_side: string; sql: string; expected_shape: string }> + outcome?: unknown + message?: string + } + + if (action.type === "Done") { + return { + success: true, + steps: stepCount, + outcome: action.outcome, + } + } + + if (action.type === "Error") { + return { + success: false, + error: action.message ?? "Unknown engine error", + steps: stepCount, + } + } + + if (action.type !== "ExecuteSql") { + return { + success: false, + error: `Unexpected action type: ${action.type}`, + steps: stepCount, + } + } + + stepCount++ + + // Execute all SQL tasks in parallel + const tasks = action.tasks ?? 
[] + const responses = await Promise.all( + tasks.map(async (task) => { + const warehouse = warehouseFor(task.table_side) + // Inject CTE definitions if we're in query-comparison mode + const sql = ctePrefix ? injectCte(task.sql, ctePrefix) : task.sql + try { + const rows = await executeQuery(sql, warehouse) + return { id: task.id, rows } + } catch (e) { + // Return error shape — engine will produce an Error action on next step + return { id: task.id, rows: [], error: String(e) } + } + }), + ) + + actionJson = session.step(JSON.stringify(responses)) + } + + return { + success: false, + error: `Exceeded maximum step limit (${MAX_STEPS}). The diff may require more iterations for this table size.`, + steps: stepCount, + } +} diff --git a/packages/opencode/src/altimate/native/connections/register.ts b/packages/opencode/src/altimate/native/connections/register.ts index 7267a142d0..b93e1e1570 100644 --- a/packages/opencode/src/altimate/native/connections/register.ts +++ b/packages/opencode/src/altimate/native/connections/register.ts @@ -10,6 +10,7 @@ import { register } from "../dispatcher" import * as Registry from "./registry" import { discoverContainers } from "./docker-discovery" import { parseDbtProfiles } from "./dbt-profiles" +import { runDataDiff } from "./data-diff" import type { SqlExecuteParams, SqlExecuteResult, @@ -29,6 +30,8 @@ import type { SchemaInspectResult, DbtProfilesParams, DbtProfilesResult, + DataDiffParams, + DataDiffResult, } from "../types" import type { ConnectionConfig } from "@altimateai/drivers" import { Telemetry } from "../../../telemetry" @@ -425,6 +428,11 @@ register("dbt.profiles", async (params: DbtProfilesParams): Promise => { + return runDataDiff(params) +}) + } // end registerAll // Auto-register on module load diff --git a/packages/opencode/src/altimate/native/connections/registry.ts b/packages/opencode/src/altimate/native/connections/registry.ts index 5aaafdd640..2fa7e24237 100644 --- 
a/packages/opencode/src/altimate/native/connections/registry.ts +++ b/packages/opencode/src/altimate/native/connections/registry.ts @@ -128,6 +128,7 @@ const DRIVER_MAP: Record = { sqlite: "@altimateai/drivers/sqlite", mongodb: "@altimateai/drivers/mongodb", mongo: "@altimateai/drivers/mongodb", + clickhouse: "@altimateai/drivers/clickhouse", } async function createConnector(name: string, config: ConnectionConfig): Promise { @@ -193,6 +194,9 @@ async function createConnector(name: string, config: ConnectionConfig): Promise< case "@altimateai/drivers/mongodb": mod = await import("@altimateai/drivers/mongodb") break + case "@altimateai/drivers/clickhouse": + mod = await import("@altimateai/drivers/clickhouse") + break default: throw new Error(`No static import available for driver: ${driverPath}`) } diff --git a/packages/opencode/src/altimate/native/types.ts b/packages/opencode/src/altimate/native/types.ts index 8d0f3978fc..0ecefdd2d8 100644 --- a/packages/opencode/src/altimate/native/types.ts +++ b/packages/opencode/src/altimate/native/types.ts @@ -955,6 +955,38 @@ export interface LocalTestResult { error?: string } +// --- Data Diff --- + +export interface DataDiffParams { + /** Source table name (e.g. 
"orders", "db.schema.orders") or full SQL query */ + source: string + /** Target table name or SQL query */ + target: string + /** Primary key columns that uniquely identify each row */ + key_columns: string[] + /** Source warehouse connection name */ + source_warehouse?: string + /** Target warehouse connection name (defaults to source_warehouse) */ + target_warehouse?: string + /** Extra columns to compare beyond the key */ + extra_columns?: string[] + /** Algorithm: "auto" | "joindiff" | "hashdiff" | "profile" | "cascade" */ + algorithm?: string + /** Optional WHERE filter applied to both tables */ + where_clause?: string + /** Absolute numeric tolerance */ + numeric_tolerance?: number + /** Timestamp tolerance in milliseconds */ + timestamp_tolerance_ms?: number +} + +export interface DataDiffResult { + success: boolean + steps: number + outcome?: unknown + error?: string +} + // --- Method registry --- export const BridgeMethods = { @@ -998,6 +1030,8 @@ export const BridgeMethods = { // --- local testing --- "local.schema_sync": {} as { params: LocalSchemaSyncParams; result: LocalSchemaSyncResult }, "local.test": {} as { params: LocalTestParams; result: LocalTestResult }, + // --- data diff --- + "data.diff": {} as { params: DataDiffParams; result: DataDiffResult }, // --- altimate-core (existing) --- "altimate_core.validate": {} as { params: AltimateCoreValidateParams; result: AltimateCoreResult }, "altimate_core.lint": {} as { params: AltimateCoreLintParams; result: AltimateCoreResult }, diff --git a/packages/opencode/src/altimate/tools/data-diff.ts b/packages/opencode/src/altimate/tools/data-diff.ts new file mode 100644 index 0000000000..0719361dbe --- /dev/null +++ b/packages/opencode/src/altimate/tools/data-diff.ts @@ -0,0 +1,174 @@ +import z from "zod" +import { Tool } from "../../tool/tool" +import { Dispatcher } from "../native" + +export const DataDiffTool = Tool.define("data_diff", { + description: [ + "Compare two database tables or query results 
row-by-row to find differences.", + "", + "Two use cases:", + "1. Migration validation — compare the same table across two databases:", + ' source="orders" source_warehouse="postgres_prod" target_warehouse="snowflake_dw"', + "2. Query optimization — compare results of two SQL queries on the same database:", + ' source="SELECT id, amount FROM orders WHERE ..." target="SELECT id, amount FROM orders_v2 WHERE ..."', + "", + "Algorithms:", + "- auto: JoinDiff if same dialect, HashDiff if cross-database (default)", + "- joindiff: FULL OUTER JOIN (fast, same-database only)", + "- hashdiff: Bisection with checksums (cross-database, any scale)", + "- profile: Column-level statistics comparison", + ].join("\n"), + parameters: z.object({ + source: z.string().describe( + "Source table name (e.g. 'orders', 'db.schema.orders') or a full SQL query starting with SELECT/WITH", + ), + target: z.string().describe( + "Target table name or SQL query to compare against source", + ), + key_columns: z + .array(z.string()) + .describe("Primary key columns that uniquely identify each row (e.g. ['id'] or ['order_id', 'line_item'])"), + source_warehouse: z.string().optional().describe("Source warehouse connection name"), + target_warehouse: z.string().optional().describe( + "Target warehouse connection name. Omit to use the same warehouse as source (query comparison mode)", + ), + extra_columns: z + .array(z.string()) + .optional() + .describe("Additional columns to compare beyond the key columns. Omit to compare all columns"), + algorithm: z + .enum(["auto", "joindiff", "hashdiff", "profile", "cascade"]) + .optional() + .default("auto") + .describe("Comparison algorithm"), + where_clause: z.string().optional().describe("Optional WHERE filter applied to both tables"), + numeric_tolerance: z + .number() + .optional() + .describe("Absolute tolerance for numeric comparisons (e.g. 
0.01 for cent-level tolerance)"), + timestamp_tolerance_ms: z + .number() + .optional() + .describe("Tolerance for timestamp comparisons in milliseconds"), + }), + async execute(args, ctx) { + // Require read permission — data diff executes SELECT queries + await ctx.ask({ + permission: "sql_execute_read", + patterns: [args.source.slice(0, 120), args.target.slice(0, 120)], + always: ["*"], + metadata: {}, + }) + + try { + const result = await Dispatcher.call("data.diff", { + source: args.source, + target: args.target, + key_columns: args.key_columns, + source_warehouse: args.source_warehouse, + target_warehouse: args.target_warehouse, + extra_columns: args.extra_columns, + algorithm: args.algorithm, + where_clause: args.where_clause, + numeric_tolerance: args.numeric_tolerance, + timestamp_tolerance_ms: args.timestamp_tolerance_ms, + }) + + if (!result.success) { + return { + title: "Data diff: ERROR", + metadata: { success: false, steps: result.steps }, + output: `Data diff failed: ${result.error}`, + } + } + + const outcome = result.outcome as any + const output = formatOutcome(outcome, args.source, args.target) + + return { + title: `Data diff: ${summarize(outcome)}`, + metadata: { success: true, steps: result.steps }, + output, + } + } catch (e) { + const msg = e instanceof Error ? e.message : String(e) + return { + title: "Data diff: ERROR", + metadata: { success: false, steps: 0, error: msg }, + output: `Data diff failed: ${msg}`, + } + } + }, +}) + +function summarize(outcome: any): string { + if (!outcome) return "complete" + if (outcome.Match) return "IDENTICAL ✓" + if (outcome.Diff) { + const r = outcome.Diff + const parts: string[] = [] + if (r.rows_only_in_source > 0) parts.push(`${r.rows_only_in_source} only in source`) + if (r.rows_only_in_target > 0) parts.push(`${r.rows_only_in_target} only in target`) + if (r.rows_updated > 0) parts.push(`${r.rows_updated} updated`) + return parts.length ? 
parts.join(", ") : "differences found" + } + if (outcome.Profile) return "profile complete" + return "complete" +} + +function formatOutcome(outcome: any, source: string, target: string): string { + if (!outcome) return "Comparison complete." + + const lines: string[] = [] + + if (outcome.Match) { + lines.push(`✓ Tables are IDENTICAL`) + const m = outcome.Match + if (m.row_count != null) lines.push(` Rows checked: ${m.row_count.toLocaleString()}`) + if (m.algorithm) lines.push(` Algorithm: ${m.algorithm}`) + return lines.join("\n") + } + + if (outcome.Diff) { + const r = outcome.Diff + lines.push(`✗ Tables DIFFER`) + lines.push(``) + lines.push(` Source: ${source}`) + lines.push(` Target: ${target}`) + lines.push(``) + + if (r.total_source_rows != null) lines.push(` Source rows: ${r.total_source_rows.toLocaleString()}`) + if (r.total_target_rows != null) lines.push(` Target rows: ${r.total_target_rows.toLocaleString()}`) + if (r.rows_only_in_source > 0) lines.push(` Only in source: ${r.rows_only_in_source.toLocaleString()}`) + if (r.rows_only_in_target > 0) lines.push(` Only in target: ${r.rows_only_in_target.toLocaleString()}`) + if (r.rows_updated > 0) lines.push(` Updated rows: ${r.rows_updated.toLocaleString()}`) + if (r.rows_identical > 0) lines.push(` Identical rows: ${r.rows_identical.toLocaleString()}`) + + if (r.sample_diffs?.length) { + lines.push(``) + lines.push(` Sample differences (first ${r.sample_diffs.length}):`) + for (const d of r.sample_diffs.slice(0, 5)) { + lines.push(` key=${JSON.stringify(d.key)} col=${d.column}: ${d.source_value} → ${d.target_value}`) + } + } + + return lines.join("\n") + } + + if (outcome.Profile) { + const p = outcome.Profile + lines.push(`Column Profile Comparison`) + lines.push(``) + for (const col of p.columns ?? []) { + const verdict = col.verdict === "match" ? "✓" : col.verdict === "within_tolerance" ? 
"~" : "✗" + lines.push(` ${verdict} ${col.column}: ${col.verdict}`) + if (col.source_stats && col.target_stats) { + lines.push(` source: count=${col.source_stats.count} nulls=${col.source_stats.null_count} min=${col.source_stats.min} max=${col.source_stats.max}`) + lines.push(` target: count=${col.target_stats.count} nulls=${col.target_stats.null_count} min=${col.target_stats.min} max=${col.target_stats.max}`) + } + } + return lines.join("\n") + } + + return JSON.stringify(outcome, null, 2) +} diff --git a/packages/opencode/src/tool/registry.ts b/packages/opencode/src/tool/registry.ts index 075291248f..e5fc1bf9c9 100644 --- a/packages/opencode/src/tool/registry.ts +++ b/packages/opencode/src/tool/registry.ts @@ -57,6 +57,7 @@ import { SqlFormatTool } from "../altimate/tools/sql-format" import { SqlFixTool } from "../altimate/tools/sql-fix" import { SqlAutocompleteTool } from "../altimate/tools/sql-autocomplete" import { SqlDiffTool } from "../altimate/tools/sql-diff" +import { DataDiffTool } from "../altimate/tools/data-diff" import { FinopsQueryHistoryTool } from "../altimate/tools/finops-query-history" import { FinopsAnalyzeCreditsTool } from "../altimate/tools/finops-analyze-credits" import { FinopsExpensiveQueriesTool } from "../altimate/tools/finops-expensive-queries" @@ -233,6 +234,7 @@ export namespace ToolRegistry { SqlFixTool, SqlAutocompleteTool, SqlDiffTool, + DataDiffTool, FinopsQueryHistoryTool, FinopsAnalyzeCreditsTool, FinopsExpensiveQueriesTool, From 4bc671607ba6099128cb36be3a01852a809bcc49 Mon Sep 17 00:00:00 2001 From: suryaiyer95 Date: Thu, 26 Mar 2026 18:21:06 -0700 Subject: [PATCH 2/4] feat: add partition support to data_diff Split large tables by a date or numeric column before diffing. Each partition is diffed independently then results are aggregated. 
New params: - partition_column: column to split on (date or numeric) - partition_granularity: day | week | month | year (for dates) - partition_bucket_size: bucket width for numeric columns New output field: - partition_results: per-partition breakdown (identical / differ / error) Dialect-aware SQL: Postgres, Snowflake, BigQuery, ClickHouse, MySQL. Skill updated with partition guidance and examples. --- .opencode/skills/data-parity/SKILL.md | 30 ++- .../altimate/native/connections/data-diff.ts | 233 +++++++++++++++++- .../opencode/src/altimate/native/types.ts | 35 +++ .../opencode/src/altimate/tools/data-diff.ts | 54 +++- 4 files changed, 348 insertions(+), 4 deletions(-) diff --git a/.opencode/skills/data-parity/SKILL.md b/.opencode/skills/data-parity/SKILL.md index 4d7b7460c9..3f739eda4b 100644 --- a/.opencode/skills/data-parity/SKILL.md +++ b/.opencode/skills/data-parity/SKILL.md @@ -44,6 +44,9 @@ description: Validate that two tables or query results are identical — or diag - `extra_columns` — columns to compare beyond keys (omit = compare all) - `algorithm` — `auto`, `joindiff`, `hashdiff`, `profile`, `cascade` - `where_clause` — filter applied to both tables +- `partition_column` — split the table by this column and diff each group independently (recommended for large tables) +- `partition_granularity` — `day` | `week` | `month` | `year` for date columns (default: `month`) +- `partition_bucket_size` — for numeric columns: bucket width (e.g. `100000` splits by ranges of 100K) > **CRITICAL — Algorithm choice:** > - If `source_warehouse` ≠ `target_warehouse` → **always use `hashdiff`** (or `auto`). 
@@ -117,8 +120,31 @@ SELECT COUNT(*) FROM orders Use this to choose the algorithm: - **< 1M rows**: `joindiff` (same DB) or `hashdiff` (cross-DB) — either is fine -- **1M–100M rows**: `hashdiff` or `cascade` -- **> 100M rows**: `hashdiff` with a `where_clause` date filter to validate a recent window first +- **1M–100M rows**: `hashdiff` with `partition_column` for faster, more precise results +- **> 100M rows**: `hashdiff` + `partition_column` — required; bisection alone may miss rows at this scale + +**When to use `partition_column`:** +- Table has a natural time or key column (e.g. `created_at`, `order_id`, `event_date`) +- Table has > 500K rows and bisection is slow or returning incomplete results +- You need per-partition visibility (which month/range has the problem) + +``` +// Date column — partition by month +data_diff(source="lineitem", target="lineitem", + key_columns=["l_orderkey", "l_linenumber"], + source_warehouse="pg_source", target_warehouse="pg_target", + partition_column="l_shipdate", partition_granularity="month", + algorithm="hashdiff") + +// Numeric column — partition by key ranges of 100K +data_diff(source="orders", target="orders", + key_columns=["o_orderkey"], + source_warehouse="pg_source", target_warehouse="pg_target", + partition_column="o_orderkey", partition_bucket_size=100000, + algorithm="hashdiff") +``` + +Output includes an aggregate diff plus a per-partition table showing exactly which ranges differ. ### Step 4: Profile first for unknown tables diff --git a/packages/opencode/src/altimate/native/connections/data-diff.ts b/packages/opencode/src/altimate/native/connections/data-diff.ts index 035df6b4ca..fe1c926f92 100644 --- a/packages/opencode/src/altimate/native/connections/data-diff.ts +++ b/packages/opencode/src/altimate/native/connections/data-diff.ts @@ -7,7 +7,7 @@ * This file is the bridge between that engine and altimate-code's drivers. 
*/ -import type { DataDiffParams, DataDiffResult } from "../types" +import type { DataDiffParams, DataDiffResult, PartitionDiffResult } from "../types" import * as Registry from "./registry" // --------------------------------------------------------------------------- @@ -119,7 +119,238 @@ async function executeQuery(sql: string, warehouseName: string | undefined): Pro const MAX_STEPS = 200 +// --------------------------------------------------------------------------- +// Partition support +// --------------------------------------------------------------------------- + +/** + * Build a DATE_TRUNC expression appropriate for the warehouse dialect. + */ +function dateTruncExpr(granularity: string, column: string, dialect: string): string { + const g = granularity.toLowerCase() + switch (dialect) { + case "bigquery": + return `DATE_TRUNC(${column}, ${g.toUpperCase()})` + case "clickhouse": + return `toStartOf${g.charAt(0).toUpperCase() + g.slice(1)}(${column})` + case "mysql": + case "mariadb": { + const fmt = { day: "%Y-%m-%d", week: "%Y-%u", month: "%Y-%m-01", year: "%Y-01-01" }[g] ?? "%Y-%m-01" + return `DATE_FORMAT(${column}, '${fmt}')` + } + default: + // Postgres, Snowflake, Redshift, DuckDB, etc. + return `DATE_TRUNC('${g}', ${column})` + } +} + +/** + * Build SQL to discover distinct partition values from the source table. + */ +function buildPartitionDiscoverySQL( + table: string, + partitionColumn: string, + granularity: string | undefined, + bucketSize: number | undefined, + dialect: string, + whereClause?: string, +): string { + const isNumeric = bucketSize != null + + let expr: string + if (isNumeric) { + expr = `FLOOR(${partitionColumn} / ${bucketSize}) * ${bucketSize}` + } else { + expr = dateTruncExpr(granularity ?? "month", partitionColumn, dialect) + } + + const where = whereClause ? 
`WHERE ${whereClause}` : "" + return `SELECT DISTINCT ${expr} AS _p FROM ${table} ${where} ORDER BY _p` +} + +/** + * Build a WHERE clause that scopes to a single partition. + */ +function buildPartitionWhereClause( + partitionColumn: string, + partitionValue: string, + granularity: string | undefined, + bucketSize: number | undefined, + dialect: string, +): string { + if (bucketSize != null) { + const lo = Number(partitionValue) + const hi = lo + bucketSize + return `${partitionColumn} >= ${lo} AND ${partitionColumn} < ${hi}` + } + + const expr = dateTruncExpr(granularity ?? "month", partitionColumn, dialect) + + // Cast the literal appropriately per dialect + switch (dialect) { + case "bigquery": + return `${expr} = '${partitionValue}'` + case "clickhouse": + return `${expr} = toDate('${partitionValue}')` + case "mysql": + case "mariadb": + return `${expr} = '${partitionValue}'` + default: + return `${expr} = '${partitionValue}'` + } +} + +/** + * Extract DiffStats from a successful outcome (if present). + */ +function extractStats(outcome: unknown): { + rows_source: number + rows_target: number + differences: number + status: "identical" | "differ" +} { + const o = outcome as any + if (!o) return { rows_source: 0, rows_target: 0, differences: 0, status: "identical" } + + if (o.Match) { + return { + rows_source: o.Match.row_count ?? 0, + rows_target: o.Match.row_count ?? 0, + differences: 0, + status: "identical", + } + } + + if (o.Diff) { + const d = o.Diff + return { + rows_source: d.total_source_rows ?? 0, + rows_target: d.total_target_rows ?? 0, + differences: (d.rows_only_in_source ?? 0) + (d.rows_only_in_target ?? 0) + (d.rows_updated ?? 0), + status: "differ", + } + } + + return { rows_source: 0, rows_target: 0, differences: 0, status: "identical" } +} + +/** + * Merge two Diff outcomes into one aggregated Diff outcome. 
+ */ +function mergeOutcomes(accumulated: unknown, next: unknown): unknown { + const a = accumulated as any + const n = next as any + + const aD = a?.Diff ?? (a?.Match ? { total_source_rows: a.Match.row_count, total_target_rows: a.Match.row_count, rows_only_in_source: 0, rows_only_in_target: 0, rows_updated: 0, rows_identical: a.Match.row_count, sample_diffs: [] } : null) + const nD = n?.Diff ?? (n?.Match ? { total_source_rows: n.Match.row_count, total_target_rows: n.Match.row_count, rows_only_in_source: 0, rows_only_in_target: 0, rows_updated: 0, rows_identical: n.Match.row_count, sample_diffs: [] } : null) + + if (!aD && !nD) return { Match: { row_count: 0 } } + if (!aD) return next + if (!nD) return accumulated + + const merged = { + total_source_rows: (aD.total_source_rows ?? 0) + (nD.total_source_rows ?? 0), + total_target_rows: (aD.total_target_rows ?? 0) + (nD.total_target_rows ?? 0), + rows_only_in_source: (aD.rows_only_in_source ?? 0) + (nD.rows_only_in_source ?? 0), + rows_only_in_target: (aD.rows_only_in_target ?? 0) + (nD.rows_only_in_target ?? 0), + rows_updated: (aD.rows_updated ?? 0) + (nD.rows_updated ?? 0), + rows_identical: (aD.rows_identical ?? 0) + (nD.rows_identical ?? 0), + sample_diffs: [...(aD.sample_diffs ?? []), ...(nD.sample_diffs ?? [])].slice(0, 20), + } + + const totalDiff = merged.rows_only_in_source + merged.rows_only_in_target + merged.rows_updated + if (totalDiff === 0) { + return { Match: { row_count: merged.total_source_rows, algorithm: "partitioned" } } + } + return { Diff: merged } +} + +/** + * Run a partitioned diff: discover partition values, diff each partition independently, + * then aggregate results. + */ +async function runPartitionedDiff(params: DataDiffParams): Promise { + const resolveDialect = (warehouse: string | undefined): string => { + if (warehouse) { + const cfg = Registry.getConfig(warehouse) + return cfg?.type ?? "generic" + } + const warehouses = Registry.list().warehouses + return warehouses[0]?.type ?? 
"generic" + } + + const sourceDialect = resolveDialect(params.source_warehouse) + const { table1Name } = resolveTableSources(params.source, params.target) + + // Discover partition values from source + const discoverySql = buildPartitionDiscoverySQL( + table1Name, + params.partition_column!, + params.partition_granularity, + params.partition_bucket_size, + sourceDialect, + params.where_clause, + ) + + let partitionValues: string[] + try { + const rows = await executeQuery(discoverySql, params.source_warehouse) + partitionValues = rows.map((r) => String(r[0] ?? "")).filter(Boolean) + } catch (e) { + return { success: false, error: `Partition discovery failed: ${e}`, steps: 0 } + } + + if (partitionValues.length === 0) { + return { success: true, steps: 1, outcome: { Match: { row_count: 0, algorithm: "partitioned" } }, partition_results: [] } + } + + // Diff each partition + const partitionResults: PartitionDiffResult[] = [] + let aggregatedOutcome: unknown = null + let totalSteps = 1 + + for (const pVal of partitionValues) { + const partWhere = buildPartitionWhereClause( + params.partition_column!, + pVal, + params.partition_granularity, + params.partition_bucket_size, + sourceDialect, + ) + const fullWhere = params.where_clause ? `(${params.where_clause}) AND (${partWhere})` : partWhere + + const result = await runDataDiff({ + ...params, + where_clause: fullWhere, + partition_column: undefined, // prevent recursion + }) + + totalSteps += result.steps + + if (!result.success) { + partitionResults.push({ partition: pVal, rows_source: 0, rows_target: 0, differences: 0, status: "error", error: result.error }) + continue + } + + const stats = extractStats(result.outcome) + partitionResults.push({ partition: pVal, ...stats }) + aggregatedOutcome = aggregatedOutcome == null ? result.outcome : mergeOutcomes(aggregatedOutcome, result.outcome) + } + + return { + success: true, + steps: totalSteps, + outcome: aggregatedOutcome ?? 
{ Match: { row_count: 0, algorithm: "partitioned" } }, + partition_results: partitionResults, + } +} + export async function runDataDiff(params: DataDiffParams): Promise { + // Dispatch to partitioned diff if partition_column is set + if (params.partition_column) { + return runPartitionedDiff(params) + } + // Dynamically import NAPI module (not available in test environments without the binary) let DataParitySession: new (specJson: string) => { start(): string diff --git a/packages/opencode/src/altimate/native/types.ts b/packages/opencode/src/altimate/native/types.ts index 0ecefdd2d8..0a3ffce9c9 100644 --- a/packages/opencode/src/altimate/native/types.ts +++ b/packages/opencode/src/altimate/native/types.ts @@ -978,6 +978,39 @@ export interface DataDiffParams { numeric_tolerance?: number /** Timestamp tolerance in milliseconds */ timestamp_tolerance_ms?: number + /** + * Column to partition on before diffing. The table is split into groups by + * this column and each group is diffed independently. Results are aggregated. + * Use for large tables where bisection alone is too slow or imprecise. + * + * Examples: "l_shipdate" (date column), "l_orderkey" (numeric column) + */ + partition_column?: string + /** + * Granularity for date partition columns: "day" | "week" | "month" | "year". + * For numeric columns, ignored — use partition_bucket_size instead. + * Defaults to "month". + */ + partition_granularity?: "day" | "week" | "month" | "year" + /** + * For numeric partition columns: size of each bucket. + * E.g. 
100000 splits l_orderkey into [0, 100000), [100000, 200000), … + */ + partition_bucket_size?: number +} + +export interface PartitionDiffResult { + /** The partition value (date string or numeric bucket start) */ + partition: string + /** Source row count in this partition */ + rows_source: number + /** Target row count in this partition */ + rows_target: number + /** Total differences found (exclusive + updated) */ + differences: number + /** "identical" | "differ" | "error" */ + status: "identical" | "differ" | "error" + error?: string } export interface DataDiffResult { @@ -985,6 +1018,8 @@ export interface DataDiffResult { steps: number outcome?: unknown error?: string + /** Per-partition breakdown when partition_column is used */ + partition_results?: PartitionDiffResult[] } // --- Method registry --- diff --git a/packages/opencode/src/altimate/tools/data-diff.ts b/packages/opencode/src/altimate/tools/data-diff.ts index 0719361dbe..767921e2e8 100644 --- a/packages/opencode/src/altimate/tools/data-diff.ts +++ b/packages/opencode/src/altimate/tools/data-diff.ts @@ -50,6 +50,23 @@ export const DataDiffTool = Tool.define("data_diff", { .number() .optional() .describe("Tolerance for timestamp comparisons in milliseconds"), + partition_column: z + .string() + .optional() + .describe( + "Column to partition on before diffing. Splits the table into groups and diffs each independently. " + + "Use for large tables to get faster, more precise results. " + + "Examples: 'l_shipdate' (date), 'l_orderkey' (numeric). " + + "Results are aggregated with a per-partition breakdown showing which groups have differences.", + ), + partition_granularity: z + .enum(["day", "week", "month", "year"]) + .optional() + .describe("Granularity for date partition columns. Defaults to 'month'."), + partition_bucket_size: z + .number() + .optional() + .describe("For numeric partition columns: size of each bucket. E.g. 
100000 splits orders into ranges of 100K keys."), }), async execute(args, ctx) { // Require read permission — data diff executes SELECT queries @@ -72,6 +89,9 @@ export const DataDiffTool = Tool.define("data_diff", { where_clause: args.where_clause, numeric_tolerance: args.numeric_tolerance, timestamp_tolerance_ms: args.timestamp_tolerance_ms, + partition_column: args.partition_column, + partition_granularity: args.partition_granularity, + partition_bucket_size: args.partition_bucket_size, }) if (!result.success) { @@ -83,7 +103,11 @@ export const DataDiffTool = Tool.define("data_diff", { } const outcome = result.outcome as any - const output = formatOutcome(outcome, args.source, args.target) + let output = formatOutcome(outcome, args.source, args.target) + + if (result.partition_results?.length) { + output += formatPartitionResults(result.partition_results, args.partition_column!) + } return { title: `Data diff: ${summarize(outcome)}`, @@ -172,3 +196,31 @@ function formatOutcome(outcome: any, source: string, target: string): string { return JSON.stringify(outcome, null, 2) } + +function formatPartitionResults( + partitions: Array<{ partition: string; rows_source: number; rows_target: number; differences: number; status: string; error?: string }>, + partitionColumn: string, +): string { + const lines: string[] = ["", `Partition breakdown (by ${partitionColumn}):`] + + const clean = partitions.filter((p) => p.status === "identical") + const dirty = partitions.filter((p) => p.status === "differ") + const errored = partitions.filter((p) => p.status === "error") + + if (dirty.length === 0 && errored.length === 0) { + lines.push(` ✓ All ${partitions.length} partitions identical`) + return lines.join("\n") + } + + for (const p of dirty) { + lines.push(` ✗ ${p.partition} source=${p.rows_source.toLocaleString()} target=${p.rows_target.toLocaleString()} diff=${p.differences.toLocaleString()}`) + } + for (const p of errored) { + lines.push(` ! 
${p.partition} ERROR: ${p.error}`) + } + if (clean.length > 0) { + lines.push(` ✓ ${clean.length} partition${clean.length === 1 ? "" : "s"} identical`) + } + + return lines.join("\n") +} From fe0bf67e3c9def186d85287271116cbc1886e690 Mon Sep 17 00:00:00 2001 From: suryaiyer95 Date: Thu, 26 Mar 2026 18:23:05 -0700 Subject: [PATCH 3/4] feat: add categorical partition mode (string, enum, boolean) When partition_column is set without partition_granularity or partition_bucket_size, groups by raw DISTINCT values. Works for any non-date, non-numeric column: status, region, country, etc. WHERE clause uses equality: col = 'value' with proper escaping. --- .opencode/skills/data-parity/SKILL.md | 16 ++++++-- .../altimate/native/connections/data-diff.ts | 41 +++++++++++++++---- .../opencode/src/altimate/tools/data-diff.ts | 10 +++-- 3 files changed, 53 insertions(+), 14 deletions(-) diff --git a/.opencode/skills/data-parity/SKILL.md b/.opencode/skills/data-parity/SKILL.md index 3f739eda4b..4d47be8036 100644 --- a/.opencode/skills/data-parity/SKILL.md +++ b/.opencode/skills/data-parity/SKILL.md @@ -44,9 +44,12 @@ description: Validate that two tables or query results are identical — or diag - `extra_columns` — columns to compare beyond keys (omit = compare all) - `algorithm` — `auto`, `joindiff`, `hashdiff`, `profile`, `cascade` - `where_clause` — filter applied to both tables -- `partition_column` — split the table by this column and diff each group independently (recommended for large tables) -- `partition_granularity` — `day` | `week` | `month` | `year` for date columns (default: `month`) -- `partition_bucket_size` — for numeric columns: bucket width (e.g. 
`100000` splits by ranges of 100K) +- `partition_column` — split the table by this column and diff each group independently (recommended for large tables); three modes: + - **Date column**: set `partition_granularity` → groups by truncated date periods + - **Numeric column**: set `partition_bucket_size` → groups by equal-width key ranges + - **Categorical column**: set neither → groups by distinct values (strings, enums, booleans like `status`, `region`, `country`) +- `partition_granularity` — `day` | `week` | `month` | `year` — only for date columns +- `partition_bucket_size` — bucket width for numeric columns (e.g. `100000`) > **CRITICAL — Algorithm choice:** > - If `source_warehouse` ≠ `target_warehouse` → **always use `hashdiff`** (or `auto`). @@ -142,6 +145,13 @@ data_diff(source="orders", target="orders", source_warehouse="pg_source", target_warehouse="pg_target", partition_column="o_orderkey", partition_bucket_size=100000, algorithm="hashdiff") + +// Categorical column — partition by distinct status values ('O', 'F', 'P') +data_diff(source="orders", target="orders", + key_columns=["o_orderkey"], + source_warehouse="pg_source", target_warehouse="pg_target", + partition_column="o_orderstatus", // no granularity or bucket_size needed + algorithm="hashdiff") ``` Output includes an aggregate diff plus a per-partition table showing exactly which ranges differ. diff --git a/packages/opencode/src/altimate/native/connections/data-diff.ts b/packages/opencode/src/altimate/native/connections/data-diff.ts index fe1c926f92..98609b744a 100644 --- a/packages/opencode/src/altimate/native/connections/data-diff.ts +++ b/packages/opencode/src/altimate/native/connections/data-diff.ts @@ -144,6 +144,21 @@ function dateTruncExpr(granularity: string, column: string, dialect: string): st } } +/** + * Determine the partition mode based on which params are provided. 
+ * - "date" → partition_granularity is set (or column looks like a date) + * - "numeric" → partition_bucket_size is set + * - "categorical" → neither — use DISTINCT values directly (string, enum, boolean) + */ +function partitionMode( + granularity: string | undefined, + bucketSize: number | undefined, +): "date" | "numeric" | "categorical" { + if (bucketSize != null) return "numeric" + if (granularity != null) return "date" + return "categorical" +} + /** * Build SQL to discover distinct partition values from the source table. */ @@ -155,16 +170,19 @@ function buildPartitionDiscoverySQL( dialect: string, whereClause?: string, ): string { - const isNumeric = bucketSize != null + const where = whereClause ? `WHERE ${whereClause}` : "" + const mode = partitionMode(granularity, bucketSize) let expr: string - if (isNumeric) { + if (mode === "numeric") { expr = `FLOOR(${partitionColumn} / ${bucketSize}) * ${bucketSize}` + } else if (mode === "date") { + expr = dateTruncExpr(granularity!, partitionColumn, dialect) } else { - expr = dateTruncExpr(granularity ?? "month", partitionColumn, dialect) + // categorical — raw distinct values, no transformation + expr = partitionColumn } - const where = whereClause ? `WHERE ${whereClause}` : "" return `SELECT DISTINCT ${expr} AS _p FROM ${table} ${where} ORDER BY _p` } @@ -178,13 +196,22 @@ function buildPartitionWhereClause( bucketSize: number | undefined, dialect: string, ): string { - if (bucketSize != null) { + const mode = partitionMode(granularity, bucketSize) + + if (mode === "numeric") { const lo = Number(partitionValue) - const hi = lo + bucketSize + const hi = lo + bucketSize! return `${partitionColumn} >= ${lo} AND ${partitionColumn} < ${hi}` } - const expr = dateTruncExpr(granularity ?? 
"month", partitionColumn, dialect) + if (mode === "categorical") { + // Quote the value — works for strings, enums, booleans + const escaped = partitionValue.replace(/'/g, "''") + return `${partitionColumn} = '${escaped}'` + } + + // date mode + const expr = dateTruncExpr(granularity!, partitionColumn, dialect) // Cast the literal appropriately per dialect switch (dialect) { diff --git a/packages/opencode/src/altimate/tools/data-diff.ts b/packages/opencode/src/altimate/tools/data-diff.ts index 767921e2e8..fc56e0da6d 100644 --- a/packages/opencode/src/altimate/tools/data-diff.ts +++ b/packages/opencode/src/altimate/tools/data-diff.ts @@ -55,18 +55,20 @@ export const DataDiffTool = Tool.define("data_diff", { .optional() .describe( "Column to partition on before diffing. Splits the table into groups and diffs each independently. " + - "Use for large tables to get faster, more precise results. " + - "Examples: 'l_shipdate' (date), 'l_orderkey' (numeric). " + + "Three modes depending on which other params you set:\n" + + " • Date column → set partition_granularity (day/week/month/year). E.g. partition_column='l_shipdate', partition_granularity='month'\n" + + " • Numeric column → set partition_bucket_size. E.g. partition_column='l_orderkey', partition_bucket_size=100000\n" + + " • Categorical → set neither. Works for string/enum/boolean columns like 'status', 'region', 'country'. Groups by distinct values.\n" + "Results are aggregated with a per-partition breakdown showing which groups have differences.", ), partition_granularity: z .enum(["day", "week", "month", "year"]) .optional() - .describe("Granularity for date partition columns. Defaults to 'month'."), + .describe("For date partition columns: truncation granularity. Omit for numeric or categorical columns."), partition_bucket_size: z .number() .optional() - .describe("For numeric partition columns: size of each bucket. E.g. 
100000 splits orders into ranges of 100K keys."), + .describe("For numeric partition columns: size of each bucket. E.g. 100000 splits l_orderkey into ranges of 100K. Omit for date or categorical columns."), }), async execute(args, ctx) { // Require read permission — data diff executes SELECT queries From b850a1d18e00eca552b5fdaa4d0c8899702de6fb Mon Sep 17 00:00:00 2001 From: suryaiyer95 Date: Thu, 26 Mar 2026 18:41:15 -0700 Subject: [PATCH 4/4] fix: correct outcome shape handling in extractStats and formatOutcome Rust serializes ReladiffOutcome with serde tag 'mode', producing: {mode: 'diff', diff_rows: [...], stats: {rows_table1, rows_table2, exclusive_table1, exclusive_table2, updated, unchanged}} Previous code checked for {Match: {...}} / {Diff: {...}} shapes that never matched, causing partitioned diff to report all partitions as 'identical' with 0 rows. - extractStats(): check outcome.mode === 'diff', read from stats fields - mergeOutcomes(): aggregate mode-based outcomes correctly - summarize()/formatOutcome(): display mode-based shape with correct labels --- .../altimate/native/connections/data-diff.ts | 72 +++++++++--------- .../opencode/src/altimate/tools/data-diff.ts | 76 +++++++++++-------- 2 files changed, 82 insertions(+), 66 deletions(-) diff --git a/packages/opencode/src/altimate/native/connections/data-diff.ts b/packages/opencode/src/altimate/native/connections/data-diff.ts index 98609b744a..6c4f2e7a61 100644 --- a/packages/opencode/src/altimate/native/connections/data-diff.ts +++ b/packages/opencode/src/altimate/native/connections/data-diff.ts @@ -229,6 +229,9 @@ function buildPartitionWhereClause( /** * Extract DiffStats from a successful outcome (if present). 
+ * + * Rust serializes ReladiffOutcome as: {mode: "diff", diff_rows: [...], stats: {...}} + * stats fields: rows_table1, rows_table2, exclusive_table1, exclusive_table2, updated, unchanged */ function extractStats(outcome: unknown): { rows_source: number @@ -239,22 +242,17 @@ function extractStats(outcome: unknown): { const o = outcome as any if (!o) return { rows_source: 0, rows_target: 0, differences: 0, status: "identical" } - if (o.Match) { - return { - rows_source: o.Match.row_count ?? 0, - rows_target: o.Match.row_count ?? 0, - differences: 0, - status: "identical", - } - } - - if (o.Diff) { - const d = o.Diff + if (o.mode === "diff") { + const s = o.stats ?? {} + const exclusive1 = Number(s.exclusive_table1 ?? 0) + const exclusive2 = Number(s.exclusive_table2 ?? 0) + const updated = Number(s.updated ?? 0) + const differences = exclusive1 + exclusive2 + updated return { - rows_source: d.total_source_rows ?? 0, - rows_target: d.total_target_rows ?? 0, - differences: (d.rows_only_in_source ?? 0) + (d.rows_only_in_target ?? 0) + (d.rows_updated ?? 0), - status: "differ", + rows_source: Number(s.rows_table1 ?? 0), + rows_target: Number(s.rows_table2 ?? 0), + differences, + status: differences > 0 ? "differ" : "identical", } } @@ -262,34 +260,36 @@ function extractStats(outcome: unknown): { } /** - * Merge two Diff outcomes into one aggregated Diff outcome. + * Merge two diff outcomes into one aggregated outcome. + * + * Both outcomes use the Rust shape: {mode: "diff", diff_rows: [...], stats: {...}} */ function mergeOutcomes(accumulated: unknown, next: unknown): unknown { + if (!accumulated) return next + if (!next) return accumulated + const a = accumulated as any const n = next as any - const aD = a?.Diff ?? (a?.Match ? { total_source_rows: a.Match.row_count, total_target_rows: a.Match.row_count, rows_only_in_source: 0, rows_only_in_target: 0, rows_updated: 0, rows_identical: a.Match.row_count, sample_diffs: [] } : null) - const nD = n?.Diff ?? (n?.Match ? 
{ total_source_rows: n.Match.row_count, total_target_rows: n.Match.row_count, rows_only_in_source: 0, rows_only_in_target: 0, rows_updated: 0, rows_identical: n.Match.row_count, sample_diffs: [] } : null) - - if (!aD && !nD) return { Match: { row_count: 0 } } - if (!aD) return next - if (!nD) return accumulated - - const merged = { - total_source_rows: (aD.total_source_rows ?? 0) + (nD.total_source_rows ?? 0), - total_target_rows: (aD.total_target_rows ?? 0) + (nD.total_target_rows ?? 0), - rows_only_in_source: (aD.rows_only_in_source ?? 0) + (nD.rows_only_in_source ?? 0), - rows_only_in_target: (aD.rows_only_in_target ?? 0) + (nD.rows_only_in_target ?? 0), - rows_updated: (aD.rows_updated ?? 0) + (nD.rows_updated ?? 0), - rows_identical: (aD.rows_identical ?? 0) + (nD.rows_identical ?? 0), - sample_diffs: [...(aD.sample_diffs ?? []), ...(nD.sample_diffs ?? [])].slice(0, 20), - } + const aS = a.stats ?? {} + const nS = n.stats ?? {} + + const rows_table1 = (Number(aS.rows_table1) || 0) + (Number(nS.rows_table1) || 0) + const rows_table2 = (Number(aS.rows_table2) || 0) + (Number(nS.rows_table2) || 0) + const exclusive_table1 = (Number(aS.exclusive_table1) || 0) + (Number(nS.exclusive_table1) || 0) + const exclusive_table2 = (Number(aS.exclusive_table2) || 0) + (Number(nS.exclusive_table2) || 0) + const updated = (Number(aS.updated) || 0) + (Number(nS.updated) || 0) + const unchanged = (Number(aS.unchanged) || 0) + (Number(nS.unchanged) || 0) - const totalDiff = merged.rows_only_in_source + merged.rows_only_in_target + merged.rows_updated - if (totalDiff === 0) { - return { Match: { row_count: merged.total_source_rows, algorithm: "partitioned" } } + const totalRows = rows_table1 + rows_table2 + const totalDiff = exclusive_table1 + exclusive_table2 + updated + const diff_percent = totalRows > 0 ? (totalDiff / totalRows) * 100 : 0 + + return { + mode: "diff", + diff_rows: [...(a.diff_rows ?? []), ...(n.diff_rows ?? 
[])].slice(0, 100), + stats: { rows_table1, rows_table2, exclusive_table1, exclusive_table2, updated, unchanged, diff_percent }, } - return { Diff: merged } } /** diff --git a/packages/opencode/src/altimate/tools/data-diff.ts b/packages/opencode/src/altimate/tools/data-diff.ts index fc56e0da6d..d498eefe7e 100644 --- a/packages/opencode/src/altimate/tools/data-diff.ts +++ b/packages/opencode/src/altimate/tools/data-diff.ts @@ -129,16 +129,23 @@ export const DataDiffTool = Tool.define("data_diff", { function summarize(outcome: any): string { if (!outcome) return "complete" - if (outcome.Match) return "IDENTICAL ✓" - if (outcome.Diff) { - const r = outcome.Diff + + // Rust serializes ReladiffOutcome as {mode: "diff"|"profile"|..., stats: {...}, diff_rows: [...]} + if (outcome.mode === "diff") { + const s = outcome.stats ?? {} + const e1 = Number(s.exclusive_table1 ?? 0) + const e2 = Number(s.exclusive_table2 ?? 0) + const upd = Number(s.updated ?? 0) + if (e1 === 0 && e2 === 0 && upd === 0) return "IDENTICAL ✓" const parts: string[] = [] - if (r.rows_only_in_source > 0) parts.push(`${r.rows_only_in_source} only in source`) - if (r.rows_only_in_target > 0) parts.push(`${r.rows_only_in_target} only in target`) - if (r.rows_updated > 0) parts.push(`${r.rows_updated} updated`) - return parts.length ? 
parts.join(", ") : "differences found" + if (e1 > 0) parts.push(`${e1} only in source`) + if (e2 > 0) parts.push(`${e2} only in target`) + if (upd > 0) parts.push(`${upd} updated`) + return parts.join(", ") } - if (outcome.Profile) return "profile complete" + if (outcome.mode === "profile") return "profile complete" + if (outcome.mode === "cascade") return "cascade complete" + return "complete" } @@ -147,45 +154,54 @@ function formatOutcome(outcome: any, source: string, target: string): string { const lines: string[] = [] - if (outcome.Match) { - lines.push(`✓ Tables are IDENTICAL`) - const m = outcome.Match - if (m.row_count != null) lines.push(` Rows checked: ${m.row_count.toLocaleString()}`) - if (m.algorithm) lines.push(` Algorithm: ${m.algorithm}`) - return lines.join("\n") - } + // Rust serializes ReladiffOutcome as {mode: "diff", diff_rows: [...], stats: {...}} + // stats: rows_table1, rows_table2, exclusive_table1, exclusive_table2, updated, unchanged + if (outcome.mode === "diff") { + const s = outcome.stats ?? {} + const rows1 = Number(s.rows_table1 ?? 0) + const rows2 = Number(s.rows_table2 ?? 0) + const e1 = Number(s.exclusive_table1 ?? 0) + const e2 = Number(s.exclusive_table2 ?? 0) + const updated = Number(s.updated ?? 0) + const unchanged = Number(s.unchanged ?? 
0) + + if (e1 === 0 && e2 === 0 && updated === 0) { + lines.push(`✓ Tables are IDENTICAL`) + if (rows1 > 0) lines.push(` Rows checked: ${rows1.toLocaleString()}`) + return lines.join("\n") + } - if (outcome.Diff) { - const r = outcome.Diff lines.push(`✗ Tables DIFFER`) lines.push(``) lines.push(` Source: ${source}`) lines.push(` Target: ${target}`) lines.push(``) - if (r.total_source_rows != null) lines.push(` Source rows: ${r.total_source_rows.toLocaleString()}`) - if (r.total_target_rows != null) lines.push(` Target rows: ${r.total_target_rows.toLocaleString()}`) - if (r.rows_only_in_source > 0) lines.push(` Only in source: ${r.rows_only_in_source.toLocaleString()}`) - if (r.rows_only_in_target > 0) lines.push(` Only in target: ${r.rows_only_in_target.toLocaleString()}`) - if (r.rows_updated > 0) lines.push(` Updated rows: ${r.rows_updated.toLocaleString()}`) - if (r.rows_identical > 0) lines.push(` Identical rows: ${r.rows_identical.toLocaleString()}`) + if (rows1 > 0) lines.push(` Source rows: ${rows1.toLocaleString()}`) + if (rows2 > 0) lines.push(` Target rows: ${rows2.toLocaleString()}`) + if (e1 > 0) lines.push(` Only in source: ${e1.toLocaleString()}`) + if (e2 > 0) lines.push(` Only in target: ${e2.toLocaleString()}`) + if (updated > 0) lines.push(` Updated rows: ${updated.toLocaleString()}`) + if (unchanged > 0) lines.push(` Identical rows: ${unchanged.toLocaleString()}`) - if (r.sample_diffs?.length) { + const diffRows = outcome.diff_rows ?? [] + if (diffRows.length > 0) { lines.push(``) - lines.push(` Sample differences (first ${r.sample_diffs.length}):`) - for (const d of r.sample_diffs.slice(0, 5)) { - lines.push(` key=${JSON.stringify(d.key)} col=${d.column}: ${d.source_value} → ${d.target_value}`) + lines.push(` Sample differences (first ${Math.min(diffRows.length, 5)}):`) + for (const d of diffRows.slice(0, 5)) { + const label = d.sign === "-" ? 
"source only" : "target only" + lines.push(` [${label}] ${d.values?.join(" | ")}`) } } return lines.join("\n") } - if (outcome.Profile) { - const p = outcome.Profile + if (outcome.mode === "profile") { + const cols = outcome.column_stats ?? outcome.columns ?? [] lines.push(`Column Profile Comparison`) lines.push(``) - for (const col of p.columns ?? []) { + for (const col of cols) { const verdict = col.verdict === "match" ? "✓" : col.verdict === "within_tolerance" ? "~" : "✗" lines.push(` ${verdict} ${col.column}: ${col.verdict}`) if (col.source_stats && col.target_stats) {