diff --git a/.github/actions/run_event_validation/action.yml b/.github/actions/run_event_validation/action.yml
index 3cfa928a..e3df1f93 100644
--- a/.github/actions/run_event_validation/action.yml
+++ b/.github/actions/run_event_validation/action.yml
@@ -55,17 +55,22 @@ runs:
       run: |
         cd tmp
         IFS=',' read -ra TAGS <<< "${{ inputs.release_tags }}"
+        git fetch --tags --quiet
         for TAG in "${TAGS[@]}"; do
           echo "Checking out tag: $TAG"
-          git fetch --tags --quiet
           if git checkout --quiet "$TAG"; then
             DEST_DIR="../specs/$TAG"
-            if [ -d "spec" ]; then
-              mkdir -p "../specs/$TAG"
-              find spec -path './website' -prune -o -type f \( -name '*Facet.json' -o -name 'OpenLineage.json' \) -exec cp {} "../specs/$TAG/" \;
+            if [[ -d "spec" || -d "integration/common/src/openlineage" ]]; then
+              mkdir -p "$DEST_DIR"
+              if [ -d "spec" ]; then
+                find spec -path './website' -prune -o -type f \( -name '*Facet.json' -o -name 'OpenLineage.json' \) -exec cp {} "$DEST_DIR" \;
+              fi
+              if [ -d "integration/common/src/openlineage" ]; then
+                find integration/common/src/openlineage -type f -iname '*facet.json' -exec cp {} "$DEST_DIR" \;
+              fi
               echo "success"
             else
-              echo "Spec directory not found in $TAG"
+              echo "Neither spec nor integration/common/src/openlineage directory found in $TAG"
             fi
           else
             echo "Tag $TAG not found!"
diff --git a/.github/workflows/main_new_release.yml b/.github/workflows/main_new_release.yml
index 1958c959..fe81f222 100644
--- a/.github/workflows/main_new_release.yml
+++ b/.github/workflows/main_new_release.yml
@@ -17,6 +17,10 @@ on:
         description: 'Run Hive Dataproc tests'
         required: false
         default: 'true'
+      run_dbt:
+        description: 'Run dbt tests'
+        required: false
+        default: 'true'
       openlineage_release:
         description: 'Override OpenLineage release version'
         required: false
@@ -26,6 +30,9 @@ on:
       hive_matrix:
         description: 'Overwrite matrix for hive tests'
         required: false
+      dbt_matrix:
+        description: 'Overwrite matrix for dbt tests'
+        required: false
 
 permissions:
   id-token: write
@@ -40,9 +47,11 @@ jobs:
       run_dataplex: ${{ github.event.inputs.run_dataplex || 'true' }}
       run_spark_dataproc: ${{ github.event.inputs.run_spark_dataproc || 'true' }}
       run_hive_dataproc: ${{ github.event.inputs.run_hive_dataproc || 'true' }}
+      run_dbt: ${{ github.event.inputs.run_dbt || 'true' }}
      openlineage_release: ${{ github.event.inputs.openlineage_release || steps.select-components.outputs.ol_release }}
       spark_matrix: ${{ github.event.inputs.spark_matrix || steps.set-matrix-values.outputs.spark_dataproc_matrix }}
       hive_matrix: ${{ github.event.inputs.hive_matrix || steps.set-matrix-values.outputs.hive_dataproc_matrix }}
+      dbt_matrix: ${{ github.event.inputs.dbt_matrix || steps.set-matrix-values.outputs.dbt_matrix }}
       execution_time: ${{ steps.get-execution-time.outputs.execution_time }}
     steps:
       - name: Get execution time
@@ -90,6 +99,7 @@ jobs:
 
           echo "spark_dataproc_matrix=$(get_matrix spark_dataproc)" >> $GITHUB_OUTPUT
           echo "hive_dataproc_matrix=$(get_matrix hive_dataproc)" >> $GITHUB_OUTPUT
+          echo "dbt_matrix=$(get_matrix dbt)" >> $GITHUB_OUTPUT
 
 ######## COMPONENT VALIDATION ########
 
@@ -130,6 +140,17 @@ jobs:
       component_release: ${{ matrix.component_version }}
       get-latest-snapshots: 'false'
 
+  dbt:
+    needs: initialize_workflow
+    if: ${{ needs.initialize_workflow.outputs.run_dbt == 'true' }}
+    uses: ./.github/workflows/producer_dbt.yml
+    strategy:
+      matrix: ${{ fromJson(needs.initialize_workflow.outputs.dbt_matrix) }}
+    with:
+      ol_release: ${{ matrix.openlineage_versions }}
+      dbt_release: ${{ matrix.component_version }}
+      get-latest-snapshots: 'false'
+
 ######## COLLECTION OF REPORTS AND EXECUTE APPROPRIATE ACTIONS ########
 
   collect-and-compare-reports:
@@ -138,6 +159,7 @@ jobs:
       - dataplex
       - spark-dataproc
       - hive-dataproc
+      - dbt
     if: ${{ !failure() }}
     uses: ./.github/workflows/collect_and_compare_reports.yml
diff --git a/.github/workflows/main_ol_spec_changes.yml b/.github/workflows/main_ol_spec_changes.yml
index 69fab929..18cbe31b 100644
--- a/.github/workflows/main_ol_spec_changes.yml
+++ b/.github/workflows/main_ol_spec_changes.yml
@@ -19,6 +19,9 @@ on:
       hive_matrix:
         description: 'Overwrite matrix for hive tests'
         required: false
+      dbt_matrix:
+        description: 'Overwrite matrix for dbt tests'
+        required: false
 
 permissions:
@@ -35,6 +38,7 @@ jobs:
       ol_release: ${{ github.event.inputs.openlineage_release || steps.get-release.outputs.openlineage_release }}
       spark_matrix: ${{ github.event.inputs.spark_matrix || steps.set-matrix-values.outputs.spark_dataproc_matrix }}
       hive_matrix: ${{ github.event.inputs.hive_matrix || steps.set-matrix-values.outputs.hive_dataproc_matrix }}
+      dbt_matrix: ${{ github.event.inputs.dbt_matrix || steps.set-matrix-values.outputs.dbt_matrix }}
       execution_time: ${{ steps.get-execution-time.outputs.execution_time }}
     steps:
       - name: Get execution time
@@ -108,6 +112,7 @@ jobs:
 
           echo "spark_dataproc_matrix=$(get_matrix spark_dataproc)" >> $GITHUB_OUTPUT
           echo "hive_dataproc_matrix=$(get_matrix hive_dataproc)" >> $GITHUB_OUTPUT
+          echo "dbt_matrix=$(get_matrix dbt)" >> $GITHUB_OUTPUT
 
 ######## COMPONENT VALIDATION ########
 
@@ -154,6 +159,18 @@ jobs:
       component_release: ${{ matrix.component_version }}
       get-latest-snapshots: 'true'
 
+  dbt:
+    needs:
+      - initialize_workflow
+    if: ${{ success() && needs.initialize_workflow.outputs.changes_in_spec == 'true' }}
+    uses: ./.github/workflows/producer_dbt.yml
+    strategy:
+      matrix: ${{ fromJson(needs.initialize_workflow.outputs.dbt_matrix) }}
+    with:
+      ol_release: ${{ matrix.openlineage_versions }}
+      dbt_release: ${{ matrix.component_version }}
+      get-latest-snapshots: 'false'
+
 ######## COLLECTION OF REPORTS AND EXECUTE APPROPRIATE ACTIONS ########
 
   collect-and-compare-reports:
@@ -162,6 +179,7 @@ jobs:
       - scenarios_check
       - spark-dataproc
       - hive-dataproc
+      - dbt
     uses: ./.github/workflows/collect_and_compare_reports.yml
     with:
       fail-for-new-failures: true
diff --git a/.github/workflows/main_pr.yml b/.github/workflows/main_pr.yml
index e52f36af..a379356a 100644
--- a/.github/workflows/main_pr.yml
+++ b/.github/workflows/main_pr.yml
@@ -19,10 +19,12 @@ jobs:
       run_scenarios: ${{ steps.get-changed.outputs.scenarios_changed }}
       run_spark_dataproc: ${{ steps.get-changed.outputs.spark_dataproc_changed }}
       run_hive_dataproc: ${{ steps.get-changed.outputs.hive_dataproc_changed }}
+      run_dbt: ${{ steps.get-changed.outputs.dbt_changed }}
       ol_release: ${{ steps.get-release.outputs.openlineage_release }}
       any_run: ${{ steps.get-changed.outputs.any_changed }}
       spark_matrix: ${{ steps.set-matrix-values.outputs.spark_dataproc_matrix }}
       hive_matrix: ${{ steps.set-matrix-values.outputs.hive_dataproc_matrix }}
+      dbt_matrix: ${{ steps.set-matrix-values.outputs.dbt_matrix }}
     steps:
       - name: Checkout code
         uses: actions/checkout@v4
@@ -55,8 +57,9 @@ jobs:
           dataplex=$(check_path "consumer/consumers/dataplex/" "dataplex_changed")
           spark_dataproc=$(check_path "producer/spark_dataproc/" "spark_dataproc_changed")
           hive_dataproc=$(check_path "producer/hive_dataproc/" "hive_dataproc_changed")
+          dbt=$(check_path "producer/dbt/" "dbt_changed")
 
-          if [[ $scenarios || $dataplex || $spark_dataproc || $hive_dataproc ]]; then
+          if [[ $scenarios ||
$dataplex || $spark_dataproc || $hive_dataproc || $dbt ]]; then echo "any_changed=true" >> $GITHUB_OUTPUT fi fi @@ -94,6 +97,7 @@ jobs: echo "spark_dataproc_matrix=$(get_matrix spark_dataproc)" >> $GITHUB_OUTPUT echo "hive_dataproc_matrix=$(get_matrix hive_dataproc)" >> $GITHUB_OUTPUT + echo "dbt_matrix=$(get_matrix dbt)" >> $GITHUB_OUTPUT ######## COMPONENT VALIDATION ######## @@ -145,6 +149,17 @@ jobs: component_release: ${{ matrix.component_version }} get-latest-snapshots: 'false' + dbt: + needs: initialize_workflow + if: ${{ needs.initialize_workflow.outputs.run_dbt == 'true' }} + uses: ./.github/workflows/producer_dbt.yml + strategy: + matrix: ${{ fromJson(needs.initialize_workflow.outputs.dbt_matrix) }} + with: + dbt_release: ${{ matrix.component_version }} + ol_release: ${{ matrix.openlineage_versions }} + get-latest-snapshots: 'false' + ######## COLLECTION OF REPORTS AND EXECUTE APPROPRIATE ACTIONS ######## collect-and-compare-reports: @@ -154,6 +169,7 @@ jobs: - dataplex - spark_dataproc - hive_dataproc + - dbt if: ${{ !failure() && needs.initialize_workflow.outputs.any_run == 'true'}} uses: ./.github/workflows/collect_and_compare_reports.yml with: diff --git a/.github/workflows/producer_dbt.yml b/.github/workflows/producer_dbt.yml new file mode 100644 index 00000000..724b813c --- /dev/null +++ b/.github/workflows/producer_dbt.yml @@ -0,0 +1,128 @@ +name: dbt Producer + +on: + workflow_call: + inputs: + dbt_release: + description: "release of dbt-core to use" + type: string + ol_release: + description: "release tag of OpenLineage to use" + type: string + get-latest-snapshots: + description: "Should the artifact be downloaded from maven repo or circleci" + type: string + workflow_dispatch: + inputs: + dbt_release: + description: "release of dbt-core to use" + type: string + default: "1.8.0" + ol_release: + description: "release tag of OpenLineage to use" + type: string + default: "1.23.0" + get-latest-snapshots: + description: "Should the artifact be downloaded from maven repo or circleci" + type: string + default: "false" + +jobs: + run-dbt-tests: + runs-on: ubuntu-latest + + services: + postgres: + image: postgres:15-alpine + env: + POSTGRES_USER: testuser + POSTGRES_PASSWORD: testpass + POSTGRES_DB: dbt_test + ports: + - 5432:5432 + options: >- + --health-cmd "pg_isready -U testuser -d dbt_test" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Initialize tests + id: init + run: | + scenarios=$(./scripts/get_valid_test_scenarios.sh "producer/dbt/scenarios/" ${{ inputs.dbt_release }} ${{ inputs.ol_release }} ) + if [[ "$scenarios" != "" ]]; then + echo "scenarios=$scenarios" >> $GITHUB_OUTPUT + echo "Found scenarios: $scenarios" + else + echo "No valid scenarios found for dbt ${{ inputs.dbt_release }} and OL ${{ inputs.ol_release }}" + fi + + - name: Set up Python 3.12 + if: ${{ steps.init.outputs.scenarios }} + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dbt dependencies + if: ${{ steps.init.outputs.scenarios }} + run: | + python -m pip install --upgrade pip + pip install dbt-core==${{ inputs.dbt_release }} + pip install dbt-postgres + pip install openlineage-dbt==${{ inputs.ol_release }} + + - name: Set producer output event dir + if: ${{ steps.init.outputs.scenarios }} + id: set-producer-output + run: | + echo "event_dir=/tmp/dbt-events-$(date +%s%3N)" >> $GITHUB_OUTPUT + + - name: Run dbt scenarios and create OL events + if: ${{ 
steps.init.outputs.scenarios }} + id: run-producer + continue-on-error: true + run: | + set -e + IFS=';' read -ra scenarios <<< "${{ steps.init.outputs.scenarios }}" + + for scenario in "${scenarios[@]}" + do + echo "Running dbt scenario: $scenario" + + mkdir -p "${{ steps.set-producer-output.outputs.event_dir }}/$scenario" + bash producer/dbt/scenarios/$scenario/test/run.sh "${{ steps.set-producer-output.outputs.event_dir }}/$scenario" + + echo "Finished running scenario: $scenario" + done + + echo "Finished running all scenarios" + + - uses: actions/upload-artifact@v4 + if: ${{ steps.init.outputs.scenarios }} + with: + name: dbt-${{inputs.dbt_release}}-${{inputs.ol_release}}-events + path: ${{ steps.set-producer-output.outputs.event_dir }} + retention-days: 1 + + - name: Validation + if: ${{ steps.init.outputs.scenarios }} + uses: ./.github/actions/run_event_validation + with: + component: 'dbt' + producer-dir: 'producer' + release_tags: ${{ inputs.get-latest-snapshots == 'true' && 'main' || inputs.ol_release }} + ol_release: ${{ inputs.ol_release }} + component_release: ${{ inputs.dbt_release }} + event-directory: ${{ steps.set-producer-output.outputs.event_dir }} + target-path: 'dbt-${{inputs.dbt_release}}-${{inputs.ol_release}}-report.json' + + - uses: actions/upload-artifact@v4 + if: ${{ steps.init.outputs.scenarios }} + with: + name: dbt-${{inputs.dbt_release}}-${{inputs.ol_release}}-report + path: dbt-${{inputs.dbt_release}}-${{inputs.ol_release}}-report.json + retention-days: 1 diff --git a/.gitignore b/.gitignore index b89ae39c..2c95801b 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,9 @@ __pycache__/ # C extensions *.so +#Status files and documentation +Status/ + # Distribution / packaging .Python build/ @@ -164,4 +167,10 @@ cython_debug/ .idea/ ignored/ -bin/ \ No newline at end of file +bin/ + +# OpenLineage event files generated during local testing +**/specs/ +**/output/ +**/test/openlineage.yml +dbt_producer_report.json \ No newline at end of file diff --git a/generated-files/releases.json b/generated-files/releases.json index b220bd2c..a7326ae9 100644 --- a/generated-files/releases.json +++ b/generated-files/releases.json @@ -7,6 +7,10 @@ "name": "spark_dataproc", "latest_version": "" }, + { + "name": "dbt", + "latest_version": "1.8.0" + }, { "name": "openlineage", "latest_version": "1.40.1" diff --git a/producer/dbt/README.md b/producer/dbt/README.md new file mode 100644 index 00000000..282a8da3 --- /dev/null +++ b/producer/dbt/README.md @@ -0,0 +1,169 @@ +# dbt Producer Compatibility Test + +## Purpose and Scope + +This directory contains a compatibility test for the `openlineage-dbt` integration. Its purpose is to provide a standardized and reproducible framework for validating that dbt's OpenLineage integration produces events compliant with the OpenLineage specification. + +This framework is designed as a reference for the community to: +- Verify that `dbt-ol` generates syntactically and semantically correct OpenLineage events for common dbt operations. +- Provide a consistent testing environment for `openlineage-dbt` across different versions. +- Serve as a foundation for more advanced testing scenarios, such as multi-spec or multi-implementation validation. + +It is important to note that this is a **compatibility validation framework** using synthetic data. It is not intended to be a demonstration of a production data pipeline. 
+
+## Test Architecture and Workflow
+
+The test is orchestrated by the `run_dbt_tests.sh` script and follows a clear, sequential workflow designed for reliability and ease of use. This structure ensures that each component of the integration is validated systematically.
+
+The end-to-end process is as follows:
+
+1. **Test Orchestration**: The `run_dbt_tests.sh` script serves as the main entry point. It sets up the environment and iterates over the scenarios folder to execute each test scenario.
+
+2. **Scenario Execution**: The test runner executes the dbt project defined in the `runner/` directory. The specific dbt commands to be run (e.g., `dbt seed`, `dbt run`, `dbt test`) are defined in each test scenario's run script (`test/run.sh`).
+
+3. **Event Generation and Capture**: During execution, the `dbt-ol` wrapper intercepts the dbt commands and emits OpenLineage events. The `test/openlineage.yml` configuration directs these events to be captured in a local file (`{directory_input_param}/events.jsonl`) using the `file` transport.
+
+4. **Extract events**: OpenLineage emits all events to a single file (`append: true` is required; without it each event overwrites the previous one and events are lost), so the individual events must be extracted from that file before validation.
+
+5. **Event Validation**: Once the dbt process is complete, the test framework performs a two-stage validation on the generated events:
+    * **Syntax Validation**: Each event is validated against the official OpenLineage JSON schema (e.g., version `1.40.1`) to ensure it is structurally correct.
+    * **Semantic Validation**: The content of the events is compared against expected templates. This deep comparison, powered by the `scripts/compare_events.py` utility, verifies the accuracy of job names, dataset identifiers, lineage relationships, and the presence and structure of key facets.
+
+6. **Reporting**: Upon completion, the test runner generates a standardized JSON report (`dbt_producer_report.json`) that details the results of each validation step. This report is designed to be consumed by higher-level aggregation scripts in a CI/CD environment.
+
+## Validation Scope
+
+This test validates that the `openlineage-dbt` integration correctly generates OpenLineage events for core dbt operations.
+
+#### dbt Operations Covered:
+- `dbt seed`: To load initial data.
+- `dbt run`: To execute dbt models.
+
+#### Validation Checks:
+- **Event Generation**: Correctly creates `START` and `COMPLETE` events for jobs and runs.
+- **Core Facet Structure and Content**: Validates key facets, including:
+  - `jobType`
+  - `sql`
+  - `dbt_run`
+  - `dbt_version`
+  - `dbt_node`
+  - `schema`
+  - `dataSource`
+  - `columnLineage`
+- **Specification Compliance**: Events are validated against the OpenLineage specification schema (version `2-0-2`).
+
+## Test Structure
+
+The test is organized into the following key directories, each with a specific role in the validation process:
+
+```
+producer/dbt/
+├── run_dbt_tests.sh   # Main test execution script
+├── scenarios/         # Defines the dbt commands and expected outcomes for each test case
+├── output/            # Default output directory for generated OpenLineage events (generated during execution)
+├── runner/            # A self-contained dbt project used as the test target
+└── specs/             # Stores the OpenLineage specifications copied from the local repository (generated during execution)
+```
+
+- **`runner/`**: A self-contained dbt project with models, seeds, and configuration. This is the target of the `dbt-ol` command.
+- **`scenarios/`**: Defines the dbt commands to be executed and contains the expected event templates for validation.
+- **`output/`**: The default output directory for the generated `events.jsonl` file and extracted events.
+
+## How to Run the Tests
+
+There are two primary ways to run the dbt compatibility tests: **locally for development and debugging**, or via **GitHub Actions for automated CI/CD validation**. Both approaches use the same underlying test framework but differ in their database setup and execution environment.
+
+### Running Tests via GitHub Actions (Automated CI/CD)
+
+**This is the standard, automated test runner for the repository and community.**
+
+GitHub Actions provides the canonical testing environment with:
+- PostgreSQL 15 service container (automatically provisioned)
+- Matrix testing across multiple dbt and OpenLineage versions
+- Automated event validation against OpenLineage specifications
+- Integration with the repository's reporting and compatibility tracking
+
+#### Triggering GitHub Actions Workflows
+
+1. **Automatic Trigger on Pull Requests**: The workflow runs automatically when changes are detected under `producer/dbt/` paths.
+
+2. **Via Pull Request**: Opening a PR that modifies dbt producer files will automatically trigger the test suite.
+
+The GitHub Actions workflow:
+- Provisions a PostgreSQL 15 container with health checks
+- Installs `dbt-core`, `dbt-postgres`, and `openlineage-dbt` at the specified versions
+- Executes all scenarios defined in `scenarios/`
+- Validates events against OpenLineage JSON schemas
+- Generates compatibility reports and uploads artifacts
+
+**Configuration**: See `.github/workflows/producer_dbt.yml` for the complete workflow definition.
+
+---
+
+### Local Debugging (Optional)
+
+**For development debugging, you may optionally run PostgreSQL locally. The standard test environment is GitHub Actions.**
+
+If you need to debug event generation locally:
+
+1. **Start PostgreSQL (Optional)**:
+   ```bash
+   cd producer/dbt/scenarios/csv_to_postgres/test
+   docker compose up
+   ```
+
+2. **Install Python Dependencies**:
+   ```bash
+   # Activate virtual environment (recommended)
+   python -m venv venv
+   source venv/bin/activate  # On Windows: venv\Scripts\activate
+   pip install -r producer/dbt/runner/requirements.txt
+   ```
+
+3. **Run Test Scenario**:
+   ```bash
+   ./producer/dbt/run_dbt_tests.sh --openlineage-directory <path-to-local-OpenLineage-repo>
+   ```
+
+4. **Inspect Generated Events**:
+   ```bash
+   # View events
+   cat ./producer/dbt/output/csv_to_postgres/event-{id}.json | jq '.'
+
+   # Check report
+   cat ./producer/dbt/dbt_producer_report.json | jq '.'
+   ```
+
+**Note**: Local debugging is entirely optional. All official validation happens in GitHub Actions with PostgreSQL service containers. The test runner (`test/run.sh`) is the same code used by CI/CD, ensuring consistency.
+
+## Important dbt Integration Notes
+
+**⚠️ Please review the [OpenLineage dbt documentation](https://openlineage.io/docs/integrations/dbt) before running tests.**
+
+This integration has several nuances that are important to understand when analyzing test results or extending the framework:
+
+- The `dbt-ol` wrapper has specific configuration requirements that differ from a standard `dbt` execution.
+- Event emission timing can vary depending on the dbt command being run (`run`, `test`, `build`).
+- The availability of certain dbt-specific facets may depend on the version of `dbt-core` being used.
+- The file transport configuration in `openlineage.yml` directly controls the location and format of the event output; see the sketch below.
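+To make that last point concrete, here is a minimal sketch of how a scenario can generate its file-transport `openlineage.yml` before invoking `dbt-ol`, mirroring what `scenarios/csv_to_postgres/test/run.sh` does. The `EVENT_DIR` value is a placeholder; the real script receives the output directory as its first argument.
+
+```bash
+#!/bin/bash
+# Placeholder output directory; run.sh gets this as "$1".
+EVENT_DIR="/tmp/dbt-events/csv_to_postgres"
+mkdir -p "$EVENT_DIR"
+
+# Point the OpenLineage file transport at a per-scenario event file.
+cat <<EOF > openlineage.yml
+transport:
+  type: file
+  log_file_path: "${EVENT_DIR}/events.jsonl"
+  append: true   # append events; without this each event would overwrite the file
+EOF
+
+# dbt-ol wraps the dbt CLI and emits OpenLineage events while dbt runs.
+dbt-ol seed --project-dir=../../../runner --profiles-dir=../../../runner --target=postgres --no-version-check
+dbt-ol run --project-dir=../../../runner --profiles-dir=../../../runner --target=postgres --no-version-check
+```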
+
+## ⚠️ Known Validation Issues
+
+The dbt integration emits facets for which we cannot apply syntax validation due to schema issues:
+
+### Custom dbt Facets:
+1. **`dbt_version`** (Run Facet)
+   - **Purpose**: Captures the version of dbt-core being used
+   - **Schema**: `dbt-version-run-facet.json`
+   - **Example**: `{"version": "1.10.15"}`
+   - **Schema issues**:
+     - missing the `$id` property
+     - missing the `properties` object
+     - wrong ref to the RunFacet definition: OpenLineage.json#/**definitions**/RunFacet instead of OpenLineage.json#/**$defs**/RunFacet
+
+2. **`dbt_run`** (Run Facet)
+   - **Purpose**: Captures dbt-specific execution metadata
+   - **Schema**: `dbt-run-run-facet.json`
+   - **Fields**: `dbt_runtime`, `invocation_id`, `profile_name`, `project_name`, `project_version`
+   - **Schema issues**:
+     - wrong type definition: "type": "**str**" instead of "type": "**string**"
+     - wrong ref to the RunFacet definition: OpenLineage.json#/**definitions**/RunFacet instead of OpenLineage.json#/**$defs**/RunFacet
diff --git a/producer/dbt/maintainers.json b/producer/dbt/maintainers.json
new file mode 100644
index 00000000..8574ae22
--- /dev/null
+++ b/producer/dbt/maintainers.json
@@ -0,0 +1,14 @@
+[
+  {
+    "type": "maintainer",
+    "github-name": "BearingNode",
+    "email": "contact@bearingnode.com",
+    "link": "https://www.bearingnode.com"
+  },
+  {
+    "type": "maintainer",
+    "github-name": "LegendPawel-Marut",
+    "email": "pawel.marut@xebia.com",
+    "link": ""
+  }
+]
\ No newline at end of file
diff --git a/producer/dbt/run_dbt_tests.sh b/producer/dbt/run_dbt_tests.sh
new file mode 100644
index 00000000..4069afb9
--- /dev/null
+++ b/producer/dbt/run_dbt_tests.sh
@@ -0,0 +1,146 @@
+#!/bin/bash
+
+################################################################################
+############ dbt Producer Compatibility Test Execution Script  ################
+################################################################################
+
+# Help message function
+usage() {
+    echo "Usage: $0 [OPTIONS]"
+    echo ""
+    echo "Options:"
+    echo "  --openlineage-directory PATH        Path to the local OpenLineage repository (required)"
+    echo "  --producer-output-events-dir PATH   Path to producer output events directory (default: output)"
+    echo "  --openlineage-release VERSION       OpenLineage release version (default: 1.40.1)"
+    echo "  --report-path PATH                  Path to the report file (default: ./dbt_producer_report.json)"
+    echo "  -h, --help                          Show this help message and exit"
+    echo ""
+    echo "Example:"
+    echo "  $0 --openlineage-directory /path/to/OpenLineage --producer-output-events-dir output --openlineage-release 1.40.1"
+    exit 0
+}
+
+# Required variables (no defaults)
+OPENLINEAGE_DIRECTORY=""
+
+# Variables with default values
+PRODUCER_OUTPUT_EVENTS_DIR=output
+OPENLINEAGE_RELEASE=1.40.1
+REPORT_PATH="./dbt_producer_report.json"
+
+# If -h or --help is passed, print usage and exit
+if [[ "$1" == "-h" || "$1" == "--help" ]]; then
+    usage
+fi
+
+# Parse command line arguments
+while [[ "$#" -gt 0 ]]; do
+    case $1 in
+        --openlineage-directory) OPENLINEAGE_DIRECTORY="$2"; shift ;;
+        --producer-output-events-dir) PRODUCER_OUTPUT_EVENTS_DIR="$2"; shift ;;
+        --openlineage-release) OPENLINEAGE_RELEASE="$2"; shift ;;
+        --report-path) REPORT_PATH="$2"; shift ;;
+        *) echo "Unknown parameter passed: $1"; usage ;;
+    esac
+    shift
+done
+
+# Check required arguments
+if [[ -z "$OPENLINEAGE_DIRECTORY" ]]; then
+    echo "Error: Missing required argument: --openlineage-directory."
+    usage
+fi
+
+# Fail if no scenarios are defined in the scenarios directory
+[[ $(find scenarios -mindepth 1 | wc -l) -gt 0 ]] || { echo >&2 "NO SCENARIOS DEFINED IN scenarios"; exit 1; }
+
+mkdir -p "$PRODUCER_OUTPUT_EVENTS_DIR"
+
+echo "=============================================================================="
+echo "                     dbt PRODUCER COMPATIBILITY TEST                         "
+echo "=============================================================================="
+echo "OpenLineage Directory:        $OPENLINEAGE_DIRECTORY"
+echo "Producer Output Events Dir:   $PRODUCER_OUTPUT_EVENTS_DIR"
+echo "OpenLineage Release:          $OPENLINEAGE_RELEASE"
+echo "Report Path:                  $REPORT_PATH"
+echo "=============================================================================="
+
+################################################################################
+#
+# SETUP ENVIRONMENT
+#
+################################################################################
+
+# Check if scenario directory exists
+if [[ ! -d "scenarios" ]]; then
+    echo "Error: scenarios directory not found"
+    exit 1
+fi
+
+# Install python dependencies
+python -m pip install --upgrade pip
+
+if [ -f ./runner/requirements.txt ]; then
+    pip install -r ./runner/requirements.txt
+fi
+
+################################################################################
+#
+# RUN dbt PRODUCER TESTS
+#
+################################################################################
+
+echo "Running dbt producer tests..."
+POSIX_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+# cygpath exists only on Windows (Git Bash/Cygwin); fall back to the POSIX path elsewhere
+BASE_DIR="$(cygpath -m "$POSIX_DIR" 2>/dev/null || echo "$POSIX_DIR")"
+
+# Run tests for each scenario
+echo "Discovering test scenarios..."
+for scenario_dir in scenarios/*/; do
+    if [[ -d "$scenario_dir" && -f "${scenario_dir}config.json" ]]; then
+        SCENARIO_NAME=$(basename "$scenario_dir")
+        echo "Found scenario: $SCENARIO_NAME"
+
+        mkdir -p "$PRODUCER_OUTPUT_EVENTS_DIR/$SCENARIO_NAME"
+        bash "${scenario_dir}test/run.sh" "$BASE_DIR/$PRODUCER_OUTPUT_EVENTS_DIR/$SCENARIO_NAME"
+
+        echo "Scenario $SCENARIO_NAME completed"
+    fi
+done
+
+echo "EVENT VALIDATION FOR SPEC VERSION $OPENLINEAGE_RELEASE"
+
+# Generate JSON report
+REPORT_DIR=$(dirname "$REPORT_PATH")
+mkdir -p "$REPORT_DIR"
+
+SPECS_BASE_DIR="./specs"
+DEST_DIR="$SPECS_BASE_DIR/$OPENLINEAGE_RELEASE"
+
+mkdir -p "$DEST_DIR"
+
+if [ -d "$OPENLINEAGE_DIRECTORY"/spec ]; then
+    find "$OPENLINEAGE_DIRECTORY"/spec -type f \( -name '*Facet.json' -o -name 'OpenLineage.json' \) -exec cp -t "$DEST_DIR" {} +
+fi
+if [ -d "$OPENLINEAGE_DIRECTORY"/integration/common/src/openlineage ]; then
+    find "$OPENLINEAGE_DIRECTORY"/integration/common/src/openlineage -type f -iname '*facet.json' -exec cp -t "$DEST_DIR" {} +
+fi
+
+if [ -z "$(ls -A "$DEST_DIR")" ]; then
+    echo "Cannot collect OpenLineage specs"
+    exit 1
+fi
+
+pip install -r ../../scripts/requirements.txt
+
+python ../../scripts/validate_ol_events.py \
+--event_base_dir="$PRODUCER_OUTPUT_EVENTS_DIR" \
+--spec_base_dir="$SPECS_BASE_DIR" \
+--target="$REPORT_PATH" \
+--component="dbt" \
+--component_version="1.8.0" \
+--producer_dir=..
\ +--openlineage_version="$OPENLINEAGE_RELEASE" + +echo "EVENT VALIDATION FINISHED" +echo "REPORT CREATED IN $REPORT_PATH" \ No newline at end of file diff --git a/producer/dbt/runner/.user.yml b/producer/dbt/runner/.user.yml new file mode 100644 index 00000000..2ccd4906 --- /dev/null +++ b/producer/dbt/runner/.user.yml @@ -0,0 +1 @@ +id: 04966b3a-fec8-4902-afd7-fe1bb85bad5a diff --git a/producer/dbt/runner/dbt_project.yml b/producer/dbt/runner/dbt_project.yml new file mode 100644 index 00000000..a0eda818 --- /dev/null +++ b/producer/dbt/runner/dbt_project.yml @@ -0,0 +1,30 @@ +name: 'openlineage_compatibility_test' +version: '1.0.0' +config-version: 2 + +# This setting configures which "profile" dbt uses for this project. +profile: 'openlineage_compatibility_test' + +# These configurations specify where dbt should look for different types of files. +model-paths: ["models"] +analysis-paths: ["analyses"] +test-paths: ["tests"] +seed-paths: ["seeds"] +macro-paths: ["macros"] +snapshot-paths: ["snapshots"] + +target-path: "target" # directory which will store compiled SQL files +clean-targets: # directories to be removed by `dbt clean` + - "target" + - "dbt_packages" + +# Configuring models +# Full documentation: https://docs.getdbt.com/reference/model-configs + +models: + openlineage_compatibility_test: + # Config indicated by + and applies to all files under models/example/ + staging: + +materialized: table + marts: + +materialized: table \ No newline at end of file diff --git a/producer/dbt/runner/models/marts/customer_analytics.sql b/producer/dbt/runner/models/marts/customer_analytics.sql new file mode 100644 index 00000000..010bf2d8 --- /dev/null +++ b/producer/dbt/runner/models/marts/customer_analytics.sql @@ -0,0 +1,19 @@ +select + c.customer_id, + c.customer_name, + c.email, + c.segment, + c.value_tier, + count(o.order_id) as total_orders, + sum(o.completed_amount) as total_revenue, + avg(o.completed_amount) as avg_order_value, + max(o.order_date) as last_order_date +from {{ ref('stg_customers') }} c +left join {{ ref('stg_orders') }} o + on c.customer_id = o.customer_id +group by + c.customer_id, + c.customer_name, + c.email, + c.segment, + c.value_tier \ No newline at end of file diff --git a/producer/dbt/runner/models/schema.yml b/producer/dbt/runner/models/schema.yml new file mode 100644 index 00000000..8d009656 --- /dev/null +++ b/producer/dbt/runner/models/schema.yml @@ -0,0 +1,69 @@ +version: 2 + +sources: + - name: raw_data + description: Raw CSV data files + schema: main + tables: + - name: raw_customers + description: Raw customer data + columns: + - name: customer_id + description: Unique customer identifier + tests: + - unique + - not_null + - name: email + description: Customer email address + tests: + - unique + - not_null + + - name: raw_orders + description: Raw order data + columns: + - name: order_id + description: Unique order identifier + tests: + - unique + - not_null + - name: customer_id + description: Foreign key to customers + tests: + - not_null + +models: + - name: stg_customers + description: Cleaned and standardized customer data + columns: + - name: customer_id + description: Unique customer identifier + tests: + - unique + - not_null + + - name: stg_orders + description: Cleaned order data excluding cancelled orders + columns: + - name: order_id + description: Unique order identifier + tests: + - unique + - not_null + - name: customer_id + description: Foreign key to customers + tests: + - not_null + + - name: customer_analytics + description: Customer 
analytics with aggregated metrics + columns: + - name: customer_id + description: Unique customer identifier + tests: + - unique + - not_null + - name: total_revenue + description: Total completed revenue per customer + tests: + - not_null \ No newline at end of file diff --git a/producer/dbt/runner/models/staging/stg_customers.sql b/producer/dbt/runner/models/staging/stg_customers.sql new file mode 100644 index 00000000..fafc6a86 --- /dev/null +++ b/producer/dbt/runner/models/staging/stg_customers.sql @@ -0,0 +1,12 @@ +select + customer_id, + name as customer_name, + email, + registration_date, + segment, + case + when segment = 'enterprise' then 'high_value' + when segment = 'premium' then 'medium_value' + else 'standard_value' + end as value_tier +from {{ ref('raw_customers') }} \ No newline at end of file diff --git a/producer/dbt/runner/models/staging/stg_orders.sql b/producer/dbt/runner/models/staging/stg_orders.sql new file mode 100644 index 00000000..5c0e38bc --- /dev/null +++ b/producer/dbt/runner/models/staging/stg_orders.sql @@ -0,0 +1,12 @@ +select + order_id, + customer_id, + order_date, + amount, + status, + case + when status = 'completed' then amount + else 0 + end as completed_amount +from {{ ref('raw_orders') }} +where status != 'cancelled' \ No newline at end of file diff --git a/producer/dbt/runner/openlineage.yml b/producer/dbt/runner/openlineage.yml new file mode 100644 index 00000000..c1acf9cd --- /dev/null +++ b/producer/dbt/runner/openlineage.yml @@ -0,0 +1,4 @@ +transport: + type: file + log_file_path: ../events/openlineage_events.jsonl + append: true diff --git a/producer/dbt/runner/profiles.yml b/producer/dbt/runner/profiles.yml new file mode 100644 index 00000000..fe1863c3 --- /dev/null +++ b/producer/dbt/runner/profiles.yml @@ -0,0 +1,12 @@ +openlineage_compatibility_test: + target: postgres + outputs: + postgres: + type: postgres + host: "{{ env_var('DBT_POSTGRES_HOST', 'localhost') }}" + port: "{{ env_var('DBT_POSTGRES_PORT', '5432') | as_number }}" + user: "{{ env_var('DBT_POSTGRES_USER', 'testuser') }}" + password: "{{ env_var('DBT_POSTGRES_PASSWORD', 'testpass') }}" + dbname: "{{ env_var('DBT_POSTGRES_DB', 'dbt_test') }}" + schema: "{{ env_var('DBT_POSTGRES_SCHEMA', 'main') }}" + threads: 4 \ No newline at end of file diff --git a/producer/dbt/runner/requirements.txt b/producer/dbt/runner/requirements.txt new file mode 100644 index 00000000..0a3e411d --- /dev/null +++ b/producer/dbt/runner/requirements.txt @@ -0,0 +1,9 @@ +# OpenLineage dbt Producer Test Dependencies +# Install: pip install -r requirements.txt + +# dbt dependencies +dbt-core>=1.5.0 +dbt-postgres>=1.5.0 + +# OpenLineage integration (if available) +openlineage-dbt>=0.28.0 \ No newline at end of file diff --git a/producer/dbt/runner/seeds/raw_customers.csv b/producer/dbt/runner/seeds/raw_customers.csv new file mode 100644 index 00000000..686b805b --- /dev/null +++ b/producer/dbt/runner/seeds/raw_customers.csv @@ -0,0 +1,6 @@ +customer_id,name,email,registration_date,segment +1,John Doe,john.doe@example.com,2023-01-15,premium +2,Jane Smith,jane.smith@example.com,2023-02-20,standard +3,Bob Johnson,bob.johnson@example.com,2023-03-10,premium +4,Alice Brown,alice.brown@example.com,2023-04-05,standard +5,Charlie Wilson,charlie.wilson@example.com,2023-05-12,enterprise \ No newline at end of file diff --git a/producer/dbt/runner/seeds/raw_orders.csv b/producer/dbt/runner/seeds/raw_orders.csv new file mode 100644 index 00000000..2201b5ad --- /dev/null +++ b/producer/dbt/runner/seeds/raw_orders.csv @@ 
-0,0 +1,9 @@ +order_id,customer_id,order_date,amount,status +1001,1,2023-06-01,150.00,completed +1002,2,2023-06-02,89.99,completed +1003,1,2023-06-03,220.50,pending +1004,3,2023-06-04,75.25,completed +1005,4,2023-06-05,300.00,completed +1006,2,2023-06-06,45.00,cancelled +1007,5,2023-06-07,500.00,completed +1008,3,2023-06-08,125.75,pending \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/config.json b/producer/dbt/scenarios/csv_to_postgres/config.json new file mode 100644 index 00000000..6a737208 --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/config.json @@ -0,0 +1,270 @@ +{ + "component_versions": { + "min": "1.8.0", + "max": "1.8.0" + }, + "openlineage_versions": { + "min": "1.0.0", + "max": "5.0.0" + }, + "tests": [ + { + "name": "customers_datasource_facet_test", + "path": "events/customers/datasource_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "dataSource" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + }, + { + "name": "customers_schema_facet_test", + "path": "events/customers/schema_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "schema" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + }, + { + "name": "customers_column_lineage_test", + "path": "events/customers/column_lineage_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "columnLineage" + ], + "lineage_level": { + "postgres": [ + "dataset", + "column" + ] + } + } + }, + { + "name": "customers_sql_facet_test", + "path": "events/customers/sql_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "sql" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + }, + { + "name": "orders_datasource_facet_test", + "path": "events/orders/datasource_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "dataSource" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + }, + { + "name": "orders_schema_facet_test", + "path": "events/orders/schema_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "schema" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + }, + { + "name": "orders_column_lineage_test", + "path": "events/orders/column_lineage_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "columnLineage" + ], + "lineage_level": { + "postgres": [ + "dataset", + "column" + ] + } + } + }, + { + "name": "orders_sql_facet_test", + "path": "events/orders/sql_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "sql" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + }, + { + "name": "analytics_datasource_facet_test", + "path": "events/analytics/datasource_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "dataSource" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + }, + { + "name": "analytics_schema_facet_test", + "path": "events/analytics/schema_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "schema" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + }, + { + "name": "analytics_column_lineage_test", + "path": "events/analytics/column_lineage_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "columnLineage" + ], + 
"lineage_level": { + "postgres": [ + "dataset", + "column" + ] + } + } + }, + { + "name": "analytics_sql_facet_test", + "path": "events/analytics/sql_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "sql" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + }, + { + "name": "dbt_node_facet_test", + "path": "events/dbt_node_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "dbt_node" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + }, + { + "name": "dbt_run_facet_test", + "path": "events/dbt_run_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "dbt_run" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + }, + { + "name": "dbt_version_facet_test", + "path": "events/dbt_version_event.json", + "openlineage_versions": { + "min": "1.38.0" + }, + "tags": { + "facets": [ + "dbt_version" + ], + "lineage_level": { + "postgres": [ + "dataset" + ] + } + } + } + ] +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/analytics/column_lineage_event.json b/producer/dbt/scenarios/csv_to_postgres/events/analytics/column_lineage_event.json new file mode 100644 index 00000000..92330dac --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/analytics/column_lineage_event.json @@ -0,0 +1,113 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.customer_analytics" + }, + "outputs": [ + { + "name": "dbt_test.main.customer_analytics", + "namespace": "postgres://localhost:5432", + "facets": { + "columnLineage": { + "fields": { + "avg_order_value": { + "inputFields": [ + { + "field": "completed_amount", + "name": "dbt_test.main.stg_orders", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "customer_id": { + "inputFields": [ + { + "field": "customer_id", + "name": "dbt_test.main.stg_customers", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "customer_name": { + "inputFields": [ + { + "field": "customer_name", + "name": "dbt_test.main.stg_customers", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "email": { + "inputFields": [ + { + "field": "email", + "name": "dbt_test.main.stg_customers", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "last_order_date": { + "inputFields": [ + { + "field": "order_date", + "name": "dbt_test.main.stg_orders", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "segment": { + "inputFields": [ + { + "field": "segment", + "name": "dbt_test.main.stg_customers", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "total_orders": { + "inputFields": [ + { + "field": "order_id", + "name": "dbt_test.main.stg_orders", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "total_revenue": { + "inputFields": [ + { + "field": "completed_amount", + "name": "dbt_test.main.stg_orders", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "value_tier": { + "inputFields": [ + { + "field": "value_tier", + "name": "dbt_test.main.stg_customers", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + } + } + }, + "dataSource": { + "name": "postgres://localhost:5432", + "uri": "postgres://localhost:5432" + } + } + } + ] +} \ No newline at 
end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/analytics/datasource_event.json b/producer/dbt/scenarios/csv_to_postgres/events/analytics/datasource_event.json new file mode 100644 index 00000000..b3170aca --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/analytics/datasource_event.json @@ -0,0 +1,41 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.customer_analytics" + }, + "inputs": [ + { + "name": "dbt_test.main.stg_customers", + "namespace": "postgres://localhost:5432", + "facets": { + "dataSource": { + "name": "postgres://localhost:5432", + "uri": "postgres://localhost:5432" + } + } + }, + { + "name": "dbt_test.main.stg_orders", + "namespace": "postgres://localhost:5432", + "facets": { + "dataSource": { + "name": "postgres://localhost:5432", + "uri": "postgres://localhost:5432" + } + } + } + ], + "outputs": [ + { + "name": "dbt_test.main.customer_analytics", + "namespace": "postgres://localhost:5432", + "facets": { + "dataSource": { + "name": "postgres://localhost:5432", + "uri": "postgres://localhost:5432" + } + } + } + ] +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/analytics/schema_event.json b/producer/dbt/scenarios/csv_to_postgres/events/analytics/schema_event.json new file mode 100644 index 00000000..5653bb12 --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/analytics/schema_event.json @@ -0,0 +1,61 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.customer_analytics" + }, + "inputs": [ + { + "name": "dbt_test.main.stg_customers", + "namespace": "postgres://localhost:5432", + "facets": { + "schema": { + "fields": [ + { + "description": "Unique customer identifier", + "name": "customer_id" + } + ] + } + } + }, + { + "name": "dbt_test.main.stg_orders", + "namespace": "postgres://localhost:5432", + "facets": { + "schema": { + "fields": [ + { + "description": "Unique order identifier", + "name": "order_id" + }, + { + "description": "Foreign key to customers", + "name": "customer_id" + } + ] + } + } + } + ], + "outputs": [ + { + "name": "dbt_test.main.customer_analytics", + "namespace": "postgres://localhost:5432", + "facets": { + "schema": { + "fields": [ + { + "description": "Unique customer identifier", + "name": "customer_id" + }, + { + "description": "Total completed revenue per customer", + "name": "total_revenue" + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/analytics/sql_event.json b/producer/dbt/scenarios/csv_to_postgres/events/analytics/sql_event.json new file mode 100644 index 00000000..d72b09fb --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/analytics/sql_event.json @@ -0,0 +1,13 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.customer_analytics", + "facets": { + "sql": { + "dialect": "postgres", + "query": "select\n c.customer_id,\n c.customer_name,\n c.email,\n c.segment,\n c.value_tier,\n count(o.order_id) as total_orders,\n sum(o.completed_amount) as total_revenue,\n avg(o.completed_amount) as avg_order_value,\n max(o.order_date) as last_order_date\nfrom \"dbt_test\".\"main\".\"stg_customers\" c\nleft join \"dbt_test\".\"main\".\"stg_orders\" o \n on c.customer_id = o.customer_id\ngroup by \n c.customer_id,\n c.customer_name,\n c.email,\n c.segment,\n c.value_tier" + } + 
} + } +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/customers/column_lineage_event.json b/producer/dbt/scenarios/csv_to_postgres/events/customers/column_lineage_event.json new file mode 100644 index 00000000..f58fceb6 --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/customers/column_lineage_event.json @@ -0,0 +1,79 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.stg_customers" + }, + "outputs": [ + { + "name": "dbt_test.main.stg_customers", + "namespace": "postgres://localhost:5432", + "facets": { + "columnLineage": { + "fields": { + "customer_id": { + "inputFields": [ + { + "field": "customer_id", + "name": "dbt_test.main.raw_customers", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "customer_name": { + "inputFields": [ + { + "field": "name", + "name": "dbt_test.main.raw_customers", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "email": { + "inputFields": [ + { + "field": "email", + "name": "dbt_test.main.raw_customers", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "registration_date": { + "inputFields": [ + { + "field": "registration_date", + "name": "dbt_test.main.raw_customers", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "segment": { + "inputFields": [ + { + "field": "segment", + "name": "dbt_test.main.raw_customers", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "value_tier": { + "inputFields": [ + { + "field": "segment", + "name": "dbt_test.main.raw_customers", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + } + } + } + } + } + ] +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/customers/datasource_event.json b/producer/dbt/scenarios/csv_to_postgres/events/customers/datasource_event.json new file mode 100644 index 00000000..85eff81e --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/customers/datasource_event.json @@ -0,0 +1,19 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.stg_customers" + }, + "outputs": [ + { + "name": "dbt_test.main.stg_customers", + "namespace": "postgres://localhost:5432", + "facets": { + "dataSource": { + "name": "postgres://localhost:5432", + "uri": "postgres://localhost:5432" + } + } + } + ] +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/customers/schema_event.json b/producer/dbt/scenarios/csv_to_postgres/events/customers/schema_event.json new file mode 100644 index 00000000..da8a715b --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/customers/schema_event.json @@ -0,0 +1,23 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.stg_customers" + }, + "outputs": [ + { + "name": "dbt_test.main.stg_customers", + "namespace": "postgres://localhost:5432", + "facets": { + "schema": { + "fields": [ + { + "name": "customer_id", + "description": "Unique customer identifier" + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/customers/sql_event.json b/producer/dbt/scenarios/csv_to_postgres/events/customers/sql_event.json new file mode 100644 index 00000000..7c8ee37c --- /dev/null +++ 
b/producer/dbt/scenarios/csv_to_postgres/events/customers/sql_event.json @@ -0,0 +1,13 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.stg_customers", + "facets": { + "sql": { + "dialect": "postgres", + "query": "select\n customer_id,\n name as customer_name,\n email,\n registration_date,\n segment,\n case \n when segment = 'enterprise' then 'high_value'\n when segment = 'premium' then 'medium_value'\n else 'standard_value'\n end as value_tier\nfrom \"dbt_test\".\"main\".\"raw_customers\"" + } + } + } +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/dbt_node_event.json b/producer/dbt/scenarios/csv_to_postgres/events/dbt_node_event.json new file mode 100644 index 00000000..c57a4034 --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/dbt_node_event.json @@ -0,0 +1,16 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.customer_analytics", + "facets": { + "dbt_node": { + "alias": "customer_analytics", + "database": "dbt_test", + "original_file_path": "{{ match(result, 'models.+') }}", + "schema": "main", + "unique_id": "model.openlineage_compatibility_test.customer_analytics" + } + } + } +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/dbt_run_event.json b/producer/dbt/scenarios/csv_to_postgres/events/dbt_run_event.json new file mode 100644 index 00000000..6617942a --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/dbt_run_event.json @@ -0,0 +1,19 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.customer_analytics" + }, + "run": { + "runId": "{{ is_uuid(result) }}", + "facets": { + "dbt_run": { + "dbt_runtime": "core", + "invocation_id": "{{ is_uuid(result) }}", + "profile_name": "openlineage_compatibility_test", + "project_name": "openlineage_compatibility_test", + "project_version": "{{ any(result) }}" + } + } + } +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/dbt_version_event.json b/producer/dbt/scenarios/csv_to_postgres/events/dbt_version_event.json new file mode 100644 index 00000000..59a69e57 --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/dbt_version_event.json @@ -0,0 +1,15 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.customer_analytics" + }, + "run": { + "runId": "{{ is_uuid(result) }}", + "facets": { + "dbt_version": { + "version": "{{ any(result) }}" + } + } + } +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/orders/column_lineage_event.json b/producer/dbt/scenarios/csv_to_postgres/events/orders/column_lineage_event.json new file mode 100644 index 00000000..9b31be18 --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/orders/column_lineage_event.json @@ -0,0 +1,85 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.stg_orders" + }, + "outputs": [ + { + "name": "dbt_test.main.stg_orders", + "namespace": "postgres://localhost:5432", + "facets": { + "columnLineage": { + "fields": { + "amount": { + "inputFields": [ + { + "field": "amount", + "name": "dbt_test.main.raw_orders", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "completed_amount": { + "inputFields": [ + { + 
"field": "amount", + "name": "dbt_test.main.raw_orders", + "namespace": "postgres://localhost:5432", + "transformations": [] + }, + { + "field": "status", + "name": "dbt_test.main.raw_orders", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "customer_id": { + "inputFields": [ + { + "field": "customer_id", + "name": "dbt_test.main.raw_orders", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "order_date": { + "inputFields": [ + { + "field": "order_date", + "name": "dbt_test.main.raw_orders", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "order_id": { + "inputFields": [ + { + "field": "order_id", + "name": "dbt_test.main.raw_orders", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + }, + "status": { + "inputFields": [ + { + "field": "status", + "name": "dbt_test.main.raw_orders", + "namespace": "postgres://localhost:5432", + "transformations": [] + } + ] + } + } + } + } + } + ] +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/orders/datasource_event.json b/producer/dbt/scenarios/csv_to_postgres/events/orders/datasource_event.json new file mode 100644 index 00000000..2649d314 --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/orders/datasource_event.json @@ -0,0 +1,19 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.stg_orders" + }, + "outputs": [ + { + "name": "dbt_test.main.stg_orders", + "namespace": "postgres://localhost:5432", + "facets": { + "dataSource": { + "name": "postgres://localhost:5432", + "uri": "postgres://localhost:5432" + } + } + } + ] +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/orders/schema_event.json b/producer/dbt/scenarios/csv_to_postgres/events/orders/schema_event.json new file mode 100644 index 00000000..a343c016 --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/orders/schema_event.json @@ -0,0 +1,27 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.stg_orders" + }, + "outputs": [ + { + "name": "dbt_test.main.stg_orders", + "namespace": "postgres://localhost:5432", + "facets": { + "schema": { + "fields": [ + { + "description": "Unique order identifier", + "name": "order_id" + }, + { + "description": "Foreign key to customers", + "name": "customer_id" + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/events/orders/sql_event.json b/producer/dbt/scenarios/csv_to_postgres/events/orders/sql_event.json new file mode 100644 index 00000000..6d854a26 --- /dev/null +++ b/producer/dbt/scenarios/csv_to_postgres/events/orders/sql_event.json @@ -0,0 +1,13 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "dbt", + "name": "dbt_test.main.openlineage_compatibility_test.stg_orders", + "facets": { + "sql": { + "dialect": "postgres", + "query": "select\n order_id,\n customer_id,\n order_date,\n amount,\n status,\n case \n when status = 'completed' then amount\n else 0\n end as completed_amount\nfrom \"dbt_test\".\"main\".\"raw_orders\"\nwhere status != 'cancelled'" + } + } + } +} \ No newline at end of file diff --git a/producer/dbt/scenarios/csv_to_postgres/maintainers.json b/producer/dbt/scenarios/csv_to_postgres/maintainers.json new file mode 100644 index 00000000..dbbe0453 --- /dev/null +++ 
b/producer/dbt/scenarios/csv_to_postgres/maintainers.json
@@ -0,0 +1,14 @@
+[
+  {
+    "type": "maintainer",
+    "github-name": "BearingNode",
+    "email": "contact@bearingnode.com",
+    "link": "https://www.bearingnode.com"
+  },
+  {
+    "type": "maintainer",
+    "github-name": "LegendPawel-Marut",
+    "email": "pawel.marut@xebia.com",
+    "link": ""
+  }
+]
\ No newline at end of file
diff --git a/producer/dbt/scenarios/csv_to_postgres/scenario.md b/producer/dbt/scenarios/csv_to_postgres/scenario.md
new file mode 100644
index 00000000..266270ac
--- /dev/null
+++ b/producer/dbt/scenarios/csv_to_postgres/scenario.md
@@ -0,0 +1,65 @@
+# CSV to PostgreSQL Scenario
+
+## Overview
+
+This scenario validates dbt's OpenLineage integration compliance using synthetic test data in a controlled CSV → dbt → PostgreSQL pipeline with file transport.
+
+**Purpose**: Compatibility testing and validation.
+
+## Data Flow
+
+```
+Synthetic CSV Files (customers.csv, orders.csv)
+    ↓ (dbt seed)
+PostgreSQL Raw Tables
+    ↓ (dbt models)
+Staging Models (stg_customers, stg_orders)
+    ↓ (dbt models)
+Analytics Model (customer_analytics)
+```
+
+## Test Coverage
+
+The scenario validates the following OpenLineage facets:
+
+- **Datasource Facets**: Source and destination database
+- **Schema Facets**: Column definitions and data types
+- **SQL Facets**: Actual SQL transformations executed by dbt
+- **Column Lineage**: Field-level transformations and dependencies
+- **dbt Lineage**: dbt-related validations such as version, node metadata, and execution details
+
+## Test Data Logic
+
+Synthetic customer analytics scenario designed for validation testing:
+- Import synthetic customer and order data from CSV files
+- Clean and standardize data in staging layer
+- Create aggregated customer metrics in analytics layer
+
+**Note**: This uses entirely synthetic data designed to test OpenLineage integration, not representative of production data patterns.
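+As a quick way to see what this scenario emits, the sketch below summarizes the extracted events with `jq`. The output directory is the default used by `run_dbt_tests.sh` when run from the repository root; it is an assumption if you passed `--producer-output-events-dir`.
+
+```bash
+#!/bin/bash
+# Sketch: list event type and job name for every extracted event.
+EVENTS_DIR="producer/dbt/output/csv_to_postgres"
+
+for f in "$EVENTS_DIR"/event-*.json; do
+  # Each file holds a single OpenLineage event, e.g.
+  # "COMPLETE  dbt_test.main.openlineage_compatibility_test.stg_orders"
+  jq -r '[.eventType, .job.name] | @tsv' "$f"
+done
+```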
+
+## Technical Details
+
+- **Source**: Synthetic CSV files with test customer and order data
+- **Transform**: dbt models with staging and analytics layers
+- **Target**: PostgreSQL database (CI/CD service container)
+- **Transport**: OpenLineage file transport (JSON Lines format)
+- **Validation**: Comprehensive facet compliance testing
+
+## Expected Outputs
+
+- 8 OpenLineage events for dbt job and model executions
+- Schema facets describing table structures and column definitions
+- SQL facets with actual transformation queries and dialect information
+- Column lineage facets showing field-level transformations
+- Dataset lineage tracking data flow between models
+- dbt facets with the project name, profile, models, and dbt version
+
+## Validation Framework
+
+This scenario serves as a test harness for validating:
+- dbt OpenLineage integration functionality
+- OpenLineage event structure compliance
+- Facet generation accuracy and completeness
+- Community compatibility testing standards
+- Lineage relationships between datasets
+- Column lineage for field-level tracking
\ No newline at end of file
diff --git a/producer/dbt/scenarios/csv_to_postgres/test/compose.yml b/producer/dbt/scenarios/csv_to_postgres/test/compose.yml
new file mode 100644
index 00000000..b9eddf99
--- /dev/null
+++ b/producer/dbt/scenarios/csv_to_postgres/test/compose.yml
@@ -0,0 +1,20 @@
+version: "3.9"
+
+name: csv_to_postgres
+
+services:
+  postgres:
+    image: postgres:15-alpine
+    container_name: postgres15
+    restart: always
+    environment:
+      POSTGRES_USER: testuser
+      POSTGRES_PASSWORD: testpass
+      POSTGRES_DB: dbt_test
+    ports:
+      - "5432:5432"
+    volumes:
+      - postgres_data:/var/lib/postgresql/data
+
+volumes:
+  postgres_data:
\ No newline at end of file
diff --git a/producer/dbt/scenarios/csv_to_postgres/test/run.sh b/producer/dbt/scenarios/csv_to_postgres/test/run.sh
new file mode 100644
index 00000000..acb9dbd3
--- /dev/null
+++ b/producer/dbt/scenarios/csv_to_postgres/test/run.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+PRODUCER_OUTPUT_EVENTS_DIR=$1
+
+if [ -d "$PRODUCER_OUTPUT_EVENTS_DIR" ]; then
+  cd "$(dirname "${BASH_SOURCE[0]}")" || exit
+
+  # Write the OpenLineage file-transport config pointing at the output directory.
+  cat <<EOF > openlineage.yml
+transport:
+  type: file
+  log_file_path: "${PRODUCER_OUTPUT_EVENTS_DIR}/events.jsonl"
+  append: true
+EOF
+
+  dbt-ol seed --project-dir="../../../runner" --profiles-dir="../../../runner" --target=postgres --no-version-check
+  dbt-ol run --project-dir="../../../runner" --profiles-dir="../../../runner" --target=postgres --no-version-check
+
+  # Split the JSON Lines output into numbered per-event files for validation.
+  jq -c '.' "${PRODUCER_OUTPUT_EVENTS_DIR}/events.jsonl" | nl -w1 -s' ' | while read -r i line; do
+    echo "$line" | jq '.'
> "${PRODUCER_OUTPUT_EVENTS_DIR}/event-$i.json" + done + rm "${PRODUCER_OUTPUT_EVENTS_DIR}/events.jsonl" + +else + echo "Output events directory '${PRODUCER_OUTPUT_EVENTS_DIR}' does not exist" +fi \ No newline at end of file diff --git a/producer/dbt/versions.json b/producer/dbt/versions.json new file mode 100644 index 00000000..40f47508 --- /dev/null +++ b/producer/dbt/versions.json @@ -0,0 +1,6 @@ +{ + "openlineage_versions": [], + "component_version": [ + "1.8.0" + ] +} \ No newline at end of file diff --git a/scripts/select_components.sh b/scripts/select_components.sh index 5df3bb8a..8e25efa5 100755 --- a/scripts/select_components.sh +++ b/scripts/select_components.sh @@ -4,15 +4,50 @@ version_sum() { echo $(( var1 * 1000000 + var2 * 1000 + var3)) } -current_ol=$(cat generated-files/releases.json | jq -c '.[] | select(.name | contains("openlineage")) | .latest_version ' -r) -latest_ol=$(curl https://api.github.com/repos/OpenLineage/OpenLineage/releases/latest -s | jq .tag_name -r) +update_version() { + local name="$1" + local new_version="$2" -if (( $(version_sum $latest_ol) > $(version_sum $current_ol) )); then + jq --arg name "$name" \ + --arg version "$new_version" \ + 'map(if .name == $name then .latest_version = $version else . end)' \ + generated-files/updated-releases.json > /tmp/releases.json \ + && mv /tmp/releases.json generated-files/updated-releases.json +} + +ol_version() { + current_ol=$(jq -r '.[] | select(.name | contains("openlineage")) | .latest_version ' generated-files/releases.json) + latest_ol=$(curl https://api.github.com/repos/OpenLineage/OpenLineage/releases/latest -s | jq .tag_name -r) + + if (( $(version_sum $latest_ol) > $(version_sum $current_ol) )); then + update_version "openlineage" "$latest_ol" echo "ol_release=${latest_ol}" >> $GITHUB_OUTPUT - echo "releases_updated=true" >> $GITHUB_OUTPUT - jq --arg latest_ol "$latest_ol" 'map(if .name == "openlineage" then .latest_version = $latest_ol else . end)' \ - generated-files/releases.json > generated-files/updated-releases.json -else - cp generated-files/releases.json generated-files/updated-releases.json - echo "ol_release=${current_ol}" >> $GITHUB_OUTPUT + return 0 + fi + echo "ol_release=${current_ol}" >> $GITHUB_OUTPUT + return 1 +} + +dbt_version() { + current_dbt=$(jq -r '.[] | select(.name | contains("dbt")) | .latest_version ' generated-files/releases.json) + latest_dbt=$(pip index versions dbt-core | grep LATEST | awk '{print $2}') + + if (( $(version_sum $latest_dbt) > $(version_sum $current_dbt) )); then + update_version "dbt" "$latest_dbt" + echo "dbt_release=${latest_dbt}" >> $GITHUB_OUTPUT + return 0 + fi + echo "dbt_release=${current_dbt}" >> $GITHUB_OUTPUT + return 1 +} + +cp generated-files/releases.json generated-files/updated-releases.json + +ol_version +ol_version_changed=$? +dbt_version +dbt_version_changed=$? 
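+# Both helpers return 0 when a newer release was found (and
+# updated-releases.json was rewritten), 1 otherwise, so a zero status from
+# either one marks the release files as updated below.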
+ +if ((ol_version_changed == 0)) || ((dbt_version_changed == 0)); then + echo "releases_updated=true" >> $GITHUB_OUTPUT fi \ No newline at end of file diff --git a/scripts/validate_ol_events.py b/scripts/validate_ol_events.py index 2a4795f1..f8f3572d 100644 --- a/scripts/validate_ol_events.py +++ b/scripts/validate_ol_events.py @@ -16,6 +16,7 @@ from jsonschema import RefResolver class OLSyntaxValidator: + def __init__(self, schema_validators, unchecked_facets=None, openlineage_version=None): self.schema_validators = schema_validators self.unchecked_facets = unchecked_facets or {} @@ -26,12 +27,18 @@ def is_custom_facet(facet, schema_type): if facet.get('_schemaURL') is not None: is_custom = any(facet.get('_schemaURL').__contains__(f'defs/{facet_type}Facet') for facet_type in ['Run', 'Job', 'Dataset', 'InputDataset', 'OutputDataset']) - if is_custom: + is_common = any(facet.get('_schemaURL').__contains__(f'schema/{facet_type}-facet.json') for facet_type in + ['dbt-version-run', 'dbt-run-run']) + if is_custom or is_common: print(f"facet {schema_type} seems to be custom facet, validation skipped") - return is_custom + return True return False - + @staticmethod + def is_facet(path): + # List of facets from the common package that can be used for syntax validation + common_facets = ['dbt-node-job-facet.json'] + return 'Facet.json' in path or path in common_facets @classmethod def get_validators(cls, spec_path, tags): @@ -40,17 +47,20 @@ def get_validators(cls, spec_path, tags): @classmethod def get_validator(cls, spec_path, tag, unchecked_facets=None): file_paths = listdir(join(spec_path, tag)) - facet_schemas = [load_json(join(spec_path, tag, path)) for path in file_paths if path.__contains__('Facet.json')] + facet_schemas = { path: load_json(join(spec_path, tag, path)) for path in file_paths if cls.is_facet(path) } spec_schema = next(load_json(join(spec_path, tag, path)) for path in file_paths if path.__contains__('OpenLineage.json')) schema_validators = {} - for schema in facet_schemas: - name = next(iter(schema['properties'])) - store = { - spec_schema['$id']: spec_schema, - schema['$id']: schema, - } - resolver = RefResolver(base_uri="", referrer=spec_schema, store=store) - schema_validators[name] = Draft202012Validator(schema, resolver=resolver) + for path, schema in facet_schemas.items(): + try: + name = next(iter(schema['properties'])) + store = { + spec_schema['$id']: spec_schema, + schema['$id']: schema, + } + resolver = RefResolver(base_uri="", referrer=spec_schema, store=store) + schema_validators[name] = Draft202012Validator(schema, resolver=resolver) + except KeyError as e: + print(f"WARNING: Cannot create schema validator for '{path}' due to missing key:", e) schema_validators['core'] = Draft202012Validator(spec_schema) return cls(schema_validators, unchecked_facets, tag)
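
For context, the validators assembled in `get_validator` follow the standard `jsonschema` pattern: one `Draft202012Validator` per facet schema, applied to the matching facet payload in each emitted event. A minimal standalone sketch of that pattern (the facet schema below is a made-up stand-in, not a real OpenLineage spec file):

```python
from jsonschema import Draft202012Validator

# Stand-in facet schema; the real ones are loaded from specs/<tag>/*Facet.json
# and the common *facet.json files selected by is_facet().
sql_facet_schema = {
    "type": "object",
    "properties": {
        "sql": {
            "type": "object",
            "properties": {"query": {"type": "string"}},
            "required": ["query"],
        }
    },
}

validator = Draft202012Validator(sql_facet_schema)

# A facet payload as it would appear under job.facets in an emitted event.
facet = {"sql": {"query": "select * from raw_orders"}}

for error in validator.iter_errors(facet):
    print("validation error:", error.message)
```

In the script itself, each validator is additionally wired to a `RefResolver` whose store maps the `$id` of the core `OpenLineage.json` spec, so facet schemas can dereference definitions from the core spec during validation.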