Feat: Implement FASE data ingestion pipeline (Oracle & Postgres to Iceberg)#341
Feat: Implement FASE data ingestion pipeline (Oracle & Postgres to Iceberg)#341bashanlam wants to merge 1 commit into
Conversation
… Iceberg destination loading.
📝 WalkthroughWalkthroughThis PR adds three new ingestion scripts to the FASE warehouse ingest directory. The ChangesOracle Table Ingestion to Iceberg
PostgreSQL Table Ingestion to Iceberg
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 8
🧹 Nitpick comments (3)
warehouses/fase/ingest/clone_oracle_table.py (1)
62-68: 💤 Low valueConsider renaming for clarity.
The function name
disable_dlt_identifiers_if_no_pksuggests it disables multiple identifiers, but the implementation only clears theprimary_keyhint. Consider renaming toclear_primary_key_if_absentor similar to better reflect its single responsibility.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@warehouses/fase/ingest/clone_oracle_table.py` around lines 62 - 68, The function disable_dlt_identifiers_if_no_pk misleads readers because it only clears the primary_key hint; rename the function to clear_primary_key_if_absent (or similar) and update all references/calls accordingly; inside the function keep the existing logic that checks columns for a primary_key and calls resource.apply_hints(primary_key=[]) when none exists, but change the function name declaration and any imports/usages to the new identifier to maintain consistency.warehouses/fase/ingest/clone_oracle_all_table.sh (1)
38-45: ⚡ Quick winCheck exit code directly to improve robustness.
The script captures
$?and tests it indirectly. Directly testing the command improves readability and handles edge cases where$?might be unexpectedly modified. This addresses shellcheck SC2181.♻️ Proposed fix
- # Execute the command - python3 ./clone_oracle_table.py "$SCHEMA" all $VIEW_FLAG - - # Check if the previous command succeeded - if [ $? -eq 0 ]; then + # Execute the command and check result + if python3 ./clone_oracle_table.py "$SCHEMA" all $VIEW_FLAG; then echo "✅ COMPLETED: $SCHEMA (Include Views: $INCLUDE_VIEWS)" else echo "❌ FAILED: $SCHEMA"🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@warehouses/fase/ingest/clone_oracle_all_table.sh` around lines 38 - 45, Replace the indirect exit-code check for the clone command with a direct test of the command itself: run python3 ./clone_oracle_table.py "$SCHEMA" all $VIEW_FLAG inside an if-statement (if python3 ./clone_oracle_table.py "$SCHEMA" all $VIEW_FLAG; then ... else ... fi) so you don't rely on a separate $? read; update the success/failure echo lines that reference $SCHEMA and $INCLUDE_VIEWS accordingly to keep behavior identical and avoid shellcheck SC2181 warnings.Source: Linters/SAST tools
warehouses/fase/ingest/clone_postgresql_table.py (1)
31-31: 💤 Low valueCase-insensitive "all" check may conflict with actual table names.
The check
target_table.lower() != 'all'prevents ingesting a table literally named "all", "All", or "ALL". Consider an exact match or a reserved keyword pattern instead.♻️ Proposed fix using case-sensitive check
- tables = [target_table] if target_table.lower() != 'all' else inspector.get_table_names(schema=schema) + tables = [target_table] if target_table != 'all' else inspector.get_table_names(schema=schema)Document the reserved keyword "all" in CLI help text.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@warehouses/fase/ingest/clone_postgresql_table.py` at line 31, The current check lowercases target_table so a real table named "all"/"All"/"ALL" is treated as the special keyword; change the logic in the ingestion selection so it uses an exact, case-sensitive sentinel (e.g., compare target_table == 'all' without lowercasing) or better yet require an explicit flag (e.g., --all) and treat target_table literally; update the code paths that build tables (the expression assigning tables, and any use of target_table, inspector.get_table_names, schema) to respect the new sentinel behavior and update CLI/help text to document that "all" is reserved or that --all must be used to ingest every table.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@warehouses/fase/ingest/clone_oracle_table.py`:
- Around line 42-57: The robust_data_mapper currently infers datetime fields by
column name (date_keywords) which causes false positives; update
robust_data_mapper to prefer explicit schema/type metadata when available (e.g.,
accept a schema dict or column type map) and only apply dateutil.parser.parse
for keys whose schema type indicates a DATE/TIME/TIMESTAMP; keep the existing
try/except fallback for safety and preserve the PAYLOAD/payload handling. Ensure
you reference and use the date_keywords, robust_data_mapper function signature,
and the item keys to locate where to add the schema lookups and conditional
parsing.
- Line 1: Remove trailing whitespace and normalize end-of-file newlines in both
files to satisfy pre-commit hooks: open
warehouses/fase/ingest/clone_oracle_table.py (starting at the shebang
"#!/usr/bin/env python3") and remove any trailing spaces on lines and ensure a
single newline at EOF, and do the same for clone_oracle_all_table.sh so the
pre-commit fixes are committed and the pipeline passes.
- Around line 100-101: The code currently maps "decimal" (and "double","float")
to "double", which loses precision; update the mapping in clone_oracle_table.py
so "decimal"/Oracle NUMBER preserves its exact representation by setting
hint["data_type"] = "decimal" and populating hint["precision"] and hint["scale"]
from the column metadata (e.g., the column/column_metadata object you use when
inspecting columns); only fall back to "double" if precision/scale are
unavailable and document that fallback, and ensure any downstream function that
reads hint (e.g., the code that emits Iceberg/target schemas) respects the
precision/scale fields.
- Around line 152-155: The oracle_uri currently contains a hardcoded plaintext
password; update the construction of oracle_uri to read the DB password from a
secure source (e.g., an environment variable or secrets manager) instead of
embedding "pa55w0rdTolocalDB", using args.target_schema for the username as
before; add a guard to fail fast with a clear error if the password env var (or
secret lookup) is missing and ensure no sensitive value is logged or committed
to source control. Reference: the oracle_uri variable and args.target_schema in
clone_oracle_table.py.
In `@warehouses/fase/ingest/clone_postgresql_table.py`:
- Around line 14-20: The current type-mapping block in clone_postgresql_table.py
uses multiple single-line if statements (e.g., "if 'int' in t: return 'bigint'")
which triggers E701; expand each conditional into its own multi-line if
statement with the condition on one line and the return on the next, remove any
trailing spaces (e.g., after 'text'), and ensure consistent spacing/indentation
for all cases (int→bigint, bool→bool, json→text, float/numeric/double→double,
timestamp→timestamp, date→date, default return 'text') so the linter passes.
- Line 92: Change the except clause that currently binds an unused variable (the
line with "except Exception as e") so it no longer assigns the unused name;
locate the try/except block in clone_postgresql_table.py and replace the "except
Exception as e" clause with an except that does not bind the exception (or binds
it to _ if you intend to ignore it), leaving the existing exception handling
body unchanged.
- Line 9: The BASE_URI constant currently contains hardcoded DB credentials;
replace this with a runtime-configured value sourced from environment/config
secrets (e.g., read DATABASE_URL or separate DB_USER/DB_PASS/DB_HOST/DB_NAME env
vars) and remove the plaintext credentials from BASE_URI; update any code
referencing BASE_URI to use the new env-derived value and, if your project uses
a secrets manager or config loader, integrate with that instead of embedding
credentials in the file.
- Line 57: The query built with f'SELECT * FROM "{schema}"."{t_name}"' uses
string interpolation for SQL identifiers and should be replaced with a safe
identifier-aware API; locate the query assignment (variables schema, t_name,
query) and change it to use SQLAlchemy's Table/metadata reflection or
pandas.read_sql_table so identifiers are quoted/handled properly (e.g., reflect
or construct a sqlalchemy.schema.Table for t_name+schema and issue a
select(table) or use pandas.read_sql_table with schema argument) instead of
formatting identifiers into a raw SQL string.
---
Nitpick comments:
In `@warehouses/fase/ingest/clone_oracle_all_table.sh`:
- Around line 38-45: Replace the indirect exit-code check for the clone command
with a direct test of the command itself: run python3 ./clone_oracle_table.py
"$SCHEMA" all $VIEW_FLAG inside an if-statement (if python3
./clone_oracle_table.py "$SCHEMA" all $VIEW_FLAG; then ... else ... fi) so you
don't rely on a separate $? read; update the success/failure echo lines that
reference $SCHEMA and $INCLUDE_VIEWS accordingly to keep behavior identical and
avoid shellcheck SC2181 warnings.
In `@warehouses/fase/ingest/clone_oracle_table.py`:
- Around line 62-68: The function disable_dlt_identifiers_if_no_pk misleads
readers because it only clears the primary_key hint; rename the function to
clear_primary_key_if_absent (or similar) and update all references/calls
accordingly; inside the function keep the existing logic that checks columns for
a primary_key and calls resource.apply_hints(primary_key=[]) when none exists,
but change the function name declaration and any imports/usages to the new
identifier to maintain consistency.
In `@warehouses/fase/ingest/clone_postgresql_table.py`:
- Line 31: The current check lowercases target_table so a real table named
"all"/"All"/"ALL" is treated as the special keyword; change the logic in the
ingestion selection so it uses an exact, case-sensitive sentinel (e.g., compare
target_table == 'all' without lowercasing) or better yet require an explicit
flag (e.g., --all) and treat target_table literally; update the code paths that
build tables (the expression assigning tables, and any use of target_table,
inspector.get_table_names, schema) to respect the new sentinel behavior and
update CLI/help text to document that "all" is reserved or that --all must be
used to ingest every table.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 8f797a6a-dad2-4828-b58b-fbbeb7e362d5
📒 Files selected for processing (3)
warehouses/fase/ingest/clone_oracle_all_table.shwarehouses/fase/ingest/clone_oracle_table.pywarehouses/fase/ingest/clone_postgresql_table.py
| @@ -0,0 +1,180 @@ | |||
| #!/usr/bin/env python3 | |||
There was a problem hiding this comment.
Fix formatting issues flagged by pre-commit hooks across both files.
Both clone_oracle_table.py and clone_oracle_all_table.sh triggered pre-commit hook failures for trailing whitespace and end-of-file formatting. The hooks have automatically applied fixes. Please ensure you commit the corrected versions to resolve the pipeline failures.
🧰 Tools
🪛 GitHub Actions: Static checks / 0_static checks.txt
[error] 1-1: pre-commit hook trailing-whitespace failed (exit code 1). Trailing whitespace detected and fixes were applied.
[error] 1-1: pre-commit hook end-of-file-fixer failed (exit code 1). End-of-file formatting detected and fixes were applied.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@warehouses/fase/ingest/clone_oracle_table.py` at line 1, Remove trailing
whitespace and normalize end-of-file newlines in both files to satisfy
pre-commit hooks: open warehouses/fase/ingest/clone_oracle_table.py (starting at
the shebang "#!/usr/bin/env python3") and remove any trailing spaces on lines
and ensure a single newline at EOF, and do the same for
clone_oracle_all_table.sh so the pre-commit fixes are committed and the pipeline
passes.
Source: Pipeline failures
| def robust_data_mapper(item): | ||
| date_keywords = ['DATE', 'TIME', 'TIMESTAMP'] | ||
|
|
||
| if 'PAYLOAD' in item or 'payload' in item: | ||
| key = 'PAYLOAD' if 'PAYLOAD' in item else 'payload' | ||
| val = item[key] | ||
| if val is None: | ||
| item[key] = "" | ||
|
|
||
| for key, value in item.items(): | ||
| if isinstance(value, str) and any(kw in key.upper() for kw in date_keywords): | ||
| try: | ||
| item[key] = dateutil.parser.parse(value) | ||
| except (ValueError, TypeError): | ||
| pass | ||
| return item |
There was a problem hiding this comment.
Pattern-based date parsing may produce false positives.
The function uses keyword matching (DATE, TIME, TIMESTAMP in column name) to decide which string values to parse as datetimes. This heuristic may incorrectly attempt to parse columns like UPDATE_TIMESTAMP_STRING that are intentionally strings. Whilst the try/except provides a safe fallback, consider verifying the actual column data type from schema metadata rather than relying solely on naming conventions.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@warehouses/fase/ingest/clone_oracle_table.py` around lines 42 - 57, The
robust_data_mapper currently infers datetime fields by column name
(date_keywords) which causes false positives; update robust_data_mapper to
prefer explicit schema/type metadata when available (e.g., accept a schema dict
or column type map) and only apply dateutil.parser.parse for keys whose schema
type indicates a DATE/TIME/TIMESTAMP; keep the existing try/except fallback for
safety and preserve the PAYLOAD/payload handling. Ensure you reference and use
the date_keywords, robust_data_mapper function signature, and the item keys to
locate where to add the schema lookups and conditional parsing.
| if data_type in ("decimal", "double", "float"): | ||
| hint["data_type"] = "double" |
There was a problem hiding this comment.
Mapping decimal to double may lose precision.
Converting decimal types to double (floating-point) can introduce rounding errors and lose precision, particularly problematic for financial data or exact numeric calculations. Oracle NUMBER columns with specific precision/scale should preserve their exact representation. Consider mapping decimals to a fixed-precision type or preserving the decimal type if supported by Iceberg.
🔢 Proposed fix to preserve decimal precision
- if data_type in ("decimal", "double", "float"):
+ if data_type in ("double", "float"):
hint["data_type"] = "double"
+ # Preserve decimal types to avoid precision loss🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@warehouses/fase/ingest/clone_oracle_table.py` around lines 100 - 101, The
code currently maps "decimal" (and "double","float") to "double", which loses
precision; update the mapping in clone_oracle_table.py so "decimal"/Oracle
NUMBER preserves its exact representation by setting hint["data_type"] =
"decimal" and populating hint["precision"] and hint["scale"] from the column
metadata (e.g., the column/column_metadata object you use when inspecting
columns); only fall back to "double" if precision/scale are unavailable and
document that fallback, and ensure any downstream function that reads hint
(e.g., the code that emits Iceberg/target schemas) respects the precision/scale
fields.
| oracle_uri = ( | ||
| f"oracle+cx_oracle://{args.target_schema}:pa55w0rdTolocalDB" | ||
| "@localhost:1521/?service_name=xepdb" | ||
| ) |
There was a problem hiding this comment.
Critical: Remove hardcoded credentials.
The password pa55w0rdTolocalDB is hardcoded in plaintext in the connection string. This is a critical security vulnerability that exposes credentials in source control and logs. Even for local development, credentials must be externalised to environment variables, secrets managers, or secure configuration files.
🔐 Proposed fix using environment variables
+ oracle_password = os.environ.get("ORACLE_PASSWORD")
+ if not oracle_password:
+ raise ValueError("ORACLE_PASSWORD environment variable must be set")
+
oracle_uri = (
- f"oracle+cx_oracle://{args.target_schema}:pa55w0rdTolocalDB"
+ f"oracle+cx_oracle://{args.target_schema}:{oracle_password}"
"`@localhost`:1521/?service_name=xepdb"
)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@warehouses/fase/ingest/clone_oracle_table.py` around lines 152 - 155, The
oracle_uri currently contains a hardcoded plaintext password; update the
construction of oracle_uri to read the DB password from a secure source (e.g.,
an environment variable or secrets manager) instead of embedding
"pa55w0rdTolocalDB", using args.target_schema for the username as before; add a
guard to fail fast with a clear error if the password env var (or secret lookup)
is missing and ensure no sensitive value is logged or committed to source
control. Reference: the oracle_uri variable and args.target_schema in
clone_oracle_table.py.
| from sqlalchemy import create_engine, inspect | ||
| from elt_common.dlt_destinations import pyiceberg | ||
|
|
||
| BASE_URI = "postgresql://duouser:duopassword@localhost:5432/duo" |
There was a problem hiding this comment.
Hardcoded database credentials expose a critical security risk.
The connection URI contains plaintext username and password that will be committed to version control and visible in repository history. Credentials must never be hardcoded.
🔒 Proposed fix using environment variables
+import os
+
-BASE_URI = "postgresql://duouser:duopassword@localhost:5432/duo"
+BASE_URI = os.environ.get(
+ "POSTGRES_URI",
+ "postgresql://localhost:5432/duo" # Default without credentials for local dev
+)Alternatively, integrate with the project's existing credential management system if available.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| BASE_URI = "postgresql://duouser:duopassword@localhost:5432/duo" | |
| import os | |
| BASE_URI = os.environ.get( | |
| "POSTGRES_URI", | |
| "postgresql://localhost:5432/duo" # Default without credentials for local dev | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@warehouses/fase/ingest/clone_postgresql_table.py` at line 9, The BASE_URI
constant currently contains hardcoded DB credentials; replace this with a
runtime-configured value sourced from environment/config secrets (e.g., read
DATABASE_URL or separate DB_USER/DB_PASS/DB_HOST/DB_NAME env vars) and remove
the plaintext credentials from BASE_URI; update any code referencing BASE_URI to
use the new env-derived value and, if your project uses a secrets manager or
config loader, integrate with that instead of embedding credentials in the file.
| if 'int' in t: return 'bigint' | ||
| if 'bool' in t: return 'bool' | ||
| if 'json' in t: return 'text' | ||
| if 'float' in t or 'numeric' in t or 'double' in t: return 'double' | ||
| if 'timestamp' in t: return 'timestamp' | ||
| if 'date' in t: return 'date' | ||
| return 'text' |
There was a problem hiding this comment.
Fix formatting violations blocking CI.
Lines 14–19 violate E701 (multiple statements on one line), causing static checks to fail.
🔧 Proposed fix to expand single-line conditionals
- if 'int' in t: return 'bigint'
- if 'bool' in t: return 'bool'
- if 'json' in t: return 'text'
- if 'float' in t or 'numeric' in t or 'double' in t: return 'double'
- if 'timestamp' in t: return 'timestamp'
- if 'date' in t: return 'date'
+ if 'int' in t:
+ return 'bigint'
+ if 'bool' in t:
+ return 'bool'
+ if 'json' in t:
+ return 'text'
+ if 'float' in t or 'numeric' in t or 'double' in t:
+ return 'double'
+ if 'timestamp' in t:
+ return 'timestamp'
+ if 'date' in t:
+ return 'date'
return 'text'🧰 Tools
🪛 GitHub Actions: Static checks / 0_static checks.txt
[error] 14-14: ruff check (E701): Multiple statements on one line (colon) at if 'int' in t: return 'bigint'.
[error] 15-15: ruff check (E701): Multiple statements on one line (colon) at if 'bool' in t: return 'bool'.
[error] 16-16: ruff check (E701): Multiple statements on one line (colon) at if 'json' in t: return 'text'.
[error] 17-17: ruff check (E701): Multiple statements on one line (colon) at if 'float' in t or 'numeric' in t or 'double' in t: return 'double'.
[error] 18-18: ruff check (E701): Multiple statements on one line (colon) at if 'timestamp' in t: return 'timestamp'.
[error] 19-19: ruff check (E701): Multiple statements on one line (colon) at if 'date' in t: return 'date'.
🪛 GitHub Actions: Static checks / static checks
[error] 14-14: ruff (E701) Multiple statements on one line (colon) at: if 'int' in t: return 'bigint'
[error] 15-15: ruff (E701) Multiple statements on one line (colon) at: if 'bool' in t: return 'bool'
[error] 16-16: ruff (E701) Multiple statements on one line (colon) at: if 'json' in t: return 'text'
[error] 17-17: ruff (E701) Multiple statements on one line (colon) at: if 'float' in t or 'numeric' in t or 'double' in t: return 'double'
[error] 18-18: ruff (E701) Multiple statements on one line (colon) at: if 'timestamp' in t: return 'timestamp'
[error] 19-19: ruff (E701) Multiple statements on one line (colon) at: if 'date' in t: return 'date'
🪛 Ruff (0.15.15)
[error] 14-14: Multiple statements on one line (colon)
(E701)
[error] 15-15: Multiple statements on one line (colon)
(E701)
[error] 16-16: Multiple statements on one line (colon)
(E701)
[error] 17-17: Multiple statements on one line (colon)
(E701)
[error] 18-18: Multiple statements on one line (colon)
(E701)
[error] 19-19: Multiple statements on one line (colon)
(E701)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@warehouses/fase/ingest/clone_postgresql_table.py` around lines 14 - 20, The
current type-mapping block in clone_postgresql_table.py uses multiple
single-line if statements (e.g., "if 'int' in t: return 'bigint'") which
triggers E701; expand each conditional into its own multi-line if statement with
the condition on one line and the return on the next, remove any trailing spaces
(e.g., after 'text'), and ensure consistent spacing/indentation for all cases
(int→bigint, bool→bool, json→text, float/numeric/double→double,
timestamp→timestamp, date→date, default return 'text') so the linter passes.
Sources: Linters/SAST tools, Pipeline failures
| table_format="iceberg" | ||
| ) | ||
| def load_resource(): | ||
| query = f'SELECT * FROM "{schema}"."{t_name}"' |
There was a problem hiding this comment.
Avoid string interpolation for SQL identifiers.
The f-string query construction is flagged as a potential SQL injection vector (S608). Whilst schema and t_name originate from SQLAlchemy's inspector (a controlled source), this pattern is fragile and could introduce risk if the code is refactored.
🛡️ Proposed fix using SQLAlchemy text with bound parameters or proper quoting
+from sqlalchemy import text
+
- query = f'SELECT * FROM "{schema}"."{t_name}"'
- df = pd.read_sql(query, engine)
+ # Use SQLAlchemy text and proper identifier binding
+ query = text(f'SELECT * FROM "{schema}"."{t_name}"')
+ df = pd.read_sql(query, engine)Alternatively, use pandas read_sql_table for safer identifier handling:
- query = f'SELECT * FROM "{schema}"."{t_name}"'
- df = pd.read_sql(query, engine)
+ df = pd.read_sql_table(t_name, engine, schema=schema)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| query = f'SELECT * FROM "{schema}"."{t_name}"' | |
| df = pd.read_sql_table(t_name, engine, schema=schema) |
🧰 Tools
🪛 Ruff (0.15.15)
[error] 57-57: Possible SQL injection vector through string-based query construction
(S608)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@warehouses/fase/ingest/clone_postgresql_table.py` at line 57, The query built
with f'SELECT * FROM "{schema}"."{t_name}"' uses string interpolation for SQL
identifiers and should be replaced with a safe identifier-aware API; locate the
query assignment (variables schema, t_name, query) and change it to use
SQLAlchemy's Table/metadata reflection or pandas.read_sql_table so identifiers
are quoted/handled properly (e.g., reflect or construct a
sqlalchemy.schema.Table for t_name+schema and issue a select(table) or use
pandas.read_sql_table with schema argument) instead of formatting identifiers
into a raw SQL string.
Source: Linters/SAST tools
| try: | ||
| info = pipeline.run(postgres_dynamic_source(engine, args.schema, args.table)) | ||
| print(info) | ||
| except Exception as e: |
There was a problem hiding this comment.
Remove unused exception variable to fix CI.
The exception variable e is assigned but never used (F841), causing static checks to fail.
🔧 Proposed fix
- except Exception as e:
+ except Exception:
import traceback
traceback.print_exc()
sys.exit(1)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| except Exception as e: | |
| except Exception: | |
| import traceback | |
| traceback.print_exc() | |
| sys.exit(1) |
🧰 Tools
🪛 GitHub Actions: Static checks / 0_static checks.txt
[error] 92-92: ruff check (F841): Local variable e is assigned to but never used in except Exception as e:.
🪛 GitHub Actions: Static checks / static checks
[error] 92-92: ruff (F841) Local variable e is assigned to but never used. Remove assignment to unused variable e.
🪛 Ruff (0.15.15)
[warning] 92-92: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@warehouses/fase/ingest/clone_postgresql_table.py` at line 92, Change the
except clause that currently binds an unused variable (the line with "except
Exception as e") so it no longer assigns the unused name; locate the try/except
block in clone_postgresql_table.py and replace the "except Exception as e"
clause with an except that does not bind the exception (or binds it to _ if you
intend to ignore it), leaving the existing exception handling body unchanged.
Sources: Linters/SAST tools, Pipeline failures
ref #340
Description
This PR implements the core ingestion pipeline to move FASE application data into the lakehouse. It establishes the end-to-end data flow by extracting data from both Oracle and PostgreSQL sources and loading it into the Iceberg destination.
Scope of Work
Follow-Up Work
Summary by CodeRabbit
Release Notes