Skip to content

Feat: Implement FASE data ingestion pipeline (Oracle & Postgres to Iceberg)#341

Open
bashanlam wants to merge 1 commit into
mainfrom
ingest-local-fase-db
Open

Feat: Implement FASE data ingestion pipeline (Oracle & Postgres to Iceberg)#341
bashanlam wants to merge 1 commit into
mainfrom
ingest-local-fase-db

Conversation

@bashanlam

@bashanlam bashanlam commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

ref #340

Description
This PR implements the core ingestion pipeline to move FASE application data into the lakehouse. It establishes the end-to-end data flow by extracting data from both Oracle and PostgreSQL sources and loading it into the Iceberg destination.

Scope of Work

  • Build Oracle DB extraction pipeline: Implemented extraction logic for Oracle database sources using the dlt framework.
  • Build PostgreSQL DB extraction pipeline: Implemented extraction logic for PostgreSQL database sources.
  • Configure Iceberg destination loading: Configured the target pipeline to securely and efficiently stream data into the Apache Iceberg lakehouse destination.

Follow-Up Work

Summary by CodeRabbit

Release Notes

  • New Features
    • Added Oracle table cloning capability to ingest data into the data warehouse, with support for including views
    • Added PostgreSQL table ingestion into the data warehouse with automatic type mapping
    • Enabled batch processing of multiple Oracle schemas in a single operation
    • Implemented automated timezone handling and data type conversion for Oracle sources

@bashanlam bashanlam requested a review from martyngigg June 10, 2026 10:47
@bashanlam bashanlam requested a review from a team as a code owner June 10, 2026 10:47
@coderabbitai

coderabbitai Bot commented Jun 10, 2026

Copy link
Copy Markdown
Contributor
📝 Walkthrough

Walkthrough

This PR adds three new ingestion scripts to the FASE warehouse ingest directory. The clone_oracle_table.py script ingests Oracle database tables into a DLT pipeline with PyIceberg destination, featuring custom type handling, timezone configuration, and data normalisation. A complementary Bash wrapper clone_oracle_all_table.sh orchestrates batch processing across multiple schemas. The clone_postgresql_table.py script provides similar functionality for PostgreSQL sources with type mapping and JSON column handling.

Changes

Oracle Table Ingestion to Iceberg

Layer / File(s) Summary
Oracle environment and CLI configuration
warehouses/fase/ingest/clone_oracle_table.py
Global timezone override via ORA_SDTZ=UTC, imports for DLT/SQLAlchemy/cx_Oracle, and CLI argument parsing for target schema/table, state reset, and views inclusion.
Oracle type and data normalisation
warehouses/fase/ingest/clone_oracle_table.py
Custom cx_Oracle output type handler normalises CLOB/BLOB/datetime types; data mapper converts null payloads to empty strings and parses date/time strings; column-level hints set nullability based on primary keys and remap type names (date/time keywords to timestamp, float-like numerics to double).
DLT Oracle source and pipeline execution
warehouses/fase/ingest/clone_oracle_table.py
SQLAlchemy Oracle engine with NullPool, on-connect session configuration for NLS date/timestamp formats, @dlt.source integrating the sql_database source with optional view inclusion and applying normalisation functions to all resources; pipeline instantiation with optional state reset and write_disposition="replace".
Multi-schema batch orchestration
warehouses/fase/ingest/clone_oracle_all_table.sh
Bash script iterating over hardcoded schema entries, conditionally enabling --include-views flag, invoking the Python script per schema, and reporting per-schema success/failure status with start/end banners.

PostgreSQL Table Ingestion to Iceberg

Layer / File(s) Summary
PostgreSQL type mapping and table metadata
warehouses/fase/ingest/clone_postgresql_table.py
Fixed PostgreSQL connection URI; type mapping from SQLAlchemy types to DLT core types with JSON detection; SQLAlchemy inspection helpers to fetch column metadata and identify JSON-formatted columns per schema/table.
DLT PostgreSQL source and pipeline execution
warehouses/fase/ingest/clone_postgresql_table.py
Dynamic @dlt.source selecting single or all tables in a schema, constructs per-table Iceberg resources with pandas DataFrame extraction, converts detected JSON columns to JSON strings, emits dummy row for empty tables to ensure creation; CLI parsing for schema and table, pipeline instantiation with dataset naming tied to schema, run execution with error handling and exit code 1 on exception.

