##// END OF EJS Templates
pass config obj to Scheduler as dict...
MinRK -
Show More
@@ -1,117 +1,119 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
3 This is a collection of one Hub and several Schedulers.
3 This is a collection of one Hub and several Schedulers.
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010 The IPython Development Team
6 # Copyright (C) 2010 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
9 # the file COPYING, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
15 from __future__ import print_function
16
16
17 from multiprocessing import Process
17 from multiprocessing import Process
18
18
19 import zmq
19 import zmq
20 from zmq.devices import ProcessMonitoredQueue
20 from zmq.devices import ProcessMonitoredQueue
21 # internal:
21 # internal:
22 from IPython.utils.importstring import import_item
22 from IPython.utils.importstring import import_item
23 from IPython.utils.traitlets import Int, CStr, Instance, List, Bool
23 from IPython.utils.traitlets import Int, CStr, Instance, List, Bool
24
24
25 from IPython.parallel.util import signal_children
25 from IPython.parallel.util import signal_children
26 from .hub import Hub, HubFactory
26 from .hub import Hub, HubFactory
27 from .scheduler import launch_scheduler
27 from .scheduler import launch_scheduler
28
28
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30 # Configurable
30 # Configurable
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32
32
33
33
34 class ControllerFactory(HubFactory):
34 class ControllerFactory(HubFactory):
35 """Configurable for setting up a Hub and Schedulers."""
35 """Configurable for setting up a Hub and Schedulers."""
36
36
37 usethreads = Bool(False, config=True)
37 usethreads = Bool(False, config=True)
38 # pure-zmq downstream HWM
38 # pure-zmq downstream HWM
39 hwm = Int(0, config=True)
39 hwm = Int(0, config=True)
40
40
41 # internal
41 # internal
42 children = List()
42 children = List()
43 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
43 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
44
44
45 def _usethreads_changed(self, name, old, new):
45 def _usethreads_changed(self, name, old, new):
46 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
46 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
47
47
48 def __init__(self, **kwargs):
48 def __init__(self, **kwargs):
49 super(ControllerFactory, self).__init__(**kwargs)
49 super(ControllerFactory, self).__init__(**kwargs)
50 self.subconstructors.append(self.construct_schedulers)
50 self.subconstructors.append(self.construct_schedulers)
51
51
52 def start(self):
52 def start(self):
53 super(ControllerFactory, self).start()
53 super(ControllerFactory, self).start()
54 child_procs = []
54 child_procs = []
55 for child in self.children:
55 for child in self.children:
56 child.start()
56 child.start()
57 if isinstance(child, ProcessMonitoredQueue):
57 if isinstance(child, ProcessMonitoredQueue):
58 child_procs.append(child.launcher)
58 child_procs.append(child.launcher)
59 elif isinstance(child, Process):
59 elif isinstance(child, Process):
60 child_procs.append(child)
60 child_procs.append(child)
61 if child_procs:
61 if child_procs:
62 signal_children(child_procs)
62 signal_children(child_procs)
63
63
64
64
65 def construct_schedulers(self):
65 def construct_schedulers(self):
66 children = self.children
66 children = self.children
67 mq = import_item(self.mq_class)
67 mq = import_item(self.mq_class)
68
68
69 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
69 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
70 # IOPub relay (in a Process)
70 # IOPub relay (in a Process)
71 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
71 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
72 q.bind_in(self.client_info['iopub'])
72 q.bind_in(self.client_info['iopub'])
73 q.bind_out(self.engine_info['iopub'])
73 q.bind_out(self.engine_info['iopub'])
74 q.setsockopt_out(zmq.SUBSCRIBE, '')
74 q.setsockopt_out(zmq.SUBSCRIBE, '')
75 q.connect_mon(self.monitor_url)
75 q.connect_mon(self.monitor_url)
76 q.daemon=True
76 q.daemon=True
77 children.append(q)
77 children.append(q)
78
78
79 # Multiplexer Queue (in a Process)
79 # Multiplexer Queue (in a Process)
80 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
80 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
81 q.bind_in(self.client_info['mux'])
81 q.bind_in(self.client_info['mux'])
82 q.setsockopt_in(zmq.IDENTITY, 'mux')
82 q.setsockopt_in(zmq.IDENTITY, 'mux')
83 q.bind_out(self.engine_info['mux'])
83 q.bind_out(self.engine_info['mux'])
84 q.connect_mon(self.monitor_url)
84 q.connect_mon(self.monitor_url)
85 q.daemon=True
85 q.daemon=True
86 children.append(q)
86 children.append(q)
87
87
88 # Control Queue (in a Process)
88 # Control Queue (in a Process)
89 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
89 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
90 q.bind_in(self.client_info['control'])
90 q.bind_in(self.client_info['control'])
91 q.setsockopt_in(zmq.IDENTITY, 'control')
91 q.setsockopt_in(zmq.IDENTITY, 'control')
92 q.bind_out(self.engine_info['control'])
92 q.bind_out(self.engine_info['control'])
93 q.connect_mon(self.monitor_url)
93 q.connect_mon(self.monitor_url)
94 q.daemon=True
94 q.daemon=True
95 children.append(q)
95 children.append(q)
96 # Task Queue (in a Process)
96 # Task Queue (in a Process)
97 if self.scheme == 'pure':
97 if self.scheme == 'pure':
98 self.log.warn("task::using pure XREQ Task scheduler")
98 self.log.warn("task::using pure XREQ Task scheduler")
99 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
99 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
100 q.setsockopt_out(zmq.HWM, self.hwm)
100 q.setsockopt_out(zmq.HWM, self.hwm)
101 q.bind_in(self.client_info['task'][1])
101 q.bind_in(self.client_info['task'][1])
102 q.setsockopt_in(zmq.IDENTITY, 'task')
102 q.setsockopt_in(zmq.IDENTITY, 'task')
103 q.bind_out(self.engine_info['task'])
103 q.bind_out(self.engine_info['task'])
104 q.connect_mon(self.monitor_url)
104 q.connect_mon(self.monitor_url)
105 q.daemon=True
105 q.daemon=True
106 children.append(q)
106 children.append(q)
107 elif self.scheme == 'none':
107 elif self.scheme == 'none':
108 self.log.warn("task::using no Task scheduler")
108 self.log.warn("task::using no Task scheduler")
109
109
110 else:
110 else:
111 self.log.info("task::using Python %s Task scheduler"%self.scheme)
111 self.log.info("task::using Python %s Task scheduler"%self.scheme)
112 sargs = (self.client_info['task'][1], self.engine_info['task'], self.monitor_url, self.client_info['notification'])
112 sargs = (self.client_info['task'][1], self.engine_info['task'],
113 kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config)
113 self.monitor_url, self.client_info['notification'])
114 kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level,
115 config=dict(self.config))
114 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
116 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
115 q.daemon=True
117 q.daemon=True
116 children.append(q)
118 children.append(q)
117
119
@@ -1,592 +1,596 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #----------------------------------------------------------------------
14 #----------------------------------------------------------------------
15 # Imports
15 # Imports
16 #----------------------------------------------------------------------
16 #----------------------------------------------------------------------
17
17
18 from __future__ import print_function
18 from __future__ import print_function
19
19
20 import logging
20 import logging
21 import sys
21 import sys
22
22
23 from datetime import datetime, timedelta
23 from datetime import datetime, timedelta
24 from random import randint, random
24 from random import randint, random
25 from types import FunctionType
25 from types import FunctionType
26
26
27 try:
27 try:
28 import numpy
28 import numpy
29 except ImportError:
29 except ImportError:
30 numpy = None
30 numpy = None
31
31
32 import zmq
32 import zmq
33 from zmq.eventloop import ioloop, zmqstream
33 from zmq.eventloop import ioloop, zmqstream
34
34
35 # local imports
35 # local imports
36 from IPython.external.decorator import decorator
36 from IPython.external.decorator import decorator
37 from IPython.config.loader import Config
37 from IPython.utils.traitlets import Instance, Dict, List, Set
38 from IPython.utils.traitlets import Instance, Dict, List, Set
38
39
39 from IPython.parallel import error
40 from IPython.parallel import error
40 from IPython.parallel.factory import SessionFactory
41 from IPython.parallel.factory import SessionFactory
41 from IPython.parallel.util import connect_logger, local_logger
42 from IPython.parallel.util import connect_logger, local_logger
42
43
43 from .dependency import Dependency
44 from .dependency import Dependency
44
45
45 @decorator
46 @decorator
46 def logged(f,self,*args,**kwargs):
47 def logged(f,self,*args,**kwargs):
47 # print ("#--------------------")
48 # print ("#--------------------")
48 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
49 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
49 # print ("#--")
50 # print ("#--")
50 return f(self,*args, **kwargs)
51 return f(self,*args, **kwargs)
51
52
52 #----------------------------------------------------------------------
53 #----------------------------------------------------------------------
53 # Chooser functions
54 # Chooser functions
54 #----------------------------------------------------------------------
55 #----------------------------------------------------------------------
55
56
56 def plainrandom(loads):
57 def plainrandom(loads):
57 """Plain random pick."""
58 """Plain random pick."""
58 n = len(loads)
59 n = len(loads)
59 return randint(0,n-1)
60 return randint(0,n-1)
60
61
61 def lru(loads):
62 def lru(loads):
62 """Always pick the front of the line.
63 """Always pick the front of the line.
63
64
64 The content of `loads` is ignored.
65 The content of `loads` is ignored.
65
66
66 Assumes LRU ordering of loads, with oldest first.
67 Assumes LRU ordering of loads, with oldest first.
67 """
68 """
68 return 0
69 return 0
69
70
70 def twobin(loads):
71 def twobin(loads):
71 """Pick two at random, use the LRU of the two.
72 """Pick two at random, use the LRU of the two.
72
73
73 The content of loads is ignored.
74 The content of loads is ignored.
74
75
75 Assumes LRU ordering of loads, with oldest first.
76 Assumes LRU ordering of loads, with oldest first.
76 """
77 """
77 n = len(loads)
78 n = len(loads)
78 a = randint(0,n-1)
79 a = randint(0,n-1)
79 b = randint(0,n-1)
80 b = randint(0,n-1)
80 return min(a,b)
81 return min(a,b)
81
82
82 def weighted(loads):
83 def weighted(loads):
83 """Pick two at random using inverse load as weight.
84 """Pick two at random using inverse load as weight.
84
85
85 Return the less loaded of the two.
86 Return the less loaded of the two.
86 """
87 """
87 # weight 0 a million times more than 1:
88 # weight 0 a million times more than 1:
88 weights = 1./(1e-6+numpy.array(loads))
89 weights = 1./(1e-6+numpy.array(loads))
89 sums = weights.cumsum()
90 sums = weights.cumsum()
90 t = sums[-1]
91 t = sums[-1]
91 x = random()*t
92 x = random()*t
92 y = random()*t
93 y = random()*t
93 idx = 0
94 idx = 0
94 idy = 0
95 idy = 0
95 while sums[idx] < x:
96 while sums[idx] < x:
96 idx += 1
97 idx += 1
97 while sums[idy] < y:
98 while sums[idy] < y:
98 idy += 1
99 idy += 1
99 if weights[idy] > weights[idx]:
100 if weights[idy] > weights[idx]:
100 return idy
101 return idy
101 else:
102 else:
102 return idx
103 return idx
103
104
104 def leastload(loads):
105 def leastload(loads):
105 """Always choose the lowest load.
106 """Always choose the lowest load.
106
107
107 If the lowest load occurs more than once, the first
108 If the lowest load occurs more than once, the first
108 occurance will be used. If loads has LRU ordering, this means
109 occurance will be used. If loads has LRU ordering, this means
109 the LRU of those with the lowest load is chosen.
110 the LRU of those with the lowest load is chosen.
110 """
111 """
111 return loads.index(min(loads))
112 return loads.index(min(loads))
112
113
113 #---------------------------------------------------------------------
114 #---------------------------------------------------------------------
114 # Classes
115 # Classes
115 #---------------------------------------------------------------------
116 #---------------------------------------------------------------------
116 # store empty default dependency:
117 # store empty default dependency:
117 MET = Dependency([])
118 MET = Dependency([])
118
119
119 class TaskScheduler(SessionFactory):
120 class TaskScheduler(SessionFactory):
120 """Python TaskScheduler object.
121 """Python TaskScheduler object.
121
122
122 This is the simplest object that supports msg_id based
123 This is the simplest object that supports msg_id based
123 DAG dependencies. *Only* task msg_ids are checked, not
124 DAG dependencies. *Only* task msg_ids are checked, not
124 msg_ids of jobs submitted via the MUX queue.
125 msg_ids of jobs submitted via the MUX queue.
125
126
126 """
127 """
127
128
128 # input arguments:
129 # input arguments:
129 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
130 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
130 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
131 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
131 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
132 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
132 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
133 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
133 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
134 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
134
135
135 # internals:
136 # internals:
136 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
137 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
137 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
138 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
138 pending = Dict() # dict by engine_uuid of submitted tasks
139 pending = Dict() # dict by engine_uuid of submitted tasks
139 completed = Dict() # dict by engine_uuid of completed tasks
140 completed = Dict() # dict by engine_uuid of completed tasks
140 failed = Dict() # dict by engine_uuid of failed tasks
141 failed = Dict() # dict by engine_uuid of failed tasks
141 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
142 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
142 clients = Dict() # dict by msg_id for who submitted the task
143 clients = Dict() # dict by msg_id for who submitted the task
143 targets = List() # list of target IDENTs
144 targets = List() # list of target IDENTs
144 loads = List() # list of engine loads
145 loads = List() # list of engine loads
145 all_completed = Set() # set of all completed tasks
146 all_completed = Set() # set of all completed tasks
146 all_failed = Set() # set of all failed tasks
147 all_failed = Set() # set of all failed tasks
147 all_done = Set() # set of all finished tasks=union(completed,failed)
148 all_done = Set() # set of all finished tasks=union(completed,failed)
148 all_ids = Set() # set of all submitted task IDs
149 all_ids = Set() # set of all submitted task IDs
149 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
150 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
150 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
151 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
151
152
152
153
153 def start(self):
154 def start(self):
154 self.engine_stream.on_recv(self.dispatch_result, copy=False)
155 self.engine_stream.on_recv(self.dispatch_result, copy=False)
155 self._notification_handlers = dict(
156 self._notification_handlers = dict(
156 registration_notification = self._register_engine,
157 registration_notification = self._register_engine,
157 unregistration_notification = self._unregister_engine
158 unregistration_notification = self._unregister_engine
158 )
159 )
159 self.notifier_stream.on_recv(self.dispatch_notification)
160 self.notifier_stream.on_recv(self.dispatch_notification)
160 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
161 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
161 self.auditor.start()
162 self.auditor.start()
162 self.log.info("Scheduler started...%r"%self)
163 self.log.info("Scheduler started...%r"%self)
163
164
164 def resume_receiving(self):
165 def resume_receiving(self):
165 """Resume accepting jobs."""
166 """Resume accepting jobs."""
166 self.client_stream.on_recv(self.dispatch_submission, copy=False)
167 self.client_stream.on_recv(self.dispatch_submission, copy=False)
167
168
168 def stop_receiving(self):
169 def stop_receiving(self):
169 """Stop accepting jobs while there are no engines.
170 """Stop accepting jobs while there are no engines.
170 Leave them in the ZMQ queue."""
171 Leave them in the ZMQ queue."""
171 self.client_stream.on_recv(None)
172 self.client_stream.on_recv(None)
172
173
173 #-----------------------------------------------------------------------
174 #-----------------------------------------------------------------------
174 # [Un]Registration Handling
175 # [Un]Registration Handling
175 #-----------------------------------------------------------------------
176 #-----------------------------------------------------------------------
176
177
177 def dispatch_notification(self, msg):
178 def dispatch_notification(self, msg):
178 """dispatch register/unregister events."""
179 """dispatch register/unregister events."""
179 idents,msg = self.session.feed_identities(msg)
180 idents,msg = self.session.feed_identities(msg)
180 msg = self.session.unpack_message(msg)
181 msg = self.session.unpack_message(msg)
181 msg_type = msg['msg_type']
182 msg_type = msg['msg_type']
182 handler = self._notification_handlers.get(msg_type, None)
183 handler = self._notification_handlers.get(msg_type, None)
183 if handler is None:
184 if handler is None:
184 raise Exception("Unhandled message type: %s"%msg_type)
185 raise Exception("Unhandled message type: %s"%msg_type)
185 else:
186 else:
186 try:
187 try:
187 handler(str(msg['content']['queue']))
188 handler(str(msg['content']['queue']))
188 except KeyError:
189 except KeyError:
189 self.log.error("task::Invalid notification msg: %s"%msg)
190 self.log.error("task::Invalid notification msg: %s"%msg)
190
191
191 @logged
192 @logged
192 def _register_engine(self, uid):
193 def _register_engine(self, uid):
193 """New engine with ident `uid` became available."""
194 """New engine with ident `uid` became available."""
194 # head of the line:
195 # head of the line:
195 self.targets.insert(0,uid)
196 self.targets.insert(0,uid)
196 self.loads.insert(0,0)
197 self.loads.insert(0,0)
197 # initialize sets
198 # initialize sets
198 self.completed[uid] = set()
199 self.completed[uid] = set()
199 self.failed[uid] = set()
200 self.failed[uid] = set()
200 self.pending[uid] = {}
201 self.pending[uid] = {}
201 if len(self.targets) == 1:
202 if len(self.targets) == 1:
202 self.resume_receiving()
203 self.resume_receiving()
203
204
204 def _unregister_engine(self, uid):
205 def _unregister_engine(self, uid):
205 """Existing engine with ident `uid` became unavailable."""
206 """Existing engine with ident `uid` became unavailable."""
206 if len(self.targets) == 1:
207 if len(self.targets) == 1:
207 # this was our only engine
208 # this was our only engine
208 self.stop_receiving()
209 self.stop_receiving()
209
210
210 # handle any potentially finished tasks:
211 # handle any potentially finished tasks:
211 self.engine_stream.flush()
212 self.engine_stream.flush()
212
213
213 self.completed.pop(uid)
214 self.completed.pop(uid)
214 self.failed.pop(uid)
215 self.failed.pop(uid)
215 # don't pop destinations, because it might be used later
216 # don't pop destinations, because it might be used later
216 # map(self.destinations.pop, self.completed.pop(uid))
217 # map(self.destinations.pop, self.completed.pop(uid))
217 # map(self.destinations.pop, self.failed.pop(uid))
218 # map(self.destinations.pop, self.failed.pop(uid))
218
219
219 idx = self.targets.index(uid)
220 idx = self.targets.index(uid)
220 self.targets.pop(idx)
221 self.targets.pop(idx)
221 self.loads.pop(idx)
222 self.loads.pop(idx)
222
223
223 # wait 5 seconds before cleaning up pending jobs, since the results might
224 # wait 5 seconds before cleaning up pending jobs, since the results might
224 # still be incoming
225 # still be incoming
225 if self.pending[uid]:
226 if self.pending[uid]:
226 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
227 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
227 dc.start()
228 dc.start()
228
229
229 @logged
230 @logged
230 def handle_stranded_tasks(self, engine):
231 def handle_stranded_tasks(self, engine):
231 """Deal with jobs resident in an engine that died."""
232 """Deal with jobs resident in an engine that died."""
232 lost = self.pending.pop(engine)
233 lost = self.pending.pop(engine)
233
234
234 for msg_id, (raw_msg, targets, MET, follow, timeout) in lost.iteritems():
235 for msg_id, (raw_msg, targets, MET, follow, timeout) in lost.iteritems():
235 self.all_failed.add(msg_id)
236 self.all_failed.add(msg_id)
236 self.all_done.add(msg_id)
237 self.all_done.add(msg_id)
237 idents,msg = self.session.feed_identities(raw_msg, copy=False)
238 idents,msg = self.session.feed_identities(raw_msg, copy=False)
238 msg = self.session.unpack_message(msg, copy=False, content=False)
239 msg = self.session.unpack_message(msg, copy=False, content=False)
239 parent = msg['header']
240 parent = msg['header']
240 idents = [idents[0],engine]+idents[1:]
241 idents = [idents[0],engine]+idents[1:]
241 # print (idents)
242 # print (idents)
242 try:
243 try:
243 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
244 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
244 except:
245 except:
245 content = error.wrap_exception()
246 content = error.wrap_exception()
246 msg = self.session.send(self.client_stream, 'apply_reply', content,
247 msg = self.session.send(self.client_stream, 'apply_reply', content,
247 parent=parent, ident=idents)
248 parent=parent, ident=idents)
248 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
249 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
249 self.update_graph(msg_id)
250 self.update_graph(msg_id)
250
251
251
252
252 #-----------------------------------------------------------------------
253 #-----------------------------------------------------------------------
253 # Job Submission
254 # Job Submission
254 #-----------------------------------------------------------------------
255 #-----------------------------------------------------------------------
255 @logged
256 @logged
256 def dispatch_submission(self, raw_msg):
257 def dispatch_submission(self, raw_msg):
257 """Dispatch job submission to appropriate handlers."""
258 """Dispatch job submission to appropriate handlers."""
258 # ensure targets up to date:
259 # ensure targets up to date:
259 self.notifier_stream.flush()
260 self.notifier_stream.flush()
260 try:
261 try:
261 idents, msg = self.session.feed_identities(raw_msg, copy=False)
262 idents, msg = self.session.feed_identities(raw_msg, copy=False)
262 msg = self.session.unpack_message(msg, content=False, copy=False)
263 msg = self.session.unpack_message(msg, content=False, copy=False)
263 except:
264 except:
264 self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True)
265 self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True)
265 return
266 return
266
267
267 # send to monitor
268 # send to monitor
268 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
269 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
269
270
270 header = msg['header']
271 header = msg['header']
271 msg_id = header['msg_id']
272 msg_id = header['msg_id']
272 self.all_ids.add(msg_id)
273 self.all_ids.add(msg_id)
273
274
274 # targets
275 # targets
275 targets = set(header.get('targets', []))
276 targets = set(header.get('targets', []))
276
277
277 # time dependencies
278 # time dependencies
278 after = Dependency(header.get('after', []))
279 after = Dependency(header.get('after', []))
279 if after.all:
280 if after.all:
280 if after.success:
281 if after.success:
281 after.difference_update(self.all_completed)
282 after.difference_update(self.all_completed)
282 if after.failure:
283 if after.failure:
283 after.difference_update(self.all_failed)
284 after.difference_update(self.all_failed)
284 if after.check(self.all_completed, self.all_failed):
285 if after.check(self.all_completed, self.all_failed):
285 # recast as empty set, if `after` already met,
286 # recast as empty set, if `after` already met,
286 # to prevent unnecessary set comparisons
287 # to prevent unnecessary set comparisons
287 after = MET
288 after = MET
288
289
289 # location dependencies
290 # location dependencies
290 follow = Dependency(header.get('follow', []))
291 follow = Dependency(header.get('follow', []))
291
292
292 # turn timeouts into datetime objects:
293 # turn timeouts into datetime objects:
293 timeout = header.get('timeout', None)
294 timeout = header.get('timeout', None)
294 if timeout:
295 if timeout:
295 timeout = datetime.now() + timedelta(0,timeout,0)
296 timeout = datetime.now() + timedelta(0,timeout,0)
296
297
297 args = [raw_msg, targets, after, follow, timeout]
298 args = [raw_msg, targets, after, follow, timeout]
298
299
299 # validate and reduce dependencies:
300 # validate and reduce dependencies:
300 for dep in after,follow:
301 for dep in after,follow:
301 # check valid:
302 # check valid:
302 if msg_id in dep or dep.difference(self.all_ids):
303 if msg_id in dep or dep.difference(self.all_ids):
303 self.depending[msg_id] = args
304 self.depending[msg_id] = args
304 return self.fail_unreachable(msg_id, error.InvalidDependency)
305 return self.fail_unreachable(msg_id, error.InvalidDependency)
305 # check if unreachable:
306 # check if unreachable:
306 if dep.unreachable(self.all_completed, self.all_failed):
307 if dep.unreachable(self.all_completed, self.all_failed):
307 self.depending[msg_id] = args
308 self.depending[msg_id] = args
308 return self.fail_unreachable(msg_id)
309 return self.fail_unreachable(msg_id)
309
310
310 if after.check(self.all_completed, self.all_failed):
311 if after.check(self.all_completed, self.all_failed):
311 # time deps already met, try to run
312 # time deps already met, try to run
312 if not self.maybe_run(msg_id, *args):
313 if not self.maybe_run(msg_id, *args):
313 # can't run yet
314 # can't run yet
314 self.save_unmet(msg_id, *args)
315 self.save_unmet(msg_id, *args)
315 else:
316 else:
316 self.save_unmet(msg_id, *args)
317 self.save_unmet(msg_id, *args)
317
318
318 # @logged
319 # @logged
319 def audit_timeouts(self):
320 def audit_timeouts(self):
320 """Audit all waiting tasks for expired timeouts."""
321 """Audit all waiting tasks for expired timeouts."""
321 now = datetime.now()
322 now = datetime.now()
322 for msg_id in self.depending.keys():
323 for msg_id in self.depending.keys():
323 # must recheck, in case one failure cascaded to another:
324 # must recheck, in case one failure cascaded to another:
324 if msg_id in self.depending:
325 if msg_id in self.depending:
325 raw,after,targets,follow,timeout = self.depending[msg_id]
326 raw,after,targets,follow,timeout = self.depending[msg_id]
326 if timeout and timeout < now:
327 if timeout and timeout < now:
327 self.fail_unreachable(msg_id, timeout=True)
328 self.fail_unreachable(msg_id, timeout=True)
328
329
329 @logged
330 @logged
330 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
331 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
331 """a task has become unreachable, send a reply with an ImpossibleDependency
332 """a task has become unreachable, send a reply with an ImpossibleDependency
332 error."""
333 error."""
333 if msg_id not in self.depending:
334 if msg_id not in self.depending:
334 self.log.error("msg %r already failed!"%msg_id)
335 self.log.error("msg %r already failed!"%msg_id)
335 return
336 return
336 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
337 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
337 for mid in follow.union(after):
338 for mid in follow.union(after):
338 if mid in self.graph:
339 if mid in self.graph:
339 self.graph[mid].remove(msg_id)
340 self.graph[mid].remove(msg_id)
340
341
341 # FIXME: unpacking a message I've already unpacked, but didn't save:
342 # FIXME: unpacking a message I've already unpacked, but didn't save:
342 idents,msg = self.session.feed_identities(raw_msg, copy=False)
343 idents,msg = self.session.feed_identities(raw_msg, copy=False)
343 msg = self.session.unpack_message(msg, copy=False, content=False)
344 msg = self.session.unpack_message(msg, copy=False, content=False)
344 header = msg['header']
345 header = msg['header']
345
346
346 try:
347 try:
347 raise why()
348 raise why()
348 except:
349 except:
349 content = error.wrap_exception()
350 content = error.wrap_exception()
350
351
351 self.all_done.add(msg_id)
352 self.all_done.add(msg_id)
352 self.all_failed.add(msg_id)
353 self.all_failed.add(msg_id)
353
354
354 msg = self.session.send(self.client_stream, 'apply_reply', content,
355 msg = self.session.send(self.client_stream, 'apply_reply', content,
355 parent=header, ident=idents)
356 parent=header, ident=idents)
356 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
357 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
357
358
358 self.update_graph(msg_id, success=False)
359 self.update_graph(msg_id, success=False)
359
360
360 @logged
361 @logged
361 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
362 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
362 """check location dependencies, and run if they are met."""
363 """check location dependencies, and run if they are met."""
363 blacklist = self.blacklist.setdefault(msg_id, set())
364 blacklist = self.blacklist.setdefault(msg_id, set())
364 if follow or targets or blacklist:
365 if follow or targets or blacklist:
365 # we need a can_run filter
366 # we need a can_run filter
366 def can_run(idx):
367 def can_run(idx):
367 target = self.targets[idx]
368 target = self.targets[idx]
368 # check targets
369 # check targets
369 if targets and target not in targets:
370 if targets and target not in targets:
370 return False
371 return False
371 # check blacklist
372 # check blacklist
372 if target in blacklist:
373 if target in blacklist:
373 return False
374 return False
374 # check follow
375 # check follow
375 return follow.check(self.completed[target], self.failed[target])
376 return follow.check(self.completed[target], self.failed[target])
376
377
377 indices = filter(can_run, range(len(self.targets)))
378 indices = filter(can_run, range(len(self.targets)))
378 if not indices:
379 if not indices:
379 # couldn't run
380 # couldn't run
380 if follow.all:
381 if follow.all:
381 # check follow for impossibility
382 # check follow for impossibility
382 dests = set()
383 dests = set()
383 relevant = set()
384 relevant = set()
384 if follow.success:
385 if follow.success:
385 relevant = self.all_completed
386 relevant = self.all_completed
386 if follow.failure:
387 if follow.failure:
387 relevant = relevant.union(self.all_failed)
388 relevant = relevant.union(self.all_failed)
388 for m in follow.intersection(relevant):
389 for m in follow.intersection(relevant):
389 dests.add(self.destinations[m])
390 dests.add(self.destinations[m])
390 if len(dests) > 1:
391 if len(dests) > 1:
391 self.fail_unreachable(msg_id)
392 self.fail_unreachable(msg_id)
392 return False
393 return False
393 if targets:
394 if targets:
394 # check blacklist+targets for impossibility
395 # check blacklist+targets for impossibility
395 targets.difference_update(blacklist)
396 targets.difference_update(blacklist)
396 if not targets or not targets.intersection(self.targets):
397 if not targets or not targets.intersection(self.targets):
397 self.fail_unreachable(msg_id)
398 self.fail_unreachable(msg_id)
398 return False
399 return False
399 return False
400 return False
400 else:
401 else:
401 indices = None
402 indices = None
402
403
403 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
404 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
404 return True
405 return True
405
406
406 @logged
407 @logged
407 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
408 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
408 """Save a message for later submission when its dependencies are met."""
409 """Save a message for later submission when its dependencies are met."""
409 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
410 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
410 # track the ids in follow or after, but not those already finished
411 # track the ids in follow or after, but not those already finished
411 for dep_id in after.union(follow).difference(self.all_done):
412 for dep_id in after.union(follow).difference(self.all_done):
412 if dep_id not in self.graph:
413 if dep_id not in self.graph:
413 self.graph[dep_id] = set()
414 self.graph[dep_id] = set()
414 self.graph[dep_id].add(msg_id)
415 self.graph[dep_id].add(msg_id)
415
416
416 @logged
417 @logged
417 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
418 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
418 """Submit a task to any of a subset of our targets."""
419 """Submit a task to any of a subset of our targets."""
419 if indices:
420 if indices:
420 loads = [self.loads[i] for i in indices]
421 loads = [self.loads[i] for i in indices]
421 else:
422 else:
422 loads = self.loads
423 loads = self.loads
423 idx = self.scheme(loads)
424 idx = self.scheme(loads)
424 if indices:
425 if indices:
425 idx = indices[idx]
426 idx = indices[idx]
426 target = self.targets[idx]
427 target = self.targets[idx]
427 # print (target, map(str, msg[:3]))
428 # print (target, map(str, msg[:3]))
428 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
429 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
429 self.engine_stream.send_multipart(raw_msg, copy=False)
430 self.engine_stream.send_multipart(raw_msg, copy=False)
430 self.add_job(idx)
431 self.add_job(idx)
431 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
432 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
432 content = dict(msg_id=msg_id, engine_id=target)
433 content = dict(msg_id=msg_id, engine_id=target)
433 self.session.send(self.mon_stream, 'task_destination', content=content,
434 self.session.send(self.mon_stream, 'task_destination', content=content,
434 ident=['tracktask',self.session.session])
435 ident=['tracktask',self.session.session])
435
436
436 #-----------------------------------------------------------------------
437 #-----------------------------------------------------------------------
437 # Result Handling
438 # Result Handling
438 #-----------------------------------------------------------------------
439 #-----------------------------------------------------------------------
439 @logged
440 @logged
440 def dispatch_result(self, raw_msg):
441 def dispatch_result(self, raw_msg):
441 """dispatch method for result replies"""
442 """dispatch method for result replies"""
442 try:
443 try:
443 idents,msg = self.session.feed_identities(raw_msg, copy=False)
444 idents,msg = self.session.feed_identities(raw_msg, copy=False)
444 msg = self.session.unpack_message(msg, content=False, copy=False)
445 msg = self.session.unpack_message(msg, content=False, copy=False)
445 except:
446 except:
446 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
447 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
447 return
448 return
448
449
449 header = msg['header']
450 header = msg['header']
450 if header.get('dependencies_met', True):
451 if header.get('dependencies_met', True):
451 success = (header['status'] == 'ok')
452 success = (header['status'] == 'ok')
452 self.handle_result(idents, msg['parent_header'], raw_msg, success)
453 self.handle_result(idents, msg['parent_header'], raw_msg, success)
453 # send to Hub monitor
454 # send to Hub monitor
454 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
455 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
455 else:
456 else:
456 self.handle_unmet_dependency(idents, msg['parent_header'])
457 self.handle_unmet_dependency(idents, msg['parent_header'])
457
458
458 @logged
459 @logged
459 def handle_result(self, idents, parent, raw_msg, success=True):
460 def handle_result(self, idents, parent, raw_msg, success=True):
460 """handle a real task result, either success or failure"""
461 """handle a real task result, either success or failure"""
461 # first, relay result to client
462 # first, relay result to client
462 engine = idents[0]
463 engine = idents[0]
463 client = idents[1]
464 client = idents[1]
464 # swap_ids for XREP-XREP mirror
465 # swap_ids for XREP-XREP mirror
465 raw_msg[:2] = [client,engine]
466 raw_msg[:2] = [client,engine]
466 # print (map(str, raw_msg[:4]))
467 # print (map(str, raw_msg[:4]))
467 self.client_stream.send_multipart(raw_msg, copy=False)
468 self.client_stream.send_multipart(raw_msg, copy=False)
468 # now, update our data structures
469 # now, update our data structures
469 msg_id = parent['msg_id']
470 msg_id = parent['msg_id']
470 self.blacklist.pop(msg_id, None)
471 self.blacklist.pop(msg_id, None)
471 self.pending[engine].pop(msg_id)
472 self.pending[engine].pop(msg_id)
472 if success:
473 if success:
473 self.completed[engine].add(msg_id)
474 self.completed[engine].add(msg_id)
474 self.all_completed.add(msg_id)
475 self.all_completed.add(msg_id)
475 else:
476 else:
476 self.failed[engine].add(msg_id)
477 self.failed[engine].add(msg_id)
477 self.all_failed.add(msg_id)
478 self.all_failed.add(msg_id)
478 self.all_done.add(msg_id)
479 self.all_done.add(msg_id)
479 self.destinations[msg_id] = engine
480 self.destinations[msg_id] = engine
480
481
481 self.update_graph(msg_id, success)
482 self.update_graph(msg_id, success)
482
483
483 @logged
484 @logged
484 def handle_unmet_dependency(self, idents, parent):
485 def handle_unmet_dependency(self, idents, parent):
485 """handle an unmet dependency"""
486 """handle an unmet dependency"""
486 engine = idents[0]
487 engine = idents[0]
487 msg_id = parent['msg_id']
488 msg_id = parent['msg_id']
488
489
489 if msg_id not in self.blacklist:
490 if msg_id not in self.blacklist:
490 self.blacklist[msg_id] = set()
491 self.blacklist[msg_id] = set()
491 self.blacklist[msg_id].add(engine)
492 self.blacklist[msg_id].add(engine)
492
493
493 args = self.pending[engine].pop(msg_id)
494 args = self.pending[engine].pop(msg_id)
494 raw,targets,after,follow,timeout = args
495 raw,targets,after,follow,timeout = args
495
496
496 if self.blacklist[msg_id] == targets:
497 if self.blacklist[msg_id] == targets:
497 self.depending[msg_id] = args
498 self.depending[msg_id] = args
498 return self.fail_unreachable(msg_id)
499 return self.fail_unreachable(msg_id)
499
500
500 elif not self.maybe_run(msg_id, *args):
501 elif not self.maybe_run(msg_id, *args):
501 # resubmit failed, put it back in our dependency tree
502 # resubmit failed, put it back in our dependency tree
502 self.save_unmet(msg_id, *args)
503 self.save_unmet(msg_id, *args)
503
504
504
505
505 @logged
506 @logged
506 def update_graph(self, dep_id, success=True):
507 def update_graph(self, dep_id, success=True):
507 """dep_id just finished. Update our dependency
508 """dep_id just finished. Update our dependency
508 graph and submit any jobs that just became runable."""
509 graph and submit any jobs that just became runable."""
509 # print ("\n\n***********")
510 # print ("\n\n***********")
510 # pprint (dep_id)
511 # pprint (dep_id)
511 # pprint (self.graph)
512 # pprint (self.graph)
512 # pprint (self.depending)
513 # pprint (self.depending)
513 # pprint (self.all_completed)
514 # pprint (self.all_completed)
514 # pprint (self.all_failed)
515 # pprint (self.all_failed)
515 # print ("\n\n***********\n\n")
516 # print ("\n\n***********\n\n")
516 if dep_id not in self.graph:
517 if dep_id not in self.graph:
517 return
518 return
518 jobs = self.graph.pop(dep_id)
519 jobs = self.graph.pop(dep_id)
519
520
520 for msg_id in jobs:
521 for msg_id in jobs:
521 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
522 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
522
523
523 if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
524 if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
524 self.fail_unreachable(msg_id)
525 self.fail_unreachable(msg_id)
525
526
526 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
527 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
527 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
528 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
528
529
529 self.depending.pop(msg_id)
530 self.depending.pop(msg_id)
530 for mid in follow.union(after):
531 for mid in follow.union(after):
531 if mid in self.graph:
532 if mid in self.graph:
532 self.graph[mid].remove(msg_id)
533 self.graph[mid].remove(msg_id)
533
534
534 #----------------------------------------------------------------------
535 #----------------------------------------------------------------------
535 # methods to be overridden by subclasses
536 # methods to be overridden by subclasses
536 #----------------------------------------------------------------------
537 #----------------------------------------------------------------------
537
538
538 def add_job(self, idx):
539 def add_job(self, idx):
539 """Called after self.targets[idx] just got the job with header.
540 """Called after self.targets[idx] just got the job with header.
540 Override with subclasses. The default ordering is simple LRU.
541 Override with subclasses. The default ordering is simple LRU.
541 The default loads are the number of outstanding jobs."""
542 The default loads are the number of outstanding jobs."""
542 self.loads[idx] += 1
543 self.loads[idx] += 1
543 for lis in (self.targets, self.loads):
544 for lis in (self.targets, self.loads):
544 lis.append(lis.pop(idx))
545 lis.append(lis.pop(idx))
545
546
546
547
547 def finish_job(self, idx):
548 def finish_job(self, idx):
548 """Called after self.targets[idx] just finished a job.
549 """Called after self.targets[idx] just finished a job.
549 Override with subclasses."""
550 Override with subclasses."""
550 self.loads[idx] -= 1
551 self.loads[idx] -= 1
551
552
552
553
553
554
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
                        log_addr=None, loglevel=logging.DEBUG, scheme='lru',
                        identity=b'task'):
    """Build the 0MQ plumbing for a TaskScheduler and run its event loop.

    `config` may arrive as a plain dict (that is how it crosses the process
    boundary); it is rewrapped into a Config object before use. Blocks in
    loop.start() until interrupted.
    """
    from zmq.eventloop import ioloop
    from zmq.eventloop.zmqstream import ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    ctx = zmq.Context()
    loop = ioloop.IOLoop()

    # client-facing socket
    ins = ZMQStream(ctx.socket(zmq.XREP), loop)
    ins.setsockopt(zmq.IDENTITY, identity)
    ins.bind(in_addr)

    # engine-facing socket
    outs = ZMQStream(ctx.socket(zmq.XREP), loop)
    outs.setsockopt(zmq.IDENTITY, identity)
    outs.bind(out_addr)

    # publisher to the Hub monitor
    mons = ZMQStream(ctx.socket(zmq.PUB), loop)
    mons.connect(mon_addr)

    # subscriber for engine (un)registration notifications
    nots = ZMQStream(ctx.socket(zmq.SUB), loop)
    nots.setsockopt(zmq.SUBSCRIBE, '')
    nots.connect(not_addr)

    # resolve the scheme name to the module-level function of that name
    scheme = globals().get(scheme, None)

    # setup logging
    if log_addr:
        connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
    else:
        local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            scheme=scheme, loop=loop, logname=logname,
                            config=config)
    scheduler.start()
    try:
        loop.start()
    except KeyboardInterrupt:
        print ("interrupted, exiting...", file=sys.__stderr__)
596
General Comments 0
You need to be logged in to leave comments. Login now