Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/app/api/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ async def test_connection(
raise

conn = await session.get(DbConnection, db_connection_uuid)
if conn is None:
raise HTTPException(status_code=404, detail="connection not found")
dsn = decrypt_text(conn.dsn_ciphertext, conn.dsn_nonce)
try:
version = await probe_database(dsn)
Expand Down
34 changes: 34 additions & 0 deletions backend/app/api/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
JobQueue,
SchemaSnapshot,
SchemaSnapshotData,
TableAnnotation,
)
from app.permissions import require_project_member
from app.schemas import (
Expand All @@ -26,6 +27,7 @@
from app.ddl.export import snapshot_json_to_sql
from app.ddl.migration import snapshot_diff_to_migration_sql
from app.diff.schema_diff import diff_snapshots
from app.spec.data_dictionary import snapshot_to_data_dictionary_md
from app.jobs.valkey_queue import enqueue_job_signal
from app.spec.llm import (
LlmConfigurationError,
Expand Down Expand Up @@ -236,6 +238,38 @@ async def export_migration_sql(
)


@router.get(
    "/{schema_snapshot_uuid}/data-dictionary.md", response_class=PlainTextResponse
)
async def export_data_dictionary(
    schema_snapshot_uuid: uuid.UUID,
    user: CurrentUser = Depends(get_current_user),
    session: AsyncSession = Depends(get_read_session),
) -> str:
    """Serve one snapshot as a Markdown data dictionary.

    The snapshot JSON is rendered together with every table annotation stored
    for the snapshot's project space. When the snapshot or its data row cannot
    be loaded, a one-line placeholder string is returned instead of Markdown.
    """
    snapshot_row = await _get_authorized_snapshot(session, schema_snapshot_uuid, user)
    if snapshot_row is None:
        return "-- snapshot not found\n"

    payload = await session.get(SchemaSnapshotData, schema_snapshot_uuid)
    if payload is None:
        return "-- snapshot data not found\n"

    # Annotations are scoped to the project space, not to a single snapshot.
    annotation_query = select(TableAnnotation).where(
        TableAnnotation.project_space_uuid == snapshot_row.project_space_uuid
    )
    result = await session.execute(annotation_query)

    annotations = []
    for record in result.scalars().all():
        annotations.append(
            {
                "schema_name": record.schema_name,
                "relation_name": record.relation_name,
                "body": record.body,
            }
        )
    return snapshot_to_data_dictionary_md(payload.snapshot_json, annotations)


@router.get(
"/{schema_snapshot_uuid}/reversing-spec.md",
response_class=PlainTextResponse,
Expand Down
176 changes: 176 additions & 0 deletions backend/app/spec/data_dictionary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""Render a schema snapshot as a Markdown data dictionary.

Turns a captured snapshot (plus optional project table annotations) into
"living documentation": one section per table with columns, keys, indexes,
comments, example values, and any human-authored annotation.

Pure and dialect-agnostic -- it reads the common snapshot JSON shape produced
by the introspectors, so it works for both PostgreSQL and Snowflake snapshots.
"""

from __future__ import annotations

from typing import Any, Iterable


def _cell(value: object) -> str:
"""Render a value for a Markdown table cell (escape pipes/newlines)."""
if value is None:
return ""
text = str(value)
return text.replace("|", "\\|").replace("\n", " ").strip()


def _relation_kind_label(kind: object) -> str:
return {
"r": "table",
"v": "view",
"m": "materialized view",
"p": "partitioned table",
"f": "foreign table",
}.get(str(kind or ""), "table")


def _annotation_map(
annotations: Iterable[dict[str, Any]] | None,
) -> dict[tuple[str, str], str]:
result: dict[tuple[str, str], str] = {}
for ann in annotations or []:
key = (str(ann.get("schema_name") or ""), str(ann.get("relation_name") or ""))
body = ann.get("body")
if body:
result[key] = str(body)
return result


def snapshot_to_data_dictionary_md(
    snapshot: dict[str, Any] | None,
    annotations: Iterable[dict[str, Any]] | None = None,
) -> str:
    """Return a Markdown data dictionary for one snapshot.

    The document starts with a ``# Data Dictionary`` title and an optional
    "Captured / Server" line, followed by one ``##`` section per relation,
    sorted by schema name and then relation name. Each section holds the
    relation kind (when it is not a plain table), the relation comment, the
    matching annotation as a blockquote, a column table, the foreign keys
    whose child is this relation, and the non-primary indexes.

    Args:
        snapshot: Snapshot JSON. The keys read here are ``relations``,
            ``columns``, ``pk_columns``, ``fk_edges``, ``indexes``,
            ``captured_at`` and ``server_version``; ``None`` or missing keys
            are treated as empty.
        annotations: Optional dicts with ``schema_name``, ``relation_name``
            and ``body``; a non-empty body is attached to the relation with
            the same schema and name.

    Returns:
        The Markdown text, terminated by exactly one newline.
    """
    snapshot = snapshot or {}
    relations = snapshot.get("relations") or []
    columns = snapshot.get("columns") or []
    pk_columns = snapshot.get("pk_columns") or []
    fk_edges = snapshot.get("fk_edges") or []
    indexes = snapshot.get("indexes") or []
    ann_by_table = _annotation_map(annotations)

    # Everything below is grouped by relation_oid so that rendering a section
    # is a handful of dictionary lookups.
    oid_to_rel: dict[Any, dict[str, Any]] = {
        rel.get("relation_oid"): rel for rel in relations
    }

    cols_by_oid: dict[Any, list[dict[str, Any]]] = {}
    for col in columns:
        cols_by_oid.setdefault(col.get("relation_oid"), []).append(col)
    for cols in cols_by_oid.values():
        # A missing position is treated as 0, so such columns sort first.
        cols.sort(key=lambda c: (c.get("column_position") or 0))

    pk_by_oid: dict[Any, set[str]] = {}
    for pk in pk_columns:
        name = pk.get("column_name")
        if name is not None:
            pk_by_oid.setdefault(pk.get("relation_oid"), set()).add(str(name))

    fk_child_cols: dict[Any, set[str]] = {}
    fk_by_oid: dict[Any, list[dict[str, Any]]] = {}
    for edge in fk_edges:
        oid = edge.get("child_relation_oid")
        fk_by_oid.setdefault(oid, []).append(edge)
        child_col = edge.get("child_column_name")
        if child_col is not None:
            fk_child_cols.setdefault(oid, set()).add(str(child_col))

    idx_by_oid: dict[Any, list[dict[str, Any]]] = {}
    for idx in indexes:
        idx_by_oid.setdefault(idx.get("relation_oid"), []).append(idx)

    out: list[str] = ["# Data Dictionary", ""]
    meta = []
    if snapshot.get("captured_at"):
        meta.append(f"Captured: {snapshot['captured_at']}")
    if snapshot.get("server_version"):
        meta.append(f"Server: {snapshot['server_version']}")
    if meta:
        out.append("_" + " · ".join(meta) + "_")
        out.append("")

    def _rel_sort_key(rel: dict[str, Any]) -> tuple[str, str]:
        return (str(rel.get("schema_name") or ""), str(rel.get("relation_name") or ""))

    if not relations:
        out.append("_No tables in this snapshot._")
        return "\n".join(out) + "\n"

    for rel in sorted(relations, key=_rel_sort_key):
        oid = rel.get("relation_oid")
        schema = str(rel.get("schema_name") or "")
        name = str(rel.get("relation_name") or "")
        out.append(f"## {schema}.{name}")
        kind = _relation_kind_label(rel.get("relation_kind"))
        if kind != "table":
            out.append(f"_{kind}_")
        if rel.get("relation_comment"):
            out.append(str(rel["relation_comment"]))
        note = ann_by_table.get((schema, name))
        if note:
            # Quote every line of the annotation; otherwise text after the
            # first line break can fall out of the blockquote and be parsed
            # as document-level Markdown.
            note_lines = note.splitlines()
            out.append(f"> 📝 {note_lines[0]}")
            out.extend(f"> {line}".rstrip() for line in note_lines[1:])
        out.append("")

        pks = pk_by_oid.get(oid, set())
        fk_cols = fk_child_cols.get(oid, set())
        out.append("| # | Column | Type | Null | Default | Key | Comment | Example |")
        out.append("|---|--------|------|------|---------|-----|---------|---------|")
        for i, col in enumerate(cols_by_oid.get(oid, []), start=1):
            col_name = str(col.get("column_name") or "")
            key_marks = []
            if col_name in pks:
                key_marks.append("PK")
            if col_name in fk_cols:
                key_marks.append("FK")
            nullable = "NOT NULL" if col.get("is_not_null") else ""
            # The default expression is only shown when has_default is set.
            default = col.get("default_expr") if col.get("has_default") else ""
            out.append(
                "| {i} | {name} | {type} | {null} | {default} | {key} | {comment} | {example} |".format(
                    i=i,
                    name=_cell(col_name),
                    type=_cell(col.get("data_type")),
                    null=nullable,
                    default=_cell(default),
                    key=" ".join(key_marks),
                    comment=_cell(col.get("column_comment")),
                    example=_cell(col.get("example_value")),
                )
            )
        out.append("")

        fks = fk_by_oid.get(oid, [])
        if fks:
            out.append("**Foreign keys:**")
            for edge in fks:
                parent = oid_to_rel.get(edge.get("parent_relation_oid"), {})
                if parent:
                    # `or ""` keeps a None name from rendering as "None",
                    # matching how section headings are built above.
                    parent_name = "{}.{}".format(
                        str(parent.get("schema_name") or ""),
                        str(parent.get("relation_name") or ""),
                    )
                else:
                    # The parent relation is not part of this snapshot.
                    parent_name = "?"
                constraint = edge.get("fk_constraint_name")
                suffix = f" ({constraint})" if constraint else ""
                out.append(
                    f"- `{col_or_q(edge.get('child_column_name'))}` → "
                    f"`{parent_name}.{col_or_q(edge.get('parent_column_name'))}`"
                    f"{suffix}"
                )
            out.append("")

        # Primary-key indexes are omitted; the PK is already marked per column.
        idxs = [ix for ix in idx_by_oid.get(oid, []) if not ix.get("is_primary")]
        if idxs:
            out.append("**Indexes:**")
            for idx in idxs:
                unique = "UNIQUE " if idx.get("is_unique") else ""
                out.append(f"- {unique}`{idx.get('index_name') or ''}`")
            out.append("")

    return "\n".join(out).rstrip() + "\n"


def col_or_q(value: object) -> str:
    """Return ``str(value)``, or "?" when the value is ``None``."""
    if value is None:
        return "?"
    return str(value)
100 changes: 100 additions & 0 deletions backend/tests/test_data_dictionary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from __future__ import annotations

from app.spec.data_dictionary import snapshot_to_data_dictionary_md


def _snapshot():
return {
"captured_at": "2026-07-05T00:00:00Z",
"server_version": "16.2",
"relations": [
{
"relation_oid": 1,
"schema_name": "public",
"relation_name": "member",
"relation_kind": "r",
"relation_comment": "회원 마스터",
},
{
"relation_oid": 2,
"schema_name": "public",
"relation_name": "orders",
"relation_kind": "r",
},
{
"relation_oid": 3,
"schema_name": "public",
"relation_name": "active_members",
"relation_kind": "v",
},
],
"columns": [
{"relation_oid": 1, "column_position": 1, "column_name": "member_id", "data_type": "bigint", "is_not_null": True, "example_value": "1001"},
{"relation_oid": 1, "column_position": 2, "column_name": "email", "data_type": "varchar(100)", "is_not_null": False, "has_default": True, "default_expr": "''::varchar", "column_comment": "a|b"},
{"relation_oid": 2, "column_position": 1, "column_name": "order_id", "data_type": "bigint", "is_not_null": True},
{"relation_oid": 2, "column_position": 2, "column_name": "member_id", "data_type": "bigint", "is_not_null": True},
],
"pk_columns": [
{"relation_oid": 1, "column_name": "member_id", "column_ordinal": 1},
{"relation_oid": 2, "column_name": "order_id", "column_ordinal": 1},
],
"fk_edges": [
{
"fk_constraint_name": "fk_orders_member",
"child_relation_oid": 2,
"parent_relation_oid": 1,
"child_column_name": "member_id",
"parent_column_name": "member_id",
"column_ordinal": 1,
}
],
"indexes": [
{"relation_oid": 1, "index_name": "member_pkey", "is_unique": True, "is_primary": True},
{"relation_oid": 1, "index_name": "ix_member_email", "is_unique": True, "is_primary": False},
],
}


def test_renders_tables_columns_keys_and_metadata():
    """The header, table sections and column rows carry the snapshot's data."""
    md = snapshot_to_data_dictionary_md(_snapshot())
    assert md.startswith("# Data Dictionary")
    assert "Captured: 2026-07-05T00:00:00Z" in md
    assert "## public.member" in md
    assert "회원 마스터" in md
    # column row with PK marker + example
    assert "| member_id | bigint | NOT NULL |" in md
    # Pin the PK marker to the member_id row itself; a bare " PK " substring
    # check would pass for any occurrence anywhere in the document.
    assert "| 1 | member_id | bigint | NOT NULL |  | PK |  | 1001 |" in md
    assert "1001" in md
    # default rendered when has_default
    assert "''::varchar" in md
    # pipe in comment escaped
    assert "a\\|b" in md


def test_renders_foreign_keys_and_non_primary_indexes():
    """Foreign keys and secondary indexes are listed; the PK index is not."""
    rendered = snapshot_to_data_dictionary_md(_snapshot())

    # Foreign-key list
    assert "**Foreign keys:**" in rendered
    assert "`member_id` → `public.member.member_id`" in rendered
    assert "(fk_orders_member)" in rendered

    # Index list
    assert "**Indexes:**" in rendered
    assert "UNIQUE `ix_member_email`" in rendered

    # Only the text after the first "Indexes" heading is inspected for the
    # primary-key index name.
    after_heading = rendered.split("**Indexes:**", 1)[-1]
    assert "member_pkey" not in after_heading


def test_merges_project_annotations():
    """An annotation for a relation is rendered as a blockquote note."""
    orders_note = {
        "schema_name": "public",
        "relation_name": "orders",
        "body": "주문 트랜잭션 테이블",
    }
    rendered = snapshot_to_data_dictionary_md(_snapshot(), annotations=[orders_note])
    assert "> 📝 주문 트랜잭션 테이블" in rendered


def test_labels_views_and_handles_empty_snapshot():
    """Views get a kind label, and an empty snapshot yields a placeholder."""
    rendered = snapshot_to_data_dictionary_md(_snapshot())
    assert "## public.active_members" in rendered
    assert "_view_" in rendered

    rendered_empty = snapshot_to_data_dictionary_md({})
    assert "No tables in this snapshot" in rendered_empty
24 changes: 24 additions & 0 deletions frontend/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
deleteAnnotation,
deleteView,
diffSnapshots,
fetchDataDictionaryMarkdown,
getSnapshot,
getView,
listAnnotations,
Expand Down Expand Up @@ -805,6 +806,16 @@ export default function App() {
downloadText("pg-erd-diagram.mermaid", exportMermaid(nodes, edges), "text/plain");
}

// Fetch the current snapshot's data dictionary as Markdown and hand it to
// downloadText. Does nothing when no snapshot is selected; if either step
// throws, the user sees an alert instead.
async function onDownloadDataDictionary() {
  if (!snapshotId) {
    return;
  }
  try {
    const markdown = await fetchDataDictionaryMarkdown(snapshotId);
    downloadText("data-dictionary.md", markdown, "text/markdown");
  } catch {
    window.alert("데이터 사전을 불러오지 못했습니다.");
  }
}

function onRelDelete() {
if (!editingEdge) return;
if (!window.confirm("정말로 이 관계를 삭제하시겠습니까?")) return;
Expand Down Expand Up @@ -1699,6 +1710,19 @@ export default function App() {
>
{"{}"}
</button>
<button
type="button"
onClick={onDownloadDataDictionary}
disabled={!snapshotId}
title={
!snapshotId
? "스냅샷을 먼저 생성하세요"
: "데이터 사전 (Markdown) 내보내기"
}
aria-label="데이터 사전 내보내기"
>
📖
</button>
<div className="srOnly" aria-live="polite">
{[layoutMessage, nodeSearchStatus].filter(Boolean).join(" ")}
</div>
Expand Down
Loading