Feat: Implement FASE data ingestion pipeline (Oracle & Postgres to Iceberg) #341
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| #!/bin/bash | ||
|
|
||
| # Define the list of schemas to process in "SCHEMA_NAME:INCLUDE_VIEWS" format | ||
| # Set the second value to 'true' if you want to clone views for that schema | ||
| SCHEMAS=( | ||
| "FACILITY_COMMON:false" | ||
| "FACILITY_ERAS:false" | ||
| "FACILITY_SAFETY_TEST:false" | ||
| "FACILITY_SCHEDULE:false" | ||
| "FACILITY_VISITS:false" | ||
| "ISISUSERDB:false" | ||
| "PROPOSAL_REGISTRY:false" | ||
| "IOPS_V4:true" | ||
| "PROP_CLF_ALL:true" | ||
| "PROP_STATUS:true" | ||
| "XPRESS_V4:true" | ||
| ) | ||
|
|
||
| echo "Starting Oracle table cloning process..." | ||
| echo "---------------------------------------" | ||
|
|
||
| # Loop through each entry | ||
| for ENTRY in "${SCHEMAS[@]}" | ||
| do | ||
| # Split the entry into schema name and the include_views flag | ||
| SCHEMA="${ENTRY%%:*}" | ||
| INCLUDE_VIEWS="${ENTRY#*:}" | ||
|
|
||
| # Determine if we should add the flag to the command | ||
| VIEW_FLAG="" | ||
| if [ "$INCLUDE_VIEWS" = "true" ]; then | ||
| VIEW_FLAG="--include-views" | ||
| fi | ||
|
|
||
| echo "Running: python3 ./clone_oracle_table.py $SCHEMA all $VIEW_FLAG" | ||
|
|
||
| # Execute the command | ||
| python3 ./clone_oracle_table.py "$SCHEMA" all $VIEW_FLAG | ||
|
|
||
| # Check if the previous command succeeded | ||
| if [ $? -eq 0 ]; then | ||
| echo "✅ COMPLETED: $SCHEMA (Include Views: $INCLUDE_VIEWS)" | ||
| else | ||
| echo "❌ FAILED: $SCHEMA" | ||
| fi | ||
|
|
||
| echo "---------------------------------------" | ||
| done | ||
|
|
||
| echo "All tasks finished." |
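The driver loop above reports each failure but does not itself exit non-zero when a schema fails. As a minimal sketch of the same `SCHEMA_NAME:INCLUDE_VIEWS` parsing with failures collected for a final exit status, the logic could be expressed in Python as follows. The function names (`parse_entry`, `build_command`, `run_all`) and the use of `sys.executable` in place of `python3` are assumptions made for illustration, not part of the PR.

```python
import subprocess
import sys


def parse_entry(entry: str):
    """Split 'SCHEMA:flag' at the first colon, like ${ENTRY%%:*} and ${ENTRY#*:}."""
    schema, _, flag = entry.partition(":")
    return schema, flag == "true"


def build_command(schema: str, include_views: bool):
    """Build the argv list for one clone run (script path taken from the shell script)."""
    cmd = [sys.executable, "./clone_oracle_table.py", schema, "all"]
    if include_views:
        cmd.append("--include-views")
    return cmd


def run_all(entries, runner=subprocess.call):
    """Run every entry and return the list of schemas whose command exited non-zero."""
    failed = []
    for entry in entries:
        schema, include_views = parse_entry(entry)
        if runner(build_command(schema, include_views)) != 0:
            failed.append(schema)
    return failed
```

A caller could then finish with `sys.exit(1 if run_all(SCHEMAS) else 0)` so that a scheduler or CI job sees the failure.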
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,180 @@ | ||
| #!/usr/bin/env python3 | ||
| import os | ||
| # Set session default timezone to UTC to help bypass potential ORA-01805 version mismatches | ||
| os.environ["ORA_SDTZ"] = "UTC" | ||
|
|
||
| import argparse | ||
| import dlt | ||
| import cx_Oracle | ||
| import dateutil.parser | ||
|
|
||
| from dlt.sources.sql_database import sql_database | ||
| from elt_common.dlt_destinations import pyiceberg | ||
| from sqlalchemy import create_engine, event | ||
| from sqlalchemy.pool import NullPool | ||
|
|
||
| # ---------------------------- | ||
| # Arguments | ||
| # ---------------------------- | ||
| def parse_args(): | ||
| parser = argparse.ArgumentParser( | ||
| description="Run DLT pipeline for a specific Oracle table or ALL tables" | ||
| ) | ||
| parser.add_argument("target_schema", help="Oracle schema name") | ||
| parser.add_argument("target_table", help="Oracle table name to clone or 'all' for entire schema") | ||
| parser.add_argument( | ||
| "--drop", | ||
| action="store_true", | ||
| help="Force drop the destination table and clear pipeline state before running" | ||
| ) | ||
| # NEW ARGUMENT ADDED HERE | ||
| parser.add_argument( | ||
| "--include-views", | ||
| action="store_true", | ||
| default=False, | ||
| help="If set, includes views in the cloning process (default: False)" | ||
| ) | ||
| return parser.parse_args() | ||
|
|
||
| # ---------------------------- | ||
| # Debug Mapper: Inspecting Nulls and Types | ||
| # ---------------------------- | ||
| def robust_data_mapper(item): | ||
| date_keywords = ['DATE', 'TIME', 'TIMESTAMP'] | ||
|
|
||
| if 'PAYLOAD' in item or 'payload' in item: | ||
| key = 'PAYLOAD' if 'PAYLOAD' in item else 'payload' | ||
| val = item[key] | ||
| if val is None: | ||
| item[key] = "" | ||
|
|
||
| for key, value in item.items(): | ||
| if isinstance(value, str) and any(kw in key.upper() for kw in date_keywords): | ||
| try: | ||
| item[key] = dateutil.parser.parse(value) | ||
| except (ValueError, TypeError): | ||
| pass | ||
| return item | ||
|
Comment on lines +42 to +57 (Contributor):
Pattern-based date parsing may produce false positives. The function uses keyword matching (…)
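To illustrate the false-positive concern, the following minimal sketch compares the substring check used in `robust_data_mapper` with a stricter token-based check. The helper names `substring_match` and `token_match` and the sample column names are hypothetical, and splitting on underscores is only one possible tightening, not necessarily what the reviewer had in mind.

```python
DATE_KEYWORDS = ("DATE", "TIME", "TIMESTAMP")


def substring_match(column_name: str) -> bool:
    """Mirror the PR's check: a keyword appearing anywhere in the column name."""
    upper = column_name.upper()
    return any(kw in upper for kw in DATE_KEYWORDS)


def token_match(column_name: str) -> bool:
    """Hypothetical stricter variant: a keyword must be a whole underscore-separated token."""
    tokens = column_name.upper().split("_")
    return any(tok in DATE_KEYWORDS for tok in tokens)


# Hypothetical column names: the middle two are not date columns.
for name in ("CREATED_DATE", "UPDATE_REASON", "RUNTIME_MODE", "START_TIME"):
    print(name, substring_match(name), token_match(name))
```

`UPDATE_REASON` and `RUNTIME_MODE` pass the substring check because `UPDATE` contains `DATE` and `RUNTIME` contains `TIME`. Deciding from the column's declared database type, where available, would likely be more reliable than either name-based rule.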
||
|
|
||
| # ---------------------------- | ||
| # No PK handling | ||
| # ---------------------------- | ||
| def disable_dlt_identifiers_if_no_pk(resource): | ||
| has_pk = any( | ||
| col.get("primary_key", False) | ||
| for col in resource.columns.values() | ||
| ) | ||
| if not has_pk: | ||
| resource.apply_hints(primary_key=[]) | ||
|
|
||
| # ---------------------------- | ||
| # Oracle type fix (Timestamp & LOBs) | ||
| # ---------------------------- | ||
| def oracle_output_type_handler(cursor, name, default_type, size, precision, scale): | ||
| if default_type in (cx_Oracle.DATETIME, cx_Oracle.TIMESTAMP, getattr(cx_Oracle, 'TIMESTAMPTZ', -1)): | ||
| return cursor.var(str, 128, arraysize=cursor.arraysize) | ||
| if default_type == cx_Oracle.CLOB: | ||
| return cursor.var(cx_Oracle.LONG_STRING, arraysize=cursor.arraysize) | ||
| if default_type == cx_Oracle.BLOB: | ||
| return cursor.var(cx_Oracle.LONG_BINARY, arraysize=cursor.arraysize) | ||
|
|
||
| # ---------------------------- | ||
| # Robust Schema Hints | ||
| # ---------------------------- | ||
| def apply_robust_hints(resource): | ||
| new_cols = {} | ||
| for col_name, col_meta in resource.columns.items(): | ||
| is_pk = col_meta.get("primary_key", False) | ||
| data_type = col_meta.get("data_type") | ||
| hint = {} | ||
|
|
||
| if not is_pk: | ||
| hint["nullable"] = True | ||
| else: | ||
| hint["primary_key"] = True | ||
| hint["nullable"] = False | ||
|
|
||
| if any(kw in col_name.upper() for kw in ['DATE', 'TIME', 'TIMESTAMP']): | ||
| hint["data_type"] = "timestamp" | ||
|
|
||
| if data_type in ("decimal", "double", "float"): | ||
| hint["data_type"] = "double" | ||
|
Comment on lines +100 to +101 (Contributor):
Mapping decimal to double can lose precision.

Proposed fix to preserve decimal precision:
- if data_type in ("decimal", "double", "float"):
+ if data_type in ("double", "float"):
      hint["data_type"] = "double"
+ # Preserve decimal types to avoid precision loss
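As a small standalone illustration of why a decimal-to-double mapping can be lossy, the sketch below round-trips a high-precision value through a Python `float` (an IEEE 754 double). The sample value is made up for the demonstration and does not come from the FASE data.

```python
from decimal import Decimal

# A value with more significant digits than a 64-bit double can hold exactly.
original = Decimal("12345678901234567.89")
as_double = float(original)
round_tripped = Decimal(as_double)

print(original)
print(round_tripped)
print(round_tripped == original)  # False: digits were lost in the conversion

# Binary floating point also cannot represent many short decimal fractions exactly.
print(0.1 + 0.2 == 0.3)  # False
print(Decimal("0.1") + Decimal("0.2") == Decimal("0.3"))  # True
```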
||
|
|
||
| if hint: | ||
| new_cols[col_name] = hint | ||
|
|
||
| if new_cols: | ||
| resource.apply_hints(columns=new_cols) | ||
|
|
||
| # ---------------------------- | ||
| # DLT Oracle source | ||
| # ---------------------------- | ||
| @dlt.source() | ||
| def oracle_source(oracle_uri: str, schema_name: str, table_name: str, include_views: bool = False): | ||
| engine = create_engine( | ||
| oracle_uri, | ||
| poolclass=NullPool, | ||
| connect_args={"encoding": "UTF-8", "nencoding": "UTF-8"}, | ||
| ) | ||
|
|
||
| @event.listens_for(engine, "connect") | ||
| def on_connect(dbapi_conn, _): | ||
| dbapi_conn.outputtypehandler = oracle_output_type_handler | ||
| cursor = dbapi_conn.cursor() | ||
| cursor.execute(""" | ||
| ALTER SESSION SET | ||
| NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS' | ||
| NLS_TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF' | ||
| NLS_TIMESTAMP_TZ_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM' | ||
| """) | ||
| cursor.close() | ||
|
|
||
| tables = None if table_name.lower() == 'all' else [table_name] | ||
|
|
||
| # PASSING THE ARGUMENT HERE | ||
| source = sql_database( | ||
| credentials=engine, | ||
| schema=schema_name, | ||
| include_views=include_views, | ||
| table_names=tables | ||
| ) | ||
|
|
||
| for resource in source.resources.values(): | ||
| apply_robust_hints(resource) | ||
| disable_dlt_identifiers_if_no_pk(resource) | ||
| resource.add_map(robust_data_mapper) | ||
|
|
||
| return source | ||
|
|
||
| if __name__ == "__main__": | ||
| args = parse_args() | ||
|
|
||
| oracle_uri = ( | ||
| f"oracle+cx_oracle://{args.target_schema}:pa55w0rdTolocalDB" | ||
| "@localhost:1521/?service_name=xepdb" | ||
| ) | ||
|
Comment on lines +152 to +155 (Contributor):
Critical: Remove hardcoded credentials. The password (…)

Proposed fix using environment variables:
+ oracle_password = os.environ.get("ORACLE_PASSWORD")
+ if not oracle_password:
+     raise ValueError("ORACLE_PASSWORD environment variable must be set")
+
  oracle_uri = (
- f"oracle+cx_oracle://{args.target_schema}:pa55w0rdTolocalDB"
+ f"oracle+cx_oracle://{args.target_schema}:{oracle_password}"
  "@localhost:1521/?service_name=xepdb"
  )
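One detail worth considering alongside the environment-variable approach: a password containing characters such as `@` or `/` would break a URI built by plain interpolation. The sketch below is one way to handle that, assuming a hypothetical helper `build_oracle_uri` and the `ORACLE_PASSWORD` variable name from the proposed fix. The host, port, and service name defaults are copied from the PR.

```python
import os
from urllib.parse import quote_plus


def build_oracle_uri(user, env=None, host="localhost", port=1521, service_name="xepdb"):
    """Build the SQLAlchemy URI, reading the password from the environment."""
    env = os.environ if env is None else env
    password = env.get("ORACLE_PASSWORD")
    if not password:
        raise ValueError("ORACLE_PASSWORD environment variable must be set")
    # URL-encode user and password so special characters cannot alter the URI structure.
    return (
        f"oracle+cx_oracle://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/?service_name={service_name}"
    )
```

Passing `env` explicitly keeps the function easy to test without touching the real process environment.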
||
|
|
||
| pipeline_id = f"debug_oracle_{args.target_schema.lower()}_{args.target_table.lower()}" | ||
|
|
||
| pipeline = dlt.pipeline( | ||
| pipeline_name=pipeline_id, | ||
| destination=pyiceberg, | ||
| dataset_name=args.target_schema.lower() | ||
| ) | ||
|
|
||
| if args.drop: | ||
| print(f"\n--- [RESET] Dropping state for: {pipeline_id} ---") | ||
| pipeline.drop() | ||
|
|
||
| try: | ||
| # PASSING THE ARGUMENT FROM ARGS | ||
| source_data = oracle_source( | ||
| oracle_uri, | ||
| args.target_schema, | ||
| args.target_table, | ||
| include_views=args.include_views | ||
| ) | ||
| info = pipeline.run(source_data, write_disposition="replace") | ||
| print(info) | ||
| except Exception: | ||
| raise | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,95 @@ | ||||||||||||||||
| import dlt | ||||||||||||||||
| import pandas as pd | ||||||||||||||||
| import json | ||||||||||||||||
| import argparse | ||||||||||||||||
| import sys | ||||||||||||||||
| from sqlalchemy import create_engine, inspect | ||||||||||||||||
| from elt_common.dlt_destinations import pyiceberg | ||||||||||||||||
|
|
||||||||||||||||
| BASE_URI = "postgresql://duouser:duopassword@localhost:5432/duo" | ||||||||||||||||
|
Contributor:
Hardcoded database credentials expose a critical security risk. The connection URI contains a plaintext username and password that will be committed to version control and visible in repository history. Credentials must never be hardcoded.

Proposed fix using environment variables:
+import os
+
-BASE_URI = "postgresql://duouser:duopassword@localhost:5432/duo"
+BASE_URI = os.environ.get(
+    "POSTGRES_URI",
+    "postgresql://localhost:5432/duo"  # Default without credentials for local dev
+)

Alternatively, integrate with the project's existing credential management system if available.
||||||||||||||||
|
|
||||||||||||||||
| def map_pg_to_dlt_type(pg_type): | ||||||||||||||||
| """Maps SQLAlchemy types to dlt core types.""" | ||||||||||||||||
| t = str(pg_type).lower() | ||||||||||||||||
| if 'int' in t: return 'bigint' | ||||||||||||||||
| if 'bool' in t: return 'bool' | ||||||||||||||||
| if 'json' in t: return 'text' | ||||||||||||||||
| if 'float' in t or 'numeric' in t or 'double' in t: return 'double' | ||||||||||||||||
| if 'timestamp' in t: return 'timestamp' | ||||||||||||||||
| if 'date' in t: return 'date' | ||||||||||||||||
| return 'text' | ||||||||||||||||
|
Comment on lines +14 to +20 (Contributor):
Fix formatting violations blocking CI. Lines 14–19 violate E701 (multiple statements on one line), causing static checks to fail.

Proposed fix to expand single-line conditionals:
- if 'int' in t: return 'bigint'
- if 'bool' in t: return 'bool'
- if 'json' in t: return 'text'
- if 'float' in t or 'numeric' in t or 'double' in t: return 'double'
- if 'timestamp' in t: return 'timestamp'
- if 'date' in t: return 'date'
+ if 'int' in t:
+     return 'bigint'
+ if 'bool' in t:
+     return 'bool'
+ if 'json' in t:
+     return 'text'
+ if 'float' in t or 'numeric' in t or 'double' in t:
+     return 'double'
+ if 'timestamp' in t:
+     return 'timestamp'
+ if 'date' in t:
+     return 'date'
  return 'text'

Tools:
- GitHub Actions (Static checks / 0_static checks.txt; Static checks / static checks): [error] ruff check (E701): Multiple statements on one line (colon), reported for each of lines 14 to 19.
- Ruff (0.15.15): [error] Multiple statements on one line (colon) (E701), reported for each of lines 14 to 19.

Sources: Linters/SAST tools, Pipeline failures
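An alternative that also avoids E701 is a table-driven version of `map_pg_to_dlt_type`. This is only a sketch: the `TYPE_RULES` name is hypothetical, and the rules simply restate the substring checks from the PR in their original order.

```python
# Ordered (substrings, dlt type) rules, checked in the same order as the PR's if-chain.
TYPE_RULES = (
    (("int",), "bigint"),
    (("bool",), "bool"),
    (("json",), "text"),
    (("float", "numeric", "double"), "double"),
    (("timestamp",), "timestamp"),
    (("date",), "date"),
)


def map_pg_to_dlt_type(pg_type):
    """Map a SQLAlchemy type (or its string form) to a dlt core type name."""
    t = str(pg_type).lower()
    for needles, dlt_type in TYPE_RULES:
        if any(needle in t for needle in needles):
            return dlt_type
    return "text"  # fallback for anything unrecognised
```

Because both versions rely on substring matching, a type name such as `INTERVAL` also matches the `int` rule and maps to `bigint`. Matching on SQLAlchemy type classes instead of strings may be worth considering if that matters for this dataset.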
||||||||||||||||
|
|
||||||||||||||||
| def get_table_metadata(engine, schema, table_name): | ||||||||||||||||
| inspector = inspect(engine) | ||||||||||||||||
| columns = inspector.get_columns(table_name, schema=schema) | ||||||||||||||||
| json_cols = [c['name'] for c in columns if 'json' in str(c['type']).lower()] | ||||||||||||||||
| return json_cols, columns | ||||||||||||||||
|
|
||||||||||||||||
| @dlt.source | ||||||||||||||||
| def postgres_dynamic_source(engine, schema, target_table): | ||||||||||||||||
| inspector = inspect(engine) | ||||||||||||||||
| tables = [target_table] if target_table.lower() != 'all' else inspector.get_table_names(schema=schema) | ||||||||||||||||
|
|
||||||||||||||||
| for table_name in tables: | ||||||||||||||||
| json_cols, columns = get_table_metadata(engine, schema, table_name) | ||||||||||||||||
|
|
||||||||||||||||
| # Build hints and a dummy row | ||||||||||||||||
| col_hints = {} | ||||||||||||||||
| dummy_row = {} | ||||||||||||||||
|
|
||||||||||||||||
| for c in columns: | ||||||||||||||||
| col_name = c['name'] | ||||||||||||||||
| col_hints[col_name] = { | ||||||||||||||||
| "name": col_name, | ||||||||||||||||
| "data_type": map_pg_to_dlt_type(c['type']), | ||||||||||||||||
| "nullable": True # FORCE nullable for the dummy load | ||||||||||||||||
| } | ||||||||||||||||
| dummy_row[col_name] = None | ||||||||||||||||
|
|
||||||||||||||||
| def make_resource(t_name, j_cols, hints, d_row): | ||||||||||||||||
| @dlt.resource( | ||||||||||||||||
| name=t_name, | ||||||||||||||||
| write_disposition="replace", | ||||||||||||||||
| columns=hints, | ||||||||||||||||
| table_format="iceberg" | ||||||||||||||||
| ) | ||||||||||||||||
| def load_resource(): | ||||||||||||||||
| query = f'SELECT * FROM "{schema}"."{t_name}"' | ||||||||||||||||
|
Contributor:
Avoid string interpolation for SQL identifiers. The f-string query construction is flagged as a potential SQL injection vector (S608). Whilst (…)

Proposed fix using SQLAlchemy text with bound parameters or proper quoting:
+from sqlalchemy import text
+
- query = f'SELECT * FROM "{schema}"."{t_name}"'
- df = pd.read_sql(query, engine)
+ # Use SQLAlchemy text and proper identifier binding
+ query = text(f'SELECT * FROM "{schema}"."{t_name}"')
+ df = pd.read_sql(query, engine)

Alternatively, use pandas read_sql_table:
- query = f'SELECT * FROM "{schema}"."{t_name}"'
- df = pd.read_sql(query, engine)
+ df = pd.read_sql_table(t_name, engine, schema=schema)

Tools:
- Ruff (0.15.15): [error] 57-57: Possible SQL injection vector through string-based query construction (S608)

Source: Linters/SAST tools
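If the query string is kept, one defensive option is to check the table name against a known list and quote identifiers explicitly. The sketch below uses only the standard library; `quote_ident` and `build_select_all` are hypothetical helpers, and `allowed_tables` is assumed to be the list the PR already obtains from the SQLAlchemy inspector.

```python
def quote_ident(name: str) -> str:
    """Quote a SQL identifier, doubling embedded double quotes (standard SQL rule)."""
    if not name or "\x00" in name:
        raise ValueError(f"invalid identifier: {name!r}")
    return '"' + name.replace('"', '""') + '"'


def build_select_all(schema: str, table: str, allowed_tables) -> str:
    """Build SELECT * for a table, refusing names that are not in the known table list."""
    if table not in allowed_tables:
        raise ValueError(f"unknown table: {table!r}")
    return f"SELECT * FROM {quote_ident(schema)}.{quote_ident(table)}"
```

A static analyser may still flag the f-string, since it cannot see the validation. The `pd.read_sql_table` alternative in the review avoids building the string at all.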
||||||||||||||||
| df = pd.read_sql(query, engine) | ||||||||||||||||
|
|
||||||||||||||||
| if not df.empty: | ||||||||||||||||
| for col in j_cols: | ||||||||||||||||
| if col in df.columns: | ||||||||||||||||
| df[col] = df[col].apply( | ||||||||||||||||
| lambda x: json.dumps(x) if isinstance(x, (dict, list)) else (str(x) if x is not None else None) | ||||||||||||||||
| ) | ||||||||||||||||
| yield df.to_dict(orient="records") | ||||||||||||||||
| else: | ||||||||||||||||
| print(f"--- Table {t_name} is empty. Yielding dummy row to force Iceberg creation. ---") | ||||||||||||||||
| # Yielding a list with one dictionary containing Nones forces a load package | ||||||||||||||||
| yield [d_row] | ||||||||||||||||
|
|
||||||||||||||||
| return load_resource | ||||||||||||||||
|
|
||||||||||||||||
| yield make_resource(table_name, json_cols, col_hints, dummy_row) | ||||||||||||||||
|
|
||||||||||||||||
| if __name__ == "__main__": | ||||||||||||||||
| parser = argparse.ArgumentParser() | ||||||||||||||||
| parser.add_argument("schema") | ||||||||||||||||
| parser.add_argument("table") | ||||||||||||||||
| args = parser.parse_args() | ||||||||||||||||
|
|
||||||||||||||||
| engine = create_engine(BASE_URI) | ||||||||||||||||
| pipeline = dlt.pipeline( | ||||||||||||||||
| pipeline_name=f"pg_to_iceberg_{args.schema}", | ||||||||||||||||
| destination=pyiceberg, | ||||||||||||||||
| dataset_name=args.schema, | ||||||||||||||||
| ) | ||||||||||||||||
|
|
||||||||||||||||
| try: | ||||||||||||||||
| info = pipeline.run(postgres_dynamic_source(engine, args.schema, args.table)) | ||||||||||||||||
| print(info) | ||||||||||||||||
| except Exception as e: | ||||||||||||||||
|
Contributor:
Remove unused exception variable to fix CI. The exception variable (…)

Proposed fix:
- except Exception as e:
+ except Exception:
      import traceback
      traceback.print_exc()
      sys.exit(1)

Tools:
- GitHub Actions (Static checks / 0_static checks.txt; Static checks / static checks): [error] 92-92: ruff check (F841): Local variable (…)
- Ruff (0.15.15): [warning] 92-92: Do not catch blind exception: (BLE001)

Sources: Linters/SAST tools, Pipeline failures
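The fix works because `traceback.print_exc()` reads the exception currently being handled, so no `as e` binding is needed. A minimal sketch of the same pattern, wrapped in a hypothetical `run_guarded` helper that returns an exit code instead of calling `sys.exit` directly:

```python
import traceback


def run_guarded(task) -> int:
    """Run task(); print the traceback and return 1 on failure, 0 on success."""
    try:
        task()
    except Exception:  # no 'as e': print_exc() uses the active exception
        traceback.print_exc()
        return 1
    return 0
```

The script's entry point could then call `sys.exit(run_guarded(main))` for some `main` function. The BLE001 warning about catching a blind `Exception` would still apply to this pattern.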
||||||||||||||||
| import traceback | ||||||||||||||||
| traceback.print_exc() | ||||||||||||||||
| sys.exit(1) | ||||||||||||||||
Fix formatting issues flagged by pre-commit hooks across both files.
Both clone_oracle_table.py and clone_oracle_all_table.sh triggered pre-commit hook failures for trailing whitespace and end-of-file formatting. The hooks have automatically applied fixes. Please ensure you commit the corrected versions to resolve the pipeline failures.

Tools:
- GitHub Actions: Static checks / 0_static checks.txt
  [error] 1-1: pre-commit hook trailing-whitespace failed (exit code 1). Trailing whitespace detected and fixes were applied.
  [error] 1-1: pre-commit hook end-of-file-fixer failed (exit code 1). End-of-file formatting detected and fixes were applied.

Source: Pipeline failures
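For reference, the effect of the two hooks can be approximated in a few lines of Python. This is a rough sketch of the behaviour, not the hooks' actual implementation, and `normalize_text` is a hypothetical name.

```python
def normalize_text(text: str) -> str:
    """Strip trailing whitespace per line and end the text with exactly one newline."""
    lines = [line.rstrip() for line in text.splitlines()]
    # Drop blank lines at the end of the file.
    while lines and lines[-1] == "":
        lines.pop()
    return "\n".join(lines) + "\n" if lines else ""
```

Running the real hooks locally with `pre-commit run --all-files` before pushing is the more reliable way to reproduce what CI checks.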