Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,10 @@ asset-uris:
handler: airflow.providers.amazon.aws.assets.s3.sanitize_uri
to_openlineage_converter: airflow.providers.amazon.aws.assets.s3.convert_asset_to_openlineage
factory: airflow.providers.amazon.aws.assets.s3.create_asset
- schemes: [redshift]
handler: airflow.providers.amazon.aws.assets.redshift.sanitize_uri
factory: airflow.providers.amazon.aws.assets.redshift.create_asset
to_openlineage_converter: airflow.providers.amazon.aws.assets.redshift.convert_asset_to_openlineage

# dataset has been renamed to asset in Airflow 3.0
# This is kept for backward compatibility.
Expand All @@ -628,6 +632,10 @@ dataset-uris:
handler: airflow.providers.amazon.aws.assets.s3.sanitize_uri
to_openlineage_converter: airflow.providers.amazon.aws.assets.s3.convert_asset_to_openlineage
factory: airflow.providers.amazon.aws.assets.s3.create_asset
- schemes: [redshift]
handler: airflow.providers.amazon.aws.assets.redshift.sanitize_uri
factory: airflow.providers.amazon.aws.assets.redshift.create_asset
to_openlineage_converter: airflow.providers.amazon.aws.assets.redshift.convert_asset_to_openlineage

filesystems:
- airflow.providers.amazon.aws.fs.s3
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.providers.common.compat.assets import Asset

if TYPE_CHECKING:
from urllib.parse import SplitResult

from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset


def sanitize_uri(uri: SplitResult) -> SplitResult:
if not uri.netloc:
raise ValueError("URI format redshift:// must contain a host")
if uri.port is None:
host = uri.netloc.rstrip(":")
uri = uri._replace(netloc=f"{host}:5439")
if len(uri.path.split("/")) != 4: # Leading slash, database, schema, and table names.
raise ValueError("URI format redshift:// must contain database, schema, and table names")
return uri


def create_asset(
*, host: str, database: str, schema: str, table: str, port: int = 5439, extra: dict | None = None
) -> Asset:
return Asset(uri=f"redshift://{host}:{port}/{database}/{schema}/{table}", extra=extra)


def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDataset:
"""Translate Asset with valid AIP-60 uri to OpenLineage with assistance from the hook."""
from urllib.parse import urlsplit

from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset

