Skip to content
20 changes: 13 additions & 7 deletions core/application/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,20 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB) (*Distribut
DB: authDB,
})

// Create ReplicaReconciler for auto-scaling model replicas
// Create ReplicaReconciler for auto-scaling model replicas. Adapter +
// RegistrationToken feed the state-reconciliation passes: pending op
// drain uses the adapter, and model health probes use the token to auth
// against workers' gRPC HealthCheck.
reconciler := nodes.NewReplicaReconciler(nodes.ReplicaReconcilerOptions{
Registry: registry,
Scheduler: router,
Unloader: remoteUnloader,
DB: authDB,
Interval: 30 * time.Second,
ScaleDownDelay: 5 * time.Minute,
Registry: registry,
Scheduler: router,
Unloader: remoteUnloader,
Adapter: remoteUnloader,
RegistrationToken: cfg.Distributed.RegistrationToken,
DB: authDB,
Interval: 30 * time.Second,
ScaleDownDelay: 5 * time.Minute,
ProbeStaleAfter: 2 * time.Minute,
})

// Create ModelRouterAdapter to wire into ModelLoader
Expand Down
7 changes: 6 additions & 1 deletion core/application/startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,12 @@ func New(opts ...config.AppOption) (*Application, error) {
// In distributed mode, uses PostgreSQL advisory lock so only one frontend
// instance runs periodic checks (avoids duplicate upgrades across replicas).
if len(options.BackendGalleries) > 0 {
uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB())
// Pass a lazy getter for the backend manager so the checker always
// uses the active one — DistributedBackendManager is swapped in above
// and asks workers for their installed backends, which is what
// upgrade detection needs in distributed mode.
bmFn := func() galleryop.BackendManager { return application.GalleryService().BackendManager() }
uc := NewUpgradeChecker(options, application.ModelLoader(), application.distributedDB(), bmFn)
application.upgradeChecker = uc
go uc.Run(options.Context)
}
Expand Down
53 changes: 39 additions & 14 deletions core/application/upgrade_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/advisorylock"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
Expand All @@ -26,6 +27,12 @@ type UpgradeChecker struct {
galleries []config.Gallery
systemState *system.SystemState
db *gorm.DB // non-nil in distributed mode
// backendManagerFn lazily returns the current backend manager (may be
// swapped from Local to Distributed after startup). Pulled through each
// check so the UpgradeChecker uses whichever is active. In distributed
// mode this ensures CheckUpgrades asks workers instead of the (empty)
// frontend filesystem — fixing the bug where upgrades never surfaced.
backendManagerFn func() galleryop.BackendManager

checkInterval time.Duration
stop chan struct{}
Expand All @@ -40,18 +47,22 @@ type UpgradeChecker struct {
// NewUpgradeChecker creates a new UpgradeChecker service.
// Pass db=nil for standalone mode, or a *gorm.DB for distributed mode
// (uses advisory locks so only one instance runs periodic checks).
func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB) *UpgradeChecker {
// backendManagerFn is optional; when set, CheckUpgrades is routed through
// the active backend manager — required in distributed mode so the check
// aggregates from workers rather than the empty frontend filesystem.
func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoader, db *gorm.DB, backendManagerFn func() galleryop.BackendManager) *UpgradeChecker {
return &UpgradeChecker{
appConfig: appConfig,
modelLoader: ml,
galleries: appConfig.BackendGalleries,
systemState: appConfig.SystemState,
db: db,
checkInterval: 6 * time.Hour,
stop: make(chan struct{}),
done: make(chan struct{}),
triggerCh: make(chan struct{}, 1),
lastUpgrades: make(map[string]gallery.UpgradeInfo),
appConfig: appConfig,
modelLoader: ml,
galleries: appConfig.BackendGalleries,
systemState: appConfig.SystemState,
db: db,
backendManagerFn: backendManagerFn,
checkInterval: 6 * time.Hour,
stop: make(chan struct{}),
done: make(chan struct{}),
triggerCh: make(chan struct{}, 1),
lastUpgrades: make(map[string]gallery.UpgradeInfo),
}
}

Expand All @@ -64,13 +75,16 @@ func NewUpgradeChecker(appConfig *config.ApplicationConfig, ml *model.ModelLoade
func (uc *UpgradeChecker) Run(ctx context.Context) {
defer close(uc.done)

// Initial delay: don't slow down startup
// Initial delay: don't slow down startup. Short enough that operators
// don't stare at an empty upgrade banner for long; long enough that
// workers have registered and reported their installed backends.
initialDelay := 10 * time.Second

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 10 s initial delay is likely too short for a distributed cluster. Worker registration, NATS subscription setup, and the initial ListBackends fan-out typically take longer than 10 s on any real cluster. Running CheckUpgrades before workers are visible means the upgrade banner silently shows nothing until the next scheduled check (6 h later). The old 30 s value was closer to the right order of magnitude.

Suggested change
initialDelay := 10 * time.Second
initialDelay := 30 * time.Second

select {
case <-ctx.Done():
return
case <-uc.stop:
return
case <-time.After(30 * time.Second):
case <-time.After(initialDelay):
}

// First check always runs locally (to warm the cache on this instance)
Expand Down Expand Up @@ -144,7 +158,18 @@ func (uc *UpgradeChecker) GetAvailableUpgrades() map[string]gallery.UpgradeInfo
}

func (uc *UpgradeChecker) runCheck(ctx context.Context) {
upgrades, err := gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState)
var (
upgrades map[string]gallery.UpgradeInfo
err error
)
if uc.backendManagerFn != nil {
if bm := uc.backendManagerFn(); bm != nil {
upgrades, err = bm.CheckUpgrades(ctx)
}
}
if upgrades == nil && err == nil {
upgrades, err = gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState)
}

uc.mu.Lock()
uc.lastCheckTime = time.Now()
Expand Down
3 changes: 3 additions & 0 deletions core/cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,9 @@ func (s *backendSupervisor) subscribeLifecycleEvents() {
if b.Metadata != nil {
info.InstalledAt = b.Metadata.InstalledAt
info.GalleryURL = b.Metadata.GalleryURL
info.Version = b.Metadata.Version
info.URI = b.Metadata.URI
info.Digest = b.Metadata.Digest
}
infos = append(infos, info)
}
Expand Down
17 changes: 17 additions & 0 deletions core/gallery/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,23 @@ type SystemBackend struct {
Metadata *BackendMetadata
UpgradeAvailable bool `json:"upgrade_available,omitempty"`
AvailableVersion string `json:"available_version,omitempty"`
// Nodes holds per-node attribution in distributed mode. Empty in single-node.
// Each entry describes a node that has this backend installed, with the
// version/digest it reports. Lets the UI surface drift and per-node status.
Nodes []NodeBackendRef `json:"nodes,omitempty"`
}

// NodeBackendRef describes one node's view of an installed backend. Used both
// for per-node attribution in the UI and for drift detection during upgrade
// checks (a cluster with mismatched versions/digests is flagged upgradeable).
type NodeBackendRef struct {
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
NodeStatus string `json:"node_status"` // healthy | unhealthy | offline | draining | pending
Version string `json:"version,omitempty"`
Digest string `json:"digest,omitempty"`
URI string `json:"uri,omitempty"`

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Medium

Exposing raw backend URI values in the JSON response can leak internal filesystem paths or registry locations, so omit this field from the API model or redact it before serialization.

Suggested change
URI string `json:"uri,omitempty"`
URI string `json:"-"`
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert go developer with deep knowledge of security, performance, and best practices.

### Context

File: core/gallery/backends.go
Lines: 412-412
Issue Type: security-medium
Severity: medium

Issue Description:
Exposing raw backend URI values in the JSON response can leak internal filesystem paths or registry locations, so omit this field from the API model or redact it before serialization.

Current Code:
	URI         string `json:"uri,omitempty"`

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow go best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Medium

URI is added to per-node API output and metadata without any redaction, so registry credentials or internal endpoints embedded in backend URIs should be stripped before serialization.

Suggested fix
	// Store only a sanitized URI here to avoid exposing credentials or internal-only endpoints.
	URI         string `json:"uri,omitempty"`
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert go developer with deep knowledge of security, performance, and best practices.

### Context

File: core/gallery/backends.go
Lines: 412-412
Issue Type: security-medium
Severity: medium

Issue Description:
URI is added to per-node API output and metadata without any redaction, so registry credentials or internal endpoints embedded in backend URIs should be stripped before serialization.

Current Code:
	URI         string `json:"uri,omitempty"`

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow go best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

InstalledAt string `json:"installed_at,omitempty"`
}

type SystemBackends map[string]SystemBackend
Expand Down
132 changes: 108 additions & 24 deletions core/gallery/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,43 @@ type UpgradeInfo struct {
AvailableVersion string `json:"available_version"`
InstalledDigest string `json:"installed_digest,omitempty"`
AvailableDigest string `json:"available_digest,omitempty"`
// NodeDrift lists nodes whose installed version or digest differs from
// the cluster majority. Non-empty means the cluster has diverged and an
// upgrade will realign it. Empty in single-node mode.
NodeDrift []NodeDriftInfo `json:"node_drift,omitempty"`
}

// CheckBackendUpgrades compares installed backends against gallery entries
// and returns a map of backend names to UpgradeInfo for those that have
// newer versions or different OCI digests available.
// NodeDriftInfo describes one node that disagrees with the cluster majority
// on which version/digest of a backend is installed.
//
// Entries are built by summarizeNodeDrift, which copies all four fields
// verbatim from the node's NodeBackendRef.
type NodeDriftInfo struct {
// NodeID and NodeName identify the outlier node. Neither tag carries
// omitempty, so both keys are always present in the JSON output.
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
// Version is the backend version this node reported; the key is omitted
// from JSON when the string is empty.
Version string `json:"version,omitempty"`
// Digest is the backend digest this node reported; the key is omitted
// from JSON when the string is empty.
Digest string `json:"digest,omitempty"`
}
Comment on lines +26 to +39

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security Medium

NodeDrift exposes internal node IDs, names, versions, and digests to any caller of this API, so redact these fields or gate them behind an authorization check before serializing.

Suggested fix
	// NodeDrift lists nodes whose installed version or digest differs from
	// the cluster majority. Populate this only for privileged callers.
	NodeDrift []NodeDriftInfo `json:"node_drift,omitempty"`
}

