##// END OF EJS Templates
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
MinRK -
Show More
@@ -1,1293 +1,1270 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from datetime import datetime
20 from datetime import datetime
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25
25
26 # internal:
26 # internal:
27 from IPython.utils.importstring import import_item
27 from IPython.utils.importstring import import_item
28 from IPython.utils.traitlets import (
28 from IPython.utils.traitlets import (
29 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr
29 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr
30 )
30 )
31
31
32 from IPython.parallel import error, util
32 from IPython.parallel import error, util
33 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
33 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
34
34
35 from .heartmonitor import HeartMonitor
35 from .heartmonitor import HeartMonitor
36
36
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Code
38 # Code
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40
40
41 def _passer(*args, **kwargs):
41 def _passer(*args, **kwargs):
42 return
42 return
43
43
44 def _printer(*args, **kwargs):
44 def _printer(*args, **kwargs):
45 print (args)
45 print (args)
46 print (kwargs)
46 print (kwargs)
47
47
def empty_record():
    """Return a fresh TaskRecord dict with every key present.

    All fields default to None except the stream captures
    ('stdout'/'stderr'), which default to empty strings.
    """
    none_keys = (
        'msg_id', 'header', 'content', 'buffers', 'submitted',
        'client_uuid', 'engine_uuid', 'started', 'completed',
        'resubmitted', 'result_header', 'result_content',
        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr',
    )
    record = dict.fromkeys(none_keys)
    record['stdout'] = ''
    record['stderr'] = ''
    return record
71
71
def init_record(msg):
    """Initialize a TaskRecord dict from a request message.

    Copies the request's header/content/buffers into a full record;
    all result- and status-related fields start out empty.
    """
    header = msg['header']
    record = {
        'msg_id' : header['msg_id'],
        'header' : header,
        'content': msg['content'],
        'buffers': msg['buffers'],
        # header['date'] is an ISO8601-formatted string; parse to datetime
        'submitted': datetime.strptime(header['date'], util.ISO8601),
    }
    # routing / lifecycle fields, unknown at submission time
    for pending_key in ('client_uuid', 'engine_uuid', 'started', 'completed',
                        'resubmitted', 'result_header', 'result_content',
                        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr'):
        record[pending_key] = None
    # captured output streams accumulate as strings
    record['stdout'] = ''
    record['stderr'] = ''
    return record
96
96
97
97
class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (str): uuid (unused?)
    queue (str): identity of queue's XREQ socket
    control (str): identity of control XREQ socket
    registration (str): identity of registration XREQ socket
    heartbeat (str): identity of heartbeat XREQ socket
    """
    id=Int(0)  # integer engine ID assigned by the Hub
    queue=CStr()  # zmq identity of the engine's MUX queue socket
    control=CStr()  # zmq identity of the engine's control socket
    registration=CStr()  # zmq identity used for the registration socket
    heartbeat=CStr()  # zmq identity of the engine's heartbeat socket
    pending=Set()  # presumably msg_ids outstanding on this engine -- TODO confirm against Hub usage
113
113
class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub.

    Holds the (mostly port-related) configuration for a controller Hub,
    and constructs the Hub plus its heartbeat monitor, DB backend, and
    all of the 0MQ sockets/streams in init_hub().
    """

    # port-pairs for monitoredqueues:
    hb = Tuple(Int,Int,config=True,
        help="""XREQ/SUB Port pair for Engine heartbeats""")
    def _hb_default(self):
        # pick two free ports at random when not configured
        return tuple(util.select_random_ports(2))

    mux = Tuple(Int,Int,config=True,
        help="""Engine/Client Port pair for MUX queue""")

    def _mux_default(self):
        return tuple(util.select_random_ports(2))

    task = Tuple(Int,Int,config=True,
        help="""Engine/Client Port pair for Task queue""")
    def _task_default(self):
        return tuple(util.select_random_ports(2))

    control = Tuple(Int,Int,config=True,
        help="""Engine/Client Port pair for Control queue""")

    def _control_default(self):
        return tuple(util.select_random_ports(2))

    iopub = Tuple(Int,Int,config=True,
        help="""Engine/Client Port pair for IOPub relay""")

    def _iopub_default(self):
        return tuple(util.select_random_ports(2))

    # single ports:
    mon_port = Int(config=True,
        help="""Monitor (SUB) port for queue traffic""")

    def _mon_port_default(self):
        return util.select_random_ports(1)[0]

    notifier_port = Int(config=True,
        help="""PUB port for sending engine status notifications""")

    def _notifier_port_default(self):
        return util.select_random_ports(1)[0]

    engine_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for engine connections. [default: loopback]")
    engine_transport = Unicode('tcp', config=True,
        help="0MQ transport for engine connections. [default: tcp]")

    client_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for client connections. [default: loopback]")
    client_transport = Unicode('tcp', config=True,
        help="0MQ transport for client connections. [default : tcp]")

    monitor_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for monitor messages. [default: loopback]")
    monitor_transport = Unicode('tcp', config=True,
        help="0MQ transport for monitor messages. [default : tcp]")

    # derived from monitor_transport/ip/port; kept in sync by the
    # _*_changed handlers below
    monitor_url = Unicode('')

    db_class = Unicode('IPython.parallel.controller.dictdb.DictDB', config=True,
        help="""The class to use for the DB backend""")

    # not configurable
    db = Instance('IPython.parallel.controller.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')

    def _ip_changed(self, name, old, new):
        # a single 'ip' setting fans out to all three role-specific IPs
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        # rebuild the monitor URL from its transport/ip/port components
        self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        # a single 'transport' setting fans out to all three transports
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()

    def construct(self):
        self.init_hub()

    def start(self):
        # the Hub itself is event-driven; only the heartbeat needs starting
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def init_hub(self):
        """construct

        Build all sockets/streams, connect the DB backend, assemble the
        engine/client connection-info dicts, and create self.hub.
        """
        # interface templates with a trailing %i placeholder for the port
        client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
        engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"

        ctx = self.context
        loop = self.loop

        # Registrar socket
        q = ZMQStream(ctx.socket(zmq.XREP), loop)
        q.bind(client_iface % self.regport)
        self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
        if self.client_ip != self.engine_ip:
            # engines connect on a different interface; bind that too
            q.bind(engine_iface % self.regport)
            self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))

        ### Engine connections ###

        # heartbeat: PUB pings out, XREP collects pongs
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(engine_iface % self.hb[0])
        hrep = ctx.socket(zmq.XREP)
        hrep.bind(engine_iface % self.hb[1])
        self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
                                config=self.config)

        ### Client connections ###
        # Notifier socket
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(client_iface%self.notifier_port)

        ### build and launch the queues ###

        # monitor socket: SUB subscribed to everything, bound on both the
        # external monitor URL and an in-process endpoint
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, "")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db
        self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
        # cdir = self.config.Global.cluster_dir
        self.db = import_item(str(self.db_class))(session=self.session.session, config=self.config)
        # brief pause, presumably to let the DB backend settle -- TODO confirm
        time.sleep(.25)
        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            # no configured scheme; fall back to the TaskScheduler default
            from .scheduler import TaskScheduler
            scheme = TaskScheduler.scheme_name.get_default_value()
        # build connection dicts
        self.engine_info = {
            'control' : engine_iface%self.control[1],
            'mux': engine_iface%self.mux[1],
            'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
            'task' : engine_iface%self.task[1],
            'iopub' : engine_iface%self.iopub[1],
            # 'monitor' : engine_iface%self.mon_port,
            }

        self.client_info = {
            'control' : client_iface%self.control[0],
            'mux': client_iface%self.mux[0],
            'task' : (scheme, client_iface%self.task[0]),
            'iopub' : client_iface%self.iopub[0],
            'notification': client_iface%self.notifier_port
            }
        self.log.debug("Hub engine addrs: %s"%self.engine_info)
        self.log.debug("Hub client addrs: %s"%self.client_info)

        # resubmit stream: XREQ connected to the client side of the task
        # queue, identified as this session so replies route back here
        r = ZMQStream(ctx.socket(zmq.XREQ), loop)
        url = util.disambiguate_url(self.client_info['task'][-1])
        r.setsockopt(zmq.IDENTITY, self.session.session)
        r.connect(url)

        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, resubmit=r, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                logname=self.log.name)
293
290
294
291
295 class Hub(LoggingFactory):
292 class Hub(LoggingFactory):
296 """The IPython Controller Hub with 0MQ connections
293 """The IPython Controller Hub with 0MQ connections
297
294
298 Parameters
295 Parameters
299 ==========
296 ==========
300 loop: zmq IOLoop instance
297 loop: zmq IOLoop instance
301 session: StreamSession object
298 session: StreamSession object
302 <removed> context: zmq context for creating new connections (?)
299 <removed> context: zmq context for creating new connections (?)
303 queue: ZMQStream for monitoring the command queue (SUB)
300 queue: ZMQStream for monitoring the command queue (SUB)
304 query: ZMQStream for engine registration and client queries requests (XREP)
301 query: ZMQStream for engine registration and client queries requests (XREP)
305 heartbeat: HeartMonitor object checking the pulse of the engines
302 heartbeat: HeartMonitor object checking the pulse of the engines
306 notifier: ZMQStream for broadcasting engine registration changes (PUB)
303 notifier: ZMQStream for broadcasting engine registration changes (PUB)
307 db: connection to db for out of memory logging of commands
304 db: connection to db for out of memory logging of commands
308 NotImplemented
305 NotImplemented
309 engine_info: dict of zmq connection information for engines to connect
306 engine_info: dict of zmq connection information for engines to connect
310 to the queues.
307 to the queues.
311 client_info: dict of zmq connection information for engines to connect
308 client_info: dict of zmq connection information for engines to connect
312 to the queues.
309 to the queues.
313 """
310 """
314 # internal data structures:
311 # internal data structures:
315 ids=Set() # engine IDs
312 ids=Set() # engine IDs
316 keytable=Dict()
313 keytable=Dict()
317 by_ident=Dict()
314 by_ident=Dict()
318 engines=Dict()
315 engines=Dict()
319 clients=Dict()
316 clients=Dict()
320 hearts=Dict()
317 hearts=Dict()
321 pending=Set()
318 pending=Set()
322 queues=Dict() # pending msg_ids keyed by engine_id
319 queues=Dict() # pending msg_ids keyed by engine_id
323 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
320 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
324 completed=Dict() # completed msg_ids keyed by engine_id
321 completed=Dict() # completed msg_ids keyed by engine_id
325 all_completed=Set() # completed msg_ids keyed by engine_id
322 all_completed=Set() # completed msg_ids keyed by engine_id
326 dead_engines=Set() # completed msg_ids keyed by engine_id
323 dead_engines=Set() # completed msg_ids keyed by engine_id
327 unassigned=Set() # set of task msg_ds not yet assigned a destination
324 unassigned=Set() # set of task msg_ds not yet assigned a destination
328 incoming_registrations=Dict()
325 incoming_registrations=Dict()
329 registration_timeout=Int()
326 registration_timeout=Int()
330 _idcounter=Int(0)
327 _idcounter=Int(0)
331
328
332 # objects from constructor:
329 # objects from constructor:
333 loop=Instance(ioloop.IOLoop)
330 loop=Instance(ioloop.IOLoop)
334 query=Instance(ZMQStream)
331 query=Instance(ZMQStream)
335 monitor=Instance(ZMQStream)
332 monitor=Instance(ZMQStream)
336 notifier=Instance(ZMQStream)
333 notifier=Instance(ZMQStream)
337 resubmit=Instance(ZMQStream)
334 resubmit=Instance(ZMQStream)
338 heartmonitor=Instance(HeartMonitor)
335 heartmonitor=Instance(HeartMonitor)
339 db=Instance(object)
336 db=Instance(object)
340 client_info=Dict()
337 client_info=Dict()
341 engine_info=Dict()
338 engine_info=Dict()
342
339
343
340
    def __init__(self, **kwargs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)
        # give engines at least 5s (or two heartbeat periods) to register
        self.registration_timeout = max(5000, 2*self.heartmonitor.period)

        # validate connection dicts:
        # NOTE: iteritems is Python 2 only
        for k,v in self.client_info.iteritems():
            if k == 'task':
                # 'task' is a (scheme, url) pair; only the url is validated
                util.validate_url_container(v[1])
            else:
                util.validate_url_container(v)
        # util.validate_url_container(self.client_info)
        util.validate_url_container(self.engine_info)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        # topic -> handler for traffic arriving on the monitor SUB socket
        self.monitor_handlers = { 'in' : self.save_queue_request,
                                'out': self.save_queue_result,
                                'intask': self.save_task_request,
                                'outtask': self.save_task_result,
                                'tracktask': self.save_task_destination,
                                'incontrol': _passer,
                                'outcontrol': _passer,
                                'iopub': self.save_iopub_message,
        }

        # msg_type -> handler for client/engine requests on the query socket
        self.query_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'history_request': self.get_history,
                                'db_request': self.db_query,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                'shutdown_request': self.shutdown_request,
                                'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
        }

        # ignore resubmit replies
        self.resubmit.on_recv(lambda msg: None, copy=False)

        self.log.info("hub::created hub")