parsed = urlsplit(asset.uri)
_, database, schema, table = parsed.path.split("/") # Leading slash, database, schema, and table names.
return OpenLineageDataset(namespace=f"redshift://{parsed.netloc}", name=f"{database}.{schema}.{table}")
Original file line number Diff line number Diff line change
Expand Up @@ -667,15 +667,27 @@ def get_provider_info():
"handler": "airflow.providers.amazon.aws.assets.s3.sanitize_uri",
"to_openlineage_converter": "airflow.providers.amazon.aws.assets.s3.convert_asset_to_openlineage",
"factory": "airflow.providers.amazon.aws.assets.s3.create_asset",
}
},
{
"schemes": ["redshift"],
"handler": "airflow.providers.amazon.aws.assets.redshift.sanitize_uri",
"factory": "airflow.providers.amazon.aws.assets.redshift.create_asset",
"to_openlineage_converter": "airflow.providers.amazon.aws.assets.redshift.convert_asset_to_openlineage",
},
],
"dataset-uris": [
{
"schemes": ["s3"],
"handler": "airflow.providers.amazon.aws.assets.s3.sanitize_uri",
"to_openlineage_converter": "airflow.providers.amazon.aws.assets.s3.convert_asset_to_openlineage",
"factory": "airflow.providers.amazon.aws.assets.s3.create_asset",
}
},
{
"schemes": ["redshift"],
"handler": "airflow.providers.amazon.aws.assets.redshift.sanitize_uri",
"factory": "airflow.providers.amazon.aws.assets.redshift.create_asset",
"to_openlineage_converter": "airflow.providers.amazon.aws.assets.redshift.convert_asset_to_openlineage",
},
],
"filesystems": ["airflow.providers.amazon.aws.fs.s3"],
"hooks": [
Expand Down
90 changes: 90 additions & 0 deletions providers/amazon/tests/unit/amazon/aws/assets/test_redshift.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import urllib.parse

import pytest

from airflow.providers.amazon.aws.assets.redshift import (
convert_asset_to_openlineage,
create_asset,
sanitize_uri,
)
from airflow.providers.common.compat.assets import Asset


@pytest.mark.parametrize(
("original", "normalized"),
[
pytest.param(
"redshift://cluster.us-east-1:5439/database/schema/table",
"redshift://cluster.us-east-1:5439/database/schema/table",
id="normalized",
),
pytest.param(
"redshift://cluster.us-east-1/database/schema/table",
"redshift://cluster.us-east-1:5439/database/schema/table",
id="default-port",
),
],
)
def test_sanitize_uri_pass(original: str, normalized: str) -> None:
uri_i = urllib.parse.urlsplit(original)
uri_o = sanitize_uri(uri_i)
assert urllib.parse.urlunsplit(uri_o) == normalized


@pytest.mark.parametrize(
"value",
[
pytest.param("redshift://", id="blank"),
pytest.param("redshift:///database/schema/table", id="no-host"),
pytest.param("redshift://host/database/table", id="missing-component"),
pytest.param("redshift://host/database/schema/table/column", id="extra-component"),
],
)
def test_sanitize_uri_fail(value: str) -> None:
uri_i = urllib.parse.urlsplit(value)
with pytest.raises(ValueError, match="URI format redshift:// must contain"):
sanitize_uri(uri_i)


def test_sanitize_uri_fail_non_port() -> None:
uri_i = urllib.parse.urlsplit("redshift://cluster.us-east-1:abcd/database/schema/table")
with pytest.raises(ValueError, match="Port could not be cast to integer value as 'abcd'"):
sanitize_uri(uri_i)


def test_create_asset() -> None:
result = create_asset(host="cluster.us-east-1", database="mydb", schema="public", table="users")
assert result == Asset(uri="redshift://cluster.us-east-1:5439/mydb/public/users")


def test_create_asset_custom_port() -> None:
result = create_asset(
host="cluster.us-east-1", port=5440, database="mydb", schema="public", table="users"
)
assert result == Asset(uri="redshift://cluster.us-east-1:5440/mydb/public/users")


def test_convert_asset_to_openlineage() -> None:
asset = Asset(uri="redshift://cluster.us-east-1:5439/mydb/public/users")
ol_dataset = convert_asset_to_openlineage(asset=asset, lineage_context=None)
assert ol_dataset.namespace == "redshift://cluster.us-east-1:5439"
assert ol_dataset.name == "mydb.public.users"
14 changes: 14 additions & 0 deletions providers/apache/hdfs/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@ sensors:
python-modules:
- airflow.providers.apache.hdfs.sensors.web_hdfs

asset-uris:
- schemes: [hdfs]
handler: airflow.providers.apache.hdfs.assets.hdfs.sanitize_uri
factory: airflow.providers.apache.hdfs.assets.hdfs.create_asset
to_openlineage_converter: airflow.providers.apache.hdfs.assets.hdfs.convert_asset_to_openlineage

# dataset has been renamed to asset in Airflow 3.0
# This is kept for backward compatibility.
dataset-uris:
- schemes: [hdfs]
handler: airflow.providers.apache.hdfs.assets.hdfs.sanitize_uri
factory: airflow.providers.apache.hdfs.assets.hdfs.create_asset
to_openlineage_converter: airflow.providers.apache.hdfs.assets.hdfs.convert_asset_to_openlineage

hooks:
- integration-name: WebHDFS
python-modules:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.providers.common.compat.assets import Asset

if TYPE_CHECKING:
from urllib.parse import SplitResult

from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset


def sanitize_uri(uri: SplitResult) -> SplitResult:
if not uri.netloc:
raise ValueError("URI format hdfs:// must contain a namenode host")
if not uri.path:
raise ValueError("URI format hdfs:// must contain a path")
return uri


def create_asset(*, host: str, path: str, port: int = 8020, extra: dict | None = None) -> Asset:
return Asset(uri=f"hdfs://{host}:{port}/{path}", extra=extra)


def convert_asset_to_openlineage(asset: Asset, lineage_context) -> OpenLineageDataset:
"""Translate Asset with valid AIP-60 uri to OpenLineage with assistance from the hook."""
from urllib.parse import urlsplit

from airflow.providers.common.compat.openlineage.facet import Dataset as OpenLineageDataset

parsed = urlsplit(asset.uri)
path = parsed.path[1:] if parsed.path.startswith("/") else parsed.path
return OpenLineageDataset(namespace=f"hdfs://{parsed.netloc}", name=path or "/")
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ def get_provider_info():
"python-modules": ["airflow.providers.apache.hdfs.sensors.web_hdfs"],
}
],
"asset-uris": [
{
"schemes": ["hdfs"],
"handler": "airflow.providers.apache.hdfs.assets.hdfs.sanitize_uri",
"factory": "airflow.providers.apache.hdfs.assets.hdfs.create_asset",
"to_openlineage_converter": "airflow.providers.apache.hdfs.assets.hdfs.convert_asset_to_openlineage",
}
],
"dataset-uris": [
{
"schemes": ["hdfs"],
"handler": "airflow.providers.apache.hdfs.assets.hdfs.sanitize_uri",
"factory": "airflow.providers.apache.hdfs.assets.hdfs.create_asset",
"to_openlineage_converter": "airflow.providers.apache.hdfs.assets.hdfs.convert_asset_to_openlineage",
}
],
"hooks": [
{"integration-name": "WebHDFS", "python-modules": ["airflow.providers.apache.hdfs.hooks.webhdfs"]}
],
Expand Down
16 changes: 16 additions & 0 deletions providers/apache/hdfs/tests/unit/apache/hdfs/assets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading
Loading