Poem

🐰 From Oracle's vault and Postgres deep,
Data flows where Iceberg tables sleep,
Schemas gathered, types transformed with care,
Batch and single, ingestion everywhere!
New pathways carved through the warehouse night,
Where DLT pipelines shine so bright! ✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly and accurately summarizes the main change: implementing a FASE data ingestion pipeline from Oracle and PostgreSQL sources to Iceberg destination.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@bashanlam bashanlam self-assigned this Jun 10, 2026

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

🧹 Nitpick comments (3)
warehouses/fase/ingest/clone_oracle_table.py (1)

62-68: 💤 Low value

Consider renaming for clarity.

The function name disable_dlt_identifiers_if_no_pk suggests it disables multiple identifiers, but the implementation only clears the primary_key hint. Consider renaming to clear_primary_key_if_absent or similar to better reflect its single responsibility.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_oracle_table.py` around lines 62 - 68, The
function disable_dlt_identifiers_if_no_pk misleads readers because it only
clears the primary_key hint; rename the function to clear_primary_key_if_absent
(or similar) and update all references/calls accordingly; inside the function
keep the existing logic that checks columns for a primary_key and calls
resource.apply_hints(primary_key=[]) when none exists, but change the function
name declaration and any imports/usages to the new identifier to maintain
consistency.
warehouses/fase/ingest/clone_oracle_all_table.sh (1)

38-45: ⚡ Quick win

Check exit code directly to improve robustness.

The script captures $? and tests it indirectly. Directly testing the command improves readability and handles edge cases where $? might be unexpectedly modified. This addresses shellcheck SC2181.

♻️ Proposed fix
-    # Execute the command
-    python3 ./clone_oracle_table.py "$SCHEMA" all $VIEW_FLAG
-    
-    # Check if the previous command succeeded
-    if [ $? -eq 0 ]; then
+    # Execute the command and check result
+    if python3 ./clone_oracle_table.py "$SCHEMA" all $VIEW_FLAG; then
         echo "✅ COMPLETED: $SCHEMA (Include Views: $INCLUDE_VIEWS)"
     else
         echo "❌ FAILED: $SCHEMA"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_oracle_all_table.sh` around lines 38 - 45,
Replace the indirect exit-code check for the clone command with a direct test of
the command itself: run python3 ./clone_oracle_table.py "$SCHEMA" all $VIEW_FLAG
inside an if-statement (if python3 ./clone_oracle_table.py "$SCHEMA" all
$VIEW_FLAG; then ... else ... fi) so you don't rely on a separate $? read;
update the success/failure echo lines that reference $SCHEMA and $INCLUDE_VIEWS
accordingly to keep behavior identical and avoid shellcheck SC2181 warnings.

Source: Linters/SAST tools

warehouses/fase/ingest/clone_postgresql_table.py (1)

31-31: 💤 Low value

Case-insensitive "all" check may conflict with actual table names.

The check target_table.lower() != 'all' prevents ingesting a table literally named "all", "All", or "ALL". Consider an exact match or a reserved keyword pattern instead.

♻️ Proposed fix using case-sensitive check
-    tables = [target_table] if target_table.lower() != 'all' else inspector.get_table_names(schema=schema)
+    tables = [target_table] if target_table != 'all' else inspector.get_table_names(schema=schema)