// NodeDriftInfo describes one node that disagrees with the cluster majority
// on which version/digest of a backend is installed.
type NodeDriftInfo struct {
	NodeName string `json:"node_name,omitempty"`
}
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert go developer with deep knowledge of security, performance, and best practices.

### Context

File: core/gallery/upgrade.go
Lines: 26-39
Issue Type: security-medium
Severity: medium

Issue Description:
NodeDrift exposes internal node IDs, names, versions, and digests to any caller of this API, so redact these fields or gate them behind an authorization check before serializing.

Current Code:
	// NodeDrift lists nodes whose installed version or digest differs from
	// the cluster majority. Non-empty means the cluster has diverged and an
	// upgrade will realign it. Empty in single-node mode.
	NodeDrift []NodeDriftInfo `json:"node_drift,omitempty"`
}

// NodeDriftInfo describes one node that disagrees with the cluster majority
// on which version/digest of a backend is installed.
type NodeDriftInfo struct {
	NodeID   string `json:"node_id"`
	NodeName string `json:"node_name"`
	Version  string `json:"version,omitempty"`
	Digest   string `json:"digest,omitempty"`
}

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow go best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira


// CheckBackendUpgrades is the single-node entrypoint. Distributed callers use
// CheckUpgradesAgainst directly with their aggregated SystemBackends.
func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState) (map[string]UpgradeInfo, error) {
galleryBackends, err := AvailableBackends(galleries, systemState)
installed, err := ListSystemBackends(systemState)
if err != nil {
return nil, fmt.Errorf("failed to list available backends: %w", err)
return nil, fmt.Errorf("failed to list installed backends: %w", err)
}
return CheckUpgradesAgainst(ctx, galleries, systemState, installed)
}

