Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions warehouses/fase/ingest/clone_oracle_all_table.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/bin/bash

# Define the list of schemas to process in "SCHEMA_NAME:INCLUDE_VIEWS" format
# Set the second value to 'true' if you want to clone views for that schema
SCHEMAS=(
"FACILITY_COMMON:false"
"FACILITY_ERAS:false"
"FACILITY_SAFETY_TEST:false"
"FACILITY_SCHEDULE:false"
"FACILITY_VISITS:false"
"ISISUSERDB:false"
"PROPOSAL_REGISTRY:false"
"IOPS_V4:true"
"PROP_CLF_ALL:true"
"PROP_STATUS:true"
"XPRESS_V4:true"
)

echo "Starting Oracle table cloning process..."
echo "---------------------------------------"

# Loop through each entry
for ENTRY in "${SCHEMAS[@]}"
do
# Split the entry into schema name and the include_views flag
SCHEMA="${ENTRY%%:*}"
INCLUDE_VIEWS="${ENTRY#*:}"

# Determine if we should add the flag to the command
VIEW_FLAG=""
if [ "$INCLUDE_VIEWS" = "true" ]; then
VIEW_FLAG="--include-views"
fi

echo "Running: python3 ./clone_oracle_table.py $SCHEMA all $VIEW_FLAG"

# Execute the command
python3 ./clone_oracle_table.py "$SCHEMA" all $VIEW_FLAG

# Check if the previous command succeeded
if [ $? -eq 0 ]; then
echo "✅ COMPLETED: $SCHEMA (Include Views: $INCLUDE_VIEWS)"
else
echo "❌ FAILED: $SCHEMA"
fi

echo "---------------------------------------"
done

echo "All tasks finished."
180 changes: 180 additions & 0 deletions warehouses/fase/ingest/clone_oracle_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
#!/usr/bin/env python3

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix formatting issues flagged by pre-commit hooks across both files.

Both clone_oracle_table.py and clone_oracle_all_table.sh triggered pre-commit hook failures for trailing whitespace and end-of-file formatting. The hooks have automatically applied fixes. Please ensure you commit the corrected versions to resolve the pipeline failures.

🧰 Tools
🪛 GitHub Actions: Static checks / 0_static checks.txt

[error] 1-1: pre-commit hook trailing-whitespace failed (exit code 1). Trailing whitespace detected and fixes were applied.


[error] 1-1: pre-commit hook end-of-file-fixer failed (exit code 1). End-of-file formatting detected and fixes were applied.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_oracle_table.py` at line 1, Remove trailing
whitespace and normalize end-of-file newlines in both files to satisfy
pre-commit hooks: open warehouses/fase/ingest/clone_oracle_table.py (starting at
the shebang "#!/usr/bin/env python3") and remove any trailing spaces on lines
and ensure a single newline at EOF, and do the same for
clone_oracle_all_table.sh so the pre-commit fixes are committed and the pipeline
passes.

Source: Pipeline failures

import os
# Set session default timezone to UTC to help bypass potential ORA-01805 version mismatches
os.environ["ORA_SDTZ"] = "UTC"

import argparse
import dlt
import cx_Oracle
import dateutil.parser

from dlt.sources.sql_database import sql_database
from elt_common.dlt_destinations import pyiceberg
from sqlalchemy import create_engine, event
from sqlalchemy.pool import NullPool

# ----------------------------
# Arguments
# ----------------------------
def parse_args():
parser = argparse.ArgumentParser(
description="Run DLT pipeline for a specific Oracle table or ALL tables"
)
parser.add_argument("target_schema", help="Oracle schema name")
parser.add_argument("target_table", help="Oracle table name to clone or 'all' for entire schema")
parser.add_argument(
"--drop",
action="store_true",
help="Force drop the destination table and clear pipeline state before running"
)
# NEW ARGUMENT ADDED HERE
parser.add_argument(
"--include-views",
action="store_true",
default=False,
help="If set, includes views in the cloning process (default: False)"
)
return parser.parse_args()

# ----------------------------
# Debug Mapper: Inspecting Nulls and Types
# ----------------------------
def robust_data_mapper(item):
date_keywords = ['DATE', 'TIME', 'TIMESTAMP']

if 'PAYLOAD' in item or 'payload' in item:
key = 'PAYLOAD' if 'PAYLOAD' in item else 'payload'
val = item[key]
if val is None:
item[key] = ""

for key, value in item.items():
if isinstance(value, str) and any(kw in key.upper() for kw in date_keywords):
try:
item[key] = dateutil.parser.parse(value)
except (ValueError, TypeError):
pass
return item
Comment on lines +42 to +57

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Pattern-based date parsing may produce false positives.

The function uses keyword matching (DATE, TIME, TIMESTAMP in column name) to decide which string values to parse as datetimes. This heuristic may incorrectly attempt to parse columns like UPDATE_TIMESTAMP_STRING that are intentionally strings. Whilst the try/except provides a safe fallback, consider verifying the actual column data type from schema metadata rather than relying solely on naming conventions.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_oracle_table.py` around lines 42 - 57, The
robust_data_mapper currently infers datetime fields by column name
(date_keywords) which causes false positives; update robust_data_mapper to
prefer explicit schema/type metadata when available (e.g., accept a schema dict
or column type map) and only apply dateutil.parser.parse for keys whose schema
type indicates a DATE/TIME/TIMESTAMP; keep the existing try/except fallback for
safety and preserve the PAYLOAD/payload handling. Ensure you reference and use
the date_keywords, robust_data_mapper function signature, and the item keys to
locate where to add the schema lookups and conditional parsing.


