gaxeliy left a comment
Let’s unify the structure of the providers.
You followed the AWS example, and that’s good, but Dima’s version looks different already. Let’s not create a separate file for every operator or hook. Just take a look at how it’s done in Dima’s pull request.
I'll do this in another PR or branch (with a PR in this branch).
done!
```diff
  fs_mkdir = OzoneCreatePathOperator(
      task_id="fs_mkdir",
-     path="ofs://om/vol1/bucket-native/data_dir",
+     path=f"ofs://{OM_HOST}/{NATIVE_VOLUME}/{NATIVE_BUCKET}/{NATIVE_DIR}",
```
How about using the `Path` class from the standard library? I don't like OS-specific delimiters.
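For illustration, a sketch of the suggestion: `PurePosixPath` always joins components with `/`, regardless of the host OS, so the same DAG file builds identical `ofs://` paths on Windows and Linux. The constant values below are placeholders matching the example DAG, not definitions from the PR:

```python
from pathlib import PurePosixPath

# Hypothetical stand-ins for the DAG-level constants used in the diff.
OM_HOST = "om"
NATIVE_VOLUME = "vol1"
NATIVE_BUCKET = "bucket-native"
NATIVE_DIR = "data_dir"

# PurePosixPath joins with "/" on every platform, avoiding os.sep surprises.
data_path = "ofs://" + str(PurePosixPath(OM_HOST, NATIVE_VOLUME, NATIVE_BUCKET, NATIVE_DIR))
print(data_path)  # ofs://om/vol1/bucket-native/data_dir
```

`PurePosixPath` (rather than `Path`) is the safer pick here, since these are URI-style object-store paths, not local filesystem paths.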
```python
ozone_conn_id: str = default_conn_name,
*,
retry_attempts: int = RETRY_ATTEMPTS,
):
```
```python
self._kerberos_ticket_ready = False

@cached_property
def connection(self) -> object:
```
Can we type it more concretely? For example, `-> Connection`?
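A sketch of the suggested annotation. The stand-in `Connection` class below only exists so the snippet runs without Airflow installed; in the real hook the return type would be `airflow.models.Connection`:

```python
from functools import cached_property

# Stand-in for airflow.models.Connection, so this sketch is self-contained.
class Connection:
    def __init__(self, conn_id: str):
        self.conn_id = conn_id

class OzoneHook:
    def __init__(self, ozone_conn_id: str = "ozone_default"):
        self.ozone_conn_id = ozone_conn_id

    def get_connection(self, conn_id: str) -> Connection:
        return Connection(conn_id)

    # Annotating the property as Connection (instead of object) lets type
    # checkers and IDEs see attributes such as .host and .login on the result.
    @cached_property
    def connection(self) -> Connection:
        return self.get_connection(self.ozone_conn_id)

hook = OzoneHook()
print(hook.connection.conn_id)  # ozone_default
```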
```python
        stderr_msg="Command stderr: %s",
    )
    return True
except OzoneCliError as err:
```
Do we need logging there?
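If logging is wanted, one hedged option is `log.exception`, which records the traceback at ERROR level so operators can see the failure in the task log. The function name and the stubbed `OzoneCliError` below are illustrative, not from the PR:

```python
import logging

log = logging.getLogger(__name__)

class OzoneCliError(Exception):
    """Stand-in for the provider's CLI error type."""

def run_command_ok(fail: bool) -> bool:
    try:
        if fail:
            raise OzoneCliError("ozone sh command failed")
        return True
    except OzoneCliError as err:
        # log.exception logs at ERROR level and attaches the traceback.
        log.exception("Ozone CLI command failed: %s", err)
        return False

print(run_command_ok(False))  # True
print(run_command_ok(True))   # False
```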
```python
retry_attempts: int = RETRY_ATTEMPTS,
timeout: int = SLOW_TIMEOUT_SECONDS,
**kwargs,
):
```
```python
    return self.get_connection(self.ozone_conn_id)

@cached_property
def connection_snapshot(self) -> OzoneConnSnapshot:
```
Check: can the connection be changed from the Airflow UI during the lifetime of the hook?
If it can, caching it is a bug.
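One way to see the concern: with `functools.cached_property` the first value is kept for the hook's lifetime, while a plain `property` re-reads it on every access, so UI edits are picked up. A minimal sketch, where the in-memory `REGISTRY` dict stands in for the Airflow metadata DB and all names are illustrative:

```python
from functools import cached_property

class Conn:
    def __init__(self, host: str):
        self.host = host

# Stand-in for the Airflow metadata DB that backs get_connection().
REGISTRY = {"ozone_default": Conn("om-1")}

class Hook:
    @cached_property
    def connection_cached(self) -> Conn:
        return REGISTRY["ozone_default"]

    @property
    def connection_fresh(self) -> Conn:
        # Re-read on every access: edits made in the UI are visible.
        return REGISTRY["ozone_default"]

hook = Hook()
first = hook.connection_cached            # caches Conn("om-1")
REGISTRY["ozone_default"] = Conn("om-2")  # simulate an edit in the Airflow UI
print(hook.connection_cached.host)  # om-1 (stale)
print(hook.connection_fresh.host)   # om-2
```

Whether caching is acceptable depends on whether a single hook instance is expected to outlive connection edits; for short-lived task-scoped hooks it is usually fine.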
```python
except OzoneCliError as e:
    error_stderr = (e.stderr or "").strip()
    stderr_upper = error_stderr.upper()
    if "FILE_ALREADY_EXISTS" in stderr_upper or "SNAPSHOT ALREADY EXISTS" in stderr_upper:
```
What do these magic strings mean? Add a comment, or extract them to something like

```python
SNAPSHOT_ALREADY_EXISTS_MARKERS = ("FILE_ALREADY_EXISTS", "SNAPSHOT ALREADY EXISTS")
```

in utils/errors.py.
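Building on the suggested `SNAPSHOT_ALREADY_EXISTS_MARKERS` constant, a sketch of how the extracted markers might be used (the helper name `is_already_exists_error` is illustrative):

```python
# Hypothetical utils/errors.py content, per the review suggestion.
SNAPSHOT_ALREADY_EXISTS_MARKERS = ("FILE_ALREADY_EXISTS", "SNAPSHOT ALREADY EXISTS")

def is_already_exists_error(stderr: str) -> bool:
    """True if Ozone CLI stderr indicates the path/snapshot already exists."""
    stderr_upper = (stderr or "").strip().upper()
    return any(marker in stderr_upper for marker in SNAPSHOT_ALREADY_EXISTS_MARKERS)

print(is_already_exists_error("FILE_ALREADY_EXISTS: /vol1/bucket"))  # True
print(is_already_exists_error("PERMISSION_DENIED"))                  # False
```

Centralizing the markers keeps the string matching in one place and gives the magic strings a self-documenting name.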
```python
self.create_path(dest_path, timeout=timeout)
dest_dir = dest_path.rstrip("/")
for matched_path in matched:
    # CHANGED: use URIHelper.split_ozone_path() instead of local basename logic.
```
What does "CHANGED" mean? Is this comment still accurate?
Think about adding SIGTERM signal handling.
Otherwise the subprocess will leak if Airflow stops the worker.
WARNING: this is easy to miss!
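A hedged sketch of one way to do this, assuming a plain `subprocess.Popen`-based CLI runner (the names `run_cli` and `_terminate` are illustrative, not from the PR): install a SIGTERM handler that terminates the child before the process exits, and restore the previous handler afterwards.

```python
import signal
import subprocess

def run_cli(cmd, timeout):
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    def _terminate(signum, frame):
        # Forward the termination to the child so it does not outlive the worker.
        proc.terminate()
        raise SystemExit(128 + signum)

    previous = signal.signal(signal.SIGTERM, _terminate)
    try:
        out, err = proc.communicate(timeout=timeout)
        return proc.returncode, out, err
    finally:
        signal.signal(signal.SIGTERM, previous)  # restore the old handler

rc, out, _ = run_cli(["echo", "ok"], timeout=10)
print(rc)  # 0
```

In an Airflow operator, overriding `BaseOperator.on_kill()` to terminate the subprocess is the more idiomatic place for this cleanup, since Airflow calls it when a task is externally killed.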
```python
hook.create_path(self.path, timeout=self.timeout)


class OzoneUploadContentOperator(BaseOperator):
```
How about adding a configurable content-size limit? Something like

```python
MAX_CONTENT_SIZE = 10 * 1024 * 1024
if len(content.encode('utf-8')) > MAX_CONTENT_SIZE:
    raise ValueError(f"Content exceeds {MAX_CONTENT_SIZE} bytes")
```

but with the size limit coming from config.
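A sketch of that check as a reusable helper with the limit as a parameter, so the value can be supplied from config or an operator argument (all names and the 10 MiB default are illustrative):

```python
DEFAULT_MAX_CONTENT_SIZE = 10 * 1024 * 1024  # 10 MiB; hypothetical default

def check_content_size(content: str, max_bytes: int = DEFAULT_MAX_CONTENT_SIZE) -> int:
    """Return the encoded size in bytes, or raise if it exceeds max_bytes."""
    # Measure encoded bytes, not characters: multi-byte UTF-8 text would
    # otherwise slip past a character-based limit.
    size = len(content.encode("utf-8"))
    if size > max_bytes:
        raise ValueError(f"Content is {size} bytes, exceeds limit of {max_bytes} bytes")
    return size

print(check_content_size("hello"))  # 5
```

Passing `max_bytes` through the operator's constructor (with the default read from provider config) keeps the limit tunable per deployment without code changes.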
Implement Ozone provider
This PR introduces a new Apache Ozone Provider for Apache Airflow, enabling seamless integration with Apache Ozone — a distributed, scalable object store for Apache Hadoop.
Features