Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

""" Dask/Distributed related helpers 

""" 

import logging 

import socket 

 

logger = logging.getLogger(__name__) 

 

 

def setup_backend(scheduler, workers=None):
    """ Select the local Dask scheduler (synchronous, threads, or processes)

    Parameters
    ----------
    scheduler : str
        One of ``'sync'``, ``'single-threaded'``, ``'synchronous'``,
        ``'threads'``, or ``'processes'``
    workers : int, optional
        Size of the thread/process pool handed to Dask. When falsy, no
        pool is created and ``pool=None`` is passed to ``dask.config.set``

    Raises
    ------
    KeyError
        If ``scheduler`` is not one of the supported names
    """
    import dask

    if scheduler in ('sync', 'single-threaded', 'synchronous', ):
        logger.debug('Using the synchronous/single-threaded backend')
        dask.config.set(scheduler='synchronous')
        return

    if scheduler == 'threads':
        pool = None
        if workers:
            # Imported lazily: only needed for an explicitly sized pool
            from multiprocessing.pool import ThreadPool
            pool = ThreadPool(int(workers))
        logger.debug(f'Using `threads` backend (n={workers or "auto"})')
        dask.config.set(scheduler='threads', pool=pool)
        return

    if scheduler == 'processes':
        pool = None
        if workers:
            # Imported lazily: only needed for an explicitly sized pool
            from multiprocessing import Pool
            pool = Pool(int(workers))
        logger.debug(f'Using `processes` backend (n={workers or "auto"})')
        dask.config.set(scheduler='processes', pool=pool)
        return

    raise KeyError(f'Unsupported scheduler "{scheduler}"')

 

 

def setup_executor(address=None, n_workers=None, threads_per_worker=1, **kwds):
    """ Setup a Dask distributed cluster scheduler client

    Parameters
    ----------
    address : str, optional
        This can be the address of a ``Scheduler`` server, like a string
        ``'127.0.0.1:8786'``. If ``None``, sets up a ``LocalCluster``
    n_workers : int, optional
        Number of workers. Only used if setting up a ``LocalCluster``
    threads_per_worker : int, optional
        Number of threads per worker. Only used if setting up a
        ``LocalCluster``
    kwds
        Additional options passed to :py:func:`distributed.Client`

    Returns
    -------
    distributed.Client
        Distributed compute client

    Raises
    ------
    Exception
        Any error raised while creating the client is logged and re-raised
        unchanged
    """
    import distributed

    client_kwds = dict(kwds)
    if address is None:
        # These options describe a new ``LocalCluster``; they are meaningless
        # when connecting to an existing scheduler (and, as far as we can
        # tell, newer ``distributed`` releases reject unexpected keyword
        # arguments in that case), so only forward them for a local cluster.
        # ``setdefault`` keeps the named parameters from clashing with the
        # same keys arriving through ``kwds``.
        client_kwds.setdefault('n_workers', n_workers)
        client_kwds.setdefault('threads_per_worker', threads_per_worker)

    try:
        return distributed.Client(address=address, **client_kwds)
    except Exception:
        logger.exception('Could not start `distributed` cluster')
        raise

 

 

def _dashboard_url(address, services):
    """ Build the ``/status`` URL of the scheduler dashboard

    Returns an empty string if the scheduler address or dashboard port
    is unknown, rather than a malformed URL.
    """
    port = services.get('bokeh', services.get('dashboard', ''))
    if not address or not port:
        return ''

    protocol, sep, rest = address.partition('://')
    if not sep:
        # No protocol prefix: the whole string is ``host[:port]``
        protocol, rest = '', address

    if protocol == 'inproc':  # process/thread server
        host = 'localhost'
    elif rest.startswith('[') and ']' in rest:
        # Bracketed IPv6 literal, e.g. ``[::1]:8786`` -- keep the brackets
        host = rest[:rest.index(']') + 1]
    else:
        host = rest.split(':')[0]
    return 'http://{host}:{port}/status'.format(host=host, port=port)


def _format_bytes(nbytes):
    """ Format a byte count as a human readable string """
    # Prefer ``dask.utils``; fall back to the older ``distributed.utils``
    # location for releases that only provide it there.
    try:
        from dask.utils import format_bytes
    except ImportError:
        from distributed.utils import format_bytes
    return format_bytes(nbytes)


def _cluster_stats(info):
    """ Return ``(n_workers, n_cores, memory_str)`` from scheduler info """
    workers = info.get('workers') or {}
    cores = sum(w.get('ncores', w.get('nthreads', 1))
                for w in workers.values())
    # NOTE(review): a missing/``None`` memory limit is counted as 0 bytes;
    # presumably that means "no limit" -- confirm against ``distributed``.
    memory = sum((w.get('memory_limit') or 0) for w in workers.values())
    return len(workers), cores, _format_bytes(memory)


def executor_info(client, ip=True, bokeh=True, stats=True):
    """ Return a list of strings with info for a scheduler

    Parameters
    ----------
    client : distributed.client.Client
        Client connected to the scheduler. Must provide ``scheduler_info()``
        and a ``scheduler`` attribute (which may be ``None``)
    ip : bool, optional
        Include scheduler IP and the local host name
    bokeh : bool, optional
        Include Bokeh visualization IP
    stats : bool, optional
        Include stats on cluster (ncores, memory, etc)

    Returns
    -------
    list[str]
        Cluster information items. Values that cannot be determined
        (no scheduler, no scheduler info, no dashboard port) are left blank
    """
    info = client.scheduler_info()

    if client.scheduler is not None:
        ip_str = client.scheduler.address
    else:
        ip_str = ''

    infos = []
    if ip:
        infos.append('Scheduler: {0}'.format(ip_str))
        infos.append('Host: {0}'.format(socket.gethostname()))
    if bokeh:
        # Reuse ``ip_str`` so a missing scheduler yields a blank entry
        # instead of an ``AttributeError``
        if info:
            bokeh_str = _dashboard_url(ip_str, info.get('services') or {})
        else:
            bokeh_str = ''
        infos.append('Bokeh: {0}'.format(bokeh_str))
    if stats:
        # Only computed on request, so the dask/distributed import needed
        # for byte formatting is skipped otherwise
        if info:
            workers, cores, memory = _cluster_stats(info)
        else:
            workers, cores, memory = [''] * 3
        infos.append('Workers: {0}'.format(workers))
        infos.append('Cores: {0}'.format(cores))
        infos.append('Memory: {0}'.format(memory))

    return infos