# ----------------------------
# No PK handling
# ----------------------------
def disable_dlt_identifiers_if_no_pk(resource):
has_pk = any(
col.get("primary_key", False)
for col in resource.columns.values()
)
if not has_pk:
resource.apply_hints(primary_key=[])

# ----------------------------
# Oracle type fix (Timestamp & LOBs)
# ----------------------------
def oracle_output_type_handler(cursor, name, default_type, size, precision, scale):
if default_type in (cx_Oracle.DATETIME, cx_Oracle.TIMESTAMP, getattr(cx_Oracle, 'TIMESTAMPTZ', -1)):
return cursor.var(str, 128, arraysize=cursor.arraysize)
if default_type == cx_Oracle.CLOB:
return cursor.var(cx_Oracle.LONG_STRING, arraysize=cursor.arraysize)
if default_type == cx_Oracle.BLOB:
return cursor.var(cx_Oracle.LONG_BINARY, arraysize=cursor.arraysize)

# ----------------------------
# Robust Schema Hints
# ----------------------------
def apply_robust_hints(resource):
new_cols = {}
for col_name, col_meta in resource.columns.items():
is_pk = col_meta.get("primary_key", False)
data_type = col_meta.get("data_type")
hint = {}

if not is_pk:
hint["nullable"] = True
else:
hint["primary_key"] = True
hint["nullable"] = False

if any(kw in col_name.upper() for kw in ['DATE', 'TIME', 'TIMESTAMP']):
hint["data_type"] = "timestamp"

if data_type in ("decimal", "double", "float"):
hint["data_type"] = "double"
Comment on lines +100 to +101

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Mapping decimal to double may lose precision.

Converting decimal types to double (floating-point) can introduce rounding errors and lose precision, particularly problematic for financial data or exact numeric calculations. Oracle NUMBER columns with specific precision/scale should preserve their exact representation. Consider mapping decimals to a fixed-precision type or preserving the decimal type if supported by Iceberg.

🔢 Proposed fix to preserve decimal precision
-        if data_type in ("decimal", "double", "float"):
+        if data_type in ("double", "float"):
             hint["data_type"] = "double"
