Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions api/app/lib/age_client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,23 +157,50 @@ def _execute_cypher(
$$) as ({column_spec});
"""

with conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
# Retry logic for AGE label creation race conditions.
# When two threads simultaneously CREATE a node with a label that doesn't
# exist yet, AGE auto-creates the label (DDL) and the loser gets
# "relation already exists". Retrying succeeds because the label now exists.
max_retries = 2
for attempt in range(max_retries + 1):
try:
cur.execute(age_query)
with conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
cur.execute(age_query)

if fetch_one:
result = cur.fetchone()
if result:
return {k: self._parse_agtype(v) for k, v in result.items()}
return None
else:
results = cur.fetchall()
return [
{k: self._parse_agtype(v) for k, v in row.items()}
for row in results
]
except Exception as e:
error_str = str(e)

# Check if this is an expected race condition (parallel restore operations)
is_expected_race = (
"already exists" in error_str or
"Entity failed to be updated" in error_str
)

# Log at appropriate level (DEBUG for expected races, ERROR for real problems)
if is_expected_race and attempt < max_retries:
logger.debug(
"AGE label race condition (attempt %d/%d), retrying: %s",
attempt + 1, max_retries, error_str.split('\n')[0]
)
# Roll back the failed transaction so the connection is usable
conn.rollback()
self._setup_age(conn)
continue

# Final attempt or non-race error: log and raise
log_level = logger.debug if is_expected_race else logger.error

log_level("=" * 80)
log_level("Query execution failed" if not is_expected_race else "Expected concurrency conflict (will retry)")
log_level("Query execution failed" if not is_expected_race else "Expected concurrency conflict (retries exhausted)")
log_level(f"Error: {e}")
log_level(f"Column spec: {column_spec}")
log_level(f"Original query length: {len(query)} chars")
Expand All @@ -187,20 +214,6 @@ def _execute_cypher(
log_level("=" * 80)
raise

if fetch_one:
result = cur.fetchone()
if result:
# Parse all agtype values in the result dict
return {k: self._parse_agtype(v) for k, v in result.items()}
return None
else:
results = cur.fetchall()
# Parse all agtype values in each result dict
return [
{k: self._parse_agtype(v) for k, v in row.items()}
for row in results
]

finally:
conn.commit()
self.pool.putconn(conn)
Expand Down
113 changes: 113 additions & 0 deletions schema/migrations/058_precreate_graph_labels.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
-- Migration 058: Pre-create all vertex and edge labels (prevents AGE race condition)
--
-- Apache AGE auto-creates labels on first CREATE/MERGE, but this is a DDL operation
-- (CREATE TABLE) that is not concurrent-safe. On a fresh install, parallel ingestion
-- threads can race to create the same label, causing:
-- "relation 'Source' already exists"
--
-- Fix: explicitly create all known vertex and edge labels at schema init time.
-- Uses DO blocks to check ag_catalog.ag_label before creating (AGE has no
-- CREATE VLABEL IF NOT EXISTS syntax).

LOAD 'age';
SET search_path = ag_catalog, "$user", public;

-- ============================================================
-- Vertex labels
-- ============================================================

DO $$
DECLARE
labels text[] := ARRAY['Concept', 'Source', 'Instance', 'Ontology', 'DocumentMeta', 'VocabType', 'VocabCategory'];
lbl text;
BEGIN
FOREACH lbl IN ARRAY labels LOOP
IF NOT EXISTS (
SELECT 1 FROM ag_catalog.ag_label l
JOIN ag_catalog.ag_graph g ON l.graph = g.graphid
WHERE g.name = 'knowledge_graph' AND l.name = lbl AND l.kind = 'v'
) THEN
EXECUTE format(
'SELECT * FROM cypher(''knowledge_graph'', $$ CREATE VLABEL %I $$) as (a agtype)',
lbl
);
RAISE NOTICE 'Created vertex label: %', lbl;
END IF;
END LOOP;
END $$;

-- ============================================================
-- Static edge labels (infrastructure edges)
-- ============================================================

DO $$
DECLARE
labels text[] := ARRAY[
'APPEARS', 'EVIDENCED_BY', 'FROM_SOURCE', 'SCOPED_BY',
'HAS_SOURCE', 'IN_CATEGORY', 'ANCHORED_BY',
'OVERLAPS', 'SPECIALIZES', 'GENERALIZES'
];
lbl text;
BEGIN
FOREACH lbl IN ARRAY labels LOOP
IF NOT EXISTS (
SELECT 1 FROM ag_catalog.ag_label l
JOIN ag_catalog.ag_graph g ON l.graph = g.graphid
WHERE g.name = 'knowledge_graph' AND l.name = lbl AND l.kind = 'e'
) THEN
EXECUTE format(
'SELECT * FROM cypher(''knowledge_graph'', $$ CREATE ELABEL %I $$) as (a agtype)',
lbl
);
RAISE NOTICE 'Created edge label: %', lbl;
END IF;
END LOOP;
END $$;

-- ============================================================
-- Vocabulary edge labels (30 canonical relationship types)
-- These are dynamically created during ingestion, but pre-creating
-- them eliminates the concurrent DDL race on first use.
-- ============================================================

DO $$
DECLARE
labels text[] := ARRAY[
-- logical_truth
'IMPLIES', 'CONTRADICTS', 'PRESUPPOSES', 'EQUIVALENT_TO',
-- causal
'CAUSES', 'ENABLES', 'PREVENTS', 'INFLUENCES', 'RESULTS_FROM',
-- structural
'PART_OF', 'CONTAINS', 'COMPOSED_OF', 'SUBSET_OF', 'INSTANCE_OF',
-- evidential
'SUPPORTS', 'REFUTES', 'EXEMPLIFIES', 'MEASURED_BY',
-- similarity
'SIMILAR_TO', 'ANALOGOUS_TO', 'CONTRASTS_WITH', 'OPPOSITE_OF',
-- temporal
'PRECEDES', 'CONCURRENT_WITH', 'EVOLVES_INTO',
-- functional
'USED_FOR', 'REQUIRES', 'PRODUCES', 'REGULATES',
-- meta
'DEFINED_AS', 'CATEGORIZED_AS'
];
lbl text;
BEGIN
FOREACH lbl IN ARRAY labels LOOP
IF NOT EXISTS (
SELECT 1 FROM ag_catalog.ag_label l
JOIN ag_catalog.ag_graph g ON l.graph = g.graphid
WHERE g.name = 'knowledge_graph' AND l.name = lbl AND l.kind = 'e'
) THEN
EXECUTE format(
'SELECT * FROM cypher(''knowledge_graph'', $$ CREATE ELABEL %I $$) as (a agtype)',
lbl
);
RAISE NOTICE 'Created edge label: %', lbl;
END IF;
END LOOP;
END $$;

-- Record migration
INSERT INTO public.schema_migrations (version, name)
VALUES (58, 'precreate_graph_labels')
ON CONFLICT (version) DO NOTHING;