##// END OF EJS Templates
better handle aborted/unschedulers tasks
MinRK -
Show More
@@ -1,1091 +1,1095 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from datetime import datetime
20 from datetime import datetime
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25
25
26 # internal:
26 # internal:
27 from IPython.utils.importstring import import_item
27 from IPython.utils.importstring import import_item
28 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
28 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
29
29
30 from IPython.parallel import error
30 from IPython.parallel import error
31 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
31 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
32 from IPython.parallel.util import select_random_ports, validate_url_container, ISO8601
32 from IPython.parallel.util import select_random_ports, validate_url_container, ISO8601
33
33
34 from .heartmonitor import HeartMonitor
34 from .heartmonitor import HeartMonitor
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Code
37 # Code
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 def _passer(*args, **kwargs):
40 def _passer(*args, **kwargs):
41 return
41 return
42
42
43 def _printer(*args, **kwargs):
43 def _printer(*args, **kwargs):
44 print (args)
44 print (args)
45 print (kwargs)
45 print (kwargs)
46
46
47 def empty_record():
47 def empty_record():
48 """Return an empty dict with all record keys."""
48 """Return an empty dict with all record keys."""
49 return {
49 return {
50 'msg_id' : None,
50 'msg_id' : None,
51 'header' : None,
51 'header' : None,
52 'content': None,
52 'content': None,
53 'buffers': None,
53 'buffers': None,
54 'submitted': None,
54 'submitted': None,
55 'client_uuid' : None,
55 'client_uuid' : None,
56 'engine_uuid' : None,
56 'engine_uuid' : None,
57 'started': None,
57 'started': None,
58 'completed': None,
58 'completed': None,
59 'resubmitted': None,
59 'resubmitted': None,
60 'result_header' : None,
60 'result_header' : None,
61 'result_content' : None,
61 'result_content' : None,
62 'result_buffers' : None,
62 'result_buffers' : None,
63 'queue' : None,
63 'queue' : None,
64 'pyin' : None,
64 'pyin' : None,
65 'pyout': None,
65 'pyout': None,
66 'pyerr': None,
66 'pyerr': None,
67 'stdout': '',
67 'stdout': '',
68 'stderr': '',
68 'stderr': '',
69 }
69 }
70
70
71 def init_record(msg):
71 def init_record(msg):
72 """Initialize a TaskRecord based on a request."""
72 """Initialize a TaskRecord based on a request."""
73 header = msg['header']
73 header = msg['header']
74 return {
74 return {
75 'msg_id' : header['msg_id'],
75 'msg_id' : header['msg_id'],
76 'header' : header,
76 'header' : header,
77 'content': msg['content'],
77 'content': msg['content'],
78 'buffers': msg['buffers'],
78 'buffers': msg['buffers'],
79 'submitted': datetime.strptime(header['date'], ISO8601),
79 'submitted': datetime.strptime(header['date'], ISO8601),
80 'client_uuid' : None,
80 'client_uuid' : None,
81 'engine_uuid' : None,
81 'engine_uuid' : None,
82 'started': None,
82 'started': None,
83 'completed': None,
83 'completed': None,
84 'resubmitted': None,
84 'resubmitted': None,
85 'result_header' : None,
85 'result_header' : None,
86 'result_content' : None,
86 'result_content' : None,
87 'result_buffers' : None,
87 'result_buffers' : None,
88 'queue' : None,
88 'queue' : None,
89 'pyin' : None,
89 'pyin' : None,
90 'pyout': None,
90 'pyout': None,
91 'pyerr': None,
91 'pyerr': None,
92 'stdout': '',
92 'stdout': '',
93 'stderr': '',
93 'stderr': '',
94 }
94 }
95
95
96
96
97 class EngineConnector(HasTraits):
97 class EngineConnector(HasTraits):
98 """A simple object for accessing the various zmq connections of an object.
98 """A simple object for accessing the various zmq connections of an object.
99 Attributes are:
99 Attributes are:
100 id (int): engine ID
100 id (int): engine ID
101 uuid (str): uuid (unused?)
101 uuid (str): uuid (unused?)
102 queue (str): identity of queue's XREQ socket
102 queue (str): identity of queue's XREQ socket
103 registration (str): identity of registration XREQ socket
103 registration (str): identity of registration XREQ socket
104 heartbeat (str): identity of heartbeat XREQ socket
104 heartbeat (str): identity of heartbeat XREQ socket
105 """
105 """
106 id=Int(0)
106 id=Int(0)
107 queue=Str()
107 queue=Str()
108 control=Str()
108 control=Str()
109 registration=Str()
109 registration=Str()
110 heartbeat=Str()
110 heartbeat=Str()
111 pending=Set()
111 pending=Set()
112
112
113 class HubFactory(RegistrationFactory):
113 class HubFactory(RegistrationFactory):
114 """The Configurable for setting up a Hub."""
114 """The Configurable for setting up a Hub."""
115
115
116 # name of a scheduler scheme
116 # name of a scheduler scheme
117 scheme = Str('leastload', config=True)
117 scheme = Str('leastload', config=True)
118
118
119 # port-pairs for monitoredqueues:
119 # port-pairs for monitoredqueues:
120 hb = Instance(list, config=True)
120 hb = Instance(list, config=True)
121 def _hb_default(self):
121 def _hb_default(self):
122 return select_random_ports(2)
122 return select_random_ports(2)
123
123
124 mux = Instance(list, config=True)
124 mux = Instance(list, config=True)
125 def _mux_default(self):
125 def _mux_default(self):
126 return select_random_ports(2)
126 return select_random_ports(2)
127
127
128 task = Instance(list, config=True)
128 task = Instance(list, config=True)
129 def _task_default(self):
129 def _task_default(self):
130 return select_random_ports(2)
130 return select_random_ports(2)
131
131
132 control = Instance(list, config=True)
132 control = Instance(list, config=True)
133 def _control_default(self):
133 def _control_default(self):
134 return select_random_ports(2)
134 return select_random_ports(2)
135
135
136 iopub = Instance(list, config=True)
136 iopub = Instance(list, config=True)
137 def _iopub_default(self):
137 def _iopub_default(self):
138 return select_random_ports(2)
138 return select_random_ports(2)
139
139
140 # single ports:
140 # single ports:
141 mon_port = Instance(int, config=True)
141 mon_port = Instance(int, config=True)
142 def _mon_port_default(self):
142 def _mon_port_default(self):
143 return select_random_ports(1)[0]
143 return select_random_ports(1)[0]
144
144
145 notifier_port = Instance(int, config=True)
145 notifier_port = Instance(int, config=True)
146 def _notifier_port_default(self):
146 def _notifier_port_default(self):
147 return select_random_ports(1)[0]
147 return select_random_ports(1)[0]
148
148
149 ping = Int(1000, config=True) # ping frequency
149 ping = Int(1000, config=True) # ping frequency
150
150
151 engine_ip = CStr('127.0.0.1', config=True)
151 engine_ip = CStr('127.0.0.1', config=True)
152 engine_transport = CStr('tcp', config=True)
152 engine_transport = CStr('tcp', config=True)
153
153
154 client_ip = CStr('127.0.0.1', config=True)
154 client_ip = CStr('127.0.0.1', config=True)
155 client_transport = CStr('tcp', config=True)
155 client_transport = CStr('tcp', config=True)
156
156
157 monitor_ip = CStr('127.0.0.1', config=True)
157 monitor_ip = CStr('127.0.0.1', config=True)
158 monitor_transport = CStr('tcp', config=True)
158 monitor_transport = CStr('tcp', config=True)
159
159
160 monitor_url = CStr('')
160 monitor_url = CStr('')
161
161
162 db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True)
162 db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True)
163
163
164 # not configurable
164 # not configurable
165 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
165 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
166 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
166 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
167 subconstructors = List()
167 subconstructors = List()
168 _constructed = Bool(False)
168 _constructed = Bool(False)
169
169
170 def _ip_changed(self, name, old, new):
170 def _ip_changed(self, name, old, new):
171 self.engine_ip = new
171 self.engine_ip = new
172 self.client_ip = new
172 self.client_ip = new
173 self.monitor_ip = new
173 self.monitor_ip = new
174 self._update_monitor_url()
174 self._update_monitor_url()
175
175
176 def _update_monitor_url(self):
176 def _update_monitor_url(self):
177 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
177 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
178
178
179 def _transport_changed(self, name, old, new):
179 def _transport_changed(self, name, old, new):
180 self.engine_transport = new
180 self.engine_transport = new
181 self.client_transport = new
181 self.client_transport = new
182 self.monitor_transport = new
182 self.monitor_transport = new
183 self._update_monitor_url()
183 self._update_monitor_url()
184
184
185 def __init__(self, **kwargs):
185 def __init__(self, **kwargs):
186 super(HubFactory, self).__init__(**kwargs)
186 super(HubFactory, self).__init__(**kwargs)
187 self._update_monitor_url()
187 self._update_monitor_url()
188 # self.on_trait_change(self._sync_ips, 'ip')
188 # self.on_trait_change(self._sync_ips, 'ip')
189 # self.on_trait_change(self._sync_transports, 'transport')
189 # self.on_trait_change(self._sync_transports, 'transport')
190 self.subconstructors.append(self.construct_hub)
190 self.subconstructors.append(self.construct_hub)
191
191
192
192
193 def construct(self):
193 def construct(self):
194 assert not self._constructed, "already constructed!"
194 assert not self._constructed, "already constructed!"
195
195
196 for subc in self.subconstructors:
196 for subc in self.subconstructors:
197 subc()
197 subc()
198
198
199 self._constructed = True
199 self._constructed = True
200
200
201
201
202 def start(self):
202 def start(self):
203 assert self._constructed, "must be constructed by self.construct() first!"
203 assert self._constructed, "must be constructed by self.construct() first!"
204 self.heartmonitor.start()
204 self.heartmonitor.start()
205 self.log.info("Heartmonitor started")
205 self.log.info("Heartmonitor started")
206
206
207 def construct_hub(self):
207 def construct_hub(self):
208 """construct"""
208 """construct"""
209 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
209 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
210 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
210 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
211
211
212 ctx = self.context
212 ctx = self.context
213 loop = self.loop
213 loop = self.loop
214
214
215 # Registrar socket
215 # Registrar socket
216 q = ZMQStream(ctx.socket(zmq.XREP), loop)
216 q = ZMQStream(ctx.socket(zmq.XREP), loop)
217 q.bind(client_iface % self.regport)
217 q.bind(client_iface % self.regport)
218 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
218 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
219 if self.client_ip != self.engine_ip:
219 if self.client_ip != self.engine_ip:
220 q.bind(engine_iface % self.regport)
220 q.bind(engine_iface % self.regport)
221 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
221 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
222
222
223 ### Engine connections ###
223 ### Engine connections ###
224
224
225 # heartbeat
225 # heartbeat
226 hpub = ctx.socket(zmq.PUB)
226 hpub = ctx.socket(zmq.PUB)
227 hpub.bind(engine_iface % self.hb[0])
227 hpub.bind(engine_iface % self.hb[0])
228 hrep = ctx.socket(zmq.XREP)
228 hrep = ctx.socket(zmq.XREP)
229 hrep.bind(engine_iface % self.hb[1])
229 hrep.bind(engine_iface % self.hb[1])
230 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
230 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
231 period=self.ping, logname=self.log.name)
231 period=self.ping, logname=self.log.name)
232
232
233 ### Client connections ###
233 ### Client connections ###
234 # Notifier socket
234 # Notifier socket
235 n = ZMQStream(ctx.socket(zmq.PUB), loop)
235 n = ZMQStream(ctx.socket(zmq.PUB), loop)
236 n.bind(client_iface%self.notifier_port)
236 n.bind(client_iface%self.notifier_port)
237
237
238 ### build and launch the queues ###
238 ### build and launch the queues ###
239
239
240 # monitor socket
240 # monitor socket
241 sub = ctx.socket(zmq.SUB)
241 sub = ctx.socket(zmq.SUB)
242 sub.setsockopt(zmq.SUBSCRIBE, "")
242 sub.setsockopt(zmq.SUBSCRIBE, "")
243 sub.bind(self.monitor_url)
243 sub.bind(self.monitor_url)
244 sub.bind('inproc://monitor')
244 sub.bind('inproc://monitor')
245 sub = ZMQStream(sub, loop)
245 sub = ZMQStream(sub, loop)
246
246
247 # connect the db
247 # connect the db
248 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
248 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
249 # cdir = self.config.Global.cluster_dir
249 # cdir = self.config.Global.cluster_dir
250 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
250 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
251 time.sleep(.25)
251 time.sleep(.25)
252
252
253 # build connection dicts
253 # build connection dicts
254 self.engine_info = {
254 self.engine_info = {
255 'control' : engine_iface%self.control[1],
255 'control' : engine_iface%self.control[1],
256 'mux': engine_iface%self.mux[1],
256 'mux': engine_iface%self.mux[1],
257 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
257 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
258 'task' : engine_iface%self.task[1],
258 'task' : engine_iface%self.task[1],
259 'iopub' : engine_iface%self.iopub[1],
259 'iopub' : engine_iface%self.iopub[1],
260 # 'monitor' : engine_iface%self.mon_port,
260 # 'monitor' : engine_iface%self.mon_port,
261 }
261 }
262
262
263 self.client_info = {
263 self.client_info = {
264 'control' : client_iface%self.control[0],
264 'control' : client_iface%self.control[0],
265 'mux': client_iface%self.mux[0],
265 'mux': client_iface%self.mux[0],
266 'task' : (self.scheme, client_iface%self.task[0]),
266 'task' : (self.scheme, client_iface%self.task[0]),
267 'iopub' : client_iface%self.iopub[0],
267 'iopub' : client_iface%self.iopub[0],
268 'notification': client_iface%self.notifier_port
268 'notification': client_iface%self.notifier_port
269 }
269 }
270 self.log.debug("Hub engine addrs: %s"%self.engine_info)
270 self.log.debug("Hub engine addrs: %s"%self.engine_info)
271 self.log.debug("Hub client addrs: %s"%self.client_info)
271 self.log.debug("Hub client addrs: %s"%self.client_info)
272 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
272 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
273 query=q, notifier=n, db=self.db,
273 query=q, notifier=n, db=self.db,
274 engine_info=self.engine_info, client_info=self.client_info,
274 engine_info=self.engine_info, client_info=self.client_info,
275 logname=self.log.name)
275 logname=self.log.name)
276
276
277
277
278 class Hub(LoggingFactory):
278 class Hub(LoggingFactory):
279 """The IPython Controller Hub with 0MQ connections
279 """The IPython Controller Hub with 0MQ connections
280
280
281 Parameters
281 Parameters
282 ==========
282 ==========
283 loop: zmq IOLoop instance
283 loop: zmq IOLoop instance
284 session: StreamSession object
284 session: StreamSession object
285 <removed> context: zmq context for creating new connections (?)
285 <removed> context: zmq context for creating new connections (?)
286 queue: ZMQStream for monitoring the command queue (SUB)
286 queue: ZMQStream for monitoring the command queue (SUB)
287 query: ZMQStream for engine registration and client queries requests (XREP)
287 query: ZMQStream for engine registration and client queries requests (XREP)
288 heartbeat: HeartMonitor object checking the pulse of the engines
288 heartbeat: HeartMonitor object checking the pulse of the engines
289 notifier: ZMQStream for broadcasting engine registration changes (PUB)
289 notifier: ZMQStream for broadcasting engine registration changes (PUB)
290 db: connection to db for out of memory logging of commands
290 db: connection to db for out of memory logging of commands
291 NotImplemented
291 NotImplemented
292 engine_info: dict of zmq connection information for engines to connect
292 engine_info: dict of zmq connection information for engines to connect
293 to the queues.
293 to the queues.
294 client_info: dict of zmq connection information for engines to connect
294 client_info: dict of zmq connection information for engines to connect
295 to the queues.
295 to the queues.
296 """
296 """
297 # internal data structures:
297 # internal data structures:
298 ids=Set() # engine IDs
298 ids=Set() # engine IDs
299 keytable=Dict()
299 keytable=Dict()
300 by_ident=Dict()
300 by_ident=Dict()
301 engines=Dict()
301 engines=Dict()
302 clients=Dict()
302 clients=Dict()
303 hearts=Dict()
303 hearts=Dict()
304 pending=Set()
304 pending=Set()
305 queues=Dict() # pending msg_ids keyed by engine_id
305 queues=Dict() # pending msg_ids keyed by engine_id
306 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
306 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
307 completed=Dict() # completed msg_ids keyed by engine_id
307 completed=Dict() # completed msg_ids keyed by engine_id
308 all_completed=Set() # completed msg_ids keyed by engine_id
308 all_completed=Set() # completed msg_ids keyed by engine_id
309 dead_engines=Set() # completed msg_ids keyed by engine_id
309 dead_engines=Set() # completed msg_ids keyed by engine_id
310 # mia=None
310 unassigned=Set() # set of task msg_ds not yet assigned a destination
311 incoming_registrations=Dict()
311 incoming_registrations=Dict()
312 registration_timeout=Int()
312 registration_timeout=Int()
313 _idcounter=Int(0)
313 _idcounter=Int(0)
314
314
315 # objects from constructor:
315 # objects from constructor:
316 loop=Instance(ioloop.IOLoop)
316 loop=Instance(ioloop.IOLoop)
317 query=Instance(ZMQStream)
317 query=Instance(ZMQStream)
318 monitor=Instance(ZMQStream)
318 monitor=Instance(ZMQStream)
319 heartmonitor=Instance(HeartMonitor)
319 heartmonitor=Instance(HeartMonitor)
320 notifier=Instance(ZMQStream)
320 notifier=Instance(ZMQStream)
321 db=Instance(object)
321 db=Instance(object)
322 client_info=Dict()
322 client_info=Dict()
323 engine_info=Dict()
323 engine_info=Dict()
324
324
325
325
326 def __init__(self, **kwargs):
326 def __init__(self, **kwargs):
327 """
327 """
328 # universal:
328 # universal:
329 loop: IOLoop for creating future connections
329 loop: IOLoop for creating future connections
330 session: streamsession for sending serialized data
330 session: streamsession for sending serialized data
331 # engine:
331 # engine:
332 queue: ZMQStream for monitoring queue messages
332 queue: ZMQStream for monitoring queue messages
333 query: ZMQStream for engine+client registration and client requests
333 query: ZMQStream for engine+client registration and client requests
334 heartbeat: HeartMonitor object for tracking engines
334 heartbeat: HeartMonitor object for tracking engines
335 # extra:
335 # extra:
336 db: ZMQStream for db connection (NotImplemented)
336 db: ZMQStream for db connection (NotImplemented)
337 engine_info: zmq address/protocol dict for engine connections
337 engine_info: zmq address/protocol dict for engine connections
338 client_info: zmq address/protocol dict for client connections
338 client_info: zmq address/protocol dict for client connections
339 """
339 """
340
340
341 super(Hub, self).__init__(**kwargs)
341 super(Hub, self).__init__(**kwargs)
342 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
342 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
343
343
344 # validate connection dicts:
344 # validate connection dicts:
345 for k,v in self.client_info.iteritems():
345 for k,v in self.client_info.iteritems():
346 if k == 'task':
346 if k == 'task':
347 validate_url_container(v[1])
347 validate_url_container(v[1])
348 else:
348 else:
349 validate_url_container(v)
349 validate_url_container(v)
350 # validate_url_container(self.client_info)
350 # validate_url_container(self.client_info)
351 validate_url_container(self.engine_info)
351 validate_url_container(self.engine_info)
352
352
353 # register our callbacks
353 # register our callbacks
354 self.query.on_recv(self.dispatch_query)
354 self.query.on_recv(self.dispatch_query)
355 self.monitor.on_recv(self.dispatch_monitor_traffic)
355 self.monitor.on_recv(self.dispatch_monitor_traffic)
356
356
357 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
357 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
358 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
358 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
359
359
360 self.monitor_handlers = { 'in' : self.save_queue_request,
360 self.monitor_handlers = { 'in' : self.save_queue_request,
361 'out': self.save_queue_result,
361 'out': self.save_queue_result,
362 'intask': self.save_task_request,
362 'intask': self.save_task_request,
363 'outtask': self.save_task_result,
363 'outtask': self.save_task_result,
364 'tracktask': self.save_task_destination,
364 'tracktask': self.save_task_destination,
365 'incontrol': _passer,
365 'incontrol': _passer,
366 'outcontrol': _passer,
366 'outcontrol': _passer,
367 'iopub': self.save_iopub_message,
367 'iopub': self.save_iopub_message,
368 }
368 }
369
369
370 self.query_handlers = {'queue_request': self.queue_status,
370 self.query_handlers = {'queue_request': self.queue_status,
371 'result_request': self.get_results,
371 'result_request': self.get_results,
372 'purge_request': self.purge_results,
372 'purge_request': self.purge_results,
373 'load_request': self.check_load,
373 'load_request': self.check_load,
374 'resubmit_request': self.resubmit_task,
374 'resubmit_request': self.resubmit_task,
375 'shutdown_request': self.shutdown_request,
375 'shutdown_request': self.shutdown_request,
376 'registration_request' : self.register_engine,
376 'registration_request' : self.register_engine,
377 'unregistration_request' : self.unregister_engine,
377 'unregistration_request' : self.unregister_engine,
378 'connection_request': self.connection_request,
378 'connection_request': self.connection_request,
379 }
379 }
380
380
381 self.log.info("hub::created hub")
381 self.log.info("hub::created hub")
382
382
383 @property
383 @property
384 def _next_id(self):
384 def _next_id(self):
385 """gemerate a new ID.
385 """gemerate a new ID.
386
386
387 No longer reuse old ids, just count from 0."""
387 No longer reuse old ids, just count from 0."""
388 newid = self._idcounter
388 newid = self._idcounter
389 self._idcounter += 1
389 self._idcounter += 1
390 return newid
390 return newid
391 # newid = 0
391 # newid = 0
392 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
392 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
393 # # print newid, self.ids, self.incoming_registrations
393 # # print newid, self.ids, self.incoming_registrations
394 # while newid in self.ids or newid in incoming:
394 # while newid in self.ids or newid in incoming:
395 # newid += 1
395 # newid += 1
396 # return newid
396 # return newid
397
397
398 #-----------------------------------------------------------------------------
398 #-----------------------------------------------------------------------------
399 # message validation
399 # message validation
400 #-----------------------------------------------------------------------------
400 #-----------------------------------------------------------------------------
401
401
402 def _validate_targets(self, targets):
402 def _validate_targets(self, targets):
403 """turn any valid targets argument into a list of integer ids"""
403 """turn any valid targets argument into a list of integer ids"""
404 if targets is None:
404 if targets is None:
405 # default to all
405 # default to all
406 targets = self.ids
406 targets = self.ids
407
407
408 if isinstance(targets, (int,str,unicode)):
408 if isinstance(targets, (int,str,unicode)):
409 # only one target specified
409 # only one target specified
410 targets = [targets]
410 targets = [targets]
411 _targets = []
411 _targets = []
412 for t in targets:
412 for t in targets:
413 # map raw identities to ids
413 # map raw identities to ids
414 if isinstance(t, (str,unicode)):
414 if isinstance(t, (str,unicode)):
415 t = self.by_ident.get(t, t)
415 t = self.by_ident.get(t, t)
416 _targets.append(t)
416 _targets.append(t)
417 targets = _targets
417 targets = _targets
418 bad_targets = [ t for t in targets if t not in self.ids ]
418 bad_targets = [ t for t in targets if t not in self.ids ]
419 if bad_targets:
419 if bad_targets:
420 raise IndexError("No Such Engine: %r"%bad_targets)
420 raise IndexError("No Such Engine: %r"%bad_targets)
421 if not targets:
421 if not targets:
422 raise IndexError("No Engines Registered")
422 raise IndexError("No Engines Registered")
423 return targets
423 return targets
424
424
425 #-----------------------------------------------------------------------------
425 #-----------------------------------------------------------------------------
426 # dispatch methods (1 per stream)
426 # dispatch methods (1 per stream)
427 #-----------------------------------------------------------------------------
427 #-----------------------------------------------------------------------------
428
428
429 # def dispatch_registration_request(self, msg):
429 # def dispatch_registration_request(self, msg):
430 # """"""
430 # """"""
431 # self.log.debug("registration::dispatch_register_request(%s)"%msg)
431 # self.log.debug("registration::dispatch_register_request(%s)"%msg)
432 # idents,msg = self.session.feed_identities(msg)
432 # idents,msg = self.session.feed_identities(msg)
433 # if not idents:
433 # if not idents:
434 # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
434 # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
435 # return
435 # return
436 # try:
436 # try:
437 # msg = self.session.unpack_message(msg,content=True)
437 # msg = self.session.unpack_message(msg,content=True)
438 # except:
438 # except:
439 # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
439 # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
440 # return
440 # return
441 #
441 #
442 # msg_type = msg['msg_type']
442 # msg_type = msg['msg_type']
443 # content = msg['content']
443 # content = msg['content']
444 #
444 #
445 # handler = self.query_handlers.get(msg_type, None)
445 # handler = self.query_handlers.get(msg_type, None)
446 # if handler is None:
446 # if handler is None:
447 # self.log.error("registration::got bad registration message: %s"%msg)
447 # self.log.error("registration::got bad registration message: %s"%msg)
448 # else:
448 # else:
449 # handler(idents, msg)
449 # handler(idents, msg)
450
450
451 def dispatch_monitor_traffic(self, msg):
451 def dispatch_monitor_traffic(self, msg):
452 """all ME and Task queue messages come through here, as well as
452 """all ME and Task queue messages come through here, as well as
453 IOPub traffic."""
453 IOPub traffic."""
454 self.log.debug("monitor traffic: %s"%msg[:2])
454 self.log.debug("monitor traffic: %s"%msg[:2])
455 switch = msg[0]
455 switch = msg[0]
456 idents, msg = self.session.feed_identities(msg[1:])
456 idents, msg = self.session.feed_identities(msg[1:])
457 if not idents:
457 if not idents:
458 self.log.error("Bad Monitor Message: %s"%msg)
458 self.log.error("Bad Monitor Message: %s"%msg)
459 return
459 return
460 handler = self.monitor_handlers.get(switch, None)
460 handler = self.monitor_handlers.get(switch, None)
461 if handler is not None:
461 if handler is not None:
462 handler(idents, msg)
462 handler(idents, msg)
463 else:
463 else:
464 self.log.error("Invalid monitor topic: %s"%switch)
464 self.log.error("Invalid monitor topic: %s"%switch)
465
465
466
466
467 def dispatch_query(self, msg):
467 def dispatch_query(self, msg):
468 """Route registration requests and queries from clients."""
468 """Route registration requests and queries from clients."""
469 idents, msg = self.session.feed_identities(msg)
469 idents, msg = self.session.feed_identities(msg)
470 if not idents:
470 if not idents:
471 self.log.error("Bad Query Message: %s"%msg)
471 self.log.error("Bad Query Message: %s"%msg)
472 return
472 return
473 client_id = idents[0]
473 client_id = idents[0]
474 try:
474 try:
475 msg = self.session.unpack_message(msg, content=True)
475 msg = self.session.unpack_message(msg, content=True)
476 except:
476 except:
477 content = error.wrap_exception()
477 content = error.wrap_exception()
478 self.log.error("Bad Query Message: %s"%msg, exc_info=True)
478 self.log.error("Bad Query Message: %s"%msg, exc_info=True)
479 self.session.send(self.query, "hub_error", ident=client_id,
479 self.session.send(self.query, "hub_error", ident=client_id,
480 content=content)
480 content=content)
481 return
481 return
482
482
483 # print client_id, header, parent, content
483 # print client_id, header, parent, content
484 #switch on message type:
484 #switch on message type:
485 msg_type = msg['msg_type']
485 msg_type = msg['msg_type']
486 self.log.info("client::client %s requested %s"%(client_id, msg_type))
486 self.log.info("client::client %s requested %s"%(client_id, msg_type))
487 handler = self.query_handlers.get(msg_type, None)
487 handler = self.query_handlers.get(msg_type, None)
488 try:
488 try:
489 assert handler is not None, "Bad Message Type: %s"%msg_type
489 assert handler is not None, "Bad Message Type: %s"%msg_type
490 except:
490 except:
491 content = error.wrap_exception()
491 content = error.wrap_exception()
492 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
492 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
493 self.session.send(self.query, "hub_error", ident=client_id,
493 self.session.send(self.query, "hub_error", ident=client_id,
494 content=content)
494 content=content)
495 return
495 return
496 else:
496 else:
497 handler(idents, msg)
497 handler(idents, msg)
498
498
499 def dispatch_db(self, msg):
499 def dispatch_db(self, msg):
500 """"""
500 """"""
501 raise NotImplementedError
501 raise NotImplementedError
502
502
503 #---------------------------------------------------------------------------
503 #---------------------------------------------------------------------------
504 # handler methods (1 per event)
504 # handler methods (1 per event)
505 #---------------------------------------------------------------------------
505 #---------------------------------------------------------------------------
506
506
507 #----------------------- Heartbeat --------------------------------------
507 #----------------------- Heartbeat --------------------------------------
508
508
509 def handle_new_heart(self, heart):
509 def handle_new_heart(self, heart):
510 """handler to attach to heartbeater.
510 """handler to attach to heartbeater.
511 Called when a new heart starts to beat.
511 Called when a new heart starts to beat.
512 Triggers completion of registration."""
512 Triggers completion of registration."""
513 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
513 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
514 if heart not in self.incoming_registrations:
514 if heart not in self.incoming_registrations:
515 self.log.info("heartbeat::ignoring new heart: %r"%heart)
515 self.log.info("heartbeat::ignoring new heart: %r"%heart)
516 else:
516 else:
517 self.finish_registration(heart)
517 self.finish_registration(heart)
518
518
519
519
520 def handle_heart_failure(self, heart):
520 def handle_heart_failure(self, heart):
521 """handler to attach to heartbeater.
521 """handler to attach to heartbeater.
522 called when a previously registered heart fails to respond to beat request.
522 called when a previously registered heart fails to respond to beat request.
523 triggers unregistration"""
523 triggers unregistration"""
524 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
524 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
525 eid = self.hearts.get(heart, None)
525 eid = self.hearts.get(heart, None)
526 queue = self.engines[eid].queue
526 queue = self.engines[eid].queue
527 if eid is None:
527 if eid is None:
528 self.log.info("heartbeat::ignoring heart failure %r"%heart)
528 self.log.info("heartbeat::ignoring heart failure %r"%heart)
529 else:
529 else:
530 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
530 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
531
531
532 #----------------------- MUX Queue Traffic ------------------------------
532 #----------------------- MUX Queue Traffic ------------------------------
533
533
534 def save_queue_request(self, idents, msg):
534 def save_queue_request(self, idents, msg):
535 if len(idents) < 2:
535 if len(idents) < 2:
536 self.log.error("invalid identity prefix: %s"%idents)
536 self.log.error("invalid identity prefix: %s"%idents)
537 return
537 return
538 queue_id, client_id = idents[:2]
538 queue_id, client_id = idents[:2]
539 try:
539 try:
540 msg = self.session.unpack_message(msg, content=False)
540 msg = self.session.unpack_message(msg, content=False)
541 except:
541 except:
542 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
542 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
543 return
543 return
544
544
545 eid = self.by_ident.get(queue_id, None)
545 eid = self.by_ident.get(queue_id, None)
546 if eid is None:
546 if eid is None:
547 self.log.error("queue::target %r not registered"%queue_id)
547 self.log.error("queue::target %r not registered"%queue_id)
548 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
548 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
549 return
549 return
550
550
551 header = msg['header']
551 header = msg['header']
552 msg_id = header['msg_id']
552 msg_id = header['msg_id']
553 record = init_record(msg)
553 record = init_record(msg)
554 record['engine_uuid'] = queue_id
554 record['engine_uuid'] = queue_id
555 record['client_uuid'] = client_id
555 record['client_uuid'] = client_id
556 record['queue'] = 'mux'
556 record['queue'] = 'mux'
557
557
558 try:
558 try:
559 # it's posible iopub arrived first:
559 # it's posible iopub arrived first:
560 existing = self.db.get_record(msg_id)
560 existing = self.db.get_record(msg_id)
561 for key,evalue in existing.iteritems():
561 for key,evalue in existing.iteritems():
562 rvalue = record[key]
562 rvalue = record[key]
563 if evalue and rvalue and evalue != rvalue:
563 if evalue and rvalue and evalue != rvalue:
564 self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
564 self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
565 elif evalue and not rvalue:
565 elif evalue and not rvalue:
566 record[key] = evalue
566 record[key] = evalue
567 self.db.update_record(msg_id, record)
567 self.db.update_record(msg_id, record)
568 except KeyError:
568 except KeyError:
569 self.db.add_record(msg_id, record)
569 self.db.add_record(msg_id, record)
570
570
571 self.pending.add(msg_id)
571 self.pending.add(msg_id)
572 self.queues[eid].append(msg_id)
572 self.queues[eid].append(msg_id)
573
573
574 def save_queue_result(self, idents, msg):
574 def save_queue_result(self, idents, msg):
575 if len(idents) < 2:
575 if len(idents) < 2:
576 self.log.error("invalid identity prefix: %s"%idents)
576 self.log.error("invalid identity prefix: %s"%idents)
577 return
577 return
578
578
579 client_id, queue_id = idents[:2]
579 client_id, queue_id = idents[:2]
580 try:
580 try:
581 msg = self.session.unpack_message(msg, content=False)
581 msg = self.session.unpack_message(msg, content=False)
582 except:
582 except:
583 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
583 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
584 queue_id,client_id, msg), exc_info=True)
584 queue_id,client_id, msg), exc_info=True)
585 return
585 return
586
586
587 eid = self.by_ident.get(queue_id, None)
587 eid = self.by_ident.get(queue_id, None)
588 if eid is None:
588 if eid is None:
589 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
589 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
590 # self.log.debug("queue:: %s"%msg[2:])
590 # self.log.debug("queue:: %s"%msg[2:])
591 return
591 return
592
592
593 parent = msg['parent_header']
593 parent = msg['parent_header']
594 if not parent:
594 if not parent:
595 return
595 return
596 msg_id = parent['msg_id']
596 msg_id = parent['msg_id']
597 if msg_id in self.pending:
597 if msg_id in self.pending:
598 self.pending.remove(msg_id)
598 self.pending.remove(msg_id)
599 self.all_completed.add(msg_id)
599 self.all_completed.add(msg_id)
600 self.queues[eid].remove(msg_id)
600 self.queues[eid].remove(msg_id)
601 self.completed[eid].append(msg_id)
601 self.completed[eid].append(msg_id)
602 elif msg_id not in self.all_completed:
602 elif msg_id not in self.all_completed:
603 # it could be a result from a dead engine that died before delivering the
603 # it could be a result from a dead engine that died before delivering the
604 # result
604 # result
605 self.log.warn("queue:: unknown msg finished %s"%msg_id)
605 self.log.warn("queue:: unknown msg finished %s"%msg_id)
606 return
606 return
607 # update record anyway, because the unregistration could have been premature
607 # update record anyway, because the unregistration could have been premature
608 rheader = msg['header']
608 rheader = msg['header']
609 completed = datetime.strptime(rheader['date'], ISO8601)
609 completed = datetime.strptime(rheader['date'], ISO8601)
610 started = rheader.get('started', None)
610 started = rheader.get('started', None)
611 if started is not None:
611 if started is not None:
612 started = datetime.strptime(started, ISO8601)
612 started = datetime.strptime(started, ISO8601)
613 result = {
613 result = {
614 'result_header' : rheader,
614 'result_header' : rheader,
615 'result_content': msg['content'],
615 'result_content': msg['content'],
616 'started' : started,
616 'started' : started,
617 'completed' : completed
617 'completed' : completed
618 }
618 }
619
619
620 result['result_buffers'] = msg['buffers']
620 result['result_buffers'] = msg['buffers']
621 self.db.update_record(msg_id, result)
621 self.db.update_record(msg_id, result)
622
622
623
623
624 #--------------------- Task Queue Traffic ------------------------------
624 #--------------------- Task Queue Traffic ------------------------------
625
625
626 def save_task_request(self, idents, msg):
626 def save_task_request(self, idents, msg):
627 """Save the submission of a task."""
627 """Save the submission of a task."""
628 client_id = idents[0]
628 client_id = idents[0]
629
629
630 try:
630 try:
631 msg = self.session.unpack_message(msg, content=False)
631 msg = self.session.unpack_message(msg, content=False)
632 except:
632 except:
633 self.log.error("task::client %r sent invalid task message: %s"%(
633 self.log.error("task::client %r sent invalid task message: %s"%(
634 client_id, msg), exc_info=True)
634 client_id, msg), exc_info=True)
635 return
635 return
636 record = init_record(msg)
636 record = init_record(msg)
637
637
638 record['client_uuid'] = client_id
638 record['client_uuid'] = client_id
639 record['queue'] = 'task'
639 record['queue'] = 'task'
640 header = msg['header']
640 header = msg['header']
641 msg_id = header['msg_id']
641 msg_id = header['msg_id']
642 self.pending.add(msg_id)
642 self.pending.add(msg_id)
643 self.unassigned.add(msg_id)
643 try:
644 try:
644 # it's posible iopub arrived first:
645 # it's posible iopub arrived first:
645 existing = self.db.get_record(msg_id)
646 existing = self.db.get_record(msg_id)
646 for key,evalue in existing.iteritems():
647 for key,evalue in existing.iteritems():
647 rvalue = record[key]
648 rvalue = record[key]
648 if evalue and rvalue and evalue != rvalue:
649 if evalue and rvalue and evalue != rvalue:
649 self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
650 self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
650 elif evalue and not rvalue:
651 elif evalue and not rvalue:
651 record[key] = evalue
652 record[key] = evalue
652 self.db.update_record(msg_id, record)
653 self.db.update_record(msg_id, record)
653 except KeyError:
654 except KeyError:
654 self.db.add_record(msg_id, record)
655 self.db.add_record(msg_id, record)
655
656
656 def save_task_result(self, idents, msg):
657 def save_task_result(self, idents, msg):
657 """save the result of a completed task."""
658 """save the result of a completed task."""
658 client_id = idents[0]
659 client_id = idents[0]
659 try:
660 try:
660 msg = self.session.unpack_message(msg, content=False)
661 msg = self.session.unpack_message(msg, content=False)
661 except:
662 except:
662 self.log.error("task::invalid task result message send to %r: %s"%(
663 self.log.error("task::invalid task result message send to %r: %s"%(
663 client_id, msg), exc_info=True)
664 client_id, msg), exc_info=True)
664 raise
665 raise
665 return
666 return
666
667
667 parent = msg['parent_header']
668 parent = msg['parent_header']
668 if not parent:
669 if not parent:
669 # print msg
670 # print msg
670 self.log.warn("Task %r had no parent!"%msg)
671 self.log.warn("Task %r had no parent!"%msg)
671 return
672 return
672 msg_id = parent['msg_id']
673 msg_id = parent['msg_id']
674 if msg_id in self.unassigned:
675 self.unassigned.remove(msg_id)
673
676
674 header = msg['header']
677 header = msg['header']
675 engine_uuid = header.get('engine', None)
678 engine_uuid = header.get('engine', None)
676 eid = self.by_ident.get(engine_uuid, None)
679 eid = self.by_ident.get(engine_uuid, None)
677
680
678 if msg_id in self.pending:
681 if msg_id in self.pending:
679 self.pending.remove(msg_id)
682 self.pending.remove(msg_id)
680 self.all_completed.add(msg_id)
683 self.all_completed.add(msg_id)
681 if eid is not None:
684 if eid is not None:
682 self.completed[eid].append(msg_id)
685 self.completed[eid].append(msg_id)
683 if msg_id in self.tasks[eid]:
686 if msg_id in self.tasks[eid]:
684 self.tasks[eid].remove(msg_id)
687 self.tasks[eid].remove(msg_id)
685 completed = datetime.strptime(header['date'], ISO8601)
688 completed = datetime.strptime(header['date'], ISO8601)
686 started = header.get('started', None)
689 started = header.get('started', None)
687 if started is not None:
690 if started is not None:
688 started = datetime.strptime(started, ISO8601)
691 started = datetime.strptime(started, ISO8601)
689 result = {
692 result = {
690 'result_header' : header,
693 'result_header' : header,
691 'result_content': msg['content'],
694 'result_content': msg['content'],
692 'started' : started,
695 'started' : started,
693 'completed' : completed,
696 'completed' : completed,
694 'engine_uuid': engine_uuid
697 'engine_uuid': engine_uuid
695 }
698 }
696
699
697 result['result_buffers'] = msg['buffers']
700 result['result_buffers'] = msg['buffers']
698 self.db.update_record(msg_id, result)
701 self.db.update_record(msg_id, result)
699
702
700 else:
703 else:
701 self.log.debug("task::unknown task %s finished"%msg_id)
704 self.log.debug("task::unknown task %s finished"%msg_id)
702
705
703 def save_task_destination(self, idents, msg):
706 def save_task_destination(self, idents, msg):
704 try:
707 try:
705 msg = self.session.unpack_message(msg, content=True)
708 msg = self.session.unpack_message(msg, content=True)
706 except:
709 except:
707 self.log.error("task::invalid task tracking message", exc_info=True)
710 self.log.error("task::invalid task tracking message", exc_info=True)
708 return
711 return
709 content = msg['content']
712 content = msg['content']
710 # print (content)
713 # print (content)
711 msg_id = content['msg_id']
714 msg_id = content['msg_id']
712 engine_uuid = content['engine_id']
715 engine_uuid = content['engine_id']
713 eid = self.by_ident[engine_uuid]
716 eid = self.by_ident[engine_uuid]
714
717
715 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
718 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
716 # if msg_id in self.mia:
719 if msg_id in self.unassigned:
717 # self.mia.remove(msg_id)
720 self.unassigned.remove(msg_id)
718 # else:
721 # else:
719 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
722 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
720
723
721 self.tasks[eid].append(msg_id)
724 self.tasks[eid].append(msg_id)
722 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
725 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
723 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
726 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
724
727
725 def mia_task_request(self, idents, msg):
728 def mia_task_request(self, idents, msg):
726 raise NotImplementedError
729 raise NotImplementedError
727 client_id = idents[0]
730 client_id = idents[0]
728 # content = dict(mia=self.mia,status='ok')
731 # content = dict(mia=self.mia,status='ok')
729 # self.session.send('mia_reply', content=content, idents=client_id)
732 # self.session.send('mia_reply', content=content, idents=client_id)
730
733
731
734
732 #--------------------- IOPub Traffic ------------------------------
735 #--------------------- IOPub Traffic ------------------------------
733
736
734 def save_iopub_message(self, topics, msg):
737 def save_iopub_message(self, topics, msg):
735 """save an iopub message into the db"""
738 """save an iopub message into the db"""
736 # print (topics)
739 # print (topics)
737 try:
740 try:
738 msg = self.session.unpack_message(msg, content=True)
741 msg = self.session.unpack_message(msg, content=True)
739 except:
742 except:
740 self.log.error("iopub::invalid IOPub message", exc_info=True)
743 self.log.error("iopub::invalid IOPub message", exc_info=True)
741 return
744 return
742
745
743 parent = msg['parent_header']
746 parent = msg['parent_header']
744 if not parent:
747 if not parent:
745 self.log.error("iopub::invalid IOPub message: %s"%msg)
748 self.log.error("iopub::invalid IOPub message: %s"%msg)
746 return
749 return
747 msg_id = parent['msg_id']
750 msg_id = parent['msg_id']
748 msg_type = msg['msg_type']
751 msg_type = msg['msg_type']
749 content = msg['content']
752 content = msg['content']
750
753
751 # ensure msg_id is in db
754 # ensure msg_id is in db
752 try:
755 try:
753 rec = self.db.get_record(msg_id)
756 rec = self.db.get_record(msg_id)
754 except KeyError:
757 except KeyError:
755 rec = empty_record()
758 rec = empty_record()
756 rec['msg_id'] = msg_id
759 rec['msg_id'] = msg_id
757 self.db.add_record(msg_id, rec)
760 self.db.add_record(msg_id, rec)
758 # stream
761 # stream
759 d = {}
762 d = {}
760 if msg_type == 'stream':
763 if msg_type == 'stream':
761 name = content['name']
764 name = content['name']
762 s = rec[name] or ''
765 s = rec[name] or ''
763 d[name] = s + content['data']
766 d[name] = s + content['data']
764
767
765 elif msg_type == 'pyerr':
768 elif msg_type == 'pyerr':
766 d['pyerr'] = content
769 d['pyerr'] = content
767 elif msg_type == 'pyin':
770 elif msg_type == 'pyin':
768 d['pyin'] = content['code']
771 d['pyin'] = content['code']
769 else:
772 else:
770 d[msg_type] = content.get('data', '')
773 d[msg_type] = content.get('data', '')
771
774
772 self.db.update_record(msg_id, d)
775 self.db.update_record(msg_id, d)
773
776
774
777
775
778
776 #-------------------------------------------------------------------------
779 #-------------------------------------------------------------------------
777 # Registration requests
780 # Registration requests
778 #-------------------------------------------------------------------------
781 #-------------------------------------------------------------------------
779
782
780 def connection_request(self, client_id, msg):
783 def connection_request(self, client_id, msg):
781 """Reply with connection addresses for clients."""
784 """Reply with connection addresses for clients."""
782 self.log.info("client::client %s connected"%client_id)
785 self.log.info("client::client %s connected"%client_id)
783 content = dict(status='ok')
786 content = dict(status='ok')
784 content.update(self.client_info)
787 content.update(self.client_info)
785 jsonable = {}
788 jsonable = {}
786 for k,v in self.keytable.iteritems():
789 for k,v in self.keytable.iteritems():
787 if v not in self.dead_engines:
790 if v not in self.dead_engines:
788 jsonable[str(k)] = v
791 jsonable[str(k)] = v
789 content['engines'] = jsonable
792 content['engines'] = jsonable
790 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
793 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
791
794
792 def register_engine(self, reg, msg):
795 def register_engine(self, reg, msg):
793 """Register a new engine."""
796 """Register a new engine."""
794 content = msg['content']
797 content = msg['content']
795 try:
798 try:
796 queue = content['queue']
799 queue = content['queue']
797 except KeyError:
800 except KeyError:
798 self.log.error("registration::queue not specified", exc_info=True)
801 self.log.error("registration::queue not specified", exc_info=True)
799 return
802 return
800 heart = content.get('heartbeat', None)
803 heart = content.get('heartbeat', None)
801 """register a new engine, and create the socket(s) necessary"""
804 """register a new engine, and create the socket(s) necessary"""
802 eid = self._next_id
805 eid = self._next_id
803 # print (eid, queue, reg, heart)
806 # print (eid, queue, reg, heart)
804
807
805 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
808 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
806
809
807 content = dict(id=eid,status='ok')
810 content = dict(id=eid,status='ok')
808 content.update(self.engine_info)
811 content.update(self.engine_info)
809 # check if requesting available IDs:
812 # check if requesting available IDs:
810 if queue in self.by_ident:
813 if queue in self.by_ident:
811 try:
814 try:
812 raise KeyError("queue_id %r in use"%queue)
815 raise KeyError("queue_id %r in use"%queue)
813 except:
816 except:
814 content = error.wrap_exception()
817 content = error.wrap_exception()
815 self.log.error("queue_id %r in use"%queue, exc_info=True)
818 self.log.error("queue_id %r in use"%queue, exc_info=True)
816 elif heart in self.hearts: # need to check unique hearts?
819 elif heart in self.hearts: # need to check unique hearts?
817 try:
820 try:
818 raise KeyError("heart_id %r in use"%heart)
821 raise KeyError("heart_id %r in use"%heart)
819 except:
822 except:
820 self.log.error("heart_id %r in use"%heart, exc_info=True)
823 self.log.error("heart_id %r in use"%heart, exc_info=True)
821 content = error.wrap_exception()
824 content = error.wrap_exception()
822 else:
825 else:
823 for h, pack in self.incoming_registrations.iteritems():
826 for h, pack in self.incoming_registrations.iteritems():
824 if heart == h:
827 if heart == h:
825 try:
828 try:
826 raise KeyError("heart_id %r in use"%heart)
829 raise KeyError("heart_id %r in use"%heart)
827 except:
830 except:
828 self.log.error("heart_id %r in use"%heart, exc_info=True)
831 self.log.error("heart_id %r in use"%heart, exc_info=True)
829 content = error.wrap_exception()
832 content = error.wrap_exception()
830 break
833 break
831 elif queue == pack[1]:
834 elif queue == pack[1]:
832 try:
835 try:
833 raise KeyError("queue_id %r in use"%queue)
836 raise KeyError("queue_id %r in use"%queue)
834 except:
837 except:
835 self.log.error("queue_id %r in use"%queue, exc_info=True)
838 self.log.error("queue_id %r in use"%queue, exc_info=True)
836 content = error.wrap_exception()
839 content = error.wrap_exception()
837 break
840 break
838
841
839 msg = self.session.send(self.query, "registration_reply",
842 msg = self.session.send(self.query, "registration_reply",
840 content=content,
843 content=content,
841 ident=reg)
844 ident=reg)
842
845
843 if content['status'] == 'ok':
846 if content['status'] == 'ok':
844 if heart in self.heartmonitor.hearts:
847 if heart in self.heartmonitor.hearts:
845 # already beating
848 # already beating
846 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
849 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
847 self.finish_registration(heart)
850 self.finish_registration(heart)
848 else:
851 else:
849 purge = lambda : self._purge_stalled_registration(heart)
852 purge = lambda : self._purge_stalled_registration(heart)
850 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
853 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
851 dc.start()
854 dc.start()
852 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
855 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
853 else:
856 else:
854 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
857 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
855 return eid
858 return eid
856
859
857 def unregister_engine(self, ident, msg):
860 def unregister_engine(self, ident, msg):
858 """Unregister an engine that explicitly requested to leave."""
861 """Unregister an engine that explicitly requested to leave."""
859 try:
862 try:
860 eid = msg['content']['id']
863 eid = msg['content']['id']
861 except:
864 except:
862 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
865 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
863 return
866 return
864 self.log.info("registration::unregister_engine(%s)"%eid)
867 self.log.info("registration::unregister_engine(%s)"%eid)
865 # print (eid)
868 # print (eid)
866 uuid = self.keytable[eid]
869 uuid = self.keytable[eid]
867 content=dict(id=eid, queue=uuid)
870 content=dict(id=eid, queue=uuid)
868 self.dead_engines.add(uuid)
871 self.dead_engines.add(uuid)
869 # self.ids.remove(eid)
872 # self.ids.remove(eid)
870 # uuid = self.keytable.pop(eid)
873 # uuid = self.keytable.pop(eid)
871 #
874 #
872 # ec = self.engines.pop(eid)
875 # ec = self.engines.pop(eid)
873 # self.hearts.pop(ec.heartbeat)
876 # self.hearts.pop(ec.heartbeat)
874 # self.by_ident.pop(ec.queue)
877 # self.by_ident.pop(ec.queue)
875 # self.completed.pop(eid)
878 # self.completed.pop(eid)
876 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
879 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
877 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
880 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
878 dc.start()
881 dc.start()
879 ############## TODO: HANDLE IT ################
882 ############## TODO: HANDLE IT ################
880
883
881 if self.notifier:
884 if self.notifier:
882 self.session.send(self.notifier, "unregistration_notification", content=content)
885 self.session.send(self.notifier, "unregistration_notification", content=content)
883
886
884 def _handle_stranded_msgs(self, eid, uuid):
887 def _handle_stranded_msgs(self, eid, uuid):
885 """Handle messages known to be on an engine when the engine unregisters.
888 """Handle messages known to be on an engine when the engine unregisters.
886
889
887 It is possible that this will fire prematurely - that is, an engine will
890 It is possible that this will fire prematurely - that is, an engine will
888 go down after completing a result, and the client will be notified
891 go down after completing a result, and the client will be notified
889 that the result failed and later receive the actual result.
892 that the result failed and later receive the actual result.
890 """
893 """
891
894
892 outstanding = self.queues[eid]
895 outstanding = self.queues[eid]
893
896
894 for msg_id in outstanding:
897 for msg_id in outstanding:
895 self.pending.remove(msg_id)
898 self.pending.remove(msg_id)
896 self.all_completed.add(msg_id)
899 self.all_completed.add(msg_id)
897 try:
900 try:
898 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
901 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
899 except:
902 except:
900 content = error.wrap_exception()
903 content = error.wrap_exception()
901 # build a fake header:
904 # build a fake header:
902 header = {}
905 header = {}
903 header['engine'] = uuid
906 header['engine'] = uuid
904 header['date'] = datetime.now().strftime(ISO8601)
907 header['date'] = datetime.now().strftime(ISO8601)
905 rec = dict(result_content=content, result_header=header, result_buffers=[])
908 rec = dict(result_content=content, result_header=header, result_buffers=[])
906 rec['completed'] = header['date']
909 rec['completed'] = header['date']
907 rec['engine_uuid'] = uuid
910 rec['engine_uuid'] = uuid
908 self.db.update_record(msg_id, rec)
911 self.db.update_record(msg_id, rec)
909
912
910 def finish_registration(self, heart):
913 def finish_registration(self, heart):
911 """Second half of engine registration, called after our HeartMonitor
914 """Second half of engine registration, called after our HeartMonitor
912 has received a beat from the Engine's Heart."""
915 has received a beat from the Engine's Heart."""
913 try:
916 try:
914 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
917 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
915 except KeyError:
918 except KeyError:
916 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
919 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
917 return
920 return
918 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
921 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
919 if purge is not None:
922 if purge is not None:
920 purge.stop()
923 purge.stop()
921 control = queue
924 control = queue
922 self.ids.add(eid)
925 self.ids.add(eid)
923 self.keytable[eid] = queue
926 self.keytable[eid] = queue
924 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
927 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
925 control=control, heartbeat=heart)
928 control=control, heartbeat=heart)
926 self.by_ident[queue] = eid
929 self.by_ident[queue] = eid
927 self.queues[eid] = list()
930 self.queues[eid] = list()
928 self.tasks[eid] = list()
931 self.tasks[eid] = list()
929 self.completed[eid] = list()
932 self.completed[eid] = list()
930 self.hearts[heart] = eid
933 self.hearts[heart] = eid
931 content = dict(id=eid, queue=self.engines[eid].queue)
934 content = dict(id=eid, queue=self.engines[eid].queue)
932 if self.notifier:
935 if self.notifier:
933 self.session.send(self.notifier, "registration_notification", content=content)
936 self.session.send(self.notifier, "registration_notification", content=content)
934 self.log.info("engine::Engine Connected: %i"%eid)
937 self.log.info("engine::Engine Connected: %i"%eid)
935
938
936 def _purge_stalled_registration(self, heart):
939 def _purge_stalled_registration(self, heart):
937 if heart in self.incoming_registrations:
940 if heart in self.incoming_registrations:
938 eid = self.incoming_registrations.pop(heart)[0]
941 eid = self.incoming_registrations.pop(heart)[0]
939 self.log.info("registration::purging stalled registration: %i"%eid)
942 self.log.info("registration::purging stalled registration: %i"%eid)
940 else:
943 else:
941 pass
944 pass
942
945
943 #-------------------------------------------------------------------------
946 #-------------------------------------------------------------------------
944 # Client Requests
947 # Client Requests
945 #-------------------------------------------------------------------------
948 #-------------------------------------------------------------------------
946
949
947 def shutdown_request(self, client_id, msg):
950 def shutdown_request(self, client_id, msg):
948 """handle shutdown request."""
951 """handle shutdown request."""
949 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
952 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
950 # also notify other clients of shutdown
953 # also notify other clients of shutdown
951 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
954 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
952 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
955 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
953 dc.start()
956 dc.start()
954
957
955 def _shutdown(self):
958 def _shutdown(self):
956 self.log.info("hub::hub shutting down.")
959 self.log.info("hub::hub shutting down.")
957 time.sleep(0.1)
960 time.sleep(0.1)
958 sys.exit(0)
961 sys.exit(0)
959
962
960
963
961 def check_load(self, client_id, msg):
964 def check_load(self, client_id, msg):
962 content = msg['content']
965 content = msg['content']
963 try:
966 try:
964 targets = content['targets']
967 targets = content['targets']
965 targets = self._validate_targets(targets)
968 targets = self._validate_targets(targets)
966 except:
969 except:
967 content = error.wrap_exception()
970 content = error.wrap_exception()
968 self.session.send(self.query, "hub_error",
971 self.session.send(self.query, "hub_error",
969 content=content, ident=client_id)
972 content=content, ident=client_id)
970 return
973 return
971
974
972 content = dict(status='ok')
975 content = dict(status='ok')
973 # loads = {}
976 # loads = {}
974 for t in targets:
977 for t in targets:
975 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
978 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
976 self.session.send(self.query, "load_reply", content=content, ident=client_id)
979 self.session.send(self.query, "load_reply", content=content, ident=client_id)
977
980
978
981
979 def queue_status(self, client_id, msg):
982 def queue_status(self, client_id, msg):
980 """Return the Queue status of one or more targets.
983 """Return the Queue status of one or more targets.
981 if verbose: return the msg_ids
984 if verbose: return the msg_ids
982 else: return len of each type.
985 else: return len of each type.
983 keys: queue (pending MUX jobs)
986 keys: queue (pending MUX jobs)
984 tasks (pending Task jobs)
987 tasks (pending Task jobs)
985 completed (finished jobs from both queues)"""
988 completed (finished jobs from both queues)"""
986 content = msg['content']
989 content = msg['content']
987 targets = content['targets']
990 targets = content['targets']
988 try:
991 try:
989 targets = self._validate_targets(targets)
992 targets = self._validate_targets(targets)
990 except:
993 except:
991 content = error.wrap_exception()
994 content = error.wrap_exception()
992 self.session.send(self.query, "hub_error",
995 self.session.send(self.query, "hub_error",
993 content=content, ident=client_id)
996 content=content, ident=client_id)
994 return
997 return
995 verbose = content.get('verbose', False)
998 verbose = content.get('verbose', False)
996 content = dict(status='ok')
999 content = dict(status='ok')
997 for t in targets:
1000 for t in targets:
998 queue = self.queues[t]
1001 queue = self.queues[t]
999 completed = self.completed[t]
1002 completed = self.completed[t]
1000 tasks = self.tasks[t]
1003 tasks = self.tasks[t]
1001 if not verbose:
1004 if not verbose:
1002 queue = len(queue)
1005 queue = len(queue)
1003 completed = len(completed)
1006 completed = len(completed)
1004 tasks = len(tasks)
1007 tasks = len(tasks)
1005 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1008 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1006 # pending
1009 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1010
1007 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1011 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1008
1012
1009 def purge_results(self, client_id, msg):
1013 def purge_results(self, client_id, msg):
1010 """Purge results from memory. This method is more valuable before we move
1014 """Purge results from memory. This method is more valuable before we move
1011 to a DB based message storage mechanism."""
1015 to a DB based message storage mechanism."""
1012 content = msg['content']
1016 content = msg['content']
1013 msg_ids = content.get('msg_ids', [])
1017 msg_ids = content.get('msg_ids', [])
1014 reply = dict(status='ok')
1018 reply = dict(status='ok')
1015 if msg_ids == 'all':
1019 if msg_ids == 'all':
1016 self.db.drop_matching_records(dict(completed={'$ne':None}))
1020 self.db.drop_matching_records(dict(completed={'$ne':None}))
1017 else:
1021 else:
1018 for msg_id in msg_ids:
1022 for msg_id in msg_ids:
1019 if msg_id in self.all_completed:
1023 if msg_id in self.all_completed:
1020 self.db.drop_record(msg_id)
1024 self.db.drop_record(msg_id)
1021 else:
1025 else:
1022 if msg_id in self.pending:
1026 if msg_id in self.pending:
1023 try:
1027 try:
1024 raise IndexError("msg pending: %r"%msg_id)
1028 raise IndexError("msg pending: %r"%msg_id)
1025 except:
1029 except:
1026 reply = error.wrap_exception()
1030 reply = error.wrap_exception()
1027 else:
1031 else:
1028 try:
1032 try:
1029 raise IndexError("No such msg: %r"%msg_id)
1033 raise IndexError("No such msg: %r"%msg_id)
1030 except:
1034 except:
1031 reply = error.wrap_exception()
1035 reply = error.wrap_exception()
1032 break
1036 break
1033 eids = content.get('engine_ids', [])
1037 eids = content.get('engine_ids', [])
1034 for eid in eids:
1038 for eid in eids:
1035 if eid not in self.engines:
1039 if eid not in self.engines:
1036 try:
1040 try:
1037 raise IndexError("No such engine: %i"%eid)
1041 raise IndexError("No such engine: %i"%eid)
1038 except:
1042 except:
1039 reply = error.wrap_exception()
1043 reply = error.wrap_exception()
1040 break
1044 break
1041 msg_ids = self.completed.pop(eid)
1045 msg_ids = self.completed.pop(eid)
1042 uid = self.engines[eid].queue
1046 uid = self.engines[eid].queue
1043 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1047 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1044
1048
1045 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1049 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1046
1050
1047 def resubmit_task(self, client_id, msg, buffers):
1051 def resubmit_task(self, client_id, msg, buffers):
1048 """Resubmit a task."""
1052 """Resubmit a task."""
1049 raise NotImplementedError
1053 raise NotImplementedError
1050
1054
1051 def get_results(self, client_id, msg):
1055 def get_results(self, client_id, msg):
1052 """Get the result of 1 or more messages."""
1056 """Get the result of 1 or more messages."""
1053 content = msg['content']
1057 content = msg['content']
1054 msg_ids = sorted(set(content['msg_ids']))
1058 msg_ids = sorted(set(content['msg_ids']))
1055 statusonly = content.get('status_only', False)
1059 statusonly = content.get('status_only', False)
1056 pending = []
1060 pending = []
1057 completed = []
1061 completed = []
1058 content = dict(status='ok')
1062 content = dict(status='ok')
1059 content['pending'] = pending
1063 content['pending'] = pending
1060 content['completed'] = completed
1064 content['completed'] = completed
1061 buffers = []
1065 buffers = []
1062 if not statusonly:
1066 if not statusonly:
1063 content['results'] = {}
1067 content['results'] = {}
1064 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1068 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1065 for msg_id in msg_ids:
1069 for msg_id in msg_ids:
1066 if msg_id in self.pending:
1070 if msg_id in self.pending:
1067 pending.append(msg_id)
1071 pending.append(msg_id)
1068 elif msg_id in self.all_completed:
1072 elif msg_id in self.all_completed:
1069 completed.append(msg_id)
1073 completed.append(msg_id)
1070 if not statusonly:
1074 if not statusonly:
1071 rec = records[msg_id]
1075 rec = records[msg_id]
1072 io_dict = {}
1076 io_dict = {}
1073 for key in 'pyin pyout pyerr stdout stderr'.split():
1077 for key in 'pyin pyout pyerr stdout stderr'.split():
1074 io_dict[key] = rec[key]
1078 io_dict[key] = rec[key]
1075 content[msg_id] = { 'result_content': rec['result_content'],
1079 content[msg_id] = { 'result_content': rec['result_content'],
1076 'header': rec['header'],
1080 'header': rec['header'],
1077 'result_header' : rec['result_header'],
1081 'result_header' : rec['result_header'],
1078 'io' : io_dict,
1082 'io' : io_dict,
1079 }
1083 }
1080 if rec['result_buffers']:
1084 if rec['result_buffers']:
1081 buffers.extend(map(str, rec['result_buffers']))
1085 buffers.extend(map(str, rec['result_buffers']))
1082 else:
1086 else:
1083 try:
1087 try:
1084 raise KeyError('No such message: '+msg_id)
1088 raise KeyError('No such message: '+msg_id)
1085 except:
1089 except:
1086 content = error.wrap_exception()
1090 content = error.wrap_exception()
1087 break
1091 break
1088 self.session.send(self.query, "result_reply", content=content,
1092 self.session.send(self.query, "result_reply", content=content,
1089 parent=msg, ident=client_id,
1093 parent=msg, ident=client_id,
1090 buffers=buffers)
1094 buffers=buffers)
1091
1095
@@ -1,430 +1,431 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2011 The IPython Development Team
6 # Copyright (C) 2010-2011 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
9 # the file COPYING, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 # Standard library imports.
16 # Standard library imports.
17 from __future__ import print_function
17 from __future__ import print_function
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 from code import CommandCompiler
22 from code import CommandCompiler
23 from datetime import datetime
23 from datetime import datetime
24 from pprint import pprint
24 from pprint import pprint
25
25
26 # System library imports.
26 # System library imports.
27 import zmq
27 import zmq
28 from zmq.eventloop import ioloop, zmqstream
28 from zmq.eventloop import ioloop, zmqstream
29
29
30 # Local imports.
30 # Local imports.
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str
32 from IPython.zmq.completer import KernelCompleter
32 from IPython.zmq.completer import KernelCompleter
33
33
34 from IPython.parallel.error import wrap_exception
34 from IPython.parallel.error import wrap_exception
35 from IPython.parallel.factory import SessionFactory
35 from IPython.parallel.factory import SessionFactory
36 from IPython.parallel.util import serialize_object, unpack_apply_message, ISO8601
36 from IPython.parallel.util import serialize_object, unpack_apply_message, ISO8601
37
37
38 def printer(*args):
38 def printer(*args):
39 pprint(args, stream=sys.__stdout__)
39 pprint(args, stream=sys.__stdout__)
40
40
41
41
42 class _Passer(zmqstream.ZMQStream):
42 class _Passer(zmqstream.ZMQStream):
43 """Empty class that implements `send()` that does nothing.
43 """Empty class that implements `send()` that does nothing.
44
44
45 Subclass ZMQStream for StreamSession typechecking
45 Subclass ZMQStream for StreamSession typechecking
46
46
47 """
47 """
48 def __init__(self, *args, **kwargs):
48 def __init__(self, *args, **kwargs):
49 pass
49 pass
50
50
51 def send(self, *args, **kwargs):
51 def send(self, *args, **kwargs):
52 pass
52 pass
53 send_multipart = send
53 send_multipart = send
54
54
55
55
56 #-----------------------------------------------------------------------------
56 #-----------------------------------------------------------------------------
57 # Main kernel class
57 # Main kernel class
58 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
59
59
60 class Kernel(SessionFactory):
60 class Kernel(SessionFactory):
61
61
62 #---------------------------------------------------------------------------
62 #---------------------------------------------------------------------------
63 # Kernel interface
63 # Kernel interface
64 #---------------------------------------------------------------------------
64 #---------------------------------------------------------------------------
65
65
66 # kwargs:
66 # kwargs:
67 int_id = Int(-1, config=True)
67 int_id = Int(-1, config=True)
68 user_ns = Dict(config=True)
68 user_ns = Dict(config=True)
69 exec_lines = List(config=True)
69 exec_lines = List(config=True)
70
70
71 control_stream = Instance(zmqstream.ZMQStream)
71 control_stream = Instance(zmqstream.ZMQStream)
72 task_stream = Instance(zmqstream.ZMQStream)
72 task_stream = Instance(zmqstream.ZMQStream)
73 iopub_stream = Instance(zmqstream.ZMQStream)
73 iopub_stream = Instance(zmqstream.ZMQStream)
74 client = Instance('IPython.parallel.Client')
74 client = Instance('IPython.parallel.Client')
75
75
76 # internals
76 # internals
77 shell_streams = List()
77 shell_streams = List()
78 compiler = Instance(CommandCompiler, (), {})
78 compiler = Instance(CommandCompiler, (), {})
79 completer = Instance(KernelCompleter)
79 completer = Instance(KernelCompleter)
80
80
81 aborted = Set()
81 aborted = Set()
82 shell_handlers = Dict()
82 shell_handlers = Dict()
83 control_handlers = Dict()
83 control_handlers = Dict()
84
84
85 def _set_prefix(self):
85 def _set_prefix(self):
86 self.prefix = "engine.%s"%self.int_id
86 self.prefix = "engine.%s"%self.int_id
87
87
88 def _connect_completer(self):
88 def _connect_completer(self):
89 self.completer = KernelCompleter(self.user_ns)
89 self.completer = KernelCompleter(self.user_ns)
90
90
91 def __init__(self, **kwargs):
91 def __init__(self, **kwargs):
92 super(Kernel, self).__init__(**kwargs)
92 super(Kernel, self).__init__(**kwargs)
93 self._set_prefix()
93 self._set_prefix()
94 self._connect_completer()
94 self._connect_completer()
95
95
96 self.on_trait_change(self._set_prefix, 'id')
96 self.on_trait_change(self._set_prefix, 'id')
97 self.on_trait_change(self._connect_completer, 'user_ns')
97 self.on_trait_change(self._connect_completer, 'user_ns')
98
98
99 # Build dict of handlers for message types
99 # Build dict of handlers for message types
100 for msg_type in ['execute_request', 'complete_request', 'apply_request',
100 for msg_type in ['execute_request', 'complete_request', 'apply_request',
101 'clear_request']:
101 'clear_request']:
102 self.shell_handlers[msg_type] = getattr(self, msg_type)
102 self.shell_handlers[msg_type] = getattr(self, msg_type)
103
103
104 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
104 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
105 self.control_handlers[msg_type] = getattr(self, msg_type)
105 self.control_handlers[msg_type] = getattr(self, msg_type)
106
106
107 self._initial_exec_lines()
107 self._initial_exec_lines()
108
108
109 def _wrap_exception(self, method=None):
109 def _wrap_exception(self, method=None):
110 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
110 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
111 content=wrap_exception(e_info)
111 content=wrap_exception(e_info)
112 return content
112 return content
113
113
114 def _initial_exec_lines(self):
114 def _initial_exec_lines(self):
115 s = _Passer()
115 s = _Passer()
116 content = dict(silent=True, user_variable=[],user_expressions=[])
116 content = dict(silent=True, user_variable=[],user_expressions=[])
117 for line in self.exec_lines:
117 for line in self.exec_lines:
118 self.log.debug("executing initialization: %s"%line)
118 self.log.debug("executing initialization: %s"%line)
119 content.update({'code':line})
119 content.update({'code':line})
120 msg = self.session.msg('execute_request', content)
120 msg = self.session.msg('execute_request', content)
121 self.execute_request(s, [], msg)
121 self.execute_request(s, [], msg)
122
122
123
123
124 #-------------------- control handlers -----------------------------
124 #-------------------- control handlers -----------------------------
125 def abort_queues(self):
125 def abort_queues(self):
126 for stream in self.shell_streams:
126 for stream in self.shell_streams:
127 if stream:
127 if stream:
128 self.abort_queue(stream)
128 self.abort_queue(stream)
129
129
130 def abort_queue(self, stream):
130 def abort_queue(self, stream):
131 while True:
131 while True:
132 try:
132 try:
133 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
133 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
134 except zmq.ZMQError as e:
134 except zmq.ZMQError as e:
135 if e.errno == zmq.EAGAIN:
135 if e.errno == zmq.EAGAIN:
136 break
136 break
137 else:
137 else:
138 return
138 return
139 else:
139 else:
140 if msg is None:
140 if msg is None:
141 return
141 return
142 else:
142 else:
143 idents,msg = msg
143 idents,msg = msg
144
144
145 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
145 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
146 # msg = self.reply_socket.recv_json()
146 # msg = self.reply_socket.recv_json()
147 self.log.info("Aborting:")
147 self.log.info("Aborting:")
148 self.log.info(str(msg))
148 self.log.info(str(msg))
149 msg_type = msg['msg_type']
149 msg_type = msg['msg_type']
150 reply_type = msg_type.split('_')[0] + '_reply'
150 reply_type = msg_type.split('_')[0] + '_reply'
151 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
151 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
152 # self.reply_socket.send(ident,zmq.SNDMORE)
152 # self.reply_socket.send(ident,zmq.SNDMORE)
153 # self.reply_socket.send_json(reply_msg)
153 # self.reply_socket.send_json(reply_msg)
154 reply_msg = self.session.send(stream, reply_type,
154 reply_msg = self.session.send(stream, reply_type,
155 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
155 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
156 self.log.debug(str(reply_msg))
156 self.log.debug(str(reply_msg))
157 # We need to wait a bit for requests to come in. This can probably
157 # We need to wait a bit for requests to come in. This can probably
158 # be set shorter for true asynchronous clients.
158 # be set shorter for true asynchronous clients.
159 time.sleep(0.05)
159 time.sleep(0.05)
160
160
161 def abort_request(self, stream, ident, parent):
161 def abort_request(self, stream, ident, parent):
162 """abort a specifig msg by id"""
162 """abort a specifig msg by id"""
163 msg_ids = parent['content'].get('msg_ids', None)
163 msg_ids = parent['content'].get('msg_ids', None)
164 if isinstance(msg_ids, basestring):
164 if isinstance(msg_ids, basestring):
165 msg_ids = [msg_ids]
165 msg_ids = [msg_ids]
166 if not msg_ids:
166 if not msg_ids:
167 self.abort_queues()
167 self.abort_queues()
168 for mid in msg_ids:
168 for mid in msg_ids:
169 self.aborted.add(str(mid))
169 self.aborted.add(str(mid))
170
170
171 content = dict(status='ok')
171 content = dict(status='ok')
172 reply_msg = self.session.send(stream, 'abort_reply', content=content,
172 reply_msg = self.session.send(stream, 'abort_reply', content=content,
173 parent=parent, ident=ident)
173 parent=parent, ident=ident)
174 self.log.debug(str(reply_msg))
174 self.log.debug(str(reply_msg))
175
175
176 def shutdown_request(self, stream, ident, parent):
176 def shutdown_request(self, stream, ident, parent):
177 """kill ourself. This should really be handled in an external process"""
177 """kill ourself. This should really be handled in an external process"""
178 try:
178 try:
179 self.abort_queues()
179 self.abort_queues()
180 except:
180 except:
181 content = self._wrap_exception('shutdown')
181 content = self._wrap_exception('shutdown')
182 else:
182 else:
183 content = dict(parent['content'])
183 content = dict(parent['content'])
184 content['status'] = 'ok'
184 content['status'] = 'ok'
185 msg = self.session.send(stream, 'shutdown_reply',
185 msg = self.session.send(stream, 'shutdown_reply',
186 content=content, parent=parent, ident=ident)
186 content=content, parent=parent, ident=ident)
187 self.log.debug(str(msg))
187 self.log.debug(str(msg))
188 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
188 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
189 dc.start()
189 dc.start()
190
190
191 def dispatch_control(self, msg):
191 def dispatch_control(self, msg):
192 idents,msg = self.session.feed_identities(msg, copy=False)
192 idents,msg = self.session.feed_identities(msg, copy=False)
193 try:
193 try:
194 msg = self.session.unpack_message(msg, content=True, copy=False)
194 msg = self.session.unpack_message(msg, content=True, copy=False)
195 except:
195 except:
196 self.log.error("Invalid Message", exc_info=True)
196 self.log.error("Invalid Message", exc_info=True)
197 return
197 return
198
198
199 header = msg['header']
199 header = msg['header']
200 msg_id = header['msg_id']
200 msg_id = header['msg_id']
201
201
202 handler = self.control_handlers.get(msg['msg_type'], None)
202 handler = self.control_handlers.get(msg['msg_type'], None)
203 if handler is None:
203 if handler is None:
204 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
204 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
205 else:
205 else:
206 handler(self.control_stream, idents, msg)
206 handler(self.control_stream, idents, msg)
207
207
208
208
209 #-------------------- queue helpers ------------------------------
209 #-------------------- queue helpers ------------------------------
210
210
211 def check_dependencies(self, dependencies):
211 def check_dependencies(self, dependencies):
212 if not dependencies:
212 if not dependencies:
213 return True
213 return True
214 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
214 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
215 anyorall = dependencies[0]
215 anyorall = dependencies[0]
216 dependencies = dependencies[1]
216 dependencies = dependencies[1]
217 else:
217 else:
218 anyorall = 'all'
218 anyorall = 'all'
219 results = self.client.get_results(dependencies,status_only=True)
219 results = self.client.get_results(dependencies,status_only=True)
220 if results['status'] != 'ok':
220 if results['status'] != 'ok':
221 return False
221 return False
222
222
223 if anyorall == 'any':
223 if anyorall == 'any':
224 if not results['completed']:
224 if not results['completed']:
225 return False
225 return False
226 else:
226 else:
227 if results['pending']:
227 if results['pending']:
228 return False
228 return False
229
229
230 return True
230 return True
231
231
232 def check_aborted(self, msg_id):
232 def check_aborted(self, msg_id):
233 return msg_id in self.aborted
233 return msg_id in self.aborted
234
234
235 #-------------------- queue handlers -----------------------------
235 #-------------------- queue handlers -----------------------------
236
236
237 def clear_request(self, stream, idents, parent):
237 def clear_request(self, stream, idents, parent):
238 """Clear our namespace."""
238 """Clear our namespace."""
239 self.user_ns = {}
239 self.user_ns = {}
240 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
240 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
241 content = dict(status='ok'))
241 content = dict(status='ok'))
242 self._initial_exec_lines()
242 self._initial_exec_lines()
243
243
244 def execute_request(self, stream, ident, parent):
244 def execute_request(self, stream, ident, parent):
245 self.log.debug('execute request %s'%parent)
245 self.log.debug('execute request %s'%parent)
246 try:
246 try:
247 code = parent[u'content'][u'code']
247 code = parent[u'content'][u'code']
248 except:
248 except:
249 self.log.error("Got bad msg: %s"%parent, exc_info=True)
249 self.log.error("Got bad msg: %s"%parent, exc_info=True)
250 return
250 return
251 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
251 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
252 ident='%s.pyin'%self.prefix)
252 ident='%s.pyin'%self.prefix)
253 started = datetime.now().strftime(ISO8601)
253 started = datetime.now().strftime(ISO8601)
254 try:
254 try:
255 comp_code = self.compiler(code, '<zmq-kernel>')
255 comp_code = self.compiler(code, '<zmq-kernel>')
256 # allow for not overriding displayhook
256 # allow for not overriding displayhook
257 if hasattr(sys.displayhook, 'set_parent'):
257 if hasattr(sys.displayhook, 'set_parent'):
258 sys.displayhook.set_parent(parent)
258 sys.displayhook.set_parent(parent)
259 sys.stdout.set_parent(parent)
259 sys.stdout.set_parent(parent)
260 sys.stderr.set_parent(parent)
260 sys.stderr.set_parent(parent)
261 exec comp_code in self.user_ns, self.user_ns
261 exec comp_code in self.user_ns, self.user_ns
262 except:
262 except:
263 exc_content = self._wrap_exception('execute')
263 exc_content = self._wrap_exception('execute')
264 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
264 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
265 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
265 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
266 ident='%s.pyerr'%self.prefix)
266 ident='%s.pyerr'%self.prefix)
267 reply_content = exc_content
267 reply_content = exc_content
268 else:
268 else:
269 reply_content = {'status' : 'ok'}
269 reply_content = {'status' : 'ok'}
270
270
271 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
271 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
272 ident=ident, subheader = dict(started=started))
272 ident=ident, subheader = dict(started=started))
273 self.log.debug(str(reply_msg))
273 self.log.debug(str(reply_msg))
274 if reply_msg['content']['status'] == u'error':
274 if reply_msg['content']['status'] == u'error':
275 self.abort_queues()
275 self.abort_queues()
276
276
277 def complete_request(self, stream, ident, parent):
277 def complete_request(self, stream, ident, parent):
278 matches = {'matches' : self.complete(parent),
278 matches = {'matches' : self.complete(parent),
279 'status' : 'ok'}
279 'status' : 'ok'}
280 completion_msg = self.session.send(stream, 'complete_reply',
280 completion_msg = self.session.send(stream, 'complete_reply',
281 matches, parent, ident)
281 matches, parent, ident)
282 # print >> sys.__stdout__, completion_msg
282 # print >> sys.__stdout__, completion_msg
283
283
284 def complete(self, msg):
284 def complete(self, msg):
285 return self.completer.complete(msg.content.line, msg.content.text)
285 return self.completer.complete(msg.content.line, msg.content.text)
286
286
287 def apply_request(self, stream, ident, parent):
287 def apply_request(self, stream, ident, parent):
288 # flush previous reply, so this request won't block it
288 # flush previous reply, so this request won't block it
289 stream.flush(zmq.POLLOUT)
289 stream.flush(zmq.POLLOUT)
290
290
291 try:
291 try:
292 content = parent[u'content']
292 content = parent[u'content']
293 bufs = parent[u'buffers']
293 bufs = parent[u'buffers']
294 msg_id = parent['header']['msg_id']
294 msg_id = parent['header']['msg_id']
295 # bound = parent['header'].get('bound', False)
295 # bound = parent['header'].get('bound', False)
296 except:
296 except:
297 self.log.error("Got bad msg: %s"%parent, exc_info=True)
297 self.log.error("Got bad msg: %s"%parent, exc_info=True)
298 return
298 return
299 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
299 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
300 # self.iopub_stream.send(pyin_msg)
300 # self.iopub_stream.send(pyin_msg)
301 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
301 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
302 sub = {'dependencies_met' : True, 'engine' : self.ident,
302 sub = {'dependencies_met' : True, 'engine' : self.ident,
303 'started': datetime.now().strftime(ISO8601)}
303 'started': datetime.now().strftime(ISO8601)}
304 try:
304 try:
305 # allow for not overriding displayhook
305 # allow for not overriding displayhook
306 if hasattr(sys.displayhook, 'set_parent'):
306 if hasattr(sys.displayhook, 'set_parent'):
307 sys.displayhook.set_parent(parent)
307 sys.displayhook.set_parent(parent)
308 sys.stdout.set_parent(parent)
308 sys.stdout.set_parent(parent)
309 sys.stderr.set_parent(parent)
309 sys.stderr.set_parent(parent)
310 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
310 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
311 working = self.user_ns
311 working = self.user_ns
312 # suffix =
312 # suffix =
313 prefix = "_"+str(msg_id).replace("-","")+"_"
313 prefix = "_"+str(msg_id).replace("-","")+"_"
314
314
315 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
315 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
316 # if bound:
316 # if bound:
317 # bound_ns = Namespace(working)
317 # bound_ns = Namespace(working)
318 # args = [bound_ns]+list(args)
318 # args = [bound_ns]+list(args)
319
319
320 fname = getattr(f, '__name__', 'f')
320 fname = getattr(f, '__name__', 'f')
321
321
322 fname = prefix+"f"
322 fname = prefix+"f"
323 argname = prefix+"args"
323 argname = prefix+"args"
324 kwargname = prefix+"kwargs"
324 kwargname = prefix+"kwargs"
325 resultname = prefix+"result"
325 resultname = prefix+"result"
326
326
327 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
327 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
328 # print ns
328 # print ns
329 working.update(ns)
329 working.update(ns)
330 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
330 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
331 try:
331 try:
332 exec code in working,working
332 exec code in working,working
333 result = working.get(resultname)
333 result = working.get(resultname)
334 finally:
334 finally:
335 for key in ns.iterkeys():
335 for key in ns.iterkeys():
336 working.pop(key)
336 working.pop(key)
337 # if bound:
337 # if bound:
338 # working.update(bound_ns)
338 # working.update(bound_ns)
339
339
340 packed_result,buf = serialize_object(result)
340 packed_result,buf = serialize_object(result)
341 result_buf = [packed_result]+buf
341 result_buf = [packed_result]+buf
342 except:
342 except:
343 exc_content = self._wrap_exception('apply')
343 exc_content = self._wrap_exception('apply')
344 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
344 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
345 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
345 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
346 ident='%s.pyerr'%self.prefix)
346 ident='%s.pyerr'%self.prefix)
347 reply_content = exc_content
347 reply_content = exc_content
348 result_buf = []
348 result_buf = []
349
349
350 if exc_content['ename'] == 'UnmetDependency':
350 if exc_content['ename'] == 'UnmetDependency':
351 sub['dependencies_met'] = False
351 sub['dependencies_met'] = False
352 else:
352 else:
353 reply_content = {'status' : 'ok'}
353 reply_content = {'status' : 'ok'}
354
354
355 # put 'ok'/'error' status in header, for scheduler introspection:
355 # put 'ok'/'error' status in header, for scheduler introspection:
356 sub['status'] = reply_content['status']
356 sub['status'] = reply_content['status']
357
357
358 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
358 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
359 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
359 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
360
360
361 # flush i/o
361 # flush i/o
362 # should this be before reply_msg is sent, like in the single-kernel code,
362 # should this be before reply_msg is sent, like in the single-kernel code,
363 # or should nothing get in the way of real results?
363 # or should nothing get in the way of real results?
364 sys.stdout.flush()
364 sys.stdout.flush()
365 sys.stderr.flush()
365 sys.stderr.flush()
366
366
367 def dispatch_queue(self, stream, msg):
367 def dispatch_queue(self, stream, msg):
368 self.control_stream.flush()
368 self.control_stream.flush()
369 idents,msg = self.session.feed_identities(msg, copy=False)
369 idents,msg = self.session.feed_identities(msg, copy=False)
370 try:
370 try:
371 msg = self.session.unpack_message(msg, content=True, copy=False)
371 msg = self.session.unpack_message(msg, content=True, copy=False)
372 except:
372 except:
373 self.log.error("Invalid Message", exc_info=True)
373 self.log.error("Invalid Message", exc_info=True)
374 return
374 return
375
375
376
376
377 header = msg['header']
377 header = msg['header']
378 msg_id = header['msg_id']
378 msg_id = header['msg_id']
379 if self.check_aborted(msg_id):
379 if self.check_aborted(msg_id):
380 self.aborted.remove(msg_id)
380 self.aborted.remove(msg_id)
381 # is it safe to assume a msg_id will not be resubmitted?
381 # is it safe to assume a msg_id will not be resubmitted?
382 reply_type = msg['msg_type'].split('_')[0] + '_reply'
382 reply_type = msg['msg_type'].split('_')[0] + '_reply'
383 reply_msg = self.session.send(stream, reply_type,
383 status = {'status' : 'aborted'}
384 content={'status' : 'aborted'}, parent=msg, ident=idents)
384 reply_msg = self.session.send(stream, reply_type, subheader=status,
385 content=status, parent=msg, ident=idents)
385 return
386 return
386 handler = self.shell_handlers.get(msg['msg_type'], None)
387 handler = self.shell_handlers.get(msg['msg_type'], None)
387 if handler is None:
388 if handler is None:
388 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
389 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
389 else:
390 else:
390 handler(stream, idents, msg)
391 handler(stream, idents, msg)
391
392
392 def start(self):
393 def start(self):
393 #### stream mode:
394 #### stream mode:
394 if self.control_stream:
395 if self.control_stream:
395 self.control_stream.on_recv(self.dispatch_control, copy=False)
396 self.control_stream.on_recv(self.dispatch_control, copy=False)
396 self.control_stream.on_err(printer)
397 self.control_stream.on_err(printer)
397
398
398 def make_dispatcher(stream):
399 def make_dispatcher(stream):
399 def dispatcher(msg):
400 def dispatcher(msg):
400 return self.dispatch_queue(stream, msg)
401 return self.dispatch_queue(stream, msg)
401 return dispatcher
402 return dispatcher
402
403
403 for s in self.shell_streams:
404 for s in self.shell_streams:
404 s.on_recv(make_dispatcher(s), copy=False)
405 s.on_recv(make_dispatcher(s), copy=False)
405 s.on_err(printer)
406 s.on_err(printer)
406
407
407 if self.iopub_stream:
408 if self.iopub_stream:
408 self.iopub_stream.on_err(printer)
409 self.iopub_stream.on_err(printer)
409
410
410 #### while True mode:
411 #### while True mode:
411 # while True:
412 # while True:
412 # idle = True
413 # idle = True
413 # try:
414 # try:
414 # msg = self.shell_stream.socket.recv_multipart(
415 # msg = self.shell_stream.socket.recv_multipart(
415 # zmq.NOBLOCK, copy=False)
416 # zmq.NOBLOCK, copy=False)
416 # except zmq.ZMQError, e:
417 # except zmq.ZMQError, e:
417 # if e.errno != zmq.EAGAIN:
418 # if e.errno != zmq.EAGAIN:
418 # raise e
419 # raise e
419 # else:
420 # else:
420 # idle=False
421 # idle=False
421 # self.dispatch_queue(self.shell_stream, msg)
422 # self.dispatch_queue(self.shell_stream, msg)
422 #
423 #
423 # if not self.task_stream.empty():
424 # if not self.task_stream.empty():
424 # idle=False
425 # idle=False
425 # msg = self.task_stream.recv_multipart()
426 # msg = self.task_stream.recv_multipart()
426 # self.dispatch_queue(self.task_stream, msg)
427 # self.dispatch_queue(self.task_stream, msg)
427 # if idle:
428 # if idle:
428 # # don't busywait
429 # # don't busywait
429 # time.sleep(1e-3)
430 # time.sleep(1e-3)
430
431
General Comments 0
You need to be logged in to leave comments. Login now