Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 109 additions & 2 deletions internal/external/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log/slog"
"net"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -65,8 +66,14 @@ func (r *Reconciler) Scan(ctx context.Context) ([]Overview, error) {
if err != nil {
return nil, err
}
own := r.ownDeploymentAddrs()
out := make([]Overview, 0, len(services))
for _, svc := range services {
// Skip AIMA's own deployment backends (e.g. native llama.cpp on :8080);
// they are managed deployments, not importable external services.
if _, ok := own[normalizeHostPort(svc.BaseURL)]; ok {
continue
}
overview := OverviewFromScan(svc)
if err := r.store.UpsertExternalService(ctx, RecordFromOverview(overview)); err != nil {
slog.Warn("external service scan: failed to persist service", "base_url", svc.BaseURL, "error", err)
Expand All @@ -78,6 +85,13 @@ func (r *Reconciler) Scan(ctx context.Context) ([]Overview, error) {
for _, svc := range out {
reachable[svc.BaseURL] = struct{}{}
}
// Mark previously-discovered services that have disappeared (e.g. an
// undeployed model's backend) unreachable so they stop being listed.
for _, url := range staleScannedAddrs(existing, reachable, own) {
if err := r.store.SetExternalServiceStatus(ctx, url, "unreachable", "not found in last scan"); err != nil {
slog.Warn("external service scan: failed to mark stale service unreachable", "base_url", url, "error", err)
}
}
if err := r.Restore(ctx, reachable); err != nil {
slog.Warn("external service scan: failed to restore imported services", "error", err)
}
Expand All @@ -91,9 +105,17 @@ func (r *Reconciler) List(ctx context.Context) ([]Overview, error) {
}
out := make([]Overview, 0, len(rows))
for _, row := range rows {
out = append(out, OverviewFromRecord(row))
ov := OverviewFromRecord(row)
// Hide auto-discovered services that have vanished (dead scanned, not
// imported) — e.g. the backend of a model the user just undeployed.
if ov.Source == "scan" && !ov.Imported && strings.EqualFold(ov.Status, "unreachable") {
continue
}
out = append(out, ov)
}
return out, nil
// Hide AIMA's own deployment backends that a prior scan may have recorded
// (e.g. a native llama.cpp deployment on llama.cpp's default port 8080).
return filterOutOwnDeployments(out, r.ownDeploymentAddrs()), nil
}

func (r *Reconciler) Import(ctx context.Context, idOrBaseURL string, models []string) (ImportResult, error) {
Expand Down Expand Up @@ -404,3 +426,88 @@ func sameStringSet(a, b []string) bool {
}
return true
}

// normalizeHostPort reduces a base URL or address to a comparable "host:port",
// dropping scheme/path and folding loopback/any-interface hosts to 127.0.0.1 so
// a scanned endpoint can be matched against an AIMA deployment address.
func normalizeHostPort(addr string) string {
addr = strings.TrimSpace(strings.ToLower(addr))
if addr == "" {
return ""
}
if i := strings.Index(addr, "://"); i >= 0 {
addr = addr[i+3:]
}
if i := strings.IndexByte(addr, '/'); i >= 0 {
addr = addr[:i]
}
host, port, err := net.SplitHostPort(addr)
if err != nil {
return addr
}
switch host {
case "localhost", "::1", "[::1]", "0.0.0.0", "":
host = "127.0.0.1"
}
return host + ":" + port
}

// ownDeploymentAddrs returns the set of host:port addresses backing AIMA's own
// (non-external) proxy deployments. The external scanner probes llama.cpp's
// default port 8080, which an AIMA native deployment also binds, so these must
// be excluded to avoid surfacing AIMA's own backend as an importable service.
func (r *Reconciler) ownDeploymentAddrs() map[string]struct{} {
own := make(map[string]struct{})
if r.proxy == nil {
return own
}
for _, b := range r.proxy.ListBackends() {
if b == nil || b.External {
continue
}
if a := normalizeHostPort(b.Address); a != "" {
own[a] = struct{}{}
}
}
return own
}

// filterOutOwnDeployments drops services whose address matches an AIMA-owned
// deployment backend.
func filterOutOwnDeployments(services []Overview, own map[string]struct{}) []Overview {
if len(own) == 0 {
return services
}
out := make([]Overview, 0, len(services))
for _, svc := range services {
if _, ok := own[normalizeHostPort(svc.BaseURL)]; ok {
continue
}
out = append(out, svc)
}
return out
}

// staleScannedAddrs returns base URLs of auto-discovered (scanned, non-imported)
// services that were not seen in the latest scan and are not AIMA's own
// deployments — i.e. services that have disappeared (e.g. a model that was
// undeployed) and should be marked unreachable so they drop out of the list.
func staleScannedAddrs(existing []*state.ExternalService, reachable, own map[string]struct{}) []string {
var stale []string
for _, ex := range existing {
if ex == nil || ex.Imported || ex.Source != "scan" {
continue
}
if strings.EqualFold(ex.Status, "unreachable") {
continue
}
if _, ok := reachable[ex.BaseURL]; ok {
continue
}
if _, ok := own[normalizeHostPort(ex.BaseURL)]; ok {
continue
}
stale = append(stale, ex.BaseURL)
}
return stale
}
132 changes: 132 additions & 0 deletions internal/external/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,135 @@ func TestRestoreKeepsImportedSubsetOnRestore(t *testing.T) {
t.Fatal("new-model backend should be restored")
}
}

func TestNormalizeHostPort(t *testing.T) {
cases := []struct {
in string
want string
}{
{"127.0.0.1:8080", "127.0.0.1:8080"},
{"http://127.0.0.1:8080", "127.0.0.1:8080"},
{"http://127.0.0.1:8080/v1", "127.0.0.1:8080"},
{"localhost:8080", "127.0.0.1:8080"},
{"http://localhost:8080", "127.0.0.1:8080"},
{"0.0.0.0:8080", "127.0.0.1:8080"},
{"10.42.0.8:8000", "10.42.0.8:8000"},
}
for _, c := range cases {
if got := normalizeHostPort(c.in); got != c.want {
t.Errorf("normalizeHostPort(%q) = %q, want %q", c.in, got, c.want)
}
}
}

// AIMA's own native deployment binds llama.cpp's default port 8080, which the
// external scanner also probes. List must not surface AIMA's own deployment
// backend as an importable external service; genuine external services stay.
func TestListExcludesOwnDeploymentServices(t *testing.T) {
ctx := context.Background()
db, err := state.Open(ctx, ":memory:")
if err != nil {
t.Fatalf("Open(:memory:): %v", err)
}
t.Cleanup(func() { db.Close() })

for _, svc := range []*state.ExternalService{
{ID: "own-8080", BaseURL: "http://127.0.0.1:8080", Kind: "openai", Status: "reachable", Source: "scan", ModelsJSON: `["Qwen3.6-27B-Q4_K_M.gguf"]`},
{ID: "real-ollama", BaseURL: "http://127.0.0.1:11434", Kind: "ollama", Status: "reachable", Source: "scan", ModelsJSON: `["llama3"]`},
} {
if err := db.UpsertExternalService(ctx, svc); err != nil {
t.Fatalf("UpsertExternalService: %v", err)
}
}

proxyServer := proxy.NewServer()
// AIMA's own native deployment backend (External defaults to false).
proxyServer.RegisterBackend("Qwen3.6-27B-Q4_K_M", &proxy.Backend{
ModelName: "Qwen3.6-27B-Q4_K_M",
Address: "127.0.0.1:8080",
Ready: true,
})

services, err := NewReconciler(db, proxyServer).List(ctx)
if err != nil {
t.Fatalf("List: %v", err)
}
for _, svc := range services {
if svc.BaseURL == "http://127.0.0.1:8080" {
t.Errorf("List returned AIMA's own deployment as external service: %s", svc.BaseURL)
}
}
var sawOllama bool
for _, svc := range services {
if svc.BaseURL == "http://127.0.0.1:11434" {
sawOllama = true
}
}
if !sawOllama {
t.Errorf("genuine external service (ollama) was wrongly excluded; got %d services", len(services))
}
}

func TestStaleScannedAddrs(t *testing.T) {
existing := []*state.ExternalService{
{BaseURL: "http://127.0.0.1:8080", Source: "scan", Status: "reachable"}, // vanished -> stale
{BaseURL: "http://127.0.0.1:11434", Source: "scan", Status: "reachable"}, // still reachable
{BaseURL: "http://127.0.0.1:9000", Source: "scan", Status: "reachable", Imported: true}, // imported -> keep
{BaseURL: "http://127.0.0.1:5000", Source: "scan", Status: "unreachable"}, // already marked -> skip
}
reachable := map[string]struct{}{"http://127.0.0.1:11434": {}}
got := staleScannedAddrs(existing, reachable, map[string]struct{}{})
if len(got) != 1 || got[0] != "http://127.0.0.1:8080" {
t.Fatalf("got %v, want [http://127.0.0.1:8080]", got)
}
}

func TestStaleScannedAddrsSkipsOwnDeployment(t *testing.T) {
existing := []*state.ExternalService{{BaseURL: "http://127.0.0.1:8080", Source: "scan", Status: "reachable"}}
own := map[string]struct{}{"127.0.0.1:8080": {}}
if got := staleScannedAddrs(existing, map[string]struct{}{}, own); len(got) != 0 {
t.Errorf("own deployment must not be marked stale, got %v", got)
}
}

// A scanned service that has died (status unreachable) must drop out of List,
// while a reachable scanned service and an imported (even if offline) one stay.
func TestListHidesVanishedScannedService(t *testing.T) {
ctx := context.Background()
db, err := state.Open(ctx, ":memory:")
if err != nil {
t.Fatalf("Open: %v", err)
}
t.Cleanup(func() { db.Close() })

for _, svc := range []*state.ExternalService{
{ID: "dead", BaseURL: "http://127.0.0.1:8080", Kind: "openai", Status: "unreachable", Source: "scan", ModelsJSON: `["x"]`},
{ID: "live", BaseURL: "http://127.0.0.1:11434", Kind: "ollama", Status: "reachable", Source: "scan", ModelsJSON: `["y"]`},
{ID: "imp", BaseURL: "http://127.0.0.1:9000", Kind: "openai", Status: "unreachable", Source: "scan", ModelsJSON: `["z"]`},
} {
if err := db.UpsertExternalService(ctx, svc); err != nil {
t.Fatalf("Upsert: %v", err)
}
}
if err := db.SetExternalServiceImportedModels(ctx, "http://127.0.0.1:9000", true, []string{"z"}); err != nil {
t.Fatalf("import: %v", err)
}

services, err := NewReconciler(db, proxy.NewServer()).List(ctx)
if err != nil {
t.Fatalf("List: %v", err)
}
seen := map[string]bool{}
for _, s := range services {
seen[s.BaseURL] = true
}
if seen["http://127.0.0.1:8080"] {
t.Error("vanished scanned service :8080 should be hidden")
}
if !seen["http://127.0.0.1:11434"] {
t.Error("reachable scanned service :11434 should be shown")
}
if !seen["http://127.0.0.1:9000"] {
t.Error("imported service :9000 should be shown even when unreachable")
}
}
Loading