Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,010 changes: 1,912 additions & 98 deletions poetry.lock

Large diffs are not rendered by default.

56 changes: 54 additions & 2 deletions sds_data_manager/constructs/instrument_lambdas.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,69 @@ def __init__(
for q in sqs_queues:
self.instrument_lambda.add_event_source(SqsEventSource(q))

# Send reprocessing events to a sqs that dagster can poll from.
# Create a dead letter queue to save messages that could not be processed.
# This DLQ just saves the messages and doesn't do anything with them.
self.dead_letter_queue = sqs.Queue(
self,
"ReprocessingDeadLetterQueue",
queue_name="reprocessing_dead_letter_queue.fifo",
encryption=sqs.QueueEncryption.UNENCRYPTED,
fifo=True,
)

self.reprocessing_queue = sqs.Queue(
self,
"ReprocessQueue",
queue_name="ReprocessQueue.fifo",
# This timeout determines how long the queue waits for processing.
visibility_timeout=Duration.seconds(300),
fifo=True,
# Removes messages with identical content.
content_based_deduplication=True,
# The dead letter queue will take messages that failed retry.
dead_letter_queue=sqs.DeadLetterQueue(
max_receive_count=1, queue=self.dead_letter_queue
),
)
self.reprocessing_sqs_url = self.reprocessing_queue.queue_url
# Create a lambda that the API can trigger to send messages to the reprocessing
# queue. This is necessary because HTTP API Gateway v2's parameter mapping
# expression language is too limited to forward query string parameters as an
# SQS message body. This lambda acts as a proxy that converts the
# query string parameters to a JSON message and sends it to the queue.
self.reprocessing_proxy_lambda = lambda_.Function(
self,
"reprocessing-proxy",
function_name="reprocessing-proxy",
code=code,
handler="SDSCode.pipeline_lambdas.reprocessing_proxy.lambda_handler",
runtime=lambda_.Runtime.PYTHON_3_12,
environment={
"QUEUE_URL": self.reprocessing_queue.queue_url,
},
memory_size=128,
timeout=Duration.seconds(30),
vpc=vpc,
vpc_subnets=subnet,
security_groups=[rds_security_group],
allow_public_subnet=True,
layers=layers,
)

# Permission for the lambda to send messages to the reprocessing queue
self.reprocessing_queue.grant_send_messages(self.reprocessing_proxy_lambda)
# Add api routes for triggering batch starter with a bulk reprocessing request
# Only allow authenticated routes for reprocessing
api.add_route(
route="/authorized/reprocess",
http_method="POST",
lambda_function=self.instrument_lambda,
lambda_function=self.reprocessing_proxy_lambda,
)
api.add_route(
route="/api-key/reprocess",
http_method="POST",
lambda_function=self.instrument_lambda,
lambda_function=self.reprocessing_proxy_lambda,
)

# Set up eventBridge rules to trigger batch starter lambda.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import json
import logging
import os

import boto3

sqs_client = boto3.client("sqs")
QUEUE_URL = os.environ["QUEUE_URL"]
# Logger setup
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
params = event.get("queryStringParameters", {})

logger.info(
f"Received event: {json.dumps(event, indent=2)}. Sending to reprocessing queue"
)

sqs_client.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps(params),
MessageGroupId="reprocess",
)

return {"statusCode": 200, "body": json.dumps({"message": "Reprocess job queued"})}
39 changes: 38 additions & 1 deletion sds_data_manager/orchestration/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,43 @@ def recursive_flatten_list(self, nested_list):
return flat_list


def get_kickoff_jobs(instrument: str | None = None) -> list[ProcessingJobNode]:
"""Return all the jobs that kick off each instrument pipeline.

These are nodes that are downstream from a node with the data_level equal to
"l0" and the descriptor equal to "raw".

If instrument is provided, return only the kickoff job for that instruments
pipeline.

Parameters
----------
instrument : str, optional
The instrument for which to get the kickoff job.

Returns
-------
list[ProcessingJobNode]
List of ProcessingJobNode that are the root job node of each instrument pipeline.
If instrument is provided, return only the kickoff job for that instrument.
"""
kick_off_jobs = []

reader = DependencyConfigReader()
for potential_job in reader.config:
for upstream_node in reader.inputs(potential_job):
if upstream_node.data_type == "l0" and upstream_node.descriptor == "raw":
if instrument and upstream_node.source == instrument:
return [reader.config[potential_job]]
kick_off_jobs.append(reader.config[potential_job])

