Coverage for stems/executor.py : 58%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
""" Dask/Distributed related helpers """
""" Setup Dask to use threads or processes """ import dask
if scheduler in ('sync', 'single-threaded', 'synchronous', ): logger.debug('Using the synchronous/single-threaded backend') dask.config.set(scheduler='synchronous') elif scheduler == 'threads': if workers: from multiprocessing.pool import ThreadPool pool = ThreadPool(int(workers)) else: pool = None logger.debug(f'Using `threads` backend (n={workers or "auto"})') dask.config.set(scheduler='threads', pool=pool) elif scheduler == 'processes': if workers: from multiprocessing import Pool pool = Pool(int(workers)) else: pool = None logger.debug(f'Using `processes` backend (n={workers or "auto"})') dask.config.set(scheduler='processes', pool=pool) else: raise KeyError(f'Unsupported scheduler "{scheduler}"')
""" Setup a Dask distributed cluster scheduler client
Parameters ---------- address : str, optional This can be the address of a ``Scheduler`` server, like a string ``'127.0.0.1:8786'``. If ``None``, sets up a ``LocalCluster`` n_workers : int, optional Number of workers. Only used if setting up a ``LocalCluster`` threads_per_worker : int, optional Number of threads per worker kwds Additional options passed to :py:func:`distributed.Client`
Returns ------- distributed.Client Distributed compute client """ n_workers=n_workers, threads_per_worker=threads_per_worker, **kwds) except Exception as e: logger.exception('Could not start `distributed` cluster') raise else:
""" Return a list of strings with info for a scheduler
Parameters ---------- client : distributed.client.Client Scheduler (e.g., from ``client.scheduler``) ip : bool, optional Include scheduler IP bokeh : bool, optional Include Bokeh visualization IP stats : bool, optional Include stats on cluster (ncores, memory, etc)
Returns ------- list[str] Cluster information items """
else: ip_str = ''
# Bokeh info host = 'localhost' else: info['services'].get('dashboard', '')))
# Worker / memory / etc for w in info['workers'].values()) else: bokeh_str, workers, cores, memory = [''] * 4
|