##// END OF EJS Templates
Renaming unpack_message to unserialize and updating docstrings.
Brian E. Granger -
Show More
@@ -1,1291 +1,1291 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5
5
6 Authors:
6 Authors:
7
7
8 * Min RK
8 * Min RK
9 """
9 """
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2010 The IPython Development Team
11 # Copyright (C) 2010 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 # Imports
18 # Imports
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 from __future__ import print_function
20 from __future__ import print_function
21
21
22 import sys
22 import sys
23 import time
23 import time
24 from datetime import datetime
24 from datetime import datetime
25
25
26 import zmq
26 import zmq
27 from zmq.eventloop import ioloop
27 from zmq.eventloop import ioloop
28 from zmq.eventloop.zmqstream import ZMQStream
28 from zmq.eventloop.zmqstream import ZMQStream
29
29
30 # internal:
30 # internal:
31 from IPython.utils.importstring import import_item
31 from IPython.utils.importstring import import_item
32 from IPython.utils.traitlets import (
32 from IPython.utils.traitlets import (
33 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
34 )
34 )
35
35
36 from IPython.parallel import error, util
36 from IPython.parallel import error, util
37 from IPython.parallel.factory import RegistrationFactory
37 from IPython.parallel.factory import RegistrationFactory
38
38
39 from IPython.zmq.session import SessionFactory
39 from IPython.zmq.session import SessionFactory
40
40
41 from .heartmonitor import HeartMonitor
41 from .heartmonitor import HeartMonitor
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Code
44 # Code
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 def _passer(*args, **kwargs):
47 def _passer(*args, **kwargs):
48 return
48 return
49
49
50 def _printer(*args, **kwargs):
50 def _printer(*args, **kwargs):
51 print (args)
51 print (args)
52 print (kwargs)
52 print (kwargs)
53
53
54 def empty_record():
54 def empty_record():
55 """Return an empty dict with all record keys."""
55 """Return an empty dict with all record keys."""
56 return {
56 return {
57 'msg_id' : None,
57 'msg_id' : None,
58 'header' : None,
58 'header' : None,
59 'content': None,
59 'content': None,
60 'buffers': None,
60 'buffers': None,
61 'submitted': None,
61 'submitted': None,
62 'client_uuid' : None,
62 'client_uuid' : None,
63 'engine_uuid' : None,
63 'engine_uuid' : None,
64 'started': None,
64 'started': None,
65 'completed': None,
65 'completed': None,
66 'resubmitted': None,
66 'resubmitted': None,
67 'result_header' : None,
67 'result_header' : None,
68 'result_content' : None,
68 'result_content' : None,
69 'result_buffers' : None,
69 'result_buffers' : None,
70 'queue' : None,
70 'queue' : None,
71 'pyin' : None,
71 'pyin' : None,
72 'pyout': None,
72 'pyout': None,
73 'pyerr': None,
73 'pyerr': None,
74 'stdout': '',
74 'stdout': '',
75 'stderr': '',
75 'stderr': '',
76 }
76 }
77
77
78 def init_record(msg):
78 def init_record(msg):
79 """Initialize a TaskRecord based on a request."""
79 """Initialize a TaskRecord based on a request."""
80 header = msg['header']
80 header = msg['header']
81 return {
81 return {
82 'msg_id' : header['msg_id'],
82 'msg_id' : header['msg_id'],
83 'header' : header,
83 'header' : header,
84 'content': msg['content'],
84 'content': msg['content'],
85 'buffers': msg['buffers'],
85 'buffers': msg['buffers'],
86 'submitted': header['date'],
86 'submitted': header['date'],
87 'client_uuid' : None,
87 'client_uuid' : None,
88 'engine_uuid' : None,
88 'engine_uuid' : None,
89 'started': None,
89 'started': None,
90 'completed': None,
90 'completed': None,
91 'resubmitted': None,
91 'resubmitted': None,
92 'result_header' : None,
92 'result_header' : None,
93 'result_content' : None,
93 'result_content' : None,
94 'result_buffers' : None,
94 'result_buffers' : None,
95 'queue' : None,
95 'queue' : None,
96 'pyin' : None,
96 'pyin' : None,
97 'pyout': None,
97 'pyout': None,
98 'pyerr': None,
98 'pyerr': None,
99 'stdout': '',
99 'stdout': '',
100 'stderr': '',
100 'stderr': '',
101 }
101 }
102
102
103
103
104 class EngineConnector(HasTraits):
104 class EngineConnector(HasTraits):
105 """A simple object for accessing the various zmq connections of an object.
105 """A simple object for accessing the various zmq connections of an object.
106 Attributes are:
106 Attributes are:
107 id (int): engine ID
107 id (int): engine ID
108 uuid (str): uuid (unused?)
108 uuid (str): uuid (unused?)
109 queue (str): identity of queue's XREQ socket
109 queue (str): identity of queue's XREQ socket
110 registration (str): identity of registration XREQ socket
110 registration (str): identity of registration XREQ socket
111 heartbeat (str): identity of heartbeat XREQ socket
111 heartbeat (str): identity of heartbeat XREQ socket
112 """
112 """
113 id=Int(0)
113 id=Int(0)
114 queue=CBytes()
114 queue=CBytes()
115 control=CBytes()
115 control=CBytes()
116 registration=CBytes()
116 registration=CBytes()
117 heartbeat=CBytes()
117 heartbeat=CBytes()
118 pending=Set()
118 pending=Set()
119
119
120 class HubFactory(RegistrationFactory):
120 class HubFactory(RegistrationFactory):
121 """The Configurable for setting up a Hub."""
121 """The Configurable for setting up a Hub."""
122
122
123 # port-pairs for monitoredqueues:
123 # port-pairs for monitoredqueues:
124 hb = Tuple(Int,Int,config=True,
124 hb = Tuple(Int,Int,config=True,
125 help="""XREQ/SUB Port pair for Engine heartbeats""")
125 help="""XREQ/SUB Port pair for Engine heartbeats""")
126 def _hb_default(self):
126 def _hb_default(self):
127 return tuple(util.select_random_ports(2))
127 return tuple(util.select_random_ports(2))
128
128
129 mux = Tuple(Int,Int,config=True,
129 mux = Tuple(Int,Int,config=True,
130 help="""Engine/Client Port pair for MUX queue""")
130 help="""Engine/Client Port pair for MUX queue""")
131
131
132 def _mux_default(self):
132 def _mux_default(self):
133 return tuple(util.select_random_ports(2))
133 return tuple(util.select_random_ports(2))
134
134
135 task = Tuple(Int,Int,config=True,
135 task = Tuple(Int,Int,config=True,
136 help="""Engine/Client Port pair for Task queue""")
136 help="""Engine/Client Port pair for Task queue""")
137 def _task_default(self):
137 def _task_default(self):
138 return tuple(util.select_random_ports(2))
138 return tuple(util.select_random_ports(2))
139
139
140 control = Tuple(Int,Int,config=True,
140 control = Tuple(Int,Int,config=True,
141 help="""Engine/Client Port pair for Control queue""")
141 help="""Engine/Client Port pair for Control queue""")
142
142
143 def _control_default(self):
143 def _control_default(self):
144 return tuple(util.select_random_ports(2))
144 return tuple(util.select_random_ports(2))
145
145
146 iopub = Tuple(Int,Int,config=True,
146 iopub = Tuple(Int,Int,config=True,
147 help="""Engine/Client Port pair for IOPub relay""")
147 help="""Engine/Client Port pair for IOPub relay""")
148
148
149 def _iopub_default(self):
149 def _iopub_default(self):
150 return tuple(util.select_random_ports(2))
150 return tuple(util.select_random_ports(2))
151
151
152 # single ports:
152 # single ports:
153 mon_port = Int(config=True,
153 mon_port = Int(config=True,
154 help="""Monitor (SUB) port for queue traffic""")
154 help="""Monitor (SUB) port for queue traffic""")
155
155
156 def _mon_port_default(self):
156 def _mon_port_default(self):
157 return util.select_random_ports(1)[0]
157 return util.select_random_ports(1)[0]
158
158
159 notifier_port = Int(config=True,
159 notifier_port = Int(config=True,
160 help="""PUB port for sending engine status notifications""")
160 help="""PUB port for sending engine status notifications""")
161
161
162 def _notifier_port_default(self):
162 def _notifier_port_default(self):
163 return util.select_random_ports(1)[0]
163 return util.select_random_ports(1)[0]
164
164
165 engine_ip = Unicode('127.0.0.1', config=True,
165 engine_ip = Unicode('127.0.0.1', config=True,
166 help="IP on which to listen for engine connections. [default: loopback]")
166 help="IP on which to listen for engine connections. [default: loopback]")
167 engine_transport = Unicode('tcp', config=True,
167 engine_transport = Unicode('tcp', config=True,
168 help="0MQ transport for engine connections. [default: tcp]")
168 help="0MQ transport for engine connections. [default: tcp]")
169
169
170 client_ip = Unicode('127.0.0.1', config=True,
170 client_ip = Unicode('127.0.0.1', config=True,
171 help="IP on which to listen for client connections. [default: loopback]")
171 help="IP on which to listen for client connections. [default: loopback]")
172 client_transport = Unicode('tcp', config=True,
172 client_transport = Unicode('tcp', config=True,
173 help="0MQ transport for client connections. [default : tcp]")
173 help="0MQ transport for client connections. [default : tcp]")
174
174
175 monitor_ip = Unicode('127.0.0.1', config=True,
175 monitor_ip = Unicode('127.0.0.1', config=True,
176 help="IP on which to listen for monitor messages. [default: loopback]")
176 help="IP on which to listen for monitor messages. [default: loopback]")
177 monitor_transport = Unicode('tcp', config=True,
177 monitor_transport = Unicode('tcp', config=True,
178 help="0MQ transport for monitor messages. [default : tcp]")
178 help="0MQ transport for monitor messages. [default : tcp]")
179
179
180 monitor_url = Unicode('')
180 monitor_url = Unicode('')
181
181
182 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
182 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
183 config=True, help="""The class to use for the DB backend""")
183 config=True, help="""The class to use for the DB backend""")
184
184
185 # not configurable
185 # not configurable
186 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
186 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
187 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
187 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
188
188
189 def _ip_changed(self, name, old, new):
189 def _ip_changed(self, name, old, new):
190 self.engine_ip = new
190 self.engine_ip = new
191 self.client_ip = new
191 self.client_ip = new
192 self.monitor_ip = new
192 self.monitor_ip = new
193 self._update_monitor_url()
193 self._update_monitor_url()
194
194
195 def _update_monitor_url(self):
195 def _update_monitor_url(self):
196 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
196 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
197
197
198 def _transport_changed(self, name, old, new):
198 def _transport_changed(self, name, old, new):
199 self.engine_transport = new
199 self.engine_transport = new
200 self.client_transport = new
200 self.client_transport = new
201 self.monitor_transport = new
201 self.monitor_transport = new
202 self._update_monitor_url()
202 self._update_monitor_url()
203
203
204 def __init__(self, **kwargs):
204 def __init__(self, **kwargs):
205 super(HubFactory, self).__init__(**kwargs)
205 super(HubFactory, self).__init__(**kwargs)
206 self._update_monitor_url()
206 self._update_monitor_url()
207
207
208
208
209 def construct(self):
209 def construct(self):
210 self.init_hub()
210 self.init_hub()
211
211
212 def start(self):
212 def start(self):
213 self.heartmonitor.start()
213 self.heartmonitor.start()
214 self.log.info("Heartmonitor started")
214 self.log.info("Heartmonitor started")
215
215
216 def init_hub(self):
216 def init_hub(self):
217 """construct"""
217 """construct"""
218 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
218 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
219 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
219 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
220
220
221 ctx = self.context
221 ctx = self.context
222 loop = self.loop
222 loop = self.loop
223
223
224 # Registrar socket
224 # Registrar socket
225 q = ZMQStream(ctx.socket(zmq.XREP), loop)
225 q = ZMQStream(ctx.socket(zmq.XREP), loop)
226 q.bind(client_iface % self.regport)
226 q.bind(client_iface % self.regport)
227 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
227 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
228 if self.client_ip != self.engine_ip:
228 if self.client_ip != self.engine_ip:
229 q.bind(engine_iface % self.regport)
229 q.bind(engine_iface % self.regport)
230 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
230 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
231
231
232 ### Engine connections ###
232 ### Engine connections ###
233
233
234 # heartbeat
234 # heartbeat
235 hpub = ctx.socket(zmq.PUB)
235 hpub = ctx.socket(zmq.PUB)
236 hpub.bind(engine_iface % self.hb[0])
236 hpub.bind(engine_iface % self.hb[0])
237 hrep = ctx.socket(zmq.XREP)
237 hrep = ctx.socket(zmq.XREP)
238 hrep.bind(engine_iface % self.hb[1])
238 hrep.bind(engine_iface % self.hb[1])
239 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
239 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
240 pingstream=ZMQStream(hpub,loop),
240 pingstream=ZMQStream(hpub,loop),
241 pongstream=ZMQStream(hrep,loop)
241 pongstream=ZMQStream(hrep,loop)
242 )
242 )
243
243
244 ### Client connections ###
244 ### Client connections ###
245 # Notifier socket
245 # Notifier socket
246 n = ZMQStream(ctx.socket(zmq.PUB), loop)
246 n = ZMQStream(ctx.socket(zmq.PUB), loop)
247 n.bind(client_iface%self.notifier_port)
247 n.bind(client_iface%self.notifier_port)
248
248
249 ### build and launch the queues ###
249 ### build and launch the queues ###
250
250
251 # monitor socket
251 # monitor socket
252 sub = ctx.socket(zmq.SUB)
252 sub = ctx.socket(zmq.SUB)
253 sub.setsockopt(zmq.SUBSCRIBE, b"")
253 sub.setsockopt(zmq.SUBSCRIBE, b"")
254 sub.bind(self.monitor_url)
254 sub.bind(self.monitor_url)
255 sub.bind('inproc://monitor')
255 sub.bind('inproc://monitor')
256 sub = ZMQStream(sub, loop)
256 sub = ZMQStream(sub, loop)
257
257
258 # connect the db
258 # connect the db
259 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
259 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
260 # cdir = self.config.Global.cluster_dir
260 # cdir = self.config.Global.cluster_dir
261 self.db = import_item(str(self.db_class))(session=self.session.session,
261 self.db = import_item(str(self.db_class))(session=self.session.session,
262 config=self.config, log=self.log)
262 config=self.config, log=self.log)
263 time.sleep(.25)
263 time.sleep(.25)
264 try:
264 try:
265 scheme = self.config.TaskScheduler.scheme_name
265 scheme = self.config.TaskScheduler.scheme_name
266 except AttributeError:
266 except AttributeError:
267 from .scheduler import TaskScheduler
267 from .scheduler import TaskScheduler
268 scheme = TaskScheduler.scheme_name.get_default_value()
268 scheme = TaskScheduler.scheme_name.get_default_value()
269 # build connection dicts
269 # build connection dicts
270 self.engine_info = {
270 self.engine_info = {
271 'control' : engine_iface%self.control[1],
271 'control' : engine_iface%self.control[1],
272 'mux': engine_iface%self.mux[1],
272 'mux': engine_iface%self.mux[1],
273 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
273 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
274 'task' : engine_iface%self.task[1],
274 'task' : engine_iface%self.task[1],
275 'iopub' : engine_iface%self.iopub[1],
275 'iopub' : engine_iface%self.iopub[1],
276 # 'monitor' : engine_iface%self.mon_port,
276 # 'monitor' : engine_iface%self.mon_port,
277 }
277 }
278
278
279 self.client_info = {
279 self.client_info = {
280 'control' : client_iface%self.control[0],
280 'control' : client_iface%self.control[0],
281 'mux': client_iface%self.mux[0],
281 'mux': client_iface%self.mux[0],
282 'task' : (scheme, client_iface%self.task[0]),
282 'task' : (scheme, client_iface%self.task[0]),
283 'iopub' : client_iface%self.iopub[0],
283 'iopub' : client_iface%self.iopub[0],
284 'notification': client_iface%self.notifier_port
284 'notification': client_iface%self.notifier_port
285 }
285 }
286 self.log.debug("Hub engine addrs: %s"%self.engine_info)
286 self.log.debug("Hub engine addrs: %s"%self.engine_info)
287 self.log.debug("Hub client addrs: %s"%self.client_info)
287 self.log.debug("Hub client addrs: %s"%self.client_info)
288
288
289 # resubmit stream
289 # resubmit stream
290 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
290 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
291 url = util.disambiguate_url(self.client_info['task'][-1])
291 url = util.disambiguate_url(self.client_info['task'][-1])
292 r.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
292 r.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session))
293 r.connect(url)
293 r.connect(url)
294
294
295 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
295 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
296 query=q, notifier=n, resubmit=r, db=self.db,
296 query=q, notifier=n, resubmit=r, db=self.db,
297 engine_info=self.engine_info, client_info=self.client_info,
297 engine_info=self.engine_info, client_info=self.client_info,
298 log=self.log)
298 log=self.log)
299
299
300
300
301 class Hub(SessionFactory):
301 class Hub(SessionFactory):
302 """The IPython Controller Hub with 0MQ connections
302 """The IPython Controller Hub with 0MQ connections
303
303
304 Parameters
304 Parameters
305 ==========
305 ==========
306 loop: zmq IOLoop instance
306 loop: zmq IOLoop instance
307 session: Session object
307 session: Session object
308 <removed> context: zmq context for creating new connections (?)
308 <removed> context: zmq context for creating new connections (?)
309 queue: ZMQStream for monitoring the command queue (SUB)
309 queue: ZMQStream for monitoring the command queue (SUB)
310 query: ZMQStream for engine registration and client queries requests (XREP)
310 query: ZMQStream for engine registration and client queries requests (XREP)
311 heartbeat: HeartMonitor object checking the pulse of the engines
311 heartbeat: HeartMonitor object checking the pulse of the engines
312 notifier: ZMQStream for broadcasting engine registration changes (PUB)
312 notifier: ZMQStream for broadcasting engine registration changes (PUB)
313 db: connection to db for out of memory logging of commands
313 db: connection to db for out of memory logging of commands
314 NotImplemented
314 NotImplemented
315 engine_info: dict of zmq connection information for engines to connect
315 engine_info: dict of zmq connection information for engines to connect
316 to the queues.
316 to the queues.
317 client_info: dict of zmq connection information for engines to connect
317 client_info: dict of zmq connection information for engines to connect
318 to the queues.
318 to the queues.
319 """
319 """
320 # internal data structures:
320 # internal data structures:
321 ids=Set() # engine IDs
321 ids=Set() # engine IDs
322 keytable=Dict()
322 keytable=Dict()
323 by_ident=Dict()
323 by_ident=Dict()
324 engines=Dict()
324 engines=Dict()
325 clients=Dict()
325 clients=Dict()
326 hearts=Dict()
326 hearts=Dict()
327 pending=Set()
327 pending=Set()
328 queues=Dict() # pending msg_ids keyed by engine_id
328 queues=Dict() # pending msg_ids keyed by engine_id
329 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
329 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
330 completed=Dict() # completed msg_ids keyed by engine_id
330 completed=Dict() # completed msg_ids keyed by engine_id
331 all_completed=Set() # completed msg_ids keyed by engine_id
331 all_completed=Set() # completed msg_ids keyed by engine_id
332 dead_engines=Set() # completed msg_ids keyed by engine_id
332 dead_engines=Set() # completed msg_ids keyed by engine_id
333 unassigned=Set() # set of task msg_ds not yet assigned a destination
333 unassigned=Set() # set of task msg_ds not yet assigned a destination
334 incoming_registrations=Dict()
334 incoming_registrations=Dict()
335 registration_timeout=Int()
335 registration_timeout=Int()
336 _idcounter=Int(0)
336 _idcounter=Int(0)
337
337
338 # objects from constructor:
338 # objects from constructor:
339 query=Instance(ZMQStream)
339 query=Instance(ZMQStream)
340 monitor=Instance(ZMQStream)
340 monitor=Instance(ZMQStream)
341 notifier=Instance(ZMQStream)
341 notifier=Instance(ZMQStream)
342 resubmit=Instance(ZMQStream)
342 resubmit=Instance(ZMQStream)
343 heartmonitor=Instance(HeartMonitor)
343 heartmonitor=Instance(HeartMonitor)
344 db=Instance(object)
344 db=Instance(object)
345 client_info=Dict()
345 client_info=Dict()
346 engine_info=Dict()
346 engine_info=Dict()
347
347
348
348
349 def __init__(self, **kwargs):
349 def __init__(self, **kwargs):
350 """
350 """
351 # universal:
351 # universal:
352 loop: IOLoop for creating future connections
352 loop: IOLoop for creating future connections
353 session: streamsession for sending serialized data
353 session: streamsession for sending serialized data
354 # engine:
354 # engine:
355 queue: ZMQStream for monitoring queue messages
355 queue: ZMQStream for monitoring queue messages
356 query: ZMQStream for engine+client registration and client requests
356 query: ZMQStream for engine+client registration and client requests
357 heartbeat: HeartMonitor object for tracking engines
357 heartbeat: HeartMonitor object for tracking engines
358 # extra:
358 # extra:
359 db: ZMQStream for db connection (NotImplemented)
359 db: ZMQStream for db connection (NotImplemented)
360 engine_info: zmq address/protocol dict for engine connections
360 engine_info: zmq address/protocol dict for engine connections
361 client_info: zmq address/protocol dict for client connections
361 client_info: zmq address/protocol dict for client connections
362 """
362 """
363
363
364 super(Hub, self).__init__(**kwargs)
364 super(Hub, self).__init__(**kwargs)
365 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
365 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
366
366
367 # validate connection dicts:
367 # validate connection dicts:
368 for k,v in self.client_info.iteritems():
368 for k,v in self.client_info.iteritems():
369 if k == 'task':
369 if k == 'task':
370 util.validate_url_container(v[1])
370 util.validate_url_container(v[1])
371 else:
371 else:
372 util.validate_url_container(v)
372 util.validate_url_container(v)
373 # util.validate_url_container(self.client_info)
373 # util.validate_url_container(self.client_info)
374 util.validate_url_container(self.engine_info)
374 util.validate_url_container(self.engine_info)
375
375
376 # register our callbacks
376 # register our callbacks
377 self.query.on_recv(self.dispatch_query)
377 self.query.on_recv(self.dispatch_query)
378 self.monitor.on_recv(self.dispatch_monitor_traffic)
378 self.monitor.on_recv(self.dispatch_monitor_traffic)
379
379
380 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
380 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
381 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
381 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
382
382
383 self.monitor_handlers = {b'in' : self.save_queue_request,
383 self.monitor_handlers = {b'in' : self.save_queue_request,
384 b'out': self.save_queue_result,
384 b'out': self.save_queue_result,
385 b'intask': self.save_task_request,
385 b'intask': self.save_task_request,
386 b'outtask': self.save_task_result,
386 b'outtask': self.save_task_result,
387 b'tracktask': self.save_task_destination,
387 b'tracktask': self.save_task_destination,
388 b'incontrol': _passer,
388 b'incontrol': _passer,
389 b'outcontrol': _passer,
389 b'outcontrol': _passer,
390 b'iopub': self.save_iopub_message,
390 b'iopub': self.save_iopub_message,
391 }
391 }
392
392
393 self.query_handlers = {'queue_request': self.queue_status,
393 self.query_handlers = {'queue_request': self.queue_status,
394 'result_request': self.get_results,
394 'result_request': self.get_results,
395 'history_request': self.get_history,
395 'history_request': self.get_history,
396 'db_request': self.db_query,
396 'db_request': self.db_query,
397 'purge_request': self.purge_results,
397 'purge_request': self.purge_results,
398 'load_request': self.check_load,
398 'load_request': self.check_load,
399 'resubmit_request': self.resubmit_task,
399 'resubmit_request': self.resubmit_task,
400 'shutdown_request': self.shutdown_request,
400 'shutdown_request': self.shutdown_request,
401 'registration_request' : self.register_engine,
401 'registration_request' : self.register_engine,
402 'unregistration_request' : self.unregister_engine,
402 'unregistration_request' : self.unregister_engine,
403 'connection_request': self.connection_request,
403 'connection_request': self.connection_request,
404 }
404 }
405
405
406 # ignore resubmit replies
406 # ignore resubmit replies
407 self.resubmit.on_recv(lambda msg: None, copy=False)
407 self.resubmit.on_recv(lambda msg: None, copy=False)
408
408
409 self.log.info("hub::created hub")
409 self.log.info("hub::created hub")
410
410
411 @property
411 @property
412 def _next_id(self):
412 def _next_id(self):
413 """gemerate a new ID.
413 """gemerate a new ID.
414
414
415 No longer reuse old ids, just count from 0."""
415 No longer reuse old ids, just count from 0."""
416 newid = self._idcounter
416 newid = self._idcounter
417 self._idcounter += 1
417 self._idcounter += 1
418 return newid
418 return newid
419 # newid = 0
419 # newid = 0
420 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
420 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
421 # # print newid, self.ids, self.incoming_registrations
421 # # print newid, self.ids, self.incoming_registrations
422 # while newid in self.ids or newid in incoming:
422 # while newid in self.ids or newid in incoming:
423 # newid += 1
423 # newid += 1
424 # return newid
424 # return newid
425
425
426 #-----------------------------------------------------------------------------
426 #-----------------------------------------------------------------------------
427 # message validation
427 # message validation
428 #-----------------------------------------------------------------------------
428 #-----------------------------------------------------------------------------
429
429
430 def _validate_targets(self, targets):
430 def _validate_targets(self, targets):
431 """turn any valid targets argument into a list of integer ids"""
431 """turn any valid targets argument into a list of integer ids"""
432 if targets is None:
432 if targets is None:
433 # default to all
433 # default to all
434 targets = self.ids
434 targets = self.ids
435
435
436 if isinstance(targets, (int,str,unicode)):
436 if isinstance(targets, (int,str,unicode)):
437 # only one target specified
437 # only one target specified
438 targets = [targets]
438 targets = [targets]
439 _targets = []
439 _targets = []
440 for t in targets:
440 for t in targets:
441 # map raw identities to ids
441 # map raw identities to ids
442 if isinstance(t, (str,unicode)):
442 if isinstance(t, (str,unicode)):
443 t = self.by_ident.get(t, t)
443 t = self.by_ident.get(t, t)
444 _targets.append(t)
444 _targets.append(t)
445 targets = _targets
445 targets = _targets
446 bad_targets = [ t for t in targets if t not in self.ids ]
446 bad_targets = [ t for t in targets if t not in self.ids ]
447 if bad_targets:
447 if bad_targets:
448 raise IndexError("No Such Engine: %r"%bad_targets)
448 raise IndexError("No Such Engine: %r"%bad_targets)
449 if not targets:
449 if not targets:
450 raise IndexError("No Engines Registered")
450 raise IndexError("No Engines Registered")
451 return targets
451 return targets
452
452
453 #-----------------------------------------------------------------------------
453 #-----------------------------------------------------------------------------
454 # dispatch methods (1 per stream)
454 # dispatch methods (1 per stream)
455 #-----------------------------------------------------------------------------
455 #-----------------------------------------------------------------------------
456
456
457
457
458 def dispatch_monitor_traffic(self, msg):
458 def dispatch_monitor_traffic(self, msg):
459 """all ME and Task queue messages come through here, as well as
459 """all ME and Task queue messages come through here, as well as
460 IOPub traffic."""
460 IOPub traffic."""
461 self.log.debug("monitor traffic: %r"%msg[:2])
461 self.log.debug("monitor traffic: %r"%msg[:2])
462 switch = msg[0]
462 switch = msg[0]
463 try:
463 try:
464 idents, msg = self.session.feed_identities(msg[1:])
464 idents, msg = self.session.feed_identities(msg[1:])
465 except ValueError:
465 except ValueError:
466 idents=[]
466 idents=[]
467 if not idents:
467 if not idents:
468 self.log.error("Bad Monitor Message: %r"%msg)
468 self.log.error("Bad Monitor Message: %r"%msg)
469 return
469 return
470 handler = self.monitor_handlers.get(switch, None)
470 handler = self.monitor_handlers.get(switch, None)
471 if handler is not None:
471 if handler is not None:
472 handler(idents, msg)
472 handler(idents, msg)
473 else:
473 else:
474 self.log.error("Invalid monitor topic: %r"%switch)
474 self.log.error("Invalid monitor topic: %r"%switch)
475
475
476
476
477 def dispatch_query(self, msg):
477 def dispatch_query(self, msg):
478 """Route registration requests and queries from clients."""
478 """Route registration requests and queries from clients."""
479 try:
479 try:
480 idents, msg = self.session.feed_identities(msg)
480 idents, msg = self.session.feed_identities(msg)
481 except ValueError:
481 except ValueError:
482 idents = []
482 idents = []
483 if not idents:
483 if not idents:
484 self.log.error("Bad Query Message: %r"%msg)
484 self.log.error("Bad Query Message: %r"%msg)
485 return
485 return
486 client_id = idents[0]
486 client_id = idents[0]
487 try:
487 try:
488 msg = self.session.unpack_message(msg, content=True)
488 msg = self.session.unserialize(msg, content=True)
489 except Exception:
489 except Exception:
490 content = error.wrap_exception()
490 content = error.wrap_exception()
491 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
491 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
492 self.session.send(self.query, "hub_error", ident=client_id,
492 self.session.send(self.query, "hub_error", ident=client_id,
493 content=content)
493 content=content)
494 return
494 return
495 # print client_id, header, parent, content
495 # print client_id, header, parent, content
496 #switch on message type:
496 #switch on message type:
497 msg_type = msg['header']['msg_type']
497 msg_type = msg['header']['msg_type']
498 self.log.info("client::client %r requested %r"%(client_id, msg_type))
498 self.log.info("client::client %r requested %r"%(client_id, msg_type))
499 handler = self.query_handlers.get(msg_type, None)
499 handler = self.query_handlers.get(msg_type, None)
500 try:
500 try:
501 assert handler is not None, "Bad Message Type: %r"%msg_type
501 assert handler is not None, "Bad Message Type: %r"%msg_type
502 except:
502 except:
503 content = error.wrap_exception()
503 content = error.wrap_exception()
504 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
504 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
505 self.session.send(self.query, "hub_error", ident=client_id,
505 self.session.send(self.query, "hub_error", ident=client_id,
506 content=content)
506 content=content)
507 return
507 return
508
508
509 else:
509 else:
510 handler(idents, msg)
510 handler(idents, msg)
511
511
512 def dispatch_db(self, msg):
512 def dispatch_db(self, msg):
513 """"""
513 """"""
514 raise NotImplementedError
514 raise NotImplementedError
515
515
516 #---------------------------------------------------------------------------
516 #---------------------------------------------------------------------------
517 # handler methods (1 per event)
517 # handler methods (1 per event)
518 #---------------------------------------------------------------------------
518 #---------------------------------------------------------------------------
519
519
520 #----------------------- Heartbeat --------------------------------------
520 #----------------------- Heartbeat --------------------------------------
521
521
522 def handle_new_heart(self, heart):
522 def handle_new_heart(self, heart):
523 """handler to attach to heartbeater.
523 """handler to attach to heartbeater.
524 Called when a new heart starts to beat.
524 Called when a new heart starts to beat.
525 Triggers completion of registration."""
525 Triggers completion of registration."""
526 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
526 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
527 if heart not in self.incoming_registrations:
527 if heart not in self.incoming_registrations:
528 self.log.info("heartbeat::ignoring new heart: %r"%heart)
528 self.log.info("heartbeat::ignoring new heart: %r"%heart)
529 else:
529 else:
530 self.finish_registration(heart)
530 self.finish_registration(heart)
531
531
532
532
533 def handle_heart_failure(self, heart):
533 def handle_heart_failure(self, heart):
534 """handler to attach to heartbeater.
534 """handler to attach to heartbeater.
535 called when a previously registered heart fails to respond to beat request.
535 called when a previously registered heart fails to respond to beat request.
536 triggers unregistration"""
536 triggers unregistration"""
537 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
537 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
538 eid = self.hearts.get(heart, None)
538 eid = self.hearts.get(heart, None)
539 queue = self.engines[eid].queue
539 queue = self.engines[eid].queue
540 if eid is None:
540 if eid is None:
541 self.log.info("heartbeat::ignoring heart failure %r"%heart)
541 self.log.info("heartbeat::ignoring heart failure %r"%heart)
542 else:
542 else:
543 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
543 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
544
544
545 #----------------------- MUX Queue Traffic ------------------------------
545 #----------------------- MUX Queue Traffic ------------------------------
546
546
547 def save_queue_request(self, idents, msg):
547 def save_queue_request(self, idents, msg):
548 if len(idents) < 2:
548 if len(idents) < 2:
549 self.log.error("invalid identity prefix: %r"%idents)
549 self.log.error("invalid identity prefix: %r"%idents)
550 return
550 return
551 queue_id, client_id = idents[:2]
551 queue_id, client_id = idents[:2]
552 try:
552 try:
553 msg = self.session.unpack_message(msg)
553 msg = self.session.unserialize(msg)
554 except Exception:
554 except Exception:
555 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
555 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
556 return
556 return
557
557
558 eid = self.by_ident.get(queue_id, None)
558 eid = self.by_ident.get(queue_id, None)
559 if eid is None:
559 if eid is None:
560 self.log.error("queue::target %r not registered"%queue_id)
560 self.log.error("queue::target %r not registered"%queue_id)
561 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
561 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
562 return
562 return
563 record = init_record(msg)
563 record = init_record(msg)
564 msg_id = record['msg_id']
564 msg_id = record['msg_id']
565 # Unicode in records
565 # Unicode in records
566 record['engine_uuid'] = queue_id.decode('ascii')
566 record['engine_uuid'] = queue_id.decode('ascii')
567 record['client_uuid'] = client_id.decode('ascii')
567 record['client_uuid'] = client_id.decode('ascii')
568 record['queue'] = 'mux'
568 record['queue'] = 'mux'
569
569
570 try:
570 try:
571 # it's posible iopub arrived first:
571 # it's posible iopub arrived first:
572 existing = self.db.get_record(msg_id)
572 existing = self.db.get_record(msg_id)
573 for key,evalue in existing.iteritems():
573 for key,evalue in existing.iteritems():
574 rvalue = record.get(key, None)
574 rvalue = record.get(key, None)
575 if evalue and rvalue and evalue != rvalue:
575 if evalue and rvalue and evalue != rvalue:
576 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
576 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
577 elif evalue and not rvalue:
577 elif evalue and not rvalue:
578 record[key] = evalue
578 record[key] = evalue
579 try:
579 try:
580 self.db.update_record(msg_id, record)
580 self.db.update_record(msg_id, record)
581 except Exception:
581 except Exception:
582 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
582 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
583 except KeyError:
583 except KeyError:
584 try:
584 try:
585 self.db.add_record(msg_id, record)
585 self.db.add_record(msg_id, record)
586 except Exception:
586 except Exception:
587 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
587 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
588
588
589
589
590 self.pending.add(msg_id)
590 self.pending.add(msg_id)
591 self.queues[eid].append(msg_id)
591 self.queues[eid].append(msg_id)
592
592
593 def save_queue_result(self, idents, msg):
593 def save_queue_result(self, idents, msg):
594 if len(idents) < 2:
594 if len(idents) < 2:
595 self.log.error("invalid identity prefix: %r"%idents)
595 self.log.error("invalid identity prefix: %r"%idents)
596 return
596 return
597
597
598 client_id, queue_id = idents[:2]
598 client_id, queue_id = idents[:2]
599 try:
599 try:
600 msg = self.session.unpack_message(msg)
600 msg = self.session.unserialize(msg)
601 except Exception:
601 except Exception:
602 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
602 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
603 queue_id,client_id, msg), exc_info=True)
603 queue_id,client_id, msg), exc_info=True)
604 return
604 return
605
605
606 eid = self.by_ident.get(queue_id, None)
606 eid = self.by_ident.get(queue_id, None)
607 if eid is None:
607 if eid is None:
608 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
608 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
609 return
609 return
610
610
611 parent = msg['parent_header']
611 parent = msg['parent_header']
612 if not parent:
612 if not parent:
613 return
613 return
614 msg_id = parent['msg_id']
614 msg_id = parent['msg_id']
615 if msg_id in self.pending:
615 if msg_id in self.pending:
616 self.pending.remove(msg_id)
616 self.pending.remove(msg_id)
617 self.all_completed.add(msg_id)
617 self.all_completed.add(msg_id)
618 self.queues[eid].remove(msg_id)
618 self.queues[eid].remove(msg_id)
619 self.completed[eid].append(msg_id)
619 self.completed[eid].append(msg_id)
620 elif msg_id not in self.all_completed:
620 elif msg_id not in self.all_completed:
621 # it could be a result from a dead engine that died before delivering the
621 # it could be a result from a dead engine that died before delivering the
622 # result
622 # result
623 self.log.warn("queue:: unknown msg finished %r"%msg_id)
623 self.log.warn("queue:: unknown msg finished %r"%msg_id)
624 return
624 return
625 # update record anyway, because the unregistration could have been premature
625 # update record anyway, because the unregistration could have been premature
626 rheader = msg['header']
626 rheader = msg['header']
627 completed = rheader['date']
627 completed = rheader['date']
628 started = rheader.get('started', None)
628 started = rheader.get('started', None)
629 result = {
629 result = {
630 'result_header' : rheader,
630 'result_header' : rheader,
631 'result_content': msg['content'],
631 'result_content': msg['content'],
632 'started' : started,
632 'started' : started,
633 'completed' : completed
633 'completed' : completed
634 }
634 }
635
635
636 result['result_buffers'] = msg['buffers']
636 result['result_buffers'] = msg['buffers']
637 try:
637 try:
638 self.db.update_record(msg_id, result)
638 self.db.update_record(msg_id, result)
639 except Exception:
639 except Exception:
640 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
640 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
641
641
642
642
643 #--------------------- Task Queue Traffic ------------------------------
643 #--------------------- Task Queue Traffic ------------------------------
644
644
645 def save_task_request(self, idents, msg):
645 def save_task_request(self, idents, msg):
646 """Save the submission of a task."""
646 """Save the submission of a task."""
647 client_id = idents[0]
647 client_id = idents[0]
648
648
649 try:
649 try:
650 msg = self.session.unpack_message(msg)
650 msg = self.session.unserialize(msg)
651 except Exception:
651 except Exception:
652 self.log.error("task::client %r sent invalid task message: %r"%(
652 self.log.error("task::client %r sent invalid task message: %r"%(
653 client_id, msg), exc_info=True)
653 client_id, msg), exc_info=True)
654 return
654 return
655 record = init_record(msg)
655 record = init_record(msg)
656
656
657 record['client_uuid'] = client_id
657 record['client_uuid'] = client_id
658 record['queue'] = 'task'
658 record['queue'] = 'task'
659 header = msg['header']
659 header = msg['header']
660 msg_id = header['msg_id']
660 msg_id = header['msg_id']
661 self.pending.add(msg_id)
661 self.pending.add(msg_id)
662 self.unassigned.add(msg_id)
662 self.unassigned.add(msg_id)
663 try:
663 try:
664 # it's posible iopub arrived first:
664 # it's posible iopub arrived first:
665 existing = self.db.get_record(msg_id)
665 existing = self.db.get_record(msg_id)
666 if existing['resubmitted']:
666 if existing['resubmitted']:
667 for key in ('submitted', 'client_uuid', 'buffers'):
667 for key in ('submitted', 'client_uuid', 'buffers'):
668 # don't clobber these keys on resubmit
668 # don't clobber these keys on resubmit
669 # submitted and client_uuid should be different
669 # submitted and client_uuid should be different
670 # and buffers might be big, and shouldn't have changed
670 # and buffers might be big, and shouldn't have changed
671 record.pop(key)
671 record.pop(key)
672 # still check content,header which should not change
672 # still check content,header which should not change
673 # but are not expensive to compare as buffers
673 # but are not expensive to compare as buffers
674
674
675 for key,evalue in existing.iteritems():
675 for key,evalue in existing.iteritems():
676 if key.endswith('buffers'):
676 if key.endswith('buffers'):
677 # don't compare buffers
677 # don't compare buffers
678 continue
678 continue
679 rvalue = record.get(key, None)
679 rvalue = record.get(key, None)
680 if evalue and rvalue and evalue != rvalue:
680 if evalue and rvalue and evalue != rvalue:
681 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
681 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
682 elif evalue and not rvalue:
682 elif evalue and not rvalue:
683 record[key] = evalue
683 record[key] = evalue
684 try:
684 try:
685 self.db.update_record(msg_id, record)
685 self.db.update_record(msg_id, record)
686 except Exception:
686 except Exception:
687 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
687 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
688 except KeyError:
688 except KeyError:
689 try:
689 try:
690 self.db.add_record(msg_id, record)
690 self.db.add_record(msg_id, record)
691 except Exception:
691 except Exception:
692 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
692 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
693 except Exception:
693 except Exception:
694 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
694 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
695
695
696 def save_task_result(self, idents, msg):
696 def save_task_result(self, idents, msg):
697 """save the result of a completed task."""
697 """save the result of a completed task."""
698 client_id = idents[0]
698 client_id = idents[0]
699 try:
699 try:
700 msg = self.session.unpack_message(msg)
700 msg = self.session.unserialize(msg)
701 except Exception:
701 except Exception:
702 self.log.error("task::invalid task result message send to %r: %r"%(
702 self.log.error("task::invalid task result message send to %r: %r"%(
703 client_id, msg), exc_info=True)
703 client_id, msg), exc_info=True)
704 return
704 return
705
705
706 parent = msg['parent_header']
706 parent = msg['parent_header']
707 if not parent:
707 if not parent:
708 # print msg
708 # print msg
709 self.log.warn("Task %r had no parent!"%msg)
709 self.log.warn("Task %r had no parent!"%msg)
710 return
710 return
711 msg_id = parent['msg_id']
711 msg_id = parent['msg_id']
712 if msg_id in self.unassigned:
712 if msg_id in self.unassigned:
713 self.unassigned.remove(msg_id)
713 self.unassigned.remove(msg_id)
714
714
715 header = msg['header']
715 header = msg['header']
716 engine_uuid = header.get('engine', None)
716 engine_uuid = header.get('engine', None)
717 eid = self.by_ident.get(engine_uuid, None)
717 eid = self.by_ident.get(engine_uuid, None)
718
718
719 if msg_id in self.pending:
719 if msg_id in self.pending:
720 self.pending.remove(msg_id)
720 self.pending.remove(msg_id)
721 self.all_completed.add(msg_id)
721 self.all_completed.add(msg_id)
722 if eid is not None:
722 if eid is not None:
723 self.completed[eid].append(msg_id)
723 self.completed[eid].append(msg_id)
724 if msg_id in self.tasks[eid]:
724 if msg_id in self.tasks[eid]:
725 self.tasks[eid].remove(msg_id)
725 self.tasks[eid].remove(msg_id)
726 completed = header['date']
726 completed = header['date']
727 started = header.get('started', None)
727 started = header.get('started', None)
728 result = {
728 result = {
729 'result_header' : header,
729 'result_header' : header,
730 'result_content': msg['content'],
730 'result_content': msg['content'],
731 'started' : started,
731 'started' : started,
732 'completed' : completed,
732 'completed' : completed,
733 'engine_uuid': engine_uuid
733 'engine_uuid': engine_uuid
734 }
734 }
735
735
736 result['result_buffers'] = msg['buffers']
736 result['result_buffers'] = msg['buffers']
737 try:
737 try:
738 self.db.update_record(msg_id, result)
738 self.db.update_record(msg_id, result)
739 except Exception:
739 except Exception:
740 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
740 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
741
741
742 else:
742 else:
743 self.log.debug("task::unknown task %r finished"%msg_id)
743 self.log.debug("task::unknown task %r finished"%msg_id)
744
744
745 def save_task_destination(self, idents, msg):
745 def save_task_destination(self, idents, msg):
746 try:
746 try:
747 msg = self.session.unpack_message(msg, content=True)
747 msg = self.session.unserialize(msg, content=True)
748 except Exception:
748 except Exception:
749 self.log.error("task::invalid task tracking message", exc_info=True)
749 self.log.error("task::invalid task tracking message", exc_info=True)
750 return
750 return
751 content = msg['content']
751 content = msg['content']
752 # print (content)
752 # print (content)
753 msg_id = content['msg_id']
753 msg_id = content['msg_id']
754 engine_uuid = content['engine_id']
754 engine_uuid = content['engine_id']
755 eid = self.by_ident[util.asbytes(engine_uuid)]
755 eid = self.by_ident[util.asbytes(engine_uuid)]
756
756
757 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
757 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
758 if msg_id in self.unassigned:
758 if msg_id in self.unassigned:
759 self.unassigned.remove(msg_id)
759 self.unassigned.remove(msg_id)
760 # else:
760 # else:
761 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
761 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
762
762
763 self.tasks[eid].append(msg_id)
763 self.tasks[eid].append(msg_id)
764 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
764 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
765 try:
765 try:
766 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
766 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
767 except Exception:
767 except Exception:
768 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
768 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
769
769
770
770
771 def mia_task_request(self, idents, msg):
771 def mia_task_request(self, idents, msg):
772 raise NotImplementedError
772 raise NotImplementedError
773 client_id = idents[0]
773 client_id = idents[0]
774 # content = dict(mia=self.mia,status='ok')
774 # content = dict(mia=self.mia,status='ok')
775 # self.session.send('mia_reply', content=content, idents=client_id)
775 # self.session.send('mia_reply', content=content, idents=client_id)
776
776
777
777
778 #--------------------- IOPub Traffic ------------------------------
778 #--------------------- IOPub Traffic ------------------------------
779
779
780 def save_iopub_message(self, topics, msg):
780 def save_iopub_message(self, topics, msg):
781 """save an iopub message into the db"""
781 """save an iopub message into the db"""
782 # print (topics)
782 # print (topics)
783 try:
783 try:
784 msg = self.session.unpack_message(msg, content=True)
784 msg = self.session.unserialize(msg, content=True)
785 except Exception:
785 except Exception:
786 self.log.error("iopub::invalid IOPub message", exc_info=True)
786 self.log.error("iopub::invalid IOPub message", exc_info=True)
787 return
787 return
788
788
789 parent = msg['parent_header']
789 parent = msg['parent_header']
790 if not parent:
790 if not parent:
791 self.log.error("iopub::invalid IOPub message: %r"%msg)
791 self.log.error("iopub::invalid IOPub message: %r"%msg)
792 return
792 return
793 msg_id = parent['msg_id']
793 msg_id = parent['msg_id']
794 msg_type = msg['header']['msg_type']
794 msg_type = msg['header']['msg_type']
795 content = msg['content']
795 content = msg['content']
796
796
797 # ensure msg_id is in db
797 # ensure msg_id is in db
798 try:
798 try:
799 rec = self.db.get_record(msg_id)
799 rec = self.db.get_record(msg_id)
800 except KeyError:
800 except KeyError:
801 rec = empty_record()
801 rec = empty_record()
802 rec['msg_id'] = msg_id
802 rec['msg_id'] = msg_id
803 self.db.add_record(msg_id, rec)
803 self.db.add_record(msg_id, rec)
804 # stream
804 # stream
805 d = {}
805 d = {}
806 if msg_type == 'stream':
806 if msg_type == 'stream':
807 name = content['name']
807 name = content['name']
808 s = rec[name] or ''
808 s = rec[name] or ''
809 d[name] = s + content['data']
809 d[name] = s + content['data']
810
810
811 elif msg_type == 'pyerr':
811 elif msg_type == 'pyerr':
812 d['pyerr'] = content
812 d['pyerr'] = content
813 elif msg_type == 'pyin':
813 elif msg_type == 'pyin':
814 d['pyin'] = content['code']
814 d['pyin'] = content['code']
815 else:
815 else:
816 d[msg_type] = content.get('data', '')
816 d[msg_type] = content.get('data', '')
817
817
818 try:
818 try:
819 self.db.update_record(msg_id, d)
819 self.db.update_record(msg_id, d)
820 except Exception:
820 except Exception:
821 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
821 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
822
822
823
823
824
824
825 #-------------------------------------------------------------------------
825 #-------------------------------------------------------------------------
826 # Registration requests
826 # Registration requests
827 #-------------------------------------------------------------------------
827 #-------------------------------------------------------------------------
828
828
829 def connection_request(self, client_id, msg):
829 def connection_request(self, client_id, msg):
830 """Reply with connection addresses for clients."""
830 """Reply with connection addresses for clients."""
831 self.log.info("client::client %r connected"%client_id)
831 self.log.info("client::client %r connected"%client_id)
832 content = dict(status='ok')
832 content = dict(status='ok')
833 content.update(self.client_info)
833 content.update(self.client_info)
834 jsonable = {}
834 jsonable = {}
835 for k,v in self.keytable.iteritems():
835 for k,v in self.keytable.iteritems():
836 if v not in self.dead_engines:
836 if v not in self.dead_engines:
837 jsonable[str(k)] = v.decode('ascii')
837 jsonable[str(k)] = v.decode('ascii')
838 content['engines'] = jsonable
838 content['engines'] = jsonable
839 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
839 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
840
840
841 def register_engine(self, reg, msg):
841 def register_engine(self, reg, msg):
842 """Register a new engine."""
842 """Register a new engine."""
843 content = msg['content']
843 content = msg['content']
844 try:
844 try:
845 queue = util.asbytes(content['queue'])
845 queue = util.asbytes(content['queue'])
846 except KeyError:
846 except KeyError:
847 self.log.error("registration::queue not specified", exc_info=True)
847 self.log.error("registration::queue not specified", exc_info=True)
848 return
848 return
849 heart = content.get('heartbeat', None)
849 heart = content.get('heartbeat', None)
850 if heart:
850 if heart:
851 heart = util.asbytes(heart)
851 heart = util.asbytes(heart)
852 """register a new engine, and create the socket(s) necessary"""
852 """register a new engine, and create the socket(s) necessary"""
853 eid = self._next_id
853 eid = self._next_id
854 # print (eid, queue, reg, heart)
854 # print (eid, queue, reg, heart)
855
855
856 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
856 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
857
857
858 content = dict(id=eid,status='ok')
858 content = dict(id=eid,status='ok')
859 content.update(self.engine_info)
859 content.update(self.engine_info)
860 # check if requesting available IDs:
860 # check if requesting available IDs:
861 if queue in self.by_ident:
861 if queue in self.by_ident:
862 try:
862 try:
863 raise KeyError("queue_id %r in use"%queue)
863 raise KeyError("queue_id %r in use"%queue)
864 except:
864 except:
865 content = error.wrap_exception()
865 content = error.wrap_exception()
866 self.log.error("queue_id %r in use"%queue, exc_info=True)
866 self.log.error("queue_id %r in use"%queue, exc_info=True)
867 elif heart in self.hearts: # need to check unique hearts?
867 elif heart in self.hearts: # need to check unique hearts?
868 try:
868 try:
869 raise KeyError("heart_id %r in use"%heart)
869 raise KeyError("heart_id %r in use"%heart)
870 except:
870 except:
871 self.log.error("heart_id %r in use"%heart, exc_info=True)
871 self.log.error("heart_id %r in use"%heart, exc_info=True)
872 content = error.wrap_exception()
872 content = error.wrap_exception()
873 else:
873 else:
874 for h, pack in self.incoming_registrations.iteritems():
874 for h, pack in self.incoming_registrations.iteritems():
875 if heart == h:
875 if heart == h:
876 try:
876 try:
877 raise KeyError("heart_id %r in use"%heart)
877 raise KeyError("heart_id %r in use"%heart)
878 except:
878 except:
879 self.log.error("heart_id %r in use"%heart, exc_info=True)
879 self.log.error("heart_id %r in use"%heart, exc_info=True)
880 content = error.wrap_exception()
880 content = error.wrap_exception()
881 break
881 break
882 elif queue == pack[1]:
882 elif queue == pack[1]:
883 try:
883 try:
884 raise KeyError("queue_id %r in use"%queue)
884 raise KeyError("queue_id %r in use"%queue)
885 except:
885 except:
886 self.log.error("queue_id %r in use"%queue, exc_info=True)
886 self.log.error("queue_id %r in use"%queue, exc_info=True)
887 content = error.wrap_exception()
887 content = error.wrap_exception()
888 break
888 break
889
889
890 msg = self.session.send(self.query, "registration_reply",
890 msg = self.session.send(self.query, "registration_reply",
891 content=content,
891 content=content,
892 ident=reg)
892 ident=reg)
893
893
894 if content['status'] == 'ok':
894 if content['status'] == 'ok':
895 if heart in self.heartmonitor.hearts:
895 if heart in self.heartmonitor.hearts:
896 # already beating
896 # already beating
897 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
897 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
898 self.finish_registration(heart)
898 self.finish_registration(heart)
899 else:
899 else:
900 purge = lambda : self._purge_stalled_registration(heart)
900 purge = lambda : self._purge_stalled_registration(heart)
901 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
901 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
902 dc.start()
902 dc.start()
903 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
903 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
904 else:
904 else:
905 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
905 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
906 return eid
906 return eid
907
907
908 def unregister_engine(self, ident, msg):
908 def unregister_engine(self, ident, msg):
909 """Unregister an engine that explicitly requested to leave."""
909 """Unregister an engine that explicitly requested to leave."""
910 try:
910 try:
911 eid = msg['content']['id']
911 eid = msg['content']['id']
912 except:
912 except:
913 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
913 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
914 return
914 return
915 self.log.info("registration::unregister_engine(%r)"%eid)
915 self.log.info("registration::unregister_engine(%r)"%eid)
916 # print (eid)
916 # print (eid)
917 uuid = self.keytable[eid]
917 uuid = self.keytable[eid]
918 content=dict(id=eid, queue=uuid.decode('ascii'))
918 content=dict(id=eid, queue=uuid.decode('ascii'))
919 self.dead_engines.add(uuid)
919 self.dead_engines.add(uuid)
920 # self.ids.remove(eid)
920 # self.ids.remove(eid)
921 # uuid = self.keytable.pop(eid)
921 # uuid = self.keytable.pop(eid)
922 #
922 #
923 # ec = self.engines.pop(eid)
923 # ec = self.engines.pop(eid)
924 # self.hearts.pop(ec.heartbeat)
924 # self.hearts.pop(ec.heartbeat)
925 # self.by_ident.pop(ec.queue)
925 # self.by_ident.pop(ec.queue)
926 # self.completed.pop(eid)
926 # self.completed.pop(eid)
927 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
927 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
928 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
928 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
929 dc.start()
929 dc.start()
930 ############## TODO: HANDLE IT ################
930 ############## TODO: HANDLE IT ################
931
931
932 if self.notifier:
932 if self.notifier:
933 self.session.send(self.notifier, "unregistration_notification", content=content)
933 self.session.send(self.notifier, "unregistration_notification", content=content)
934
934
935 def _handle_stranded_msgs(self, eid, uuid):
935 def _handle_stranded_msgs(self, eid, uuid):
936 """Handle messages known to be on an engine when the engine unregisters.
936 """Handle messages known to be on an engine when the engine unregisters.
937
937
938 It is possible that this will fire prematurely - that is, an engine will
938 It is possible that this will fire prematurely - that is, an engine will
939 go down after completing a result, and the client will be notified
939 go down after completing a result, and the client will be notified
940 that the result failed and later receive the actual result.
940 that the result failed and later receive the actual result.
941 """
941 """
942
942
943 outstanding = self.queues[eid]
943 outstanding = self.queues[eid]
944
944
945 for msg_id in outstanding:
945 for msg_id in outstanding:
946 self.pending.remove(msg_id)
946 self.pending.remove(msg_id)
947 self.all_completed.add(msg_id)
947 self.all_completed.add(msg_id)
948 try:
948 try:
949 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
949 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
950 except:
950 except:
951 content = error.wrap_exception()
951 content = error.wrap_exception()
952 # build a fake header:
952 # build a fake header:
953 header = {}
953 header = {}
954 header['engine'] = uuid
954 header['engine'] = uuid
955 header['date'] = datetime.now()
955 header['date'] = datetime.now()
956 rec = dict(result_content=content, result_header=header, result_buffers=[])
956 rec = dict(result_content=content, result_header=header, result_buffers=[])
957 rec['completed'] = header['date']
957 rec['completed'] = header['date']
958 rec['engine_uuid'] = uuid
958 rec['engine_uuid'] = uuid
959 try:
959 try:
960 self.db.update_record(msg_id, rec)
960 self.db.update_record(msg_id, rec)
961 except Exception:
961 except Exception:
962 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
962 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
963
963
964
964
965 def finish_registration(self, heart):
965 def finish_registration(self, heart):
966 """Second half of engine registration, called after our HeartMonitor
966 """Second half of engine registration, called after our HeartMonitor
967 has received a beat from the Engine's Heart."""
967 has received a beat from the Engine's Heart."""
968 try:
968 try:
969 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
969 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
970 except KeyError:
970 except KeyError:
971 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
971 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
972 return
972 return
973 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
973 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
974 if purge is not None:
974 if purge is not None:
975 purge.stop()
975 purge.stop()
976 control = queue
976 control = queue
977 self.ids.add(eid)
977 self.ids.add(eid)
978 self.keytable[eid] = queue
978 self.keytable[eid] = queue
979 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
979 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
980 control=control, heartbeat=heart)
980 control=control, heartbeat=heart)
981 self.by_ident[queue] = eid
981 self.by_ident[queue] = eid
982 self.queues[eid] = list()
982 self.queues[eid] = list()
983 self.tasks[eid] = list()
983 self.tasks[eid] = list()
984 self.completed[eid] = list()
984 self.completed[eid] = list()
985 self.hearts[heart] = eid
985 self.hearts[heart] = eid
986 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
986 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
987 if self.notifier:
987 if self.notifier:
988 self.session.send(self.notifier, "registration_notification", content=content)
988 self.session.send(self.notifier, "registration_notification", content=content)
989 self.log.info("engine::Engine Connected: %i"%eid)
989 self.log.info("engine::Engine Connected: %i"%eid)
990
990
991 def _purge_stalled_registration(self, heart):
991 def _purge_stalled_registration(self, heart):
992 if heart in self.incoming_registrations:
992 if heart in self.incoming_registrations:
993 eid = self.incoming_registrations.pop(heart)[0]
993 eid = self.incoming_registrations.pop(heart)[0]
994 self.log.info("registration::purging stalled registration: %i"%eid)
994 self.log.info("registration::purging stalled registration: %i"%eid)
995 else:
995 else:
996 pass
996 pass
997
997
998 #-------------------------------------------------------------------------
998 #-------------------------------------------------------------------------
999 # Client Requests
999 # Client Requests
1000 #-------------------------------------------------------------------------
1000 #-------------------------------------------------------------------------
1001
1001
1002 def shutdown_request(self, client_id, msg):
1002 def shutdown_request(self, client_id, msg):
1003 """handle shutdown request."""
1003 """handle shutdown request."""
1004 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1004 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1005 # also notify other clients of shutdown
1005 # also notify other clients of shutdown
1006 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1006 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1007 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1007 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1008 dc.start()
1008 dc.start()
1009
1009
1010 def _shutdown(self):
1010 def _shutdown(self):
1011 self.log.info("hub::hub shutting down.")
1011 self.log.info("hub::hub shutting down.")
1012 time.sleep(0.1)
1012 time.sleep(0.1)
1013 sys.exit(0)
1013 sys.exit(0)
1014
1014
1015
1015
1016 def check_load(self, client_id, msg):
1016 def check_load(self, client_id, msg):
1017 content = msg['content']
1017 content = msg['content']
1018 try:
1018 try:
1019 targets = content['targets']
1019 targets = content['targets']
1020 targets = self._validate_targets(targets)
1020 targets = self._validate_targets(targets)
1021 except:
1021 except:
1022 content = error.wrap_exception()
1022 content = error.wrap_exception()
1023 self.session.send(self.query, "hub_error",
1023 self.session.send(self.query, "hub_error",
1024 content=content, ident=client_id)
1024 content=content, ident=client_id)
1025 return
1025 return
1026
1026
1027 content = dict(status='ok')
1027 content = dict(status='ok')
1028 # loads = {}
1028 # loads = {}
1029 for t in targets:
1029 for t in targets:
1030 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1030 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1031 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1031 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1032
1032
1033
1033
1034 def queue_status(self, client_id, msg):
1034 def queue_status(self, client_id, msg):
1035 """Return the Queue status of one or more targets.
1035 """Return the Queue status of one or more targets.
1036 if verbose: return the msg_ids
1036 if verbose: return the msg_ids
1037 else: return len of each type.
1037 else: return len of each type.
1038 keys: queue (pending MUX jobs)
1038 keys: queue (pending MUX jobs)
1039 tasks (pending Task jobs)
1039 tasks (pending Task jobs)
1040 completed (finished jobs from both queues)"""
1040 completed (finished jobs from both queues)"""
1041 content = msg['content']
1041 content = msg['content']
1042 targets = content['targets']
1042 targets = content['targets']
1043 try:
1043 try:
1044 targets = self._validate_targets(targets)
1044 targets = self._validate_targets(targets)
1045 except:
1045 except:
1046 content = error.wrap_exception()
1046 content = error.wrap_exception()
1047 self.session.send(self.query, "hub_error",
1047 self.session.send(self.query, "hub_error",
1048 content=content, ident=client_id)
1048 content=content, ident=client_id)
1049 return
1049 return
1050 verbose = content.get('verbose', False)
1050 verbose = content.get('verbose', False)
1051 content = dict(status='ok')
1051 content = dict(status='ok')
1052 for t in targets:
1052 for t in targets:
1053 queue = self.queues[t]
1053 queue = self.queues[t]
1054 completed = self.completed[t]
1054 completed = self.completed[t]
1055 tasks = self.tasks[t]
1055 tasks = self.tasks[t]
1056 if not verbose:
1056 if not verbose:
1057 queue = len(queue)
1057 queue = len(queue)
1058 completed = len(completed)
1058 completed = len(completed)
1059 tasks = len(tasks)
1059 tasks = len(tasks)
1060 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1060 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1061 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1061 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1062 # print (content)
1062 # print (content)
1063 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1063 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1064
1064
1065 def purge_results(self, client_id, msg):
1065 def purge_results(self, client_id, msg):
1066 """Purge results from memory. This method is more valuable before we move
1066 """Purge results from memory. This method is more valuable before we move
1067 to a DB based message storage mechanism."""
1067 to a DB based message storage mechanism."""
1068 content = msg['content']
1068 content = msg['content']
1069 self.log.info("Dropping records with %s", content)
1069 self.log.info("Dropping records with %s", content)
1070 msg_ids = content.get('msg_ids', [])
1070 msg_ids = content.get('msg_ids', [])
1071 reply = dict(status='ok')
1071 reply = dict(status='ok')
1072 if msg_ids == 'all':
1072 if msg_ids == 'all':
1073 try:
1073 try:
1074 self.db.drop_matching_records(dict(completed={'$ne':None}))
1074 self.db.drop_matching_records(dict(completed={'$ne':None}))
1075 except Exception:
1075 except Exception:
1076 reply = error.wrap_exception()
1076 reply = error.wrap_exception()
1077 else:
1077 else:
1078 pending = filter(lambda m: m in self.pending, msg_ids)
1078 pending = filter(lambda m: m in self.pending, msg_ids)
1079 if pending:
1079 if pending:
1080 try:
1080 try:
1081 raise IndexError("msg pending: %r"%pending[0])
1081 raise IndexError("msg pending: %r"%pending[0])
1082 except:
1082 except:
1083 reply = error.wrap_exception()
1083 reply = error.wrap_exception()
1084 else:
1084 else:
1085 try:
1085 try:
1086 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1086 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1087 except Exception:
1087 except Exception:
1088 reply = error.wrap_exception()
1088 reply = error.wrap_exception()
1089
1089
1090 if reply['status'] == 'ok':
1090 if reply['status'] == 'ok':
1091 eids = content.get('engine_ids', [])
1091 eids = content.get('engine_ids', [])
1092 for eid in eids:
1092 for eid in eids:
1093 if eid not in self.engines:
1093 if eid not in self.engines:
1094 try:
1094 try:
1095 raise IndexError("No such engine: %i"%eid)
1095 raise IndexError("No such engine: %i"%eid)
1096 except:
1096 except:
1097 reply = error.wrap_exception()
1097 reply = error.wrap_exception()
1098 break
1098 break
1099 uid = self.engines[eid].queue
1099 uid = self.engines[eid].queue
1100 try:
1100 try:
1101 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1101 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1102 except Exception:
1102 except Exception:
1103 reply = error.wrap_exception()
1103 reply = error.wrap_exception()
1104 break
1104 break
1105
1105
1106 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1106 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1107
1107
1108 def resubmit_task(self, client_id, msg):
1108 def resubmit_task(self, client_id, msg):
1109 """Resubmit one or more tasks."""
1109 """Resubmit one or more tasks."""
1110 def finish(reply):
1110 def finish(reply):
1111 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1111 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1112
1112
1113 content = msg['content']
1113 content = msg['content']
1114 msg_ids = content['msg_ids']
1114 msg_ids = content['msg_ids']
1115 reply = dict(status='ok')
1115 reply = dict(status='ok')
1116 try:
1116 try:
1117 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1117 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1118 'header', 'content', 'buffers'])
1118 'header', 'content', 'buffers'])
1119 except Exception:
1119 except Exception:
1120 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1120 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1121 return finish(error.wrap_exception())
1121 return finish(error.wrap_exception())
1122
1122
1123 # validate msg_ids
1123 # validate msg_ids
1124 found_ids = [ rec['msg_id'] for rec in records ]
1124 found_ids = [ rec['msg_id'] for rec in records ]
1125 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1125 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1126 if len(records) > len(msg_ids):
1126 if len(records) > len(msg_ids):
1127 try:
1127 try:
1128 raise RuntimeError("DB appears to be in an inconsistent state."
1128 raise RuntimeError("DB appears to be in an inconsistent state."
1129 "More matching records were found than should exist")
1129 "More matching records were found than should exist")
1130 except Exception:
1130 except Exception:
1131 return finish(error.wrap_exception())
1131 return finish(error.wrap_exception())
1132 elif len(records) < len(msg_ids):
1132 elif len(records) < len(msg_ids):
1133 missing = [ m for m in msg_ids if m not in found_ids ]
1133 missing = [ m for m in msg_ids if m not in found_ids ]
1134 try:
1134 try:
1135 raise KeyError("No such msg(s): %r"%missing)
1135 raise KeyError("No such msg(s): %r"%missing)
1136 except KeyError:
1136 except KeyError:
1137 return finish(error.wrap_exception())
1137 return finish(error.wrap_exception())
1138 elif invalid_ids:
1138 elif invalid_ids:
1139 msg_id = invalid_ids[0]
1139 msg_id = invalid_ids[0]
1140 try:
1140 try:
1141 raise ValueError("Task %r appears to be inflight"%(msg_id))
1141 raise ValueError("Task %r appears to be inflight"%(msg_id))
1142 except Exception:
1142 except Exception:
1143 return finish(error.wrap_exception())
1143 return finish(error.wrap_exception())
1144
1144
1145 # clear the existing records
1145 # clear the existing records
1146 now = datetime.now()
1146 now = datetime.now()
1147 rec = empty_record()
1147 rec = empty_record()
1148 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1148 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1149 rec['resubmitted'] = now
1149 rec['resubmitted'] = now
1150 rec['queue'] = 'task'
1150 rec['queue'] = 'task'
1151 rec['client_uuid'] = client_id[0]
1151 rec['client_uuid'] = client_id[0]
1152 try:
1152 try:
1153 for msg_id in msg_ids:
1153 for msg_id in msg_ids:
1154 self.all_completed.discard(msg_id)
1154 self.all_completed.discard(msg_id)
1155 self.db.update_record(msg_id, rec)
1155 self.db.update_record(msg_id, rec)
1156 except Exception:
1156 except Exception:
1157 self.log.error('db::db error upating record', exc_info=True)
1157 self.log.error('db::db error upating record', exc_info=True)
1158 reply = error.wrap_exception()
1158 reply = error.wrap_exception()
1159 else:
1159 else:
1160 # send the messages
1160 # send the messages
1161 for rec in records:
1161 for rec in records:
1162 header = rec['header']
1162 header = rec['header']
1163 # include resubmitted in header to prevent digest collision
1163 # include resubmitted in header to prevent digest collision
1164 header['resubmitted'] = now
1164 header['resubmitted'] = now
1165 msg = self.session.msg(header['msg_type'])
1165 msg = self.session.msg(header['msg_type'])
1166 msg['content'] = rec['content']
1166 msg['content'] = rec['content']
1167 msg['header'] = header
1167 msg['header'] = header
1168 msg['msg_id'] = rec['msg_id']
1168 msg['msg_id'] = rec['msg_id']
1169 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1169 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1170
1170
1171 finish(dict(status='ok'))
1171 finish(dict(status='ok'))
1172
1172
1173
1173
1174 def _extract_record(self, rec):
1174 def _extract_record(self, rec):
1175 """decompose a TaskRecord dict into subsection of reply for get_result"""
1175 """decompose a TaskRecord dict into subsection of reply for get_result"""
1176 io_dict = {}
1176 io_dict = {}
1177 for key in 'pyin pyout pyerr stdout stderr'.split():
1177 for key in 'pyin pyout pyerr stdout stderr'.split():
1178 io_dict[key] = rec[key]
1178 io_dict[key] = rec[key]
1179 content = { 'result_content': rec['result_content'],
1179 content = { 'result_content': rec['result_content'],
1180 'header': rec['header'],
1180 'header': rec['header'],
1181 'result_header' : rec['result_header'],
1181 'result_header' : rec['result_header'],
1182 'io' : io_dict,
1182 'io' : io_dict,
1183 }
1183 }
1184 if rec['result_buffers']:
1184 if rec['result_buffers']:
1185 buffers = map(bytes, rec['result_buffers'])
1185 buffers = map(bytes, rec['result_buffers'])
1186 else:
1186 else:
1187 buffers = []
1187 buffers = []
1188
1188
1189 return content, buffers
1189 return content, buffers
1190
1190
1191 def get_results(self, client_id, msg):
1191 def get_results(self, client_id, msg):
1192 """Get the result of 1 or more messages."""
1192 """Get the result of 1 or more messages."""
1193 content = msg['content']
1193 content = msg['content']
1194 msg_ids = sorted(set(content['msg_ids']))
1194 msg_ids = sorted(set(content['msg_ids']))
1195 statusonly = content.get('status_only', False)
1195 statusonly = content.get('status_only', False)
1196 pending = []
1196 pending = []
1197 completed = []
1197 completed = []
1198 content = dict(status='ok')
1198 content = dict(status='ok')
1199 content['pending'] = pending
1199 content['pending'] = pending
1200 content['completed'] = completed
1200 content['completed'] = completed
1201 buffers = []
1201 buffers = []
1202 if not statusonly:
1202 if not statusonly:
1203 try:
1203 try:
1204 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1204 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1205 # turn match list into dict, for faster lookup
1205 # turn match list into dict, for faster lookup
1206 records = {}
1206 records = {}
1207 for rec in matches:
1207 for rec in matches:
1208 records[rec['msg_id']] = rec
1208 records[rec['msg_id']] = rec
1209 except Exception:
1209 except Exception:
1210 content = error.wrap_exception()
1210 content = error.wrap_exception()
1211 self.session.send(self.query, "result_reply", content=content,
1211 self.session.send(self.query, "result_reply", content=content,
1212 parent=msg, ident=client_id)
1212 parent=msg, ident=client_id)
1213 return
1213 return
1214 else:
1214 else:
1215 records = {}
1215 records = {}
1216 for msg_id in msg_ids:
1216 for msg_id in msg_ids:
1217 if msg_id in self.pending:
1217 if msg_id in self.pending:
1218 pending.append(msg_id)
1218 pending.append(msg_id)
1219 elif msg_id in self.all_completed:
1219 elif msg_id in self.all_completed:
1220 completed.append(msg_id)
1220 completed.append(msg_id)
1221 if not statusonly:
1221 if not statusonly:
1222 c,bufs = self._extract_record(records[msg_id])
1222 c,bufs = self._extract_record(records[msg_id])
1223 content[msg_id] = c
1223 content[msg_id] = c
1224 buffers.extend(bufs)
1224 buffers.extend(bufs)
1225 elif msg_id in records:
1225 elif msg_id in records:
1226 if rec['completed']:
1226 if rec['completed']:
1227 completed.append(msg_id)
1227 completed.append(msg_id)
1228 c,bufs = self._extract_record(records[msg_id])
1228 c,bufs = self._extract_record(records[msg_id])
1229 content[msg_id] = c
1229 content[msg_id] = c
1230 buffers.extend(bufs)
1230 buffers.extend(bufs)
1231 else:
1231 else:
1232 pending.append(msg_id)
1232 pending.append(msg_id)
1233 else:
1233 else:
1234 try:
1234 try:
1235 raise KeyError('No such message: '+msg_id)
1235 raise KeyError('No such message: '+msg_id)
1236 except:
1236 except:
1237 content = error.wrap_exception()
1237 content = error.wrap_exception()
1238 break
1238 break
1239 self.session.send(self.query, "result_reply", content=content,
1239 self.session.send(self.query, "result_reply", content=content,
1240 parent=msg, ident=client_id,
1240 parent=msg, ident=client_id,
1241 buffers=buffers)
1241 buffers=buffers)
1242
1242
1243 def get_history(self, client_id, msg):
1243 def get_history(self, client_id, msg):
1244 """Get a list of all msg_ids in our DB records"""
1244 """Get a list of all msg_ids in our DB records"""
1245 try:
1245 try:
1246 msg_ids = self.db.get_history()
1246 msg_ids = self.db.get_history()
1247 except Exception as e:
1247 except Exception as e:
1248 content = error.wrap_exception()
1248 content = error.wrap_exception()
1249 else:
1249 else:
1250 content = dict(status='ok', history=msg_ids)
1250 content = dict(status='ok', history=msg_ids)
1251
1251
1252 self.session.send(self.query, "history_reply", content=content,
1252 self.session.send(self.query, "history_reply", content=content,
1253 parent=msg, ident=client_id)
1253 parent=msg, ident=client_id)
1254
1254
1255 def db_query(self, client_id, msg):
1255 def db_query(self, client_id, msg):
1256 """Perform a raw query on the task record database."""
1256 """Perform a raw query on the task record database."""
1257 content = msg['content']
1257 content = msg['content']
1258 query = content.get('query', {})
1258 query = content.get('query', {})
1259 keys = content.get('keys', None)
1259 keys = content.get('keys', None)
1260 buffers = []
1260 buffers = []
1261 empty = list()
1261 empty = list()
1262 try:
1262 try:
1263 records = self.db.find_records(query, keys)
1263 records = self.db.find_records(query, keys)
1264 except Exception as e:
1264 except Exception as e:
1265 content = error.wrap_exception()
1265 content = error.wrap_exception()
1266 else:
1266 else:
1267 # extract buffers from reply content:
1267 # extract buffers from reply content:
1268 if keys is not None:
1268 if keys is not None:
1269 buffer_lens = [] if 'buffers' in keys else None
1269 buffer_lens = [] if 'buffers' in keys else None
1270 result_buffer_lens = [] if 'result_buffers' in keys else None
1270 result_buffer_lens = [] if 'result_buffers' in keys else None
1271 else:
1271 else:
1272 buffer_lens = []
1272 buffer_lens = []
1273 result_buffer_lens = []
1273 result_buffer_lens = []
1274
1274
1275 for rec in records:
1275 for rec in records:
1276 # buffers may be None, so double check
1276 # buffers may be None, so double check
1277 if buffer_lens is not None:
1277 if buffer_lens is not None:
1278 b = rec.pop('buffers', empty) or empty
1278 b = rec.pop('buffers', empty) or empty
1279 buffer_lens.append(len(b))
1279 buffer_lens.append(len(b))
1280 buffers.extend(b)
1280 buffers.extend(b)
1281 if result_buffer_lens is not None:
1281 if result_buffer_lens is not None:
1282 rb = rec.pop('result_buffers', empty) or empty
1282 rb = rec.pop('result_buffers', empty) or empty
1283 result_buffer_lens.append(len(rb))
1283 result_buffer_lens.append(len(rb))
1284 buffers.extend(rb)
1284 buffers.extend(rb)
1285 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1285 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1286 result_buffer_lens=result_buffer_lens)
1286 result_buffer_lens=result_buffer_lens)
1287 # self.log.debug (content)
1287 # self.log.debug (content)
1288 self.session.send(self.query, "db_reply", content=content,
1288 self.session.send(self.query, "db_reply", content=content,
1289 parent=msg, ident=client_id,
1289 parent=msg, ident=client_id,
1290 buffers=buffers)
1290 buffers=buffers)
1291
1291
@@ -1,714 +1,714 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 from __future__ import print_function
22 from __future__ import print_function
23
23
24 import logging
24 import logging
25 import sys
25 import sys
26
26
27 from datetime import datetime, timedelta
27 from datetime import datetime, timedelta
28 from random import randint, random
28 from random import randint, random
29 from types import FunctionType
29 from types import FunctionType
30
30
31 try:
31 try:
32 import numpy
32 import numpy
33 except ImportError:
33 except ImportError:
34 numpy = None
34 numpy = None
35
35
36 import zmq
36 import zmq
37 from zmq.eventloop import ioloop, zmqstream
37 from zmq.eventloop import ioloop, zmqstream
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.application import Application
42 from IPython.config.loader import Config
42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes
44
44
45 from IPython.parallel import error
45 from IPython.parallel import error
46 from IPython.parallel.factory import SessionFactory
46 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
48
48
49 from .dependency import Dependency
49 from .dependency import Dependency
50
50
51 @decorator
51 @decorator
52 def logged(f,self,*args,**kwargs):
52 def logged(f,self,*args,**kwargs):
53 # print ("#--------------------")
53 # print ("#--------------------")
54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 # print ("#--")
55 # print ("#--")
56 return f(self,*args, **kwargs)
56 return f(self,*args, **kwargs)
57
57
58 #----------------------------------------------------------------------
58 #----------------------------------------------------------------------
59 # Chooser functions
59 # Chooser functions
60 #----------------------------------------------------------------------
60 #----------------------------------------------------------------------
61
61
62 def plainrandom(loads):
62 def plainrandom(loads):
63 """Plain random pick."""
63 """Plain random pick."""
64 n = len(loads)
64 n = len(loads)
65 return randint(0,n-1)
65 return randint(0,n-1)
66
66
67 def lru(loads):
67 def lru(loads):
68 """Always pick the front of the line.
68 """Always pick the front of the line.
69
69
70 The content of `loads` is ignored.
70 The content of `loads` is ignored.
71
71
72 Assumes LRU ordering of loads, with oldest first.
72 Assumes LRU ordering of loads, with oldest first.
73 """
73 """
74 return 0
74 return 0
75
75
76 def twobin(loads):
76 def twobin(loads):
77 """Pick two at random, use the LRU of the two.
77 """Pick two at random, use the LRU of the two.
78
78
79 The content of loads is ignored.
79 The content of loads is ignored.
80
80
81 Assumes LRU ordering of loads, with oldest first.
81 Assumes LRU ordering of loads, with oldest first.
82 """
82 """
83 n = len(loads)
83 n = len(loads)
84 a = randint(0,n-1)
84 a = randint(0,n-1)
85 b = randint(0,n-1)
85 b = randint(0,n-1)
86 return min(a,b)
86 return min(a,b)
87
87
88 def weighted(loads):
88 def weighted(loads):
89 """Pick two at random using inverse load as weight.
89 """Pick two at random using inverse load as weight.
90
90
91 Return the less loaded of the two.
91 Return the less loaded of the two.
92 """
92 """
93 # weight 0 a million times more than 1:
93 # weight 0 a million times more than 1:
94 weights = 1./(1e-6+numpy.array(loads))
94 weights = 1./(1e-6+numpy.array(loads))
95 sums = weights.cumsum()
95 sums = weights.cumsum()
96 t = sums[-1]
96 t = sums[-1]
97 x = random()*t
97 x = random()*t
98 y = random()*t
98 y = random()*t
99 idx = 0
99 idx = 0
100 idy = 0
100 idy = 0
101 while sums[idx] < x:
101 while sums[idx] < x:
102 idx += 1
102 idx += 1
103 while sums[idy] < y:
103 while sums[idy] < y:
104 idy += 1
104 idy += 1
105 if weights[idy] > weights[idx]:
105 if weights[idy] > weights[idx]:
106 return idy
106 return idy
107 else:
107 else:
108 return idx
108 return idx
109
109
110 def leastload(loads):
110 def leastload(loads):
111 """Always choose the lowest load.
111 """Always choose the lowest load.
112
112
113 If the lowest load occurs more than once, the first
113 If the lowest load occurs more than once, the first
114 occurance will be used. If loads has LRU ordering, this means
114 occurance will be used. If loads has LRU ordering, this means
115 the LRU of those with the lowest load is chosen.
115 the LRU of those with the lowest load is chosen.
116 """
116 """
117 return loads.index(min(loads))
117 return loads.index(min(loads))
118
118
119 #---------------------------------------------------------------------
119 #---------------------------------------------------------------------
120 # Classes
120 # Classes
121 #---------------------------------------------------------------------
121 #---------------------------------------------------------------------
122 # store empty default dependency:
122 # store empty default dependency:
123 MET = Dependency([])
123 MET = Dependency([])
124
124
125 class TaskScheduler(SessionFactory):
125 class TaskScheduler(SessionFactory):
126 """Python TaskScheduler object.
126 """Python TaskScheduler object.
127
127
128 This is the simplest object that supports msg_id based
128 This is the simplest object that supports msg_id based
129 DAG dependencies. *Only* task msg_ids are checked, not
129 DAG dependencies. *Only* task msg_ids are checked, not
130 msg_ids of jobs submitted via the MUX queue.
130 msg_ids of jobs submitted via the MUX queue.
131
131
132 """
132 """
133
133
134 hwm = Int(0, config=True, shortname='hwm',
134 hwm = Int(0, config=True, shortname='hwm',
135 help="""specify the High Water Mark (HWM) for the downstream
135 help="""specify the High Water Mark (HWM) for the downstream
136 socket in the Task scheduler. This is the maximum number
136 socket in the Task scheduler. This is the maximum number
137 of allowed outstanding tasks on each engine."""
137 of allowed outstanding tasks on each engine."""
138 )
138 )
139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
140 'leastload', config=True, shortname='scheme', allow_none=False,
140 'leastload', config=True, shortname='scheme', allow_none=False,
141 help="""select the task scheduler scheme [default: Python LRU]
141 help="""select the task scheduler scheme [default: Python LRU]
142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
143 )
143 )
144 def _scheme_name_changed(self, old, new):
144 def _scheme_name_changed(self, old, new):
145 self.log.debug("Using scheme %r"%new)
145 self.log.debug("Using scheme %r"%new)
146 self.scheme = globals()[new]
146 self.scheme = globals()[new]
147
147
148 # input arguments:
148 # input arguments:
149 scheme = Instance(FunctionType) # function for determining the destination
149 scheme = Instance(FunctionType) # function for determining the destination
150 def _scheme_default(self):
150 def _scheme_default(self):
151 return leastload
151 return leastload
152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
156
156
157 # internals:
157 # internals:
158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
162 pending = Dict() # dict by engine_uuid of submitted tasks
162 pending = Dict() # dict by engine_uuid of submitted tasks
163 completed = Dict() # dict by engine_uuid of completed tasks
163 completed = Dict() # dict by engine_uuid of completed tasks
164 failed = Dict() # dict by engine_uuid of failed tasks
164 failed = Dict() # dict by engine_uuid of failed tasks
165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
166 clients = Dict() # dict by msg_id for who submitted the task
166 clients = Dict() # dict by msg_id for who submitted the task
167 targets = List() # list of target IDENTs
167 targets = List() # list of target IDENTs
168 loads = List() # list of engine loads
168 loads = List() # list of engine loads
169 # full = Set() # set of IDENTs that have HWM outstanding tasks
169 # full = Set() # set of IDENTs that have HWM outstanding tasks
170 all_completed = Set() # set of all completed tasks
170 all_completed = Set() # set of all completed tasks
171 all_failed = Set() # set of all failed tasks
171 all_failed = Set() # set of all failed tasks
172 all_done = Set() # set of all finished tasks=union(completed,failed)
172 all_done = Set() # set of all finished tasks=union(completed,failed)
173 all_ids = Set() # set of all submitted task IDs
173 all_ids = Set() # set of all submitted task IDs
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
176
176
177 ident = CBytes() # ZMQ identity. This should just be self.session.session
177 ident = CBytes() # ZMQ identity. This should just be self.session.session
178 # but ensure Bytes
178 # but ensure Bytes
179 def _ident_default(self):
179 def _ident_default(self):
180 return asbytes(self.session.session)
180 return asbytes(self.session.session)
181
181
182 def start(self):
182 def start(self):
183 self.engine_stream.on_recv(self.dispatch_result, copy=False)
183 self.engine_stream.on_recv(self.dispatch_result, copy=False)
184 self._notification_handlers = dict(
184 self._notification_handlers = dict(
185 registration_notification = self._register_engine,
185 registration_notification = self._register_engine,
186 unregistration_notification = self._unregister_engine
186 unregistration_notification = self._unregister_engine
187 )
187 )
188 self.notifier_stream.on_recv(self.dispatch_notification)
188 self.notifier_stream.on_recv(self.dispatch_notification)
189 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
189 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
190 self.auditor.start()
190 self.auditor.start()
191 self.log.info("Scheduler started [%s]"%self.scheme_name)
191 self.log.info("Scheduler started [%s]"%self.scheme_name)
192
192
193 def resume_receiving(self):
193 def resume_receiving(self):
194 """Resume accepting jobs."""
194 """Resume accepting jobs."""
195 self.client_stream.on_recv(self.dispatch_submission, copy=False)
195 self.client_stream.on_recv(self.dispatch_submission, copy=False)
196
196
197 def stop_receiving(self):
197 def stop_receiving(self):
198 """Stop accepting jobs while there are no engines.
198 """Stop accepting jobs while there are no engines.
199 Leave them in the ZMQ queue."""
199 Leave them in the ZMQ queue."""
200 self.client_stream.on_recv(None)
200 self.client_stream.on_recv(None)
201
201
202 #-----------------------------------------------------------------------
202 #-----------------------------------------------------------------------
203 # [Un]Registration Handling
203 # [Un]Registration Handling
204 #-----------------------------------------------------------------------
204 #-----------------------------------------------------------------------
205
205
206 def dispatch_notification(self, msg):
206 def dispatch_notification(self, msg):
207 """dispatch register/unregister events."""
207 """dispatch register/unregister events."""
208 try:
208 try:
209 idents,msg = self.session.feed_identities(msg)
209 idents,msg = self.session.feed_identities(msg)
210 except ValueError:
210 except ValueError:
211 self.log.warn("task::Invalid Message: %r",msg)
211 self.log.warn("task::Invalid Message: %r",msg)
212 return
212 return
213 try:
213 try:
214 msg = self.session.unpack_message(msg)
214 msg = self.session.unserialize(msg)
215 except ValueError:
215 except ValueError:
216 self.log.warn("task::Unauthorized message from: %r"%idents)
216 self.log.warn("task::Unauthorized message from: %r"%idents)
217 return
217 return
218
218
219 msg_type = msg['header']['msg_type']
219 msg_type = msg['header']['msg_type']
220
220
221 handler = self._notification_handlers.get(msg_type, None)
221 handler = self._notification_handlers.get(msg_type, None)
222 if handler is None:
222 if handler is None:
223 self.log.error("Unhandled message type: %r"%msg_type)
223 self.log.error("Unhandled message type: %r"%msg_type)
224 else:
224 else:
225 try:
225 try:
226 handler(asbytes(msg['content']['queue']))
226 handler(asbytes(msg['content']['queue']))
227 except Exception:
227 except Exception:
228 self.log.error("task::Invalid notification msg: %r",msg)
228 self.log.error("task::Invalid notification msg: %r",msg)
229
229
230 def _register_engine(self, uid):
230 def _register_engine(self, uid):
231 """New engine with ident `uid` became available."""
231 """New engine with ident `uid` became available."""
232 # head of the line:
232 # head of the line:
233 self.targets.insert(0,uid)
233 self.targets.insert(0,uid)
234 self.loads.insert(0,0)
234 self.loads.insert(0,0)
235
235
236 # initialize sets
236 # initialize sets
237 self.completed[uid] = set()
237 self.completed[uid] = set()
238 self.failed[uid] = set()
238 self.failed[uid] = set()
239 self.pending[uid] = {}
239 self.pending[uid] = {}
240 if len(self.targets) == 1:
240 if len(self.targets) == 1:
241 self.resume_receiving()
241 self.resume_receiving()
242 # rescan the graph:
242 # rescan the graph:
243 self.update_graph(None)
243 self.update_graph(None)
244
244
245 def _unregister_engine(self, uid):
245 def _unregister_engine(self, uid):
246 """Existing engine with ident `uid` became unavailable."""
246 """Existing engine with ident `uid` became unavailable."""
247 if len(self.targets) == 1:
247 if len(self.targets) == 1:
248 # this was our only engine
248 # this was our only engine
249 self.stop_receiving()
249 self.stop_receiving()
250
250
251 # handle any potentially finished tasks:
251 # handle any potentially finished tasks:
252 self.engine_stream.flush()
252 self.engine_stream.flush()
253
253
254 # don't pop destinations, because they might be used later
254 # don't pop destinations, because they might be used later
255 # map(self.destinations.pop, self.completed.pop(uid))
255 # map(self.destinations.pop, self.completed.pop(uid))
256 # map(self.destinations.pop, self.failed.pop(uid))
256 # map(self.destinations.pop, self.failed.pop(uid))
257
257
258 # prevent this engine from receiving work
258 # prevent this engine from receiving work
259 idx = self.targets.index(uid)
259 idx = self.targets.index(uid)
260 self.targets.pop(idx)
260 self.targets.pop(idx)
261 self.loads.pop(idx)
261 self.loads.pop(idx)
262
262
263 # wait 5 seconds before cleaning up pending jobs, since the results might
263 # wait 5 seconds before cleaning up pending jobs, since the results might
264 # still be incoming
264 # still be incoming
265 if self.pending[uid]:
265 if self.pending[uid]:
266 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
266 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
267 dc.start()
267 dc.start()
268 else:
268 else:
269 self.completed.pop(uid)
269 self.completed.pop(uid)
270 self.failed.pop(uid)
270 self.failed.pop(uid)
271
271
272
272
273 def handle_stranded_tasks(self, engine):
273 def handle_stranded_tasks(self, engine):
274 """Deal with jobs resident in an engine that died."""
274 """Deal with jobs resident in an engine that died."""
275 lost = self.pending[engine]
275 lost = self.pending[engine]
276 for msg_id in lost.keys():
276 for msg_id in lost.keys():
277 if msg_id not in self.pending[engine]:
277 if msg_id not in self.pending[engine]:
278 # prevent double-handling of messages
278 # prevent double-handling of messages
279 continue
279 continue
280
280
281 raw_msg = lost[msg_id][0]
281 raw_msg = lost[msg_id][0]
282 idents,msg = self.session.feed_identities(raw_msg, copy=False)
282 idents,msg = self.session.feed_identities(raw_msg, copy=False)
283 parent = self.session.unpack(msg[1].bytes)
283 parent = self.session.unpack(msg[1].bytes)
284 idents = [engine, idents[0]]
284 idents = [engine, idents[0]]
285
285
286 # build fake error reply
286 # build fake error reply
287 try:
287 try:
288 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
288 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
289 except:
289 except:
290 content = error.wrap_exception()
290 content = error.wrap_exception()
291 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
291 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
292 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
292 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
293 # and dispatch it
293 # and dispatch it
294 self.dispatch_result(raw_reply)
294 self.dispatch_result(raw_reply)
295
295
296 # finally scrub completed/failed lists
296 # finally scrub completed/failed lists
297 self.completed.pop(engine)
297 self.completed.pop(engine)
298 self.failed.pop(engine)
298 self.failed.pop(engine)
299
299
300
300
301 #-----------------------------------------------------------------------
301 #-----------------------------------------------------------------------
302 # Job Submission
302 # Job Submission
303 #-----------------------------------------------------------------------
303 #-----------------------------------------------------------------------
304 def dispatch_submission(self, raw_msg):
304 def dispatch_submission(self, raw_msg):
305 """Dispatch job submission to appropriate handlers."""
305 """Dispatch job submission to appropriate handlers."""
306 # ensure targets up to date:
306 # ensure targets up to date:
307 self.notifier_stream.flush()
307 self.notifier_stream.flush()
308 try:
308 try:
309 idents, msg = self.session.feed_identities(raw_msg, copy=False)
309 idents, msg = self.session.feed_identities(raw_msg, copy=False)
310 msg = self.session.unpack_message(msg, content=False, copy=False)
310 msg = self.session.unserialize(msg, content=False, copy=False)
311 except Exception:
311 except Exception:
312 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
312 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
313 return
313 return
314
314
315
315
316 # send to monitor
316 # send to monitor
317 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
317 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
318
318
319 header = msg['header']
319 header = msg['header']
320 msg_id = header['msg_id']
320 msg_id = header['msg_id']
321 self.all_ids.add(msg_id)
321 self.all_ids.add(msg_id)
322
322
323 # get targets as a set of bytes objects
323 # get targets as a set of bytes objects
324 # from a list of unicode objects
324 # from a list of unicode objects
325 targets = header.get('targets', [])
325 targets = header.get('targets', [])
326 targets = map(asbytes, targets)
326 targets = map(asbytes, targets)
327 targets = set(targets)
327 targets = set(targets)
328
328
329 retries = header.get('retries', 0)
329 retries = header.get('retries', 0)
330 self.retries[msg_id] = retries
330 self.retries[msg_id] = retries
331
331
332 # time dependencies
332 # time dependencies
333 after = header.get('after', None)
333 after = header.get('after', None)
334 if after:
334 if after:
335 after = Dependency(after)
335 after = Dependency(after)
336 if after.all:
336 if after.all:
337 if after.success:
337 if after.success:
338 after = Dependency(after.difference(self.all_completed),
338 after = Dependency(after.difference(self.all_completed),
339 success=after.success,
339 success=after.success,
340 failure=after.failure,
340 failure=after.failure,
341 all=after.all,
341 all=after.all,
342 )
342 )
343 if after.failure:
343 if after.failure:
344 after = Dependency(after.difference(self.all_failed),
344 after = Dependency(after.difference(self.all_failed),
345 success=after.success,
345 success=after.success,
346 failure=after.failure,
346 failure=after.failure,
347 all=after.all,
347 all=after.all,
348 )
348 )
349 if after.check(self.all_completed, self.all_failed):
349 if after.check(self.all_completed, self.all_failed):
350 # recast as empty set, if `after` already met,
350 # recast as empty set, if `after` already met,
351 # to prevent unnecessary set comparisons
351 # to prevent unnecessary set comparisons
352 after = MET
352 after = MET
353 else:
353 else:
354 after = MET
354 after = MET
355
355
356 # location dependencies
356 # location dependencies
357 follow = Dependency(header.get('follow', []))
357 follow = Dependency(header.get('follow', []))
358
358
359 # turn timeouts into datetime objects:
359 # turn timeouts into datetime objects:
360 timeout = header.get('timeout', None)
360 timeout = header.get('timeout', None)
361 if timeout:
361 if timeout:
362 timeout = datetime.now() + timedelta(0,timeout,0)
362 timeout = datetime.now() + timedelta(0,timeout,0)
363
363
364 args = [raw_msg, targets, after, follow, timeout]
364 args = [raw_msg, targets, after, follow, timeout]
365
365
366 # validate and reduce dependencies:
366 # validate and reduce dependencies:
367 for dep in after,follow:
367 for dep in after,follow:
368 if not dep: # empty dependency
368 if not dep: # empty dependency
369 continue
369 continue
370 # check valid:
370 # check valid:
371 if msg_id in dep or dep.difference(self.all_ids):
371 if msg_id in dep or dep.difference(self.all_ids):
372 self.depending[msg_id] = args
372 self.depending[msg_id] = args
373 return self.fail_unreachable(msg_id, error.InvalidDependency)
373 return self.fail_unreachable(msg_id, error.InvalidDependency)
374 # check if unreachable:
374 # check if unreachable:
375 if dep.unreachable(self.all_completed, self.all_failed):
375 if dep.unreachable(self.all_completed, self.all_failed):
376 self.depending[msg_id] = args
376 self.depending[msg_id] = args
377 return self.fail_unreachable(msg_id)
377 return self.fail_unreachable(msg_id)
378
378
379 if after.check(self.all_completed, self.all_failed):
379 if after.check(self.all_completed, self.all_failed):
380 # time deps already met, try to run
380 # time deps already met, try to run
381 if not self.maybe_run(msg_id, *args):
381 if not self.maybe_run(msg_id, *args):
382 # can't run yet
382 # can't run yet
383 if msg_id not in self.all_failed:
383 if msg_id not in self.all_failed:
384 # could have failed as unreachable
384 # could have failed as unreachable
385 self.save_unmet(msg_id, *args)
385 self.save_unmet(msg_id, *args)
386 else:
386 else:
387 self.save_unmet(msg_id, *args)
387 self.save_unmet(msg_id, *args)
388
388
389 def audit_timeouts(self):
389 def audit_timeouts(self):
390 """Audit all waiting tasks for expired timeouts."""
390 """Audit all waiting tasks for expired timeouts."""
391 now = datetime.now()
391 now = datetime.now()
392 for msg_id in self.depending.keys():
392 for msg_id in self.depending.keys():
393 # must recheck, in case one failure cascaded to another:
393 # must recheck, in case one failure cascaded to another:
394 if msg_id in self.depending:
394 if msg_id in self.depending:
395 raw,after,targets,follow,timeout = self.depending[msg_id]
395 raw,after,targets,follow,timeout = self.depending[msg_id]
396 if timeout and timeout < now:
396 if timeout and timeout < now:
397 self.fail_unreachable(msg_id, error.TaskTimeout)
397 self.fail_unreachable(msg_id, error.TaskTimeout)
398
398
399 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
399 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
400 """a task has become unreachable, send a reply with an ImpossibleDependency
400 """a task has become unreachable, send a reply with an ImpossibleDependency
401 error."""
401 error."""
402 if msg_id not in self.depending:
402 if msg_id not in self.depending:
403 self.log.error("msg %r already failed!", msg_id)
403 self.log.error("msg %r already failed!", msg_id)
404 return
404 return
405 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
405 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
406 for mid in follow.union(after):
406 for mid in follow.union(after):
407 if mid in self.graph:
407 if mid in self.graph:
408 self.graph[mid].remove(msg_id)
408 self.graph[mid].remove(msg_id)
409
409
410 # FIXME: unpacking a message I've already unpacked, but didn't save:
410 # FIXME: unpacking a message I've already unpacked, but didn't save:
411 idents,msg = self.session.feed_identities(raw_msg, copy=False)
411 idents,msg = self.session.feed_identities(raw_msg, copy=False)
412 header = self.session.unpack(msg[1].bytes)
412 header = self.session.unpack(msg[1].bytes)
413
413
414 try:
414 try:
415 raise why()
415 raise why()
416 except:
416 except:
417 content = error.wrap_exception()
417 content = error.wrap_exception()
418
418
419 self.all_done.add(msg_id)
419 self.all_done.add(msg_id)
420 self.all_failed.add(msg_id)
420 self.all_failed.add(msg_id)
421
421
422 msg = self.session.send(self.client_stream, 'apply_reply', content,
422 msg = self.session.send(self.client_stream, 'apply_reply', content,
423 parent=header, ident=idents)
423 parent=header, ident=idents)
424 self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)
424 self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)
425
425
426 self.update_graph(msg_id, success=False)
426 self.update_graph(msg_id, success=False)
427
427
428 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
428 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
429 """check location dependencies, and run if they are met."""
429 """check location dependencies, and run if they are met."""
430 blacklist = self.blacklist.setdefault(msg_id, set())
430 blacklist = self.blacklist.setdefault(msg_id, set())
431 if follow or targets or blacklist or self.hwm:
431 if follow or targets or blacklist or self.hwm:
432 # we need a can_run filter
432 # we need a can_run filter
433 def can_run(idx):
433 def can_run(idx):
434 # check hwm
434 # check hwm
435 if self.hwm and self.loads[idx] == self.hwm:
435 if self.hwm and self.loads[idx] == self.hwm:
436 return False
436 return False
437 target = self.targets[idx]
437 target = self.targets[idx]
438 # check blacklist
438 # check blacklist
439 if target in blacklist:
439 if target in blacklist:
440 return False
440 return False
441 # check targets
441 # check targets
442 if targets and target not in targets:
442 if targets and target not in targets:
443 return False
443 return False
444 # check follow
444 # check follow
445 return follow.check(self.completed[target], self.failed[target])
445 return follow.check(self.completed[target], self.failed[target])
446
446
447 indices = filter(can_run, range(len(self.targets)))
447 indices = filter(can_run, range(len(self.targets)))
448
448
449 if not indices:
449 if not indices:
450 # couldn't run
450 # couldn't run
451 if follow.all:
451 if follow.all:
452 # check follow for impossibility
452 # check follow for impossibility
453 dests = set()
453 dests = set()
454 relevant = set()
454 relevant = set()
455 if follow.success:
455 if follow.success:
456 relevant = self.all_completed
456 relevant = self.all_completed
457 if follow.failure:
457 if follow.failure:
458 relevant = relevant.union(self.all_failed)
458 relevant = relevant.union(self.all_failed)
459 for m in follow.intersection(relevant):
459 for m in follow.intersection(relevant):
460 dests.add(self.destinations[m])
460 dests.add(self.destinations[m])
461 if len(dests) > 1:
461 if len(dests) > 1:
462 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
462 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
463 self.fail_unreachable(msg_id)
463 self.fail_unreachable(msg_id)
464 return False
464 return False
465 if targets:
465 if targets:
466 # check blacklist+targets for impossibility
466 # check blacklist+targets for impossibility
467 targets.difference_update(blacklist)
467 targets.difference_update(blacklist)
468 if not targets or not targets.intersection(self.targets):
468 if not targets or not targets.intersection(self.targets):
469 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
469 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
470 self.fail_unreachable(msg_id)
470 self.fail_unreachable(msg_id)
471 return False
471 return False
472 return False
472 return False
473 else:
473 else:
474 indices = None
474 indices = None
475
475
476 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
476 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
477 return True
477 return True
478
478
479 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
479 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
480 """Save a message for later submission when its dependencies are met."""
480 """Save a message for later submission when its dependencies are met."""
481 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
481 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
482 # track the ids in follow or after, but not those already finished
482 # track the ids in follow or after, but not those already finished
483 for dep_id in after.union(follow).difference(self.all_done):
483 for dep_id in after.union(follow).difference(self.all_done):
484 if dep_id not in self.graph:
484 if dep_id not in self.graph:
485 self.graph[dep_id] = set()
485 self.graph[dep_id] = set()
486 self.graph[dep_id].add(msg_id)
486 self.graph[dep_id].add(msg_id)
487
487
488 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
488 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
489 """Submit a task to any of a subset of our targets."""
489 """Submit a task to any of a subset of our targets."""
490 if indices:
490 if indices:
491 loads = [self.loads[i] for i in indices]
491 loads = [self.loads[i] for i in indices]
492 else:
492 else:
493 loads = self.loads
493 loads = self.loads
494 idx = self.scheme(loads)
494 idx = self.scheme(loads)
495 if indices:
495 if indices:
496 idx = indices[idx]
496 idx = indices[idx]
497 target = self.targets[idx]
497 target = self.targets[idx]
498 # print (target, map(str, msg[:3]))
498 # print (target, map(str, msg[:3]))
499 # send job to the engine
499 # send job to the engine
500 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
500 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
501 self.engine_stream.send_multipart(raw_msg, copy=False)
501 self.engine_stream.send_multipart(raw_msg, copy=False)
502 # update load
502 # update load
503 self.add_job(idx)
503 self.add_job(idx)
504 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
504 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
505 # notify Hub
505 # notify Hub
506 content = dict(msg_id=msg_id, engine_id=target.decode('ascii'))
506 content = dict(msg_id=msg_id, engine_id=target.decode('ascii'))
507 self.session.send(self.mon_stream, 'task_destination', content=content,
507 self.session.send(self.mon_stream, 'task_destination', content=content,
508 ident=[b'tracktask',self.ident])
508 ident=[b'tracktask',self.ident])
509
509
510
510
511 #-----------------------------------------------------------------------
511 #-----------------------------------------------------------------------
512 # Result Handling
512 # Result Handling
513 #-----------------------------------------------------------------------
513 #-----------------------------------------------------------------------
514 def dispatch_result(self, raw_msg):
514 def dispatch_result(self, raw_msg):
515 """dispatch method for result replies"""
515 """dispatch method for result replies"""
516 try:
516 try:
517 idents,msg = self.session.feed_identities(raw_msg, copy=False)
517 idents,msg = self.session.feed_identities(raw_msg, copy=False)
518 msg = self.session.unpack_message(msg, content=False, copy=False)
518 msg = self.session.unserialize(msg, content=False, copy=False)
519 engine = idents[0]
519 engine = idents[0]
520 try:
520 try:
521 idx = self.targets.index(engine)
521 idx = self.targets.index(engine)
522 except ValueError:
522 except ValueError:
523 pass # skip load-update for dead engines
523 pass # skip load-update for dead engines
524 else:
524 else:
525 self.finish_job(idx)
525 self.finish_job(idx)
526 except Exception:
526 except Exception:
527 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
527 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
528 return
528 return
529
529
530 header = msg['header']
530 header = msg['header']
531 parent = msg['parent_header']
531 parent = msg['parent_header']
532 if header.get('dependencies_met', True):
532 if header.get('dependencies_met', True):
533 success = (header['status'] == 'ok')
533 success = (header['status'] == 'ok')
534 msg_id = parent['msg_id']
534 msg_id = parent['msg_id']
535 retries = self.retries[msg_id]
535 retries = self.retries[msg_id]
536 if not success and retries > 0:
536 if not success and retries > 0:
537 # failed
537 # failed
538 self.retries[msg_id] = retries - 1
538 self.retries[msg_id] = retries - 1
539 self.handle_unmet_dependency(idents, parent)
539 self.handle_unmet_dependency(idents, parent)
540 else:
540 else:
541 del self.retries[msg_id]
541 del self.retries[msg_id]
542 # relay to client and update graph
542 # relay to client and update graph
543 self.handle_result(idents, parent, raw_msg, success)
543 self.handle_result(idents, parent, raw_msg, success)
544 # send to Hub monitor
544 # send to Hub monitor
545 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
545 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
546 else:
546 else:
547 self.handle_unmet_dependency(idents, parent)
547 self.handle_unmet_dependency(idents, parent)
548
548
549 def handle_result(self, idents, parent, raw_msg, success=True):
549 def handle_result(self, idents, parent, raw_msg, success=True):
550 """handle a real task result, either success or failure"""
550 """handle a real task result, either success or failure"""
551 # first, relay result to client
551 # first, relay result to client
552 engine = idents[0]
552 engine = idents[0]
553 client = idents[1]
553 client = idents[1]
554 # swap_ids for XREP-XREP mirror
554 # swap_ids for XREP-XREP mirror
555 raw_msg[:2] = [client,engine]
555 raw_msg[:2] = [client,engine]
556 # print (map(str, raw_msg[:4]))
556 # print (map(str, raw_msg[:4]))
557 self.client_stream.send_multipart(raw_msg, copy=False)
557 self.client_stream.send_multipart(raw_msg, copy=False)
558 # now, update our data structures
558 # now, update our data structures
559 msg_id = parent['msg_id']
559 msg_id = parent['msg_id']
560 self.blacklist.pop(msg_id, None)
560 self.blacklist.pop(msg_id, None)
561 self.pending[engine].pop(msg_id)
561 self.pending[engine].pop(msg_id)
562 if success:
562 if success:
563 self.completed[engine].add(msg_id)
563 self.completed[engine].add(msg_id)
564 self.all_completed.add(msg_id)
564 self.all_completed.add(msg_id)
565 else:
565 else:
566 self.failed[engine].add(msg_id)
566 self.failed[engine].add(msg_id)
567 self.all_failed.add(msg_id)
567 self.all_failed.add(msg_id)
568 self.all_done.add(msg_id)
568 self.all_done.add(msg_id)
569 self.destinations[msg_id] = engine
569 self.destinations[msg_id] = engine
570
570
571 self.update_graph(msg_id, success)
571 self.update_graph(msg_id, success)
572
572
573 def handle_unmet_dependency(self, idents, parent):
573 def handle_unmet_dependency(self, idents, parent):
574 """handle an unmet dependency"""
574 """handle an unmet dependency"""
575 engine = idents[0]
575 engine = idents[0]
576 msg_id = parent['msg_id']
576 msg_id = parent['msg_id']
577
577
578 if msg_id not in self.blacklist:
578 if msg_id not in self.blacklist:
579 self.blacklist[msg_id] = set()
579 self.blacklist[msg_id] = set()
580 self.blacklist[msg_id].add(engine)
580 self.blacklist[msg_id].add(engine)
581
581
582 args = self.pending[engine].pop(msg_id)
582 args = self.pending[engine].pop(msg_id)
583 raw,targets,after,follow,timeout = args
583 raw,targets,after,follow,timeout = args
584
584
585 if self.blacklist[msg_id] == targets:
585 if self.blacklist[msg_id] == targets:
586 self.depending[msg_id] = args
586 self.depending[msg_id] = args
587 self.fail_unreachable(msg_id)
587 self.fail_unreachable(msg_id)
588 elif not self.maybe_run(msg_id, *args):
588 elif not self.maybe_run(msg_id, *args):
589 # resubmit failed
589 # resubmit failed
590 if msg_id not in self.all_failed:
590 if msg_id not in self.all_failed:
591 # put it back in our dependency tree
591 # put it back in our dependency tree
592 self.save_unmet(msg_id, *args)
592 self.save_unmet(msg_id, *args)
593
593
594 if self.hwm:
594 if self.hwm:
595 try:
595 try:
596 idx = self.targets.index(engine)
596 idx = self.targets.index(engine)
597 except ValueError:
597 except ValueError:
598 pass # skip load-update for dead engines
598 pass # skip load-update for dead engines
599 else:
599 else:
600 if self.loads[idx] == self.hwm-1:
600 if self.loads[idx] == self.hwm-1:
601 self.update_graph(None)
601 self.update_graph(None)
602
602
603
603
604
604
605 def update_graph(self, dep_id=None, success=True):
605 def update_graph(self, dep_id=None, success=True):
606 """dep_id just finished. Update our dependency
606 """dep_id just finished. Update our dependency
607 graph and submit any jobs that just became runable.
607 graph and submit any jobs that just became runable.
608
608
609 Called with dep_id=None to update entire graph for hwm, but without finishing
609 Called with dep_id=None to update entire graph for hwm, but without finishing
610 a task.
610 a task.
611 """
611 """
612 # print ("\n\n***********")
612 # print ("\n\n***********")
613 # pprint (dep_id)
613 # pprint (dep_id)
614 # pprint (self.graph)
614 # pprint (self.graph)
615 # pprint (self.depending)
615 # pprint (self.depending)
616 # pprint (self.all_completed)
616 # pprint (self.all_completed)
617 # pprint (self.all_failed)
617 # pprint (self.all_failed)
618 # print ("\n\n***********\n\n")
618 # print ("\n\n***********\n\n")
619 # update any jobs that depended on the dependency
619 # update any jobs that depended on the dependency
620 jobs = self.graph.pop(dep_id, [])
620 jobs = self.graph.pop(dep_id, [])
621
621
622 # recheck *all* jobs if
622 # recheck *all* jobs if
623 # a) we have HWM and an engine just become no longer full
623 # a) we have HWM and an engine just become no longer full
624 # or b) dep_id was given as None
624 # or b) dep_id was given as None
625 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
625 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
626 jobs = self.depending.keys()
626 jobs = self.depending.keys()
627
627
628 for msg_id in jobs:
628 for msg_id in jobs:
629 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
629 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
630
630
631 if after.unreachable(self.all_completed, self.all_failed)\
631 if after.unreachable(self.all_completed, self.all_failed)\
632 or follow.unreachable(self.all_completed, self.all_failed):
632 or follow.unreachable(self.all_completed, self.all_failed):
633 self.fail_unreachable(msg_id)
633 self.fail_unreachable(msg_id)
634
634
635 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
635 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
636 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
636 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
637
637
638 self.depending.pop(msg_id)
638 self.depending.pop(msg_id)
639 for mid in follow.union(after):
639 for mid in follow.union(after):
640 if mid in self.graph:
640 if mid in self.graph:
641 self.graph[mid].remove(msg_id)
641 self.graph[mid].remove(msg_id)
642
642
643 #----------------------------------------------------------------------
643 #----------------------------------------------------------------------
644 # methods to be overridden by subclasses
644 # methods to be overridden by subclasses
645 #----------------------------------------------------------------------
645 #----------------------------------------------------------------------
646
646
647 def add_job(self, idx):
647 def add_job(self, idx):
648 """Called after self.targets[idx] just got the job with header.
648 """Called after self.targets[idx] just got the job with header.
649 Override with subclasses. The default ordering is simple LRU.
649 Override with subclasses. The default ordering is simple LRU.
650 The default loads are the number of outstanding jobs."""
650 The default loads are the number of outstanding jobs."""
651 self.loads[idx] += 1
651 self.loads[idx] += 1
652 for lis in (self.targets, self.loads):
652 for lis in (self.targets, self.loads):
653 lis.append(lis.pop(idx))
653 lis.append(lis.pop(idx))
654
654
655
655
656 def finish_job(self, idx):
656 def finish_job(self, idx):
657 """Called after self.targets[idx] just finished a job.
657 """Called after self.targets[idx] just finished a job.
658 Override with subclasses."""
658 Override with subclasses."""
659 self.loads[idx] -= 1
659 self.loads[idx] -= 1
660
660
661
661
662
662
663 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
663 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
664 logname='root', log_url=None, loglevel=logging.DEBUG,
664 logname='root', log_url=None, loglevel=logging.DEBUG,
665 identity=b'task', in_thread=False):
665 identity=b'task', in_thread=False):
666
666
667 ZMQStream = zmqstream.ZMQStream
667 ZMQStream = zmqstream.ZMQStream
668
668
669 if config:
669 if config:
670 # unwrap dict back into Config
670 # unwrap dict back into Config
671 config = Config(config)
671 config = Config(config)
672
672
673 if in_thread:
673 if in_thread:
674 # use instance() to get the same Context/Loop as our parent
674 # use instance() to get the same Context/Loop as our parent
675 ctx = zmq.Context.instance()
675 ctx = zmq.Context.instance()
676 loop = ioloop.IOLoop.instance()
676 loop = ioloop.IOLoop.instance()
677 else:
677 else:
678 # in a process, don't use instance()
678 # in a process, don't use instance()
679 # for safety with multiprocessing
679 # for safety with multiprocessing
680 ctx = zmq.Context()
680 ctx = zmq.Context()
681 loop = ioloop.IOLoop()
681 loop = ioloop.IOLoop()
682 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
682 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
683 ins.setsockopt(zmq.IDENTITY, identity)
683 ins.setsockopt(zmq.IDENTITY, identity)
684 ins.bind(in_addr)
684 ins.bind(in_addr)
685
685
686 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
686 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
687 outs.setsockopt(zmq.IDENTITY, identity)
687 outs.setsockopt(zmq.IDENTITY, identity)
688 outs.bind(out_addr)
688 outs.bind(out_addr)
689 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
689 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
690 mons.connect(mon_addr)
690 mons.connect(mon_addr)
691 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
691 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
692 nots.setsockopt(zmq.SUBSCRIBE, b'')
692 nots.setsockopt(zmq.SUBSCRIBE, b'')
693 nots.connect(not_addr)
693 nots.connect(not_addr)
694
694
695 # setup logging.
695 # setup logging.
696 if in_thread:
696 if in_thread:
697 log = Application.instance().log
697 log = Application.instance().log
698 else:
698 else:
699 if log_url:
699 if log_url:
700 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
700 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
701 else:
701 else:
702 log = local_logger(logname, loglevel)
702 log = local_logger(logname, loglevel)
703
703
704 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
704 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
705 mon_stream=mons, notifier_stream=nots,
705 mon_stream=mons, notifier_stream=nots,
706 loop=loop, log=log,
706 loop=loop, log=log,
707 config=config)
707 config=config)
708 scheduler.start()
708 scheduler.start()
709 if not in_thread:
709 if not in_thread:
710 try:
710 try:
711 loop.start()
711 loop.start()
712 except KeyboardInterrupt:
712 except KeyboardInterrupt:
713 print ("interrupted, exiting...", file=sys.__stderr__)
713 print ("interrupted, exiting...", file=sys.__stderr__)
714
714
@@ -1,174 +1,174 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple engine that talks to a controller over 0MQ.
2 """A simple engine that talks to a controller over 0MQ.
3 it handles registration, etc. and launches a kernel
3 it handles registration, etc. and launches a kernel
4 connected to the Controller's Schedulers.
4 connected to the Controller's Schedulers.
5
5
6 Authors:
6 Authors:
7
7
8 * Min RK
8 * Min RK
9 """
9 """
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2010-2011 The IPython Development Team
11 # Copyright (C) 2010-2011 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import print_function
17 from __future__ import print_function
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 # internal
25 # internal
26 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, CBytes
26 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, CBytes
27 # from IPython.utils.localinterfaces import LOCALHOST
27 # from IPython.utils.localinterfaces import LOCALHOST
28
28
29 from IPython.parallel.controller.heartmonitor import Heart
29 from IPython.parallel.controller.heartmonitor import Heart
30 from IPython.parallel.factory import RegistrationFactory
30 from IPython.parallel.factory import RegistrationFactory
31 from IPython.parallel.util import disambiguate_url, asbytes
31 from IPython.parallel.util import disambiguate_url, asbytes
32
32
33 from IPython.zmq.session import Message
33 from IPython.zmq.session import Message
34
34
35 from .streamkernel import Kernel
35 from .streamkernel import Kernel
36
36
37 class EngineFactory(RegistrationFactory):
37 class EngineFactory(RegistrationFactory):
38 """IPython engine"""
38 """IPython engine"""
39
39
40 # configurables:
40 # configurables:
41 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
41 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
42 help="""The OutStream for handling stdout/err.
42 help="""The OutStream for handling stdout/err.
43 Typically 'IPython.zmq.iostream.OutStream'""")
43 Typically 'IPython.zmq.iostream.OutStream'""")
44 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
44 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
45 help="""The class for handling displayhook.
45 help="""The class for handling displayhook.
46 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
46 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
47 location=Unicode(config=True,
47 location=Unicode(config=True,
48 help="""The location (an IP address) of the controller. This is
48 help="""The location (an IP address) of the controller. This is
49 used for disambiguating URLs, to determine whether
49 used for disambiguating URLs, to determine whether
50 loopback should be used to connect or the public address.""")
50 loopback should be used to connect or the public address.""")
51 timeout=CFloat(2,config=True,
51 timeout=CFloat(2,config=True,
52 help="""The time (in seconds) to wait for the Controller to respond
52 help="""The time (in seconds) to wait for the Controller to respond
53 to registration requests before giving up.""")
53 to registration requests before giving up.""")
54
54
55 # not configurable:
55 # not configurable:
56 user_ns=Dict()
56 user_ns=Dict()
57 id=Int(allow_none=True)
57 id=Int(allow_none=True)
58 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
58 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
59 kernel=Instance(Kernel)
59 kernel=Instance(Kernel)
60
60
61 bident = CBytes()
61 bident = CBytes()
62 ident = Unicode()
62 ident = Unicode()
63 def _ident_changed(self, name, old, new):
63 def _ident_changed(self, name, old, new):
64 self.bident = asbytes(new)
64 self.bident = asbytes(new)
65
65
66
66
67 def __init__(self, **kwargs):
67 def __init__(self, **kwargs):
68 super(EngineFactory, self).__init__(**kwargs)
68 super(EngineFactory, self).__init__(**kwargs)
69 self.ident = self.session.session
69 self.ident = self.session.session
70 ctx = self.context
70 ctx = self.context
71
71
72 reg = ctx.socket(zmq.XREQ)
72 reg = ctx.socket(zmq.XREQ)
73 reg.setsockopt(zmq.IDENTITY, self.bident)
73 reg.setsockopt(zmq.IDENTITY, self.bident)
74 reg.connect(self.url)
74 reg.connect(self.url)
75 self.registrar = zmqstream.ZMQStream(reg, self.loop)
75 self.registrar = zmqstream.ZMQStream(reg, self.loop)
76
76
77 def register(self):
77 def register(self):
78 """send the registration_request"""
78 """send the registration_request"""
79
79
80 self.log.info("Registering with controller at %s"%self.url)
80 self.log.info("Registering with controller at %s"%self.url)
81 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
81 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
82 self.registrar.on_recv(self.complete_registration)
82 self.registrar.on_recv(self.complete_registration)
83 # print (self.session.key)
83 # print (self.session.key)
84 self.session.send(self.registrar, "registration_request",content=content)
84 self.session.send(self.registrar, "registration_request",content=content)
85
85
86 def complete_registration(self, msg):
86 def complete_registration(self, msg):
87 # print msg
87 # print msg
88 self._abort_dc.stop()
88 self._abort_dc.stop()
89 ctx = self.context
89 ctx = self.context
90 loop = self.loop
90 loop = self.loop
91 identity = self.bident
91 identity = self.bident
92 idents,msg = self.session.feed_identities(msg)
92 idents,msg = self.session.feed_identities(msg)
93 msg = Message(self.session.unpack_message(msg))
93 msg = Message(self.session.unserialize(msg))
94
94
95 if msg.content.status == 'ok':
95 if msg.content.status == 'ok':
96 self.id = int(msg.content.id)
96 self.id = int(msg.content.id)
97
97
98 # create Shell Streams (MUX, Task, etc.):
98 # create Shell Streams (MUX, Task, etc.):
99 queue_addr = msg.content.mux
99 queue_addr = msg.content.mux
100 shell_addrs = [ str(queue_addr) ]
100 shell_addrs = [ str(queue_addr) ]
101 task_addr = msg.content.task
101 task_addr = msg.content.task
102 if task_addr:
102 if task_addr:
103 shell_addrs.append(str(task_addr))
103 shell_addrs.append(str(task_addr))
104
104
105 # Uncomment this to go back to two-socket model
105 # Uncomment this to go back to two-socket model
106 # shell_streams = []
106 # shell_streams = []
107 # for addr in shell_addrs:
107 # for addr in shell_addrs:
108 # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
108 # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
109 # stream.setsockopt(zmq.IDENTITY, identity)
109 # stream.setsockopt(zmq.IDENTITY, identity)
110 # stream.connect(disambiguate_url(addr, self.location))
110 # stream.connect(disambiguate_url(addr, self.location))
111 # shell_streams.append(stream)
111 # shell_streams.append(stream)
112
112
113 # Now use only one shell stream for mux and tasks
113 # Now use only one shell stream for mux and tasks
114 stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
114 stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
115 stream.setsockopt(zmq.IDENTITY, identity)
115 stream.setsockopt(zmq.IDENTITY, identity)
116 shell_streams = [stream]
116 shell_streams = [stream]
117 for addr in shell_addrs:
117 for addr in shell_addrs:
118 stream.connect(disambiguate_url(addr, self.location))
118 stream.connect(disambiguate_url(addr, self.location))
119 # end single stream-socket
119 # end single stream-socket
120
120
121 # control stream:
121 # control stream:
122 control_addr = str(msg.content.control)
122 control_addr = str(msg.content.control)
123 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
123 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
124 control_stream.setsockopt(zmq.IDENTITY, identity)
124 control_stream.setsockopt(zmq.IDENTITY, identity)
125 control_stream.connect(disambiguate_url(control_addr, self.location))
125 control_stream.connect(disambiguate_url(control_addr, self.location))
126
126
127 # create iopub stream:
127 # create iopub stream:
128 iopub_addr = msg.content.iopub
128 iopub_addr = msg.content.iopub
129 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
129 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
130 iopub_stream.setsockopt(zmq.IDENTITY, identity)
130 iopub_stream.setsockopt(zmq.IDENTITY, identity)
131 iopub_stream.connect(disambiguate_url(iopub_addr, self.location))
131 iopub_stream.connect(disambiguate_url(iopub_addr, self.location))
132
132
133 # launch heartbeat
133 # launch heartbeat
134 hb_addrs = msg.content.heartbeat
134 hb_addrs = msg.content.heartbeat
135 # print (hb_addrs)
135 # print (hb_addrs)
136
136
137 # # Redirect input streams and set a display hook.
137 # # Redirect input streams and set a display hook.
138 if self.out_stream_factory:
138 if self.out_stream_factory:
139 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
139 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
140 sys.stdout.topic = 'engine.%i.stdout'%self.id
140 sys.stdout.topic = 'engine.%i.stdout'%self.id
141 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
141 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
142 sys.stderr.topic = 'engine.%i.stderr'%self.id
142 sys.stderr.topic = 'engine.%i.stderr'%self.id
143 if self.display_hook_factory:
143 if self.display_hook_factory:
144 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
144 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
145 sys.displayhook.topic = 'engine.%i.pyout'%self.id
145 sys.displayhook.topic = 'engine.%i.pyout'%self.id
146
146
147 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
147 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
148 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
148 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
149 loop=loop, user_ns = self.user_ns, log=self.log)
149 loop=loop, user_ns = self.user_ns, log=self.log)
150 self.kernel.start()
150 self.kernel.start()
151 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
151 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
152 heart = Heart(*map(str, hb_addrs), heart_id=identity)
152 heart = Heart(*map(str, hb_addrs), heart_id=identity)
153 heart.start()
153 heart.start()
154
154
155
155
156 else:
156 else:
157 self.log.fatal("Registration Failed: %s"%msg)
157 self.log.fatal("Registration Failed: %s"%msg)
158 raise Exception("Registration Failed: %s"%msg)
158 raise Exception("Registration Failed: %s"%msg)
159
159
160 self.log.info("Completed registration with id %i"%self.id)
160 self.log.info("Completed registration with id %i"%self.id)
161
161
162
162
163 def abort(self):
163 def abort(self):
164 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
164 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
165 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
165 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
166 time.sleep(1)
166 time.sleep(1)
167 sys.exit(255)
167 sys.exit(255)
168
168
169 def start(self):
169 def start(self):
170 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
170 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
171 dc.start()
171 dc.start()
172 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
172 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
173 self._abort_dc.start()
173 self._abort_dc.start()
174
174
@@ -1,230 +1,230 b''
1 """KernelStarter class that intercepts Control Queue messages, and handles process management.
1 """KernelStarter class that intercepts Control Queue messages, and handles process management.
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 from zmq.eventloop import ioloop
14 from zmq.eventloop import ioloop
15
15
16 from IPython.zmq.session import Session
16 from IPython.zmq.session import Session
17
17
18 class KernelStarter(object):
18 class KernelStarter(object):
19 """Object for resetting/killing the Kernel."""
19 """Object for resetting/killing the Kernel."""
20
20
21
21
22 def __init__(self, session, upstream, downstream, *kernel_args, **kernel_kwargs):
22 def __init__(self, session, upstream, downstream, *kernel_args, **kernel_kwargs):
23 self.session = session
23 self.session = session
24 self.upstream = upstream
24 self.upstream = upstream
25 self.downstream = downstream
25 self.downstream = downstream
26 self.kernel_args = kernel_args
26 self.kernel_args = kernel_args
27 self.kernel_kwargs = kernel_kwargs
27 self.kernel_kwargs = kernel_kwargs
28 self.handlers = {}
28 self.handlers = {}
29 for method in 'shutdown_request shutdown_reply'.split():
29 for method in 'shutdown_request shutdown_reply'.split():
30 self.handlers[method] = getattr(self, method)
30 self.handlers[method] = getattr(self, method)
31
31
32 def start(self):
32 def start(self):
33 self.upstream.on_recv(self.dispatch_request)
33 self.upstream.on_recv(self.dispatch_request)
34 self.downstream.on_recv(self.dispatch_reply)
34 self.downstream.on_recv(self.dispatch_reply)
35
35
36 #--------------------------------------------------------------------------
36 #--------------------------------------------------------------------------
37 # Dispatch methods
37 # Dispatch methods
38 #--------------------------------------------------------------------------
38 #--------------------------------------------------------------------------
39
39
40 def dispatch_request(self, raw_msg):
40 def dispatch_request(self, raw_msg):
41 idents, msg = self.session.feed_identities()
41 idents, msg = self.session.feed_identities()
42 try:
42 try:
43 msg = self.session.unpack_message(msg, content=False)
43 msg = self.session.unserialize(msg, content=False)
44 except:
44 except:
45 print ("bad msg: %s"%msg)
45 print ("bad msg: %s"%msg)
46
46
47 msgtype = msg['header']['msg_type']
47 msgtype = msg['header']['msg_type']
48 handler = self.handlers.get(msgtype, None)
48 handler = self.handlers.get(msgtype, None)
49 if handler is None:
49 if handler is None:
50 self.downstream.send_multipart(raw_msg, copy=False)
50 self.downstream.send_multipart(raw_msg, copy=False)
51 else:
51 else:
52 handler(msg)
52 handler(msg)
53
53
54 def dispatch_reply(self, raw_msg):
54 def dispatch_reply(self, raw_msg):
55 idents, msg = self.session.feed_identities()
55 idents, msg = self.session.feed_identities()
56 try:
56 try:
57 msg = self.session.unpack_message(msg, content=False)
57 msg = self.session.unserialize(msg, content=False)
58 except:
58 except:
59 print ("bad msg: %s"%msg)
59 print ("bad msg: %s"%msg)
60
60
61 msgtype = msg['header']['msg_type']
61 msgtype = msg['header']['msg_type']
62 handler = self.handlers.get(msgtype, None)
62 handler = self.handlers.get(msgtype, None)
63 if handler is None:
63 if handler is None:
64 self.upstream.send_multipart(raw_msg, copy=False)
64 self.upstream.send_multipart(raw_msg, copy=False)
65 else:
65 else:
66 handler(msg)
66 handler(msg)
67
67
68 #--------------------------------------------------------------------------
68 #--------------------------------------------------------------------------
69 # Handlers
69 # Handlers
70 #--------------------------------------------------------------------------
70 #--------------------------------------------------------------------------
71
71
72 def shutdown_request(self, msg):
72 def shutdown_request(self, msg):
73 """"""
73 """"""
74 self.downstream.send_multipart(msg)
74 self.downstream.send_multipart(msg)
75
75
76 #--------------------------------------------------------------------------
76 #--------------------------------------------------------------------------
77 # Kernel process management methods, from KernelManager:
77 # Kernel process management methods, from KernelManager:
78 #--------------------------------------------------------------------------
78 #--------------------------------------------------------------------------
79
79
80 def _check_local(addr):
80 def _check_local(addr):
81 if isinstance(addr, tuple):
81 if isinstance(addr, tuple):
82 addr = addr[0]
82 addr = addr[0]
83 return addr in LOCAL_IPS
83 return addr in LOCAL_IPS
84
84
85 def start_kernel(self, **kw):
85 def start_kernel(self, **kw):
86 """Starts a kernel process and configures the manager to use it.
86 """Starts a kernel process and configures the manager to use it.
87
87
88 If random ports (port=0) are being used, this method must be called
88 If random ports (port=0) are being used, this method must be called
89 before the channels are created.
89 before the channels are created.
90
90
91 Parameters:
91 Parameters:
92 -----------
92 -----------
93 ipython : bool, optional (default True)
93 ipython : bool, optional (default True)
94 Whether to use an IPython kernel instead of a plain Python kernel.
94 Whether to use an IPython kernel instead of a plain Python kernel.
95 """
95 """
96 self.kernel = Process(target=make_kernel, args=self.kernel_args,
96 self.kernel = Process(target=make_kernel, args=self.kernel_args,
97 kwargs=self.kernel_kwargs)
97 kwargs=self.kernel_kwargs)
98
98
99 def shutdown_kernel(self, restart=False):
99 def shutdown_kernel(self, restart=False):
100 """ Attempts to the stop the kernel process cleanly. If the kernel
100 """ Attempts to the stop the kernel process cleanly. If the kernel
101 cannot be stopped, it is killed, if possible.
101 cannot be stopped, it is killed, if possible.
102 """
102 """
103 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
103 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
104 if sys.platform == 'win32':
104 if sys.platform == 'win32':
105 self.kill_kernel()
105 self.kill_kernel()
106 return
106 return
107
107
108 # Don't send any additional kernel kill messages immediately, to give
108 # Don't send any additional kernel kill messages immediately, to give
109 # the kernel a chance to properly execute shutdown actions. Wait for at
109 # the kernel a chance to properly execute shutdown actions. Wait for at
110 # most 1s, checking every 0.1s.
110 # most 1s, checking every 0.1s.
111 self.xreq_channel.shutdown(restart=restart)
111 self.xreq_channel.shutdown(restart=restart)
112 for i in range(10):
112 for i in range(10):
113 if self.is_alive:
113 if self.is_alive:
114 time.sleep(0.1)
114 time.sleep(0.1)
115 else:
115 else:
116 break
116 break
117 else:
117 else:
118 # OK, we've waited long enough.
118 # OK, we've waited long enough.
119 if self.has_kernel:
119 if self.has_kernel:
120 self.kill_kernel()
120 self.kill_kernel()
121
121
122 def restart_kernel(self, now=False):
122 def restart_kernel(self, now=False):
123 """Restarts a kernel with the same arguments that were used to launch
123 """Restarts a kernel with the same arguments that were used to launch
124 it. If the old kernel was launched with random ports, the same ports
124 it. If the old kernel was launched with random ports, the same ports
125 will be used for the new kernel.
125 will be used for the new kernel.
126
126
127 Parameters
127 Parameters
128 ----------
128 ----------
129 now : bool, optional
129 now : bool, optional
130 If True, the kernel is forcefully restarted *immediately*, without
130 If True, the kernel is forcefully restarted *immediately*, without
131 having a chance to do any cleanup action. Otherwise the kernel is
131 having a chance to do any cleanup action. Otherwise the kernel is
132 given 1s to clean up before a forceful restart is issued.
132 given 1s to clean up before a forceful restart is issued.
133
133
134 In all cases the kernel is restarted, the only difference is whether
134 In all cases the kernel is restarted, the only difference is whether
135 it is given a chance to perform a clean shutdown or not.
135 it is given a chance to perform a clean shutdown or not.
136 """
136 """
137 if self._launch_args is None:
137 if self._launch_args is None:
138 raise RuntimeError("Cannot restart the kernel. "
138 raise RuntimeError("Cannot restart the kernel. "
139 "No previous call to 'start_kernel'.")
139 "No previous call to 'start_kernel'.")
140 else:
140 else:
141 if self.has_kernel:
141 if self.has_kernel:
142 if now:
142 if now:
143 self.kill_kernel()
143 self.kill_kernel()
144 else:
144 else:
145 self.shutdown_kernel(restart=True)
145 self.shutdown_kernel(restart=True)
146 self.start_kernel(**self._launch_args)
146 self.start_kernel(**self._launch_args)
147
147
148 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
148 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
149 # unless there is some delay here.
149 # unless there is some delay here.
150 if sys.platform == 'win32':
150 if sys.platform == 'win32':
151 time.sleep(0.2)
151 time.sleep(0.2)
152
152
153 @property
153 @property
154 def has_kernel(self):
154 def has_kernel(self):
155 """Returns whether a kernel process has been specified for the kernel
155 """Returns whether a kernel process has been specified for the kernel
156 manager.
156 manager.
157 """
157 """
158 return self.kernel is not None
158 return self.kernel is not None
159
159
160 def kill_kernel(self):
160 def kill_kernel(self):
161 """ Kill the running kernel. """
161 """ Kill the running kernel. """
162 if self.has_kernel:
162 if self.has_kernel:
163 # Pause the heart beat channel if it exists.
163 # Pause the heart beat channel if it exists.
164 if self._hb_channel is not None:
164 if self._hb_channel is not None:
165 self._hb_channel.pause()
165 self._hb_channel.pause()
166
166
167 # Attempt to kill the kernel.
167 # Attempt to kill the kernel.
168 try:
168 try:
169 self.kernel.kill()
169 self.kernel.kill()
170 except OSError, e:
170 except OSError, e:
171 # In Windows, we will get an Access Denied error if the process
171 # In Windows, we will get an Access Denied error if the process
172 # has already terminated. Ignore it.
172 # has already terminated. Ignore it.
173 if not (sys.platform == 'win32' and e.winerror == 5):
173 if not (sys.platform == 'win32' and e.winerror == 5):
174 raise
174 raise
175 self.kernel = None
175 self.kernel = None
176 else:
176 else:
177 raise RuntimeError("Cannot kill kernel. No kernel is running!")
177 raise RuntimeError("Cannot kill kernel. No kernel is running!")
178
178
179 def interrupt_kernel(self):
179 def interrupt_kernel(self):
180 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
180 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
181 well supported on all platforms.
181 well supported on all platforms.
182 """
182 """
183 if self.has_kernel:
183 if self.has_kernel:
184 if sys.platform == 'win32':
184 if sys.platform == 'win32':
185 from parentpoller import ParentPollerWindows as Poller
185 from parentpoller import ParentPollerWindows as Poller
186 Poller.send_interrupt(self.kernel.win32_interrupt_event)
186 Poller.send_interrupt(self.kernel.win32_interrupt_event)
187 else:
187 else:
188 self.kernel.send_signal(signal.SIGINT)
188 self.kernel.send_signal(signal.SIGINT)
189 else:
189 else:
190 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
190 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
191
191
192 def signal_kernel(self, signum):
192 def signal_kernel(self, signum):
193 """ Sends a signal to the kernel. Note that since only SIGTERM is
193 """ Sends a signal to the kernel. Note that since only SIGTERM is
194 supported on Windows, this function is only useful on Unix systems.
194 supported on Windows, this function is only useful on Unix systems.
195 """
195 """
196 if self.has_kernel:
196 if self.has_kernel:
197 self.kernel.send_signal(signum)
197 self.kernel.send_signal(signum)
198 else:
198 else:
199 raise RuntimeError("Cannot signal kernel. No kernel is running!")
199 raise RuntimeError("Cannot signal kernel. No kernel is running!")
200
200
201 @property
201 @property
202 def is_alive(self):
202 def is_alive(self):
203 """Is the kernel process still running?"""
203 """Is the kernel process still running?"""
204 # FIXME: not using a heartbeat means this method is broken for any
204 # FIXME: not using a heartbeat means this method is broken for any
205 # remote kernel, it's only capable of handling local kernels.
205 # remote kernel, it's only capable of handling local kernels.
206 if self.has_kernel:
206 if self.has_kernel:
207 if self.kernel.poll() is None:
207 if self.kernel.poll() is None:
208 return True
208 return True
209 else:
209 else:
210 return False
210 return False
211 else:
211 else:
212 # We didn't start the kernel with this KernelManager so we don't
212 # We didn't start the kernel with this KernelManager so we don't
213 # know if it is running. We should use a heartbeat for this case.
213 # know if it is running. We should use a heartbeat for this case.
214 return True
214 return True
215
215
216
216
217 def make_starter(up_addr, down_addr, *args, **kwargs):
217 def make_starter(up_addr, down_addr, *args, **kwargs):
218 """entry point function for launching a kernelstarter in a subprocess"""
218 """entry point function for launching a kernelstarter in a subprocess"""
219 loop = ioloop.IOLoop.instance()
219 loop = ioloop.IOLoop.instance()
220 ctx = zmq.Context()
220 ctx = zmq.Context()
221 session = Session()
221 session = Session()
222 upstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
222 upstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
223 upstream.connect(up_addr)
223 upstream.connect(up_addr)
224 downstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
224 downstream = zmqstream.ZMQStream(ctx.socket(zmq.XREQ),loop)
225 downstream.connect(down_addr)
225 downstream.connect(down_addr)
226
226
227 starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
227 starter = KernelStarter(session, upstream, downstream, *args, **kwargs)
228 starter.start()
228 starter.start()
229 loop.start()
229 loop.start()
230
230
@@ -1,438 +1,438 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 * Brian Granger
8 * Brian Granger
9 * Fernando Perez
9 * Fernando Perez
10 * Evan Patterson
10 * Evan Patterson
11 """
11 """
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2010-2011 The IPython Development Team
13 # Copyright (C) 2010-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 # Standard library imports.
23 # Standard library imports.
24 from __future__ import print_function
24 from __future__ import print_function
25
25
26 import sys
26 import sys
27 import time
27 import time
28
28
29 from code import CommandCompiler
29 from code import CommandCompiler
30 from datetime import datetime
30 from datetime import datetime
31 from pprint import pprint
31 from pprint import pprint
32
32
33 # System library imports.
33 # System library imports.
34 import zmq
34 import zmq
35 from zmq.eventloop import ioloop, zmqstream
35 from zmq.eventloop import ioloop, zmqstream
36
36
37 # Local imports.
37 # Local imports.
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes
39 from IPython.zmq.completer import KernelCompleter
39 from IPython.zmq.completer import KernelCompleter
40
40
41 from IPython.parallel.error import wrap_exception
41 from IPython.parallel.error import wrap_exception
42 from IPython.parallel.factory import SessionFactory
42 from IPython.parallel.factory import SessionFactory
43 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
43 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
44
44
45 def printer(*args):
45 def printer(*args):
46 pprint(args, stream=sys.__stdout__)
46 pprint(args, stream=sys.__stdout__)
47
47
48
48
49 class _Passer(zmqstream.ZMQStream):
49 class _Passer(zmqstream.ZMQStream):
50 """Empty class that implements `send()` that does nothing.
50 """Empty class that implements `send()` that does nothing.
51
51
52 Subclass ZMQStream for Session typechecking
52 Subclass ZMQStream for Session typechecking
53
53
54 """
54 """
55 def __init__(self, *args, **kwargs):
55 def __init__(self, *args, **kwargs):
56 pass
56 pass
57
57
58 def send(self, *args, **kwargs):
58 def send(self, *args, **kwargs):
59 pass
59 pass
60 send_multipart = send
60 send_multipart = send
61
61
62
62
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64 # Main kernel class
64 # Main kernel class
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66
66
67 class Kernel(SessionFactory):
67 class Kernel(SessionFactory):
68
68
69 #---------------------------------------------------------------------------
69 #---------------------------------------------------------------------------
70 # Kernel interface
70 # Kernel interface
71 #---------------------------------------------------------------------------
71 #---------------------------------------------------------------------------
72
72
73 # kwargs:
73 # kwargs:
74 exec_lines = List(Unicode, config=True,
74 exec_lines = List(Unicode, config=True,
75 help="List of lines to execute")
75 help="List of lines to execute")
76
76
77 # identities:
77 # identities:
78 int_id = Int(-1)
78 int_id = Int(-1)
79 bident = CBytes()
79 bident = CBytes()
80 ident = Unicode()
80 ident = Unicode()
81 def _ident_changed(self, name, old, new):
81 def _ident_changed(self, name, old, new):
82 self.bident = asbytes(new)
82 self.bident = asbytes(new)
83
83
84 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
84 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
85
85
86 control_stream = Instance(zmqstream.ZMQStream)
86 control_stream = Instance(zmqstream.ZMQStream)
87 task_stream = Instance(zmqstream.ZMQStream)
87 task_stream = Instance(zmqstream.ZMQStream)
88 iopub_stream = Instance(zmqstream.ZMQStream)
88 iopub_stream = Instance(zmqstream.ZMQStream)
89 client = Instance('IPython.parallel.Client')
89 client = Instance('IPython.parallel.Client')
90
90
91 # internals
91 # internals
92 shell_streams = List()
92 shell_streams = List()
93 compiler = Instance(CommandCompiler, (), {})
93 compiler = Instance(CommandCompiler, (), {})
94 completer = Instance(KernelCompleter)
94 completer = Instance(KernelCompleter)
95
95
96 aborted = Set()
96 aborted = Set()
97 shell_handlers = Dict()
97 shell_handlers = Dict()
98 control_handlers = Dict()
98 control_handlers = Dict()
99
99
100 def _set_prefix(self):
100 def _set_prefix(self):
101 self.prefix = "engine.%s"%self.int_id
101 self.prefix = "engine.%s"%self.int_id
102
102
103 def _connect_completer(self):
103 def _connect_completer(self):
104 self.completer = KernelCompleter(self.user_ns)
104 self.completer = KernelCompleter(self.user_ns)
105
105
106 def __init__(self, **kwargs):
106 def __init__(self, **kwargs):
107 super(Kernel, self).__init__(**kwargs)
107 super(Kernel, self).__init__(**kwargs)
108 self._set_prefix()
108 self._set_prefix()
109 self._connect_completer()
109 self._connect_completer()
110
110
111 self.on_trait_change(self._set_prefix, 'id')
111 self.on_trait_change(self._set_prefix, 'id')
112 self.on_trait_change(self._connect_completer, 'user_ns')
112 self.on_trait_change(self._connect_completer, 'user_ns')
113
113
114 # Build dict of handlers for message types
114 # Build dict of handlers for message types
115 for msg_type in ['execute_request', 'complete_request', 'apply_request',
115 for msg_type in ['execute_request', 'complete_request', 'apply_request',
116 'clear_request']:
116 'clear_request']:
117 self.shell_handlers[msg_type] = getattr(self, msg_type)
117 self.shell_handlers[msg_type] = getattr(self, msg_type)
118
118
119 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
119 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
120 self.control_handlers[msg_type] = getattr(self, msg_type)
120 self.control_handlers[msg_type] = getattr(self, msg_type)
121
121
122 self._initial_exec_lines()
122 self._initial_exec_lines()
123
123
124 def _wrap_exception(self, method=None):
124 def _wrap_exception(self, method=None):
125 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
125 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
126 content=wrap_exception(e_info)
126 content=wrap_exception(e_info)
127 return content
127 return content
128
128
129 def _initial_exec_lines(self):
129 def _initial_exec_lines(self):
130 s = _Passer()
130 s = _Passer()
131 content = dict(silent=True, user_variable=[],user_expressions=[])
131 content = dict(silent=True, user_variable=[],user_expressions=[])
132 for line in self.exec_lines:
132 for line in self.exec_lines:
133 self.log.debug("executing initialization: %s"%line)
133 self.log.debug("executing initialization: %s"%line)
134 content.update({'code':line})
134 content.update({'code':line})
135 msg = self.session.msg('execute_request', content)
135 msg = self.session.msg('execute_request', content)
136 self.execute_request(s, [], msg)
136 self.execute_request(s, [], msg)
137
137
138
138
139 #-------------------- control handlers -----------------------------
139 #-------------------- control handlers -----------------------------
140 def abort_queues(self):
140 def abort_queues(self):
141 for stream in self.shell_streams:
141 for stream in self.shell_streams:
142 if stream:
142 if stream:
143 self.abort_queue(stream)
143 self.abort_queue(stream)
144
144
145 def abort_queue(self, stream):
145 def abort_queue(self, stream):
146 while True:
146 while True:
147 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
147 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
148 if msg is None:
148 if msg is None:
149 return
149 return
150
150
151 self.log.info("Aborting:")
151 self.log.info("Aborting:")
152 self.log.info(str(msg))
152 self.log.info(str(msg))
153 msg_type = msg['header']['msg_type']
153 msg_type = msg['header']['msg_type']
154 reply_type = msg_type.split('_')[0] + '_reply'
154 reply_type = msg_type.split('_')[0] + '_reply'
155 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
155 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
156 # self.reply_socket.send(ident,zmq.SNDMORE)
156 # self.reply_socket.send(ident,zmq.SNDMORE)
157 # self.reply_socket.send_json(reply_msg)
157 # self.reply_socket.send_json(reply_msg)
158 reply_msg = self.session.send(stream, reply_type,
158 reply_msg = self.session.send(stream, reply_type,
159 content={'status' : 'aborted'}, parent=msg, ident=idents)
159 content={'status' : 'aborted'}, parent=msg, ident=idents)
160 self.log.debug(str(reply_msg))
160 self.log.debug(str(reply_msg))
161 # We need to wait a bit for requests to come in. This can probably
161 # We need to wait a bit for requests to come in. This can probably
162 # be set shorter for true asynchronous clients.
162 # be set shorter for true asynchronous clients.
163 time.sleep(0.05)
163 time.sleep(0.05)
164
164
165 def abort_request(self, stream, ident, parent):
165 def abort_request(self, stream, ident, parent):
166 """abort a specifig msg by id"""
166 """abort a specifig msg by id"""
167 msg_ids = parent['content'].get('msg_ids', None)
167 msg_ids = parent['content'].get('msg_ids', None)
168 if isinstance(msg_ids, basestring):
168 if isinstance(msg_ids, basestring):
169 msg_ids = [msg_ids]
169 msg_ids = [msg_ids]
170 if not msg_ids:
170 if not msg_ids:
171 self.abort_queues()
171 self.abort_queues()
172 for mid in msg_ids:
172 for mid in msg_ids:
173 self.aborted.add(str(mid))
173 self.aborted.add(str(mid))
174
174
175 content = dict(status='ok')
175 content = dict(status='ok')
176 reply_msg = self.session.send(stream, 'abort_reply', content=content,
176 reply_msg = self.session.send(stream, 'abort_reply', content=content,
177 parent=parent, ident=ident)
177 parent=parent, ident=ident)
178 self.log.debug(str(reply_msg))
178 self.log.debug(str(reply_msg))
179
179
180 def shutdown_request(self, stream, ident, parent):
180 def shutdown_request(self, stream, ident, parent):
181 """kill ourself. This should really be handled in an external process"""
181 """kill ourself. This should really be handled in an external process"""
182 try:
182 try:
183 self.abort_queues()
183 self.abort_queues()
184 except:
184 except:
185 content = self._wrap_exception('shutdown')
185 content = self._wrap_exception('shutdown')
186 else:
186 else:
187 content = dict(parent['content'])
187 content = dict(parent['content'])
188 content['status'] = 'ok'
188 content['status'] = 'ok'
189 msg = self.session.send(stream, 'shutdown_reply',
189 msg = self.session.send(stream, 'shutdown_reply',
190 content=content, parent=parent, ident=ident)
190 content=content, parent=parent, ident=ident)
191 self.log.debug(str(msg))
191 self.log.debug(str(msg))
192 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
192 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
193 dc.start()
193 dc.start()
194
194
195 def dispatch_control(self, msg):
195 def dispatch_control(self, msg):
196 idents,msg = self.session.feed_identities(msg, copy=False)
196 idents,msg = self.session.feed_identities(msg, copy=False)
197 try:
197 try:
198 msg = self.session.unpack_message(msg, content=True, copy=False)
198 msg = self.session.unserialize(msg, content=True, copy=False)
199 except:
199 except:
200 self.log.error("Invalid Message", exc_info=True)
200 self.log.error("Invalid Message", exc_info=True)
201 return
201 return
202 else:
202 else:
203 self.log.debug("Control received, %s", msg)
203 self.log.debug("Control received, %s", msg)
204
204
205 header = msg['header']
205 header = msg['header']
206 msg_id = header['msg_id']
206 msg_id = header['msg_id']
207
207
208 handler = self.control_handlers.get(msg['header']['msg_type'], None)
208 handler = self.control_handlers.get(msg['header']['msg_type'], None)
209 if handler is None:
209 if handler is None:
210 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['header']['msg_type'])
210 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['header']['msg_type'])
211 else:
211 else:
212 handler(self.control_stream, idents, msg)
212 handler(self.control_stream, idents, msg)
213
213
214
214
215 #-------------------- queue helpers ------------------------------
215 #-------------------- queue helpers ------------------------------
216
216
217 def check_dependencies(self, dependencies):
217 def check_dependencies(self, dependencies):
218 if not dependencies:
218 if not dependencies:
219 return True
219 return True
220 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
220 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
221 anyorall = dependencies[0]
221 anyorall = dependencies[0]
222 dependencies = dependencies[1]
222 dependencies = dependencies[1]
223 else:
223 else:
224 anyorall = 'all'
224 anyorall = 'all'
225 results = self.client.get_results(dependencies,status_only=True)
225 results = self.client.get_results(dependencies,status_only=True)
226 if results['status'] != 'ok':
226 if results['status'] != 'ok':
227 return False
227 return False
228
228
229 if anyorall == 'any':
229 if anyorall == 'any':
230 if not results['completed']:
230 if not results['completed']:
231 return False
231 return False
232 else:
232 else:
233 if results['pending']:
233 if results['pending']:
234 return False
234 return False
235
235
236 return True
236 return True
237
237
238 def check_aborted(self, msg_id):
238 def check_aborted(self, msg_id):
239 return msg_id in self.aborted
239 return msg_id in self.aborted
240
240
241 #-------------------- queue handlers -----------------------------
241 #-------------------- queue handlers -----------------------------
242
242
243 def clear_request(self, stream, idents, parent):
243 def clear_request(self, stream, idents, parent):
244 """Clear our namespace."""
244 """Clear our namespace."""
245 self.user_ns = {}
245 self.user_ns = {}
246 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
246 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
247 content = dict(status='ok'))
247 content = dict(status='ok'))
248 self._initial_exec_lines()
248 self._initial_exec_lines()
249
249
250 def execute_request(self, stream, ident, parent):
250 def execute_request(self, stream, ident, parent):
251 self.log.debug('execute request %s'%parent)
251 self.log.debug('execute request %s'%parent)
252 try:
252 try:
253 code = parent[u'content'][u'code']
253 code = parent[u'content'][u'code']
254 except:
254 except:
255 self.log.error("Got bad msg: %s"%parent, exc_info=True)
255 self.log.error("Got bad msg: %s"%parent, exc_info=True)
256 return
256 return
257 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
257 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
258 ident=asbytes('%s.pyin'%self.prefix))
258 ident=asbytes('%s.pyin'%self.prefix))
259 started = datetime.now()
259 started = datetime.now()
260 try:
260 try:
261 comp_code = self.compiler(code, '<zmq-kernel>')
261 comp_code = self.compiler(code, '<zmq-kernel>')
262 # allow for not overriding displayhook
262 # allow for not overriding displayhook
263 if hasattr(sys.displayhook, 'set_parent'):
263 if hasattr(sys.displayhook, 'set_parent'):
264 sys.displayhook.set_parent(parent)
264 sys.displayhook.set_parent(parent)
265 sys.stdout.set_parent(parent)
265 sys.stdout.set_parent(parent)
266 sys.stderr.set_parent(parent)
266 sys.stderr.set_parent(parent)
267 exec comp_code in self.user_ns, self.user_ns
267 exec comp_code in self.user_ns, self.user_ns
268 except:
268 except:
269 exc_content = self._wrap_exception('execute')
269 exc_content = self._wrap_exception('execute')
270 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
270 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
271 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
271 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
272 ident=asbytes('%s.pyerr'%self.prefix))
272 ident=asbytes('%s.pyerr'%self.prefix))
273 reply_content = exc_content
273 reply_content = exc_content
274 else:
274 else:
275 reply_content = {'status' : 'ok'}
275 reply_content = {'status' : 'ok'}
276
276
277 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
277 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
278 ident=ident, subheader = dict(started=started))
278 ident=ident, subheader = dict(started=started))
279 self.log.debug(str(reply_msg))
279 self.log.debug(str(reply_msg))
280 if reply_msg['content']['status'] == u'error':
280 if reply_msg['content']['status'] == u'error':
281 self.abort_queues()
281 self.abort_queues()
282
282
283 def complete_request(self, stream, ident, parent):
283 def complete_request(self, stream, ident, parent):
284 matches = {'matches' : self.complete(parent),
284 matches = {'matches' : self.complete(parent),
285 'status' : 'ok'}
285 'status' : 'ok'}
286 completion_msg = self.session.send(stream, 'complete_reply',
286 completion_msg = self.session.send(stream, 'complete_reply',
287 matches, parent, ident)
287 matches, parent, ident)
288 # print >> sys.__stdout__, completion_msg
288 # print >> sys.__stdout__, completion_msg
289
289
290 def complete(self, msg):
290 def complete(self, msg):
291 return self.completer.complete(msg.content.line, msg.content.text)
291 return self.completer.complete(msg.content.line, msg.content.text)
292
292
293 def apply_request(self, stream, ident, parent):
293 def apply_request(self, stream, ident, parent):
294 # flush previous reply, so this request won't block it
294 # flush previous reply, so this request won't block it
295 stream.flush(zmq.POLLOUT)
295 stream.flush(zmq.POLLOUT)
296 try:
296 try:
297 content = parent[u'content']
297 content = parent[u'content']
298 bufs = parent[u'buffers']
298 bufs = parent[u'buffers']
299 msg_id = parent['header']['msg_id']
299 msg_id = parent['header']['msg_id']
300 # bound = parent['header'].get('bound', False)
300 # bound = parent['header'].get('bound', False)
301 except:
301 except:
302 self.log.error("Got bad msg: %s"%parent, exc_info=True)
302 self.log.error("Got bad msg: %s"%parent, exc_info=True)
303 return
303 return
304 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
304 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
305 # self.iopub_stream.send(pyin_msg)
305 # self.iopub_stream.send(pyin_msg)
306 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
306 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
307 sub = {'dependencies_met' : True, 'engine' : self.ident,
307 sub = {'dependencies_met' : True, 'engine' : self.ident,
308 'started': datetime.now()}
308 'started': datetime.now()}
309 try:
309 try:
310 # allow for not overriding displayhook
310 # allow for not overriding displayhook
311 if hasattr(sys.displayhook, 'set_parent'):
311 if hasattr(sys.displayhook, 'set_parent'):
312 sys.displayhook.set_parent(parent)
312 sys.displayhook.set_parent(parent)
313 sys.stdout.set_parent(parent)
313 sys.stdout.set_parent(parent)
314 sys.stderr.set_parent(parent)
314 sys.stderr.set_parent(parent)
315 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
315 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
316 working = self.user_ns
316 working = self.user_ns
317 # suffix =
317 # suffix =
318 prefix = "_"+str(msg_id).replace("-","")+"_"
318 prefix = "_"+str(msg_id).replace("-","")+"_"
319
319
320 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
320 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
321 # if bound:
321 # if bound:
322 # bound_ns = Namespace(working)
322 # bound_ns = Namespace(working)
323 # args = [bound_ns]+list(args)
323 # args = [bound_ns]+list(args)
324
324
325 fname = getattr(f, '__name__', 'f')
325 fname = getattr(f, '__name__', 'f')
326
326
327 fname = prefix+"f"
327 fname = prefix+"f"
328 argname = prefix+"args"
328 argname = prefix+"args"
329 kwargname = prefix+"kwargs"
329 kwargname = prefix+"kwargs"
330 resultname = prefix+"result"
330 resultname = prefix+"result"
331
331
332 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
332 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
333 # print ns
333 # print ns
334 working.update(ns)
334 working.update(ns)
335 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
335 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
336 try:
336 try:
337 exec code in working,working
337 exec code in working,working
338 result = working.get(resultname)
338 result = working.get(resultname)
339 finally:
339 finally:
340 for key in ns.iterkeys():
340 for key in ns.iterkeys():
341 working.pop(key)
341 working.pop(key)
342 # if bound:
342 # if bound:
343 # working.update(bound_ns)
343 # working.update(bound_ns)
344
344
345 packed_result,buf = serialize_object(result)
345 packed_result,buf = serialize_object(result)
346 result_buf = [packed_result]+buf
346 result_buf = [packed_result]+buf
347 except:
347 except:
348 exc_content = self._wrap_exception('apply')
348 exc_content = self._wrap_exception('apply')
349 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
349 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
350 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
350 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
351 ident=asbytes('%s.pyerr'%self.prefix))
351 ident=asbytes('%s.pyerr'%self.prefix))
352 reply_content = exc_content
352 reply_content = exc_content
353 result_buf = []
353 result_buf = []
354
354
355 if exc_content['ename'] == 'UnmetDependency':
355 if exc_content['ename'] == 'UnmetDependency':
356 sub['dependencies_met'] = False
356 sub['dependencies_met'] = False
357 else:
357 else:
358 reply_content = {'status' : 'ok'}
358 reply_content = {'status' : 'ok'}
359
359
360 # put 'ok'/'error' status in header, for scheduler introspection:
360 # put 'ok'/'error' status in header, for scheduler introspection:
361 sub['status'] = reply_content['status']
361 sub['status'] = reply_content['status']
362
362
363 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
363 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
364 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
364 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
365
365
366 # flush i/o
366 # flush i/o
367 # should this be before reply_msg is sent, like in the single-kernel code,
367 # should this be before reply_msg is sent, like in the single-kernel code,
368 # or should nothing get in the way of real results?
368 # or should nothing get in the way of real results?
369 sys.stdout.flush()
369 sys.stdout.flush()
370 sys.stderr.flush()
370 sys.stderr.flush()
371
371
372 def dispatch_queue(self, stream, msg):
372 def dispatch_queue(self, stream, msg):
373 self.control_stream.flush()
373 self.control_stream.flush()
374 idents,msg = self.session.feed_identities(msg, copy=False)
374 idents,msg = self.session.feed_identities(msg, copy=False)
375 try:
375 try:
376 msg = self.session.unpack_message(msg, content=True, copy=False)
376 msg = self.session.unserialize(msg, content=True, copy=False)
377 except:
377 except:
378 self.log.error("Invalid Message", exc_info=True)
378 self.log.error("Invalid Message", exc_info=True)
379 return
379 return
380 else:
380 else:
381 self.log.debug("Message received, %s", msg)
381 self.log.debug("Message received, %s", msg)
382
382
383
383
384 header = msg['header']
384 header = msg['header']
385 msg_id = header['msg_id']
385 msg_id = header['msg_id']
386 if self.check_aborted(msg_id):
386 if self.check_aborted(msg_id):
387 self.aborted.remove(msg_id)
387 self.aborted.remove(msg_id)
388 # is it safe to assume a msg_id will not be resubmitted?
388 # is it safe to assume a msg_id will not be resubmitted?
389 reply_type = msg['header']['msg_type'].split('_')[0] + '_reply'
389 reply_type = msg['header']['msg_type'].split('_')[0] + '_reply'
390 status = {'status' : 'aborted'}
390 status = {'status' : 'aborted'}
391 reply_msg = self.session.send(stream, reply_type, subheader=status,
391 reply_msg = self.session.send(stream, reply_type, subheader=status,
392 content=status, parent=msg, ident=idents)
392 content=status, parent=msg, ident=idents)
393 return
393 return
394 handler = self.shell_handlers.get(msg['header']['msg_type'], None)
394 handler = self.shell_handlers.get(msg['header']['msg_type'], None)
395 if handler is None:
395 if handler is None:
396 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['header']['msg_type'])
396 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['header']['msg_type'])
397 else:
397 else:
398 handler(stream, idents, msg)
398 handler(stream, idents, msg)
399
399
400 def start(self):
400 def start(self):
401 #### stream mode:
401 #### stream mode:
402 if self.control_stream:
402 if self.control_stream:
403 self.control_stream.on_recv(self.dispatch_control, copy=False)
403 self.control_stream.on_recv(self.dispatch_control, copy=False)
404 self.control_stream.on_err(printer)
404 self.control_stream.on_err(printer)
405
405
406 def make_dispatcher(stream):
406 def make_dispatcher(stream):
407 def dispatcher(msg):
407 def dispatcher(msg):
408 return self.dispatch_queue(stream, msg)
408 return self.dispatch_queue(stream, msg)
409 return dispatcher
409 return dispatcher
410
410
411 for s in self.shell_streams:
411 for s in self.shell_streams:
412 s.on_recv(make_dispatcher(s), copy=False)
412 s.on_recv(make_dispatcher(s), copy=False)
413 s.on_err(printer)
413 s.on_err(printer)
414
414
415 if self.iopub_stream:
415 if self.iopub_stream:
416 self.iopub_stream.on_err(printer)
416 self.iopub_stream.on_err(printer)
417
417
418 #### while True mode:
418 #### while True mode:
419 # while True:
419 # while True:
420 # idle = True
420 # idle = True
421 # try:
421 # try:
422 # msg = self.shell_stream.socket.recv_multipart(
422 # msg = self.shell_stream.socket.recv_multipart(
423 # zmq.NOBLOCK, copy=False)
423 # zmq.NOBLOCK, copy=False)
424 # except zmq.ZMQError, e:
424 # except zmq.ZMQError, e:
425 # if e.errno != zmq.EAGAIN:
425 # if e.errno != zmq.EAGAIN:
426 # raise e
426 # raise e
427 # else:
427 # else:
428 # idle=False
428 # idle=False
429 # self.dispatch_queue(self.shell_stream, msg)
429 # self.dispatch_queue(self.shell_stream, msg)
430 #
430 #
431 # if not self.task_stream.empty():
431 # if not self.task_stream.empty():
432 # idle=False
432 # idle=False
433 # msg = self.task_stream.recv_multipart()
433 # msg = self.task_stream.recv_multipart()
434 # self.dispatch_queue(self.task_stream, msg)
434 # self.dispatch_queue(self.task_stream, msg)
435 # if idle:
435 # if idle:
436 # # don't busywait
436 # # don't busywait
437 # time.sleep(1e-3)
437 # time.sleep(1e-3)
438
438
@@ -1,676 +1,691 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Session object for building, serializing, sending, and receiving messages in
2 """Session object for building, serializing, sending, and receiving messages in
3 IPython. The Session object supports serialization, HMAC signatures, and
3 IPython. The Session object supports serialization, HMAC signatures, and
4 metadata on messages.
4 metadata on messages.
5
5
6 Also defined here are utilities for working with Sessions:
6 Also defined here are utilities for working with Sessions:
7 * A SessionFactory to be used as a base class for configurables that work with
7 * A SessionFactory to be used as a base class for configurables that work with
8 Sessions.
8 Sessions.
9 * A Message object for convenience that allows attribute-access to the msg dict.
9 * A Message object for convenience that allows attribute-access to the msg dict.
10
10
11 Authors:
11 Authors:
12
12
13 * Min RK
13 * Min RK
14 * Brian Granger
14 * Brian Granger
15 * Fernando Perez
15 * Fernando Perez
16 """
16 """
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 # Copyright (C) 2010-2011 The IPython Development Team
18 # Copyright (C) 2010-2011 The IPython Development Team
19 #
19 #
20 # Distributed under the terms of the BSD License. The full license is in
20 # Distributed under the terms of the BSD License. The full license is in
21 # the file COPYING, distributed as part of this software.
21 # the file COPYING, distributed as part of this software.
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Imports
25 # Imports
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28 import hmac
28 import hmac
29 import logging
29 import logging
30 import os
30 import os
31 import pprint
31 import pprint
32 import uuid
32 import uuid
33 from datetime import datetime
33 from datetime import datetime
34
34
35 try:
35 try:
36 import cPickle
36 import cPickle
37 pickle = cPickle
37 pickle = cPickle
38 except:
38 except:
39 cPickle = None
39 cPickle = None
40 import pickle
40 import pickle
41
41
42 import zmq
42 import zmq
43 from zmq.utils import jsonapi
43 from zmq.utils import jsonapi
44 from zmq.eventloop.ioloop import IOLoop
44 from zmq.eventloop.ioloop import IOLoop
45 from zmq.eventloop.zmqstream import ZMQStream
45 from zmq.eventloop.zmqstream import ZMQStream
46
46
47 from IPython.config.configurable import Configurable, LoggingConfigurable
47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 from IPython.utils.importstring import import_item
48 from IPython.utils.importstring import import_item
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
50 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
51 DottedObjectName)
51 DottedObjectName)
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # utility functions
54 # utility functions
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 def squash_unicode(obj):
57 def squash_unicode(obj):
58 """coerce unicode back to bytestrings."""
58 """coerce unicode back to bytestrings."""
59 if isinstance(obj,dict):
59 if isinstance(obj,dict):
60 for key in obj.keys():
60 for key in obj.keys():
61 obj[key] = squash_unicode(obj[key])
61 obj[key] = squash_unicode(obj[key])
62 if isinstance(key, unicode):
62 if isinstance(key, unicode):
63 obj[squash_unicode(key)] = obj.pop(key)
63 obj[squash_unicode(key)] = obj.pop(key)
64 elif isinstance(obj, list):
64 elif isinstance(obj, list):
65 for i,v in enumerate(obj):
65 for i,v in enumerate(obj):
66 obj[i] = squash_unicode(v)
66 obj[i] = squash_unicode(v)
67 elif isinstance(obj, unicode):
67 elif isinstance(obj, unicode):
68 obj = obj.encode('utf8')
68 obj = obj.encode('utf8')
69 return obj
69 return obj
70
70
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72 # globals and defaults
72 # globals and defaults
73 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
74 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
74 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
75 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
75 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
76 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
76 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
77
77
78 pickle_packer = lambda o: pickle.dumps(o,-1)
78 pickle_packer = lambda o: pickle.dumps(o,-1)
79 pickle_unpacker = pickle.loads
79 pickle_unpacker = pickle.loads
80
80
81 default_packer = json_packer
81 default_packer = json_packer
82 default_unpacker = json_unpacker
82 default_unpacker = json_unpacker
83
83
84
84
85 DELIM=b"<IDS|MSG>"
85 DELIM=b"<IDS|MSG>"
86
86
87 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
88 # Classes
88 # Classes
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90
90
91 class SessionFactory(LoggingConfigurable):
91 class SessionFactory(LoggingConfigurable):
92 """The Base class for configurables that have a Session, Context, logger,
92 """The Base class for configurables that have a Session, Context, logger,
93 and IOLoop.
93 and IOLoop.
94 """
94 """
95
95
96 logname = Unicode('')
96 logname = Unicode('')
97 def _logname_changed(self, name, old, new):
97 def _logname_changed(self, name, old, new):
98 self.log = logging.getLogger(new)
98 self.log = logging.getLogger(new)
99
99
100 # not configurable:
100 # not configurable:
101 context = Instance('zmq.Context')
101 context = Instance('zmq.Context')
102 def _context_default(self):
102 def _context_default(self):
103 return zmq.Context.instance()
103 return zmq.Context.instance()
104
104
105 session = Instance('IPython.zmq.session.Session')
105 session = Instance('IPython.zmq.session.Session')
106
106
107 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
107 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
108 def _loop_default(self):
108 def _loop_default(self):
109 return IOLoop.instance()
109 return IOLoop.instance()
110
110
111 def __init__(self, **kwargs):
111 def __init__(self, **kwargs):
112 super(SessionFactory, self).__init__(**kwargs)
112 super(SessionFactory, self).__init__(**kwargs)
113
113
114 if self.session is None:
114 if self.session is None:
115 # construct the session
115 # construct the session
116 self.session = Session(**kwargs)
116 self.session = Session(**kwargs)
117
117
118
118
119 class Message(object):
119 class Message(object):
120 """A simple message object that maps dict keys to attributes.
120 """A simple message object that maps dict keys to attributes.
121
121
122 A Message can be created from a dict and a dict from a Message instance
122 A Message can be created from a dict and a dict from a Message instance
123 simply by calling dict(msg_obj)."""
123 simply by calling dict(msg_obj)."""
124
124
125 def __init__(self, msg_dict):
125 def __init__(self, msg_dict):
126 dct = self.__dict__
126 dct = self.__dict__
127 for k, v in dict(msg_dict).iteritems():
127 for k, v in dict(msg_dict).iteritems():
128 if isinstance(v, dict):
128 if isinstance(v, dict):
129 v = Message(v)
129 v = Message(v)
130 dct[k] = v
130 dct[k] = v
131
131
132 # Having this iterator lets dict(msg_obj) work out of the box.
132 # Having this iterator lets dict(msg_obj) work out of the box.
133 def __iter__(self):
133 def __iter__(self):
134 return iter(self.__dict__.iteritems())
134 return iter(self.__dict__.iteritems())
135
135
136 def __repr__(self):
136 def __repr__(self):
137 return repr(self.__dict__)
137 return repr(self.__dict__)
138
138
139 def __str__(self):
139 def __str__(self):
140 return pprint.pformat(self.__dict__)
140 return pprint.pformat(self.__dict__)
141
141
142 def __contains__(self, k):
142 def __contains__(self, k):
143 return k in self.__dict__
143 return k in self.__dict__
144
144
145 def __getitem__(self, k):
145 def __getitem__(self, k):
146 return self.__dict__[k]
146 return self.__dict__[k]
147
147
148
148
149 def msg_header(msg_id, msg_type, username, session):
149 def msg_header(msg_id, msg_type, username, session):
150 date = datetime.now()
150 date = datetime.now()
151 return locals()
151 return locals()
152
152
153 def extract_header(msg_or_header):
153 def extract_header(msg_or_header):
154 """Given a message or header, return the header."""
154 """Given a message or header, return the header."""
155 if not msg_or_header:
155 if not msg_or_header:
156 return {}
156 return {}
157 try:
157 try:
158 # See if msg_or_header is the entire message.
158 # See if msg_or_header is the entire message.
159 h = msg_or_header['header']
159 h = msg_or_header['header']
160 except KeyError:
160 except KeyError:
161 try:
161 try:
162 # See if msg_or_header is just the header
162 # See if msg_or_header is just the header
163 h = msg_or_header['msg_id']
163 h = msg_or_header['msg_id']
164 except KeyError:
164 except KeyError:
165 raise
165 raise
166 else:
166 else:
167 h = msg_or_header
167 h = msg_or_header
168 if not isinstance(h, dict):
168 if not isinstance(h, dict):
169 h = dict(h)
169 h = dict(h)
170 return h
170 return h
171
171
172 class Session(Configurable):
172 class Session(Configurable):
173 """Object for handling serialization and sending of messages.
173 """Object for handling serialization and sending of messages.
174
174
175 The Session object handles building messages and sending them
175 The Session object handles building messages and sending them
176 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
176 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
177 other over the network via Session objects, and only need to work with the
177 other over the network via Session objects, and only need to work with the
178 dict-based IPython message spec. The Session will handle
178 dict-based IPython message spec. The Session will handle
179 serialization/deserialization, security, and metadata.
179 serialization/deserialization, security, and metadata.
180
180
181 Sessions support configurable serialiization via packer/unpacker traits,
181 Sessions support configurable serialiization via packer/unpacker traits,
182 and signing with HMAC digests via the key/keyfile traits.
182 and signing with HMAC digests via the key/keyfile traits.
183
183
184 Parameters
184 Parameters
185 ----------
185 ----------
186
186
187 debug : bool
187 debug : bool
188 whether to trigger extra debugging statements
188 whether to trigger extra debugging statements
189 packer/unpacker : str : 'json', 'pickle' or import_string
189 packer/unpacker : str : 'json', 'pickle' or import_string
190 importstrings for methods to serialize message parts. If just
190 importstrings for methods to serialize message parts. If just
191 'json' or 'pickle', predefined JSON and pickle packers will be used.
191 'json' or 'pickle', predefined JSON and pickle packers will be used.
192 Otherwise, the entire importstring must be used.
192 Otherwise, the entire importstring must be used.
193
193
194 The functions must accept at least valid JSON input, and output *bytes*.
194 The functions must accept at least valid JSON input, and output *bytes*.
195
195
196 For example, to use msgpack:
196 For example, to use msgpack:
197 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
197 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
198 pack/unpack : callables
198 pack/unpack : callables
199 You can also set the pack/unpack callables for serialization directly.
199 You can also set the pack/unpack callables for serialization directly.
200 session : bytes
200 session : bytes
201 the ID of this Session object. The default is to generate a new UUID.
201 the ID of this Session object. The default is to generate a new UUID.
202 username : unicode
202 username : unicode
203 username added to message headers. The default is to ask the OS.
203 username added to message headers. The default is to ask the OS.
204 key : bytes
204 key : bytes
205 The key used to initialize an HMAC signature. If unset, messages
205 The key used to initialize an HMAC signature. If unset, messages
206 will not be signed or checked.
206 will not be signed or checked.
207 keyfile : filepath
207 keyfile : filepath
208 The file containing a key. If this is set, `key` will be initialized
208 The file containing a key. If this is set, `key` will be initialized
209 to the contents of the file.
209 to the contents of the file.
210
210
211 """
211 """
212
212
213 debug=Bool(False, config=True, help="""Debug output in the Session""")
213 debug=Bool(False, config=True, help="""Debug output in the Session""")
214
214
215 packer = DottedObjectName('json',config=True,
215 packer = DottedObjectName('json',config=True,
216 help="""The name of the packer for serializing messages.
216 help="""The name of the packer for serializing messages.
217 Should be one of 'json', 'pickle', or an import name
217 Should be one of 'json', 'pickle', or an import name
218 for a custom callable serializer.""")
218 for a custom callable serializer.""")
219 def _packer_changed(self, name, old, new):
219 def _packer_changed(self, name, old, new):
220 if new.lower() == 'json':
220 if new.lower() == 'json':
221 self.pack = json_packer
221 self.pack = json_packer
222 self.unpack = json_unpacker
222 self.unpack = json_unpacker
223 elif new.lower() == 'pickle':
223 elif new.lower() == 'pickle':
224 self.pack = pickle_packer
224 self.pack = pickle_packer
225 self.unpack = pickle_unpacker
225 self.unpack = pickle_unpacker
226 else:
226 else:
227 self.pack = import_item(str(new))
227 self.pack = import_item(str(new))
228
228
229 unpacker = DottedObjectName('json', config=True,
229 unpacker = DottedObjectName('json', config=True,
230 help="""The name of the unpacker for unserializing messages.
230 help="""The name of the unpacker for unserializing messages.
231 Only used with custom functions for `packer`.""")
231 Only used with custom functions for `packer`.""")
232 def _unpacker_changed(self, name, old, new):
232 def _unpacker_changed(self, name, old, new):
233 if new.lower() == 'json':
233 if new.lower() == 'json':
234 self.pack = json_packer
234 self.pack = json_packer
235 self.unpack = json_unpacker
235 self.unpack = json_unpacker
236 elif new.lower() == 'pickle':
236 elif new.lower() == 'pickle':
237 self.pack = pickle_packer
237 self.pack = pickle_packer
238 self.unpack = pickle_unpacker
238 self.unpack = pickle_unpacker
239 else:
239 else:
240 self.unpack = import_item(str(new))
240 self.unpack = import_item(str(new))
241
241
242 session = CBytes(b'', config=True,
242 session = CBytes(b'', config=True,
243 help="""The UUID identifying this session.""")
243 help="""The UUID identifying this session.""")
244 def _session_default(self):
244 def _session_default(self):
245 return bytes(uuid.uuid4())
245 return bytes(uuid.uuid4())
246
246
247 username = Unicode(os.environ.get('USER','username'), config=True,
247 username = Unicode(os.environ.get('USER','username'), config=True,
248 help="""Username for the Session. Default is your system username.""")
248 help="""Username for the Session. Default is your system username.""")
249
249
250 # message signature related traits:
250 # message signature related traits:
251 key = CBytes(b'', config=True,
251 key = CBytes(b'', config=True,
252 help="""execution key, for extra authentication.""")
252 help="""execution key, for extra authentication.""")
253 def _key_changed(self, name, old, new):
253 def _key_changed(self, name, old, new):
254 if new:
254 if new:
255 self.auth = hmac.HMAC(new)
255 self.auth = hmac.HMAC(new)
256 else:
256 else:
257 self.auth = None
257 self.auth = None
258 auth = Instance(hmac.HMAC)
258 auth = Instance(hmac.HMAC)
259 digest_history = Set()
259 digest_history = Set()
260
260
261 keyfile = Unicode('', config=True,
261 keyfile = Unicode('', config=True,
262 help="""path to file containing execution key.""")
262 help="""path to file containing execution key.""")
263 def _keyfile_changed(self, name, old, new):
263 def _keyfile_changed(self, name, old, new):
264 with open(new, 'rb') as f:
264 with open(new, 'rb') as f:
265 self.key = f.read().strip()
265 self.key = f.read().strip()
266
266
267 pack = Any(default_packer) # the actual packer function
267 pack = Any(default_packer) # the actual packer function
268 def _pack_changed(self, name, old, new):
268 def _pack_changed(self, name, old, new):
269 if not callable(new):
269 if not callable(new):
270 raise TypeError("packer must be callable, not %s"%type(new))
270 raise TypeError("packer must be callable, not %s"%type(new))
271
271
272 unpack = Any(default_unpacker) # the actual packer function
272 unpack = Any(default_unpacker) # the actual packer function
273 def _unpack_changed(self, name, old, new):
273 def _unpack_changed(self, name, old, new):
274 # unpacker is not checked - it is assumed to be
274 # unpacker is not checked - it is assumed to be
275 if not callable(new):
275 if not callable(new):
276 raise TypeError("unpacker must be callable, not %s"%type(new))
276 raise TypeError("unpacker must be callable, not %s"%type(new))
277
277
278 def __init__(self, **kwargs):
278 def __init__(self, **kwargs):
279 """create a Session object
279 """create a Session object
280
280
281 Parameters
281 Parameters
282 ----------
282 ----------
283
283
284 debug : bool
284 debug : bool
285 whether to trigger extra debugging statements
285 whether to trigger extra debugging statements
286 packer/unpacker : str : 'json', 'pickle' or import_string
286 packer/unpacker : str : 'json', 'pickle' or import_string
287 importstrings for methods to serialize message parts. If just
287 importstrings for methods to serialize message parts. If just
288 'json' or 'pickle', predefined JSON and pickle packers will be used.
288 'json' or 'pickle', predefined JSON and pickle packers will be used.
289 Otherwise, the entire importstring must be used.
289 Otherwise, the entire importstring must be used.
290
290
291 The functions must accept at least valid JSON input, and output
291 The functions must accept at least valid JSON input, and output
292 *bytes*.
292 *bytes*.
293
293
294 For example, to use msgpack:
294 For example, to use msgpack:
295 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
295 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
296 pack/unpack : callables
296 pack/unpack : callables
297 You can also set the pack/unpack callables for serialization
297 You can also set the pack/unpack callables for serialization
298 directly.
298 directly.
299 session : bytes
299 session : bytes
300 the ID of this Session object. The default is to generate a new
300 the ID of this Session object. The default is to generate a new
301 UUID.
301 UUID.
302 username : unicode
302 username : unicode
303 username added to message headers. The default is to ask the OS.
303 username added to message headers. The default is to ask the OS.
304 key : bytes
304 key : bytes
305 The key used to initialize an HMAC signature. If unset, messages
305 The key used to initialize an HMAC signature. If unset, messages
306 will not be signed or checked.
306 will not be signed or checked.
307 keyfile : filepath
307 keyfile : filepath
308 The file containing a key. If this is set, `key` will be
308 The file containing a key. If this is set, `key` will be
309 initialized to the contents of the file.
309 initialized to the contents of the file.
310 """
310 """
311 super(Session, self).__init__(**kwargs)
311 super(Session, self).__init__(**kwargs)
312 self._check_packers()
312 self._check_packers()
313 self.none = self.pack({})
313 self.none = self.pack({})
314
314
315 @property
315 @property
316 def msg_id(self):
316 def msg_id(self):
317 """always return new uuid"""
317 """always return new uuid"""
318 return str(uuid.uuid4())
318 return str(uuid.uuid4())
319
319
320 def _check_packers(self):
320 def _check_packers(self):
321 """check packers for binary data and datetime support."""
321 """check packers for binary data and datetime support."""
322 pack = self.pack
322 pack = self.pack
323 unpack = self.unpack
323 unpack = self.unpack
324
324
325 # check simple serialization
325 # check simple serialization
326 msg = dict(a=[1,'hi'])
326 msg = dict(a=[1,'hi'])
327 try:
327 try:
328 packed = pack(msg)
328 packed = pack(msg)
329 except Exception:
329 except Exception:
330 raise ValueError("packer could not serialize a simple message")
330 raise ValueError("packer could not serialize a simple message")
331
331
332 # ensure packed message is bytes
332 # ensure packed message is bytes
333 if not isinstance(packed, bytes):
333 if not isinstance(packed, bytes):
334 raise ValueError("message packed to %r, but bytes are required"%type(packed))
334 raise ValueError("message packed to %r, but bytes are required"%type(packed))
335
335
336 # check that unpack is pack's inverse
336 # check that unpack is pack's inverse
337 try:
337 try:
338 unpacked = unpack(packed)
338 unpacked = unpack(packed)
339 except Exception:
339 except Exception:
340 raise ValueError("unpacker could not handle the packer's output")
340 raise ValueError("unpacker could not handle the packer's output")
341
341
342 # check datetime support
342 # check datetime support
343 msg = dict(t=datetime.now())
343 msg = dict(t=datetime.now())
344 try:
344 try:
345 unpacked = unpack(pack(msg))
345 unpacked = unpack(pack(msg))
346 except Exception:
346 except Exception:
347 self.pack = lambda o: pack(squash_dates(o))
347 self.pack = lambda o: pack(squash_dates(o))
348 self.unpack = lambda s: extract_dates(unpack(s))
348 self.unpack = lambda s: extract_dates(unpack(s))
349
349
350 def msg_header(self, msg_type):
350 def msg_header(self, msg_type):
351 return msg_header(self.msg_id, msg_type, self.username, self.session)
351 return msg_header(self.msg_id, msg_type, self.username, self.session)
352
352
353 def msg(self, msg_type, content=None, parent=None, subheader=None):
353 def msg(self, msg_type, content=None, parent=None, subheader=None):
354 """Return the nested message dict.
354 """Return the nested message dict.
355
355
356 This format is different from what is sent over the wire. The
356 This format is different from what is sent over the wire. The
357 self.serialize method converts this nested message dict to the wire
357 self.serialize method converts this nested message dict to the wire
358 format, which uses a message list.
358 format, which uses a message list.
359 """
359 """
360 msg = {}
360 msg = {}
361 msg['header'] = self.msg_header(msg_type)
361 msg['header'] = self.msg_header(msg_type)
362 msg['parent_header'] = {} if parent is None else extract_header(parent)
362 msg['parent_header'] = {} if parent is None else extract_header(parent)
363 msg['content'] = {} if content is None else content
363 msg['content'] = {} if content is None else content
364 sub = {} if subheader is None else subheader
364 sub = {} if subheader is None else subheader
365 msg['header'].update(sub)
365 msg['header'].update(sub)
366 return msg
366 return msg
367
367
368 def sign(self, msg_list):
368 def sign(self, msg_list):
369 """Sign a message with HMAC digest. If no auth, return b''.
369 """Sign a message with HMAC digest. If no auth, return b''.
370
370
371 Parameters
371 Parameters
372 ----------
372 ----------
373 msg_list : list
373 msg_list : list
374 The [p_header,p_parent,p_content] part of the message list.
374 The [p_header,p_parent,p_content] part of the message list.
375 """
375 """
376 if self.auth is None:
376 if self.auth is None:
377 return b''
377 return b''
378 h = self.auth.copy()
378 h = self.auth.copy()
379 for m in msg_list:
379 for m in msg_list:
380 h.update(m)
380 h.update(m)
381 return h.hexdigest()
381 return h.hexdigest()
382
382
383 def serialize(self, msg, ident=None):
383 def serialize(self, msg, ident=None):
384 """Serialize the message components to bytes.
384 """Serialize the message components to bytes.
385
385
386 This is roughly the inverse of unserialize. The serialize/unserialize
387 methods work with full message lists, whereas pack/unpack work with
388 the individual message parts in the message list.
389
386 Parameters
390 Parameters
387 ----------
391 ----------
388 msg : dict or Message
392 msg : dict or Message
389 The nexted message dict as returned by the self.msg method.
393 The nexted message dict as returned by the self.msg method.
390
394
391 Returns
395 Returns
392 -------
396 -------
393 msg_list : list
397 msg_list : list
394 The list of bytes objects to be sent with the format:
398 The list of bytes objects to be sent with the format:
395 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
399 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
396 buffer1,buffer2,...]. In this list, the p_* entities are
400 buffer1,buffer2,...]. In this list, the p_* entities are
397 the packed or serialized versions, so if JSON is used, these
401 the packed or serialized versions, so if JSON is used, these
398 are uft8 encoded JSON strings.
402 are uft8 encoded JSON strings.
399 """
403 """
400 content = msg.get('content', {})
404 content = msg.get('content', {})
401 if content is None:
405 if content is None:
402 content = self.none
406 content = self.none
403 elif isinstance(content, dict):
407 elif isinstance(content, dict):
404 content = self.pack(content)
408 content = self.pack(content)
405 elif isinstance(content, bytes):
409 elif isinstance(content, bytes):
406 # content is already packed, as in a relayed message
410 # content is already packed, as in a relayed message
407 pass
411 pass
408 elif isinstance(content, unicode):
412 elif isinstance(content, unicode):
409 # should be bytes, but JSON often spits out unicode
413 # should be bytes, but JSON often spits out unicode
410 content = content.encode('utf8')
414 content = content.encode('utf8')
411 else:
415 else:
412 raise TypeError("Content incorrect type: %s"%type(content))
416 raise TypeError("Content incorrect type: %s"%type(content))
413
417
414 real_message = [self.pack(msg['header']),
418 real_message = [self.pack(msg['header']),
415 self.pack(msg['parent_header']),
419 self.pack(msg['parent_header']),
416 content
420 content
417 ]
421 ]
418
422
419 to_send = []
423 to_send = []
420
424
421 if isinstance(ident, list):
425 if isinstance(ident, list):
422 # accept list of idents
426 # accept list of idents
423 to_send.extend(ident)
427 to_send.extend(ident)
424 elif ident is not None:
428 elif ident is not None:
425 to_send.append(ident)
429 to_send.append(ident)
426 to_send.append(DELIM)
430 to_send.append(DELIM)
427
431
428 signature = self.sign(real_message)
432 signature = self.sign(real_message)
429 to_send.append(signature)
433 to_send.append(signature)
430
434
431 to_send.extend(real_message)
435 to_send.extend(real_message)
432
436
433 return to_send
437 return to_send
434
438
435 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
439 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
436 buffers=None, subheader=None, track=False):
440 buffers=None, subheader=None, track=False):
437 """Build and send a message via stream or socket.
441 """Build and send a message via stream or socket.
438
442
439 The message format used by this function internally is as follows:
443 The message format used by this function internally is as follows:
440
444
441 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
445 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
442 buffer1,buffer2,...]
446 buffer1,buffer2,...]
443
447
444 The self.serialize method converts the nested message dict into this
448 The self.serialize method converts the nested message dict into this
445 format.
449 format.
446
450
447 Parameters
451 Parameters
448 ----------
452 ----------
449
453
450 stream : zmq.Socket or ZMQStream
454 stream : zmq.Socket or ZMQStream
451 the socket-like object used to send the data
455 the socket-like object used to send the data
452 msg_or_type : str or Message/dict
456 msg_or_type : str or Message/dict
453 Normally, msg_or_type will be a msg_type unless a message is being
457 Normally, msg_or_type will be a msg_type unless a message is being
454 sent more than once.
458 sent more than once.
455
459
456 content : dict or None
460 content : dict or None
457 the content of the message (ignored if msg_or_type is a message)
461 the content of the message (ignored if msg_or_type is a message)
458 parent : Message or dict or None
462 parent : Message or dict or None
459 the parent or parent header describing the parent of this message
463 the parent or parent header describing the parent of this message
460 ident : bytes or list of bytes
464 ident : bytes or list of bytes
461 the zmq.IDENTITY routing path
465 the zmq.IDENTITY routing path
462 subheader : dict or None
466 subheader : dict or None
463 extra header keys for this message's header
467 extra header keys for this message's header
464 buffers : list or None
468 buffers : list or None
465 the already-serialized buffers to be appended to the message
469 the already-serialized buffers to be appended to the message
466 track : bool
470 track : bool
467 whether to track. Only for use with Sockets,
471 whether to track. Only for use with Sockets,
468 because ZMQStream objects cannot track messages.
472 because ZMQStream objects cannot track messages.
469
473
470 Returns
474 Returns
471 -------
475 -------
472 msg : message dict
476 msg : message dict
473 the constructed message
477 the constructed message
474 (msg,tracker) : (message dict, MessageTracker)
478 (msg,tracker) : (message dict, MessageTracker)
475 if track=True, then a 2-tuple will be returned,
479 if track=True, then a 2-tuple will be returned,
476 the first element being the constructed
480 the first element being the constructed
477 message, and the second being the MessageTracker
481 message, and the second being the MessageTracker
478
482
479 """
483 """
480
484
481 if not isinstance(stream, (zmq.Socket, ZMQStream)):
485 if not isinstance(stream, (zmq.Socket, ZMQStream)):
482 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
486 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
483 elif track and isinstance(stream, ZMQStream):
487 elif track and isinstance(stream, ZMQStream):
484 raise TypeError("ZMQStream cannot track messages")
488 raise TypeError("ZMQStream cannot track messages")
485
489
486 if isinstance(msg_or_type, (Message, dict)):
490 if isinstance(msg_or_type, (Message, dict)):
487 # we got a Message, not a msg_type
491 # we got a Message, not a msg_type
488 # don't build a new Message
492 # don't build a new Message
489 msg = msg_or_type
493 msg = msg_or_type
490 else:
494 else:
491 msg = self.msg(msg_or_type, content, parent, subheader)
495 msg = self.msg(msg_or_type, content, parent, subheader)
492
496
493 buffers = [] if buffers is None else buffers
497 buffers = [] if buffers is None else buffers
494 to_send = self.serialize(msg, ident)
498 to_send = self.serialize(msg, ident)
495 flag = 0
499 flag = 0
496 if buffers:
500 if buffers:
497 flag = zmq.SNDMORE
501 flag = zmq.SNDMORE
498 _track = False
502 _track = False
499 else:
503 else:
500 _track=track
504 _track=track
501 if track:
505 if track:
502 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
506 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
503 else:
507 else:
504 tracker = stream.send_multipart(to_send, flag, copy=False)
508 tracker = stream.send_multipart(to_send, flag, copy=False)
505 for b in buffers[:-1]:
509 for b in buffers[:-1]:
506 stream.send(b, flag, copy=False)
510 stream.send(b, flag, copy=False)
507 if buffers:
511 if buffers:
508 if track:
512 if track:
509 tracker = stream.send(buffers[-1], copy=False, track=track)
513 tracker = stream.send(buffers[-1], copy=False, track=track)
510 else:
514 else:
511 tracker = stream.send(buffers[-1], copy=False)
515 tracker = stream.send(buffers[-1], copy=False)
512
516
513 # omsg = Message(msg)
517 # omsg = Message(msg)
514 if self.debug:
518 if self.debug:
515 pprint.pprint(msg)
519 pprint.pprint(msg)
516 pprint.pprint(to_send)
520 pprint.pprint(to_send)
517 pprint.pprint(buffers)
521 pprint.pprint(buffers)
518
522
519 msg['tracker'] = tracker
523 msg['tracker'] = tracker
520
524
521 return msg
525 return msg
522
526
523 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
527 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
524 """Send a raw message via ident path.
528 """Send a raw message via ident path.
525
529
526 This method is used to send a already serialized message.
530 This method is used to send a already serialized message.
527
531
528 Parameters
532 Parameters
529 ----------
533 ----------
530 stream : ZMQStream or Socket
534 stream : ZMQStream or Socket
531 The ZMQ stream or socket to use for sending the message.
535 The ZMQ stream or socket to use for sending the message.
532 msg_list : list
536 msg_list : list
533 The serialized list of messages to send. This only includes the
537 The serialized list of messages to send. This only includes the
534 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
538 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
535 the message.
539 the message.
536 ident : ident or list
540 ident : ident or list
537 A single ident or a list of idents to use in sending.
541 A single ident or a list of idents to use in sending.
538 """
542 """
539 to_send = []
543 to_send = []
540 if isinstance(ident, bytes):
544 if isinstance(ident, bytes):
541 ident = [ident]
545 ident = [ident]
542 if ident is not None:
546 if ident is not None:
543 to_send.extend(ident)
547 to_send.extend(ident)
544
548
545 to_send.append(DELIM)
549 to_send.append(DELIM)
546 to_send.append(self.sign(msg_list))
550 to_send.append(self.sign(msg_list))
547 to_send.extend(msg_list)
551 to_send.extend(msg_list)
548 stream.send_multipart(msg_list, flags, copy=copy)
552 stream.send_multipart(msg_list, flags, copy=copy)
549
553
550 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
554 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
551 """Receive and unpack a message.
555 """Receive and unpack a message.
552
556
553 Parameters
557 Parameters
554 ----------
558 ----------
555 socket : ZMQStream or Socket
559 socket : ZMQStream or Socket
556 The socket or stream to use in receiving.
560 The socket or stream to use in receiving.
557
561
558 Returns
562 Returns
559 -------
563 -------
560 [idents], msg
564 [idents], msg
561 [idents] is a list of idents and msg is a nested message dict of
565 [idents] is a list of idents and msg is a nested message dict of
562 same format as self.msg returns.
566 same format as self.msg returns.
563 """
567 """
564 if isinstance(socket, ZMQStream):
568 if isinstance(socket, ZMQStream):
565 socket = socket.socket
569 socket = socket.socket
566 try:
570 try:
567 msg_list = socket.recv_multipart(mode)
571 msg_list = socket.recv_multipart(mode)
568 except zmq.ZMQError as e:
572 except zmq.ZMQError as e:
569 if e.errno == zmq.EAGAIN:
573 if e.errno == zmq.EAGAIN:
570 # We can convert EAGAIN to None as we know in this case
574 # We can convert EAGAIN to None as we know in this case
571 # recv_multipart won't return None.
575 # recv_multipart won't return None.
572 return None,None
576 return None,None
573 else:
577 else:
574 raise
578 raise
575 # split multipart message into identity list and message dict
579 # split multipart message into identity list and message dict
576 # invalid large messages can cause very expensive string comparisons
580 # invalid large messages can cause very expensive string comparisons
577 idents, msg_list = self.feed_identities(msg_list, copy)
581 idents, msg_list = self.feed_identities(msg_list, copy)
578 try:
582 try:
579 return idents, self.unpack_message(msg_list, content=content, copy=copy)
583 return idents, self.unserialize(msg_list, content=content, copy=copy)
580 except Exception as e:
584 except Exception as e:
581 print (idents, msg_list)
585 print (idents, msg_list)
582 # TODO: handle it
586 # TODO: handle it
583 raise e
587 raise e
584
588
585 def feed_identities(self, msg_list, copy=True):
589 def feed_identities(self, msg_list, copy=True):
586 """Split the identities from the rest of the message.
590 """Split the identities from the rest of the message.
587
591
588 Feed until DELIM is reached, then return the prefix as idents and
592 Feed until DELIM is reached, then return the prefix as idents and
589 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
593 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
590 but that would be silly.
594 but that would be silly.
591
595
592 Parameters
596 Parameters
593 ----------
597 ----------
594 msg_list : a list of Message or bytes objects
598 msg_list : a list of Message or bytes objects
595 The message to be split.
599 The message to be split.
596 copy : bool
600 copy : bool
597 flag determining whether the arguments are bytes or Messages
601 flag determining whether the arguments are bytes or Messages
598
602
599 Returns
603 Returns
600 -------
604 -------
601 (idents,msg_list) : two lists
605 (idents, msg_list) : two lists
602 idents will always be a list of bytes - the indentity prefix
606 idents will always be a list of bytes, each of which is a ZMQ
603 msg_list will be a list of bytes or Messages, unchanged from input
607 identity. msg_list will be a list of bytes or zmq.Messages of the
604 msg_list should be unpackable via self.unpack_message at this point.
608 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
609 should be unpackable/unserializable via self.unserialize at this
610 point.
605 """
611 """
606 if copy:
612 if copy:
607 idx = msg_list.index(DELIM)
613 idx = msg_list.index(DELIM)
608 return msg_list[:idx], msg_list[idx+1:]
614 return msg_list[:idx], msg_list[idx+1:]
609 else:
615 else:
610 failed = True
616 failed = True
611 for idx,m in enumerate(msg_list):
617 for idx,m in enumerate(msg_list):
612 if m.bytes == DELIM:
618 if m.bytes == DELIM:
613 failed = False
619 failed = False
614 break
620 break
615 if failed:
621 if failed:
616 raise ValueError("DELIM not in msg_list")
622 raise ValueError("DELIM not in msg_list")
617 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
623 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
618 return [m.bytes for m in idents], msg_list
624 return [m.bytes for m in idents], msg_list
619
625
620 def unpack_message(self, msg_list, content=True, copy=True):
626 def unserialize(self, msg_list, content=True, copy=True):
621 """Return a message object from the format
627 """Unserialize a msg_list to a nested message dict.
622 sent by self.send.
628
623
629 This is roughly the inverse of serialize. The serialize/unserialize
630 methods work with full message lists, whereas pack/unpack work with
631 the individual message parts in the message list.
632
624 Parameters:
633 Parameters:
625 -----------
634 -----------
626
635 msg_list : list of bytes or Message objects
636 The list of message parts of the form [HMAC,p_header,p_parent,
637 p_content,buffer1,buffer2,...].
627 content : bool (True)
638 content : bool (True)
628 whether to unpack the content dict (True),
639 Whether to unpack the content dict (True), or leave it packed
629 or leave it serialized (False)
640 (False).
630
631 copy : bool (True)
641 copy : bool (True)
632 whether to return the bytes (True),
642 Whether to return the bytes (True), or the non-copying Message
633 or the non-copying Message object in each place (False)
643 object in each place (False).
634
644
645 Returns
646 -------
647 msg : dict
648 The nested message dict with top-level keys [header, parent_header,
649 content, buffers].
635 """
650 """
636 minlen = 4
651 minlen = 4
637 message = {}
652 message = {}
638 if not copy:
653 if not copy:
639 for i in range(minlen):
654 for i in range(minlen):
640 msg_list[i] = msg_list[i].bytes
655 msg_list[i] = msg_list[i].bytes
641 if self.auth is not None:
656 if self.auth is not None:
642 signature = msg_list[0]
657 signature = msg_list[0]
643 if signature in self.digest_history:
658 if signature in self.digest_history:
644 raise ValueError("Duplicate Signature: %r"%signature)
659 raise ValueError("Duplicate Signature: %r"%signature)
645 self.digest_history.add(signature)
660 self.digest_history.add(signature)
646 check = self.sign(msg_list[1:4])
661 check = self.sign(msg_list[1:4])
647 if not signature == check:
662 if not signature == check:
648 raise ValueError("Invalid Signature: %r"%signature)
663 raise ValueError("Invalid Signature: %r"%signature)
649 if not len(msg_list) >= minlen:
664 if not len(msg_list) >= minlen:
650 raise TypeError("malformed message, must have at least %i elements"%minlen)
665 raise TypeError("malformed message, must have at least %i elements"%minlen)
651 message['header'] = self.unpack(msg_list[1])
666 message['header'] = self.unpack(msg_list[1])
652 message['parent_header'] = self.unpack(msg_list[2])
667 message['parent_header'] = self.unpack(msg_list[2])
653 if content:
668 if content:
654 message['content'] = self.unpack(msg_list[3])
669 message['content'] = self.unpack(msg_list[3])
655 else:
670 else:
656 message['content'] = msg_list[3]
671 message['content'] = msg_list[3]
657
672
658 message['buffers'] = msg_list[4:]
673 message['buffers'] = msg_list[4:]
659 return message
674 return message
660
675
661 def test_msg2obj():
676 def test_msg2obj():
662 am = dict(x=1)
677 am = dict(x=1)
663 ao = Message(am)
678 ao = Message(am)
664 assert ao.x == am['x']
679 assert ao.x == am['x']
665
680
666 am['y'] = dict(z=1)
681 am['y'] = dict(z=1)
667 ao = Message(am)
682 ao = Message(am)
668 assert ao.y.z == am['y']['z']
683 assert ao.y.z == am['y']['z']
669
684
670 k1, k2 = 'y', 'z'
685 k1, k2 = 'y', 'z'
671 assert ao[k1][k2] == am[k1][k2]
686 assert ao[k1][k2] == am[k1][k2]
672
687
673 am2 = dict(ao)
688 am2 = dict(ao)
674 assert am['x'] == am2['x']
689 assert am['x'] == am2['x']
675 assert am['y']['z'] == am2['y']['z']
690 assert am['y']['z'] == am2['y']['z']
676
691
General Comments 0
You need to be logged in to leave comments. Login now