Document the reserved keyword "all" in CLI help text.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_postgresql_table.py` at line 31, The current
check lowercases target_table so a real table named "all"/"All"/"ALL" is treated
as the special keyword; change the logic in the ingestion selection so it uses
an exact, case-sensitive sentinel (e.g., compare target_table == 'all' without
lowercasing) or better yet require an explicit flag (e.g., --all) and treat
target_table literally; update the code paths that build tables (the expression
assigning tables, and any use of target_table, inspector.get_table_names,
schema) to respect the new sentinel behavior and update CLI/help text to
document that "all" is reserved or that --all must be used to ingest every
table.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@warehouses/fase/ingest/clone_oracle_table.py`:
- Around line 42-57: The robust_data_mapper currently infers datetime fields by
column name (date_keywords) which causes false positives; update
robust_data_mapper to prefer explicit schema/type metadata when available (e.g.,
accept a schema dict or column type map) and only apply dateutil.parser.parse
for keys whose schema type indicates a DATE/TIME/TIMESTAMP; keep the existing
try/except fallback for safety and preserve the PAYLOAD/payload handling. Ensure
you reference and use the date_keywords, robust_data_mapper function signature,
and the item keys to locate where to add the schema lookups and conditional
parsing.
- Line 1: Remove trailing whitespace and normalize end-of-file newlines in both
files to satisfy pre-commit hooks: open
warehouses/fase/ingest/clone_oracle_table.py (starting at the shebang
"#!/usr/bin/env python3") and remove any trailing spaces on lines and ensure a
single newline at EOF, and do the same for clone_oracle_all_table.sh so the
pre-commit fixes are committed and the pipeline passes.
- Around line 100-101: The code currently maps "decimal" (and "double","float")
to "double", which loses precision; update the mapping in clone_oracle_table.py
so "decimal"/Oracle NUMBER preserves its exact representation by setting
hint["data_type"] = "decimal" and populating hint["precision"] and hint["scale"]
from the column metadata (e.g., the column/column_metadata object you use when
inspecting columns); only fall back to "double" if precision/scale are
unavailable and document that fallback, and ensure any downstream function that
reads hint (e.g., the code that emits Iceberg/target schemas) respects the
precision/scale fields.
- Around line 152-155: The oracle_uri currently contains a hardcoded plaintext
password; update the construction of oracle_uri to read the DB password from a
secure source (e.g., an environment variable or secrets manager) instead of
embedding "pa55w0rdTolocalDB", using args.target_schema for the username as
before; add a guard to fail fast with a clear error if the password env var (or
secret lookup) is missing and ensure no sensitive value is logged or committed
to source control. Reference: the oracle_uri variable and args.target_schema in
clone_oracle_table.py.

In `@warehouses/fase/ingest/clone_postgresql_table.py`:
- Around line 14-20: The current type-mapping block in clone_postgresql_table.py
uses multiple single-line if statements (e.g., "if 'int' in t: return 'bigint'")
which triggers E701; expand each conditional into its own multi-line if
statement with the condition on one line and the return on the next, remove any
trailing spaces (e.g., after 'text'), and ensure consistent spacing/indentation
for all cases (int→bigint, bool→bool, json→text, float/numeric/double→double,
timestamp→timestamp, date→date, default return 'text') so the linter passes.
- Line 92: Change the except clause that currently binds an unused variable (the
line with "except Exception as e") so it no longer assigns the unused name;
locate the try/except block in clone_postgresql_table.py and replace the "except
Exception as e" clause with an except that does not bind the exception (or binds
it to _ if you intend to ignore it), leaving the existing exception handling
body unchanged.
- Line 9: The BASE_URI constant currently contains hardcoded DB credentials;
replace this with a runtime-configured value sourced from environment/config
secrets (e.g., read DATABASE_URL or separate DB_USER/DB_PASS/DB_HOST/DB_NAME env
vars) and remove the plaintext credentials from BASE_URI; update any code
referencing BASE_URI to use the new env-derived value and, if your project uses
a secrets manager or config loader, integrate with that instead of embedding
credentials in the file.
- Line 57: The query built with f'SELECT * FROM "{schema}"."{t_name}"' uses
string interpolation for SQL identifiers and should be replaced with a safe
identifier-aware API; locate the query assignment (variables schema, t_name,
query) and change it to use SQLAlchemy's Table/metadata reflection or
pandas.read_sql_table so identifiers are quoted/handled properly (e.g., reflect
or construct a sqlalchemy.schema.Table for t_name+schema and issue a
select(table) or use pandas.read_sql_table with schema argument) instead of
formatting identifiers into a raw SQL string.