405
402
406 @property
403 @property
407 def _next_id(self):
404 def _next_id(self):
408 """gemerate a new ID.
405 """gemerate a new ID.
409
406
410 No longer reuse old ids, just count from 0."""
407 No longer reuse old ids, just count from 0."""
411 newid = self._idcounter
408 newid = self._idcounter
412 self._idcounter += 1
409 self._idcounter += 1
413 return newid
410 return newid
414 # newid = 0
411 # newid = 0
415 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
412 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
416 # # print newid, self.ids, self.incoming_registrations
413 # # print newid, self.ids, self.incoming_registrations
417 # while newid in self.ids or newid in incoming:
414 # while newid in self.ids or newid in incoming:
418 # newid += 1
415 # newid += 1
419 # return newid
416 # return newid
420
417
421 #-----------------------------------------------------------------------------
418 #-----------------------------------------------------------------------------
422 # message validation
419 # message validation
423 #-----------------------------------------------------------------------------
420 #-----------------------------------------------------------------------------
424
421
425 def _validate_targets(self, targets):
422 def _validate_targets(self, targets):
426 """turn any valid targets argument into a list of integer ids"""
423 """turn any valid targets argument into a list of integer ids"""
427 if targets is None:
424 if targets is None:
428 # default to all
425 # default to all
429 targets = self.ids
426 targets = self.ids
430
427
431 if isinstance(targets, (int,str,unicode)):
428 if isinstance(targets, (int,str,unicode)):
432 # only one target specified
429 # only one target specified
433 targets = [targets]
430 targets = [targets]
434 _targets = []
431 _targets = []
435 for t in targets:
432 for t in targets:
436 # map raw identities to ids
433 # map raw identities to ids
437 if isinstance(t, (str,unicode)):
434 if isinstance(t, (str,unicode)):
438 t = self.by_ident.get(t, t)
435 t = self.by_ident.get(t, t)
439 _targets.append(t)
436 _targets.append(t)
440 targets = _targets
437 targets = _targets
441 bad_targets = [ t for t in targets if t not in self.ids ]
438 bad_targets = [ t for t in targets if t not in self.ids ]
442 if bad_targets:
439 if bad_targets:
443 raise IndexError("No Such Engine: %r"%bad_targets)
440 raise IndexError("No Such Engine: %r"%bad_targets)
444 if not targets:
441 if not targets:
445 raise IndexError("No Engines Registered")
442 raise IndexError("No Engines Registered")
446 return targets
443 return targets
447
444
448 #-----------------------------------------------------------------------------
445 #-----------------------------------------------------------------------------
449 # dispatch methods (1 per stream)
446 # dispatch methods (1 per stream)
450 #-----------------------------------------------------------------------------
447 #-----------------------------------------------------------------------------
451
448
452 # def dispatch_registration_request(self, msg):
453 # """"""
454 # self.log.debug("registration::dispatch_register_request(%s)"%msg)
455 # idents,msg = self.session.feed_identities(msg)
456 # if not idents:
457 # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
458 # return
459 # try:
460 # msg = self.session.unpack_message(msg,content=True)
461 # except:
462 # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
463 # return
464 #
465 # msg_type = msg['msg_type']
466 # content = msg['content']
467 #
468 # handler = self.query_handlers.get(msg_type, None)
469 # if handler is None:
470 # self.log.error("registration::got bad registration message: %s"%msg)
471 # else:
472 # handler(idents, msg)
473
449
    def dispatch_monitor_traffic(self, msg):
        """all ME and Task queue messages come through here, as well as
        IOPub traffic."""
        self.log.debug("monitor traffic: %r"%msg[:2])
        # first frame is the topic, which selects the handler below
        switch = msg[0]
        try:
            idents, msg = self.session.feed_identities(msg[1:])
        except ValueError:
            # malformed message with no identity prefix; treat as bad below
            idents=[]
        if not idents:
            self.log.error("Bad Monitor Message: %r"%msg)
            return
        handler = self.monitor_handlers.get(switch, None)
        if handler is not None:
            handler(idents, msg)
        else:
            self.log.error("Invalid monitor topic: %r"%switch)