+        # Preserve decimal types to avoid precision loss
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_oracle_table.py` around lines 100 - 101, The
code currently maps "decimal" (and "double","float") to "double", which loses
precision; update the mapping in clone_oracle_table.py so "decimal"/Oracle
NUMBER preserves its exact representation by setting hint["data_type"] =
"decimal" and populating hint["precision"] and hint["scale"] from the column
metadata (e.g., the column/column_metadata object you use when inspecting
columns); only fall back to "double" if precision/scale are unavailable and
document that fallback, and ensure any downstream function that reads hint
(e.g., the code that emits Iceberg/target schemas) respects the precision/scale
fields.


if hint:
new_cols[col_name] = hint

if new_cols:
resource.apply_hints(columns=new_cols)

# ----------------------------
# DLT Oracle source
# ----------------------------
@dlt.source()
def oracle_source(oracle_uri: str, schema_name: str, table_name: str, include_views: bool = False):
engine = create_engine(
oracle_uri,
poolclass=NullPool,
connect_args={"encoding": "UTF-8", "nencoding": "UTF-8"},
)

@event.listens_for(engine, "connect")
def on_connect(dbapi_conn, _):
dbapi_conn.outputtypehandler = oracle_output_type_handler
cursor = dbapi_conn.cursor()
cursor.execute("""
ALTER SESSION SET
NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'
NLS_TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF'
NLS_TIMESTAMP_TZ_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'
""")
cursor.close()

tables = None if table_name.lower() == 'all' else [table_name]

# PASSING THE ARGUMENT HERE
source = sql_database(
credentials=engine,
schema=schema_name,
include_views=include_views,
table_names=tables
)

for resource in source.resources.values():
apply_robust_hints(resource)
disable_dlt_identifiers_if_no_pk(resource)
resource.add_map(robust_data_mapper)

return source

if __name__ == "__main__":
args = parse_args()

oracle_uri = (
f"oracle+cx_oracle://{args.target_schema}:pa55w0rdTolocalDB"
"@localhost:1521/?service_name=xepdb"
)
Comment on lines +152 to +155

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Critical: Remove hardcoded credentials.

The password pa55w0rdTolocalDB is hardcoded in plaintext in the connection string. This is a critical security vulnerability that exposes credentials in source control and logs. Even for local development, credentials must be externalised to environment variables, secrets managers, or secure configuration files.

🔐 Proposed fix using environment variables
+    oracle_password = os.environ.get("ORACLE_PASSWORD")
+    if not oracle_password:
+        raise ValueError("ORACLE_PASSWORD environment variable must be set")
+    
     oracle_uri = (
-        f"oracle+cx_oracle://{args.target_schema}:pa55w0rdTolocalDB"
+        f"oracle+cx_oracle://{args.target_schema}:{oracle_password}"
         "`@localhost`:1521/?service_name=xepdb"
     )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_oracle_table.py` around lines 152 - 155, The
oracle_uri currently contains a hardcoded plaintext password; update the
construction of oracle_uri to read the DB password from a secure source (e.g.,
an environment variable or secrets manager) instead of embedding
"pa55w0rdTolocalDB", using args.target_schema for the username as before; add a
guard to fail fast with a clear error if the password env var (or secret lookup)
is missing and ensure no sensitive value is logged or committed to source
control. Reference: the oracle_uri variable and args.target_schema in
clone_oracle_table.py.


pipeline_id = f"debug_oracle_{args.target_schema.lower()}_{args.target_table.lower()}"

pipeline = dlt.pipeline(
pipeline_name=pipeline_id,
destination=pyiceberg,
dataset_name=args.target_schema.lower()
)

if args.drop:
print(f"\n--- [RESET] Dropping state for: {pipeline_id} ---")
pipeline.drop()

try:
# PASSING THE ARGUMENT FROM ARGS
source_data = oracle_source(
oracle_uri,
args.target_schema,
args.target_table,
include_views=args.include_views
)
info = pipeline.run(source_data, write_disposition="replace")
print(info)
except Exception:
raise
95 changes: 95 additions & 0 deletions warehouses/fase/ingest/clone_postgresql_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import dlt
import pandas as pd
import json
import argparse
import sys
from sqlalchemy import create_engine, inspect
from elt_common.dlt_destinations import pyiceberg

BASE_URI = "postgresql://duouser:duopassword@localhost:5432/duo"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Hardcoded database credentials expose a critical security risk.

The connection URI contains plaintext username and password that will be committed to version control and visible in repository history. Credentials must never be hardcoded.

🔒 Proposed fix using environment variables
+import os
+
-BASE_URI = "postgresql://duouser:duopassword@localhost:5432/duo"
+BASE_URI = os.environ.get(
+    "POSTGRES_URI",
+    "postgresql://localhost:5432/duo"  # Default without credentials for local dev
+)

