Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME")

if not all([DB_USER, DB_PASS, DB_HOST, DB_NAME]):
raise SystemExit("Missing DB credentials in config.env. Please set DB_USER, DB_PASSWORD, DB_HOST, DB_NAME.")

#Shema
#Schema
TARGET_SCHEMA = "final"


Expand All @@ -27,7 +24,7 @@
pool_pre_ping=True,
)

# Helper to prefix CREATE TABLE with schema and table name
# Build a "CREATE TABLE IF NOT EXISTS <schema>.<table> AS" statement around the given SQL body
def wrap_create(schema: str, table: str, body_sql: str) -> str:
# Ensure the body_sql does NOT already contain a CREATE TABLE line.
header = f"CREATE TABLE IF NOT EXISTS {schema}.{table} AS\n"
Expand Down Expand Up @@ -64,15 +61,15 @@ def wrap_create(schema: str, table: str, body_sql: str) -> str:
sm.subject_area,
sm.sub_field
FROM intermediate.student_education se
JOIN LATERAL unnest(se.subject_id) AS unnested_subject(subject_id) ON TRUE
JOIN LATERAL unnest(se.subject_id) AS unnested_subject(subject_id) ON TRUE --unnest() function will create one row for each element in the array subject_id
JOIN intermediate.subject_mapping sm
ON unnested_subject.subject_id = sm.id
JOIN intermediate.course_mapping cm
ON se.education_course_id = cm.course_id
LEFT JOIN intermediate.college_mapping colm
LEFT JOIN intermediate.college_mapping colm -- Left join as we don't have all the colleges listed in our standardized table i.e college_mapping table
ON se.college_id = colm.college_id
LEFT JOIN intermediate.university_mapping um
ON se.university_id = um.university_id
ON se.university_id = um.university_id -- Left join as we don't have all the universities listed in our standardized table i.e university_mapping table
),
aggregated_subjects AS (
SELECT
Expand Down Expand Up @@ -111,9 +108,9 @@ def wrap_create(schema: str, table: str, body_sql: str) -> str:
na.college_name,
na.university_name
FROM student_details sd
LEFT JOIN intermediate.location_mapping lm
LEFT JOIN intermediate.location_mapping lm --INNER JOIN would have excluded INC 1,2,3 records because no match would be found in the standardized file.
ON sd.location_id = lm.location_id
LEFT JOIN student_registration sr
LEFT JOIN student_registration sr --Ensures no data from the student details table is lost, even if there is no corresponding entry in the student_registration table.
ON sd.id = sr.student_id
LEFT JOIN aggregated_subjects asub
ON sd.id = asub.student_id
Expand All @@ -126,7 +123,6 @@ def wrap_create(schema: str, table: str, body_sql: str) -> str:
WITH cohort_range AS (
SELECT start_date, end_date
FROM intermediate.cohort
--WHERE cohort_code = 'INC007'
),

live_sessions AS (
Expand Down Expand Up @@ -179,15 +175,15 @@ def wrap_create(schema: str, table: str, body_sql: str) -> str:
sm.subject_area,
sm.sub_field
FROM intermediate.student_education se
JOIN LATERAL unnest(se.subject_id) AS unnested_subject(subject_id) ON TRUE
JOIN LATERAL unnest(se.subject_id) AS unnested_subject(subject_id) ON TRUE --unnest() function will create one row for each element in the array subject_id
JOIN intermediate.subject_mapping sm
ON unnested_subject.subject_id = sm.id
JOIN intermediate.course_mapping cm
ON se.education_course_id = cm.course_id
LEFT JOIN intermediate.college_mapping colm
LEFT JOIN intermediate.college_mapping colm -- Left join as we don't have all the colleges listed in our standardized table i.e college_mapping table
ON se.college_id = colm.college_id
LEFT JOIN intermediate.university_mapping um
ON se.university_id = um.university_id
LEFT JOIN intermediate.university_mapping um -- Left join as we don't have all the universities listed in our standardized table i.e university_mapping table
ON se.university_id = um.university_id
),

aggregated_subjects AS (
Expand Down Expand Up @@ -233,9 +229,9 @@ def wrap_create(schema: str, table: str, body_sql: str) -> str:
na.college_name,
na.university_name
FROM student_attendance sa
LEFT JOIN intermediate.location_mapping lm
LEFT JOIN intermediate.location_mapping lm --INNER JOIN would have excluded INC 1,2,3 records because no match would be found in the standardized file.
ON sa.location_id = lm.location_id
LEFT JOIN student_registration sr
LEFT JOIN student_registration sr --Ensures no data from the student details table is lost, even if there is no corresponding entry in the student_registration table.
ON sa.student_id = sr.student_id
LEFT JOIN aggregated_subjects asub
ON sa.student_id = asub.student_id
Expand Down Expand Up @@ -294,9 +290,9 @@ def wrap_create(schema: str, table: str, body_sql: str) -> str:
ON unnested_subject.subject_id = sm.id
JOIN intermediate.course_mapping cm
ON se.education_course_id = cm.course_id
LEFT JOIN intermediate.college_mapping colm
LEFT JOIN intermediate.college_mapping colm -- Left join as we don't have all the colleges listed in our standardized table i.e college_mapping table
ON se.college_id = colm.college_id
LEFT JOIN intermediate.university_mapping um
LEFT JOIN intermediate.university_mapping um -- Left join as we don't have all the universities listed in our standardized table i.e university_mapping table
ON se.university_id = um.university_id
),

Expand Down Expand Up @@ -345,9 +341,9 @@ def wrap_create(schema: str, table: str, body_sql: str) -> str:
na.college_name,
na.university_name
FROM student_assignment ss
LEFT JOIN intermediate.location_mapping lm
LEFT JOIN intermediate.location_mapping lm --INNER JOIN would have excluded INC 1,2,3 records because no match would be found in the standardized file.
ON ss.location_id = lm.location_id
LEFT JOIN student_registration sr
LEFT JOIN student_registration sr --Ensures no data from the student details table is lost, even if there is no corresponding entry in the student_registration table.
ON ss.student_id = sr.student_id
LEFT JOIN aggregated_subjects asub
ON ss.student_id = asub.student_id
Expand Down Expand Up @@ -403,9 +399,9 @@ def wrap_create(schema: str, table: str, body_sql: str) -> str:
ON unnested_subject.subject_id = sm.id
JOIN intermediate.course_mapping cm
ON se.education_course_id = cm.course_id
LEFT JOIN intermediate.college_mapping colm
LEFT JOIN intermediate.college_mapping colm -- Left join as we don't have all the colleges listed in our standardized table i.e college_mapping table
ON se.college_id = colm.college_id
LEFT JOIN intermediate.university_mapping um
LEFT JOIN intermediate.university_mapping um -- Left join as we don't have all the universities listed in our standardized table i.e university_mapping table
ON se.university_id = um.university_id
),