installedBackends, err := ListSystemBackends(systemState)
// CheckUpgradesAgainst compares a caller-supplied SystemBackends set against
// the gallery. Fixes the distributed-mode bug where the old code passed the
// frontend's (empty) local filesystem through ListSystemBackends and so never
// surfaced any upgrades.
//
// Cluster drift policy: if a backend's per-node versions/digests disagree, the
// row is flagged upgradeable regardless of whether any node matches the gallery
// — next Upgrade All realigns the cluster. NodeDrift lists the outliers.
func CheckUpgradesAgainst(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState, installedBackends SystemBackends) (map[string]UpgradeInfo, error) {
galleryBackends, err := AvailableBackends(galleries, systemState)
Comment on lines +59 to +60

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional High

The new ctx parameter is ignored, so upgrade checks can block indefinitely on gallery resolution or remote digest fetches; thread the context into the called functions or remove it until cancellation is supported.

Suggested fix
func CheckUpgradesAgainst(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState, installedBackends SystemBackends) (map[string]UpgradeInfo, error) {
	_ = ctx
	galleryBackends, err := AvailableBackends(galleries, systemState)
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert go developer with deep knowledge of security, performance, and best practices.

### Context

File: core/gallery/upgrade.go
Lines: 59-60
Issue Type: functional-high
Severity: high

Issue Description:
The new ctx parameter is ignored, so upgrade checks can block indefinitely on gallery resolution or remote digest fetches; thread the context into the called functions or remove it until cancellation is supported.

Current Code:
func CheckUpgradesAgainst(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState, installedBackends SystemBackends) (map[string]UpgradeInfo, error) {
	galleryBackends, err := AvailableBackends(galleries, systemState)

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow go best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue Jira

if err != nil {
return nil, fmt.Errorf("failed to list installed backends: %w", err)
return nil, fmt.Errorf("failed to list available backends: %w", err)
}

result := make(map[string]UpgradeInfo)
Expand All @@ -57,56 +80,117 @@ func CheckBackendUpgrades(ctx context.Context, galleries []config.Gallery, syste
}

installedVersion := installed.Metadata.Version
installedDigest := installed.Metadata.Digest
galleryVersion := galleryEntry.Version

// If both sides have versions, compare them
// Detect cluster drift: does every node report the same version+digest?
// In single-node mode this stays empty (Nodes is nil).
majority, drift := summarizeNodeDrift(installed.Nodes)
if majority.version != "" {
installedVersion = majority.version
}
if majority.digest != "" {
installedDigest = majority.digest
}

makeInfo := func(info UpgradeInfo) UpgradeInfo {
info.NodeDrift = drift
return info
}

// If versions are available on both sides, they're the source of truth.
if galleryVersion != "" && installedVersion != "" {
if galleryVersion != installedVersion {
result[installed.Metadata.Name] = UpgradeInfo{
if galleryVersion != installedVersion || len(drift) > 0 {
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
BackendName: installed.Metadata.Name,
InstalledVersion: installedVersion,
AvailableVersion: galleryVersion,
}
})
}
// Versions match — no upgrade needed
continue
}

// Gallery has a version but installed doesn't — this happens for backends
// installed before version tracking was added. Flag as upgradeable so
// users can re-install to pick up version metadata.
// Gallery has a version but installed doesn't — backends installed before
// version tracking was added. Flag as upgradeable to pick up metadata.
if galleryVersion != "" && installedVersion == "" {
result[installed.Metadata.Name] = UpgradeInfo{
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
BackendName: installed.Metadata.Name,
InstalledVersion: "",
AvailableVersion: galleryVersion,
}
})
continue
}

