4 changes: 3 additions & 1 deletion orchestration/_tests/test_globus_flow.py
@@ -178,7 +178,9 @@ def test_832_dispatcher(mocker: MockFixture):

mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret())

mocker.patch('orchestration.flows.bl832.move.schedule_prefect_flow', return_value=None)
mock_prune_controller = mocker.MagicMock()
mock_prune_controller.prune.return_value = True
mocker.patch('orchestration.flows.bl832.move.get_prune_controller', return_value=mock_prune_controller)

# Mock read_deployment_by_name with a manually defined mock class
class MockDeployment:
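Review note: the test now stubs the controller itself rather than the scheduling task. `get_prune_controller` is patched at its import site in `move.py`, and the returned `MagicMock` reports a successful prune. A hedged sketch of an assertion the test could add after invoking the dispatcher (not part of this PR):

```python
# The dispatcher prunes both the spot832 and data832 copies, so the mocked
# controller should have been asked to prune exactly twice.
assert mock_prune_controller.prune.call_count == 2
```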
53 changes: 23 additions & 30 deletions orchestration/flows/bl832/move.py
@@ -10,7 +10,7 @@
from orchestration.flows.scicat.ingest import scicat_ingest_flow
from orchestration.flows.bl832.config import Config832
from orchestration.globus.transfer import GlobusEndpoint, start_transfer
from orchestration.prefect import schedule_prefect_flow
from orchestration.prune_controller import get_prune_controller, PruneMethod
from orchestration.prometheus_utils import PrometheusMetrics


@@ -149,50 +149,43 @@ def process_new_832_file_task(
logger.info(
f"File successfully transferred from data832 to NERSC {file_path}. Task {task}"
)
flow_name = f"ingest scicat: {Path(file_path).name}"
# Ingest into SciCat
logger.info(f"Ingesting {file_path} with {TOMO_INGESTOR_SPEC}")
try:
scicat_ingest_flow(dataset_path=Path(file_path), ingester_spec=TOMO_INGESTOR_SPEC)
except Exception as e:
logger.error(f"SciCat ingest failed with {e}")

# schedule_prefect_flow(
# "ingest_scicat/ingest_scicat",
# flow_name,
# {"relative_path": relative_path},
# datetime.timedelta(0.0),
# )
logger.info("Initializing prune controller")
prune_controller = get_prune_controller(
prune_type=PruneMethod.GLOBUS,
config=config
)

bl832_settings = Variable.get("bl832-settings", _sync=True)

flow_name = f"delete spot832: {Path(file_path).name}"
# flow_name = f"delete spot832: {Path(file_path).name}"
schedule_spot832_delete_days = bl832_settings["delete_spot832_files_after_days"]
schedule_data832_delete_days = bl832_settings["delete_data832_files_after_days"]
schedule_prefect_flow(
"prune_spot832/prune_spot832",
flow_name,
{
"relative_path": relative_path,
"source_endpoint": config.spot832,
"check_endpoint": config.data832,
},

datetime.timedelta(days=schedule_spot832_delete_days),

# Schedule pruning from spot832
logger.info(f"Scheduling delete from spot832 in {schedule_spot832_delete_days} days")
prune_controller.prune(
file_path=relative_path,
source_endpoint=config.spot832,
check_endpoint=config.data832,
days_from_now=schedule_spot832_delete_days
)
logger.info(
f"Scheduled delete from spot832 at {datetime.timedelta(days=schedule_spot832_delete_days)}"
)

flow_name = f"delete data832: {Path(file_path).name}"
schedule_prefect_flow(
"prune_data832/prune_data832",
flow_name,
{
"relative_path": relative_path,
"source_endpoint": config.data832,
"check_endpoint": config.nersc832,
},
datetime.timedelta(days=schedule_data832_delete_days),
# Schedule pruning from data832
logger.info(f"Scheduling delete from data832 in {schedule_data832_delete_days} days")
prune_controller.prune(
file_path=relative_path,
source_endpoint=config.data832,
check_endpoint=config.nersc832,
days_from_now=schedule_data832_delete_days
)
logger.info(
f"Scheduled delete from data832 at {datetime.timedelta(days=schedule_data832_delete_days)}"
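Review note: `move.py` no longer assembles `schedule_prefect_flow` calls by hand; it asks a factory for a controller and delegates both prunes to it. A hedged sketch of the factory shape implied by this diff (anything beyond `get_prune_controller`, `PruneMethod.GLOBUS`, and `prune` is an assumption, including the `GlobusPruneController` name):

```python
from enum import Enum

class PruneMethod(Enum):
    GLOBUS = "globus"

class GlobusPruneController:
    """Schedules a delayed prune flow for files on Globus endpoints."""

    def __init__(self, config):
        self.config = config

    def prune(self, file_path, source_endpoint, check_endpoint, days_from_now):
        # The real method schedules the prune_globus_endpoint deployment
        # (see prune_controller.py below). days_from_now arrives from move.py
        # as an integer day count but is logged via total_seconds(), so it is
        # presumably normalized to a timedelta inside the controller.
        ...

def get_prune_controller(prune_type: PruneMethod, config) -> GlobusPruneController:
    # Only the Globus-backed controller is exercised by this flow.
    if prune_type is PruneMethod.GLOBUS:
        return GlobusPruneController(config)
    raise ValueError(f"Unsupported prune method: {prune_type}")
```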
8 changes: 7 additions & 1 deletion orchestration/flows/bl832/prefect.yaml
@@ -13,7 +13,7 @@ deployments:

# Move new files to the appropriate locations (data832, nersc cfs) and ingest metadata in scicat
- name: new_file_832
entrypoint: orchestration/flows/bl832/move.py:process_new_832_file
entrypoint: orchestration/flows/bl832/move.py:process_new_832_file_flow
work_pool:
name: new_file_832_pool
work_queue_name: new_file_832_queue
@@ -56,6 +56,12 @@ deployments:
work_queue_name: alcf_recon_flow_queue

# Pruning flows
- name: prune_globus_endpoint
entrypoint: orchestration/prune_controller.py:prune_globus_endpoint
work_pool:
name: prune_832_pool
work_queue_name: prune_832_queue

- name: prune_data832_raw
entrypoint: orchestration/flows/bl832/prune.py:prune_data832_raw
work_pool:
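Review note: this registers the deployment that the controller's `prune` targets. A hedged sketch of triggering it ad hoc, assuming Prefect 2.x's `run_deployment` API (the path and schedule are placeholders):

```python
import datetime
from prefect.deployments import run_deployment

# Schedule a prune seven days out and return immediately (timeout=0).
run_deployment(
    name="prune_globus_endpoint/prune_globus_endpoint",
    parameters={
        "relative_path": "raw/example_scan.h5",  # hypothetical path
        # "source_endpoint": ...,  # required GlobusEndpoint, elided here
    },
    scheduled_time=datetime.datetime.now(datetime.timezone.utc)
    + datetime.timedelta(days=7),
    timeout=0,
)
```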
6 changes: 2 additions & 4 deletions orchestration/prefect.py
@@ -36,13 +36,11 @@ async def schedule(


@task(name="Schedule Prefect Flow")
def schedule_prefect_flow(
async def schedule_prefect_flow(
deployment_name, flow_run_name, parameters, duration_from_now: datetime.timedelta
):
logger = get_run_logger()
asyncio.run(
schedule(deployment_name, flow_run_name, parameters, duration_from_now, logger)
)
await schedule(deployment_name, flow_run_name, parameters, duration_from_now, logger)
return


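Review note: the task goes from spawning its own event loop to being a native coroutine. A minimal sketch of why that matters (illustrative names; the hazard depends on how Prefect executes the task body):

```python
import asyncio

async def schedule(deployment_name, flow_run_name, parameters, duration, logger=None):
    ...  # stand-in for the real coroutine defined above in prefect.py

# Old shape: spins up a brand-new event loop per call. asyncio.run() raises
# RuntimeError if a loop is already running on the calling thread, which can
# happen when the task executes inside Prefect's async engine.
def schedule_old(*args):
    asyncio.run(schedule(*args))

# New shape: the task is itself a coroutine, so the engine awaits it directly,
# and synchronous callers can still dispatch it via schedule_prefect_flow.submit().
async def schedule_new(*args):
    await schedule(*args)
```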
14 changes: 9 additions & 5 deletions orchestration/prune_controller.py
@@ -9,7 +9,7 @@
from prefect.variables import Variable

from orchestration.config import BeamlineConfig
from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe
from orchestration.globus.transfer import GlobusEndpoint, init_transfer_client, prune_one_safe
from orchestration.prefect import schedule_prefect_flow
from orchestration.transfer_endpoints import FileSystemEndpoint, TransferEndpoint

@@ -291,14 +291,14 @@ def prune(
f"in {days_from_now.total_seconds()/86400:.1f} days")

try:
schedule_prefect_flow(
schedule_prefect_flow.submit(
deployment_name="prune_globus_endpoint/prune_globus_endpoint",
flow_run_name=flow_name,
parameters={
"relative_path": file_path,
"source_endpoint": source_endpoint,
"check_endpoint": check_endpoint,
"config": self.config
# "config": self.config
},
duration_from_now=days_from_now,
)
@@ -314,7 +314,7 @@ def prune_globus_endpoint(
relative_path: str,
source_endpoint: GlobusEndpoint,
check_endpoint: Optional[GlobusEndpoint] = None,
config: BeamlineConfig = None
config: Optional[BeamlineConfig] = None
) -> None:
"""
Prefect flow that performs the actual Globus endpoint pruning operation.
@@ -326,6 +326,10 @@
"""
logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'")

if not config:
tc = init_transfer_client()
else:
tc = config.tc
globus_settings = Variable.get("globus-settings", _sync=True)
max_wait_seconds = globus_settings["max_wait_seconds"]
flow_name = f"prune_from_{source_endpoint.name}"
@@ -334,7 +338,7 @@
prune_one_safe(
file=relative_path,
if_older_than_days=0,
transfer_client=config.tc,
transfer_client=tc,
source_endpoint=source_endpoint,
check_endpoint=check_endpoint,
logger=logger,
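Review note: `config` is now optional and commented out of the scheduled parameters, presumably because a `BeamlineConfig` carrying a live Globus `TransferClient` does not serialize cleanly as a flow-run parameter; the flow rebuilds the client instead. The pattern as a standalone sketch (`resolve_transfer_client` is a hypothetical helper; the imports mirror those in this diff):

```python
from typing import Optional

from orchestration.config import BeamlineConfig
from orchestration.globus.transfer import init_transfer_client

def resolve_transfer_client(config: Optional[BeamlineConfig] = None):
    # Scheduled runs now arrive without a config (it is no longer serialized
    # into the flow-run parameters), so build a fresh client on demand.
    if config is None:
        return init_transfer_client()
    return config.tc
```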