diff --git a/bin/radical-stack b/bin/radical-stack index a440bd634..a1c7bf17a 100755 --- a/bin/radical-stack +++ b/bin/radical-stack @@ -6,7 +6,7 @@ import radical.utils as ru namespaces = sys.argv[1:] if not namespaces: - namespaces = ['radical'] + namespaces = ['radical', 'rc'] stack = ru.stack(namespaces) diff --git a/requirements.txt b/requirements.txt index b55e00ed8..4339c51f4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ ntplib pyzmq regex setproctitle +rc.process diff --git a/src/radical/utils/__init__.py b/src/radical/utils/__init__.py index 00ac1910c..8c5f0216b 100644 --- a/src/radical/utils/__init__.py +++ b/src/radical/utils/__init__.py @@ -66,7 +66,7 @@ from .zmq import PubSub, Publisher, Subscriber from .zmq import Server, Client -from .flux import FluxHelper +from .flux import FluxService, FluxHelper from .logger import DEBUG, INFO, WARNING, WARN, ERROR, CRITICAL, OFF from .logger import Logger @@ -75,6 +75,7 @@ from .profile import read_profiles, combine_profiles, clean_profile from .profile import TIME, EVENT, COMP, TID, UID, STATE, MSG, ENTITY from .profile import PROF_KEY_MAX +from .profile import Yappi from .json_io import read_json, read_json_str, write_json from .json_io import parse_json, parse_json_str, dumps_json diff --git a/src/radical/utils/env.py b/src/radical/utils/env.py index 796f1756d..2b7063cd0 100644 --- a/src/radical/utils/env.py +++ b/src/radical/utils/env.py @@ -60,7 +60,8 @@ def env_read(fname: str) -> Dict[str, str]: # ------------------------------------------------------------------------------ # -def env_write(script_path, env, unset=None, blacklist=None, pre_exec=None): +def env_write(script_path, env, unset=None, blacklist=None, pre_exec=None, + extend=False): data = '\n' if unset: @@ -104,7 +105,7 @@ def env_write(script_path, env, unset=None, blacklist=None, pre_exec=None): continue if not re_snake_case.match(k): continue - data += "export %s=%s\n" % (k, _quote(env[k])) + data += "export %s=%s\n" % (k, _quote(env[k], extend=extend)) data += '\n' if funcs: @@ -194,14 +195,15 @@ def env_read_lines(lines: List[str]) -> Dict[str, str]: # ------------------------------------------------------------------------------ # -def _quote(data: str) -> str: +def _quote(data: str, extend: bool = False) -> str: if "'" in data or '$' in data or '`' in data: # cannot use single quote, so use double quote and escale all other # double quotes in the data # NOTE: we only support these three types of shell directives - data = data.replace('"', '\\"') \ - .replace('$', '\\$') + data = data.replace('"', '\\"') + if not extend: + data = data.replace('$', '\\$') data = '"' + data + '"' else: @@ -407,7 +409,7 @@ def env_prep(environment : Optional[Dict[str,str]] = None, _, tmp_name = tempfile.mkstemp(prefix=prefix, dir=tgt) env_write(tmp_name, env=environment, unset=unset, blacklist=blacklist, - pre_exec=pre_exec_cached) + pre_exec=pre_exec_cached, extend=False) cmd = '/bin/bash -c ". %s && /usr/bin/env"' % tmp_name out, err, ret = sh_callout(cmd) @@ -430,7 +432,7 @@ def env_prep(environment : Optional[Dict[str,str]] = None, # FIXME: files could also be cached and re-used (copied or linked) if script_path: env_write(script_path, env=env, unset=unset, blacklist=blacklist, - pre_exec=pre_exec) + pre_exec=pre_exec, extend=True) return env @@ -544,7 +546,12 @@ def __exit__(self, exc_type: Optional[Exception], exc_tb : Optional[Any] ) -> None: - if exc_type and self._child: + if self._parent: + while self._data is None: + try : self._data = self._q.get(timeout=1) + except queue.Empty: pass + + elif exc_type and self._child: stacktrace = ' '.join(traceback.format_exception( exc_type, exc_val, exc_tb)) self._q.put([None, exc_type, exc_val, stacktrace]) @@ -553,41 +560,34 @@ def __exit__(self, exc_type: Optional[Exception], os._exit(0) - if self._parent: - - while True: - try: - self._data = self._q.get(timeout=1) - break - except queue.Empty: - self._data = None - pass - - # -------------------------------------------------------------------------- # def put(self, data: str) -> None: - if self._child: - self._q.put([data, None, None, None]) - self._q.close() - self._q.join_thread() - os._exit(0) + assert self._child + + self._q.put([data, None, None, None]) + self._q.close() + self._q.join_thread() + os._exit(0) # -------------------------------------------------------------------------- # def get(self) -> Any: + assert self._parent + if self._data is None: return - data, exc_type, exc_val, stacktrace = self._data if exc_type: - sys.stdout.write('%s [%s]\n' % (exc_type, exc_val)) - sys.stdout.write('%s\n\n' % stacktrace) - raise exc_type # pylint: disable=raising-bad-type + sys.stderr.write('envp excepted %s(%s)\n' % (exc_type, exc_val)) + sys.stderr.write(stacktrace) + sys.stderr.flush() + raise RuntimeError('envp failed: %s(%s) - check stderr' + % (exc_type, exc_val)) return data diff --git a/src/radical/utils/flux.py b/src/radical/utils/flux.py deleted file mode 100644 index 92980e0b3..000000000 --- a/src/radical/utils/flux.py +++ /dev/null @@ -1,642 +0,0 @@ - -# pylint: disable=cell-var-from-loop - -import os -import sys -import time -import json -import shlex - -from typing import Optional, List, Dict, Any, Callable - -import threading as mt -import subprocess as sp - - -from .url import Url -from .ids import generate_id, ID_CUSTOM -from .shell import sh_callout -from .logger import Logger -from .profile import Profiler -from .modules import import_module - - -# -------------------------------------------------------------------------- -# -class _FluxService(object): - ''' - Helper class to handle a private Flux instance, including configuration, - start, monitoring and termination. - ''' - - # -------------------------------------------------------------------------- - # - def __init__(self, uid : str, - log : Logger, - prof : Profiler) -> None: - - self._uid = uid - self._log = log - self._prof = prof - - self._lock = mt.RLock() - self._term = mt.Event() - - self._uri = None - self._env = None - self._proc = None - self._watcher = None - - try: - cmd = 'flux python -c "import flux; print(flux.__file__)"' - out, err, ret = sh_callout(cmd) - - if ret: - raise RuntimeError('flux not found: %s' % err) - - flux_path = os.path.dirname(out.strip()) - mod_path = os.path.dirname(flux_path) - sys.path.append(mod_path) - - self._flux = import_module('flux') - self._flux_job = import_module('flux.job') - - except Exception: - self._log.exception('flux import failed') - raise - - - # -------------------------------------------------------------------------- - # - @property - def uid(self): - return self._uid - - - @property - def uri(self): - return self._uri - - - @property - def env(self): - return self._env - - - # -------------------------------------------------------------------------- - # - def _watch(self) -> None: - - # FIXME: this thread will change `os.environ` for this *process* because - # we want to call `flux ping` via `sh_callout`. We should - # instead use the Flux Python API to run the pings and pass the - # URI explicitly. - self._log.info('starting flux watcher') - - if self._env: - for k,v in self._env.items(): - os.environ[k] = v - - out, err, ret = sh_callout('flux resource list') - self._log.info('flux resources [ %d %s]:\n%s', ret, err, out) - - while not self._term.is_set(): - - time.sleep(1) - - _, err, ret = sh_callout('flux ping -c 1 kvs') - if ret: - self._log.error('flux watcher err: %s', err) - break - - # we only get here when the ping failed - set the event - self._term.set() - self._log.warn('flux stopped') - - - # -------------------------------------------------------------------------- - # - def start_service(self, - launcher: Optional[str] = None, - env : Optional[Dict[str,str]] = None - ) -> Optional[str]: - - with self._lock: - - if self._proc is not None: - raise RuntimeError('already started Flux: %s' % self._uri) - - self._term.clear() - - return self._locked_start_service(launcher, env) - - - # -------------------------------------------------------------------------- - # - def _locked_start_service(self, - launcher: Optional[str] = None, - env : Optional[Dict[str,str]] = None - ) -> Optional[str]: - - cmd = list() - - if launcher: - cmd += shlex.split(launcher) - - cmd += ['flux', 'start', 'bash', '-c', - 'echo "HOST:$(hostname) URI:$FLUX_URI" && sleep inf'] - - self._log.debug('flux command: %s', ' '.join(cmd)) - - flux_proc = sp.Popen(cmd, encoding="utf-8", - stdin=sp.DEVNULL, stdout=sp.PIPE, stderr=sp.PIPE) - - flux_env = dict() - while flux_proc.poll() is None: - - try: - line = flux_proc.stdout.readline() - - except Exception as e: - self._log.exception('flux service failed to start') - raise RuntimeError('could not start flux') from e - - if not line: - continue - - self._log.debug('flux output: %s', line) - - if line.startswith('HOST:'): - - flux_host, flux_uri = line.split(' ', 1) - - flux_host = flux_host.split(':', 1)[1].strip() - flux_uri = flux_uri.split(':', 1)[1].strip() - - flux_env['FLUX_HOST'] = flux_host - flux_env['FLUX_URI'] = flux_uri - break - - if flux_proc.poll() is not None: - raise RuntimeError('could not execute `flux start`') - - # fr = self._flux.uri.uri.FluxURIResolver() - # ret = fr.resolve('pid:%d' % flux_proc.pid) - # flux_env = {'FLUX_URI': ret} - - assert 'FLUX_URI' in flux_env, 'no FLUX_URI in env' - - # make sure that the flux url can be reached from other hosts - # FIXME: this also routes local access via ssh which may slow comm - flux_url = Url(flux_env['FLUX_URI']) - flux_url.host = flux_env['FLUX_HOST'] - flux_url.schema = 'ssh' - flux_uri = str(flux_url) - flux_env['FLUX_URI'] = flux_uri - - self._uri = flux_uri - self._env = flux_env - self._proc = flux_proc - - self._log.debug('flux uri: %s', flux_uri) - - self._prof.prof('flux_started', msg=self._uid) - - # start watcher thread to monitor the instance - self._watcher = mt.Thread(target=self._watch) - self._watcher.daemon = True - self._watcher.start() - - self._log.info("flux startup successful: [%s]", flux_env['FLUX_URI']) - - return self._uri - - - # -------------------------------------------------------------------------- - # - def check_service(self) -> Optional[str]: - - with self._lock: - - if not self._proc: - raise RuntimeError('flux service was not yet started') - - if self._term.is_set(): - raise RuntimeError('flux service was terminated') - - return self._uri - - - # -------------------------------------------------------------------------- - # - def close_service(self) -> None: - - with self._lock: - - self.check_service() - - if not self._proc: - raise RuntimeError('cannot kill flux from this process') - - if self._watcher: - self._watcher.join() - - # terminate the service process - # FIXME: send termination signal to flux for cleanup - self._proc.kill() - time.sleep(0.1) - self._proc.terminate() - self._proc.wait() - - self._uri = None - self._env = None - - -# ------------------------------------------------------------------------------ -# -class FluxHelper(object): - - ''' - Helper CLASS to programnatically handle flux instances and to obtain state - update events for flux jobs known in that instance. - ''' - - # -------------------------------------------------------------------------- - # - def __init__(self, uid:str = None) -> None: - ''' - The Flux Helper c'tor takes no arguments and will initially not be - connected to a Flux instance. After construction, the application can - call either one of the following methods: - - FluxHelper.connect_flux(uri=None) - FluxHelper.start_flux() - - The first will attempt to connect to the Flux instance referenced by - that URI - a `ValueError` exception will be raised if that instance - cannot be reached. If no URI is provided, the environment variable - `FLUX_URI` will be used. - - The second method will instantiate a new flux instance in the current - process environment. - - In both cases, the properties - - FluxHelper.uri - FluxHelper.env - - will provide information about the connected Flux instance. The `uri` - is provided as a string, the `env` as a dictionary of environment - settings (including `FLUX_URI` again). - - The method - - FluxHelper.reset() - - will disconnect from the Flux instance, and in the case where - `start_flux` created a private instance, that instance will be killed. - The `uri` and `env` properties will be reset to `None`. - - - While connected to a Flux instance, the following methods can be used to - interact with the instance: - - FluxHelper.get_executor() - return a flux.job.Executor instance - FluxHelper.get_handle() - return a flux.job.Flux instance - - All provided executors and handles will be invalidated upon `reset()`. - ''' - - self._service : Optional[_FluxService] = None - - if uid: self._uid = uid - else : self._uid = generate_id('flux.%(item_counter)04d', ID_CUSTOM) - - self._log = Logger(self._uid, ns='radical.utils') - self._prof = Profiler(self._uid, ns='radical.utils') - - self._lock = mt.RLock() - - self._uri = None - self._env = None - self._exe = None - self._handle = None - self._handles = list() # TODO - self._executors = list() # TODO - - try: - cmd = 'flux python -c "import flux; print(flux.__file__)"' - out, err, ret = sh_callout(cmd) - - if ret: - raise RuntimeError('flux not found: %s' % err) - - flux_path = os.path.dirname(out.strip()) - mod_path = os.path.dirname(flux_path) - sys.path.append(mod_path) - - self._flux = import_module('flux') - self._flux_job = import_module('flux.job') - - except Exception: - self._log.exception('flux import failed') - raise - - - # -------------------------------------------------------------------------- - # - def __del__(self): - - # FIXME: are handles / executors correctly garbage collected? - self.reset() - - - # -------------------------------------------------------------------------- - # - def reset(self): - ''' - Close the connection to the FLux instance (if it exists), and terminate - the Flux service if it was started by this instance. All handles and - executors created for this service will be invalidated. - ''' - - with self._lock: - - for idx in range(len(self._handles)): - del self._handles[idx] - - for idx in range(len(self._executors)): - del self._executors[idx] - - self._exe = None - self._handle = None - - if self._uri: - try: - self._service.close_service() - except: - pass - self._uri = None - self._env = None - - - # -------------------------------------------------------------------------- - # - @property - def uid(self): - ''' - unique ID for this FluxHelper instance - ''' - - with self._lock: - return self._uid - - - # -------------------------------------------------------------------------- - # - @property - def uri(self): - ''' - uri for the connected Flux instance. Returns `None` if no instance is - connected. - ''' - - with self._lock: - return self._uri - - - # -------------------------------------------------------------------------- - # - @property - def env(self): - ''' - environment dict for the connected Flux instance. Returns `None` if no - instance is connected. - ''' - - with self._lock: - return self._env - - - # -------------------------------------------------------------------------- - # - def start_flux(self, launcher: Optional[str] = None) -> None: - ''' - Start a private Flux instance - - FIXME: forward env - ''' - - with self._lock: - - if self._uri: - raise RuntimeError('service already connected: %s' % self._uri) - - self._service = _FluxService(self._uid, self._log, self._prof) - self._service.start_service(launcher=launcher) - - self._uri = self._service.check_service() - self._env = self._service.env - - # with ru_open(self._uid + '.dump', 'a') as fout: - # fout.write('start flux pid %d: %s\n' % (os.getpid(), self._uri)) - # for l in get_stacktrace()[:-1]: - # fout.write(l) - - self._setup() - - - # -------------------------------------------------------------------------- - # - def connect_flux(self, uri : Optional[str] = None) -> None: - ''' - Connect to an existing Flux instance - ''' - - with self._lock: - - # with ru_open(self._uid + '.dump', 'a') as fout: - # fout.write('connect flux %d: %s\n' % (os.getpid(), uri)) - # for l in get_stacktrace(): - # fout.write(l + '\n') - - if self._uri: - raise RuntimeError('service already connected: %s' % self._uri) - - if not uri: - uri = os.environ.get('FLUX_URI') - - if not uri: - raise RuntimeError('no Flux instance found via FLUX_URI') - - self._uri = uri - self._env = {'FLUX_URI': uri} - - # FIXME: run a ping test to ensure the service is up - - self._setup() - - - # ---------------------------------------------------------------------- - # - def _setup(self): - ''' - Once a service is connected, create a handle and executor - ''' - - with self._lock: - - assert self._uri, 'not initialized' - - # create a executor and handle for job management - self._exe = self.get_executor() - self._handle = self.get_handle() - - - # -------------------------------------------------------------------------- - # - def submit_jobs(self, - specs: List[Dict[str, Any]], - cb : Optional[Callable[[str, Any], None]] = None - ) -> Any: - - with self._lock: - - if not self._uri: - raise RuntimeError('FluxHelper is not connected') - - assert self._exe, 'no executor' - - futures = list() - for spec in specs: - jobspec = json.dumps(spec) - fut = self._flux_job.submit_async(self._handle, jobspec) - futures.append(fut) - - ids = list() - for fut in futures: - flux_id = fut.get_id() - ids.append(flux_id) - self._log.debug('submit: %s', flux_id) - - if cb: - def app_cb(fut, event): - try: - cb(flux_id, event) - except: - self._log.exception('app cb failed') - - for ev in [ - 'submit', - 'alloc', - 'start', - 'finish', - 'release', - # 'free', - # 'clean', - 'exception', - ]: - fut.add_event_callback(ev, app_cb) - - self._log.debug('submitted: %s', ids) - return ids - - - # -------------------------------------------------------------------------- - # - def attach_jobs(self, - ids: List[int], - cb : Optional[Callable[[int, Any], None]] = None - ) -> Any: - - with self._lock: - - if not self._uri: - raise RuntimeError('FluxHelper is not connected') - - assert self._exe, 'no executor' - - for flux_id in ids: - - fut = self._exe.attach(flux_id) - self._log.debug('attach %s : %s', flux_id, fut) - - if cb: - def app_cb(fut, event): - try: - cb(flux_id, event) - except: - self._log.exception('app cb failed') - - for ev in [ - 'submit', - 'alloc', - 'start', - 'finish', - 'release', - # 'free', - # 'clean', - 'exception', - ]: - fut.add_event_callback(ev, app_cb) - - - # -------------------------------------------------------------------------- - # - def cancel_jobs(self, flux_ids: List[int]) -> None: - - with self._lock: - - assert self._exe, 'no executor' - - for flux_id in flux_ids: - fut = self._exe.attach(flux_id) - self._log.debug('cancel %s : %s', flux_id, fut) - fut.cancel() - - - # -------------------------------------------------------------------------- - # - def get_handle(self) -> Any: - - with self._lock: - - if not self._uri: - raise RuntimeError('FluxHelper is not connected') - - try: - handle = self._flux.Flux(url=self._uri) - assert handle, 'no handle' - - except Exception as e: - raise RuntimeError('failed to connect at %s' % self._uri) from e - - self._handles.append(handle) - - return handle - - - # -------------------------------------------------------------------------- - # - def get_executor(self) -> Any: - - with self._lock: - - if not self._uri: - raise RuntimeError('FluxHelper is not connected') - - try: - args = {'url': self._uri} - exe = self._flux_job.executor.FluxExecutor(handle_kwargs=args) - assert exe, 'no executor' - - except Exception as e: - raise RuntimeError('failed to connect at %s' % self._uri) from e - - self._executors.append(exe) - - return exe - - -# ------------------------------------------------------------------------------ - diff --git a/src/radical/utils/flux/__init__.py b/src/radical/utils/flux/__init__.py new file mode 100644 index 000000000..a7fbfbcce --- /dev/null +++ b/src/radical/utils/flux/__init__.py @@ -0,0 +1,10 @@ + +from .flux_service import FluxService +from .flux_helper_v0 import FluxHelperV0 as _FluxHelperV0 +from .flux_helper_v1 import FluxHelperV1 as _FluxHelperV1 +from .flux_module import FluxModule, spec_from_command, spec_from_dict + +_fm = FluxModule() +if _fm.version == 1: FluxHelper = _FluxHelperV1 +else : FluxHelper = _FluxHelperV0 + diff --git a/src/radical/utils/flux/flux_helper_v0.py b/src/radical/utils/flux/flux_helper_v0.py new file mode 100644 index 000000000..5d9510bdb --- /dev/null +++ b/src/radical/utils/flux/flux_helper_v0.py @@ -0,0 +1,216 @@ + +import time +import queue + +import threading as mt + +from functools import partial +from collections import defaultdict +from typing import List, Union + +from ..misc import as_list +from ..ids import generate_id +from ..logger import Logger + +from .flux_module import FluxModule + + +# ------------------------------------------------------------------------------ +# +class FluxHelperV0(object): + + # -------------------------------------------------------------------------- + # + def __init__(self, uri : str, + log : Logger = None) -> None: + + # print('=== v0 flux helper ===') + + self._uri = uri + self._log = log or Logger('radical.utils.flux') + self._uid = generate_id('ru.flux') + + self._fm = FluxModule() + self._handle = self._fm.core.Flux(self._uri) + self._api_lock = mt.Lock() + self._exe = self._fm.job.executor.FluxExecutor( + handle_kwargs={'url': self._uri}) + + self._idlock = mt.Lock() # lock ID dicts + self._elock = mt.Lock() # lock event dict + self._task_ids = dict() # flux ID -> task ID + self._flux_ids = dict() # task ID -> flux ID + self._events = defaultdict(list) # flux ID -> event list + self._cbacks = list() # list of callbacks + + self._fm.verify() + + + # -------------------------------------------------------------------------- + # + def start(self, launcher: str = None) -> None: + + pass + + + # -------------------------------------------------------------------------- + # + def stop(self): + + with self._api_lock: + + # FIXME: shutdown flux instance + self._uri = None + self._handle = None + + + # -------------------------------------------------------------------------- + # + @property + def uid(self) -> str: + return self._uid + + @property + def uri(self) -> str: + return self._uri + + + # -------------------------------------------------------------------------- + # + def register_cb(self, cb: callable) -> None: + + with self._api_lock, self._elock: + self._log.debug('register cb %s', cb) + self._cbacks.append(cb) + + + # -------------------------------------------------------------------------- + # + def unregister_cb(self, cb: callable) -> None: + + with self._api_lock, self._elock: + self._cbacks.remove(cb) + + + # -------------------------------------------------------------------------- + # + def _handle_events(self, fh : 'flux.Flux', + fid : 'flux.job.JobID', + event: 'flux.job.journal.JournalEvent' = None + ) -> None: + + self._log.debug('event for %s: %s', fid, event) + # print('event %s: %s' % (fid, event)) + + # if triggered by submit, check if we have anything to do + if not event: + if fid not in self._events: + # print('no event') + return + + # check if we can handle the event - otherwise store it + if not self._cbacks: + # print('no cbacks') + self._events[fid].append(event) + return + + # task is known, flush stored events + for ev in self._events[fid]: + # print('flush stored events') + for cb in self._cbacks: + try : cb(fid, ev) + except: self._log.exception('cb failed') + self._events[fid] = [] + + # process the current event + if event: + # print('process current event') + for cb in self._cbacks: + try : cb(fid, event) + except: self._log.exception('cb failed') + + + # -------------------------------------------------------------------------- + # + def submit(self, specs: List['flux.job.JobspecV1']) -> List[str]: + + with self._api_lock: + + if not self._handle: + raise RuntimeError('flux instance not started') + + self._log.debug('== submit %d specs', len(specs)) + + events = ['submit', 'depend', 'alloc', 'start', # 'cleanup', + 'finish', 'release', 'free', 'clean', 'priority', 'exception'] + + def event_cb(fid, fut, event): + self._handle_events(self._handle, fid, event) + + futures = list() + def id_cb(fut): + flux_id = fut.jobid() + idx = fut.ru_idx + for ev in events: + tmp_cb = partial(event_cb, flux_id) + fut.add_event_callback(ev, tmp_cb) + futures.append([flux_id, idx, fut]) + self._log.debug('got flux id: %s: %s', idx, flux_id) + + for idx, spec in enumerate(specs): + fut = self._exe.submit(spec, waitable=True) + fut.ru_idx = idx + self._log.debug('%s: submitted : %s', self._uid, idx) + fut.add_jobid_callback(id_cb) + + # wait until we saw all jobid callbacks (assume 10 tasks/sec) + timeout = len(specs) + timeout = max(100, timeout) + start = time.time() + self._log.debug('%s: wait %.2fsec for %d flux IDs', + self._uid, timeout, len(specs)) + while len(futures) < len(specs): + time.sleep(0.1) + self._log.debug('%s: wait %s / %s', self._uid, + len(futures), len(specs)) + if time.time() - start > timeout: + raise RuntimeError('%s: timeout on submission' % self._uid) + self._log.info('got %d flux IDs', len(futures)) + + # get flux_ids sorted by submission order (idx) + flux_ids = [fut[0] for fut in sorted(futures, key=lambda x: x[1])] + + self._log.debug('%s: submitted: %s', self._uid, flux_ids) + return flux_ids + + + # -------------------------------------------------------------------------- + # + def cancel(self, fids: [Union[str, List[str]]]) -> None: + + with self._api_lock: + + if not self._handle: + raise RuntimeError('flux instance not started') + + with self._idlock: + for fid in as_list(fids): + self._fm.job.cancel_async(self._handle, fid, reason='user cancel') + + + # -------------------------------------------------------------------------- + # + def wait(self, fids: [Union[str, List[str]]]) -> None: + + with self._api_lock: + + if not self._handle: + raise RuntimeError('flux instance not started') + + for fid in fids: + self._log.debug('wait for %s', fid) + self._fm.job.wait(self._handle, fid) + + +# ------------------------------------------------------------------------------ + diff --git a/src/radical/utils/flux/flux_helper_v1.py b/src/radical/utils/flux/flux_helper_v1.py new file mode 100644 index 000000000..fef643fe5 --- /dev/null +++ b/src/radical/utils/flux/flux_helper_v1.py @@ -0,0 +1,318 @@ + +import time +import queue + +import threading as mt + +from collections import defaultdict +from typing import List, Union + +from ..misc import as_list +from ..ids import generate_id +from ..logger import Logger + +from .flux_module import FluxModule + + +# ------------------------------------------------------------------------------ +# +class FluxHelperV1(object): + + # -------------------------------------------------------------------------- + # + def __init__(self, uri : str, + log : Logger = None) -> None: + + # print('=== v1 flux helper ===') + + self._uri = uri + self._log = log or Logger('radical.utils.flux') + self._uid = generate_id('ru.flux') + + self._fm = FluxModule() + self._handle = self._fm.core.Flux(self._uri) + self._api_lock = mt.Lock() + + # journal watcher + self._jthread = None + self._jterm = mt.Event() + + # event handle thread + self._ethread = None + self._equeue = queue.Queue() + + # submit thread + self._sthread = None + self._squeue = queue.Queue() + self._sevent = mt.Event() + + self._idlock = mt.Lock() # lock ID dicts + self._elock = mt.Lock() # lock event dict + self._task_ids = dict() # flux ID -> task ID + self._flux_ids = dict() # task ID -> flux ID + self._events = defaultdict(list) # flux ID -> event list + self._cbacks = list() # list of callbacks + + self._fm.verify() + + + # -------------------------------------------------------------------------- + # + def start(self, launcher: str = None) -> None: + + with self._api_lock: + + if self._jthread is not None: + return + + self._jthread = mt.Thread(target=self._jwatcher) + self._jthread.daemon = True + self._jthread.start() + + self._ethread = mt.Thread(target=self._ewatcher) + self._ethread.daemon = True + self._ethread.start() + + self._sthread = mt.Thread(target=self._swatcher) + self._sthread.daemon = True + self._sthread.start() + + + # -------------------------------------------------------------------------- + # + def stop(self): + + with self._api_lock: + + if self._handle is None: + self._jterm.set() + self._jthread.join() + + # FIXME: shutdown flux instance + self._flux_service = None + self._uri = None + self._handle = None + + + # -------------------------------------------------------------------------- + # + @property + def uid(self) -> str: + return self._uid + + @property + def uri(self) -> str: + return self._uri + + + # -------------------------------------------------------------------------- + # + def _jwatcher(self): + + # NOTE: *never* used self._handle in this thread, as it is not thread + # safe. Instead, use the private handle created here + fh = self._fm.core.Flux(self._uri) + + # start watching the event journal + journal = self._fm.job.JournalConsumer(fh) + journal.start() + + while not self._jterm.is_set(): + + try: + event = journal.poll(timeout=1.0) + if event: + # FIXME: How can that ever *not* be a journal event? + # But it has happened... + self._handle_events(fh, event.jobid, event) + + except TimeoutError: + pass + + + # -------------------------------------------------------------------------- + # + def _swatcher(self): + + self._log.debug('swatcher started') + + # if we get new specs, submit them, return IDs to iqueue, and also + # forward ID to ewatcher + fh = self._fm.core.Flux(self._uri) + while True: + + try: + specs = self._squeue.get(block=True, timeout=1.0) + self._log.debug('got %d specs', len(specs)) + + except queue.Empty: + continue + + except: + self._log.exception("exception") + raise + + with self._idlock: + + try: + + futs = list() + for spec in specs: + tid = spec.attributes['user']['uid'] + fut = self._fm.job.submit_async(fh, spec, waitable=True) + futs.append([fut, tid]) + + for fut, tid in futs: + fid = fut.get_id() + self._task_ids[fid] = tid + self._flux_ids[tid] = fid + + # trigger an event check + self._equeue.put(fid) + + except Exception: + self._log.exception("exception") + raise + + finally: + # trigger submit completion + self._log.debug('submit done') + self._sevent.set() + + + # -------------------------------------------------------------------------- + # + def _ewatcher(self): + + # if we get a new job ID, check if we have events for it + + fh = self._fm.core.Flux(self._uri) + while True: + + try: + fid = self._equeue.get(timeout=1.0) + self._handle_events(fh, fid) + + except queue.Empty: + continue + + + # -------------------------------------------------------------------------- + # + def register_cb(self, cb: callable) -> None: + + with self._api_lock, self._elock: + self._log.debug('register cb %s', cb) + self._cbacks.append(cb) + + + # -------------------------------------------------------------------------- + # + def unregister_cb(self, cb: callable) -> None: + + with self._api_lock, self._elock: + self._cbacks.remove(cb) + + + # -------------------------------------------------------------------------- + # + def _handle_events(self, fh : 'flux.Flux', + fid : 'flux.job.JobID', + event: 'flux.job.journal.JournalEvent' = None + ) -> None: + + with self._elock: + + # self._log.debug_9('event %s: %s', fid, event) + + # if triggered by submit, check if we have anything to do + if not event: + if fid not in self._events: + return + + # check if we can handle the event - otherwise store it + if not self._cbacks: + self._events[fid].append(event) + return + + # check if application knows the task - otherwise store the event + if fid not in self._task_ids: + self._events[fid].append(event) + return + + tid = self._task_ids[fid] + + # task is known, flush stored events + for ev in self._events[fid]: + for cb in self._cbacks: + try : cb(tid, ev) + except: self._log.exception('cb failed') + self._events[fid] = [] + + # process the current event + if event: + for cb in self._cbacks: + try : cb(tid, event) + except: self._log.exception('cb failed') + + + # -------------------------------------------------------------------------- + # + def submit(self, specs: List['flux.job.JobspecV1']) -> List[str]: + + with self._api_lock: + + if not self._handle: + raise RuntimeError('flux instance not started') + + self._log.debug('== submit %d specs start', len(specs)) + tids = [spec.attributes['user']['uid'] for spec in specs] + + self._sevent.clear() + self._squeue.put(specs) + self._sevent.wait(timeout=60.0) + + if not self._sevent.is_set(): + raise RuntimeError('flux submit timeout') + + self._log.debug('== submit %d specs done', len(specs)) + + return tids + + + # -------------------------------------------------------------------------- + # + def cancel(self, tids: [Union[str, List[str]]]) -> None: + + with self._api_lock: + + if not self._handle: + raise RuntimeError('flux instance not started') + + with self._idlock: + for tid in as_list(tids): + fid = self._flux_ids[tid] + self._fm.job.cancel_async(self._handle, fid, reason='user cancel') + + + # -------------------------------------------------------------------------- + # + def wait(self, tids: [Union[str, List[str]]]) -> None: + + with self._api_lock: + + if not self._handle: + raise RuntimeError('flux instance not started') + + tids = as_list(tids) + with self._idlock: + fids = [self._flux_ids[tid] for tid in tids] + + for fid in fids: + self._fm.job.wait(self._handle, fid) + + # FIXME: remove tasks which have been waited for. + + +# ------------------------------------------------------------------------------ + diff --git a/src/radical/utils/flux/flux_module.py b/src/radical/utils/flux/flux_module.py new file mode 100644 index 000000000..ca263e0e4 --- /dev/null +++ b/src/radical/utils/flux/flux_module.py @@ -0,0 +1,196 @@ + +import os +import sys +import math +import shlex + +from typing import Any + +from ..which import which +from ..ids import generate_id, ID_SIMPLE +from ..modules import import_module +from ..shell import sh_callout + + +# ------------------------------------------------------------------------------ +# +class FluxModule(object): + + _flux_core = None + _flux_job = None + _flux_exc = None + _flux_v = None + + + # -------------------------------------------------------------------------- + # + def __init__(self): + ''' + import the flux module, if available + ''' + + if self._flux_core or self._flux_job or self._flux_exc: + return + + flux = None + flux_job = None + flux_exc = None + flux_v = None + + try: + flux = import_module('flux') + flux_job = import_module('flux.job') + if 'JournalConsumer' in dir(flux_job): + flux_v = 1 + else: + flux_v = 0 + + except Exception as e: + flux_exc = e + + + # on failure, try to derive module path from flux executable + if flux is None or flux_job is None: + + to_pop = None + try: + cmd = 'flux python -c "import flux; print(flux.__file__)"' + out, err, ret = sh_callout(cmd) + + assert not ret, [cmd, err] + + flux_path = os.path.dirname(out.strip()) + mod_path = os.path.dirname(flux_path) + sys.path.append(mod_path) + to_pop = mod_path + + flux = import_module('flux') + flux_job = import_module('flux.job') + if 'JournalConsumer' in dir(flux_job): + flux_v = 1 + else: + flux_v = 0 + + except Exception as e: + flux_exc = e + + if to_pop: + sys.path.remove(to_pop) + + self._flux_core = flux + self._flux_job = flux_job + self._flux_exc = flux_exc + self._flux_v = flux_v + self._flux_exe = which('flux') + + + # -------------------------------------------------------------------------- + # + def verify(self) -> None: + ''' + verify that flux modules are available + ''' + + if self._flux_core is None: + raise RuntimeError('flux core module not found') from self._flux_exc + + if self._flux_job is None: + raise RuntimeError('flux.job module not found') from self._flux_exc + + if self._flux_exe is None: + raise RuntimeError('flux executable not found') from self._flux_exc + + + # -------------------------------------------------------------------------- + # + @property + def version(self) -> int: + return self._flux_v + + @property + def core(self) -> Any: + return self._flux_core + + @property + def job(self) -> Any: + return self._flux_job + + @property + def exc(self) -> Any: + return self._flux_exc + + @property + def exe(self) -> str: + return self._flux_exe + + +# ------------------------------------------------------------------------------ +# +def spec_from_command(cmd: str) -> 'flux.job.JobspecV1': + + fm = FluxModule() + + spec = fm.job.JobspecV1.from_command(shlex.split(cmd)) + if not 'user' in spec.attributes: + spec.attributes['user'] = dict() + spec.attributes['user']['uid'] = generate_id(ID_SIMPLE) + + return spec + + +# ------------------------------------------------------------------------------ +# +def spec_from_dict(td: dict) -> 'flux.job.JobspecV1': + + fm = FluxModule() + + version = 1 + user = {'uid' : td.get('uid', generate_id('ru_flux', ID_SIMPLE))} + system = {'duration': td.get('timeout', 0.0)} + tasks = [{'command': [td['executable']] + td.get('arguments', []), + 'slot' : 'task', + 'count' : {'per_slot': 1}}] + + if 'environment' in td: system['environment'] = td['environment'] + if 'sandbox' in td: system['cwd'] = td['sandbox'] + if 'shell' in td: system['shell'] = td['shell'] + + attributes = {'system' : system, + 'user' : user} + + n_ranks = td.get('ranks', 1) + resources = [{'count': n_ranks, + 'type' : 'slot', + 'label': 'task', + 'with' : [{ + 'count': int(td.get('cores_per_rank', 1)), + 'type' : 'core'}]}] + # 'count': int(td.get('gpus_per_rank', 0)) or None, + # 'type' : 'gpu' + + gpr = td.get('gpus_per_rank', 0) + if gpr: + resources[0]['with'].append({'count': math.ceil(gpr), # flux needs int + 'type' : 'gpu'}) + + spec = fm.job.JobspecV1(resources=resources, + attributes=attributes, + tasks=tasks, + version=version) + + if n_ranks > 1: + if td.get('use_mpi', True): + # ensure that all ranks exit if one rank fails + spec.setattr_shell_option('exit-on-error', 1) # defaults to 0 + else: + spec.setattr_shell_option('exit-timeout', 'none') # defaults to 30 + + if td.get('stdin') : spec.stdin = td['stdin'] + if td.get('stdout'): spec.stdout = td['stdout'] + if td.get('stderr'): spec.stderr = td['stderr'] + + return spec + + +# ------------------------------------------------------------------------------ + diff --git a/src/radical/utils/flux/flux_service.py b/src/radical/utils/flux/flux_service.py new file mode 100644 index 000000000..594388598 --- /dev/null +++ b/src/radical/utils/flux/flux_service.py @@ -0,0 +1,155 @@ + +import time + +import threading as mt + +from rc.process import Process +from functools import partial +from typing import List + +from ..url import Url +from ..ids import generate_id +from ..logger import Logger + +from .flux_module import FluxModule + + +# ------------------------------------------------------------------------------ +# +class FluxService(object): + + # -------------------------------------------------------------------------- + # + def __init__(self, uid : str = None, + log : Logger = None, + launcher: str = None + ) -> None: + + self._uid = uid or generate_id('ru.flux') + self._log = log or Logger('radical.utils.flux', level='DEBUG') + self._launcher = launcher or '' + + self._fm = FluxModule() + self._uri = None + self._r_uri = None + self._host = None + self._proc = None + self._ready = mt.Event() + + self._fm.verify() + + + # -------------------------------------------------------------------------- + # + @property + def uid(self) -> str: + return self._uid + + @property + def uri(self) -> str: + return self._uri + + + @property + def r_uri(self) -> str: + return self._r_uri + + + # -------------------------------------------------------------------------- + # + def _proc_line_cb(self, prefix: str, + proc : Process, + lines : List[str] + ) -> None: + + try: + for line in lines: + self._log.info('%s: flux io : %s', self._uid, line) + + if line.startswith('FLUX_URI='): + parts = line.strip().split(' ', 1) + self._log.info('%s: found flux info: %s', self._uid, parts) + + self._uri = parts[0].split('=', 1)[1] + self._host = parts[1].split('=', 1)[1] + + url = Url(self._uri) + url.host = self._host + url.schema = 'ssh' + self._r_uri = str(url) + + self._log.info('%s: flux uri: %s', self._uid, self._uri) + self._log.info('%s: r uri: %s', self._uid, self._r_uri) + self._ready.set() + except: + self._log.exception('line processing failed') + + + # -------------------------------------------------------------------------- + # + def _proc_state_cb(self, proc: Process, state: str) -> None: + + self._log.info('flux instance state update: %s', state) + if state in Process.FINAL: + + self._log.info('flux instance stopped: %s', state) + self.stop() + + + # -------------------------------------------------------------------------- + # + def start(self, timeout: float = None) -> None: + + fcmd = 'echo FLUX_URI=\\$FLUX_URI FLUX_HOST=\\$(hostname) ' + fcmd += ' && flux resource list ' + fcmd += ' && sleep inf ' + cmd = ' %s start bash -c "%s"' % (self._fm.exe, fcmd) + + if self._launcher: + cmd = '%s %s' % (self._launcher, cmd) + + self._log.info('%s: start flux instance: %s', self._uid, cmd) + + p = Process(cmd) + p.register_cb(p.CB_OUT_LINE, partial(self._proc_line_cb, 'out')) + p.register_cb(p.CB_ERR_LINE, partial(self._proc_line_cb, 'err')) + p.register_cb(p.CB_STATE, self._proc_state_cb) + p.polldelay = 0.1 + p.start() + + self._proc = p + self._ptime = time.time() + + return self.ready(timeout=timeout) + + + # -------------------------------------------------------------------------- + # + def ready(self, timeout: float = None) -> None: + + if timeout is not None: + if timeout < 0: self._ready.wait() + else : self._ready.wait(timeout) + + return self._ready.is_set() + + + # -------------------------------------------------------------------------- + # + def stop(self) -> None: + + if not self._proc: + self._uri = None + return + + self._proc.cancel() + self._proc.wait() + + self._uri = None + self._proc = None + + self._log.info('%s: found flux uri: %s', self._uid, self.uri) + + +# ------------------------------------------------------------------------------ + diff --git a/src/radical/utils/ids.py b/src/radical/utils/ids.py index ecec34c76..c974530c7 100644 --- a/src/radical/utils/ids.py +++ b/src/radical/utils/ids.py @@ -252,20 +252,26 @@ def _generate_id(template, prefix, ns=None): if '%(uuid)' in template: info['uuid'] = uuid.uuid1() # plain uuid def _read_file_counter(name): - fd = os.open(name, os.O_RDWR | os.O_CREAT) + output = 0 try: - fcntl.flock(fd, fcntl.LOCK_EX) - except OSError: - # fcntl.flock might cause OSError: [Errno 524] Unknown error 524 - # (the case for Theta@ALCF) - fcntl.lockf(fd, fcntl.LOCK_EX) - os.lseek(fd, 0, os.SEEK_SET) - data = os.read(fd, 256) - if not data: output = 0 - else : output = int(data) - os.lseek(fd, 0, os.SEEK_SET) - os.write(fd, str.encode("%d\n" % (output + 1))) - os.close(fd) + fd = os.open(name, os.O_RDWR | os.O_CREAT) + try: + fcntl.flock(fd, fcntl.LOCK_EX) + except OSError: + # fcntl.flock might cause OSError: [Errno 524] Unknown error 524 + # (the case for Theta@ALCF) + fcntl.lockf(fd, fcntl.LOCK_EX) + os.lseek(fd, 0, os.SEEK_SET) + data = os.read(fd, 256) + if data: output = int(data) + os.lseek(fd, 0, os.SEEK_SET) + os.write(fd, str.encode("%d\n" % (output + 1))) + os.close(fd) + finally: + try: + os.close(fd) + except: + pass return output if '%(day_counter)' in template: diff --git a/src/radical/utils/profile.py b/src/radical/utils/profile.py index 2e90875c4..5afd31d2c 100644 --- a/src/radical/utils/profile.py +++ b/src/radical/utils/profile.py @@ -12,6 +12,9 @@ from .threads import get_thread_name as ru_get_thread_name from .config import DefaultConfig from .atfork import atfork +from .shell import sh_callout +from .which import which +from .modules import import_module # ------------------------------------------------------------------------------ @@ -840,5 +843,42 @@ def event_to_label(event): return event[EVENT] +# ------------------------------------------------------------------------------ +# +class Yappi(object): + + def __init__(self, name, method='wall', verbose=False): + self._yappi = import_module('yappi') + self._yappi.set_clock_type(method) + self._name = name + self._verb = verbose + + def __enter__(self): + if self._yappi: + self._yappi.start(builtins=True) + + def __exit__(self, etype, value, traceback): + + if self._yappi: + + if self._verb: + self._yappi.get_func_stats().print_all() + self._yappi.get_thread_stats().print_all() + + fstats = self._yappi.get_func_stats() + pstats = self._yappi.convert2pstats(fstats) + pstats.dump_stats('%s.pstats' % self._name) + + if which('gprof2dot'): + + cmd = 'gprof2dot -e 0.00 -n 0.12 --skew=0.3' + cmd += ' --node-label="total-time" -f pstats %s.pstats' % self._name + cmd += ' | dot -Tpng -o %s.png' % self._name + + out, err, ret = sh_callout(cmd, shell=True) + if ret: + print('gprof2dot failed: %s' % err) + + # ------------------------------------------------------------------------------ diff --git a/src/radical/utils/typeddict.py b/src/radical/utils/typeddict.py index a160f5829..252686956 100644 --- a/src/radical/utils/typeddict.py +++ b/src/radical/utils/typeddict.py @@ -158,11 +158,13 @@ def __init__(self, from_dict=None, **kwargs): if self._deep: - self.update(copy.deepcopy(self._defaults)) + self.__dict__['_data'] = copy.deepcopy(self._defaults) else: - self.update(self._defaults) + self.__dict__['_data'] = dict() + self.__dict__['_data'].update(self._defaults) - self.update(from_dict) + if from_dict: + self.update(from_dict) if kwargs: self.update(kwargs) @@ -228,7 +230,10 @@ def __getitem__(self, k): return self._data[k] def __setitem__(self, k, v): - self._data[k] = self._verify_setter(k, v) + if self._check : + self._data[k] = self._verify_setter(k, v) + else: + self._data[k] = v def __delitem__(self, k): del self._data[k] @@ -295,8 +300,6 @@ def popitem(self): def __getattr__(self, k): if k == '_data': - if '_data' not in self.__dict__: - self.__dict__['_data'] = dict() return self.__dict__['_data'] if k.startswith('__'): @@ -314,7 +317,10 @@ def __setattr__(self, k, v): if k.startswith('__'): return object.__setattr__(self, k, v) - self._data[k] = self._verify_setter(k, v) + if self._check : + self._data[k] = self._verify_setter(k, v) + else: + self._data[k] = v def __delattr__(self, k): diff --git a/tests/integration_tests/test_flux.py b/tests/integration_tests/test_flux.py index 25513865a..d997b0133 100755 --- a/tests/integration_tests/test_flux.py +++ b/tests/integration_tests/test_flux.py @@ -14,107 +14,36 @@ yaml = pytest.importorskip('yaml') flux = pytest.importorskip('flux') events = dict() -spec = { - "tasks": [{ - "slot": "task", - "count": { - "per_slot": 1 - }, - "command": [ - "/bin/date" - ] - }], - "attributes": { - "system": { - "duration": 10000 - } - }, - "version": 1, - "resources": [{ - "count": 1, - "type" : "slot", - "label": "task", - "with": [{ - "count": 1, - "type": "core" - }] - }] - } +spec = ru.flux.spec_from_command(cmd='/bin/date') # ------------------------------------------------------------------------------ # -def test_flux_startup(): +def test_flux(): global events njobs = 10 events = dict() - def cb1(job_id, state, ts, context): + def cb1(job_id, state): - # print([job_id, state, ts, context]) if job_id not in events: - events[job_id] = [ts, state] + events[job_id] = [state] else: - events[job_id].append([ts, state]) + events[job_id].append(state) - fh = ru.FluxHelper() - fh.start_flux() + fs = ru.FluxService() + fs.start(timeout=10) - assert fh.uri - assert 'FLUX_URI' in fh.env + assert fs.uri - specs = [spec] * njobs - ids = fh.submit_jobs(specs, cb=cb1) - assert len(ids) == njobs, len(ids) - - time.sleep(5) - - assert len(events) == njobs, len(events) - for jid in events: - # we expect at least 4 events per job: - # 'submit', 'start', 'finish', 'clean', - assert len(events[jid]) >= 4, [jid, events[jid]] - - fh.reset() - assert fh.uri is None - - -# ------------------------------------------------------------------------------ -# -def test_flux_pickup(): - - global events - - njobs = 10 - events = dict() - outer_fh = None - - if 'FLUX_URI' not in os.environ: - outer_fh = ru.FluxHelper() - outer_fh.start_flux() - - for k,v in outer_fh.env.items(): - os.environ[k] = v - - def cb1(job_id, state, ts, context): - - # print([job_id, state, ts, context]) - if job_id not in events: - events[job_id] = [ts, state] - else: - events[job_id].append([ts, state]) - - fh = ru.FluxHelper() - fh.start_flux() - - assert fh.uri - assert 'FLUX_URI' in fh.env + fh = ru.FluxHelper(uri=fs.uri) + fh.register_cb(cb1) specs = [spec] * njobs - ids = fh.submit_jobs(specs, cb=cb1) + ids = fh.submit(specs) assert len(ids) == njobs, len(ids) time.sleep(5) @@ -125,19 +54,18 @@ def cb1(job_id, state, ts, context): # 'submit', 'start', 'finish', 'clean', assert len(events[jid]) >= 4, [jid, events[jid]] - fh.reset() + fh.stop() assert fh.uri is None - if outer_fh: - outer_fh.reset() + fs.stop() + assert fs.uri is None # ------------------------------------------------------------------------------ # if __name__ == '__main__': - test_flux_startup() - test_flux_pickup() + test_flux() # ------------------------------------------------------------------------------ diff --git a/tests/unittests/test_env.py b/tests/unittests/test_env.py index e0fe4325c..88c135bf9 100755 --- a/tests/unittests/test_env.py +++ b/tests/unittests/test_env.py @@ -126,7 +126,8 @@ def test_env_proc(): env_proc = ru.EnvProcess(env=env) with env_proc: - env_proc.put(ru.sh_callout('echo -n $%s' % key, shell=True)) + if env_proc: + env_proc.put(ru.sh_callout('echo -n $%s' % key, shell=True)) out = str(env_proc.get()) assert isinstance(out, str) diff --git a/tests/unittests/test_heartbeat.py b/tests/unittests/test_heartbeat.py index aa2f7af3f..9c11399de 100755 --- a/tests/unittests/test_heartbeat.py +++ b/tests/unittests/test_heartbeat.py @@ -153,6 +153,13 @@ def is_alive(pid): else: return True + def _join(proc, timeout=0.1): + proc.join(timeout=timeout) + try: + os.waitpid(test_proc.pid, os.WNOHANG) + except: + pass + # watcher process def _watcher(action): @@ -208,7 +215,7 @@ def _watcher(action): # after 1.2 seconds, the watcher should have exited time.sleep(1.2) - test_proc.join(timeout=0.0) + _join(test_proc, 0.0) assert not is_alive(pids[0]) assert not is_alive(pids[1]) assert not is_alive(pids[2]) @@ -222,7 +229,7 @@ def _watcher(action): # after 0.4 seconds, only second sleep should still be alive time.sleep(0.4) - test_proc.join(timeout=0.1) + _join(test_proc, 0.1) assert not is_alive(pids[0]) assert not is_alive(pids[1]) assert is_alive(pids[2]) @@ -243,14 +250,14 @@ def _watcher(action): # after 0.4 seconds, only second sleep should still be alive time.sleep(0.4) - test_proc.join(timeout=0.1) + _join(test_proc, 0.1) assert is_alive(pids[0]) assert not is_alive(pids[1]) assert not is_alive(pids[2]) # after 0.5 seconds, none of the processes should be alive time.sleep(0.5) - test_proc.join(timeout=0.1) + _join(test_proc, 0.1) assert not is_alive(pids[0]) assert not is_alive(pids[1]) assert not is_alive(pids[2]) @@ -265,14 +272,14 @@ def _watcher(action): # after 0.4 seconds, only second sleep should still be alive time.sleep(0.4) - test_proc.join(timeout=0.1) + _join(test_proc, 0.1) assert is_alive(pids[0]) assert not is_alive(pids[1]) assert not is_alive(pids[2]) # after 0.5 seconds, none of the processes should be alive time.sleep(0.5) - test_proc.join(timeout=0.1) + _join(test_proc, 0.1) assert not is_alive(pids[0]) assert not is_alive(pids[1]) assert not is_alive(pids[2]) diff --git a/tests/unittests/test_typeddict.py b/tests/unittests/test_typeddict.py old mode 100644 new mode 100755 index d2c84df25..38d2e319f --- a/tests/unittests/test_typeddict.py +++ b/tests/unittests/test_typeddict.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python3 + # pylint: disable=protected-access __author__ = 'RADICAL-Cybertools Team' @@ -654,22 +656,43 @@ class TDSchemaNotNone(TypedDict): self.assertIsInstance(td.any_data, TypedDict) self.assertIsNot(td.any_data, input_data['any_data']) - # -------------------------------------------------------------------------- - # - def test_pickle(self): - - import pickle + # # -------------------------------------------------------------------------- + # # + # def test_pickle(self): + # + # import pickle + # + # td = TDSimple({ + # 'attr_str' : 'foo', + # 'attr_dict': {'bar': 'buz'}}) + # + # ser = pickle.dumps(td) + # td2 = pickle.loads(ser) + # td2.verify() + # + # self.assertEqual(td2.attr_str, 'foo') + # self.assertEqual(td2.attr_int, 1) + # self.assertEqual(td2.attr_dict['bar'], 'buz') - td = TDSimple({ - 'attr_str' : 'foo', - 'attr_dict': {'bar': 'buz'}}) - - ser = pickle.dumps(td) - td2 = pickle.loads(ser) - td2.verify() +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + tc = TypedDictTestCase() + tc.setUpClass() + + tc.test_init() + tc.test_hash() + tc.test_self_default() + tc.test_verify() + tc.test_verify_setter() + tc.test_base_methods() + tc.test_pop() + tc.test_popitem() + tc.test_query() + tc.test_metaclass() + tc.test_tderrors() + tc.test_none() + # tc.test_pickle() - self.assertEqual(td2.attr_str, 'foo') - self.assertEqual(td2.attr_int, 1) - self.assertEqual(td2.attr_dict['bar'], 'buz') -# ------------------------------------------------------------------------------