---

Nitpick comments:
In `@warehouses/fase/ingest/clone_oracle_all_table.sh`:
- Around line 38-45: Replace the indirect exit-code check for the clone command
with a direct test of the command itself: run python3 ./clone_oracle_table.py
"$SCHEMA" all $VIEW_FLAG inside an if-statement (if python3
./clone_oracle_table.py "$SCHEMA" all $VIEW_FLAG; then ... else ... fi) so you
don't rely on a separate $? read; update the success/failure echo lines that
reference $SCHEMA and $INCLUDE_VIEWS accordingly to keep behavior identical and
avoid shellcheck SC2181 warnings.

In `@warehouses/fase/ingest/clone_oracle_table.py`:
- Around line 62-68: The function disable_dlt_identifiers_if_no_pk misleads
readers because it only clears the primary_key hint; rename the function to
clear_primary_key_if_absent (or similar) and update all references/calls
accordingly; inside the function keep the existing logic that checks columns for
a primary_key and calls resource.apply_hints(primary_key=[]) when none exists,
but change the function name declaration and any imports/usages to the new
identifier to maintain consistency.

In `@warehouses/fase/ingest/clone_postgresql_table.py`:
- Line 31: The current check lowercases target_table so a real table named
"all"/"All"/"ALL" is treated as the special keyword; change the logic in the
ingestion selection so it uses an exact, case-sensitive sentinel (e.g., compare
target_table == 'all' without lowercasing) or better yet require an explicit
flag (e.g., --all) and treat target_table literally; update the code paths that
build tables (the expression assigning tables, and any use of target_table,
inspector.get_table_names, schema) to respect the new sentinel behavior and
update CLI/help text to document that "all" is reserved or that --all must be
used to ingest every table.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 8f797a6a-dad2-4828-b58b-fbbeb7e362d5

📥 Commits

Reviewing files that changed from the base of the PR and between b93399d and bd35ead.

📒 Files selected for processing (3)
  • warehouses/fase/ingest/clone_oracle_all_table.sh
  • warehouses/fase/ingest/clone_oracle_table.py
  • warehouses/fase/ingest/clone_postgresql_table.py

@@ -0,0 +1,180 @@
#!/usr/bin/env python3

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix formatting issues flagged by pre-commit hooks across both files.

Both clone_oracle_table.py and clone_oracle_all_table.sh triggered pre-commit hook failures for trailing whitespace and end-of-file formatting. The hooks have automatically applied fixes. Please ensure you commit the corrected versions to resolve the pipeline failures.

🧰 Tools
🪛 GitHub Actions: Static checks / 0_static checks.txt

[error] 1-1: pre-commit hook trailing-whitespace failed (exit code 1). Trailing whitespace detected and fixes were applied.


[error] 1-1: pre-commit hook end-of-file-fixer failed (exit code 1). End-of-file formatting detected and fixes were applied.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_oracle_table.py` at line 1, Remove trailing
whitespace and normalize end-of-file newlines in both files to satisfy
pre-commit hooks: open warehouses/fase/ingest/clone_oracle_table.py (starting at
the shebang "#!/usr/bin/env python3") and remove any trailing spaces on lines
and ensure a single newline at EOF, and do the same for
clone_oracle_all_table.sh so the pre-commit fixes are committed and the pipeline
passes.

Source: Pipeline failures

Comment on lines +42 to +57
def robust_data_mapper(item):
date_keywords = ['DATE', 'TIME', 'TIMESTAMP']

if 'PAYLOAD' in item or 'payload' in item:
key = 'PAYLOAD' if 'PAYLOAD' in item else 'payload'
val = item[key]
if val is None:
item[key] = ""

for key, value in item.items():
if isinstance(value, str) and any(kw in key.upper() for kw in date_keywords):
try:
item[key] = dateutil.parser.parse(value)
except (ValueError, TypeError):
pass
return item

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Pattern-based date parsing may produce false positives.

The function uses keyword matching (DATE, TIME, TIMESTAMP in column name) to decide which string values to parse as datetimes. This heuristic may incorrectly attempt to parse columns like UPDATE_TIMESTAMP_STRING that are intentionally strings. Whilst the try/except provides a safe fallback, consider verifying the actual column data type from schema metadata rather than relying solely on naming conventions.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_oracle_table.py` around lines 42 - 57, The
robust_data_mapper currently infers datetime fields by column name
(date_keywords) which causes false positives; update robust_data_mapper to
prefer explicit schema/type metadata when available (e.g., accept a schema dict
or column type map) and only apply dateutil.parser.parse for keys whose schema
type indicates a DATE/TIME/TIMESTAMP; keep the existing try/except fallback for
safety and preserve the PAYLOAD/payload handling. Ensure you reference and use
the date_keywords, robust_data_mapper function signature, and the item keys to
locate where to add the schema lookups and conditional parsing.

