From c8411fb8873aa79d5ef092945dde8af916a2503d Mon Sep 17 00:00:00 2001 From: Matthew Wardrop Date: Wed, 20 May 2015 11:06:39 +1000 Subject: [PATCH 1/2] Fix class loading and better compartmentalise computations. --- py2/dispy.py | 1 + py2/dispy/__init__.py | 1 + py2/dispynode.py | 27 ++++++++++++++------------- py3/dispy.py | 1 + py3/dispy/__init__.py | 1 + py3/dispynode.py | 27 ++++++++++++++------------- 6 files changed, 32 insertions(+), 26 deletions(-) diff --git a/py2/dispy.py b/py2/dispy.py index dfad7ce..43b3405 100755 --- a/py2/dispy.py +++ b/py2/dispy.py @@ -281,6 +281,7 @@ def __init__(self, compute_type, name): self.name = name self.id = None self.code = '' + self.globals = {} self.dest_path = None self.xfer_files = [] self.reentrant = False diff --git a/py2/dispy/__init__.py b/py2/dispy/__init__.py index dfad7ce..43b3405 100755 --- a/py2/dispy/__init__.py +++ b/py2/dispy/__init__.py @@ -281,6 +281,7 @@ def __init__(self, compute_type, name): self.name = name self.id = None self.code = '' + self.globals = {} self.dest_path = None self.xfer_files = [] self.reentrant = False diff --git a/py2/dispynode.py b/py2/dispynode.py index 9358c11..7df8b3c 100755 --- a/py2/dispynode.py +++ b/py2/dispynode.py @@ -184,7 +184,7 @@ def _same_file(tgt, xf): def _dispy_job_func(__dispy_job_info, __dispy_job_certfile, __dispy_job_keyfile, __dispy_job_name, __dispy_job_args, __dispy_job_kwargs, - __dispy_job_code, __dispy_path, __dispy_reply_Q): + __dispy_job_code, __dispy_job_globals, __dispy_path, __dispy_reply_Q): """Internal use only. """ os.chdir(__dispy_path) @@ -193,14 +193,13 @@ def _dispy_job_func(__dispy_job_info, __dispy_job_certfile, __dispy_job_keyfile, __dispy_job_reply = __dispy_job_info.job_reply try: - exec(marshal.loads(__dispy_job_code[0])) + exec(marshal.loads(__dispy_job_code[0])) in __dispy_job_globals if __dispy_job_code[1]: - exec(__dispy_job_code[1]) - globals().update(locals()) + exec(__dispy_job_code[1]) in __dispy_job_globals __dispy_job_args = unserialize(__dispy_job_args) __dispy_job_kwargs = unserialize(__dispy_job_kwargs) - __func = globals()[__dispy_job_name] - __dispy_job_reply.result = __func(*__dispy_job_args, **__dispy_job_kwargs) + __dispy_job_globals.update(locals()) + exec("__dispy_job_reply.result = %s(*__dispy_job_args, **__dispy_job_kwargs)" % __dispy_job_name) in __dispy_job_globals __dispy_job_reply.status = DispyJob.Finished except: __dispy_job_reply.exception = traceback.format_exc() @@ -507,7 +506,7 @@ def job_request_task(msg): args = (job_info, self.certfile, self.keyfile, compute.name, _job.args, _job.kwargs, - (compute.code, _job.code), compute.dest_path, self.reply_Q) + (compute.code, _job.code), compute.globals.copy(), compute.dest_path, self.reply_Q) try: yield conn.send_msg('ACK') except: @@ -731,9 +730,10 @@ def setup_computation(msg): compute = self.computations[compute_id] assert isinstance(compute.setup, _Function) os.chdir(compute.dest_path) - exec(marshal.loads(compute.code)) in globals(), locals() - _dispy_setup_func = locals()[compute.setup.name] - assert _dispy_setup_func(*compute.setup.args, **compute.setup.kwargs) == 0 + exec(marshal.loads(compute.code)) in compute.globals + compute.globals['__compute_setup_args'] = compute.setup.args + compute.globals['__compute_setup_kwargs'] = compute.setup.kwargs + exec("assert %s(*__compute_setup_args,**__compute_setup_kwargs) == 0" % compute.setup.name) in compute.globals except: logger.debug('Setup "%s" failed' % compute.setup.name) resp = traceback.format_exc().encode() @@ -1259,9 +1259,10 @@ def cleanup_computation(self, compute): os.chdir(self.dest_path_prefix) if isinstance(compute.cleanup, _Function): try: - exec(marshal.loads(compute.code)) in globals(), locals() - _dispy_cleanup_func = locals()[compute.cleanup.name] - _dispy_cleanup_func(*compute.cleanup.args, **compute.cleanup.kwargs) + exec(marshal.loads(compute.code)) in compute.globals + compute.globals['__compute_cleanup_args'] = compute.cleanup.args + compute.globals['__compute_cleanup_kwargs'] = compute.cleanup.kwargs + exec("%s(*__compute_cleanup_args,**__compute_cleanup_kwargs)" % compute.cleanup.name) in compute.globals except: logger.debug('Cleanup "%s" failed' % compute.cleanup.name) logger.debug(traceback.format_exc()) diff --git a/py3/dispy.py b/py3/dispy.py index 3d5e4c8..bcf1ba5 100755 --- a/py3/dispy.py +++ b/py3/dispy.py @@ -281,6 +281,7 @@ def __init__(self, compute_type, name): self.name = name self.id = None self.code = '' + self.globals = {} self.dest_path = None self.xfer_files = [] self.reentrant = False diff --git a/py3/dispy/__init__.py b/py3/dispy/__init__.py index 3d5e4c8..bcf1ba5 100755 --- a/py3/dispy/__init__.py +++ b/py3/dispy/__init__.py @@ -281,6 +281,7 @@ def __init__(self, compute_type, name): self.name = name self.id = None self.code = '' + self.globals = {} self.dest_path = None self.xfer_files = [] self.reentrant = False diff --git a/py3/dispynode.py b/py3/dispynode.py index 7eb1f5b..e677deb 100755 --- a/py3/dispynode.py +++ b/py3/dispynode.py @@ -184,7 +184,7 @@ def _same_file(tgt, xf): def _dispy_job_func(__dispy_job_info, __dispy_job_certfile, __dispy_job_keyfile, __dispy_job_name, __dispy_job_args, __dispy_job_kwargs, - __dispy_job_code, __dispy_path, __dispy_reply_Q): + __dispy_job_code, __dispy_job_globals, __dispy_path, __dispy_reply_Q): """Internal use only. """ os.chdir(__dispy_path) @@ -192,14 +192,13 @@ def _dispy_job_func(__dispy_job_info, __dispy_job_certfile, __dispy_job_keyfile, sys.stderr = io.StringIO() __dispy_job_reply = __dispy_job_info.job_reply try: - exec(marshal.loads(__dispy_job_code[0])) + exec(marshal.loads(__dispy_job_code[0])) in __dispy_job_globals if __dispy_job_code[1]: - exec(__dispy_job_code[1]) - globals().update(locals()) + exec(__dispy_job_code[1]) in __dispy_job_globals __dispy_job_args = unserialize(__dispy_job_args) __dispy_job_kwargs = unserialize(__dispy_job_kwargs) - __func = globals()[__dispy_job_name] - __dispy_job_reply.result = __func(*__dispy_job_args, **__dispy_job_kwargs) + __dispy_job_globals.update(locals()) + exec("__dispy_job_reply.result = %s(*__dispy_job_args, **__dispy_job_kwargs)" % __dispy_job_name) in __dispy_job_globals __dispy_job_reply.status = DispyJob.Finished except: __dispy_job_reply.exception = traceback.format_exc() @@ -506,7 +505,7 @@ def job_request_task(msg): job_info = _DispyJobInfo(reply, reply_addr, compute, _job.xfer_files) args = (job_info, self.certfile, self.keyfile, compute.name, _job.args, _job.kwargs, - (compute.code, _job.code), compute.dest_path, self.reply_Q) + (compute.code, _job.code), compute.globals.copy(), compute.dest_path, self.reply_Q) try: yield conn.send_msg(b'ACK') except: @@ -731,9 +730,10 @@ def setup_computation(msg): compute = self.computations[compute_id] assert isinstance(compute.setup, _Function) os.chdir(compute.dest_path) - exec(marshal.loads(compute.code)) in globals(), locals() - _dispy_setup_func = locals()[compute.setup.name] - assert _dispy_setup_func(*compute.setup.args, **compute.setup.kwargs) == 0 + exec(marshal.loads(compute.code)) in compute.globals + compute.globals['__compute_setup_args'] = compute.setup.args + compute.globals['__compute_setup_kwargs'] = compute.setup.kwargs + exec("assert %s(*__compute_setup_args,**__compute_setup_kwargs) == 0" % compute.setup.name) in compute.globals except: logger.debug('Setup "%s" failed' % compute.setup.name) resp = traceback.format_exc().encode() @@ -1259,9 +1259,10 @@ def cleanup_computation(self, compute): os.chdir(self.dest_path_prefix) if isinstance(compute.cleanup, _Function): try: - exec(marshal.loads(compute.code)) in globals(), locals() - _dispy_cleanup_func = locals()[compute.cleanup.name] - _dispy_cleanup_func(*compute.cleanup.args, **compute.cleanup.kwargs) + exec(marshal.loads(compute.code)) in compute.globals + compute.globals['__compute_cleanup_args'] = compute.cleanup.args + compute.globals['__compute_cleanup_kwargs'] = compute.cleanup.kwargs + exec("%s(*__compute_cleanup_args,**__compute_cleanup_kwargs)" % compute.cleanup.name) in compute.globals except: logger.debug('Cleanup "%s" failed' % compute.cleanup.name) logger.debug(traceback.format_exc()) From 0f27cbccf6059f1a49d253c9eda885e2a2908f01 Mon Sep 17 00:00:00 2001 From: Matthew Wardrop Date: Sat, 23 May 2015 21:32:50 +1000 Subject: [PATCH 2/2] Expose _Compute.globals to the client in JobCluster and SharedJobCluster; allowing the user to pass local variables to the namespace of the job running on the server. --- py2/dispy.py | 13 ++++++++++--- py2/dispy/__init__.py | 13 ++++++++++--- py3/dispy.py | 13 ++++++++++--- py3/dispy/__init__.py | 13 ++++++++++--- 4 files changed, 40 insertions(+), 12 deletions(-) diff --git a/py2/dispy.py b/py2/dispy.py index 43b3405..b14d4b4 100755 --- a/py2/dispy.py +++ b/py2/dispy.py @@ -1681,7 +1681,7 @@ class JobCluster(object): """Create an instance of cluster for a specific computation. """ - def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None, + def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None, ip_addr=None, port=None, node_port=None, ext_ip_addr=None, dest_path=None, loglevel=logging.INFO, setup=None, cleanup=True, ping_interval=None, pulse_interval=None, poll_interval=None, @@ -1707,6 +1707,12 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s then the code for that object is transferred to the node executing a job for this cluster. + @globals is a dictionary. Each key should correspond to a valid + variable name that will be made available in the namespace of + `setup`, `compute` and `cleanup`. The value associated with each + key should be a pickleable python object. If the class of the object + is not installed on the server, it should be added to `depends`. + @callback is a function or class method. When a job's results become available, dispy will call provided callback function/method with that job as the argument. If a job @@ -1997,6 +2003,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s # make sure code can be compiled code = compile(compute.code, '', 'exec') del code + compute.globals = globals if dest_path: if not isinstance(dest_path, str): raise Exception('Invalid dest_path: it must be a string') @@ -2180,7 +2187,7 @@ class SharedJobCluster(JobCluster): dispyscheduler must be called with appropriate pulse_interval. The behaviour is same as for JobCluster. """ - def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None, + def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None, ip_addr=None, port=None, scheduler_node=None, scheduler_port=None, ext_ip_addr=None, loglevel=logging.INFO, setup=None, cleanup=True, dest_path=None, poll_interval=None, reentrant=False, secret='', @@ -2203,7 +2210,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s if not node_allocs: raise Exception('"nodes" argument is invalid') - JobCluster.__init__(self, computation, depends=depends, + JobCluster.__init__(self, computation, depends=depends, globals=globals, callback=callback, cluster_status=cluster_status, ip_addr=ip_addr, port=port, ext_ip_addr=ext_ip_addr, loglevel=loglevel, setup=setup, cleanup=cleanup, dest_path=dest_path, diff --git a/py2/dispy/__init__.py b/py2/dispy/__init__.py index 43b3405..b14d4b4 100755 --- a/py2/dispy/__init__.py +++ b/py2/dispy/__init__.py @@ -1681,7 +1681,7 @@ class JobCluster(object): """Create an instance of cluster for a specific computation. """ - def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None, + def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None, ip_addr=None, port=None, node_port=None, ext_ip_addr=None, dest_path=None, loglevel=logging.INFO, setup=None, cleanup=True, ping_interval=None, pulse_interval=None, poll_interval=None, @@ -1707,6 +1707,12 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s then the code for that object is transferred to the node executing a job for this cluster. + @globals is a dictionary. Each key should correspond to a valid + variable name that will be made available in the namespace of + `setup`, `compute` and `cleanup`. The value associated with each + key should be a pickleable python object. If the class of the object + is not installed on the server, it should be added to `depends`. + @callback is a function or class method. When a job's results become available, dispy will call provided callback function/method with that job as the argument. If a job @@ -1997,6 +2003,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s # make sure code can be compiled code = compile(compute.code, '', 'exec') del code + compute.globals = globals if dest_path: if not isinstance(dest_path, str): raise Exception('Invalid dest_path: it must be a string') @@ -2180,7 +2187,7 @@ class SharedJobCluster(JobCluster): dispyscheduler must be called with appropriate pulse_interval. The behaviour is same as for JobCluster. """ - def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None, + def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None, ip_addr=None, port=None, scheduler_node=None, scheduler_port=None, ext_ip_addr=None, loglevel=logging.INFO, setup=None, cleanup=True, dest_path=None, poll_interval=None, reentrant=False, secret='', @@ -2203,7 +2210,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s if not node_allocs: raise Exception('"nodes" argument is invalid') - JobCluster.__init__(self, computation, depends=depends, + JobCluster.__init__(self, computation, depends=depends, globals=globals, callback=callback, cluster_status=cluster_status, ip_addr=ip_addr, port=port, ext_ip_addr=ext_ip_addr, loglevel=loglevel, setup=setup, cleanup=cleanup, dest_path=dest_path, diff --git a/py3/dispy.py b/py3/dispy.py index bcf1ba5..a679dc2 100755 --- a/py3/dispy.py +++ b/py3/dispy.py @@ -1679,7 +1679,7 @@ class JobCluster(object): """Create an instance of cluster for a specific computation. """ - def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None, + def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None, ip_addr=None, port=None, node_port=None, ext_ip_addr=None, dest_path=None, loglevel=logging.INFO, setup=None, cleanup=True, ping_interval=None, pulse_interval=None, poll_interval=None, @@ -1705,6 +1705,12 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s then the code for that object is transferred to the node executing a job for this cluster. + @globals is a dictionary. Each key should correspond to a valid + variable name that will be made available in the namespace of + `setup`, `compute` and `cleanup`. The value associated with each + key should be a pickleable python object. If the class of the object + is not installed on the server, it should be added to `depends`. + @callback is a function or class method. When a job's results become available, dispy will call provided callback function/method with that job as the argument. If a job @@ -1995,6 +2001,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s # make sure code can be compiled code = compile(compute.code, '', 'exec') del code + compute.globals = globals if dest_path: if not isinstance(dest_path, str): raise Exception('Invalid dest_path: it must be a string') @@ -2178,7 +2185,7 @@ class SharedJobCluster(JobCluster): dispyscheduler must be called with appropriate pulse_interval. The behaviour is same as for JobCluster. """ - def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None, + def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None, ip_addr=None, port=None, scheduler_node=None, scheduler_port=None, ext_ip_addr=None, loglevel=logging.INFO, setup=None, cleanup=True, dest_path=None, poll_interval=None, reentrant=False, secret='', @@ -2201,7 +2208,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s if not node_allocs: raise Exception('"nodes" argument is invalid') - JobCluster.__init__(self, computation, depends=depends, + JobCluster.__init__(self, computation, depends=depends, globals=globals, callback=callback, cluster_status=cluster_status, ip_addr=ip_addr, port=port, ext_ip_addr=ext_ip_addr, loglevel=loglevel, setup=setup, cleanup=cleanup, dest_path=dest_path, diff --git a/py3/dispy/__init__.py b/py3/dispy/__init__.py index bcf1ba5..a679dc2 100755 --- a/py3/dispy/__init__.py +++ b/py3/dispy/__init__.py @@ -1679,7 +1679,7 @@ class JobCluster(object): """Create an instance of cluster for a specific computation. """ - def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None, + def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None, ip_addr=None, port=None, node_port=None, ext_ip_addr=None, dest_path=None, loglevel=logging.INFO, setup=None, cleanup=True, ping_interval=None, pulse_interval=None, poll_interval=None, @@ -1705,6 +1705,12 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s then the code for that object is transferred to the node executing a job for this cluster. + @globals is a dictionary. Each key should correspond to a valid + variable name that will be made available in the namespace of + `setup`, `compute` and `cleanup`. The value associated with each + key should be a pickleable python object. If the class of the object + is not installed on the server, it should be added to `depends`. + @callback is a function or class method. When a job's results become available, dispy will call provided callback function/method with that job as the argument. If a job @@ -1995,6 +2001,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s # make sure code can be compiled code = compile(compute.code, '', 'exec') del code + compute.globals = globals if dest_path: if not isinstance(dest_path, str): raise Exception('Invalid dest_path: it must be a string') @@ -2178,7 +2185,7 @@ class SharedJobCluster(JobCluster): dispyscheduler must be called with appropriate pulse_interval. The behaviour is same as for JobCluster. """ - def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None, + def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None, ip_addr=None, port=None, scheduler_node=None, scheduler_port=None, ext_ip_addr=None, loglevel=logging.INFO, setup=None, cleanup=True, dest_path=None, poll_interval=None, reentrant=False, secret='', @@ -2201,7 +2208,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s if not node_allocs: raise Exception('"nodes" argument is invalid') - JobCluster.__init__(self, computation, depends=depends, + JobCluster.__init__(self, computation, depends=depends, globals=globals, callback=callback, cluster_status=cluster_status, ip_addr=ip_addr, port=port, ext_ip_addr=ext_ip_addr, loglevel=loglevel, setup=setup, cleanup=cleanup, dest_path=dest_path,