##// END OF EJS Templates
ignore data_pub in Hub
MinRK -
Show More
@@ -1,1412 +1,1414 b''
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import json
21 import json
22 import os
22 import os
23 import sys
23 import sys
24 import time
24 import time
25 from datetime import datetime
25 from datetime import datetime
26
26
27 import zmq
27 import zmq
28 from zmq.eventloop import ioloop
28 from zmq.eventloop import ioloop
29 from zmq.eventloop.zmqstream import ZMQStream
29 from zmq.eventloop.zmqstream import ZMQStream
30
30
31 # internal:
31 # internal:
32 from IPython.utils.importstring import import_item
32 from IPython.utils.importstring import import_item
33 from IPython.utils.py3compat import cast_bytes
33 from IPython.utils.py3compat import cast_bytes
34 from IPython.utils.traitlets import (
34 from IPython.utils.traitlets import (
35 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
35 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
36 )
36 )
37
37
38 from IPython.parallel import error, util
38 from IPython.parallel import error, util
39 from IPython.parallel.factory import RegistrationFactory
39 from IPython.parallel.factory import RegistrationFactory
40
40
41 from IPython.zmq.session import SessionFactory
41 from IPython.zmq.session import SessionFactory
42
42
43 from .heartmonitor import HeartMonitor
43 from .heartmonitor import HeartMonitor
44
44
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46 # Code
46 # Code
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48
48
49 def _passer(*args, **kwargs):
49 def _passer(*args, **kwargs):
50 return
50 return
51
51
52 def _printer(*args, **kwargs):
52 def _printer(*args, **kwargs):
53 print (args)
53 print (args)
54 print (kwargs)
54 print (kwargs)
55
55
56 def empty_record():
56 def empty_record():
57 """Return an empty dict with all record keys."""
57 """Return an empty dict with all record keys."""
58 return {
58 return {
59 'msg_id' : None,
59 'msg_id' : None,
60 'header' : None,
60 'header' : None,
61 'metadata' : None,
61 'metadata' : None,
62 'content': None,
62 'content': None,
63 'buffers': None,
63 'buffers': None,
64 'submitted': None,
64 'submitted': None,
65 'client_uuid' : None,
65 'client_uuid' : None,
66 'engine_uuid' : None,
66 'engine_uuid' : None,
67 'started': None,
67 'started': None,
68 'completed': None,
68 'completed': None,
69 'resubmitted': None,
69 'resubmitted': None,
70 'received': None,
70 'received': None,
71 'result_header' : None,
71 'result_header' : None,
72 'result_metadata' : None,
72 'result_metadata' : None,
73 'result_content' : None,
73 'result_content' : None,
74 'result_buffers' : None,
74 'result_buffers' : None,
75 'queue' : None,
75 'queue' : None,
76 'pyin' : None,
76 'pyin' : None,
77 'pyout': None,
77 'pyout': None,
78 'pyerr': None,
78 'pyerr': None,
79 'stdout': '',
79 'stdout': '',
80 'stderr': '',
80 'stderr': '',
81 }
81 }
82
82
83 def init_record(msg):
83 def init_record(msg):
84 """Initialize a TaskRecord based on a request."""
84 """Initialize a TaskRecord based on a request."""
85 header = msg['header']
85 header = msg['header']
86 return {
86 return {
87 'msg_id' : header['msg_id'],
87 'msg_id' : header['msg_id'],
88 'header' : header,
88 'header' : header,
89 'content': msg['content'],
89 'content': msg['content'],
90 'metadata': msg['metadata'],
90 'metadata': msg['metadata'],
91 'buffers': msg['buffers'],
91 'buffers': msg['buffers'],
92 'submitted': header['date'],
92 'submitted': header['date'],
93 'client_uuid' : None,
93 'client_uuid' : None,
94 'engine_uuid' : None,
94 'engine_uuid' : None,
95 'started': None,
95 'started': None,
96 'completed': None,
96 'completed': None,
97 'resubmitted': None,
97 'resubmitted': None,
98 'received': None,
98 'received': None,
99 'result_header' : None,
99 'result_header' : None,
100 'result_metadata': None,
100 'result_metadata': None,
101 'result_content' : None,
101 'result_content' : None,
102 'result_buffers' : None,
102 'result_buffers' : None,
103 'queue' : None,
103 'queue' : None,
104 'pyin' : None,
104 'pyin' : None,
105 'pyout': None,
105 'pyout': None,
106 'pyerr': None,
106 'pyerr': None,
107 'stdout': '',
107 'stdout': '',
108 'stderr': '',
108 'stderr': '',
109 }
109 }
110
110
111
111
112 class EngineConnector(HasTraits):
112 class EngineConnector(HasTraits):
113 """A simple object for accessing the various zmq connections of an object.
113 """A simple object for accessing the various zmq connections of an object.
114 Attributes are:
114 Attributes are:
115 id (int): engine ID
115 id (int): engine ID
116 uuid (unicode): engine UUID
116 uuid (unicode): engine UUID
117 pending: set of msg_ids
117 pending: set of msg_ids
118 stallback: DelayedCallback for stalled registration
118 stallback: DelayedCallback for stalled registration
119 """
119 """
120
120
121 id = Integer(0)
121 id = Integer(0)
122 uuid = Unicode()
122 uuid = Unicode()
123 pending = Set()
123 pending = Set()
124 stallback = Instance(ioloop.DelayedCallback)
124 stallback = Instance(ioloop.DelayedCallback)
125
125
126
126
127 _db_shortcuts = {
127 _db_shortcuts = {
128 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
128 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
129 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
129 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
130 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
130 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
131 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
131 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
132 }
132 }
133
133
134 class HubFactory(RegistrationFactory):
134 class HubFactory(RegistrationFactory):
135 """The Configurable for setting up a Hub."""
135 """The Configurable for setting up a Hub."""
136
136
137 # port-pairs for monitoredqueues:
137 # port-pairs for monitoredqueues:
138 hb = Tuple(Integer,Integer,config=True,
138 hb = Tuple(Integer,Integer,config=True,
139 help="""PUB/ROUTER Port pair for Engine heartbeats""")
139 help="""PUB/ROUTER Port pair for Engine heartbeats""")
140 def _hb_default(self):
140 def _hb_default(self):
141 return tuple(util.select_random_ports(2))
141 return tuple(util.select_random_ports(2))
142
142
143 mux = Tuple(Integer,Integer,config=True,
143 mux = Tuple(Integer,Integer,config=True,
144 help="""Client/Engine Port pair for MUX queue""")
144 help="""Client/Engine Port pair for MUX queue""")
145
145
146 def _mux_default(self):
146 def _mux_default(self):
147 return tuple(util.select_random_ports(2))
147 return tuple(util.select_random_ports(2))
148
148
149 task = Tuple(Integer,Integer,config=True,
149 task = Tuple(Integer,Integer,config=True,
150 help="""Client/Engine Port pair for Task queue""")
150 help="""Client/Engine Port pair for Task queue""")
151 def _task_default(self):
151 def _task_default(self):
152 return tuple(util.select_random_ports(2))
152 return tuple(util.select_random_ports(2))
153
153
154 control = Tuple(Integer,Integer,config=True,
154 control = Tuple(Integer,Integer,config=True,
155 help="""Client/Engine Port pair for Control queue""")
155 help="""Client/Engine Port pair for Control queue""")
156
156
157 def _control_default(self):
157 def _control_default(self):
158 return tuple(util.select_random_ports(2))
158 return tuple(util.select_random_ports(2))
159
159
160 iopub = Tuple(Integer,Integer,config=True,
160 iopub = Tuple(Integer,Integer,config=True,
161 help="""Client/Engine Port pair for IOPub relay""")
161 help="""Client/Engine Port pair for IOPub relay""")
162
162
163 def _iopub_default(self):
163 def _iopub_default(self):
164 return tuple(util.select_random_ports(2))
164 return tuple(util.select_random_ports(2))
165
165
166 # single ports:
166 # single ports:
167 mon_port = Integer(config=True,
167 mon_port = Integer(config=True,
168 help="""Monitor (SUB) port for queue traffic""")
168 help="""Monitor (SUB) port for queue traffic""")
169
169
170 def _mon_port_default(self):
170 def _mon_port_default(self):
171 return util.select_random_ports(1)[0]
171 return util.select_random_ports(1)[0]
172
172
173 notifier_port = Integer(config=True,
173 notifier_port = Integer(config=True,
174 help="""PUB port for sending engine status notifications""")
174 help="""PUB port for sending engine status notifications""")
175
175
176 def _notifier_port_default(self):
176 def _notifier_port_default(self):
177 return util.select_random_ports(1)[0]
177 return util.select_random_ports(1)[0]
178
178
179 engine_ip = Unicode('127.0.0.1', config=True,
179 engine_ip = Unicode('127.0.0.1', config=True,
180 help="IP on which to listen for engine connections. [default: loopback]")
180 help="IP on which to listen for engine connections. [default: loopback]")
181 engine_transport = Unicode('tcp', config=True,
181 engine_transport = Unicode('tcp', config=True,
182 help="0MQ transport for engine connections. [default: tcp]")
182 help="0MQ transport for engine connections. [default: tcp]")
183
183
184 client_ip = Unicode('127.0.0.1', config=True,
184 client_ip = Unicode('127.0.0.1', config=True,
185 help="IP on which to listen for client connections. [default: loopback]")
185 help="IP on which to listen for client connections. [default: loopback]")
186 client_transport = Unicode('tcp', config=True,
186 client_transport = Unicode('tcp', config=True,
187 help="0MQ transport for client connections. [default : tcp]")
187 help="0MQ transport for client connections. [default : tcp]")
188
188
189 monitor_ip = Unicode('127.0.0.1', config=True,
189 monitor_ip = Unicode('127.0.0.1', config=True,
190 help="IP on which to listen for monitor messages. [default: loopback]")
190 help="IP on which to listen for monitor messages. [default: loopback]")
191 monitor_transport = Unicode('tcp', config=True,
191 monitor_transport = Unicode('tcp', config=True,
192 help="0MQ transport for monitor messages. [default : tcp]")
192 help="0MQ transport for monitor messages. [default : tcp]")
193
193
194 monitor_url = Unicode('')
194 monitor_url = Unicode('')
195
195
196 db_class = DottedObjectName('NoDB',
196 db_class = DottedObjectName('NoDB',
197 config=True, help="""The class to use for the DB backend
197 config=True, help="""The class to use for the DB backend
198
198
199 Options include:
199 Options include:
200
200
201 SQLiteDB: SQLite
201 SQLiteDB: SQLite
202 MongoDB : use MongoDB
202 MongoDB : use MongoDB
203 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
203 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
204 NoDB : disable database altogether (default)
204 NoDB : disable database altogether (default)
205
205
206 """)
206 """)
207
207
208 # not configurable
208 # not configurable
209 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
209 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
210 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
210 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
211
211
212 def _ip_changed(self, name, old, new):
212 def _ip_changed(self, name, old, new):
213 self.engine_ip = new
213 self.engine_ip = new
214 self.client_ip = new
214 self.client_ip = new
215 self.monitor_ip = new
215 self.monitor_ip = new
216 self._update_monitor_url()
216 self._update_monitor_url()
217
217
218 def _update_monitor_url(self):
218 def _update_monitor_url(self):
219 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
219 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
220
220
221 def _transport_changed(self, name, old, new):
221 def _transport_changed(self, name, old, new):
222 self.engine_transport = new
222 self.engine_transport = new
223 self.client_transport = new
223 self.client_transport = new
224 self.monitor_transport = new
224 self.monitor_transport = new
225 self._update_monitor_url()
225 self._update_monitor_url()
226
226
227 def __init__(self, **kwargs):
227 def __init__(self, **kwargs):
228 super(HubFactory, self).__init__(**kwargs)
228 super(HubFactory, self).__init__(**kwargs)
229 self._update_monitor_url()
229 self._update_monitor_url()
230
230
231
231
232 def construct(self):
232 def construct(self):
233 self.init_hub()
233 self.init_hub()
234
234
235 def start(self):
235 def start(self):
236 self.heartmonitor.start()
236 self.heartmonitor.start()
237 self.log.info("Heartmonitor started")
237 self.log.info("Heartmonitor started")
238
238
239 def client_url(self, channel):
239 def client_url(self, channel):
240 """return full zmq url for a named client channel"""
240 """return full zmq url for a named client channel"""
241 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
241 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
242
242
243 def engine_url(self, channel):
243 def engine_url(self, channel):
244 """return full zmq url for a named engine channel"""
244 """return full zmq url for a named engine channel"""
245 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
245 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
246
246
247 def init_hub(self):
247 def init_hub(self):
248 """construct Hub object"""
248 """construct Hub object"""
249
249
250 ctx = self.context
250 ctx = self.context
251 loop = self.loop
251 loop = self.loop
252
252
253 try:
253 try:
254 scheme = self.config.TaskScheduler.scheme_name
254 scheme = self.config.TaskScheduler.scheme_name
255 except AttributeError:
255 except AttributeError:
256 from .scheduler import TaskScheduler
256 from .scheduler import TaskScheduler
257 scheme = TaskScheduler.scheme_name.get_default_value()
257 scheme = TaskScheduler.scheme_name.get_default_value()
258
258
259 # build connection dicts
259 # build connection dicts
260 engine = self.engine_info = {
260 engine = self.engine_info = {
261 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
261 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
262 'registration' : self.regport,
262 'registration' : self.regport,
263 'control' : self.control[1],
263 'control' : self.control[1],
264 'mux' : self.mux[1],
264 'mux' : self.mux[1],
265 'hb_ping' : self.hb[0],
265 'hb_ping' : self.hb[0],
266 'hb_pong' : self.hb[1],
266 'hb_pong' : self.hb[1],
267 'task' : self.task[1],
267 'task' : self.task[1],
268 'iopub' : self.iopub[1],
268 'iopub' : self.iopub[1],
269 }
269 }
270
270
271 client = self.client_info = {
271 client = self.client_info = {
272 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
272 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
273 'registration' : self.regport,
273 'registration' : self.regport,
274 'control' : self.control[0],
274 'control' : self.control[0],
275 'mux' : self.mux[0],
275 'mux' : self.mux[0],
276 'task' : self.task[0],
276 'task' : self.task[0],
277 'task_scheme' : scheme,
277 'task_scheme' : scheme,
278 'iopub' : self.iopub[0],
278 'iopub' : self.iopub[0],
279 'notification' : self.notifier_port,
279 'notification' : self.notifier_port,
280 }
280 }
281
281
282 self.log.debug("Hub engine addrs: %s", self.engine_info)
282 self.log.debug("Hub engine addrs: %s", self.engine_info)
283 self.log.debug("Hub client addrs: %s", self.client_info)
283 self.log.debug("Hub client addrs: %s", self.client_info)
284
284
285 # Registrar socket
285 # Registrar socket
286 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
286 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
287 q.bind(self.client_url('registration'))
287 q.bind(self.client_url('registration'))
288 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
288 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
289 if self.client_ip != self.engine_ip:
289 if self.client_ip != self.engine_ip:
290 q.bind(self.engine_url('registration'))
290 q.bind(self.engine_url('registration'))
291 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
291 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
292
292
293 ### Engine connections ###
293 ### Engine connections ###
294
294
295 # heartbeat
295 # heartbeat
296 hpub = ctx.socket(zmq.PUB)
296 hpub = ctx.socket(zmq.PUB)
297 hpub.bind(self.engine_url('hb_ping'))
297 hpub.bind(self.engine_url('hb_ping'))
298 hrep = ctx.socket(zmq.ROUTER)
298 hrep = ctx.socket(zmq.ROUTER)
299 hrep.bind(self.engine_url('hb_pong'))
299 hrep.bind(self.engine_url('hb_pong'))
300 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
300 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
301 pingstream=ZMQStream(hpub,loop),
301 pingstream=ZMQStream(hpub,loop),
302 pongstream=ZMQStream(hrep,loop)
302 pongstream=ZMQStream(hrep,loop)
303 )
303 )
304
304
305 ### Client connections ###
305 ### Client connections ###
306
306
307 # Notifier socket
307 # Notifier socket
308 n = ZMQStream(ctx.socket(zmq.PUB), loop)
308 n = ZMQStream(ctx.socket(zmq.PUB), loop)
309 n.bind(self.client_url('notification'))
309 n.bind(self.client_url('notification'))
310
310
311 ### build and launch the queues ###
311 ### build and launch the queues ###
312
312
313 # monitor socket
313 # monitor socket
314 sub = ctx.socket(zmq.SUB)
314 sub = ctx.socket(zmq.SUB)
315 sub.setsockopt(zmq.SUBSCRIBE, b"")
315 sub.setsockopt(zmq.SUBSCRIBE, b"")
316 sub.bind(self.monitor_url)
316 sub.bind(self.monitor_url)
317 sub.bind('inproc://monitor')
317 sub.bind('inproc://monitor')
318 sub = ZMQStream(sub, loop)
318 sub = ZMQStream(sub, loop)
319
319
320 # connect the db
320 # connect the db
321 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
321 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
322 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
322 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
323 self.db = import_item(str(db_class))(session=self.session.session,
323 self.db = import_item(str(db_class))(session=self.session.session,
324 config=self.config, log=self.log)
324 config=self.config, log=self.log)
325 time.sleep(.25)
325 time.sleep(.25)
326
326
327 # resubmit stream
327 # resubmit stream
328 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
328 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
329 url = util.disambiguate_url(self.client_url('task'))
329 url = util.disambiguate_url(self.client_url('task'))
330 r.connect(url)
330 r.connect(url)
331
331
332 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
332 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
333 query=q, notifier=n, resubmit=r, db=self.db,
333 query=q, notifier=n, resubmit=r, db=self.db,
334 engine_info=self.engine_info, client_info=self.client_info,
334 engine_info=self.engine_info, client_info=self.client_info,
335 log=self.log)
335 log=self.log)
336
336
337
337
338 class Hub(SessionFactory):
338 class Hub(SessionFactory):
339 """The IPython Controller Hub with 0MQ connections
339 """The IPython Controller Hub with 0MQ connections
340
340
341 Parameters
341 Parameters
342 ==========
342 ==========
343 loop: zmq IOLoop instance
343 loop: zmq IOLoop instance
344 session: Session object
344 session: Session object
345 <removed> context: zmq context for creating new connections (?)
345 <removed> context: zmq context for creating new connections (?)
346 queue: ZMQStream for monitoring the command queue (SUB)
346 queue: ZMQStream for monitoring the command queue (SUB)
347 query: ZMQStream for engine registration and client queries requests (ROUTER)
347 query: ZMQStream for engine registration and client queries requests (ROUTER)
348 heartbeat: HeartMonitor object checking the pulse of the engines
348 heartbeat: HeartMonitor object checking the pulse of the engines
349 notifier: ZMQStream for broadcasting engine registration changes (PUB)
349 notifier: ZMQStream for broadcasting engine registration changes (PUB)
350 db: connection to db for out of memory logging of commands
350 db: connection to db for out of memory logging of commands
351 NotImplemented
351 NotImplemented
352 engine_info: dict of zmq connection information for engines to connect
352 engine_info: dict of zmq connection information for engines to connect
353 to the queues.
353 to the queues.
354 client_info: dict of zmq connection information for engines to connect
354 client_info: dict of zmq connection information for engines to connect
355 to the queues.
355 to the queues.
356 """
356 """
357
357
358 engine_state_file = Unicode()
358 engine_state_file = Unicode()
359
359
360 # internal data structures:
360 # internal data structures:
361 ids=Set() # engine IDs
361 ids=Set() # engine IDs
362 keytable=Dict()
362 keytable=Dict()
363 by_ident=Dict()
363 by_ident=Dict()
364 engines=Dict()
364 engines=Dict()
365 clients=Dict()
365 clients=Dict()
366 hearts=Dict()
366 hearts=Dict()
367 pending=Set()
367 pending=Set()
368 queues=Dict() # pending msg_ids keyed by engine_id
368 queues=Dict() # pending msg_ids keyed by engine_id
369 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
369 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
370 completed=Dict() # completed msg_ids keyed by engine_id
370 completed=Dict() # completed msg_ids keyed by engine_id
371 all_completed=Set() # completed msg_ids keyed by engine_id
371 all_completed=Set() # completed msg_ids keyed by engine_id
372 dead_engines=Set() # completed msg_ids keyed by engine_id
372 dead_engines=Set() # completed msg_ids keyed by engine_id
373 unassigned=Set() # set of task msg_ds not yet assigned a destination
373 unassigned=Set() # set of task msg_ds not yet assigned a destination
374 incoming_registrations=Dict()
374 incoming_registrations=Dict()
375 registration_timeout=Integer()
375 registration_timeout=Integer()
376 _idcounter=Integer(0)
376 _idcounter=Integer(0)
377
377
378 # objects from constructor:
378 # objects from constructor:
379 query=Instance(ZMQStream)
379 query=Instance(ZMQStream)
380 monitor=Instance(ZMQStream)
380 monitor=Instance(ZMQStream)
381 notifier=Instance(ZMQStream)
381 notifier=Instance(ZMQStream)
382 resubmit=Instance(ZMQStream)
382 resubmit=Instance(ZMQStream)
383 heartmonitor=Instance(HeartMonitor)
383 heartmonitor=Instance(HeartMonitor)
384 db=Instance(object)
384 db=Instance(object)
385 client_info=Dict()
385 client_info=Dict()
386 engine_info=Dict()
386 engine_info=Dict()
387
387
388
388
389 def __init__(self, **kwargs):
389 def __init__(self, **kwargs):
390 """
390 """
391 # universal:
391 # universal:
392 loop: IOLoop for creating future connections
392 loop: IOLoop for creating future connections
393 session: streamsession for sending serialized data
393 session: streamsession for sending serialized data
394 # engine:
394 # engine:
395 queue: ZMQStream for monitoring queue messages
395 queue: ZMQStream for monitoring queue messages
396 query: ZMQStream for engine+client registration and client requests
396 query: ZMQStream for engine+client registration and client requests
397 heartbeat: HeartMonitor object for tracking engines
397 heartbeat: HeartMonitor object for tracking engines
398 # extra:
398 # extra:
399 db: ZMQStream for db connection (NotImplemented)
399 db: ZMQStream for db connection (NotImplemented)
400 engine_info: zmq address/protocol dict for engine connections
400 engine_info: zmq address/protocol dict for engine connections
401 client_info: zmq address/protocol dict for client connections
401 client_info: zmq address/protocol dict for client connections
402 """
402 """
403
403
404 super(Hub, self).__init__(**kwargs)
404 super(Hub, self).__init__(**kwargs)
405 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
405 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
406
406
407 # register our callbacks
407 # register our callbacks
408 self.query.on_recv(self.dispatch_query)
408 self.query.on_recv(self.dispatch_query)
409 self.monitor.on_recv(self.dispatch_monitor_traffic)
409 self.monitor.on_recv(self.dispatch_monitor_traffic)
410
410
411 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
411 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
412 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
412 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
413
413
414 self.monitor_handlers = {b'in' : self.save_queue_request,
414 self.monitor_handlers = {b'in' : self.save_queue_request,
415 b'out': self.save_queue_result,
415 b'out': self.save_queue_result,
416 b'intask': self.save_task_request,
416 b'intask': self.save_task_request,
417 b'outtask': self.save_task_result,
417 b'outtask': self.save_task_result,
418 b'tracktask': self.save_task_destination,
418 b'tracktask': self.save_task_destination,
419 b'incontrol': _passer,
419 b'incontrol': _passer,
420 b'outcontrol': _passer,
420 b'outcontrol': _passer,
421 b'iopub': self.save_iopub_message,
421 b'iopub': self.save_iopub_message,
422 }
422 }
423
423
424 self.query_handlers = {'queue_request': self.queue_status,
424 self.query_handlers = {'queue_request': self.queue_status,
425 'result_request': self.get_results,
425 'result_request': self.get_results,
426 'history_request': self.get_history,
426 'history_request': self.get_history,
427 'db_request': self.db_query,
427 'db_request': self.db_query,
428 'purge_request': self.purge_results,
428 'purge_request': self.purge_results,
429 'load_request': self.check_load,
429 'load_request': self.check_load,
430 'resubmit_request': self.resubmit_task,
430 'resubmit_request': self.resubmit_task,
431 'shutdown_request': self.shutdown_request,
431 'shutdown_request': self.shutdown_request,
432 'registration_request' : self.register_engine,
432 'registration_request' : self.register_engine,
433 'unregistration_request' : self.unregister_engine,
433 'unregistration_request' : self.unregister_engine,
434 'connection_request': self.connection_request,
434 'connection_request': self.connection_request,
435 }
435 }
436
436
437 # ignore resubmit replies
437 # ignore resubmit replies
438 self.resubmit.on_recv(lambda msg: None, copy=False)
438 self.resubmit.on_recv(lambda msg: None, copy=False)
439
439
440 self.log.info("hub::created hub")
440 self.log.info("hub::created hub")
441
441
442 @property
442 @property
443 def _next_id(self):
443 def _next_id(self):
444 """gemerate a new ID.
444 """gemerate a new ID.
445
445
446 No longer reuse old ids, just count from 0."""
446 No longer reuse old ids, just count from 0."""
447 newid = self._idcounter
447 newid = self._idcounter
448 self._idcounter += 1
448 self._idcounter += 1
449 return newid
449 return newid
450 # newid = 0
450 # newid = 0
451 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
451 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
452 # # print newid, self.ids, self.incoming_registrations
452 # # print newid, self.ids, self.incoming_registrations
453 # while newid in self.ids or newid in incoming:
453 # while newid in self.ids or newid in incoming:
454 # newid += 1
454 # newid += 1
455 # return newid
455 # return newid
456
456
457 #-----------------------------------------------------------------------------
457 #-----------------------------------------------------------------------------
458 # message validation
458 # message validation
459 #-----------------------------------------------------------------------------
459 #-----------------------------------------------------------------------------
460
460
461 def _validate_targets(self, targets):
461 def _validate_targets(self, targets):
462 """turn any valid targets argument into a list of integer ids"""
462 """turn any valid targets argument into a list of integer ids"""
463 if targets is None:
463 if targets is None:
464 # default to all
464 # default to all
465 return self.ids
465 return self.ids
466
466
467 if isinstance(targets, (int,str,unicode)):
467 if isinstance(targets, (int,str,unicode)):
468 # only one target specified
468 # only one target specified
469 targets = [targets]
469 targets = [targets]
470 _targets = []
470 _targets = []
471 for t in targets:
471 for t in targets:
472 # map raw identities to ids
472 # map raw identities to ids
473 if isinstance(t, (str,unicode)):
473 if isinstance(t, (str,unicode)):
474 t = self.by_ident.get(cast_bytes(t), t)
474 t = self.by_ident.get(cast_bytes(t), t)
475 _targets.append(t)
475 _targets.append(t)
476 targets = _targets
476 targets = _targets
477 bad_targets = [ t for t in targets if t not in self.ids ]
477 bad_targets = [ t for t in targets if t not in self.ids ]
478 if bad_targets:
478 if bad_targets:
479 raise IndexError("No Such Engine: %r" % bad_targets)
479 raise IndexError("No Such Engine: %r" % bad_targets)
480 if not targets:
480 if not targets:
481 raise IndexError("No Engines Registered")
481 raise IndexError("No Engines Registered")
482 return targets
482 return targets
483
483
484 #-----------------------------------------------------------------------------
484 #-----------------------------------------------------------------------------
485 # dispatch methods (1 per stream)
485 # dispatch methods (1 per stream)
486 #-----------------------------------------------------------------------------
486 #-----------------------------------------------------------------------------
487
487
488
488
489 @util.log_errors
489 @util.log_errors
490 def dispatch_monitor_traffic(self, msg):
490 def dispatch_monitor_traffic(self, msg):
491 """all ME and Task queue messages come through here, as well as
491 """all ME and Task queue messages come through here, as well as
492 IOPub traffic."""
492 IOPub traffic."""
493 self.log.debug("monitor traffic: %r", msg[0])
493 self.log.debug("monitor traffic: %r", msg[0])
494 switch = msg[0]
494 switch = msg[0]
495 try:
495 try:
496 idents, msg = self.session.feed_identities(msg[1:])
496 idents, msg = self.session.feed_identities(msg[1:])
497 except ValueError:
497 except ValueError:
498 idents=[]
498 idents=[]
499 if not idents:
499 if not idents:
500 self.log.error("Monitor message without topic: %r", msg)
500 self.log.error("Monitor message without topic: %r", msg)
501 return
501 return
502 handler = self.monitor_handlers.get(switch, None)
502 handler = self.monitor_handlers.get(switch, None)
503 if handler is not None:
503 if handler is not None:
504 handler(idents, msg)
504 handler(idents, msg)
505 else:
505 else:
506 self.log.error("Unrecognized monitor topic: %r", switch)
506 self.log.error("Unrecognized monitor topic: %r", switch)
507
507
508
508
509 @util.log_errors
509 @util.log_errors
510 def dispatch_query(self, msg):
510 def dispatch_query(self, msg):
511 """Route registration requests and queries from clients."""
511 """Route registration requests and queries from clients."""
512 try:
512 try:
513 idents, msg = self.session.feed_identities(msg)
513 idents, msg = self.session.feed_identities(msg)
514 except ValueError:
514 except ValueError:
515 idents = []
515 idents = []
516 if not idents:
516 if not idents:
517 self.log.error("Bad Query Message: %r", msg)
517 self.log.error("Bad Query Message: %r", msg)
518 return
518 return
519 client_id = idents[0]
519 client_id = idents[0]
520 try:
520 try:
521 msg = self.session.unserialize(msg, content=True)
521 msg = self.session.unserialize(msg, content=True)
522 except Exception:
522 except Exception:
523 content = error.wrap_exception()
523 content = error.wrap_exception()
524 self.log.error("Bad Query Message: %r", msg, exc_info=True)
524 self.log.error("Bad Query Message: %r", msg, exc_info=True)
525 self.session.send(self.query, "hub_error", ident=client_id,
525 self.session.send(self.query, "hub_error", ident=client_id,
526 content=content)
526 content=content)
527 return
527 return
528 # print client_id, header, parent, content
528 # print client_id, header, parent, content
529 #switch on message type:
529 #switch on message type:
530 msg_type = msg['header']['msg_type']
530 msg_type = msg['header']['msg_type']
531 self.log.info("client::client %r requested %r", client_id, msg_type)
531 self.log.info("client::client %r requested %r", client_id, msg_type)
532 handler = self.query_handlers.get(msg_type, None)
532 handler = self.query_handlers.get(msg_type, None)
533 try:
533 try:
534 assert handler is not None, "Bad Message Type: %r" % msg_type
534 assert handler is not None, "Bad Message Type: %r" % msg_type
535 except:
535 except:
536 content = error.wrap_exception()
536 content = error.wrap_exception()
537 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
537 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
538 self.session.send(self.query, "hub_error", ident=client_id,
538 self.session.send(self.query, "hub_error", ident=client_id,
539 content=content)
539 content=content)
540 return
540 return
541
541
542 else:
542 else:
543 handler(idents, msg)
543 handler(idents, msg)
544
544
545 def dispatch_db(self, msg):
545 def dispatch_db(self, msg):
546 """"""
546 """"""
547 raise NotImplementedError
547 raise NotImplementedError
548
548
549 #---------------------------------------------------------------------------
549 #---------------------------------------------------------------------------
550 # handler methods (1 per event)
550 # handler methods (1 per event)
551 #---------------------------------------------------------------------------
551 #---------------------------------------------------------------------------
552
552
553 #----------------------- Heartbeat --------------------------------------
553 #----------------------- Heartbeat --------------------------------------
554
554
555 def handle_new_heart(self, heart):
555 def handle_new_heart(self, heart):
556 """handler to attach to heartbeater.
556 """handler to attach to heartbeater.
557 Called when a new heart starts to beat.
557 Called when a new heart starts to beat.
558 Triggers completion of registration."""
558 Triggers completion of registration."""
559 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
559 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
560 if heart not in self.incoming_registrations:
560 if heart not in self.incoming_registrations:
561 self.log.info("heartbeat::ignoring new heart: %r", heart)
561 self.log.info("heartbeat::ignoring new heart: %r", heart)
562 else:
562 else:
563 self.finish_registration(heart)
563 self.finish_registration(heart)
564
564
565
565
566 def handle_heart_failure(self, heart):
566 def handle_heart_failure(self, heart):
567 """handler to attach to heartbeater.
567 """handler to attach to heartbeater.
568 called when a previously registered heart fails to respond to beat request.
568 called when a previously registered heart fails to respond to beat request.
569 triggers unregistration"""
569 triggers unregistration"""
570 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
570 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
571 eid = self.hearts.get(heart, None)
571 eid = self.hearts.get(heart, None)
572 uuid = self.engines[eid].uuid
572 uuid = self.engines[eid].uuid
573 if eid is None or self.keytable[eid] in self.dead_engines:
573 if eid is None or self.keytable[eid] in self.dead_engines:
574 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
574 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
575 else:
575 else:
576 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
576 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
577
577
578 #----------------------- MUX Queue Traffic ------------------------------
578 #----------------------- MUX Queue Traffic ------------------------------
579
579
580 def save_queue_request(self, idents, msg):
580 def save_queue_request(self, idents, msg):
581 if len(idents) < 2:
581 if len(idents) < 2:
582 self.log.error("invalid identity prefix: %r", idents)
582 self.log.error("invalid identity prefix: %r", idents)
583 return
583 return
584 queue_id, client_id = idents[:2]
584 queue_id, client_id = idents[:2]
585 try:
585 try:
586 msg = self.session.unserialize(msg)
586 msg = self.session.unserialize(msg)
587 except Exception:
587 except Exception:
588 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
588 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
589 return
589 return
590
590
591 eid = self.by_ident.get(queue_id, None)
591 eid = self.by_ident.get(queue_id, None)
592 if eid is None:
592 if eid is None:
593 self.log.error("queue::target %r not registered", queue_id)
593 self.log.error("queue::target %r not registered", queue_id)
594 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
594 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
595 return
595 return
596 record = init_record(msg)
596 record = init_record(msg)
597 msg_id = record['msg_id']
597 msg_id = record['msg_id']
598 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
598 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
599 # Unicode in records
599 # Unicode in records
600 record['engine_uuid'] = queue_id.decode('ascii')
600 record['engine_uuid'] = queue_id.decode('ascii')
601 record['client_uuid'] = msg['header']['session']
601 record['client_uuid'] = msg['header']['session']
602 record['queue'] = 'mux'
602 record['queue'] = 'mux'
603
603
604 try:
604 try:
605 # it's posible iopub arrived first:
605 # it's posible iopub arrived first:
606 existing = self.db.get_record(msg_id)
606 existing = self.db.get_record(msg_id)
607 for key,evalue in existing.iteritems():
607 for key,evalue in existing.iteritems():
608 rvalue = record.get(key, None)
608 rvalue = record.get(key, None)
609 if evalue and rvalue and evalue != rvalue:
609 if evalue and rvalue and evalue != rvalue:
610 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
610 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
611 elif evalue and not rvalue:
611 elif evalue and not rvalue:
612 record[key] = evalue
612 record[key] = evalue
613 try:
613 try:
614 self.db.update_record(msg_id, record)
614 self.db.update_record(msg_id, record)
615 except Exception:
615 except Exception:
616 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
616 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
617 except KeyError:
617 except KeyError:
618 try:
618 try:
619 self.db.add_record(msg_id, record)
619 self.db.add_record(msg_id, record)
620 except Exception:
620 except Exception:
621 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
621 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
622
622
623
623
624 self.pending.add(msg_id)
624 self.pending.add(msg_id)
625 self.queues[eid].append(msg_id)
625 self.queues[eid].append(msg_id)
626
626
627 def save_queue_result(self, idents, msg):
627 def save_queue_result(self, idents, msg):
628 if len(idents) < 2:
628 if len(idents) < 2:
629 self.log.error("invalid identity prefix: %r", idents)
629 self.log.error("invalid identity prefix: %r", idents)
630 return
630 return
631
631
632 client_id, queue_id = idents[:2]
632 client_id, queue_id = idents[:2]
633 try:
633 try:
634 msg = self.session.unserialize(msg)
634 msg = self.session.unserialize(msg)
635 except Exception:
635 except Exception:
636 self.log.error("queue::engine %r sent invalid message to %r: %r",
636 self.log.error("queue::engine %r sent invalid message to %r: %r",
637 queue_id, client_id, msg, exc_info=True)
637 queue_id, client_id, msg, exc_info=True)
638 return
638 return
639
639
640 eid = self.by_ident.get(queue_id, None)
640 eid = self.by_ident.get(queue_id, None)
641 if eid is None:
641 if eid is None:
642 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
642 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
643 return
643 return
644
644
645 parent = msg['parent_header']
645 parent = msg['parent_header']
646 if not parent:
646 if not parent:
647 return
647 return
648 msg_id = parent['msg_id']
648 msg_id = parent['msg_id']
649 if msg_id in self.pending:
649 if msg_id in self.pending:
650 self.pending.remove(msg_id)
650 self.pending.remove(msg_id)
651 self.all_completed.add(msg_id)
651 self.all_completed.add(msg_id)
652 self.queues[eid].remove(msg_id)
652 self.queues[eid].remove(msg_id)
653 self.completed[eid].append(msg_id)
653 self.completed[eid].append(msg_id)
654 self.log.info("queue::request %r completed on %s", msg_id, eid)
654 self.log.info("queue::request %r completed on %s", msg_id, eid)
655 elif msg_id not in self.all_completed:
655 elif msg_id not in self.all_completed:
656 # it could be a result from a dead engine that died before delivering the
656 # it could be a result from a dead engine that died before delivering the
657 # result
657 # result
658 self.log.warn("queue:: unknown msg finished %r", msg_id)
658 self.log.warn("queue:: unknown msg finished %r", msg_id)
659 return
659 return
660 # update record anyway, because the unregistration could have been premature
660 # update record anyway, because the unregistration could have been premature
661 rheader = msg['header']
661 rheader = msg['header']
662 md = msg['metadata']
662 md = msg['metadata']
663 completed = rheader['date']
663 completed = rheader['date']
664 started = md.get('started', None)
664 started = md.get('started', None)
665 result = {
665 result = {
666 'result_header' : rheader,
666 'result_header' : rheader,
667 'result_metadata': md,
667 'result_metadata': md,
668 'result_content': msg['content'],
668 'result_content': msg['content'],
669 'received': datetime.now(),
669 'received': datetime.now(),
670 'started' : started,
670 'started' : started,
671 'completed' : completed
671 'completed' : completed
672 }
672 }
673
673
674 result['result_buffers'] = msg['buffers']
674 result['result_buffers'] = msg['buffers']
675 try:
675 try:
676 self.db.update_record(msg_id, result)
676 self.db.update_record(msg_id, result)
677 except Exception:
677 except Exception:
678 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
678 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
679
679
680
680
681 #--------------------- Task Queue Traffic ------------------------------
681 #--------------------- Task Queue Traffic ------------------------------
682
682
683 def save_task_request(self, idents, msg):
683 def save_task_request(self, idents, msg):
684 """Save the submission of a task."""
684 """Save the submission of a task."""
685 client_id = idents[0]
685 client_id = idents[0]
686
686
687 try:
687 try:
688 msg = self.session.unserialize(msg)
688 msg = self.session.unserialize(msg)
689 except Exception:
689 except Exception:
690 self.log.error("task::client %r sent invalid task message: %r",
690 self.log.error("task::client %r sent invalid task message: %r",
691 client_id, msg, exc_info=True)
691 client_id, msg, exc_info=True)
692 return
692 return
693 record = init_record(msg)
693 record = init_record(msg)
694
694
695 record['client_uuid'] = msg['header']['session']
695 record['client_uuid'] = msg['header']['session']
696 record['queue'] = 'task'
696 record['queue'] = 'task'
697 header = msg['header']
697 header = msg['header']
698 msg_id = header['msg_id']
698 msg_id = header['msg_id']
699 self.pending.add(msg_id)
699 self.pending.add(msg_id)
700 self.unassigned.add(msg_id)
700 self.unassigned.add(msg_id)
701 try:
701 try:
702 # it's posible iopub arrived first:
702 # it's posible iopub arrived first:
703 existing = self.db.get_record(msg_id)
703 existing = self.db.get_record(msg_id)
704 if existing['resubmitted']:
704 if existing['resubmitted']:
705 for key in ('submitted', 'client_uuid', 'buffers'):
705 for key in ('submitted', 'client_uuid', 'buffers'):
706 # don't clobber these keys on resubmit
706 # don't clobber these keys on resubmit
707 # submitted and client_uuid should be different
707 # submitted and client_uuid should be different
708 # and buffers might be big, and shouldn't have changed
708 # and buffers might be big, and shouldn't have changed
709 record.pop(key)
709 record.pop(key)
710 # still check content,header which should not change
710 # still check content,header which should not change
711 # but are not expensive to compare as buffers
711 # but are not expensive to compare as buffers
712
712
713 for key,evalue in existing.iteritems():
713 for key,evalue in existing.iteritems():
714 if key.endswith('buffers'):
714 if key.endswith('buffers'):
715 # don't compare buffers
715 # don't compare buffers
716 continue
716 continue
717 rvalue = record.get(key, None)
717 rvalue = record.get(key, None)
718 if evalue and rvalue and evalue != rvalue:
718 if evalue and rvalue and evalue != rvalue:
719 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
719 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
720 elif evalue and not rvalue:
720 elif evalue and not rvalue:
721 record[key] = evalue
721 record[key] = evalue
722 try:
722 try:
723 self.db.update_record(msg_id, record)
723 self.db.update_record(msg_id, record)
724 except Exception:
724 except Exception:
725 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
725 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
726 except KeyError:
726 except KeyError:
727 try:
727 try:
728 self.db.add_record(msg_id, record)
728 self.db.add_record(msg_id, record)
729 except Exception:
729 except Exception:
730 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
730 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
731 except Exception:
731 except Exception:
732 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
732 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
733
733
734 def save_task_result(self, idents, msg):
734 def save_task_result(self, idents, msg):
735 """save the result of a completed task."""
735 """save the result of a completed task."""
736 client_id = idents[0]
736 client_id = idents[0]
737 try:
737 try:
738 msg = self.session.unserialize(msg)
738 msg = self.session.unserialize(msg)
739 except Exception:
739 except Exception:
740 self.log.error("task::invalid task result message send to %r: %r",
740 self.log.error("task::invalid task result message send to %r: %r",
741 client_id, msg, exc_info=True)
741 client_id, msg, exc_info=True)
742 return
742 return
743
743
744 parent = msg['parent_header']
744 parent = msg['parent_header']
745 if not parent:
745 if not parent:
746 # print msg
746 # print msg
747 self.log.warn("Task %r had no parent!", msg)
747 self.log.warn("Task %r had no parent!", msg)
748 return
748 return
749 msg_id = parent['msg_id']
749 msg_id = parent['msg_id']
750 if msg_id in self.unassigned:
750 if msg_id in self.unassigned:
751 self.unassigned.remove(msg_id)
751 self.unassigned.remove(msg_id)
752
752
753 header = msg['header']
753 header = msg['header']
754 md = msg['metadata']
754 md = msg['metadata']
755 engine_uuid = md.get('engine', u'')
755 engine_uuid = md.get('engine', u'')
756 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
756 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
757
757
758 status = md.get('status', None)
758 status = md.get('status', None)
759
759
760 if msg_id in self.pending:
760 if msg_id in self.pending:
761 self.log.info("task::task %r finished on %s", msg_id, eid)
761 self.log.info("task::task %r finished on %s", msg_id, eid)
762 self.pending.remove(msg_id)
762 self.pending.remove(msg_id)
763 self.all_completed.add(msg_id)
763 self.all_completed.add(msg_id)
764 if eid is not None:
764 if eid is not None:
765 if status != 'aborted':
765 if status != 'aborted':
766 self.completed[eid].append(msg_id)
766 self.completed[eid].append(msg_id)
767 if msg_id in self.tasks[eid]:
767 if msg_id in self.tasks[eid]:
768 self.tasks[eid].remove(msg_id)
768 self.tasks[eid].remove(msg_id)
769 completed = header['date']
769 completed = header['date']
770 started = md.get('started', None)
770 started = md.get('started', None)
771 result = {
771 result = {
772 'result_header' : header,
772 'result_header' : header,
773 'result_metadata': msg['metadata'],
773 'result_metadata': msg['metadata'],
774 'result_content': msg['content'],
774 'result_content': msg['content'],
775 'started' : started,
775 'started' : started,
776 'completed' : completed,
776 'completed' : completed,
777 'received' : datetime.now(),
777 'received' : datetime.now(),
778 'engine_uuid': engine_uuid,
778 'engine_uuid': engine_uuid,
779 }
779 }
780
780
781 result['result_buffers'] = msg['buffers']
781 result['result_buffers'] = msg['buffers']
782 try:
782 try:
783 self.db.update_record(msg_id, result)
783 self.db.update_record(msg_id, result)
784 except Exception:
784 except Exception:
785 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
785 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
786
786
787 else:
787 else:
788 self.log.debug("task::unknown task %r finished", msg_id)
788 self.log.debug("task::unknown task %r finished", msg_id)
789
789
790 def save_task_destination(self, idents, msg):
790 def save_task_destination(self, idents, msg):
791 try:
791 try:
792 msg = self.session.unserialize(msg, content=True)
792 msg = self.session.unserialize(msg, content=True)
793 except Exception:
793 except Exception:
794 self.log.error("task::invalid task tracking message", exc_info=True)
794 self.log.error("task::invalid task tracking message", exc_info=True)
795 return
795 return
796 content = msg['content']
796 content = msg['content']
797 # print (content)
797 # print (content)
798 msg_id = content['msg_id']
798 msg_id = content['msg_id']
799 engine_uuid = content['engine_id']
799 engine_uuid = content['engine_id']
800 eid = self.by_ident[cast_bytes(engine_uuid)]
800 eid = self.by_ident[cast_bytes(engine_uuid)]
801
801
802 self.log.info("task::task %r arrived on %r", msg_id, eid)
802 self.log.info("task::task %r arrived on %r", msg_id, eid)
803 if msg_id in self.unassigned:
803 if msg_id in self.unassigned:
804 self.unassigned.remove(msg_id)
804 self.unassigned.remove(msg_id)
805 # else:
805 # else:
806 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
806 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
807
807
808 self.tasks[eid].append(msg_id)
808 self.tasks[eid].append(msg_id)
809 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
809 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
810 try:
810 try:
811 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
811 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
812 except Exception:
812 except Exception:
813 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
813 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
814
814
815
815
816 def mia_task_request(self, idents, msg):
816 def mia_task_request(self, idents, msg):
817 raise NotImplementedError
817 raise NotImplementedError
818 client_id = idents[0]
818 client_id = idents[0]
819 # content = dict(mia=self.mia,status='ok')
819 # content = dict(mia=self.mia,status='ok')
820 # self.session.send('mia_reply', content=content, idents=client_id)
820 # self.session.send('mia_reply', content=content, idents=client_id)
821
821
822
822
823 #--------------------- IOPub Traffic ------------------------------
823 #--------------------- IOPub Traffic ------------------------------
824
824
825 def save_iopub_message(self, topics, msg):
825 def save_iopub_message(self, topics, msg):
826 """save an iopub message into the db"""
826 """save an iopub message into the db"""
827 # print (topics)
827 # print (topics)
828 try:
828 try:
829 msg = self.session.unserialize(msg, content=True)
829 msg = self.session.unserialize(msg, content=True)
830 except Exception:
830 except Exception:
831 self.log.error("iopub::invalid IOPub message", exc_info=True)
831 self.log.error("iopub::invalid IOPub message", exc_info=True)
832 return
832 return
833
833
834 parent = msg['parent_header']
834 parent = msg['parent_header']
835 if not parent:
835 if not parent:
836 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
836 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
837 return
837 return
838 msg_id = parent['msg_id']
838 msg_id = parent['msg_id']
839 msg_type = msg['header']['msg_type']
839 msg_type = msg['header']['msg_type']
840 content = msg['content']
840 content = msg['content']
841
841
842 # ensure msg_id is in db
842 # ensure msg_id is in db
843 try:
843 try:
844 rec = self.db.get_record(msg_id)
844 rec = self.db.get_record(msg_id)
845 except KeyError:
845 except KeyError:
846 rec = empty_record()
846 rec = empty_record()
847 rec['msg_id'] = msg_id
847 rec['msg_id'] = msg_id
848 self.db.add_record(msg_id, rec)
848 self.db.add_record(msg_id, rec)
849 # stream
849 # stream
850 d = {}
850 d = {}
851 if msg_type == 'stream':
851 if msg_type == 'stream':
852 name = content['name']
852 name = content['name']
853 s = rec[name] or ''
853 s = rec[name] or ''
854 d[name] = s + content['data']
854 d[name] = s + content['data']
855
855
856 elif msg_type == 'pyerr':
856 elif msg_type == 'pyerr':
857 d['pyerr'] = content
857 d['pyerr'] = content
858 elif msg_type == 'pyin':
858 elif msg_type == 'pyin':
859 d['pyin'] = content['code']
859 d['pyin'] = content['code']
860 elif msg_type in ('display_data', 'pyout'):
860 elif msg_type in ('display_data', 'pyout'):
861 d[msg_type] = content
861 d[msg_type] = content
862 elif msg_type == 'status':
862 elif msg_type == 'status':
863 pass
863 pass
864 elif msg_type == 'data_pub':
865 self.log.info("ignored data_pub message for %s" % msg_id)
864 else:
866 else:
865 self.log.warn("unhandled iopub msg_type: %r", msg_type)
867 self.log.warn("unhandled iopub msg_type: %r", msg_type)
866
868
867 if not d:
869 if not d:
868 return
870 return
869
871
870 try:
872 try:
871 self.db.update_record(msg_id, d)
873 self.db.update_record(msg_id, d)
872 except Exception:
874 except Exception:
873 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
875 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
874
876
875
877
876
878
877 #-------------------------------------------------------------------------
879 #-------------------------------------------------------------------------
878 # Registration requests
880 # Registration requests
879 #-------------------------------------------------------------------------
881 #-------------------------------------------------------------------------
880
882
881 def connection_request(self, client_id, msg):
883 def connection_request(self, client_id, msg):
882 """Reply with connection addresses for clients."""
884 """Reply with connection addresses for clients."""
883 self.log.info("client::client %r connected", client_id)
885 self.log.info("client::client %r connected", client_id)
884 content = dict(status='ok')
886 content = dict(status='ok')
885 jsonable = {}
887 jsonable = {}
886 for k,v in self.keytable.iteritems():
888 for k,v in self.keytable.iteritems():
887 if v not in self.dead_engines:
889 if v not in self.dead_engines:
888 jsonable[str(k)] = v
890 jsonable[str(k)] = v
889 content['engines'] = jsonable
891 content['engines'] = jsonable
890 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
892 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
891
893
892 def register_engine(self, reg, msg):
894 def register_engine(self, reg, msg):
893 """Register a new engine."""
895 """Register a new engine."""
894 content = msg['content']
896 content = msg['content']
895 try:
897 try:
896 uuid = content['uuid']
898 uuid = content['uuid']
897 except KeyError:
899 except KeyError:
898 self.log.error("registration::queue not specified", exc_info=True)
900 self.log.error("registration::queue not specified", exc_info=True)
899 return
901 return
900
902
901 eid = self._next_id
903 eid = self._next_id
902
904
903 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
905 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
904
906
905 content = dict(id=eid,status='ok')
907 content = dict(id=eid,status='ok')
906 # check if requesting available IDs:
908 # check if requesting available IDs:
907 if cast_bytes(uuid) in self.by_ident:
909 if cast_bytes(uuid) in self.by_ident:
908 try:
910 try:
909 raise KeyError("uuid %r in use" % uuid)
911 raise KeyError("uuid %r in use" % uuid)
910 except:
912 except:
911 content = error.wrap_exception()
913 content = error.wrap_exception()
912 self.log.error("uuid %r in use", uuid, exc_info=True)
914 self.log.error("uuid %r in use", uuid, exc_info=True)
913 else:
915 else:
914 for h, ec in self.incoming_registrations.iteritems():
916 for h, ec in self.incoming_registrations.iteritems():
915 if uuid == h:
917 if uuid == h:
916 try:
918 try:
917 raise KeyError("heart_id %r in use" % uuid)
919 raise KeyError("heart_id %r in use" % uuid)
918 except:
920 except:
919 self.log.error("heart_id %r in use", uuid, exc_info=True)
921 self.log.error("heart_id %r in use", uuid, exc_info=True)
920 content = error.wrap_exception()
922 content = error.wrap_exception()
921 break
923 break
922 elif uuid == ec.uuid:
924 elif uuid == ec.uuid:
923 try:
925 try:
924 raise KeyError("uuid %r in use" % uuid)
926 raise KeyError("uuid %r in use" % uuid)
925 except:
927 except:
926 self.log.error("uuid %r in use", uuid, exc_info=True)
928 self.log.error("uuid %r in use", uuid, exc_info=True)
927 content = error.wrap_exception()
929 content = error.wrap_exception()
928 break
930 break
929
931
930 msg = self.session.send(self.query, "registration_reply",
932 msg = self.session.send(self.query, "registration_reply",
931 content=content,
933 content=content,
932 ident=reg)
934 ident=reg)
933
935
934 heart = cast_bytes(uuid)
936 heart = cast_bytes(uuid)
935
937
936 if content['status'] == 'ok':
938 if content['status'] == 'ok':
937 if heart in self.heartmonitor.hearts:
939 if heart in self.heartmonitor.hearts:
938 # already beating
940 # already beating
939 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
941 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
940 self.finish_registration(heart)
942 self.finish_registration(heart)
941 else:
943 else:
942 purge = lambda : self._purge_stalled_registration(heart)
944 purge = lambda : self._purge_stalled_registration(heart)
943 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
945 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
944 dc.start()
946 dc.start()
945 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
947 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
946 else:
948 else:
947 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
949 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
948
950
949 return eid
951 return eid
950
952
951 def unregister_engine(self, ident, msg):
953 def unregister_engine(self, ident, msg):
952 """Unregister an engine that explicitly requested to leave."""
954 """Unregister an engine that explicitly requested to leave."""
953 try:
955 try:
954 eid = msg['content']['id']
956 eid = msg['content']['id']
955 except:
957 except:
956 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
958 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
957 return
959 return
958 self.log.info("registration::unregister_engine(%r)", eid)
960 self.log.info("registration::unregister_engine(%r)", eid)
959 # print (eid)
961 # print (eid)
960 uuid = self.keytable[eid]
962 uuid = self.keytable[eid]
961 content=dict(id=eid, uuid=uuid)
963 content=dict(id=eid, uuid=uuid)
962 self.dead_engines.add(uuid)
964 self.dead_engines.add(uuid)
963 # self.ids.remove(eid)
965 # self.ids.remove(eid)
964 # uuid = self.keytable.pop(eid)
966 # uuid = self.keytable.pop(eid)
965 #
967 #
966 # ec = self.engines.pop(eid)
968 # ec = self.engines.pop(eid)
967 # self.hearts.pop(ec.heartbeat)
969 # self.hearts.pop(ec.heartbeat)
968 # self.by_ident.pop(ec.queue)
970 # self.by_ident.pop(ec.queue)
969 # self.completed.pop(eid)
971 # self.completed.pop(eid)
970 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
972 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
971 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
973 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
972 dc.start()
974 dc.start()
973 ############## TODO: HANDLE IT ################
975 ############## TODO: HANDLE IT ################
974
976
975 self._save_engine_state()
977 self._save_engine_state()
976
978
977 if self.notifier:
979 if self.notifier:
978 self.session.send(self.notifier, "unregistration_notification", content=content)
980 self.session.send(self.notifier, "unregistration_notification", content=content)
979
981
980 def _handle_stranded_msgs(self, eid, uuid):
982 def _handle_stranded_msgs(self, eid, uuid):
981 """Handle messages known to be on an engine when the engine unregisters.
983 """Handle messages known to be on an engine when the engine unregisters.
982
984
983 It is possible that this will fire prematurely - that is, an engine will
985 It is possible that this will fire prematurely - that is, an engine will
984 go down after completing a result, and the client will be notified
986 go down after completing a result, and the client will be notified
985 that the result failed and later receive the actual result.
987 that the result failed and later receive the actual result.
986 """
988 """
987
989
988 outstanding = self.queues[eid]
990 outstanding = self.queues[eid]
989
991
990 for msg_id in outstanding:
992 for msg_id in outstanding:
991 self.pending.remove(msg_id)
993 self.pending.remove(msg_id)
992 self.all_completed.add(msg_id)
994 self.all_completed.add(msg_id)
993 try:
995 try:
994 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
996 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
995 except:
997 except:
996 content = error.wrap_exception()
998 content = error.wrap_exception()
997 # build a fake header:
999 # build a fake header:
998 header = {}
1000 header = {}
999 header['engine'] = uuid
1001 header['engine'] = uuid
1000 header['date'] = datetime.now()
1002 header['date'] = datetime.now()
1001 rec = dict(result_content=content, result_header=header, result_buffers=[])
1003 rec = dict(result_content=content, result_header=header, result_buffers=[])
1002 rec['completed'] = header['date']
1004 rec['completed'] = header['date']
1003 rec['engine_uuid'] = uuid
1005 rec['engine_uuid'] = uuid
1004 try:
1006 try:
1005 self.db.update_record(msg_id, rec)
1007 self.db.update_record(msg_id, rec)
1006 except Exception:
1008 except Exception:
1007 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1009 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1008
1010
1009
1011
1010 def finish_registration(self, heart):
1012 def finish_registration(self, heart):
1011 """Second half of engine registration, called after our HeartMonitor
1013 """Second half of engine registration, called after our HeartMonitor
1012 has received a beat from the Engine's Heart."""
1014 has received a beat from the Engine's Heart."""
1013 try:
1015 try:
1014 ec = self.incoming_registrations.pop(heart)
1016 ec = self.incoming_registrations.pop(heart)
1015 except KeyError:
1017 except KeyError:
1016 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1018 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1017 return
1019 return
1018 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1020 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1019 if ec.stallback is not None:
1021 if ec.stallback is not None:
1020 ec.stallback.stop()
1022 ec.stallback.stop()
1021 eid = ec.id
1023 eid = ec.id
1022 self.ids.add(eid)
1024 self.ids.add(eid)
1023 self.keytable[eid] = ec.uuid
1025 self.keytable[eid] = ec.uuid
1024 self.engines[eid] = ec
1026 self.engines[eid] = ec
1025 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1027 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1026 self.queues[eid] = list()
1028 self.queues[eid] = list()
1027 self.tasks[eid] = list()
1029 self.tasks[eid] = list()
1028 self.completed[eid] = list()
1030 self.completed[eid] = list()
1029 self.hearts[heart] = eid
1031 self.hearts[heart] = eid
1030 content = dict(id=eid, uuid=self.engines[eid].uuid)
1032 content = dict(id=eid, uuid=self.engines[eid].uuid)
1031 if self.notifier:
1033 if self.notifier:
1032 self.session.send(self.notifier, "registration_notification", content=content)
1034 self.session.send(self.notifier, "registration_notification", content=content)
1033 self.log.info("engine::Engine Connected: %i", eid)
1035 self.log.info("engine::Engine Connected: %i", eid)
1034
1036
1035 self._save_engine_state()
1037 self._save_engine_state()
1036
1038
1037 def _purge_stalled_registration(self, heart):
1039 def _purge_stalled_registration(self, heart):
1038 if heart in self.incoming_registrations:
1040 if heart in self.incoming_registrations:
1039 ec = self.incoming_registrations.pop(heart)
1041 ec = self.incoming_registrations.pop(heart)
1040 self.log.info("registration::purging stalled registration: %i", ec.id)
1042 self.log.info("registration::purging stalled registration: %i", ec.id)
1041 else:
1043 else:
1042 pass
1044 pass
1043
1045
1044 #-------------------------------------------------------------------------
1046 #-------------------------------------------------------------------------
1045 # Engine State
1047 # Engine State
1046 #-------------------------------------------------------------------------
1048 #-------------------------------------------------------------------------
1047
1049
1048
1050
1049 def _cleanup_engine_state_file(self):
1051 def _cleanup_engine_state_file(self):
1050 """cleanup engine state mapping"""
1052 """cleanup engine state mapping"""
1051
1053
1052 if os.path.exists(self.engine_state_file):
1054 if os.path.exists(self.engine_state_file):
1053 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1055 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1054 try:
1056 try:
1055 os.remove(self.engine_state_file)
1057 os.remove(self.engine_state_file)
1056 except IOError:
1058 except IOError:
1057 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1059 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1058
1060
1059
1061
1060 def _save_engine_state(self):
1062 def _save_engine_state(self):
1061 """save engine mapping to JSON file"""
1063 """save engine mapping to JSON file"""
1062 if not self.engine_state_file:
1064 if not self.engine_state_file:
1063 return
1065 return
1064 self.log.debug("save engine state to %s" % self.engine_state_file)
1066 self.log.debug("save engine state to %s" % self.engine_state_file)
1065 state = {}
1067 state = {}
1066 engines = {}
1068 engines = {}
1067 for eid, ec in self.engines.iteritems():
1069 for eid, ec in self.engines.iteritems():
1068 if ec.uuid not in self.dead_engines:
1070 if ec.uuid not in self.dead_engines:
1069 engines[eid] = ec.uuid
1071 engines[eid] = ec.uuid
1070
1072
1071 state['engines'] = engines
1073 state['engines'] = engines
1072
1074
1073 state['next_id'] = self._idcounter
1075 state['next_id'] = self._idcounter
1074
1076
1075 with open(self.engine_state_file, 'w') as f:
1077 with open(self.engine_state_file, 'w') as f:
1076 json.dump(state, f)
1078 json.dump(state, f)
1077
1079
1078
1080
1079 def _load_engine_state(self):
1081 def _load_engine_state(self):
1080 """load engine mapping from JSON file"""
1082 """load engine mapping from JSON file"""
1081 if not os.path.exists(self.engine_state_file):
1083 if not os.path.exists(self.engine_state_file):
1082 return
1084 return
1083
1085
1084 self.log.info("loading engine state from %s" % self.engine_state_file)
1086 self.log.info("loading engine state from %s" % self.engine_state_file)
1085
1087
1086 with open(self.engine_state_file) as f:
1088 with open(self.engine_state_file) as f:
1087 state = json.load(f)
1089 state = json.load(f)
1088
1090
1089 save_notifier = self.notifier
1091 save_notifier = self.notifier
1090 self.notifier = None
1092 self.notifier = None
1091 for eid, uuid in state['engines'].iteritems():
1093 for eid, uuid in state['engines'].iteritems():
1092 heart = uuid.encode('ascii')
1094 heart = uuid.encode('ascii')
1093 # start with this heart as current and beating:
1095 # start with this heart as current and beating:
1094 self.heartmonitor.responses.add(heart)
1096 self.heartmonitor.responses.add(heart)
1095 self.heartmonitor.hearts.add(heart)
1097 self.heartmonitor.hearts.add(heart)
1096
1098
1097 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1099 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1098 self.finish_registration(heart)
1100 self.finish_registration(heart)
1099
1101
1100 self.notifier = save_notifier
1102 self.notifier = save_notifier
1101
1103
1102 self._idcounter = state['next_id']
1104 self._idcounter = state['next_id']
1103
1105
1104 #-------------------------------------------------------------------------
1106 #-------------------------------------------------------------------------
1105 # Client Requests
1107 # Client Requests
1106 #-------------------------------------------------------------------------
1108 #-------------------------------------------------------------------------
1107
1109
1108 def shutdown_request(self, client_id, msg):
1110 def shutdown_request(self, client_id, msg):
1109 """handle shutdown request."""
1111 """handle shutdown request."""
1110 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1112 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1111 # also notify other clients of shutdown
1113 # also notify other clients of shutdown
1112 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1114 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1113 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1115 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1114 dc.start()
1116 dc.start()
1115
1117
1116 def _shutdown(self):
1118 def _shutdown(self):
1117 self.log.info("hub::hub shutting down.")
1119 self.log.info("hub::hub shutting down.")
1118 time.sleep(0.1)
1120 time.sleep(0.1)
1119 sys.exit(0)
1121 sys.exit(0)
1120
1122
1121
1123
1122 def check_load(self, client_id, msg):
1124 def check_load(self, client_id, msg):
1123 content = msg['content']
1125 content = msg['content']
1124 try:
1126 try:
1125 targets = content['targets']
1127 targets = content['targets']
1126 targets = self._validate_targets(targets)
1128 targets = self._validate_targets(targets)
1127 except:
1129 except:
1128 content = error.wrap_exception()
1130 content = error.wrap_exception()
1129 self.session.send(self.query, "hub_error",
1131 self.session.send(self.query, "hub_error",
1130 content=content, ident=client_id)
1132 content=content, ident=client_id)
1131 return
1133 return
1132
1134
1133 content = dict(status='ok')
1135 content = dict(status='ok')
1134 # loads = {}
1136 # loads = {}
1135 for t in targets:
1137 for t in targets:
1136 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1138 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1137 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1139 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1138
1140
1139
1141
1140 def queue_status(self, client_id, msg):
1142 def queue_status(self, client_id, msg):
1141 """Return the Queue status of one or more targets.
1143 """Return the Queue status of one or more targets.
1142 if verbose: return the msg_ids
1144 if verbose: return the msg_ids
1143 else: return len of each type.
1145 else: return len of each type.
1144 keys: queue (pending MUX jobs)
1146 keys: queue (pending MUX jobs)
1145 tasks (pending Task jobs)
1147 tasks (pending Task jobs)
1146 completed (finished jobs from both queues)"""
1148 completed (finished jobs from both queues)"""
1147 content = msg['content']
1149 content = msg['content']
1148 targets = content['targets']
1150 targets = content['targets']
1149 try:
1151 try:
1150 targets = self._validate_targets(targets)
1152 targets = self._validate_targets(targets)
1151 except:
1153 except:
1152 content = error.wrap_exception()
1154 content = error.wrap_exception()
1153 self.session.send(self.query, "hub_error",
1155 self.session.send(self.query, "hub_error",
1154 content=content, ident=client_id)
1156 content=content, ident=client_id)
1155 return
1157 return
1156 verbose = content.get('verbose', False)
1158 verbose = content.get('verbose', False)
1157 content = dict(status='ok')
1159 content = dict(status='ok')
1158 for t in targets:
1160 for t in targets:
1159 queue = self.queues[t]
1161 queue = self.queues[t]
1160 completed = self.completed[t]
1162 completed = self.completed[t]
1161 tasks = self.tasks[t]
1163 tasks = self.tasks[t]
1162 if not verbose:
1164 if not verbose:
1163 queue = len(queue)
1165 queue = len(queue)
1164 completed = len(completed)
1166 completed = len(completed)
1165 tasks = len(tasks)
1167 tasks = len(tasks)
1166 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1168 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1167 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1169 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1168 # print (content)
1170 # print (content)
1169 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1171 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1170
1172
1171 def purge_results(self, client_id, msg):
1173 def purge_results(self, client_id, msg):
1172 """Purge results from memory. This method is more valuable before we move
1174 """Purge results from memory. This method is more valuable before we move
1173 to a DB based message storage mechanism."""
1175 to a DB based message storage mechanism."""
1174 content = msg['content']
1176 content = msg['content']
1175 self.log.info("Dropping records with %s", content)
1177 self.log.info("Dropping records with %s", content)
1176 msg_ids = content.get('msg_ids', [])
1178 msg_ids = content.get('msg_ids', [])
1177 reply = dict(status='ok')
1179 reply = dict(status='ok')
1178 if msg_ids == 'all':
1180 if msg_ids == 'all':
1179 try:
1181 try:
1180 self.db.drop_matching_records(dict(completed={'$ne':None}))
1182 self.db.drop_matching_records(dict(completed={'$ne':None}))
1181 except Exception:
1183 except Exception:
1182 reply = error.wrap_exception()
1184 reply = error.wrap_exception()
1183 else:
1185 else:
1184 pending = filter(lambda m: m in self.pending, msg_ids)
1186 pending = filter(lambda m: m in self.pending, msg_ids)
1185 if pending:
1187 if pending:
1186 try:
1188 try:
1187 raise IndexError("msg pending: %r" % pending[0])
1189 raise IndexError("msg pending: %r" % pending[0])
1188 except:
1190 except:
1189 reply = error.wrap_exception()
1191 reply = error.wrap_exception()
1190 else:
1192 else:
1191 try:
1193 try:
1192 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1194 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1193 except Exception:
1195 except Exception:
1194 reply = error.wrap_exception()
1196 reply = error.wrap_exception()
1195
1197
1196 if reply['status'] == 'ok':
1198 if reply['status'] == 'ok':
1197 eids = content.get('engine_ids', [])
1199 eids = content.get('engine_ids', [])
1198 for eid in eids:
1200 for eid in eids:
1199 if eid not in self.engines:
1201 if eid not in self.engines:
1200 try:
1202 try:
1201 raise IndexError("No such engine: %i" % eid)
1203 raise IndexError("No such engine: %i" % eid)
1202 except:
1204 except:
1203 reply = error.wrap_exception()
1205 reply = error.wrap_exception()
1204 break
1206 break
1205 uid = self.engines[eid].uuid
1207 uid = self.engines[eid].uuid
1206 try:
1208 try:
1207 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1209 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1208 except Exception:
1210 except Exception:
1209 reply = error.wrap_exception()
1211 reply = error.wrap_exception()
1210 break
1212 break
1211
1213
1212 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1214 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1213
1215
1214 def resubmit_task(self, client_id, msg):
1216 def resubmit_task(self, client_id, msg):
1215 """Resubmit one or more tasks."""
1217 """Resubmit one or more tasks."""
1216 def finish(reply):
1218 def finish(reply):
1217 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1219 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1218
1220
1219 content = msg['content']
1221 content = msg['content']
1220 msg_ids = content['msg_ids']
1222 msg_ids = content['msg_ids']
1221 reply = dict(status='ok')
1223 reply = dict(status='ok')
1222 try:
1224 try:
1223 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1225 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1224 'header', 'content', 'buffers'])
1226 'header', 'content', 'buffers'])
1225 except Exception:
1227 except Exception:
1226 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1228 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1227 return finish(error.wrap_exception())
1229 return finish(error.wrap_exception())
1228
1230
1229 # validate msg_ids
1231 # validate msg_ids
1230 found_ids = [ rec['msg_id'] for rec in records ]
1232 found_ids = [ rec['msg_id'] for rec in records ]
1231 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1233 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1232 if len(records) > len(msg_ids):
1234 if len(records) > len(msg_ids):
1233 try:
1235 try:
1234 raise RuntimeError("DB appears to be in an inconsistent state."
1236 raise RuntimeError("DB appears to be in an inconsistent state."
1235 "More matching records were found than should exist")
1237 "More matching records were found than should exist")
1236 except Exception:
1238 except Exception:
1237 return finish(error.wrap_exception())
1239 return finish(error.wrap_exception())
1238 elif len(records) < len(msg_ids):
1240 elif len(records) < len(msg_ids):
1239 missing = [ m for m in msg_ids if m not in found_ids ]
1241 missing = [ m for m in msg_ids if m not in found_ids ]
1240 try:
1242 try:
1241 raise KeyError("No such msg(s): %r" % missing)
1243 raise KeyError("No such msg(s): %r" % missing)
1242 except KeyError:
1244 except KeyError:
1243 return finish(error.wrap_exception())
1245 return finish(error.wrap_exception())
1244 elif pending_ids:
1246 elif pending_ids:
1245 pass
1247 pass
1246 # no need to raise on resubmit of pending task, now that we
1248 # no need to raise on resubmit of pending task, now that we
1247 # resubmit under new ID, but do we want to raise anyway?
1249 # resubmit under new ID, but do we want to raise anyway?
1248 # msg_id = invalid_ids[0]
1250 # msg_id = invalid_ids[0]
1249 # try:
1251 # try:
1250 # raise ValueError("Task(s) %r appears to be inflight" % )
1252 # raise ValueError("Task(s) %r appears to be inflight" % )
1251 # except Exception:
1253 # except Exception:
1252 # return finish(error.wrap_exception())
1254 # return finish(error.wrap_exception())
1253
1255
1254 # mapping of original IDs to resubmitted IDs
1256 # mapping of original IDs to resubmitted IDs
1255 resubmitted = {}
1257 resubmitted = {}
1256
1258
1257 # send the messages
1259 # send the messages
1258 for rec in records:
1260 for rec in records:
1259 header = rec['header']
1261 header = rec['header']
1260 msg = self.session.msg(header['msg_type'], parent=header)
1262 msg = self.session.msg(header['msg_type'], parent=header)
1261 msg_id = msg['msg_id']
1263 msg_id = msg['msg_id']
1262 msg['content'] = rec['content']
1264 msg['content'] = rec['content']
1263
1265
1264 # use the old header, but update msg_id and timestamp
1266 # use the old header, but update msg_id and timestamp
1265 fresh = msg['header']
1267 fresh = msg['header']
1266 header['msg_id'] = fresh['msg_id']
1268 header['msg_id'] = fresh['msg_id']
1267 header['date'] = fresh['date']
1269 header['date'] = fresh['date']
1268 msg['header'] = header
1270 msg['header'] = header
1269
1271
1270 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1272 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1271
1273
1272 resubmitted[rec['msg_id']] = msg_id
1274 resubmitted[rec['msg_id']] = msg_id
1273 self.pending.add(msg_id)
1275 self.pending.add(msg_id)
1274 msg['buffers'] = rec['buffers']
1276 msg['buffers'] = rec['buffers']
1275 try:
1277 try:
1276 self.db.add_record(msg_id, init_record(msg))
1278 self.db.add_record(msg_id, init_record(msg))
1277 except Exception:
1279 except Exception:
1278 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1280 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1279 return finish(error.wrap_exception())
1281 return finish(error.wrap_exception())
1280
1282
1281 finish(dict(status='ok', resubmitted=resubmitted))
1283 finish(dict(status='ok', resubmitted=resubmitted))
1282
1284
1283 # store the new IDs in the Task DB
1285 # store the new IDs in the Task DB
1284 for msg_id, resubmit_id in resubmitted.iteritems():
1286 for msg_id, resubmit_id in resubmitted.iteritems():
1285 try:
1287 try:
1286 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1288 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1287 except Exception:
1289 except Exception:
1288 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1290 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1289
1291
1290
1292
1291 def _extract_record(self, rec):
1293 def _extract_record(self, rec):
1292 """decompose a TaskRecord dict into subsection of reply for get_result"""
1294 """decompose a TaskRecord dict into subsection of reply for get_result"""
1293 io_dict = {}
1295 io_dict = {}
1294 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1296 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1295 io_dict[key] = rec[key]
1297 io_dict[key] = rec[key]
1296 content = {
1298 content = {
1297 'header': rec['header'],
1299 'header': rec['header'],
1298 'metadata': rec['metadata'],
1300 'metadata': rec['metadata'],
1299 'result_metadata': rec['result_metadata'],
1301 'result_metadata': rec['result_metadata'],
1300 'result_header' : rec['result_header'],
1302 'result_header' : rec['result_header'],
1301 'result_content': rec['result_content'],
1303 'result_content': rec['result_content'],
1302 'received' : rec['received'],
1304 'received' : rec['received'],
1303 'io' : io_dict,
1305 'io' : io_dict,
1304 }
1306 }
1305 if rec['result_buffers']:
1307 if rec['result_buffers']:
1306 buffers = map(bytes, rec['result_buffers'])
1308 buffers = map(bytes, rec['result_buffers'])
1307 else:
1309 else:
1308 buffers = []
1310 buffers = []
1309
1311
1310 return content, buffers
1312 return content, buffers
1311
1313
1312 def get_results(self, client_id, msg):
1314 def get_results(self, client_id, msg):
1313 """Get the result of 1 or more messages."""
1315 """Get the result of 1 or more messages."""
1314 content = msg['content']
1316 content = msg['content']
1315 msg_ids = sorted(set(content['msg_ids']))
1317 msg_ids = sorted(set(content['msg_ids']))
1316 statusonly = content.get('status_only', False)
1318 statusonly = content.get('status_only', False)
1317 pending = []
1319 pending = []
1318 completed = []
1320 completed = []
1319 content = dict(status='ok')
1321 content = dict(status='ok')
1320 content['pending'] = pending
1322 content['pending'] = pending
1321 content['completed'] = completed
1323 content['completed'] = completed
1322 buffers = []
1324 buffers = []
1323 if not statusonly:
1325 if not statusonly:
1324 try:
1326 try:
1325 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1327 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1326 # turn match list into dict, for faster lookup
1328 # turn match list into dict, for faster lookup
1327 records = {}
1329 records = {}
1328 for rec in matches:
1330 for rec in matches:
1329 records[rec['msg_id']] = rec
1331 records[rec['msg_id']] = rec
1330 except Exception:
1332 except Exception:
1331 content = error.wrap_exception()
1333 content = error.wrap_exception()
1332 self.session.send(self.query, "result_reply", content=content,
1334 self.session.send(self.query, "result_reply", content=content,
1333 parent=msg, ident=client_id)
1335 parent=msg, ident=client_id)
1334 return
1336 return
1335 else:
1337 else:
1336 records = {}
1338 records = {}
1337 for msg_id in msg_ids:
1339 for msg_id in msg_ids:
1338 if msg_id in self.pending:
1340 if msg_id in self.pending:
1339 pending.append(msg_id)
1341 pending.append(msg_id)
1340 elif msg_id in self.all_completed:
1342 elif msg_id in self.all_completed:
1341 completed.append(msg_id)
1343 completed.append(msg_id)
1342 if not statusonly:
1344 if not statusonly:
1343 c,bufs = self._extract_record(records[msg_id])
1345 c,bufs = self._extract_record(records[msg_id])
1344 content[msg_id] = c
1346 content[msg_id] = c
1345 buffers.extend(bufs)
1347 buffers.extend(bufs)
1346 elif msg_id in records:
1348 elif msg_id in records:
1347 if rec['completed']:
1349 if rec['completed']:
1348 completed.append(msg_id)
1350 completed.append(msg_id)
1349 c,bufs = self._extract_record(records[msg_id])
1351 c,bufs = self._extract_record(records[msg_id])
1350 content[msg_id] = c
1352 content[msg_id] = c
1351 buffers.extend(bufs)
1353 buffers.extend(bufs)
1352 else:
1354 else:
1353 pending.append(msg_id)
1355 pending.append(msg_id)
1354 else:
1356 else:
1355 try:
1357 try:
1356 raise KeyError('No such message: '+msg_id)
1358 raise KeyError('No such message: '+msg_id)
1357 except:
1359 except:
1358 content = error.wrap_exception()
1360 content = error.wrap_exception()
1359 break
1361 break
1360 self.session.send(self.query, "result_reply", content=content,
1362 self.session.send(self.query, "result_reply", content=content,
1361 parent=msg, ident=client_id,
1363 parent=msg, ident=client_id,
1362 buffers=buffers)
1364 buffers=buffers)
1363
1365
1364 def get_history(self, client_id, msg):
1366 def get_history(self, client_id, msg):
1365 """Get a list of all msg_ids in our DB records"""
1367 """Get a list of all msg_ids in our DB records"""
1366 try:
1368 try:
1367 msg_ids = self.db.get_history()
1369 msg_ids = self.db.get_history()
1368 except Exception as e:
1370 except Exception as e:
1369 content = error.wrap_exception()
1371 content = error.wrap_exception()
1370 else:
1372 else:
1371 content = dict(status='ok', history=msg_ids)
1373 content = dict(status='ok', history=msg_ids)
1372
1374
1373 self.session.send(self.query, "history_reply", content=content,
1375 self.session.send(self.query, "history_reply", content=content,
1374 parent=msg, ident=client_id)
1376 parent=msg, ident=client_id)
1375
1377
1376 def db_query(self, client_id, msg):
1378 def db_query(self, client_id, msg):
1377 """Perform a raw query on the task record database."""
1379 """Perform a raw query on the task record database."""
1378 content = msg['content']
1380 content = msg['content']
1379 query = content.get('query', {})
1381 query = content.get('query', {})
1380 keys = content.get('keys', None)
1382 keys = content.get('keys', None)
1381 buffers = []
1383 buffers = []
1382 empty = list()
1384 empty = list()
1383 try:
1385 try:
1384 records = self.db.find_records(query, keys)
1386 records = self.db.find_records(query, keys)
1385 except Exception as e:
1387 except Exception as e:
1386 content = error.wrap_exception()
1388 content = error.wrap_exception()
1387 else:
1389 else:
1388 # extract buffers from reply content:
1390 # extract buffers from reply content:
1389 if keys is not None:
1391 if keys is not None:
1390 buffer_lens = [] if 'buffers' in keys else None
1392 buffer_lens = [] if 'buffers' in keys else None
1391 result_buffer_lens = [] if 'result_buffers' in keys else None
1393 result_buffer_lens = [] if 'result_buffers' in keys else None
1392 else:
1394 else:
1393 buffer_lens = None
1395 buffer_lens = None
1394 result_buffer_lens = None
1396 result_buffer_lens = None
1395
1397
1396 for rec in records:
1398 for rec in records:
1397 # buffers may be None, so double check
1399 # buffers may be None, so double check
1398 b = rec.pop('buffers', empty) or empty
1400 b = rec.pop('buffers', empty) or empty
1399 if buffer_lens is not None:
1401 if buffer_lens is not None:
1400 buffer_lens.append(len(b))
1402 buffer_lens.append(len(b))
1401 buffers.extend(b)
1403 buffers.extend(b)
1402 rb = rec.pop('result_buffers', empty) or empty
1404 rb = rec.pop('result_buffers', empty) or empty
1403 if result_buffer_lens is not None:
1405 if result_buffer_lens is not None:
1404 result_buffer_lens.append(len(rb))
1406 result_buffer_lens.append(len(rb))
1405 buffers.extend(rb)
1407 buffers.extend(rb)
1406 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1408 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1407 result_buffer_lens=result_buffer_lens)
1409 result_buffer_lens=result_buffer_lens)
1408 # self.log.debug (content)
1410 # self.log.debug (content)
1409 self.session.send(self.query, "db_reply", content=content,
1411 self.session.send(self.query, "db_reply", content=content,
1410 parent=msg, ident=client_id,
1412 parent=msg, ident=client_id,
1411 buffers=buffers)
1413 buffers=buffers)
1412
1414
General Comments 0
You need to be logged in to leave comments. Login now