Comment on lines +100 to +101
if data_type in ("decimal", "double", "float"):
hint["data_type"] = "double"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Mapping decimal to double may lose precision.

Converting decimal types to double (floating-point) can introduce rounding errors and lose precision, particularly problematic for financial data or exact numeric calculations. Oracle NUMBER columns with specific precision/scale should preserve their exact representation. Consider mapping decimals to a fixed-precision type or preserving the decimal type if supported by Iceberg.

🔢 Proposed fix to preserve decimal precision
-        if data_type in ("decimal", "double", "float"):
+        if data_type in ("double", "float"):
             hint["data_type"] = "double"
+        # Preserve decimal types to avoid precision loss
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_oracle_table.py` around lines 100 - 101, The
code currently maps "decimal" (and "double","float") to "double", which loses
precision; update the mapping in clone_oracle_table.py so "decimal"/Oracle
NUMBER preserves its exact representation by setting hint["data_type"] =
"decimal" and populating hint["precision"] and hint["scale"] from the column
metadata (e.g., the column/column_metadata object you use when inspecting
columns); only fall back to "double" if precision/scale are unavailable and
document that fallback, and ensure any downstream function that reads hint
(e.g., the code that emits Iceberg/target schemas) respects the precision/scale
fields.

Comment on lines +152 to +155
oracle_uri = (
f"oracle+cx_oracle://{args.target_schema}:pa55w0rdTolocalDB"
"@localhost:1521/?service_name=xepdb"
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Critical: Remove hardcoded credentials.

The password pa55w0rdTolocalDB is hardcoded in plaintext in the connection string. This is a critical security vulnerability that exposes credentials in source control and logs. Even for local development, credentials must be externalised to environment variables, secrets managers, or secure configuration files.

🔐 Proposed fix using environment variables
+    oracle_password = os.environ.get("ORACLE_PASSWORD")
+    if not oracle_password:
+        raise ValueError("ORACLE_PASSWORD environment variable must be set")
+    
     oracle_uri = (
-        f"oracle+cx_oracle://{args.target_schema}:pa55w0rdTolocalDB"
+        f"oracle+cx_oracle://{args.target_schema}:{oracle_password}"
         "`@localhost`:1521/?service_name=xepdb"
     )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_oracle_table.py` around lines 152 - 155, The
oracle_uri currently contains a hardcoded plaintext password; update the
construction of oracle_uri to read the DB password from a secure source (e.g.,
an environment variable or secrets manager) instead of embedding
"pa55w0rdTolocalDB", using args.target_schema for the username as before; add a
guard to fail fast with a clear error if the password env var (or secret lookup)
is missing and ensure no sensitive value is logged or committed to source
control. Reference: the oracle_uri variable and args.target_schema in
clone_oracle_table.py.

from sqlalchemy import create_engine, inspect
from elt_common.dlt_destinations import pyiceberg

BASE_URI = "postgresql://duouser:duopassword@localhost:5432/duo"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Hardcoded database credentials expose a critical security risk.

The connection URI contains plaintext username and password that will be committed to version control and visible in repository history. Credentials must never be hardcoded.

