From 9745b3458db7aaf64e1f8fea126a837180db0a05 Mon Sep 17 00:00:00 2001 From: Tyler Adkins Date: Fri, 24 Apr 2026 13:12:13 -0400 Subject: [PATCH 1/5] feat: onboard discovered-but-not-yet-onboarded datasets Previously `dataset onboard <dataset-id>` failed with "dataset not found" when given the ID of a dataset that was discovered but not yet onboarded, even though `dataset list` shows those IDs. Fall back to looking up discovered datasets across datasources and promote on the fly. Co-Authored-By: Claude Opus 4.7 (1M context) --- go/cmd/dataset_onboard.go | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/go/cmd/dataset_onboard.go b/go/cmd/dataset_onboard.go index 09df6c8..5dbc540 100644 --- a/go/cmd/dataset_onboard.go +++ b/go/cmd/dataset_onboard.go @@ -55,7 +55,42 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: } } if datasetName == "" { - return output.Errorf(2, "dataset '%s' not found", datasetID) + // Not onboarded yet — look across discovered datasets and promote on the fly. 
+ dsPage, dsErr := client.ListDatasources(0, 500) + if dsErr != nil { + return dsErr + } + var foundDatasourceID, foundDatasourceName string + var foundDiscovered *api.DiscoveredDataset + for _, ds := range dsPage.Content { + discPage, discErr := client.ListDiscoveredDatasets(ds.ID, 0, 500) + if discErr != nil { + continue + } + for i := range discPage.Content { + d := &discPage.Content[i] + if d.ID == datasetID { + foundDatasourceID = ds.ID + foundDatasourceName = ds.Name + foundDiscovered = d + break + } + } + if foundDiscovered != nil { + break + } + } + if foundDiscovered == nil { + return output.Errorf(2, "dataset '%s' not found", datasetID) + } + fmt.Println(output.Dim.Render(" Onboarding dataset...")) + if err := client.OnboardDiscoveredDatasets(foundDatasourceID, api.OnboardDatasetsRequest{ + DiscoveredDatasetIDs: []string{datasetID}, + }); err != nil { + return err + } + datasetName = foundDiscovered.Name + qualifiedName = foundDatasourceName + "/" + strings.ReplaceAll(foundDiscovered.QualifiedName, ".", "/") } fmt.Printf(" Dataset: %s\n\n", output.Bold.Render(datasetName)) From 4c8518ede314e6cd10777a7badd7d2e89aa277ac Mon Sep 17 00:00:00 2001 From: Tyler Adkins Date: Fri, 24 Apr 2026 13:57:37 -0400 Subject: [PATCH 2/5] feat: --unique-keys on dataset diagnostics for failed rows collection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Soda Cloud requires uniqueKeyColumnNames to be set for failed rows collection to actually work. Expose it via a new --unique-keys flag on `dataset diagnostics` and include it in the displayed config. Uses read-modify-write on failedRowsConfiguration so partial updates (e.g. setting keys without touching enabled) don't clobber untouched fields — the API replaces the whole section otherwise. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- go/cmd/dataset.go | 32 ++++++++++++++++++++++++++++---- go/internal/api/datasets.go | 10 ++++++---- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/go/cmd/dataset.go b/go/cmd/dataset.go index 3f8dc94..81a757a 100644 --- a/go/cmd/dataset.go +++ b/go/cmd/dataset.go @@ -523,6 +523,8 @@ var datasetDiagnosticsCmd = &cobra.Command{ noCollectResults, _ := cmd.Flags().GetBool("no-collect-results") collectFailedRows, _ := cmd.Flags().GetBool("collect-failed-rows") noCollectFailedRows, _ := cmd.Flags().GetBool("no-collect-failed-rows") + uniqueKeys, _ := cmd.Flags().GetStringSlice("unique-keys") + hasUniqueKeys := cmd.Flags().Changed("unique-keys") // flags not yet in the public API — fail fast with a clear message unsupportedFlags := []string{"schema", "table-prefix", "table-suffix", "failed-rows-description", "expose-failed-rows-query", "no-expose-failed-rows-query", "failed-rows-cta", "no-failed-rows-cta"} @@ -538,7 +540,7 @@ var datasetDiagnosticsCmd = &cobra.Command{ } // no flags → show current settings - if !collectResults && !noCollectResults && !collectFailedRows && !noCollectFailedRows { + if !collectResults && !noCollectResults && !collectFailedRows && !noCollectFailedRows && !hasUniqueKeys { result, err := client.GetDatasetDiagnostics(args[0]) if err != nil { return err @@ -569,6 +571,9 @@ var datasetDiagnosticsCmd = &cobra.Command{ if result.FailedRowsConfiguration.State != "" { fmt.Printf(" %-28s %s\n", output.Bold.Render("State"), result.FailedRowsConfiguration.State) } + if len(result.FailedRowsConfiguration.UniqueKeyColumnNames) > 0 { + fmt.Printf(" %-28s %s\n", output.Bold.Render("Unique key columns"), strings.Join(result.FailedRowsConfiguration.UniqueKeyColumnNames, ", ")) + } } return nil } @@ -579,9 +584,27 @@ var datasetDiagnosticsCmd = &cobra.Command{ enabled := collectResults cfg.ScanAndResultsConfiguration = &api.DiagnosticsScanConfig{Enabled: &enabled} } - if collectFailedRows || 
noCollectFailedRows { - enabled := collectFailedRows - cfg.FailedRowsConfiguration = &api.DiagnosticsFailedRowsConfig{Enabled: &enabled} + if collectFailedRows || noCollectFailedRows || hasUniqueKeys { + // Seed from current state — the API replaces the whole + // failedRowsConfiguration object, so untouched fields would be reset. + current, err := client.GetDatasetDiagnostics(args[0]) + if err != nil { + return err + } + fr := &api.DiagnosticsFailedRowsConfig{} + if current.FailedRowsConfiguration != nil { + enabled := current.FailedRowsConfiguration.Enabled + fr.Enabled = &enabled + fr.UniqueKeyColumnNames = current.FailedRowsConfiguration.UniqueKeyColumnNames + } + if collectFailedRows || noCollectFailedRows { + enabled := collectFailedRows + fr.Enabled = &enabled + } + if hasUniqueKeys { + fr.UniqueKeyColumnNames = uniqueKeys + } + cfg.FailedRowsConfiguration = fr } if _, err := client.UpdateDatasetDiagnostics(args[0], cfg); err != nil { @@ -781,6 +804,7 @@ func init() { datasetDiagnosticsCmd.Flags().Bool("no-collect-results", false, "Disable storing check results and scan history") datasetDiagnosticsCmd.Flags().Bool("collect-failed-rows", false, "Store failed rows") datasetDiagnosticsCmd.Flags().Bool("no-collect-failed-rows", false, "Disable storing failed rows") + datasetDiagnosticsCmd.Flags().StringSlice("unique-keys", nil, "Unique key columns for failed rows collection (comma-separated or repeated)") datasetDiagnosticsCmd.Flags().String("table-prefix", "", "Prefix for diagnostic table names") datasetDiagnosticsCmd.Flags().String("table-suffix", "", "Suffix for diagnostic table names") datasetDiagnosticsCmd.Flags().String("failed-rows-description", "", "Description for failed rows storage context") diff --git a/go/internal/api/datasets.go b/go/internal/api/datasets.go index c46aa6d..93d5b42 100644 --- a/go/internal/api/datasets.go +++ b/go/internal/api/datasets.go @@ -142,9 +142,10 @@ type DiagnosticsWarehouseResult struct { } type 
DiagnosticsFailedRowsResult struct { - Enabled bool `json:"enabled"` - MaxRowCount int `json:"maxRowCount"` - State string `json:"state"` + Enabled bool `json:"enabled"` + MaxRowCount int `json:"maxRowCount"` + State string `json:"state"` + UniqueKeyColumnNames []string `json:"uniqueKeyColumnNames"` } type DiagnosticsScanResult struct { @@ -165,7 +166,8 @@ func (c *Client) GetDatasetDiagnostics(datasetID string) (*DiagnosticsWarehouseR // POST /api/v1/datasets/{id}/diagnosticsWarehouse request type DiagnosticsFailedRowsConfig struct { - Enabled *bool `json:"enabled,omitempty"` + Enabled *bool `json:"enabled,omitempty"` + UniqueKeyColumnNames []string `json:"uniqueKeyColumnNames,omitempty"` } type DiagnosticsScanConfig struct { From 8f5863741b902f3f6e23b0781a1a3ee2f5e6f880 Mon Sep 17 00:00:00 2001 From: Tyler Adkins Date: Fri, 24 Apr 2026 15:41:02 -0400 Subject: [PATCH 3/5] feat: optional failed rows collection in dataset onboard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an optional step to `dataset onboard` that enables failed rows collection with user-specified unique key columns. Covered by new --collect-failed-rows / --no-collect-failed-rows and --unique-keys flags, and by a new interactive prompt when no flags are given. --unique-keys alone implies --collect-failed-rows. Enabling failed rows also enables scan results collection (required by the API). Datasource-level diagnostics must be set up first — if it isn't, the step warns and continues rather than aborting the onboard. Co-Authored-By: Claude Opus 4.7 (1M context) --- go/cmd/dataset_onboard.go | 122 +++++++++++++++++++++++++++++--------- 1 file changed, 93 insertions(+), 29 deletions(-) diff --git a/go/cmd/dataset_onboard.go b/go/cmd/dataset_onboard.go index 5dbc540..a4c9585 100644 --- a/go/cmd/dataset_onboard.go +++ b/go/cmd/dataset_onboard.go @@ -26,6 +26,7 @@ Interactive mode walks through each step. 
Use flags for CI/CD or AI agents: hasMonitoring := cmd.Flags().Changed("monitoring") || cmd.Flags().Changed("no-monitoring") hasProfiling := cmd.Flags().Changed("profiling") || cmd.Flags().Changed("no-profiling") hasContracts := cmd.Flags().Changed("contracts") + hasFailedRows := cmd.Flags().Changed("collect-failed-rows") || cmd.Flags().Changed("no-collect-failed-rows") || cmd.Flags().Changed("unique-keys") noInteractive := GCtx.NoInteractive || (hasMonitoring && hasProfiling && hasContracts) enableMonitoring, _ := cmd.Flags().GetBool("monitoring") @@ -33,6 +34,8 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: enableProfiling, _ := cmd.Flags().GetBool("profiling") noProfiling, _ := cmd.Flags().GetBool("no-profiling") contractsMode, _ := cmd.Flags().GetString("contracts") + enableCollectFailedRows, _ := cmd.Flags().GetBool("collect-failed-rows") + uniqueKeys, _ := cmd.Flags().GetStringSlice("unique-keys") client, err := newAPIClient() if err != nil { @@ -96,7 +99,7 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: // ── Determine settings ────────────────────────────────────────────── - if !hasMonitoring && !hasProfiling && !hasContracts { + if !hasMonitoring && !hasProfiling && !hasContracts && !hasFailedRows { if noInteractive { return output.Errorf(2, "flags required in non-interactive mode: --monitoring/--no-monitoring, --profiling/--no-profiling, --contracts copilot|skeleton|none") } @@ -104,39 +107,67 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: monitoringChoice := "yes" profilingChoice := "yes" contractChoice := "none" + failedRowsChoice := "no" + uniqueKeysInput := "" - form := huh.NewForm(huh.NewGroup( - huh.NewSelect[string](). - Title("Enable default metric monitoring?"). - Description("Row count, row count change, freshness, schema changes,\npartition row count, most recent timestamp."). - Options( - huh.NewOption("Yes", "yes"), - huh.NewOption("No", "no"), - ). 
- Value(&monitoringChoice), - huh.NewSelect[string](). - Title("Enable dataset profiling?"). - Description("Column stats, row counts, and data type distribution."). - Options( - huh.NewOption("Yes", "yes"), - huh.NewOption("No", "no"), - ). - Value(&profilingChoice), - huh.NewSelect[string](). - Title("Set up a data contract?"). - Options( - huh.NewOption("AI-generated contract (Copilot)", "copilot"), - huh.NewOption("Skeleton contract (empty template)", "skeleton"), - huh.NewOption("No contract", "none"), - ). - Value(&contractChoice), - )) + form := huh.NewForm( + huh.NewGroup( + huh.NewSelect[string](). + Title("Enable default metric monitoring?"). + Description("Row count, row count change, freshness, schema changes,\npartition row count, most recent timestamp."). + Options( + huh.NewOption("Yes", "yes"), + huh.NewOption("No", "no"), + ). + Value(&monitoringChoice), + huh.NewSelect[string](). + Title("Enable dataset profiling?"). + Description("Column stats, row counts, and data type distribution."). + Options( + huh.NewOption("Yes", "yes"), + huh.NewOption("No", "no"), + ). + Value(&profilingChoice), + huh.NewSelect[string](). + Title("Set up a data contract?"). + Options( + huh.NewOption("AI-generated contract (Copilot)", "copilot"), + huh.NewOption("Skeleton contract (empty template)", "skeleton"), + huh.NewOption("No contract", "none"), + ). + Value(&contractChoice), + huh.NewSelect[string](). + Title("Enable failed rows collection?"). + Description("Store rows that fail checks in the diagnostics warehouse.\nRequires unique key columns."). + Options( + huh.NewOption("Yes", "yes"), + huh.NewOption("No", "no"), + ). + Value(&failedRowsChoice), + ), + huh.NewGroup( + huh.NewInput(). + Title("Unique key columns"). + Description("Comma-separated list, e.g. id,customer_email"). 
+ Value(&uniqueKeysInput), + ).WithHideFunc(func() bool { + return failedRowsChoice != "yes" + }), + ) if err := form.Run(); err != nil { return output.Errorf(2, "onboarding cancelled") } enableMonitoring = monitoringChoice == "yes" enableProfiling = profilingChoice == "yes" contractsMode = contractChoice + enableCollectFailedRows = failedRowsChoice == "yes" + if enableCollectFailedRows { + for _, k := range strings.Split(uniqueKeysInput, ",") { + if k = strings.TrimSpace(k); k != "" { + uniqueKeys = append(uniqueKeys, k) + } + } + } } else { if noMonitoring { enableMonitoring = false @@ -147,6 +178,14 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: if contractsMode == "" { contractsMode = "none" } + // Treat --unique-keys alone as implicit --collect-failed-rows. + if len(uniqueKeys) > 0 { + enableCollectFailedRows = true + } + } + + if enableCollectFailedRows && len(uniqueKeys) == 0 { + return output.Errorf(2, "--unique-keys is required when --collect-failed-rows is set (failed rows collection won't work without unique key columns)") } // ── Execute ───────────────────────────────────────────────────────── @@ -172,7 +211,29 @@ Interactive mode walks through each step. 
Use flags for CI/CD or AI agents: fmt.Println(output.Dim.Render(" Skipping monitoring and profiling setup.")) } - // Step 2: Contracts + // Step 2: Failed rows collection + if enableCollectFailedRows { + fmt.Println(output.Dim.Render(" Enabling failed rows collection...")) + enabled := true + cfg := api.DiagnosticsWarehouseConfig{ + ScanAndResultsConfiguration: &api.DiagnosticsScanConfig{Enabled: &enabled}, + FailedRowsConfiguration: &api.DiagnosticsFailedRowsConfig{ + Enabled: &enabled, + UniqueKeyColumnNames: uniqueKeys, + }, + } + if _, err := client.UpdateDatasetDiagnostics(datasetID, cfg); err != nil { + fmt.Fprintf(os.Stderr, " %s Could not enable failed rows collection: %v\n", output.Yellow.Render("⚠"), err) + if isNotEnabledOnDatasource(err) { + fmt.Fprintf(os.Stderr, " %s\n", output.Dim.Render("Set up the diagnostics warehouse on the datasource first:")) + fmt.Fprintf(os.Stderr, " %s\n", output.Dim.Render(" sodacli datasource diagnostics --enable")) + } + } else { + fmt.Println(output.Green.Render(" ✓") + fmt.Sprintf(" Failed rows collection enabled (keys: %s).", strings.Join(uniqueKeys, ", "))) + } + } + + // Step 3: Contracts var contractFile string switch contractsMode { case "copilot": @@ -203,7 +264,7 @@ Interactive mode walks through each step. 
Use flags for CI/CD or AI agents: return output.Errorf(2, "unknown contracts mode '%s' — use copilot, skeleton, or none", contractsMode) } - // Step 3: Verify contract + // Step 4: Verify contract if contractFile != "" { fmt.Println() fmt.Println(output.Dim.Render(" Verifying contract...")) @@ -234,6 +295,9 @@ func init() { datasetOnboardCmd.Flags().Bool("profiling", false, "Enable dataset profiling") datasetOnboardCmd.Flags().Bool("no-profiling", false, "Skip profiling setup") datasetOnboardCmd.Flags().String("contracts", "", "Generate contract: copilot|skeleton|none") + datasetOnboardCmd.Flags().Bool("collect-failed-rows", false, "Enable failed rows collection (requires --unique-keys)") + datasetOnboardCmd.Flags().Bool("no-collect-failed-rows", false, "Skip failed rows collection setup") + datasetOnboardCmd.Flags().StringSlice("unique-keys", nil, "Unique key columns for failed rows collection (comma-separated or repeated)") datasetCmd.AddCommand(datasetOnboardCmd) } From d8fb4e2accdea4ae604668fd80045f47e8d457a3 Mon Sep 17 00:00:00 2001 From: Tyler Adkins Date: Fri, 1 May 2026 22:10:03 -0400 Subject: [PATCH 4/5] fix: avoid double datasource prefix in qualifiedName for promote-on-the-fly path DiscoveredDataset.QualifiedName from /api/v1/discoveredDatasets already contains the datasource prefix (e.g. "sf_nable/SODA_CE/SCHEMA/TABLE"), while Dataset.QualifiedName from /api/v1/datasets/{id} does not (e.g. "SODA_CE.SCHEMA.TABLE"). The promote-on-the-fly path was using foundDiscovered.QualifiedName and prepending the datasource name, which produced a doubled prefix like "sf_nable/sf_nable/SODA_CE/SCHEMA/TABLE" and caused contract copilot/skeleton to fail with "datasets not found". Re-fetch via GetDataset(datasetID) after promotion so qualifiedName is built from the same Dataset shape used by the already-onboarded path. 
--- go/cmd/dataset_onboard.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/go/cmd/dataset_onboard.go b/go/cmd/dataset_onboard.go index a4c9585..d045a18 100644 --- a/go/cmd/dataset_onboard.go +++ b/go/cmd/dataset_onboard.go @@ -63,7 +63,7 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: if dsErr != nil { return dsErr } - var foundDatasourceID, foundDatasourceName string + var foundDatasourceID string var foundDiscovered *api.DiscoveredDataset for _, ds := range dsPage.Content { discPage, discErr := client.ListDiscoveredDatasets(ds.ID, 0, 500) @@ -74,7 +74,6 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: d := &discPage.Content[i] if d.ID == datasetID { foundDatasourceID = ds.ID - foundDatasourceName = ds.Name foundDiscovered = d break } @@ -92,8 +91,15 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: }); err != nil { return err } - datasetName = foundDiscovered.Name - qualifiedName = foundDatasourceName + "/" + strings.ReplaceAll(foundDiscovered.QualifiedName, ".", "/") + // Re-fetch via the standard endpoint so qualifiedName matches the + // format used by the already-onboarded path (DiscoveredDataset + // includes the datasource prefix; Dataset does not). + detail, err := client.GetDataset(datasetID) + if err != nil { + return err + } + datasetName = detail.Name + qualifiedName = detail.Datasource.Name + "/" + strings.ReplaceAll(detail.QualifiedName, ".", "/") } fmt.Printf(" Dataset: %s\n\n", output.Bold.Render(datasetName)) From edf753fa5facb5d42ca1baa9a9e29b97b33f84d2 Mon Sep 17 00:00:00 2001 From: Tyler Adkins Date: Sat, 2 May 2026 23:13:08 -0400 Subject: [PATCH 5/5] feat(dataset onboard): bulk mode via repeatable --dataset flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds bulk onboarding for multiple datasets in one command. 
Matches the existing soda-cli convention for bulk inputs (repeatable StringArray flag, single positional kept for the common single-dataset case). sodacli dataset onboard <dataset-id> # single (interactive) sodacli dataset onboard --dataset <id1> --dataset <id2> \ --monitoring --no-profiling --contracts copilot # bulk Behavior: - Bulk mode requires --monitoring/--no-monitoring, --profiling/--no-profiling, and --contracts (no interactive form for N datasets). - --collect-failed-rows / --unique-keys are rejected in bulk mode since unique keys are dataset-specific. - Promotion of discovered datasets is batched per datasource into a single OnboardDiscoveredDatasets call each. - Copilot generation is batched into one GenerateContract operation across all qualifiedNames (single poll, fetch each contract after completion). - Skeleton generation loops per-dataset (the API doesn't accept a list). - Each per-dataset failure is logged as a warning; the rest continue. Adds runContractCreateCopilotBulk helper alongside the existing single-dataset runContractCreateCopilot — same shape, polls one operation, fetches N contracts. --- go/cmd/contract.go | 73 ++++++++++ go/cmd/dataset_onboard.go | 273 +++++++++++++++++++++++++++----------- 2 files changed, 268 insertions(+), 78 deletions(-) diff --git a/go/cmd/contract.go b/go/cmd/contract.go index 9489b61..92224b0 100644 --- a/go/cmd/contract.go +++ b/go/cmd/contract.go @@ -533,6 +533,79 @@ func runContractCreateCopilot(client *api.Client, dataset, outFile string, noWai return nil } +// runContractCreateCopilotBulk submits one GenerateContract operation for N +// qualifiedNames, polls a single status, then fetches each contract and writes +// it to its own file (named via outFiles[qualifiedName]). Per-dataset failures +// are logged as warnings and don't abort the rest. Returns the list of files +// successfully written. 
+func runContractCreateCopilotBulk(client *api.Client, qualifiedNames []string, outFiles map[string]string, noWait bool) ([]string, error) { + if len(qualifiedNames) == 0 { + return nil, nil + } + opID, err := client.GenerateContract(api.GenerateContractRequest{ + DatasetQualifiedNames: qualifiedNames, + }) + if err != nil { + return nil, err + } + + if noWait { + fmt.Printf(" %s AI contract generation started for %d datasets.\n", output.Green.Render("✓"), len(qualifiedNames)) + fmt.Println(output.Dim.Render(" Running in background — contracts will appear in Soda Cloud when ready.")) + fmt.Println(output.Dim.Render(" Check results: sodacli results list")) + return nil, nil + } + + spinner := output.NewSpinner(fmt.Sprintf("Generating AI contracts for %d datasets...", len(qualifiedNames))) + spinner.Start() + + elapsed := 0 + for { + time.Sleep(3 * time.Second) + elapsed += 3 + status, err := client.GetGenerateStatus(opID) + if err != nil { + spinner.Stop() + return nil, err + } + if status.State == "completed" { + break + } + if status.State == "failed" || status.State == "canceled" { + spinner.Stop() + return nil, output.Errorf(2, "AI generation %s", status.State) + } + spinner.SetMessage(fmt.Sprintf("Generating AI contracts for %d datasets... 
(%ds)", len(qualifiedNames), elapsed)) + } + spinner.Stop() + + written := make([]string, 0, len(qualifiedNames)) + for _, qn := range qualifiedNames { + contract, err := client.FindContractByDataset(qn) + if err != nil { + fmt.Fprintf(os.Stderr, " %s [%s] could not fetch contract: %v\n", output.Yellow.Render("⚠"), qn, err) + continue + } + if contract == nil { + fmt.Fprintf(os.Stderr, " %s [%s] AI generation completed but contract was not persisted.\n", output.Yellow.Render("⚠"), qn) + continue + } + outFile := outFiles[qn] + if outFile == "" { + outFile = datasetFileName(qn) + } + if err := os.WriteFile(outFile, []byte(contract.Contents), 0644); err != nil { + fmt.Fprintf(os.Stderr, " %s [%s] could not write file: %v\n", output.Yellow.Render("⚠"), qn, err) + continue + } + written = append(written, outFile) + } + if len(written) > 0 { + output.PrintSuccess(fmt.Sprintf("Wrote %d AI-generated contract(s).", len(written)), GCtx) + } + return written, nil +} + // ── contract copilot ────────────────────────────────────────────────────────── var contractCopilotCmd = &cobra.Command{ diff --git a/go/cmd/dataset_onboard.go b/go/cmd/dataset_onboard.go index d045a18..061e863 100644 --- a/go/cmd/dataset_onboard.go +++ b/go/cmd/dataset_onboard.go @@ -12,17 +12,59 @@ import ( "github.com/soda-data-inc/soda-cli/internal/output" ) +// datasetInfo carries everything we need about each dataset between resolve +// and execute phases. +type datasetInfo struct { + ID string + Name string + QualifiedName string // canonical "datasource/db/schema/table" + Onboarded bool + DatasourceID string // populated only when promotion is needed +} + var datasetOnboardCmd = &cobra.Command{ - Use: "onboard ", - Short: "Guided setup: enable monitors, profiling and contracts for a dataset", - Long: `Set up a dataset with default monitors, profiling and optionally generate a contract. 
+ Use: "onboard [dataset-id]", + Short: "Guided setup: enable monitors, profiling and contracts for one or more datasets", + Long: `Set up one or more datasets with default monitors, profiling and optionally generate contracts. + +Single-dataset mode walks through interactive prompts: + + sodacli dataset onboard -Interactive mode walks through each step. Use flags for CI/CD or AI agents: +Bulk mode (multiple datasets via --dataset, repeatable) requires non-interactive flags: - sodacli dataset onboard --monitoring --profiling --contracts skeleton`, - Args: cobra.ExactArgs(1), + sodacli dataset onboard --dataset --dataset \ + --monitoring --no-profiling --contracts copilot + +Failed-rows collection (--collect-failed-rows / --unique-keys) is only supported +in single-dataset mode, since unique keys are dataset-specific.`, + Args: cobra.MaximumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { - datasetID := args[0] + // ── Collect dataset IDs (positional + repeatable --dataset) ───────── + ids := []string{} + if len(args) == 1 && strings.TrimSpace(args[0]) != "" { + ids = append(ids, strings.TrimSpace(args[0])) + } + extra, _ := cmd.Flags().GetStringArray("dataset") + for _, e := range extra { + if e = strings.TrimSpace(e); e != "" { + ids = append(ids, e) + } + } + seen := map[string]bool{} + dedup := ids[:0] + for _, id := range ids { + if !seen[id] { + seen[id] = true + dedup = append(dedup, id) + } + } + ids = dedup + if len(ids) == 0 { + return output.Errorf(2, "at least one dataset ID is required (positional or --dataset)") + } + bulk := len(ids) > 1 + hasMonitoring := cmd.Flags().Changed("monitoring") || cmd.Flags().Changed("no-monitoring") hasProfiling := cmd.Flags().Changed("profiling") || cmd.Flags().Changed("no-profiling") hasContracts := cmd.Flags().Changed("contracts") @@ -37,75 +79,135 @@ Interactive mode walks through each step. 
Use flags for CI/CD or AI agents: enableCollectFailedRows, _ := cmd.Flags().GetBool("collect-failed-rows") uniqueKeys, _ := cmd.Flags().GetStringSlice("unique-keys") + // Bulk-mode constraints + if bulk { + if !hasMonitoring || !hasProfiling || !hasContracts { + return output.Errorf(2, "bulk mode (multiple datasets) requires --monitoring/--no-monitoring, --profiling/--no-profiling, and --contracts copilot|skeleton|none") + } + if hasFailedRows { + return output.Errorf(2, "--collect-failed-rows / --unique-keys are not supported in bulk mode (run dataset onboard one at a time for failed-rows setup, since unique keys are dataset-specific)") + } + } + client, err := newAPIClient() if err != nil { return err } - // Validate dataset exists - fmt.Println(output.Dim.Render(" Checking dataset...")) - datasets, err := client.ListDatasets(api.ListDatasetsParams{Size: 500}) - if err != nil { - return err + // ── Resolve all dataset IDs ───────────────────────────────────────── + fmt.Println(output.Dim.Render(fmt.Sprintf(" Checking %d dataset(s)...", len(ids)))) + infoByID := make(map[string]*datasetInfo, len(ids)) + for _, id := range ids { + infoByID[id] = &datasetInfo{ID: id} } - var datasetName string - var qualifiedName string - for _, d := range datasets.Content { - if d.ID == datasetID { - datasetName = d.Name - qualifiedName = d.Datasource.Name + "/" + strings.ReplaceAll(d.QualifiedName, ".", "/") + + // Sweep already-onboarded datasets via paginated ListDatasets. 
+ unresolved := len(ids) + page := 0 + for unresolved > 0 { + datasets, err := client.ListDatasets(api.ListDatasetsParams{Size: 500, Page: page}) + if err != nil { + return err + } + for _, d := range datasets.Content { + if i, ok := infoByID[d.ID]; ok && !i.Onboarded { + i.Name = d.Name + i.QualifiedName = d.Datasource.Name + "/" + strings.ReplaceAll(d.QualifiedName, ".", "/") + i.Onboarded = true + unresolved-- + } + } + if datasets.Last || len(datasets.Content) == 0 { break } + page++ } - if datasetName == "" { - // Not onboarded yet — look across discovered datasets and promote on the fly. + + // Anything still unresolved → look across discovered datasets per datasource. + if unresolved > 0 { dsPage, dsErr := client.ListDatasources(0, 500) if dsErr != nil { return dsErr } - var foundDatasourceID string - var foundDiscovered *api.DiscoveredDataset for _, ds := range dsPage.Content { + if unresolved == 0 { + break + } discPage, discErr := client.ListDiscoveredDatasets(ds.ID, 0, 500) if discErr != nil { continue } for i := range discPage.Content { d := &discPage.Content[i] - if d.ID == datasetID { - foundDatasourceID = ds.ID - foundDiscovered = d - break + if info, ok := infoByID[d.ID]; ok && !info.Onboarded && info.DatasourceID == "" { + info.DatasourceID = ds.ID + info.Name = d.Name + unresolved-- } } - if foundDiscovered != nil { - break - } } - if foundDiscovered == nil { - return output.Errorf(2, "dataset '%s' not found", datasetID) + } + + var notFound []string + for _, id := range ids { + if !infoByID[id].Onboarded && infoByID[id].DatasourceID == "" { + notFound = append(notFound, id) } - fmt.Println(output.Dim.Render(" Onboarding dataset...")) - if err := client.OnboardDiscoveredDatasets(foundDatasourceID, api.OnboardDatasetsRequest{ - DiscoveredDatasetIDs: []string{datasetID}, - }); err != nil { - return err + } + if len(notFound) > 0 { + return output.Errorf(2, "dataset(s) not found: %s", strings.Join(notFound, ", ")) + } + + // ── Promote any 
not-yet-onboarded datasets, batched per datasource ── + toPromoteByDS := map[string][]string{} + for _, id := range ids { + if !infoByID[id].Onboarded { + toPromoteByDS[infoByID[id].DatasourceID] = append(toPromoteByDS[infoByID[id].DatasourceID], id) + } + } + if len(toPromoteByDS) > 0 { + n := 0 + for _, v := range toPromoteByDS { + n += len(v) } - // Re-fetch via the standard endpoint so qualifiedName matches the - // format used by the already-onboarded path (DiscoveredDataset + fmt.Println(output.Dim.Render(fmt.Sprintf(" Onboarding %d discovered dataset(s)...", n))) + for dsID, idList := range toPromoteByDS { + if err := client.OnboardDiscoveredDatasets(dsID, api.OnboardDatasetsRequest{ + DiscoveredDatasetIDs: idList, + }); err != nil { + return err + } + } + // Re-fetch each via the standard endpoint so qualifiedName matches + // the format used by the already-onboarded path (DiscoveredDataset // includes the datasource prefix; Dataset does not). - detail, err := client.GetDataset(datasetID) - if err != nil { - return err + for _, id := range ids { + if infoByID[id].Onboarded { + continue + } + detail, err := client.GetDataset(id) + if err != nil { + return err + } + infoByID[id].Name = detail.Name + infoByID[id].QualifiedName = detail.Datasource.Name + "/" + strings.ReplaceAll(detail.QualifiedName, ".", "/") + infoByID[id].Onboarded = true } - datasetName = detail.Name - qualifiedName = detail.Datasource.Name + "/" + strings.ReplaceAll(detail.QualifiedName, ".", "/") } - fmt.Printf(" Dataset: %s\n\n", output.Bold.Render(datasetName)) - // ── Determine settings ────────────────────────────────────────────── + // Print resolved datasets + if bulk { + fmt.Printf(" Datasets (%d):\n", len(ids)) + for _, id := range ids { + fmt.Printf(" • %s\n", infoByID[id].Name) + } + fmt.Println() + } else { + fmt.Printf(" Dataset: %s\n\n", output.Bold.Render(infoByID[ids[0]].Name)) + } - if !hasMonitoring && !hasProfiling && !hasContracts && !hasFailedRows { + // ── Determine 
settings (interactive form only valid for single-dataset) ── + if !bulk && !hasMonitoring && !hasProfiling && !hasContracts && !hasFailedRows { if noInteractive { return output.Errorf(2, "flags required in non-interactive mode: --monitoring/--no-monitoring, --profiling/--no-profiling, --contracts copilot|skeleton|none") } @@ -184,8 +286,8 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: if contractsMode == "" { contractsMode = "none" } - // Treat --unique-keys alone as implicit --collect-failed-rows. - if len(uniqueKeys) > 0 { + // Treat --unique-keys alone as implicit --collect-failed-rows (single-mode only). + if !bulk && len(uniqueKeys) > 0 { enableCollectFailedRows = true } } @@ -196,7 +298,7 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: // ── Execute ───────────────────────────────────────────────────────── - // Step 1: Monitoring + Profiling + // Step 1: Monitoring + Profiling (per-dataset API call) if enableMonitoring || enableProfiling { label := "" switch { @@ -208,17 +310,23 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: label = "Enabling dataset profiling..." 
} fmt.Println(output.Dim.Render(" " + label)) - if err := client.EnableDatasetDefaults(datasetID, enableMonitoring, enableProfiling); err != nil { - fmt.Fprintf(os.Stderr, " %s Could not enable settings: %v\n", output.Yellow.Render("⚠"), err) - } else { + var hadErr bool + for _, id := range ids { + if err := client.EnableDatasetDefaults(id, enableMonitoring, enableProfiling); err != nil { + fmt.Fprintf(os.Stderr, " %s [%s] %v\n", output.Yellow.Render("⚠"), infoByID[id].Name, err) + hadErr = true + } + } + if !hadErr { fmt.Println(output.Green.Render(" ✓") + " " + label[:len(label)-3] + "d.") } } else { fmt.Println(output.Dim.Render(" Skipping monitoring and profiling setup.")) } - // Step 2: Failed rows collection + // Step 2: Failed rows (single-mode only — bulk-mode constraint above blocks this) if enableCollectFailedRows { + id := ids[0] fmt.Println(output.Dim.Render(" Enabling failed rows collection...")) enabled := true cfg := api.DiagnosticsWarehouseConfig{ @@ -228,7 +336,7 @@ Interactive mode walks through each step. Use flags for CI/CD or AI agents: UniqueKeyColumnNames: uniqueKeys, }, } - if _, err := client.UpdateDatasetDiagnostics(datasetID, cfg); err != nil { + if _, err := client.UpdateDatasetDiagnostics(id, cfg); err != nil { fmt.Fprintf(os.Stderr, " %s Could not enable failed rows collection: %v\n", output.Yellow.Render("⚠"), err) if isNotEnabledOnDatasource(err) { fmt.Fprintf(os.Stderr, " %s\n", output.Dim.Render("Set up the diagnostics warehouse on the datasource first:")) @@ -240,28 +348,30 @@ Interactive mode walks through each step. 
Use flags for CI/CD or AI agents: } // Step 3: Contracts - var contractFile string + var contractFiles []string switch contractsMode { case "copilot": - if qualifiedName == "" { - fmt.Fprintf(os.Stderr, " %s Cannot generate AI contract: dataset qualified name not available.\n", output.Yellow.Render("⚠")) + qns := make([]string, 0, len(ids)) + outFiles := make(map[string]string, len(ids)) + for _, id := range ids { + qn := infoByID[id].QualifiedName + qns = append(qns, qn) + outFiles[qn] = datasetFileName(qn) + } + files, err := runContractCreateCopilotBulk(client, qns, outFiles, false) + if err != nil { + fmt.Fprintf(os.Stderr, " %s Contract generation failed: %v\n", output.Yellow.Render("⚠"), err) } else { - outFile := datasetFileName(qualifiedName) - if err := runContractCreateCopilot(client, qualifiedName, outFile, false); err != nil { - fmt.Fprintf(os.Stderr, " %s Contract generation failed: %v\n", output.Yellow.Render("⚠"), err) - } else { - contractFile = outFile - } + contractFiles = append(contractFiles, files...) } case "skeleton": - if qualifiedName == "" { - fmt.Fprintf(os.Stderr, " %s Cannot generate skeleton contract: dataset qualified name not available.\n", output.Yellow.Render("⚠")) - } else { - outFile := datasetFileName(qualifiedName) - if err := runContractCreateSkeleton(client, qualifiedName, outFile); err != nil { - fmt.Fprintf(os.Stderr, " %s Contract generation failed: %v\n", output.Yellow.Render("⚠"), err) + for _, id := range ids { + qn := infoByID[id].QualifiedName + outFile := datasetFileName(qn) + if err := runContractCreateSkeleton(client, qn, outFile); err != nil { + fmt.Fprintf(os.Stderr, " %s [%s] Skeleton generation failed: %v\n", output.Yellow.Render("⚠"), infoByID[id].Name, err) } else { - contractFile = outFile + contractFiles = append(contractFiles, outFile) } } case "none": @@ -270,17 +380,23 @@ Interactive mode walks through each step. 
Use flags for CI/CD or AI agents: return output.Errorf(2, "unknown contracts mode '%s' — use copilot, skeleton, or none", contractsMode) } - // Step 4: Verify contract - if contractFile != "" { + // Step 4: Verify contracts + if len(contractFiles) > 0 { fmt.Println() - fmt.Println(output.Dim.Render(" Verifying contract...")) - if err := runContractVerify(client, contractFile, false); err != nil { - fmt.Fprintf(os.Stderr, " %s Verification failed: %v\n", output.Yellow.Render("⚠"), err) + fmt.Println(output.Dim.Render(fmt.Sprintf(" Verifying %d contract(s)...", len(contractFiles)))) + for _, f := range contractFiles { + if err := runContractVerify(client, f, false); err != nil { + fmt.Fprintf(os.Stderr, " %s [%s] Verification failed: %v\n", output.Yellow.Render("⚠"), f, err) + } } } fmt.Println() - output.PrintSuccess(fmt.Sprintf("Dataset '%s' onboarding complete.", datasetName), GCtx) + if bulk { + output.PrintSuccess(fmt.Sprintf("Onboarded %d datasets.", len(ids)), GCtx) + } else { + output.PrintSuccess(fmt.Sprintf("Dataset '%s' onboarding complete.", infoByID[ids[0]].Name), GCtx) + } return nil }, } @@ -296,14 +412,15 @@ func datasetFileName(qualifiedName string) string { } func init() { + datasetOnboardCmd.Flags().StringArray("dataset", nil, "Additional dataset ID to onboard (repeatable, enables bulk mode)") datasetOnboardCmd.Flags().Bool("monitoring", false, "Enable default metric monitors") datasetOnboardCmd.Flags().Bool("no-monitoring", false, "Skip monitoring setup") datasetOnboardCmd.Flags().Bool("profiling", false, "Enable dataset profiling") datasetOnboardCmd.Flags().Bool("no-profiling", false, "Skip profiling setup") datasetOnboardCmd.Flags().String("contracts", "", "Generate contract: copilot|skeleton|none") - datasetOnboardCmd.Flags().Bool("collect-failed-rows", false, "Enable failed rows collection (requires --unique-keys)") + datasetOnboardCmd.Flags().Bool("collect-failed-rows", false, "Enable failed rows collection (single-dataset only; requires 
--unique-keys)") datasetOnboardCmd.Flags().Bool("no-collect-failed-rows", false, "Skip failed rows collection setup") - datasetOnboardCmd.Flags().StringSlice("unique-keys", nil, "Unique key columns for failed rows collection (comma-separated or repeated)") + datasetOnboardCmd.Flags().StringSlice("unique-keys", nil, "Unique key columns for failed rows collection (single-dataset only; comma-separated or repeated)") datasetCmd.AddCommand(datasetOnboardCmd) }