488
467
489
468
490 def dispatch_query(self, msg):
469 def dispatch_query(self, msg):
491 """Route registration requests and queries from clients."""
470 """Route registration requests and queries from clients."""
492 idents, msg = self.session.feed_identities(msg)
471 idents, msg = self.session.feed_identities(msg)
493 if not idents:
472 if not idents:
494 self.log.error("Bad Query Message: %r"%msg)
473 self.log.error("Bad Query Message: %r"%msg)
495 return
474 return
496 client_id = idents[0]
475 client_id = idents[0]
497 try:
476 try:
498 msg = self.session.unpack_message(msg, content=True)
477 msg = self.session.unpack_message(msg, content=True)
499 except:
478 except:
500 content = error.wrap_exception()
479 content = error.wrap_exception()
501 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
480 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
502 self.session.send(self.query, "hub_error", ident=client_id,
481 self.session.send(self.query, "hub_error", ident=client_id,
503 content=content)
482 content=content)
504 return
483 return
505
484
506 # print client_id, header, parent, content
485 # print client_id, header, parent, content
507 #switch on message type:
486 #switch on message type:
508 msg_type = msg['msg_type']
487 msg_type = msg['msg_type']
509 self.log.info("client::client %r requested %r"%(client_id, msg_type))
488 self.log.info("client::client %r requested %r"%(client_id, msg_type))
510 handler = self.query_handlers.get(msg_type, None)
489 handler = self.query_handlers.get(msg_type, None)
511 try:
490 try:
512 assert handler is not None, "Bad Message Type: %r"%msg_type
491 assert handler is not None, "Bad Message Type: %r"%msg_type
513 except:
492 except:
514 content = error.wrap_exception()
493 content = error.wrap_exception()
515 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
494 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
516 self.session.send(self.query, "hub_error", ident=client_id,
495 self.session.send(self.query, "hub_error", ident=client_id,
517 content=content)
496 content=content)
518 return
497 return
519
498
520 else:
499 else:
521 handler(idents, msg)
500 handler(idents, msg)
522
501
523 def dispatch_db(self, msg):
502 def dispatch_db(self, msg):
524 """"""
503 """"""
525 raise NotImplementedError
504 raise NotImplementedError
526
505
527 #---------------------------------------------------------------------------
506 #---------------------------------------------------------------------------
528 # handler methods (1 per event)
507 # handler methods (1 per event)
529 #---------------------------------------------------------------------------
508 #---------------------------------------------------------------------------
530
509
531 #----------------------- Heartbeat --------------------------------------
510 #----------------------- Heartbeat --------------------------------------
532
511
533 def handle_new_heart(self, heart):
512 def handle_new_heart(self, heart):
534 """handler to attach to heartbeater.
513 """handler to attach to heartbeater.
535 Called when a new heart starts to beat.
514 Called when a new heart starts to beat.
536 Triggers completion of registration."""
515 Triggers completion of registration."""
537 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
516 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
538 if heart not in self.incoming_registrations:
517 if heart not in self.incoming_registrations:
539 self.log.info("heartbeat::ignoring new heart: %r"%heart)
518 self.log.info("heartbeat::ignoring new heart: %r"%heart)
540 else:
519 else:
541 self.finish_registration(heart)
520 self.finish_registration(heart)
542
521
543
522
544 def handle_heart_failure(self, heart):
523 def handle_heart_failure(self, heart):
545 """handler to attach to heartbeater.
524 """handler to attach to heartbeater.
546 called when a previously registered heart fails to respond to beat request.
525 called when a previously registered heart fails to respond to beat request.
547 triggers unregistration"""
526 triggers unregistration"""
548 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
527 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
549 eid = self.hearts.get(heart, None)
528 eid = self.hearts.get(heart, None)
550 queue = self.engines[eid].queue
529 queue = self.engines[eid].queue
551 if eid is None:
530 if eid is None:
552 self.log.info("heartbeat::ignoring heart failure %r"%heart)
531 self.log.info("heartbeat::ignoring heart failure %r"%heart)
553 else:
532 else:
554 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
533 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
555
534
556 #----------------------- MUX Queue Traffic ------------------------------
535 #----------------------- MUX Queue Traffic ------------------------------
557
536
558 def save_queue_request(self, idents, msg):
537 def save_queue_request(self, idents, msg):
559 if len(idents) < 2:
538 if len(idents) < 2:
560 self.log.error("invalid identity prefix: %s"%idents)
539 self.log.error("invalid identity prefix: %r"%idents)
561 return
540 return
562 queue_id, client_id = idents[:2]
541 queue_id, client_id = idents[:2]
563 try:
542 try:
564 msg = self.session.unpack_message(msg, content=False)
543 msg = self.session.unpack_message(msg, content=False)
565 except:
544 except Exception:
566 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
545 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
567 return
546 return
568
547
569 eid = self.by_ident.get(queue_id, None)
548 eid = self.by_ident.get(queue_id, None)
570 if eid is None:
549 if eid is None:
571 self.log.error("queue::target %r not registered"%queue_id)
550 self.log.error("queue::target %r not registered"%queue_id)
572 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
551 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
573 return
552 return
574
553
575 header = msg['header']
554 header = msg['header']
576 msg_id = header['msg_id']
555 msg_id = header['msg_id']
577 record = init_record(msg)
556 record = init_record(msg)
578 record['engine_uuid'] = queue_id
557 record['engine_uuid'] = queue_id
579 record['client_uuid'] = client_id
558 record['client_uuid'] = client_id
580 record['queue'] = 'mux'
559 record['queue'] = 'mux'
581
560
582 try:
561 try:
583 # it's posible iopub arrived first:
562 # it's posible iopub arrived first:
584 existing = self.db.get_record(msg_id)
563 existing = self.db.get_record(msg_id)
585 for key,evalue in existing.iteritems():
564 for key,evalue in existing.iteritems():
586 rvalue = record.get(key, None)
565 rvalue = record.get(key, None)
587 if evalue and rvalue and evalue != rvalue:
566 if evalue and rvalue and evalue != rvalue:
588 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
567 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
589 elif evalue and not rvalue:
568 elif evalue and not rvalue:
590 record[key] = evalue
569 record[key] = evalue
591 self.db.update_record(msg_id, record)
570 self.db.update_record(msg_id, record)
592 except KeyError:
571 except KeyError:
593 self.db.add_record(msg_id, record)
572 self.db.add_record(msg_id, record)
594
573
595 self.pending.add(msg_id)
574 self.pending.add(msg_id)
596 self.queues[eid].append(msg_id)
575 self.queues[eid].append(msg_id)
597
576
598 def save_queue_result(self, idents, msg):
577 def save_queue_result(self, idents, msg):
599 if len(idents) < 2:
578 if len(idents) < 2:
600 self.log.error("invalid identity prefix: %s"%idents)
579 self.log.error("invalid identity prefix: %r"%idents)
601 return
580 return
602
581
603 client_id, queue_id = idents[:2]
582 client_id, queue_id = idents[:2]
604 try:
583 try:
605 msg = self.session.unpack_message(msg, content=False)
584 msg = self.session.unpack_message(msg, content=False)
606 except:
585 except Exception:
607 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
586 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
608 queue_id,client_id, msg), exc_info=True)
587 queue_id,client_id, msg), exc_info=True)
609 return
588 return
610
589
611 eid = self.by_ident.get(queue_id, None)
590 eid = self.by_ident.get(queue_id, None)
612 if eid is None:
591 if eid is None:
613 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
592 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
614 # self.log.debug("queue:: %s"%msg[2:])
615 return
593 return
616
594
617 parent = msg['parent_header']
595 parent = msg['parent_header']
618 if not parent:
596 if not parent:
619 return
597 return
620 msg_id = parent['msg_id']
598 msg_id = parent['msg_id']
621 if msg_id in self.pending:
599 if msg_id in self.pending:
622 self.pending.remove(msg_id)
600 self.pending.remove(msg_id)
623 self.all_completed.add(msg_id)
601 self.all_completed.add(msg_id)
624 self.queues[eid].remove(msg_id)
602 self.queues[eid].remove(msg_id)
625 self.completed[eid].append(msg_id)
603 self.completed[eid].append(msg_id)
626 elif msg_id not in self.all_completed:
604 elif msg_id not in self.all_completed:
627 # it could be a result from a dead engine that died before delivering the
605 # it could be a result from a dead engine that died before delivering the
628 # result
606 # result
629 self.log.warn("queue:: unknown msg finished %s"%msg_id)
607 self.log.warn("queue:: unknown msg finished %r"%msg_id)
630 return
608 return
631 # update record anyway, because the unregistration could have been premature
609 # update record anyway, because the unregistration could have been premature
632 rheader = msg['header']
610 rheader = msg['header']
633 completed = datetime.strptime(rheader['date'], util.ISO8601)
611 completed = datetime.strptime(rheader['date'], util.ISO8601)
634 started = rheader.get('started', None)
612 started = rheader.get('started', None)
635 if started is not None:
613 if started is not None:
636 started = datetime.strptime(started, util.ISO8601)
614 started = datetime.strptime(started, util.ISO8601)
637 result = {
615 result = {
638 'result_header' : rheader,
616 'result_header' : rheader,
639 'result_content': msg['content'],
617 'result_content': msg['content'],
640 'started' : started,
618 'started' : started,
641 'completed' : completed
619 'completed' : completed
642 }
620 }
643
621
644 result['result_buffers'] = msg['buffers']
622 result['result_buffers'] = msg['buffers']
645 try:
623 try:
646 self.db.update_record(msg_id, result)
624 self.db.update_record(msg_id, result)
647 except Exception:
625 except Exception:
648 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
626 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
649
627
650
628
651 #--------------------- Task Queue Traffic ------------------------------
629 #--------------------- Task Queue Traffic ------------------------------
652
630
653 def save_task_request(self, idents, msg):
631 def save_task_request(self, idents, msg):
654 """Save the submission of a task."""
632 """Save the submission of a task."""
655 client_id = idents[0]
633 client_id = idents[0]
656
634
657 try:
635 try:
658 msg = self.session.unpack_message(msg, content=False)
636 msg = self.session.unpack_message(msg, content=False)
659 except:
637 except Exception:
660 self.log.error("task::client %r sent invalid task message: %s"%(
638 self.log.error("task::client %r sent invalid task message: %r"%(
661 client_id, msg), exc_info=True)
639 client_id, msg), exc_info=True)
662 return
640 return
663 record = init_record(msg)
641 record = init_record(msg)
664
642
665 record['client_uuid'] = client_id
643 record['client_uuid'] = client_id
666 record['queue'] = 'task'
644 record['queue'] = 'task'
667 header = msg['header']
645 header = msg['header']
668 msg_id = header['msg_id']
646 msg_id = header['msg_id']
669 self.pending.add(msg_id)
647 self.pending.add(msg_id)
670 self.unassigned.add(msg_id)
648 self.unassigned.add(msg_id)
671 try:
649 try:
672 # it's posible iopub arrived first:
650 # it's posible iopub arrived first:
673 existing = self.db.get_record(msg_id)
651 existing = self.db.get_record(msg_id)
674 if existing['resubmitted']:
652 if existing['resubmitted']:
675 for key in ('submitted', 'client_uuid', 'buffers'):
653 for key in ('submitted', 'client_uuid', 'buffers'):
676 # don't clobber these keys on resubmit
654 # don't clobber these keys on resubmit
677 # submitted and client_uuid should be different
655 # submitted and client_uuid should be different
678 # and buffers might be big, and shouldn't have changed
656 # and buffers might be big, and shouldn't have changed
679 record.pop(key)
657 record.pop(key)
680 # still check content,header which should not change
658 # still check content,header which should not change
681 # but are not expensive to compare as buffers
659 # but are not expensive to compare as buffers
682
660
683 for key,evalue in existing.iteritems():
661 for key,evalue in existing.iteritems():
684 if key.endswith('buffers'):
662 if key.endswith('buffers'):
685 # don't compare buffers
663 # don't compare buffers
686 continue
664 continue
687 rvalue = record.get(key, None)
665 rvalue = record.get(key, None)
688 if evalue and rvalue and evalue != rvalue:
666 if evalue and rvalue and evalue != rvalue:
689 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
667 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
690 elif evalue and not rvalue:
668 elif evalue and not rvalue:
691 record[key] = evalue
669 record[key] = evalue
692 self.db.update_record(msg_id, record)
670 self.db.update_record(msg_id, record)
693 except KeyError:
671 except KeyError:
694 self.db.add_record(msg_id, record)
672 self.db.add_record(msg_id, record)
695 except Exception:
673 except Exception:
696 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
674 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
697
675
698 def save_task_result(self, idents, msg):
676 def save_task_result(self, idents, msg):
699 """save the result of a completed task."""
677 """save the result of a completed task."""
700 client_id = idents[0]
678 client_id = idents[0]
701 try:
679 try:
702 msg = self.session.unpack_message(msg, content=False)
680 msg = self.session.unpack_message(msg, content=False)
703 except:
681 except Exception:
704 self.log.error("task::invalid task result message send to %r: %s"%(
682 self.log.error("task::invalid task result message send to %r: %r"%(
705 client_id, msg), exc_info=True)
683 client_id, msg), exc_info=True)
706 raise
707 return
684 return
708
685
709 parent = msg['parent_header']
686 parent = msg['parent_header']
710 if not parent:
687 if not parent:
711 # print msg
688 # print msg
712 self.log.warn("Task %r had no parent!"%msg)
689 self.log.warn("Task %r had no parent!"%msg)
713 return
690 return
714 msg_id = parent['msg_id']
691 msg_id = parent['msg_id']
715 if msg_id in self.unassigned:
692 if msg_id in self.unassigned:
716 self.unassigned.remove(msg_id)
693 self.unassigned.remove(msg_id)
717
694
718 header = msg['header']
695 header = msg['header']
719 engine_uuid = header.get('engine', None)
696 engine_uuid = header.get('engine', None)
720 eid = self.by_ident.get(engine_uuid, None)
697 eid = self.by_ident.get(engine_uuid, None)
721
698
722 if msg_id in self.pending:
699 if msg_id in self.pending:
723 self.pending.remove(msg_id)
700 self.pending.remove(msg_id)
724 self.all_completed.add(msg_id)
701 self.all_completed.add(msg_id)
725 if eid is not None:
702 if eid is not None:
726 self.completed[eid].append(msg_id)
703 self.completed[eid].append(msg_id)
727 if msg_id in self.tasks[eid]:
704 if msg_id in self.tasks[eid]:
728 self.tasks[eid].remove(msg_id)
705 self.tasks[eid].remove(msg_id)
729 completed = datetime.strptime(header['date'], util.ISO8601)
706 completed = datetime.strptime(header['date'], util.ISO8601)
730 started = header.get('started', None)
707 started = header.get('started', None)
731 if started is not None:
708 if started is not None:
732 started = datetime.strptime(started, util.ISO8601)
709 started = datetime.strptime(started, util.ISO8601)
733 result = {
710 result = {
734 'result_header' : header,
711 'result_header' : header,
735 'result_content': msg['content'],
712 'result_content': msg['content'],
736 'started' : started,
713 'started' : started,
737 'completed' : completed,
714 'completed' : completed,
738 'engine_uuid': engine_uuid
715 'engine_uuid': engine_uuid
739 }
716 }
740
717
741 result['result_buffers'] = msg['buffers']
718 result['result_buffers'] = msg['buffers']
742 try:
719 try:
743 self.db.update_record(msg_id, result)
720 self.db.update_record(msg_id, result)
744 except Exception:
721 except Exception:
745 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
722 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
746
723
747 else:
724 else:
748 self.log.debug("task::unknown task %s finished"%msg_id)
725 self.log.debug("task::unknown task %r finished"%msg_id)
749
726
750 def save_task_destination(self, idents, msg):
727 def save_task_destination(self, idents, msg):
751 try:
728 try:
752 msg = self.session.unpack_message(msg, content=True)
729 msg = self.session.unpack_message(msg, content=True)
753 except:
730 except Exception:
754 self.log.error("task::invalid task tracking message", exc_info=True)
731 self.log.error("task::invalid task tracking message", exc_info=True)
755 return
732 return
756 content = msg['content']
733 content = msg['content']
757 # print (content)
734 # print (content)
758 msg_id = content['msg_id']
735 msg_id = content['msg_id']
759 engine_uuid = content['engine_id']
736 engine_uuid = content['engine_id']
760 eid = self.by_ident[engine_uuid]
737 eid = self.by_ident[engine_uuid]
761
738
762 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
739 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
763 if msg_id in self.unassigned:
740 if msg_id in self.unassigned:
764 self.unassigned.remove(msg_id)
741 self.unassigned.remove(msg_id)
765 # else:
742 # else:
766 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
743 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
767
744
768 self.tasks[eid].append(msg_id)
745 self.tasks[eid].append(msg_id)
769 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
746 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
770 try:
747 try:
771 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
748 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
772 except Exception:
749 except Exception:
773 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
750 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
774
751
775
752
776 def mia_task_request(self, idents, msg):
753 def mia_task_request(self, idents, msg):
777 raise NotImplementedError
754 raise NotImplementedError
778 client_id = idents[0]
755 client_id = idents[0]
779 # content = dict(mia=self.mia,status='ok')
756 # content = dict(mia=self.mia,status='ok')
780 # self.session.send('mia_reply', content=content, idents=client_id)
757 # self.session.send('mia_reply', content=content, idents=client_id)
781
758
782
759
783 #--------------------- IOPub Traffic ------------------------------
760 #--------------------- IOPub Traffic ------------------------------
784
761
785 def save_iopub_message(self, topics, msg):
762 def save_iopub_message(self, topics, msg):
786 """save an iopub message into the db"""
763 """save an iopub message into the db"""
787 # print (topics)
764 # print (topics)
788 try:
765 try:
789 msg = self.session.unpack_message(msg, content=True)
766 msg = self.session.unpack_message(msg, content=True)
790 except:
767 except Exception:
791 self.log.error("iopub::invalid IOPub message", exc_info=True)
768 self.log.error("iopub::invalid IOPub message", exc_info=True)
792 return
769 return
793
770
794 parent = msg['parent_header']
771 parent = msg['parent_header']
795 if not parent:
772 if not parent:
796 self.log.error("iopub::invalid IOPub message: %s"%msg)
773 self.log.error("iopub::invalid IOPub message: %r"%msg)
797 return
774 return
798 msg_id = parent['msg_id']
775 msg_id = parent['msg_id']
799 msg_type = msg['msg_type']
776 msg_type = msg['msg_type']
800 content = msg['content']
777 content = msg['content']
801
778
802 # ensure msg_id is in db
779 # ensure msg_id is in db
803 try:
780 try:
804 rec = self.db.get_record(msg_id)
781 rec = self.db.get_record(msg_id)
805 except KeyError:
782 except KeyError:
806 rec = empty_record()
783 rec = empty_record()
807 rec['msg_id'] = msg_id
784 rec['msg_id'] = msg_id
808 self.db.add_record(msg_id, rec)
785 self.db.add_record(msg_id, rec)
809 # stream
786 # stream
810 d = {}
787 d = {}
811 if msg_type == 'stream':
788 if msg_type == 'stream':
812 name = content['name']
789 name = content['name']
813 s = rec[name] or ''
790 s = rec[name] or ''
814 d[name] = s + content['data']
791 d[name] = s + content['data']
815
792
816 elif msg_type == 'pyerr':
793 elif msg_type == 'pyerr':
817 d['pyerr'] = content
794 d['pyerr'] = content
818 elif msg_type == 'pyin':
795 elif msg_type == 'pyin':
819 d['pyin'] = content['code']
796 d['pyin'] = content['code']
820 else:
797 else:
821 d[msg_type] = content.get('data', '')
798 d[msg_type] = content.get('data', '')
822
799
823 try:
800 try:
824 self.db.update_record(msg_id, d)
801 self.db.update_record(msg_id, d)
825 except Exception:
802 except Exception:
826 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
803 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
827
804
828
805
829
806
830 #-------------------------------------------------------------------------
807 #-------------------------------------------------------------------------
831 # Registration requests
808 # Registration requests
832 #-------------------------------------------------------------------------
809 #-------------------------------------------------------------------------
833
810
834 def connection_request(self, client_id, msg):
811 def connection_request(self, client_id, msg):
835 """Reply with connection addresses for clients."""
812 """Reply with connection addresses for clients."""
836 self.log.info("client::client %s connected"%client_id)
813 self.log.info("client::client %r connected"%client_id)
837 content = dict(status='ok')
814 content = dict(status='ok')
838 content.update(self.client_info)
815 content.update(self.client_info)
839 jsonable = {}
816 jsonable = {}
840 for k,v in self.keytable.iteritems():
817 for k,v in self.keytable.iteritems():
841 if v not in self.dead_engines:
818 if v not in self.dead_engines:
842 jsonable[str(k)] = v
819 jsonable[str(k)] = v
843 content['engines'] = jsonable
820 content['engines'] = jsonable
844 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
821 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
845
822
846 def register_engine(self, reg, msg):
823 def register_engine(self, reg, msg):
847 """Register a new engine."""
824 """Register a new engine."""
848 content = msg['content']
825 content = msg['content']
849 try:
826 try:
850 queue = content['queue']
827 queue = content['queue']
851 except KeyError:
828 except KeyError:
852 self.log.error("registration::queue not specified", exc_info=True)
829 self.log.error("registration::queue not specified", exc_info=True)
853 return
830 return
854 heart = content.get('heartbeat', None)
831 heart = content.get('heartbeat', None)
855 """register a new engine, and create the socket(s) necessary"""
832 """register a new engine, and create the socket(s) necessary"""
856 eid = self._next_id
833 eid = self._next_id
857 # print (eid, queue, reg, heart)
834 # print (eid, queue, reg, heart)
858
835
859 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
836 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
860
837
861 content = dict(id=eid,status='ok')
838 content = dict(id=eid,status='ok')
862 content.update(self.engine_info)
839 content.update(self.engine_info)
863 # check if requesting available IDs:
840 # check if requesting available IDs:
864 if queue in self.by_ident:
841 if queue in self.by_ident:
865 try:
842 try:
866 raise KeyError("queue_id %r in use"%queue)
843 raise KeyError("queue_id %r in use"%queue)
867 except:
844 except:
868 content = error.wrap_exception()
845 content = error.wrap_exception()
869 self.log.error("queue_id %r in use"%queue, exc_info=True)
846 self.log.error("queue_id %r in use"%queue, exc_info=True)
870 elif heart in self.hearts: # need to check unique hearts?
847 elif heart in self.hearts: # need to check unique hearts?
871 try:
848 try:
872 raise KeyError("heart_id %r in use"%heart)
849 raise KeyError("heart_id %r in use"%heart)
873 except:
850 except:
874 self.log.error("heart_id %r in use"%heart, exc_info=True)
851 self.log.error("heart_id %r in use"%heart, exc_info=True)
875 content = error.wrap_exception()
852 content = error.wrap_exception()
876 else:
853 else:
877 for h, pack in self.incoming_registrations.iteritems():
854 for h, pack in self.incoming_registrations.iteritems():
878 if heart == h:
855 if heart == h:
879 try:
856 try:
880 raise KeyError("heart_id %r in use"%heart)
857 raise KeyError("heart_id %r in use"%heart)
881 except:
858 except:
882 self.log.error("heart_id %r in use"%heart, exc_info=True)
859 self.log.error("heart_id %r in use"%heart, exc_info=True)
883 content = error.wrap_exception()
860 content = error.wrap_exception()
884 break
861 break
885 elif queue == pack[1]:
862 elif queue == pack[1]:
886 try:
863 try:
887 raise KeyError("queue_id %r in use"%queue)
864 raise KeyError("queue_id %r in use"%queue)
888 except:
865 except:
889 self.log.error("queue_id %r in use"%queue, exc_info=True)
866 self.log.error("queue_id %r in use"%queue, exc_info=True)
890 content = error.wrap_exception()
867 content = error.wrap_exception()
891 break
868 break
892
869
893 msg = self.session.send(self.query, "registration_reply",
870 msg = self.session.send(self.query, "registration_reply",
894 content=content,
871 content=content,
895 ident=reg)
872 ident=reg)
896
873
897 if content['status'] == 'ok':
874 if content['status'] == 'ok':
898 if heart in self.heartmonitor.hearts:
875 if heart in self.heartmonitor.hearts:
899 # already beating
876 # already beating
900 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
877 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
901 self.finish_registration(heart)
878 self.finish_registration(heart)
902 else:
879 else:
903 purge = lambda : self._purge_stalled_registration(heart)
880 purge = lambda : self._purge_stalled_registration(heart)
904 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
881 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
905 dc.start()
882 dc.start()
906 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
883 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
907 else:
884 else:
908 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
885 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
909 return eid
886 return eid
910
887
911 def unregister_engine(self, ident, msg):
888 def unregister_engine(self, ident, msg):
912 """Unregister an engine that explicitly requested to leave."""
889 """Unregister an engine that explicitly requested to leave."""
913 try:
890 try:
914 eid = msg['content']['id']
891 eid = msg['content']['id']
915 except:
892 except:
916 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
893 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
917 return
894 return
918 self.log.info("registration::unregister_engine(%s)"%eid)
895 self.log.info("registration::unregister_engine(%r)"%eid)
919 # print (eid)
896 # print (eid)
920 uuid = self.keytable[eid]
897 uuid = self.keytable[eid]
921 content=dict(id=eid, queue=uuid)
898 content=dict(id=eid, queue=uuid)
922 self.dead_engines.add(uuid)
899 self.dead_engines.add(uuid)
923 # self.ids.remove(eid)
900 # self.ids.remove(eid)
924 # uuid = self.keytable.pop(eid)
901 # uuid = self.keytable.pop(eid)
925 #
902 #
926 # ec = self.engines.pop(eid)
903 # ec = self.engines.pop(eid)
927 # self.hearts.pop(ec.heartbeat)
904 # self.hearts.pop(ec.heartbeat)
928 # self.by_ident.pop(ec.queue)
905 # self.by_ident.pop(ec.queue)
929 # self.completed.pop(eid)
906 # self.completed.pop(eid)
930 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
907 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
931 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
908 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
932 dc.start()
909 dc.start()
933 ############## TODO: HANDLE IT ################
910 ############## TODO: HANDLE IT ################
934
911
935 if self.notifier:
912 if self.notifier:
936 self.session.send(self.notifier, "unregistration_notification", content=content)
913 self.session.send(self.notifier, "unregistration_notification", content=content)
937
914
938 def _handle_stranded_msgs(self, eid, uuid):
915 def _handle_stranded_msgs(self, eid, uuid):
939 """Handle messages known to be on an engine when the engine unregisters.
916 """Handle messages known to be on an engine when the engine unregisters.
940
917
941 It is possible that this will fire prematurely - that is, an engine will
918 It is possible that this will fire prematurely - that is, an engine will
942 go down after completing a result, and the client will be notified
919 go down after completing a result, and the client will be notified
943 that the result failed and later receive the actual result.
920 that the result failed and later receive the actual result.
944 """
921 """
945
922
946 outstanding = self.queues[eid]
923 outstanding = self.queues[eid]
947
924
948 for msg_id in outstanding:
925 for msg_id in outstanding:
949 self.pending.remove(msg_id)
926 self.pending.remove(msg_id)
950 self.all_completed.add(msg_id)
927 self.all_completed.add(msg_id)
951 try:
928 try:
952 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
929 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
953 except:
930 except:
954 content = error.wrap_exception()
931 content = error.wrap_exception()
955 # build a fake header:
932 # build a fake header:
956 header = {}
933 header = {}
957 header['engine'] = uuid
934 header['engine'] = uuid
958 header['date'] = datetime.now()
935 header['date'] = datetime.now()
959 rec = dict(result_content=content, result_header=header, result_buffers=[])
936 rec = dict(result_content=content, result_header=header, result_buffers=[])
960 rec['completed'] = header['date']
937 rec['completed'] = header['date']
961 rec['engine_uuid'] = uuid
938 rec['engine_uuid'] = uuid
962 try:
939 try:
963 self.db.update_record(msg_id, rec)
940 self.db.update_record(msg_id, rec)
964 except Exception:
941 except Exception:
965 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
942 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
966
943
967
944
968 def finish_registration(self, heart):
945 def finish_registration(self, heart):
969 """Second half of engine registration, called after our HeartMonitor
946 """Second half of engine registration, called after our HeartMonitor
970 has received a beat from the Engine's Heart."""
947 has received a beat from the Engine's Heart."""
971 try:
948 try:
972 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
949 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
973 except KeyError:
950 except KeyError:
974 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
951 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
975 return
952 return
976 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
953 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
977 if purge is not None:
954 if purge is not None:
978 purge.stop()
955 purge.stop()
979 control = queue
956 control = queue
980 self.ids.add(eid)
957 self.ids.add(eid)
981 self.keytable[eid] = queue
958 self.keytable[eid] = queue
982 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
959 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
983 control=control, heartbeat=heart)
960 control=control, heartbeat=heart)
984 self.by_ident[queue] = eid
961 self.by_ident[queue] = eid
985 self.queues[eid] = list()
962 self.queues[eid] = list()
986 self.tasks[eid] = list()
963 self.tasks[eid] = list()
987 self.completed[eid] = list()
964 self.completed[eid] = list()
988 self.hearts[heart] = eid
965 self.hearts[heart] = eid
989 content = dict(id=eid, queue=self.engines[eid].queue)
966 content = dict(id=eid, queue=self.engines[eid].queue)
990 if self.notifier:
967 if self.notifier:
991 self.session.send(self.notifier, "registration_notification", content=content)
968 self.session.send(self.notifier, "registration_notification", content=content)
992 self.log.info("engine::Engine Connected: %i"%eid)
969 self.log.info("engine::Engine Connected: %i"%eid)
993
970
994 def _purge_stalled_registration(self, heart):
971 def _purge_stalled_registration(self, heart):
995 if heart in self.incoming_registrations:
972 if heart in self.incoming_registrations:
996 eid = self.incoming_registrations.pop(heart)[0]
973 eid = self.incoming_registrations.pop(heart)[0]
997 self.log.info("registration::purging stalled registration: %i"%eid)
974 self.log.info("registration::purging stalled registration: %i"%eid)
998 else:
975 else:
999 pass
976 pass
1000
977
1001 #-------------------------------------------------------------------------
978 #-------------------------------------------------------------------------
1002 # Client Requests
979 # Client Requests
1003 #-------------------------------------------------------------------------
980 #-------------------------------------------------------------------------
1004
981
1005 def shutdown_request(self, client_id, msg):
982 def shutdown_request(self, client_id, msg):
1006 """handle shutdown request."""
983 """handle shutdown request."""
1007 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
984 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1008 # also notify other clients of shutdown
985 # also notify other clients of shutdown
1009 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
986 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1010 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
987 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1011 dc.start()
988 dc.start()
1012
989
1013 def _shutdown(self):
990 def _shutdown(self):
1014 self.log.info("hub::hub shutting down.")
991 self.log.info("hub::hub shutting down.")
1015 time.sleep(0.1)
992 time.sleep(0.1)
1016 sys.exit(0)
993 sys.exit(0)
1017
994
1018
995
1019 def check_load(self, client_id, msg):
996 def check_load(self, client_id, msg):
1020 content = msg['content']
997 content = msg['content']
1021 try:
998 try:
1022 targets = content['targets']
999 targets = content['targets']
1023 targets = self._validate_targets(targets)
1000 targets = self._validate_targets(targets)
1024 except:
1001 except:
1025 content = error.wrap_exception()
1002 content = error.wrap_exception()
1026 self.session.send(self.query, "hub_error",
1003 self.session.send(self.query, "hub_error",
1027 content=content, ident=client_id)
1004 content=content, ident=client_id)
1028 return
1005 return
1029
1006
1030 content = dict(status='ok')
1007 content = dict(status='ok')
1031 # loads = {}
1008 # loads = {}
1032 for t in targets:
1009 for t in targets:
1033 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1010 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1034 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1011 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1035
1012
1036
1013
1037 def queue_status(self, client_id, msg):
1014 def queue_status(self, client_id, msg):
1038 """Return the Queue status of one or more targets.
1015 """Return the Queue status of one or more targets.
1039 if verbose: return the msg_ids
1016 if verbose: return the msg_ids
1040 else: return len of each type.
1017 else: return len of each type.
1041 keys: queue (pending MUX jobs)
1018 keys: queue (pending MUX jobs)
1042 tasks (pending Task jobs)
1019 tasks (pending Task jobs)
1043 completed (finished jobs from both queues)"""
1020 completed (finished jobs from both queues)"""
1044 content = msg['content']
1021 content = msg['content']
1045 targets = content['targets']
1022 targets = content['targets']
1046 try:
1023 try:
1047 targets = self._validate_targets(targets)
1024 targets = self._validate_targets(targets)
1048 except:
1025 except:
1049 content = error.wrap_exception()
1026 content = error.wrap_exception()
1050 self.session.send(self.query, "hub_error",
1027 self.session.send(self.query, "hub_error",
1051 content=content, ident=client_id)
1028 content=content, ident=client_id)
1052 return
1029 return
1053 verbose = content.get('verbose', False)
1030 verbose = content.get('verbose', False)
1054 content = dict(status='ok')
1031 content = dict(status='ok')
1055 for t in targets:
1032 for t in targets:
1056 queue = self.queues[t]
1033 queue = self.queues[t]
1057 completed = self.completed[t]
1034 completed = self.completed[t]
1058 tasks = self.tasks[t]
1035 tasks = self.tasks[t]
1059 if not verbose:
1036 if not verbose:
1060 queue = len(queue)
1037 queue = len(queue)
1061 completed = len(completed)
1038 completed = len(completed)
1062 tasks = len(tasks)
1039 tasks = len(tasks)
1063 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1040 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1064 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1041 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1065
1042
1066 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1043 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1067
1044
1068 def purge_results(self, client_id, msg):
1045 def purge_results(self, client_id, msg):
1069 """Purge results from memory. This method is more valuable before we move
1046 """Purge results from memory. This method is more valuable before we move
1070 to a DB based message storage mechanism."""
1047 to a DB based message storage mechanism."""
1071 content = msg['content']
1048 content = msg['content']
1072 msg_ids = content.get('msg_ids', [])
1049 msg_ids = content.get('msg_ids', [])
1073 reply = dict(status='ok')
1050 reply = dict(status='ok')
1074 if msg_ids == 'all':
1051 if msg_ids == 'all':
1075 try:
1052 try:
1076 self.db.drop_matching_records(dict(completed={'$ne':None}))
1053 self.db.drop_matching_records(dict(completed={'$ne':None}))
1077 except Exception:
1054 except Exception:
1078 reply = error.wrap_exception()
1055 reply = error.wrap_exception()
1079 else:
1056 else:
1080 pending = filter(lambda m: m in self.pending, msg_ids)
1057 pending = filter(lambda m: m in self.pending, msg_ids)
1081 if pending:
1058 if pending:
1082 try:
1059 try:
1083 raise IndexError("msg pending: %r"%pending[0])
1060 raise IndexError("msg pending: %r"%pending[0])
1084 except:
1061 except:
1085 reply = error.wrap_exception()
1062 reply = error.wrap_exception()
1086 else:
1063 else:
1087 try:
1064 try:
1088 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1065 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1089 except Exception:
1066 except Exception:
1090 reply = error.wrap_exception()
1067 reply = error.wrap_exception()
1091
1068
1092 if reply['status'] == 'ok':
1069 if reply['status'] == 'ok':
1093 eids = content.get('engine_ids', [])
1070 eids = content.get('engine_ids', [])
1094 for eid in eids:
1071 for eid in eids:
1095 if eid not in self.engines:
1072 if eid not in self.engines:
1096 try:
1073 try:
1097 raise IndexError("No such engine: %i"%eid)
1074 raise IndexError("No such engine: %i"%eid)
1098 except:
1075 except:
1099 reply = error.wrap_exception()
1076 reply = error.wrap_exception()
1100 break
1077 break
1101 msg_ids = self.completed.pop(eid)
1078 msg_ids = self.completed.pop(eid)
1102 uid = self.engines[eid].queue
1079 uid = self.engines[eid].queue
1103 try:
1080 try:
1104 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1081 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1105 except Exception:
1082 except Exception:
1106 reply = error.wrap_exception()
1083 reply = error.wrap_exception()
1107 break
1084 break
1108
1085
1109 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1086 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1110
1087
1111 def resubmit_task(self, client_id, msg):
1088 def resubmit_task(self, client_id, msg):
1112 """Resubmit one or more tasks."""
1089 """Resubmit one or more tasks."""
1113 def finish(reply):
1090 def finish(reply):
1114 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1091 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1115
1092
1116 content = msg['content']
1093 content = msg['content']
1117 msg_ids = content['msg_ids']
1094 msg_ids = content['msg_ids']
1118 reply = dict(status='ok')
1095 reply = dict(status='ok')
1119 try:
1096 try:
1120 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1097 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1121 'header', 'content', 'buffers'])
1098 'header', 'content', 'buffers'])
1122 except Exception:
1099 except Exception:
1123 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1100 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1124 return finish(error.wrap_exception())
1101 return finish(error.wrap_exception())
1125
1102
1126 # validate msg_ids
1103 # validate msg_ids
1127 found_ids = [ rec['msg_id'] for rec in records ]
1104 found_ids = [ rec['msg_id'] for rec in records ]
1128 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1105 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1129 if len(records) > len(msg_ids):
1106 if len(records) > len(msg_ids):
1130 try:
1107 try:
1131 raise RuntimeError("DB appears to be in an inconsistent state."
1108 raise RuntimeError("DB appears to be in an inconsistent state."
1132 "More matching records were found than should exist")
1109 "More matching records were found than should exist")
1133 except Exception:
1110 except Exception:
1134 return finish(error.wrap_exception())
1111 return finish(error.wrap_exception())
1135 elif len(records) < len(msg_ids):
1112 elif len(records) < len(msg_ids):
1136 missing = [ m for m in msg_ids if m not in found_ids ]
1113 missing = [ m for m in msg_ids if m not in found_ids ]
1137 try:
1114 try:
1138 raise KeyError("No such msg(s): %s"%missing)
1115 raise KeyError("No such msg(s): %r"%missing)
1139 except KeyError:
1116 except KeyError:
1140 return finish(error.wrap_exception())
1117 return finish(error.wrap_exception())
1141 elif invalid_ids:
1118 elif invalid_ids:
1142 msg_id = invalid_ids[0]
1119 msg_id = invalid_ids[0]
1143 try:
1120 try:
1144 raise ValueError("Task %r appears to be inflight"%(msg_id))
1121 raise ValueError("Task %r appears to be inflight"%(msg_id))
1145 except Exception:
1122 except Exception:
1146 return finish(error.wrap_exception())
1123 return finish(error.wrap_exception())
1147
1124
1148 # clear the existing records
1125 # clear the existing records
1149 rec = empty_record()
1126 rec = empty_record()
1150 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1127 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1151 rec['resubmitted'] = datetime.now()
1128 rec['resubmitted'] = datetime.now()
1152 rec['queue'] = 'task'
1129 rec['queue'] = 'task'
1153 rec['client_uuid'] = client_id[0]
1130 rec['client_uuid'] = client_id[0]
1154 try:
1131 try:
1155 for msg_id in msg_ids:
1132 for msg_id in msg_ids:
1156 self.all_completed.discard(msg_id)
1133 self.all_completed.discard(msg_id)
1157 self.db.update_record(msg_id, rec)
1134 self.db.update_record(msg_id, rec)
1158 except Exception:
1135 except Exception:
1159 self.log.error('db::db error upating record', exc_info=True)
1136 self.log.error('db::db error upating record', exc_info=True)
1160 reply = error.wrap_exception()
1137 reply = error.wrap_exception()
1161 else:
1138 else:
1162 # send the messages
1139 # send the messages
1163 for rec in records:
1140 for rec in records:
1164 header = rec['header']
1141 header = rec['header']
1165 msg = self.session.msg(header['msg_type'])
1142 msg = self.session.msg(header['msg_type'])
1166 msg['content'] = rec['content']
1143 msg['content'] = rec['content']
1167 msg['header'] = header
1144 msg['header'] = header
1168 msg['msg_id'] = rec['msg_id']
1145 msg['msg_id'] = rec['msg_id']
1169 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1146 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1170
1147
1171 finish(dict(status='ok'))
1148 finish(dict(status='ok'))
1172
1149
1173
1150
1174 def _extract_record(self, rec):
1151 def _extract_record(self, rec):
1175 """decompose a TaskRecord dict into subsection of reply for get_result"""
1152 """decompose a TaskRecord dict into subsection of reply for get_result"""
1176 io_dict = {}
1153 io_dict = {}
1177 for key in 'pyin pyout pyerr stdout stderr'.split():
1154 for key in 'pyin pyout pyerr stdout stderr'.split():
1178 io_dict[key] = rec[key]
1155 io_dict[key] = rec[key]
1179 content = { 'result_content': rec['result_content'],
1156 content = { 'result_content': rec['result_content'],
1180 'header': rec['header'],
1157 'header': rec['header'],
1181 'result_header' : rec['result_header'],
1158 'result_header' : rec['result_header'],
1182 'io' : io_dict,
1159 'io' : io_dict,
1183 }
1160 }
1184 if rec['result_buffers']:
1161 if rec['result_buffers']:
1185 buffers = map(str, rec['result_buffers'])
1162 buffers = map(str, rec['result_buffers'])
1186 else:
1163 else:
1187 buffers = []
1164 buffers = []
1188
1165
1189 return content, buffers
1166 return content, buffers
1190
1167
1191 def get_results(self, client_id, msg):
1168 def get_results(self, client_id, msg):
1192 """Get the result of 1 or more messages."""
1169 """Get the result of 1 or more messages."""
1193 content = msg['content']
1170 content = msg['content']
1194 msg_ids = sorted(set(content['msg_ids']))
1171 msg_ids = sorted(set(content['msg_ids']))
1195 statusonly = content.get('status_only', False)
1172 statusonly = content.get('status_only', False)
1196 pending = []
1173 pending = []
1197 completed = []
1174 completed = []
1198 content = dict(status='ok')
1175 content = dict(status='ok')
1199 content['pending'] = pending
1176 content['pending'] = pending
1200 content['completed'] = completed
1177 content['completed'] = completed
1201 buffers = []
1178 buffers = []
1202 if not statusonly:
1179 if not statusonly:
1203 try:
1180 try:
1204 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1181 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1205 # turn match list into dict, for faster lookup
1182 # turn match list into dict, for faster lookup
1206 records = {}
1183 records = {}
1207 for rec in matches:
1184 for rec in matches:
1208 records[rec['msg_id']] = rec
1185 records[rec['msg_id']] = rec
1209 except Exception:
1186 except Exception:
1210 content = error.wrap_exception()
1187 content = error.wrap_exception()
1211 self.session.send(self.query, "result_reply", content=content,
1188 self.session.send(self.query, "result_reply", content=content,
1212 parent=msg, ident=client_id)
1189 parent=msg, ident=client_id)
1213 return
1190 return
1214 else:
1191 else:
1215 records = {}
1192 records = {}
1216 for msg_id in msg_ids:
1193 for msg_id in msg_ids:
1217 if msg_id in self.pending:
1194 if msg_id in self.pending:
1218 pending.append(msg_id)
1195 pending.append(msg_id)
1219 elif msg_id in self.all_completed:
1196 elif msg_id in self.all_completed:
1220 completed.append(msg_id)
1197 completed.append(msg_id)
1221 if not statusonly:
1198 if not statusonly:
1222 c,bufs = self._extract_record(records[msg_id])
1199 c,bufs = self._extract_record(records[msg_id])
1223 content[msg_id] = c
1200 content[msg_id] = c
1224 buffers.extend(bufs)
1201 buffers.extend(bufs)
1225 elif msg_id in records:
1202 elif msg_id in records:
1226 if rec['completed']:
1203 if rec['completed']:
1227 completed.append(msg_id)
1204 completed.append(msg_id)
1228 c,bufs = self._extract_record(records[msg_id])
1205 c,bufs = self._extract_record(records[msg_id])
1229 content[msg_id] = c
1206 content[msg_id] = c
1230 buffers.extend(bufs)
1207 buffers.extend(bufs)
1231 else:
1208 else:
1232 pending.append(msg_id)
1209 pending.append(msg_id)
1233 else:
1210 else:
1234 try:
1211 try:
1235 raise KeyError('No such message: '+msg_id)
1212 raise KeyError('No such message: '+msg_id)
1236 except:
1213 except:
1237 content = error.wrap_exception()
1214 content = error.wrap_exception()
1238 break
1215 break
1239 self.session.send(self.query, "result_reply", content=content,
1216 self.session.send(self.query, "result_reply", content=content,
1240 parent=msg, ident=client_id,
1217 parent=msg, ident=client_id,
1241 buffers=buffers)
1218 buffers=buffers)
1242
1219
1243 def get_history(self, client_id, msg):
1220 def get_history(self, client_id, msg):
1244 """Get a list of all msg_ids in our DB records"""
1221 """Get a list of all msg_ids in our DB records"""
1245 try:
1222 try:
1246 msg_ids = self.db.get_history()
1223 msg_ids = self.db.get_history()
1247 except Exception as e:
1224 except Exception as e:
1248 content = error.wrap_exception()
1225 content = error.wrap_exception()
1249 else:
1226 else:
1250 content = dict(status='ok', history=msg_ids)
1227 content = dict(status='ok', history=msg_ids)
1251
1228
1252 self.session.send(self.query, "history_reply", content=content,
1229 self.session.send(self.query, "history_reply", content=content,
1253 parent=msg, ident=client_id)
1230 parent=msg, ident=client_id)
1254
1231
1255 def db_query(self, client_id, msg):
1232 def db_query(self, client_id, msg):
1256 """Perform a raw query on the task record database."""
1233 """Perform a raw query on the task record database."""
1257 content = msg['content']
1234 content = msg['content']
1258 query = content.get('query', {})
1235 query = content.get('query', {})
1259 keys = content.get('keys', None)
1236 keys = content.get('keys', None)
1260 query = util.extract_dates(query)
1237 query = util.extract_dates(query)
1261 buffers = []
1238 buffers = []
1262 empty = list()
1239 empty = list()
1263
1240
1264 try:
1241 try:
1265 records = self.db.find_records(query, keys)
1242 records = self.db.find_records(query, keys)
1266 except Exception as e:
1243 except Exception as e:
1267 content = error.wrap_exception()
1244 content = error.wrap_exception()
1268 else:
1245 else:
1269 # extract buffers from reply content:
1246 # extract buffers from reply content:
1270 if keys is not None:
1247 if keys is not None:
1271 buffer_lens = [] if 'buffers' in keys else None
1248 buffer_lens = [] if 'buffers' in keys else None
1272 result_buffer_lens = [] if 'result_buffers' in keys else None
1249 result_buffer_lens = [] if 'result_buffers' in keys else None
1273 else:
1250 else:
1274 buffer_lens = []
1251 buffer_lens = []
1275 result_buffer_lens = []
1252 result_buffer_lens = []
1276
1253
1277 for rec in records:
1254 for rec in records:
1278 # buffers may be None, so double check
1255 # buffers may be None, so double check
1279 if buffer_lens is not None:
1256 if buffer_lens is not None:
1280 b = rec.pop('buffers', empty) or empty
1257 b = rec.pop('buffers', empty) or empty
1281 buffer_lens.append(len(b))
1258 buffer_lens.append(len(b))
1282 buffers.extend(b)
1259 buffers.extend(b)
1283 if result_buffer_lens is not None:
1260 if result_buffer_lens is not None:
1284 rb = rec.pop('result_buffers', empty) or empty
1261 rb = rec.pop('result_buffers', empty) or empty
1285 result_buffer_lens.append(len(rb))
1262 result_buffer_lens.append(len(rb))
1286 buffers.extend(rb)
1263 buffers.extend(rb)
1287 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1264 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1288 result_buffer_lens=result_buffer_lens)
1265 result_buffer_lens=result_buffer_lens)
1289
1266
1290 self.session.send(self.query, "db_reply", content=content,
1267 self.session.send(self.query, "db_reply", content=content,
1291 parent=msg, ident=client_id,
1268 parent=msg, ident=client_id,
1292 buffers=buffers)
1269 buffers=buffers)
1293
1270
@@ -1,679 +1,679 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #----------------------------------------------------------------------
14 #----------------------------------------------------------------------
15 # Imports
15 # Imports
16 #----------------------------------------------------------------------
16 #----------------------------------------------------------------------
17
17
18 from __future__ import print_function
18 from __future__ import print_function
19
19
20 import logging
20 import logging
21 import sys
21 import sys
22
22
23 from datetime import datetime, timedelta
23 from datetime import datetime, timedelta
24 from random import randint, random
24 from random import randint, random
25 from types import FunctionType
25 from types import FunctionType
26
26
27 try:
27 try:
28 import numpy
28 import numpy
29 except ImportError:
29 except ImportError:
30 numpy = None
30 numpy = None
31
31
32 import zmq
32 import zmq
33 from zmq.eventloop import ioloop, zmqstream
33 from zmq.eventloop import ioloop, zmqstream
34
34
35 # local imports
35 # local imports
36 from IPython.external.decorator import decorator
36 from IPython.external.decorator import decorator
37 from IPython.config.loader import Config
37 from IPython.config.loader import Config
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum
39
39
40 from IPython.parallel import error
40 from IPython.parallel import error
41 from IPython.parallel.factory import SessionFactory
41 from IPython.parallel.factory import SessionFactory
42 from IPython.parallel.util import connect_logger, local_logger
42 from IPython.parallel.util import connect_logger, local_logger
43
43
44 from .dependency import Dependency
44 from .dependency import Dependency
45
45
46 @decorator
46 @decorator
47 def logged(f,self,*args,**kwargs):
47 def logged(f,self,*args,**kwargs):
48 # print ("#--------------------")
48 # print ("#--------------------")
49 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
49 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
50 # print ("#--")
50 # print ("#--")
51 return f(self,*args, **kwargs)
51 return f(self,*args, **kwargs)
52
52
53 #----------------------------------------------------------------------
53 #----------------------------------------------------------------------
54 # Chooser functions
54 # Chooser functions
55 #----------------------------------------------------------------------
55 #----------------------------------------------------------------------
56
56
57 def plainrandom(loads):
57 def plainrandom(loads):
58 """Plain random pick."""
58 """Plain random pick."""
59 n = len(loads)
59 n = len(loads)
60 return randint(0,n-1)
60 return randint(0,n-1)
61
61
62 def lru(loads):
62 def lru(loads):
63 """Always pick the front of the line.
63 """Always pick the front of the line.
64
64
65 The content of `loads` is ignored.
65 The content of `loads` is ignored.
66
66
67 Assumes LRU ordering of loads, with oldest first.
67 Assumes LRU ordering of loads, with oldest first.
68 """
68 """
69 return 0
69 return 0
70
70
71 def twobin(loads):
71 def twobin(loads):
72 """Pick two at random, use the LRU of the two.
72 """Pick two at random, use the LRU of the two.
73
73
74 The content of loads is ignored.
74 The content of loads is ignored.
75
75
76 Assumes LRU ordering of loads, with oldest first.
76 Assumes LRU ordering of loads, with oldest first.
77 """
77 """
78 n = len(loads)
78 n = len(loads)
79 a = randint(0,n-1)
79 a = randint(0,n-1)
80 b = randint(0,n-1)
80 b = randint(0,n-1)
81 return min(a,b)
81 return min(a,b)
82
82
83 def weighted(loads):
83 def weighted(loads):
84 """Pick two at random using inverse load as weight.
84 """Pick two at random using inverse load as weight.
85
85
86 Return the less loaded of the two.
86 Return the less loaded of the two.
87 """
87 """
88 # weight 0 a million times more than 1:
88 # weight 0 a million times more than 1:
89 weights = 1./(1e-6+numpy.array(loads))
89 weights = 1./(1e-6+numpy.array(loads))
90 sums = weights.cumsum()
90 sums = weights.cumsum()
91 t = sums[-1]
91 t = sums[-1]
92 x = random()*t
92 x = random()*t
93 y = random()*t
93 y = random()*t
94 idx = 0
94 idx = 0
95 idy = 0
95 idy = 0
96 while sums[idx] < x:
96 while sums[idx] < x:
97 idx += 1
97 idx += 1
98 while sums[idy] < y:
98 while sums[idy] < y:
99 idy += 1
99 idy += 1
100 if weights[idy] > weights[idx]:
100 if weights[idy] > weights[idx]:
101 return idy
101 return idy
102 else:
102 else:
103 return idx
103 return idx
104
104
105 def leastload(loads):
105 def leastload(loads):
106 """Always choose the lowest load.
106 """Always choose the lowest load.
107
107
108 If the lowest load occurs more than once, the first
108 If the lowest load occurs more than once, the first
109 occurance will be used. If loads has LRU ordering, this means
109 occurance will be used. If loads has LRU ordering, this means
110 the LRU of those with the lowest load is chosen.
110 the LRU of those with the lowest load is chosen.
111 """
111 """
112 return loads.index(min(loads))
112 return loads.index(min(loads))
113
113
114 #---------------------------------------------------------------------
114 #---------------------------------------------------------------------
115 # Classes
115 # Classes
116 #---------------------------------------------------------------------
116 #---------------------------------------------------------------------
117 # store empty default dependency:
117 # store empty default dependency:
118 MET = Dependency([])
118 MET = Dependency([])
119
119
120 class TaskScheduler(SessionFactory):
120 class TaskScheduler(SessionFactory):
121 """Python TaskScheduler object.
121 """Python TaskScheduler object.
122
122
123 This is the simplest object that supports msg_id based
123 This is the simplest object that supports msg_id based
124 DAG dependencies. *Only* task msg_ids are checked, not
124 DAG dependencies. *Only* task msg_ids are checked, not
125 msg_ids of jobs submitted via the MUX queue.
125 msg_ids of jobs submitted via the MUX queue.
126
126
127 """
127 """
128
128
129 hwm = Int(0, config=True, shortname='hwm',
129 hwm = Int(0, config=True, shortname='hwm',
130 help="""specify the High Water Mark (HWM) for the downstream
130 help="""specify the High Water Mark (HWM) for the downstream
131 socket in the Task scheduler. This is the maximum number
131 socket in the Task scheduler. This is the maximum number
132 of allowed outstanding tasks on each engine."""
132 of allowed outstanding tasks on each engine."""
133 )
133 )
134 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
134 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
135 'leastload', config=True, shortname='scheme', allow_none=False,
135 'leastload', config=True, shortname='scheme', allow_none=False,
136 help="""select the task scheduler scheme [default: Python LRU]
136 help="""select the task scheduler scheme [default: Python LRU]
137 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
137 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
138 )
138 )
139 def _scheme_name_changed(self, old, new):
139 def _scheme_name_changed(self, old, new):
140 self.log.debug("Using scheme %r"%new)
140 self.log.debug("Using scheme %r"%new)
141 self.scheme = globals()[new]
141 self.scheme = globals()[new]
142
142
    # input arguments:
    scheme = Instance(FunctionType) # function for determining the destination
    def _scheme_default(self):
        # default load-balancing scheme: pick the least-loaded engine
        return leastload
    client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
    engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
    notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
    mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream

    # internals:
    graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
    retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
    # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
    depending = Dict() # dict by msg_id of [raw_msg, targets, after, follow, timeout]
    pending = Dict() # dict by engine_uuid of submitted tasks
    completed = Dict() # dict by engine_uuid of completed tasks
    failed = Dict() # dict by engine_uuid of failed tasks
    destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
    clients = Dict() # dict by msg_id for who submitted the task
    targets = List() # list of target IDENTs
    loads = List() # list of engine loads
    # full = Set() # set of IDENTs that have HWM outstanding tasks
    all_completed = Set() # set of all completed tasks
    all_failed = Set() # set of all failed tasks
    all_done = Set() # set of all finished tasks=union(completed,failed)
    all_ids = Set() # set of all submitted task IDs
    blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
    auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