Expand Down Expand Up @@ -455,9 +451,9 @@ def wrap_create(schema: str, table: str, body_sql: str) -> str:
na.college_name,
na.university_name
FROM student_quiz ss
LEFT JOIN intermediate.location_mapping lm
LEFT JOIN intermediate.location_mapping lm --INNER JOIN would have excluded INC 1,2,3 records because no match would be found in the standardized file.
ON ss.location_id = lm.location_id
LEFT JOIN student_registration sr
LEFT JOIN student_registration sr --Ensures no data from the student details table is lost, even if there is no corresponding entry in the student_registration table.
ON ss.student_id = sr.student_id
LEFT JOIN aggregated_subjects asub
ON ss.student_id = asub.student_id
Expand All @@ -466,20 +462,14 @@ def wrap_create(schema: str, table: str, body_sql: str) -> str:
AND asub.education_course_id = na.education_course_id;
""")

# Map desired final table names
# Map each final table name to the SQL body used to build it.
# run() iterates these (table_name, body) pairs in order and passes each one
# to wrap_create() together with TARGET_SCHEMA.
table_map = [
("student_demography", student_demography_body),
("daily_weekly_attendance", student_attendance_body),
("final_assignment", student_assignment_body),
("final_quiz", student_quiz_body),
]

def save_sql_file(schema: str, table: str, sql_text: str) -> None:
    """Write the generated SQL for one table to ``./<schema>_<table>.sql``.

    Args:
        schema: Schema name; forms the first part of the file name.
        table: Table name; forms the second part of the file name.
        sql_text: SQL text written to the file verbatim.

    The path is relative, so the file lands in the current working
    directory. Mode ``"w"`` truncates any existing file of the same name.
    The saved path is printed to stdout.
    """
    fname = f"./{schema}_{table}.sql"
    # Explicit UTF-8 so the output does not depend on the platform locale.
    with open(fname, "w", encoding="utf-8") as f:
        f.write(sql_text)
    print(f"Saved SQL to {fname}")

def drop_table_if_exists(conn, schema: str, table: str):
q = text(f"DROP TABLE IF EXISTS {schema}.{table} CASCADE;")
Expand All @@ -488,7 +478,7 @@ def drop_table_if_exists(conn, schema: str, table: str):

def run():
with engine.begin() as conn:
# ensure target schema exists (create if missing)
# ensure target schema exists
try:
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {TARGET_SCHEMA};"))
print(f"Schema ensured: {TARGET_SCHEMA}")
Expand All @@ -498,7 +488,7 @@ def run():
for table_name, body in table_map:
full_sql = wrap_create(TARGET_SCHEMA, table_name, body)
print(f"\n--- Preparing to create {TARGET_SCHEMA}.{table_name} ---")
save_sql_file(TARGET_SCHEMA, table_name, full_sql)

if DROP_IF_EXISTS:
drop_table_if_exists(conn, TARGET_SCHEMA, table_name)
try:
Expand Down