if not kick_off_jobs:
logger.info(
"No kickoff jobs found. Please check the instrument dependency YAML files."
)
return kick_off_jobs


def upload_dependency_file(dependency_file_path: Path, serialized_dependencies: str):
"""Upload a JSON file containing a job's dependencies to S3.

Expand Down Expand Up @@ -327,4 +364,4 @@ def upload_dependency_file(dependency_file_path: Path, serialized_dependencies:
f"Unexpected error during cadence file upload: {e}. "
f"Dependency file upload failed and the job did not get kicked off."
)
return None
return None
4 changes: 3 additions & 1 deletion sds_data_manager/orchestration/imap_dagster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dagster import Definitions
from imap_data_access import VALID_DATALEVELS
from sds_data_manager.orchestration import custom_partitions, repoint_file, spice, idex
from sds_data_manager.orchestration import custom_partitions, repoint_file, spice, idex, \
reprocessing
from sds_data_manager.orchestration import spin
from sds_data_manager.orchestration.imap_file import IMAPAncillaryFileHandler, IMAPScienceFileHandler
from sds_data_manager.orchestration.imap_job import IMAPJobHandler
Expand Down Expand Up @@ -103,5 +104,6 @@
+ custom_partitions.sensors
+ spice.sensors
+ sensors
+ reprocessing.sensor
+ idex.L0_sensor,
)
4 changes: 2 additions & 2 deletions sds_data_manager/orchestration/imap_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
'l3c':300,
'l3d':300}

_partition_map = {
partition_map = {
"daily": custom_partitions.daily_partitions,
"repoint": custom_partitions.repoint_partitions,
"10d": custom_partitions.idex10_partitions,
Expand Down Expand Up @@ -115,7 +115,7 @@ def __init__(self,
"""
self.job_config = job

self.partitions_def = _partition_map.get(self.job_config.partition)
self.partitions_def = partition_map.get(self.job_config.partition)
self.sensor_schedule = _sensor_schedule.get(self.job_config.data_type, 600)