Alternatively, integrate with the project's existing credential management system if available.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
BASE_URI = "postgresql://duouser:duopassword@localhost:5432/duo"
import os
BASE_URI = os.environ.get(
"POSTGRES_URI",
"postgresql://localhost:5432/duo" # Default without credentials for local dev
)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_postgresql_table.py` at line 9, The BASE_URI
constant currently contains hardcoded DB credentials; replace this with a
runtime-configured value sourced from environment/config secrets (e.g., read
DATABASE_URL or separate DB_USER/DB_PASS/DB_HOST/DB_NAME env vars) and remove
the plaintext credentials from BASE_URI; update any code referencing BASE_URI to
use the new env-derived value and, if your project uses a secrets manager or
config loader, integrate with that instead of embedding credentials in the file.


def map_pg_to_dlt_type(pg_type):
"""Maps SQLAlchemy types to dlt core types."""
t = str(pg_type).lower()
if 'int' in t: return 'bigint'
if 'bool' in t: return 'bool'
if 'json' in t: return 'text'
if 'float' in t or 'numeric' in t or 'double' in t: return 'double'
if 'timestamp' in t: return 'timestamp'
if 'date' in t: return 'date'
return 'text'
Comment on lines +14 to +20

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Fix formatting violations blocking CI.

Lines 14–19 violate E701 (multiple statements on one line), causing static checks to fail.

🔧 Proposed fix to expand single-line conditionals
-    if 'int' in t: return 'bigint'
-    if 'bool' in t: return 'bool'
-    if 'json' in t: return 'text' 
-    if 'float' in t or 'numeric' in t or 'double' in t: return 'double'
-    if 'timestamp' in t: return 'timestamp'
-    if 'date' in t: return 'date'
+    if 'int' in t:
+        return 'bigint'
+    if 'bool' in t:
+        return 'bool'
+    if 'json' in t:
+        return 'text'
+    if 'float' in t or 'numeric' in t or 'double' in t:
+        return 'double'
+    if 'timestamp' in t:
+        return 'timestamp'
+    if 'date' in t:
+        return 'date'
     return 'text'
🧰 Tools
🪛 GitHub Actions: Static checks / 0_static checks.txt

[error] 14-14: ruff check (E701): Multiple statements on one line (colon) at if 'int' in t: return 'bigint'.


[error] 15-15: ruff check (E701): Multiple statements on one line (colon) at if 'bool' in t: return 'bool'.


[error] 16-16: ruff check (E701): Multiple statements on one line (colon) at if 'json' in t: return 'text'.


[error] 17-17: ruff check (E701): Multiple statements on one line (colon) at if 'float' in t or 'numeric' in t or 'double' in t: return 'double'.


[error] 18-18: ruff check (E701): Multiple statements on one line (colon) at if 'timestamp' in t: return 'timestamp'.


[error] 19-19: ruff check (E701): Multiple statements on one line (colon) at if 'date' in t: return 'date'.

🪛 GitHub Actions: Static checks / static checks

[error] 14-14: ruff (E701) Multiple statements on one line (colon) at: if 'int' in t: return 'bigint'


[error] 15-15: ruff (E701) Multiple statements on one line (colon) at: if 'bool' in t: return 'bool'


[error] 16-16: ruff (E701) Multiple statements on one line (colon) at: if 'json' in t: return 'text'


[error] 17-17: ruff (E701) Multiple statements on one line (colon) at: if 'float' in t or 'numeric' in t or 'double' in t: return 'double'


[error] 18-18: ruff (E701) Multiple statements on one line (colon) at: if 'timestamp' in t: return 'timestamp'


[error] 19-19: ruff (E701) Multiple statements on one line (colon) at: if 'date' in t: return 'date'

🪛 Ruff (0.15.15)

[error] 14-14: Multiple statements on one line (colon)

(E701)


[error] 15-15: Multiple statements on one line (colon)

(E701)


[error] 16-16: Multiple statements on one line (colon)

(E701)


[error] 17-17: Multiple statements on one line (colon)

(E701)


[error] 18-18: Multiple statements on one line (colon)

(E701)


[error] 19-19: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_postgresql_table.py` around lines 14 - 20, The
current type-mapping block in clone_postgresql_table.py uses multiple
single-line if statements (e.g., "if 'int' in t: return 'bigint'") which
triggers E701; expand each conditional into its own multi-line if statement with
the condition on one line and the return on the next, remove any trailing spaces
(e.g., after 'text'), and ensure consistent spacing/indentation for all cases
(int→bigint, bool→bool, json→text, float/numeric/double→double,
timestamp→timestamp, date→date, default return 'text') so the linter passes.

Sources: Linters/SAST tools, Pipeline failures


def get_table_metadata(engine, schema, table_name):
inspector = inspect(engine)
columns = inspector.get_columns(table_name, schema=schema)
json_cols = [c['name'] for c in columns if 'json' in str(c['type']).lower()]
return json_cols, columns

@dlt.source
def postgres_dynamic_source(engine, schema, target_table):
inspector = inspect(engine)
tables = [target_table] if target_table.lower() != 'all' else inspector.get_table_names(schema=schema)

for table_name in tables:
json_cols, columns = get_table_metadata(engine, schema, table_name)

# Build hints and a dummy row
col_hints = {}
dummy_row = {}

