fix subheaders for execute_reply and aborted messages...
MinRK
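The visible hunk changes save_task_result: it reads the reply's status from the reply header and, when the reply was aborted, no longer records the msg_id as completed work for the engine. A minimal sketch of that guard, using a hypothetical standalone helper rather than the actual Hub method:

# hypothetical helper, not part of IPython.parallel: illustrates the guard
# that save_task_result applies before recording a completion
def record_completion(reply_header, completed_msg_ids, msg_id):
    # aborted replies should not count as completed work for the engine
    status = reply_header.get('status', None)
    if status != 'aborted':
        completed_msg_ids.append(msg_id)
    return status

For example, record_completion({'status': 'aborted'}, [], 'abc') leaves the list untouched, while a reply with status 'ok' appends 'abc'.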
@@ -1,1300 +1,1303 @@
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import sys
21 import sys
22 import time
22 import time
23 from datetime import datetime
23 from datetime import datetime
24
24
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27 from zmq.eventloop.zmqstream import ZMQStream
27 from zmq.eventloop.zmqstream import ZMQStream
28
28
29 # internal:
29 # internal:
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31 from IPython.utils.traitlets import (
31 from IPython.utils.traitlets import (
32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 )
33 )
34
34
35 from IPython.parallel import error, util
35 from IPython.parallel import error, util
36 from IPython.parallel.factory import RegistrationFactory
36 from IPython.parallel.factory import RegistrationFactory
37
37
38 from IPython.zmq.session import SessionFactory
38 from IPython.zmq.session import SessionFactory
39
39
40 from .heartmonitor import HeartMonitor
40 from .heartmonitor import HeartMonitor
41
41
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43 # Code
43 # Code
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45
45
46 def _passer(*args, **kwargs):
46 def _passer(*args, **kwargs):
47 return
47 return
48
48
49 def _printer(*args, **kwargs):
49 def _printer(*args, **kwargs):
50 print(args)
50 print(args)
51 print(kwargs)
51 print(kwargs)
52
52
53 def empty_record():
53 def empty_record():
54 """Return an empty dict with all record keys."""
54 """Return an empty dict with all record keys."""
55 return {
55 return {
56 'msg_id' : None,
56 'msg_id' : None,
57 'header' : None,
57 'header' : None,
58 'content': None,
58 'content': None,
59 'buffers': None,
59 'buffers': None,
60 'submitted': None,
60 'submitted': None,
61 'client_uuid' : None,
61 'client_uuid' : None,
62 'engine_uuid' : None,
62 'engine_uuid' : None,
63 'started': None,
63 'started': None,
64 'completed': None,
64 'completed': None,
65 'resubmitted': None,
65 'resubmitted': None,
66 'received': None,
66 'received': None,
67 'result_header' : None,
67 'result_header' : None,
68 'result_content' : None,
68 'result_content' : None,
69 'result_buffers' : None,
69 'result_buffers' : None,
70 'queue' : None,
70 'queue' : None,
71 'pyin' : None,
71 'pyin' : None,
72 'pyout': None,
72 'pyout': None,
73 'pyerr': None,
73 'pyerr': None,
74 'stdout': '',
74 'stdout': '',
75 'stderr': '',
75 'stderr': '',
76 }
76 }
77
77
78 def init_record(msg):
78 def init_record(msg):
79 """Initialize a TaskRecord based on a request."""
79 """Initialize a TaskRecord based on a request."""
80 header = msg['header']
80 header = msg['header']
81 return {
81 return {
82 'msg_id' : header['msg_id'],
82 'msg_id' : header['msg_id'],
83 'header' : header,
83 'header' : header,
84 'content': msg['content'],
84 'content': msg['content'],
85 'buffers': msg['buffers'],
85 'buffers': msg['buffers'],
86 'submitted': header['date'],
86 'submitted': header['date'],
87 'client_uuid' : None,
87 'client_uuid' : None,
88 'engine_uuid' : None,
88 'engine_uuid' : None,
89 'started': None,
89 'started': None,
90 'completed': None,
90 'completed': None,
91 'resubmitted': None,
91 'resubmitted': None,
92 'received': None,
92 'received': None,
93 'result_header' : None,
93 'result_header' : None,
94 'result_content' : None,
94 'result_content' : None,
95 'result_buffers' : None,
95 'result_buffers' : None,
96 'queue' : None,
96 'queue' : None,
97 'pyin' : None,
97 'pyin' : None,
98 'pyout': None,
98 'pyout': None,
99 'pyerr': None,
99 'pyerr': None,
100 'stdout': '',
100 'stdout': '',
101 'stderr': '',
101 'stderr': '',
102 }
102 }
103
103
104
104
105 class EngineConnector(HasTraits):
105 class EngineConnector(HasTraits):
106 """A simple object for accessing the various zmq connections of an object.
106 """A simple object for accessing the various zmq connections of an object.
107 Attributes are:
107 Attributes are:
108 id (int): engine ID
108 id (int): engine ID
109 control (str): identity of control XREQ socket
109 control (str): identity of control XREQ socket
110 queue (str): identity of queue's XREQ socket
110 queue (str): identity of queue's XREQ socket
111 registration (str): identity of registration XREQ socket
111 registration (str): identity of registration XREQ socket
112 heartbeat (str): identity of heartbeat XREQ socket
112 heartbeat (str): identity of heartbeat XREQ socket
113 """
113 """
114 id=Integer(0)
114 id=Integer(0)
115 queue=CBytes()
115 queue=CBytes()
116 control=CBytes()
116 control=CBytes()
117 registration=CBytes()
117 registration=CBytes()
118 heartbeat=CBytes()
118 heartbeat=CBytes()
119 pending=Set()
119 pending=Set()
120
120
121 class HubFactory(RegistrationFactory):
121 class HubFactory(RegistrationFactory):
122 """The Configurable for setting up a Hub."""
122 """The Configurable for setting up a Hub."""
123
123
124 # port-pairs for monitoredqueues:
124 # port-pairs for monitoredqueues:
125 hb = Tuple(Integer,Integer,config=True,
125 hb = Tuple(Integer,Integer,config=True,
126 help="""XREQ/SUB Port pair for Engine heartbeats""")
126 help="""XREQ/SUB Port pair for Engine heartbeats""")
127 def _hb_default(self):
127 def _hb_default(self):
128 return tuple(util.select_random_ports(2))
128 return tuple(util.select_random_ports(2))
129
129
130 mux = Tuple(Integer,Integer,config=True,
130 mux = Tuple(Integer,Integer,config=True,
131 help="""Engine/Client Port pair for MUX queue""")
131 help="""Engine/Client Port pair for MUX queue""")
132
132
133 def _mux_default(self):
133 def _mux_default(self):
134 return tuple(util.select_random_ports(2))
134 return tuple(util.select_random_ports(2))
135
135
136 task = Tuple(Integer,Integer,config=True,
136 task = Tuple(Integer,Integer,config=True,
137 help="""Engine/Client Port pair for Task queue""")
137 help="""Engine/Client Port pair for Task queue""")
138 def _task_default(self):
138 def _task_default(self):
139 return tuple(util.select_random_ports(2))
139 return tuple(util.select_random_ports(2))
140
140
141 control = Tuple(Integer,Integer,config=True,
141 control = Tuple(Integer,Integer,config=True,
142 help="""Engine/Client Port pair for Control queue""")
142 help="""Engine/Client Port pair for Control queue""")
143
143
144 def _control_default(self):
144 def _control_default(self):
145 return tuple(util.select_random_ports(2))
145 return tuple(util.select_random_ports(2))
146
146
147 iopub = Tuple(Integer,Integer,config=True,
147 iopub = Tuple(Integer,Integer,config=True,
148 help="""Engine/Client Port pair for IOPub relay""")
148 help="""Engine/Client Port pair for IOPub relay""")
149
149
150 def _iopub_default(self):
150 def _iopub_default(self):
151 return tuple(util.select_random_ports(2))
151 return tuple(util.select_random_ports(2))
152
152
153 # single ports:
153 # single ports:
154 mon_port = Integer(config=True,
154 mon_port = Integer(config=True,
155 help="""Monitor (SUB) port for queue traffic""")
155 help="""Monitor (SUB) port for queue traffic""")
156
156
157 def _mon_port_default(self):
157 def _mon_port_default(self):
158 return util.select_random_ports(1)[0]
158 return util.select_random_ports(1)[0]
159
159
160 notifier_port = Integer(config=True,
160 notifier_port = Integer(config=True,
161 help="""PUB port for sending engine status notifications""")
161 help="""PUB port for sending engine status notifications""")
162
162
163 def _notifier_port_default(self):
163 def _notifier_port_default(self):
164 return util.select_random_ports(1)[0]
164 return util.select_random_ports(1)[0]
165
165
166 engine_ip = Unicode('127.0.0.1', config=True,
166 engine_ip = Unicode('127.0.0.1', config=True,
167 help="IP on which to listen for engine connections. [default: loopback]")
167 help="IP on which to listen for engine connections. [default: loopback]")
168 engine_transport = Unicode('tcp', config=True,
168 engine_transport = Unicode('tcp', config=True,
169 help="0MQ transport for engine connections. [default: tcp]")
169 help="0MQ transport for engine connections. [default: tcp]")
170
170
171 client_ip = Unicode('127.0.0.1', config=True,
171 client_ip = Unicode('127.0.0.1', config=True,
172 help="IP on which to listen for client connections. [default: loopback]")
172 help="IP on which to listen for client connections. [default: loopback]")
173 client_transport = Unicode('tcp', config=True,
173 client_transport = Unicode('tcp', config=True,
174 help="0MQ transport for client connections. [default : tcp]")
174 help="0MQ transport for client connections. [default : tcp]")
175
175
176 monitor_ip = Unicode('127.0.0.1', config=True,
176 monitor_ip = Unicode('127.0.0.1', config=True,
177 help="IP on which to listen for monitor messages. [default: loopback]")
177 help="IP on which to listen for monitor messages. [default: loopback]")
178 monitor_transport = Unicode('tcp', config=True,
178 monitor_transport = Unicode('tcp', config=True,
179 help="0MQ transport for monitor messages. [default : tcp]")
179 help="0MQ transport for monitor messages. [default : tcp]")
180
180
181 monitor_url = Unicode('')
181 monitor_url = Unicode('')
182
182
183 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
183 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
184 config=True, help="""The class to use for the DB backend""")
184 config=True, help="""The class to use for the DB backend""")
185
185
186 # not configurable
186 # not configurable
187 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
187 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
188 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
188 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
189
189
190 def _ip_changed(self, name, old, new):
190 def _ip_changed(self, name, old, new):
191 self.engine_ip = new
191 self.engine_ip = new
192 self.client_ip = new
192 self.client_ip = new
193 self.monitor_ip = new
193 self.monitor_ip = new
194 self._update_monitor_url()
194 self._update_monitor_url()
195
195
196 def _update_monitor_url(self):
196 def _update_monitor_url(self):
197 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
197 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
198
198
199 def _transport_changed(self, name, old, new):
199 def _transport_changed(self, name, old, new):
200 self.engine_transport = new
200 self.engine_transport = new
201 self.client_transport = new
201 self.client_transport = new
202 self.monitor_transport = new
202 self.monitor_transport = new
203 self._update_monitor_url()
203 self._update_monitor_url()
204
204
205 def __init__(self, **kwargs):
205 def __init__(self, **kwargs):
206 super(HubFactory, self).__init__(**kwargs)
206 super(HubFactory, self).__init__(**kwargs)
207 self._update_monitor_url()
207 self._update_monitor_url()
208
208
209
209
210 def construct(self):
210 def construct(self):
211 self.init_hub()
211 self.init_hub()
212
212
213 def start(self):
213 def start(self):
214 self.heartmonitor.start()
214 self.heartmonitor.start()
215 self.log.info("Heartmonitor started")
215 self.log.info("Heartmonitor started")
216
216
217 def init_hub(self):
217 def init_hub(self):
218 """construct"""
218 """construct"""
219 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
219 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
220 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
220 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
221
221
222 ctx = self.context
222 ctx = self.context
223 loop = self.loop
223 loop = self.loop
224
224
225 # Registrar socket
225 # Registrar socket
226 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
226 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
227 q.bind(client_iface % self.regport)
227 q.bind(client_iface % self.regport)
228 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
228 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
229 if self.client_ip != self.engine_ip:
229 if self.client_ip != self.engine_ip:
230 q.bind(engine_iface % self.regport)
230 q.bind(engine_iface % self.regport)
231 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
231 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
232
232
233 ### Engine connections ###
233 ### Engine connections ###
234
234
235 # heartbeat
235 # heartbeat
236 hpub = ctx.socket(zmq.PUB)
236 hpub = ctx.socket(zmq.PUB)
237 hpub.bind(engine_iface % self.hb[0])
237 hpub.bind(engine_iface % self.hb[0])
238 hrep = ctx.socket(zmq.ROUTER)
238 hrep = ctx.socket(zmq.ROUTER)
239 hrep.bind(engine_iface % self.hb[1])
239 hrep.bind(engine_iface % self.hb[1])
240 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
240 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
241 pingstream=ZMQStream(hpub,loop),
241 pingstream=ZMQStream(hpub,loop),
242 pongstream=ZMQStream(hrep,loop)
242 pongstream=ZMQStream(hrep,loop)
243 )
243 )
244
244
245 ### Client connections ###
245 ### Client connections ###
246 # Notifier socket
246 # Notifier socket
247 n = ZMQStream(ctx.socket(zmq.PUB), loop)
247 n = ZMQStream(ctx.socket(zmq.PUB), loop)
248 n.bind(client_iface%self.notifier_port)
248 n.bind(client_iface%self.notifier_port)
249
249
250 ### build and launch the queues ###
250 ### build and launch the queues ###
251
251
252 # monitor socket
252 # monitor socket
253 sub = ctx.socket(zmq.SUB)
253 sub = ctx.socket(zmq.SUB)
254 sub.setsockopt(zmq.SUBSCRIBE, b"")
254 sub.setsockopt(zmq.SUBSCRIBE, b"")
255 sub.bind(self.monitor_url)
255 sub.bind(self.monitor_url)
256 sub.bind('inproc://monitor')
256 sub.bind('inproc://monitor')
257 sub = ZMQStream(sub, loop)
257 sub = ZMQStream(sub, loop)
258
258
259 # connect the db
259 # connect the db
260 self.log.info('Hub using DB backend: %r'%(self.db_class.split('.')[-1]))
260 self.log.info('Hub using DB backend: %r'%(self.db_class.split('.')[-1]))
261 # cdir = self.config.Global.cluster_dir
261 # cdir = self.config.Global.cluster_dir
262 self.db = import_item(str(self.db_class))(session=self.session.session,
262 self.db = import_item(str(self.db_class))(session=self.session.session,
263 config=self.config, log=self.log)
263 config=self.config, log=self.log)
264 time.sleep(.25)
264 time.sleep(.25)
265 try:
265 try:
266 scheme = self.config.TaskScheduler.scheme_name
266 scheme = self.config.TaskScheduler.scheme_name
267 except AttributeError:
267 except AttributeError:
268 from .scheduler import TaskScheduler
268 from .scheduler import TaskScheduler
269 scheme = TaskScheduler.scheme_name.get_default_value()
269 scheme = TaskScheduler.scheme_name.get_default_value()
270 # build connection dicts
270 # build connection dicts
271 self.engine_info = {
271 self.engine_info = {
272 'control' : engine_iface%self.control[1],
272 'control' : engine_iface%self.control[1],
273 'mux': engine_iface%self.mux[1],
273 'mux': engine_iface%self.mux[1],
274 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
274 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
275 'task' : engine_iface%self.task[1],
275 'task' : engine_iface%self.task[1],
276 'iopub' : engine_iface%self.iopub[1],
276 'iopub' : engine_iface%self.iopub[1],
277 # 'monitor' : engine_iface%self.mon_port,
277 # 'monitor' : engine_iface%self.mon_port,
278 }
278 }
279
279
280 self.client_info = {
280 self.client_info = {
281 'control' : client_iface%self.control[0],
281 'control' : client_iface%self.control[0],
282 'mux': client_iface%self.mux[0],
282 'mux': client_iface%self.mux[0],
283 'task' : (scheme, client_iface%self.task[0]),
283 'task' : (scheme, client_iface%self.task[0]),
284 'iopub' : client_iface%self.iopub[0],
284 'iopub' : client_iface%self.iopub[0],
285 'notification': client_iface%self.notifier_port
285 'notification': client_iface%self.notifier_port
286 }
286 }
287 self.log.debug("Hub engine addrs: %s", self.engine_info)
287 self.log.debug("Hub engine addrs: %s", self.engine_info)
288 self.log.debug("Hub client addrs: %s", self.client_info)
288 self.log.debug("Hub client addrs: %s", self.client_info)
289
289
290 # resubmit stream
290 # resubmit stream
291 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
291 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
292 url = util.disambiguate_url(self.client_info['task'][-1])
292 url = util.disambiguate_url(self.client_info['task'][-1])
293 r.setsockopt(zmq.IDENTITY, self.session.bsession)
293 r.setsockopt(zmq.IDENTITY, self.session.bsession)
294 r.connect(url)
294 r.connect(url)
295
295
296 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
296 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
297 query=q, notifier=n, resubmit=r, db=self.db,
297 query=q, notifier=n, resubmit=r, db=self.db,
298 engine_info=self.engine_info, client_info=self.client_info,
298 engine_info=self.engine_info, client_info=self.client_info,
299 log=self.log)
299 log=self.log)
300
300
301
301
302 class Hub(SessionFactory):
302 class Hub(SessionFactory):
303 """The IPython Controller Hub with 0MQ connections
303 """The IPython Controller Hub with 0MQ connections
304
304
305 Parameters
305 Parameters
306 ==========
306 ==========
307 loop: zmq IOLoop instance
307 loop: zmq IOLoop instance
308 session: Session object
308 session: Session object
309 <removed> context: zmq context for creating new connections (?)
309 <removed> context: zmq context for creating new connections (?)
310 queue: ZMQStream for monitoring the command queue (SUB)
310 queue: ZMQStream for monitoring the command queue (SUB)
311 query: ZMQStream for engine registration and client query requests (XREP)
311 query: ZMQStream for engine registration and client query requests (XREP)
312 heartbeat: HeartMonitor object checking the pulse of the engines
312 heartbeat: HeartMonitor object checking the pulse of the engines
313 notifier: ZMQStream for broadcasting engine registration changes (PUB)
313 notifier: ZMQStream for broadcasting engine registration changes (PUB)
314 db: connection to db for out of memory logging of commands
314 db: connection to db for out of memory logging of commands
315 NotImplemented
315 NotImplemented
316 engine_info: dict of zmq connection information for engines to connect
316 engine_info: dict of zmq connection information for engines to connect
317 to the queues.
317 to the queues.
318 client_info: dict of zmq connection information for clients to connect
318 client_info: dict of zmq connection information for clients to connect
319 to the queues.
319 to the queues.
320 """
320 """
321 # internal data structures:
321 # internal data structures:
322 ids=Set() # engine IDs
322 ids=Set() # engine IDs
323 keytable=Dict()
323 keytable=Dict()
324 by_ident=Dict()
324 by_ident=Dict()
325 engines=Dict()
325 engines=Dict()
326 clients=Dict()
326 clients=Dict()
327 hearts=Dict()
327 hearts=Dict()
328 pending=Set()
328 pending=Set()
329 queues=Dict() # pending msg_ids keyed by engine_id
329 queues=Dict() # pending msg_ids keyed by engine_id
330 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
330 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
331 completed=Dict() # completed msg_ids keyed by engine_id
331 completed=Dict() # completed msg_ids keyed by engine_id
332 all_completed=Set() # set of all completed msg_ids
332 all_completed=Set() # set of all completed msg_ids
333 dead_engines=Set() # queue uuids of engines that have died
333 dead_engines=Set() # queue uuids of engines that have died
334 unassigned=Set() # set of task msg_ids not yet assigned a destination
334 unassigned=Set() # set of task msg_ids not yet assigned a destination
335 incoming_registrations=Dict()
335 incoming_registrations=Dict()
336 registration_timeout=Integer()
336 registration_timeout=Integer()
337 _idcounter=Integer(0)
337 _idcounter=Integer(0)
338
338
339 # objects from constructor:
339 # objects from constructor:
340 query=Instance(ZMQStream)
340 query=Instance(ZMQStream)
341 monitor=Instance(ZMQStream)
341 monitor=Instance(ZMQStream)
342 notifier=Instance(ZMQStream)
342 notifier=Instance(ZMQStream)
343 resubmit=Instance(ZMQStream)
343 resubmit=Instance(ZMQStream)
344 heartmonitor=Instance(HeartMonitor)
344 heartmonitor=Instance(HeartMonitor)
345 db=Instance(object)
345 db=Instance(object)
346 client_info=Dict()
346 client_info=Dict()
347 engine_info=Dict()
347 engine_info=Dict()
348
348
349
349
350 def __init__(self, **kwargs):
350 def __init__(self, **kwargs):
351 """
351 """
352 # universal:
352 # universal:
353 loop: IOLoop for creating future connections
353 loop: IOLoop for creating future connections
354 session: streamsession for sending serialized data
354 session: streamsession for sending serialized data
355 # engine:
355 # engine:
356 queue: ZMQStream for monitoring queue messages
356 queue: ZMQStream for monitoring queue messages
357 query: ZMQStream for engine+client registration and client requests
357 query: ZMQStream for engine+client registration and client requests
358 heartbeat: HeartMonitor object for tracking engines
358 heartbeat: HeartMonitor object for tracking engines
359 # extra:
359 # extra:
360 db: ZMQStream for db connection (NotImplemented)
360 db: ZMQStream for db connection (NotImplemented)
361 engine_info: zmq address/protocol dict for engine connections
361 engine_info: zmq address/protocol dict for engine connections
362 client_info: zmq address/protocol dict for client connections
362 client_info: zmq address/protocol dict for client connections
363 """
363 """
364
364
365 super(Hub, self).__init__(**kwargs)
365 super(Hub, self).__init__(**kwargs)
366 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
366 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
367
367
368 # validate connection dicts:
368 # validate connection dicts:
369 for k,v in self.client_info.iteritems():
369 for k,v in self.client_info.iteritems():
370 if k == 'task':
370 if k == 'task':
371 util.validate_url_container(v[1])
371 util.validate_url_container(v[1])
372 else:
372 else:
373 util.validate_url_container(v)
373 util.validate_url_container(v)
374 # util.validate_url_container(self.client_info)
374 # util.validate_url_container(self.client_info)
375 util.validate_url_container(self.engine_info)
375 util.validate_url_container(self.engine_info)
376
376
377 # register our callbacks
377 # register our callbacks
378 self.query.on_recv(self.dispatch_query)
378 self.query.on_recv(self.dispatch_query)
379 self.monitor.on_recv(self.dispatch_monitor_traffic)
379 self.monitor.on_recv(self.dispatch_monitor_traffic)
380
380
381 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
381 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
382 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
382 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
383
383
384 self.monitor_handlers = {b'in' : self.save_queue_request,
384 self.monitor_handlers = {b'in' : self.save_queue_request,
385 b'out': self.save_queue_result,
385 b'out': self.save_queue_result,
386 b'intask': self.save_task_request,
386 b'intask': self.save_task_request,
387 b'outtask': self.save_task_result,
387 b'outtask': self.save_task_result,
388 b'tracktask': self.save_task_destination,
388 b'tracktask': self.save_task_destination,
389 b'incontrol': _passer,
389 b'incontrol': _passer,
390 b'outcontrol': _passer,
390 b'outcontrol': _passer,
391 b'iopub': self.save_iopub_message,
391 b'iopub': self.save_iopub_message,
392 }
392 }
393
393
394 self.query_handlers = {'queue_request': self.queue_status,
394 self.query_handlers = {'queue_request': self.queue_status,
395 'result_request': self.get_results,
395 'result_request': self.get_results,
396 'history_request': self.get_history,
396 'history_request': self.get_history,
397 'db_request': self.db_query,
397 'db_request': self.db_query,
398 'purge_request': self.purge_results,
398 'purge_request': self.purge_results,
399 'load_request': self.check_load,
399 'load_request': self.check_load,
400 'resubmit_request': self.resubmit_task,
400 'resubmit_request': self.resubmit_task,
401 'shutdown_request': self.shutdown_request,
401 'shutdown_request': self.shutdown_request,
402 'registration_request' : self.register_engine,
402 'registration_request' : self.register_engine,
403 'unregistration_request' : self.unregister_engine,
403 'unregistration_request' : self.unregister_engine,
404 'connection_request': self.connection_request,
404 'connection_request': self.connection_request,
405 }
405 }
406
406
407 # ignore resubmit replies
407 # ignore resubmit replies
408 self.resubmit.on_recv(lambda msg: None, copy=False)
408 self.resubmit.on_recv(lambda msg: None, copy=False)
409
409
410 self.log.info("hub::created hub")
410 self.log.info("hub::created hub")
411
411
412 @property
412 @property
413 def _next_id(self):
413 def _next_id(self):
414 """gemerate a new ID.
414 """gemerate a new ID.
415
415
416 No longer reuse old ids, just count from 0."""
416 No longer reuse old ids, just count from 0."""
417 newid = self._idcounter
417 newid = self._idcounter
418 self._idcounter += 1
418 self._idcounter += 1
419 return newid
419 return newid
420 # newid = 0
420 # newid = 0
421 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
421 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
422 # # print newid, self.ids, self.incoming_registrations
422 # # print newid, self.ids, self.incoming_registrations
423 # while newid in self.ids or newid in incoming:
423 # while newid in self.ids or newid in incoming:
424 # newid += 1
424 # newid += 1
425 # return newid
425 # return newid
426
426
427 #-----------------------------------------------------------------------------
427 #-----------------------------------------------------------------------------
428 # message validation
428 # message validation
429 #-----------------------------------------------------------------------------
429 #-----------------------------------------------------------------------------
430
430
431 def _validate_targets(self, targets):
431 def _validate_targets(self, targets):
432 """turn any valid targets argument into a list of integer ids"""
432 """turn any valid targets argument into a list of integer ids"""
433 if targets is None:
433 if targets is None:
434 # default to all
434 # default to all
435 return self.ids
435 return self.ids
436
436
437 if isinstance(targets, (int,str,unicode)):
437 if isinstance(targets, (int,str,unicode)):
438 # only one target specified
438 # only one target specified
439 targets = [targets]
439 targets = [targets]
440 _targets = []
440 _targets = []
441 for t in targets:
441 for t in targets:
442 # map raw identities to ids
442 # map raw identities to ids
443 if isinstance(t, (str,unicode)):
443 if isinstance(t, (str,unicode)):
444 t = self.by_ident.get(t, t)
444 t = self.by_ident.get(t, t)
445 _targets.append(t)
445 _targets.append(t)
446 targets = _targets
446 targets = _targets
447 bad_targets = [ t for t in targets if t not in self.ids ]
447 bad_targets = [ t for t in targets if t not in self.ids ]
448 if bad_targets:
448 if bad_targets:
449 raise IndexError("No Such Engine: %r" % bad_targets)
449 raise IndexError("No Such Engine: %r" % bad_targets)
450 if not targets:
450 if not targets:
451 raise IndexError("No Engines Registered")
451 raise IndexError("No Engines Registered")
452 return targets
452 return targets
453
453
454 #-----------------------------------------------------------------------------
454 #-----------------------------------------------------------------------------
455 # dispatch methods (1 per stream)
455 # dispatch methods (1 per stream)
456 #-----------------------------------------------------------------------------
456 #-----------------------------------------------------------------------------
457
457
458
458
459 @util.log_errors
459 @util.log_errors
460 def dispatch_monitor_traffic(self, msg):
460 def dispatch_monitor_traffic(self, msg):
461 """all ME and Task queue messages come through here, as well as
461 """all ME and Task queue messages come through here, as well as
462 IOPub traffic."""
462 IOPub traffic."""
463 self.log.debug("monitor traffic: %r", msg[0])
463 self.log.debug("monitor traffic: %r", msg[0])
464 switch = msg[0]
464 switch = msg[0]
465 try:
465 try:
466 idents, msg = self.session.feed_identities(msg[1:])
466 idents, msg = self.session.feed_identities(msg[1:])
467 except ValueError:
467 except ValueError:
468 idents=[]
468 idents=[]
469 if not idents:
469 if not idents:
470 self.log.error("Bad Monitor Message: %r", msg)
470 self.log.error("Bad Monitor Message: %r", msg)
471 return
471 return
472 handler = self.monitor_handlers.get(switch, None)
472 handler = self.monitor_handlers.get(switch, None)
473 if handler is not None:
473 if handler is not None:
474 handler(idents, msg)
474 handler(idents, msg)
475 else:
475 else:
476 self.log.error("Invalid monitor topic: %r", switch)
476 self.log.error("Invalid monitor topic: %r", switch)
477
477
478
478
479 @util.log_errors
479 @util.log_errors
480 def dispatch_query(self, msg):
480 def dispatch_query(self, msg):
481 """Route registration requests and queries from clients."""
481 """Route registration requests and queries from clients."""
482 try:
482 try:
483 idents, msg = self.session.feed_identities(msg)
483 idents, msg = self.session.feed_identities(msg)
484 except ValueError:
484 except ValueError:
485 idents = []
485 idents = []
486 if not idents:
486 if not idents:
487 self.log.error("Bad Query Message: %r", msg)
487 self.log.error("Bad Query Message: %r", msg)
488 return
488 return
489 client_id = idents[0]
489 client_id = idents[0]
490 try:
490 try:
491 msg = self.session.unserialize(msg, content=True)
491 msg = self.session.unserialize(msg, content=True)
492 except Exception:
492 except Exception:
493 content = error.wrap_exception()
493 content = error.wrap_exception()
494 self.log.error("Bad Query Message: %r", msg, exc_info=True)
494 self.log.error("Bad Query Message: %r", msg, exc_info=True)
495 self.session.send(self.query, "hub_error", ident=client_id,
495 self.session.send(self.query, "hub_error", ident=client_id,
496 content=content)
496 content=content)
497 return
497 return
498 # print client_id, header, parent, content
498 # print client_id, header, parent, content
499 #switch on message type:
499 #switch on message type:
500 msg_type = msg['header']['msg_type']
500 msg_type = msg['header']['msg_type']
501 self.log.info("client::client %r requested %r", client_id, msg_type)
501 self.log.info("client::client %r requested %r", client_id, msg_type)
502 handler = self.query_handlers.get(msg_type, None)
502 handler = self.query_handlers.get(msg_type, None)
503 try:
503 try:
504 assert handler is not None, "Bad Message Type: %r" % msg_type
504 assert handler is not None, "Bad Message Type: %r" % msg_type
505 except:
505 except:
506 content = error.wrap_exception()
506 content = error.wrap_exception()
507 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
507 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
508 self.session.send(self.query, "hub_error", ident=client_id,
508 self.session.send(self.query, "hub_error", ident=client_id,
509 content=content)
509 content=content)
510 return
510 return
511
511
512 else:
512 else:
513 handler(idents, msg)
513 handler(idents, msg)
514
514
515 def dispatch_db(self, msg):
515 def dispatch_db(self, msg):
516 """"""
516 """"""
517 raise NotImplementedError
517 raise NotImplementedError
518
518
519 #---------------------------------------------------------------------------
519 #---------------------------------------------------------------------------
520 # handler methods (1 per event)
520 # handler methods (1 per event)
521 #---------------------------------------------------------------------------
521 #---------------------------------------------------------------------------
522
522
523 #----------------------- Heartbeat --------------------------------------
523 #----------------------- Heartbeat --------------------------------------
524
524
525 def handle_new_heart(self, heart):
525 def handle_new_heart(self, heart):
526 """handler to attach to heartbeater.
526 """handler to attach to heartbeater.
527 Called when a new heart starts to beat.
527 Called when a new heart starts to beat.
528 Triggers completion of registration."""
528 Triggers completion of registration."""
529 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
529 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
530 if heart not in self.incoming_registrations:
530 if heart not in self.incoming_registrations:
531 self.log.info("heartbeat::ignoring new heart: %r", heart)
531 self.log.info("heartbeat::ignoring new heart: %r", heart)
532 else:
532 else:
533 self.finish_registration(heart)
533 self.finish_registration(heart)
534
534
535
535
536 def handle_heart_failure(self, heart):
536 def handle_heart_failure(self, heart):
537 """handler to attach to heartbeater.
537 """handler to attach to heartbeater.
538 called when a previously registered heart fails to respond to beat request.
538 called when a previously registered heart fails to respond to beat request.
539 triggers unregistration"""
539 triggers unregistration"""
540 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
540 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
541 eid = self.hearts.get(heart, None)
541 eid = self.hearts.get(heart, None)
542 if eid is None or self.keytable[eid] in self.dead_engines:
542 if eid is None or self.keytable[eid] in self.dead_engines:
543 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
543 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
544 else:
544 else:
545 queue = self.engines[eid].queue
545 queue = self.engines[eid].queue
546 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
546 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
547
547
548 #----------------------- MUX Queue Traffic ------------------------------
548 #----------------------- MUX Queue Traffic ------------------------------
549
549
550 def save_queue_request(self, idents, msg):
550 def save_queue_request(self, idents, msg):
551 if len(idents) < 2:
551 if len(idents) < 2:
552 self.log.error("invalid identity prefix: %r", idents)
552 self.log.error("invalid identity prefix: %r", idents)
553 return
553 return
554 queue_id, client_id = idents[:2]
554 queue_id, client_id = idents[:2]
555 try:
555 try:
556 msg = self.session.unserialize(msg)
556 msg = self.session.unserialize(msg)
557 except Exception:
557 except Exception:
558 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
558 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
559 return
559 return
560
560
561 eid = self.by_ident.get(queue_id, None)
561 eid = self.by_ident.get(queue_id, None)
562 if eid is None:
562 if eid is None:
563 self.log.error("queue::target %r not registered", queue_id)
563 self.log.error("queue::target %r not registered", queue_id)
564 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
564 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
565 return
565 return
566 record = init_record(msg)
566 record = init_record(msg)
567 msg_id = record['msg_id']
567 msg_id = record['msg_id']
568 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
568 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
569 # Unicode in records
569 # Unicode in records
570 record['engine_uuid'] = queue_id.decode('ascii')
570 record['engine_uuid'] = queue_id.decode('ascii')
571 record['client_uuid'] = client_id.decode('ascii')
571 record['client_uuid'] = client_id.decode('ascii')
572 record['queue'] = 'mux'
572 record['queue'] = 'mux'
573
573
574 try:
574 try:
575 # it's possible iopub arrived first:
575 # it's possible iopub arrived first:
576 existing = self.db.get_record(msg_id)
576 existing = self.db.get_record(msg_id)
577 for key,evalue in existing.iteritems():
577 for key,evalue in existing.iteritems():
578 rvalue = record.get(key, None)
578 rvalue = record.get(key, None)
579 if evalue and rvalue and evalue != rvalue:
579 if evalue and rvalue and evalue != rvalue:
580 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
580 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
581 elif evalue and not rvalue:
581 elif evalue and not rvalue:
582 record[key] = evalue
582 record[key] = evalue
583 try:
583 try:
584 self.db.update_record(msg_id, record)
584 self.db.update_record(msg_id, record)
585 except Exception:
585 except Exception:
586 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
586 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
587 except KeyError:
587 except KeyError:
588 try:
588 try:
589 self.db.add_record(msg_id, record)
589 self.db.add_record(msg_id, record)
590 except Exception:
590 except Exception:
591 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
591 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
592
592
593
593
594 self.pending.add(msg_id)
594 self.pending.add(msg_id)
595 self.queues[eid].append(msg_id)
595 self.queues[eid].append(msg_id)
596
596
597 def save_queue_result(self, idents, msg):
597 def save_queue_result(self, idents, msg):
598 if len(idents) < 2:
598 if len(idents) < 2:
599 self.log.error("invalid identity prefix: %r", idents)
599 self.log.error("invalid identity prefix: %r", idents)
600 return
600 return
601
601
602 client_id, queue_id = idents[:2]
602 client_id, queue_id = idents[:2]
603 try:
603 try:
604 msg = self.session.unserialize(msg)
604 msg = self.session.unserialize(msg)
605 except Exception:
605 except Exception:
606 self.log.error("queue::engine %r sent invalid message to %r: %r",
606 self.log.error("queue::engine %r sent invalid message to %r: %r",
607 queue_id, client_id, msg, exc_info=True)
607 queue_id, client_id, msg, exc_info=True)
608 return
608 return
609
609
610 eid = self.by_ident.get(queue_id, None)
610 eid = self.by_ident.get(queue_id, None)
611 if eid is None:
611 if eid is None:
612 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
612 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
613 return
613 return
614
614
615 parent = msg['parent_header']
615 parent = msg['parent_header']
616 if not parent:
616 if not parent:
617 return
617 return
618 msg_id = parent['msg_id']
618 msg_id = parent['msg_id']
619 if msg_id in self.pending:
619 if msg_id in self.pending:
620 self.pending.remove(msg_id)
620 self.pending.remove(msg_id)
621 self.all_completed.add(msg_id)
621 self.all_completed.add(msg_id)
622 self.queues[eid].remove(msg_id)
622 self.queues[eid].remove(msg_id)
623 self.completed[eid].append(msg_id)
623 self.completed[eid].append(msg_id)
624 self.log.info("queue::request %r completed on %s", msg_id, eid)
624 self.log.info("queue::request %r completed on %s", msg_id, eid)
625 elif msg_id not in self.all_completed:
625 elif msg_id not in self.all_completed:
626 # it could be a result from a dead engine that died before delivering the
626 # it could be a result from a dead engine that died before delivering the
627 # result
627 # result
628 self.log.warn("queue:: unknown msg finished %r", msg_id)
628 self.log.warn("queue:: unknown msg finished %r", msg_id)
629 return
629 return
630 # update record anyway, because the unregistration could have been premature
630 # update record anyway, because the unregistration could have been premature
631 rheader = msg['header']
631 rheader = msg['header']
632 completed = rheader['date']
632 completed = rheader['date']
633 started = rheader.get('started', None)
633 started = rheader.get('started', None)
634 result = {
634 result = {
635 'result_header' : rheader,
635 'result_header' : rheader,
636 'result_content': msg['content'],
636 'result_content': msg['content'],
637 'received': datetime.now(),
637 'received': datetime.now(),
638 'started' : started,
638 'started' : started,
639 'completed' : completed
639 'completed' : completed
640 }
640 }
641
641
642 result['result_buffers'] = msg['buffers']
642 result['result_buffers'] = msg['buffers']
643 try:
643 try:
644 self.db.update_record(msg_id, result)
644 self.db.update_record(msg_id, result)
645 except Exception:
645 except Exception:
646 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
646 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
647
647
648
648
649 #--------------------- Task Queue Traffic ------------------------------
649 #--------------------- Task Queue Traffic ------------------------------
650
650
651 def save_task_request(self, idents, msg):
651 def save_task_request(self, idents, msg):
652 """Save the submission of a task."""
652 """Save the submission of a task."""
653 client_id = idents[0]
653 client_id = idents[0]
654
654
655 try:
655 try:
656 msg = self.session.unserialize(msg)
656 msg = self.session.unserialize(msg)
657 except Exception:
657 except Exception:
658 self.log.error("task::client %r sent invalid task message: %r",
658 self.log.error("task::client %r sent invalid task message: %r",
659 client_id, msg, exc_info=True)
659 client_id, msg, exc_info=True)
660 return
660 return
661 record = init_record(msg)
661 record = init_record(msg)
662
662
663 record['client_uuid'] = client_id.decode('ascii')
663 record['client_uuid'] = client_id.decode('ascii')
664 record['queue'] = 'task'
664 record['queue'] = 'task'
665 header = msg['header']
665 header = msg['header']
666 msg_id = header['msg_id']
666 msg_id = header['msg_id']
667 self.pending.add(msg_id)
667 self.pending.add(msg_id)
668 self.unassigned.add(msg_id)
668 self.unassigned.add(msg_id)
669 try:
669 try:
670 # it's possible iopub arrived first:
670 # it's possible iopub arrived first:
671 existing = self.db.get_record(msg_id)
671 existing = self.db.get_record(msg_id)
672 if existing['resubmitted']:
672 if existing['resubmitted']:
673 for key in ('submitted', 'client_uuid', 'buffers'):
673 for key in ('submitted', 'client_uuid', 'buffers'):
674 # don't clobber these keys on resubmit
674 # don't clobber these keys on resubmit
675 # submitted and client_uuid should be different
675 # submitted and client_uuid should be different
676 # and buffers might be big, and shouldn't have changed
676 # and buffers might be big, and shouldn't have changed
677 record.pop(key)
677 record.pop(key)
678 # still check content,header which should not change
678 # still check content,header which should not change
679 # but are not expensive to compare as buffers
679 # but are not expensive to compare as buffers
680
680
681 for key,evalue in existing.iteritems():
681 for key,evalue in existing.iteritems():
682 if key.endswith('buffers'):
682 if key.endswith('buffers'):
683 # don't compare buffers
683 # don't compare buffers
684 continue
684 continue
685 rvalue = record.get(key, None)
685 rvalue = record.get(key, None)
686 if evalue and rvalue and evalue != rvalue:
686 if evalue and rvalue and evalue != rvalue:
687 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
687 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
688 elif evalue and not rvalue:
688 elif evalue and not rvalue:
689 record[key] = evalue
689 record[key] = evalue
690 try:
690 try:
691 self.db.update_record(msg_id, record)
691 self.db.update_record(msg_id, record)
692 except Exception:
692 except Exception:
693 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
693 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
694 except KeyError:
694 except KeyError:
695 try:
695 try:
696 self.db.add_record(msg_id, record)
696 self.db.add_record(msg_id, record)
697 except Exception:
697 except Exception:
698 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
698 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
699 except Exception:
699 except Exception:
700 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
700 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
701
701
702 def save_task_result(self, idents, msg):
702 def save_task_result(self, idents, msg):
703 """save the result of a completed task."""
703 """save the result of a completed task."""
704 client_id = idents[0]
704 client_id = idents[0]
705 try:
705 try:
706 msg = self.session.unserialize(msg)
706 msg = self.session.unserialize(msg)
707 except Exception:
707 except Exception:
708 self.log.error("task::invalid task result message send to %r: %r",
708 self.log.error("task::invalid task result message send to %r: %r",
709 client_id, msg, exc_info=True)
709 client_id, msg, exc_info=True)
710 return
710 return
711
711
712 parent = msg['parent_header']
712 parent = msg['parent_header']
713 if not parent:
713 if not parent:
714 # print msg
714 # print msg
715 self.log.warn("Task %r had no parent!", msg)
715 self.log.warn("Task %r had no parent!", msg)
716 return
716 return
717 msg_id = parent['msg_id']
717 msg_id = parent['msg_id']
718 if msg_id in self.unassigned:
718 if msg_id in self.unassigned:
719 self.unassigned.remove(msg_id)
719 self.unassigned.remove(msg_id)
720
720
721 header = msg['header']
721 header = msg['header']
722 engine_uuid = header.get('engine', None)
722 engine_uuid = header.get('engine', None)
723 eid = self.by_ident.get(engine_uuid, None)
723 eid = self.by_ident.get(engine_uuid, None)
724
725 status = header.get('status', None)
724
726
725 if msg_id in self.pending:
727 if msg_id in self.pending:
726 self.log.info("task::task %r finished on %s", msg_id, eid)
728 self.log.info("task::task %r finished on %s", msg_id, eid)
727 self.pending.remove(msg_id)
729 self.pending.remove(msg_id)
728 self.all_completed.add(msg_id)
730 self.all_completed.add(msg_id)
729 if eid is not None:
731 if eid is not None:
730 self.completed[eid].append(msg_id)
732 if status != 'aborted':
733 self.completed[eid].append(msg_id)
731 if msg_id in self.tasks[eid]:
734 if msg_id in self.tasks[eid]:
732 self.tasks[eid].remove(msg_id)
735 self.tasks[eid].remove(msg_id)
733 completed = header['date']
736 completed = header['date']
734 started = header.get('started', None)
737 started = header.get('started', None)
735 result = {
738 result = {
736 'result_header' : header,
739 'result_header' : header,
737 'result_content': msg['content'],
740 'result_content': msg['content'],
738 'started' : started,
741 'started' : started,
739 'completed' : completed,
742 'completed' : completed,
740 'received' : datetime.now(),
743 'received' : datetime.now(),
741 'engine_uuid': engine_uuid,
744 'engine_uuid': engine_uuid,
742 }
745 }
743
746
744 result['result_buffers'] = msg['buffers']
747 result['result_buffers'] = msg['buffers']
745 try:
748 try:
746 self.db.update_record(msg_id, result)
749 self.db.update_record(msg_id, result)
747 except Exception:
750 except Exception:
748 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
751 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
749
752
750 else:
753 else:
751 self.log.debug("task::unknown task %r finished", msg_id)
754 self.log.debug("task::unknown task %r finished", msg_id)
752
755
753 def save_task_destination(self, idents, msg):
756 def save_task_destination(self, idents, msg):
754 try:
757 try:
755 msg = self.session.unserialize(msg, content=True)
758 msg = self.session.unserialize(msg, content=True)
756 except Exception:
759 except Exception:
757 self.log.error("task::invalid task tracking message", exc_info=True)
760 self.log.error("task::invalid task tracking message", exc_info=True)
758 return
761 return
759 content = msg['content']
762 content = msg['content']
760 # print (content)
763 # print (content)
761 msg_id = content['msg_id']
764 msg_id = content['msg_id']
762 engine_uuid = content['engine_id']
765 engine_uuid = content['engine_id']
763 eid = self.by_ident[util.asbytes(engine_uuid)]
766 eid = self.by_ident[util.asbytes(engine_uuid)]
764
767
765 self.log.info("task::task %r arrived on %r", msg_id, eid)
768 self.log.info("task::task %r arrived on %r", msg_id, eid)
766 if msg_id in self.unassigned:
769 if msg_id in self.unassigned:
767 self.unassigned.remove(msg_id)
770 self.unassigned.remove(msg_id)
768 # else:
771 # else:
769 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
772 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
770
773
771 self.tasks[eid].append(msg_id)
774 self.tasks[eid].append(msg_id)
772 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
775 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
773 try:
776 try:
774 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
777 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
775 except Exception:
778 except Exception:
776 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
779 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
777
780
778
781
779 def mia_task_request(self, idents, msg):
782 def mia_task_request(self, idents, msg):
780 raise NotImplementedError
783 raise NotImplementedError
781 client_id = idents[0]
784 client_id = idents[0]
782 # content = dict(mia=self.mia,status='ok')
785 # content = dict(mia=self.mia,status='ok')
783 # self.session.send('mia_reply', content=content, idents=client_id)
786 # self.session.send('mia_reply', content=content, idents=client_id)
784
787
785
788
786 #--------------------- IOPub Traffic ------------------------------
789 #--------------------- IOPub Traffic ------------------------------
787
790
788 def save_iopub_message(self, topics, msg):
791 def save_iopub_message(self, topics, msg):
789 """save an iopub message into the db"""
792 """save an iopub message into the db"""
790 # print (topics)
793 # print (topics)
791 try:
794 try:
792 msg = self.session.unserialize(msg, content=True)
795 msg = self.session.unserialize(msg, content=True)
793 except Exception:
796 except Exception:
794 self.log.error("iopub::invalid IOPub message", exc_info=True)
797 self.log.error("iopub::invalid IOPub message", exc_info=True)
795 return
798 return
796
799
797 parent = msg['parent_header']
800 parent = msg['parent_header']
798 if not parent:
801 if not parent:
799 self.log.error("iopub::invalid IOPub message: %r", msg)
802 self.log.error("iopub::invalid IOPub message: %r", msg)
800 return
803 return
801 msg_id = parent['msg_id']
804 msg_id = parent['msg_id']
802 msg_type = msg['header']['msg_type']
805 msg_type = msg['header']['msg_type']
803 content = msg['content']
806 content = msg['content']
804
807
805 # ensure msg_id is in db
808 # ensure msg_id is in db
806 try:
809 try:
807 rec = self.db.get_record(msg_id)
810 rec = self.db.get_record(msg_id)
808 except KeyError:
811 except KeyError:
809 rec = empty_record()
812 rec = empty_record()
810 rec['msg_id'] = msg_id
813 rec['msg_id'] = msg_id
811 self.db.add_record(msg_id, rec)
814 self.db.add_record(msg_id, rec)
812 # stream
815 # stream
813 d = {}
816 d = {}
814 if msg_type == 'stream':
817 if msg_type == 'stream':
815 name = content['name']
818 name = content['name']
816 s = rec[name] or ''
819 s = rec[name] or ''
817 d[name] = s + content['data']
820 d[name] = s + content['data']
818
821
819 elif msg_type == 'pyerr':
822 elif msg_type == 'pyerr':
820 d['pyerr'] = content
823 d['pyerr'] = content
821 elif msg_type == 'pyin':
824 elif msg_type == 'pyin':
822 d['pyin'] = content['code']
825 d['pyin'] = content['code']
823 else:
826 else:
824 d[msg_type] = content.get('data', '')
827 d[msg_type] = content.get('data', '')
825
828
826 try:
829 try:
827 self.db.update_record(msg_id, d)
830 self.db.update_record(msg_id, d)
828 except Exception:
831 except Exception:
829 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
832 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
830
833
831
834
832
835
833 #-------------------------------------------------------------------------
836 #-------------------------------------------------------------------------
834 # Registration requests
837 # Registration requests
835 #-------------------------------------------------------------------------
838 #-------------------------------------------------------------------------
836
839
837 def connection_request(self, client_id, msg):
840 def connection_request(self, client_id, msg):
838 """Reply with connection addresses for clients."""
841 """Reply with connection addresses for clients."""
839 self.log.info("client::client %r connected", client_id)
842 self.log.info("client::client %r connected", client_id)
840 content = dict(status='ok')
843 content = dict(status='ok')
841 content.update(self.client_info)
844 content.update(self.client_info)
842 jsonable = {}
845 jsonable = {}
843 for k,v in self.keytable.iteritems():
846 for k,v in self.keytable.iteritems():
844 if v not in self.dead_engines:
847 if v not in self.dead_engines:
845 jsonable[str(k)] = v.decode('ascii')
848 jsonable[str(k)] = v.decode('ascii')
846 content['engines'] = jsonable
849 content['engines'] = jsonable
847 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
850 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
848
851
849 def register_engine(self, reg, msg):
852 def register_engine(self, reg, msg):
850 """Register a new engine."""
853 """Register a new engine."""
851 content = msg['content']
854 content = msg['content']
852 try:
855 try:
853 queue = util.asbytes(content['queue'])
856 queue = util.asbytes(content['queue'])
854 except KeyError:
857 except KeyError:
855 self.log.error("registration::queue not specified", exc_info=True)
858 self.log.error("registration::queue not specified", exc_info=True)
856 return
859 return
857 heart = content.get('heartbeat', None)
860 heart = content.get('heartbeat', None)
858 if heart:
861 if heart:
859 heart = util.asbytes(heart)
862 heart = util.asbytes(heart)
860 """register a new engine, and create the socket(s) necessary"""
863 """register a new engine, and create the socket(s) necessary"""
861 eid = self._next_id
864 eid = self._next_id
862 # print (eid, queue, reg, heart)
865 # print (eid, queue, reg, heart)
863
866
864 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
867 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
865
868
866 content = dict(id=eid,status='ok')
869 content = dict(id=eid,status='ok')
867 content.update(self.engine_info)
870 content.update(self.engine_info)
868 # check if requesting available IDs:
871 # check if requesting available IDs:
869 if queue in self.by_ident:
872 if queue in self.by_ident:
870 try:
873 try:
871 raise KeyError("queue_id %r in use" % queue)
874 raise KeyError("queue_id %r in use" % queue)
872 except:
875 except:
873 content = error.wrap_exception()
876 content = error.wrap_exception()
874 self.log.error("queue_id %r in use", queue, exc_info=True)
877 self.log.error("queue_id %r in use", queue, exc_info=True)
875 elif heart in self.hearts: # need to check unique hearts?
878 elif heart in self.hearts: # need to check unique hearts?
876 try:
879 try:
877 raise KeyError("heart_id %r in use" % heart)
880 raise KeyError("heart_id %r in use" % heart)
878 except:
881 except:
879 self.log.error("heart_id %r in use", heart, exc_info=True)
882 self.log.error("heart_id %r in use", heart, exc_info=True)
880 content = error.wrap_exception()
883 content = error.wrap_exception()
881 else:
884 else:
882 for h, pack in self.incoming_registrations.iteritems():
885 for h, pack in self.incoming_registrations.iteritems():
883 if heart == h:
886 if heart == h:
884 try:
887 try:
885 raise KeyError("heart_id %r in use" % heart)
888 raise KeyError("heart_id %r in use" % heart)
886 except:
889 except:
887 self.log.error("heart_id %r in use", heart, exc_info=True)
890 self.log.error("heart_id %r in use", heart, exc_info=True)
888 content = error.wrap_exception()
891 content = error.wrap_exception()
889 break
892 break
890 elif queue == pack[1]:
893 elif queue == pack[1]:
891 try:
894 try:
892 raise KeyError("queue_id %r in use" % queue)
895 raise KeyError("queue_id %r in use" % queue)
893 except:
896 except:
894 self.log.error("queue_id %r in use", queue, exc_info=True)
897 self.log.error("queue_id %r in use", queue, exc_info=True)
895 content = error.wrap_exception()
898 content = error.wrap_exception()
896 break
899 break
897
900
898 msg = self.session.send(self.query, "registration_reply",
901 msg = self.session.send(self.query, "registration_reply",
899 content=content,
902 content=content,
900 ident=reg)
903 ident=reg)
901
904
902 if content['status'] == 'ok':
905 if content['status'] == 'ok':
903 if heart in self.heartmonitor.hearts:
906 if heart in self.heartmonitor.hearts:
904 # already beating
907 # already beating
905 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
908 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
906 self.finish_registration(heart)
909 self.finish_registration(heart)
907 else:
910 else:
908 purge = lambda : self._purge_stalled_registration(heart)
911 purge = lambda : self._purge_stalled_registration(heart)
909 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
912 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
910 dc.start()
913 dc.start()
911 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
914 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
912 else:
915 else:
913 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
916 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
914 return eid
917 return eid
915
918
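The collision checks in register_engine reduce to membership tests against three containers; a standalone sketch of that decision (plain data, no sockets, names hypothetical):

# sketch: may a (queue, heart) pair register? mirrors the checks in register_engine
def registration_ok(queue, heart, by_ident, hearts, incoming):
    """incoming maps heart -> (eid, queue, reg_ident, timeout_callback)."""
    if queue in by_ident:
        return False, "queue_id %r in use" % queue
    if heart in hearts:
        return False, "heart_id %r in use" % heart
    for h, pack in incoming.items():
        if heart == h:
            return False, "heart_id %r in use" % heart
        if queue == pack[1]:
            return False, "queue_id %r in use" % queue
    return True, None

ok, reason = registration_ok(b'queue-id', b'heart-id', by_ident={}, hearts={}, incoming={})
# ok is True here; any duplicate takes the error.wrap_exception() path above instead
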
916 def unregister_engine(self, ident, msg):
919 def unregister_engine(self, ident, msg):
917 """Unregister an engine that explicitly requested to leave."""
920 """Unregister an engine that explicitly requested to leave."""
918 try:
921 try:
919 eid = msg['content']['id']
922 eid = msg['content']['id']
920 except:
923 except:
921 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
924 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
922 return
925 return
923 self.log.info("registration::unregister_engine(%r)", eid)
926 self.log.info("registration::unregister_engine(%r)", eid)
924 # print (eid)
927 # print (eid)
925 uuid = self.keytable[eid]
928 uuid = self.keytable[eid]
926 content=dict(id=eid, queue=uuid.decode('ascii'))
929 content=dict(id=eid, queue=uuid.decode('ascii'))
927 self.dead_engines.add(uuid)
930 self.dead_engines.add(uuid)
928 # self.ids.remove(eid)
931 # self.ids.remove(eid)
929 # uuid = self.keytable.pop(eid)
932 # uuid = self.keytable.pop(eid)
930 #
933 #
931 # ec = self.engines.pop(eid)
934 # ec = self.engines.pop(eid)
932 # self.hearts.pop(ec.heartbeat)
935 # self.hearts.pop(ec.heartbeat)
933 # self.by_ident.pop(ec.queue)
936 # self.by_ident.pop(ec.queue)
934 # self.completed.pop(eid)
937 # self.completed.pop(eid)
935 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
938 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
936 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
939 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
937 dc.start()
940 dc.start()
938 ############## TODO: HANDLE IT ################
941 ############## TODO: HANDLE IT ################
939
942
940 if self.notifier:
943 if self.notifier:
941 self.session.send(self.notifier, "unregistration_notification", content=content)
944 self.session.send(self.notifier, "unregistration_notification", content=content)
942
945
943 def _handle_stranded_msgs(self, eid, uuid):
946 def _handle_stranded_msgs(self, eid, uuid):
944 """Handle messages known to be on an engine when the engine unregisters.
947 """Handle messages known to be on an engine when the engine unregisters.
945
948
946 It is possible that this will fire prematurely - that is, an engine will
949 It is possible that this will fire prematurely - that is, an engine will
947 go down after completing a result, and the client will be notified
950 go down after completing a result, and the client will be notified
948 that the result failed and later receive the actual result.
951 that the result failed and later receive the actual result.
949 """
952 """
950
953
951 outstanding = self.queues[eid]
954 outstanding = self.queues[eid]
952
955
953 for msg_id in outstanding:
956 for msg_id in outstanding:
954 self.pending.remove(msg_id)
957 self.pending.remove(msg_id)
955 self.all_completed.add(msg_id)
958 self.all_completed.add(msg_id)
956 try:
959 try:
957 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
960 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
958 except:
961 except:
959 content = error.wrap_exception()
962 content = error.wrap_exception()
960 # build a fake header:
963 # build a fake header:
961 header = {}
964 header = {}
962 header['engine'] = uuid
965 header['engine'] = uuid
963 header['date'] = datetime.now()
966 header['date'] = datetime.now()
964 rec = dict(result_content=content, result_header=header, result_buffers=[])
967 rec = dict(result_content=content, result_header=header, result_buffers=[])
965 rec['completed'] = header['date']
968 rec['completed'] = header['date']
966 rec['engine_uuid'] = uuid
969 rec['engine_uuid'] = uuid
967 try:
970 try:
968 self.db.update_record(msg_id, rec)
971 self.db.update_record(msg_id, rec)
969 except Exception:
972 except Exception:
970 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
973 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
971
974
972
975
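A sketch of the fake result record written for each stranded message; the error dict stands in for what error.wrap_exception() would produce from the EngineError, and the ids are hypothetical:

from datetime import datetime

# sketch: record persisted for a task stranded on a dead engine
eid, msg_id, uuid = 2, 'msg-a', b'engine-uuid-2'
content = {'status': 'error', 'ename': 'EngineError',
           'evalue': "Engine %r died while running task %r" % (eid, msg_id)}
header = {'engine': uuid, 'date': datetime.now()}
rec = dict(result_content=content, result_header=header, result_buffers=[])
rec['completed'] = header['date']
rec['engine_uuid'] = uuid
# db.update_record(msg_id, rec) marks the task finished, with a failure result
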
973 def finish_registration(self, heart):
976 def finish_registration(self, heart):
974 """Second half of engine registration, called after our HeartMonitor
977 """Second half of engine registration, called after our HeartMonitor
975 has received a beat from the Engine's Heart."""
978 has received a beat from the Engine's Heart."""
976 try:
979 try:
977 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
980 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
978 except KeyError:
981 except KeyError:
979 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
982 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
980 return
983 return
981 self.log.info("registration::finished registering engine %i:%r", eid, queue)
984 self.log.info("registration::finished registering engine %i:%r", eid, queue)
982 if purge is not None:
985 if purge is not None:
983 purge.stop()
986 purge.stop()
984 control = queue
987 control = queue
985 self.ids.add(eid)
988 self.ids.add(eid)
986 self.keytable[eid] = queue
989 self.keytable[eid] = queue
987 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
990 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
988 control=control, heartbeat=heart)
991 control=control, heartbeat=heart)
989 self.by_ident[queue] = eid
992 self.by_ident[queue] = eid
990 self.queues[eid] = list()
993 self.queues[eid] = list()
991 self.tasks[eid] = list()
994 self.tasks[eid] = list()
992 self.completed[eid] = list()
995 self.completed[eid] = list()
993 self.hearts[heart] = eid
996 self.hearts[heart] = eid
994 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
997 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
995 if self.notifier:
998 if self.notifier:
996 self.session.send(self.notifier, "registration_notification", content=content)
999 self.session.send(self.notifier, "registration_notification", content=content)
997 self.log.info("engine::Engine Connected: %i", eid)
1000 self.log.info("engine::Engine Connected: %i", eid)
998
1001
999 def _purge_stalled_registration(self, heart):
1002 def _purge_stalled_registration(self, heart):
1000 if heart in self.incoming_registrations:
1003 if heart in self.incoming_registrations:
1001 eid = self.incoming_registrations.pop(heart)[0]
1004 eid = self.incoming_registrations.pop(heart)[0]
1002 self.log.info("registration::purging stalled registration: %i", eid)
1005 self.log.info("registration::purging stalled registration: %i", eid)
1003 else:
1006 else:
1004 pass
1007 pass
1005
1008
1006 #-------------------------------------------------------------------------
1009 #-------------------------------------------------------------------------
1007 # Client Requests
1010 # Client Requests
1008 #-------------------------------------------------------------------------
1011 #-------------------------------------------------------------------------
1009
1012
1010 def shutdown_request(self, client_id, msg):
1013 def shutdown_request(self, client_id, msg):
1011 """handle shutdown request."""
1014 """handle shutdown request."""
1012 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1015 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1013 # also notify other clients of shutdown
1016 # also notify other clients of shutdown
1014 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1017 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1015 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1018 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1016 dc.start()
1019 dc.start()
1017
1020
1018 def _shutdown(self):
1021 def _shutdown(self):
1019 self.log.info("hub::hub shutting down.")
1022 self.log.info("hub::hub shutting down.")
1020 time.sleep(0.1)
1023 time.sleep(0.1)
1021 sys.exit(0)
1024 sys.exit(0)
1022
1025
1023
1026
1024 def check_load(self, client_id, msg):
1027 def check_load(self, client_id, msg):
1025 content = msg['content']
1028 content = msg['content']
1026 try:
1029 try:
1027 targets = content['targets']
1030 targets = content['targets']
1028 targets = self._validate_targets(targets)
1031 targets = self._validate_targets(targets)
1029 except:
1032 except:
1030 content = error.wrap_exception()
1033 content = error.wrap_exception()
1031 self.session.send(self.query, "hub_error",
1034 self.session.send(self.query, "hub_error",
1032 content=content, ident=client_id)
1035 content=content, ident=client_id)
1033 return
1036 return
1034
1037
1035 content = dict(status='ok')
1038 content = dict(status='ok')
1036 # loads = {}
1039 # loads = {}
1037 for t in targets:
1040 for t in targets:
1038 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1041 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1039 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1042 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1040
1043
1041
1044
1042 def queue_status(self, client_id, msg):
1045 def queue_status(self, client_id, msg):
1043 """Return the Queue status of one or more targets.
1046 """Return the Queue status of one or more targets.
1044 if verbose: return the msg_ids
1047 if verbose: return the msg_ids
1045 else: return len of each type.
1048 else: return len of each type.
1046 keys: queue (pending MUX jobs)
1049 keys: queue (pending MUX jobs)
1047 tasks (pending Task jobs)
1050 tasks (pending Task jobs)
1048 completed (finished jobs from both queues)"""
1051 completed (finished jobs from both queues)"""
1049 content = msg['content']
1052 content = msg['content']
1050 targets = content['targets']
1053 targets = content['targets']
1051 try:
1054 try:
1052 targets = self._validate_targets(targets)
1055 targets = self._validate_targets(targets)
1053 except:
1056 except:
1054 content = error.wrap_exception()
1057 content = error.wrap_exception()
1055 self.session.send(self.query, "hub_error",
1058 self.session.send(self.query, "hub_error",
1056 content=content, ident=client_id)
1059 content=content, ident=client_id)
1057 return
1060 return
1058 verbose = content.get('verbose', False)
1061 verbose = content.get('verbose', False)
1059 content = dict(status='ok')
1062 content = dict(status='ok')
1060 for t in targets:
1063 for t in targets:
1061 queue = self.queues[t]
1064 queue = self.queues[t]
1062 completed = self.completed[t]
1065 completed = self.completed[t]
1063 tasks = self.tasks[t]
1066 tasks = self.tasks[t]
1064 if not verbose:
1067 if not verbose:
1065 queue = len(queue)
1068 queue = len(queue)
1066 completed = len(completed)
1069 completed = len(completed)
1067 tasks = len(tasks)
1070 tasks = len(tasks)
1068 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1071 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1069 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1072 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1070 # print (content)
1073 # print (content)
1071 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1074 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1072
1075
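For reference, the two shapes a queue_reply can take depending on the verbose flag; the engine ids, counts and msg_ids below are hypothetical:

# sketch: queue_status reply content, terse (counts) vs verbose (msg_ids)
terse = {
    'status': 'ok',
    '0': {'queue': 2, 'completed': 10, 'tasks': 1},
    '1': {'queue': 0, 'completed': 12, 'tasks': 3},
    'unassigned': 4,                       # tasks not yet assigned to an engine
}
verbose = {
    'status': 'ok',
    '0': {'queue': ['msg-a', 'msg-b'], 'completed': ['msg-c'], 'tasks': []},
    'unassigned': ['msg-d'],
}
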
1073 def purge_results(self, client_id, msg):
1076 def purge_results(self, client_id, msg):
1074 """Purge results from memory. This method is more valuable before we move
1077 """Purge results from memory. This method is more valuable before we move
1075 to a DB-based message storage mechanism."""
1078 to a DB-based message storage mechanism."""
1076 content = msg['content']
1079 content = msg['content']
1077 self.log.info("Dropping records with %s", content)
1080 self.log.info("Dropping records with %s", content)
1078 msg_ids = content.get('msg_ids', [])
1081 msg_ids = content.get('msg_ids', [])
1079 reply = dict(status='ok')
1082 reply = dict(status='ok')
1080 if msg_ids == 'all':
1083 if msg_ids == 'all':
1081 try:
1084 try:
1082 self.db.drop_matching_records(dict(completed={'$ne':None}))
1085 self.db.drop_matching_records(dict(completed={'$ne':None}))
1083 except Exception:
1086 except Exception:
1084 reply = error.wrap_exception()
1087 reply = error.wrap_exception()
1085 else:
1088 else:
1086 pending = filter(lambda m: m in self.pending, msg_ids)
1089 pending = filter(lambda m: m in self.pending, msg_ids)
1087 if pending:
1090 if pending:
1088 try:
1091 try:
1089 raise IndexError("msg pending: %r" % pending[0])
1092 raise IndexError("msg pending: %r" % pending[0])
1090 except:
1093 except:
1091 reply = error.wrap_exception()
1094 reply = error.wrap_exception()
1092 else:
1095 else:
1093 try:
1096 try:
1094 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1097 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1095 except Exception:
1098 except Exception:
1096 reply = error.wrap_exception()
1099 reply = error.wrap_exception()
1097
1100
1098 if reply['status'] == 'ok':
1101 if reply['status'] == 'ok':
1099 eids = content.get('engine_ids', [])
1102 eids = content.get('engine_ids', [])
1100 for eid in eids:
1103 for eid in eids:
1101 if eid not in self.engines:
1104 if eid not in self.engines:
1102 try:
1105 try:
1103 raise IndexError("No such engine: %i" % eid)
1106 raise IndexError("No such engine: %i" % eid)
1104 except:
1107 except:
1105 reply = error.wrap_exception()
1108 reply = error.wrap_exception()
1106 break
1109 break
1107 uid = self.engines[eid].queue
1110 uid = self.engines[eid].queue
1108 try:
1111 try:
1109 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1112 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1110 except Exception:
1113 except Exception:
1111 reply = error.wrap_exception()
1114 reply = error.wrap_exception()
1112 break
1115 break
1113
1116
1114 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1117 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1115
1118
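A sketch of the request content purge_results accepts: either the literal string 'all', or explicit msg_ids (which must not be pending), optionally with engine_ids; the values are hypothetical:

# sketch: the two forms of a purge request handled above
purge_all = {'msg_ids': 'all'}                  # drop every completed record
purge_some = {
    'msg_ids': ['msg-a', 'msg-b'],              # rejected if any are still pending
    'engine_ids': [0, 3],                       # also drop completed work from these engines
}
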
1116 def resubmit_task(self, client_id, msg):
1119 def resubmit_task(self, client_id, msg):
1117 """Resubmit one or more tasks."""
1120 """Resubmit one or more tasks."""
1118 def finish(reply):
1121 def finish(reply):
1119 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1122 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1120
1123
1121 content = msg['content']
1124 content = msg['content']
1122 msg_ids = content['msg_ids']
1125 msg_ids = content['msg_ids']
1123 reply = dict(status='ok')
1126 reply = dict(status='ok')
1124 try:
1127 try:
1125 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1128 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1126 'header', 'content', 'buffers'])
1129 'header', 'content', 'buffers'])
1127 except Exception:
1130 except Exception:
1128 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1131 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1129 return finish(error.wrap_exception())
1132 return finish(error.wrap_exception())
1130
1133
1131 # validate msg_ids
1134 # validate msg_ids
1132 found_ids = [ rec['msg_id'] for rec in records ]
1135 found_ids = [ rec['msg_id'] for rec in records ]
1133 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1136 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1134 if len(records) > len(msg_ids):
1137 if len(records) > len(msg_ids):
1135 try:
1138 try:
1136 raise RuntimeError("DB appears to be in an inconsistent state."
1139 raise RuntimeError("DB appears to be in an inconsistent state."
1137 "More matching records were found than should exist")
1140 "More matching records were found than should exist")
1138 except Exception:
1141 except Exception:
1139 return finish(error.wrap_exception())
1142 return finish(error.wrap_exception())
1140 elif len(records) < len(msg_ids):
1143 elif len(records) < len(msg_ids):
1141 missing = [ m for m in msg_ids if m not in found_ids ]
1144 missing = [ m for m in msg_ids if m not in found_ids ]
1142 try:
1145 try:
1143 raise KeyError("No such msg(s): %r" % missing)
1146 raise KeyError("No such msg(s): %r" % missing)
1144 except KeyError:
1147 except KeyError:
1145 return finish(error.wrap_exception())
1148 return finish(error.wrap_exception())
1146 elif invalid_ids:
1149 elif invalid_ids:
1147 msg_id = invalid_ids[0]
1150 msg_id = invalid_ids[0]
1148 try:
1151 try:
1149 raise ValueError("Task %r appears to be inflight" % msg_id)
1152 raise ValueError("Task %r appears to be inflight" % msg_id)
1150 except Exception:
1153 except Exception:
1151 return finish(error.wrap_exception())
1154 return finish(error.wrap_exception())
1152
1155
1153 # clear the existing records
1156 # clear the existing records
1154 now = datetime.now()
1157 now = datetime.now()
1155 rec = empty_record()
1158 rec = empty_record()
1156 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1159 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1157 rec['resubmitted'] = now
1160 rec['resubmitted'] = now
1158 rec['queue'] = 'task'
1161 rec['queue'] = 'task'
1159 rec['client_uuid'] = client_id[0]
1162 rec['client_uuid'] = client_id[0]
1160 try:
1163 try:
1161 for msg_id in msg_ids:
1164 for msg_id in msg_ids:
1162 self.all_completed.discard(msg_id)
1165 self.all_completed.discard(msg_id)
1163 self.db.update_record(msg_id, rec)
1166 self.db.update_record(msg_id, rec)
1164 except Exception:
1167 except Exception:
1165 self.log.error('db::db error updating record', exc_info=True)
1168 self.log.error('db::db error updating record', exc_info=True)
1166 reply = error.wrap_exception()
1169 reply = error.wrap_exception()
1167 else:
1170 else:
1168 # send the messages
1171 # send the messages
1169 for rec in records:
1172 for rec in records:
1170 header = rec['header']
1173 header = rec['header']
1171 # include resubmitted in header to prevent digest collision
1174 # include resubmitted in header to prevent digest collision
1172 header['resubmitted'] = now
1175 header['resubmitted'] = now
1173 msg = self.session.msg(header['msg_type'])
1176 msg = self.session.msg(header['msg_type'])
1174 msg['content'] = rec['content']
1177 msg['content'] = rec['content']
1175 msg['header'] = header
1178 msg['header'] = header
1176 msg['header']['msg_id'] = rec['msg_id']
1179 msg['header']['msg_id'] = rec['msg_id']
1177 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1180 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1178
1181
1179 finish(dict(status='ok'))
1182 finish(dict(status='ok'))
1180
1183
1181
1184
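The validation above boils down to comparing the requested, found, and pending id sets; a minimal sketch with plain lists (ids hypothetical):

# sketch: classify requested msg_ids before resubmission, as resubmit_task does
def classify(msg_ids, found_ids, pending):
    missing = [m for m in msg_ids if m not in found_ids]
    inflight = [m for m in found_ids if m in pending]
    return missing, inflight

missing, inflight = classify(['a', 'b', 'c'], found_ids=['a', 'b'], pending=set(['b']))
# missing == ['c']   -> the KeyError("No such msg(s): ...") branch
# inflight == ['b']  -> the ValueError("Task ... appears to be inflight") branch
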
1182 def _extract_record(self, rec):
1185 def _extract_record(self, rec):
1183 """decompose a TaskRecord dict into subsection of reply for get_result"""
1186 """decompose a TaskRecord dict into subsection of reply for get_result"""
1184 io_dict = {}
1187 io_dict = {}
1185 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1188 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1186 io_dict[key] = rec[key]
1189 io_dict[key] = rec[key]
1187 content = { 'result_content': rec['result_content'],
1190 content = { 'result_content': rec['result_content'],
1188 'header': rec['header'],
1191 'header': rec['header'],
1189 'result_header' : rec['result_header'],
1192 'result_header' : rec['result_header'],
1190 'received' : rec['received'],
1193 'received' : rec['received'],
1191 'io' : io_dict,
1194 'io' : io_dict,
1192 }
1195 }
1193 if rec['result_buffers']:
1196 if rec['result_buffers']:
1194 buffers = map(bytes, rec['result_buffers'])
1197 buffers = map(bytes, rec['result_buffers'])
1195 else:
1198 else:
1196 buffers = []
1199 buffers = []
1197
1200
1198 return content, buffers
1201 return content, buffers
1199
1202
1200 def get_results(self, client_id, msg):
1203 def get_results(self, client_id, msg):
1201 """Get the result of 1 or more messages."""
1204 """Get the result of 1 or more messages."""
1202 content = msg['content']
1205 content = msg['content']
1203 msg_ids = sorted(set(content['msg_ids']))
1206 msg_ids = sorted(set(content['msg_ids']))
1204 statusonly = content.get('status_only', False)
1207 statusonly = content.get('status_only', False)
1205 pending = []
1208 pending = []
1206 completed = []
1209 completed = []
1207 content = dict(status='ok')
1210 content = dict(status='ok')
1208 content['pending'] = pending
1211 content['pending'] = pending
1209 content['completed'] = completed
1212 content['completed'] = completed
1210 buffers = []
1213 buffers = []
1211 if not statusonly:
1214 if not statusonly:
1212 try:
1215 try:
1213 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1216 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1214 # turn match list into dict, for faster lookup
1217 # turn match list into dict, for faster lookup
1215 records = {}
1218 records = {}
1216 for rec in matches:
1219 for rec in matches:
1217 records[rec['msg_id']] = rec
1220 records[rec['msg_id']] = rec
1218 except Exception:
1221 except Exception:
1219 content = error.wrap_exception()
1222 content = error.wrap_exception()
1220 self.session.send(self.query, "result_reply", content=content,
1223 self.session.send(self.query, "result_reply", content=content,
1221 parent=msg, ident=client_id)
1224 parent=msg, ident=client_id)
1222 return
1225 return
1223 else:
1226 else:
1224 records = {}
1227 records = {}
1225 for msg_id in msg_ids:
1228 for msg_id in msg_ids:
1226 if msg_id in self.pending:
1229 if msg_id in self.pending:
1227 pending.append(msg_id)
1230 pending.append(msg_id)
1228 elif msg_id in self.all_completed:
1231 elif msg_id in self.all_completed:
1229 completed.append(msg_id)
1232 completed.append(msg_id)
1230 if not statusonly:
1233 if not statusonly:
1231 c,bufs = self._extract_record(records[msg_id])
1234 c,bufs = self._extract_record(records[msg_id])
1232 content[msg_id] = c
1235 content[msg_id] = c
1233 buffers.extend(bufs)
1236 buffers.extend(bufs)
1234 elif msg_id in records:
1237 elif msg_id in records:
1235 if records[msg_id]['completed']:
1238 if records[msg_id]['completed']:
1236 completed.append(msg_id)
1239 completed.append(msg_id)
1237 c,bufs = self._extract_record(records[msg_id])
1240 c,bufs = self._extract_record(records[msg_id])
1238 content[msg_id] = c
1241 content[msg_id] = c
1239 buffers.extend(bufs)
1242 buffers.extend(bufs)
1240 else:
1243 else:
1241 pending.append(msg_id)
1244 pending.append(msg_id)
1242 else:
1245 else:
1243 try:
1246 try:
1244 raise KeyError('No such message: '+msg_id)
1247 raise KeyError('No such message: '+msg_id)
1245 except:
1248 except:
1246 content = error.wrap_exception()
1249 content = error.wrap_exception()
1247 break
1250 break
1248 self.session.send(self.query, "result_reply", content=content,
1251 self.session.send(self.query, "result_reply", content=content,
1249 parent=msg, ident=client_id,
1252 parent=msg, ident=client_id,
1250 buffers=buffers)
1253 buffers=buffers)
1251
1254
1252 def get_history(self, client_id, msg):
1255 def get_history(self, client_id, msg):
1253 """Get a list of all msg_ids in our DB records"""
1256 """Get a list of all msg_ids in our DB records"""
1254 try:
1257 try:
1255 msg_ids = self.db.get_history()
1258 msg_ids = self.db.get_history()
1256 except Exception as e:
1259 except Exception as e:
1257 content = error.wrap_exception()
1260 content = error.wrap_exception()
1258 else:
1261 else:
1259 content = dict(status='ok', history=msg_ids)
1262 content = dict(status='ok', history=msg_ids)
1260
1263
1261 self.session.send(self.query, "history_reply", content=content,
1264 self.session.send(self.query, "history_reply", content=content,
1262 parent=msg, ident=client_id)
1265 parent=msg, ident=client_id)
1263
1266
1264 def db_query(self, client_id, msg):
1267 def db_query(self, client_id, msg):
1265 """Perform a raw query on the task record database."""
1268 """Perform a raw query on the task record database."""
1266 content = msg['content']
1269 content = msg['content']
1267 query = content.get('query', {})
1270 query = content.get('query', {})
1268 keys = content.get('keys', None)
1271 keys = content.get('keys', None)
1269 buffers = []
1272 buffers = []
1270 empty = list()
1273 empty = list()
1271 try:
1274 try:
1272 records = self.db.find_records(query, keys)
1275 records = self.db.find_records(query, keys)
1273 except Exception as e:
1276 except Exception as e:
1274 content = error.wrap_exception()
1277 content = error.wrap_exception()
1275 else:
1278 else:
1276 # extract buffers from reply content:
1279 # extract buffers from reply content:
1277 if keys is not None:
1280 if keys is not None:
1278 buffer_lens = [] if 'buffers' in keys else None
1281 buffer_lens = [] if 'buffers' in keys else None
1279 result_buffer_lens = [] if 'result_buffers' in keys else None
1282 result_buffer_lens = [] if 'result_buffers' in keys else None
1280 else:
1283 else:
1281 buffer_lens = None
1284 buffer_lens = None
1282 result_buffer_lens = None
1285 result_buffer_lens = None
1283
1286
1284 for rec in records:
1287 for rec in records:
1285 # buffers may be None, so double check
1288 # buffers may be None, so double check
1286 b = rec.pop('buffers', empty) or empty
1289 b = rec.pop('buffers', empty) or empty
1287 if buffer_lens is not None:
1290 if buffer_lens is not None:
1288 buffer_lens.append(len(b))
1291 buffer_lens.append(len(b))
1289 buffers.extend(b)
1292 buffers.extend(b)
1290 rb = rec.pop('result_buffers', empty) or empty
1293 rb = rec.pop('result_buffers', empty) or empty
1291 if result_buffer_lens is not None:
1294 if result_buffer_lens is not None:
1292 result_buffer_lens.append(len(rb))
1295 result_buffer_lens.append(len(rb))
1293 buffers.extend(rb)
1296 buffers.extend(rb)
1294 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1297 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1295 result_buffer_lens=result_buffer_lens)
1298 result_buffer_lens=result_buffer_lens)
1296 # self.log.debug (content)
1299 # self.log.debug (content)
1297 self.session.send(self.query, "db_reply", content=content,
1300 self.session.send(self.query, "db_reply", content=content,
1298 parent=msg, ident=client_id,
1301 parent=msg, ident=client_id,
1299 buffers=buffers)
1302 buffers=buffers)
1300
1303
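db_query flattens every record's buffers and result_buffers into one list, reporting per-record lengths so the client can split them back; a sketch of that reassembly, assuming both 'buffers' and 'result_buffers' were requested (payloads hypothetical):

# sketch: reassemble per-record buffers from the flat list sent with db_reply
def split_db_buffers(flat, buffer_lens, result_buffer_lens):
    """Each record contributes its buffers first, then its result_buffers."""
    bufs, rbufs, i = [], [], 0
    for n, rn in zip(buffer_lens, result_buffer_lens):
        bufs.append(flat[i:i + n]); i += n
        rbufs.append(flat[i:i + rn]); i += rn
    return bufs, rbufs

flat = [b'in0', b'out0', b'in1']
bufs, rbufs = split_db_buffers(flat, buffer_lens=[1, 1], result_buffer_lens=[1, 0])
# bufs  == [[b'in0'], [b'in1']]
# rbufs == [[b'out0'], []]
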
@@ -1,877 +1,897 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
8 * Implement random port and security key logic.
9 * Implement control messages.
9 * Implement control messages.
10 * Implement event loop and poll version.
10 * Implement event loop and poll version.
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 # Standard library imports
18 # Standard library imports
19 import __builtin__
19 import __builtin__
20 import atexit
20 import atexit
21 import sys
21 import sys
22 import time
22 import time
23 import traceback
23 import traceback
24 import logging
24 import logging
25 import uuid
25 import uuid
26
26
27 from datetime import datetime
27 from datetime import datetime
28 from signal import (
28 from signal import (
29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 )
30 )
31
31
32 # System library imports
32 # System library imports
33 import zmq
33 import zmq
34 from zmq.eventloop import ioloop
34 from zmq.eventloop import ioloop
35 from zmq.eventloop.zmqstream import ZMQStream
35 from zmq.eventloop.zmqstream import ZMQStream
36
36
37 # Local imports
37 # Local imports
38 from IPython.core import pylabtools
38 from IPython.core import pylabtools
39 from IPython.config.configurable import Configurable
39 from IPython.config.configurable import Configurable
40 from IPython.config.application import boolean_flag, catch_config_error
40 from IPython.config.application import boolean_flag, catch_config_error
41 from IPython.core.application import ProfileDir
41 from IPython.core.application import ProfileDir
42 from IPython.core.error import StdinNotImplementedError
42 from IPython.core.error import StdinNotImplementedError
43 from IPython.core.shellapp import (
43 from IPython.core.shellapp import (
44 InteractiveShellApp, shell_flags, shell_aliases
44 InteractiveShellApp, shell_flags, shell_aliases
45 )
45 )
46 from IPython.utils import io
46 from IPython.utils import io
47 from IPython.utils import py3compat
47 from IPython.utils import py3compat
48 from IPython.utils.frame import extract_module_locals
48 from IPython.utils.frame import extract_module_locals
49 from IPython.utils.jsonutil import json_clean
49 from IPython.utils.jsonutil import json_clean
50 from IPython.utils.traitlets import (
50 from IPython.utils.traitlets import (
51 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
51 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
52 )
52 )
53
53
54 from entry_point import base_launch_kernel
54 from entry_point import base_launch_kernel
55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
56 from serialize import serialize_object, unpack_apply_message
56 from serialize import serialize_object, unpack_apply_message
57 from session import Session, Message
57 from session import Session, Message
58 from zmqshell import ZMQInteractiveShell
58 from zmqshell import ZMQInteractiveShell
59
59
60
60
61 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
62 # Main kernel class
62 # Main kernel class
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64
64
65 class Kernel(Configurable):
65 class Kernel(Configurable):
66
66
67 #---------------------------------------------------------------------------
67 #---------------------------------------------------------------------------
68 # Kernel interface
68 # Kernel interface
69 #---------------------------------------------------------------------------
69 #---------------------------------------------------------------------------
70
70
71 # attribute to override with a GUI
71 # attribute to override with a GUI
72 eventloop = Any(None)
72 eventloop = Any(None)
73 def _eventloop_changed(self, name, old, new):
73 def _eventloop_changed(self, name, old, new):
74 """schedule call to eventloop from IOLoop"""
74 """schedule call to eventloop from IOLoop"""
75 loop = ioloop.IOLoop.instance()
75 loop = ioloop.IOLoop.instance()
76 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
76 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
77
77
78 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
78 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
79 session = Instance(Session)
79 session = Instance(Session)
80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
81 shell_streams = List()
81 shell_streams = List()
82 control_stream = Instance(ZMQStream)
82 control_stream = Instance(ZMQStream)
83 iopub_socket = Instance(zmq.Socket)
83 iopub_socket = Instance(zmq.Socket)
84 stdin_socket = Instance(zmq.Socket)
84 stdin_socket = Instance(zmq.Socket)
85 log = Instance(logging.Logger)
85 log = Instance(logging.Logger)
86
86
87 user_module = Instance('types.ModuleType')
87 user_module = Instance('types.ModuleType')
88 def _user_module_changed(self, name, old, new):
88 def _user_module_changed(self, name, old, new):
89 if self.shell is not None:
89 if self.shell is not None:
90 self.shell.user_module = new
90 self.shell.user_module = new
91
91
92 user_ns = Dict(default_value=None)
92 user_ns = Dict(default_value=None)
93 def _user_ns_changed(self, name, old, new):
93 def _user_ns_changed(self, name, old, new):
94 if self.shell is not None:
94 if self.shell is not None:
95 self.shell.user_ns = new
95 self.shell.user_ns = new
96 self.shell.init_user_ns()
96 self.shell.init_user_ns()
97
97
98 # identities:
98 # identities:
99 int_id = Integer(-1)
99 int_id = Integer(-1)
100 ident = Unicode()
100 ident = Unicode()
101
101
102 def _ident_default(self):
102 def _ident_default(self):
103 return unicode(uuid.uuid4())
103 return unicode(uuid.uuid4())
104
104
105
105
106 # Private interface
106 # Private interface
107
107
108 # Time to sleep after flushing the stdout/err buffers in each execute
108 # Time to sleep after flushing the stdout/err buffers in each execute
109 # cycle. While this introduces a hard limit on the minimal latency of the
109 # cycle. While this introduces a hard limit on the minimal latency of the
110 # execute cycle, it helps prevent output synchronization problems for
110 # execute cycle, it helps prevent output synchronization problems for
111 # clients.
111 # clients.
112 # Units are in seconds. The minimum zmq latency on local host is probably
112 # Units are in seconds. The minimum zmq latency on local host is probably
113 # ~150 microseconds, set this to 500us for now. We may need to increase it
113 # ~150 microseconds, set this to 500us for now. We may need to increase it
114 # a little if it's not enough after more interactive testing.
114 # a little if it's not enough after more interactive testing.
115 _execute_sleep = Float(0.0005, config=True)
115 _execute_sleep = Float(0.0005, config=True)
116
116
117 # Frequency of the kernel's event loop.
117 # Frequency of the kernel's event loop.
118 # Units are in seconds; kernel subclasses for GUI toolkits may need to
118 # Units are in seconds; kernel subclasses for GUI toolkits may need to
119 # adapt to milliseconds.
119 # adapt to milliseconds.
120 _poll_interval = Float(0.05, config=True)
120 _poll_interval = Float(0.05, config=True)
121
121
122 # If the shutdown was requested over the network, we leave here the
122 # If the shutdown was requested over the network, we leave here the
123 # necessary reply message so it can be sent by our registered atexit
123 # necessary reply message so it can be sent by our registered atexit
124 # handler. This ensures that the reply is only sent to clients truly at
124 # handler. This ensures that the reply is only sent to clients truly at
125 # the end of our shutdown process (which happens after the underlying
125 # the end of our shutdown process (which happens after the underlying
126 # IPython shell's own shutdown).
126 # IPython shell's own shutdown).
127 _shutdown_message = None
127 _shutdown_message = None
128
128
129 # This is a dict of port numbers that the kernel is listening on. It is set
129 # This is a dict of port numbers that the kernel is listening on. It is set
130 # by record_ports and used by connect_request.
130 # by record_ports and used by connect_request.
131 _recorded_ports = Dict()
131 _recorded_ports = Dict()
132
132
133 # set of aborted msg_ids
133 # set of aborted msg_ids
134 aborted = Set()
134 aborted = Set()
135
135
136
136
137 def __init__(self, **kwargs):
137 def __init__(self, **kwargs):
138 super(Kernel, self).__init__(**kwargs)
138 super(Kernel, self).__init__(**kwargs)
139
139
140 # Initialize the InteractiveShell subclass
140 # Initialize the InteractiveShell subclass
141 self.shell = ZMQInteractiveShell.instance(config=self.config,
141 self.shell = ZMQInteractiveShell.instance(config=self.config,
142 profile_dir = self.profile_dir,
142 profile_dir = self.profile_dir,
143 user_module = self.user_module,
143 user_module = self.user_module,
144 user_ns = self.user_ns,
144 user_ns = self.user_ns,
145 )
145 )
146 self.shell.displayhook.session = self.session
146 self.shell.displayhook.session = self.session
147 self.shell.displayhook.pub_socket = self.iopub_socket
147 self.shell.displayhook.pub_socket = self.iopub_socket
148 self.shell.display_pub.session = self.session
148 self.shell.display_pub.session = self.session
149 self.shell.display_pub.pub_socket = self.iopub_socket
149 self.shell.display_pub.pub_socket = self.iopub_socket
150
150
151 # TMP - hack while developing
151 # TMP - hack while developing
152 self.shell._reply_content = None
152 self.shell._reply_content = None
153
153
154 # Build dict of handlers for message types
154 # Build dict of handlers for message types
155 msg_types = [ 'execute_request', 'complete_request',
155 msg_types = [ 'execute_request', 'complete_request',
156 'object_info_request', 'history_request',
156 'object_info_request', 'history_request',
157 'connect_request', 'shutdown_request',
157 'connect_request', 'shutdown_request',
158 'apply_request',
158 'apply_request',
159 ]
159 ]
160 self.shell_handlers = {}
160 self.shell_handlers = {}
161 for msg_type in msg_types:
161 for msg_type in msg_types:
162 self.shell_handlers[msg_type] = getattr(self, msg_type)
162 self.shell_handlers[msg_type] = getattr(self, msg_type)
163
163
164 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
164 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
165 self.control_handlers = {}
165 self.control_handlers = {}
166 for msg_type in control_msg_types:
166 for msg_type in control_msg_types:
167 self.control_handlers[msg_type] = getattr(self, msg_type)
167 self.control_handlers[msg_type] = getattr(self, msg_type)
168
168
169 def dispatch_control(self, msg):
169 def dispatch_control(self, msg):
170 """dispatch control requests"""
170 """dispatch control requests"""
171 idents,msg = self.session.feed_identities(msg, copy=False)
171 idents,msg = self.session.feed_identities(msg, copy=False)
172 try:
172 try:
173 msg = self.session.unserialize(msg, content=True, copy=False)
173 msg = self.session.unserialize(msg, content=True, copy=False)
174 except:
174 except:
175 self.log.error("Invalid Control Message", exc_info=True)
175 self.log.error("Invalid Control Message", exc_info=True)
176 return
176 return
177
177
178 self.log.debug("Control received: %s", msg)
178 self.log.debug("Control received: %s", msg)
179
179
180 header = msg['header']
180 header = msg['header']
181 msg_id = header['msg_id']
181 msg_id = header['msg_id']
182 msg_type = header['msg_type']
182 msg_type = header['msg_type']
183
183
184 handler = self.control_handlers.get(msg_type, None)
184 handler = self.control_handlers.get(msg_type, None)
185 if handler is None:
185 if handler is None:
186 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
186 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
187 else:
187 else:
188 try:
188 try:
189 handler(self.control_stream, idents, msg)
189 handler(self.control_stream, idents, msg)
190 except Exception:
190 except Exception:
191 self.log.error("Exception in control handler:", exc_info=True)
191 self.log.error("Exception in control handler:", exc_info=True)
192
192
193 def dispatch_shell(self, stream, msg):
193 def dispatch_shell(self, stream, msg):
194 """dispatch shell requests"""
194 """dispatch shell requests"""
195 # flush control requests first
195 # flush control requests first
196 if self.control_stream:
196 if self.control_stream:
197 self.control_stream.flush()
197 self.control_stream.flush()
198
198
199 idents,msg = self.session.feed_identities(msg, copy=False)
199 idents,msg = self.session.feed_identities(msg, copy=False)
200 try:
200 try:
201 msg = self.session.unserialize(msg, content=True, copy=False)
201 msg = self.session.unserialize(msg, content=True, copy=False)
202 except:
202 except:
203 self.log.error("Invalid Message", exc_info=True)
203 self.log.error("Invalid Message", exc_info=True)
204 return
204 return
205
205
206 header = msg['header']
206 header = msg['header']
207 msg_id = header['msg_id']
207 msg_id = header['msg_id']
208 msg_type = msg['header']['msg_type']
208 msg_type = msg['header']['msg_type']
209
209
210 # Print some info about this message and leave a '--->' marker, so it's
210 # Print some info about this message and leave a '--->' marker, so it's
211 # easier to trace visually the message chain when debugging. Each
211 # easier to trace visually the message chain when debugging. Each
212 # handler prints its message at the end.
212 # handler prints its message at the end.
213 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
213 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
214 self.log.debug(' Content: %s\n --->\n ', msg['content'])
214 self.log.debug(' Content: %s\n --->\n ', msg['content'])
215
215
216 if msg_id in self.aborted:
216 if msg_id in self.aborted:
217 self.aborted.remove(msg_id)
217 self.aborted.remove(msg_id)
218 # is it safe to assume a msg_id will not be resubmitted?
218 # is it safe to assume a msg_id will not be resubmitted?
219 reply_type = msg_type.split('_')[0] + '_reply'
219 reply_type = msg_type.split('_')[0] + '_reply'
220 status = {'status' : 'aborted'}
220 status = {'status' : 'aborted'}
221 reply_msg = self.session.send(stream, reply_type, subheader=status,
221 sub = {'engine' : self.ident}
222 sub.update(status)
223 reply_msg = self.session.send(stream, reply_type, subheader=sub,
222 content=status, parent=msg, ident=idents)
224 content=status, parent=msg, ident=idents)
223 return
225 return
224
226
225 handler = self.shell_handlers.get(msg_type, None)
227 handler = self.shell_handlers.get(msg_type, None)
226 if handler is None:
228 if handler is None:
227 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
229 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
228 else:
230 else:
229 # ensure default_int_handler during handler call
231 # ensure default_int_handler during handler call
230 sig = signal(SIGINT, default_int_handler)
232 sig = signal(SIGINT, default_int_handler)
231 try:
233 try:
232 handler(stream, idents, msg)
234 handler(stream, idents, msg)
233 except Exception:
235 except Exception:
234 self.log.error("Exception in message handler:", exc_info=True)
236 self.log.error("Exception in message handler:", exc_info=True)
235 finally:
237 finally:
236 signal(SIGINT, sig)
238 signal(SIGINT, sig)
237
239
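The aborted branch above derives the reply type from the request type and, with this change, tags the subheader with the engine identity; a small sketch of that mapping (identity value hypothetical):

# sketch: reply type and subheader sent for an aborted request
def aborted_reply(msg_type, engine_ident):
    reply_type = msg_type.split('_')[0] + '_reply'
    status = {'status': 'aborted'}
    sub = {'engine': engine_ident}
    sub.update(status)
    return reply_type, sub, status

reply_type, sub, status = aborted_reply('execute_request', 'engine-uuid')
# reply_type == 'execute_reply'
# sub == {'engine': 'engine-uuid', 'status': 'aborted'} (content carries the same status)
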
238 def enter_eventloop(self):
240 def enter_eventloop(self):
239 """enter eventloop"""
241 """enter eventloop"""
240 self.log.critical("entering eventloop")
242 self.log.critical("entering eventloop")
241 # restore default_int_handler
243 # restore default_int_handler
242 signal(SIGINT, default_int_handler)
244 signal(SIGINT, default_int_handler)
243 self.eventloop(self)
245 self.eventloop(self)
244 self.log.critical("exiting eventloop")
246 self.log.critical("exiting eventloop")
245 # if eventloop exits, IOLoop should stop
247 # if eventloop exits, IOLoop should stop
246 ioloop.IOLoop.instance().stop()
248 ioloop.IOLoop.instance().stop()
247
249
248 def start(self):
250 def start(self):
249 """register dispatchers for streams"""
251 """register dispatchers for streams"""
250 if self.control_stream:
252 if self.control_stream:
251 self.control_stream.on_recv(self.dispatch_control, copy=False)
253 self.control_stream.on_recv(self.dispatch_control, copy=False)
252
254
253 def make_dispatcher(stream):
255 def make_dispatcher(stream):
254 def dispatcher(msg):
256 def dispatcher(msg):
255 return self.dispatch_shell(stream, msg)
257 return self.dispatch_shell(stream, msg)
256 return dispatcher
258 return dispatcher
257
259
258 for s in self.shell_streams:
260 for s in self.shell_streams:
259 s.on_recv(make_dispatcher(s), copy=False)
261 s.on_recv(make_dispatcher(s), copy=False)
260
262
261 def do_one_iteration(self):
263 def do_one_iteration(self):
262 """step eventloop just once"""
264 """step eventloop just once"""
263 if self.control_stream:
265 if self.control_stream:
264 self.control_stream.flush()
266 self.control_stream.flush()
265 for stream in self.shell_streams:
267 for stream in self.shell_streams:
266 # handle at most one request per iteration
268 # handle at most one request per iteration
267 stream.flush(zmq.POLLIN, 1)
269 stream.flush(zmq.POLLIN, 1)
268 stream.flush(zmq.POLLOUT)
270 stream.flush(zmq.POLLOUT)
269
271
270
272
271 def record_ports(self, ports):
273 def record_ports(self, ports):
272 """Record the ports that this kernel is using.
274 """Record the ports that this kernel is using.
273
275
274 The creator of the Kernel instance must call this method if they
276 The creator of the Kernel instance must call this method if they
275 want the :meth:`connect_request` method to return the port numbers.
277 want the :meth:`connect_request` method to return the port numbers.
276 """
278 """
277 self._recorded_ports = ports
279 self._recorded_ports = ports
278
280
279 #---------------------------------------------------------------------------
281 #---------------------------------------------------------------------------
280 # Kernel request handlers
282 # Kernel request handlers
281 #---------------------------------------------------------------------------
283 #---------------------------------------------------------------------------
282
284
285 def _make_subheader(self):
286 """init subheader dict, for execute/apply_reply"""
287 return {
288 'dependencies_met' : True,
289 'engine' : self.ident,
290 'started': datetime.now(),
291 }
292
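A sketch of the subheader this helper seeds and how execute_request finishes it further down: status is copied from the reply content, and an UnmetDependency error flips dependencies_met to False (the engine value here is hypothetical):

from datetime import datetime

# sketch: reply subheader as seeded by _make_subheader and completed before sending
sub = {
    'dependencies_met': True,        # hint for the parallel scheduler
    'engine': 'engine-uuid',         # self.ident of this kernel
    'started': datetime.now(),       # when execution began
}
reply_content = {'status': 'error', 'ename': 'UnmetDependency', 'evalue': ''}
sub['status'] = reply_content['status']
if reply_content['status'] == 'error' and reply_content['ename'] == 'UnmetDependency':
    sub['dependencies_met'] = False
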
283 def _publish_pyin(self, code, parent, execution_count):
293 def _publish_pyin(self, code, parent, execution_count):
284 """Publish the code request on the pyin stream."""
294 """Publish the code request on the pyin stream."""
285
295
286 self.session.send(self.iopub_socket, u'pyin',
296 self.session.send(self.iopub_socket, u'pyin',
287 {u'code':code, u'execution_count': execution_count},
297 {u'code':code, u'execution_count': execution_count},
288 parent=parent, ident=self._topic('pyin')
298 parent=parent, ident=self._topic('pyin')
289 )
299 )
290
300
291 def execute_request(self, stream, ident, parent):
301 def execute_request(self, stream, ident, parent):
292
302
293 self.session.send(self.iopub_socket,
303 self.session.send(self.iopub_socket,
294 u'status',
304 u'status',
295 {u'execution_state':u'busy'},
305 {u'execution_state':u'busy'},
296 parent=parent,
306 parent=parent,
297 ident=self._topic('status'),
307 ident=self._topic('status'),
298 )
308 )
299
309
300 try:
310 try:
301 content = parent[u'content']
311 content = parent[u'content']
302 code = content[u'code']
312 code = content[u'code']
303 silent = content[u'silent']
313 silent = content[u'silent']
304 except:
314 except:
305 self.log.error("Got bad msg: ")
315 self.log.error("Got bad msg: ")
306 self.log.error("%s", parent)
316 self.log.error("%s", parent)
307 return
317 return
318
319 sub = self._make_subheader()
308
320
309 shell = self.shell # we'll need this a lot here
321 shell = self.shell # we'll need this a lot here
310
322
311 # Replace raw_input. Note that it is not sufficient to replace
323 # Replace raw_input. Note that it is not sufficient to replace
312 # raw_input in the user namespace.
324 # raw_input in the user namespace.
313 if content.get('allow_stdin', False):
325 if content.get('allow_stdin', False):
314 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
326 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
315 else:
327 else:
316 raw_input = lambda prompt='' : self._no_raw_input()
328 raw_input = lambda prompt='' : self._no_raw_input()
317
329
318 if py3compat.PY3:
330 if py3compat.PY3:
319 __builtin__.input = raw_input
331 __builtin__.input = raw_input
320 else:
332 else:
321 __builtin__.raw_input = raw_input
333 __builtin__.raw_input = raw_input
322
334
323 # Set the parent message of the display hook and out streams.
335 # Set the parent message of the display hook and out streams.
324 shell.displayhook.set_parent(parent)
336 shell.displayhook.set_parent(parent)
325 shell.display_pub.set_parent(parent)
337 shell.display_pub.set_parent(parent)
326 sys.stdout.set_parent(parent)
338 sys.stdout.set_parent(parent)
327 sys.stderr.set_parent(parent)
339 sys.stderr.set_parent(parent)
328
340
329 # Re-broadcast our input for the benefit of listening clients, and
341 # Re-broadcast our input for the benefit of listening clients, and
330 # start computing output
342 # start computing output
331 if not silent:
343 if not silent:
332 self._publish_pyin(code, parent, shell.execution_count)
344 self._publish_pyin(code, parent, shell.execution_count)
333
345
334 reply_content = {}
346 reply_content = {}
335 try:
347 try:
336 # FIXME: the shell calls the exception handler itself.
348 # FIXME: the shell calls the exception handler itself.
337 shell.run_cell(code, store_history=not silent, silent=silent)
349 shell.run_cell(code, store_history=not silent, silent=silent)
338 except:
350 except:
339 status = u'error'
351 status = u'error'
340 # FIXME: this code right now isn't being used yet by default,
352 # FIXME: this code right now isn't being used yet by default,
341 # because the run_cell() call above directly fires off exception
353 # because the run_cell() call above directly fires off exception
342 # reporting. This code, therefore, is only active in the scenario
354 # reporting. This code, therefore, is only active in the scenario
343 # where runlines itself has an unhandled exception. We need to
355 # where runlines itself has an unhandled exception. We need to
344 # uniformize this, for all exception construction to come from a
356 # uniformize this, for all exception construction to come from a
345 # single location in the codebase.
357 # single location in the codebase.
346 etype, evalue, tb = sys.exc_info()
358 etype, evalue, tb = sys.exc_info()
347 tb_list = traceback.format_exception(etype, evalue, tb)
359 tb_list = traceback.format_exception(etype, evalue, tb)
348 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
360 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
349 else:
361 else:
350 status = u'ok'
362 status = u'ok'
351
363
352 reply_content[u'status'] = status
364 reply_content[u'status'] = status
353
365
354 # Return the execution counter so clients can display prompts
366 # Return the execution counter so clients can display prompts
355 reply_content['execution_count'] = shell.execution_count - 1
367 reply_content['execution_count'] = shell.execution_count - 1
356
368
357 # FIXME - fish exception info out of shell, possibly left there by
369 # FIXME - fish exception info out of shell, possibly left there by
358 # runlines. We'll need to clean up this logic later.
370 # runlines. We'll need to clean up this logic later.
359 if shell._reply_content is not None:
371 if shell._reply_content is not None:
360 reply_content.update(shell._reply_content)
372 reply_content.update(shell._reply_content)
361 # reset after use
373 # reset after use
362 shell._reply_content = None
374 shell._reply_content = None
363
375
364 # At this point, we can tell whether the main code execution succeeded
376 # At this point, we can tell whether the main code execution succeeded
365 # or not. If it did, we proceed to evaluate user_variables/expressions
377 # or not. If it did, we proceed to evaluate user_variables/expressions
366 if reply_content['status'] == 'ok':
378 if reply_content['status'] == 'ok':
367 reply_content[u'user_variables'] = \
379 reply_content[u'user_variables'] = \
368 shell.user_variables(content.get(u'user_variables', []))
380 shell.user_variables(content.get(u'user_variables', []))
369 reply_content[u'user_expressions'] = \
381 reply_content[u'user_expressions'] = \
370 shell.user_expressions(content.get(u'user_expressions', {}))
382 shell.user_expressions(content.get(u'user_expressions', {}))
371 else:
383 else:
372 # If there was an error, don't even try to compute variables or
384 # If there was an error, don't even try to compute variables or
373 # expressions
385 # expressions
374 reply_content[u'user_variables'] = {}
386 reply_content[u'user_variables'] = {}
375 reply_content[u'user_expressions'] = {}
387 reply_content[u'user_expressions'] = {}
376
388
377 # Payloads should be retrieved regardless of outcome, so we can both
389 # Payloads should be retrieved regardless of outcome, so we can both
378 # recover partial output (that could have been generated early in a
390 # recover partial output (that could have been generated early in a
379 # block, before an error) and clear the payload system always.
391 # block, before an error) and clear the payload system always.
380 reply_content[u'payload'] = shell.payload_manager.read_payload()
392 reply_content[u'payload'] = shell.payload_manager.read_payload()
381 # Be aggressive about clearing the payload because we don't want
393 # Be aggressive about clearing the payload because we don't want
382 # it to sit in memory until the next execute_request comes in.
394 # it to sit in memory until the next execute_request comes in.
383 shell.payload_manager.clear_payload()
395 shell.payload_manager.clear_payload()
384
396
385 # Flush output before sending the reply.
397 # Flush output before sending the reply.
386 sys.stdout.flush()
398 sys.stdout.flush()
387 sys.stderr.flush()
399 sys.stderr.flush()
388 # FIXME: on rare occasions, the flush doesn't seem to make it to the
400 # FIXME: on rare occasions, the flush doesn't seem to make it to the
389 # clients... This seems to mitigate the problem, but we definitely need
401 # clients... This seems to mitigate the problem, but we definitely need
390 # to better understand what's going on.
402 # to better understand what's going on.
391 if self._execute_sleep:
403 if self._execute_sleep:
392 time.sleep(self._execute_sleep)
404 time.sleep(self._execute_sleep)
393
405
394 # Send the reply.
406 # Send the reply.
395 reply_content = json_clean(reply_content)
407 reply_content = json_clean(reply_content)
408
409 sub['status'] = reply_content['status']
410 if reply_content['status'] == 'error' and \
411 reply_content['ename'] == 'UnmetDependency':
412 sub['dependencies_met'] = False
413
396 reply_msg = self.session.send(stream, u'execute_reply',
414 reply_msg = self.session.send(stream, u'execute_reply',
397 reply_content, parent, ident=ident)
415 reply_content, parent, subheader=sub,
416 ident=ident)
417
398 self.log.debug("%s", reply_msg)
418 self.log.debug("%s", reply_msg)
399
419
400 if not silent and reply_msg['content']['status'] == u'error':
420 if not silent and reply_msg['content']['status'] == u'error':
401 self._abort_queues()
421 self._abort_queues()
402
422
403 self.session.send(self.iopub_socket,
423 self.session.send(self.iopub_socket,
404 u'status',
424 u'status',
405 {u'execution_state':u'idle'},
425 {u'execution_state':u'idle'},
406 parent=parent,
426 parent=parent,
407 ident=self._topic('status'))
427 ident=self._topic('status'))
408
428
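With this change, execute_reply carries the same scheduler-facing metadata in its subheader that apply_reply already did. A minimal sketch of the resulting message shape; the 'status' and 'dependencies_met' fields come from the code above, while any other subheader fields (whatever self._make_subheader() contributes) are assumptions:

# Hypothetical shape of an execute_reply produced by the handler above;
# 'sub' becomes the message subheader alongside the normal reply content.
example_execute_reply = {
    'msg_type': 'execute_reply',
    'content': {
        'status': 'error',
        'ename': 'UnmetDependency',    # plus evalue/traceback, user_variables, payload, ...
        'execution_count': 4,
    },
    'subheader': {
        'status': 'error',             # copied for scheduler introspection
        'dependencies_met': False,     # only set for UnmetDependency errors
        # ... plus whatever self._make_subheader() adds
    },
}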
409 def complete_request(self, stream, ident, parent):
429 def complete_request(self, stream, ident, parent):
410 txt, matches = self._complete(parent)
430 txt, matches = self._complete(parent)
411 matches = {'matches' : matches,
431 matches = {'matches' : matches,
412 'matched_text' : txt,
432 'matched_text' : txt,
413 'status' : 'ok'}
433 'status' : 'ok'}
414 matches = json_clean(matches)
434 matches = json_clean(matches)
415 completion_msg = self.session.send(stream, 'complete_reply',
435 completion_msg = self.session.send(stream, 'complete_reply',
416 matches, parent, ident)
436 matches, parent, ident)
417 self.log.debug("%s", completion_msg)
437 self.log.debug("%s", completion_msg)
418
438
419 def object_info_request(self, stream, ident, parent):
439 def object_info_request(self, stream, ident, parent):
420 content = parent['content']
440 content = parent['content']
421 object_info = self.shell.object_inspect(content['oname'],
441 object_info = self.shell.object_inspect(content['oname'],
422 detail_level = content.get('detail_level', 0)
442 detail_level = content.get('detail_level', 0)
423 )
443 )
424 # Before we send this object over, we scrub it for JSON usage
444 # Before we send this object over, we scrub it for JSON usage
425 oinfo = json_clean(object_info)
445 oinfo = json_clean(object_info)
426 msg = self.session.send(stream, 'object_info_reply',
446 msg = self.session.send(stream, 'object_info_reply',
427 oinfo, parent, ident)
447 oinfo, parent, ident)
428 self.log.debug("%s", msg)
448 self.log.debug("%s", msg)
429
449
430 def history_request(self, stream, ident, parent):
450 def history_request(self, stream, ident, parent):
431 # We need to pull these out, as passing **kwargs doesn't work with
451 # We need to pull these out, as passing **kwargs doesn't work with
432 # unicode keys before Python 2.6.5.
452 # unicode keys before Python 2.6.5.
433 hist_access_type = parent['content']['hist_access_type']
453 hist_access_type = parent['content']['hist_access_type']
434 raw = parent['content']['raw']
454 raw = parent['content']['raw']
435 output = parent['content']['output']
455 output = parent['content']['output']
436 if hist_access_type == 'tail':
456 if hist_access_type == 'tail':
437 n = parent['content']['n']
457 n = parent['content']['n']
438 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
458 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
439 include_latest=True)
459 include_latest=True)
440
460
441 elif hist_access_type == 'range':
461 elif hist_access_type == 'range':
442 session = parent['content']['session']
462 session = parent['content']['session']
443 start = parent['content']['start']
463 start = parent['content']['start']
444 stop = parent['content']['stop']
464 stop = parent['content']['stop']
445 hist = self.shell.history_manager.get_range(session, start, stop,
465 hist = self.shell.history_manager.get_range(session, start, stop,
446 raw=raw, output=output)
466 raw=raw, output=output)
447
467
448 elif hist_access_type == 'search':
468 elif hist_access_type == 'search':
449 pattern = parent['content']['pattern']
469 pattern = parent['content']['pattern']
450 hist = self.shell.history_manager.search(pattern, raw=raw,
470 hist = self.shell.history_manager.search(pattern, raw=raw,
451 output=output)
471 output=output)
452
472
453 else:
473 else:
454 hist = []
474 hist = []
455 hist = list(hist)
475 hist = list(hist)
456 content = {'history' : hist}
476 content = {'history' : hist}
457 content = json_clean(content)
477 content = json_clean(content)
458 msg = self.session.send(stream, 'history_reply',
478 msg = self.session.send(stream, 'history_reply',
459 content, parent, ident)
479 content, parent, ident)
460 self.log.debug("Sending history reply with %i entries", len(hist))
480 self.log.debug("Sending history reply with %i entries", len(hist))
461
481
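For reference, the three hist_access_type branches above correspond to request contents shaped roughly like the following; the field names are taken straight from the handler, the values are made up:

# Illustrative history_request contents matching the branches handled above.
tail_request   = {'hist_access_type': 'tail',   'raw': True, 'output': False, 'n': 10}
range_request  = {'hist_access_type': 'range',  'raw': True, 'output': False,
                  'session': 0, 'start': 1, 'stop': 5}
search_request = {'hist_access_type': 'search', 'raw': True, 'output': False,
                  'pattern': '*matplotlib*'}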
462 def connect_request(self, stream, ident, parent):
482 def connect_request(self, stream, ident, parent):
463 if self._recorded_ports is not None:
483 if self._recorded_ports is not None:
464 content = self._recorded_ports.copy()
484 content = self._recorded_ports.copy()
465 else:
485 else:
466 content = {}
486 content = {}
467 msg = self.session.send(stream, 'connect_reply',
487 msg = self.session.send(stream, 'connect_reply',
468 content, parent, ident)
488 content, parent, ident)
469 self.log.debug("%s", msg)
489 self.log.debug("%s", msg)
470
490
471 def shutdown_request(self, stream, ident, parent):
491 def shutdown_request(self, stream, ident, parent):
472 self.shell.exit_now = True
492 self.shell.exit_now = True
473 content = dict(status='ok')
493 content = dict(status='ok')
474 content.update(parent['content'])
494 content.update(parent['content'])
475 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
495 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
476 # same content, but different msg_id for broadcasting on IOPub
496 # same content, but different msg_id for broadcasting on IOPub
477 self._shutdown_message = self.session.msg(u'shutdown_reply',
497 self._shutdown_message = self.session.msg(u'shutdown_reply',
478 content, parent
498 content, parent
479 )
499 )
480
500
481 self._at_shutdown()
501 self._at_shutdown()
482 # call sys.exit after a short delay
502 # call sys.exit after a short delay
483 ioloop.IOLoop.instance().add_timeout(time.time()+0.1, lambda : sys.exit(0))
503 ioloop.IOLoop.instance().add_timeout(time.time()+0.1, lambda : sys.exit(0))
484
504
485 #---------------------------------------------------------------------------
505 #---------------------------------------------------------------------------
486 # Engine methods
506 # Engine methods
487 #---------------------------------------------------------------------------
507 #---------------------------------------------------------------------------
488
508
489 def apply_request(self, stream, ident, parent):
509 def apply_request(self, stream, ident, parent):
490 try:
510 try:
491 content = parent[u'content']
511 content = parent[u'content']
492 bufs = parent[u'buffers']
512 bufs = parent[u'buffers']
493 msg_id = parent['header']['msg_id']
513 msg_id = parent['header']['msg_id']
494 except:
514 except:
495 self.log.error("Got bad msg: %s", parent, exc_info=True)
515 self.log.error("Got bad msg: %s", parent, exc_info=True)
496 return
516 return
497 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
517 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
498 # self.iopub_socket.send(pyin_msg)
518 # self.iopub_socket.send(pyin_msg)
499 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
519 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
500 sub = {'dependencies_met' : True, 'engine' : self.ident,
520 sub = self._make_subheader()
501 'started': datetime.now()}
502 try:
521 try:
503 # allow for not overriding displayhook
522 # allow for not overriding displayhook
504 if hasattr(sys.displayhook, 'set_parent'):
523 if hasattr(sys.displayhook, 'set_parent'):
505 sys.displayhook.set_parent(parent)
524 sys.displayhook.set_parent(parent)
506 sys.stdout.set_parent(parent)
525 sys.stdout.set_parent(parent)
507 sys.stderr.set_parent(parent)
526 sys.stderr.set_parent(parent)
508 working = self.shell.user_ns
527 working = self.shell.user_ns
509
528
510 prefix = "_"+str(msg_id).replace("-","")+"_"
529 prefix = "_"+str(msg_id).replace("-","")+"_"
511
530
512 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
531 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
513
532
514 fname = getattr(f, '__name__', 'f')
533 fname = getattr(f, '__name__', 'f')
515
534
516 fname = prefix+"f"
535 fname = prefix+"f"
517 argname = prefix+"args"
536 argname = prefix+"args"
518 kwargname = prefix+"kwargs"
537 kwargname = prefix+"kwargs"
519 resultname = prefix+"result"
538 resultname = prefix+"result"
520
539
521 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
540 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
522 # print ns
541 # print ns
523 working.update(ns)
542 working.update(ns)
524 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
543 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
525 try:
544 try:
526 exec code in self.shell.user_global_ns, self.shell.user_ns
545 exec code in self.shell.user_global_ns, self.shell.user_ns
527 result = working.get(resultname)
546 result = working.get(resultname)
528 finally:
547 finally:
529 for key in ns.iterkeys():
548 for key in ns.iterkeys():
530 working.pop(key)
549 working.pop(key)
531
550
532 packed_result,buf = serialize_object(result)
551 packed_result,buf = serialize_object(result)
533 result_buf = [packed_result]+buf
552 result_buf = [packed_result]+buf
534 except:
553 except:
535 exc_content = self._wrap_exception('apply')
554 exc_content = self._wrap_exception('apply')
536 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
555 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
537 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
556 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
538 ident=self._topic('pyerr'))
557 ident=self._topic('pyerr'))
539 reply_content = exc_content
558 reply_content = exc_content
540 result_buf = []
559 result_buf = []
541
560
542 if exc_content['ename'] == 'UnmetDependency':
561 if exc_content['ename'] == 'UnmetDependency':
543 sub['dependencies_met'] = False
562 sub['dependencies_met'] = False
544 else:
563 else:
545 reply_content = {'status' : 'ok'}
564 reply_content = {'status' : 'ok'}
546
565
547 # put 'ok'/'error' status in header, for scheduler introspection:
566 # put 'ok'/'error' status in header, for scheduler introspection:
548 sub['status'] = reply_content['status']
567 sub['status'] = reply_content['status']
549
568
550 # flush i/o
569 # flush i/o
551 sys.stdout.flush()
570 sys.stdout.flush()
552 sys.stderr.flush()
571 sys.stderr.flush()
553
572
554 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
573 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
555 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
574 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
556
575
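On the client side, this handler is what backs DirectView.apply: the function, args and kwargs are serialized into the message buffers, unpacked into temporary _<msg_id>_-prefixed names in the engine namespace, executed, and the serialized result shipped back with the status echoed in the subheader. A rough usage sketch, assuming a running ipcluster:

# Rough client-side counterpart of apply_request (assumes "ipcluster start"
# is already running); the engine executes _<msg_id>_result = f(*args, **kwargs).
from IPython.parallel import Client

rc = Client()
view = rc[:]                     # DirectView over all registered engines
ar = view.apply(pow, 2, 10)      # returns an AsyncResult
print(ar.get())                  # one result per engine, e.g. [1024, 1024]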
557 #---------------------------------------------------------------------------
576 #---------------------------------------------------------------------------
558 # Control messages
577 # Control messages
559 #---------------------------------------------------------------------------
578 #---------------------------------------------------------------------------
560
579
561 def abort_request(self, stream, ident, parent):
580 def abort_request(self, stream, ident, parent):
562 """abort a specifig msg by id"""
581 """abort a specifig msg by id"""
563 msg_ids = parent['content'].get('msg_ids', None)
582 msg_ids = parent['content'].get('msg_ids', None)
564 if isinstance(msg_ids, basestring):
583 if isinstance(msg_ids, basestring):
565 msg_ids = [msg_ids]
584 msg_ids = [msg_ids]
566 if not msg_ids:
585 if not msg_ids:
567 self.abort_queues()
586 self.abort_queues()
568 for mid in msg_ids:
587 for mid in msg_ids:
569 self.aborted.add(str(mid))
588 self.aborted.add(str(mid))
570
589
571 content = dict(status='ok')
590 content = dict(status='ok')
572 reply_msg = self.session.send(stream, 'abort_reply', content=content,
591 reply_msg = self.session.send(stream, 'abort_reply', content=content,
573 parent=parent, ident=ident)
592 parent=parent, ident=ident)
574 self.log.debug("%s", reply_msg)
593 self.log.debug("%s", reply_msg)
575
594
576 def clear_request(self, stream, idents, parent):
595 def clear_request(self, stream, idents, parent):
577 """Clear our namespace."""
596 """Clear our namespace."""
578 self.shell.reset(False)
597 self.shell.reset(False)
579 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
598 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
580 content = dict(status='ok'))
599 content = dict(status='ok'))
581
600
582
601
583 #---------------------------------------------------------------------------
602 #---------------------------------------------------------------------------
584 # Protected interface
603 # Protected interface
585 #---------------------------------------------------------------------------
604 #---------------------------------------------------------------------------
586
605
587
606
588 def _wrap_exception(self, method=None):
607 def _wrap_exception(self, method=None):
589 # import here, because _wrap_exception is only used in parallel,
608 # import here, because _wrap_exception is only used in parallel,
590 # and parallel has higher min pyzmq version
609 # and parallel has higher min pyzmq version
591 from IPython.parallel.error import wrap_exception
610 from IPython.parallel.error import wrap_exception
592 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
611 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
593 content = wrap_exception(e_info)
612 content = wrap_exception(e_info)
594 return content
613 return content
595
614
596 def _topic(self, topic):
615 def _topic(self, topic):
597 """prefixed topic for IOPub messages"""
616 """prefixed topic for IOPub messages"""
598 if self.int_id >= 0:
617 if self.int_id >= 0:
599 base = "engine.%i" % self.int_id
618 base = "engine.%i" % self.int_id
600 else:
619 else:
601 base = "kernel.%s" % self.ident
620 base = "kernel.%s" % self.ident
602
621
603 return py3compat.cast_bytes("%s.%s" % (base, topic))
622 return py3compat.cast_bytes("%s.%s" % (base, topic))
604
623
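The prefixed topics returned by _topic() are what IOPub subscribers filter on: an engine with int_id 3 publishes pyerr under b"engine.3.pyerr", while an unregistered kernel falls back to b"kernel.<uuid>.<topic>". A small subscription sketch; the host and port are assumptions:

# Sketch: subscribe to all IOPub traffic from engine 3 by topic prefix.
import zmq

ctx = zmq.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.connect("tcp://127.0.0.1:5570")          # assumed IOPub endpoint
sock.setsockopt(zmq.SUBSCRIBE, b"engine.3.")  # zmq SUB filters by prefix match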
605 def _abort_queues(self):
624 def _abort_queues(self):
606 for stream in self.shell_streams:
625 for stream in self.shell_streams:
607 if stream:
626 if stream:
608 self._abort_queue(stream)
627 self._abort_queue(stream)
609
628
610 def _abort_queue(self, stream):
629 def _abort_queue(self, stream):
611 poller = zmq.Poller()
630 poller = zmq.Poller()
612 poller.register(stream.socket, zmq.POLLIN)
631 poller.register(stream.socket, zmq.POLLIN)
613 while True:
632 while True:
614 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
633 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
615 if msg is None:
634 if msg is None:
616 return
635 return
617
636
618 self.log.info("Aborting:")
637 self.log.info("Aborting:")
619 self.log.info("%s", msg)
638 self.log.info("%s", msg)
620 msg_type = msg['header']['msg_type']
639 msg_type = msg['header']['msg_type']
621 reply_type = msg_type.split('_')[0] + '_reply'
640 reply_type = msg_type.split('_')[0] + '_reply'
622 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
641
623 # self.reply_stream.send(ident,zmq.SNDMORE)
642 status = {'status' : 'aborted'}
624 # self.reply_stream.send_json(reply_msg)
643 sub = {'engine' : self.ident}
625 reply_msg = self.session.send(stream, reply_type,
644 sub.update(status)
626 content={'status' : 'aborted'}, parent=msg, ident=idents)
645 reply_msg = self.session.send(stream, reply_type, subheader=sub,
646 content=status, parent=msg, ident=idents)
627 self.log.debug("%s", reply_msg)
647 self.log.debug("%s", reply_msg)
628 # We need to wait a bit for requests to come in. This can probably
648 # We need to wait a bit for requests to come in. This can probably
629 # be set shorter for true asynchronous clients.
649 # be set shorter for true asynchronous clients.
630 poller.poll(50)
650 poller.poll(50)
631
651
632
652
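This is the other half of the subheader fix: an aborted reply now duplicates its status into the subheader, together with the engine ident, so the Hub and schedulers can account for the abort without unpacking the content. Roughly:

# Hypothetical shape of an aborted reply built by _abort_queue above.
aborted_reply = {
    'msg_type': 'apply_reply',            # or execute_reply, mirrors the request type
    'content':   {'status': 'aborted'},
    'subheader': {'status': 'aborted', 'engine': '<engine-uuid>'},
}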
633 def _no_raw_input(self):
653 def _no_raw_input(self):
634 """Raise StdinNotImplentedError if active frontend doesn't support
654 """Raise StdinNotImplentedError if active frontend doesn't support
635 stdin."""
655 stdin."""
636 raise StdinNotImplementedError("raw_input was called, but this "
656 raise StdinNotImplementedError("raw_input was called, but this "
637 "frontend does not support stdin.")
657 "frontend does not support stdin.")
638
658
639 def _raw_input(self, prompt, ident, parent):
659 def _raw_input(self, prompt, ident, parent):
640 # Flush output before making the request.
660 # Flush output before making the request.
641 sys.stderr.flush()
661 sys.stderr.flush()
642 sys.stdout.flush()
662 sys.stdout.flush()
643
663
644 # Send the input request.
664 # Send the input request.
645 content = json_clean(dict(prompt=prompt))
665 content = json_clean(dict(prompt=prompt))
646 self.session.send(self.stdin_socket, u'input_request', content, parent,
666 self.session.send(self.stdin_socket, u'input_request', content, parent,
647 ident=ident)
667 ident=ident)
648
668
649 # Await a response.
669 # Await a response.
650 while True:
670 while True:
651 try:
671 try:
652 ident, reply = self.session.recv(self.stdin_socket, 0)
672 ident, reply = self.session.recv(self.stdin_socket, 0)
653 except Exception:
673 except Exception:
654 self.log.warn("Invalid Message:", exc_info=True)
674 self.log.warn("Invalid Message:", exc_info=True)
655 else:
675 else:
656 break
676 break
657 try:
677 try:
658 value = reply['content']['value']
678 value = reply['content']['value']
659 except:
679 except:
660 self.log.error("Got bad raw_input reply: ")
680 self.log.error("Got bad raw_input reply: ")
661 self.log.error("%s", parent)
681 self.log.error("%s", parent)
662 value = ''
682 value = ''
663 if value == '\x04':
683 if value == '\x04':
664 # EOF
684 # EOF
665 raise EOFError
685 raise EOFError
666 return value
686 return value
667
687
668 def _complete(self, msg):
688 def _complete(self, msg):
669 c = msg['content']
689 c = msg['content']
670 try:
690 try:
671 cpos = int(c['cursor_pos'])
691 cpos = int(c['cursor_pos'])
672 except:
692 except:
673 # If we don't get something that we can convert to an integer, at
693 # If we don't get something that we can convert to an integer, at
674 # least attempt the completion, guessing the cursor is at the end of
694 # least attempt the completion, guessing the cursor is at the end of
675 # the text if there is any, and otherwise at the end of the line
695 # the text if there is any, and otherwise at the end of the line
676 cpos = len(c['text'])
696 cpos = len(c['text'])
677 if cpos==0:
697 if cpos==0:
678 cpos = len(c['line'])
698 cpos = len(c['line'])
679 return self.shell.complete(c['text'], c['line'], cpos)
699 return self.shell.complete(c['text'], c['line'], cpos)
680
700
681 def _object_info(self, context):
701 def _object_info(self, context):
682 symbol, leftover = self._symbol_from_context(context)
702 symbol, leftover = self._symbol_from_context(context)
683 if symbol is not None and not leftover:
703 if symbol is not None and not leftover:
684 doc = getattr(symbol, '__doc__', '')
704 doc = getattr(symbol, '__doc__', '')
685 else:
705 else:
686 doc = ''
706 doc = ''
687 object_info = dict(docstring = doc)
707 object_info = dict(docstring = doc)
688 return object_info
708 return object_info
689
709
690 def _symbol_from_context(self, context):
710 def _symbol_from_context(self, context):
691 if not context:
711 if not context:
692 return None, context
712 return None, context
693
713
694 base_symbol_string = context[0]
714 base_symbol_string = context[0]
695 symbol = self.shell.user_ns.get(base_symbol_string, None)
715 symbol = self.shell.user_ns.get(base_symbol_string, None)
696 if symbol is None:
716 if symbol is None:
697 symbol = __builtin__.__dict__.get(base_symbol_string, None)
717 symbol = __builtin__.__dict__.get(base_symbol_string, None)
698 if symbol is None:
718 if symbol is None:
699 return None, context
719 return None, context
700
720
701 context = context[1:]
721 context = context[1:]
702 for i, name in enumerate(context):
722 for i, name in enumerate(context):
703 new_symbol = getattr(symbol, name, None)
723 new_symbol = getattr(symbol, name, None)
704 if new_symbol is None:
724 if new_symbol is None:
705 return symbol, context[i:]
725 return symbol, context[i:]
706 else:
726 else:
707 symbol = new_symbol
727 symbol = new_symbol
708
728
709 return symbol, []
729 return symbol, []
710
730
711 def _at_shutdown(self):
731 def _at_shutdown(self):
712 """Actions taken at shutdown by the kernel, called by python's atexit.
732 """Actions taken at shutdown by the kernel, called by python's atexit.
713 """
733 """
714 # io.rprint("Kernel at_shutdown") # dbg
734 # io.rprint("Kernel at_shutdown") # dbg
715 if self._shutdown_message is not None:
735 if self._shutdown_message is not None:
716 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
736 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
717 self.log.debug("%s", self._shutdown_message)
737 self.log.debug("%s", self._shutdown_message)
718 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
738 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
719
739
720 #-----------------------------------------------------------------------------
740 #-----------------------------------------------------------------------------
721 # Aliases and Flags for the IPKernelApp
741 # Aliases and Flags for the IPKernelApp
722 #-----------------------------------------------------------------------------
742 #-----------------------------------------------------------------------------
723
743
724 flags = dict(kernel_flags)
744 flags = dict(kernel_flags)
725 flags.update(shell_flags)
745 flags.update(shell_flags)
726
746
727 addflag = lambda *args: flags.update(boolean_flag(*args))
747 addflag = lambda *args: flags.update(boolean_flag(*args))
728
748
729 flags['pylab'] = (
749 flags['pylab'] = (
730 {'IPKernelApp' : {'pylab' : 'auto'}},
750 {'IPKernelApp' : {'pylab' : 'auto'}},
731 """Pre-load matplotlib and numpy for interactive use with
751 """Pre-load matplotlib and numpy for interactive use with
732 the default matplotlib backend."""
752 the default matplotlib backend."""
733 )
753 )
734
754
735 aliases = dict(kernel_aliases)
755 aliases = dict(kernel_aliases)
736 aliases.update(shell_aliases)
756 aliases.update(shell_aliases)
737
757
738 # it's possible we don't want short aliases for *all* of these:
758 # it's possible we don't want short aliases for *all* of these:
739 aliases.update(dict(
759 aliases.update(dict(
740 pylab='IPKernelApp.pylab',
760 pylab='IPKernelApp.pylab',
741 ))
761 ))
742
762
743 #-----------------------------------------------------------------------------
763 #-----------------------------------------------------------------------------
744 # The IPKernelApp class
764 # The IPKernelApp class
745 #-----------------------------------------------------------------------------
765 #-----------------------------------------------------------------------------
746
766
747 class IPKernelApp(KernelApp, InteractiveShellApp):
767 class IPKernelApp(KernelApp, InteractiveShellApp):
748 name = 'ipkernel'
768 name = 'ipkernel'
749
769
750 aliases = Dict(aliases)
770 aliases = Dict(aliases)
751 flags = Dict(flags)
771 flags = Dict(flags)
752 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
772 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
753
773
754 # configurables
774 # configurables
755 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
775 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
756 config=True,
776 config=True,
757 help="""Pre-load matplotlib and numpy for interactive use,
777 help="""Pre-load matplotlib and numpy for interactive use,
758 selecting a particular matplotlib backend and loop integration.
778 selecting a particular matplotlib backend and loop integration.
759 """
779 """
760 )
780 )
761
781
762 @catch_config_error
782 @catch_config_error
763 def initialize(self, argv=None):
783 def initialize(self, argv=None):
764 super(IPKernelApp, self).initialize(argv)
784 super(IPKernelApp, self).initialize(argv)
765 self.init_path()
785 self.init_path()
766 self.init_shell()
786 self.init_shell()
767 self.init_extensions()
787 self.init_extensions()
768 self.init_code()
788 self.init_code()
769
789
770 def init_kernel(self):
790 def init_kernel(self):
771
791
772 shell_stream = ZMQStream(self.shell_socket)
792 shell_stream = ZMQStream(self.shell_socket)
773
793
774 kernel = Kernel(config=self.config, session=self.session,
794 kernel = Kernel(config=self.config, session=self.session,
775 shell_streams=[shell_stream],
795 shell_streams=[shell_stream],
776 iopub_socket=self.iopub_socket,
796 iopub_socket=self.iopub_socket,
777 stdin_socket=self.stdin_socket,
797 stdin_socket=self.stdin_socket,
778 log=self.log,
798 log=self.log,
779 profile_dir=self.profile_dir,
799 profile_dir=self.profile_dir,
780 )
800 )
781 self.kernel = kernel
801 self.kernel = kernel
782 kernel.record_ports(self.ports)
802 kernel.record_ports(self.ports)
783 shell = kernel.shell
803 shell = kernel.shell
784 if self.pylab:
804 if self.pylab:
785 try:
805 try:
786 gui, backend = pylabtools.find_gui_and_backend(self.pylab)
806 gui, backend = pylabtools.find_gui_and_backend(self.pylab)
787 shell.enable_pylab(gui, import_all=self.pylab_import_all)
807 shell.enable_pylab(gui, import_all=self.pylab_import_all)
788 except Exception:
808 except Exception:
789 self.log.error("Pylab initialization failed", exc_info=True)
809 self.log.error("Pylab initialization failed", exc_info=True)
790 # print exception straight to stdout, because normally
810 # print exception straight to stdout, because normally
791 # _showtraceback associates the reply with an execution,
811 # _showtraceback associates the reply with an execution,
792 # which means frontends will never draw it, as this exception
812 # which means frontends will never draw it, as this exception
793 # is not associated with any execute request.
813 # is not associated with any execute request.
794
814
795 # replace pyerr-sending traceback with stdout
815 # replace pyerr-sending traceback with stdout
796 _showtraceback = shell._showtraceback
816 _showtraceback = shell._showtraceback
797 def print_tb(etype, evalue, stb):
817 def print_tb(etype, evalue, stb):
798 print ("Error initializing pylab, pylab mode will not "
818 print ("Error initializing pylab, pylab mode will not "
799 "be active", file=io.stderr)
819 "be active", file=io.stderr)
800 print (shell.InteractiveTB.stb2text(stb), file=io.stdout)
820 print (shell.InteractiveTB.stb2text(stb), file=io.stdout)
801 shell._showtraceback = print_tb
821 shell._showtraceback = print_tb
802
822
803 # send the traceback over stdout
823 # send the traceback over stdout
804 shell.showtraceback(tb_offset=0)
824 shell.showtraceback(tb_offset=0)
805
825
806 # restore proper _showtraceback method
826 # restore proper _showtraceback method
807 shell._showtraceback = _showtraceback
827 shell._showtraceback = _showtraceback
808
828
809
829
810 def init_shell(self):
830 def init_shell(self):
811 self.shell = self.kernel.shell
831 self.shell = self.kernel.shell
812 self.shell.configurables.append(self)
832 self.shell.configurables.append(self)
813
833
814
834
815 #-----------------------------------------------------------------------------
835 #-----------------------------------------------------------------------------
816 # Kernel main and launch functions
836 # Kernel main and launch functions
817 #-----------------------------------------------------------------------------
837 #-----------------------------------------------------------------------------
818
838
819 def launch_kernel(*args, **kwargs):
839 def launch_kernel(*args, **kwargs):
820 """Launches a localhost IPython kernel, binding to the specified ports.
840 """Launches a localhost IPython kernel, binding to the specified ports.
821
841
822 This function simply calls entry_point.base_launch_kernel with the right
842 This function simply calls entry_point.base_launch_kernel with the right
823 first command to start an ipkernel. See base_launch_kernel for arguments.
843 first command to start an ipkernel. See base_launch_kernel for arguments.
824
844
825 Returns
845 Returns
826 -------
846 -------
827 A tuple of form:
847 A tuple of form:
828 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
848 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
829 where kernel_process is a Popen object and the ports are integers.
849 where kernel_process is a Popen object and the ports are integers.
830 """
850 """
831 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
851 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
832 *args, **kwargs)
852 *args, **kwargs)
833
853
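A minimal usage sketch, assuming the default arguments let base_launch_kernel pick free ports:

# Sketch: launch a local kernel and unpack the returned Popen and ports.
kernel_process, shell_port, iopub_port, stdin_port, hb_port = launch_kernel()
print("shell=%i iopub=%i stdin=%i hb=%i" % (shell_port, iopub_port, stdin_port, hb_port))
kernel_process.terminate()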
834
854
835 def embed_kernel(module=None, local_ns=None, **kwargs):
855 def embed_kernel(module=None, local_ns=None, **kwargs):
836 """Embed and start an IPython kernel in a given scope.
856 """Embed and start an IPython kernel in a given scope.
837
857
838 Parameters
858 Parameters
839 ----------
859 ----------
840 module : ModuleType, optional
860 module : ModuleType, optional
841 The module to load into IPython globals (default: caller)
861 The module to load into IPython globals (default: caller)
842 local_ns : dict, optional
862 local_ns : dict, optional
843 The namespace to load into IPython user namespace (default: caller)
863 The namespace to load into IPython user namespace (default: caller)
844
864
845 kwargs : various, optional
865 kwargs : various, optional
846 Further keyword args are relayed to the KernelApp constructor,
866 Further keyword args are relayed to the KernelApp constructor,
847 allowing configuration of the Kernel. Will only have an effect
867 allowing configuration of the Kernel. Will only have an effect
848 on the first embed_kernel call for a given process.
868 on the first embed_kernel call for a given process.
849
869
850 """
870 """
851 # get the app if it exists, or set it up if it doesn't
871 # get the app if it exists, or set it up if it doesn't
852 if IPKernelApp.initialized():
872 if IPKernelApp.initialized():
853 app = IPKernelApp.instance()
873 app = IPKernelApp.instance()
854 else:
874 else:
855 app = IPKernelApp.instance(**kwargs)
875 app = IPKernelApp.instance(**kwargs)
856 app.initialize([])
876 app.initialize([])
857
877
858 # load the calling scope if not given
878 # load the calling scope if not given
859 (caller_module, caller_locals) = extract_module_locals(1)
879 (caller_module, caller_locals) = extract_module_locals(1)
860 if module is None:
880 if module is None:
861 module = caller_module
881 module = caller_module
862 if local_ns is None:
882 if local_ns is None:
863 local_ns = caller_locals
883 local_ns = caller_locals
864
884
865 app.kernel.user_module = module
885 app.kernel.user_module = module
866 app.kernel.user_ns = local_ns
886 app.kernel.user_ns = local_ns
867 app.start()
887 app.start()
868
888
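A typical embedding sketch, with the kernel serving the local namespace of the calling function so a frontend such as the qtconsole can attach and inspect it:

# Sketch: embed a kernel inside a running script to inspect local state.
def crunch(data):
    total = sum(data)
    embed_kernel(local_ns=locals())   # blocks here while the kernel serves requests
    return total

crunch([1, 2, 3])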
869 def main():
889 def main():
870 """Run an IPKernel as an application"""
890 """Run an IPKernel as an application"""
871 app = IPKernelApp.instance()
891 app = IPKernelApp.instance()
872 app.initialize()
892 app.initialize()
873 app.start()
893 app.start()
874
894
875
895
876 if __name__ == '__main__':
896 if __name__ == '__main__':
877 main()
897 main()