171
171
172
172
    def start(self):
        """Begin operation: install stream handlers and start the timeout auditor."""
        # results coming back from engines; keep frames raw (copy=False)
        self.engine_stream.on_recv(self.dispatch_result, copy=False)
        self._notification_handlers = dict(
            registration_notification = self._register_engine,
            unregistration_notification = self._unregister_engine
        )
        self.notifier_stream.on_recv(self.dispatch_notification)
        # periodically expire waiting tasks whose timeout has passed
        self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2s
        self.auditor.start()
        self.log.info("Scheduler started...%r"%self)
183
183
184 def resume_receiving(self):
184 def resume_receiving(self):
185 """Resume accepting jobs."""
185 """Resume accepting jobs."""
186 self.client_stream.on_recv(self.dispatch_submission, copy=False)
186 self.client_stream.on_recv(self.dispatch_submission, copy=False)
187
187
188 def stop_receiving(self):
188 def stop_receiving(self):
189 """Stop accepting jobs while there are no engines.
189 """Stop accepting jobs while there are no engines.
190 Leave them in the ZMQ queue."""
190 Leave them in the ZMQ queue."""
191 self.client_stream.on_recv(None)
191 self.client_stream.on_recv(None)
192
192
193 #-----------------------------------------------------------------------
193 #-----------------------------------------------------------------------
194 # [Un]Registration Handling
194 # [Un]Registration Handling
195 #-----------------------------------------------------------------------
195 #-----------------------------------------------------------------------
196
196
197 def dispatch_notification(self, msg):
197 def dispatch_notification(self, msg):
198 """dispatch register/unregister events."""
198 """dispatch register/unregister events."""
199 idents,msg = self.session.feed_identities(msg)
199 idents,msg = self.session.feed_identities(msg)
200 msg = self.session.unpack_message(msg)
200 msg = self.session.unpack_message(msg)
201 msg_type = msg['msg_type']
201 msg_type = msg['msg_type']
202 handler = self._notification_handlers.get(msg_type, None)
202 handler = self._notification_handlers.get(msg_type, None)
203 if handler is None:
203 if handler is None:
204 raise Exception("Unhandled message type: %s"%msg_type)
204 raise Exception("Unhandled message type: %s"%msg_type)
205 else:
205 else:
206 try:
206 try:
207 handler(str(msg['content']['queue']))
207 handler(str(msg['content']['queue']))
208 except KeyError:
208 except KeyError:
209 self.log.error("task::Invalid notification msg: %s"%msg)
209 self.log.error("task::Invalid notification msg: %s"%msg)
210
210
211 @logged
211 @logged
212 def _register_engine(self, uid):
212 def _register_engine(self, uid):
213 """New engine with ident `uid` became available."""
213 """New engine with ident `uid` became available."""
214 # head of the line:
214 # head of the line:
215 self.targets.insert(0,uid)
215 self.targets.insert(0,uid)
216 self.loads.insert(0,0)
216 self.loads.insert(0,0)
217 # initialize sets
217 # initialize sets
218 self.completed[uid] = set()
218 self.completed[uid] = set()
219 self.failed[uid] = set()
219 self.failed[uid] = set()
220 self.pending[uid] = {}
220 self.pending[uid] = {}
221 if len(self.targets) == 1:
221 if len(self.targets) == 1:
222 self.resume_receiving()
222 self.resume_receiving()
223 # rescan the graph:
223 # rescan the graph:
224 self.update_graph(None)
224 self.update_graph(None)
225
225
    def _unregister_engine(self, uid):
        """Existing engine with ident `uid` became unavailable."""
        if len(self.targets) == 1:
            # this was our only engine
            self.stop_receiving()

        # handle any potentially finished tasks:
        self.engine_stream.flush()

        # don't pop destinations, because they might be used later
        # map(self.destinations.pop, self.completed.pop(uid))
        # map(self.destinations.pop, self.failed.pop(uid))

        # prevent this engine from receiving work
        idx = self.targets.index(uid)
        self.targets.pop(idx)
        self.loads.pop(idx)

        # wait 5 seconds before cleaning up pending jobs, since the results might
        # still be incoming
        if self.pending[uid]:
            dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
            dc.start()
        else:
            # nothing outstanding: discard this engine's bookkeeping now
            # (handle_stranded_tasks does this in the delayed case)
            self.completed.pop(uid)
            self.failed.pop(uid)
