From 18c55366bee1af5045b50089b32244e1ea745f6c Mon Sep 17 00:00:00 2001 From: Ryan Haarmann Date: Thu, 4 May 2017 11:26:32 -0500 Subject: [PATCH 1/7] Addition of Amazon ECS job, copied code from luigi.contrib.ecs.py and modified for ndscheduler JobBase. --- simple_scheduler/jobs/ecs_job.py | 119 +++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 simple_scheduler/jobs/ecs_job.py diff --git a/simple_scheduler/jobs/ecs_job.py b/simple_scheduler/jobs/ecs_job.py new file mode 100644 index 00000000..ec246348 --- /dev/null +++ b/simple_scheduler/jobs/ecs_job.py @@ -0,0 +1,119 @@ +"""A sample job that prints string.""" + +import time +import logging +from ndscheduler import job + +logger = logging.getLogger(__name__) + +try: + import boto3 + + client = boto3.client('ecs') +except ImportError: + logger.warning('boto3 is not installed. ECSTasks require boto3') + +POLL_TIME = 2 + + +class ECSJob(job.JobBase): + @classmethod + def meta_info(cls): + return { + 'job_class_string': '%s.%s' % (cls.__module__, cls.__name__), + 'notes': 'This will execute a AWS ECS RunTask!', + 'arguments': [ + {'type': 'string', 'description': 'ECS Cluster to run on'}, + {'type': 'string', 'description': 'task_def_arn'}, + {'type': 'array[dict]', 'description': 'task_def'}, + {'type': 'string', 'description': 'Directly corresponds to the `overrides` parameter of runTask API'} + ], + 'example_arguments': '["ClusterName", None, "arn:aws:ecs:::task-definition/:", None]' + } + + def _get_task_statuses(self, task_ids): + """ + Retrieve task statuses from ECS API + + Returns list of {RUNNING|PENDING|STOPPED} for each id in task_ids + """ + logger.debug('Get status of task_ids: {}'.format(task_ids)) + response = client.describe_tasks(tasks=task_ids, cluster=self.cluster) + + # Error checking + if response['failures']: + raise Exception('There were some failures:\n{0}'.format( + response['failures'])) + status_code = response['ResponseMetadata']['HTTPStatusCode'] + if status_code != 200: + msg = 'Task status request received status code {0}:\n{1}' + raise Exception(msg.format(status_code, response)) + + return [t['lastStatus'] for t in response['tasks']] + + def _track_tasks(self, task_ids): + """Poll task status until STOPPED""" + while True: + statuses = self._get_task_statuses(task_ids) + if all([status == 'STOPPED' for status in statuses]): + logger.info('ECS tasks {0} STOPPED'.format(','.join(task_ids))) + break + time.sleep(POLL_TIME) + logger.debug('ECS task status for tasks {0}: {1}'.format( + ','.join(task_ids), statuses)) + + @property + def cluster(self): + if not hasattr(self, '_cluster'): + logger.warning('Cluster not set!') + return None + return self._cluster + + @cluster.setter + def cluster(self, cluster): + self._cluster = cluster + logger.debug('Set Cluster: {}'.format(cluster)) + + def run(self, cluster, task_def_arn=None, task_def=None, command=None): + self.cluster = cluster + if (not task_def and not task_def_arn) or \ + (task_def and task_def_arn): + raise ValueError(('Either (but not both) a task_def (dict) or' + 'task_def_arn (string) must be assigned')) + if not task_def_arn: + # Register the task and get assigned taskDefinition ID (arn) + response = client.register_task_definition(**task_def) + task_def_arn = response['taskDefinition']['taskDefinitionArn'] + logger.debug('Task Definition ARN: {}'.format(task_def_arn)) + + # Submit the task to AWS ECS and get assigned task ID + # (list containing 1 string) + if command: + overrides = {'containerOverrides': command} + else: + overrides = {} + response = client.run_task(taskDefinition=task_def_arn, + overrides=overrides, cluster=self.cluster) + _task_ids = [task['taskArn'] for task in response['tasks']] + + # Wait on task completion + self._track_tasks(_task_ids) + + +if __name__ == "__main__": + # You can easily test this job here + job = ECSJob.create_test_instance() + job.run('ClusterName', "arn:aws:ecs:::task-definition/:") + job.run('ClusterName', None, { + 'family': 'hello-world', + 'volumes': [], + 'containerDefinitions': [ + { + 'memory': 1, + 'essential': True, + 'name': 'hello-world', + 'image': 'ubuntu', + 'command': ['/bin/echo', 'hello world'] + } + ] + }) From 1ede3cbb2e7802444f561ca8689ea2cfb9ce508f Mon Sep 17 00:00:00 2001 From: Ryan Haarmann Date: Thu, 4 May 2017 11:31:31 -0500 Subject: [PATCH 2/7] Addition of Amazon ECS job, copied code from luigi.contrib.ecs.py and modified for ndscheduler JobBase. --- simple_scheduler/requirements.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/simple_scheduler/requirements.txt b/simple_scheduler/requirements.txt index e4b1e174..89e4c404 100644 --- a/simple_scheduler/requirements.txt +++ b/simple_scheduler/requirements.txt @@ -10,3 +10,7 @@ requests == 2.9.1 # Uncomment this if you want to use MySQL as datastore # # pymysql == 0.6.7 + +# Uncomment this if you want to use ECS Job +# +boto3 From 150ff2e512ec9c5c3e50c3c3fe8acf56263d9b92 Mon Sep 17 00:00:00 2001 From: Ryan Haarmann Date: Thu, 4 May 2017 11:40:11 -0500 Subject: [PATCH 3/7] Addition of Amazon ECS job, copied code from luigi.contrib.ecs.py and modified for ndscheduler JobBase. Fixed the 100 character limit --- simple_scheduler/jobs/ecs_job.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/simple_scheduler/jobs/ecs_job.py b/simple_scheduler/jobs/ecs_job.py index ec246348..d2b03021 100644 --- a/simple_scheduler/jobs/ecs_job.py +++ b/simple_scheduler/jobs/ecs_job.py @@ -26,9 +26,11 @@ def meta_info(cls): {'type': 'string', 'description': 'ECS Cluster to run on'}, {'type': 'string', 'description': 'task_def_arn'}, {'type': 'array[dict]', 'description': 'task_def'}, - {'type': 'string', 'description': 'Directly corresponds to the `overrides` parameter of runTask API'} + {'type': 'string', 'description': 'Directly corresponds to the ' + '`overrides` parameter of runTask API'} ], - 'example_arguments': '["ClusterName", None, "arn:aws:ecs:::task-definition/:", None]' + 'example_arguments': '["ClusterName", None, "arn:aws:ecs:' + '::task-definition/:", None]' } def _get_task_statuses(self, task_ids): @@ -103,8 +105,9 @@ def run(self, cluster, task_def_arn=None, task_def=None, command=None): if __name__ == "__main__": # You can easily test this job here job = ECSJob.create_test_instance() - job.run('ClusterName', "arn:aws:ecs:::task-definition/:") - job.run('ClusterName', None, { + job.run('ClusterName', "arn:aws:ecs:::task-" + "definition/:") + job.run('DataETLCluster', None, { 'family': 'hello-world', 'volumes': [], 'containerDefinitions': [ From bdd50e30241f47e243ec06a9a7bbe4bbfceef48a Mon Sep 17 00:00:00 2001 From: Ryan Haarmann Date: Thu, 4 May 2017 11:41:21 -0500 Subject: [PATCH 4/7] Addition of Amazon ECS job, copied code from luigi.contrib.ecs.py and modified for ndscheduler JobBase. Fixed the 100 character limit blank whitespace --- simple_scheduler/jobs/ecs_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simple_scheduler/jobs/ecs_job.py b/simple_scheduler/jobs/ecs_job.py index d2b03021..cc06325a 100644 --- a/simple_scheduler/jobs/ecs_job.py +++ b/simple_scheduler/jobs/ecs_job.py @@ -36,7 +36,7 @@ def meta_info(cls): def _get_task_statuses(self, task_ids): """ Retrieve task statuses from ECS API - + Returns list of {RUNNING|PENDING|STOPPED} for each id in task_ids """ logger.debug('Get status of task_ids: {}'.format(task_ids)) From 2be8ad35ab843a1e69cb589f7c9ea9d2ed71a9ab Mon Sep 17 00:00:00 2001 From: Ryan Haarmann Date: Thu, 11 May 2017 14:11:22 -0500 Subject: [PATCH 5/7] add a 7 day window, will need to eventually build a true time picker --- ndscheduler/static/index.html | 1 + 1 file changed, 1 insertion(+) diff --git a/ndscheduler/static/index.html b/ndscheduler/static/index.html index bc4bd2e2..962e78e8 100644 --- a/ndscheduler/static/index.html +++ b/ndscheduler/static/index.html @@ -111,6 +111,7 @@ +
From 64f9ff9e0cd1d05a043e849fc94a4d16daf2be85 Mon Sep 17 00:00:00 2001 From: Ryan Haarmann Date: Thu, 11 May 2017 14:22:53 -0500 Subject: [PATCH 6/7] add a 7 day window, will need to eventually build a true time picker --- ndscheduler/static/index.html | 1 + 1 file changed, 1 insertion(+) diff --git a/ndscheduler/static/index.html b/ndscheduler/static/index.html index 962e78e8..ee6de7be 100644 --- a/ndscheduler/static/index.html +++ b/ndscheduler/static/index.html @@ -83,6 +83,7 @@ +
From a48ff8e5c739f7fe9420018ddc87b6ed3fe5e816 Mon Sep 17 00:00:00 2001 From: Ryan Haarmann Date: Tue, 16 May 2017 13:14:31 -0500 Subject: [PATCH 7/7] update for ecs_job better error handling --- simple_scheduler/jobs/ecs_job.py | 46 +++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/simple_scheduler/jobs/ecs_job.py b/simple_scheduler/jobs/ecs_job.py index cc06325a..db3d17b6 100644 --- a/simple_scheduler/jobs/ecs_job.py +++ b/simple_scheduler/jobs/ecs_job.py @@ -2,21 +2,27 @@ import time import logging +import boto3 from ndscheduler import job logger = logging.getLogger(__name__) -try: - import boto3 - - client = boto3.client('ecs') -except ImportError: - logger.warning('boto3 is not installed. ECSTasks require boto3') +client = boto3.client('ecs') POLL_TIME = 2 +class ECSFailureException(BaseException): + pass + + +class ECSResponseException(BaseException): + pass + + class ECSJob(job.JobBase): + retry_count = 3 + @classmethod def meta_info(cls): return { @@ -44,25 +50,39 @@ def _get_task_statuses(self, task_ids): # Error checking if response['failures']: - raise Exception('There were some failures:\n{0}'.format( + raise ECSFailureException('There were some failures:\n{0}'.format( response['failures'])) status_code = response['ResponseMetadata']['HTTPStatusCode'] if status_code != 200: msg = 'Task status request received status code {0}:\n{1}' - raise Exception(msg.format(status_code, response)) + raise ECSResponseException(msg.format(status_code, response)) return [t['lastStatus'] for t in response['tasks']] + def get_task_statuses(self, task_ids): + retries = 0 + while True: + try: + return self._get_task_statuses(task_ids) + except ECSResponseException as e: + if retries <= self.retry_count: + msg = 'Response failed retry attempt {}/{}'.format(retries, self.retry_count) + logger.warning(msg) + time.sleep(POLL_TIME) + else: + raise + def _track_tasks(self, task_ids): """Poll task status until STOPPED""" while True: - statuses = self._get_task_statuses(task_ids) + statuses = self.get_task_statuses(task_ids) + if all([status == 'STOPPED' for status in statuses]): logger.info('ECS tasks {0} STOPPED'.format(','.join(task_ids))) break - time.sleep(POLL_TIME) - logger.debug('ECS task status for tasks {0}: {1}'.format( - ','.join(task_ids), statuses)) + time.sleep(POLL_TIME) + logger.debug('ECS task status for tasks {0}: {1}'.format( + ','.join(task_ids), statuses)) @property def cluster(self): @@ -76,7 +96,7 @@ def cluster(self, cluster): self._cluster = cluster logger.debug('Set Cluster: {}'.format(cluster)) - def run(self, cluster, task_def_arn=None, task_def=None, command=None): + def run(self, cluster, task_def_arn=None, task_def=None, command=None, *args, **kwargs): self.cluster = cluster if (not task_def and not task_def_arn) or \ (task_def and task_def_arn):