for c in columns:
col_name = c['name']
col_hints[col_name] = {
"name": col_name,
"data_type": map_pg_to_dlt_type(c['type']),
"nullable": True # FORCE nullable for the dummy load
}
dummy_row[col_name] = None

def make_resource(t_name, j_cols, hints, d_row):
@dlt.resource(
name=t_name,
write_disposition="replace",
columns=hints,
table_format="iceberg"
)
def load_resource():
query = f'SELECT * FROM "{schema}"."{t_name}"'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid string interpolation for SQL identifiers.

The f-string query construction is flagged as a potential SQL injection vector (S608). Whilst schema and t_name originate from SQLAlchemy's inspector (a controlled source), this pattern is fragile and could introduce risk if the code is refactored.

🛡️ Proposed fix using SQLAlchemy text with bound parameters or proper quoting
+from sqlalchemy import text
+
-                query = f'SELECT * FROM "{schema}"."{t_name}"'
-                df = pd.read_sql(query, engine)
+                # Use SQLAlchemy text and proper identifier binding
+                query = text(f'SELECT * FROM "{schema}"."{t_name}"')
+                df = pd.read_sql(query, engine)

Alternatively, use pandas read_sql_table for safer identifier handling:

-                query = f'SELECT * FROM "{schema}"."{t_name}"'
-                df = pd.read_sql(query, engine)
+                df = pd.read_sql_table(t_name, engine, schema=schema)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
query = f'SELECT * FROM "{schema}"."{t_name}"'
df = pd.read_sql_table(t_name, engine, schema=schema)
🧰 Tools
🪛 Ruff (0.15.15)

[error] 57-57: Possible SQL injection vector through string-based query construction

(S608)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_postgresql_table.py` at line 57, The query built
with f'SELECT * FROM "{schema}"."{t_name}"' uses string interpolation for SQL
identifiers and should be replaced with a safe identifier-aware API; locate the
query assignment (variables schema, t_name, query) and change it to use
SQLAlchemy's Table/metadata reflection or pandas.read_sql_table so identifiers
are quoted/handled properly (e.g., reflect or construct a
sqlalchemy.schema.Table for t_name+schema and issue a select(table) or use
pandas.read_sql_table with schema argument) instead of formatting identifiers
into a raw SQL string.

Source: Linters/SAST tools

df = pd.read_sql(query, engine)

if not df.empty:
for col in j_cols:
if col in df.columns:
df[col] = df[col].apply(
lambda x: json.dumps(x) if isinstance(x, (dict, list)) else (str(x) if x is not None else None)
)
yield df.to_dict(orient="records")
else:
print(f"--- Table {t_name} is empty. Yielding dummy row to force Iceberg creation. ---")
# Yielding a list with one dictionary containing Nones forces a load package
yield [d_row]

return load_resource

yield make_resource(table_name, json_cols, col_hints, dummy_row)

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("schema")
parser.add_argument("table")
args = parser.parse_args()

engine = create_engine(BASE_URI)
pipeline = dlt.pipeline(
pipeline_name=f"pg_to_iceberg_{args.schema}",
destination=pyiceberg,
dataset_name=args.schema,
)

try:
info = pipeline.run(postgres_dynamic_source(engine, args.schema, args.table))
print(info)
except Exception as e:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Remove unused exception variable to fix CI.

The exception variable e is assigned but never used (F841), causing static checks to fail.

🔧 Proposed fix
-    except Exception as e:
+    except Exception:
         import traceback
         traceback.print_exc()
         sys.exit(1)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
except Exception:
import traceback
traceback.print_exc()
sys.exit(1)
🧰 Tools
🪛 GitHub Actions: Static checks / 0_static checks.txt

[error] 92-92: ruff check (F841): Local variable e is assigned to but never used in except Exception as e:.

🪛 GitHub Actions: Static checks / static checks

[error] 92-92: ruff (F841) Local variable e is assigned to but never used. Remove assignment to unused variable e.

🪛 Ruff (0.15.15)

[warning] 92-92: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_postgresql_table.py` at line 92, Change the
except clause that currently binds an unused variable (the line with "except
Exception as e") so it no longer assigns the unused name; locate the try/except
block in clone_postgresql_table.py and replace the "except Exception as e"
clause with an except that does not bind the exception (or binds it to _ if you
intend to ignore it), leaving the existing exception handling body unchanged.

Sources: Linters/SAST tools, Pipeline failures

import traceback
traceback.print_exc()
sys.exit(1)
Loading