Add the heartbeat period to the registration reply...
Jan Schulz
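The change in the hunk below adds the Hub's heartbeat period to the content of the registration_reply message (alongside the existing id and status keys), so a registering engine learns how often the Hub pings. As a rough illustration of how the receiving side could consume that field, here is a minimal sketch; it is not the actual IPython engine code, and everything except the id, status and hb_period keys (which come from the diff) is assumed:

def handle_registration_reply(content, default_period=3000):
    """Pull the engine id and the advertised heartbeat period (in ms) out of a
    registration_reply content dict; fall back to a default when talking to an
    older Hub that does not send hb_period. The 3000 ms fallback is illustrative."""
    if content.get('status') != 'ok':
        raise RuntimeError("registration failed: %r" % content)
    eid = content['id']
    # New field added by this commit: the Hub's HeartMonitor period, so the
    # engine can size its own heartbeat/timeout logic to match the Hub's pings.
    hb_period = content.get('hb_period', default_period)
    return eid, hb_period

For example, handle_registration_reply({'status': 'ok', 'id': 0, 'hb_period': 3000}) returns (0, 3000).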
@@ -1,1414 +1,1414 @@
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import json
21 import json
22 import os
22 import os
23 import sys
23 import sys
24 import time
24 import time
25 from datetime import datetime
25 from datetime import datetime
26
26
27 import zmq
27 import zmq
28 from zmq.eventloop import ioloop
28 from zmq.eventloop import ioloop
29 from zmq.eventloop.zmqstream import ZMQStream
29 from zmq.eventloop.zmqstream import ZMQStream
30
30
31 # internal:
31 # internal:
32 from IPython.utils.importstring import import_item
32 from IPython.utils.importstring import import_item
33 from IPython.utils.py3compat import cast_bytes
33 from IPython.utils.py3compat import cast_bytes
34 from IPython.utils.traitlets import (
34 from IPython.utils.traitlets import (
35 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
35 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
36 )
36 )
37
37
38 from IPython.parallel import error, util
38 from IPython.parallel import error, util
39 from IPython.parallel.factory import RegistrationFactory
39 from IPython.parallel.factory import RegistrationFactory
40
40
41 from IPython.zmq.session import SessionFactory
41 from IPython.zmq.session import SessionFactory
42
42
43 from .heartmonitor import HeartMonitor
43 from .heartmonitor import HeartMonitor
44
44
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46 # Code
46 # Code
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48
48
49 def _passer(*args, **kwargs):
49 def _passer(*args, **kwargs):
50 return
50 return
51
51
52 def _printer(*args, **kwargs):
52 def _printer(*args, **kwargs):
53 print (args)
53 print (args)
54 print (kwargs)
54 print (kwargs)
55
55
56 def empty_record():
56 def empty_record():
57 """Return an empty dict with all record keys."""
57 """Return an empty dict with all record keys."""
58 return {
58 return {
59 'msg_id' : None,
59 'msg_id' : None,
60 'header' : None,
60 'header' : None,
61 'metadata' : None,
61 'metadata' : None,
62 'content': None,
62 'content': None,
63 'buffers': None,
63 'buffers': None,
64 'submitted': None,
64 'submitted': None,
65 'client_uuid' : None,
65 'client_uuid' : None,
66 'engine_uuid' : None,
66 'engine_uuid' : None,
67 'started': None,
67 'started': None,
68 'completed': None,
68 'completed': None,
69 'resubmitted': None,
69 'resubmitted': None,
70 'received': None,
70 'received': None,
71 'result_header' : None,
71 'result_header' : None,
72 'result_metadata' : None,
72 'result_metadata' : None,
73 'result_content' : None,
73 'result_content' : None,
74 'result_buffers' : None,
74 'result_buffers' : None,
75 'queue' : None,
75 'queue' : None,
76 'pyin' : None,
76 'pyin' : None,
77 'pyout': None,
77 'pyout': None,
78 'pyerr': None,
78 'pyerr': None,
79 'stdout': '',
79 'stdout': '',
80 'stderr': '',
80 'stderr': '',
81 }
81 }
82
82
83 def init_record(msg):
83 def init_record(msg):
84 """Initialize a TaskRecord based on a request."""
84 """Initialize a TaskRecord based on a request."""
85 header = msg['header']
85 header = msg['header']
86 return {
86 return {
87 'msg_id' : header['msg_id'],
87 'msg_id' : header['msg_id'],
88 'header' : header,
88 'header' : header,
89 'content': msg['content'],
89 'content': msg['content'],
90 'metadata': msg['metadata'],
90 'metadata': msg['metadata'],
91 'buffers': msg['buffers'],
91 'buffers': msg['buffers'],
92 'submitted': header['date'],
92 'submitted': header['date'],
93 'client_uuid' : None,
93 'client_uuid' : None,
94 'engine_uuid' : None,
94 'engine_uuid' : None,
95 'started': None,
95 'started': None,
96 'completed': None,
96 'completed': None,
97 'resubmitted': None,
97 'resubmitted': None,
98 'received': None,
98 'received': None,
99 'result_header' : None,
99 'result_header' : None,
100 'result_metadata': None,
100 'result_metadata': None,
101 'result_content' : None,
101 'result_content' : None,
102 'result_buffers' : None,
102 'result_buffers' : None,
103 'queue' : None,
103 'queue' : None,
104 'pyin' : None,
104 'pyin' : None,
105 'pyout': None,
105 'pyout': None,
106 'pyerr': None,
106 'pyerr': None,
107 'stdout': '',
107 'stdout': '',
108 'stderr': '',
108 'stderr': '',
109 }
109 }
110
110
111
111
112 class EngineConnector(HasTraits):
112 class EngineConnector(HasTraits):
113 """A simple object for accessing the various zmq connections of an object.
113 """A simple object for accessing the various zmq connections of an object.
114 Attributes are:
114 Attributes are:
115 id (int): engine ID
115 id (int): engine ID
116 uuid (unicode): engine UUID
116 uuid (unicode): engine UUID
117 pending: set of msg_ids
117 pending: set of msg_ids
118 stallback: DelayedCallback for stalled registration
118 stallback: DelayedCallback for stalled registration
119 """
119 """
120
120
121 id = Integer(0)
121 id = Integer(0)
122 uuid = Unicode()
122 uuid = Unicode()
123 pending = Set()
123 pending = Set()
124 stallback = Instance(ioloop.DelayedCallback)
124 stallback = Instance(ioloop.DelayedCallback)
125
125
126
126
127 _db_shortcuts = {
127 _db_shortcuts = {
128 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
128 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
129 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
129 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
130 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
130 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
131 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
131 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
132 }
132 }
133
133
134 class HubFactory(RegistrationFactory):
134 class HubFactory(RegistrationFactory):
135 """The Configurable for setting up a Hub."""
135 """The Configurable for setting up a Hub."""
136
136
137 # port-pairs for monitoredqueues:
137 # port-pairs for monitoredqueues:
138 hb = Tuple(Integer,Integer,config=True,
138 hb = Tuple(Integer,Integer,config=True,
139 help="""PUB/ROUTER Port pair for Engine heartbeats""")
139 help="""PUB/ROUTER Port pair for Engine heartbeats""")
140 def _hb_default(self):
140 def _hb_default(self):
141 return tuple(util.select_random_ports(2))
141 return tuple(util.select_random_ports(2))
142
142
143 mux = Tuple(Integer,Integer,config=True,
143 mux = Tuple(Integer,Integer,config=True,
144 help="""Client/Engine Port pair for MUX queue""")
144 help="""Client/Engine Port pair for MUX queue""")
145
145
146 def _mux_default(self):
146 def _mux_default(self):
147 return tuple(util.select_random_ports(2))
147 return tuple(util.select_random_ports(2))
148
148
149 task = Tuple(Integer,Integer,config=True,
149 task = Tuple(Integer,Integer,config=True,
150 help="""Client/Engine Port pair for Task queue""")
150 help="""Client/Engine Port pair for Task queue""")
151 def _task_default(self):
151 def _task_default(self):
152 return tuple(util.select_random_ports(2))
152 return tuple(util.select_random_ports(2))
153
153
154 control = Tuple(Integer,Integer,config=True,
154 control = Tuple(Integer,Integer,config=True,
155 help="""Client/Engine Port pair for Control queue""")
155 help="""Client/Engine Port pair for Control queue""")
156
156
157 def _control_default(self):
157 def _control_default(self):
158 return tuple(util.select_random_ports(2))
158 return tuple(util.select_random_ports(2))
159
159
160 iopub = Tuple(Integer,Integer,config=True,
160 iopub = Tuple(Integer,Integer,config=True,
161 help="""Client/Engine Port pair for IOPub relay""")
161 help="""Client/Engine Port pair for IOPub relay""")
162
162
163 def _iopub_default(self):
163 def _iopub_default(self):
164 return tuple(util.select_random_ports(2))
164 return tuple(util.select_random_ports(2))
165
165
166 # single ports:
166 # single ports:
167 mon_port = Integer(config=True,
167 mon_port = Integer(config=True,
168 help="""Monitor (SUB) port for queue traffic""")
168 help="""Monitor (SUB) port for queue traffic""")
169
169
170 def _mon_port_default(self):
170 def _mon_port_default(self):
171 return util.select_random_ports(1)[0]
171 return util.select_random_ports(1)[0]
172
172
173 notifier_port = Integer(config=True,
173 notifier_port = Integer(config=True,
174 help="""PUB port for sending engine status notifications""")
174 help="""PUB port for sending engine status notifications""")
175
175
176 def _notifier_port_default(self):
176 def _notifier_port_default(self):
177 return util.select_random_ports(1)[0]
177 return util.select_random_ports(1)[0]
178
178
179 engine_ip = Unicode('127.0.0.1', config=True,
179 engine_ip = Unicode('127.0.0.1', config=True,
180 help="IP on which to listen for engine connections. [default: loopback]")
180 help="IP on which to listen for engine connections. [default: loopback]")
181 engine_transport = Unicode('tcp', config=True,
181 engine_transport = Unicode('tcp', config=True,
182 help="0MQ transport for engine connections. [default: tcp]")
182 help="0MQ transport for engine connections. [default: tcp]")
183
183
184 client_ip = Unicode('127.0.0.1', config=True,
184 client_ip = Unicode('127.0.0.1', config=True,
185 help="IP on which to listen for client connections. [default: loopback]")
185 help="IP on which to listen for client connections. [default: loopback]")
186 client_transport = Unicode('tcp', config=True,
186 client_transport = Unicode('tcp', config=True,
187 help="0MQ transport for client connections. [default : tcp]")
187 help="0MQ transport for client connections. [default : tcp]")
188
188
189 monitor_ip = Unicode('127.0.0.1', config=True,
189 monitor_ip = Unicode('127.0.0.1', config=True,
190 help="IP on which to listen for monitor messages. [default: loopback]")
190 help="IP on which to listen for monitor messages. [default: loopback]")
191 monitor_transport = Unicode('tcp', config=True,
191 monitor_transport = Unicode('tcp', config=True,
192 help="0MQ transport for monitor messages. [default : tcp]")
192 help="0MQ transport for monitor messages. [default : tcp]")
193
193
194 monitor_url = Unicode('')
194 monitor_url = Unicode('')
195
195
196 db_class = DottedObjectName('NoDB',
196 db_class = DottedObjectName('NoDB',
197 config=True, help="""The class to use for the DB backend
197 config=True, help="""The class to use for the DB backend
198
198
199 Options include:
199 Options include:
200
200
201 SQLiteDB: SQLite
201 SQLiteDB: SQLite
202 MongoDB : use MongoDB
202 MongoDB : use MongoDB
203 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
203 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
204 NoDB : disable database altogether (default)
204 NoDB : disable database altogether (default)
205
205
206 """)
206 """)
207
207
208 # not configurable
208 # not configurable
209 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
209 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
210 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
210 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
211
211
212 def _ip_changed(self, name, old, new):
212 def _ip_changed(self, name, old, new):
213 self.engine_ip = new
213 self.engine_ip = new
214 self.client_ip = new
214 self.client_ip = new
215 self.monitor_ip = new
215 self.monitor_ip = new
216 self._update_monitor_url()
216 self._update_monitor_url()
217
217
218 def _update_monitor_url(self):
218 def _update_monitor_url(self):
219 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
219 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
220
220
221 def _transport_changed(self, name, old, new):
221 def _transport_changed(self, name, old, new):
222 self.engine_transport = new
222 self.engine_transport = new
223 self.client_transport = new
223 self.client_transport = new
224 self.monitor_transport = new
224 self.monitor_transport = new
225 self._update_monitor_url()
225 self._update_monitor_url()
226
226
227 def __init__(self, **kwargs):
227 def __init__(self, **kwargs):
228 super(HubFactory, self).__init__(**kwargs)
228 super(HubFactory, self).__init__(**kwargs)
229 self._update_monitor_url()
229 self._update_monitor_url()
230
230
231
231
232 def construct(self):
232 def construct(self):
233 self.init_hub()
233 self.init_hub()
234
234
235 def start(self):
235 def start(self):
236 self.heartmonitor.start()
236 self.heartmonitor.start()
237 self.log.info("Heartmonitor started")
237 self.log.info("Heartmonitor started")
238
238
239 def client_url(self, channel):
239 def client_url(self, channel):
240 """return full zmq url for a named client channel"""
240 """return full zmq url for a named client channel"""
241 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
241 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
242
242
243 def engine_url(self, channel):
243 def engine_url(self, channel):
244 """return full zmq url for a named engine channel"""
244 """return full zmq url for a named engine channel"""
245 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
245 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
246
246
247 def init_hub(self):
247 def init_hub(self):
248 """construct Hub object"""
248 """construct Hub object"""
249
249
250 ctx = self.context
250 ctx = self.context
251 loop = self.loop
251 loop = self.loop
252
252
253 try:
253 try:
254 scheme = self.config.TaskScheduler.scheme_name
254 scheme = self.config.TaskScheduler.scheme_name
255 except AttributeError:
255 except AttributeError:
256 from .scheduler import TaskScheduler
256 from .scheduler import TaskScheduler
257 scheme = TaskScheduler.scheme_name.get_default_value()
257 scheme = TaskScheduler.scheme_name.get_default_value()
258
258
259 # build connection dicts
259 # build connection dicts
260 engine = self.engine_info = {
260 engine = self.engine_info = {
261 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
261 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
262 'registration' : self.regport,
262 'registration' : self.regport,
263 'control' : self.control[1],
263 'control' : self.control[1],
264 'mux' : self.mux[1],
264 'mux' : self.mux[1],
265 'hb_ping' : self.hb[0],
265 'hb_ping' : self.hb[0],
266 'hb_pong' : self.hb[1],
266 'hb_pong' : self.hb[1],
267 'task' : self.task[1],
267 'task' : self.task[1],
268 'iopub' : self.iopub[1],
268 'iopub' : self.iopub[1],
269 }
269 }
270
270
271 client = self.client_info = {
271 client = self.client_info = {
272 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
272 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
273 'registration' : self.regport,
273 'registration' : self.regport,
274 'control' : self.control[0],
274 'control' : self.control[0],
275 'mux' : self.mux[0],
275 'mux' : self.mux[0],
276 'task' : self.task[0],
276 'task' : self.task[0],
277 'task_scheme' : scheme,
277 'task_scheme' : scheme,
278 'iopub' : self.iopub[0],
278 'iopub' : self.iopub[0],
279 'notification' : self.notifier_port,
279 'notification' : self.notifier_port,
280 }
280 }
281
281
282 self.log.debug("Hub engine addrs: %s", self.engine_info)
282 self.log.debug("Hub engine addrs: %s", self.engine_info)
283 self.log.debug("Hub client addrs: %s", self.client_info)
283 self.log.debug("Hub client addrs: %s", self.client_info)
284
284
285 # Registrar socket
285 # Registrar socket
286 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
286 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
287 q.bind(self.client_url('registration'))
287 q.bind(self.client_url('registration'))
288 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
288 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
289 if self.client_ip != self.engine_ip:
289 if self.client_ip != self.engine_ip:
290 q.bind(self.engine_url('registration'))
290 q.bind(self.engine_url('registration'))
291 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
291 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
292
292
293 ### Engine connections ###
293 ### Engine connections ###
294
294
295 # heartbeat
295 # heartbeat
296 hpub = ctx.socket(zmq.PUB)
296 hpub = ctx.socket(zmq.PUB)
297 hpub.bind(self.engine_url('hb_ping'))
297 hpub.bind(self.engine_url('hb_ping'))
298 hrep = ctx.socket(zmq.ROUTER)
298 hrep = ctx.socket(zmq.ROUTER)
299 hrep.bind(self.engine_url('hb_pong'))
299 hrep.bind(self.engine_url('hb_pong'))
300 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
300 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
301 pingstream=ZMQStream(hpub,loop),
301 pingstream=ZMQStream(hpub,loop),
302 pongstream=ZMQStream(hrep,loop)
302 pongstream=ZMQStream(hrep,loop)
303 )
303 )
304
304
305 ### Client connections ###
305 ### Client connections ###
306
306
307 # Notifier socket
307 # Notifier socket
308 n = ZMQStream(ctx.socket(zmq.PUB), loop)
308 n = ZMQStream(ctx.socket(zmq.PUB), loop)
309 n.bind(self.client_url('notification'))
309 n.bind(self.client_url('notification'))
310
310
311 ### build and launch the queues ###
311 ### build and launch the queues ###
312
312
313 # monitor socket
313 # monitor socket
314 sub = ctx.socket(zmq.SUB)
314 sub = ctx.socket(zmq.SUB)
315 sub.setsockopt(zmq.SUBSCRIBE, b"")
315 sub.setsockopt(zmq.SUBSCRIBE, b"")
316 sub.bind(self.monitor_url)
316 sub.bind(self.monitor_url)
317 sub.bind('inproc://monitor')
317 sub.bind('inproc://monitor')
318 sub = ZMQStream(sub, loop)
318 sub = ZMQStream(sub, loop)
319
319
320 # connect the db
320 # connect the db
321 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
321 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
322 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
322 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
323 self.db = import_item(str(db_class))(session=self.session.session,
323 self.db = import_item(str(db_class))(session=self.session.session,
324 config=self.config, log=self.log)
324 config=self.config, log=self.log)
325 time.sleep(.25)
325 time.sleep(.25)
326
326
327 # resubmit stream
327 # resubmit stream
328 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
328 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
329 url = util.disambiguate_url(self.client_url('task'))
329 url = util.disambiguate_url(self.client_url('task'))
330 r.connect(url)
330 r.connect(url)
331
331
332 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
332 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
333 query=q, notifier=n, resubmit=r, db=self.db,
333 query=q, notifier=n, resubmit=r, db=self.db,
334 engine_info=self.engine_info, client_info=self.client_info,
334 engine_info=self.engine_info, client_info=self.client_info,
335 log=self.log)
335 log=self.log)
336
336
337
337
338 class Hub(SessionFactory):
338 class Hub(SessionFactory):
339 """The IPython Controller Hub with 0MQ connections
339 """The IPython Controller Hub with 0MQ connections
340
340
341 Parameters
341 Parameters
342 ==========
342 ==========
343 loop: zmq IOLoop instance
343 loop: zmq IOLoop instance
344 session: Session object
344 session: Session object
345 <removed> context: zmq context for creating new connections (?)
345 <removed> context: zmq context for creating new connections (?)
346 queue: ZMQStream for monitoring the command queue (SUB)
346 queue: ZMQStream for monitoring the command queue (SUB)
347 query: ZMQStream for engine registration and client queries requests (ROUTER)
347 query: ZMQStream for engine registration and client queries requests (ROUTER)
348 heartbeat: HeartMonitor object checking the pulse of the engines
348 heartbeat: HeartMonitor object checking the pulse of the engines
349 notifier: ZMQStream for broadcasting engine registration changes (PUB)
349 notifier: ZMQStream for broadcasting engine registration changes (PUB)
350 db: connection to db for out of memory logging of commands
350 db: connection to db for out of memory logging of commands
351 NotImplemented
351 NotImplemented
352 engine_info: dict of zmq connection information for engines to connect
352 engine_info: dict of zmq connection information for engines to connect
353 to the queues.
353 to the queues.
354 client_info: dict of zmq connection information for engines to connect
354 client_info: dict of zmq connection information for engines to connect
355 to the queues.
355 to the queues.
356 """
356 """
357
357
358 engine_state_file = Unicode()
358 engine_state_file = Unicode()
359
359
360 # internal data structures:
360 # internal data structures:
361 ids=Set() # engine IDs
361 ids=Set() # engine IDs
362 keytable=Dict()
362 keytable=Dict()
363 by_ident=Dict()
363 by_ident=Dict()
364 engines=Dict()
364 engines=Dict()
365 clients=Dict()
365 clients=Dict()
366 hearts=Dict()
366 hearts=Dict()
367 pending=Set()
367 pending=Set()
368 queues=Dict() # pending msg_ids keyed by engine_id
368 queues=Dict() # pending msg_ids keyed by engine_id
369 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
369 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
370 completed=Dict() # completed msg_ids keyed by engine_id
370 completed=Dict() # completed msg_ids keyed by engine_id
371 all_completed=Set() # completed msg_ids keyed by engine_id
371 all_completed=Set() # completed msg_ids keyed by engine_id
372 dead_engines=Set() # completed msg_ids keyed by engine_id
372 dead_engines=Set() # completed msg_ids keyed by engine_id
373 unassigned=Set() # set of task msg_ds not yet assigned a destination
373 unassigned=Set() # set of task msg_ds not yet assigned a destination
374 incoming_registrations=Dict()
374 incoming_registrations=Dict()
375 registration_timeout=Integer()
375 registration_timeout=Integer()
376 _idcounter=Integer(0)
376 _idcounter=Integer(0)
377
377
378 # objects from constructor:
378 # objects from constructor:
379 query=Instance(ZMQStream)
379 query=Instance(ZMQStream)
380 monitor=Instance(ZMQStream)
380 monitor=Instance(ZMQStream)
381 notifier=Instance(ZMQStream)
381 notifier=Instance(ZMQStream)
382 resubmit=Instance(ZMQStream)
382 resubmit=Instance(ZMQStream)
383 heartmonitor=Instance(HeartMonitor)
383 heartmonitor=Instance(HeartMonitor)
384 db=Instance(object)
384 db=Instance(object)
385 client_info=Dict()
385 client_info=Dict()
386 engine_info=Dict()
386 engine_info=Dict()
387
387
388
388
389 def __init__(self, **kwargs):
389 def __init__(self, **kwargs):
390 """
390 """
391 # universal:
391 # universal:
392 loop: IOLoop for creating future connections
392 loop: IOLoop for creating future connections
393 session: streamsession for sending serialized data
393 session: streamsession for sending serialized data
394 # engine:
394 # engine:
395 queue: ZMQStream for monitoring queue messages
395 queue: ZMQStream for monitoring queue messages
396 query: ZMQStream for engine+client registration and client requests
396 query: ZMQStream for engine+client registration and client requests
397 heartbeat: HeartMonitor object for tracking engines
397 heartbeat: HeartMonitor object for tracking engines
398 # extra:
398 # extra:
399 db: ZMQStream for db connection (NotImplemented)
399 db: ZMQStream for db connection (NotImplemented)
400 engine_info: zmq address/protocol dict for engine connections
400 engine_info: zmq address/protocol dict for engine connections
401 client_info: zmq address/protocol dict for client connections
401 client_info: zmq address/protocol dict for client connections
402 """
402 """
403
403
404 super(Hub, self).__init__(**kwargs)
404 super(Hub, self).__init__(**kwargs)
405 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
405 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
406
406
407 # register our callbacks
407 # register our callbacks
408 self.query.on_recv(self.dispatch_query)
408 self.query.on_recv(self.dispatch_query)
409 self.monitor.on_recv(self.dispatch_monitor_traffic)
409 self.monitor.on_recv(self.dispatch_monitor_traffic)
410
410
411 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
411 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
412 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
412 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
413
413
414 self.monitor_handlers = {b'in' : self.save_queue_request,
414 self.monitor_handlers = {b'in' : self.save_queue_request,
415 b'out': self.save_queue_result,
415 b'out': self.save_queue_result,
416 b'intask': self.save_task_request,
416 b'intask': self.save_task_request,
417 b'outtask': self.save_task_result,
417 b'outtask': self.save_task_result,
418 b'tracktask': self.save_task_destination,
418 b'tracktask': self.save_task_destination,
419 b'incontrol': _passer,
419 b'incontrol': _passer,
420 b'outcontrol': _passer,
420 b'outcontrol': _passer,
421 b'iopub': self.save_iopub_message,
421 b'iopub': self.save_iopub_message,
422 }
422 }
423
423
424 self.query_handlers = {'queue_request': self.queue_status,
424 self.query_handlers = {'queue_request': self.queue_status,
425 'result_request': self.get_results,
425 'result_request': self.get_results,
426 'history_request': self.get_history,
426 'history_request': self.get_history,
427 'db_request': self.db_query,
427 'db_request': self.db_query,
428 'purge_request': self.purge_results,
428 'purge_request': self.purge_results,
429 'load_request': self.check_load,
429 'load_request': self.check_load,
430 'resubmit_request': self.resubmit_task,
430 'resubmit_request': self.resubmit_task,
431 'shutdown_request': self.shutdown_request,
431 'shutdown_request': self.shutdown_request,
432 'registration_request' : self.register_engine,
432 'registration_request' : self.register_engine,
433 'unregistration_request' : self.unregister_engine,
433 'unregistration_request' : self.unregister_engine,
434 'connection_request': self.connection_request,
434 'connection_request': self.connection_request,
435 }
435 }
436
436
437 # ignore resubmit replies
437 # ignore resubmit replies
438 self.resubmit.on_recv(lambda msg: None, copy=False)
438 self.resubmit.on_recv(lambda msg: None, copy=False)
439
439
440 self.log.info("hub::created hub")
440 self.log.info("hub::created hub")
441
441
442 @property
442 @property
443 def _next_id(self):
443 def _next_id(self):
444 """gemerate a new ID.
444 """gemerate a new ID.
445
445
446 No longer reuse old ids, just count from 0."""
446 No longer reuse old ids, just count from 0."""
447 newid = self._idcounter
447 newid = self._idcounter
448 self._idcounter += 1
448 self._idcounter += 1
449 return newid
449 return newid
450 # newid = 0
450 # newid = 0
451 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
451 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
452 # # print newid, self.ids, self.incoming_registrations
452 # # print newid, self.ids, self.incoming_registrations
453 # while newid in self.ids or newid in incoming:
453 # while newid in self.ids or newid in incoming:
454 # newid += 1
454 # newid += 1
455 # return newid
455 # return newid
456
456
457 #-----------------------------------------------------------------------------
457 #-----------------------------------------------------------------------------
458 # message validation
458 # message validation
459 #-----------------------------------------------------------------------------
459 #-----------------------------------------------------------------------------
460
460
461 def _validate_targets(self, targets):
461 def _validate_targets(self, targets):
462 """turn any valid targets argument into a list of integer ids"""
462 """turn any valid targets argument into a list of integer ids"""
463 if targets is None:
463 if targets is None:
464 # default to all
464 # default to all
465 return self.ids
465 return self.ids
466
466
467 if isinstance(targets, (int,str,unicode)):
467 if isinstance(targets, (int,str,unicode)):
468 # only one target specified
468 # only one target specified
469 targets = [targets]
469 targets = [targets]
470 _targets = []
470 _targets = []
471 for t in targets:
471 for t in targets:
472 # map raw identities to ids
472 # map raw identities to ids
473 if isinstance(t, (str,unicode)):
473 if isinstance(t, (str,unicode)):
474 t = self.by_ident.get(cast_bytes(t), t)
474 t = self.by_ident.get(cast_bytes(t), t)
475 _targets.append(t)
475 _targets.append(t)
476 targets = _targets
476 targets = _targets
477 bad_targets = [ t for t in targets if t not in self.ids ]
477 bad_targets = [ t for t in targets if t not in self.ids ]
478 if bad_targets:
478 if bad_targets:
479 raise IndexError("No Such Engine: %r" % bad_targets)
479 raise IndexError("No Such Engine: %r" % bad_targets)
480 if not targets:
480 if not targets:
481 raise IndexError("No Engines Registered")
481 raise IndexError("No Engines Registered")
482 return targets
482 return targets
483
483
484 #-----------------------------------------------------------------------------
484 #-----------------------------------------------------------------------------
485 # dispatch methods (1 per stream)
485 # dispatch methods (1 per stream)
486 #-----------------------------------------------------------------------------
486 #-----------------------------------------------------------------------------
487
487
488
488
489 @util.log_errors
489 @util.log_errors
490 def dispatch_monitor_traffic(self, msg):
490 def dispatch_monitor_traffic(self, msg):
491 """all ME and Task queue messages come through here, as well as
491 """all ME and Task queue messages come through here, as well as
492 IOPub traffic."""
492 IOPub traffic."""
493 self.log.debug("monitor traffic: %r", msg[0])
493 self.log.debug("monitor traffic: %r", msg[0])
494 switch = msg[0]
494 switch = msg[0]
495 try:
495 try:
496 idents, msg = self.session.feed_identities(msg[1:])
496 idents, msg = self.session.feed_identities(msg[1:])
497 except ValueError:
497 except ValueError:
498 idents=[]
498 idents=[]
499 if not idents:
499 if not idents:
500 self.log.error("Monitor message without topic: %r", msg)
500 self.log.error("Monitor message without topic: %r", msg)
501 return
501 return
502 handler = self.monitor_handlers.get(switch, None)
502 handler = self.monitor_handlers.get(switch, None)
503 if handler is not None:
503 if handler is not None:
504 handler(idents, msg)
504 handler(idents, msg)
505 else:
505 else:
506 self.log.error("Unrecognized monitor topic: %r", switch)
506 self.log.error("Unrecognized monitor topic: %r", switch)
507
507
508
508
509 @util.log_errors
509 @util.log_errors
510 def dispatch_query(self, msg):
510 def dispatch_query(self, msg):
511 """Route registration requests and queries from clients."""
511 """Route registration requests and queries from clients."""
512 try:
512 try:
513 idents, msg = self.session.feed_identities(msg)
513 idents, msg = self.session.feed_identities(msg)
514 except ValueError:
514 except ValueError:
515 idents = []
515 idents = []
516 if not idents:
516 if not idents:
517 self.log.error("Bad Query Message: %r", msg)
517 self.log.error("Bad Query Message: %r", msg)
518 return
518 return
519 client_id = idents[0]
519 client_id = idents[0]
520 try:
520 try:
521 msg = self.session.unserialize(msg, content=True)
521 msg = self.session.unserialize(msg, content=True)
522 except Exception:
522 except Exception:
523 content = error.wrap_exception()
523 content = error.wrap_exception()
524 self.log.error("Bad Query Message: %r", msg, exc_info=True)
524 self.log.error("Bad Query Message: %r", msg, exc_info=True)
525 self.session.send(self.query, "hub_error", ident=client_id,
525 self.session.send(self.query, "hub_error", ident=client_id,
526 content=content)
526 content=content)
527 return
527 return
528 # print client_id, header, parent, content
528 # print client_id, header, parent, content
529 #switch on message type:
529 #switch on message type:
530 msg_type = msg['header']['msg_type']
530 msg_type = msg['header']['msg_type']
531 self.log.info("client::client %r requested %r", client_id, msg_type)
531 self.log.info("client::client %r requested %r", client_id, msg_type)
532 handler = self.query_handlers.get(msg_type, None)
532 handler = self.query_handlers.get(msg_type, None)
533 try:
533 try:
534 assert handler is not None, "Bad Message Type: %r" % msg_type
534 assert handler is not None, "Bad Message Type: %r" % msg_type
535 except:
535 except:
536 content = error.wrap_exception()
536 content = error.wrap_exception()
537 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
537 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
538 self.session.send(self.query, "hub_error", ident=client_id,
538 self.session.send(self.query, "hub_error", ident=client_id,
539 content=content)
539 content=content)
540 return
540 return
541
541
542 else:
542 else:
543 handler(idents, msg)
543 handler(idents, msg)
544
544
545 def dispatch_db(self, msg):
545 def dispatch_db(self, msg):
546 """"""
546 """"""
547 raise NotImplementedError
547 raise NotImplementedError
548
548
549 #---------------------------------------------------------------------------
549 #---------------------------------------------------------------------------
550 # handler methods (1 per event)
550 # handler methods (1 per event)
551 #---------------------------------------------------------------------------
551 #---------------------------------------------------------------------------
552
552
553 #----------------------- Heartbeat --------------------------------------
553 #----------------------- Heartbeat --------------------------------------
554
554
555 def handle_new_heart(self, heart):
555 def handle_new_heart(self, heart):
556 """handler to attach to heartbeater.
556 """handler to attach to heartbeater.
557 Called when a new heart starts to beat.
557 Called when a new heart starts to beat.
558 Triggers completion of registration."""
558 Triggers completion of registration."""
559 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
559 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
560 if heart not in self.incoming_registrations:
560 if heart not in self.incoming_registrations:
561 self.log.info("heartbeat::ignoring new heart: %r", heart)
561 self.log.info("heartbeat::ignoring new heart: %r", heart)
562 else:
562 else:
563 self.finish_registration(heart)
563 self.finish_registration(heart)
564
564
565
565
566 def handle_heart_failure(self, heart):
566 def handle_heart_failure(self, heart):
567 """handler to attach to heartbeater.
567 """handler to attach to heartbeater.
568 called when a previously registered heart fails to respond to beat request.
568 called when a previously registered heart fails to respond to beat request.
569 triggers unregistration"""
569 triggers unregistration"""
570 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
570 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
571 eid = self.hearts.get(heart, None)
571 eid = self.hearts.get(heart, None)
572 uuid = self.engines[eid].uuid
572 uuid = self.engines[eid].uuid
573 if eid is None or self.keytable[eid] in self.dead_engines:
573 if eid is None or self.keytable[eid] in self.dead_engines:
574 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
574 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
575 else:
575 else:
576 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
576 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
577
577
578 #----------------------- MUX Queue Traffic ------------------------------
578 #----------------------- MUX Queue Traffic ------------------------------
579
579
580 def save_queue_request(self, idents, msg):
580 def save_queue_request(self, idents, msg):
581 if len(idents) < 2:
581 if len(idents) < 2:
582 self.log.error("invalid identity prefix: %r", idents)
582 self.log.error("invalid identity prefix: %r", idents)
583 return
583 return
584 queue_id, client_id = idents[:2]
584 queue_id, client_id = idents[:2]
585 try:
585 try:
586 msg = self.session.unserialize(msg)
586 msg = self.session.unserialize(msg)
587 except Exception:
587 except Exception:
588 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
588 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
589 return
589 return
590
590
591 eid = self.by_ident.get(queue_id, None)
591 eid = self.by_ident.get(queue_id, None)
592 if eid is None:
592 if eid is None:
593 self.log.error("queue::target %r not registered", queue_id)
593 self.log.error("queue::target %r not registered", queue_id)
594 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
594 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
595 return
595 return
596 record = init_record(msg)
596 record = init_record(msg)
597 msg_id = record['msg_id']
597 msg_id = record['msg_id']
598 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
598 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
599 # Unicode in records
599 # Unicode in records
600 record['engine_uuid'] = queue_id.decode('ascii')
600 record['engine_uuid'] = queue_id.decode('ascii')
601 record['client_uuid'] = msg['header']['session']
601 record['client_uuid'] = msg['header']['session']
602 record['queue'] = 'mux'
602 record['queue'] = 'mux'
603
603
604 try:
604 try:
605 # it's posible iopub arrived first:
605 # it's posible iopub arrived first:
606 existing = self.db.get_record(msg_id)
606 existing = self.db.get_record(msg_id)
607 for key,evalue in existing.iteritems():
607 for key,evalue in existing.iteritems():
608 rvalue = record.get(key, None)
608 rvalue = record.get(key, None)
609 if evalue and rvalue and evalue != rvalue:
609 if evalue and rvalue and evalue != rvalue:
610 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
610 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
611 elif evalue and not rvalue:
611 elif evalue and not rvalue:
612 record[key] = evalue
612 record[key] = evalue
613 try:
613 try:
614 self.db.update_record(msg_id, record)
614 self.db.update_record(msg_id, record)
615 except Exception:
615 except Exception:
616 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
616 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
617 except KeyError:
617 except KeyError:
618 try:
618 try:
619 self.db.add_record(msg_id, record)
619 self.db.add_record(msg_id, record)
620 except Exception:
620 except Exception:
621 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
621 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
622
622
623
623
624 self.pending.add(msg_id)
624 self.pending.add(msg_id)
625 self.queues[eid].append(msg_id)
625 self.queues[eid].append(msg_id)
626
626
627 def save_queue_result(self, idents, msg):
627 def save_queue_result(self, idents, msg):
628 if len(idents) < 2:
628 if len(idents) < 2:
629 self.log.error("invalid identity prefix: %r", idents)
629 self.log.error("invalid identity prefix: %r", idents)
630 return
630 return
631
631
632 client_id, queue_id = idents[:2]
632 client_id, queue_id = idents[:2]
633 try:
633 try:
634 msg = self.session.unserialize(msg)
634 msg = self.session.unserialize(msg)
635 except Exception:
635 except Exception:
636 self.log.error("queue::engine %r sent invalid message to %r: %r",
636 self.log.error("queue::engine %r sent invalid message to %r: %r",
637 queue_id, client_id, msg, exc_info=True)
637 queue_id, client_id, msg, exc_info=True)
638 return
638 return
639
639
640 eid = self.by_ident.get(queue_id, None)
640 eid = self.by_ident.get(queue_id, None)
641 if eid is None:
641 if eid is None:
642 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
642 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
643 return
643 return
644
644
645 parent = msg['parent_header']
645 parent = msg['parent_header']
646 if not parent:
646 if not parent:
647 return
647 return
648 msg_id = parent['msg_id']
648 msg_id = parent['msg_id']
649 if msg_id in self.pending:
649 if msg_id in self.pending:
650 self.pending.remove(msg_id)
650 self.pending.remove(msg_id)
651 self.all_completed.add(msg_id)
651 self.all_completed.add(msg_id)
652 self.queues[eid].remove(msg_id)
652 self.queues[eid].remove(msg_id)
653 self.completed[eid].append(msg_id)
653 self.completed[eid].append(msg_id)
654 self.log.info("queue::request %r completed on %s", msg_id, eid)
654 self.log.info("queue::request %r completed on %s", msg_id, eid)
655 elif msg_id not in self.all_completed:
655 elif msg_id not in self.all_completed:
656 # it could be a result from a dead engine that died before delivering the
656 # it could be a result from a dead engine that died before delivering the
657 # result
657 # result
658 self.log.warn("queue:: unknown msg finished %r", msg_id)
658 self.log.warn("queue:: unknown msg finished %r", msg_id)
659 return
659 return
660 # update record anyway, because the unregistration could have been premature
660 # update record anyway, because the unregistration could have been premature
661 rheader = msg['header']
661 rheader = msg['header']
662 md = msg['metadata']
662 md = msg['metadata']
663 completed = rheader['date']
663 completed = rheader['date']
664 started = md.get('started', None)
664 started = md.get('started', None)
665 result = {
665 result = {
666 'result_header' : rheader,
666 'result_header' : rheader,
667 'result_metadata': md,
667 'result_metadata': md,
668 'result_content': msg['content'],
668 'result_content': msg['content'],
669 'received': datetime.now(),
669 'received': datetime.now(),
670 'started' : started,
670 'started' : started,
671 'completed' : completed
671 'completed' : completed
672 }
672 }
673
673
674 result['result_buffers'] = msg['buffers']
674 result['result_buffers'] = msg['buffers']
675 try:
675 try:
676 self.db.update_record(msg_id, result)
676 self.db.update_record(msg_id, result)
677 except Exception:
677 except Exception:
678 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
678 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
679
679
680
680
681 #--------------------- Task Queue Traffic ------------------------------
681 #--------------------- Task Queue Traffic ------------------------------
682
682
683 def save_task_request(self, idents, msg):
683 def save_task_request(self, idents, msg):
684 """Save the submission of a task."""
684 """Save the submission of a task."""
685 client_id = idents[0]
685 client_id = idents[0]
686
686
687 try:
687 try:
688 msg = self.session.unserialize(msg)
688 msg = self.session.unserialize(msg)
689 except Exception:
689 except Exception:
690 self.log.error("task::client %r sent invalid task message: %r",
690 self.log.error("task::client %r sent invalid task message: %r",
691 client_id, msg, exc_info=True)
691 client_id, msg, exc_info=True)
692 return
692 return
693 record = init_record(msg)
693 record = init_record(msg)
694
694
695 record['client_uuid'] = msg['header']['session']
695 record['client_uuid'] = msg['header']['session']
696 record['queue'] = 'task'
696 record['queue'] = 'task'
697 header = msg['header']
697 header = msg['header']
698 msg_id = header['msg_id']
698 msg_id = header['msg_id']
699 self.pending.add(msg_id)
699 self.pending.add(msg_id)
700 self.unassigned.add(msg_id)
700 self.unassigned.add(msg_id)
701 try:
701 try:
702 # it's posible iopub arrived first:
702 # it's posible iopub arrived first:
703 existing = self.db.get_record(msg_id)
703 existing = self.db.get_record(msg_id)
704 if existing['resubmitted']:
704 if existing['resubmitted']:
705 for key in ('submitted', 'client_uuid', 'buffers'):
705 for key in ('submitted', 'client_uuid', 'buffers'):
706 # don't clobber these keys on resubmit
706 # don't clobber these keys on resubmit
707 # submitted and client_uuid should be different
707 # submitted and client_uuid should be different
708 # and buffers might be big, and shouldn't have changed
708 # and buffers might be big, and shouldn't have changed
709 record.pop(key)
709 record.pop(key)
710 # still check content,header which should not change
710 # still check content,header which should not change
711 # but are not expensive to compare as buffers
711 # but are not expensive to compare as buffers
712
712
713 for key,evalue in existing.iteritems():
713 for key,evalue in existing.iteritems():
714 if key.endswith('buffers'):
714 if key.endswith('buffers'):
715 # don't compare buffers
715 # don't compare buffers
716 continue
716 continue
717 rvalue = record.get(key, None)
717 rvalue = record.get(key, None)
718 if evalue and rvalue and evalue != rvalue:
718 if evalue and rvalue and evalue != rvalue:
719 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
719 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
720 elif evalue and not rvalue:
720 elif evalue and not rvalue:
721 record[key] = evalue
721 record[key] = evalue
722 try:
722 try:
723 self.db.update_record(msg_id, record)
723 self.db.update_record(msg_id, record)
724 except Exception:
724 except Exception:
725 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
725 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
726 except KeyError:
726 except KeyError:
727 try:
727 try:
728 self.db.add_record(msg_id, record)
728 self.db.add_record(msg_id, record)
729 except Exception:
729 except Exception:
730 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
730 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
731 except Exception:
731 except Exception:
732 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
732 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
733
733
734 def save_task_result(self, idents, msg):
734 def save_task_result(self, idents, msg):
735 """save the result of a completed task."""
735 """save the result of a completed task."""
736 client_id = idents[0]
736 client_id = idents[0]
737 try:
737 try:
738 msg = self.session.unserialize(msg)
738 msg = self.session.unserialize(msg)
739 except Exception:
739 except Exception:
740 self.log.error("task::invalid task result message send to %r: %r",
740 self.log.error("task::invalid task result message send to %r: %r",
741 client_id, msg, exc_info=True)
741 client_id, msg, exc_info=True)
742 return
742 return
743
743
744 parent = msg['parent_header']
744 parent = msg['parent_header']
745 if not parent:
745 if not parent:
746 # print msg
746 # print msg
747 self.log.warn("Task %r had no parent!", msg)
747 self.log.warn("Task %r had no parent!", msg)
748 return
748 return
749 msg_id = parent['msg_id']
749 msg_id = parent['msg_id']
750 if msg_id in self.unassigned:
750 if msg_id in self.unassigned:
751 self.unassigned.remove(msg_id)
751 self.unassigned.remove(msg_id)
752
752
753 header = msg['header']
753 header = msg['header']
754 md = msg['metadata']
754 md = msg['metadata']
755 engine_uuid = md.get('engine', u'')
755 engine_uuid = md.get('engine', u'')
756 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
756 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
757
757
758 status = md.get('status', None)
758 status = md.get('status', None)
759
759
760 if msg_id in self.pending:
760 if msg_id in self.pending:
761 self.log.info("task::task %r finished on %s", msg_id, eid)
761 self.log.info("task::task %r finished on %s", msg_id, eid)
762 self.pending.remove(msg_id)
762 self.pending.remove(msg_id)
763 self.all_completed.add(msg_id)
763 self.all_completed.add(msg_id)
764 if eid is not None:
764 if eid is not None:
765 if status != 'aborted':
765 if status != 'aborted':
766 self.completed[eid].append(msg_id)
766 self.completed[eid].append(msg_id)
767 if msg_id in self.tasks[eid]:
767 if msg_id in self.tasks[eid]:
768 self.tasks[eid].remove(msg_id)
768 self.tasks[eid].remove(msg_id)
769 completed = header['date']
769 completed = header['date']
770 started = md.get('started', None)
770 started = md.get('started', None)
771 result = {
771 result = {
772 'result_header' : header,
772 'result_header' : header,
773 'result_metadata': msg['metadata'],
773 'result_metadata': msg['metadata'],
774 'result_content': msg['content'],
774 'result_content': msg['content'],
775 'started' : started,
775 'started' : started,
776 'completed' : completed,
776 'completed' : completed,
777 'received' : datetime.now(),
777 'received' : datetime.now(),
778 'engine_uuid': engine_uuid,
778 'engine_uuid': engine_uuid,
779 }
779 }
780
780
781 result['result_buffers'] = msg['buffers']
781 result['result_buffers'] = msg['buffers']
782 try:
782 try:
783 self.db.update_record(msg_id, result)
783 self.db.update_record(msg_id, result)
784 except Exception:
784 except Exception:
785 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
785 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
786
786
787 else:
787 else:
788 self.log.debug("task::unknown task %r finished", msg_id)
788 self.log.debug("task::unknown task %r finished", msg_id)
789
789
790 def save_task_destination(self, idents, msg):
790 def save_task_destination(self, idents, msg):
791 try:
791 try:
792 msg = self.session.unserialize(msg, content=True)
792 msg = self.session.unserialize(msg, content=True)
793 except Exception:
793 except Exception:
794 self.log.error("task::invalid task tracking message", exc_info=True)
794 self.log.error("task::invalid task tracking message", exc_info=True)
795 return
795 return
796 content = msg['content']
796 content = msg['content']
797 # print (content)
797 # print (content)
798 msg_id = content['msg_id']
798 msg_id = content['msg_id']
799 engine_uuid = content['engine_id']
799 engine_uuid = content['engine_id']
800 eid = self.by_ident[cast_bytes(engine_uuid)]
800 eid = self.by_ident[cast_bytes(engine_uuid)]
801
801
802 self.log.info("task::task %r arrived on %r", msg_id, eid)
802 self.log.info("task::task %r arrived on %r", msg_id, eid)
803 if msg_id in self.unassigned:
803 if msg_id in self.unassigned:
804 self.unassigned.remove(msg_id)
804 self.unassigned.remove(msg_id)
805 # else:
805 # else:
806 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
806 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
807
807
808 self.tasks[eid].append(msg_id)
808 self.tasks[eid].append(msg_id)
809 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
809 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
810 try:
810 try:
811 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
811 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
812 except Exception:
812 except Exception:
813 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
813 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
814
814
815
815
816 def mia_task_request(self, idents, msg):
816 def mia_task_request(self, idents, msg):
817 raise NotImplementedError
817 raise NotImplementedError
818 client_id = idents[0]
818 client_id = idents[0]
819 # content = dict(mia=self.mia,status='ok')
819 # content = dict(mia=self.mia,status='ok')
820 # self.session.send('mia_reply', content=content, idents=client_id)
820 # self.session.send('mia_reply', content=content, idents=client_id)
821
821
822
822
823 #--------------------- IOPub Traffic ------------------------------
823 #--------------------- IOPub Traffic ------------------------------
824
824
825 def save_iopub_message(self, topics, msg):
825 def save_iopub_message(self, topics, msg):
826 """save an iopub message into the db"""
826 """save an iopub message into the db"""
827 # print (topics)
827 # print (topics)
828 try:
828 try:
829 msg = self.session.unserialize(msg, content=True)
829 msg = self.session.unserialize(msg, content=True)
830 except Exception:
830 except Exception:
831 self.log.error("iopub::invalid IOPub message", exc_info=True)
831 self.log.error("iopub::invalid IOPub message", exc_info=True)
832 return
832 return
833
833
834 parent = msg['parent_header']
834 parent = msg['parent_header']
835 if not parent:
835 if not parent:
836 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
836 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
837 return
837 return
838 msg_id = parent['msg_id']
838 msg_id = parent['msg_id']
839 msg_type = msg['header']['msg_type']
839 msg_type = msg['header']['msg_type']
840 content = msg['content']
840 content = msg['content']
841
841
842 # ensure msg_id is in db
842 # ensure msg_id is in db
843 try:
843 try:
844 rec = self.db.get_record(msg_id)
844 rec = self.db.get_record(msg_id)
845 except KeyError:
845 except KeyError:
846 rec = empty_record()
846 rec = empty_record()
847 rec['msg_id'] = msg_id
847 rec['msg_id'] = msg_id
848 self.db.add_record(msg_id, rec)
848 self.db.add_record(msg_id, rec)
849 # stream
849 # stream
850 d = {}
850 d = {}
851 if msg_type == 'stream':
851 if msg_type == 'stream':
852 name = content['name']
852 name = content['name']
853 s = rec[name] or ''
853 s = rec[name] or ''
854 d[name] = s + content['data']
854 d[name] = s + content['data']
855
855
856 elif msg_type == 'pyerr':
856 elif msg_type == 'pyerr':
857 d['pyerr'] = content
857 d['pyerr'] = content
858 elif msg_type == 'pyin':
858 elif msg_type == 'pyin':
859 d['pyin'] = content['code']
859 d['pyin'] = content['code']
860 elif msg_type in ('display_data', 'pyout'):
860 elif msg_type in ('display_data', 'pyout'):
861 d[msg_type] = content
861 d[msg_type] = content
862 elif msg_type == 'status':
862 elif msg_type == 'status':
863 pass
863 pass
864 elif msg_type == 'data_pub':
864 elif msg_type == 'data_pub':
865 self.log.info("ignored data_pub message for %s" % msg_id)
865 self.log.info("ignored data_pub message for %s" % msg_id)
866 else:
866 else:
867 self.log.warn("unhandled iopub msg_type: %r", msg_type)
867 self.log.warn("unhandled iopub msg_type: %r", msg_type)
868
868
869 if not d:
869 if not d:
870 return
870 return
871
871
872 try:
872 try:
873 self.db.update_record(msg_id, d)
873 self.db.update_record(msg_id, d)
874 except Exception:
874 except Exception:
875 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
875 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
876
876
877
877
878
878
879 #-------------------------------------------------------------------------
879 #-------------------------------------------------------------------------
880 # Registration requests
880 # Registration requests
881 #-------------------------------------------------------------------------
881 #-------------------------------------------------------------------------
882
882
883 def connection_request(self, client_id, msg):
883 def connection_request(self, client_id, msg):
884 """Reply with connection addresses for clients."""
884 """Reply with connection addresses for clients."""
885 self.log.info("client::client %r connected", client_id)
885 self.log.info("client::client %r connected", client_id)
886 content = dict(status='ok')
886 content = dict(status='ok')
887 jsonable = {}
887 jsonable = {}
888 for k,v in self.keytable.iteritems():
888 for k,v in self.keytable.iteritems():
889 if v not in self.dead_engines:
889 if v not in self.dead_engines:
890 jsonable[str(k)] = v
890 jsonable[str(k)] = v
891 content['engines'] = jsonable
891 content['engines'] = jsonable
892 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
892 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
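# For reference, a hedged example of the connection_reply content assembled above
# (engine ids and uuids are placeholders):
#     {'status': 'ok', 'engines': {'0': 'a1b2c3d4-...', '1': 'e5f6a7b8-...'}}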
893
893
894 def register_engine(self, reg, msg):
894 def register_engine(self, reg, msg):
895 """Register a new engine."""
895 """Register a new engine."""
896 content = msg['content']
896 content = msg['content']
897 try:
897 try:
898 uuid = content['uuid']
898 uuid = content['uuid']
899 except KeyError:
899 except KeyError:
900 self.log.error("registration::queue not specified", exc_info=True)
900 self.log.error("registration::queue not specified", exc_info=True)
901 return
901 return
902
902
903 eid = self._next_id
903 eid = self._next_id
904
904
905 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
905 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
906
906
907 content = dict(id=eid,status='ok')
907 content = dict(id=eid,status='ok',hb_period=self.heartmonitor.period)
908 # check if requesting available IDs:
908 # check if requesting available IDs:
909 if cast_bytes(uuid) in self.by_ident:
909 if cast_bytes(uuid) in self.by_ident:
910 try:
910 try:
911 raise KeyError("uuid %r in use" % uuid)
911 raise KeyError("uuid %r in use" % uuid)
912 except:
912 except:
913 content = error.wrap_exception()
913 content = error.wrap_exception()
914 self.log.error("uuid %r in use", uuid, exc_info=True)
914 self.log.error("uuid %r in use", uuid, exc_info=True)
915 else:
915 else:
916 for h, ec in self.incoming_registrations.iteritems():
916 for h, ec in self.incoming_registrations.iteritems():
917 if uuid == h:
917 if uuid == h:
918 try:
918 try:
919 raise KeyError("heart_id %r in use" % uuid)
919 raise KeyError("heart_id %r in use" % uuid)
920 except:
920 except:
921 self.log.error("heart_id %r in use", uuid, exc_info=True)
921 self.log.error("heart_id %r in use", uuid, exc_info=True)
922 content = error.wrap_exception()
922 content = error.wrap_exception()
923 break
923 break
924 elif uuid == ec.uuid:
924 elif uuid == ec.uuid:
925 try:
925 try:
926 raise KeyError("uuid %r in use" % uuid)
926 raise KeyError("uuid %r in use" % uuid)
927 except:
927 except:
928 self.log.error("uuid %r in use", uuid, exc_info=True)
928 self.log.error("uuid %r in use", uuid, exc_info=True)
929 content = error.wrap_exception()
929 content = error.wrap_exception()
930 break
930 break
931
931
932 msg = self.session.send(self.query, "registration_reply",
932 msg = self.session.send(self.query, "registration_reply",
933 content=content,
933 content=content,
934 ident=reg)
934 ident=reg)
935
935
936 heart = cast_bytes(uuid)
936 heart = cast_bytes(uuid)
937
937
938 if content['status'] == 'ok':
938 if content['status'] == 'ok':
939 if heart in self.heartmonitor.hearts:
939 if heart in self.heartmonitor.hearts:
940 # already beating
940 # already beating
941 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
941 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
942 self.finish_registration(heart)
942 self.finish_registration(heart)
943 else:
943 else:
944 purge = lambda : self._purge_stalled_registration(heart)
944 purge = lambda : self._purge_stalled_registration(heart)
945 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
945 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
946 dc.start()
946 dc.start()
947 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
947 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
948 else:
948 else:
949 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
949 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
950
950
951 return eid
951 return eid
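# A minimal sketch of the registration_reply content built above, which now carries the
# heartbeat period from self.heartmonitor.period (milliseconds in the default config);
# the values and the engine-side usage are illustrative assumptions, not code from this file:
#     content = {'id': 0, 'status': 'ok', 'hb_period': 3000}
# An engine could read content.get('hb_period') to pace its Heart instead of relying on a
# locally configured default.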
952
952
953 def unregister_engine(self, ident, msg):
953 def unregister_engine(self, ident, msg):
954 """Unregister an engine that explicitly requested to leave."""
954 """Unregister an engine that explicitly requested to leave."""
955 try:
955 try:
956 eid = msg['content']['id']
956 eid = msg['content']['id']
957 except:
957 except:
958 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
958 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
959 return
959 return
960 self.log.info("registration::unregister_engine(%r)", eid)
960 self.log.info("registration::unregister_engine(%r)", eid)
961 # print (eid)
961 # print (eid)
962 uuid = self.keytable[eid]
962 uuid = self.keytable[eid]
963 content=dict(id=eid, uuid=uuid)
963 content=dict(id=eid, uuid=uuid)
964 self.dead_engines.add(uuid)
964 self.dead_engines.add(uuid)
965 # self.ids.remove(eid)
965 # self.ids.remove(eid)
966 # uuid = self.keytable.pop(eid)
966 # uuid = self.keytable.pop(eid)
967 #
967 #
968 # ec = self.engines.pop(eid)
968 # ec = self.engines.pop(eid)
969 # self.hearts.pop(ec.heartbeat)
969 # self.hearts.pop(ec.heartbeat)
970 # self.by_ident.pop(ec.queue)
970 # self.by_ident.pop(ec.queue)
971 # self.completed.pop(eid)
971 # self.completed.pop(eid)
972 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
972 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
973 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
973 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
974 dc.start()
974 dc.start()
975 ############## TODO: HANDLE IT ################
975 ############## TODO: HANDLE IT ################
976
976
977 self._save_engine_state()
977 self._save_engine_state()
978
978
979 if self.notifier:
979 if self.notifier:
980 self.session.send(self.notifier, "unregistration_notification", content=content)
980 self.session.send(self.notifier, "unregistration_notification", content=content)
981
981
982 def _handle_stranded_msgs(self, eid, uuid):
982 def _handle_stranded_msgs(self, eid, uuid):
983 """Handle messages known to be on an engine when the engine unregisters.
983 """Handle messages known to be on an engine when the engine unregisters.
984
984
985 It is possible that this will fire prematurely - that is, an engine will
985 It is possible that this will fire prematurely - that is, an engine will
986 go down after completing a result, and the client will be notified
986 go down after completing a result, and the client will be notified
987 that the result failed and later receive the actual result.
987 that the result failed and later receive the actual result.
988 """
988 """
989
989
990 outstanding = self.queues[eid]
990 outstanding = self.queues[eid]
991
991
992 for msg_id in outstanding:
992 for msg_id in outstanding:
993 self.pending.remove(msg_id)
993 self.pending.remove(msg_id)
994 self.all_completed.add(msg_id)
994 self.all_completed.add(msg_id)
995 try:
995 try:
996 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
996 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
997 except:
997 except:
998 content = error.wrap_exception()
998 content = error.wrap_exception()
999 # build a fake header:
999 # build a fake header:
1000 header = {}
1000 header = {}
1001 header['engine'] = uuid
1001 header['engine'] = uuid
1002 header['date'] = datetime.now()
1002 header['date'] = datetime.now()
1003 rec = dict(result_content=content, result_header=header, result_buffers=[])
1003 rec = dict(result_content=content, result_header=header, result_buffers=[])
1004 rec['completed'] = header['date']
1004 rec['completed'] = header['date']
1005 rec['engine_uuid'] = uuid
1005 rec['engine_uuid'] = uuid
1006 try:
1006 try:
1007 self.db.update_record(msg_id, rec)
1007 self.db.update_record(msg_id, rec)
1008 except Exception:
1008 except Exception:
1009 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1009 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1010
1010
1011
1011
1012 def finish_registration(self, heart):
1012 def finish_registration(self, heart):
1013 """Second half of engine registration, called after our HeartMonitor
1013 """Second half of engine registration, called after our HeartMonitor
1014 has received a beat from the Engine's Heart."""
1014 has received a beat from the Engine's Heart."""
1015 try:
1015 try:
1016 ec = self.incoming_registrations.pop(heart)
1016 ec = self.incoming_registrations.pop(heart)
1017 except KeyError:
1017 except KeyError:
1018 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1018 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1019 return
1019 return
1020 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1020 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1021 if ec.stallback is not None:
1021 if ec.stallback is not None:
1022 ec.stallback.stop()
1022 ec.stallback.stop()
1023 eid = ec.id
1023 eid = ec.id
1024 self.ids.add(eid)
1024 self.ids.add(eid)
1025 self.keytable[eid] = ec.uuid
1025 self.keytable[eid] = ec.uuid
1026 self.engines[eid] = ec
1026 self.engines[eid] = ec
1027 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1027 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1028 self.queues[eid] = list()
1028 self.queues[eid] = list()
1029 self.tasks[eid] = list()
1029 self.tasks[eid] = list()
1030 self.completed[eid] = list()
1030 self.completed[eid] = list()
1031 self.hearts[heart] = eid
1031 self.hearts[heart] = eid
1032 content = dict(id=eid, uuid=self.engines[eid].uuid)
1032 content = dict(id=eid, uuid=self.engines[eid].uuid)
1033 if self.notifier:
1033 if self.notifier:
1034 self.session.send(self.notifier, "registration_notification", content=content)
1034 self.session.send(self.notifier, "registration_notification", content=content)
1035 self.log.info("engine::Engine Connected: %i", eid)
1035 self.log.info("engine::Engine Connected: %i", eid)
1036
1036
1037 self._save_engine_state()
1037 self._save_engine_state()
1038
1038
1039 def _purge_stalled_registration(self, heart):
1039 def _purge_stalled_registration(self, heart):
1040 if heart in self.incoming_registrations:
1040 if heart in self.incoming_registrations:
1041 ec = self.incoming_registrations.pop(heart)
1041 ec = self.incoming_registrations.pop(heart)
1042 self.log.info("registration::purging stalled registration: %i", ec.id)
1042 self.log.info("registration::purging stalled registration: %i", ec.id)
1043 else:
1043 else:
1044 pass
1044 pass
1045
1045
1046 #-------------------------------------------------------------------------
1046 #-------------------------------------------------------------------------
1047 # Engine State
1047 # Engine State
1048 #-------------------------------------------------------------------------
1048 #-------------------------------------------------------------------------
1049
1049
1050
1050
1051 def _cleanup_engine_state_file(self):
1051 def _cleanup_engine_state_file(self):
1052 """cleanup engine state mapping"""
1052 """cleanup engine state mapping"""
1053
1053
1054 if os.path.exists(self.engine_state_file):
1054 if os.path.exists(self.engine_state_file):
1055 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1055 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1056 try:
1056 try:
1057 os.remove(self.engine_state_file)
1057 os.remove(self.engine_state_file)
1058 except (IOError, OSError):
1058 except (IOError, OSError):
1059 self.log.error("Couldn't clean up file: %s", self.engine_state_file, exc_info=True)
1059 self.log.error("Couldn't clean up file: %s", self.engine_state_file, exc_info=True)
1060
1060
1061
1061
1062 def _save_engine_state(self):
1062 def _save_engine_state(self):
1063 """save engine mapping to JSON file"""
1063 """save engine mapping to JSON file"""
1064 if not self.engine_state_file:
1064 if not self.engine_state_file:
1065 return
1065 return
1066 self.log.debug("save engine state to %s" % self.engine_state_file)
1066 self.log.debug("save engine state to %s" % self.engine_state_file)
1067 state = {}
1067 state = {}
1068 engines = {}
1068 engines = {}
1069 for eid, ec in self.engines.iteritems():
1069 for eid, ec in self.engines.iteritems():
1070 if ec.uuid not in self.dead_engines:
1070 if ec.uuid not in self.dead_engines:
1071 engines[eid] = ec.uuid
1071 engines[eid] = ec.uuid
1072
1072
1073 state['engines'] = engines
1073 state['engines'] = engines
1074
1074
1075 state['next_id'] = self._idcounter
1075 state['next_id'] = self._idcounter
1076
1076
1077 with open(self.engine_state_file, 'w') as f:
1077 with open(self.engine_state_file, 'w') as f:
1078 json.dump(state, f)
1078 json.dump(state, f)
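# The resulting engine-state file is plain JSON; an illustrative (not real) example:
#     {"engines": {"0": "0a1b2c3d-...", "1": "4e5f6a7b-..."}, "next_id": 2}
# _load_engine_state below reads the same structure back to re-adopt running engines.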
1079
1079
1080
1080
1081 def _load_engine_state(self):
1081 def _load_engine_state(self):
1082 """load engine mapping from JSON file"""
1082 """load engine mapping from JSON file"""
1083 if not os.path.exists(self.engine_state_file):
1083 if not os.path.exists(self.engine_state_file):
1084 return
1084 return
1085
1085
1086 self.log.info("loading engine state from %s" % self.engine_state_file)
1086 self.log.info("loading engine state from %s" % self.engine_state_file)
1087
1087
1088 with open(self.engine_state_file) as f:
1088 with open(self.engine_state_file) as f:
1089 state = json.load(f)
1089 state = json.load(f)
1090
1090
1091 save_notifier = self.notifier
1091 save_notifier = self.notifier
1092 self.notifier = None
1092 self.notifier = None
1093 for eid, uuid in state['engines'].iteritems():
1093 for eid, uuid in state['engines'].iteritems():
1094 heart = uuid.encode('ascii')
1094 heart = uuid.encode('ascii')
1095 # start with this heart as current and beating:
1095 # start with this heart as current and beating:
1096 self.heartmonitor.responses.add(heart)
1096 self.heartmonitor.responses.add(heart)
1097 self.heartmonitor.hearts.add(heart)
1097 self.heartmonitor.hearts.add(heart)
1098
1098
1099 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1099 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1100 self.finish_registration(heart)
1100 self.finish_registration(heart)
1101
1101
1102 self.notifier = save_notifier
1102 self.notifier = save_notifier
1103
1103
1104 self._idcounter = state['next_id']
1104 self._idcounter = state['next_id']
1105
1105
1106 #-------------------------------------------------------------------------
1106 #-------------------------------------------------------------------------
1107 # Client Requests
1107 # Client Requests
1108 #-------------------------------------------------------------------------
1108 #-------------------------------------------------------------------------
1109
1109
1110 def shutdown_request(self, client_id, msg):
1110 def shutdown_request(self, client_id, msg):
1111 """handle shutdown request."""
1111 """handle shutdown request."""
1112 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1112 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1113 # also notify other clients of shutdown
1113 # also notify other clients of shutdown
1114 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1114 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1115 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1115 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1116 dc.start()
1116 dc.start()
1117
1117
1118 def _shutdown(self):
1118 def _shutdown(self):
1119 self.log.info("hub::hub shutting down.")
1119 self.log.info("hub::hub shutting down.")
1120 time.sleep(0.1)
1120 time.sleep(0.1)
1121 sys.exit(0)
1121 sys.exit(0)
1122
1122
1123
1123
1124 def check_load(self, client_id, msg):
1124 def check_load(self, client_id, msg):
1125 content = msg['content']
1125 content = msg['content']
1126 try:
1126 try:
1127 targets = content['targets']
1127 targets = content['targets']
1128 targets = self._validate_targets(targets)
1128 targets = self._validate_targets(targets)
1129 except:
1129 except:
1130 content = error.wrap_exception()
1130 content = error.wrap_exception()
1131 self.session.send(self.query, "hub_error",
1131 self.session.send(self.query, "hub_error",
1132 content=content, ident=client_id)
1132 content=content, ident=client_id)
1133 return
1133 return
1134
1134
1135 content = dict(status='ok')
1135 content = dict(status='ok')
1136 # loads = {}
1136 # loads = {}
1137 for t in targets:
1137 for t in targets:
1138 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1138 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1139 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1139 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1140
1140
1141
1141
1142 def queue_status(self, client_id, msg):
1142 def queue_status(self, client_id, msg):
1143 """Return the Queue status of one or more targets.
1143 """Return the Queue status of one or more targets.
1144 if verbose: return the msg_ids
1144 if verbose: return the msg_ids
1145 else: return the count of each type.
1145 else: return the count of each type.
1146 keys: queue (pending MUX jobs)
1146 keys: queue (pending MUX jobs)
1147 tasks (pending Task jobs)
1147 tasks (pending Task jobs)
1148 completed (finished jobs from both queues)"""
1148 completed (finished jobs from both queues)"""
1149 content = msg['content']
1149 content = msg['content']
1150 targets = content['targets']
1150 targets = content['targets']
1151 try:
1151 try:
1152 targets = self._validate_targets(targets)
1152 targets = self._validate_targets(targets)
1153 except:
1153 except:
1154 content = error.wrap_exception()
1154 content = error.wrap_exception()
1155 self.session.send(self.query, "hub_error",
1155 self.session.send(self.query, "hub_error",
1156 content=content, ident=client_id)
1156 content=content, ident=client_id)
1157 return
1157 return
1158 verbose = content.get('verbose', False)
1158 verbose = content.get('verbose', False)
1159 content = dict(status='ok')
1159 content = dict(status='ok')
1160 for t in targets:
1160 for t in targets:
1161 queue = self.queues[t]
1161 queue = self.queues[t]
1162 completed = self.completed[t]
1162 completed = self.completed[t]
1163 tasks = self.tasks[t]
1163 tasks = self.tasks[t]
1164 if not verbose:
1164 if not verbose:
1165 queue = len(queue)
1165 queue = len(queue)
1166 completed = len(completed)
1166 completed = len(completed)
1167 tasks = len(tasks)
1167 tasks = len(tasks)
1168 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1168 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1169 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1169 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1170 # print (content)
1170 # print (content)
1171 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1171 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1172
1172
1173 def purge_results(self, client_id, msg):
1173 def purge_results(self, client_id, msg):
1174 """Purge results from memory. This method is more valuable before we move
1174 """Purge results from memory. This method is more valuable before we move
1175 to a DB-based message storage mechanism."""
1175 to a DB-based message storage mechanism."""
1176 content = msg['content']
1176 content = msg['content']
1177 self.log.info("Dropping records with %s", content)
1177 self.log.info("Dropping records with %s", content)
1178 msg_ids = content.get('msg_ids', [])
1178 msg_ids = content.get('msg_ids', [])
1179 reply = dict(status='ok')
1179 reply = dict(status='ok')
1180 if msg_ids == 'all':
1180 if msg_ids == 'all':
1181 try:
1181 try:
1182 self.db.drop_matching_records(dict(completed={'$ne':None}))
1182 self.db.drop_matching_records(dict(completed={'$ne':None}))
1183 except Exception:
1183 except Exception:
1184 reply = error.wrap_exception()
1184 reply = error.wrap_exception()
1185 else:
1185 else:
1186 pending = filter(lambda m: m in self.pending, msg_ids)
1186 pending = filter(lambda m: m in self.pending, msg_ids)
1187 if pending:
1187 if pending:
1188 try:
1188 try:
1189 raise IndexError("msg pending: %r" % pending[0])
1189 raise IndexError("msg pending: %r" % pending[0])
1190 except:
1190 except:
1191 reply = error.wrap_exception()
1191 reply = error.wrap_exception()
1192 else:
1192 else:
1193 try:
1193 try:
1194 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1194 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1195 except Exception:
1195 except Exception:
1196 reply = error.wrap_exception()
1196 reply = error.wrap_exception()
1197
1197
1198 if reply['status'] == 'ok':
1198 if reply['status'] == 'ok':
1199 eids = content.get('engine_ids', [])
1199 eids = content.get('engine_ids', [])
1200 for eid in eids:
1200 for eid in eids:
1201 if eid not in self.engines:
1201 if eid not in self.engines:
1202 try:
1202 try:
1203 raise IndexError("No such engine: %i" % eid)
1203 raise IndexError("No such engine: %i" % eid)
1204 except:
1204 except:
1205 reply = error.wrap_exception()
1205 reply = error.wrap_exception()
1206 break
1206 break
1207 uid = self.engines[eid].uuid
1207 uid = self.engines[eid].uuid
1208 try:
1208 try:
1209 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1209 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1210 except Exception:
1210 except Exception:
1211 reply = error.wrap_exception()
1211 reply = error.wrap_exception()
1212 break
1212 break
1213
1213
1214 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1214 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
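# Typical request contents handled above (ids are placeholders):
#     {'msg_ids': 'all'}                                   # drop every completed record
#     {'msg_ids': ['msg-1', 'msg-2'], 'engine_ids': [0]}   # drop specific msgs plus one engine's results
# Pending msg_ids and unknown engine ids are rejected with a wrapped exception, as coded above.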
1215
1215
1216 def resubmit_task(self, client_id, msg):
1216 def resubmit_task(self, client_id, msg):
1217 """Resubmit one or more tasks."""
1217 """Resubmit one or more tasks."""
1218 def finish(reply):
1218 def finish(reply):
1219 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1219 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1220
1220
1221 content = msg['content']
1221 content = msg['content']
1222 msg_ids = content['msg_ids']
1222 msg_ids = content['msg_ids']
1223 reply = dict(status='ok')
1223 reply = dict(status='ok')
1224 try:
1224 try:
1225 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1225 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1226 'header', 'content', 'buffers'])
1226 'header', 'content', 'buffers'])
1227 except Exception:
1227 except Exception:
1228 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1228 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1229 return finish(error.wrap_exception())
1229 return finish(error.wrap_exception())
1230
1230
1231 # validate msg_ids
1231 # validate msg_ids
1232 found_ids = [ rec['msg_id'] for rec in records ]
1232 found_ids = [ rec['msg_id'] for rec in records ]
1233 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1233 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1234 if len(records) > len(msg_ids):
1234 if len(records) > len(msg_ids):
1235 try:
1235 try:
1236 raise RuntimeError("DB appears to be in an inconsistent state."
1236 raise RuntimeError("DB appears to be in an inconsistent state."
1237 "More matching records were found than should exist")
1237 "More matching records were found than should exist")
1238 except Exception:
1238 except Exception:
1239 return finish(error.wrap_exception())
1239 return finish(error.wrap_exception())
1240 elif len(records) < len(msg_ids):
1240 elif len(records) < len(msg_ids):
1241 missing = [ m for m in msg_ids if m not in found_ids ]
1241 missing = [ m for m in msg_ids if m not in found_ids ]
1242 try:
1242 try:
1243 raise KeyError("No such msg(s): %r" % missing)
1243 raise KeyError("No such msg(s): %r" % missing)
1244 except KeyError:
1244 except KeyError:
1245 return finish(error.wrap_exception())
1245 return finish(error.wrap_exception())
1246 elif pending_ids:
1246 elif pending_ids:
1247 pass
1247 pass
1248 # no need to raise on resubmit of pending task, now that we
1248 # no need to raise on resubmit of pending task, now that we
1249 # resubmit under new ID, but do we want to raise anyway?
1249 # resubmit under new ID, but do we want to raise anyway?
1250 # msg_id = invalid_ids[0]
1250 # msg_id = invalid_ids[0]
1251 # try:
1251 # try:
1252 # raise ValueError("Task(s) %r appears to be inflight" % )
1252 # raise ValueError("Task(s) %r appears to be inflight" % )
1253 # except Exception:
1253 # except Exception:
1254 # return finish(error.wrap_exception())
1254 # return finish(error.wrap_exception())
1255
1255
1256 # mapping of original IDs to resubmitted IDs
1256 # mapping of original IDs to resubmitted IDs
1257 resubmitted = {}
1257 resubmitted = {}
1258
1258
1259 # send the messages
1259 # send the messages
1260 for rec in records:
1260 for rec in records:
1261 header = rec['header']
1261 header = rec['header']
1262 msg = self.session.msg(header['msg_type'], parent=header)
1262 msg = self.session.msg(header['msg_type'], parent=header)
1263 msg_id = msg['msg_id']
1263 msg_id = msg['msg_id']
1264 msg['content'] = rec['content']
1264 msg['content'] = rec['content']
1265
1265
1266 # use the old header, but update msg_id and timestamp
1266 # use the old header, but update msg_id and timestamp
1267 fresh = msg['header']
1267 fresh = msg['header']
1268 header['msg_id'] = fresh['msg_id']
1268 header['msg_id'] = fresh['msg_id']
1269 header['date'] = fresh['date']
1269 header['date'] = fresh['date']
1270 msg['header'] = header
1270 msg['header'] = header
1271
1271
1272 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1272 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1273
1273
1274 resubmitted[rec['msg_id']] = msg_id
1274 resubmitted[rec['msg_id']] = msg_id
1275 self.pending.add(msg_id)
1275 self.pending.add(msg_id)
1276 msg['buffers'] = rec['buffers']
1276 msg['buffers'] = rec['buffers']
1277 try:
1277 try:
1278 self.db.add_record(msg_id, init_record(msg))
1278 self.db.add_record(msg_id, init_record(msg))
1279 except Exception:
1279 except Exception:
1280 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1280 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1281 return finish(error.wrap_exception())
1281 return finish(error.wrap_exception())
1282
1282
1283 finish(dict(status='ok', resubmitted=resubmitted))
1283 finish(dict(status='ok', resubmitted=resubmitted))
1284
1284
1285 # store the new IDs in the Task DB
1285 # store the new IDs in the Task DB
1286 for msg_id, resubmit_id in resubmitted.iteritems():
1286 for msg_id, resubmit_id in resubmitted.iteritems():
1287 try:
1287 try:
1288 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1288 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1289 except Exception:
1289 except Exception:
1290 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1290 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1291
1291
1292
1292
1293 def _extract_record(self, rec):
1293 def _extract_record(self, rec):
1294 """decompose a TaskRecord dict into subsection of reply for get_result"""
1294 """decompose a TaskRecord dict into subsection of reply for get_result"""
1295 io_dict = {}
1295 io_dict = {}
1296 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1296 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1297 io_dict[key] = rec[key]
1297 io_dict[key] = rec[key]
1298 content = {
1298 content = {
1299 'header': rec['header'],
1299 'header': rec['header'],
1300 'metadata': rec['metadata'],
1300 'metadata': rec['metadata'],
1301 'result_metadata': rec['result_metadata'],
1301 'result_metadata': rec['result_metadata'],
1302 'result_header' : rec['result_header'],
1302 'result_header' : rec['result_header'],
1303 'result_content': rec['result_content'],
1303 'result_content': rec['result_content'],
1304 'received' : rec['received'],
1304 'received' : rec['received'],
1305 'io' : io_dict,
1305 'io' : io_dict,
1306 }
1306 }
1307 if rec['result_buffers']:
1307 if rec['result_buffers']:
1308 buffers = map(bytes, rec['result_buffers'])
1308 buffers = map(bytes, rec['result_buffers'])
1309 else:
1309 else:
1310 buffers = []
1310 buffers = []
1311
1311
1312 return content, buffers
1312 return content, buffers
1313
1313
1314 def get_results(self, client_id, msg):
1314 def get_results(self, client_id, msg):
1315 """Get the result of 1 or more messages."""
1315 """Get the result of 1 or more messages."""
1316 content = msg['content']
1316 content = msg['content']
1317 msg_ids = sorted(set(content['msg_ids']))
1317 msg_ids = sorted(set(content['msg_ids']))
1318 statusonly = content.get('status_only', False)
1318 statusonly = content.get('status_only', False)
1319 pending = []
1319 pending = []
1320 completed = []
1320 completed = []
1321 content = dict(status='ok')
1321 content = dict(status='ok')
1322 content['pending'] = pending
1322 content['pending'] = pending
1323 content['completed'] = completed
1323 content['completed'] = completed
1324 buffers = []
1324 buffers = []
1325 if not statusonly:
1325 if not statusonly:
1326 try:
1326 try:
1327 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1327 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1328 # turn match list into dict, for faster lookup
1328 # turn match list into dict, for faster lookup
1329 records = {}
1329 records = {}
1330 for rec in matches:
1330 for rec in matches:
1331 records[rec['msg_id']] = rec
1331 records[rec['msg_id']] = rec
1332 except Exception:
1332 except Exception:
1333 content = error.wrap_exception()
1333 content = error.wrap_exception()
1334 self.session.send(self.query, "result_reply", content=content,
1334 self.session.send(self.query, "result_reply", content=content,
1335 parent=msg, ident=client_id)
1335 parent=msg, ident=client_id)
1336 return
1336 return
1337 else:
1337 else:
1338 records = {}
1338 records = {}
1339 for msg_id in msg_ids:
1339 for msg_id in msg_ids:
1340 if msg_id in self.pending:
1340 if msg_id in self.pending:
1341 pending.append(msg_id)
1341 pending.append(msg_id)
1342 elif msg_id in self.all_completed:
1342 elif msg_id in self.all_completed:
1343 completed.append(msg_id)
1343 completed.append(msg_id)
1344 if not statusonly:
1344 if not statusonly:
1345 c,bufs = self._extract_record(records[msg_id])
1345 c,bufs = self._extract_record(records[msg_id])
1346 content[msg_id] = c
1346 content[msg_id] = c
1347 buffers.extend(bufs)
1347 buffers.extend(bufs)
1348 elif msg_id in records:
1348 elif msg_id in records:
1349 if records[msg_id]['completed']:
1349 if records[msg_id]['completed']:
1350 completed.append(msg_id)
1350 completed.append(msg_id)
1351 c,bufs = self._extract_record(records[msg_id])
1351 c,bufs = self._extract_record(records[msg_id])
1352 content[msg_id] = c
1352 content[msg_id] = c
1353 buffers.extend(bufs)
1353 buffers.extend(bufs)
1354 else:
1354 else:
1355 pending.append(msg_id)
1355 pending.append(msg_id)
1356 else:
1356 else:
1357 try:
1357 try:
1358 raise KeyError('No such message: '+msg_id)
1358 raise KeyError('No such message: '+msg_id)
1359 except:
1359 except:
1360 content = error.wrap_exception()
1360 content = error.wrap_exception()
1361 break
1361 break
1362 self.session.send(self.query, "result_reply", content=content,
1362 self.session.send(self.query, "result_reply", content=content,
1363 parent=msg, ident=client_id,
1363 parent=msg, ident=client_id,
1364 buffers=buffers)
1364 buffers=buffers)
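# A sketch of a status_only result_reply (msg ids are placeholders):
#     {'status': 'ok', 'pending': ['msg-2'], 'completed': ['msg-1']}
# Without status_only, each completed msg_id additionally maps to the dict produced by
# _extract_record, and its result buffers ride along in the reply's buffer frames.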
1365
1365
1366 def get_history(self, client_id, msg):
1366 def get_history(self, client_id, msg):
1367 """Get a list of all msg_ids in our DB records"""
1367 """Get a list of all msg_ids in our DB records"""
1368 try:
1368 try:
1369 msg_ids = self.db.get_history()
1369 msg_ids = self.db.get_history()
1370 except Exception as e:
1370 except Exception as e:
1371 content = error.wrap_exception()
1371 content = error.wrap_exception()
1372 else:
1372 else:
1373 content = dict(status='ok', history=msg_ids)
1373 content = dict(status='ok', history=msg_ids)
1374
1374
1375 self.session.send(self.query, "history_reply", content=content,
1375 self.session.send(self.query, "history_reply", content=content,
1376 parent=msg, ident=client_id)
1376 parent=msg, ident=client_id)
1377
1377
1378 def db_query(self, client_id, msg):
1378 def db_query(self, client_id, msg):
1379 """Perform a raw query on the task record database."""
1379 """Perform a raw query on the task record database."""
1380 content = msg['content']
1380 content = msg['content']
1381 query = content.get('query', {})
1381 query = content.get('query', {})
1382 keys = content.get('keys', None)
1382 keys = content.get('keys', None)
1383 buffers = []
1383 buffers = []
1384 empty = list()
1384 empty = list()
1385 try:
1385 try:
1386 records = self.db.find_records(query, keys)
1386 records = self.db.find_records(query, keys)
1387 except Exception as e:
1387 except Exception as e:
1388 content = error.wrap_exception()
1388 content = error.wrap_exception()
1389 else:
1389 else:
1390 # extract buffers from reply content:
1390 # extract buffers from reply content:
1391 if keys is not None:
1391 if keys is not None:
1392 buffer_lens = [] if 'buffers' in keys else None
1392 buffer_lens = [] if 'buffers' in keys else None
1393 result_buffer_lens = [] if 'result_buffers' in keys else None
1393 result_buffer_lens = [] if 'result_buffers' in keys else None
1394 else:
1394 else:
1395 buffer_lens = None
1395 buffer_lens = None
1396 result_buffer_lens = None
1396 result_buffer_lens = None
1397
1397
1398 for rec in records:
1398 for rec in records:
1399 # buffers may be None, so double check
1399 # buffers may be None, so double check
1400 b = rec.pop('buffers', empty) or empty
1400 b = rec.pop('buffers', empty) or empty
1401 if buffer_lens is not None:
1401 if buffer_lens is not None:
1402 buffer_lens.append(len(b))
1402 buffer_lens.append(len(b))
1403 buffers.extend(b)
1403 buffers.extend(b)
1404 rb = rec.pop('result_buffers', empty) or empty
1404 rb = rec.pop('result_buffers', empty) or empty
1405 if result_buffer_lens is not None:
1405 if result_buffer_lens is not None:
1406 result_buffer_lens.append(len(rb))
1406 result_buffer_lens.append(len(rb))
1407 buffers.extend(rb)
1407 buffers.extend(rb)
1408 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1408 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1409 result_buffer_lens=result_buffer_lens)
1409 result_buffer_lens=result_buffer_lens)
1410 # self.log.debug (content)
1410 # self.log.debug (content)
1411 self.session.send(self.query, "db_reply", content=content,
1411 self.session.send(self.query, "db_reply", content=content,
1412 parent=msg, ident=client_id,
1412 parent=msg, ident=client_id,
1413 buffers=buffers)
1413 buffers=buffers)
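# Example of the kind of content db_query serves (Mongo-style operators, matching the
# queries used elsewhere in this file; key names are illustrative):
#     {'query': {'completed': {'$ne': None}}, 'keys': ['msg_id', 'completed', 'engine_uuid']}
# buffer_lens / result_buffer_lens are only populated when those keys are explicitly requested.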
1414
1414