diff --git a/internal/config/config.go b/internal/config/config.go index 7101002..ee88714 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -25,6 +25,11 @@ var ( HTMLOutputFile string // "" means not set LogLevel string // "" means default (info); one of error/warn/info/debug InstallDir string // "" means default (~/.stepsecurity); non-empty makes the agent put all its files (logs, hook errors, future state) under this directory. Bootstrap config.json itself stays at the legacy location. Per-run opt-out is the CLI flag --install-dir=. Resolution: --install-dir flag > STEPSECURITY_HOME env > this field > default — see internal/paths. + // UseLegacyPackageScan, when true, disables the scan-state delta-upload + // optimization for npm and Python project scans — every run re-uploads + // the full snapshot as in pre-1.13 agents. Default false = optimized. + // Environment override: STEPSEC_DISABLE_SCAN_STATE=1 takes precedence. + UseLegacyPackageScan bool ) // MaxExecutionDuration is the whole-process execution-watchdog limit @@ -54,6 +59,7 @@ type ConfigFile struct { LogLevel string `json:"log_level,omitempty"` InstallDir string `json:"install_dir,omitempty"` MaxExecutionDuration string `json:"max_execution_duration,omitempty"` + UseLegacyPackageScan bool `json:"use_legacy_package_scan,omitempty"` } // userConfigDir returns ~/.stepsecurity — the per-user config location. @@ -186,6 +192,9 @@ func Load() { if cfg.MaxExecutionDuration != "" && MaxExecutionDuration == "" { MaxExecutionDuration = cfg.MaxExecutionDuration } + if cfg.UseLegacyPackageScan { + UseLegacyPackageScan = true + } } // IsEnterpriseMode returns true if valid enterprise credentials are configured. 
diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 696433c..32c0473 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -3,6 +3,7 @@ package config import ( "bytes" "encoding/json" + "os" "path/filepath" "testing" ) @@ -181,3 +182,57 @@ func TestConfigFile_InstallDir_JSONRoundTrip(t *testing.T) { t.Errorf("empty install_dir should be omitted: %s", data) } } + +func TestConfigFile_UseLegacyPackageScan_JSONRoundTrip(t *testing.T) { + in := ConfigFile{UseLegacyPackageScan: true} + data, err := json.Marshal(in) + if err != nil { + t.Fatal(err) + } + if !bytes.Contains(data, []byte(`"use_legacy_package_scan":true`)) { + t.Errorf("use_legacy_package_scan not serialized: %s", data) + } + + var out ConfigFile + if err := json.Unmarshal(data, &out); err != nil { + t.Fatal(err) + } + if !out.UseLegacyPackageScan { + t.Errorf("UseLegacyPackageScan round-trip = false, want true") + } + + // False/absent is omitted (default optimized path is the unmarked state). + empty := ConfigFile{} + data, err = json.Marshal(empty) + if err != nil { + t.Fatal(err) + } + if bytes.Contains(data, []byte("use_legacy_package_scan")) { + t.Errorf("default-false should be omitted: %s", data) + } +} + +func TestLoad_UseLegacyPackageScan_AppliedFromFile(t *testing.T) { + // Save and restore package var. 
+ prev := UseLegacyPackageScan + t.Cleanup(func() { UseLegacyPackageScan = prev }) + UseLegacyPackageScan = false + + dir := t.TempDir() + t.Setenv("HOME", dir) + t.Setenv("STEPSECURITY_HOME", dir) + cfgPath := filepath.Join(dir, ".stepsecurity", "config.json") + if err := os.MkdirAll(filepath.Dir(cfgPath), 0o755); err != nil { + t.Fatal(err) + } + body := []byte(`{"use_legacy_package_scan":true}`) + if err := os.WriteFile(cfgPath, body, 0o600); err != nil { + t.Fatal(err) + } + + Load() + + if !UseLegacyPackageScan { + t.Errorf("Load did not propagate use_legacy_package_scan from config.json") + } +} diff --git a/internal/detector/nodescan.go b/internal/detector/nodescan.go index a5a3b59..1b2bf27 100644 --- a/internal/detector/nodescan.go +++ b/internal/detector/nodescan.go @@ -9,8 +9,10 @@ import ( "sort" "strconv" "strings" + "sync" "time" + "github.com/step-security/dev-machine-guard/internal/buildinfo" "github.com/step-security/dev-machine-guard/internal/executor" "github.com/step-security/dev-machine-guard/internal/model" "github.com/step-security/dev-machine-guard/internal/progress" @@ -47,8 +49,11 @@ type NodeScanner struct { // per project; this cache collapses that to one lookup per distinct // PM. A scanner is created once per telemetry run (see // internal/telemetry/telemetry.go), so the cache's effective scope - // matches a single scan even though the map isn't reset. - pmAvailability map[string]error + // matches a single scan even though the map isn't reset. Mutex-guarded + // because the worker pool in scanProjectsConcurrent reads/writes this + // map from multiple goroutines. 
+ pmAvailability map[string]error + pmAvailabilityMu sync.Mutex } func NewNodeScanner(exec executor.Executor, log *progress.Logger, loggedInUser string) *NodeScanner { @@ -65,9 +70,13 @@ func NewNodeScanner(exec executor.Executor, log *progress.Logger, loggedInUser s // the per-project loop don't pay a LookPath per project on devices that // have hundreds of lockfiles for a PM that isn't installed. func (s *NodeScanner) binaryAvailable(ctx context.Context, name string) error { + s.pmAvailabilityMu.Lock() if err, ok := s.pmAvailability[name]; ok { + s.pmAvailabilityMu.Unlock() return err } + s.pmAvailabilityMu.Unlock() + err := s.checkPath(ctx, name) if err != nil { // Logged once per PM (cache miss). "Not on PATH" is a normal @@ -76,7 +85,9 @@ func (s *NodeScanner) binaryAvailable(ctx context.Context, name string) error { // (send the Debug header) instead of an unexplained absence. s.log.Debug("%s not found in PATH (delegated=%v) — projects using it will be skipped: %v", name, s.shouldRunAsUser(), err) } + s.pmAvailabilityMu.Lock() s.pmAvailability[name] = err + s.pmAvailabilityMu.Unlock() return err } @@ -388,10 +399,17 @@ type projectEntry struct { modTime int64 } -// ScanProjects finds package.json files, sorts by most recently modified, then scans. -// Respects the size limit (default 500MB, override via STEPSEC_MAX_NODE_SCAN_BYTES). -func (s *NodeScanner) ScanProjects(ctx context.Context, searchDirs []string) []model.NodeScanResult { - // Phase 1: Discover all package.json files +// ScanProjects finds package.json files and scans them within the size cap. +// +// Ordering: never-before-seen projects (paths absent from knownLastVerified) +// come first, sorted by mtime descending. Already-known projects follow, +// sorted by their LastVerifiedAt ascending so the stalest are re-checked +// first. Pass a nil map for plain mtime-descending order. 
+// +// The second return is every project directory discovered on disk (before +// the cap), so callers can distinguish "missing from disk" from "dropped by +// the cap" when comparing against prior state. +func (s *NodeScanner) ScanProjects(ctx context.Context, searchDirs []string, knownLastVerified map[string]time.Time) (results []model.NodeScanResult, discovered []string) { var projects []projectEntry for _, dir := range searchDirs { s.log.Progress(" Searching in: %s", dir) @@ -417,7 +435,6 @@ func (s *NodeScanner) ScanProjects(ctx context.Context, searchDirs []string) []m if isInsideNodeModules(projectDir) { return nil } - // Get modification time for sorting modTime := int64(0) if info, err := entry.Info(); err == nil { modTime = info.ModTime().Unix() @@ -429,62 +446,180 @@ func (s *NodeScanner) ScanProjects(ctx context.Context, searchDirs []string) []m s.log.Debug("node project discovery: found %d package.json files across %d search dir(s)", len(projects), len(searchDirs)) - // Phase 2: Sort by modification time descending (most recent first) - sort.Slice(projects, func(i, j int) bool { - return projects[i].modTime > projects[j].modTime - }) + discovered = make([]string, 0, len(projects)) + for _, p := range projects { + discovered = append(discovered, p.dir) + } - // Phase 3: Scan in order, respecting limits - maxBytes := getMaxProjectScanBytes() - var results []model.NodeScanResult - totalSize := int64(0) + projects = orderScanProjects(projects, knownLastVerified) - totalProjects := len(projects) - if totalProjects > maxNodeProjects { - totalProjects = maxNodeProjects + if len(projects) > maxNodeProjects { + s.log.Warn("Node project scan truncated at %d projects (total discovered: %d) — lowest-priority projects were skipped", maxNodeProjects, len(projects)) + projects = projects[:maxNodeProjects] } - for i, p := range projects { - if i >= maxNodeProjects { - s.log.Progress(" Reached maximum of %d projects, stopping search", maxNodeProjects) - 
s.log.Warn("Node project scan truncated at %d projects (total discovered: %d) — oldest projects were skipped", maxNodeProjects, len(projects)) - break - } - if totalSize > maxBytes { + + results = s.scanProjectsConcurrent(ctx, projects) + + maxBytes := getMaxProjectScanBytes() + totalSize := int64(0) + capped := make([]model.NodeScanResult, 0, len(results)) + for _, r := range results { + resultSize := int64(len(r.RawStdoutBase64)) + int64(len(r.RawStderrBase64)) + if totalSize+resultSize > maxBytes { s.log.Warn("Reached data size limit (%d bytes collected, limit: %d bytes)", totalSize, maxBytes) s.log.Warn("Skipping remaining projects (prioritized by most recently modified)") break } + totalSize += resultSize + capped = append(capped, r) + } - // Per-project sub-progress for the heartbeat goroutine. Surfaces - // to console as "current_phase_detail: project 12 of 47" so a - // stuck scan is visibly so, not just opaque "node_scan in progress". - s.emitProgress(fmt.Sprintf("project %d of %d", i+1, totalProjects)) + return capped, discovered +} - s.log.Progress(" Found project: %s", p.dir) - pm := DetectProjectPM(s.exec, p.dir) - s.log.Progress(" Package manager: %s", pm) +// scanProjectsConcurrent returns one NodeScanResult per project in the input +// order. Cache hits skip the PM CLI entirely; cache misses run through a +// bounded worker pool. Successful fresh scans are written back to the cache. +// Projects whose PM isn't installed on the device produce no result (skipped +// in the returned slice to match the legacy contract). 
+func (s *NodeScanner) scanProjectsConcurrent(ctx context.Context, projects []projectEntry) []model.NodeScanResult { + cachePath := scanCacheFile(s.exec) + cache := loadScanCache(cachePath) + bypassCache := s.exec.Getenv("STEPSEC_NODE_SCAN_CACHE_BYPASS") == "1" + nowUnix := time.Now().Unix() + + type slot struct { + result model.NodeScanResult + pm string + populated bool + fromCache bool + } + slots := make([]slot, len(projects)) + missIdx := make([]int, 0, len(projects)) - r, ok := s.scanProject(ctx, p.dir, pm) - if !ok { - // PM not installed on this device — not an error, just nothing - // to scan. Skip without emitting a telemetry record. + for i, p := range projects { + s.emitProgress(fmt.Sprintf("project %d of %d", i+1, len(projects))) + pm := DetectProjectPM(s.exec, p.dir) + slots[i].pm = pm + + entry, ok := cache.Projects[p.dir] + if ok && cacheValidFor(s.exec, entry, p.dir, pm, s.agentVersion(), bypassCache) { + s.log.Progress(" Cached: %s (%s)", p.dir, pm) + slots[i].result = entry.CachedResult + slots[i].populated = true + slots[i].fromCache = true continue } - resultSize := int64(len(r.RawStdoutBase64)) + int64(len(r.RawStderrBase64)) + missIdx = append(missIdx, i) + } + + if len(missIdx) > 0 { + jobs := make(chan int, len(missIdx)) + var wg sync.WaitGroup + workers := scanWorkerCount(s.exec) + for range workers { + wg.Add(1) + go func() { + defer wg.Done() + for idx := range jobs { + p := projects[idx] + pm := slots[idx].pm + s.log.Progress(" Scanning: %s (%s)", p.dir, pm) + r, ok := s.scanProject(ctx, p.dir, pm) + if !ok { + continue + } + slots[idx].result = r + slots[idx].populated = true + } + }() + } + for _, i := range missIdx { + jobs <- i + } + close(jobs) + wg.Wait() + } - if totalSize+resultSize > maxBytes { - s.log.Warn("Reached data size limit (%d bytes collected, limit: %d bytes)", totalSize, maxBytes) - s.log.Warn("Skipping remaining projects (prioritized by most recently modified)") - break + results := make([]model.NodeScanResult, 0, 
len(slots)) + for i, sl := range slots { + if !sl.populated { + continue } + results = append(results, sl.result) + if !sl.fromCache && sl.result.ExitCode == 0 { + cache.Projects[projects[i].dir] = scanCacheEntry{ + PackageManager: sl.pm, + LastScanUnix: nowUnix, + PackageJSONMtime: mtimeOr0(s.exec, filepath.Join(projects[i].dir, "package.json")), + LockfileMtime: mtimeOr0(s.exec, lockfileFor(s.exec, projects[i].dir, sl.pm)), + NodeModulesMtime: mtimeOr0(s.exec, filepath.Join(projects[i].dir, "node_modules")), + AgentVersion: s.agentVersion(), + CachedResult: sl.result, + } + } + } - totalSize += resultSize - results = append(results, r) + pruneCacheToDiscovered(cache, projects) + if err := cache.save(cachePath); err != nil { + s.log.Debug("node-scan-cache: save failed (%v) — next run will re-scan everything", err) } + s.log.Progress(" Scanned %d projects (%d cache hits)", len(missIdx), len(slots)-len(missIdx)) return results } +// pruneCacheToDiscovered drops cache entries for projects not present in the +// current discovery pass. Bounds the cache file at the device's current +// project set rather than growing unboundedly across runs. +func pruneCacheToDiscovered(cache *scanCache, projects []projectEntry) { + keep := make(map[string]struct{}, len(projects)) + for _, p := range projects { + keep[p.dir] = struct{}{} + } + for dir := range cache.Projects { + if _, ok := keep[dir]; !ok { + delete(cache.Projects, dir) + } + } +} + +// agentVersion returns the running agent's version, used as a cache key +// guard so post-upgrade runs always re-scan. +func (s *NodeScanner) agentVersion() string { + return buildinfo.Version +} + +// orderScanProjects sorts discovered projects so that paths absent from +// knownLastVerified (never-seen projects) come first by mtime descending, +// then known paths by LastVerifiedAt ascending (stalest first). A nil map +// degrades to the legacy mtime-descending order. 
+func orderScanProjects(projects []projectEntry, knownLastVerified map[string]time.Time) []projectEntry { + if len(knownLastVerified) == 0 { + sort.Slice(projects, func(i, j int) bool { + return projects[i].modTime > projects[j].modTime + }) + return projects + } + + unknown := make([]projectEntry, 0, len(projects)) + known := make([]projectEntry, 0, len(projects)) + for _, p := range projects { + if _, ok := knownLastVerified[p.dir]; ok { + known = append(known, p) + } else { + unknown = append(unknown, p) + } + } + sort.Slice(unknown, func(i, j int) bool { + return unknown[i].modTime > unknown[j].modTime + }) + sort.Slice(known, func(i, j int) bool { + return knownLastVerified[known[i].dir].Before(knownLastVerified[known[j].dir]) + }) + return append(unknown, known...) +} + // scanProject runs the project's detected package manager in the project // directory and returns the raw stdout/stderr as a NodeScanResult. The // second return is false when no record should be emitted — currently only diff --git a/internal/detector/nodescan_cache.go b/internal/detector/nodescan_cache.go new file mode 100644 index 0000000..e814f06 --- /dev/null +++ b/internal/detector/nodescan_cache.go @@ -0,0 +1,205 @@ +package detector + +import ( + "encoding/json" + "os" + "path/filepath" + "runtime" + "strconv" + + "github.com/step-security/dev-machine-guard/internal/executor" + "github.com/step-security/dev-machine-guard/internal/model" + "github.com/step-security/dev-machine-guard/internal/paths" +) + +// scanCacheVersion is bumped when the on-disk format changes. A loaded entry +// from a previous version is silently discarded — the next run pays a fresh +// scan rather than risking a misinterpreted cached body. +const scanCacheVersion = 2 + +// scanCacheEntry is one project's cached scan result keyed by directory. 
The +// three mtimes (package.json, lockfile, node_modules) are the invalidation +// signals: if any post-dates LastScanUnix the cache is stale and the PM CLI +// must run fresh. AgentVersion pins the entry to the binary that produced it +// so post-upgrade runs always re-scan. +type scanCacheEntry struct { + PackageManager string `json:"package_manager"` + LastScanUnix int64 `json:"last_scan_unix"` + PackageJSONMtime int64 `json:"package_json_mtime"` + LockfileMtime int64 `json:"lockfile_mtime"` + NodeModulesMtime int64 `json:"node_modules_mtime"` + AgentVersion string `json:"agent_version"` + CachedResult model.NodeScanResult `json:"cached_result"` +} + +type scanCache struct { + Version int `json:"version"` + Projects map[string]scanCacheEntry `json:"projects"` +} + +func newScanCache() *scanCache { + return &scanCache{Version: scanCacheVersion, Projects: map[string]scanCacheEntry{}} +} + +// scanCacheFile returns the cache path under paths.Home(). Returns "" when +// Home is disabled — caller must treat "" as "cache disabled, scan everything". +// Override with STEPSEC_NODE_SCAN_CACHE for tests. +func scanCacheFile(exec executor.Executor) string { + if override := exec.Getenv("STEPSEC_NODE_SCAN_CACHE"); override != "" { + return override + } + home := paths.Home() + if home == "" { + return "" + } + return filepath.Join(home, "node-scan-cache.json") +} + +// loadScanCache reads the cache file. Returns an empty cache on any failure +// (missing, parse error, schema mismatch) so a corrupt cache only forces a +// full scan, never breaks a run. 
+func loadScanCache(path string) *scanCache { + if path == "" { + return newScanCache() + } + data, err := os.ReadFile(filepath.Clean(path)) + if err != nil { + return newScanCache() + } + var c scanCache + if err := json.Unmarshal(data, &c); err != nil || c.Version != scanCacheVersion { + return newScanCache() + } + if c.Projects == nil { + c.Projects = map[string]scanCacheEntry{} + } + return &c +} + +func (c *scanCache) save(path string) error { + if path == "" { + return nil + } + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0o750); err != nil { + return err + } + data, err := json.Marshal(c) + if err != nil { + return err + } + tmp, err := os.CreateTemp(dir, ".node-scan-cache-*.tmp") + if err != nil { + return err + } + tmpPath := tmp.Name() + if _, err := tmp.Write(data); err != nil { + _ = tmp.Close() + _ = os.Remove(tmpPath) + return err + } + if err := tmp.Sync(); err != nil { + _ = tmp.Close() + _ = os.Remove(tmpPath) + return err + } + if err := tmp.Close(); err != nil { + _ = os.Remove(tmpPath) + return err + } + return os.Rename(tmpPath, path) +} + +// lockfileFor returns the path of the PM's lockfile in projectDir, or "" if +// the expected lockfile isn't present. Used as one of the cache invalidation +// signals. +func lockfileFor(exec executor.Executor, projectDir, pm string) string { + var names []string + switch pm { + case "npm": + names = []string{"package-lock.json"} + case "yarn", "yarn-berry": + names = []string{"yarn.lock"} + case "pnpm": + names = []string{"pnpm-lock.yaml"} + case "bun": + names = []string{"bun.lock", "bun.lockb"} + default: + return "" + } + for _, n := range names { + p := filepath.Join(projectDir, n) + if exec.FileExists(p) { + return p + } + } + return "" +} + +// mtimeOr0 returns the file's mtime in unix seconds, or 0 if it can't be +// stat'd. Used both for cache writes (capture current state) and cache reads +// (compare to LastScanUnix). 
+func mtimeOr0(exec executor.Executor, path string) int64 { + if path == "" { + return 0 + } + info, err := exec.Stat(path) + if err != nil { + return 0 + } + return info.ModTime().Unix() +} + +// cacheValidFor reports whether the cached entry can be reused for projectDir. +// Three guards: +// - PM unchanged (lockfile detection must agree with the cached entry) +// - Agent version unchanged (defensive against parsing-format drift) +// - All three mtimes ≤ LastScanUnix (package.json, lockfile, node_modules). +// The node_modules check catches `rm -rf node_modules/foo` cases where +// the lockfile alone doesn't move. +// +// `bypass` short-circuits to false. The caller passes true during forced +// full syncs so the wire-shipped bodies match what the PM CLI sees right now. +func cacheValidFor( + exec executor.Executor, entry scanCacheEntry, + projectDir, pm, agentVersion string, bypass bool, +) bool { + if bypass { + return false + } + if entry.PackageManager != pm { + return false + } + if entry.AgentVersion != agentVersion { + return false + } + lockPath := lockfileFor(exec, projectDir, pm) + if lockPath == "" { + // No lockfile means there's nothing reliable to mtime-check; always re-scan. + return false + } + pkgMt := mtimeOr0(exec, filepath.Join(projectDir, "package.json")) + lockMt := mtimeOr0(exec, lockPath) + nmMt := mtimeOr0(exec, filepath.Join(projectDir, "node_modules")) + return pkgMt <= entry.LastScanUnix && + lockMt <= entry.LastScanUnix && + nmMt <= entry.LastScanUnix +} + +// scanWorkerCount returns how many concurrent project scans to dispatch. +// min(NumCPU, 8) by default. Override with STEPSEC_NODE_SCAN_WORKERS. 
+func scanWorkerCount(exec executor.Executor) int { + if v := exec.Getenv("STEPSEC_NODE_SCAN_WORKERS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + return n + } + } + n := runtime.NumCPU() + if n > 8 { + n = 8 + } + if n < 1 { + n = 1 + } + return n +} diff --git a/internal/detector/nodescan_cache_test.go b/internal/detector/nodescan_cache_test.go new file mode 100644 index 0000000..2b6479f --- /dev/null +++ b/internal/detector/nodescan_cache_test.go @@ -0,0 +1,221 @@ +package detector + +import ( + "os" + "path/filepath" + "testing" + + "github.com/step-security/dev-machine-guard/internal/buildinfo" + "github.com/step-security/dev-machine-guard/internal/executor" + "github.com/step-security/dev-machine-guard/internal/model" +) + +func TestScanCache_RoundTrip(t *testing.T) { + path := filepath.Join(t.TempDir(), "node-scan-cache.json") + + c := newScanCache() + c.Projects["/app"] = scanCacheEntry{ + PackageManager: "npm", + LastScanUnix: 1700000000, + PackageJSONMtime: 1700000000, + LockfileMtime: 1700000000, + NodeModulesMtime: 1700000000, + AgentVersion: buildinfo.Version, + CachedResult: model.NodeScanResult{ + ProjectPath: "/app", + PackageManager: "npm", + RawStdoutBase64: "eyJkZXBzIjpbXX0=", + ExitCode: 0, + }, + } + if err := c.save(path); err != nil { + t.Fatalf("save: %v", err) + } + + loaded := loadScanCache(path) + if loaded.Version != scanCacheVersion { + t.Errorf("version: got %d, want %d", loaded.Version, scanCacheVersion) + } + entry, ok := loaded.Projects["/app"] + if !ok { + t.Fatal("missing /app entry after reload") + } + if entry.LastScanUnix != 1700000000 || entry.PackageManager != "npm" { + t.Errorf("entry mismatch: %+v", entry) + } +} + +func TestScanCache_MissReturnsEmpty(t *testing.T) { + c := loadScanCache(filepath.Join(t.TempDir(), "does-not-exist.json")) + if c == nil || c.Projects == nil { + t.Fatal("expected non-nil empty cache on miss") + } + if len(c.Projects) != 0 { + t.Errorf("expected empty projects map on 
miss, got %d entries", len(c.Projects)) + } +} + +func TestScanCache_CorruptReturnsEmpty(t *testing.T) { + path := filepath.Join(t.TempDir(), "node-scan-cache.json") + if err := os.WriteFile(path, []byte("not json"), 0o644); err != nil { + t.Fatal(err) + } + c := loadScanCache(path) + if len(c.Projects) != 0 { + t.Errorf("expected empty cache after corrupt read, got %d entries", len(c.Projects)) + } +} + +func TestScanCache_WrongVersionReturnsEmpty(t *testing.T) { + path := filepath.Join(t.TempDir(), "node-scan-cache.json") + if err := os.WriteFile(path, []byte(`{"version":999,"projects":{"/a":{"package_manager":"npm","last_scan_unix":1}}}`), 0o644); err != nil { + t.Fatal(err) + } + c := loadScanCache(path) + if len(c.Projects) != 0 { + t.Errorf("expected empty cache on version mismatch, got %d entries", len(c.Projects)) + } +} + +func TestLockfileFor(t *testing.T) { + mock := executor.NewMock() + mock.SetFile(filepath.Join("/proj-npm", "package-lock.json"), []byte{}) + mock.SetFile(filepath.Join("/proj-yarn", "yarn.lock"), []byte{}) + mock.SetFile(filepath.Join("/proj-pnpm", "pnpm-lock.yaml"), []byte{}) + mock.SetFile(filepath.Join("/proj-bun", "bun.lockb"), []byte{}) + + cases := []struct { + dir, pm, want string + }{ + {"/proj-npm", "npm", filepath.Join("/proj-npm", "package-lock.json")}, + {"/proj-yarn", "yarn", filepath.Join("/proj-yarn", "yarn.lock")}, + {"/proj-yarn", "yarn-berry", filepath.Join("/proj-yarn", "yarn.lock")}, + {"/proj-pnpm", "pnpm", filepath.Join("/proj-pnpm", "pnpm-lock.yaml")}, + {"/proj-bun", "bun", filepath.Join("/proj-bun", "bun.lockb")}, + {"/missing", "npm", ""}, + {"/proj-npm", "unknown", ""}, + } + for _, c := range cases { + got := lockfileFor(mock, c.dir, c.pm) + if got != c.want { + t.Errorf("lockfileFor(%q,%q): got %q, want %q", c.dir, c.pm, got, c.want) + } + } +} + +func TestCacheValidFor_HitWhenMtimesUnchanged(t *testing.T) { + mock := executor.NewMock() + mock.SetFileMtime(filepath.Join("/p", "package.json"), 100) + 
mock.SetFileMtime(filepath.Join("/p", "package-lock.json"), 100) + mock.SetFileMtime(filepath.Join("/p", "node_modules"), 100) + + entry := scanCacheEntry{PackageManager: "npm", LastScanUnix: 200, AgentVersion: buildinfo.Version} + if !cacheValidFor(mock, entry, "/p", "npm", buildinfo.Version, false) { + t.Error("expected hit when all mtimes <= LastScanUnix") + } +} + +func TestCacheValidFor_MissWhenLockfileNewer(t *testing.T) { + mock := executor.NewMock() + mock.SetFileMtime(filepath.Join("/p", "package.json"), 100) + mock.SetFileMtime(filepath.Join("/p", "package-lock.json"), 300) // newer than LastScanUnix + mock.SetFileMtime(filepath.Join("/p", "node_modules"), 100) + + entry := scanCacheEntry{PackageManager: "npm", LastScanUnix: 200, AgentVersion: buildinfo.Version} + if cacheValidFor(mock, entry, "/p", "npm", buildinfo.Version, false) { + t.Error("expected miss when lockfile mtime > LastScanUnix") + } +} + +func TestCacheValidFor_MissWhenNodeModulesNewer(t *testing.T) { + mock := executor.NewMock() + mock.SetFileMtime(filepath.Join("/p", "package.json"), 100) + mock.SetFileMtime(filepath.Join("/p", "package-lock.json"), 100) + mock.SetFileMtime(filepath.Join("/p", "node_modules"), 300) // user did `rm -rf node_modules/foo` + + entry := scanCacheEntry{PackageManager: "npm", LastScanUnix: 200, AgentVersion: buildinfo.Version} + if cacheValidFor(mock, entry, "/p", "npm", buildinfo.Version, false) { + t.Error("expected miss when node_modules mtime > LastScanUnix (rm -rf node_modules/foo case)") + } +} + +func TestCacheValidFor_MissWhenNoLockfile(t *testing.T) { + // Project without a lockfile — nothing to mtime-check authoritatively. 
+ mock := executor.NewMock() + mock.SetFileMtime(filepath.Join("/p", "package.json"), 100) + + entry := scanCacheEntry{PackageManager: "npm", LastScanUnix: 200, AgentVersion: buildinfo.Version} + if cacheValidFor(mock, entry, "/p", "npm", buildinfo.Version, false) { + t.Error("expected miss when no lockfile present (can't trust mtimes)") + } +} + +func TestCacheValidFor_MissOnAgentVersionDrift(t *testing.T) { + mock := executor.NewMock() + mock.SetFileMtime(filepath.Join("/p", "package.json"), 100) + mock.SetFileMtime(filepath.Join("/p", "package-lock.json"), 100) + mock.SetFileMtime(filepath.Join("/p", "node_modules"), 100) + + entry := scanCacheEntry{PackageManager: "npm", LastScanUnix: 200, AgentVersion: "1.10.0"} + if cacheValidFor(mock, entry, "/p", "npm", "1.13.0", false) { + t.Error("expected miss on agent version drift") + } +} + +func TestCacheValidFor_MissOnPMChange(t *testing.T) { + mock := executor.NewMock() + mock.SetFileMtime(filepath.Join("/p", "yarn.lock"), 100) + mock.SetFileMtime(filepath.Join("/p", "package.json"), 100) + mock.SetFileMtime(filepath.Join("/p", "node_modules"), 100) + + entry := scanCacheEntry{PackageManager: "npm", LastScanUnix: 200, AgentVersion: buildinfo.Version} + if cacheValidFor(mock, entry, "/p", "yarn", buildinfo.Version, false) { + t.Error("expected miss when cached PM differs from current detection") + } +} + +func TestCacheValidFor_BypassShortCircuits(t *testing.T) { + mock := executor.NewMock() + mock.SetFileMtime(filepath.Join("/p", "package.json"), 100) + mock.SetFileMtime(filepath.Join("/p", "package-lock.json"), 100) + mock.SetFileMtime(filepath.Join("/p", "node_modules"), 100) + + entry := scanCacheEntry{PackageManager: "npm", LastScanUnix: 200, AgentVersion: buildinfo.Version} + if cacheValidFor(mock, entry, "/p", "npm", buildinfo.Version, true) { + t.Error("bypass=true must always miss") + } +} + +func TestPruneCacheToDiscovered_KeepsOnlyDiscovered(t *testing.T) { + c := newScanCache() + c.Projects["/a"] = 
scanCacheEntry{} + c.Projects["/b"] = scanCacheEntry{} + c.Projects["/c"] = scanCacheEntry{} + + pruneCacheToDiscovered(c, []projectEntry{{dir: "/a"}, {dir: "/c"}}) + + if _, ok := c.Projects["/a"]; !ok { + t.Error("/a should be kept") + } + if _, ok := c.Projects["/b"]; ok { + t.Error("/b should be pruned (not discovered)") + } + if _, ok := c.Projects["/c"]; !ok { + t.Error("/c should be kept") + } +} + +func TestScanWorkerCount_HonorsEnvOverride(t *testing.T) { + mock := executor.NewMock() + mock.SetEnv("STEPSEC_NODE_SCAN_WORKERS", "3") + if got := scanWorkerCount(mock); got != 3 { + t.Errorf("expected 3 workers from env, got %d", got) + } +} + +func TestScanWorkerCount_DefaultPositive(t *testing.T) { + mock := executor.NewMock() + if got := scanWorkerCount(mock); got < 1 { + t.Errorf("default worker count must be >= 1, got %d", got) + } +} diff --git a/internal/detector/nodescan_order_test.go b/internal/detector/nodescan_order_test.go new file mode 100644 index 0000000..4fdc621 --- /dev/null +++ b/internal/detector/nodescan_order_test.go @@ -0,0 +1,100 @@ +package detector + +import ( + "reflect" + "testing" + "time" +) + +func TestOrderScanProjects_NilMapFallsBackToMtimeDesc(t *testing.T) { + in := []projectEntry{ + {dir: "/old", modTime: 100}, + {dir: "/new", modTime: 300}, + {dir: "/mid", modTime: 200}, + } + got := orderScanProjects(in, nil) + want := []projectEntry{ + {dir: "/new", modTime: 300}, + {dir: "/mid", modTime: 200}, + {dir: "/old", modTime: 100}, + } + if !reflect.DeepEqual(got, want) { + t.Errorf("nil map: got %v, want %v", got, want) + } +} + +func TestOrderScanProjects_EmptyMapFallsBackToMtimeDesc(t *testing.T) { + in := []projectEntry{ + {dir: "/a", modTime: 100}, + {dir: "/b", modTime: 200}, + } + got := orderScanProjects(in, map[string]time.Time{}) + want := []projectEntry{ + {dir: "/b", modTime: 200}, + {dir: "/a", modTime: 100}, + } + if !reflect.DeepEqual(got, want) { + t.Errorf("empty map: got %v, want %v", got, want) + } +} + +func 
TestOrderScanProjects_UnknownFirstByMtime_KnownAfterByStaleness(t *testing.T) { + verified := func(year int) time.Time { + return time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC) + } + known := map[string]time.Time{ + "/known-fresh": verified(2026), + "/known-stale": verified(2024), + "/known-mid": verified(2025), + } + in := []projectEntry{ + {dir: "/known-mid", modTime: 100}, + {dir: "/unknown-new", modTime: 500}, + {dir: "/known-fresh", modTime: 600}, + {dir: "/unknown-old", modTime: 200}, + {dir: "/known-stale", modTime: 300}, + } + + got := orderScanProjects(in, known) + wantOrder := []string{ + "/unknown-new", // unknown, mtime 500 (highest) + "/unknown-old", // unknown, mtime 200 + "/known-stale", // known, verified 2024 (oldest) + "/known-mid", // known, verified 2025 + "/known-fresh", // known, verified 2026 (newest) + } + for i, want := range wantOrder { + if got[i].dir != want { + t.Errorf("position %d: got %q, want %q (full order: %v)", i, got[i].dir, want, got) + } + } +} + +func TestOrderScanProjects_AllKnown(t *testing.T) { + verified := func(year int) time.Time { + return time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC) + } + known := map[string]time.Time{ + "/a": verified(2025), + "/b": verified(2024), + } + in := []projectEntry{ + {dir: "/a", modTime: 100}, + {dir: "/b", modTime: 200}, + } + got := orderScanProjects(in, known) + if got[0].dir != "/b" || got[1].dir != "/a" { + t.Errorf("expected stalest-first ordering, got %v", got) + } +} + +func TestOrderScanProjects_AllUnknown(t *testing.T) { + in := []projectEntry{ + {dir: "/a", modTime: 100}, + {dir: "/b", modTime: 200}, + } + got := orderScanProjects(in, map[string]time.Time{"/other": time.Now()}) + if got[0].dir != "/b" || got[1].dir != "/a" { + t.Errorf("expected mtime-desc for all-unknown, got %v", got) + } +} diff --git a/internal/detector/nodescan_test.go b/internal/detector/nodescan_test.go index daffcb5..076882b 100644 --- a/internal/detector/nodescan_test.go +++ 
b/internal/detector/nodescan_test.go @@ -424,7 +424,7 @@ func TestNodeScanner_ScanProjects_DropsRecordsForMissingPM(t *testing.T) { mock.SetFile(filepath.Join(dirB, "package-lock.json"), []byte{}) scanner := newTestScanner(mock) - results := scanner.ScanProjects(context.Background(), []string{`C:\Users\dev`}) + results, _ := scanner.ScanProjects(context.Background(), []string{`C:\Users\dev`}, nil) if len(results) != 0 { t.Errorf("expected 0 telemetry records when PM not installed, got %d", len(results)) diff --git a/internal/detector/pythonpm_test.go b/internal/detector/pythonpm_test.go index 2d845f1..ba1aa95 100644 --- a/internal/detector/pythonpm_test.go +++ b/internal/detector/pythonpm_test.go @@ -111,7 +111,7 @@ func TestPythonProjectDetector_CountProjects(t *testing.T) { filepath.Join(dir, "project2", "venv", "bin", "pip"), "list", "--format", "json") det := NewPythonProjectDetector(mock) - projects := det.ListProjects([]string{dir}) + projects, _ := det.ListProjects([]string{dir}, nil) if len(projects) != 2 { t.Fatalf("expected 2 venv projects, got %d", len(projects)) @@ -137,7 +137,7 @@ func TestPythonProjectDetector_ArbitraryVenvName(t *testing.T) { filepath.Join(dir, "proj", "myenv", "bin", "pip"), "list", "--format", "json") det := NewPythonProjectDetector(mock) - projects := det.ListProjects([]string{dir}) + projects, _ := det.ListProjects([]string{dir}, nil) if len(projects) != 1 { t.Fatalf("expected 1 project, got %d", len(projects)) @@ -169,7 +169,7 @@ func TestPythonProjectDetector_MultipleVenvsSameParent(t *testing.T) { filepath.Join(dir, "proj", "venv-b", "bin", "pip"), "list", "--format", "json") det := NewPythonProjectDetector(mock) - projects := det.ListProjects([]string{dir}) + projects, _ := det.ListProjects([]string{dir}, nil) if len(projects) != 2 { t.Fatalf("expected 2 projects (one per venv), got %d", len(projects)) @@ -197,7 +197,7 @@ func TestPythonProjectDetector_LegacyVenvWithActivate(t *testing.T) { filepath.Join(dir, "proj", "env", 
"bin", "pip"), "list", "--format", "json") det := NewPythonProjectDetector(mock) - projects := det.ListProjects([]string{dir}) + projects, _ := det.ListProjects([]string{dir}, nil) if len(projects) != 1 { t.Fatalf("expected 1 project, got %d", len(projects)) @@ -215,7 +215,7 @@ func TestPythonProjectDetector_NotAVenv(t *testing.T) { mock.SetFile(filepath.Join(dir, "fake", "bin", "pip"), []byte("")) det := NewPythonProjectDetector(mock) - projects := det.ListProjects([]string{dir}) + projects, _ := det.ListProjects([]string{dir}, nil) if len(projects) != 0 { t.Fatalf("expected 0 projects, got %d", len(projects)) @@ -236,7 +236,7 @@ func TestPythonProjectDetector_WindowsLayout(t *testing.T) { filepath.Join(dir, "proj", ".venv", "Scripts", "pip.exe"), "list", "--format", "json") det := NewPythonProjectDetector(mock) - projects := det.ListProjects([]string{dir}) + projects, _ := det.ListProjects([]string{dir}, nil) if len(projects) != 1 { t.Fatalf("expected 1 project, got %d", len(projects)) @@ -261,7 +261,7 @@ func TestPythonProjectDetector_VenvWithoutPip(t *testing.T) { mock.SetFile(filepath.Join(dir, "proj", ".venv", "pyvenv.cfg"), []byte("")) det := NewPythonProjectDetector(mock) - projects := det.ListProjects([]string{dir}) + projects, _ := det.ListProjects([]string{dir}, nil) if len(projects) != 1 { t.Fatalf("expected 1 project (venv without pip), got %d", len(projects)) diff --git a/internal/detector/pythonproject.go b/internal/detector/pythonproject.go index 022a47c..cb084a4 100644 --- a/internal/detector/pythonproject.go +++ b/internal/detector/pythonproject.go @@ -5,6 +5,7 @@ import ( "encoding/json" "os" "path/filepath" + "sort" "strings" "time" @@ -34,25 +35,90 @@ func (d *PythonProjectDetector) WithSkipper(s *tcc.Skipper) *PythonProjectDetect // CountProjects counts Python projects with virtual environments. 
func (d *PythonProjectDetector) CountProjects(_ context.Context, searchDirs []string) int { - return len(d.ListProjects(searchDirs)) + projects, _ := d.ListProjects(searchDirs, nil) + return len(projects) +} + +// venvCandidate is one discovered virtual environment, captured before any +// per-venv pip list is run so the discovered set can be reordered by state. +type venvCandidate struct { + path string + pipPath string + pm string + modTime int64 } // ListProjects returns Python projects that have a virtual environment, // along with the packages installed in each venv. // -// Note: ProjectInfo.Path is the venv directory itself (not the project root, -// unlike Node detection). This lets multiple venvs under the same parent -// directory each surface as their own entry. The package_manager field is -// still derived from marker files in the parent directory. -func (d *PythonProjectDetector) ListProjects(searchDirs []string) []model.ProjectInfo { - var projects []model.ProjectInfo +// Ordering: paths absent from knownLastVerified come first by mtime +// descending; known paths follow by LastVerifiedAt ascending. Pass nil for +// discovery-order behavior. +// +// ProjectInfo.Path is the venv directory itself (not the project root, unlike +// node detection). Multiple venvs sharing a parent each surface as their own +// entry. PackageManager is derived from marker files in the parent. +// +// The second return is every venv path discovered on disk (before the cap), +// so callers can distinguish "missing from disk" from "dropped by the cap" +// when comparing against prior state. +func (d *PythonProjectDetector) ListProjects(searchDirs []string, knownLastVerified map[string]time.Time) (projects []model.ProjectInfo, discovered []string) { + var candidates []venvCandidate for _, dir := range searchDirs { - projects = append(projects, d.listInDir(dir)...) 
- if len(projects) >= maxPythonProjects { - return projects[:maxPythonProjects] + candidates = append(candidates, d.discoverInDir(dir)...) + } + + discovered = make([]string, 0, len(candidates)) + for _, c := range candidates { + discovered = append(discovered, c.path) + } + + candidates = orderVenvs(candidates, knownLastVerified) + + if len(candidates) > maxPythonProjects { + candidates = candidates[:maxPythonProjects] + } + + ctx := context.Background() + projects = make([]model.ProjectInfo, 0, len(candidates)) + for _, c := range candidates { + var pkgs []model.PackageDetail + if c.pipPath != "" { + pkgs = d.listVenvPackages(ctx, c.pipPath) + } + projects = append(projects, model.ProjectInfo{ + Path: c.path, + PackageManager: c.pm, + Packages: pkgs, + }) + } + return projects, discovered +} + +// orderVenvs prioritises never-seen venvs (mtime desc) before known venvs +// (LastVerifiedAt asc). A nil map (no state at all) preserves discovery order; +// an empty map means state exists but has no Python entries yet, so every +// candidate is unknown and still gets sorted by mtime desc. +func orderVenvs(candidates []venvCandidate, knownLastVerified map[string]time.Time) []venvCandidate { + if knownLastVerified == nil { + return candidates + } + unknown := make([]venvCandidate, 0, len(candidates)) + known := make([]venvCandidate, 0, len(candidates)) + for _, c := range candidates { + if _, ok := knownLastVerified[c.path]; ok { + known = append(known, c) + } else { + unknown = append(unknown, c) } } - return projects + sort.Slice(unknown, func(i, j int) bool { + return unknown[i].modTime > unknown[j].modTime + }) + sort.Slice(known, func(i, j int) bool { + return knownLastVerified[known[i].path].Before(knownLastVerified[known[j].path]) + }) + return append(unknown, known...) } // findPipInVenv returns the path to pip inside a venv-shaped dir, or "". 
@@ -128,9 +194,11 @@ var pythonPMFromMarker = map[string]string{ "requirements.txt": "pip", } -func (d *PythonProjectDetector) listInDir(dir string) []model.ProjectInfo { - ctx := context.Background() - var projects []model.ProjectInfo +// discoverInDir walks `dir` and returns every venv it finds, without running +// pip list. The two-phase split (discover → order → scan) lets the caller +// reorder via state before any pip list is run. +func (d *PythonProjectDetector) discoverInDir(dir string) []venvCandidate { + var found []venvCandidate _ = filepath.WalkDir(dir, func(path string, entry os.DirEntry, err error) error { if err != nil { return nil @@ -153,26 +221,19 @@ func (d *PythonProjectDetector) listInDir(dir string) []model.ProjectInfo { return nil } - // Each venv is reported as its own entry (Path = venv folder), so - // multiple venvs sharing a parent are all surfaced. package_manager - // detection runs against the parent dir. Skip descending into the - // venv tree regardless of pip presence — site-packages can be huge. - pm := d.detectPM(filepath.Dir(path)) - var pkgs []model.PackageDetail - if pipPath != "" { - pkgs = d.listVenvPackages(ctx, pipPath) + modTime := int64(0) + if info, infoErr := entry.Info(); infoErr == nil { + modTime = info.ModTime().Unix() } - projects = append(projects, model.ProjectInfo{ - Path: path, - PackageManager: pm, - Packages: pkgs, + found = append(found, venvCandidate{ + path: path, + pipPath: pipPath, + pm: d.detectPM(filepath.Dir(path)), + modTime: modTime, }) - if len(projects) >= maxPythonProjects { - return filepath.SkipAll - } return filepath.SkipDir }) - return projects + return found } // detectPM determines the package manager for a project directory based on lock/marker files. 
diff --git a/internal/executor/mock.go b/internal/executor/mock.go index 0016695..d933e21 100644 --- a/internal/executor/mock.go +++ b/internal/executor/mock.go @@ -120,6 +120,22 @@ func (m *Mock) SetFileInfo(path string, info os.FileInfo) { m.fileInfos[path] = info } +// SetFileMtime registers a stat result for `path` with a custom mtime +// (Unix seconds) AND marks the file as existing (FileExists returns true). +// Useful for cache-invalidation tests that need to assert behavior across +// mtime windows. +func (m *Mock) SetFileMtime(path string, unixSec int64) { + m.mu.Lock() + defer m.mu.Unlock() + m.fileInfos[path] = &mockFileInfo{ + name: filepath.Base(path), + modTime: time.Unix(unixSec, 0), + } + if _, ok := m.files[path]; !ok { + m.files[path] = []byte{} + } +} + func (m *Mock) SetPath(name, path string) { m.mu.Lock() defer m.mu.Unlock() @@ -373,15 +389,16 @@ func cmdKey(name string, args ...string) string { } type mockFileInfo struct { - name string - size int64 - dir bool + name string + size int64 + dir bool + modTime time.Time } func (fi *mockFileInfo) Name() string { return fi.name } func (fi *mockFileInfo) Size() int64 { return fi.size } func (fi *mockFileInfo) IsDir() bool { return fi.dir } -func (fi *mockFileInfo) ModTime() time.Time { return time.Time{} } +func (fi *mockFileInfo) ModTime() time.Time { return fi.modTime } func (fi *mockFileInfo) Mode() os.FileMode { return 0o644 } func (fi *mockFileInfo) Sys() any { return nil } diff --git a/internal/model/model.go b/internal/model/model.go index 99611e6..ec6bd57 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -122,6 +122,31 @@ type Summary struct { FlatpakPackagesCount int `json:"flatpak_packages_count"` } +// UnchangedProjectRef tells the backend a project is unchanged since the +// last successful upload. Backend bumps LastSeenAt on every package row +// whose ProjectPaths contains Path. 
type UnchangedProjectRef struct {
	// Path identifies the project.
	Path string `json:"path"`
	// ScanOutputHash is the hash of the scan output being declared unchanged.
	ScanOutputHash string `json:"scan_output_hash"`
	// LastUploadedExecutionID is left out of the JSON when empty.
	LastUploadedExecutionID string `json:"last_uploaded_execution_id,omitempty"`
}

// RemovedProjectRef reports a project that is no longer present on disk.
// According to the protocol notes, the backend removes Path from the
// ProjectPaths of every matching row and bumps RecordUpdatedAt, but not
// LastSeenAt.
type RemovedProjectRef struct {
	// Path identifies the project that disappeared.
	Path string `json:"path"`
	// LastUploadedExecutionID is left out of the JSON when empty.
	LastUploadedExecutionID string `json:"last_uploaded_execution_id,omitempty"`
}

// UnchangedGlobalRef reports that a package manager's global package set has
// not changed. It is keyed by package-manager name because globals are scoped
// to the package manager rather than to a path.
type UnchangedGlobalRef struct {
	// PackageManager is the package-manager name the globals belong to.
	PackageManager string `json:"package_manager"`
	// ScanOutputHash is the hash of the scan output being declared unchanged.
	ScanOutputHash string `json:"scan_output_hash"`
	// LastUploadedExecutionID is left out of the JSON when empty.
	LastUploadedExecutionID string `json:"last_uploaded_execution_id,omitempty"`
}

// ---- internal/paths/paths.go (patched hunk, appended after LegacyHome) ----

// ScanStateFile returns the absolute path to scan-state.json, or "" when
// Home() is disabled. Callers must treat "" as "state tracking unavailable".
+func ScanStateFile() string { + home := Home() + if home == "" { + return "" + } + return filepath.Join(home, "scan-state.json") +} diff --git a/internal/scan/scanner.go b/internal/scan/scanner.go index 75af40e..96a66ee 100644 --- a/internal/scan/scanner.go +++ b/internal/scan/scanner.go @@ -215,7 +215,7 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) error { log.StepStart("Scanning Python projects") start = time.Now() pyProjectDetector := detector.NewPythonProjectDetector(exec).WithSkipper(tccSkipper) - pythonProjects = pyProjectDetector.ListProjects(searchDirs) + pythonProjects, _ = pyProjectDetector.ListProjects(searchDirs, nil) log.StepDone(time.Since(start)) } else { log.StepStart("Python package scanning") diff --git a/internal/state/hash.go b/internal/state/hash.go new file mode 100644 index 0000000..41cbdd5 --- /dev/null +++ b/internal/state/hash.go @@ -0,0 +1,36 @@ +// Package state manages the device-side scan state used to skip re-uploading +// unchanged npm and Python project scans across telemetry runs. See the design +// proposal in docs/ (or memory) for the full protocol. +package state + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" +) + +const hashPrefix = "sha256:" + +// CanonicalHashJSON returns sha256: of `data`. When `data` is valid JSON it +// is re-marshaled with sorted map keys first, so two PM-version-induced key +// reorderings of the same logical output produce the same hash. When `data` is +// not valid JSON (e.g. malformed PM stdout, non-zero exit) the raw bytes are +// hashed instead — the design's invariant is "never silently produce no hash." +// The returned error is non-nil only to surface the parse failure for logging; +// the hash is always populated. 
+func CanonicalHashJSON(data []byte) (string, error) { + var v any + if err := json.Unmarshal(data, &v); err != nil { + return sum(data), err + } + canon, err := json.Marshal(v) + if err != nil { + return sum(data), err + } + return sum(canon), nil +} + +func sum(b []byte) string { + h := sha256.Sum256(b) + return hashPrefix + hex.EncodeToString(h[:]) +} diff --git a/internal/state/hash_test.go b/internal/state/hash_test.go new file mode 100644 index 0000000..934a07c --- /dev/null +++ b/internal/state/hash_test.go @@ -0,0 +1,84 @@ +package state + +import ( + "regexp" + "testing" +) + +var hashFormat = regexp.MustCompile(`^sha256:[0-9a-f]{64}$`) + +func TestCanonicalHashJSON_KeyReorderingStable(t *testing.T) { + a := []byte(`{"name":"lodash","version":"4.17.21","deps":{"a":"1","b":"2"}}`) + b := []byte(`{"deps":{"b":"2","a":"1"},"version":"4.17.21","name":"lodash"}`) + + ha, err := CanonicalHashJSON(a) + if err != nil { + t.Fatalf("a: %v", err) + } + hb, err := CanonicalHashJSON(b) + if err != nil { + t.Fatalf("b: %v", err) + } + if ha != hb { + t.Errorf("expected equal hashes for key-reordered JSON; got %s vs %s", ha, hb) + } +} + +func TestCanonicalHashJSON_ValueChangeDiffers(t *testing.T) { + a := []byte(`{"deps":{"lodash":"4.17.21"}}`) + b := []byte(`{"deps":{"lodash":"4.17.22"}}`) + + ha, _ := CanonicalHashJSON(a) + hb, _ := CanonicalHashJSON(b) + if ha == hb { + t.Errorf("expected different hashes for version bump; both %s", ha) + } +} + +func TestCanonicalHashJSON_ArrayChangeDiffers(t *testing.T) { + a := []byte(`{"pkgs":["a","b","c"]}`) + b := []byte(`{"pkgs":["a","b","c","d"]}`) + + ha, _ := CanonicalHashJSON(a) + hb, _ := CanonicalHashJSON(b) + if ha == hb { + t.Errorf("expected different hashes when array grows; both %s", ha) + } +} + +func TestCanonicalHashJSON_MalformedFallbackDeterministic(t *testing.T) { + bad := []byte(`{not json}`) + h1, err1 := CanonicalHashJSON(bad) + h2, err2 := CanonicalHashJSON(bad) + if err1 == nil || err2 == nil { + 
t.Error("expected non-nil error to surface JSON parse failure") + } + if h1 != h2 { + t.Errorf("fallback should be deterministic; %s vs %s", h1, h2) + } + if !hashFormat.MatchString(h1) { + t.Errorf("fallback hash format mismatch: %s", h1) + } +} + +func TestCanonicalHashJSON_OutputFormat(t *testing.T) { + h, err := CanonicalHashJSON([]byte(`{}`)) + if err != nil { + t.Fatal(err) + } + if !hashFormat.MatchString(h) { + t.Errorf("hash format mismatch: %s", h) + } +} + +func TestCanonicalHashJSON_ArrayOrderMatters(t *testing.T) { + // Dependency tree order is semantically significant for npm ls output — + // reordering peers can mean a re-resolution. Don't canonicalize arrays. + a := []byte(`["a","b","c"]`) + b := []byte(`["c","b","a"]`) + ha, _ := CanonicalHashJSON(a) + hb, _ := CanonicalHashJSON(b) + if ha == hb { + t.Errorf("array order must affect hash; both %s", ha) + } +} diff --git a/internal/state/scanrecord.go b/internal/state/scanrecord.go new file mode 100644 index 0000000..3ff30b1 --- /dev/null +++ b/internal/state/scanrecord.go @@ -0,0 +1,44 @@ +package state + +import ( + "encoding/base64" + "encoding/json" +) + +// ScanRecordFromBase64 builds a ScanRecord from a PM scan result whose raw +// stdout is base64-encoded (NodeScanResult.RawStdoutBase64). When the base64 +// decode fails the raw string is hashed instead so the record is never empty. +func ScanRecordFromBase64(path, pm, pmVersion, rawStdoutBase64 string, exitCode int) ScanRecord { + decoded, err := base64.StdEncoding.DecodeString(rawStdoutBase64) + if err != nil { + decoded = []byte(rawStdoutBase64) + } + hash, _ := CanonicalHashJSON(decoded) + return ScanRecord{ + Path: path, + Hash: hash, + PackageManager: pm, + PMVersion: pmVersion, + ExitCode: exitCode, + } +} + +// ScanRecordFromValue builds a ScanRecord by JSON-marshaling `value` and +// canonical-hashing the result. Used for ecosystems like Python where the +// scanner returns parsed package data instead of raw PM stdout. 
exitCode +// must reflect whether the upstream scan succeeded — failed scans are never +// cached so the next run retries them. +func ScanRecordFromValue(path, pm, pmVersion string, value any, exitCode int) ScanRecord { + raw, err := json.Marshal(value) + if err != nil { + raw = []byte{} + } + hash, _ := CanonicalHashJSON(raw) + return ScanRecord{ + Path: path, + Hash: hash, + PackageManager: pm, + PMVersion: pmVersion, + ExitCode: exitCode, + } +} diff --git a/internal/state/scanrecord_test.go b/internal/state/scanrecord_test.go new file mode 100644 index 0000000..3d33f1f --- /dev/null +++ b/internal/state/scanrecord_test.go @@ -0,0 +1,43 @@ +package state + +import ( + "encoding/base64" + "testing" +) + +func TestScanRecordFromBase64_HashesDecodedPayload(t *testing.T) { + raw := []byte(`{"name":"svc","version":"1.0.0"}`) + encoded := base64.StdEncoding.EncodeToString(raw) + wantHash, _ := CanonicalHashJSON(raw) + + r := ScanRecordFromBase64("/p", "npm", "10.2.0", encoded, 0) + if r.Hash != wantHash { + t.Errorf("hash mismatch: got %s want %s", r.Hash, wantHash) + } + if r.Path != "/p" || r.PackageManager != "npm" || r.PMVersion != "10.2.0" || r.ExitCode != 0 { + t.Errorf("metadata not propagated: %+v", r) + } +} + +func TestScanRecordFromBase64_InvalidBase64FallsBackToRawString(t *testing.T) { + bad := "not!valid!base64" + wantHash, _ := CanonicalHashJSON([]byte(bad)) + + r := ScanRecordFromBase64("/p", "npm", "", bad, 1) + if r.Hash != wantHash { + t.Errorf("fallback hash mismatch: got %s want %s", r.Hash, wantHash) + } + if r.ExitCode != 1 { + t.Errorf("exit code lost: %d", r.ExitCode) + } +} + +func TestScanRecordFromBase64_KeyReorderingStable(t *testing.T) { + a := base64.StdEncoding.EncodeToString([]byte(`{"a":1,"b":2}`)) + b := base64.StdEncoding.EncodeToString([]byte(`{"b":2,"a":1}`)) + ra := ScanRecordFromBase64("/p", "npm", "", a, 0) + rb := ScanRecordFromBase64("/p", "npm", "", b, 0) + if ra.Hash != rb.Hash { + t.Errorf("key-reordered JSON should hash 
equal: %s vs %s", ra.Hash, rb.Hash) + } +} diff --git a/internal/state/state.go b/internal/state/state.go new file mode 100644 index 0000000..92b7d81 --- /dev/null +++ b/internal/state/state.go @@ -0,0 +1,451 @@ +package state + +import ( + "encoding/json" + "errors" + "os" + "path/filepath" + "sort" + "time" +) + +// SchemaVersion is the on-disk format version. Bump when the JSON shape changes +// in a way Load can't transparently migrate. Older versions are treated as +// empty state — a corrupt or unmigratable file must never break a scan. +const SchemaVersion = 1 + +// Ecosystem identifiers used for routing in Reconcile / Partition / Commit. +const ( + EcosystemNPM = "npm" + EcosystemPython = "python" +) + +// DefaultFullSyncHorizon is the maximum gap between successful uploads before +// the next run is forced to treat every discovered project as `changed`. +// Counters push-only drift: agent cannot detect backend data loss, so it +// re-asserts the full picture periodically. +const DefaultFullSyncHorizon = 7 * 24 * time.Hour + +// ProjectEntry is one project's last-successfully-uploaded record. Used for +// both npm and Python project maps in State. +type ProjectEntry struct { + ScanOutputHash string `json:"scan_output_hash"` + LastUploadedExecutionID string `json:"last_uploaded_execution_id"` + LastUploadedAt time.Time `json:"last_uploaded_at"` + LastVerifiedAt time.Time `json:"last_verified_at"` + FirstSeenAt time.Time `json:"first_seen_at"` + PackageManager string `json:"pm"` + PMVersion string `json:"pm_version,omitempty"` +} + +// GlobalEntry is one PM's last-successfully-uploaded global-packages record. +// Keyed by PM name in State.NPMGlobal / State.PythonGlobal. 
+type GlobalEntry struct { + ScanOutputHash string `json:"scan_output_hash"` + LastUploadedExecutionID string `json:"last_uploaded_execution_id"` + LastUploadedAt time.Time `json:"last_uploaded_at"` + LastVerifiedAt time.Time `json:"last_verified_at"` +} + +// PendingRemoval is a project that has disappeared from disk but whose +// corresponding `removed` payload entry has not yet been confirmed by the +// backend. Survives across runs until AckRemovals drops it. +type PendingRemoval struct { + Path string `json:"path"` + Ecosystem string `json:"ecosystem"` + RemovedAt time.Time `json:"removed_at"` +} + +// State is the on-disk envelope. Marshaled as scan-state.json. +type State struct { + SchemaVersion int `json:"schema_version"` + AgentVersion string `json:"agent_version"` + LastFullSyncAt time.Time `json:"last_full_sync_at"` + LastSuccessfulExecutionID string `json:"last_successful_execution_id,omitempty"` + NPMProjects map[string]ProjectEntry `json:"npm_projects"` + PythonProjects map[string]ProjectEntry `json:"python_projects"` + NPMGlobal map[string]GlobalEntry `json:"npm_global"` + PythonGlobal map[string]GlobalEntry `json:"python_global"` + RemovedPendingAck []PendingRemoval `json:"removed_pending_ack"` +} + +// New returns a zero-value state stamped with the running agent version. Used +// when Load finds no file or refuses to migrate from a foreign schema. +func New(agentVersion string) *State { + return &State{ + SchemaVersion: SchemaVersion, + AgentVersion: agentVersion, + NPMProjects: map[string]ProjectEntry{}, + PythonProjects: map[string]ProjectEntry{}, + NPMGlobal: map[string]GlobalEntry{}, + PythonGlobal: map[string]GlobalEntry{}, + RemovedPendingAck: []PendingRemoval{}, + } +} + +// Load reads scan-state.json. Missing file, parse error, or schema mismatch +// returns a fresh empty state — the next run becomes a full sync naturally. 
+// The returned error is non-nil only for read or parse failures (to surface +// the cause for logging); missing file and schema mismatch return a nil error +// because those are expected fall-throughs. +func Load(path, agentVersion string) (*State, error) { + cleanedPath := filepath.Clean(path) + data, err := os.ReadFile(cleanedPath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return New(agentVersion), nil + } + return New(agentVersion), err + } + var s State + if err := json.Unmarshal(data, &s); err != nil { + return New(agentVersion), err + } + if s.SchemaVersion != SchemaVersion { + return New(agentVersion), nil + } + // Nil maps after unmarshal of `null` or missing fields — normalize so + // callers never have to nil-check. + if s.NPMProjects == nil { + s.NPMProjects = map[string]ProjectEntry{} + } + if s.PythonProjects == nil { + s.PythonProjects = map[string]ProjectEntry{} + } + if s.NPMGlobal == nil { + s.NPMGlobal = map[string]GlobalEntry{} + } + if s.PythonGlobal == nil { + s.PythonGlobal = map[string]GlobalEntry{} + } + return &s, nil +} + +// Save writes scan-state.json atomically: write tmp sibling, fsync, rename. +// On any error before the rename the original file is untouched. 
+func (s *State) Save(path string) error { + cleanedPath := filepath.Clean(path) + dir := filepath.Dir(cleanedPath) + if err := os.MkdirAll(dir, 0o750); err != nil { + return err + } + data, err := json.MarshalIndent(s, "", " ") + if err != nil { + return err + } + tmp, err := os.CreateTemp(dir, ".scan-state-*.tmp") + if err != nil { + return err + } + tmpPath := tmp.Name() + if _, err := tmp.Write(data); err != nil { + _ = tmp.Close() + _ = os.Remove(tmpPath) + return err + } + if err := tmp.Sync(); err != nil { + _ = tmp.Close() + _ = os.Remove(tmpPath) + return err + } + if err := tmp.Close(); err != nil { + _ = os.Remove(tmpPath) + return err + } + // Windows os.Rename fails when the destination already exists, so the + // second save would silently stop overwriting. POSIX rename atomically + // replaces, making this Remove a no-op there. Same pattern as + // internal/progress/filelog/filelog.go RotateIfOverCap. + _ = os.Remove(cleanedPath) + if err := os.Rename(tmpPath, cleanedPath); err != nil { + _ = os.Remove(tmpPath) + return err + } + return nil +} + +// IsFullSyncDue returns true when the next upload must include every +// discovered project regardless of hash match. Triggered by: +// - LastFullSyncAt older than `horizon` +// - AgentVersion in state differs from the running binary (payload-format +// drift insurance after agent upgrades). +func (s *State) IsFullSyncDue(now time.Time, runningAgentVersion string, horizon time.Duration) bool { + if s.AgentVersion != runningAgentVersion { + return true + } + if s.LastFullSyncAt.IsZero() { + return true + } + return now.Sub(s.LastFullSyncAt) >= horizon +} + +// Reconcile splits the discovered project set against per-ecosystem state. +// +// unknown — discovered AND not in state; caller orders by mtime desc. +// known — discovered AND in state; returned sorted by LastVerifiedAt +// ascending so the planner re-verifies the staleest first. +// removed — in state AND not in discovered. 
+// +// Returned slices are independent; the caller may sort/cap them freely. +func (s *State) Reconcile(ecosystem string, discovered []string) (unknown, known, removed []string) { + entries := s.projectMap(ecosystem) + discoveredSet := make(map[string]struct{}, len(discovered)) + for _, p := range discovered { + discoveredSet[p] = struct{}{} + if _, ok := entries[p]; ok { + known = append(known, p) + } else { + unknown = append(unknown, p) + } + } + for p := range entries { + if _, ok := discoveredSet[p]; !ok { + removed = append(removed, p) + } + } + sort.Slice(known, func(i, j int) bool { + return entries[known[i]].LastVerifiedAt.Before(entries[known[j]].LastVerifiedAt) + }) + sort.Strings(removed) + return unknown, known, removed +} + +// ScanRecord is the caller's per-project result fed into Partition and Commit. +// ExitCode == 0 means the PM CLI succeeded; failed scans are never cached so +// the next run retries them. +type ScanRecord struct { + Path string + Hash string + PackageManager string + PMVersion string + ExitCode int +} + +// Partition classifies scanned project records against the stored hash. +// A record is `changed` when no prior entry exists OR the stored hash differs. +// A record is `unchanged` when stored hash matches the scanned hash. Failed +// scans (ExitCode != 0) always count as `changed` — we can't claim the prior +// snapshot is still accurate. `fullSync` forces every record into `changed`. +func (s *State) Partition(ecosystem string, scanned []ScanRecord, fullSync bool) (changed, unchanged []string) { + entries := s.projectMap(ecosystem) + for _, r := range scanned { + if fullSync || r.ExitCode != 0 { + changed = append(changed, r.Path) + continue + } + prev, ok := entries[r.Path] + if !ok || prev.ScanOutputHash != r.Hash { + changed = append(changed, r.Path) + continue + } + unchanged = append(unchanged, r.Path) + } + return changed, unchanged +} + +// GlobalRecord is the caller's per-PM globals result for npm/Python globals. 
+type GlobalRecord struct { + PM string + Hash string + ExitCode int +} + +// PartitionGlobals classifies global-scan records by PM. Same rules as +// Partition — failed scans always `changed`, fullSync forces `changed`. +func (s *State) PartitionGlobals(ecosystem string, scanned []GlobalRecord, fullSync bool) (changed, unchanged []string) { + entries := s.globalMap(ecosystem) + for _, r := range scanned { + if fullSync || r.ExitCode != 0 { + changed = append(changed, r.PM) + continue + } + prev, ok := entries[r.PM] + if !ok || prev.ScanOutputHash != r.Hash { + changed = append(changed, r.PM) + continue + } + unchanged = append(unchanged, r.PM) + } + return changed, unchanged +} + +// MarkRemovedPending appends paths to RemovedPendingAck (dedup by path + +// ecosystem). Called once per run, before payload assembly, so the upload's +// `removed` list is built from the pending-ack tail rather than this run's +// fresh diff alone — that's what survives the commit-after-confirm ordering. +func (s *State) MarkRemovedPending(ecosystem string, paths []string, now time.Time) { + existing := make(map[string]struct{}, len(s.RemovedPendingAck)) + for _, e := range s.RemovedPendingAck { + existing[e.Ecosystem+"|"+e.Path] = struct{}{} + } + for _, p := range paths { + key := ecosystem + "|" + p + if _, ok := existing[key]; ok { + continue + } + s.RemovedPendingAck = append(s.RemovedPendingAck, PendingRemoval{ + Path: p, + Ecosystem: ecosystem, + RemovedAt: now, + }) + existing[key] = struct{}{} + } +} + +// PendingRemovalsFor returns the current pending-ack entries for an ecosystem. +// Callers use the returned slice to build the payload's `removed` list and +// then pass it back to AckRemovals after confirm-upload succeeds. 
+func (s *State) PendingRemovalsFor(ecosystem string) []PendingRemoval { + out := make([]PendingRemoval, 0, len(s.RemovedPendingAck)) + for _, e := range s.RemovedPendingAck { + if e.Ecosystem == ecosystem { + out = append(out, e) + } + } + return out +} + +// AckRemovals drops the given (ecosystem, path) pairs from RemovedPendingAck. +// Called only after confirm-upload returns 200 for an upload that included +// those removals. +func (s *State) AckRemovals(acked []PendingRemoval) { + if len(acked) == 0 { + return + } + drop := make(map[string]struct{}, len(acked)) + for _, e := range acked { + drop[e.Ecosystem+"|"+e.Path] = struct{}{} + } + kept := s.RemovedPendingAck[:0] + for _, e := range s.RemovedPendingAck { + if _, ok := drop[e.Ecosystem+"|"+e.Path]; ok { + continue + } + kept = append(kept, e) + } + s.RemovedPendingAck = kept +} + +// CommitAfterUpload applies a successful upload's effects to the in-memory +// state. The caller must call Save afterward to persist. Effects: +// +// - For every scanned record with ExitCode == 0: upsert the project entry +// with this run's hash, execution_id, and timestamps. FirstSeenAt is +// preserved when present; LastVerifiedAt is always bumped. +// - LastSuccessfulExecutionID and AgentVersion are stamped. +// - If fullSync: refresh LastFullSyncAt. +// +// Failed scans are intentionally not touched — next run retries them and the +// previous successful hash (if any) stays valid for the unchanged-skip path. 
+func (s *State) CommitAfterUpload( + now time.Time, + executionID, runningAgentVersion string, + npmScanned, pythonScanned []ScanRecord, + npmGlobals, pythonGlobals []GlobalRecord, + fullSync bool, +) { + s.commitProjects(EcosystemNPM, npmScanned, now, executionID) + s.commitProjects(EcosystemPython, pythonScanned, now, executionID) + s.commitGlobals(EcosystemNPM, npmGlobals, now, executionID) + s.commitGlobals(EcosystemPython, pythonGlobals, now, executionID) + s.LastSuccessfulExecutionID = executionID + s.AgentVersion = runningAgentVersion + if fullSync { + s.LastFullSyncAt = now + } +} + +func (s *State) commitProjects(ecosystem string, scanned []ScanRecord, now time.Time, executionID string) { + entries := s.projectMap(ecosystem) + for _, r := range scanned { + if r.ExitCode != 0 { + continue + } + prev, existed := entries[r.Path] + next := ProjectEntry{ + ScanOutputHash: r.Hash, + LastUploadedExecutionID: executionID, + LastUploadedAt: now, + LastVerifiedAt: now, + PackageManager: r.PackageManager, + PMVersion: r.PMVersion, + } + if existed && !prev.FirstSeenAt.IsZero() { + next.FirstSeenAt = prev.FirstSeenAt + } else { + next.FirstSeenAt = now + } + entries[r.Path] = next + } +} + +func (s *State) commitGlobals(ecosystem string, scanned []GlobalRecord, now time.Time, executionID string) { + entries := s.globalMap(ecosystem) + for _, r := range scanned { + if r.ExitCode != 0 { + continue + } + entries[r.PM] = GlobalEntry{ + ScanOutputHash: r.Hash, + LastUploadedExecutionID: executionID, + LastUploadedAt: now, + LastVerifiedAt: now, + } + } +} + +// BumpVerified updates LastVerifiedAt on entries whose hash matched (the +// `unchanged` bucket from Partition) without otherwise touching them. Lets the +// planner pick the staleest-verified known project next run. 
+func (s *State) BumpVerified(ecosystem string, paths []string, now time.Time) { + entries := s.projectMap(ecosystem) + for _, p := range paths { + if e, ok := entries[p]; ok { + e.LastVerifiedAt = now + entries[p] = e + } + } +} + +// BumpVerifiedGlobals updates LastVerifiedAt on global entries whose hash +// matched. +func (s *State) BumpVerifiedGlobals(ecosystem string, pms []string, now time.Time) { + entries := s.globalMap(ecosystem) + for _, pm := range pms { + if e, ok := entries[pm]; ok { + e.LastVerifiedAt = now + entries[pm] = e + } + } +} + +// DropRemovedFromProjects removes acknowledged-removed paths from the project +// map after AckRemovals drops them from pending. Callers running the full +// commit-after-confirm sequence should invoke this once per ecosystem with +// the paths that the backend just acked. +func (s *State) DropRemovedFromProjects(ecosystem string, paths []string) { + entries := s.projectMap(ecosystem) + for _, p := range paths { + delete(entries, p) + } +} + +func (s *State) projectMap(ecosystem string) map[string]ProjectEntry { + switch ecosystem { + case EcosystemNPM: + return s.NPMProjects + case EcosystemPython: + return s.PythonProjects + } + panic("state: unknown ecosystem " + ecosystem) +} + +func (s *State) globalMap(ecosystem string) map[string]GlobalEntry { + switch ecosystem { + case EcosystemNPM: + return s.NPMGlobal + case EcosystemPython: + return s.PythonGlobal + } + panic("state: unknown ecosystem " + ecosystem) +} diff --git a/internal/state/state_test.go b/internal/state/state_test.go new file mode 100644 index 0000000..37bcad9 --- /dev/null +++ b/internal/state/state_test.go @@ -0,0 +1,419 @@ +package state + +import ( + "encoding/json" + "os" + "path/filepath" + "sort" + "testing" + "time" +) + +const runningAgentVersion = "1.6.0" + +func tempStatePath(t *testing.T) string { + t.Helper() + return filepath.Join(t.TempDir(), "scan-state.json") +} + +// freshScan simulates one project's scan record. 
+func freshScan(path, hash string) ScanRecord { + return ScanRecord{Path: path, Hash: hash, PackageManager: "npm", PMVersion: "10.2.0", ExitCode: 0} +} + +func sortedEq(a, b []string) bool { + if len(a) != len(b) { + return false + } + x := append([]string(nil), a...) + y := append([]string(nil), b...) + sort.Strings(x) + sort.Strings(y) + for i := range x { + if x[i] != y[i] { + return false + } + } + return true +} + +func TestLoad_MissingFileReturnsEmpty(t *testing.T) { + s, err := Load(tempStatePath(t), runningAgentVersion) + if err != nil { + t.Fatalf("missing file should not error: %v", err) + } + if s == nil || s.SchemaVersion != SchemaVersion { + t.Errorf("expected fresh state with version %d, got %+v", SchemaVersion, s) + } + if len(s.NPMProjects)+len(s.PythonProjects)+len(s.NPMGlobal)+len(s.PythonGlobal) != 0 { + t.Errorf("expected empty maps on miss") + } +} + +func TestLoad_CorruptReturnsEmpty(t *testing.T) { + path := tempStatePath(t) + if err := os.WriteFile(path, []byte("not json"), 0o644); err != nil { + t.Fatal(err) + } + s, err := Load(path, runningAgentVersion) + if err == nil { + t.Error("expected parse error to surface") + } + if s == nil || len(s.NPMProjects) != 0 { + t.Errorf("expected empty fallback state on corrupt file, got %+v", s) + } +} + +func TestLoad_WrongSchemaVersionReturnsEmpty(t *testing.T) { + path := tempStatePath(t) + body, _ := json.Marshal(map[string]any{ + "schema_version": 999, + "npm_projects": map[string]any{"/x": map[string]any{"scan_output_hash": "sha256:xx"}}, + }) + if err := os.WriteFile(path, body, 0o644); err != nil { + t.Fatal(err) + } + s, err := Load(path, runningAgentVersion) + if err != nil { + t.Fatalf("schema mismatch should not error: %v", err) + } + if len(s.NPMProjects) != 0 { + t.Errorf("expected empty state on schema mismatch") + } +} + +func TestSaveLoad_RoundTrip(t *testing.T) { + path := tempStatePath(t) + now := time.Date(2026, 5, 1, 12, 0, 0, 0, time.UTC) + + orig := New(runningAgentVersion) + 
orig.NPMProjects["/svc"] = ProjectEntry{ + ScanOutputHash: "sha256:abc", + LastUploadedExecutionID: "exec-1", + LastUploadedAt: now, + LastVerifiedAt: now, + FirstSeenAt: now, + PackageManager: "npm", + PMVersion: "10.2.0", + } + orig.LastFullSyncAt = now + orig.LastSuccessfulExecutionID = "exec-1" + + if err := orig.Save(path); err != nil { + t.Fatalf("save: %v", err) + } + loaded, err := Load(path, runningAgentVersion) + if err != nil { + t.Fatalf("load: %v", err) + } + got, ok := loaded.NPMProjects["/svc"] + if !ok { + t.Fatal("missing /svc after reload") + } + if got.ScanOutputHash != "sha256:abc" || !got.LastUploadedAt.Equal(now) { + t.Errorf("entry roundtrip lost data: %+v", got) + } + if !loaded.LastFullSyncAt.Equal(now) || loaded.LastSuccessfulExecutionID != "exec-1" { + t.Errorf("envelope fields lost: %+v", loaded) + } +} + +// Reconcile cases --------------------------------------------------------- + +func TestReconcile_FirstRunEverythingUnknown(t *testing.T) { + s := New(runningAgentVersion) + unknown, known, removed := s.Reconcile(EcosystemNPM, []string{"/a", "/b"}) + if !sortedEq(unknown, []string{"/a", "/b"}) { + t.Errorf("unknown=%v", unknown) + } + if len(known) != 0 || len(removed) != 0 { + t.Errorf("known=%v removed=%v", known, removed) + } +} + +func TestReconcile_RemovedDetected(t *testing.T) { + s := New(runningAgentVersion) + s.NPMProjects["/a"] = ProjectEntry{ScanOutputHash: "sha256:1"} + s.NPMProjects["/b"] = ProjectEntry{ScanOutputHash: "sha256:2"} + + _, _, removed := s.Reconcile(EcosystemNPM, []string{"/a"}) + if !sortedEq(removed, []string{"/b"}) { + t.Errorf("expected /b removed, got %v", removed) + } +} + +func TestReconcile_KnownSortedByStaleness(t *testing.T) { + s := New(runningAgentVersion) + old := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + mid := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC) + new := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC) + s.NPMProjects["/recent"] = ProjectEntry{LastVerifiedAt: new} + 
s.NPMProjects["/oldest"] = ProjectEntry{LastVerifiedAt: old} + s.NPMProjects["/middle"] = ProjectEntry{LastVerifiedAt: mid} + + _, known, _ := s.Reconcile(EcosystemNPM, []string{"/recent", "/oldest", "/middle"}) + if known[0] != "/oldest" || known[1] != "/middle" || known[2] != "/recent" { + t.Errorf("expected oldest-first ordering, got %v", known) + } +} + +// Partition cases --------------------------------------------------------- + +func TestPartition_NewProjectIsChanged(t *testing.T) { + s := New(runningAgentVersion) + changed, unchanged := s.Partition(EcosystemNPM, []ScanRecord{freshScan("/a", "sha256:x")}, false) + if !sortedEq(changed, []string{"/a"}) || len(unchanged) != 0 { + t.Errorf("changed=%v unchanged=%v", changed, unchanged) + } +} + +func TestPartition_MatchingHashIsUnchanged(t *testing.T) { + s := New(runningAgentVersion) + s.NPMProjects["/a"] = ProjectEntry{ScanOutputHash: "sha256:x"} + + changed, unchanged := s.Partition(EcosystemNPM, []ScanRecord{freshScan("/a", "sha256:x")}, false) + if len(changed) != 0 || !sortedEq(unchanged, []string{"/a"}) { + t.Errorf("changed=%v unchanged=%v", changed, unchanged) + } +} + +func TestPartition_HashDiffIsChanged(t *testing.T) { + s := New(runningAgentVersion) + s.NPMProjects["/a"] = ProjectEntry{ScanOutputHash: "sha256:old"} + + changed, unchanged := s.Partition(EcosystemNPM, []ScanRecord{freshScan("/a", "sha256:new")}, false) + if !sortedEq(changed, []string{"/a"}) || len(unchanged) != 0 { + t.Errorf("changed=%v unchanged=%v", changed, unchanged) + } +} + +func TestPartition_FullSyncForcesChanged(t *testing.T) { + s := New(runningAgentVersion) + s.NPMProjects["/a"] = ProjectEntry{ScanOutputHash: "sha256:x"} + + changed, unchanged := s.Partition(EcosystemNPM, []ScanRecord{freshScan("/a", "sha256:x")}, true) + if !sortedEq(changed, []string{"/a"}) || len(unchanged) != 0 { + t.Errorf("full sync should force changed; got changed=%v unchanged=%v", changed, unchanged) + } +} + +func 
TestPartition_FailedScanIsChanged(t *testing.T) { + s := New(runningAgentVersion) + s.NPMProjects["/a"] = ProjectEntry{ScanOutputHash: "sha256:x"} + rec := freshScan("/a", "sha256:x") + rec.ExitCode = 1 + + changed, unchanged := s.Partition(EcosystemNPM, []ScanRecord{rec}, false) + if !sortedEq(changed, []string{"/a"}) || len(unchanged) != 0 { + t.Errorf("failed scan must not collapse to unchanged; changed=%v unchanged=%v", changed, unchanged) + } +} + +// Full-sync rules --------------------------------------------------------- + +func TestIsFullSyncDue_FirstEverRun(t *testing.T) { + s := New(runningAgentVersion) + if !s.IsFullSyncDue(time.Now(), runningAgentVersion, DefaultFullSyncHorizon) { + t.Error("first run (zero LastFullSyncAt) should force full sync") + } +} + +func TestIsFullSyncDue_AgentVersionChange(t *testing.T) { + s := New(runningAgentVersion) + s.LastFullSyncAt = time.Now() + if !s.IsFullSyncDue(time.Now(), "1.7.0", DefaultFullSyncHorizon) { + t.Error("agent version drift should force full sync") + } +} + +func TestIsFullSyncDue_HorizonElapsed(t *testing.T) { + s := New(runningAgentVersion) + now := time.Now() + s.LastFullSyncAt = now.Add(-8 * 24 * time.Hour) + if !s.IsFullSyncDue(now, runningAgentVersion, DefaultFullSyncHorizon) { + t.Error("8 days > 7-day horizon should force full sync") + } +} + +func TestIsFullSyncDue_RecentNotDue(t *testing.T) { + s := New(runningAgentVersion) + now := time.Now() + s.LastFullSyncAt = now.Add(-time.Hour) + if s.IsFullSyncDue(now, runningAgentVersion, DefaultFullSyncHorizon) { + t.Error("recent full sync should not force another") + } +} + +// Removal-pending lifecycle ---------------------------------------------- + +func TestMarkRemovedPending_DedupsAcrossCalls(t *testing.T) { + s := New(runningAgentVersion) + now := time.Now() + s.MarkRemovedPending(EcosystemNPM, []string{"/a", "/b"}, now) + s.MarkRemovedPending(EcosystemNPM, []string{"/a", "/c"}, now) + if len(s.RemovedPendingAck) != 3 { + t.Errorf("expected 
dedup to yield 3 entries, got %d: %+v", len(s.RemovedPendingAck), s.RemovedPendingAck) + } +} + +func TestPendingRemovalsFor_FiltersByEcosystem(t *testing.T) { + s := New(runningAgentVersion) + now := time.Now() + s.MarkRemovedPending(EcosystemNPM, []string{"/a"}, now) + s.MarkRemovedPending(EcosystemPython, []string{"/p"}, now) + + if got := s.PendingRemovalsFor(EcosystemNPM); len(got) != 1 || got[0].Path != "/a" { + t.Errorf("npm filter: %+v", got) + } + if got := s.PendingRemovalsFor(EcosystemPython); len(got) != 1 || got[0].Path != "/p" { + t.Errorf("python filter: %+v", got) + } +} + +func TestAckRemovals_DropsExactMatches(t *testing.T) { + s := New(runningAgentVersion) + now := time.Now() + s.MarkRemovedPending(EcosystemNPM, []string{"/a", "/b"}, now) + s.MarkRemovedPending(EcosystemPython, []string{"/a"}, now) // same path, different ecosystem + + s.AckRemovals([]PendingRemoval{{Ecosystem: EcosystemNPM, Path: "/a"}}) + if len(s.RemovedPendingAck) != 2 { + t.Errorf("expected 2 remaining after one ack, got %d: %+v", len(s.RemovedPendingAck), s.RemovedPendingAck) + } + for _, e := range s.RemovedPendingAck { + if e.Ecosystem == EcosystemNPM && e.Path == "/a" { + t.Error("acked entry still present") + } + } +} + +// Commit semantics -------------------------------------------------------- + +func TestCommitAfterUpload_NewProjectStored(t *testing.T) { + s := New(runningAgentVersion) + now := time.Date(2026, 5, 1, 12, 0, 0, 0, time.UTC) + s.CommitAfterUpload(now, "exec-1", runningAgentVersion, + []ScanRecord{freshScan("/a", "sha256:x")}, nil, nil, nil, false) + + e := s.NPMProjects["/a"] + if e.ScanOutputHash != "sha256:x" || e.LastUploadedExecutionID != "exec-1" || !e.FirstSeenAt.Equal(now) { + t.Errorf("commit lost data: %+v", e) + } +} + +func TestCommitAfterUpload_PreservesFirstSeen(t *testing.T) { + s := New(runningAgentVersion) + t0 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + t1 := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC) + s.NPMProjects["/a"] = 
ProjectEntry{ + ScanOutputHash: "sha256:old", + FirstSeenAt: t0, + } + s.CommitAfterUpload(t1, "exec-2", runningAgentVersion, + []ScanRecord{freshScan("/a", "sha256:new")}, nil, nil, nil, false) + + if !s.NPMProjects["/a"].FirstSeenAt.Equal(t0) { + t.Errorf("FirstSeenAt should not move on update, got %v", s.NPMProjects["/a"].FirstSeenAt) + } +} + +func TestCommitAfterUpload_SkipsFailedScans(t *testing.T) { + s := New(runningAgentVersion) + s.NPMProjects["/a"] = ProjectEntry{ScanOutputHash: "sha256:old"} + + bad := freshScan("/a", "sha256:doesnt-matter") + bad.ExitCode = 1 + s.CommitAfterUpload(time.Now(), "exec-2", runningAgentVersion, + []ScanRecord{bad}, nil, nil, nil, false) + + if s.NPMProjects["/a"].ScanOutputHash != "sha256:old" { + t.Errorf("failed scan must not overwrite cached hash; got %s", s.NPMProjects["/a"].ScanOutputHash) + } +} + +func TestCommitAfterUpload_RefreshesFullSyncTimestamp(t *testing.T) { + s := New(runningAgentVersion) + now := time.Date(2026, 5, 1, 12, 0, 0, 0, time.UTC) + s.CommitAfterUpload(now, "exec-1", runningAgentVersion, nil, nil, nil, nil, true) + if !s.LastFullSyncAt.Equal(now) { + t.Errorf("full sync flag should refresh LastFullSyncAt, got %v", s.LastFullSyncAt) + } +} + +func TestBumpVerified_OnlyTouchesVerifiedAt(t *testing.T) { + s := New(runningAgentVersion) + t0 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + t1 := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC) + s.NPMProjects["/a"] = ProjectEntry{ + ScanOutputHash: "sha256:x", LastUploadedAt: t0, LastVerifiedAt: t0, FirstSeenAt: t0, + } + s.BumpVerified(EcosystemNPM, []string{"/a"}, t1) + + got := s.NPMProjects["/a"] + if !got.LastVerifiedAt.Equal(t1) { + t.Errorf("LastVerifiedAt: got %v want %v", got.LastVerifiedAt, t1) + } + if !got.LastUploadedAt.Equal(t0) { + t.Errorf("LastUploadedAt must not move on bump-verify; got %v", got.LastUploadedAt) + } +} + +// Cap-dropped projects ---------------------------------------------------- + +func 
TestPartition_CapDroppedEntriesUntouched(t *testing.T) { + // State has A, B, C. Discovery returns A, B, C. Scanner runs A and B + // only (cap). The state entry for C must be preserved without appearing + // in any payload bucket — Partition only sees A and B. + s := New(runningAgentVersion) + s.NPMProjects["/a"] = ProjectEntry{ScanOutputHash: "sha256:x"} + s.NPMProjects["/b"] = ProjectEntry{ScanOutputHash: "sha256:y"} + s.NPMProjects["/c"] = ProjectEntry{ScanOutputHash: "sha256:z"} + + scanned := []ScanRecord{freshScan("/a", "sha256:x"), freshScan("/b", "sha256:y")} + changed, unchanged := s.Partition(EcosystemNPM, scanned, false) + if len(changed) != 0 { + t.Errorf("expected no changes; got %v", changed) + } + if !sortedEq(unchanged, []string{"/a", "/b"}) { + t.Errorf("expected /a /b unchanged; got %v", unchanged) + } + if _, ok := s.NPMProjects["/c"]; !ok { + t.Error("/c must remain in state when cap dropped it") + } +} + +// Cross-ecosystem isolation ---------------------------------------------- + +func TestPartition_SamePathDifferentEcosystems(t *testing.T) { + s := New(runningAgentVersion) + s.NPMProjects["/proj"] = ProjectEntry{ScanOutputHash: "sha256:npm"} + + // Python scanner sees the same path as a different project (e.g. a + // monorepo with both a package.json and a pyproject.toml). The Python + // bucket must treat it as unknown. 
+ changed, unchanged := s.Partition(EcosystemPython, []ScanRecord{freshScan("/proj", "sha256:py")}, false) + if !sortedEq(changed, []string{"/proj"}) || len(unchanged) != 0 { + t.Errorf("ecosystem isolation broken: changed=%v unchanged=%v", changed, unchanged) + } +} + +// Globals ----------------------------------------------------------------- + +func TestPartitionGlobals_NewPMIsChanged(t *testing.T) { + s := New(runningAgentVersion) + changed, unchanged := s.PartitionGlobals(EcosystemNPM, + []GlobalRecord{{PM: "npm", Hash: "sha256:x"}}, false) + if !sortedEq(changed, []string{"npm"}) || len(unchanged) != 0 { + t.Errorf("changed=%v unchanged=%v", changed, unchanged) + } +} + +func TestPartitionGlobals_MatchIsUnchanged(t *testing.T) { + s := New(runningAgentVersion) + s.NPMGlobal["yarn"] = GlobalEntry{ScanOutputHash: "sha256:y"} + changed, unchanged := s.PartitionGlobals(EcosystemNPM, + []GlobalRecord{{PM: "yarn", Hash: "sha256:y"}}, false) + if len(changed) != 0 || !sortedEq(unchanged, []string{"yarn"}) { + t.Errorf("changed=%v unchanged=%v", changed, unchanged) + } +} diff --git a/internal/telemetry/delta.go b/internal/telemetry/delta.go new file mode 100644 index 0000000..af81d03 --- /dev/null +++ b/internal/telemetry/delta.go @@ -0,0 +1,325 @@ +package telemetry + +import ( + "time" + + "github.com/step-security/dev-machine-guard/internal/model" + "github.com/step-security/dev-machine-guard/internal/state" +) + +// deltaSnapshot is the partitioned view of one run's scan output. Built after +// scanning but before payload assembly so the payload can route each project +// to its correct slot (full body for changed, ref for unchanged or removed). +// Carries both the wire-ready slices and the state.ScanRecord lists needed +// for the post-upload commit. 
+type deltaSnapshot struct { + fullSync bool + + npmRecords []state.ScanRecord + npmChanged []model.NodeScanResult + npmUnchanged []model.UnchangedProjectRef + npmRemoved []model.RemovedProjectRef + npmGlobalRecords []state.GlobalRecord + npmGlobalsChanged []model.NodeScanResult + npmGlobalsUnchanged []model.UnchangedGlobalRef + + pyRecords []state.ScanRecord + pyChanged []model.ProjectInfo + pyUnchanged []model.UnchangedProjectRef + pyRemoved []model.RemovedProjectRef + pyGlobalRecords []state.GlobalRecord + pyGlobalsChanged []model.PythonScanResult + pyGlobalsUnchanged []model.UnchangedGlobalRef +} + +// buildDeltaSnapshot runs the partition / reconcile / pending-removal +// pipeline against the freshly-scanned outputs. It mutates the in-memory +// state (appends to RemovedPendingAck) but does NOT persist — only +// commitDeltaSnapshot, called after a successful upload, calls state.Save. +// A nil scan state means delta is disabled and the caller should use the +// legacy payload shape. 
+func buildDeltaSnapshot( + s *state.State, fullSync bool, + npmResults []model.NodeScanResult, npmDiscovered []string, + pythonResults []model.ProjectInfo, pythonDiscovered []string, + npmGlobals []model.NodeScanResult, pythonGlobals []model.PythonScanResult, +) *deltaSnapshot { + if s == nil { + return nil + } + snap := &deltaSnapshot{fullSync: fullSync} + + snap.npmRecords = npmRecordsFromResults(npmResults) + snap.pyRecords = pythonRecordsFromResults(pythonResults) + snap.npmGlobalRecords = globalRecordsFromNode(npmGlobals) + snap.pyGlobalRecords = globalRecordsFromPython(pythonGlobals) + + npmChangedPaths, npmUnchangedPaths := s.Partition(state.EcosystemNPM, snap.npmRecords, fullSync) + pyChangedPaths, pyUnchangedPaths := s.Partition(state.EcosystemPython, snap.pyRecords, fullSync) + npmGChanged, npmGUnchanged := s.PartitionGlobals(state.EcosystemNPM, snap.npmGlobalRecords, fullSync) + pyGChanged, pyGUnchanged := s.PartitionGlobals(state.EcosystemPython, snap.pyGlobalRecords, fullSync) + + snap.npmChanged, snap.npmUnchanged = splitNodeProjects(s.NPMProjects, npmResults, snap.npmRecords, npmChangedPaths, npmUnchangedPaths) + snap.pyChanged, snap.pyUnchanged = splitPythonProjects(s.PythonProjects, pythonResults, snap.pyRecords, pyChangedPaths, pyUnchangedPaths) + snap.npmGlobalsChanged, snap.npmGlobalsUnchanged = splitNodeGlobals(s.NPMGlobal, npmGlobals, snap.npmGlobalRecords, npmGChanged, npmGUnchanged) + snap.pyGlobalsChanged, snap.pyGlobalsUnchanged = splitPythonGlobals(s.PythonGlobal, pythonGlobals, snap.pyGlobalRecords, pyGChanged, pyGUnchanged) + + now := time.Now() + _, _, npmRemovedPaths := s.Reconcile(state.EcosystemNPM, npmDiscovered) + _, _, pyRemovedPaths := s.Reconcile(state.EcosystemPython, pythonDiscovered) + s.MarkRemovedPending(state.EcosystemNPM, npmRemovedPaths, now) + s.MarkRemovedPending(state.EcosystemPython, pyRemovedPaths, now) + + snap.npmRemoved = removedRefsFor(s, state.EcosystemNPM, npmDiscovered) + snap.pyRemoved = removedRefsFor(s, 
state.EcosystemPython, pythonDiscovered) + return snap +} + +func npmRecordsFromResults(results []model.NodeScanResult) []state.ScanRecord { + out := make([]state.ScanRecord, 0, len(results)) + for _, r := range results { + if r.ProjectPath == "" { + continue + } + out = append(out, state.ScanRecordFromBase64( + r.ProjectPath, r.PackageManager, r.PMVersion, r.RawStdoutBase64, r.ExitCode, + )) + } + return out +} + +func pythonRecordsFromResults(results []model.ProjectInfo) []state.ScanRecord { + out := make([]state.ScanRecord, 0, len(results)) + for _, r := range results { + if r.Path == "" { + continue + } + exitCode := 0 + if r.Packages == nil { + exitCode = 1 + } + out = append(out, state.ScanRecordFromValue( + r.Path, r.PackageManager, "", r.Packages, exitCode, + )) + } + return out +} + +func globalRecordsFromNode(results []model.NodeScanResult) []state.GlobalRecord { + out := make([]state.GlobalRecord, 0, len(results)) + for _, r := range results { + if r.PackageManager == "" { + continue + } + hash, _ := state.CanonicalHashJSON(decodeBase64OrRaw(r.RawStdoutBase64)) + out = append(out, state.GlobalRecord{PM: r.PackageManager, Hash: hash, ExitCode: r.ExitCode}) + } + return out +} + +func globalRecordsFromPython(results []model.PythonScanResult) []state.GlobalRecord { + out := make([]state.GlobalRecord, 0, len(results)) + for _, r := range results { + if r.PackageManager == "" { + continue + } + hash, _ := state.CanonicalHashJSON(decodeBase64OrRaw(r.RawStdoutBase64)) + out = append(out, state.GlobalRecord{PM: r.PackageManager, Hash: hash, ExitCode: r.ExitCode}) + } + return out +} + +func splitNodeProjects( + prior map[string]state.ProjectEntry, + results []model.NodeScanResult, + records []state.ScanRecord, + changedPaths, unchangedPaths []string, +) ([]model.NodeScanResult, []model.UnchangedProjectRef) { + changedSet := setFromStrings(changedPaths) + unchangedSet := setFromStrings(unchangedPaths) + hashByPath := hashesByPath(records) + + changed := 
make([]model.NodeScanResult, 0, len(changedPaths)) + unchanged := make([]model.UnchangedProjectRef, 0, len(unchangedPaths)) + for _, r := range results { + if _, ok := changedSet[r.ProjectPath]; ok { + changed = append(changed, r) + continue + } + if _, ok := unchangedSet[r.ProjectPath]; ok { + unchanged = append(unchanged, model.UnchangedProjectRef{ + Path: r.ProjectPath, + ScanOutputHash: hashByPath[r.ProjectPath], + LastUploadedExecutionID: prior[r.ProjectPath].LastUploadedExecutionID, + }) + } + } + return changed, unchanged +} + +func splitPythonProjects( + prior map[string]state.ProjectEntry, + results []model.ProjectInfo, + records []state.ScanRecord, + changedPaths, unchangedPaths []string, +) ([]model.ProjectInfo, []model.UnchangedProjectRef) { + changedSet := setFromStrings(changedPaths) + unchangedSet := setFromStrings(unchangedPaths) + hashByPath := hashesByPath(records) + + changed := make([]model.ProjectInfo, 0, len(changedPaths)) + unchanged := make([]model.UnchangedProjectRef, 0, len(unchangedPaths)) + for _, r := range results { + if _, ok := changedSet[r.Path]; ok { + changed = append(changed, r) + continue + } + if _, ok := unchangedSet[r.Path]; ok { + unchanged = append(unchanged, model.UnchangedProjectRef{ + Path: r.Path, + ScanOutputHash: hashByPath[r.Path], + LastUploadedExecutionID: prior[r.Path].LastUploadedExecutionID, + }) + } + } + return changed, unchanged +} + +func splitNodeGlobals( + prior map[string]state.GlobalEntry, + results []model.NodeScanResult, + records []state.GlobalRecord, + changedPMs, unchangedPMs []string, +) ([]model.NodeScanResult, []model.UnchangedGlobalRef) { + changedSet := setFromStrings(changedPMs) + unchangedSet := setFromStrings(unchangedPMs) + hashByPM := hashesByPM(records) + + changed := make([]model.NodeScanResult, 0, len(changedPMs)) + unchanged := make([]model.UnchangedGlobalRef, 0, len(unchangedPMs)) + for _, r := range results { + if _, ok := changedSet[r.PackageManager]; ok { + changed = 
append(changed, r) + continue + } + if _, ok := unchangedSet[r.PackageManager]; ok { + unchanged = append(unchanged, model.UnchangedGlobalRef{ + PackageManager: r.PackageManager, + ScanOutputHash: hashByPM[r.PackageManager], + LastUploadedExecutionID: prior[r.PackageManager].LastUploadedExecutionID, + }) + } + } + return changed, unchanged +} + +func splitPythonGlobals( + prior map[string]state.GlobalEntry, + results []model.PythonScanResult, + records []state.GlobalRecord, + changedPMs, unchangedPMs []string, +) ([]model.PythonScanResult, []model.UnchangedGlobalRef) { + changedSet := setFromStrings(changedPMs) + unchangedSet := setFromStrings(unchangedPMs) + hashByPM := hashesByPM(records) + + changed := make([]model.PythonScanResult, 0, len(changedPMs)) + unchanged := make([]model.UnchangedGlobalRef, 0, len(unchangedPMs)) + for _, r := range results { + if _, ok := changedSet[r.PackageManager]; ok { + changed = append(changed, r) + continue + } + if _, ok := unchangedSet[r.PackageManager]; ok { + unchanged = append(unchanged, model.UnchangedGlobalRef{ + PackageManager: r.PackageManager, + ScanOutputHash: hashByPM[r.PackageManager], + LastUploadedExecutionID: prior[r.PackageManager].LastUploadedExecutionID, + }) + } + } + return changed, unchanged +} + +// removedRefsFor returns refs for every pending-ack entry in the ecosystem, +// EXCLUDING paths that are present in `discovered`. A project that reappears +// before its prior removal was confirmed by the backend should not be +// reported as removed on this run. 
+func removedRefsFor(s *state.State, ecosystem string, discovered []string) []model.RemovedProjectRef { + discoveredSet := setFromStrings(discovered) + pending := s.PendingRemovalsFor(ecosystem) + out := make([]model.RemovedProjectRef, 0, len(pending)) + for _, p := range pending { + if _, ok := discoveredSet[p.Path]; ok { + continue + } + var lastExec string + switch ecosystem { + case state.EcosystemNPM: + lastExec = s.NPMProjects[p.Path].LastUploadedExecutionID + case state.EcosystemPython: + lastExec = s.PythonProjects[p.Path].LastUploadedExecutionID + } + out = append(out, model.RemovedProjectRef{Path: p.Path, LastUploadedExecutionID: lastExec}) + } + return out +} + +// commitDeltaSnapshot persists the run's outcome after a successful upload: +// upserts state entries for successful scans, acks the removals the wire +// reported (drops them from pending_ack and from the main maps), and +// atomically saves the file. +func commitDeltaSnapshot(s *state.State, snap *deltaSnapshot, path, executionID, agentVersion string) error { + now := time.Now() + s.CommitAfterUpload(now, executionID, agentVersion, + snap.npmRecords, snap.pyRecords, + snap.npmGlobalRecords, snap.pyGlobalRecords, + snap.fullSync, + ) + ackNPM := pendingFromRefs(state.EcosystemNPM, snap.npmRemoved) + ackPy := pendingFromRefs(state.EcosystemPython, snap.pyRemoved) + s.AckRemovals(append(ackNPM, ackPy...)) + s.DropRemovedFromProjects(state.EcosystemNPM, pathsFromRemovedRefs(snap.npmRemoved)) + s.DropRemovedFromProjects(state.EcosystemPython, pathsFromRemovedRefs(snap.pyRemoved)) + return s.Save(path) +} + +func pendingFromRefs(ecosystem string, refs []model.RemovedProjectRef) []state.PendingRemoval { + out := make([]state.PendingRemoval, 0, len(refs)) + for _, r := range refs { + out = append(out, state.PendingRemoval{Path: r.Path, Ecosystem: ecosystem}) + } + return out +} + +func pathsFromRemovedRefs(refs []model.RemovedProjectRef) []string { + out := make([]string, 0, len(refs)) + for _, r := 
range refs { + out = append(out, r.Path) + } + return out +} + +func setFromStrings(xs []string) map[string]struct{} { + out := make(map[string]struct{}, len(xs)) + for _, x := range xs { + out[x] = struct{}{} + } + return out +} + +func hashesByPath(records []state.ScanRecord) map[string]string { + out := make(map[string]string, len(records)) + for _, r := range records { + out[r.Path] = r.Hash + } + return out +} + +func hashesByPM(records []state.GlobalRecord) map[string]string { + out := make(map[string]string, len(records)) + for _, r := range records { + out[r.PM] = r.Hash + } + return out +} diff --git a/internal/telemetry/payload_delta_test.go b/internal/telemetry/payload_delta_test.go new file mode 100644 index 0000000..d4e93ce --- /dev/null +++ b/internal/telemetry/payload_delta_test.go @@ -0,0 +1,88 @@ +package telemetry + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/step-security/dev-machine-guard/internal/model" +) + +func TestPayload_LegacyShapeOmitsDeltaFields(t *testing.T) { + p := &Payload{ + CustomerID: "c", + DeviceID: "d", + NodeProjects: []model.NodeScanResult{ + {ProjectPath: "/svc", PackageManager: "npm"}, + }, + } + raw, err := json.Marshal(p) + if err != nil { + t.Fatal(err) + } + s := string(raw) + if strings.Contains(s, "payload_schema_version") { + t.Errorf("legacy payload (schema=0) should omit payload_schema_version: %s", s) + } + for _, field := range []string{ + "node_projects_unchanged", "node_projects_removed", "node_globals_unchanged", + "python_projects_unchanged", "python_projects_removed", "python_globals_unchanged", + } { + if strings.Contains(s, field) { + t.Errorf("legacy payload should omit %q field, but it's present", field) + } + } +} + +func TestPayload_DeltaShapeShipsRefSlices(t *testing.T) { + p := &Payload{ + PayloadSchemaVersion: CurrentPayloadSchemaVersion, + CustomerID: "c", + DeviceID: "d", + NodeProjects: []model.NodeScanResult{ + {ProjectPath: "/changed-svc", PackageManager: "npm"}, + }, + 
NodeProjectsUnchanged: []model.UnchangedProjectRef{ + {Path: "/unchanged-svc", ScanOutputHash: "sha256:abc", LastUploadedExecutionID: "exec-1"}, + }, + NodeProjectsRemoved: []model.RemovedProjectRef{ + {Path: "/removed-svc", LastUploadedExecutionID: "exec-0"}, + }, + NodeGlobalsUnchanged: []model.UnchangedGlobalRef{ + {PackageManager: "npm", ScanOutputHash: "sha256:def"}, + }, + } + raw, err := json.Marshal(p) + if err != nil { + t.Fatal(err) + } + s := string(raw) + for _, want := range []string{ + `"payload_schema_version":1`, + `"path":"/unchanged-svc"`, + `"scan_output_hash":"sha256:abc"`, + `"last_uploaded_execution_id":"exec-1"`, + `"path":"/removed-svc"`, + `"package_manager":"npm"`, + `"scan_output_hash":"sha256:def"`, + } { + if !strings.Contains(s, want) { + t.Errorf("delta payload missing %q\npayload: %s", want, s) + } + } + + // Round-trip: unmarshal back. + var out Payload + if err := json.Unmarshal(raw, &out); err != nil { + t.Fatal(err) + } + if out.PayloadSchemaVersion != CurrentPayloadSchemaVersion { + t.Errorf("schema version lost: %d", out.PayloadSchemaVersion) + } + if len(out.NodeProjectsUnchanged) != 1 || out.NodeProjectsUnchanged[0].Path != "/unchanged-svc" { + t.Errorf("unchanged ref round-trip lost: %+v", out.NodeProjectsUnchanged) + } + if len(out.NodeProjectsRemoved) != 1 || out.NodeProjectsRemoved[0].Path != "/removed-svc" { + t.Errorf("removed ref round-trip lost: %+v", out.NodeProjectsRemoved) + } +} diff --git a/internal/telemetry/scan_state_test.go b/internal/telemetry/scan_state_test.go new file mode 100644 index 0000000..9b77811 --- /dev/null +++ b/internal/telemetry/scan_state_test.go @@ -0,0 +1,307 @@ +package telemetry + +import ( + "encoding/base64" + "path/filepath" + "testing" + "time" + + "github.com/step-security/dev-machine-guard/internal/buildinfo" + "github.com/step-security/dev-machine-guard/internal/model" + "github.com/step-security/dev-machine-guard/internal/state" +) + +var timeFixture = time.Date(2026, 6, 1, 0, 0, 
0, 0, time.UTC) + +func tempStateFile(t *testing.T) string { + t.Helper() + return filepath.Join(t.TempDir(), "scan-state.json") +} + +func nodeResult(path, pm, stdout string) model.NodeScanResult { + return model.NodeScanResult{ + ProjectPath: path, + PackageManager: pm, + PMVersion: "10.2.0", + RawStdoutBase64: base64.StdEncoding.EncodeToString([]byte(stdout)), + ExitCode: 0, + } +} + +func nodeGlobal(pm, stdout string, exit int) model.NodeScanResult { + return model.NodeScanResult{ + PackageManager: pm, + PMVersion: "10.2.0", + RawStdoutBase64: base64.StdEncoding.EncodeToString([]byte(stdout)), + ExitCode: exit, + } +} + +// runDelta drives buildDeltaSnapshot + commitDeltaSnapshot for the given +// inputs and returns the snapshot (post-mutation, pre-commit) plus the +// reloaded state from disk after commit. +func runDelta( + t *testing.T, s *state.State, statePath, execID string, + npm []model.NodeScanResult, npmDisc []string, + py []model.ProjectInfo, pyDisc []string, + npmG []model.NodeScanResult, pyG []model.PythonScanResult, + fullSync bool, +) (*deltaSnapshot, *state.State) { + t.Helper() + snap := buildDeltaSnapshot(s, fullSync, npm, npmDisc, py, pyDisc, npmG, pyG) + if err := commitDeltaSnapshot(s, snap, statePath, execID, buildinfo.Version); err != nil { + t.Fatalf("commit: %v", err) + } + reloaded, err := state.Load(statePath, buildinfo.Version) + if err != nil { + t.Fatalf("reload: %v", err) + } + return snap, reloaded +} + +func TestDelta_FirstRunPopulatesStateAndShipsAllAsChanged(t *testing.T) { + path := tempStateFile(t) + s := state.New(buildinfo.Version) + npm := []model.NodeScanResult{ + nodeResult("/svc-api", "npm", `{"deps":{"x":"1.0"}}`), + nodeResult("/svc-web", "npm", `{"deps":{"y":"2.0"}}`), + } + + snap, reloaded := runDelta(t, s, path, "exec-1", npm, []string{"/svc-api", "/svc-web"}, nil, nil, nil, nil, false) + + if len(snap.npmChanged) != 2 { + t.Errorf("expected 2 changed, got %d", len(snap.npmChanged)) + } + if len(snap.npmUnchanged) 
!= 0 { + t.Errorf("expected 0 unchanged on first run, got %d", len(snap.npmUnchanged)) + } + if len(reloaded.NPMProjects) != 2 { + t.Errorf("expected 2 npm state entries after commit, got %d", len(reloaded.NPMProjects)) + } + if reloaded.LastSuccessfulExecutionID != "exec-1" { + t.Errorf("execution_id not stamped: %s", reloaded.LastSuccessfulExecutionID) + } +} + +func TestDelta_SecondRunWithSameHashesShipsUnchangedRefs(t *testing.T) { + path := tempStateFile(t) + s := state.New(buildinfo.Version) + npm := []model.NodeScanResult{nodeResult("/svc", "npm", `{"deps":{"x":"1.0"}}`)} + + runDelta(t, s, path, "exec-1", npm, []string{"/svc"}, nil, nil, nil, nil, false) + + s2, _ := state.Load(path, buildinfo.Version) + snap, _ := runDelta(t, s2, path, "exec-2", npm, []string{"/svc"}, nil, nil, nil, nil, false) + + if len(snap.npmChanged) != 0 { + t.Errorf("expected 0 changed bodies on identical re-run, got %d", len(snap.npmChanged)) + } + if len(snap.npmUnchanged) != 1 || snap.npmUnchanged[0].Path != "/svc" { + t.Errorf("expected /svc as unchanged ref, got %+v", snap.npmUnchanged) + } + if snap.npmUnchanged[0].LastUploadedExecutionID != "exec-1" { + t.Errorf("expected ref to carry prior exec id, got %s", snap.npmUnchanged[0].LastUploadedExecutionID) + } + if snap.npmUnchanged[0].ScanOutputHash == "" { + t.Errorf("expected ref to carry hash") + } +} + +func TestDelta_HashDiffShipsAsChanged(t *testing.T) { + path := tempStateFile(t) + s := state.New(buildinfo.Version) + runDelta(t, s, path, "exec-1", + []model.NodeScanResult{nodeResult("/svc", "npm", `{"deps":{"x":"1.0"}}`)}, + []string{"/svc"}, nil, nil, nil, nil, false) + + s2, _ := state.Load(path, buildinfo.Version) + snap, _ := runDelta(t, s2, path, "exec-2", + []model.NodeScanResult{nodeResult("/svc", "npm", `{"deps":{"x":"2.0"}}`)}, + []string{"/svc"}, nil, nil, nil, nil, false) + + if len(snap.npmChanged) != 1 { + t.Errorf("expected /svc in changed (version bump), got %+v", snap.npmChanged) + } + if 
len(snap.npmUnchanged) != 0 { + t.Errorf("expected 0 unchanged, got %+v", snap.npmUnchanged) + } +} + +func TestDelta_RemovedProjectShipsAsRemovedRef(t *testing.T) { + path := tempStateFile(t) + s := state.New(buildinfo.Version) + runDelta(t, s, path, "exec-1", + []model.NodeScanResult{ + nodeResult("/a", "npm", `{"x":1}`), + nodeResult("/b", "npm", `{"y":2}`), + }, []string{"/a", "/b"}, nil, nil, nil, nil, false) + + s2, _ := state.Load(path, buildinfo.Version) + snap, reloaded := runDelta(t, s2, path, "exec-2", + []model.NodeScanResult{nodeResult("/a", "npm", `{"x":1}`)}, + []string{"/a"}, nil, nil, nil, nil, false) + + if len(snap.npmRemoved) != 1 || snap.npmRemoved[0].Path != "/b" { + t.Errorf("expected /b in removed refs, got %+v", snap.npmRemoved) + } + if snap.npmRemoved[0].LastUploadedExecutionID != "exec-1" { + t.Errorf("expected ref to carry prior exec id, got %s", snap.npmRemoved[0].LastUploadedExecutionID) + } + // After commit, /b is gone from both the main map AND pending_ack. + if _, ok := reloaded.NPMProjects["/b"]; ok { + t.Errorf("/b should be dropped from main map after ack") + } + if len(reloaded.PendingRemovalsFor(state.EcosystemNPM)) != 0 { + t.Errorf("pending_ack should be empty after ack, got %+v", reloaded.PendingRemovalsFor(state.EcosystemNPM)) + } +} + +func TestDelta_CapDroppedProjectIsNotMarkedRemoved(t *testing.T) { + path := tempStateFile(t) + s := state.New(buildinfo.Version) + runDelta(t, s, path, "exec-1", + []model.NodeScanResult{ + nodeResult("/a", "npm", `{"x":1}`), + nodeResult("/b", "npm", `{"y":2}`), + }, []string{"/a", "/b"}, nil, nil, nil, nil, false) + + // Second run: /b is still on disk (in discovered) but didn't get scanned + // (cap dropped it). MUST NOT be marked removed. 
+ s2, _ := state.Load(path, buildinfo.Version) + snap, reloaded := runDelta(t, s2, path, "exec-2", + []model.NodeScanResult{nodeResult("/a", "npm", `{"x":1}`)}, + []string{"/a", "/b"}, nil, nil, nil, nil, false) + + if len(snap.npmRemoved) != 0 { + t.Errorf("cap-dropped project must not appear in removed refs: %+v", snap.npmRemoved) + } + if _, ok := reloaded.NPMProjects["/b"]; !ok { + t.Errorf("cap-dropped project entry should be preserved in state") + } +} + +func TestDelta_ReappearedProjectIsFilteredFromRemovedRefs(t *testing.T) { + path := tempStateFile(t) + s := state.New(buildinfo.Version) + // Run 1: /a + /b → state. + runDelta(t, s, path, "exec-1", + []model.NodeScanResult{ + nodeResult("/a", "npm", `{"x":1}`), + nodeResult("/b", "npm", `{"y":2}`), + }, []string{"/a", "/b"}, nil, nil, nil, nil, false) + + // Run 2: /b gone — but commitDeltaSnapshot's Save fails on next run by + // loading from a fresh path so the pending_ack persists (simulating an + // upload that ack'd /b on disk but a follow-up run that re-discovers it). + s2, _ := state.Load(path, buildinfo.Version) + // Manually leave /b in state's pending_ack (post-MarkRemoved state) so we + // can test the filter on the next call. + s2.MarkRemovedPending(state.EcosystemNPM, []string{"/b"}, timeFixture) + // Now /b reappears with the same content. 
+ snap := buildDeltaSnapshot(s2, false, + []model.NodeScanResult{ + nodeResult("/a", "npm", `{"x":1}`), + nodeResult("/b", "npm", `{"y":2}`), + }, []string{"/a", "/b"}, nil, nil, nil, nil) + + if len(snap.npmRemoved) != 0 { + t.Errorf("re-discovered project must be filtered from removed refs: %+v", snap.npmRemoved) + } +} + +func TestDelta_FailedScanDoesNotOverwriteHash(t *testing.T) { + path := tempStateFile(t) + s := state.New(buildinfo.Version) + runDelta(t, s, path, "exec-1", + []model.NodeScanResult{nodeResult("/svc", "npm", `{"x":1}`)}, + []string{"/svc"}, nil, nil, nil, nil, false) + + good, _ := state.Load(path, buildinfo.Version) + goodHash := good.NPMProjects["/svc"].ScanOutputHash + + s2, _ := state.Load(path, buildinfo.Version) + bad := []model.NodeScanResult{{ + ProjectPath: "/svc", + PackageManager: "npm", + RawStdoutBase64: base64.StdEncoding.EncodeToString([]byte(`{"x":2}`)), + ExitCode: 1, + }} + _, reloaded := runDelta(t, s2, path, "exec-2", bad, []string{"/svc"}, nil, nil, nil, nil, false) + + if reloaded.NPMProjects["/svc"].ScanOutputHash != goodHash { + t.Errorf("failed scan must not overwrite prior hash: got %s want %s", + reloaded.NPMProjects["/svc"].ScanOutputHash, goodHash) + } +} + +func TestDelta_PythonRoundTrip(t *testing.T) { + path := tempStateFile(t) + s := state.New(buildinfo.Version) + py := []model.ProjectInfo{ + {Path: "/proj/.venv", PackageManager: "pip", Packages: []model.PackageDetail{{Name: "django", Version: "5.0"}}}, + } + snap, reloaded := runDelta(t, s, path, "exec-1", nil, nil, py, []string{"/proj/.venv"}, nil, nil, false) + + if len(snap.pyChanged) != 1 { + t.Errorf("expected 1 python changed, got %+v", snap.pyChanged) + } + if len(reloaded.PythonProjects) != 1 { + t.Fatalf("expected 1 python entry, got %d", len(reloaded.PythonProjects)) + } +} + +func TestDelta_GlobalsAreTracked(t *testing.T) { + path := tempStateFile(t) + s := state.New(buildinfo.Version) + globals := []model.NodeScanResult{ + nodeGlobal("npm", 
`{"dependencies":{"tsc":"5.0"}}`, 0), + nodeGlobal("yarn", `{"dependencies":{"create-react-app":"5.0"}}`, 0), + } + snap, reloaded := runDelta(t, s, path, "exec-1", nil, nil, nil, nil, globals, nil, false) + + if len(snap.npmGlobalsChanged) != 2 { + t.Errorf("first run: expected 2 globals as changed, got %+v", snap.npmGlobalsChanged) + } + if len(reloaded.NPMGlobal) != 2 { + t.Errorf("expected 2 global entries after commit, got %d", len(reloaded.NPMGlobal)) + } + + // Second run with identical globals: ships as unchanged refs, no bodies. + s2, _ := state.Load(path, buildinfo.Version) + snap2, _ := runDelta(t, s2, path, "exec-2", nil, nil, nil, nil, globals, nil, false) + if len(snap2.npmGlobalsChanged) != 0 { + t.Errorf("second run: expected 0 changed globals, got %+v", snap2.npmGlobalsChanged) + } + if len(snap2.npmGlobalsUnchanged) != 2 { + t.Errorf("second run: expected 2 unchanged-ref globals, got %+v", snap2.npmGlobalsUnchanged) + } +} + +func TestDelta_FullSyncForcesAllChanged(t *testing.T) { + path := tempStateFile(t) + s := state.New(buildinfo.Version) + npm := []model.NodeScanResult{nodeResult("/svc", "npm", `{"x":1}`)} + runDelta(t, s, path, "exec-1", npm, []string{"/svc"}, nil, nil, nil, nil, false) + + s2, _ := state.Load(path, buildinfo.Version) + snap, reloaded := runDelta(t, s2, path, "exec-2", npm, []string{"/svc"}, nil, nil, nil, nil, true) + + if len(snap.npmChanged) != 1 { + t.Errorf("full sync should force changed even on identical hashes, got %+v", snap.npmChanged) + } + if len(snap.npmUnchanged) != 0 { + t.Errorf("full sync should produce no unchanged refs, got %+v", snap.npmUnchanged) + } + if reloaded.LastFullSyncAt.IsZero() { + t.Errorf("full-sync timestamp not refreshed") + } +} + +func TestDelta_NilStateReturnsNilSnapshot(t *testing.T) { + snap := buildDeltaSnapshot(nil, false, + []model.NodeScanResult{nodeResult("/svc", "npm", `{"x":1}`)}, + []string{"/svc"}, nil, nil, nil, nil) + if snap != nil { + t.Errorf("nil state should produce nil 
snapshot, got %+v", snap) + } +} diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index ca87b54..573e67a 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -4,6 +4,7 @@ import ( "bytes" "compress/gzip" "context" + "encoding/base64" "encoding/json" "fmt" "io" @@ -25,7 +26,9 @@ import ( "github.com/step-security/dev-machine-guard/internal/executor" "github.com/step-security/dev-machine-guard/internal/lock" "github.com/step-security/dev-machine-guard/internal/model" + "github.com/step-security/dev-machine-guard/internal/paths" "github.com/step-security/dev-machine-guard/internal/progress" + "github.com/step-security/dev-machine-guard/internal/state" "github.com/step-security/dev-machine-guard/internal/tcc" ) @@ -34,19 +37,27 @@ import ( // can shrink it; production code never mutates it. var s3UploadBackoffUnit = 2 * time.Second +// CurrentPayloadSchemaVersion is bumped when the delta-protocol wire shape +// changes in a way the backend cares about. 0 = legacy agent. +const CurrentPayloadSchemaVersion = 1 + // Payload is the enterprise telemetry JSON structure. type Payload struct { - CustomerID string `json:"customer_id"` - DeviceID string `json:"device_id"` - SerialNumber string `json:"serial_number"` - UserIdentity string `json:"user_identity"` - Hostname string `json:"hostname"` - Platform string `json:"platform"` - OSVersion string `json:"os_version"` - Resources model.MachineResources `json:"resources"` - AgentVersion string `json:"agent_version"` - CollectedAt int64 `json:"collected_at"` - NoUserLoggedIn bool `json:"no_user_logged_in"` + // PayloadSchemaVersion gates the delta-protocol sibling fields below + // (NodeProjectsUnchanged etc.). Zero/absent = legacy snapshot, every + // scanned project ships its full body in NodeProjects/PythonProjects. 
+ PayloadSchemaVersion int `json:"payload_schema_version,omitempty"` + CustomerID string `json:"customer_id"` + DeviceID string `json:"device_id"` + SerialNumber string `json:"serial_number"` + UserIdentity string `json:"user_identity"` + Hostname string `json:"hostname"` + Platform string `json:"platform"` + OSVersion string `json:"os_version"` + Resources model.MachineResources `json:"resources"` + AgentVersion string `json:"agent_version"` + CollectedAt int64 `json:"collected_at"` + NoUserLoggedIn bool `json:"no_user_logged_in"` // InvocationMethod is "install" when the agent ran from an installed // launchd/systemd/schtasks unit, "one_time" for a manual CLI run. @@ -59,27 +70,36 @@ type Payload struct { // streamed via the run-status endpoint during the run. StatusInfo *RunStatusInfo `json:"status_info,omitempty"` - IDEExtensions []model.Extension `json:"ide_extensions"` - IDEInstallations []model.IDE `json:"ide_installations"` - NodePkgManagers []model.PkgManager `json:"node_package_managers"` - NodeGlobalPackages []model.NodeScanResult `json:"node_global_packages"` - NodeProjects []model.NodeScanResult `json:"node_projects"` - BrewPkgManager *model.PkgManager `json:"brew_package_manager,omitempty"` - BrewScans []model.BrewScanResult `json:"brew_scans"` - BrewFormulae []model.BrewPackage `json:"brew_formulae,omitempty"` - BrewCasks []model.BrewPackage `json:"brew_casks,omitempty"` - PythonPkgManagers []model.PkgManager `json:"python_package_managers"` - PythonGlobalPackages []model.PythonScanResult `json:"python_global_packages"` - PythonProjects []model.ProjectInfo `json:"python_projects"` - SystemPackageScans []model.SystemPackageScanResult `json:"system_package_scans"` - AIAgents []model.AITool `json:"ai_agents"` - MCPConfigs []model.MCPConfigEnterprise `json:"mcp_configs"` - NPMRCAudit *model.NPMRCAudit `json:"npmrc_audit,omitempty"` - PipAudit *model.PipAudit `json:"pip_audit,omitempty"` - RuleScan *model.RuleScan `json:"rule_scan,omitempty"` - PnpmAudit 
*model.PnpmAudit `json:"pnpm_audit,omitempty"` - BunAudit *model.BunAudit `json:"bun_audit,omitempty"` - YarnAudit *model.YarnAudit `json:"yarn_audit,omitempty"` + IDEExtensions []model.Extension `json:"ide_extensions"` + IDEInstallations []model.IDE `json:"ide_installations"` + NodePkgManagers []model.PkgManager `json:"node_package_managers"` + NodeGlobalPackages []model.NodeScanResult `json:"node_global_packages"` + NodeProjects []model.NodeScanResult `json:"node_projects"` + BrewPkgManager *model.PkgManager `json:"brew_package_manager,omitempty"` + BrewScans []model.BrewScanResult `json:"brew_scans"` + BrewFormulae []model.BrewPackage `json:"brew_formulae,omitempty"` + BrewCasks []model.BrewPackage `json:"brew_casks,omitempty"` + PythonPkgManagers []model.PkgManager `json:"python_package_managers"` + PythonGlobalPackages []model.PythonScanResult `json:"python_global_packages"` + PythonProjects []model.ProjectInfo `json:"python_projects"` + // Delta-protocol siblings (PayloadSchemaVersion >= 1). NodeProjects / + // NodeGlobalPackages / PythonProjects / PythonGlobalPackages above carry + // only the `changed` subset; everything else is here. 
+ NodeProjectsUnchanged []model.UnchangedProjectRef `json:"node_projects_unchanged,omitempty"` + NodeProjectsRemoved []model.RemovedProjectRef `json:"node_projects_removed,omitempty"` + NodeGlobalsUnchanged []model.UnchangedGlobalRef `json:"node_globals_unchanged,omitempty"` + PythonProjectsUnchanged []model.UnchangedProjectRef `json:"python_projects_unchanged,omitempty"` + PythonProjectsRemoved []model.RemovedProjectRef `json:"python_projects_removed,omitempty"` + PythonGlobalsUnchanged []model.UnchangedGlobalRef `json:"python_globals_unchanged,omitempty"` + SystemPackageScans []model.SystemPackageScanResult `json:"system_package_scans"` + AIAgents []model.AITool `json:"ai_agents"` + MCPConfigs []model.MCPConfigEnterprise `json:"mcp_configs"` + NPMRCAudit *model.NPMRCAudit `json:"npmrc_audit,omitempty"` + PipAudit *model.PipAudit `json:"pip_audit,omitempty"` + RuleScan *model.RuleScan `json:"rule_scan,omitempty"` + PnpmAudit *model.PnpmAudit `json:"pnpm_audit,omitempty"` + BunAudit *model.BunAudit `json:"bun_audit,omitempty"` + YarnAudit *model.YarnAudit `json:"yarn_audit,omitempty"` ExecutionLogs *ExecutionLogs `json:"execution_logs,omitempty"` PerformanceMetrics *PerformanceMetrics `json:"performance_metrics,omitempty"` @@ -367,6 +387,31 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err } endPhase(phaseCtx, phaseCancel, tracker, log, "device_info") + // Per-device scan state for the delta-upload protocol. Three opt-outs: + // - config.UseLegacyPackageScan = true (persistent, set in config.json) + // - STEPSEC_DISABLE_SCAN_STATE=1 (env, incident response) + // - paths.Home() unresolvable (no place to write the file) + // Any of the three leaves scanState nil and the run behaves as pre-1.13. 
+ var scanState *state.State + var scanStatePath string + var scanStateFullSync bool + scanStateDisabled := config.UseLegacyPackageScan || os.Getenv("STEPSEC_DISABLE_SCAN_STATE") == "1" + if !scanStateDisabled { + scanStatePath = paths.ScanStateFile() + if scanStatePath != "" { + loaded, loadErr := state.Load(scanStatePath, buildinfo.Version) + if loadErr != nil { + log.Debug("scan-state: load fallback (%v) — treating as empty", loadErr) + } + scanState = loaded + scanStateFullSync = scanState.IsFullSyncDue(time.Now(), buildinfo.Version, state.DefaultFullSyncHorizon) + log.Debug("scan-state: loaded from %s (npm=%d python=%d full_sync=%v)", + scanStatePath, len(scanState.NPMProjects), len(scanState.PythonProjects), scanStateFullSync) + } + } else if config.UseLegacyPackageScan { + log.Debug("scan-state: disabled by config.use_legacy_package_scan; falling back to full-snapshot uploads") + } + // Report "started" now that we have a device_id. Fire-and-forget. reportRunStatus(ctx, log, executionID, deviceID, runStatusStarted, "", invocationMethod) @@ -639,6 +684,7 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err var pythonPkgManagers []model.PkgManager var pythonGlobalPkgs []model.PythonScanResult var pythonProjects []model.ProjectInfo + var pythonDiscovered []string if pythonEnabled { phaseCtx, phaseCancel = startPhase(ctx, tracker, "python_scan") @@ -663,7 +709,14 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err log.Progress("Searching for Python projects...") pyProjectDetector := detector.NewPythonProjectDetector(exec).WithSkipper(tccSkipper) - pythonProjects = pyProjectDetector.ListProjects(searchDirs) + var knownPython map[string]time.Time + if scanState != nil && !scanStateFullSync { + knownPython = make(map[string]time.Time, len(scanState.PythonProjects)) + for path, entry := range scanState.PythonProjects { + knownPython[path] = entry.LastVerifiedAt + } + } + pythonProjects, pythonDiscovered 
= pyProjectDetector.ListProjects(searchDirs, knownPython) log.Progress(" Found %d Python projects", len(pythonProjects)) fmt.Fprintln(os.Stderr) endPhase(phaseCtx, phaseCancel, tracker, log, "python_scan") @@ -746,6 +799,7 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err var pkgManagers []model.PkgManager var globalPkgs []model.NodeScanResult var nodeProjects []model.NodeScanResult + var nodeDiscovered []string var nodeScanMs int64 if npmEnabled { @@ -782,7 +836,14 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err log.Progress("Searching for Node.js projects...") scanStart := time.Now() - nodeProjects = nodeScanner.ScanProjects(phaseCtx, searchDirs) + var knownNPM map[string]time.Time + if scanState != nil && !scanStateFullSync { + knownNPM = make(map[string]time.Time, len(scanState.NPMProjects)) + for path, entry := range scanState.NPMProjects { + knownNPM[path] = entry.LastVerifiedAt + } + } + nodeProjects, nodeDiscovered = nodeScanner.ScanProjects(phaseCtx, searchDirs, knownNPM) nodeScanMs = time.Since(scanStart).Milliseconds() log.Progress(" Found %d Node.js projects", len(nodeProjects)) log.Progress(" Scan duration: %dms", nodeScanMs) @@ -862,19 +923,57 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err // runs after this snapshot and is intentionally not tracked as a phase. finalStatusInfo := tracker.Snapshot() + // Partition this run's scan output against state. When delta is enabled + // (snap != nil), the payload below routes changed bodies to the legacy + // slots and unchanged/removed projects to the new ref slots. 
+ snap := buildDeltaSnapshot( + scanState, scanStateFullSync, + nodeProjects, nodeDiscovered, pythonProjects, pythonDiscovered, + globalPkgs, pythonGlobalPkgs, + ) + payloadNodeProjects := nodeProjects + payloadNodeGlobals := globalPkgs + payloadPythonProjects := pythonProjects + payloadPythonGlobals := pythonGlobalPkgs + schemaVersion := 0 + var npmUnchangedRefs []model.UnchangedProjectRef + var npmRemovedRefs []model.RemovedProjectRef + var npmGlobalsUnchangedRefs []model.UnchangedGlobalRef + var pyUnchangedRefs []model.UnchangedProjectRef + var pyRemovedRefs []model.RemovedProjectRef + var pyGlobalsUnchangedRefs []model.UnchangedGlobalRef + if snap != nil { + schemaVersion = CurrentPayloadSchemaVersion + payloadNodeProjects = snap.npmChanged + payloadNodeGlobals = snap.npmGlobalsChanged + payloadPythonProjects = snap.pyChanged + payloadPythonGlobals = snap.pyGlobalsChanged + npmUnchangedRefs = snap.npmUnchanged + npmRemovedRefs = snap.npmRemoved + npmGlobalsUnchangedRefs = snap.npmGlobalsUnchanged + pyUnchangedRefs = snap.pyUnchanged + pyRemovedRefs = snap.pyRemoved + pyGlobalsUnchangedRefs = snap.pyGlobalsUnchanged + log.Debug("delta payload: npm(changed=%d unchanged=%d removed=%d globals_changed=%d globals_unchanged=%d) python(changed=%d unchanged=%d removed=%d globals_changed=%d globals_unchanged=%d) full_sync=%v", + len(payloadNodeProjects), len(npmUnchangedRefs), len(npmRemovedRefs), len(payloadNodeGlobals), len(npmGlobalsUnchangedRefs), + len(payloadPythonProjects), len(pyUnchangedRefs), len(pyRemovedRefs), len(payloadPythonGlobals), len(pyGlobalsUnchangedRefs), + scanStateFullSync) + } + // Build payload payload := &Payload{ - CustomerID: config.CustomerID, - DeviceID: dev.SerialNumber, - SerialNumber: dev.SerialNumber, - UserIdentity: dev.UserIdentity, - Hostname: dev.Hostname, - Platform: dev.Platform, - OSVersion: dev.OSVersion, - Resources: dev.Resources, - AgentVersion: buildinfo.Version, - CollectedAt: endTime.Unix(), - NoUserLoggedIn: 
noUserLoggedIn, + PayloadSchemaVersion: schemaVersion, + CustomerID: config.CustomerID, + DeviceID: dev.SerialNumber, + SerialNumber: dev.SerialNumber, + UserIdentity: dev.UserIdentity, + Hostname: dev.Hostname, + Platform: dev.Platform, + OSVersion: dev.OSVersion, + Resources: dev.Resources, + AgentVersion: buildinfo.Version, + CollectedAt: endTime.Unix(), + NoUserLoggedIn: noUserLoggedIn, InvocationMethod: invocationMethod, StatusInfo: &finalStatusInfo, @@ -882,24 +981,31 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err IDEExtensions: extensions, IDEInstallations: ides, NodePkgManagers: pkgManagers, - NodeGlobalPackages: globalPkgs, - NodeProjects: nodeProjects, + NodeGlobalPackages: payloadNodeGlobals, + NodeProjects: payloadNodeProjects, BrewPkgManager: brewPkgMgr, BrewScans: brewScans, BrewFormulae: brewFormulae, BrewCasks: brewCasks, PythonPkgManagers: pythonPkgManagers, - PythonGlobalPackages: pythonGlobalPkgs, - PythonProjects: pythonProjects, - SystemPackageScans: systemPackageScans, - AIAgents: allAI, - MCPConfigs: mcpConfigs, - NPMRCAudit: &npmrcAudit, - PipAudit: &pipAudit, - RuleScan: ruleScan, - PnpmAudit: &pnpmAudit, - BunAudit: &bunAudit, - YarnAudit: &yarnAudit, + PythonGlobalPackages: payloadPythonGlobals, + PythonProjects: payloadPythonProjects, + + NodeProjectsUnchanged: npmUnchangedRefs, + NodeProjectsRemoved: npmRemovedRefs, + NodeGlobalsUnchanged: npmGlobalsUnchangedRefs, + PythonProjectsUnchanged: pyUnchangedRefs, + PythonProjectsRemoved: pyRemovedRefs, + PythonGlobalsUnchanged: pyGlobalsUnchangedRefs, + SystemPackageScans: systemPackageScans, + AIAgents: allAI, + MCPConfigs: mcpConfigs, + NPMRCAudit: &npmrcAudit, + PipAudit: &pipAudit, + RuleScan: ruleScan, + PnpmAudit: &pnpmAudit, + BunAudit: &bunAudit, + YarnAudit: &yarnAudit, ExecutionLogs: &ExecutionLogs{ OutputBase64: execLogsBase64, @@ -933,6 +1039,16 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err return err } 
log.Progress("telemetry written to %s (upload skipped)", cfg.TelemetryOutFile) + // Treat a successful local dump as a successful upload for the + // purposes of scan-state persistence so dev/stress-test runs + // produce comparable second-run behavior to a real upload. + if snap != nil { + if err := commitDeltaSnapshot(scanState, snap, scanStatePath, executionID, buildinfo.Version); err != nil { + log.Warn("scan-state: save failed (%v) — next run will full-sync", err) + } else { + log.Debug("scan-state: saved %s (telemetry-out mode)", scanStatePath) + } + } return nil } @@ -958,6 +1074,14 @@ func Run(exec executor.Executor, log *progress.Logger, cfg *cli.Config) (err err } endPhase(phaseCtx, phaseCancel, tracker, log, "telemetry_upload") + if snap != nil { + if err := commitDeltaSnapshot(scanState, snap, scanStatePath, executionID, buildinfo.Version); err != nil { + log.Warn("scan-state: save failed (%v) — next run will full-sync", err) + } else { + log.Debug("scan-state: saved %s", scanStatePath) + } + } + fmt.Fprintln(os.Stderr) log.Progress("Telemetry collection completed successfully") tccSkipper.LogHits(log.Debug) @@ -986,6 +1110,16 @@ func writeTelemetryFile(path string, payload *Payload) error { return nil } +// decodeBase64OrRaw returns the bytes decoded from a standard base64 string, +// falling back to the raw string bytes when the input isn't valid base64. +func decodeBase64OrRaw(s string) []byte { + decoded, err := base64.StdEncoding.DecodeString(s) + if err != nil { + return []byte(s) + } + return decoded +} + func brewFormulaeCount(scans []model.BrewScanResult) int { for _, s := range scans { if s.ScanType == "formulae" {