fix a couple of datetime entries in the Hub
MinRK
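The change itself is two lines: in save_queue_result and save_task_result, the 'started' value read from a reply's metadata is now passed through extract_dates (already imported from IPython.utils.jsonutil) before the task record is written, so it is stored as a datetime like the neighbouring 'completed' and 'received' fields rather than as a raw ISO8601 string. A minimal sketch of the behaviour this relies on, with a made-up timestamp:

from datetime import datetime
from IPython.utils.jsonutil import extract_dates

# metadata arrives with 'started' still serialized as an ISO8601 string
md = {'status': 'ok', 'started': '2013-10-01T12:34:56.789012'}

started = extract_dates(md.get('started', None))
assert isinstance(started, datetime)  # now comparable with 'completed'/'received'
assert extract_dates(None) is None    # a missing 'started' stays None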
@@ -1,1426 +1,1426 @@
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import json
21 import json
22 import os
22 import os
23 import sys
23 import sys
24 import time
24 import time
25 from datetime import datetime
25 from datetime import datetime
26
26
27 import zmq
27 import zmq
28 from zmq.eventloop import ioloop
28 from zmq.eventloop import ioloop
29 from zmq.eventloop.zmqstream import ZMQStream
29 from zmq.eventloop.zmqstream import ZMQStream
30
30
31 # internal:
31 # internal:
32 from IPython.utils.importstring import import_item
32 from IPython.utils.importstring import import_item
33 from IPython.utils.jsonutil import extract_dates
33 from IPython.utils.jsonutil import extract_dates
34 from IPython.utils.localinterfaces import localhost
34 from IPython.utils.localinterfaces import localhost
35 from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems
35 from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems
36 from IPython.utils.traitlets import (
36 from IPython.utils.traitlets import (
37 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
37 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
38 )
38 )
39
39
40 from IPython.parallel import error, util
40 from IPython.parallel import error, util
41 from IPython.parallel.factory import RegistrationFactory
41 from IPython.parallel.factory import RegistrationFactory
42
42
43 from IPython.kernel.zmq.session import SessionFactory
43 from IPython.kernel.zmq.session import SessionFactory
44
44
45 from .heartmonitor import HeartMonitor
45 from .heartmonitor import HeartMonitor
46
46
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48 # Code
48 # Code
49 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
50
50
51 def _passer(*args, **kwargs):
51 def _passer(*args, **kwargs):
52 return
52 return
53
53
54 def _printer(*args, **kwargs):
54 def _printer(*args, **kwargs):
55 print (args)
55 print (args)
56 print (kwargs)
56 print (kwargs)
57
57
58 def empty_record():
58 def empty_record():
59 """Return an empty dict with all record keys."""
59 """Return an empty dict with all record keys."""
60 return {
60 return {
61 'msg_id' : None,
61 'msg_id' : None,
62 'header' : None,
62 'header' : None,
63 'metadata' : None,
63 'metadata' : None,
64 'content': None,
64 'content': None,
65 'buffers': None,
65 'buffers': None,
66 'submitted': None,
66 'submitted': None,
67 'client_uuid' : None,
67 'client_uuid' : None,
68 'engine_uuid' : None,
68 'engine_uuid' : None,
69 'started': None,
69 'started': None,
70 'completed': None,
70 'completed': None,
71 'resubmitted': None,
71 'resubmitted': None,
72 'received': None,
72 'received': None,
73 'result_header' : None,
73 'result_header' : None,
74 'result_metadata' : None,
74 'result_metadata' : None,
75 'result_content' : None,
75 'result_content' : None,
76 'result_buffers' : None,
76 'result_buffers' : None,
77 'queue' : None,
77 'queue' : None,
78 'pyin' : None,
78 'pyin' : None,
79 'pyout': None,
79 'pyout': None,
80 'pyerr': None,
80 'pyerr': None,
81 'stdout': '',
81 'stdout': '',
82 'stderr': '',
82 'stderr': '',
83 }
83 }
84
84
85 def init_record(msg):
85 def init_record(msg):
86 """Initialize a TaskRecord based on a request."""
86 """Initialize a TaskRecord based on a request."""
87 header = msg['header']
87 header = msg['header']
88 return {
88 return {
89 'msg_id' : header['msg_id'],
89 'msg_id' : header['msg_id'],
90 'header' : header,
90 'header' : header,
91 'content': msg['content'],
91 'content': msg['content'],
92 'metadata': msg['metadata'],
92 'metadata': msg['metadata'],
93 'buffers': msg['buffers'],
93 'buffers': msg['buffers'],
94 'submitted': header['date'],
94 'submitted': header['date'],
95 'client_uuid' : None,
95 'client_uuid' : None,
96 'engine_uuid' : None,
96 'engine_uuid' : None,
97 'started': None,
97 'started': None,
98 'completed': None,
98 'completed': None,
99 'resubmitted': None,
99 'resubmitted': None,
100 'received': None,
100 'received': None,
101 'result_header' : None,
101 'result_header' : None,
102 'result_metadata': None,
102 'result_metadata': None,
103 'result_content' : None,
103 'result_content' : None,
104 'result_buffers' : None,
104 'result_buffers' : None,
105 'queue' : None,
105 'queue' : None,
106 'pyin' : None,
106 'pyin' : None,
107 'pyout': None,
107 'pyout': None,
108 'pyerr': None,
108 'pyerr': None,
109 'stdout': '',
109 'stdout': '',
110 'stderr': '',
110 'stderr': '',
111 }
111 }
112
112
113
113
114 class EngineConnector(HasTraits):
114 class EngineConnector(HasTraits):
115 """A simple object for accessing the various zmq connections of an object.
115 """A simple object for accessing the various zmq connections of an object.
116 Attributes are:
116 Attributes are:
117 id (int): engine ID
117 id (int): engine ID
118 uuid (unicode): engine UUID
118 uuid (unicode): engine UUID
119 pending: set of msg_ids
119 pending: set of msg_ids
120 stallback: DelayedCallback for stalled registration
120 stallback: DelayedCallback for stalled registration
121 """
121 """
122
122
123 id = Integer(0)
123 id = Integer(0)
124 uuid = Unicode()
124 uuid = Unicode()
125 pending = Set()
125 pending = Set()
126 stallback = Instance(ioloop.DelayedCallback)
126 stallback = Instance(ioloop.DelayedCallback)
127
127
128
128
129 _db_shortcuts = {
129 _db_shortcuts = {
130 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
130 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
131 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
131 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
132 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
132 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
133 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
133 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
134 }
134 }
135
135
136 class HubFactory(RegistrationFactory):
136 class HubFactory(RegistrationFactory):
137 """The Configurable for setting up a Hub."""
137 """The Configurable for setting up a Hub."""
138
138
139 # port-pairs for monitoredqueues:
139 # port-pairs for monitoredqueues:
140 hb = Tuple(Integer,Integer,config=True,
140 hb = Tuple(Integer,Integer,config=True,
141 help="""PUB/ROUTER Port pair for Engine heartbeats""")
141 help="""PUB/ROUTER Port pair for Engine heartbeats""")
142 def _hb_default(self):
142 def _hb_default(self):
143 return tuple(util.select_random_ports(2))
143 return tuple(util.select_random_ports(2))
144
144
145 mux = Tuple(Integer,Integer,config=True,
145 mux = Tuple(Integer,Integer,config=True,
146 help="""Client/Engine Port pair for MUX queue""")
146 help="""Client/Engine Port pair for MUX queue""")
147
147
148 def _mux_default(self):
148 def _mux_default(self):
149 return tuple(util.select_random_ports(2))
149 return tuple(util.select_random_ports(2))
150
150
151 task = Tuple(Integer,Integer,config=True,
151 task = Tuple(Integer,Integer,config=True,
152 help="""Client/Engine Port pair for Task queue""")
152 help="""Client/Engine Port pair for Task queue""")
153 def _task_default(self):
153 def _task_default(self):
154 return tuple(util.select_random_ports(2))
154 return tuple(util.select_random_ports(2))
155
155
156 control = Tuple(Integer,Integer,config=True,
156 control = Tuple(Integer,Integer,config=True,
157 help="""Client/Engine Port pair for Control queue""")
157 help="""Client/Engine Port pair for Control queue""")
158
158
159 def _control_default(self):
159 def _control_default(self):
160 return tuple(util.select_random_ports(2))
160 return tuple(util.select_random_ports(2))
161
161
162 iopub = Tuple(Integer,Integer,config=True,
162 iopub = Tuple(Integer,Integer,config=True,
163 help="""Client/Engine Port pair for IOPub relay""")
163 help="""Client/Engine Port pair for IOPub relay""")
164
164
165 def _iopub_default(self):
165 def _iopub_default(self):
166 return tuple(util.select_random_ports(2))
166 return tuple(util.select_random_ports(2))
167
167
168 # single ports:
168 # single ports:
169 mon_port = Integer(config=True,
169 mon_port = Integer(config=True,
170 help="""Monitor (SUB) port for queue traffic""")
170 help="""Monitor (SUB) port for queue traffic""")
171
171
172 def _mon_port_default(self):
172 def _mon_port_default(self):
173 return util.select_random_ports(1)[0]
173 return util.select_random_ports(1)[0]
174
174
175 notifier_port = Integer(config=True,
175 notifier_port = Integer(config=True,
176 help="""PUB port for sending engine status notifications""")
176 help="""PUB port for sending engine status notifications""")
177
177
178 def _notifier_port_default(self):
178 def _notifier_port_default(self):
179 return util.select_random_ports(1)[0]
179 return util.select_random_ports(1)[0]
180
180
181 engine_ip = Unicode(config=True,
181 engine_ip = Unicode(config=True,
182 help="IP on which to listen for engine connections. [default: loopback]")
182 help="IP on which to listen for engine connections. [default: loopback]")
183 def _engine_ip_default(self):
183 def _engine_ip_default(self):
184 return localhost()
184 return localhost()
185 engine_transport = Unicode('tcp', config=True,
185 engine_transport = Unicode('tcp', config=True,
186 help="0MQ transport for engine connections. [default: tcp]")
186 help="0MQ transport for engine connections. [default: tcp]")
187
187
188 client_ip = Unicode(config=True,
188 client_ip = Unicode(config=True,
189 help="IP on which to listen for client connections. [default: loopback]")
189 help="IP on which to listen for client connections. [default: loopback]")
190 client_transport = Unicode('tcp', config=True,
190 client_transport = Unicode('tcp', config=True,
191 help="0MQ transport for client connections. [default : tcp]")
191 help="0MQ transport for client connections. [default : tcp]")
192
192
193 monitor_ip = Unicode(config=True,
193 monitor_ip = Unicode(config=True,
194 help="IP on which to listen for monitor messages. [default: loopback]")
194 help="IP on which to listen for monitor messages. [default: loopback]")
195 monitor_transport = Unicode('tcp', config=True,
195 monitor_transport = Unicode('tcp', config=True,
196 help="0MQ transport for monitor messages. [default : tcp]")
196 help="0MQ transport for monitor messages. [default : tcp]")
197
197
198 _client_ip_default = _monitor_ip_default = _engine_ip_default
198 _client_ip_default = _monitor_ip_default = _engine_ip_default
199
199
200
200
201 monitor_url = Unicode('')
201 monitor_url = Unicode('')
202
202
203 db_class = DottedObjectName('NoDB',
203 db_class = DottedObjectName('NoDB',
204 config=True, help="""The class to use for the DB backend
204 config=True, help="""The class to use for the DB backend
205
205
206 Options include:
206 Options include:
207
207
208 SQLiteDB: SQLite
208 SQLiteDB: SQLite
209 MongoDB : use MongoDB
209 MongoDB : use MongoDB
210 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
210 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
211 NoDB : disable database altogether (default)
211 NoDB : disable database altogether (default)
212
212
213 """)
213 """)
214
214
215 # not configurable
215 # not configurable
216 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
216 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
217 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
217 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
218
218
219 def _ip_changed(self, name, old, new):
219 def _ip_changed(self, name, old, new):
220 self.engine_ip = new
220 self.engine_ip = new
221 self.client_ip = new
221 self.client_ip = new
222 self.monitor_ip = new
222 self.monitor_ip = new
223 self._update_monitor_url()
223 self._update_monitor_url()
224
224
225 def _update_monitor_url(self):
225 def _update_monitor_url(self):
226 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
226 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
227
227
228 def _transport_changed(self, name, old, new):
228 def _transport_changed(self, name, old, new):
229 self.engine_transport = new
229 self.engine_transport = new
230 self.client_transport = new
230 self.client_transport = new
231 self.monitor_transport = new
231 self.monitor_transport = new
232 self._update_monitor_url()
232 self._update_monitor_url()
233
233
234 def __init__(self, **kwargs):
234 def __init__(self, **kwargs):
235 super(HubFactory, self).__init__(**kwargs)
235 super(HubFactory, self).__init__(**kwargs)
236 self._update_monitor_url()
236 self._update_monitor_url()
237
237
238
238
239 def construct(self):
239 def construct(self):
240 self.init_hub()
240 self.init_hub()
241
241
242 def start(self):
242 def start(self):
243 self.heartmonitor.start()
243 self.heartmonitor.start()
244 self.log.info("Heartmonitor started")
244 self.log.info("Heartmonitor started")
245
245
246 def client_url(self, channel):
246 def client_url(self, channel):
247 """return full zmq url for a named client channel"""
247 """return full zmq url for a named client channel"""
248 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
248 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
249
249
250 def engine_url(self, channel):
250 def engine_url(self, channel):
251 """return full zmq url for a named engine channel"""
251 """return full zmq url for a named engine channel"""
252 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
252 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
253
253
254 def init_hub(self):
254 def init_hub(self):
255 """construct Hub object"""
255 """construct Hub object"""
256
256
257 ctx = self.context
257 ctx = self.context
258 loop = self.loop
258 loop = self.loop
259 if 'TaskScheduler.scheme_name' in self.config:
259 if 'TaskScheduler.scheme_name' in self.config:
260 scheme = self.config.TaskScheduler.scheme_name
260 scheme = self.config.TaskScheduler.scheme_name
261 else:
261 else:
262 from .scheduler import TaskScheduler
262 from .scheduler import TaskScheduler
263 scheme = TaskScheduler.scheme_name.get_default_value()
263 scheme = TaskScheduler.scheme_name.get_default_value()
264
264
265 # build connection dicts
265 # build connection dicts
266 engine = self.engine_info = {
266 engine = self.engine_info = {
267 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
267 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
268 'registration' : self.regport,
268 'registration' : self.regport,
269 'control' : self.control[1],
269 'control' : self.control[1],
270 'mux' : self.mux[1],
270 'mux' : self.mux[1],
271 'hb_ping' : self.hb[0],
271 'hb_ping' : self.hb[0],
272 'hb_pong' : self.hb[1],
272 'hb_pong' : self.hb[1],
273 'task' : self.task[1],
273 'task' : self.task[1],
274 'iopub' : self.iopub[1],
274 'iopub' : self.iopub[1],
275 }
275 }
276
276
277 client = self.client_info = {
277 client = self.client_info = {
278 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
278 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
279 'registration' : self.regport,
279 'registration' : self.regport,
280 'control' : self.control[0],
280 'control' : self.control[0],
281 'mux' : self.mux[0],
281 'mux' : self.mux[0],
282 'task' : self.task[0],
282 'task' : self.task[0],
283 'task_scheme' : scheme,
283 'task_scheme' : scheme,
284 'iopub' : self.iopub[0],
284 'iopub' : self.iopub[0],
285 'notification' : self.notifier_port,
285 'notification' : self.notifier_port,
286 }
286 }
287
287
288 self.log.debug("Hub engine addrs: %s", self.engine_info)
288 self.log.debug("Hub engine addrs: %s", self.engine_info)
289 self.log.debug("Hub client addrs: %s", self.client_info)
289 self.log.debug("Hub client addrs: %s", self.client_info)
290
290
291 # Registrar socket
291 # Registrar socket
292 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
292 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
293 util.set_hwm(q, 0)
293 util.set_hwm(q, 0)
294 q.bind(self.client_url('registration'))
294 q.bind(self.client_url('registration'))
295 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
295 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
296 if self.client_ip != self.engine_ip:
296 if self.client_ip != self.engine_ip:
297 q.bind(self.engine_url('registration'))
297 q.bind(self.engine_url('registration'))
298 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
298 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
299
299
300 ### Engine connections ###
300 ### Engine connections ###
301
301
302 # heartbeat
302 # heartbeat
303 hpub = ctx.socket(zmq.PUB)
303 hpub = ctx.socket(zmq.PUB)
304 hpub.bind(self.engine_url('hb_ping'))
304 hpub.bind(self.engine_url('hb_ping'))
305 hrep = ctx.socket(zmq.ROUTER)
305 hrep = ctx.socket(zmq.ROUTER)
306 util.set_hwm(hrep, 0)
306 util.set_hwm(hrep, 0)
307 hrep.bind(self.engine_url('hb_pong'))
307 hrep.bind(self.engine_url('hb_pong'))
308 self.heartmonitor = HeartMonitor(loop=loop, parent=self, log=self.log,
308 self.heartmonitor = HeartMonitor(loop=loop, parent=self, log=self.log,
309 pingstream=ZMQStream(hpub,loop),
309 pingstream=ZMQStream(hpub,loop),
310 pongstream=ZMQStream(hrep,loop)
310 pongstream=ZMQStream(hrep,loop)
311 )
311 )
312
312
313 ### Client connections ###
313 ### Client connections ###
314
314
315 # Notifier socket
315 # Notifier socket
316 n = ZMQStream(ctx.socket(zmq.PUB), loop)
316 n = ZMQStream(ctx.socket(zmq.PUB), loop)
317 n.bind(self.client_url('notification'))
317 n.bind(self.client_url('notification'))
318
318
319 ### build and launch the queues ###
319 ### build and launch the queues ###
320
320
321 # monitor socket
321 # monitor socket
322 sub = ctx.socket(zmq.SUB)
322 sub = ctx.socket(zmq.SUB)
323 sub.setsockopt(zmq.SUBSCRIBE, b"")
323 sub.setsockopt(zmq.SUBSCRIBE, b"")
324 sub.bind(self.monitor_url)
324 sub.bind(self.monitor_url)
325 sub.bind('inproc://monitor')
325 sub.bind('inproc://monitor')
326 sub = ZMQStream(sub, loop)
326 sub = ZMQStream(sub, loop)
327
327
328 # connect the db
328 # connect the db
329 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
329 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
330 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
330 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
331 self.db = import_item(str(db_class))(session=self.session.session,
331 self.db = import_item(str(db_class))(session=self.session.session,
332 parent=self, log=self.log)
332 parent=self, log=self.log)
333 time.sleep(.25)
333 time.sleep(.25)
334
334
335 # resubmit stream
335 # resubmit stream
336 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
336 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
337 url = util.disambiguate_url(self.client_url('task'))
337 url = util.disambiguate_url(self.client_url('task'))
338 r.connect(url)
338 r.connect(url)
339
339
340 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
340 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
341 query=q, notifier=n, resubmit=r, db=self.db,
341 query=q, notifier=n, resubmit=r, db=self.db,
342 engine_info=self.engine_info, client_info=self.client_info,
342 engine_info=self.engine_info, client_info=self.client_info,
343 log=self.log)
343 log=self.log)
344
344
345
345
346 class Hub(SessionFactory):
346 class Hub(SessionFactory):
347 """The IPython Controller Hub with 0MQ connections
347 """The IPython Controller Hub with 0MQ connections
348
348
349 Parameters
349 Parameters
350 ==========
350 ==========
351 loop: zmq IOLoop instance
351 loop: zmq IOLoop instance
352 session: Session object
352 session: Session object
353 <removed> context: zmq context for creating new connections (?)
353 <removed> context: zmq context for creating new connections (?)
354 queue: ZMQStream for monitoring the command queue (SUB)
354 queue: ZMQStream for monitoring the command queue (SUB)
355 query: ZMQStream for engine registration and client query requests (ROUTER)
355 query: ZMQStream for engine registration and client query requests (ROUTER)
356 heartbeat: HeartMonitor object checking the pulse of the engines
356 heartbeat: HeartMonitor object checking the pulse of the engines
357 notifier: ZMQStream for broadcasting engine registration changes (PUB)
357 notifier: ZMQStream for broadcasting engine registration changes (PUB)
358 db: connection to db for out of memory logging of commands
358 db: connection to db for out of memory logging of commands
359 NotImplemented
359 NotImplemented
360 engine_info: dict of zmq connection information for engines to connect
360 engine_info: dict of zmq connection information for engines to connect
361 to the queues.
361 to the queues.
362 client_info: dict of zmq connection information for clients to connect
362 client_info: dict of zmq connection information for clients to connect
363 to the queues.
363 to the queues.
364 """
364 """
365
365
366 engine_state_file = Unicode()
366 engine_state_file = Unicode()
367
367
368 # internal data structures:
368 # internal data structures:
369 ids=Set() # engine IDs
369 ids=Set() # engine IDs
370 keytable=Dict()
370 keytable=Dict()
371 by_ident=Dict()
371 by_ident=Dict()
372 engines=Dict()
372 engines=Dict()
373 clients=Dict()
373 clients=Dict()
374 hearts=Dict()
374 hearts=Dict()
375 pending=Set()
375 pending=Set()
376 queues=Dict() # pending msg_ids keyed by engine_id
376 queues=Dict() # pending msg_ids keyed by engine_id
377 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
377 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
378 completed=Dict() # completed msg_ids keyed by engine_id
378 completed=Dict() # completed msg_ids keyed by engine_id
379 all_completed=Set() # completed msg_ids keyed by engine_id
379 all_completed=Set() # completed msg_ids keyed by engine_id
380 dead_engines=Set() # completed msg_ids keyed by engine_id
380 dead_engines=Set() # completed msg_ids keyed by engine_id
381 unassigned=Set() # set of task msg_ids not yet assigned a destination
381 unassigned=Set() # set of task msg_ids not yet assigned a destination
382 incoming_registrations=Dict()
382 incoming_registrations=Dict()
383 registration_timeout=Integer()
383 registration_timeout=Integer()
384 _idcounter=Integer(0)
384 _idcounter=Integer(0)
385
385
386 # objects from constructor:
386 # objects from constructor:
387 query=Instance(ZMQStream)
387 query=Instance(ZMQStream)
388 monitor=Instance(ZMQStream)
388 monitor=Instance(ZMQStream)
389 notifier=Instance(ZMQStream)
389 notifier=Instance(ZMQStream)
390 resubmit=Instance(ZMQStream)
390 resubmit=Instance(ZMQStream)
391 heartmonitor=Instance(HeartMonitor)
391 heartmonitor=Instance(HeartMonitor)
392 db=Instance(object)
392 db=Instance(object)
393 client_info=Dict()
393 client_info=Dict()
394 engine_info=Dict()
394 engine_info=Dict()
395
395
396
396
397 def __init__(self, **kwargs):
397 def __init__(self, **kwargs):
398 """
398 """
399 # universal:
399 # universal:
400 loop: IOLoop for creating future connections
400 loop: IOLoop for creating future connections
401 session: streamsession for sending serialized data
401 session: streamsession for sending serialized data
402 # engine:
402 # engine:
403 queue: ZMQStream for monitoring queue messages
403 queue: ZMQStream for monitoring queue messages
404 query: ZMQStream for engine+client registration and client requests
404 query: ZMQStream for engine+client registration and client requests
405 heartbeat: HeartMonitor object for tracking engines
405 heartbeat: HeartMonitor object for tracking engines
406 # extra:
406 # extra:
407 db: ZMQStream for db connection (NotImplemented)
407 db: ZMQStream for db connection (NotImplemented)
408 engine_info: zmq address/protocol dict for engine connections
408 engine_info: zmq address/protocol dict for engine connections
409 client_info: zmq address/protocol dict for client connections
409 client_info: zmq address/protocol dict for client connections
410 """
410 """
411
411
412 super(Hub, self).__init__(**kwargs)
412 super(Hub, self).__init__(**kwargs)
413 self.registration_timeout = max(10000, 5*self.heartmonitor.period)
413 self.registration_timeout = max(10000, 5*self.heartmonitor.period)
414
414
415 # register our callbacks
415 # register our callbacks
416 self.query.on_recv(self.dispatch_query)
416 self.query.on_recv(self.dispatch_query)
417 self.monitor.on_recv(self.dispatch_monitor_traffic)
417 self.monitor.on_recv(self.dispatch_monitor_traffic)
418
418
419 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
419 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
420 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
420 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
421
421
422 self.monitor_handlers = {b'in' : self.save_queue_request,
422 self.monitor_handlers = {b'in' : self.save_queue_request,
423 b'out': self.save_queue_result,
423 b'out': self.save_queue_result,
424 b'intask': self.save_task_request,
424 b'intask': self.save_task_request,
425 b'outtask': self.save_task_result,
425 b'outtask': self.save_task_result,
426 b'tracktask': self.save_task_destination,
426 b'tracktask': self.save_task_destination,
427 b'incontrol': _passer,
427 b'incontrol': _passer,
428 b'outcontrol': _passer,
428 b'outcontrol': _passer,
429 b'iopub': self.save_iopub_message,
429 b'iopub': self.save_iopub_message,
430 }
430 }
431
431
432 self.query_handlers = {'queue_request': self.queue_status,
432 self.query_handlers = {'queue_request': self.queue_status,
433 'result_request': self.get_results,
433 'result_request': self.get_results,
434 'history_request': self.get_history,
434 'history_request': self.get_history,
435 'db_request': self.db_query,
435 'db_request': self.db_query,
436 'purge_request': self.purge_results,
436 'purge_request': self.purge_results,
437 'load_request': self.check_load,
437 'load_request': self.check_load,
438 'resubmit_request': self.resubmit_task,
438 'resubmit_request': self.resubmit_task,
439 'shutdown_request': self.shutdown_request,
439 'shutdown_request': self.shutdown_request,
440 'registration_request' : self.register_engine,
440 'registration_request' : self.register_engine,
441 'unregistration_request' : self.unregister_engine,
441 'unregistration_request' : self.unregister_engine,
442 'connection_request': self.connection_request,
442 'connection_request': self.connection_request,
443 }
443 }
444
444
445 # ignore resubmit replies
445 # ignore resubmit replies
446 self.resubmit.on_recv(lambda msg: None, copy=False)
446 self.resubmit.on_recv(lambda msg: None, copy=False)
447
447
448 self.log.info("hub::created hub")
448 self.log.info("hub::created hub")
449
449
450 @property
450 @property
451 def _next_id(self):
451 def _next_id(self):
452 """generate a new ID.
452 """generate a new ID.
453
453
454 No longer reuse old ids, just count from 0."""
454 No longer reuse old ids, just count from 0."""
455 newid = self._idcounter
455 newid = self._idcounter
456 self._idcounter += 1
456 self._idcounter += 1
457 return newid
457 return newid
458 # newid = 0
458 # newid = 0
459 # incoming = [id[0] for id in itervalues(self.incoming_registrations)]
459 # incoming = [id[0] for id in itervalues(self.incoming_registrations)]
460 # # print newid, self.ids, self.incoming_registrations
460 # # print newid, self.ids, self.incoming_registrations
461 # while newid in self.ids or newid in incoming:
461 # while newid in self.ids or newid in incoming:
462 # newid += 1
462 # newid += 1
463 # return newid
463 # return newid
464
464
465 #-----------------------------------------------------------------------------
465 #-----------------------------------------------------------------------------
466 # message validation
466 # message validation
467 #-----------------------------------------------------------------------------
467 #-----------------------------------------------------------------------------
468
468
469 def _validate_targets(self, targets):
469 def _validate_targets(self, targets):
470 """turn any valid targets argument into a list of integer ids"""
470 """turn any valid targets argument into a list of integer ids"""
471 if targets is None:
471 if targets is None:
472 # default to all
472 # default to all
473 return self.ids
473 return self.ids
474
474
475 if isinstance(targets, (int,str,unicode_type)):
475 if isinstance(targets, (int,str,unicode_type)):
476 # only one target specified
476 # only one target specified
477 targets = [targets]
477 targets = [targets]
478 _targets = []
478 _targets = []
479 for t in targets:
479 for t in targets:
480 # map raw identities to ids
480 # map raw identities to ids
481 if isinstance(t, (str,unicode_type)):
481 if isinstance(t, (str,unicode_type)):
482 t = self.by_ident.get(cast_bytes(t), t)
482 t = self.by_ident.get(cast_bytes(t), t)
483 _targets.append(t)
483 _targets.append(t)
484 targets = _targets
484 targets = _targets
485 bad_targets = [ t for t in targets if t not in self.ids ]
485 bad_targets = [ t for t in targets if t not in self.ids ]
486 if bad_targets:
486 if bad_targets:
487 raise IndexError("No Such Engine: %r" % bad_targets)
487 raise IndexError("No Such Engine: %r" % bad_targets)
488 if not targets:
488 if not targets:
489 raise IndexError("No Engines Registered")
489 raise IndexError("No Engines Registered")
490 return targets
490 return targets
491
491
492 #-----------------------------------------------------------------------------
492 #-----------------------------------------------------------------------------
493 # dispatch methods (1 per stream)
493 # dispatch methods (1 per stream)
494 #-----------------------------------------------------------------------------
494 #-----------------------------------------------------------------------------
495
495
496
496
497 @util.log_errors
497 @util.log_errors
498 def dispatch_monitor_traffic(self, msg):
498 def dispatch_monitor_traffic(self, msg):
499 """all ME and Task queue messages come through here, as well as
499 """all ME and Task queue messages come through here, as well as
500 IOPub traffic."""
500 IOPub traffic."""
501 self.log.debug("monitor traffic: %r", msg[0])
501 self.log.debug("monitor traffic: %r", msg[0])
502 switch = msg[0]
502 switch = msg[0]
503 try:
503 try:
504 idents, msg = self.session.feed_identities(msg[1:])
504 idents, msg = self.session.feed_identities(msg[1:])
505 except ValueError:
505 except ValueError:
506 idents=[]
506 idents=[]
507 if not idents:
507 if not idents:
508 self.log.error("Monitor message without topic: %r", msg)
508 self.log.error("Monitor message without topic: %r", msg)
509 return
509 return
510 handler = self.monitor_handlers.get(switch, None)
510 handler = self.monitor_handlers.get(switch, None)
511 if handler is not None:
511 if handler is not None:
512 handler(idents, msg)
512 handler(idents, msg)
513 else:
513 else:
514 self.log.error("Unrecognized monitor topic: %r", switch)
514 self.log.error("Unrecognized monitor topic: %r", switch)
515
515
516
516
517 @util.log_errors
517 @util.log_errors
518 def dispatch_query(self, msg):
518 def dispatch_query(self, msg):
519 """Route registration requests and queries from clients."""
519 """Route registration requests and queries from clients."""
520 try:
520 try:
521 idents, msg = self.session.feed_identities(msg)
521 idents, msg = self.session.feed_identities(msg)
522 except ValueError:
522 except ValueError:
523 idents = []
523 idents = []
524 if not idents:
524 if not idents:
525 self.log.error("Bad Query Message: %r", msg)
525 self.log.error("Bad Query Message: %r", msg)
526 return
526 return
527 client_id = idents[0]
527 client_id = idents[0]
528 try:
528 try:
529 msg = self.session.unserialize(msg, content=True)
529 msg = self.session.unserialize(msg, content=True)
530 except Exception:
530 except Exception:
531 content = error.wrap_exception()
531 content = error.wrap_exception()
532 self.log.error("Bad Query Message: %r", msg, exc_info=True)
532 self.log.error("Bad Query Message: %r", msg, exc_info=True)
533 self.session.send(self.query, "hub_error", ident=client_id,
533 self.session.send(self.query, "hub_error", ident=client_id,
534 content=content)
534 content=content)
535 return
535 return
536 # print client_id, header, parent, content
536 # print client_id, header, parent, content
537 #switch on message type:
537 #switch on message type:
538 msg_type = msg['header']['msg_type']
538 msg_type = msg['header']['msg_type']
539 self.log.info("client::client %r requested %r", client_id, msg_type)
539 self.log.info("client::client %r requested %r", client_id, msg_type)
540 handler = self.query_handlers.get(msg_type, None)
540 handler = self.query_handlers.get(msg_type, None)
541 try:
541 try:
542 assert handler is not None, "Bad Message Type: %r" % msg_type
542 assert handler is not None, "Bad Message Type: %r" % msg_type
543 except:
543 except:
544 content = error.wrap_exception()
544 content = error.wrap_exception()
545 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
545 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
546 self.session.send(self.query, "hub_error", ident=client_id,
546 self.session.send(self.query, "hub_error", ident=client_id,
547 content=content)
547 content=content)
548 return
548 return
549
549
550 else:
550 else:
551 handler(idents, msg)
551 handler(idents, msg)
552
552
553 def dispatch_db(self, msg):
553 def dispatch_db(self, msg):
554 """"""
554 """"""
555 raise NotImplementedError
555 raise NotImplementedError
556
556
557 #---------------------------------------------------------------------------
557 #---------------------------------------------------------------------------
558 # handler methods (1 per event)
558 # handler methods (1 per event)
559 #---------------------------------------------------------------------------
559 #---------------------------------------------------------------------------
560
560
561 #----------------------- Heartbeat --------------------------------------
561 #----------------------- Heartbeat --------------------------------------
562
562
563 def handle_new_heart(self, heart):
563 def handle_new_heart(self, heart):
564 """handler to attach to heartbeater.
564 """handler to attach to heartbeater.
565 Called when a new heart starts to beat.
565 Called when a new heart starts to beat.
566 Triggers completion of registration."""
566 Triggers completion of registration."""
567 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
567 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
568 if heart not in self.incoming_registrations:
568 if heart not in self.incoming_registrations:
569 self.log.info("heartbeat::ignoring new heart: %r", heart)
569 self.log.info("heartbeat::ignoring new heart: %r", heart)
570 else:
570 else:
571 self.finish_registration(heart)
571 self.finish_registration(heart)
572
572
573
573
574 def handle_heart_failure(self, heart):
574 def handle_heart_failure(self, heart):
575 """handler to attach to heartbeater.
575 """handler to attach to heartbeater.
576 called when a previously registered heart fails to respond to beat request.
576 called when a previously registered heart fails to respond to beat request.
577 triggers unregistration"""
577 triggers unregistration"""
578 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
578 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
579 eid = self.hearts.get(heart, None)
579 eid = self.hearts.get(heart, None)
580 uuid = self.engines[eid].uuid
580 uuid = self.engines[eid].uuid
581 if eid is None or self.keytable[eid] in self.dead_engines:
581 if eid is None or self.keytable[eid] in self.dead_engines:
582 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
582 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
583 else:
583 else:
584 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
584 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
585
585
586 #----------------------- MUX Queue Traffic ------------------------------
586 #----------------------- MUX Queue Traffic ------------------------------
587
587
588 def save_queue_request(self, idents, msg):
588 def save_queue_request(self, idents, msg):
589 if len(idents) < 2:
589 if len(idents) < 2:
590 self.log.error("invalid identity prefix: %r", idents)
590 self.log.error("invalid identity prefix: %r", idents)
591 return
591 return
592 queue_id, client_id = idents[:2]
592 queue_id, client_id = idents[:2]
593 try:
593 try:
594 msg = self.session.unserialize(msg)
594 msg = self.session.unserialize(msg)
595 except Exception:
595 except Exception:
596 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
596 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
597 return
597 return
598
598
599 eid = self.by_ident.get(queue_id, None)
599 eid = self.by_ident.get(queue_id, None)
600 if eid is None:
600 if eid is None:
601 self.log.error("queue::target %r not registered", queue_id)
601 self.log.error("queue::target %r not registered", queue_id)
602 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
602 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
603 return
603 return
604 record = init_record(msg)
604 record = init_record(msg)
605 msg_id = record['msg_id']
605 msg_id = record['msg_id']
606 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
606 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
607 # Unicode in records
607 # Unicode in records
608 record['engine_uuid'] = queue_id.decode('ascii')
608 record['engine_uuid'] = queue_id.decode('ascii')
609 record['client_uuid'] = msg['header']['session']
609 record['client_uuid'] = msg['header']['session']
610 record['queue'] = 'mux'
610 record['queue'] = 'mux'
611
611
612 try:
612 try:
613 # it's possible iopub arrived first:
613 # it's possible iopub arrived first:
614 existing = self.db.get_record(msg_id)
614 existing = self.db.get_record(msg_id)
615 for key,evalue in iteritems(existing):
615 for key,evalue in iteritems(existing):
616 rvalue = record.get(key, None)
616 rvalue = record.get(key, None)
617 if evalue and rvalue and evalue != rvalue:
617 if evalue and rvalue and evalue != rvalue:
618 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
618 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
619 elif evalue and not rvalue:
619 elif evalue and not rvalue:
620 record[key] = evalue
620 record[key] = evalue
621 try:
621 try:
622 self.db.update_record(msg_id, record)
622 self.db.update_record(msg_id, record)
623 except Exception:
623 except Exception:
624 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
624 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
625 except KeyError:
625 except KeyError:
626 try:
626 try:
627 self.db.add_record(msg_id, record)
627 self.db.add_record(msg_id, record)
628 except Exception:
628 except Exception:
629 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
629 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
630
630
631
631
632 self.pending.add(msg_id)
632 self.pending.add(msg_id)
633 self.queues[eid].append(msg_id)
633 self.queues[eid].append(msg_id)
634
634
635 def save_queue_result(self, idents, msg):
635 def save_queue_result(self, idents, msg):
636 if len(idents) < 2:
636 if len(idents) < 2:
637 self.log.error("invalid identity prefix: %r", idents)
637 self.log.error("invalid identity prefix: %r", idents)
638 return
638 return
639
639
640 client_id, queue_id = idents[:2]
640 client_id, queue_id = idents[:2]
641 try:
641 try:
642 msg = self.session.unserialize(msg)
642 msg = self.session.unserialize(msg)
643 except Exception:
643 except Exception:
644 self.log.error("queue::engine %r sent invalid message to %r: %r",
644 self.log.error("queue::engine %r sent invalid message to %r: %r",
645 queue_id, client_id, msg, exc_info=True)
645 queue_id, client_id, msg, exc_info=True)
646 return
646 return
647
647
648 eid = self.by_ident.get(queue_id, None)
648 eid = self.by_ident.get(queue_id, None)
649 if eid is None:
649 if eid is None:
650 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
650 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
651 return
651 return
652
652
653 parent = msg['parent_header']
653 parent = msg['parent_header']
654 if not parent:
654 if not parent:
655 return
655 return
656 msg_id = parent['msg_id']
656 msg_id = parent['msg_id']
657 if msg_id in self.pending:
657 if msg_id in self.pending:
658 self.pending.remove(msg_id)
658 self.pending.remove(msg_id)
659 self.all_completed.add(msg_id)
659 self.all_completed.add(msg_id)
660 self.queues[eid].remove(msg_id)
660 self.queues[eid].remove(msg_id)
661 self.completed[eid].append(msg_id)
661 self.completed[eid].append(msg_id)
662 self.log.info("queue::request %r completed on %s", msg_id, eid)
662 self.log.info("queue::request %r completed on %s", msg_id, eid)
663 elif msg_id not in self.all_completed:
663 elif msg_id not in self.all_completed:
664 # it could be a result from a dead engine that died before delivering the
664 # it could be a result from a dead engine that died before delivering the
665 # result
665 # result
666 self.log.warn("queue:: unknown msg finished %r", msg_id)
666 self.log.warn("queue:: unknown msg finished %r", msg_id)
667 return
667 return
668 # update record anyway, because the unregistration could have been premature
668 # update record anyway, because the unregistration could have been premature
669 rheader = msg['header']
669 rheader = msg['header']
670 md = msg['metadata']
670 md = msg['metadata']
671 completed = rheader['date']
671 completed = rheader['date']
672 - started = md.get('started', None)
672 + started = extract_dates(md.get('started', None))
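# Note (added annotation, not a line of hub.py): extract_dates converts an
# ISO8601 string such as u'2013-10-01T12:34:56.789012' into a datetime and
# passes None through unchanged, so 'started' ends up with the same type as
# 'completed' above and 'received' in the result dict below.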
673 result = {
673 result = {
674 'result_header' : rheader,
674 'result_header' : rheader,
675 'result_metadata': md,
675 'result_metadata': md,
676 'result_content': msg['content'],
676 'result_content': msg['content'],
677 'received': datetime.now(),
677 'received': datetime.now(),
678 'started' : started,
678 'started' : started,
679 'completed' : completed
679 'completed' : completed
680 }
680 }
681
681
682 result['result_buffers'] = msg['buffers']
682 result['result_buffers'] = msg['buffers']
683 try:
683 try:
684 self.db.update_record(msg_id, result)
684 self.db.update_record(msg_id, result)
685 except Exception:
685 except Exception:
686 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
686 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
687
687
688
688
689 #--------------------- Task Queue Traffic ------------------------------
689 #--------------------- Task Queue Traffic ------------------------------
690
690
691 def save_task_request(self, idents, msg):
691 def save_task_request(self, idents, msg):
692 """Save the submission of a task."""
692 """Save the submission of a task."""
693 client_id = idents[0]
693 client_id = idents[0]
694
694
695 try:
695 try:
696 msg = self.session.unserialize(msg)
696 msg = self.session.unserialize(msg)
697 except Exception:
697 except Exception:
698 self.log.error("task::client %r sent invalid task message: %r",
698 self.log.error("task::client %r sent invalid task message: %r",
699 client_id, msg, exc_info=True)
699 client_id, msg, exc_info=True)
700 return
700 return
701 record = init_record(msg)
701 record = init_record(msg)
702
702
703 record['client_uuid'] = msg['header']['session']
703 record['client_uuid'] = msg['header']['session']
704 record['queue'] = 'task'
704 record['queue'] = 'task'
705 header = msg['header']
705 header = msg['header']
706 msg_id = header['msg_id']
706 msg_id = header['msg_id']
707 self.pending.add(msg_id)
707 self.pending.add(msg_id)
708 self.unassigned.add(msg_id)
708 self.unassigned.add(msg_id)
709 try:
709 try:
710 # it's possible iopub arrived first:
710 # it's possible iopub arrived first:
711 existing = self.db.get_record(msg_id)
711 existing = self.db.get_record(msg_id)
712 if existing['resubmitted']:
712 if existing['resubmitted']:
713 for key in ('submitted', 'client_uuid', 'buffers'):
713 for key in ('submitted', 'client_uuid', 'buffers'):
714 # don't clobber these keys on resubmit
714 # don't clobber these keys on resubmit
715 # submitted and client_uuid should be different
715 # submitted and client_uuid should be different
716 # and buffers might be big, and shouldn't have changed
716 # and buffers might be big, and shouldn't have changed
717 record.pop(key)
717 record.pop(key)
718 # still check content,header which should not change
718 # still check content,header which should not change
719 # but are not expensive to compare as buffers
719 # but are not expensive to compare as buffers
720
720
721 for key,evalue in iteritems(existing):
721 for key,evalue in iteritems(existing):
722 if key.endswith('buffers'):
722 if key.endswith('buffers'):
723 # don't compare buffers
723 # don't compare buffers
724 continue
724 continue
725 rvalue = record.get(key, None)
725 rvalue = record.get(key, None)
726 if evalue and rvalue and evalue != rvalue:
726 if evalue and rvalue and evalue != rvalue:
727 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
727 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
728 elif evalue and not rvalue:
728 elif evalue and not rvalue:
729 record[key] = evalue
729 record[key] = evalue
730 try:
730 try:
731 self.db.update_record(msg_id, record)
731 self.db.update_record(msg_id, record)
732 except Exception:
732 except Exception:
733 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
733 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
734 except KeyError:
734 except KeyError:
735 try:
735 try:
736 self.db.add_record(msg_id, record)
736 self.db.add_record(msg_id, record)
737 except Exception:
737 except Exception:
738 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
738 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
739 except Exception:
739 except Exception:
740 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
740 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
741
741
742 def save_task_result(self, idents, msg):
742 def save_task_result(self, idents, msg):
743 """save the result of a completed task."""
743 """save the result of a completed task."""
744 client_id = idents[0]
744 client_id = idents[0]
745 try:
745 try:
746 msg = self.session.unserialize(msg)
746 msg = self.session.unserialize(msg)
747 except Exception:
747 except Exception:
748 self.log.error("task::invalid task result message sent to %r: %r",
748 self.log.error("task::invalid task result message sent to %r: %r",
749 client_id, msg, exc_info=True)
749 client_id, msg, exc_info=True)
750 return
750 return
751
751
752 parent = msg['parent_header']
752 parent = msg['parent_header']
753 if not parent:
753 if not parent:
754 # print msg
754 # print msg
755 self.log.warn("Task %r had no parent!", msg)
755 self.log.warn("Task %r had no parent!", msg)
756 return
756 return
757 msg_id = parent['msg_id']
757 msg_id = parent['msg_id']
758 if msg_id in self.unassigned:
758 if msg_id in self.unassigned:
759 self.unassigned.remove(msg_id)
759 self.unassigned.remove(msg_id)
760
760
761 header = msg['header']
761 header = msg['header']
762 md = msg['metadata']
762 md = msg['metadata']
763 engine_uuid = md.get('engine', u'')
763 engine_uuid = md.get('engine', u'')
764 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
764 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
765
765
766 status = md.get('status', None)
766 status = md.get('status', None)
767
767
768 if msg_id in self.pending:
768 if msg_id in self.pending:
769 self.log.info("task::task %r finished on %s", msg_id, eid)
769 self.log.info("task::task %r finished on %s", msg_id, eid)
770 self.pending.remove(msg_id)
770 self.pending.remove(msg_id)
771 self.all_completed.add(msg_id)
771 self.all_completed.add(msg_id)
772 if eid is not None:
772 if eid is not None:
773 if status != 'aborted':
773 if status != 'aborted':
774 self.completed[eid].append(msg_id)
774 self.completed[eid].append(msg_id)
775 if msg_id in self.tasks[eid]:
775 if msg_id in self.tasks[eid]:
776 self.tasks[eid].remove(msg_id)
776 self.tasks[eid].remove(msg_id)
777 completed = header['date']
777 completed = header['date']
778 - started = md.get('started', None)
778 + started = extract_dates(md.get('started', None))
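# Note (added annotation, not a line of hub.py): same normalization as in
# save_queue_result -- 'started' becomes a datetime before the record is
# stored alongside 'completed' and 'received'.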
779 result = {
779 result = {
780 'result_header' : header,
780 'result_header' : header,
781 'result_metadata': msg['metadata'],
781 'result_metadata': msg['metadata'],
782 'result_content': msg['content'],
782 'result_content': msg['content'],
783 'started' : started,
783 'started' : started,
784 'completed' : completed,
784 'completed' : completed,
785 'received' : datetime.now(),
785 'received' : datetime.now(),
786 'engine_uuid': engine_uuid,
786 'engine_uuid': engine_uuid,
787 }
787 }
788
788
789 result['result_buffers'] = msg['buffers']
789 result['result_buffers'] = msg['buffers']
790 try:
790 try:
791 self.db.update_record(msg_id, result)
791 self.db.update_record(msg_id, result)
792 except Exception:
792 except Exception:
793 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
793 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
794
794
795 else:
795 else:
796 self.log.debug("task::unknown task %r finished", msg_id)
796 self.log.debug("task::unknown task %r finished", msg_id)
797
797
798 def save_task_destination(self, idents, msg):
798 def save_task_destination(self, idents, msg):
799 try:
799 try:
800 msg = self.session.unserialize(msg, content=True)
800 msg = self.session.unserialize(msg, content=True)
801 except Exception:
801 except Exception:
802 self.log.error("task::invalid task tracking message", exc_info=True)
802 self.log.error("task::invalid task tracking message", exc_info=True)
803 return
803 return
804 content = msg['content']
804 content = msg['content']
805 # print (content)
805 # print (content)
806 msg_id = content['msg_id']
806 msg_id = content['msg_id']
807 engine_uuid = content['engine_id']
807 engine_uuid = content['engine_id']
808 eid = self.by_ident[cast_bytes(engine_uuid)]
808 eid = self.by_ident[cast_bytes(engine_uuid)]
809
809
810 self.log.info("task::task %r arrived on %r", msg_id, eid)
810 self.log.info("task::task %r arrived on %r", msg_id, eid)
811 if msg_id in self.unassigned:
811 if msg_id in self.unassigned:
812 self.unassigned.remove(msg_id)
812 self.unassigned.remove(msg_id)
813 # else:
813 # else:
814 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
814 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
815
815
816 self.tasks[eid].append(msg_id)
816 self.tasks[eid].append(msg_id)
817 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
817 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
818 try:
818 try:
819 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
819 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
820 except Exception:
820 except Exception:
821 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
821 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
822
822
823
823
824 def mia_task_request(self, idents, msg):
824 def mia_task_request(self, idents, msg):
825 raise NotImplementedError
825 raise NotImplementedError
826 client_id = idents[0]
826 client_id = idents[0]
827 # content = dict(mia=self.mia,status='ok')
827 # content = dict(mia=self.mia,status='ok')
828 # self.session.send('mia_reply', content=content, idents=client_id)
828 # self.session.send('mia_reply', content=content, idents=client_id)
829
829
830
830
831 #--------------------- IOPub Traffic ------------------------------
831 #--------------------- IOPub Traffic ------------------------------
832
832
833 def save_iopub_message(self, topics, msg):
833 def save_iopub_message(self, topics, msg):
834 """save an iopub message into the db"""
834 """save an iopub message into the db"""
835 # print (topics)
835 # print (topics)
836 try:
836 try:
837 msg = self.session.unserialize(msg, content=True)
837 msg = self.session.unserialize(msg, content=True)
838 except Exception:
838 except Exception:
839 self.log.error("iopub::invalid IOPub message", exc_info=True)
839 self.log.error("iopub::invalid IOPub message", exc_info=True)
840 return
840 return
841
841
842 parent = msg['parent_header']
842 parent = msg['parent_header']
843 if not parent:
843 if not parent:
844 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
844 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
845 return
845 return
846 msg_id = parent['msg_id']
846 msg_id = parent['msg_id']
847 msg_type = msg['header']['msg_type']
847 msg_type = msg['header']['msg_type']
848 content = msg['content']
848 content = msg['content']
849
849
850 # ensure msg_id is in db
850 # ensure msg_id is in db
851 try:
851 try:
852 rec = self.db.get_record(msg_id)
852 rec = self.db.get_record(msg_id)
853 except KeyError:
853 except KeyError:
854 rec = empty_record()
854 rec = empty_record()
855 rec['msg_id'] = msg_id
855 rec['msg_id'] = msg_id
856 self.db.add_record(msg_id, rec)
856 self.db.add_record(msg_id, rec)
857 # stream
857 # stream
858 d = {}
858 d = {}
859 if msg_type == 'stream':
859 if msg_type == 'stream':
860 name = content['name']
860 name = content['name']
861 s = rec[name] or ''
861 s = rec[name] or ''
862 d[name] = s + content['data']
862 d[name] = s + content['data']
863
863
864 elif msg_type == 'pyerr':
864 elif msg_type == 'pyerr':
865 d['pyerr'] = content
865 d['pyerr'] = content
866 elif msg_type == 'pyin':
866 elif msg_type == 'pyin':
867 d['pyin'] = content['code']
867 d['pyin'] = content['code']
868 elif msg_type in ('display_data', 'pyout'):
868 elif msg_type in ('display_data', 'pyout'):
869 d[msg_type] = content
869 d[msg_type] = content
870 elif msg_type == 'status':
870 elif msg_type == 'status':
871 pass
871 pass
872 elif msg_type == 'data_pub':
872 elif msg_type == 'data_pub':
873 self.log.info("ignored data_pub message for %s" % msg_id)
873 self.log.info("ignored data_pub message for %s" % msg_id)
874 else:
874 else:
875 self.log.warn("unhandled iopub msg_type: %r", msg_type)
875 self.log.warn("unhandled iopub msg_type: %r", msg_type)
876
876
877 if not d:
877 if not d:
878 return
878 return
879
879
880 try:
880 try:
881 self.db.update_record(msg_id, d)
881 self.db.update_record(msg_id, d)
882 except Exception:
882 except Exception:
883 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
883 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
884
884
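# --- Editorial note: illustrative sketch, not part of the Hub source. ---
# How a 'stream' iopub message is folded into the task record by the branch
# above, assuming the record already holds rec['stdout'] == 'hello\n' and the
# message content is {'name': 'stdout', 'data': 'world\n'}:
#
#     d = {'stdout': 'hello\n' + 'world\n'}   # stored as 'hello\nworld\n'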
885
885
886
886
887 #-------------------------------------------------------------------------
887 #-------------------------------------------------------------------------
888 # Registration requests
888 # Registration requests
889 #-------------------------------------------------------------------------
889 #-------------------------------------------------------------------------
890
890
891 def connection_request(self, client_id, msg):
891 def connection_request(self, client_id, msg):
892 """Reply with connection addresses for clients."""
892 """Reply with connection addresses for clients."""
893 self.log.info("client::client %r connected", client_id)
893 self.log.info("client::client %r connected", client_id)
894 content = dict(status='ok')
894 content = dict(status='ok')
895 jsonable = {}
895 jsonable = {}
896 for k,v in iteritems(self.keytable):
896 for k,v in iteritems(self.keytable):
897 if v not in self.dead_engines:
897 if v not in self.dead_engines:
898 jsonable[str(k)] = v
898 jsonable[str(k)] = v
899 content['engines'] = jsonable
899 content['engines'] = jsonable
900 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
900 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
901
901
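# --- Editorial note: illustrative sketch, not part of the Hub source. ---
# Based on connection_request() above, a connection_reply content looks
# roughly like the dict below. The engine ids and uuids are made-up
# placeholder values; keys are str(engine_id), values are engine uuids.
example_connection_reply = {
    'status': 'ok',
    'engines': {
        '0': 'a1b2c3d4-0000-0000-0000-placeholder0',
        '1': 'e5f6a7b8-0000-0000-0000-placeholder1',
    },
}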
902 def register_engine(self, reg, msg):
902 def register_engine(self, reg, msg):
903 """Register a new engine."""
903 """Register a new engine."""
904 content = msg['content']
904 content = msg['content']
905 try:
905 try:
906 uuid = content['uuid']
906 uuid = content['uuid']
907 except KeyError:
907 except KeyError:
908 self.log.error("registration::queue not specified", exc_info=True)
908 self.log.error("registration::queue not specified", exc_info=True)
909 return
909 return
910
910
911 eid = self._next_id
911 eid = self._next_id
912
912
913 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
913 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
914
914
915 content = dict(id=eid,status='ok',hb_period=self.heartmonitor.period)
915 content = dict(id=eid,status='ok',hb_period=self.heartmonitor.period)
916 # check if requesting available IDs:
916 # check if requesting available IDs:
917 if cast_bytes(uuid) in self.by_ident:
917 if cast_bytes(uuid) in self.by_ident:
918 try:
918 try:
919 raise KeyError("uuid %r in use" % uuid)
919 raise KeyError("uuid %r in use" % uuid)
920 except:
920 except:
921 content = error.wrap_exception()
921 content = error.wrap_exception()
922 self.log.error("uuid %r in use", uuid, exc_info=True)
922 self.log.error("uuid %r in use", uuid, exc_info=True)
923 else:
923 else:
924 for h, ec in iteritems(self.incoming_registrations):
924 for h, ec in iteritems(self.incoming_registrations):
925 if uuid == h:
925 if uuid == h:
926 try:
926 try:
927 raise KeyError("heart_id %r in use" % uuid)
927 raise KeyError("heart_id %r in use" % uuid)
928 except:
928 except:
929 self.log.error("heart_id %r in use", uuid, exc_info=True)
929 self.log.error("heart_id %r in use", uuid, exc_info=True)
930 content = error.wrap_exception()
930 content = error.wrap_exception()
931 break
931 break
932 elif uuid == ec.uuid:
932 elif uuid == ec.uuid:
933 try:
933 try:
934 raise KeyError("uuid %r in use" % uuid)
934 raise KeyError("uuid %r in use" % uuid)
935 except:
935 except:
936 self.log.error("uuid %r in use", uuid, exc_info=True)
936 self.log.error("uuid %r in use", uuid, exc_info=True)
937 content = error.wrap_exception()
937 content = error.wrap_exception()
938 break
938 break
939
939
940 msg = self.session.send(self.query, "registration_reply",
940 msg = self.session.send(self.query, "registration_reply",
941 content=content,
941 content=content,
942 ident=reg)
942 ident=reg)
943
943
944 heart = cast_bytes(uuid)
944 heart = cast_bytes(uuid)
945
945
946 if content['status'] == 'ok':
946 if content['status'] == 'ok':
947 if heart in self.heartmonitor.hearts:
947 if heart in self.heartmonitor.hearts:
948 # already beating
948 # already beating
949 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
949 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
950 self.finish_registration(heart)
950 self.finish_registration(heart)
951 else:
951 else:
952 purge = lambda : self._purge_stalled_registration(heart)
952 purge = lambda : self._purge_stalled_registration(heart)
953 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
953 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
954 dc.start()
954 dc.start()
955 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
955 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
956 else:
956 else:
957 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
957 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
958
958
959 return eid
959 return eid
960
960
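# --- Editorial note: illustrative sketch, not part of the Hub source. ---
# A successful registration_reply content, following the dict built in
# register_engine() above. 'id' is the engine id assigned by the Hub and
# 'hb_period' is self.heartmonitor.period; both values are placeholders.
example_registration_reply = {
    'status': 'ok',
    'id': 3,
    'hb_period': 3000,
}
# On failure the content comes from error.wrap_exception(), and the code
# above logs content['evalue'] from that dict.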
961 def unregister_engine(self, ident, msg):
961 def unregister_engine(self, ident, msg):
962 """Unregister an engine that explicitly requested to leave."""
962 """Unregister an engine that explicitly requested to leave."""
963 try:
963 try:
964 eid = msg['content']['id']
964 eid = msg['content']['id']
965 except:
965 except:
966 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
966 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
967 return
967 return
968 self.log.info("registration::unregister_engine(%r)", eid)
968 self.log.info("registration::unregister_engine(%r)", eid)
969 # print (eid)
969 # print (eid)
970 uuid = self.keytable[eid]
970 uuid = self.keytable[eid]
971 content=dict(id=eid, uuid=uuid)
971 content=dict(id=eid, uuid=uuid)
972 self.dead_engines.add(uuid)
972 self.dead_engines.add(uuid)
973 # self.ids.remove(eid)
973 # self.ids.remove(eid)
974 # uuid = self.keytable.pop(eid)
974 # uuid = self.keytable.pop(eid)
975 #
975 #
976 # ec = self.engines.pop(eid)
976 # ec = self.engines.pop(eid)
977 # self.hearts.pop(ec.heartbeat)
977 # self.hearts.pop(ec.heartbeat)
978 # self.by_ident.pop(ec.queue)
978 # self.by_ident.pop(ec.queue)
979 # self.completed.pop(eid)
979 # self.completed.pop(eid)
980 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
980 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
981 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
981 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
982 dc.start()
982 dc.start()
983 ############## TODO: HANDLE IT ################
983 ############## TODO: HANDLE IT ################
984
984
985 self._save_engine_state()
985 self._save_engine_state()
986
986
987 if self.notifier:
987 if self.notifier:
988 self.session.send(self.notifier, "unregistration_notification", content=content)
988 self.session.send(self.notifier, "unregistration_notification", content=content)
989
989
990 def _handle_stranded_msgs(self, eid, uuid):
990 def _handle_stranded_msgs(self, eid, uuid):
991 """Handle messages known to be on an engine when the engine unregisters.
991 """Handle messages known to be on an engine when the engine unregisters.
992
992
993 It is possible that this will fire prematurely - that is, an engine will
993 It is possible that this will fire prematurely - that is, an engine will
994 go down after completing a result, and the client will be notified
994 go down after completing a result, and the client will be notified
995 that the result failed and later receive the actual result.
995 that the result failed and later receive the actual result.
996 """
996 """
997
997
998 outstanding = self.queues[eid]
998 outstanding = self.queues[eid]
999
999
1000 for msg_id in outstanding:
1000 for msg_id in outstanding:
1001 self.pending.remove(msg_id)
1001 self.pending.remove(msg_id)
1002 self.all_completed.add(msg_id)
1002 self.all_completed.add(msg_id)
1003 try:
1003 try:
1004 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
1004 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
1005 except:
1005 except:
1006 content = error.wrap_exception()
1006 content = error.wrap_exception()
1007 # build a fake header:
1007 # build a fake header:
1008 header = {}
1008 header = {}
1009 header['engine'] = uuid
1009 header['engine'] = uuid
1010 header['date'] = datetime.now()
1010 header['date'] = datetime.now()
1011 rec = dict(result_content=content, result_header=header, result_buffers=[])
1011 rec = dict(result_content=content, result_header=header, result_buffers=[])
1012 rec['completed'] = header['date']
1012 rec['completed'] = header['date']
1013 rec['engine_uuid'] = uuid
1013 rec['engine_uuid'] = uuid
1014 try:
1014 try:
1015 self.db.update_record(msg_id, rec)
1015 self.db.update_record(msg_id, rec)
1016 except Exception:
1016 except Exception:
1017 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1017 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1018
1018
1019
1019
1020 def finish_registration(self, heart):
1020 def finish_registration(self, heart):
1021 """Second half of engine registration, called after our HeartMonitor
1021 """Second half of engine registration, called after our HeartMonitor
1022 has received a beat from the Engine's Heart."""
1022 has received a beat from the Engine's Heart."""
1023 try:
1023 try:
1024 ec = self.incoming_registrations.pop(heart)
1024 ec = self.incoming_registrations.pop(heart)
1025 except KeyError:
1025 except KeyError:
1026 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1026 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1027 return
1027 return
1028 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1028 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1029 if ec.stallback is not None:
1029 if ec.stallback is not None:
1030 ec.stallback.stop()
1030 ec.stallback.stop()
1031 eid = ec.id
1031 eid = ec.id
1032 self.ids.add(eid)
1032 self.ids.add(eid)
1033 self.keytable[eid] = ec.uuid
1033 self.keytable[eid] = ec.uuid
1034 self.engines[eid] = ec
1034 self.engines[eid] = ec
1035 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1035 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1036 self.queues[eid] = list()
1036 self.queues[eid] = list()
1037 self.tasks[eid] = list()
1037 self.tasks[eid] = list()
1038 self.completed[eid] = list()
1038 self.completed[eid] = list()
1039 self.hearts[heart] = eid
1039 self.hearts[heart] = eid
1040 content = dict(id=eid, uuid=self.engines[eid].uuid)
1040 content = dict(id=eid, uuid=self.engines[eid].uuid)
1041 if self.notifier:
1041 if self.notifier:
1042 self.session.send(self.notifier, "registration_notification", content=content)
1042 self.session.send(self.notifier, "registration_notification", content=content)
1043 self.log.info("engine::Engine Connected: %i", eid)
1043 self.log.info("engine::Engine Connected: %i", eid)
1044
1044
1045 self._save_engine_state()
1045 self._save_engine_state()
1046
1046
1047 def _purge_stalled_registration(self, heart):
1047 def _purge_stalled_registration(self, heart):
1048 if heart in self.incoming_registrations:
1048 if heart in self.incoming_registrations:
1049 ec = self.incoming_registrations.pop(heart)
1049 ec = self.incoming_registrations.pop(heart)
1050 self.log.info("registration::purging stalled registration: %i", ec.id)
1050 self.log.info("registration::purging stalled registration: %i", ec.id)
1051 else:
1051 else:
1052 pass
1052 pass
1053
1053
1054 #-------------------------------------------------------------------------
1054 #-------------------------------------------------------------------------
1055 # Engine State
1055 # Engine State
1056 #-------------------------------------------------------------------------
1056 #-------------------------------------------------------------------------
1057
1057
1058
1058
1059 def _cleanup_engine_state_file(self):
1059 def _cleanup_engine_state_file(self):
1060 """cleanup engine state mapping"""
1060 """cleanup engine state mapping"""
1061
1061
1062 if os.path.exists(self.engine_state_file):
1062 if os.path.exists(self.engine_state_file):
1063 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1063 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1064 try:
1064 try:
1065 os.remove(self.engine_state_file)
1065 os.remove(self.engine_state_file)
1066 except OSError:
1066 except OSError:
1067 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1067 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1068
1068
1069
1069
1070 def _save_engine_state(self):
1070 def _save_engine_state(self):
1071 """save engine mapping to JSON file"""
1071 """save engine mapping to JSON file"""
1072 if not self.engine_state_file:
1072 if not self.engine_state_file:
1073 return
1073 return
1074 self.log.debug("save engine state to %s" % self.engine_state_file)
1074 self.log.debug("save engine state to %s" % self.engine_state_file)
1075 state = {}
1075 state = {}
1076 engines = {}
1076 engines = {}
1077 for eid, ec in iteritems(self.engines):
1077 for eid, ec in iteritems(self.engines):
1078 if ec.uuid not in self.dead_engines:
1078 if ec.uuid not in self.dead_engines:
1079 engines[eid] = ec.uuid
1079 engines[eid] = ec.uuid
1080
1080
1081 state['engines'] = engines
1081 state['engines'] = engines
1082
1082
1083 state['next_id'] = self._idcounter
1083 state['next_id'] = self._idcounter
1084
1084
1085 with open(self.engine_state_file, 'w') as f:
1085 with open(self.engine_state_file, 'w') as f:
1086 json.dump(state, f)
1086 json.dump(state, f)
1087
1087
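# --- Editorial note: illustrative sketch, not part of the Hub source. ---
# The engine-state file written above is plain JSON; the ids and uuids below
# are placeholders. json.dump() coerces the integer engine ids to string
# keys, which is why _load_engine_state() (just below) calls int(eid).
example_engine_state = {
    'engines': {
        '0': '5c2f3b1a-0000-0000-0000-placeholder0',
        '1': '9d8e7f6c-0000-0000-0000-placeholder1',
    },
    'next_id': 2,
}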
1088
1088
1089 def _load_engine_state(self):
1089 def _load_engine_state(self):
1090 """load engine mapping from JSON file"""
1090 """load engine mapping from JSON file"""
1091 if not os.path.exists(self.engine_state_file):
1091 if not os.path.exists(self.engine_state_file):
1092 return
1092 return
1093
1093
1094 self.log.info("loading engine state from %s" % self.engine_state_file)
1094 self.log.info("loading engine state from %s" % self.engine_state_file)
1095
1095
1096 with open(self.engine_state_file) as f:
1096 with open(self.engine_state_file) as f:
1097 state = json.load(f)
1097 state = json.load(f)
1098
1098
1099 save_notifier = self.notifier
1099 save_notifier = self.notifier
1100 self.notifier = None
1100 self.notifier = None
1101 for eid, uuid in iteritems(state['engines']):
1101 for eid, uuid in iteritems(state['engines']):
1102 heart = uuid.encode('ascii')
1102 heart = uuid.encode('ascii')
1103 # start with this heart as current and beating:
1103 # start with this heart as current and beating:
1104 self.heartmonitor.responses.add(heart)
1104 self.heartmonitor.responses.add(heart)
1105 self.heartmonitor.hearts.add(heart)
1105 self.heartmonitor.hearts.add(heart)
1106
1106
1107 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1107 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1108 self.finish_registration(heart)
1108 self.finish_registration(heart)
1109
1109
1110 self.notifier = save_notifier
1110 self.notifier = save_notifier
1111
1111
1112 self._idcounter = state['next_id']
1112 self._idcounter = state['next_id']
1113
1113
1114 #-------------------------------------------------------------------------
1114 #-------------------------------------------------------------------------
1115 # Client Requests
1115 # Client Requests
1116 #-------------------------------------------------------------------------
1116 #-------------------------------------------------------------------------
1117
1117
1118 def shutdown_request(self, client_id, msg):
1118 def shutdown_request(self, client_id, msg):
1119 """handle shutdown request."""
1119 """handle shutdown request."""
1120 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1120 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1121 # also notify other clients of shutdown
1121 # also notify other clients of shutdown
1122 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1122 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1123 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1123 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1124 dc.start()
1124 dc.start()
1125
1125
1126 def _shutdown(self):
1126 def _shutdown(self):
1127 self.log.info("hub::hub shutting down.")
1127 self.log.info("hub::hub shutting down.")
1128 time.sleep(0.1)
1128 time.sleep(0.1)
1129 sys.exit(0)
1129 sys.exit(0)
1130
1130
1131
1131
1132 def check_load(self, client_id, msg):
1132 def check_load(self, client_id, msg):
1133 content = msg['content']
1133 content = msg['content']
1134 try:
1134 try:
1135 targets = content['targets']
1135 targets = content['targets']
1136 targets = self._validate_targets(targets)
1136 targets = self._validate_targets(targets)
1137 except:
1137 except:
1138 content = error.wrap_exception()
1138 content = error.wrap_exception()
1139 self.session.send(self.query, "hub_error",
1139 self.session.send(self.query, "hub_error",
1140 content=content, ident=client_id)
1140 content=content, ident=client_id)
1141 return
1141 return
1142
1142
1143 content = dict(status='ok')
1143 content = dict(status='ok')
1144 # loads = {}
1144 # loads = {}
1145 for t in targets:
1145 for t in targets:
1146 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1146 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1147 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1147 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1148
1148
1149
1149
1150 def queue_status(self, client_id, msg):
1150 def queue_status(self, client_id, msg):
1151 """Return the Queue status of one or more targets.
1151 """Return the Queue status of one or more targets.
1152
1152
1153 If verbose, return the msg_ids, else return len of each type.
1153 If verbose, return the msg_ids, else return len of each type.
1154
1154
1155 Keys:
1155 Keys:
1156
1156
1157 * queue (pending MUX jobs)
1157 * queue (pending MUX jobs)
1158 * tasks (pending Task jobs)
1158 * tasks (pending Task jobs)
1159 * completed (finished jobs from both queues)
1159 * completed (finished jobs from both queues)
1160 """
1160 """
1161 content = msg['content']
1161 content = msg['content']
1162 targets = content['targets']
1162 targets = content['targets']
1163 try:
1163 try:
1164 targets = self._validate_targets(targets)
1164 targets = self._validate_targets(targets)
1165 except:
1165 except:
1166 content = error.wrap_exception()
1166 content = error.wrap_exception()
1167 self.session.send(self.query, "hub_error",
1167 self.session.send(self.query, "hub_error",
1168 content=content, ident=client_id)
1168 content=content, ident=client_id)
1169 return
1169 return
1170 verbose = content.get('verbose', False)
1170 verbose = content.get('verbose', False)
1171 content = dict(status='ok')
1171 content = dict(status='ok')
1172 for t in targets:
1172 for t in targets:
1173 queue = self.queues[t]
1173 queue = self.queues[t]
1174 completed = self.completed[t]
1174 completed = self.completed[t]
1175 tasks = self.tasks[t]
1175 tasks = self.tasks[t]
1176 if not verbose:
1176 if not verbose:
1177 queue = len(queue)
1177 queue = len(queue)
1178 completed = len(completed)
1178 completed = len(completed)
1179 tasks = len(tasks)
1179 tasks = len(tasks)
1180 content[str(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
1180 content[str(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
1181 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1181 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1182 # print (content)
1182 # print (content)
1183 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1183 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1184
1184
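# --- Editorial note: illustrative sketch, not part of the Hub source. ---
# A non-verbose queue_reply content for two engines might look like the dict
# below (all counts are placeholders). With verbose=True the counts are
# replaced by the lists of msg_ids themselves, per queue_status() above.
example_queue_reply = {
    'status': 'ok',
    '0': {'queue': 2, 'completed': 10, 'tasks': 1},
    '1': {'queue': 0, 'completed': 12, 'tasks': 3},
    'unassigned': 4,
}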
1185 def purge_results(self, client_id, msg):
1185 def purge_results(self, client_id, msg):
1186 """Purge results from memory. This method is more valuable before we move
1186 """Purge results from memory. This method is more valuable before we move
1187 to a DB based message storage mechanism."""
1187 to a DB based message storage mechanism."""
1188 content = msg['content']
1188 content = msg['content']
1189 self.log.info("Dropping records with %s", content)
1189 self.log.info("Dropping records with %s", content)
1190 msg_ids = content.get('msg_ids', [])
1190 msg_ids = content.get('msg_ids', [])
1191 reply = dict(status='ok')
1191 reply = dict(status='ok')
1192 if msg_ids == 'all':
1192 if msg_ids == 'all':
1193 try:
1193 try:
1194 self.db.drop_matching_records(dict(completed={'$ne':None}))
1194 self.db.drop_matching_records(dict(completed={'$ne':None}))
1195 except Exception:
1195 except Exception:
1196 reply = error.wrap_exception()
1196 reply = error.wrap_exception()
1197 else:
1197 else:
1198 pending = [m for m in msg_ids if (m in self.pending)]
1198 pending = [m for m in msg_ids if (m in self.pending)]
1199 if pending:
1199 if pending:
1200 try:
1200 try:
1201 raise IndexError("msg pending: %r" % pending[0])
1201 raise IndexError("msg pending: %r" % pending[0])
1202 except:
1202 except:
1203 reply = error.wrap_exception()
1203 reply = error.wrap_exception()
1204 else:
1204 else:
1205 try:
1205 try:
1206 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1206 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1207 except Exception:
1207 except Exception:
1208 reply = error.wrap_exception()
1208 reply = error.wrap_exception()
1209
1209
1210 if reply['status'] == 'ok':
1210 if reply['status'] == 'ok':
1211 eids = content.get('engine_ids', [])
1211 eids = content.get('engine_ids', [])
1212 for eid in eids:
1212 for eid in eids:
1213 if eid not in self.engines:
1213 if eid not in self.engines:
1214 try:
1214 try:
1215 raise IndexError("No such engine: %i" % eid)
1215 raise IndexError("No such engine: %i" % eid)
1216 except:
1216 except:
1217 reply = error.wrap_exception()
1217 reply = error.wrap_exception()
1218 break
1218 break
1219 uid = self.engines[eid].uuid
1219 uid = self.engines[eid].uuid
1220 try:
1220 try:
1221 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1221 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1222 except Exception:
1222 except Exception:
1223 reply = error.wrap_exception()
1223 reply = error.wrap_exception()
1224 break
1224 break
1225
1225
1226 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1226 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1227
1227
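# --- Editorial note: illustrative sketch, not part of the Hub source. ---
# Two example request contents handled by purge_results() above; the msg_ids
# and engine ids are placeholders. 'all' drops every completed record, while
# an explicit list must not contain ids that are still pending.
example_purge_all = {'msg_ids': 'all'}
example_purge_some = {
    'msg_ids': ['aaaa-1111', 'bbbb-2222'],
    'engine_ids': [0, 2],  # also drop completed records for these engines
}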
1228 def resubmit_task(self, client_id, msg):
1228 def resubmit_task(self, client_id, msg):
1229 """Resubmit one or more tasks."""
1229 """Resubmit one or more tasks."""
1230 def finish(reply):
1230 def finish(reply):
1231 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1231 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1232
1232
1233 content = msg['content']
1233 content = msg['content']
1234 msg_ids = content['msg_ids']
1234 msg_ids = content['msg_ids']
1235 reply = dict(status='ok')
1235 reply = dict(status='ok')
1236 try:
1236 try:
1237 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1237 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1238 'header', 'content', 'buffers'])
1238 'header', 'content', 'buffers'])
1239 except Exception:
1239 except Exception:
1240 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1240 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1241 return finish(error.wrap_exception())
1241 return finish(error.wrap_exception())
1242
1242
1243 # validate msg_ids
1243 # validate msg_ids
1244 found_ids = [ rec['msg_id'] for rec in records ]
1244 found_ids = [ rec['msg_id'] for rec in records ]
1245 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1245 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1246 if len(records) > len(msg_ids):
1246 if len(records) > len(msg_ids):
1247 try:
1247 try:
1248 raise RuntimeError("DB appears to be in an inconsistent state."
1248 raise RuntimeError("DB appears to be in an inconsistent state."
1249 "More matching records were found than should exist")
1249 "More matching records were found than should exist")
1250 except Exception:
1250 except Exception:
1251 return finish(error.wrap_exception())
1251 return finish(error.wrap_exception())
1252 elif len(records) < len(msg_ids):
1252 elif len(records) < len(msg_ids):
1253 missing = [ m for m in msg_ids if m not in found_ids ]
1253 missing = [ m for m in msg_ids if m not in found_ids ]
1254 try:
1254 try:
1255 raise KeyError("No such msg(s): %r" % missing)
1255 raise KeyError("No such msg(s): %r" % missing)
1256 except KeyError:
1256 except KeyError:
1257 return finish(error.wrap_exception())
1257 return finish(error.wrap_exception())
1258 elif pending_ids:
1258 elif pending_ids:
1259 pass
1259 pass
1260 # no need to raise on resubmit of pending task, now that we
1260 # no need to raise on resubmit of pending task, now that we
1261 # resubmit under new ID, but do we want to raise anyway?
1261 # resubmit under new ID, but do we want to raise anyway?
1262 # msg_id = invalid_ids[0]
1262 # msg_id = invalid_ids[0]
1263 # try:
1263 # try:
1264 # raise ValueError("Task(s) %r appears to be inflight" % )
1264 # raise ValueError("Task(s) %r appears to be inflight" % )
1265 # except Exception:
1265 # except Exception:
1266 # return finish(error.wrap_exception())
1266 # return finish(error.wrap_exception())
1267
1267
1268 # mapping of original IDs to resubmitted IDs
1268 # mapping of original IDs to resubmitted IDs
1269 resubmitted = {}
1269 resubmitted = {}
1270
1270
1271 # send the messages
1271 # send the messages
1272 for rec in records:
1272 for rec in records:
1273 header = rec['header']
1273 header = rec['header']
1274 msg = self.session.msg(header['msg_type'], parent=header)
1274 msg = self.session.msg(header['msg_type'], parent=header)
1275 msg_id = msg['msg_id']
1275 msg_id = msg['msg_id']
1276 msg['content'] = rec['content']
1276 msg['content'] = rec['content']
1277
1277
1278 # use the old header, but update msg_id and timestamp
1278 # use the old header, but update msg_id and timestamp
1279 fresh = msg['header']
1279 fresh = msg['header']
1280 header['msg_id'] = fresh['msg_id']
1280 header['msg_id'] = fresh['msg_id']
1281 header['date'] = fresh['date']
1281 header['date'] = fresh['date']
1282 msg['header'] = header
1282 msg['header'] = header
1283
1283
1284 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1284 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1285
1285
1286 resubmitted[rec['msg_id']] = msg_id
1286 resubmitted[rec['msg_id']] = msg_id
1287 self.pending.add(msg_id)
1287 self.pending.add(msg_id)
1288 msg['buffers'] = rec['buffers']
1288 msg['buffers'] = rec['buffers']
1289 try:
1289 try:
1290 self.db.add_record(msg_id, init_record(msg))
1290 self.db.add_record(msg_id, init_record(msg))
1291 except Exception:
1291 except Exception:
1292 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1292 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1293 return finish(error.wrap_exception())
1293 return finish(error.wrap_exception())
1294
1294
1295 finish(dict(status='ok', resubmitted=resubmitted))
1295 finish(dict(status='ok', resubmitted=resubmitted))
1296
1296
1297 # store the new IDs in the Task DB
1297 # store the new IDs in the Task DB
1298 for msg_id, resubmit_id in iteritems(resubmitted):
1298 for msg_id, resubmit_id in iteritems(resubmitted):
1299 try:
1299 try:
1300 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1300 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1301 except Exception:
1301 except Exception:
1302 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1302 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1303
1303
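# --- Editorial note: illustrative sketch, not part of the Hub source. ---
# A successful resubmit_reply content maps each original msg_id to the new
# msg_id it was resubmitted under (placeholder ids shown); the same mapping
# is stored in the task DB as the 'resubmitted' field of the old record.
example_resubmit_reply = {
    'status': 'ok',
    'resubmitted': {
        'aaaa-1111': 'cccc-3333',  # old msg_id -> new msg_id
    },
}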
1304
1304
1305 def _extract_record(self, rec):
1305 def _extract_record(self, rec):
1306 """decompose a TaskRecord dict into subsection of reply for get_result"""
1306 """decompose a TaskRecord dict into subsection of reply for get_result"""
1307 io_dict = {}
1307 io_dict = {}
1308 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1308 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1309 io_dict[key] = rec[key]
1309 io_dict[key] = rec[key]
1310 content = {
1310 content = {
1311 'header': rec['header'],
1311 'header': rec['header'],
1312 'metadata': rec['metadata'],
1312 'metadata': rec['metadata'],
1313 'result_metadata': rec['result_metadata'],
1313 'result_metadata': rec['result_metadata'],
1314 'result_header' : rec['result_header'],
1314 'result_header' : rec['result_header'],
1315 'result_content': rec['result_content'],
1315 'result_content': rec['result_content'],
1316 'received' : rec['received'],
1316 'received' : rec['received'],
1317 'io' : io_dict,
1317 'io' : io_dict,
1318 }
1318 }
1319 if rec['result_buffers']:
1319 if rec['result_buffers']:
1320 buffers = list(map(bytes, rec['result_buffers']))
1320 buffers = list(map(bytes, rec['result_buffers']))
1321 else:
1321 else:
1322 buffers = []
1322 buffers = []
1323
1323
1324 return content, buffers
1324 return content, buffers
1325
1325
1326 def get_results(self, client_id, msg):
1326 def get_results(self, client_id, msg):
1327 """Get the result of 1 or more messages."""
1327 """Get the result of 1 or more messages."""
1328 content = msg['content']
1328 content = msg['content']
1329 msg_ids = sorted(set(content['msg_ids']))
1329 msg_ids = sorted(set(content['msg_ids']))
1330 statusonly = content.get('status_only', False)
1330 statusonly = content.get('status_only', False)
1331 pending = []
1331 pending = []
1332 completed = []
1332 completed = []
1333 content = dict(status='ok')
1333 content = dict(status='ok')
1334 content['pending'] = pending
1334 content['pending'] = pending
1335 content['completed'] = completed
1335 content['completed'] = completed
1336 buffers = []
1336 buffers = []
1337 if not statusonly:
1337 if not statusonly:
1338 try:
1338 try:
1339 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1339 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1340 # turn match list into dict, for faster lookup
1340 # turn match list into dict, for faster lookup
1341 records = {}
1341 records = {}
1342 for rec in matches:
1342 for rec in matches:
1343 records[rec['msg_id']] = rec
1343 records[rec['msg_id']] = rec
1344 except Exception:
1344 except Exception:
1345 content = error.wrap_exception()
1345 content = error.wrap_exception()
1346 self.session.send(self.query, "result_reply", content=content,
1346 self.session.send(self.query, "result_reply", content=content,
1347 parent=msg, ident=client_id)
1347 parent=msg, ident=client_id)
1348 return
1348 return
1349 else:
1349 else:
1350 records = {}
1350 records = {}
1351 for msg_id in msg_ids:
1351 for msg_id in msg_ids:
1352 if msg_id in self.pending:
1352 if msg_id in self.pending:
1353 pending.append(msg_id)
1353 pending.append(msg_id)
1354 elif msg_id in self.all_completed:
1354 elif msg_id in self.all_completed:
1355 completed.append(msg_id)
1355 completed.append(msg_id)
1356 if not statusonly:
1356 if not statusonly:
1357 c,bufs = self._extract_record(records[msg_id])
1357 c,bufs = self._extract_record(records[msg_id])
1358 content[msg_id] = c
1358 content[msg_id] = c
1359 buffers.extend(bufs)
1359 buffers.extend(bufs)
1360 elif msg_id in records:
1360 elif msg_id in records:
1361 if records[msg_id]['completed']:
1361 if records[msg_id]['completed']:
1362 completed.append(msg_id)
1362 completed.append(msg_id)
1363 c,bufs = self._extract_record(records[msg_id])
1363 c,bufs = self._extract_record(records[msg_id])
1364 content[msg_id] = c
1364 content[msg_id] = c
1365 buffers.extend(bufs)
1365 buffers.extend(bufs)
1366 else:
1366 else:
1367 pending.append(msg_id)
1367 pending.append(msg_id)
1368 else:
1368 else:
1369 try:
1369 try:
1370 raise KeyError('No such message: '+msg_id)
1370 raise KeyError('No such message: '+msg_id)
1371 except:
1371 except:
1372 content = error.wrap_exception()
1372 content = error.wrap_exception()
1373 break
1373 break
1374 self.session.send(self.query, "result_reply", content=content,
1374 self.session.send(self.query, "result_reply", content=content,
1375 parent=msg, ident=client_id,
1375 parent=msg, ident=client_id,
1376 buffers=buffers)
1376 buffers=buffers)
1377
1377
1378 def get_history(self, client_id, msg):
1378 def get_history(self, client_id, msg):
1379 """Get a list of all msg_ids in our DB records"""
1379 """Get a list of all msg_ids in our DB records"""
1380 try:
1380 try:
1381 msg_ids = self.db.get_history()
1381 msg_ids = self.db.get_history()
1382 except Exception as e:
1382 except Exception as e:
1383 content = error.wrap_exception()
1383 content = error.wrap_exception()
1384 else:
1384 else:
1385 content = dict(status='ok', history=msg_ids)
1385 content = dict(status='ok', history=msg_ids)
1386
1386
1387 self.session.send(self.query, "history_reply", content=content,
1387 self.session.send(self.query, "history_reply", content=content,
1388 parent=msg, ident=client_id)
1388 parent=msg, ident=client_id)
1389
1389
1390 def db_query(self, client_id, msg):
1390 def db_query(self, client_id, msg):
1391 """Perform a raw query on the task record database."""
1391 """Perform a raw query on the task record database."""
1392 content = msg['content']
1392 content = msg['content']
1393 query = extract_dates(content.get('query', {}))
1393 query = extract_dates(content.get('query', {}))
1394 keys = content.get('keys', None)
1394 keys = content.get('keys', None)
1395 buffers = []
1395 buffers = []
1396 empty = list()
1396 empty = list()
1397 try:
1397 try:
1398 records = self.db.find_records(query, keys)
1398 records = self.db.find_records(query, keys)
1399 except Exception as e:
1399 except Exception as e:
1400 content = error.wrap_exception()
1400 content = error.wrap_exception()
1401 else:
1401 else:
1402 # extract buffers from reply content:
1402 # extract buffers from reply content:
1403 if keys is not None:
1403 if keys is not None:
1404 buffer_lens = [] if 'buffers' in keys else None
1404 buffer_lens = [] if 'buffers' in keys else None
1405 result_buffer_lens = [] if 'result_buffers' in keys else None
1405 result_buffer_lens = [] if 'result_buffers' in keys else None
1406 else:
1406 else:
1407 buffer_lens = None
1407 buffer_lens = None
1408 result_buffer_lens = None
1408 result_buffer_lens = None
1409
1409
1410 for rec in records:
1410 for rec in records:
1411 # buffers may be None, so double check
1411 # buffers may be None, so double check
1412 b = rec.pop('buffers', empty) or empty
1412 b = rec.pop('buffers', empty) or empty
1413 if buffer_lens is not None:
1413 if buffer_lens is not None:
1414 buffer_lens.append(len(b))
1414 buffer_lens.append(len(b))
1415 buffers.extend(b)
1415 buffers.extend(b)
1416 rb = rec.pop('result_buffers', empty) or empty
1416 rb = rec.pop('result_buffers', empty) or empty
1417 if result_buffer_lens is not None:
1417 if result_buffer_lens is not None:
1418 result_buffer_lens.append(len(rb))
1418 result_buffer_lens.append(len(rb))
1419 buffers.extend(rb)
1419 buffers.extend(rb)
1420 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1420 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1421 result_buffer_lens=result_buffer_lens)
1421 result_buffer_lens=result_buffer_lens)
1422 # self.log.debug (content)
1422 # self.log.debug (content)
1423 self.session.send(self.query, "db_reply", content=content,
1423 self.session.send(self.query, "db_reply", content=content,
1424 parent=msg, ident=client_id,
1424 parent=msg, ident=client_id,
1425 buffers=buffers)
1425 buffers=buffers)
1426
1426
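# --- Editorial note: illustrative sketch, not part of the Hub source. ---
# An example db_query request content. The 'query' dict is passed through
# extract_dates(), so ISO-formatted date strings arrive at the backing db as
# datetime objects. The '$gt' operator here is an assumption about the db
# backend; only '$in' and '$ne' appear elsewhere in this file. All values
# are placeholders.
example_db_query = {
    'query': {'completed': {'$gt': '2013-10-01T12:00:00.000000'}},
    'keys': ['msg_id', 'completed', 'engine_uuid'],
}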