252
252
253
253
    @logged
    def handle_stranded_tasks(self, engine):
        """Deal with jobs resident in an engine that died.

        Fabricates an error (EngineError) reply for each task that was
        still pending on `engine` and routes it through dispatch_result,
        so clients are notified just as for a normal failure.
        """
        lost = self.pending[engine]
        for msg_id in lost.keys():
            if msg_id not in self.pending[engine]:
                # prevent double-handling of messages
                continue

            raw_msg = lost[msg_id][0]
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unpack_message(msg, copy=False, content=False)
            parent = msg['header']
            # route the fake reply as if it came from the dead engine
            idents = [engine, idents[0]]

            # build fake error reply
            try:
                raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
            except:
                # raise/except captures a real traceback for the client
                content = error.wrap_exception()
            msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
            raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
            # and dispatch it
            self.dispatch_result(raw_reply)

        # finally scrub completed/failed lists
        self.completed.pop(engine)
        self.failed.pop(engine)
281 self.failed.pop(engine)
283
282
284
283
285 #-----------------------------------------------------------------------
284 #-----------------------------------------------------------------------
286 # Job Submission
285 # Job Submission
287 #-----------------------------------------------------------------------
286 #-----------------------------------------------------------------------
288 @logged
287 @logged
289 def dispatch_submission(self, raw_msg):
288 def dispatch_submission(self, raw_msg):
290 """Dispatch job submission to appropriate handlers."""
289 """Dispatch job submission to appropriate handlers."""
291 # ensure targets up to date:
290 # ensure targets up to date:
292 self.notifier_stream.flush()
291 self.notifier_stream.flush()
293 try:
292 try:
294 idents, msg = self.session.feed_identities(raw_msg, copy=False)
293 idents, msg = self.session.feed_identities(raw_msg, copy=False)
295 msg = self.session.unpack_message(msg, content=False, copy=False)
294 msg = self.session.unpack_message(msg, content=False, copy=False)
296 except Exception:
295 except Exception:
297 self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True)
296 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
298 return
297 return
299
298
299
300 # send to monitor
300 # send to monitor
301 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
301 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
302
302
303 header = msg['header']
303 header = msg['header']
304 msg_id = header['msg_id']
304 msg_id = header['msg_id']
305 self.all_ids.add(msg_id)
305 self.all_ids.add(msg_id)
306
306
307 # targets
307 # targets
308 targets = set(header.get('targets', []))
308 targets = set(header.get('targets', []))
309 retries = header.get('retries', 0)
309 retries = header.get('retries', 0)
310 self.retries[msg_id] = retries
310 self.retries[msg_id] = retries
311
311
312 # time dependencies
312 # time dependencies
313 after = Dependency(header.get('after', []))
313 after = Dependency(header.get('after', []))
314 if after.all:
314 if after.all:
315 if after.success:
315 if after.success:
316 after.difference_update(self.all_completed)
316 after.difference_update(self.all_completed)
317 if after.failure:
317 if after.failure:
318 after.difference_update(self.all_failed)
318 after.difference_update(self.all_failed)
319 if after.check(self.all_completed, self.all_failed):
319 if after.check(self.all_completed, self.all_failed):
320 # recast as empty set, if `after` already met,
320 # recast as empty set, if `after` already met,
321 # to prevent unnecessary set comparisons
321 # to prevent unnecessary set comparisons
322 after = MET
322 after = MET
323
323
324 # location dependencies
324 # location dependencies
325 follow = Dependency(header.get('follow', []))
325 follow = Dependency(header.get('follow', []))
326
326
327 # turn timeouts into datetime objects:
327 # turn timeouts into datetime objects:
328 timeout = header.get('timeout', None)
328 timeout = header.get('timeout', None)
329 if timeout:
329 if timeout:
330 timeout = datetime.now() + timedelta(0,timeout,0)
330 timeout = datetime.now() + timedelta(0,timeout,0)
331
331
332 args = [raw_msg, targets, after, follow, timeout]
332 args = [raw_msg, targets, after, follow, timeout]
333
333
334 # validate and reduce dependencies:
334 # validate and reduce dependencies:
335 for dep in after,follow:
335 for dep in after,follow:
336 # check valid:
336 # check valid:
337 if msg_id in dep or dep.difference(self.all_ids):
337 if msg_id in dep or dep.difference(self.all_ids):
338 self.depending[msg_id] = args
338 self.depending[msg_id] = args
339 return self.fail_unreachable(msg_id, error.InvalidDependency)
339 return self.fail_unreachable(msg_id, error.InvalidDependency)
340 # check if unreachable:
340 # check if unreachable:
341 if dep.unreachable(self.all_completed, self.all_failed):
341 if dep.unreachable(self.all_completed, self.all_failed):
342 self.depending[msg_id] = args
342 self.depending[msg_id] = args
343 return self.fail_unreachable(msg_id)
343 return self.fail_unreachable(msg_id)
344
344
345 if after.check(self.all_completed, self.all_failed):
345 if after.check(self.all_completed, self.all_failed):
346 # time deps already met, try to run
346 # time deps already met, try to run
347 if not self.maybe_run(msg_id, *args):
347 if not self.maybe_run(msg_id, *args):
348 # can't run yet
348 # can't run yet
349 if msg_id not in self.all_failed:
349 if msg_id not in self.all_failed:
350 # could have failed as unreachable
350 # could have failed as unreachable
351 self.save_unmet(msg_id, *args)
351 self.save_unmet(msg_id, *args)
352 else:
352 else:
353 self.save_unmet(msg_id, *args)
353 self.save_unmet(msg_id, *args)
354
354
355 # @logged
355 # @logged
356 def audit_timeouts(self):
356 def audit_timeouts(self):
357 """Audit all waiting tasks for expired timeouts."""
357 """Audit all waiting tasks for expired timeouts."""
358 now = datetime.now()
358 now = datetime.now()
359 for msg_id in self.depending.keys():
359 for msg_id in self.depending.keys():
360 # must recheck, in case one failure cascaded to another:
360 # must recheck, in case one failure cascaded to another:
361 if msg_id in self.depending:
361 if msg_id in self.depending:
362 raw,after,targets,follow,timeout = self.depending[msg_id]
362 raw,after,targets,follow,timeout = self.depending[msg_id]
363 if timeout and timeout < now:
363 if timeout and timeout < now:
364 self.fail_unreachable(msg_id, error.TaskTimeout)
364 self.fail_unreachable(msg_id, error.TaskTimeout)
365
365
    @logged
    def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
        """a task has become unreachable, send a reply with an ImpossibleDependency
        error."""
        if msg_id not in self.depending:
            self.log.error("msg %r already failed!"%msg_id)
            return
        raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
        # remove this task from the dependency graph of everything it waited on
        for mid in follow.union(after):
            if mid in self.graph:
                self.graph[mid].remove(msg_id)

        # FIXME: unpacking a message I've already unpacked, but didn't save:
        idents,msg = self.session.feed_identities(raw_msg, copy=False)
        msg = self.session.unpack_message(msg, copy=False, content=False)
        header = msg['header']

        # raise/except to capture a real traceback for the client
        try:
            raise why()
        except:
            content = error.wrap_exception()

        self.all_done.add(msg_id)
        self.all_failed.add(msg_id)

        # reply to the client, and let the Hub monitor know the task finished
        msg = self.session.send(self.client_stream, 'apply_reply', content,
                                parent=header, ident=idents)
        self.session.send(self.mon_stream, msg, ident=['outtask']+idents)

        # the failure may cascade to dependents
        self.update_graph(msg_id, success=False)
