##// END OF EJS Templates
increase registration timeout...
MinRK -
Show More
@@ -1,1415 +1,1415 b''
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import json
21 import json
22 import os
22 import os
23 import sys
23 import sys
24 import time
24 import time
25 from datetime import datetime
25 from datetime import datetime
26
26
27 import zmq
27 import zmq
28 from zmq.eventloop import ioloop
28 from zmq.eventloop import ioloop
29 from zmq.eventloop.zmqstream import ZMQStream
29 from zmq.eventloop.zmqstream import ZMQStream
30
30
31 # internal:
31 # internal:
32 from IPython.utils.importstring import import_item
32 from IPython.utils.importstring import import_item
33 from IPython.utils.localinterfaces import LOCALHOST
33 from IPython.utils.localinterfaces import LOCALHOST
34 from IPython.utils.py3compat import cast_bytes
34 from IPython.utils.py3compat import cast_bytes
35 from IPython.utils.traitlets import (
35 from IPython.utils.traitlets import (
36 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
36 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
37 )
37 )
38
38
39 from IPython.parallel import error, util
39 from IPython.parallel import error, util
40 from IPython.parallel.factory import RegistrationFactory
40 from IPython.parallel.factory import RegistrationFactory
41
41
42 from IPython.kernel.zmq.session import SessionFactory
42 from IPython.kernel.zmq.session import SessionFactory
43
43
44 from .heartmonitor import HeartMonitor
44 from .heartmonitor import HeartMonitor
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Code
47 # Code
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 def _passer(*args, **kwargs):
50 def _passer(*args, **kwargs):
51 return
51 return
52
52
53 def _printer(*args, **kwargs):
53 def _printer(*args, **kwargs):
54 print (args)
54 print (args)
55 print (kwargs)
55 print (kwargs)
56
56
def empty_record():
    """Return an empty dict with all record keys."""
    # every field of a TaskRecord starts out unset ...
    record = dict.fromkeys((
        'msg_id',
        'header',
        'metadata',
        'content',
        'buffers',
        'submitted',
        'client_uuid',
        'engine_uuid',
        'started',
        'completed',
        'resubmitted',
        'received',
        'result_header',
        'result_metadata',
        'result_content',
        'result_buffers',
        'queue',
        'pyin',
        'pyout',
        'pyerr',
    ))
    # ... except the stream captures, which accumulate text and so
    # default to empty strings rather than None
    record['stdout'] = ''
    record['stderr'] = ''
    return record
83
83
def init_record(msg):
    """Initialize a TaskRecord based on a request."""
    header = msg['header']
    # fields taken directly from the request message
    record = {
        'msg_id': header['msg_id'],
        'header': header,
        'content': msg['content'],
        'metadata': msg['metadata'],
        'buffers': msg['buffers'],
        'submitted': header['date'],
    }
    # placeholders for everything filled in later, as the task
    # moves through the hub (routing, timing, results, output)
    record.update(
        client_uuid=None,
        engine_uuid=None,
        started=None,
        completed=None,
        resubmitted=None,
        received=None,
        result_header=None,
        result_metadata=None,
        result_content=None,
        result_buffers=None,
        queue=None,
        pyin=None,
        pyout=None,
        pyerr=None,
        stdout='',
        stderr='',
    )
    return record
111
111
112
112
class EngineConnector(HasTraits):
    """Per-engine connection state held by the Hub.

    Attributes are:
    id (int): engine ID
    uuid (unicode): engine UUID
    pending: set of msg_ids
    stallback: DelayedCallback for stalled registration
    """

    id = Integer(0)
    uuid = Unicode()
    pending = Set()
    stallback = Instance(ioloop.DelayedCallback)
