Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/app/api/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ async def test_connection(
raise

conn = await session.get(DbConnection, db_connection_uuid)
if conn is None:
raise HTTPException(status_code=404, detail="connection not found")
dsn = decrypt_text(conn.dsn_ciphertext, conn.dsn_nonce)
try:
version = await probe_database(dsn)
Expand Down
25 changes: 25 additions & 0 deletions backend/app/api/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from app.permissions import require_project_member
from app.schemas import (
InferredRelationshipOut,
SnapshotCreateIn,
SnapshotDetailOut,
SnapshotDiffOut,
Expand All @@ -26,6 +27,7 @@
from app.ddl.export import snapshot_json_to_sql
from app.ddl.migration import snapshot_diff_to_migration_sql
from app.diff.schema_diff import diff_snapshots
from app.spec.relationship_inference import infer_relationships
from app.jobs.valkey_queue import enqueue_job_signal
from app.spec.llm import (
LlmConfigurationError,
Expand Down Expand Up @@ -207,6 +209,29 @@ async def export_snapshot_sql(
return snapshot_json_to_sql(data.snapshot_json, target_dialect=dialect)


@router.get(
    "/{schema_snapshot_uuid}/inferred-relationships",
    response_model=list[InferredRelationshipOut],
)
async def inferred_relationships(
    schema_snapshot_uuid: uuid.UUID,
    user: CurrentUser = Depends(get_current_user),
    session: AsyncSession = Depends(get_read_session),
) -> list[InferredRelationshipOut]:
    """List foreign keys that are implied by naming but never declared.

    Intended for reverse-engineering databases whose foreign keys were never
    declared. The response is deliberately uniform: a snapshot that is missing
    or not authorized for the caller yields an empty list, as does a snapshot
    without stored data.
    """
    # Authorization lookup first; only its presence matters here.
    if await _get_authorized_snapshot(session, schema_snapshot_uuid, user) is None:
        return []

    payload = await session.get(SchemaSnapshotData, schema_snapshot_uuid)
    if payload is None:
        return []

    suggestions = infer_relationships(payload.snapshot_json)
    return [InferredRelationshipOut(**suggestion) for suggestion in suggestions]


@router.get("/{schema_snapshot_uuid}/migration.sql", response_class=PlainTextResponse)
async def export_migration_sql(
schema_snapshot_uuid: uuid.UUID,
Expand Down
13 changes: 13 additions & 0 deletions backend/app/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,19 @@ class SnapshotDetailOut(BaseModel):
snapshot_json: dict | None


class InferredRelationshipOut(BaseModel):
    """An implicit (undeclared) foreign-key relationship inferred from names.

    Each instance describes one suggested link from a child (referencing)
    column to a parent (referenced) column.
    """

    # Child side: the column suspected of being an undeclared foreign key.
    child_schema: str
    child_table: str
    child_column: str
    # Parent side: the column the child is suspected of referencing.
    parent_schema: str
    parent_table: str
    parent_column: str
    # Confidence label as a plain string; the relationship-inference
    # heuristic appears to emit "high" or "medium" -- confirm before relying
    # on a closed set of values.
    confidence: str
    # Human-readable explanation of why the relationship was suggested.
    reason: str
reason: str


class SnapshotDiffOut(BaseModel):
"""Structured diff between two schema snapshots.

Expand Down
145 changes: 145 additions & 0 deletions backend/app/spec/relationship_inference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""Infer *implicit* foreign-key relationships from a schema snapshot.

Many real databases -- especially legacy or analytics schemas -- never declare
their foreign keys. Reverse-engineering those relationships is the core reason
to reach for a schema-intelligence tool over a generic ERD drawer.

The heuristic is deliberately high-precision (favouring few false positives):
a column named ``<X>_id`` is a likely FK to a table named ``X`` (or its simple
plural) that has a primary key, when the column is not *already* a declared FK.
Confidence is ``high`` when the column type matches the referenced key's type,
otherwise ``medium``.

Pure and dialect-agnostic (reads the common snapshot JSON shape).
"""

from __future__ import annotations

import re
from typing import Any


def _norm_type(data_type: object) -> str:
    """Return a canonical spelling of a SQL type name for equality checks.

    The value is stringified (falsy values become the empty string), trimmed
    and lower-cased; parenthesised length/precision modifiers are removed and
    runs of whitespace are collapsed to a single space.
    """
    lowered = str(data_type or "").strip().lower()
    # varchar(100) -> varchar
    without_modifiers = re.sub(r"\(.*?\)", "", lowered)
    collapsed = re.sub(r"\s+", " ", without_modifiers)
    return collapsed.strip()


def _candidate_target_names(base: str) -> set[str]:
    """Return the lower-cased table names a ``<base>_id`` column may point at.

    The set holds the base name itself plus its naive plurals (``+s`` and
    ``+es``); a base ending in ``y`` additionally yields the ``-ies`` form.
    """
    stem = base.lower()
    variants = {stem + suffix for suffix in ("", "s", "es")}
    if stem.endswith("y"):
        variants.add(stem[:-1] + "ies")
    return variants


def infer_relationships(snapshot: dict[str, Any] | None) -> list[dict[str, Any]]:
    """Return inferred (undeclared) foreign-key relationships, sorted stably.

    A column named ``<x>_id`` is treated as a likely foreign key to a table in
    the *same schema* named ``x`` (or one of its simple plurals) that has at
    least one primary-key column, unless that column is already the child side
    of a declared foreign key.

    Args:
        snapshot: Snapshot JSON with optional ``relations``, ``columns``,
            ``pk_columns`` and ``fk_edges`` lists. ``None`` and missing or
            empty keys are treated as empty lists.

    Returns:
        One dict per inferred relationship with the keys ``child_schema``,
        ``child_table``, ``child_column``, ``parent_schema``,
        ``parent_table``, ``parent_column``, ``confidence`` and ``reason``.
        ``confidence`` is ``"high"`` when the normalized column types match,
        otherwise ``"medium"``. Names are reported as spelled in the snapshot
        (matching itself is case-insensitive). The list is ordered by child
        schema, child table, child column and parent table.
    """
    snapshot = snapshot or {}
    relations = snapshot.get("relations") or []
    columns = snapshot.get("columns") or []
    pk_columns = snapshot.get("pk_columns") or []
    fk_edges = snapshot.get("fk_edges") or []

    # relation_name (lower) -> list of relation dicts (there may be same name in
    # multiple schemas; we only infer within the same schema to avoid noise).
    by_name: dict[str, list[dict[str, Any]]] = {}
    for r in relations:
        by_name.setdefault(str(r.get("relation_name") or "").lower(), []).append(r)

    # relation_oid -> {lower-cased column name -> column dict}
    cols_by_oid: dict[Any, dict[str, dict[str, Any]]] = {}
    for c in columns:
        name = c.get("column_name")
        if name is None:
            continue
        cols_by_oid.setdefault(c.get("relation_oid"), {})[str(name).lower()] = c

    # relation_oid -> lower-cased primary-key column names
    pk_by_oid: dict[Any, list[str]] = {}
    for pk in pk_columns:
        name = pk.get("column_name")
        if name is not None:
            pk_by_oid.setdefault(pk.get("relation_oid"), []).append(str(name).lower())

    # (child relation_oid, lower-cased column name) pairs with a declared FK
    declared: set[tuple[Any, str]] = set()
    for edge in fk_edges:
        col = edge.get("child_column_name")
        if col is not None:
            declared.add((edge.get("child_relation_oid"), str(col).lower()))

    def _ref_column(target_oid: Any, child_col: str) -> str | None:
        """Pick the (lower-cased) column of the target the child likely references."""
        target_cols = cols_by_oid.get(target_oid, {})
        pks = pk_by_oid.get(target_oid, [])
        if len(pks) == 1 and pks[0] in target_cols:
            return pks[0]
        if child_col in target_cols:  # e.g. orders.member_id -> member.member_id
            return child_col
        if "id" in target_cols:
            return "id"
        return None

    results: list[dict[str, Any]] = []
    seen: set[tuple[Any, str, Any, str]] = set()

    for child in relations:
        child_oid = child.get("relation_oid")
        child_schema = str(child.get("schema_name") or "")
        for col_name_lower, col in cols_by_oid.get(child_oid, {}).items():
            # Needs a non-empty "<base>" in front of the "_id" suffix.
            if not col_name_lower.endswith("_id") or len(col_name_lower) <= 3:
                continue
            if (child_oid, col_name_lower) in declared:
                continue
            base = col_name_lower[:-3]
            # Sorted so traversal order never depends on set/hash ordering.
            for cand in sorted(_candidate_target_names(base)):
                for target in by_name.get(cand, []):
                    target_oid = target.get("relation_oid")
                    if str(target.get("schema_name") or "") != child_schema:
                        continue
                    if not pk_by_oid.get(target_oid):
                        continue
                    ref = _ref_column(target_oid, col_name_lower)
                    if ref is None:
                        continue
                    # A table's own PK named "<table>_id" is the key itself,
                    # not a self-referencing foreign key.
                    if target_oid == child_oid and ref == col_name_lower:
                        continue
                    key = (child_oid, col_name_lower, target_oid, ref)
                    if key in seen:
                        continue
                    seen.add(key)
                    ref_col = cols_by_oid.get(target_oid, {}).get(ref, {})
                    same_type = _norm_type(col.get("data_type")) == _norm_type(
                        ref_col.get("data_type")
                    )
                    results.append(
                        {
                            "child_schema": child_schema,
                            "child_table": str(child.get("relation_name") or ""),
                            "child_column": str(col.get("column_name") or ""),
                            "parent_schema": str(target.get("schema_name") or ""),
                            "parent_table": str(target.get("relation_name") or ""),
                            # Report the parent column as spelled in the
                            # snapshot, like every other name in the result;
                            # `ref` is only the lower-cased lookup key.
                            "parent_column": str(ref_col.get("column_name") or ref),
                            "confidence": "high" if same_type else "medium",
                            "reason": (
                                f"column '{col.get('column_name')}' matches table "
                                f"'{target.get('relation_name')}'"
                                + ("" if same_type else " (type differs)")
                            ),
                        }
                    )

    results.sort(
        key=lambda r: (
            r["child_schema"],
            r["child_table"],
            r["child_column"],
            r["parent_table"],
        )
    )
    return results
94 changes: 94 additions & 0 deletions backend/tests/test_relationship_inference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from __future__ import annotations

from app.spec.relationship_inference import infer_relationships


def _snapshot(*, declare_fk=False, member_plural=False, id_type="bigint"):
    """Build a two-table snapshot fixture: a member table and ``orders``.

    ``member_plural`` names the first table ``members`` instead of ``member``,
    ``id_type`` sets the data type of ``orders.member_id``, and ``declare_fk``
    adds an explicit foreign-key edge from ``orders.member_id`` to the member
    table's ``member_id``.
    """
    if member_plural:
        member_name = "members"
    else:
        member_name = "member"

    relations = [
        {"relation_oid": 1, "schema_name": "public", "relation_name": member_name},
        {"relation_oid": 2, "schema_name": "public", "relation_name": "orders"},
    ]
    columns = [
        {"relation_oid": 1, "column_name": "member_id", "data_type": "bigint"},
        {"relation_oid": 1, "column_name": "email", "data_type": "text"},
        {"relation_oid": 2, "column_name": "order_id", "data_type": "bigint"},
        {"relation_oid": 2, "column_name": "member_id", "data_type": id_type},
    ]
    pk_columns = [
        {"relation_oid": 1, "column_name": "member_id"},
        {"relation_oid": 2, "column_name": "order_id"},
    ]

    fk_edges = []
    if declare_fk:
        fk_edges.append(
            {
                "child_relation_oid": 2,
                "parent_relation_oid": 1,
                "child_column_name": "member_id",
                "parent_column_name": "member_id",
            }
        )

    return {
        "relations": relations,
        "columns": columns,
        "pk_columns": pk_columns,
        "fk_edges": fk_edges,
    }


def test_infers_member_id_to_member_with_high_confidence():
    """An undeclared ``orders.member_id`` is linked to ``member.member_id``."""
    inferred = infer_relationships(_snapshot())
    assert len(inferred) == 1

    only = inferred[0]
    child_side = (only["child_table"], only["child_column"])
    parent_side = (only["parent_table"], only["parent_column"])
    assert child_side == ("orders", "member_id")
    assert parent_side == ("member", "member_id")
    # Both columns are bigint in the default fixture, hence "high".
    assert only["confidence"] == "high"


def test_skips_already_declared_foreign_keys():
    """A column that already has a declared FK yields no suggestion."""
    snapshot_with_fk = _snapshot(declare_fk=True)
    inferred = infer_relationships(snapshot_with_fk)
    assert inferred == []


def test_medium_confidence_when_types_differ():
    """A type mismatch between child and parent columns lowers confidence."""
    inferred = infer_relationships(_snapshot(id_type="integer"))
    assert len(inferred) == 1

    suggestion = inferred[0]
    assert suggestion["confidence"] == "medium"
    assert "type differs" in suggestion["reason"]


def test_matches_plural_table_name():
    """``member_id`` also resolves to a table named with the simple plural."""
    inferred = infer_relationships(_snapshot(member_plural=True))
    assert len(inferred) == 1

    suggestion = inferred[0]
    assert suggestion["parent_table"] == "members"


def test_no_inference_without_a_matching_table_or_pk():
    """Nothing is inferred when the implied table is absent, or for ``{}``."""
    orders_relation = {
        "relation_oid": 2,
        "schema_name": "public",
        "relation_name": "orders",
    }
    # references a "member" table that does not exist
    dangling_column = {
        "relation_oid": 2,
        "column_name": "member_id",
        "data_type": "bigint",
    }
    lonely_snapshot = {
        "relations": [orders_relation],
        "columns": [dangling_column],
        "pk_columns": [],
        "fk_edges": [],
    }

    assert infer_relationships(lonely_snapshot) == []
    assert infer_relationships({}) == []


def test_only_infers_within_the_same_schema():
    """A same-named table in a different schema is never suggested."""
    relations = [
        {"relation_oid": 1, "schema_name": "core", "relation_name": "member"},
        {"relation_oid": 2, "schema_name": "sales", "relation_name": "orders"},
    ]
    columns = [
        {"relation_oid": 1, "column_name": "member_id", "data_type": "bigint"},
        {"relation_oid": 2, "column_name": "member_id", "data_type": "bigint"},
    ]
    cross_schema_snapshot = {
        "relations": relations,
        "columns": columns,
        "pk_columns": [{"relation_oid": 1, "column_name": "member_id"}],
        "fk_edges": [],
    }

    # orders is in 'sales', member is in 'core' -> no cross-schema guess
    inferred = infer_relationships(cross_schema_snapshot)
    assert inferred == []