396
396
    @logged
    def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
        """check location dependencies, and run if they are met.

        Returns True if the task was submitted (or failed as unreachable
        after being recorded in self.depending), False if it cannot run yet.
        """
        blacklist = self.blacklist.setdefault(msg_id, set())
        if follow or targets or blacklist or self.hwm:
            # we need a can_run filter
            def can_run(idx):
                # check hwm
                if self.hwm and self.loads[idx] == self.hwm:
                    return False
                target = self.targets[idx]
                # check blacklist
                if target in blacklist:
                    return False
                # check targets
                if targets and target not in targets:
                    return False
                # check follow
                return follow.check(self.completed[target], self.failed[target])

            indices = filter(can_run, range(len(self.targets)))

            if not indices:
                # couldn't run
                if follow.all:
                    # check follow for impossibility
                    dests = set()
                    relevant = set()
                    if follow.success:
                        relevant = self.all_completed
                    if follow.failure:
                        relevant = relevant.union(self.all_failed)
                    for m in follow.intersection(relevant):
                        dests.add(self.destinations[m])
                    if len(dests) > 1:
                        # follow-all requires one engine, but the dependencies
                        # ran on several: can never be satisfied
                        self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                        self.fail_unreachable(msg_id)
                        return False
                if targets:
                    # check blacklist+targets for impossibility
                    targets.difference_update(blacklist)
                    if not targets or not targets.intersection(self.targets):
                        # every allowed target has refused or is gone
                        self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                        self.fail_unreachable(msg_id)
                        return False
                return False
        else:
            # no constraints at all: any engine will do
            indices = None

        self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
        return True