126
126
127
127
128 _db_shortcuts = {
128 _db_shortcuts = {
129 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
129 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
130 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
130 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
131 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
131 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
132 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
132 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
133 }
133 }
134
134
class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub."""

    # port-pairs for monitoredqueues:
    hb = Tuple(Integer, Integer, config=True,
        help="""PUB/ROUTER Port pair for Engine heartbeats""")

    def _hb_default(self):
        return tuple(util.select_random_ports(2))

    mux = Tuple(Integer, Integer, config=True,
        help="""Client/Engine Port pair for MUX queue""")

    def _mux_default(self):
        return tuple(util.select_random_ports(2))

    task = Tuple(Integer, Integer, config=True,
        help="""Client/Engine Port pair for Task queue""")

    def _task_default(self):
        return tuple(util.select_random_ports(2))

    control = Tuple(Integer, Integer, config=True,
        help="""Client/Engine Port pair for Control queue""")

    def _control_default(self):
        return tuple(util.select_random_ports(2))

    iopub = Tuple(Integer, Integer, config=True,
        help="""Client/Engine Port pair for IOPub relay""")

    def _iopub_default(self):
        return tuple(util.select_random_ports(2))

    # single ports:
    mon_port = Integer(config=True,
        help="""Monitor (SUB) port for queue traffic""")

    def _mon_port_default(self):
        return util.select_random_ports(1)[0]

    notifier_port = Integer(config=True,
        help="""PUB port for sending engine status notifications""")

    def _notifier_port_default(self):
        return util.select_random_ports(1)[0]

    engine_ip = Unicode(LOCALHOST, config=True,
        help="IP on which to listen for engine connections. [default: loopback]")
    engine_transport = Unicode('tcp', config=True,
        help="0MQ transport for engine connections. [default: tcp]")

    client_ip = Unicode(LOCALHOST, config=True,
        help="IP on which to listen for client connections. [default: loopback]")
    client_transport = Unicode('tcp', config=True,
        help="0MQ transport for client connections. [default : tcp]")

    monitor_ip = Unicode(LOCALHOST, config=True,
        help="IP on which to listen for monitor messages. [default: loopback]")
    monitor_transport = Unicode('tcp', config=True,
        help="0MQ transport for monitor messages. [default : tcp]")

    monitor_url = Unicode('')

    db_class = DottedObjectName('NoDB',
        config=True, help="""The class to use for the DB backend

        Options include:

        SQLiteDB: SQLite
        MongoDB : use MongoDB
        DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
        NoDB : disable database altogether (default)

        """)

    # not configurable
    db = Instance('IPython.parallel.controller.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')

    def _ip_changed(self, name, old, new):
        # a bare `ip` setting fans out to all three role-specific IPs
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        # a bare `transport` setting fans out to all three transports
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()

    def construct(self):
        self.init_hub()

    def start(self):
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def client_url(self, channel):
        """return full zmq url for a named client channel"""
        return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])

    def engine_url(self, channel):
        """return full zmq url for a named engine channel"""
        return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])

    def init_hub(self):
        """construct Hub object"""

        zctx = self.context
        loop = self.loop

        # ask the config for the task scheduler scheme; fall back to the
        # TaskScheduler trait default when it was never configured
        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            from .scheduler import TaskScheduler
            scheme = TaskScheduler.scheme_name.get_default_value()

        # build connection dicts
        engine = self.engine_info = {
            'interface': "%s://%s" % (self.engine_transport, self.engine_ip),
            'registration': self.regport,
            'control': self.control[1],
            'mux': self.mux[1],
            'hb_ping': self.hb[0],
            'hb_pong': self.hb[1],
            'task': self.task[1],
            'iopub': self.iopub[1],
        }

        client = self.client_info = {
            'interface': "%s://%s" % (self.client_transport, self.client_ip),
            'registration': self.regport,
            'control': self.control[0],
            'mux': self.mux[0],
            'task': self.task[0],
            'task_scheme': scheme,
            'iopub': self.iopub[0],
            'notification': self.notifier_port,
        }

        self.log.debug("Hub engine addrs: %s", self.engine_info)
        self.log.debug("Hub client addrs: %s", self.client_info)

        # Registrar socket: clients and engines both register here
        registrar = ZMQStream(zctx.socket(zmq.ROUTER), loop)
        registrar.bind(self.client_url('registration'))
        self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
        if self.client_ip != self.engine_ip:
            # separate engine-facing bind only when the IPs differ
            registrar.bind(self.engine_url('registration'))
            self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))

        ### Engine connections ###

        # heartbeat: PUB pings out, ROUTER collects pongs
        heart_pub = zctx.socket(zmq.PUB)
        heart_pub.bind(self.engine_url('hb_ping'))
        heart_router = zctx.socket(zmq.ROUTER)
        heart_router.bind(self.engine_url('hb_pong'))
        self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
            pingstream=ZMQStream(heart_pub, loop),
            pongstream=ZMQStream(heart_router, loop)
        )

        ### Client connections ###

        # Notifier socket: broadcasts engine (un)registration to clients
        notifier = ZMQStream(zctx.socket(zmq.PUB), loop)
        notifier.bind(self.client_url('notification'))

        ### build and launch the queues ###

        # monitor socket: subscribes to all queue traffic
        monitor = zctx.socket(zmq.SUB)
        monitor.setsockopt(zmq.SUBSCRIBE, b"")
        monitor.bind(self.monitor_url)
        monitor.bind('inproc://monitor')
        monitor = ZMQStream(monitor, loop)

        # connect the db
        db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
        self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
        self.db = import_item(str(db_class))(session=self.session.session,
                                             config=self.config, log=self.log)
        time.sleep(.25)

        # resubmit stream: DEALER into the task scheduler's client side
        resubmit = ZMQStream(zctx.socket(zmq.DEALER), loop)
        task_url = util.disambiguate_url(self.client_url('task'))
        resubmit.connect(task_url)

        self.hub = Hub(loop=loop, session=self.session, monitor=monitor,
                heartmonitor=self.heartmonitor,
                query=registrar, notifier=notifier, resubmit=resubmit, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                log=self.log)
337
337
338
338
class Hub(SessionFactory):
    """The IPython Controller Hub with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: Session object
    <removed> context: zmq context for creating new connections (?)
    queue: ZMQStream for monitoring the command queue (SUB)
    query: ZMQStream for engine registration and client queries requests (ROUTER)
    heartbeat: HeartMonitor object checking the pulse of the engines
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
        NotImplemented
    engine_info: dict of zmq connection information for engines to connect
        to the queues.
    client_info: dict of zmq connection information for engines to connect
        to the queues.
    """

    engine_state_file = Unicode()

    # internal data structures:
    ids = Set()                     # engine IDs
    keytable = Dict()
    by_ident = Dict()
    engines = Dict()
    clients = Dict()
    hearts = Dict()
    pending = Set()
    queues = Dict()                 # pending msg_ids keyed by engine_id
    tasks = Dict()                  # pending msg_ids submitted as tasks, keyed by client_id
    completed = Dict()              # completed msg_ids keyed by engine_id
    all_completed = Set()           # all completed msg_ids (flat set, not keyed)
    dead_engines = Set()            # NOTE(review): original comment was copy-pasted from
                                    # `completed`; presumably engine identities — confirm
    unassigned = Set()              # set of task msg_ds not yet assigned a destination
    incoming_registrations = Dict()
    registration_timeout = Integer()
    _idcounter = Integer(0)

    # objects from constructor:
    query = Instance(ZMQStream)
    monitor = Instance(ZMQStream)
    notifier = Instance(ZMQStream)
    resubmit = Instance(ZMQStream)
    heartmonitor = Instance(HeartMonitor)
    db = Instance(object)
    client_info = Dict()
    engine_info = Dict()
388
388
389
389
390 def __init__(self, **kwargs):
390 def __init__(self, **kwargs):
391 """
391 """
392 # universal:
392 # universal:
393 loop: IOLoop for creating future connections
393 loop: IOLoop for creating future connections
394 session: streamsession for sending serialized data
394 session: streamsession for sending serialized data
395 # engine:
395 # engine:
396 queue: ZMQStream for monitoring queue messages
396 queue: ZMQStream for monitoring queue messages
397 query: ZMQStream for engine+client registration and client requests
397 query: ZMQStream for engine+client registration and client requests
398 heartbeat: HeartMonitor object for tracking engines
398 heartbeat: HeartMonitor object for tracking engines
399 # extra:
399 # extra:
400 db: ZMQStream for db connection (NotImplemented)
400 db: ZMQStream for db connection (NotImplemented)
401 engine_info: zmq address/protocol dict for engine connections
401 engine_info: zmq address/protocol dict for engine connections
402 client_info: zmq address/protocol dict for client connections
402 client_info: zmq address/protocol dict for client connections
403 """
403 """
404
404
405 super(Hub, self).__init__(**kwargs)
405 super(Hub, self).__init__(**kwargs)
406 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
406 self.registration_timeout = max(10000, 5*self.heartmonitor.period)
407
407
408 # register our callbacks
408 # register our callbacks
409 self.query.on_recv(self.dispatch_query)
409 self.query.on_recv(self.dispatch_query)
410 self.monitor.on_recv(self.dispatch_monitor_traffic)
410 self.monitor.on_recv(self.dispatch_monitor_traffic)
411
411
412 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
412 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
413 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
413 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
414
414
415 self.monitor_handlers = {b'in' : self.save_queue_request,
415 self.monitor_handlers = {b'in' : self.save_queue_request,
416 b'out': self.save_queue_result,
416 b'out': self.save_queue_result,
417 b'intask': self.save_task_request,
417 b'intask': self.save_task_request,
418 b'outtask': self.save_task_result,
418 b'outtask': self.save_task_result,
419 b'tracktask': self.save_task_destination,
419 b'tracktask': self.save_task_destination,
420 b'incontrol': _passer,
420 b'incontrol': _passer,
421 b'outcontrol': _passer,
421 b'outcontrol': _passer,
422 b'iopub': self.save_iopub_message,
422 b'iopub': self.save_iopub_message,
423 }
423 }
424
424
425 self.query_handlers = {'queue_request': self.queue_status,
425 self.query_handlers = {'queue_request': self.queue_status,
426 'result_request': self.get_results,
426 'result_request': self.get_results,
427 'history_request': self.get_history,
427 'history_request': self.get_history,
428 'db_request': self.db_query,
428 'db_request': self.db_query,
429 'purge_request': self.purge_results,
429 'purge_request': self.purge_results,
430 'load_request': self.check_load,
430 'load_request': self.check_load,
431 'resubmit_request': self.resubmit_task,
431 'resubmit_request': self.resubmit_task,
432 'shutdown_request': self.shutdown_request,
432 'shutdown_request': self.shutdown_request,
433 'registration_request' : self.register_engine,
433 'registration_request' : self.register_engine,
434 'unregistration_request' : self.unregister_engine,
434 'unregistration_request' : self.unregister_engine,
435 'connection_request': self.connection_request,
435 'connection_request': self.connection_request,
436 }
436 }
437
437
438 # ignore resubmit replies
438 # ignore resubmit replies
439 self.resubmit.on_recv(lambda msg: None, copy=False)
439 self.resubmit.on_recv(lambda msg: None, copy=False)
440
440
441 self.log.info("hub::created hub")
441 self.log.info("hub::created hub")
442
442
443 @property
443 @property
444 def _next_id(self):
444 def _next_id(self):
445 """gemerate a new ID.
445 """gemerate a new ID.
446
446
447 No longer reuse old ids, just count from 0."""
447 No longer reuse old ids, just count from 0."""
448 newid = self._idcounter
448 newid = self._idcounter
449 self._idcounter += 1
449 self._idcounter += 1
450 return newid
450 return newid
451 # newid = 0
451 # newid = 0
452 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
452 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
453 # # print newid, self.ids, self.incoming_registrations
453 # # print newid, self.ids, self.incoming_registrations
454 # while newid in self.ids or newid in incoming:
454 # while newid in self.ids or newid in incoming:
455 # newid += 1
455 # newid += 1
456 # return newid
456 # return newid
457
457
458 #-----------------------------------------------------------------------------
458 #-----------------------------------------------------------------------------
459 # message validation
459 # message validation
460 #-----------------------------------------------------------------------------
460 #-----------------------------------------------------------------------------
461
461
462 def _validate_targets(self, targets):
462 def _validate_targets(self, targets):
463 """turn any valid targets argument into a list of integer ids"""
463 """turn any valid targets argument into a list of integer ids"""
464 if targets is None:
464 if targets is None:
465 # default to all
465 # default to all
466 return self.ids
466 return self.ids
467
467
468 if isinstance(targets, (int,str,unicode)):
468 if isinstance(targets, (int,str,unicode)):
469 # only one target specified
469 # only one target specified
470 targets = [targets]
470 targets = [targets]
471 _targets = []
471 _targets = []
472 for t in targets:
472 for t in targets:
473 # map raw identities to ids
473 # map raw identities to ids
474 if isinstance(t, (str,unicode)):
474 if isinstance(t, (str,unicode)):
475 t = self.by_ident.get(cast_bytes(t), t)
475 t = self.by_ident.get(cast_bytes(t), t)
476 _targets.append(t)
476 _targets.append(t)
477 targets = _targets
477 targets = _targets
478 bad_targets = [ t for t in targets if t not in self.ids ]
478 bad_targets = [ t for t in targets if t not in self.ids ]
479 if bad_targets:
479 if bad_targets:
480 raise IndexError("No Such Engine: %r" % bad_targets)
480 raise IndexError("No Such Engine: %r" % bad_targets)
481 if not targets:
481 if not targets:
482 raise IndexError("No Engines Registered")
482 raise IndexError("No Engines Registered")
483 return targets
483 return targets
484
484
485 #-----------------------------------------------------------------------------
485 #-----------------------------------------------------------------------------
486 # dispatch methods (1 per stream)
486 # dispatch methods (1 per stream)
487 #-----------------------------------------------------------------------------
487 #-----------------------------------------------------------------------------
488
488
489
489
@util.log_errors
def dispatch_monitor_traffic(self, msg):
    """Route one message arriving on the monitor socket.

    All MUX and Task queue messages come through here, as well as
    IOPub traffic.  The first frame names the topic; the remaining
    frames are handed to the handler registered for that topic in
    ``self.monitor_handlers``.
    """
    self.log.debug("monitor traffic: %r", msg[0])
    topic = msg[0]
    try:
        idents, msg = self.session.feed_identities(msg[1:])
    except ValueError:
        idents = []
    if not idents:
        self.log.error("Monitor message without topic: %r", msg)
        return
    handler = self.monitor_handlers.get(topic, None)
    if handler is None:
        self.log.error("Unrecognized monitor topic: %r", topic)
    else:
        handler(idents, msg)
508
508
509
509
@util.log_errors
def dispatch_query(self, msg):
    """Route registration requests and queries from clients."""
    # strip the routing prefix; the first ident is the client's identity
    try:
        idents, msg = self.session.feed_identities(msg)
    except ValueError:
        idents = []
    if not idents:
        self.log.error("Bad Query Message: %r", msg)
        return
    client_id = idents[0]
    try:
        msg = self.session.unserialize(msg, content=True)
    except Exception:
        # undeserializable message: report the failure back to the client
        content = error.wrap_exception()
        self.log.error("Bad Query Message: %r", msg, exc_info=True)
        self.session.send(self.query, "hub_error", ident=client_id,
                          content=content)
        return
    # dispatch on the message type
    msg_type = msg['header']['msg_type']
    self.log.info("client::client %r requested %r", client_id, msg_type)
    handler = self.query_handlers.get(msg_type)
    try:
        assert handler is not None, "Bad Message Type: %r" % msg_type
    except:
        # unknown msg_type: wrap the AssertionError and notify the client
        content = error.wrap_exception()
        self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
        self.session.send(self.query, "hub_error", ident=client_id,
                          content=content)
        return
    else:
        handler(idents, msg)
545
545
def dispatch_db(self, msg):
    """Placeholder for routing database traffic; not implemented."""
    raise NotImplementedError
549
549
550 #---------------------------------------------------------------------------
550 #---------------------------------------------------------------------------
551 # handler methods (1 per event)
551 # handler methods (1 per event)
552 #---------------------------------------------------------------------------
552 #---------------------------------------------------------------------------
553
553
554 #----------------------- Heartbeat --------------------------------------
554 #----------------------- Heartbeat --------------------------------------
555
555
def handle_new_heart(self, heart):
    """Heartmonitor callback: a new heart has started beating.

    If the heart belongs to a pending registration, completes that
    registration; beats from unknown hearts are ignored.
    """
    self.log.debug("heartbeat::handle_new_heart(%r)", heart)
    if heart in self.incoming_registrations:
        self.finish_registration(heart)
    else:
        self.log.info("heartbeat::ignoring new heart: %r", heart)
565
565
566
566
def handle_heart_failure(self, heart):
    """Heartmonitor callback: a previously registered heart has
    stopped responding to beat requests.

    Looks up the engine owning ``heart`` and unregisters it.  Hearts
    that never belonged to an engine, or whose engine is already in
    ``self.dead_engines``, are logged and ignored.
    """
    self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
    eid = self.hearts.get(heart, None)
    if eid is None or self.keytable[eid] in self.dead_engines:
        self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
    else:
        # BUGFIX: the engine record must only be looked up after eid is
        # known to be valid; the original read self.engines[eid].uuid
        # first, raising KeyError for hearts with no registered engine.
        uuid = self.engines[eid].uuid
        self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
578
578
579 #----------------------- MUX Queue Traffic ------------------------------
579 #----------------------- MUX Queue Traffic ------------------------------
580
580
def save_queue_request(self, idents, msg):
    """Record a client request routed through the MUX queue.

    Expects ``idents`` to start with ``[queue_id, client_id]``.  Builds
    a record from the message, reconciles it with any record created
    earlier by IOPub traffic, and marks the message as pending on the
    target engine.
    """
    if len(idents) < 2:
        self.log.error("invalid identity prefix: %r", idents)
        return
    queue_id, client_id = idents[:2]

    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        self.log.error("queue::target %r not registered", queue_id)
        self.log.debug("queue:: valid are: %r", self.by_ident.keys())
        return

    record = init_record(msg)
    msg_id = record['msg_id']
    self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
    # Unicode in records
    record['engine_uuid'] = queue_id.decode('ascii')
    record['client_uuid'] = msg['header']['session']
    record['queue'] = 'mux'

    try:
        # it's possible iopub arrived first:
        existing = self.db.get_record(msg_id)
        for key, evalue in existing.iteritems():
            rvalue = record.get(key, None)
            if evalue and rvalue and evalue != rvalue:
                self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
            elif evalue and not rvalue:
                # keep fields the earlier (iopub) record already filled in
                record[key] = evalue
        try:
            self.db.update_record(msg_id, record)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
    except KeyError:
        # first sighting of this msg_id: create a fresh record
        try:
            self.db.add_record(msg_id, record)
        except Exception:
            self.log.error("DB Error adding record %r", msg_id, exc_info=True)

    self.pending.add(msg_id)
    self.queues[eid].append(msg_id)
627
627
def save_queue_result(self, idents, msg):
    """Record the completion of a MUX-queue request.

    Expects ``idents`` to start with ``[client_id, queue_id]``.  Moves
    the parent msg_id from pending to completed bookkeeping and stores
    the result in the database.
    """
    if len(idents) < 2:
        self.log.error("invalid identity prefix: %r", idents)
        return

    client_id, queue_id = idents[:2]
    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("queue::engine %r sent invalid message to %r: %r",
                       queue_id, client_id, msg, exc_info=True)
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
        return

    parent = msg['parent_header']
    if not parent:
        # a reply without a parent cannot be matched to any request
        return
    msg_id = parent['msg_id']
    if msg_id in self.pending:
        self.pending.remove(msg_id)
        self.all_completed.add(msg_id)
        self.queues[eid].remove(msg_id)
        self.completed[eid].append(msg_id)
        self.log.info("queue::request %r completed on %s", msg_id, eid)
    elif msg_id not in self.all_completed:
        # it could be a result from a dead engine that died before delivering the
        # result
        self.log.warn("queue:: unknown msg finished %r", msg_id)
        return
    # update record anyway, because the unregistration could have been premature
    rheader = msg['header']
    md = msg['metadata']
    result = {
        'result_header': rheader,
        'result_metadata': md,
        'result_content': msg['content'],
        'received': datetime.now(),
        'started': md.get('started', None),
        'completed': rheader['date'],
        'result_buffers': msg['buffers'],
    }

    try:
        self.db.update_record(msg_id, result)
    except Exception:
        self.log.error("DB Error updating record %r", msg_id, exc_info=True)
680
680
681
681
682 #--------------------- Task Queue Traffic ------------------------------
682 #--------------------- Task Queue Traffic ------------------------------
683
683
def save_task_request(self, idents, msg):
    """Save the submission of a task routed through the task scheduler.

    The task starts out both pending and unassigned; assignment to an
    engine is recorded later by save_task_destination.
    """
    client_id = idents[0]

    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("task::client %r sent invalid task message: %r",
                       client_id, msg, exc_info=True)
        return
    record = init_record(msg)

    record['client_uuid'] = msg['header']['session']
    record['queue'] = 'task'
    header = msg['header']
    msg_id = header['msg_id']
    self.pending.add(msg_id)
    self.unassigned.add(msg_id)
    try:
        # it's possible iopub arrived first:
        existing = self.db.get_record(msg_id)
        if existing['resubmitted']:
            for key in ('submitted', 'client_uuid', 'buffers'):
                # don't clobber these keys on resubmit
                # submitted and client_uuid should be different
                # and buffers might be big, and shouldn't have changed
                record.pop(key)
            # still check content,header which should not change
            # but are not expensive to compare as buffers

        for key, evalue in existing.iteritems():
            if key.endswith('buffers'):
                # don't compare buffers
                continue
            rvalue = record.get(key, None)
            if evalue and rvalue and evalue != rvalue:
                self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
            elif evalue and not rvalue:
                # keep fields the earlier record already filled in
                record[key] = evalue
        try:
            self.db.update_record(msg_id, record)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
    except KeyError:
        # first sighting of this msg_id: create a fresh record
        try:
            self.db.add_record(msg_id, record)
        except Exception:
            self.log.error("DB Error adding record %r", msg_id, exc_info=True)
    except Exception:
        self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
734
734
def save_task_result(self, idents, msg):
    """Save the result of a completed task.

    Removes the parent msg_id from the unassigned/pending sets, updates
    per-engine bookkeeping, and stores the result in the database.
    Results for unknown (already-completed or never-seen) tasks are
    logged and dropped.
    """
    client_id = idents[0]
    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("task::invalid task result message send to %r: %r",
                       client_id, msg, exc_info=True)
        return

    parent = msg['parent_header']
    if not parent:
        self.log.warn("Task %r had no parent!", msg)
        return
    msg_id = parent['msg_id']
    if msg_id in self.unassigned:
        self.unassigned.remove(msg_id)

    header = msg['header']
    md = msg['metadata']
    engine_uuid = md.get('engine', u'')
    eid = self.by_ident.get(cast_bytes(engine_uuid), None)

    status = md.get('status', None)

    if msg_id in self.pending:
        self.log.info("task::task %r finished on %s", msg_id, eid)
        self.pending.remove(msg_id)
        self.all_completed.add(msg_id)
        if eid is not None:
            # aborted tasks don't count as completed work for the engine
            if status != 'aborted':
                self.completed[eid].append(msg_id)
            if msg_id in self.tasks[eid]:
                self.tasks[eid].remove(msg_id)
        result = {
            'result_header': header,
            'result_metadata': msg['metadata'],
            'result_content': msg['content'],
            'started': md.get('started', None),
            'completed': header['date'],
            'received': datetime.now(),
            'engine_uuid': engine_uuid,
            'result_buffers': msg['buffers'],
        }

        try:
            self.db.update_record(msg_id, result)
        except Exception:
            self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
    else:
        self.log.debug("task::unknown task %r finished", msg_id)
790
790
def save_task_destination(self, idents, msg):
    """Record which engine the scheduler assigned a task to."""
    try:
        msg = self.session.unserialize(msg, content=True)
    except Exception:
        self.log.error("task::invalid task tracking message", exc_info=True)
        return
    content = msg['content']
    msg_id = content['msg_id']
    engine_uuid = content['engine_id']
    eid = self.by_ident[cast_bytes(engine_uuid)]

    self.log.info("task::task %r arrived on %r", msg_id, eid)
    if msg_id in self.unassigned:
        self.unassigned.remove(msg_id)

    self.tasks[eid].append(msg_id)
    try:
        self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
    except Exception:
        self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
815
815
816
816
def mia_task_request(self, idents, msg):
    """Report missing-in-action tasks to a client (not implemented).

    Sketch of the intended behavior:
        content = dict(mia=self.mia, status='ok')
        self.session.send('mia_reply', content=content, idents=idents[0])
    """
    raise NotImplementedError
822
822
823
823
824 #--------------------- IOPub Traffic ------------------------------
824 #--------------------- IOPub Traffic ------------------------------
825
825
def save_iopub_message(self, topics, msg):
    """Save an IOPub message into the db, keyed by its parent msg_id."""
    try:
        msg = self.session.unserialize(msg, content=True)
    except Exception:
        self.log.error("iopub::invalid IOPub message", exc_info=True)
        return

    parent = msg['parent_header']
    if not parent:
        self.log.warn("iopub::IOPub message lacks parent: %r", msg)
        return
    msg_id = parent['msg_id']
    msg_type = msg['header']['msg_type']
    content = msg['content']

    # ensure msg_id is in db
    try:
        rec = self.db.get_record(msg_id)
    except KeyError:
        # iopub arrived before the request itself: seed an empty record
        rec = empty_record()
        rec['msg_id'] = msg_id
        self.db.add_record(msg_id, rec)

    # accumulate the update for this message type
    d = {}
    if msg_type == 'stream':
        # append stream data to whatever is already recorded
        name = content['name']
        s = rec[name] or ''
        d[name] = s + content['data']
    elif msg_type == 'pyerr':
        d['pyerr'] = content
    elif msg_type == 'pyin':
        d['pyin'] = content['code']
    elif msg_type in ('display_data', 'pyout'):
        d[msg_type] = content
    elif msg_type == 'status':
        pass
    elif msg_type == 'data_pub':
        self.log.info("ignored data_pub message for %s" % msg_id)
    else:
        self.log.warn("unhandled iopub msg_type: %r", msg_type)

    if not d:
        return

    try:
        self.db.update_record(msg_id, d)
    except Exception:
        self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
877
877
878
878
879
879
880 #-------------------------------------------------------------------------
880 #-------------------------------------------------------------------------
881 # Registration requests
881 # Registration requests
882 #-------------------------------------------------------------------------
882 #-------------------------------------------------------------------------
883
883
def connection_request(self, client_id, msg):
    """Reply with connection addresses for clients.

    Sends a 'connection_reply' whose 'engines' dict maps engine id
    (as a string) to engine uuid, omitting dead engines.
    """
    self.log.info("client::client %r connected", client_id)
    jsonable = {}
    for k, v in self.keytable.iteritems():
        if v not in self.dead_engines:
            jsonable[str(k)] = v
    content = dict(status='ok', engines=jsonable)
    self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
894
894
def register_engine(self, reg, msg):
    """Register a new engine.

    Assigns the next engine id, rejects uuids already in use (either by
    a live engine or a registration in flight), replies to the engine,
    and either finishes registration immediately (heart already
    beating) or schedules a purge of the stalled registration.

    Returns the assigned engine id.
    """
    content = msg['content']
    try:
        uuid = content['uuid']
    except KeyError:
        self.log.error("registration::queue not specified", exc_info=True)
        return

    eid = self._next_id

    self.log.debug("registration::register_engine(%i, %r)", eid, uuid)

    # reply content; replaced by a wrapped error if the uuid is taken
    content = dict(id=eid, status='ok', hb_period=self.heartmonitor.period)
    # check if requesting available IDs:
    if cast_bytes(uuid) in self.by_ident:
        try:
            raise KeyError("uuid %r in use" % uuid)
        except:
            content = error.wrap_exception()
            self.log.error("uuid %r in use", uuid, exc_info=True)
    else:
        for h, ec in self.incoming_registrations.iteritems():
            if uuid == h:
                try:
                    raise KeyError("heart_id %r in use" % uuid)
                except:
                    self.log.error("heart_id %r in use", uuid, exc_info=True)
                    content = error.wrap_exception()
                break
            elif uuid == ec.uuid:
                try:
                    raise KeyError("uuid %r in use" % uuid)
                except:
                    self.log.error("uuid %r in use", uuid, exc_info=True)
                    content = error.wrap_exception()
                break

    msg = self.session.send(self.query, "registration_reply",
                            content=content,
                            ident=reg)

    heart = cast_bytes(uuid)

    if content['status'] == 'ok':
        if heart in self.heartmonitor.hearts:
            # already beating
            self.incoming_registrations[heart] = EngineConnector(id=eid, uuid=uuid)
            self.finish_registration(heart)
        else:
            # wait for the heart; purge the registration if it never beats
            purge = lambda: self._purge_stalled_registration(heart)
            dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
            dc.start()
            self.incoming_registrations[heart] = EngineConnector(id=eid, uuid=uuid, stallback=dc)
    else:
        self.log.error("registration::registration %i failed: %r", eid, content['evalue'])

    return eid
953
953
def unregister_engine(self, ident, msg):
    """Unregister an engine that explicitly requested to leave.

    Marks the engine dead, schedules cleanup of its outstanding
    messages after registration_timeout (so late results can still
    arrive), persists engine state, and notifies subscribers.
    """
    try:
        eid = msg['content']['id']
    except:
        self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
        return
    self.log.info("registration::unregister_engine(%r)", eid)

    uuid = self.keytable[eid]
    content = dict(id=eid, uuid=uuid)
    self.dead_engines.add(uuid)

    # defer handling of the engine's stranded messages, so that results
    # already in flight still have a chance to be delivered
    handleit = lambda: self._handle_stranded_msgs(eid, uuid)
    dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
    dc.start()
    ############## TODO: HANDLE IT ################

    self._save_engine_state()

    if self.notifier:
        self.session.send(self.notifier, "unregistration_notification", content=content)
982
982
def _handle_stranded_msgs(self, eid, uuid):
    """Handle messages known to be on an engine when the engine unregisters.

    It is possible that this will fire prematurely - that is, an engine will
    go down after completing a result, and the client will be notified
    that the result failed and later receive the actual result.
    """

    outstanding = self.queues[eid]

    for msg_id in outstanding:
        self.pending.remove(msg_id)
        self.all_completed.add(msg_id)
        # raise/catch so wrap_exception captures a real traceback
        try:
            raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
        except:
            content = error.wrap_exception()
        # build a fake header:
        header = {'engine': uuid, 'date': datetime.now()}
        rec = dict(result_content=content, result_header=header, result_buffers=[])
        rec['completed'] = header['date']
        rec['engine_uuid'] = uuid
        try:
            self.db.update_record(msg_id, rec)
        except Exception:
            self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1011
1011
1012
1012
1013 def finish_registration(self, heart):
1013 def finish_registration(self, heart):
1014 """Second half of engine registration, called after our HeartMonitor
1014 """Second half of engine registration, called after our HeartMonitor
1015 has received a beat from the Engine's Heart."""
1015 has received a beat from the Engine's Heart."""
1016 try:
1016 try:
1017 ec = self.incoming_registrations.pop(heart)
1017 ec = self.incoming_registrations.pop(heart)
1018 except KeyError:
1018 except KeyError:
1019 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1019 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1020 return
1020 return
1021 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1021 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1022 if ec.stallback is not None:
1022 if ec.stallback is not None:
1023 ec.stallback.stop()
1023 ec.stallback.stop()
1024 eid = ec.id
1024 eid = ec.id
1025 self.ids.add(eid)
1025 self.ids.add(eid)
1026 self.keytable[eid] = ec.uuid
1026 self.keytable[eid] = ec.uuid
1027 self.engines[eid] = ec
1027 self.engines[eid] = ec
1028 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1028 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1029 self.queues[eid] = list()
1029 self.queues[eid] = list()
1030 self.tasks[eid] = list()
1030 self.tasks[eid] = list()
1031 self.completed[eid] = list()
1031 self.completed[eid] = list()
1032 self.hearts[heart] = eid
1032 self.hearts[heart] = eid
1033 content = dict(id=eid, uuid=self.engines[eid].uuid)
1033 content = dict(id=eid, uuid=self.engines[eid].uuid)
1034 if self.notifier:
1034 if self.notifier:
1035 self.session.send(self.notifier, "registration_notification", content=content)
1035 self.session.send(self.notifier, "registration_notification", content=content)
1036 self.log.info("engine::Engine Connected: %i", eid)
1036 self.log.info("engine::Engine Connected: %i", eid)
1037
1037
1038 self._save_engine_state()
1038 self._save_engine_state()
1039
1039
1040 def _purge_stalled_registration(self, heart):
1040 def _purge_stalled_registration(self, heart):
1041 if heart in self.incoming_registrations:
1041 if heart in self.incoming_registrations:
1042 ec = self.incoming_registrations.pop(heart)
1042 ec = self.incoming_registrations.pop(heart)
1043 self.log.info("registration::purging stalled registration: %i", ec.id)
1043 self.log.info("registration::purging stalled registration: %i", ec.id)
1044 else:
1044 else:
1045 pass
1045 pass
1046
1046
1047 #-------------------------------------------------------------------------
1047 #-------------------------------------------------------------------------
1048 # Engine State
1048 # Engine State
1049 #-------------------------------------------------------------------------
1049 #-------------------------------------------------------------------------
1050
1050
1051
1051
1052 def _cleanup_engine_state_file(self):
1052 def _cleanup_engine_state_file(self):
1053 """cleanup engine state mapping"""
1053 """cleanup engine state mapping"""
1054
1054
1055 if os.path.exists(self.engine_state_file):
1055 if os.path.exists(self.engine_state_file):
1056 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1056 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1057 try:
1057 try:
1058 os.remove(self.engine_state_file)
1058 os.remove(self.engine_state_file)
1059 except IOError:
1059 except IOError:
1060 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1060 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1061
1061
1062
1062
1063 def _save_engine_state(self):
1063 def _save_engine_state(self):
1064 """save engine mapping to JSON file"""
1064 """save engine mapping to JSON file"""
1065 if not self.engine_state_file:
1065 if not self.engine_state_file:
1066 return
1066 return
1067 self.log.debug("save engine state to %s" % self.engine_state_file)
1067 self.log.debug("save engine state to %s" % self.engine_state_file)
1068 state = {}
1068 state = {}
1069 engines = {}
1069 engines = {}
1070 for eid, ec in self.engines.iteritems():
1070 for eid, ec in self.engines.iteritems():
1071 if ec.uuid not in self.dead_engines:
1071 if ec.uuid not in self.dead_engines:
1072 engines[eid] = ec.uuid
1072 engines[eid] = ec.uuid
1073
1073
1074 state['engines'] = engines
1074 state['engines'] = engines
1075
1075
1076 state['next_id'] = self._idcounter
1076 state['next_id'] = self._idcounter
1077
1077
1078 with open(self.engine_state_file, 'w') as f:
1078 with open(self.engine_state_file, 'w') as f:
1079 json.dump(state, f)
1079 json.dump(state, f)
1080
1080
1081
1081
1082 def _load_engine_state(self):
1082 def _load_engine_state(self):
1083 """load engine mapping from JSON file"""
1083 """load engine mapping from JSON file"""
1084 if not os.path.exists(self.engine_state_file):
1084 if not os.path.exists(self.engine_state_file):
1085 return
1085 return
1086
1086
1087 self.log.info("loading engine state from %s" % self.engine_state_file)
1087 self.log.info("loading engine state from %s" % self.engine_state_file)
1088
1088
1089 with open(self.engine_state_file) as f:
1089 with open(self.engine_state_file) as f:
1090 state = json.load(f)
1090 state = json.load(f)
1091
1091
1092 save_notifier = self.notifier
1092 save_notifier = self.notifier
1093 self.notifier = None
1093 self.notifier = None
1094 for eid, uuid in state['engines'].iteritems():
1094 for eid, uuid in state['engines'].iteritems():
1095 heart = uuid.encode('ascii')
1095 heart = uuid.encode('ascii')
1096 # start with this heart as current and beating:
1096 # start with this heart as current and beating:
1097 self.heartmonitor.responses.add(heart)
1097 self.heartmonitor.responses.add(heart)
1098 self.heartmonitor.hearts.add(heart)
1098 self.heartmonitor.hearts.add(heart)
1099
1099
1100 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1100 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1101 self.finish_registration(heart)
1101 self.finish_registration(heart)
1102
1102
1103 self.notifier = save_notifier
1103 self.notifier = save_notifier
1104
1104
1105 self._idcounter = state['next_id']
1105 self._idcounter = state['next_id']
1106
1106
1107 #-------------------------------------------------------------------------
1107 #-------------------------------------------------------------------------
1108 # Client Requests
1108 # Client Requests
1109 #-------------------------------------------------------------------------
1109 #-------------------------------------------------------------------------
1110
1110
1111 def shutdown_request(self, client_id, msg):
1111 def shutdown_request(self, client_id, msg):
1112 """handle shutdown request."""
1112 """handle shutdown request."""
1113 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1113 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1114 # also notify other clients of shutdown
1114 # also notify other clients of shutdown
1115 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1115 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1116 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1116 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1117 dc.start()
1117 dc.start()
1118
1118
1119 def _shutdown(self):
1119 def _shutdown(self):
1120 self.log.info("hub::hub shutting down.")
1120 self.log.info("hub::hub shutting down.")
1121 time.sleep(0.1)
1121 time.sleep(0.1)
1122 sys.exit(0)
1122 sys.exit(0)
1123
1123
1124
1124
1125 def check_load(self, client_id, msg):
1125 def check_load(self, client_id, msg):
1126 content = msg['content']
1126 content = msg['content']
1127 try:
1127 try:
1128 targets = content['targets']
1128 targets = content['targets']
1129 targets = self._validate_targets(targets)
1129 targets = self._validate_targets(targets)
1130 except:
1130 except:
1131 content = error.wrap_exception()
1131 content = error.wrap_exception()
1132 self.session.send(self.query, "hub_error",
1132 self.session.send(self.query, "hub_error",
1133 content=content, ident=client_id)
1133 content=content, ident=client_id)
1134 return
1134 return
1135
1135
1136 content = dict(status='ok')
1136 content = dict(status='ok')
1137 # loads = {}
1137 # loads = {}
1138 for t in targets:
1138 for t in targets:
1139 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1139 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1140 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1140 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1141
1141
1142
1142
1143 def queue_status(self, client_id, msg):
1143 def queue_status(self, client_id, msg):
1144 """Return the Queue status of one or more targets.
1144 """Return the Queue status of one or more targets.
1145 if verbose: return the msg_ids
1145 if verbose: return the msg_ids
1146 else: return len of each type.
1146 else: return len of each type.
1147 keys: queue (pending MUX jobs)
1147 keys: queue (pending MUX jobs)
1148 tasks (pending Task jobs)
1148 tasks (pending Task jobs)
1149 completed (finished jobs from both queues)"""
1149 completed (finished jobs from both queues)"""
1150 content = msg['content']
1150 content = msg['content']
1151 targets = content['targets']
1151 targets = content['targets']
1152 try:
1152 try:
1153 targets = self._validate_targets(targets)
1153 targets = self._validate_targets(targets)
1154 except:
1154 except:
1155 content = error.wrap_exception()
1155 content = error.wrap_exception()
1156 self.session.send(self.query, "hub_error",
1156 self.session.send(self.query, "hub_error",
1157 content=content, ident=client_id)
1157 content=content, ident=client_id)
1158 return
1158 return
1159 verbose = content.get('verbose', False)
1159 verbose = content.get('verbose', False)
1160 content = dict(status='ok')
1160 content = dict(status='ok')
1161 for t in targets:
1161 for t in targets:
1162 queue = self.queues[t]
1162 queue = self.queues[t]
1163 completed = self.completed[t]
1163 completed = self.completed[t]
1164 tasks = self.tasks[t]
1164 tasks = self.tasks[t]
1165 if not verbose:
1165 if not verbose:
1166 queue = len(queue)
1166 queue = len(queue)
1167 completed = len(completed)
1167 completed = len(completed)
1168 tasks = len(tasks)
1168 tasks = len(tasks)
1169 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1169 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1170 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1170 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1171 # print (content)
1171 # print (content)
1172 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1172 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1173
1173
1174 def purge_results(self, client_id, msg):
1174 def purge_results(self, client_id, msg):
1175 """Purge results from memory. This method is more valuable before we move
1175 """Purge results from memory. This method is more valuable before we move
1176 to a DB based message storage mechanism."""
1176 to a DB based message storage mechanism."""
1177 content = msg['content']
1177 content = msg['content']
1178 self.log.info("Dropping records with %s", content)
1178 self.log.info("Dropping records with %s", content)
1179 msg_ids = content.get('msg_ids', [])
1179 msg_ids = content.get('msg_ids', [])
1180 reply = dict(status='ok')
1180 reply = dict(status='ok')
1181 if msg_ids == 'all':
1181 if msg_ids == 'all':
1182 try:
1182 try:
1183 self.db.drop_matching_records(dict(completed={'$ne':None}))
1183 self.db.drop_matching_records(dict(completed={'$ne':None}))
1184 except Exception:
1184 except Exception:
1185 reply = error.wrap_exception()
1185 reply = error.wrap_exception()
1186 else:
1186 else:
1187 pending = filter(lambda m: m in self.pending, msg_ids)
1187 pending = filter(lambda m: m in self.pending, msg_ids)
1188 if pending:
1188 if pending:
1189 try:
1189 try:
1190 raise IndexError("msg pending: %r" % pending[0])
1190 raise IndexError("msg pending: %r" % pending[0])
1191 except:
1191 except:
1192 reply = error.wrap_exception()
1192 reply = error.wrap_exception()
1193 else:
1193 else:
1194 try:
1194 try:
1195 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1195 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1196 except Exception:
1196 except Exception:
1197 reply = error.wrap_exception()
1197 reply = error.wrap_exception()
1198
1198
1199 if reply['status'] == 'ok':
1199 if reply['status'] == 'ok':
1200 eids = content.get('engine_ids', [])
1200 eids = content.get('engine_ids', [])
1201 for eid in eids:
1201 for eid in eids:
1202 if eid not in self.engines:
1202 if eid not in self.engines:
1203 try:
1203 try:
1204 raise IndexError("No such engine: %i" % eid)
1204 raise IndexError("No such engine: %i" % eid)
1205 except:
1205 except:
1206 reply = error.wrap_exception()
1206 reply = error.wrap_exception()
1207 break
1207 break
1208 uid = self.engines[eid].uuid
1208 uid = self.engines[eid].uuid
1209 try:
1209 try:
1210 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1210 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1211 except Exception:
1211 except Exception:
1212 reply = error.wrap_exception()
1212 reply = error.wrap_exception()
1213 break
1213 break
1214
1214
1215 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1215 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1216
1216
1217 def resubmit_task(self, client_id, msg):
1217 def resubmit_task(self, client_id, msg):
1218 """Resubmit one or more tasks."""
1218 """Resubmit one or more tasks."""
1219 def finish(reply):
1219 def finish(reply):
1220 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1220 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1221
1221
1222 content = msg['content']
1222 content = msg['content']
1223 msg_ids = content['msg_ids']
1223 msg_ids = content['msg_ids']
1224 reply = dict(status='ok')
1224 reply = dict(status='ok')
1225 try:
1225 try:
1226 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1226 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1227 'header', 'content', 'buffers'])
1227 'header', 'content', 'buffers'])
1228 except Exception:
1228 except Exception:
1229 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1229 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1230 return finish(error.wrap_exception())
1230 return finish(error.wrap_exception())
1231
1231
1232 # validate msg_ids
1232 # validate msg_ids
1233 found_ids = [ rec['msg_id'] for rec in records ]
1233 found_ids = [ rec['msg_id'] for rec in records ]
1234 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1234 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1235 if len(records) > len(msg_ids):
1235 if len(records) > len(msg_ids):
1236 try:
1236 try:
1237 raise RuntimeError("DB appears to be in an inconsistent state."
1237 raise RuntimeError("DB appears to be in an inconsistent state."
1238 "More matching records were found than should exist")
1238 "More matching records were found than should exist")
1239 except Exception:
1239 except Exception:
1240 return finish(error.wrap_exception())
1240 return finish(error.wrap_exception())
1241 elif len(records) < len(msg_ids):
1241 elif len(records) < len(msg_ids):
1242 missing = [ m for m in msg_ids if m not in found_ids ]
1242 missing = [ m for m in msg_ids if m not in found_ids ]
1243 try:
1243 try:
1244 raise KeyError("No such msg(s): %r" % missing)
1244 raise KeyError("No such msg(s): %r" % missing)
1245 except KeyError:
1245 except KeyError:
1246 return finish(error.wrap_exception())
1246 return finish(error.wrap_exception())
1247 elif pending_ids:
1247 elif pending_ids:
1248 pass
1248 pass
1249 # no need to raise on resubmit of pending task, now that we
1249 # no need to raise on resubmit of pending task, now that we
1250 # resubmit under new ID, but do we want to raise anyway?
1250 # resubmit under new ID, but do we want to raise anyway?
1251 # msg_id = invalid_ids[0]
1251 # msg_id = invalid_ids[0]
1252 # try:
1252 # try:
1253 # raise ValueError("Task(s) %r appears to be inflight" % )
1253 # raise ValueError("Task(s) %r appears to be inflight" % )
1254 # except Exception:
1254 # except Exception:
1255 # return finish(error.wrap_exception())
1255 # return finish(error.wrap_exception())
1256
1256
1257 # mapping of original IDs to resubmitted IDs
1257 # mapping of original IDs to resubmitted IDs
1258 resubmitted = {}
1258 resubmitted = {}
1259
1259
1260 # send the messages
1260 # send the messages
1261 for rec in records:
1261 for rec in records:
1262 header = rec['header']
1262 header = rec['header']
1263 msg = self.session.msg(header['msg_type'], parent=header)
1263 msg = self.session.msg(header['msg_type'], parent=header)
1264 msg_id = msg['msg_id']
1264 msg_id = msg['msg_id']
1265 msg['content'] = rec['content']
1265 msg['content'] = rec['content']
1266
1266
1267 # use the old header, but update msg_id and timestamp
1267 # use the old header, but update msg_id and timestamp
1268 fresh = msg['header']
1268 fresh = msg['header']
1269 header['msg_id'] = fresh['msg_id']
1269 header['msg_id'] = fresh['msg_id']
1270 header['date'] = fresh['date']
1270 header['date'] = fresh['date']
1271 msg['header'] = header
1271 msg['header'] = header
1272
1272
1273 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1273 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1274
1274
1275 resubmitted[rec['msg_id']] = msg_id
1275 resubmitted[rec['msg_id']] = msg_id
1276 self.pending.add(msg_id)
1276 self.pending.add(msg_id)
1277 msg['buffers'] = rec['buffers']
1277 msg['buffers'] = rec['buffers']
1278 try:
1278 try:
1279 self.db.add_record(msg_id, init_record(msg))
1279 self.db.add_record(msg_id, init_record(msg))
1280 except Exception:
1280 except Exception:
1281 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1281 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1282 return finish(error.wrap_exception())
1282 return finish(error.wrap_exception())
1283
1283
1284 finish(dict(status='ok', resubmitted=resubmitted))
1284 finish(dict(status='ok', resubmitted=resubmitted))
1285
1285
1286 # store the new IDs in the Task DB
1286 # store the new IDs in the Task DB
1287 for msg_id, resubmit_id in resubmitted.iteritems():
1287 for msg_id, resubmit_id in resubmitted.iteritems():
1288 try:
1288 try:
1289 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1289 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1290 except Exception:
1290 except Exception:
1291 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1291 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1292
1292
1293
1293
1294 def _extract_record(self, rec):
1294 def _extract_record(self, rec):
1295 """decompose a TaskRecord dict into subsection of reply for get_result"""
1295 """decompose a TaskRecord dict into subsection of reply for get_result"""
1296 io_dict = {}
1296 io_dict = {}
1297 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1297 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1298 io_dict[key] = rec[key]
1298 io_dict[key] = rec[key]
1299 content = {
1299 content = {
1300 'header': rec['header'],
1300 'header': rec['header'],
1301 'metadata': rec['metadata'],
1301 'metadata': rec['metadata'],
1302 'result_metadata': rec['result_metadata'],
1302 'result_metadata': rec['result_metadata'],
1303 'result_header' : rec['result_header'],
1303 'result_header' : rec['result_header'],
1304 'result_content': rec['result_content'],
1304 'result_content': rec['result_content'],
1305 'received' : rec['received'],
1305 'received' : rec['received'],
1306 'io' : io_dict,
1306 'io' : io_dict,
1307 }
1307 }
1308 if rec['result_buffers']:
1308 if rec['result_buffers']:
1309 buffers = map(bytes, rec['result_buffers'])
1309 buffers = map(bytes, rec['result_buffers'])
1310 else:
1310 else:
1311 buffers = []
1311 buffers = []
1312
1312
1313 return content, buffers
1313 return content, buffers
1314
1314
1315 def get_results(self, client_id, msg):
1315 def get_results(self, client_id, msg):
1316 """Get the result of 1 or more messages."""
1316 """Get the result of 1 or more messages."""
1317 content = msg['content']
1317 content = msg['content']
1318 msg_ids = sorted(set(content['msg_ids']))
1318 msg_ids = sorted(set(content['msg_ids']))
1319 statusonly = content.get('status_only', False)
1319 statusonly = content.get('status_only', False)
1320 pending = []
1320 pending = []
1321 completed = []
1321 completed = []
1322 content = dict(status='ok')
1322 content = dict(status='ok')
1323 content['pending'] = pending
1323 content['pending'] = pending
1324 content['completed'] = completed
1324 content['completed'] = completed
1325 buffers = []
1325 buffers = []
1326 if not statusonly:
1326 if not statusonly:
1327 try:
1327 try:
1328 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1328 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1329 # turn match list into dict, for faster lookup
1329 # turn match list into dict, for faster lookup
1330 records = {}
1330 records = {}
1331 for rec in matches:
1331 for rec in matches:
1332 records[rec['msg_id']] = rec
1332 records[rec['msg_id']] = rec
1333 except Exception:
1333 except Exception:
1334 content = error.wrap_exception()
1334 content = error.wrap_exception()
1335 self.session.send(self.query, "result_reply", content=content,
1335 self.session.send(self.query, "result_reply", content=content,
1336 parent=msg, ident=client_id)
1336 parent=msg, ident=client_id)
1337 return
1337 return
1338 else:
1338 else:
1339 records = {}
1339 records = {}
1340 for msg_id in msg_ids:
1340 for msg_id in msg_ids:
1341 if msg_id in self.pending:
1341 if msg_id in self.pending:
1342 pending.append(msg_id)
1342 pending.append(msg_id)
1343 elif msg_id in self.all_completed:
1343 elif msg_id in self.all_completed:
1344 completed.append(msg_id)
1344 completed.append(msg_id)
1345 if not statusonly:
1345 if not statusonly:
1346 c,bufs = self._extract_record(records[msg_id])
1346 c,bufs = self._extract_record(records[msg_id])
1347 content[msg_id] = c
1347 content[msg_id] = c
1348 buffers.extend(bufs)
1348 buffers.extend(bufs)
1349 elif msg_id in records:
1349 elif msg_id in records:
1350 if rec['completed']:
1350 if rec['completed']:
1351 completed.append(msg_id)
1351 completed.append(msg_id)
1352 c,bufs = self._extract_record(records[msg_id])
1352 c,bufs = self._extract_record(records[msg_id])
1353 content[msg_id] = c
1353 content[msg_id] = c
1354 buffers.extend(bufs)
1354 buffers.extend(bufs)
1355 else:
1355 else:
1356 pending.append(msg_id)
1356 pending.append(msg_id)
1357 else:
1357 else:
1358 try:
1358 try:
1359 raise KeyError('No such message: '+msg_id)
1359 raise KeyError('No such message: '+msg_id)
1360 except:
1360 except:
1361 content = error.wrap_exception()
1361 content = error.wrap_exception()
1362 break
1362 break
1363 self.session.send(self.query, "result_reply", content=content,
1363 self.session.send(self.query, "result_reply", content=content,
1364 parent=msg, ident=client_id,
1364 parent=msg, ident=client_id,
1365 buffers=buffers)
1365 buffers=buffers)
1366
1366
1367 def get_history(self, client_id, msg):
1367 def get_history(self, client_id, msg):
1368 """Get a list of all msg_ids in our DB records"""
1368 """Get a list of all msg_ids in our DB records"""
1369 try:
1369 try:
1370 msg_ids = self.db.get_history()
1370 msg_ids = self.db.get_history()
1371 except Exception as e:
1371 except Exception as e:
1372 content = error.wrap_exception()
1372 content = error.wrap_exception()
1373 else:
1373 else:
1374 content = dict(status='ok', history=msg_ids)
1374 content = dict(status='ok', history=msg_ids)
1375
1375
1376 self.session.send(self.query, "history_reply", content=content,
1376 self.session.send(self.query, "history_reply", content=content,
1377 parent=msg, ident=client_id)
1377 parent=msg, ident=client_id)
1378
1378
1379 def db_query(self, client_id, msg):
1379 def db_query(self, client_id, msg):
1380 """Perform a raw query on the task record database."""
1380 """Perform a raw query on the task record database."""
1381 content = msg['content']
1381 content = msg['content']
1382 query = content.get('query', {})
1382 query = content.get('query', {})
1383 keys = content.get('keys', None)
1383 keys = content.get('keys', None)
1384 buffers = []
1384 buffers = []
1385 empty = list()
1385 empty = list()
1386 try:
1386 try:
1387 records = self.db.find_records(query, keys)
1387 records = self.db.find_records(query, keys)
1388 except Exception as e:
1388 except Exception as e:
1389 content = error.wrap_exception()
1389 content = error.wrap_exception()
1390 else:
1390 else:
1391 # extract buffers from reply content:
1391 # extract buffers from reply content:
1392 if keys is not None:
1392 if keys is not None:
1393 buffer_lens = [] if 'buffers' in keys else None
1393 buffer_lens = [] if 'buffers' in keys else None
1394 result_buffer_lens = [] if 'result_buffers' in keys else None
1394 result_buffer_lens = [] if 'result_buffers' in keys else None
1395 else:
1395 else:
1396 buffer_lens = None
1396 buffer_lens = None
1397 result_buffer_lens = None
1397 result_buffer_lens = None
1398
1398
1399 for rec in records:
1399 for rec in records:
1400 # buffers may be None, so double check
1400 # buffers may be None, so double check
1401 b = rec.pop('buffers', empty) or empty
1401 b = rec.pop('buffers', empty) or empty
1402 if buffer_lens is not None:
1402 if buffer_lens is not None:
1403 buffer_lens.append(len(b))
1403 buffer_lens.append(len(b))
1404 buffers.extend(b)
1404 buffers.extend(b)
1405 rb = rec.pop('result_buffers', empty) or empty
1405 rb = rec.pop('result_buffers', empty) or empty
1406 if result_buffer_lens is not None:
1406 if result_buffer_lens is not None:
1407 result_buffer_lens.append(len(rb))
1407 result_buffer_lens.append(len(rb))
1408 buffers.extend(rb)
1408 buffers.extend(rb)
1409 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1409 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1410 result_buffer_lens=result_buffer_lens)
1410 result_buffer_lens=result_buffer_lens)
1411 # self.log.debug (content)
1411 # self.log.debug (content)
1412 self.session.send(self.query, "db_reply", content=content,
1412 self.session.send(self.query, "db_reply", content=content,
1413 parent=msg, ident=client_id,
1413 parent=msg, ident=client_id,
1414 buffers=buffers)
1414 buffers=buffers)
1415
1415
General Comments 0
You need to be logged in to leave comments. Login now