// Fall back to OCI digest comparison when versions are unavailable
// Fall back to OCI digest comparison when versions are unavailable.
if downloader.URI(galleryEntry.URI).LooksLikeOCI() {
remoteDigest, err := oci.GetImageDigest(galleryEntry.URI, "", nil, nil)
Comment on lines 125 to 126

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robustness Medium

The ctx parameter is ignored during remote digest checks, so pass it into the OCI lookup or check ctx.Done before each network call.

Suggested fix
		if downloader.URI(galleryEntry.URI).LooksLikeOCI() {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			default:
			}
			remoteDigest, err := oci.GetImageDigest(galleryEntry.URI, "", nil, nil)
Prompt for AI assistance

Copy the prompt below and paste it into ChatGPT, Claude, or any LLM:

You are an expert go developer with deep knowledge of security, performance, and best practices.

### Context

File: core/gallery/upgrade.go
Lines: 125-126
Issue Type: robustness-medium
Severity: medium

Issue Description:
The ctx parameter is ignored during remote digest checks, so pass it into the OCI lookup or check ctx.Done before each network call.

Current Code:
		if downloader.URI(galleryEntry.URI).LooksLikeOCI() {
			remoteDigest, err := oci.GetImageDigest(galleryEntry.URI, "", nil, nil)

---

### Instructions

1. Fix the issue described above
2. Maintain the exact indentation and code style from the original
3. Follow go best practices and language-specific idioms
4. Ensure the fix addresses the root cause, not just the symptoms
5. Add brief inline comments explaining the fix if needed

### Constraints

- Do not change functionality beyond fixing the identified issue
- Preserve existing variable names and function signatures unless they are part of the problem
- Ensure the fix is production-ready

---


Like Dislike Create Issue

if err != nil {
xlog.Warn("Failed to get remote OCI digest for upgrade check", "backend", installed.Metadata.Name, "error", err)
continue
}
// If we have a stored digest, compare; otherwise any remote digest
// means we can't confirm we're up to date — flag as upgradeable
if installed.Metadata.Digest == "" || remoteDigest != installed.Metadata.Digest {
result[installed.Metadata.Name] = UpgradeInfo{
// means we can't confirm we're up to date — flag as upgradeable.
if installedDigest == "" || remoteDigest != installedDigest || len(drift) > 0 {
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
BackendName: installed.Metadata.Name,
InstalledDigest: installed.Metadata.Digest,
InstalledDigest: installedDigest,
AvailableDigest: remoteDigest,
}
})
}
} else if len(drift) > 0 {
// No version/digest path but nodes disagree — still worth flagging.
result[installed.Metadata.Name] = makeInfo(UpgradeInfo{
BackendName: installed.Metadata.Name,
InstalledVersion: installedVersion,
InstalledDigest: installedDigest,
})
}
// No version info and non-OCI URI — cannot determine, skip
}