448
448
449 @logged
449 @logged
450 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
450 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
451 """Save a message for later submission when its dependencies are met."""
451 """Save a message for later submission when its dependencies are met."""
452 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
452 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
453 # track the ids in follow or after, but not those already finished
453 # track the ids in follow or after, but not those already finished
454 for dep_id in after.union(follow).difference(self.all_done):
454 for dep_id in after.union(follow).difference(self.all_done):
455 if dep_id not in self.graph:
455 if dep_id not in self.graph:
456 self.graph[dep_id] = set()
456 self.graph[dep_id] = set()
457 self.graph[dep_id].add(msg_id)
457 self.graph[dep_id].add(msg_id)
458
458
    @logged
    def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
        """Submit a task to any of a subset of our targets.

        `indices`, if given, restricts the choice to those engine indices;
        the scheme function picks among the corresponding loads.
        """
        if indices:
            # compare loads only among the allowed engines
            loads = [self.loads[i] for i in indices]
        else:
            loads = self.loads
        idx = self.scheme(loads)
        if indices:
            # map the restricted index back into the full engine list
            idx = indices[idx]
        target = self.targets[idx]
        # print (target, map(str, msg[:3]))
        # send job to the engine
        self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
        self.engine_stream.send_multipart(raw_msg, copy=False)
        # update load
        self.add_job(idx)
        # time deps recorded as MET: they were satisfied before submission
        self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
        # notify Hub
        content = dict(msg_id=msg_id, engine_id=target)
        self.session.send(self.mon_stream, 'task_destination', content=content,
                        ident=['tracktask',self.session.session])
