
Implement Ozone provider#13

Open
luserg wants to merge 9 commits into v2-10-stable from feature/ozone-provider

Conversation


@luserg luserg commented Jan 30, 2026

Implement Ozone provider

This PR introduces a new Apache Ozone Provider for Apache Airflow, enabling seamless integration with Apache Ozone — a distributed, scalable object store for Apache Hadoop.

Features

  • Native CLI Operations: Manage volumes/buckets and perform filesystem operations on ofs:// and o3fs:// paths via the Ozone CLI
  • Data Transfers: Efficient data movement including HDFS → Ozone (DistCp), Ozone → S3 backups, and in-cluster moves/renames (metadata-only)
  • Hive Integration: Register Ozone paths as Hive table partitions for query access
  • Backup & DR: Create native Ozone bucket snapshots for disaster recovery
  • Monitoring: Sensors for Ozone FS paths/keys and S3 keys
  • SSL/TLS Support: Secure Native CLI and S3 Gateway connections with configurable certificate verification
  • Kerberos Authentication: Enterprise authentication for Native CLI with automatic kinit using keytab
  • Secrets Backend Support: Resolve sensitive values via secret:// references and mask secrets in logs
  • Reliability & Performance: Retries with exponential backoff, subprocess timeouts, detailed logging, and parallel transfers for bulk operations

@luserg luserg self-assigned this Jan 30, 2026
@luserg luserg force-pushed the feature/ozone-provider branch from 90aeb40 to edb8ab1 on February 8, 2026 22:18

@gaxeliy gaxeliy left a comment


Let’s unify the structure of the providers.
You followed the AWS example, and that’s good, but Dima’s version looks different already. Let’s not create a separate file for every operator or hook. Just take a look at how it’s done in Dima’s pull request.


luserg commented Feb 26, 2026

> Let’s unify the structure of the providers. You followed the AWS example, and that’s good, but Dima’s version looks different already. Let’s not create a separate file for every operator or hook. Just take a look at how it’s done in Dima’s pull request.

I'll do this in another PR or branch (with a PR into this branch)

@luserg luserg requested a review from gaxeliy February 26, 2026 19:17

luserg commented Mar 3, 2026

> Let’s unify the structure of the providers. You followed the AWS example, and that’s good, but Dima’s version looks different already. Let’s not create a separate file for every operator or hook. Just take a look at how it’s done in Dima’s pull request.

I'll do this in another PR or branch (with a PR into this branch)

done!

fs_mkdir = OzoneCreatePathOperator(
task_id="fs_mkdir",
path="ofs://om/vol1/bucket-native/data_dir",
path=f"ofs://{OM_HOST}/{NATIVE_VOLUME}/{NATIVE_BUCKET}/{NATIVE_DIR}",
Collaborator

How about using a Path class from the standard library?
I don't like OS-specific delimiters.
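A minimal sketch of the reviewer's suggestion, using `PurePosixPath` from the standard library so the `/` separator is explicit and platform-independent. The names `OM_HOST`, `NATIVE_VOLUME`, `NATIVE_BUCKET`, and `NATIVE_DIR` mirror the DAG snippet above; the values and the `ozone_fs_path()` helper are illustrative assumptions, not the provider's actual API.

```python
from pathlib import PurePosixPath

# Placeholder values mirroring the names used in the DAG snippet above.
OM_HOST = "om"
NATIVE_VOLUME = "vol1"
NATIVE_BUCKET = "bucket-native"
NATIVE_DIR = "data_dir"


def ozone_fs_path(host: str, *parts: str) -> str:
    """Join path components into an ofs:// URI using POSIX separators."""
    return f"ofs://{PurePosixPath(host, *parts)}"


path = ozone_fs_path(OM_HOST, NATIVE_VOLUME, NATIVE_BUCKET, NATIVE_DIR)
print(path)  # ofs://om/vol1/bucket-native/data_dir
```

`PurePosixPath` always joins with `/` regardless of the host OS, which addresses the delimiter concern without touching the filesystem.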

ozone_conn_id: str = default_conn_name,
*,
retry_attempts: int = RETRY_ATTEMPTS,
):
Collaborator

-> None:

self._kerberos_ticket_ready = False

@cached_property
def connection(self) -> object:
Collaborator

Can we type it more concretely? For example, -> Connection:?

stderr_msg="Command stderr: %s",
)
return True
except OzoneCliError as err:
Collaborator

Do we need logging here?

retry_attempts: int = RETRY_ATTEMPTS,
timeout: int = SLOW_TIMEOUT_SECONDS,
**kwargs,
):
Collaborator

-> None:

return self.get_connection(self.ozone_conn_id)

@cached_property
def connection_snapshot(self) -> OzoneConnSnapshot:
Collaborator

Check whether the connection can be changed from the Airflow UI during the lifetime of the hook.
If it can, caching it is a bug.
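A small sketch of the caching concern, with a stand-in class instead of the real hook. `ExampleHook` and `_fetch_connection()` are hypothetical; the dict simulates Airflow's metadata DB. With `cached_property` the first value is frozen for the hook's lifetime, so a connection edited in the UI afterwards would not be picked up, while a plain `property` re-reads on every access.

```python
from functools import cached_property


class ExampleHook:
    """Stand-in for the Ozone hook; the dict simulates the Airflow metadata DB."""

    def __init__(self, store: dict):
        self._store = store

    @cached_property
    def connection_cached(self) -> dict:
        # Computed once, then frozen for the lifetime of this hook instance.
        return self._fetch_connection()

    @property
    def connection_fresh(self) -> dict:
        # Re-reads on every access, so later edits are visible.
        return self._fetch_connection()

    def _fetch_connection(self) -> dict:
        return dict(self._store)


store = {"host": "om-1"}
hook = ExampleHook(store)
first = hook.connection_cached["host"]
store["host"] = "om-2"  # simulate editing the connection in the Airflow UI
print(first, hook.connection_cached["host"], hook.connection_fresh["host"])
# om-1 om-1 om-2
```

Whether this matters depends on whether a single hook instance can outlive a connection edit; for short-lived task hooks the cache may be acceptable.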

except OzoneCliError as e:
error_stderr = (e.stderr or "").strip()
stderr_upper = error_stderr.upper()
if "FILE_ALREADY_EXISTS" in stderr_upper or "SNAPSHOT ALREADY EXISTS" in stderr_upper:
Collaborator

What do these magic strings mean?
Add a comment, or extract them to something like

SNAPSHOT_ALREADY_EXISTS_MARKERS = ("FILE_ALREADY_EXISTS", "SNAPSHOT ALREADY EXISTS")

in utils/errors.py
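A sketch of the extraction the reviewer proposes. The marker values come from the snippet under review; the `snapshot_already_exists()` helper name is a hypothetical addition, assuming a module like `utils/errors.py`.

```python
# Named markers for Ozone CLI stderr output that indicates the snapshot
# already exists (the values are taken from the code under review).
SNAPSHOT_ALREADY_EXISTS_MARKERS = ("FILE_ALREADY_EXISTS", "SNAPSHOT ALREADY EXISTS")


def snapshot_already_exists(stderr: str) -> bool:
    """Return True if the CLI stderr indicates the snapshot already exists."""
    stderr_upper = (stderr or "").strip().upper()
    return any(marker in stderr_upper for marker in SNAPSHOT_ALREADY_EXISTS_MARKERS)


print(snapshot_already_exists("FILE_ALREADY_EXISTS: snap1"))  # True
print(snapshot_already_exists("PERMISSION_DENIED"))           # False
```

The calling code then reduces to a single readable condition instead of two inline string literals.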

self.create_path(dest_path, timeout=timeout)
dest_dir = dest_path.rstrip("/")
for matched_path in matched:
# CHANGED: use URIHelper.split_ozone_path() instead of local basename logic.
Collaborator

What does "CHANGED" mean? Is this comment still accurate?

Collaborator

Think about adding SIGTERM signal handling. Otherwise the subprocess will leak if Airflow stops the worker.

WARNING: it's not obvious!
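One possible shape for the SIGTERM concern, as a self-contained sketch. `run_cli()` is a hypothetical wrapper, not the provider's actual API: it installs a temporary SIGTERM handler that terminates the child process (escalating to `kill()` if it does not exit promptly) so the CLI subprocess does not leak when Airflow stops the worker.

```python
import signal
import subprocess


def run_cli(cmd: list[str], timeout: int = 60) -> str:
    """Run a CLI command, forwarding SIGTERM to the child so it cannot leak."""
    proc = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )

    def _on_sigterm(signum, frame):
        proc.terminate()  # forward SIGTERM to the child process
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()  # escalate if the child does not exit promptly
        raise SystemExit(128 + signum)

    previous = signal.signal(signal.SIGTERM, _on_sigterm)
    try:
        stdout, _ = proc.communicate(timeout=timeout)
        return stdout
    finally:
        signal.signal(signal.SIGTERM, previous)  # restore the old handler


print(run_cli(["echo", "ok"]).strip())
```

Note that `signal.signal` only works from the main thread, so a worker that runs tasks in threads would need a different mechanism (e.g. Airflow's `on_kill` callback on the operator).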

hook.create_path(self.path, timeout=self.timeout)


class OzoneUploadContentOperator(BaseOperator):
Collaborator

How about adding a configurable content-size limit?
Something like

MAX_CONTENT_SIZE = 10 * 1024 * 1024
if len(content.encode('utf-8')) > MAX_CONTENT_SIZE:
    raise ValueError(f"Content exceeds {MAX_CONTENT_SIZE} bytes")

but with the size limit read from config.
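Expanding the reviewer's snippet into a parameterized check, as a sketch. `DEFAULT_MAX_CONTENT_SIZE` and `check_content_size()` are hypothetical names; in the provider the limit would come from the Airflow config or an operator argument rather than a module constant.

```python
DEFAULT_MAX_CONTENT_SIZE = 10 * 1024 * 1024  # 10 MiB fallback


def check_content_size(content: str, max_size: int = DEFAULT_MAX_CONTENT_SIZE) -> int:
    """Return the UTF-8 encoded size, raising ValueError if it exceeds max_size."""
    size = len(content.encode("utf-8"))
    if size > max_size:
        raise ValueError(f"Content is {size} bytes, exceeds limit of {max_size} bytes")
    return size


print(check_content_size("hello"))  # 5
```

Measuring the encoded bytes (not `len(content)`) matters because multi-byte UTF-8 characters make the string length an underestimate of the payload size.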
