Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions py2/dispy.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ def __init__(self, compute_type, name):
self.name = name
self.id = None
self.code = ''
self.globals = {}
self.dest_path = None
self.xfer_files = []
self.reentrant = False
Expand Down Expand Up @@ -1680,7 +1681,7 @@ class JobCluster(object):
"""Create an instance of cluster for a specific computation.
"""

def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None,
def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None,
ip_addr=None, port=None, node_port=None, ext_ip_addr=None,
dest_path=None, loglevel=logging.INFO, setup=None, cleanup=True,
ping_interval=None, pulse_interval=None, poll_interval=None,
Expand All @@ -1706,6 +1707,12 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
then the code for that object is transferred to the node executing
a job for this cluster.

@globals is a dictionary. Each key should correspond to a valid
variable name that will be made available in the namespace of
`setup`, `compute` and `cleanup`. The value associated with each
key should be a pickleable python object. If the class of the object
is not installed on the server, it should be added to `depends`.

@callback is a function or class method. When a job's results
become available, dispy will call provided callback
function/method with that job as the argument. If a job
Expand Down Expand Up @@ -1996,6 +2003,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
# make sure code can be compiled
code = compile(compute.code, '<string>', 'exec')
del code
compute.globals = globals
if dest_path:
if not isinstance(dest_path, str):
raise Exception('Invalid dest_path: it must be a string')
Expand Down Expand Up @@ -2179,7 +2187,7 @@ class SharedJobCluster(JobCluster):
dispyscheduler must be called with appropriate pulse_interval.
The behaviour is same as for JobCluster.
"""
def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None,
def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None,
ip_addr=None, port=None, scheduler_node=None, scheduler_port=None,
ext_ip_addr=None, loglevel=logging.INFO, setup=None, cleanup=True, dest_path=None,
poll_interval=None, reentrant=False, secret='',
Expand All @@ -2202,7 +2210,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
if not node_allocs:
raise Exception('"nodes" argument is invalid')

JobCluster.__init__(self, computation, depends=depends,
JobCluster.__init__(self, computation, depends=depends, globals=globals,
callback=callback, cluster_status=cluster_status,
ip_addr=ip_addr, port=port, ext_ip_addr=ext_ip_addr,
loglevel=loglevel, setup=setup, cleanup=cleanup, dest_path=dest_path,
Expand Down
14 changes: 11 additions & 3 deletions py2/dispy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ def __init__(self, compute_type, name):
self.name = name
self.id = None
self.code = ''
self.globals = {}
self.dest_path = None
self.xfer_files = []
self.reentrant = False
Expand Down Expand Up @@ -1680,7 +1681,7 @@ class JobCluster(object):
"""Create an instance of cluster for a specific computation.
"""

def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None,
def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None,
ip_addr=None, port=None, node_port=None, ext_ip_addr=None,
dest_path=None, loglevel=logging.INFO, setup=None, cleanup=True,
ping_interval=None, pulse_interval=None, poll_interval=None,
Expand All @@ -1706,6 +1707,12 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
then the code for that object is transferred to the node executing
a job for this cluster.

@globals is a dictionary. Each key should correspond to a valid
variable name that will be made available in the namespace of
`setup`, `compute` and `cleanup`. The value associated with each
key should be a pickleable python object. If the class of the object
is not installed on the server, it should be added to `depends`.

@callback is a function or class method. When a job's results
become available, dispy will call provided callback
function/method with that job as the argument. If a job
Expand Down Expand Up @@ -1996,6 +2003,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
# make sure code can be compiled
code = compile(compute.code, '<string>', 'exec')
del code
compute.globals = globals
if dest_path:
if not isinstance(dest_path, str):
raise Exception('Invalid dest_path: it must be a string')
Expand Down Expand Up @@ -2179,7 +2187,7 @@ class SharedJobCluster(JobCluster):
dispyscheduler must be called with appropriate pulse_interval.
The behaviour is same as for JobCluster.
"""
def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None,
def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None,
ip_addr=None, port=None, scheduler_node=None, scheduler_port=None,
ext_ip_addr=None, loglevel=logging.INFO, setup=None, cleanup=True, dest_path=None,
poll_interval=None, reentrant=False, secret='',
Expand All @@ -2202,7 +2210,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
if not node_allocs:
raise Exception('"nodes" argument is invalid')