481
481
482
482
483 #-----------------------------------------------------------------------
483 #-----------------------------------------------------------------------
484 # Result Handling
484 # Result Handling
485 #-----------------------------------------------------------------------
485 #-----------------------------------------------------------------------
486 @logged
486 @logged
487 def dispatch_result(self, raw_msg):
487 def dispatch_result(self, raw_msg):
488 """dispatch method for result replies"""
488 """dispatch method for result replies"""
489 try:
489 try:
490 idents,msg = self.session.feed_identities(raw_msg, copy=False)
490 idents,msg = self.session.feed_identities(raw_msg, copy=False)
491 msg = self.session.unpack_message(msg, content=False, copy=False)
491 msg = self.session.unpack_message(msg, content=False, copy=False)
492 engine = idents[0]
492 engine = idents[0]
493 try:
493 try:
494 idx = self.targets.index(engine)
494 idx = self.targets.index(engine)
495 except ValueError:
495 except ValueError:
496 pass # skip load-update for dead engines
496 pass # skip load-update for dead engines
497 else:
497 else:
498 self.finish_job(idx)
498 self.finish_job(idx)
499 except Exception:
499 except Exception:
500 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
500 self.log.error("task::Invaid result: %r"%raw_msg, exc_info=True)
501 return
501 return
502
502
503 header = msg['header']
503 header = msg['header']
504 parent = msg['parent_header']
504 parent = msg['parent_header']
505 if header.get('dependencies_met', True):
505 if header.get('dependencies_met', True):
506 success = (header['status'] == 'ok')
506 success = (header['status'] == 'ok')
507 msg_id = parent['msg_id']
507 msg_id = parent['msg_id']
508 retries = self.retries[msg_id]
508 retries = self.retries[msg_id]
509 if not success and retries > 0:
509 if not success and retries > 0:
510 # failed
510 # failed
511 self.retries[msg_id] = retries - 1
511 self.retries[msg_id] = retries - 1
512 self.handle_unmet_dependency(idents, parent)
512 self.handle_unmet_dependency(idents, parent)
513 else:
513 else:
514 del self.retries[msg_id]
514 del self.retries[msg_id]
515 # relay to client and update graph
515 # relay to client and update graph
516 self.handle_result(idents, parent, raw_msg, success)
516 self.handle_result(idents, parent, raw_msg, success)
517 # send to Hub monitor
517 # send to Hub monitor
518 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
518 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
519 else:
519 else:
520 self.handle_unmet_dependency(idents, parent)
520 self.handle_unmet_dependency(idents, parent)
521
521
    @logged
    def handle_result(self, idents, parent, raw_msg, success=True):
        """handle a real task result, either success or failure"""
        # first, relay result to client
        engine = idents[0]
        client = idents[1]
        # swap_ids for XREP-XREP mirror
        raw_msg[:2] = [client,engine]
        # print (map(str, raw_msg[:4]))
        self.client_stream.send_multipart(raw_msg, copy=False)
        # now, update our data structures
        msg_id = parent['msg_id']
        # the blacklist is irrelevant once the task has actually run
        self.blacklist.pop(msg_id, None)
        self.pending[engine].pop(msg_id)
        if success:
            self.completed[engine].add(msg_id)
            self.all_completed.add(msg_id)
        else:
            self.failed[engine].add(msg_id)
            self.all_failed.add(msg_id)
        self.all_done.add(msg_id)
        # remember where it ran, for later `follow` dependency checks
        self.destinations[msg_id] = engine

        # dependents of msg_id may now be runnable
        self.update_graph(msg_id, success)
546
546
    @logged
    def handle_unmet_dependency(self, idents, parent):
        """handle an unmet dependency"""
        engine = idents[0]
        msg_id = parent['msg_id']

        # this engine refused the task; blacklist it for this msg_id
        if msg_id not in self.blacklist:
            self.blacklist[msg_id] = set()
        self.blacklist[msg_id].add(engine)

        args = self.pending[engine].pop(msg_id)
        raw,targets,after,follow,timeout = args

        if self.blacklist[msg_id] == targets:
            # every allowed target has refused the task: it is unreachable
            self.depending[msg_id] = args
            self.fail_unreachable(msg_id)
        elif not self.maybe_run(msg_id, *args):
            # resubmit failed
            if msg_id not in self.all_failed:
                # put it back in our dependency tree
                self.save_unmet(msg_id, *args)

        if self.hwm:
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                if self.loads[idx] == self.hwm-1:
                    # engine just dropped below its high-water mark;
                    # recheck all waiting tasks
                    self.update_graph(None)
577
577
578
578
579
579
    @logged
    def update_graph(self, dep_id=None, success=True):
        """dep_id just finished. Update our dependency
        graph and submit any jobs that just became runable.

        Called with dep_id=None to update entire graph for hwm, but without finishing
        a task.
        """
        # print ("\n\n***********")
        # pprint (dep_id)
        # pprint (self.graph)
        # pprint (self.depending)
        # pprint (self.all_completed)
        # pprint (self.all_failed)
        # print ("\n\n***********\n\n")
        # update any jobs that depended on the dependency
        jobs = self.graph.pop(dep_id, [])

        # recheck *all* jobs if
        # a) we have HWM and an engine just become no longer full
        # or b) dep_id was given as None
        if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
            jobs = self.depending.keys()

        for msg_id in jobs:
            raw_msg, targets, after, follow, timeout = self.depending[msg_id]

            # a finished dependency may have made this task unreachable
            if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
                self.fail_unreachable(msg_id)

            elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
                if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
                    # submitted: remove from the waiting pool and the graph
                    self.depending.pop(msg_id)
                    for mid in follow.union(after):
                        if mid in self.graph:
                            self.graph[mid].remove(msg_id)
617
617
618 #----------------------------------------------------------------------
618 #----------------------------------------------------------------------
619 # methods to be overridden by subclasses
619 # methods to be overridden by subclasses
620 #----------------------------------------------------------------------
620 #----------------------------------------------------------------------
621
621
622 def add_job(self, idx):
622 def add_job(self, idx):
623 """Called after self.targets[idx] just got the job with header.
623 """Called after self.targets[idx] just got the job with header.
624 Override with subclasses. The default ordering is simple LRU.
624 Override with subclasses. The default ordering is simple LRU.
625 The default loads are the number of outstanding jobs."""
625 The default loads are the number of outstanding jobs."""
626 self.loads[idx] += 1
626 self.loads[idx] += 1
627 for lis in (self.targets, self.loads):
627 for lis in (self.targets, self.loads):
628 lis.append(lis.pop(idx))
628 lis.append(lis.pop(idx))
629
629
630
630
631 def finish_job(self, idx):
631 def finish_job(self, idx):
632 """Called after self.targets[idx] just finished a job.
632 """Called after self.targets[idx] just finished a job.
633 Override with subclasses."""
633 Override with subclasses."""
634 self.loads[idx] -= 1
634 self.loads[idx] -= 1
635
635
636
636
637
637
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, logname='ZMQ',
                        log_url=None, loglevel=logging.DEBUG,
                        identity=b'task'):
    """Create and run a TaskScheduler in the current process.

    Binds a client-facing and an engine-facing XREP stream, connects a PUB
    monitor stream and a SUB notification stream, wires them into a
    TaskScheduler, and blocks in the IOLoop until interrupted.

    Parameters
    ----------
    in_addr, out_addr : str
        zmq URLs to bind for the client-facing and engine-facing sockets.
    mon_addr, not_addr : str
        zmq URLs to connect for monitoring (PUB) and notifications (SUB).
    config : dict or Config, optional
        Scheduler configuration; a plain dict is rewrapped in a Config.
    logname : str
        Name of the logger to use.
    log_url : str, optional
        If given, log over zmq to this URL instead of locally.
    loglevel : int
        Logging level (default: logging.DEBUG).
    identity : bytes
        zmq IDENTITY for the two XREP sockets (must be bytes).
    """
    from zmq.eventloop import ioloop
    from zmq.eventloop.zmqstream import ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    ctx = zmq.Context()
    loop = ioloop.IOLoop()
    ins = ZMQStream(ctx.socket(zmq.XREP), loop)
    ins.setsockopt(zmq.IDENTITY, identity)
    ins.bind(in_addr)

    outs = ZMQStream(ctx.socket(zmq.XREP), loop)
    outs.setsockopt(zmq.IDENTITY, identity)
    outs.bind(out_addr)
    mons = ZMQStream(ctx.socket(zmq.PUB), loop)
    mons.connect(mon_addr)
    nots = ZMQStream(ctx.socket(zmq.SUB), loop)
    # subscribe to everything; the topic must be bytes (like the bytes
    # identity above) -- a str '' here breaks on Python 3 / modern pyzmq
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    # setup logging. Note that these will not work in-process, because they clobber
    # existing loggers.
    if log_url:
        connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
    else:
        local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            loop=loop, logname=logname,
                            config=config)
    scheduler.start()
    try:
        loop.start()
    except KeyboardInterrupt:
        print ("interrupted, exiting...", file=sys.__stderr__)
679
679
General Comments 0
You need to be logged in to leave comments. Login now