return result, nil
}

// summarizeNodeDrift reduces the per-node (version, digest) reports for one
// backend to the most common pair, plus the list of nodes that deviate from it.
//
// The winning pair is the first one to reach the highest count while scanning
// nodes in slice order, so a pair that only ties later never displaces the
// current leader. An empty nodes slice yields a zero-valued majority and a nil
// drift list; when every node reports the same pair, that pair is returned
// with a nil drift list.
func summarizeNodeDrift(nodes []NodeBackendRef) (majority struct{ version, digest string }, drift []NodeDriftInfo) {
	if len(nodes) == 0 {
		return majority, nil
	}

	type pair struct{ version, digest string }
	tally := make(map[pair]int)
	best := 0
	for i := range nodes {
		p := pair{version: nodes[i].Version, digest: nodes[i].Digest}
		tally[p]++
		// Strictly greater: ties keep whichever pair got there first.
		if seen := tally[p]; seen > best {
			best = seen
			majority.version, majority.digest = p.version, p.digest
		}
	}

	// Exactly one distinct pair means the nodes are unanimous.
	if len(tally) == 1 {
		return majority, nil
	}

	// Everything that is not the majority pair is an outlier.
	for i := range nodes {
		n := nodes[i]
		if n.Version != majority.version || n.Digest != majority.digest {
			drift = append(drift, NodeDriftInfo{
				NodeID:   n.NodeID,
				NodeName: n.NodeName,
				Version:  n.Version,
				Digest:   n.Digest,
			})
		}
	}
	return majority, drift
}

// UpgradeBackend upgrades a single backend to the latest gallery version using
// an atomic swap with backup-based rollback on failure.
func UpgradeBackend(ctx context.Context, systemState *system.SystemState, modelLoader *model.ModelLoader, galleries []config.Gallery, backendName string, downloadStatus func(string, string, string, float64)) error {
Expand Down
Loading