JobCluster.__init__(self, computation, depends=depends,
JobCluster.__init__(self, computation, depends=depends, globals=globals,
callback=callback, cluster_status=cluster_status,
ip_addr=ip_addr, port=port, ext_ip_addr=ext_ip_addr,
loglevel=loglevel, setup=setup, cleanup=cleanup, dest_path=dest_path,
Expand Down
27 changes: 14 additions & 13 deletions py2/dispynode.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def _same_file(tgt, xf):

def _dispy_job_func(__dispy_job_info, __dispy_job_certfile, __dispy_job_keyfile,
__dispy_job_name, __dispy_job_args, __dispy_job_kwargs,
__dispy_job_code, __dispy_path, __dispy_reply_Q):
__dispy_job_code, __dispy_job_globals, __dispy_path, __dispy_reply_Q):
"""Internal use only.
"""
os.chdir(__dispy_path)
Expand All @@ -193,14 +193,13 @@ def _dispy_job_func(__dispy_job_info, __dispy_job_certfile, __dispy_job_keyfile,
__dispy_job_reply = __dispy_job_info.job_reply

try:
exec(marshal.loads(__dispy_job_code[0]))
exec(marshal.loads(__dispy_job_code[0])) in __dispy_job_globals
if __dispy_job_code[1]:
exec(__dispy_job_code[1])
globals().update(locals())
exec(__dispy_job_code[1]) in __dispy_job_globals
__dispy_job_args = unserialize(__dispy_job_args)
__dispy_job_kwargs = unserialize(__dispy_job_kwargs)
__func = globals()[__dispy_job_name]
__dispy_job_reply.result = __func(*__dispy_job_args, **__dispy_job_kwargs)
__dispy_job_globals.update(locals())
exec("__dispy_job_reply.result = %s(*__dispy_job_args, **__dispy_job_kwargs)" % __dispy_job_name) in __dispy_job_globals
__dispy_job_reply.status = DispyJob.Finished
except:
__dispy_job_reply.exception = traceback.format_exc()
Expand Down Expand Up @@ -507,7 +506,7 @@ def job_request_task(msg):

args = (job_info, self.certfile, self.keyfile,
compute.name, _job.args, _job.kwargs,
(compute.code, _job.code), compute.dest_path, self.reply_Q)
(compute.code, _job.code), compute.globals.copy(), compute.dest_path, self.reply_Q)
try:
yield conn.send_msg('ACK')
except:
Expand Down Expand Up @@ -731,9 +730,10 @@ def setup_computation(msg):
compute = self.computations[compute_id]
assert isinstance(compute.setup, _Function)
os.chdir(compute.dest_path)
exec(marshal.loads(compute.code)) in globals(), locals()
_dispy_setup_func = locals()[compute.setup.name]
assert _dispy_setup_func(*compute.setup.args, **compute.setup.kwargs) == 0
exec(marshal.loads(compute.code)) in compute.globals
compute.globals['__compute_setup_args'] = compute.setup.args
compute.globals['__compute_setup_kwargs'] = compute.setup.kwargs
exec("assert %s(*__compute_setup_args,**__compute_setup_kwargs) == 0" % compute.setup.name) in compute.globals
except:
logger.debug('Setup "%s" failed' % compute.setup.name)
resp = traceback.format_exc().encode()
Expand Down Expand Up @@ -1259,9 +1259,10 @@ def cleanup_computation(self, compute):
os.chdir(self.dest_path_prefix)
if isinstance(compute.cleanup, _Function):
try:
exec(marshal.loads(compute.code)) in globals(), locals()
_dispy_cleanup_func = locals()[compute.cleanup.name]
_dispy_cleanup_func(*compute.cleanup.args, **compute.cleanup.kwargs)
exec(marshal.loads(compute.code)) in compute.globals
compute.globals['__compute_cleanup_args'] = compute.cleanup.args
compute.globals['__compute_cleanup_kwargs'] = compute.cleanup.kwargs
exec("%s(*__compute_cleanup_args,**__compute_cleanup_kwargs)" % compute.cleanup.name) in compute.globals
except:
logger.debug('Cleanup "%s" failed' % compute.cleanup.name)
logger.debug(traceback.format_exc())
Expand Down
14 changes: 11 additions & 3 deletions py3/dispy.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ def __init__(self, compute_type, name):
self.name = name
self.id = None
self.code = ''
self.globals = {}
self.dest_path = None
self.xfer_files = []
self.reentrant = False
Expand Down Expand Up @@ -1678,7 +1679,7 @@ class JobCluster(object):
"""Create an instance of cluster for a specific computation.
"""

def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None,
def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None,
ip_addr=None, port=None, node_port=None, ext_ip_addr=None,
dest_path=None, loglevel=logging.INFO, setup=None, cleanup=True,
ping_interval=None, pulse_interval=None, poll_interval=None,
Expand All @@ -1704,6 +1705,12 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
then the code for that object is transferred to the node executing
a job for this cluster.

@globals is a dictionary. Each key should correspond to a valid
variable name that will be made available in the namespace of
`setup`, `compute` and `cleanup`. The value associated with each
key should be a pickleable python object. If the class of the object
is not installed on the server, it should be added to `depends`.

@callback is a function or class method. When a job's results
become available, dispy will call provided callback
function/method with that job as the argument. If a job
Expand Down Expand Up @@ -1994,6 +2001,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
# make sure code can be compiled
code = compile(compute.code, '<string>', 'exec')
del code
compute.globals = globals
if dest_path:
if not isinstance(dest_path, str):
raise Exception('Invalid dest_path: it must be a string')
Expand Down Expand Up @@ -2177,7 +2185,7 @@ class SharedJobCluster(JobCluster):
dispyscheduler must be called with appropriate pulse_interval.
The behaviour is same as for JobCluster.
"""
def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None,
def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None,
ip_addr=None, port=None, scheduler_node=None, scheduler_port=None,
ext_ip_addr=None, loglevel=logging.INFO, setup=None, cleanup=True, dest_path=None,
poll_interval=None, reentrant=False, secret='',
Expand All @@ -2200,7 +2208,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
if not node_allocs:
raise Exception('"nodes" argument is invalid')

JobCluster.__init__(self, computation, depends=depends,
JobCluster.__init__(self, computation, depends=depends, globals=globals,
callback=callback, cluster_status=cluster_status,
ip_addr=ip_addr, port=port, ext_ip_addr=ext_ip_addr,
loglevel=loglevel, setup=setup, cleanup=cleanup, dest_path=dest_path,
Expand Down
14 changes: 11 additions & 3 deletions py3/dispy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ def __init__(self, compute_type, name):
self.name = name
self.id = None
self.code = ''
self.globals = {}
self.dest_path = None
self.xfer_files = []
self.reentrant = False
Expand Down Expand Up @@ -1678,7 +1679,7 @@ class JobCluster(object):
"""Create an instance of cluster for a specific computation.
"""

def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None,
def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None,
ip_addr=None, port=None, node_port=None, ext_ip_addr=None,
dest_path=None, loglevel=logging.INFO, setup=None, cleanup=True,
ping_interval=None, pulse_interval=None, poll_interval=None,
Expand All @@ -1704,6 +1705,12 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
then the code for that object is transferred to the node executing
a job for this cluster.

@globals is a dictionary. Each key should correspond to a valid
variable name that will be made available in the namespace of
`setup`, `compute` and `cleanup`. The value associated with each
key should be a pickleable python object. If the class of the object
is not installed on the server, it should be added to `depends`.

@callback is a function or class method. When a job's results
become available, dispy will call provided callback
function/method with that job as the argument. If a job
Expand Down Expand Up @@ -1994,6 +2001,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
# make sure code can be compiled
code = compile(compute.code, '<string>', 'exec')
del code
compute.globals = globals
if dest_path:
if not isinstance(dest_path, str):
raise Exception('Invalid dest_path: it must be a string')
Expand Down Expand Up @@ -2177,7 +2185,7 @@ class SharedJobCluster(JobCluster):
dispyscheduler must be called with appropriate pulse_interval.
The behaviour is same as for JobCluster.
"""
def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_status=None,
def __init__(self, computation, nodes=None, depends=[], globals={}, callback=None, cluster_status=None,
ip_addr=None, port=None, scheduler_node=None, scheduler_port=None,
ext_ip_addr=None, loglevel=logging.INFO, setup=None, cleanup=True, dest_path=None,
poll_interval=None, reentrant=False, secret='',
Expand All @@ -2200,7 +2208,7 @@ def __init__(self, computation, nodes=None, depends=[], callback=None, cluster_s
if not node_allocs:
raise Exception('"nodes" argument is invalid')

JobCluster.__init__(self, computation, depends=depends,
JobCluster.__init__(self, computation, depends=depends, globals=globals,
callback=callback, cluster_status=cluster_status,
ip_addr=ip_addr, port=port, ext_ip_addr=ext_ip_addr,
loglevel=loglevel, setup=setup, cleanup=cleanup, dest_path=dest_path,
Expand Down
Loading