##// END OF EJS Templates
add missing buffers to resubmitted tasks...
MinRK -
Show More
@@ -1,1314 +1,1314 b''
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import sys
21 import sys
22 import time
22 import time
23 from datetime import datetime
23 from datetime import datetime
24
24
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27 from zmq.eventloop.zmqstream import ZMQStream
27 from zmq.eventloop.zmqstream import ZMQStream
28
28
29 # internal:
29 # internal:
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31 from IPython.utils.py3compat import cast_bytes
31 from IPython.utils.py3compat import cast_bytes
32 from IPython.utils.traitlets import (
32 from IPython.utils.traitlets import (
33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
34 )
34 )
35
35
36 from IPython.parallel import error, util
36 from IPython.parallel import error, util
37 from IPython.parallel.factory import RegistrationFactory
37 from IPython.parallel.factory import RegistrationFactory
38
38
39 from IPython.zmq.session import SessionFactory
39 from IPython.zmq.session import SessionFactory
40
40
41 from .heartmonitor import HeartMonitor
41 from .heartmonitor import HeartMonitor
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Code
44 # Code
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 def _passer(*args, **kwargs):
47 def _passer(*args, **kwargs):
48 return
48 return
49
49
50 def _printer(*args, **kwargs):
50 def _printer(*args, **kwargs):
51 print (args)
51 print (args)
52 print (kwargs)
52 print (kwargs)
53
53
54 def empty_record():
54 def empty_record():
55 """Return an empty dict with all record keys."""
55 """Return an empty dict with all record keys."""
56 return {
56 return {
57 'msg_id' : None,
57 'msg_id' : None,
58 'header' : None,
58 'header' : None,
59 'content': None,
59 'content': None,
60 'buffers': None,
60 'buffers': None,
61 'submitted': None,
61 'submitted': None,
62 'client_uuid' : None,
62 'client_uuid' : None,
63 'engine_uuid' : None,
63 'engine_uuid' : None,
64 'started': None,
64 'started': None,
65 'completed': None,
65 'completed': None,
66 'resubmitted': None,
66 'resubmitted': None,
67 'received': None,
67 'received': None,
68 'result_header' : None,
68 'result_header' : None,
69 'result_content' : None,
69 'result_content' : None,
70 'result_buffers' : None,
70 'result_buffers' : None,
71 'queue' : None,
71 'queue' : None,
72 'pyin' : None,
72 'pyin' : None,
73 'pyout': None,
73 'pyout': None,
74 'pyerr': None,
74 'pyerr': None,
75 'stdout': '',
75 'stdout': '',
76 'stderr': '',
76 'stderr': '',
77 }
77 }
78
78
79 def init_record(msg):
79 def init_record(msg):
80 """Initialize a TaskRecord based on a request."""
80 """Initialize a TaskRecord based on a request."""
81 header = msg['header']
81 header = msg['header']
82 return {
82 return {
83 'msg_id' : header['msg_id'],
83 'msg_id' : header['msg_id'],
84 'header' : header,
84 'header' : header,
85 'content': msg['content'],
85 'content': msg['content'],
86 'buffers': msg['buffers'],
86 'buffers': msg['buffers'],
87 'submitted': header['date'],
87 'submitted': header['date'],
88 'client_uuid' : None,
88 'client_uuid' : None,
89 'engine_uuid' : None,
89 'engine_uuid' : None,
90 'started': None,
90 'started': None,
91 'completed': None,
91 'completed': None,
92 'resubmitted': None,
92 'resubmitted': None,
93 'received': None,
93 'received': None,
94 'result_header' : None,
94 'result_header' : None,
95 'result_content' : None,
95 'result_content' : None,
96 'result_buffers' : None,
96 'result_buffers' : None,
97 'queue' : None,
97 'queue' : None,
98 'pyin' : None,
98 'pyin' : None,
99 'pyout': None,
99 'pyout': None,
100 'pyerr': None,
100 'pyerr': None,
101 'stdout': '',
101 'stdout': '',
102 'stderr': '',
102 'stderr': '',
103 }
103 }
104
104
105
105
106 class EngineConnector(HasTraits):
106 class EngineConnector(HasTraits):
107 """A simple object for accessing the various zmq connections of an object.
107 """A simple object for accessing the various zmq connections of an object.
108 Attributes are:
108 Attributes are:
109 id (int): engine ID
109 id (int): engine ID
110 uuid (str): uuid (unused?)
110 uuid (str): uuid (unused?)
111 queue (str): identity of queue's XREQ socket
111 queue (str): identity of queue's XREQ socket
112 registration (str): identity of registration XREQ socket
112 registration (str): identity of registration XREQ socket
113 heartbeat (str): identity of heartbeat XREQ socket
113 heartbeat (str): identity of heartbeat XREQ socket
114 """
114 """
115 id=Integer(0)
115 id=Integer(0)
116 queue=CBytes()
116 queue=CBytes()
117 control=CBytes()
117 control=CBytes()
118 registration=CBytes()
118 registration=CBytes()
119 heartbeat=CBytes()
119 heartbeat=CBytes()
120 pending=Set()
120 pending=Set()
121
121
122 class HubFactory(RegistrationFactory):
122 class HubFactory(RegistrationFactory):
123 """The Configurable for setting up a Hub."""
123 """The Configurable for setting up a Hub."""
124
124
125 # port-pairs for monitoredqueues:
125 # port-pairs for monitoredqueues:
126 hb = Tuple(Integer,Integer,config=True,
126 hb = Tuple(Integer,Integer,config=True,
127 help="""XREQ/SUB Port pair for Engine heartbeats""")
127 help="""XREQ/SUB Port pair for Engine heartbeats""")
128 def _hb_default(self):
128 def _hb_default(self):
129 return tuple(util.select_random_ports(2))
129 return tuple(util.select_random_ports(2))
130
130
131 mux = Tuple(Integer,Integer,config=True,
131 mux = Tuple(Integer,Integer,config=True,
132 help="""Engine/Client Port pair for MUX queue""")
132 help="""Engine/Client Port pair for MUX queue""")
133
133
134 def _mux_default(self):
134 def _mux_default(self):
135 return tuple(util.select_random_ports(2))
135 return tuple(util.select_random_ports(2))
136
136
137 task = Tuple(Integer,Integer,config=True,
137 task = Tuple(Integer,Integer,config=True,
138 help="""Engine/Client Port pair for Task queue""")
138 help="""Engine/Client Port pair for Task queue""")
139 def _task_default(self):
139 def _task_default(self):
140 return tuple(util.select_random_ports(2))
140 return tuple(util.select_random_ports(2))
141
141
142 control = Tuple(Integer,Integer,config=True,
142 control = Tuple(Integer,Integer,config=True,
143 help="""Engine/Client Port pair for Control queue""")
143 help="""Engine/Client Port pair for Control queue""")
144
144
145 def _control_default(self):
145 def _control_default(self):
146 return tuple(util.select_random_ports(2))
146 return tuple(util.select_random_ports(2))
147
147
148 iopub = Tuple(Integer,Integer,config=True,
148 iopub = Tuple(Integer,Integer,config=True,
149 help="""Engine/Client Port pair for IOPub relay""")
149 help="""Engine/Client Port pair for IOPub relay""")
150
150
151 def _iopub_default(self):
151 def _iopub_default(self):
152 return tuple(util.select_random_ports(2))
152 return tuple(util.select_random_ports(2))
153
153
154 # single ports:
154 # single ports:
155 mon_port = Integer(config=True,
155 mon_port = Integer(config=True,
156 help="""Monitor (SUB) port for queue traffic""")
156 help="""Monitor (SUB) port for queue traffic""")
157
157
158 def _mon_port_default(self):
158 def _mon_port_default(self):
159 return util.select_random_ports(1)[0]
159 return util.select_random_ports(1)[0]
160
160
161 notifier_port = Integer(config=True,
161 notifier_port = Integer(config=True,
162 help="""PUB port for sending engine status notifications""")
162 help="""PUB port for sending engine status notifications""")
163
163
164 def _notifier_port_default(self):
164 def _notifier_port_default(self):
165 return util.select_random_ports(1)[0]
165 return util.select_random_ports(1)[0]
166
166
167 engine_ip = Unicode('127.0.0.1', config=True,
167 engine_ip = Unicode('127.0.0.1', config=True,
168 help="IP on which to listen for engine connections. [default: loopback]")
168 help="IP on which to listen for engine connections. [default: loopback]")
169 engine_transport = Unicode('tcp', config=True,
169 engine_transport = Unicode('tcp', config=True,
170 help="0MQ transport for engine connections. [default: tcp]")
170 help="0MQ transport for engine connections. [default: tcp]")
171
171
172 client_ip = Unicode('127.0.0.1', config=True,
172 client_ip = Unicode('127.0.0.1', config=True,
173 help="IP on which to listen for client connections. [default: loopback]")
173 help="IP on which to listen for client connections. [default: loopback]")
174 client_transport = Unicode('tcp', config=True,
174 client_transport = Unicode('tcp', config=True,
175 help="0MQ transport for client connections. [default : tcp]")
175 help="0MQ transport for client connections. [default : tcp]")
176
176
177 monitor_ip = Unicode('127.0.0.1', config=True,
177 monitor_ip = Unicode('127.0.0.1', config=True,
178 help="IP on which to listen for monitor messages. [default: loopback]")
178 help="IP on which to listen for monitor messages. [default: loopback]")
179 monitor_transport = Unicode('tcp', config=True,
179 monitor_transport = Unicode('tcp', config=True,
180 help="0MQ transport for monitor messages. [default : tcp]")
180 help="0MQ transport for monitor messages. [default : tcp]")
181
181
182 monitor_url = Unicode('')
182 monitor_url = Unicode('')
183
183
184 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
184 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
185 config=True, help="""The class to use for the DB backend""")
185 config=True, help="""The class to use for the DB backend""")
186
186
187 # not configurable
187 # not configurable
188 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
188 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
189 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
189 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
190
190
191 def _ip_changed(self, name, old, new):
191 def _ip_changed(self, name, old, new):
192 self.engine_ip = new
192 self.engine_ip = new
193 self.client_ip = new
193 self.client_ip = new
194 self.monitor_ip = new
194 self.monitor_ip = new
195 self._update_monitor_url()
195 self._update_monitor_url()
196
196
197 def _update_monitor_url(self):
197 def _update_monitor_url(self):
198 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
198 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
199
199
200 def _transport_changed(self, name, old, new):
200 def _transport_changed(self, name, old, new):
201 self.engine_transport = new
201 self.engine_transport = new
202 self.client_transport = new
202 self.client_transport = new
203 self.monitor_transport = new
203 self.monitor_transport = new
204 self._update_monitor_url()
204 self._update_monitor_url()
205
205
206 def __init__(self, **kwargs):
206 def __init__(self, **kwargs):
207 super(HubFactory, self).__init__(**kwargs)
207 super(HubFactory, self).__init__(**kwargs)
208 self._update_monitor_url()
208 self._update_monitor_url()
209
209
210
210
211 def construct(self):
211 def construct(self):
212 self.init_hub()
212 self.init_hub()
213
213
214 def start(self):
214 def start(self):
215 self.heartmonitor.start()
215 self.heartmonitor.start()
216 self.log.info("Heartmonitor started")
216 self.log.info("Heartmonitor started")
217
217
218 def init_hub(self):
218 def init_hub(self):
219 """construct"""
219 """construct"""
220 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
220 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
221 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
221 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
222
222
223 ctx = self.context
223 ctx = self.context
224 loop = self.loop
224 loop = self.loop
225
225
226 # Registrar socket
226 # Registrar socket
227 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
227 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
228 q.bind(client_iface % self.regport)
228 q.bind(client_iface % self.regport)
229 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
229 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
230 if self.client_ip != self.engine_ip:
230 if self.client_ip != self.engine_ip:
231 q.bind(engine_iface % self.regport)
231 q.bind(engine_iface % self.regport)
232 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
232 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
233
233
234 ### Engine connections ###
234 ### Engine connections ###
235
235
236 # heartbeat
236 # heartbeat
237 hpub = ctx.socket(zmq.PUB)
237 hpub = ctx.socket(zmq.PUB)
238 hpub.bind(engine_iface % self.hb[0])
238 hpub.bind(engine_iface % self.hb[0])
239 hrep = ctx.socket(zmq.ROUTER)
239 hrep = ctx.socket(zmq.ROUTER)
240 hrep.bind(engine_iface % self.hb[1])
240 hrep.bind(engine_iface % self.hb[1])
241 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
241 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
242 pingstream=ZMQStream(hpub,loop),
242 pingstream=ZMQStream(hpub,loop),
243 pongstream=ZMQStream(hrep,loop)
243 pongstream=ZMQStream(hrep,loop)
244 )
244 )
245
245
246 ### Client connections ###
246 ### Client connections ###
247 # Notifier socket
247 # Notifier socket
248 n = ZMQStream(ctx.socket(zmq.PUB), loop)
248 n = ZMQStream(ctx.socket(zmq.PUB), loop)
249 n.bind(client_iface%self.notifier_port)
249 n.bind(client_iface%self.notifier_port)
250
250
251 ### build and launch the queues ###
251 ### build and launch the queues ###
252
252
253 # monitor socket
253 # monitor socket
254 sub = ctx.socket(zmq.SUB)
254 sub = ctx.socket(zmq.SUB)
255 sub.setsockopt(zmq.SUBSCRIBE, b"")
255 sub.setsockopt(zmq.SUBSCRIBE, b"")
256 sub.bind(self.monitor_url)
256 sub.bind(self.monitor_url)
257 sub.bind('inproc://monitor')
257 sub.bind('inproc://monitor')
258 sub = ZMQStream(sub, loop)
258 sub = ZMQStream(sub, loop)
259
259
260 # connect the db
260 # connect the db
261 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
261 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
262 # cdir = self.config.Global.cluster_dir
262 # cdir = self.config.Global.cluster_dir
263 self.db = import_item(str(self.db_class))(session=self.session.session,
263 self.db = import_item(str(self.db_class))(session=self.session.session,
264 config=self.config, log=self.log)
264 config=self.config, log=self.log)
265 time.sleep(.25)
265 time.sleep(.25)
266 try:
266 try:
267 scheme = self.config.TaskScheduler.scheme_name
267 scheme = self.config.TaskScheduler.scheme_name
268 except AttributeError:
268 except AttributeError:
269 from .scheduler import TaskScheduler
269 from .scheduler import TaskScheduler
270 scheme = TaskScheduler.scheme_name.get_default_value()
270 scheme = TaskScheduler.scheme_name.get_default_value()
271 # build connection dicts
271 # build connection dicts
272 self.engine_info = {
272 self.engine_info = {
273 'control' : engine_iface%self.control[1],
273 'control' : engine_iface%self.control[1],
274 'mux': engine_iface%self.mux[1],
274 'mux': engine_iface%self.mux[1],
275 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
275 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
276 'task' : engine_iface%self.task[1],
276 'task' : engine_iface%self.task[1],
277 'iopub' : engine_iface%self.iopub[1],
277 'iopub' : engine_iface%self.iopub[1],
278 # 'monitor' : engine_iface%self.mon_port,
278 # 'monitor' : engine_iface%self.mon_port,
279 }
279 }
280
280
281 self.client_info = {
281 self.client_info = {
282 'control' : client_iface%self.control[0],
282 'control' : client_iface%self.control[0],
283 'mux': client_iface%self.mux[0],
283 'mux': client_iface%self.mux[0],
284 'task' : (scheme, client_iface%self.task[0]),
284 'task' : (scheme, client_iface%self.task[0]),
285 'iopub' : client_iface%self.iopub[0],
285 'iopub' : client_iface%self.iopub[0],
286 'notification': client_iface%self.notifier_port
286 'notification': client_iface%self.notifier_port
287 }
287 }
288 self.log.debug("Hub engine addrs: %s", self.engine_info)
288 self.log.debug("Hub engine addrs: %s", self.engine_info)
289 self.log.debug("Hub client addrs: %s", self.client_info)
289 self.log.debug("Hub client addrs: %s", self.client_info)
290
290
291 # resubmit stream
291 # resubmit stream
292 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
292 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
293 url = util.disambiguate_url(self.client_info['task'][-1])
293 url = util.disambiguate_url(self.client_info['task'][-1])
294 r.setsockopt(zmq.IDENTITY, self.session.bsession)
294 r.setsockopt(zmq.IDENTITY, self.session.bsession)
295 r.connect(url)
295 r.connect(url)
296
296
297 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
297 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
298 query=q, notifier=n, resubmit=r, db=self.db,
298 query=q, notifier=n, resubmit=r, db=self.db,
299 engine_info=self.engine_info, client_info=self.client_info,
299 engine_info=self.engine_info, client_info=self.client_info,
300 log=self.log)
300 log=self.log)
301
301
302
302
303 class Hub(SessionFactory):
303 class Hub(SessionFactory):
304 """The IPython Controller Hub with 0MQ connections
304 """The IPython Controller Hub with 0MQ connections
305
305
306 Parameters
306 Parameters
307 ==========
307 ==========
308 loop: zmq IOLoop instance
308 loop: zmq IOLoop instance
309 session: Session object
309 session: Session object
310 <removed> context: zmq context for creating new connections (?)
310 <removed> context: zmq context for creating new connections (?)
311 queue: ZMQStream for monitoring the command queue (SUB)
311 queue: ZMQStream for monitoring the command queue (SUB)
312 query: ZMQStream for engine registration and client queries requests (XREP)
312 query: ZMQStream for engine registration and client queries requests (XREP)
313 heartbeat: HeartMonitor object checking the pulse of the engines
313 heartbeat: HeartMonitor object checking the pulse of the engines
314 notifier: ZMQStream for broadcasting engine registration changes (PUB)
314 notifier: ZMQStream for broadcasting engine registration changes (PUB)
315 db: connection to db for out of memory logging of commands
315 db: connection to db for out of memory logging of commands
316 NotImplemented
316 NotImplemented
317 engine_info: dict of zmq connection information for engines to connect
317 engine_info: dict of zmq connection information for engines to connect
318 to the queues.
318 to the queues.
319 client_info: dict of zmq connection information for engines to connect
319 client_info: dict of zmq connection information for engines to connect
320 to the queues.
320 to the queues.
321 """
321 """
322 # internal data structures:
322 # internal data structures:
323 ids=Set() # engine IDs
323 ids=Set() # engine IDs
324 keytable=Dict()
324 keytable=Dict()
325 by_ident=Dict()
325 by_ident=Dict()
326 engines=Dict()
326 engines=Dict()
327 clients=Dict()
327 clients=Dict()
328 hearts=Dict()
328 hearts=Dict()
329 pending=Set()
329 pending=Set()
330 queues=Dict() # pending msg_ids keyed by engine_id
330 queues=Dict() # pending msg_ids keyed by engine_id
331 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
331 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
332 completed=Dict() # completed msg_ids keyed by engine_id
332 completed=Dict() # completed msg_ids keyed by engine_id
333 all_completed=Set() # completed msg_ids keyed by engine_id
333 all_completed=Set() # completed msg_ids keyed by engine_id
334 dead_engines=Set() # completed msg_ids keyed by engine_id
334 dead_engines=Set() # completed msg_ids keyed by engine_id
335 unassigned=Set() # set of task msg_ds not yet assigned a destination
335 unassigned=Set() # set of task msg_ds not yet assigned a destination
336 incoming_registrations=Dict()
336 incoming_registrations=Dict()
337 registration_timeout=Integer()
337 registration_timeout=Integer()
338 _idcounter=Integer(0)
338 _idcounter=Integer(0)
339
339
340 # objects from constructor:
340 # objects from constructor:
341 query=Instance(ZMQStream)
341 query=Instance(ZMQStream)
342 monitor=Instance(ZMQStream)
342 monitor=Instance(ZMQStream)
343 notifier=Instance(ZMQStream)
343 notifier=Instance(ZMQStream)
344 resubmit=Instance(ZMQStream)
344 resubmit=Instance(ZMQStream)
345 heartmonitor=Instance(HeartMonitor)
345 heartmonitor=Instance(HeartMonitor)
346 db=Instance(object)
346 db=Instance(object)
347 client_info=Dict()
347 client_info=Dict()
348 engine_info=Dict()
348 engine_info=Dict()
349
349
350
350
351 def __init__(self, **kwargs):
351 def __init__(self, **kwargs):
352 """
352 """
353 # universal:
353 # universal:
354 loop: IOLoop for creating future connections
354 loop: IOLoop for creating future connections
355 session: streamsession for sending serialized data
355 session: streamsession for sending serialized data
356 # engine:
356 # engine:
357 queue: ZMQStream for monitoring queue messages
357 queue: ZMQStream for monitoring queue messages
358 query: ZMQStream for engine+client registration and client requests
358 query: ZMQStream for engine+client registration and client requests
359 heartbeat: HeartMonitor object for tracking engines
359 heartbeat: HeartMonitor object for tracking engines
360 # extra:
360 # extra:
361 db: ZMQStream for db connection (NotImplemented)
361 db: ZMQStream for db connection (NotImplemented)
362 engine_info: zmq address/protocol dict for engine connections
362 engine_info: zmq address/protocol dict for engine connections
363 client_info: zmq address/protocol dict for client connections
363 client_info: zmq address/protocol dict for client connections
364 """
364 """
365
365
366 super(Hub, self).__init__(**kwargs)
366 super(Hub, self).__init__(**kwargs)
367 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
367 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
368
368
369 # validate connection dicts:
369 # validate connection dicts:
370 for k,v in self.client_info.iteritems():
370 for k,v in self.client_info.iteritems():
371 if k == 'task':
371 if k == 'task':
372 util.validate_url_container(v[1])
372 util.validate_url_container(v[1])
373 else:
373 else:
374 util.validate_url_container(v)
374 util.validate_url_container(v)
375 # util.validate_url_container(self.client_info)
375 # util.validate_url_container(self.client_info)
376 util.validate_url_container(self.engine_info)
376 util.validate_url_container(self.engine_info)
377
377
378 # register our callbacks
378 # register our callbacks
379 self.query.on_recv(self.dispatch_query)
379 self.query.on_recv(self.dispatch_query)
380 self.monitor.on_recv(self.dispatch_monitor_traffic)
380 self.monitor.on_recv(self.dispatch_monitor_traffic)
381
381
382 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
382 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
383 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
383 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
384
384
385 self.monitor_handlers = {b'in' : self.save_queue_request,
385 self.monitor_handlers = {b'in' : self.save_queue_request,
386 b'out': self.save_queue_result,
386 b'out': self.save_queue_result,
387 b'intask': self.save_task_request,
387 b'intask': self.save_task_request,
388 b'outtask': self.save_task_result,
388 b'outtask': self.save_task_result,
389 b'tracktask': self.save_task_destination,
389 b'tracktask': self.save_task_destination,
390 b'incontrol': _passer,
390 b'incontrol': _passer,
391 b'outcontrol': _passer,
391 b'outcontrol': _passer,
392 b'iopub': self.save_iopub_message,
392 b'iopub': self.save_iopub_message,
393 }
393 }
394
394
395 self.query_handlers = {'queue_request': self.queue_status,
395 self.query_handlers = {'queue_request': self.queue_status,
396 'result_request': self.get_results,
396 'result_request': self.get_results,
397 'history_request': self.get_history,
397 'history_request': self.get_history,
398 'db_request': self.db_query,
398 'db_request': self.db_query,
399 'purge_request': self.purge_results,
399 'purge_request': self.purge_results,
400 'load_request': self.check_load,
400 'load_request': self.check_load,
401 'resubmit_request': self.resubmit_task,
401 'resubmit_request': self.resubmit_task,
402 'shutdown_request': self.shutdown_request,
402 'shutdown_request': self.shutdown_request,
403 'registration_request' : self.register_engine,
403 'registration_request' : self.register_engine,
404 'unregistration_request' : self.unregister_engine,
404 'unregistration_request' : self.unregister_engine,
405 'connection_request': self.connection_request,
405 'connection_request': self.connection_request,
406 }
406 }
407
407
408 # ignore resubmit replies
408 # ignore resubmit replies
409 self.resubmit.on_recv(lambda msg: None, copy=False)
409 self.resubmit.on_recv(lambda msg: None, copy=False)
410
410
411 self.log.info("hub::created hub")
411 self.log.info("hub::created hub")
412
412
413 @property
413 @property
414 def _next_id(self):
414 def _next_id(self):
415 """gemerate a new ID.
415 """gemerate a new ID.
416
416
417 No longer reuse old ids, just count from 0."""
417 No longer reuse old ids, just count from 0."""
418 newid = self._idcounter
418 newid = self._idcounter
419 self._idcounter += 1
419 self._idcounter += 1
420 return newid
420 return newid
421 # newid = 0
421 # newid = 0
422 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
422 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
423 # # print newid, self.ids, self.incoming_registrations
423 # # print newid, self.ids, self.incoming_registrations
424 # while newid in self.ids or newid in incoming:
424 # while newid in self.ids or newid in incoming:
425 # newid += 1
425 # newid += 1
426 # return newid
426 # return newid
427
427
428 #-----------------------------------------------------------------------------
428 #-----------------------------------------------------------------------------
429 # message validation
429 # message validation
430 #-----------------------------------------------------------------------------
430 #-----------------------------------------------------------------------------
431
431
432 def _validate_targets(self, targets):
432 def _validate_targets(self, targets):
433 """turn any valid targets argument into a list of integer ids"""
433 """turn any valid targets argument into a list of integer ids"""
434 if targets is None:
434 if targets is None:
435 # default to all
435 # default to all
436 return self.ids
436 return self.ids
437
437
438 if isinstance(targets, (int,str,unicode)):
438 if isinstance(targets, (int,str,unicode)):
439 # only one target specified
439 # only one target specified
440 targets = [targets]
440 targets = [targets]
441 _targets = []
441 _targets = []
442 for t in targets:
442 for t in targets:
443 # map raw identities to ids
443 # map raw identities to ids
444 if isinstance(t, (str,unicode)):
444 if isinstance(t, (str,unicode)):
445 t = self.by_ident.get(cast_bytes(t), t)
445 t = self.by_ident.get(cast_bytes(t), t)
446 _targets.append(t)
446 _targets.append(t)
447 targets = _targets
447 targets = _targets
448 bad_targets = [ t for t in targets if t not in self.ids ]
448 bad_targets = [ t for t in targets if t not in self.ids ]
449 if bad_targets:
449 if bad_targets:
450 raise IndexError("No Such Engine: %r" % bad_targets)
450 raise IndexError("No Such Engine: %r" % bad_targets)
451 if not targets:
451 if not targets:
452 raise IndexError("No Engines Registered")
452 raise IndexError("No Engines Registered")
453 return targets
453 return targets
454
454
455 #-----------------------------------------------------------------------------
455 #-----------------------------------------------------------------------------
456 # dispatch methods (1 per stream)
456 # dispatch methods (1 per stream)
457 #-----------------------------------------------------------------------------
457 #-----------------------------------------------------------------------------
458
458
459
459
460 @util.log_errors
460 @util.log_errors
461 def dispatch_monitor_traffic(self, msg):
461 def dispatch_monitor_traffic(self, msg):
462 """all ME and Task queue messages come through here, as well as
462 """all ME and Task queue messages come through here, as well as
463 IOPub traffic."""
463 IOPub traffic."""
464 self.log.debug("monitor traffic: %r", msg[0])
464 self.log.debug("monitor traffic: %r", msg[0])
465 switch = msg[0]
465 switch = msg[0]
466 try:
466 try:
467 idents, msg = self.session.feed_identities(msg[1:])
467 idents, msg = self.session.feed_identities(msg[1:])
468 except ValueError:
468 except ValueError:
469 idents=[]
469 idents=[]
470 if not idents:
470 if not idents:
471 self.log.error("Monitor message without topic: %r", msg)
471 self.log.error("Monitor message without topic: %r", msg)
472 return
472 return
473 handler = self.monitor_handlers.get(switch, None)
473 handler = self.monitor_handlers.get(switch, None)
474 if handler is not None:
474 if handler is not None:
475 handler(idents, msg)
475 handler(idents, msg)
476 else:
476 else:
477 self.log.error("Unrecognized monitor topic: %r", switch)
477 self.log.error("Unrecognized monitor topic: %r", switch)
478
478
479
479
480 @util.log_errors
480 @util.log_errors
481 def dispatch_query(self, msg):
481 def dispatch_query(self, msg):
482 """Route registration requests and queries from clients."""
482 """Route registration requests and queries from clients."""
483 try:
483 try:
484 idents, msg = self.session.feed_identities(msg)
484 idents, msg = self.session.feed_identities(msg)
485 except ValueError:
485 except ValueError:
486 idents = []
486 idents = []
487 if not idents:
487 if not idents:
488 self.log.error("Bad Query Message: %r", msg)
488 self.log.error("Bad Query Message: %r", msg)
489 return
489 return
490 client_id = idents[0]
490 client_id = idents[0]
491 try:
491 try:
492 msg = self.session.unserialize(msg, content=True)
492 msg = self.session.unserialize(msg, content=True)
493 except Exception:
493 except Exception:
494 content = error.wrap_exception()
494 content = error.wrap_exception()
495 self.log.error("Bad Query Message: %r", msg, exc_info=True)
495 self.log.error("Bad Query Message: %r", msg, exc_info=True)
496 self.session.send(self.query, "hub_error", ident=client_id,
496 self.session.send(self.query, "hub_error", ident=client_id,
497 content=content)
497 content=content)
498 return
498 return
499 # print client_id, header, parent, content
499 # print client_id, header, parent, content
500 #switch on message type:
500 #switch on message type:
501 msg_type = msg['header']['msg_type']
501 msg_type = msg['header']['msg_type']
502 self.log.info("client::client %r requested %r", client_id, msg_type)
502 self.log.info("client::client %r requested %r", client_id, msg_type)
503 handler = self.query_handlers.get(msg_type, None)
503 handler = self.query_handlers.get(msg_type, None)
504 try:
504 try:
505 assert handler is not None, "Bad Message Type: %r" % msg_type
505 assert handler is not None, "Bad Message Type: %r" % msg_type
506 except:
506 except:
507 content = error.wrap_exception()
507 content = error.wrap_exception()
508 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
508 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
509 self.session.send(self.query, "hub_error", ident=client_id,
509 self.session.send(self.query, "hub_error", ident=client_id,
510 content=content)
510 content=content)
511 return
511 return
512
512
513 else:
513 else:
514 handler(idents, msg)
514 handler(idents, msg)
515
515
516 def dispatch_db(self, msg):
516 def dispatch_db(self, msg):
517 """"""
517 """"""
518 raise NotImplementedError
518 raise NotImplementedError
519
519
520 #---------------------------------------------------------------------------
520 #---------------------------------------------------------------------------
521 # handler methods (1 per event)
521 # handler methods (1 per event)
522 #---------------------------------------------------------------------------
522 #---------------------------------------------------------------------------
523
523
524 #----------------------- Heartbeat --------------------------------------
524 #----------------------- Heartbeat --------------------------------------
525
525
526 def handle_new_heart(self, heart):
526 def handle_new_heart(self, heart):
527 """handler to attach to heartbeater.
527 """handler to attach to heartbeater.
528 Called when a new heart starts to beat.
528 Called when a new heart starts to beat.
529 Triggers completion of registration."""
529 Triggers completion of registration."""
530 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
530 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
531 if heart not in self.incoming_registrations:
531 if heart not in self.incoming_registrations:
532 self.log.info("heartbeat::ignoring new heart: %r", heart)
532 self.log.info("heartbeat::ignoring new heart: %r", heart)
533 else:
533 else:
534 self.finish_registration(heart)
534 self.finish_registration(heart)
535
535
536
536
537 def handle_heart_failure(self, heart):
537 def handle_heart_failure(self, heart):
538 """handler to attach to heartbeater.
538 """handler to attach to heartbeater.
539 called when a previously registered heart fails to respond to beat request.
539 called when a previously registered heart fails to respond to beat request.
540 triggers unregistration"""
540 triggers unregistration"""
541 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
541 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
542 eid = self.hearts.get(heart, None)
542 eid = self.hearts.get(heart, None)
543 queue = self.engines[eid].queue
543 queue = self.engines[eid].queue
544 if eid is None or self.keytable[eid] in self.dead_engines:
544 if eid is None or self.keytable[eid] in self.dead_engines:
545 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
545 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
546 else:
546 else:
547 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
547 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
548
548
549 #----------------------- MUX Queue Traffic ------------------------------
549 #----------------------- MUX Queue Traffic ------------------------------
550
550
551 def save_queue_request(self, idents, msg):
551 def save_queue_request(self, idents, msg):
552 if len(idents) < 2:
552 if len(idents) < 2:
553 self.log.error("invalid identity prefix: %r", idents)
553 self.log.error("invalid identity prefix: %r", idents)
554 return
554 return
555 queue_id, client_id = idents[:2]
555 queue_id, client_id = idents[:2]
556 try:
556 try:
557 msg = self.session.unserialize(msg)
557 msg = self.session.unserialize(msg)
558 except Exception:
558 except Exception:
559 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
559 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
560 return
560 return
561
561
562 eid = self.by_ident.get(queue_id, None)
562 eid = self.by_ident.get(queue_id, None)
563 if eid is None:
563 if eid is None:
564 self.log.error("queue::target %r not registered", queue_id)
564 self.log.error("queue::target %r not registered", queue_id)
565 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
565 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
566 return
566 return
567 record = init_record(msg)
567 record = init_record(msg)
568 msg_id = record['msg_id']
568 msg_id = record['msg_id']
569 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
569 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
570 # Unicode in records
570 # Unicode in records
571 record['engine_uuid'] = queue_id.decode('ascii')
571 record['engine_uuid'] = queue_id.decode('ascii')
572 record['client_uuid'] = client_id.decode('ascii')
572 record['client_uuid'] = client_id.decode('ascii')
573 record['queue'] = 'mux'
573 record['queue'] = 'mux'
574
574
575 try:
575 try:
576 # it's posible iopub arrived first:
576 # it's posible iopub arrived first:
577 existing = self.db.get_record(msg_id)
577 existing = self.db.get_record(msg_id)
578 for key,evalue in existing.iteritems():
578 for key,evalue in existing.iteritems():
579 rvalue = record.get(key, None)
579 rvalue = record.get(key, None)
580 if evalue and rvalue and evalue != rvalue:
580 if evalue and rvalue and evalue != rvalue:
581 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
581 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
582 elif evalue and not rvalue:
582 elif evalue and not rvalue:
583 record[key] = evalue
583 record[key] = evalue
584 try:
584 try:
585 self.db.update_record(msg_id, record)
585 self.db.update_record(msg_id, record)
586 except Exception:
586 except Exception:
587 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
587 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
588 except KeyError:
588 except KeyError:
589 try:
589 try:
590 self.db.add_record(msg_id, record)
590 self.db.add_record(msg_id, record)
591 except Exception:
591 except Exception:
592 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
592 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
593
593
594
594
595 self.pending.add(msg_id)
595 self.pending.add(msg_id)
596 self.queues[eid].append(msg_id)
596 self.queues[eid].append(msg_id)
597
597
598 def save_queue_result(self, idents, msg):
598 def save_queue_result(self, idents, msg):
599 if len(idents) < 2:
599 if len(idents) < 2:
600 self.log.error("invalid identity prefix: %r", idents)
600 self.log.error("invalid identity prefix: %r", idents)
601 return
601 return
602
602
603 client_id, queue_id = idents[:2]
603 client_id, queue_id = idents[:2]
604 try:
604 try:
605 msg = self.session.unserialize(msg)
605 msg = self.session.unserialize(msg)
606 except Exception:
606 except Exception:
607 self.log.error("queue::engine %r sent invalid message to %r: %r",
607 self.log.error("queue::engine %r sent invalid message to %r: %r",
608 queue_id, client_id, msg, exc_info=True)
608 queue_id, client_id, msg, exc_info=True)
609 return
609 return
610
610
611 eid = self.by_ident.get(queue_id, None)
611 eid = self.by_ident.get(queue_id, None)
612 if eid is None:
612 if eid is None:
613 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
613 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
614 return
614 return
615
615
616 parent = msg['parent_header']
616 parent = msg['parent_header']
617 if not parent:
617 if not parent:
618 return
618 return
619 msg_id = parent['msg_id']
619 msg_id = parent['msg_id']
620 if msg_id in self.pending:
620 if msg_id in self.pending:
621 self.pending.remove(msg_id)
621 self.pending.remove(msg_id)
622 self.all_completed.add(msg_id)
622 self.all_completed.add(msg_id)
623 self.queues[eid].remove(msg_id)
623 self.queues[eid].remove(msg_id)
624 self.completed[eid].append(msg_id)
624 self.completed[eid].append(msg_id)
625 self.log.info("queue::request %r completed on %s", msg_id, eid)
625 self.log.info("queue::request %r completed on %s", msg_id, eid)
626 elif msg_id not in self.all_completed:
626 elif msg_id not in self.all_completed:
627 # it could be a result from a dead engine that died before delivering the
627 # it could be a result from a dead engine that died before delivering the
628 # result
628 # result
629 self.log.warn("queue:: unknown msg finished %r", msg_id)
629 self.log.warn("queue:: unknown msg finished %r", msg_id)
630 return
630 return
631 # update record anyway, because the unregistration could have been premature
631 # update record anyway, because the unregistration could have been premature
632 rheader = msg['header']
632 rheader = msg['header']
633 completed = rheader['date']
633 completed = rheader['date']
634 started = rheader.get('started', None)
634 started = rheader.get('started', None)
635 result = {
635 result = {
636 'result_header' : rheader,
636 'result_header' : rheader,
637 'result_content': msg['content'],
637 'result_content': msg['content'],
638 'received': datetime.now(),
638 'received': datetime.now(),
639 'started' : started,
639 'started' : started,
640 'completed' : completed
640 'completed' : completed
641 }
641 }
642
642
643 result['result_buffers'] = msg['buffers']
643 result['result_buffers'] = msg['buffers']
644 try:
644 try:
645 self.db.update_record(msg_id, result)
645 self.db.update_record(msg_id, result)
646 except Exception:
646 except Exception:
647 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
647 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
648
648
649
649
650 #--------------------- Task Queue Traffic ------------------------------
650 #--------------------- Task Queue Traffic ------------------------------
651
651
652 def save_task_request(self, idents, msg):
652 def save_task_request(self, idents, msg):
653 """Save the submission of a task."""
653 """Save the submission of a task."""
654 client_id = idents[0]
654 client_id = idents[0]
655
655
656 try:
656 try:
657 msg = self.session.unserialize(msg)
657 msg = self.session.unserialize(msg)
658 except Exception:
658 except Exception:
659 self.log.error("task::client %r sent invalid task message: %r",
659 self.log.error("task::client %r sent invalid task message: %r",
660 client_id, msg, exc_info=True)
660 client_id, msg, exc_info=True)
661 return
661 return
662 record = init_record(msg)
662 record = init_record(msg)
663
663
664 record['client_uuid'] = client_id.decode('ascii')
664 record['client_uuid'] = client_id.decode('ascii')
665 record['queue'] = 'task'
665 record['queue'] = 'task'
666 header = msg['header']
666 header = msg['header']
667 msg_id = header['msg_id']
667 msg_id = header['msg_id']
668 self.pending.add(msg_id)
668 self.pending.add(msg_id)
669 self.unassigned.add(msg_id)
669 self.unassigned.add(msg_id)
670 try:
670 try:
671 # it's posible iopub arrived first:
671 # it's posible iopub arrived first:
672 existing = self.db.get_record(msg_id)
672 existing = self.db.get_record(msg_id)
673 if existing['resubmitted']:
673 if existing['resubmitted']:
674 for key in ('submitted', 'client_uuid', 'buffers'):
674 for key in ('submitted', 'client_uuid', 'buffers'):
675 # don't clobber these keys on resubmit
675 # don't clobber these keys on resubmit
676 # submitted and client_uuid should be different
676 # submitted and client_uuid should be different
677 # and buffers might be big, and shouldn't have changed
677 # and buffers might be big, and shouldn't have changed
678 record.pop(key)
678 record.pop(key)
679 # still check content,header which should not change
679 # still check content,header which should not change
680 # but are not expensive to compare as buffers
680 # but are not expensive to compare as buffers
681
681
682 for key,evalue in existing.iteritems():
682 for key,evalue in existing.iteritems():
683 if key.endswith('buffers'):
683 if key.endswith('buffers'):
684 # don't compare buffers
684 # don't compare buffers
685 continue
685 continue
686 rvalue = record.get(key, None)
686 rvalue = record.get(key, None)
687 if evalue and rvalue and evalue != rvalue:
687 if evalue and rvalue and evalue != rvalue:
688 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
688 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
689 elif evalue and not rvalue:
689 elif evalue and not rvalue:
690 record[key] = evalue
690 record[key] = evalue
691 try:
691 try:
692 self.db.update_record(msg_id, record)
692 self.db.update_record(msg_id, record)
693 except Exception:
693 except Exception:
694 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
694 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
695 except KeyError:
695 except KeyError:
696 try:
696 try:
697 self.db.add_record(msg_id, record)
697 self.db.add_record(msg_id, record)
698 except Exception:
698 except Exception:
699 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
699 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
700 except Exception:
700 except Exception:
701 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
701 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
702
702
703 def save_task_result(self, idents, msg):
703 def save_task_result(self, idents, msg):
704 """save the result of a completed task."""
704 """save the result of a completed task."""
705 client_id = idents[0]
705 client_id = idents[0]
706 try:
706 try:
707 msg = self.session.unserialize(msg)
707 msg = self.session.unserialize(msg)
708 except Exception:
708 except Exception:
709 self.log.error("task::invalid task result message send to %r: %r",
709 self.log.error("task::invalid task result message send to %r: %r",
710 client_id, msg, exc_info=True)
710 client_id, msg, exc_info=True)
711 return
711 return
712
712
713 parent = msg['parent_header']
713 parent = msg['parent_header']
714 if not parent:
714 if not parent:
715 # print msg
715 # print msg
716 self.log.warn("Task %r had no parent!", msg)
716 self.log.warn("Task %r had no parent!", msg)
717 return
717 return
718 msg_id = parent['msg_id']
718 msg_id = parent['msg_id']
719 if msg_id in self.unassigned:
719 if msg_id in self.unassigned:
720 self.unassigned.remove(msg_id)
720 self.unassigned.remove(msg_id)
721
721
722 header = msg['header']
722 header = msg['header']
723 engine_uuid = header.get('engine', u'')
723 engine_uuid = header.get('engine', u'')
724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
725
725
726 status = header.get('status', None)
726 status = header.get('status', None)
727
727
728 if msg_id in self.pending:
728 if msg_id in self.pending:
729 self.log.info("task::task %r finished on %s", msg_id, eid)
729 self.log.info("task::task %r finished on %s", msg_id, eid)
730 self.pending.remove(msg_id)
730 self.pending.remove(msg_id)
731 self.all_completed.add(msg_id)
731 self.all_completed.add(msg_id)
732 if eid is not None:
732 if eid is not None:
733 if status != 'aborted':
733 if status != 'aborted':
734 self.completed[eid].append(msg_id)
734 self.completed[eid].append(msg_id)
735 if msg_id in self.tasks[eid]:
735 if msg_id in self.tasks[eid]:
736 self.tasks[eid].remove(msg_id)
736 self.tasks[eid].remove(msg_id)
737 completed = header['date']
737 completed = header['date']
738 started = header.get('started', None)
738 started = header.get('started', None)
739 result = {
739 result = {
740 'result_header' : header,
740 'result_header' : header,
741 'result_content': msg['content'],
741 'result_content': msg['content'],
742 'started' : started,
742 'started' : started,
743 'completed' : completed,
743 'completed' : completed,
744 'received' : datetime.now(),
744 'received' : datetime.now(),
745 'engine_uuid': engine_uuid,
745 'engine_uuid': engine_uuid,
746 }
746 }
747
747
748 result['result_buffers'] = msg['buffers']
748 result['result_buffers'] = msg['buffers']
749 try:
749 try:
750 self.db.update_record(msg_id, result)
750 self.db.update_record(msg_id, result)
751 except Exception:
751 except Exception:
752 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
752 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
753
753
754 else:
754 else:
755 self.log.debug("task::unknown task %r finished", msg_id)
755 self.log.debug("task::unknown task %r finished", msg_id)
756
756
757 def save_task_destination(self, idents, msg):
757 def save_task_destination(self, idents, msg):
758 try:
758 try:
759 msg = self.session.unserialize(msg, content=True)
759 msg = self.session.unserialize(msg, content=True)
760 except Exception:
760 except Exception:
761 self.log.error("task::invalid task tracking message", exc_info=True)
761 self.log.error("task::invalid task tracking message", exc_info=True)
762 return
762 return
763 content = msg['content']
763 content = msg['content']
764 # print (content)
764 # print (content)
765 msg_id = content['msg_id']
765 msg_id = content['msg_id']
766 engine_uuid = content['engine_id']
766 engine_uuid = content['engine_id']
767 eid = self.by_ident[cast_bytes(engine_uuid)]
767 eid = self.by_ident[cast_bytes(engine_uuid)]
768
768
769 self.log.info("task::task %r arrived on %r", msg_id, eid)
769 self.log.info("task::task %r arrived on %r", msg_id, eid)
770 if msg_id in self.unassigned:
770 if msg_id in self.unassigned:
771 self.unassigned.remove(msg_id)
771 self.unassigned.remove(msg_id)
772 # else:
772 # else:
773 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
773 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
774
774
775 self.tasks[eid].append(msg_id)
775 self.tasks[eid].append(msg_id)
776 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
776 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
777 try:
777 try:
778 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
778 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
779 except Exception:
779 except Exception:
780 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
780 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
781
781
782
782
783 def mia_task_request(self, idents, msg):
783 def mia_task_request(self, idents, msg):
784 raise NotImplementedError
784 raise NotImplementedError
785 client_id = idents[0]
785 client_id = idents[0]
786 # content = dict(mia=self.mia,status='ok')
786 # content = dict(mia=self.mia,status='ok')
787 # self.session.send('mia_reply', content=content, idents=client_id)
787 # self.session.send('mia_reply', content=content, idents=client_id)
788
788
789
789
790 #--------------------- IOPub Traffic ------------------------------
790 #--------------------- IOPub Traffic ------------------------------
791
791
792 def save_iopub_message(self, topics, msg):
792 def save_iopub_message(self, topics, msg):
793 """save an iopub message into the db"""
793 """save an iopub message into the db"""
794 # print (topics)
794 # print (topics)
795 try:
795 try:
796 msg = self.session.unserialize(msg, content=True)
796 msg = self.session.unserialize(msg, content=True)
797 except Exception:
797 except Exception:
798 self.log.error("iopub::invalid IOPub message", exc_info=True)
798 self.log.error("iopub::invalid IOPub message", exc_info=True)
799 return
799 return
800
800
801 parent = msg['parent_header']
801 parent = msg['parent_header']
802 if not parent:
802 if not parent:
803 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
803 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
804 return
804 return
805 msg_id = parent['msg_id']
805 msg_id = parent['msg_id']
806 msg_type = msg['header']['msg_type']
806 msg_type = msg['header']['msg_type']
807 content = msg['content']
807 content = msg['content']
808
808
809 # ensure msg_id is in db
809 # ensure msg_id is in db
810 try:
810 try:
811 rec = self.db.get_record(msg_id)
811 rec = self.db.get_record(msg_id)
812 except KeyError:
812 except KeyError:
813 rec = empty_record()
813 rec = empty_record()
814 rec['msg_id'] = msg_id
814 rec['msg_id'] = msg_id
815 self.db.add_record(msg_id, rec)
815 self.db.add_record(msg_id, rec)
816 # stream
816 # stream
817 d = {}
817 d = {}
818 if msg_type == 'stream':
818 if msg_type == 'stream':
819 name = content['name']
819 name = content['name']
820 s = rec[name] or ''
820 s = rec[name] or ''
821 d[name] = s + content['data']
821 d[name] = s + content['data']
822
822
823 elif msg_type == 'pyerr':
823 elif msg_type == 'pyerr':
824 d['pyerr'] = content
824 d['pyerr'] = content
825 elif msg_type == 'pyin':
825 elif msg_type == 'pyin':
826 d['pyin'] = content['code']
826 d['pyin'] = content['code']
827 else:
827 else:
828 d[msg_type] = content.get('data', '')
828 d[msg_type] = content.get('data', '')
829
829
830 try:
830 try:
831 self.db.update_record(msg_id, d)
831 self.db.update_record(msg_id, d)
832 except Exception:
832 except Exception:
833 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
833 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
834
834
835
835
836
836
837 #-------------------------------------------------------------------------
837 #-------------------------------------------------------------------------
838 # Registration requests
838 # Registration requests
839 #-------------------------------------------------------------------------
839 #-------------------------------------------------------------------------
840
840
841 def connection_request(self, client_id, msg):
841 def connection_request(self, client_id, msg):
842 """Reply with connection addresses for clients."""
842 """Reply with connection addresses for clients."""
843 self.log.info("client::client %r connected", client_id)
843 self.log.info("client::client %r connected", client_id)
844 content = dict(status='ok')
844 content = dict(status='ok')
845 content.update(self.client_info)
845 content.update(self.client_info)
846 jsonable = {}
846 jsonable = {}
847 for k,v in self.keytable.iteritems():
847 for k,v in self.keytable.iteritems():
848 if v not in self.dead_engines:
848 if v not in self.dead_engines:
849 jsonable[str(k)] = v.decode('ascii')
849 jsonable[str(k)] = v.decode('ascii')
850 content['engines'] = jsonable
850 content['engines'] = jsonable
851 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
851 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
852
852
853 def register_engine(self, reg, msg):
853 def register_engine(self, reg, msg):
854 """Register a new engine."""
854 """Register a new engine."""
855 content = msg['content']
855 content = msg['content']
856 try:
856 try:
857 queue = cast_bytes(content['queue'])
857 queue = cast_bytes(content['queue'])
858 except KeyError:
858 except KeyError:
859 self.log.error("registration::queue not specified", exc_info=True)
859 self.log.error("registration::queue not specified", exc_info=True)
860 return
860 return
861 heart = content.get('heartbeat', None)
861 heart = content.get('heartbeat', None)
862 if heart:
862 if heart:
863 heart = cast_bytes(heart)
863 heart = cast_bytes(heart)
864 """register a new engine, and create the socket(s) necessary"""
864 """register a new engine, and create the socket(s) necessary"""
865 eid = self._next_id
865 eid = self._next_id
866 # print (eid, queue, reg, heart)
866 # print (eid, queue, reg, heart)
867
867
868 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
868 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
869
869
870 content = dict(id=eid,status='ok')
870 content = dict(id=eid,status='ok')
871 content.update(self.engine_info)
871 content.update(self.engine_info)
872 # check if requesting available IDs:
872 # check if requesting available IDs:
873 if queue in self.by_ident:
873 if queue in self.by_ident:
874 try:
874 try:
875 raise KeyError("queue_id %r in use" % queue)
875 raise KeyError("queue_id %r in use" % queue)
876 except:
876 except:
877 content = error.wrap_exception()
877 content = error.wrap_exception()
878 self.log.error("queue_id %r in use", queue, exc_info=True)
878 self.log.error("queue_id %r in use", queue, exc_info=True)
879 elif heart in self.hearts: # need to check unique hearts?
879 elif heart in self.hearts: # need to check unique hearts?
880 try:
880 try:
881 raise KeyError("heart_id %r in use" % heart)
881 raise KeyError("heart_id %r in use" % heart)
882 except:
882 except:
883 self.log.error("heart_id %r in use", heart, exc_info=True)
883 self.log.error("heart_id %r in use", heart, exc_info=True)
884 content = error.wrap_exception()
884 content = error.wrap_exception()
885 else:
885 else:
886 for h, pack in self.incoming_registrations.iteritems():
886 for h, pack in self.incoming_registrations.iteritems():
887 if heart == h:
887 if heart == h:
888 try:
888 try:
889 raise KeyError("heart_id %r in use" % heart)
889 raise KeyError("heart_id %r in use" % heart)
890 except:
890 except:
891 self.log.error("heart_id %r in use", heart, exc_info=True)
891 self.log.error("heart_id %r in use", heart, exc_info=True)
892 content = error.wrap_exception()
892 content = error.wrap_exception()
893 break
893 break
894 elif queue == pack[1]:
894 elif queue == pack[1]:
895 try:
895 try:
896 raise KeyError("queue_id %r in use" % queue)
896 raise KeyError("queue_id %r in use" % queue)
897 except:
897 except:
898 self.log.error("queue_id %r in use", queue, exc_info=True)
898 self.log.error("queue_id %r in use", queue, exc_info=True)
899 content = error.wrap_exception()
899 content = error.wrap_exception()
900 break
900 break
901
901
902 msg = self.session.send(self.query, "registration_reply",
902 msg = self.session.send(self.query, "registration_reply",
903 content=content,
903 content=content,
904 ident=reg)
904 ident=reg)
905
905
906 if content['status'] == 'ok':
906 if content['status'] == 'ok':
907 if heart in self.heartmonitor.hearts:
907 if heart in self.heartmonitor.hearts:
908 # already beating
908 # already beating
909 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
909 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
910 self.finish_registration(heart)
910 self.finish_registration(heart)
911 else:
911 else:
912 purge = lambda : self._purge_stalled_registration(heart)
912 purge = lambda : self._purge_stalled_registration(heart)
913 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
913 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
914 dc.start()
914 dc.start()
915 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
915 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
916 else:
916 else:
917 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
917 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
918 return eid
918 return eid
919
919
920 def unregister_engine(self, ident, msg):
920 def unregister_engine(self, ident, msg):
921 """Unregister an engine that explicitly requested to leave."""
921 """Unregister an engine that explicitly requested to leave."""
922 try:
922 try:
923 eid = msg['content']['id']
923 eid = msg['content']['id']
924 except:
924 except:
925 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
925 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
926 return
926 return
927 self.log.info("registration::unregister_engine(%r)", eid)
927 self.log.info("registration::unregister_engine(%r)", eid)
928 # print (eid)
928 # print (eid)
929 uuid = self.keytable[eid]
929 uuid = self.keytable[eid]
930 content=dict(id=eid, queue=uuid.decode('ascii'))
930 content=dict(id=eid, queue=uuid.decode('ascii'))
931 self.dead_engines.add(uuid)
931 self.dead_engines.add(uuid)
932 # self.ids.remove(eid)
932 # self.ids.remove(eid)
933 # uuid = self.keytable.pop(eid)
933 # uuid = self.keytable.pop(eid)
934 #
934 #
935 # ec = self.engines.pop(eid)
935 # ec = self.engines.pop(eid)
936 # self.hearts.pop(ec.heartbeat)
936 # self.hearts.pop(ec.heartbeat)
937 # self.by_ident.pop(ec.queue)
937 # self.by_ident.pop(ec.queue)
938 # self.completed.pop(eid)
938 # self.completed.pop(eid)
939 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
939 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
940 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
940 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
941 dc.start()
941 dc.start()
942 ############## TODO: HANDLE IT ################
942 ############## TODO: HANDLE IT ################
943
943
944 if self.notifier:
944 if self.notifier:
945 self.session.send(self.notifier, "unregistration_notification", content=content)
945 self.session.send(self.notifier, "unregistration_notification", content=content)
946
946
947 def _handle_stranded_msgs(self, eid, uuid):
947 def _handle_stranded_msgs(self, eid, uuid):
948 """Handle messages known to be on an engine when the engine unregisters.
948 """Handle messages known to be on an engine when the engine unregisters.
949
949
950 It is possible that this will fire prematurely - that is, an engine will
950 It is possible that this will fire prematurely - that is, an engine will
951 go down after completing a result, and the client will be notified
951 go down after completing a result, and the client will be notified
952 that the result failed and later receive the actual result.
952 that the result failed and later receive the actual result.
953 """
953 """
954
954
955 outstanding = self.queues[eid]
955 outstanding = self.queues[eid]
956
956
957 for msg_id in outstanding:
957 for msg_id in outstanding:
958 self.pending.remove(msg_id)
958 self.pending.remove(msg_id)
959 self.all_completed.add(msg_id)
959 self.all_completed.add(msg_id)
960 try:
960 try:
961 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
961 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
962 except:
962 except:
963 content = error.wrap_exception()
963 content = error.wrap_exception()
964 # build a fake header:
964 # build a fake header:
965 header = {}
965 header = {}
966 header['engine'] = uuid
966 header['engine'] = uuid
967 header['date'] = datetime.now()
967 header['date'] = datetime.now()
968 rec = dict(result_content=content, result_header=header, result_buffers=[])
968 rec = dict(result_content=content, result_header=header, result_buffers=[])
969 rec['completed'] = header['date']
969 rec['completed'] = header['date']
970 rec['engine_uuid'] = uuid
970 rec['engine_uuid'] = uuid
971 try:
971 try:
972 self.db.update_record(msg_id, rec)
972 self.db.update_record(msg_id, rec)
973 except Exception:
973 except Exception:
974 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
974 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
975
975
976
976
977 def finish_registration(self, heart):
977 def finish_registration(self, heart):
978 """Second half of engine registration, called after our HeartMonitor
978 """Second half of engine registration, called after our HeartMonitor
979 has received a beat from the Engine's Heart."""
979 has received a beat from the Engine's Heart."""
980 try:
980 try:
981 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
981 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
982 except KeyError:
982 except KeyError:
983 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
983 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
984 return
984 return
985 self.log.info("registration::finished registering engine %i:%r", eid, queue)
985 self.log.info("registration::finished registering engine %i:%r", eid, queue)
986 if purge is not None:
986 if purge is not None:
987 purge.stop()
987 purge.stop()
988 control = queue
988 control = queue
989 self.ids.add(eid)
989 self.ids.add(eid)
990 self.keytable[eid] = queue
990 self.keytable[eid] = queue
991 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
991 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
992 control=control, heartbeat=heart)
992 control=control, heartbeat=heart)
993 self.by_ident[queue] = eid
993 self.by_ident[queue] = eid
994 self.queues[eid] = list()
994 self.queues[eid] = list()
995 self.tasks[eid] = list()
995 self.tasks[eid] = list()
996 self.completed[eid] = list()
996 self.completed[eid] = list()
997 self.hearts[heart] = eid
997 self.hearts[heart] = eid
998 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
998 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
999 if self.notifier:
999 if self.notifier:
1000 self.session.send(self.notifier, "registration_notification", content=content)
1000 self.session.send(self.notifier, "registration_notification", content=content)
1001 self.log.info("engine::Engine Connected: %i", eid)
1001 self.log.info("engine::Engine Connected: %i", eid)
1002
1002
1003 def _purge_stalled_registration(self, heart):
1003 def _purge_stalled_registration(self, heart):
1004 if heart in self.incoming_registrations:
1004 if heart in self.incoming_registrations:
1005 eid = self.incoming_registrations.pop(heart)[0]
1005 eid = self.incoming_registrations.pop(heart)[0]
1006 self.log.info("registration::purging stalled registration: %i", eid)
1006 self.log.info("registration::purging stalled registration: %i", eid)
1007 else:
1007 else:
1008 pass
1008 pass
1009
1009
1010 #-------------------------------------------------------------------------
1010 #-------------------------------------------------------------------------
1011 # Client Requests
1011 # Client Requests
1012 #-------------------------------------------------------------------------
1012 #-------------------------------------------------------------------------
1013
1013
1014 def shutdown_request(self, client_id, msg):
1014 def shutdown_request(self, client_id, msg):
1015 """handle shutdown request."""
1015 """handle shutdown request."""
1016 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1016 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1017 # also notify other clients of shutdown
1017 # also notify other clients of shutdown
1018 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1018 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1019 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1019 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1020 dc.start()
1020 dc.start()
1021
1021
1022 def _shutdown(self):
1022 def _shutdown(self):
1023 self.log.info("hub::hub shutting down.")
1023 self.log.info("hub::hub shutting down.")
1024 time.sleep(0.1)
1024 time.sleep(0.1)
1025 sys.exit(0)
1025 sys.exit(0)
1026
1026
1027
1027
1028 def check_load(self, client_id, msg):
1028 def check_load(self, client_id, msg):
1029 content = msg['content']
1029 content = msg['content']
1030 try:
1030 try:
1031 targets = content['targets']
1031 targets = content['targets']
1032 targets = self._validate_targets(targets)
1032 targets = self._validate_targets(targets)
1033 except:
1033 except:
1034 content = error.wrap_exception()
1034 content = error.wrap_exception()
1035 self.session.send(self.query, "hub_error",
1035 self.session.send(self.query, "hub_error",
1036 content=content, ident=client_id)
1036 content=content, ident=client_id)
1037 return
1037 return
1038
1038
1039 content = dict(status='ok')
1039 content = dict(status='ok')
1040 # loads = {}
1040 # loads = {}
1041 for t in targets:
1041 for t in targets:
1042 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1042 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1043 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1043 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1044
1044
1045
1045
1046 def queue_status(self, client_id, msg):
1046 def queue_status(self, client_id, msg):
1047 """Return the Queue status of one or more targets.
1047 """Return the Queue status of one or more targets.
1048 if verbose: return the msg_ids
1048 if verbose: return the msg_ids
1049 else: return len of each type.
1049 else: return len of each type.
1050 keys: queue (pending MUX jobs)
1050 keys: queue (pending MUX jobs)
1051 tasks (pending Task jobs)
1051 tasks (pending Task jobs)
1052 completed (finished jobs from both queues)"""
1052 completed (finished jobs from both queues)"""
1053 content = msg['content']
1053 content = msg['content']
1054 targets = content['targets']
1054 targets = content['targets']
1055 try:
1055 try:
1056 targets = self._validate_targets(targets)
1056 targets = self._validate_targets(targets)
1057 except:
1057 except:
1058 content = error.wrap_exception()
1058 content = error.wrap_exception()
1059 self.session.send(self.query, "hub_error",
1059 self.session.send(self.query, "hub_error",
1060 content=content, ident=client_id)
1060 content=content, ident=client_id)
1061 return
1061 return
1062 verbose = content.get('verbose', False)
1062 verbose = content.get('verbose', False)
1063 content = dict(status='ok')
1063 content = dict(status='ok')
1064 for t in targets:
1064 for t in targets:
1065 queue = self.queues[t]
1065 queue = self.queues[t]
1066 completed = self.completed[t]
1066 completed = self.completed[t]
1067 tasks = self.tasks[t]
1067 tasks = self.tasks[t]
1068 if not verbose:
1068 if not verbose:
1069 queue = len(queue)
1069 queue = len(queue)
1070 completed = len(completed)
1070 completed = len(completed)
1071 tasks = len(tasks)
1071 tasks = len(tasks)
1072 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1072 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1073 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1073 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1074 # print (content)
1074 # print (content)
1075 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1075 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1076
1076
1077 def purge_results(self, client_id, msg):
1077 def purge_results(self, client_id, msg):
1078 """Purge results from memory. This method is more valuable before we move
1078 """Purge results from memory. This method is more valuable before we move
1079 to a DB based message storage mechanism."""
1079 to a DB based message storage mechanism."""
1080 content = msg['content']
1080 content = msg['content']
1081 self.log.info("Dropping records with %s", content)
1081 self.log.info("Dropping records with %s", content)
1082 msg_ids = content.get('msg_ids', [])
1082 msg_ids = content.get('msg_ids', [])
1083 reply = dict(status='ok')
1083 reply = dict(status='ok')
1084 if msg_ids == 'all':
1084 if msg_ids == 'all':
1085 try:
1085 try:
1086 self.db.drop_matching_records(dict(completed={'$ne':None}))
1086 self.db.drop_matching_records(dict(completed={'$ne':None}))
1087 except Exception:
1087 except Exception:
1088 reply = error.wrap_exception()
1088 reply = error.wrap_exception()
1089 else:
1089 else:
1090 pending = filter(lambda m: m in self.pending, msg_ids)
1090 pending = filter(lambda m: m in self.pending, msg_ids)
1091 if pending:
1091 if pending:
1092 try:
1092 try:
1093 raise IndexError("msg pending: %r" % pending[0])
1093 raise IndexError("msg pending: %r" % pending[0])
1094 except:
1094 except:
1095 reply = error.wrap_exception()
1095 reply = error.wrap_exception()
1096 else:
1096 else:
1097 try:
1097 try:
1098 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1098 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1099 except Exception:
1099 except Exception:
1100 reply = error.wrap_exception()
1100 reply = error.wrap_exception()
1101
1101
1102 if reply['status'] == 'ok':
1102 if reply['status'] == 'ok':
1103 eids = content.get('engine_ids', [])
1103 eids = content.get('engine_ids', [])
1104 for eid in eids:
1104 for eid in eids:
1105 if eid not in self.engines:
1105 if eid not in self.engines:
1106 try:
1106 try:
1107 raise IndexError("No such engine: %i" % eid)
1107 raise IndexError("No such engine: %i" % eid)
1108 except:
1108 except:
1109 reply = error.wrap_exception()
1109 reply = error.wrap_exception()
1110 break
1110 break
1111 uid = self.engines[eid].queue
1111 uid = self.engines[eid].queue
1112 try:
1112 try:
1113 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1113 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1114 except Exception:
1114 except Exception:
1115 reply = error.wrap_exception()
1115 reply = error.wrap_exception()
1116 break
1116 break
1117
1117
1118 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1118 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1119
1119
1120 def resubmit_task(self, client_id, msg):
1120 def resubmit_task(self, client_id, msg):
1121 """Resubmit one or more tasks."""
1121 """Resubmit one or more tasks."""
1122 def finish(reply):
1122 def finish(reply):
1123 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1123 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1124
1124
1125 content = msg['content']
1125 content = msg['content']
1126 msg_ids = content['msg_ids']
1126 msg_ids = content['msg_ids']
1127 reply = dict(status='ok')
1127 reply = dict(status='ok')
1128 try:
1128 try:
1129 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1129 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1130 'header', 'content', 'buffers'])
1130 'header', 'content', 'buffers'])
1131 except Exception:
1131 except Exception:
1132 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1132 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1133 return finish(error.wrap_exception())
1133 return finish(error.wrap_exception())
1134
1134
1135 # validate msg_ids
1135 # validate msg_ids
1136 found_ids = [ rec['msg_id'] for rec in records ]
1136 found_ids = [ rec['msg_id'] for rec in records ]
1137 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1137 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1138 if len(records) > len(msg_ids):
1138 if len(records) > len(msg_ids):
1139 try:
1139 try:
1140 raise RuntimeError("DB appears to be in an inconsistent state."
1140 raise RuntimeError("DB appears to be in an inconsistent state."
1141 "More matching records were found than should exist")
1141 "More matching records were found than should exist")
1142 except Exception:
1142 except Exception:
1143 return finish(error.wrap_exception())
1143 return finish(error.wrap_exception())
1144 elif len(records) < len(msg_ids):
1144 elif len(records) < len(msg_ids):
1145 missing = [ m for m in msg_ids if m not in found_ids ]
1145 missing = [ m for m in msg_ids if m not in found_ids ]
1146 try:
1146 try:
1147 raise KeyError("No such msg(s): %r" % missing)
1147 raise KeyError("No such msg(s): %r" % missing)
1148 except KeyError:
1148 except KeyError:
1149 return finish(error.wrap_exception())
1149 return finish(error.wrap_exception())
1150 elif pending_ids:
1150 elif pending_ids:
1151 pass
1151 pass
1152 # no need to raise on resubmit of pending task, now that we
1152 # no need to raise on resubmit of pending task, now that we
1153 # resubmit under new ID, but do we want to raise anyway?
1153 # resubmit under new ID, but do we want to raise anyway?
1154 # msg_id = invalid_ids[0]
1154 # msg_id = invalid_ids[0]
1155 # try:
1155 # try:
1156 # raise ValueError("Task(s) %r appears to be inflight" % )
1156 # raise ValueError("Task(s) %r appears to be inflight" % )
1157 # except Exception:
1157 # except Exception:
1158 # return finish(error.wrap_exception())
1158 # return finish(error.wrap_exception())
1159
1159
1160 # mapping of original IDs to resubmitted IDs
1160 # mapping of original IDs to resubmitted IDs
1161 resubmitted = {}
1161 resubmitted = {}
1162
1162
1163 # send the messages
1163 # send the messages
1164 for rec in records:
1164 for rec in records:
1165 header = rec['header']
1165 header = rec['header']
1166 msg = self.session.msg(header['msg_type'])
1166 msg = self.session.msg(header['msg_type'], parent=header)
1167 msg_id = msg['msg_id']
1167 msg_id = msg['msg_id']
1168 msg['content'] = rec['content']
1168 msg['content'] = rec['content']
1169
1169
1170 # use the old header, but update msg_id and timestamp
1170 # use the old header, but update msg_id and timestamp
1171 fresh = msg['header']
1171 fresh = msg['header']
1172 header['msg_id'] = fresh['msg_id']
1172 header['msg_id'] = fresh['msg_id']
1173 header['date'] = fresh['date']
1173 header['date'] = fresh['date']
1174 msg['header'] = header
1174 msg['header'] = header
1175
1175
1176 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1176 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1177
1177
1178 resubmitted[rec['msg_id']] = msg_id
1178 resubmitted[rec['msg_id']] = msg_id
1179 self.pending.add(msg_id)
1179 self.pending.add(msg_id)
1180 msg['buffers'] = []
1180 msg['buffers'] = rec['buffers']
1181 try:
1181 try:
1182 self.db.add_record(msg_id, init_record(msg))
1182 self.db.add_record(msg_id, init_record(msg))
1183 except Exception:
1183 except Exception:
1184 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1184 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1185
1185
1186 finish(dict(status='ok', resubmitted=resubmitted))
1186 finish(dict(status='ok', resubmitted=resubmitted))
1187
1187
1188 # store the new IDs in the Task DB
1188 # store the new IDs in the Task DB
1189 for msg_id, resubmit_id in resubmitted.iteritems():
1189 for msg_id, resubmit_id in resubmitted.iteritems():
1190 try:
1190 try:
1191 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1191 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1192 except Exception:
1192 except Exception:
1193 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1193 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1194
1194
1195
1195
1196 def _extract_record(self, rec):
1196 def _extract_record(self, rec):
1197 """decompose a TaskRecord dict into subsection of reply for get_result"""
1197 """decompose a TaskRecord dict into subsection of reply for get_result"""
1198 io_dict = {}
1198 io_dict = {}
1199 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1199 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1200 io_dict[key] = rec[key]
1200 io_dict[key] = rec[key]
1201 content = { 'result_content': rec['result_content'],
1201 content = { 'result_content': rec['result_content'],
1202 'header': rec['header'],
1202 'header': rec['header'],
1203 'result_header' : rec['result_header'],
1203 'result_header' : rec['result_header'],
1204 'received' : rec['received'],
1204 'received' : rec['received'],
1205 'io' : io_dict,
1205 'io' : io_dict,
1206 }
1206 }
1207 if rec['result_buffers']:
1207 if rec['result_buffers']:
1208 buffers = map(bytes, rec['result_buffers'])
1208 buffers = map(bytes, rec['result_buffers'])
1209 else:
1209 else:
1210 buffers = []
1210 buffers = []
1211
1211
1212 return content, buffers
1212 return content, buffers
1213
1213
1214 def get_results(self, client_id, msg):
1214 def get_results(self, client_id, msg):
1215 """Get the result of 1 or more messages."""
1215 """Get the result of 1 or more messages."""
1216 content = msg['content']
1216 content = msg['content']
1217 msg_ids = sorted(set(content['msg_ids']))
1217 msg_ids = sorted(set(content['msg_ids']))
1218 statusonly = content.get('status_only', False)
1218 statusonly = content.get('status_only', False)
1219 pending = []
1219 pending = []
1220 completed = []
1220 completed = []
1221 content = dict(status='ok')
1221 content = dict(status='ok')
1222 content['pending'] = pending
1222 content['pending'] = pending
1223 content['completed'] = completed
1223 content['completed'] = completed
1224 buffers = []
1224 buffers = []
1225 if not statusonly:
1225 if not statusonly:
1226 try:
1226 try:
1227 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1227 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1228 # turn match list into dict, for faster lookup
1228 # turn match list into dict, for faster lookup
1229 records = {}
1229 records = {}
1230 for rec in matches:
1230 for rec in matches:
1231 records[rec['msg_id']] = rec
1231 records[rec['msg_id']] = rec
1232 except Exception:
1232 except Exception:
1233 content = error.wrap_exception()
1233 content = error.wrap_exception()
1234 self.session.send(self.query, "result_reply", content=content,
1234 self.session.send(self.query, "result_reply", content=content,
1235 parent=msg, ident=client_id)
1235 parent=msg, ident=client_id)
1236 return
1236 return
1237 else:
1237 else:
1238 records = {}
1238 records = {}
1239 for msg_id in msg_ids:
1239 for msg_id in msg_ids:
1240 if msg_id in self.pending:
1240 if msg_id in self.pending:
1241 pending.append(msg_id)
1241 pending.append(msg_id)
1242 elif msg_id in self.all_completed:
1242 elif msg_id in self.all_completed:
1243 completed.append(msg_id)
1243 completed.append(msg_id)
1244 if not statusonly:
1244 if not statusonly:
1245 c,bufs = self._extract_record(records[msg_id])
1245 c,bufs = self._extract_record(records[msg_id])
1246 content[msg_id] = c
1246 content[msg_id] = c
1247 buffers.extend(bufs)
1247 buffers.extend(bufs)
1248 elif msg_id in records:
1248 elif msg_id in records:
1249 if rec['completed']:
1249 if rec['completed']:
1250 completed.append(msg_id)
1250 completed.append(msg_id)
1251 c,bufs = self._extract_record(records[msg_id])
1251 c,bufs = self._extract_record(records[msg_id])
1252 content[msg_id] = c
1252 content[msg_id] = c
1253 buffers.extend(bufs)
1253 buffers.extend(bufs)
1254 else:
1254 else:
1255 pending.append(msg_id)
1255 pending.append(msg_id)
1256 else:
1256 else:
1257 try:
1257 try:
1258 raise KeyError('No such message: '+msg_id)
1258 raise KeyError('No such message: '+msg_id)
1259 except:
1259 except:
1260 content = error.wrap_exception()
1260 content = error.wrap_exception()
1261 break
1261 break
1262 self.session.send(self.query, "result_reply", content=content,
1262 self.session.send(self.query, "result_reply", content=content,
1263 parent=msg, ident=client_id,
1263 parent=msg, ident=client_id,
1264 buffers=buffers)
1264 buffers=buffers)
1265
1265
1266 def get_history(self, client_id, msg):
1266 def get_history(self, client_id, msg):
1267 """Get a list of all msg_ids in our DB records"""
1267 """Get a list of all msg_ids in our DB records"""
1268 try:
1268 try:
1269 msg_ids = self.db.get_history()
1269 msg_ids = self.db.get_history()
1270 except Exception as e:
1270 except Exception as e:
1271 content = error.wrap_exception()
1271 content = error.wrap_exception()
1272 else:
1272 else:
1273 content = dict(status='ok', history=msg_ids)
1273 content = dict(status='ok', history=msg_ids)
1274
1274
1275 self.session.send(self.query, "history_reply", content=content,
1275 self.session.send(self.query, "history_reply", content=content,
1276 parent=msg, ident=client_id)
1276 parent=msg, ident=client_id)
1277
1277
1278 def db_query(self, client_id, msg):
1278 def db_query(self, client_id, msg):
1279 """Perform a raw query on the task record database."""
1279 """Perform a raw query on the task record database."""
1280 content = msg['content']
1280 content = msg['content']
1281 query = content.get('query', {})
1281 query = content.get('query', {})
1282 keys = content.get('keys', None)
1282 keys = content.get('keys', None)
1283 buffers = []
1283 buffers = []
1284 empty = list()
1284 empty = list()
1285 try:
1285 try:
1286 records = self.db.find_records(query, keys)
1286 records = self.db.find_records(query, keys)
1287 except Exception as e:
1287 except Exception as e:
1288 content = error.wrap_exception()
1288 content = error.wrap_exception()
1289 else:
1289 else:
1290 # extract buffers from reply content:
1290 # extract buffers from reply content:
1291 if keys is not None:
1291 if keys is not None:
1292 buffer_lens = [] if 'buffers' in keys else None
1292 buffer_lens = [] if 'buffers' in keys else None
1293 result_buffer_lens = [] if 'result_buffers' in keys else None
1293 result_buffer_lens = [] if 'result_buffers' in keys else None
1294 else:
1294 else:
1295 buffer_lens = None
1295 buffer_lens = None
1296 result_buffer_lens = None
1296 result_buffer_lens = None
1297
1297
1298 for rec in records:
1298 for rec in records:
1299 # buffers may be None, so double check
1299 # buffers may be None, so double check
1300 b = rec.pop('buffers', empty) or empty
1300 b = rec.pop('buffers', empty) or empty
1301 if buffer_lens is not None:
1301 if buffer_lens is not None:
1302 buffer_lens.append(len(b))
1302 buffer_lens.append(len(b))
1303 buffers.extend(b)
1303 buffers.extend(b)
1304 rb = rec.pop('result_buffers', empty) or empty
1304 rb = rec.pop('result_buffers', empty) or empty
1305 if result_buffer_lens is not None:
1305 if result_buffer_lens is not None:
1306 result_buffer_lens.append(len(rb))
1306 result_buffer_lens.append(len(rb))
1307 buffers.extend(rb)
1307 buffers.extend(rb)
1308 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1308 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1309 result_buffer_lens=result_buffer_lens)
1309 result_buffer_lens=result_buffer_lens)
1310 # self.log.debug (content)
1310 # self.log.debug (content)
1311 self.session.send(self.query, "db_reply", content=content,
1311 self.session.send(self.query, "db_reply", content=content,
1312 parent=msg, ident=client_id,
1312 parent=msg, ident=client_id,
1313 buffers=buffers)
1313 buffers=buffers)
1314
1314
General Comments 0
You need to be logged in to leave comments. Login now