🔒 Proposed fix using environment variables
+import os
+
-BASE_URI = "postgresql://duouser:duopassword@localhost:5432/duo"
+BASE_URI = os.environ.get(
+    "POSTGRES_URI",
+    "postgresql://localhost:5432/duo"  # Default without credentials for local dev
+)

Alternatively, integrate with the project's existing credential management system if available.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
BASE_URI = "postgresql://duouser:duopassword@localhost:5432/duo"
import os
BASE_URI = os.environ.get(
"POSTGRES_URI",
"postgresql://localhost:5432/duo" # Default without credentials for local dev
)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_postgresql_table.py` at line 9, The BASE_URI
constant currently contains hardcoded DB credentials; replace this with a
runtime-configured value sourced from environment/config secrets (e.g., read
DATABASE_URL or separate DB_USER/DB_PASS/DB_HOST/DB_NAME env vars) and remove
the plaintext credentials from BASE_URI; update any code referencing BASE_URI to
use the new env-derived value and, if your project uses a secrets manager or
config loader, integrate with that instead of embedding credentials in the file.

Comment on lines +14 to +20
if 'int' in t: return 'bigint'
if 'bool' in t: return 'bool'
if 'json' in t: return 'text'
if 'float' in t or 'numeric' in t or 'double' in t: return 'double'
if 'timestamp' in t: return 'timestamp'
if 'date' in t: return 'date'
return 'text'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Fix formatting violations blocking CI.

Lines 14–19 violate E701 (multiple statements on one line), causing static checks to fail.

🔧 Proposed fix to expand single-line conditionals
-    if 'int' in t: return 'bigint'
-    if 'bool' in t: return 'bool'
-    if 'json' in t: return 'text' 
-    if 'float' in t or 'numeric' in t or 'double' in t: return 'double'
-    if 'timestamp' in t: return 'timestamp'
-    if 'date' in t: return 'date'
+    if 'int' in t:
+        return 'bigint'
+    if 'bool' in t:
+        return 'bool'
+    if 'json' in t:
+        return 'text'
+    if 'float' in t or 'numeric' in t or 'double' in t:
+        return 'double'
+    if 'timestamp' in t:
+        return 'timestamp'
+    if 'date' in t:
+        return 'date'
     return 'text'
🧰 Tools
🪛 GitHub Actions: Static checks / 0_static checks.txt

[error] 14-14: ruff check (E701): Multiple statements on one line (colon) at if 'int' in t: return 'bigint'.


[error] 15-15: ruff check (E701): Multiple statements on one line (colon) at if 'bool' in t: return 'bool'.


[error] 16-16: ruff check (E701): Multiple statements on one line (colon) at if 'json' in t: return 'text'.


[error] 17-17: ruff check (E701): Multiple statements on one line (colon) at if 'float' in t or 'numeric' in t or 'double' in t: return 'double'.


[error] 18-18: ruff check (E701): Multiple statements on one line (colon) at if 'timestamp' in t: return 'timestamp'.


[error] 19-19: ruff check (E701): Multiple statements on one line (colon) at if 'date' in t: return 'date'.

🪛 GitHub Actions: Static checks / static checks

[error] 14-14: ruff (E701) Multiple statements on one line (colon) at: if 'int' in t: return 'bigint'


[error] 15-15: ruff (E701) Multiple statements on one line (colon) at: if 'bool' in t: return 'bool'


[error] 16-16: ruff (E701) Multiple statements on one line (colon) at: if 'json' in t: return 'text'


[error] 17-17: ruff (E701) Multiple statements on one line (colon) at: if 'float' in t or 'numeric' in t or 'double' in t: return 'double'


[error] 18-18: ruff (E701) Multiple statements on one line (colon) at: if 'timestamp' in t: return 'timestamp'


[error] 19-19: ruff (E701) Multiple statements on one line (colon) at: if 'date' in t: return 'date'

🪛 Ruff (0.15.15)

[error] 14-14: Multiple statements on one line (colon)

(E701)


[error] 15-15: Multiple statements on one line (colon)

(E701)


[error] 16-16: Multiple statements on one line (colon)

(E701)


[error] 17-17: Multiple statements on one line (colon)

(E701)


[error] 18-18: Multiple statements on one line (colon)

(E701)


[error] 19-19: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_postgresql_table.py` around lines 14 - 20, The
current type-mapping block in clone_postgresql_table.py uses multiple
single-line if statements (e.g., "if 'int' in t: return 'bigint'") which
triggers E701; expand each conditional into its own multi-line if statement with
the condition on one line and the return on the next, remove any trailing spaces
(e.g., after 'text'), and ensure consistent spacing/indentation for all cases
(int→bigint, bool→bool, json→text, float/numeric/double→double,
timestamp→timestamp, date→date, default return 'text') so the linter passes.