outputs_for_job = [x.to_dagster_asset() for x in self.job_config.outputs]
Expand Down
131 changes: 131 additions & 0 deletions sds_data_manager/orchestration/reprocessing.py
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know the specific but really easy to follow!

Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Reprocessing logic."""

import datetime
import json
import os

import boto3
from dagster import AssetKey, AssetSelection, SensorEvaluationContext, sensor
from dagster._core.execution.backfill import PartitionBackfill

from sds_data_manager.orchestration.dagster_utilities import get_affected_partitions
from sds_data_manager.orchestration.dependency import (
DependencyConfigReader,
get_kickoff_jobs,
)
from sds_data_manager.orchestration.imap_job import partition_map

SQS_CLIENT = boto3.client("sqs", "us-west-2")


def read_sqs_messages(sqs_queue_url=None):
"""Read SQS messages from the reprocessing queue."""
response = SQS_CLIENT.receive_message(
QueueUrl=sqs_queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=1,
)
return response.get("Messages", [])


@sensor(
asset_selection=AssetSelection.all(),
minimum_interval_seconds=100, # TODO what do we want here
)
def reprocess_sensor(context: SensorEvaluationContext):
"""Sensor that triggers reprocessing backfills."""
sqs_queue_url = os.getenv("REPROCESSING_SQS_URL")
messages = read_sqs_messages(sqs_queue_url)
reader = DependencyConfigReader()
if not messages:
return None

for message in messages:
params = json.loads(message["Body"])

instrument = params.get("instrument")
data_level = params.get("data_level")
descriptor = params.get("descriptor")
start_date = params.get("start_date")
end_date = params.get("end_date")

context.log.info(
f"A reprocessing event was triggered with the parameters: {instrument=}, "
f"{data_level=}, {descriptor=}, {start_date=}, {end_date=}"
)
if not end_date or not start_date:
raise ValueError(
"Start date and end date are required for a reprocessing Event."
)
if not instrument:
raise ValueError("Instrument must be provided for a reprocessing event.")

if bool(data_level) != bool(descriptor):
raise ValueError(
"data_level and descriptor must both be provided or both None."
)

if not data_level:
# If data_level is not provided, we need to reprocess all levels.
# Get the jobs that kick of each pipeline, to trigger processing
# for all levels.
root_job = get_kickoff_jobs(instrument)[0]
# Create the asset name based on the root job information
asset_name = f"{instrument}_{root_job.data_type}_{root_job.descriptor}"
partition = root_job.partition
else:
# If data_level is provided (and therefore descriptor) construct the
# asset name using the input parameters
asset_name = f"{instrument}_{data_level}_{descriptor}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bryan had issue with assets name with - and descriptor has -. Just FYI.

# Get the partition type from the dependency config
partition = reader.config[(instrument, data_level, descriptor)].partition

# Get the partitions definition based on the DependencyNode
partition_def = partition_map.get(partition)
# convert start and end date to datetime
start_date = datetime.datetime.strptime(start_date, "%Y%m%d").replace(
tzinfo=datetime.timezone.utc
)
end_date = datetime.datetime.strptime(end_date, "%Y%m%d").replace(
tzinfo=datetime.timezone.utc
)

# Determine the affected partitions based on the start_date and end_date
partition_keys = get_affected_partitions(
context, partition_def, start_date, end_date
)
if not partition_keys:
return None
context.log.info(
f"Reprocessing asset {asset_name} across {partition_keys} partitions"
)

backfill = PartitionBackfill.from_asset_partitions(
backfill_id=f"reprocess-{instrument}-{int(datetime.datetime.now().timestamp())}",
asset_graph=context.repository_def.asset_graph,
partition_names=partition_keys,
asset_selection=[AssetKey(asset_name)],
backfill_timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(),
tags={
"instrument": instrument,
"descriptor": descriptor or "",
"data_level": data_level or "",
},
dynamic_partitions_store=context.instance,
all_partitions=False,
title=None,
description=None,
run_config=None,
)

context.instance.add_backfill(backfill)

SQS_CLIENT.delete_message(
QueueUrl=sqs_queue_url,
ReceiptHandle=message["ReceiptHandle"],
)

return None


sensors = [reprocess_sensor]
11 changes: 7 additions & 4 deletions sds_data_manager/utils/stackbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def build_sds(

instrument_delay_sqs = file_arrive_sqs_construct.delay_queue

instrument_lambdas.BatchStarterLambda(
batch_starter_construct = instrument_lambdas.BatchStarterLambda(
scope=sdc_stack,
construct_id="BatchStarterLambda",
env=env,
Expand Down Expand Up @@ -461,8 +461,9 @@ def build_sds(
secret_name=ialirt_secret_name,
account_name=account_name,
)
# Dagster Stack
# Repo root is three levels up from this file (sds_data_manager/utils/stackbuilder.py)
# Dagster Stack
# Repo root is three levels up from this
# file (sds_data_manager/utils/stackbuilder.py)
repo_root = str(Path(__file__).parent.parent.parent)

dagster_repo_stack = dagster_construct.ElasticContainerRegistryStack(
Expand Down Expand Up @@ -493,7 +494,7 @@ def build_sds(
# db_name
# username: dagster
# port: 5432
#
#
# Dagster will look for DB credentails through dagster.yaml in
# sds_data_manager/orchestration/dagster.yaml.
# In there, it's going to read DAGSTER_PG_PASSWORD and DAGSTER_PG_HOST.
Expand All @@ -508,6 +509,7 @@ def build_sds(
"imap_data_access_url", "https://api.dev.imap-mission.com"
),
"SSM_API_KEY_PARAMETER": "/imap-sdc/batch-jobs/api-key",
"REPROCESSING_SQS_URL": batch_starter_construct.reprocessing_sqs_url,
}

dagster_ecs_stack = dagster_construct.DagsterEcsStack(
Expand All @@ -519,6 +521,7 @@ def build_sds(
)
dagster_ecs_stack.node.add_dependency(dagster_image_stack)


def build_backup(scope: App, env: Environment, source_account: str):
"""Build backup bucket with permissions for replication from source_account.

Expand Down
2 changes: 1 addition & 1 deletion tests/infrastructure/test_instrument_lambda_construct.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def template(stack, env, code):

def test_batch_starter_resources(template):
"""Ensure the template has appropriate IAM roles."""
template.resource_count_is("AWS::IAM::Role", 7)
template.resource_count_is("AWS::IAM::Role", 9)
# 2 for RDS stack + 1 for BatchStarterLambda + 1 for API Gateway + 1 for
# bucket notifications
template.resource_count_is("AWS::Lambda::Function", 5)
Expand Down
Loading
Loading