Sources: Linters/SAST tools, Pipeline failures

table_format="iceberg"
)
def load_resource():
query = f'SELECT * FROM "{schema}"."{t_name}"'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid string interpolation for SQL identifiers.

The f-string query construction is flagged as a potential SQL injection vector (S608). Whilst schema and t_name originate from SQLAlchemy's inspector (a controlled source), this pattern is fragile and could introduce risk if the code is refactored.

🛡️ Proposed fix using SQLAlchemy text with bound parameters or proper quoting
+from sqlalchemy import text
+
-                query = f'SELECT * FROM "{schema}"."{t_name}"'
-                df = pd.read_sql(query, engine)
+                # Use SQLAlchemy text and proper identifier binding
+                query = text(f'SELECT * FROM "{schema}"."{t_name}"')
+                df = pd.read_sql(query, engine)

Alternatively, use pandas read_sql_table for safer identifier handling:

-                query = f'SELECT * FROM "{schema}"."{t_name}"'
-                df = pd.read_sql(query, engine)
+                df = pd.read_sql_table(t_name, engine, schema=schema)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
query = f'SELECT * FROM "{schema}"."{t_name}"'
df = pd.read_sql_table(t_name, engine, schema=schema)
🧰 Tools
🪛 Ruff (0.15.15)

[error] 57-57: Possible SQL injection vector through string-based query construction

(S608)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_postgresql_table.py` at line 57, The query built
with f'SELECT * FROM "{schema}"."{t_name}"' uses string interpolation for SQL
identifiers and should be replaced with a safe identifier-aware API; locate the
query assignment (variables schema, t_name, query) and change it to use
SQLAlchemy's Table/metadata reflection or pandas.read_sql_table so identifiers
are quoted/handled properly (e.g., reflect or construct a
sqlalchemy.schema.Table for t_name+schema and issue a select(table) or use
pandas.read_sql_table with schema argument) instead of formatting identifiers
into a raw SQL string.

Source: Linters/SAST tools

try:
info = pipeline.run(postgres_dynamic_source(engine, args.schema, args.table))
print(info)
except Exception as e:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Remove unused exception variable to fix CI.

The exception variable e is assigned but never used (F841), causing static checks to fail.

🔧 Proposed fix
-    except Exception as e:
+    except Exception:
         import traceback
         traceback.print_exc()
         sys.exit(1)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
except Exception:
import traceback
traceback.print_exc()
sys.exit(1)
🧰 Tools
🪛 GitHub Actions: Static checks / 0_static checks.txt

[error] 92-92: ruff check (F841): Local variable e is assigned to but never used in except Exception as e:.

🪛 GitHub Actions: Static checks / static checks

[error] 92-92: ruff (F841) Local variable e is assigned to but never used. Remove assignment to unused variable e.

🪛 Ruff (0.15.15)

[warning] 92-92: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@warehouses/fase/ingest/clone_postgresql_table.py` at line 92, Change the
except clause that currently binds an unused variable (the line with "except
Exception as e") so it no longer assigns the unused name; locate the try/except
block in clone_postgresql_table.py and replace the "except Exception as e"
clause with an except that does not bind the exception (or binds it to _ if you
intend to ignore it), leaving the existing exception handling body unchanged.

Sources: Linters/SAST tools, Pipeline failures

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant