##// END OF EJS Templates
Parallel kernel/engine startup looks a bit more like pykernel
MinRK -
Show More
@@ -1,148 +1,132 b''
1 1 #!/usr/bin/env python
2 2 """A simple engine that talks to a controller over 0MQ.
3 3 it handles registration, etc. and launches a kernel
4 4 connected to the Controller's queue(s).
5 5 """
6 6 from __future__ import print_function
7 7 import sys
8 8 import time
9 9 import traceback
10 10 import uuid
11 11 from pprint import pprint
12 12
13 13 import zmq
14 14 from zmq.eventloop import ioloop, zmqstream
15 15
16 from IPython.utils.traitlets import HasTraits
17 from IPython.utils.localinterfaces import LOCALHOST
18
16 19 from streamsession import Message, StreamSession
17 20 from client import Client
18 import streamkernel as kernel
21 from streamkernel import Kernel, make_kernel
19 22 import heartmonitor
20 23 from entry_point import make_base_argument_parser, connect_logger, parse_url
21 24 # import taskthread
22 25 # from log import logger
23 26
24 27
25 28 def printer(*msg):
26 29 pprint(msg)
27 30
28 31 class Engine(object):
29 32 """IPython engine"""
30 33
31 34 id=None
32 35 context=None
33 36 loop=None
34 37 session=None
35 38 ident=None
36 39 registrar=None
37 40 heart=None
38 41 kernel=None
39 42
40 43 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
41 44 self.context = context
42 45 self.loop = loop
43 46 self.session = session
44 47 self.registrar = registrar
45 48 self.client = client
46 49 self.ident = ident if ident else str(uuid.uuid4())
47 50 self.registrar.on_send(printer)
48 51
49 52 def register(self):
50 53
51 54 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
52 55 self.registrar.on_recv(self.complete_registration)
53 56 self.session.send(self.registrar, "registration_request",content=content)
54 57
55 58 def complete_registration(self, msg):
56 59 # print msg
57 60 idents,msg = self.session.feed_identities(msg)
58 61 msg = Message(self.session.unpack_message(msg))
59 62 if msg.content.status == 'ok':
60 63 self.session.username = str(msg.content.id)
61 64 queue_addr = msg.content.queue
62 if queue_addr:
63 queue = self.context.socket(zmq.PAIR)
64 queue.setsockopt(zmq.IDENTITY, self.ident)
65 queue.connect(str(queue_addr))
66 self.queue = zmqstream.ZMQStream(queue, self.loop)
67
68 control_addr = msg.content.control
69 if control_addr:
70 control = self.context.socket(zmq.PAIR)
71 control.setsockopt(zmq.IDENTITY, self.ident)
72 control.connect(str(control_addr))
73 self.control = zmqstream.ZMQStream(control, self.loop)
74
65 shell_addrs = [str(queue_addr)]
66 control_addr = str(msg.content.control)
75 67 task_addr = msg.content.task
76 print (task_addr)
77 68 if task_addr:
78 # task as stream:
79 task = self.context.socket(zmq.PAIR)
80 task.setsockopt(zmq.IDENTITY, self.ident)
81 task.connect(str(task_addr))
82 self.task_stream = zmqstream.ZMQStream(task, self.loop)
83 # TaskThread:
84 # mon_addr = msg.content.monitor
85 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
86 # task.connect_in(str(task_addr))
87 # task.connect_out(str(mon_addr))
88 # self.task_stream = taskthread.QueueStream(*task.queues)
89 # task.start()
90
91 hbs = msg.content.heartbeat
92 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
93 self.heart.start()
69 shell_addrs.append(str(task_addr))
70
71 hb_addrs = msg.content.heartbeat
94 72 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
95 # placeholder for now:
96 pub = self.context.socket(zmq.PUB)
97 pub = zmqstream.ZMQStream(pub, self.loop)
98 # create and start the kernel
99 self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
100 self.kernel.start()
73
74 # placeholder for no, since pub isn't hooked up:
75 sub = self.context.socket(zmq.SUB)
76 sub = zmqstream.ZMQStream(sub, self.loop)
77 sub.on_recv(lambda *a: None)
78 port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
79 iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
80
81 make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs,
82 client_addr=None, loop=self.loop, context=self.context)
83
101 84 else:
102 85 # logger.error("Registration Failed: %s"%msg)
103 86 raise Exception("Registration Failed: %s"%msg)
104 87
105 88 # logger.info("engine::completed registration with id %s"%self.session.username)
106 89
107 90 print (msg)
108 91
109 92 def unregister(self):
110 93 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
111 94 time.sleep(1)
112 95 sys.exit(0)
113 96
114 97 def start(self):
115 98 print ("registering")
116 99 self.register()
117 100
118 101
102
119 103 def main():
120 104
121 105 parser = make_base_argument_parser()
122 106
123 107 args = parser.parse_args()
124 108
125 109 parse_url(args)
126 110
127 111 iface="%s://%s"%(args.transport,args.ip)+':%i'
128 112
129 113 loop = ioloop.IOLoop.instance()
130 114 session = StreamSession()
131 115 ctx = zmq.Context()
132 116
133 117 # setup logging
134 118 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
135 119
136 120 reg_conn = iface % args.regport
137 121 print (reg_conn)
138 122 print ("Starting the engine...", file=sys.__stderr__)
139 123
140 124 reg = ctx.socket(zmq.PAIR)
141 125 reg.connect(reg_conn)
142 126 reg = zmqstream.ZMQStream(reg, loop)
143 127 client = Client(reg_conn)
144 128
145 129 e = Engine(ctx, loop, session, reg, client, args.ident)
146 130 dc = ioloop.DelayedCallback(e.start, 100, loop)
147 131 dc.start()
148 132 loop.start() No newline at end of file
@@ -1,505 +1,413 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 Kernel adapted from kernel.py to use ZMQ Streams
4 4 """
5 5
6 #-----------------------------------------------------------------------------
7 # Imports
8 #-----------------------------------------------------------------------------
9
10 # Standard library imports.
6 11 from __future__ import print_function
7 12 import __builtin__
13 from code import CommandCompiler
8 14 import os
9 15 import sys
10 16 import time
11 17 import traceback
12 18 from signal import SIGTERM, SIGKILL
13 19 from pprint import pprint
14 20
15 from code import CommandCompiler
16
21 # System library imports.
17 22 import zmq
18 23 from zmq.eventloop import ioloop, zmqstream
19 24
25 # Local imports.
26 from IPython.utils.traitlets import HasTraits, Instance, List
20 27 from IPython.zmq.completer import KernelCompleter
21 28
22 29 from streamsession import StreamSession, Message, extract_header, serialize_object,\
23 30 unpack_apply_message
24 31 from dependency import UnmetDependency
32 import heartmonitor
33 from client import Client
25 34
26 35 def printer(*args):
27 36 pprint(args)
28 37
29 class OutStream(object):
30 """A file like object that publishes the stream to a 0MQ PUB socket."""
31
32 def __init__(self, session, pub_socket, name, max_buffer=200):
33 self.session = session
34 self.pub_socket = pub_socket
35 self.name = name
36 self._buffer = []
37 self._buffer_len = 0
38 self.max_buffer = max_buffer
39 self.parent_header = {}
40
41 def set_parent(self, parent):
42 self.parent_header = extract_header(parent)
43
44 def close(self):
45 self.pub_socket = None
46
47 def flush(self):
48 if self.pub_socket is None:
49 raise ValueError(u'I/O operation on closed file')
50 else:
51 if self._buffer:
52 data = ''.join(self._buffer)
53 content = {u'name':self.name, u'data':data}
54 # msg = self.session.msg(u'stream', content=content,
55 # parent=self.parent_header)
56 msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header)
57 # print>>sys.__stdout__, Message(msg)
58 # self.pub_socket.send_json(msg)
59 self._buffer_len = 0
60 self._buffer = []
61
62 def isattr(self):
63 return False
64
65 def next(self):
66 raise IOError('Read not supported on a write only stream.')
67
68 def read(self, size=None):
69 raise IOError('Read not supported on a write only stream.')
70
71 readline=read
72
73 def write(self, s):
74 if self.pub_socket is None:
75 raise ValueError('I/O operation on closed file')
76 else:
77 self._buffer.append(s)
78 self._buffer_len += len(s)
79 self._maybe_send()
80
81 def _maybe_send(self):
82 if '\n' in self._buffer[-1]:
83 self.flush()
84 if self._buffer_len > self.max_buffer:
85 self.flush()
86
87 def writelines(self, sequence):
88 if self.pub_socket is None:
89 raise ValueError('I/O operation on closed file')
90 else:
91 for s in sequence:
92 self.write(s)
93
94
95 class DisplayHook(object):
96
97 def __init__(self, session, pub_socket):
98 self.session = session
99 self.pub_socket = pub_socket
100 self.parent_header = {}
101
102 def __call__(self, obj):
103 if obj is None:
104 return
105
106 __builtin__._ = obj
107 # msg = self.session.msg(u'pyout', {u'data':repr(obj)},
108 # parent=self.parent_header)
109 # self.pub_socket.send_json(msg)
110 self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header)
111
112 def set_parent(self, parent):
113 self.parent_header = extract_header(parent)
38 #-----------------------------------------------------------------------------
39 # Main kernel class
40 #-----------------------------------------------------------------------------
114 41
42 class Kernel(HasTraits):
115 43
116 class RawInput(object):
44 #---------------------------------------------------------------------------
45 # Kernel interface
46 #---------------------------------------------------------------------------
117 47
118 def __init__(self, session, socket):
119 self.session = session
120 self.socket = socket
48 session = Instance(StreamSession)
49 shell_streams = Instance(list)
50 control_stream = Instance(zmqstream.ZMQStream)
51 task_stream = Instance(zmqstream.ZMQStream)
52 iopub_stream = Instance(zmqstream.ZMQStream)
53 client = Instance(Client)
121 54
122 def __call__(self, prompt=None):
123 msg = self.session.msg(u'raw_input')
124 self.socket.send_json(msg)
125 while True:
126 try:
127 reply = self.socket.recv_json(zmq.NOBLOCK)
128 except zmq.ZMQError as e:
129 if e.errno == zmq.EAGAIN:
130 pass
131 else:
132 raise
133 else:
134 break
135 return reply[u'content'][u'data']
136
137
138 class Kernel(object):
139
140 def __init__(self, session, control_stream, reply_stream, pub_stream,
141 task_stream=None, client=None):
142 self.session = session
143 self.control_stream = control_stream
144 # self.control_socket = control_stream.socket
145 self.reply_stream = reply_stream
146 self.identity = self.reply_stream.getsockopt(zmq.IDENTITY)
147 self.task_stream = task_stream
148 self.pub_stream = pub_stream
149 self.client = client
55 def __init__(self, **kwargs):
56 super(Kernel, self).__init__(**kwargs)
57 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
150 58 self.user_ns = {}
151 59 self.history = []
152 60 self.compiler = CommandCompiler()
153 61 self.completer = KernelCompleter(self.user_ns)
154 62 self.aborted = set()
155 63
156 64 # Build dict of handlers for message types
157 self.queue_handlers = {}
65 self.shell_handlers = {}
158 66 self.control_handlers = {}
159 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
160 self.queue_handlers[msg_type] = getattr(self, msg_type)
67 for msg_type in ['execute_request', 'complete_request', 'apply_request',
68 'clear_request']:
69 self.shell_handlers[msg_type] = getattr(self, msg_type)
161 70
162 for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys():
71 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
163 72 self.control_handlers[msg_type] = getattr(self, msg_type)
164 73
165 74 #-------------------- control handlers -----------------------------
166 75 def abort_queues(self):
167 for stream in (self.task_stream, self.reply_stream):
76 for stream in self.shell_streams:
168 77 if stream:
169 78 self.abort_queue(stream)
170 79
171 80 def abort_queue(self, stream):
172 81 while True:
173 82 try:
174 83 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
175 84 except zmq.ZMQError as e:
176 85 if e.errno == zmq.EAGAIN:
177 86 break
178 87 else:
179 88 return
180 89 else:
181 90 if msg is None:
182 91 return
183 92 else:
184 93 idents,msg = msg
185 94
186 95 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
187 96 # msg = self.reply_socket.recv_json()
188 97 print ("Aborting:", file=sys.__stdout__)
189 98 print (Message(msg), file=sys.__stdout__)
190 99 msg_type = msg['msg_type']
191 100 reply_type = msg_type.split('_')[0] + '_reply'
192 101 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
193 102 # self.reply_socket.send(ident,zmq.SNDMORE)
194 103 # self.reply_socket.send_json(reply_msg)
195 104 reply_msg = self.session.send(stream, reply_type,
196 105 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
197 106 print(Message(reply_msg), file=sys.__stdout__)
198 107 # We need to wait a bit for requests to come in. This can probably
199 108 # be set shorter for true asynchronous clients.
200 109 time.sleep(0.05)
201 110
202 111 def abort_request(self, stream, ident, parent):
203 112 """abort a specifig msg by id"""
204 113 msg_ids = parent['content'].get('msg_ids', None)
205 114 if isinstance(msg_ids, basestring):
206 115 msg_ids = [msg_ids]
207 116 if not msg_ids:
208 117 self.abort_queues()
209 118 for mid in msg_ids:
210 119 self.aborted.add(str(mid))
211 120
212 121 content = dict(status='ok')
213 122 reply_msg = self.session.send(stream, 'abort_reply', content=content,
214 123 parent=parent, ident=ident)[0]
215 124 print(Message(reply_msg), file=sys.__stdout__)
216 125
217 def kill_request(self, stream, idents, parent):
126 def shutdown_request(self, stream, ident, parent):
218 127 """kill ourself. This should really be handled in an external process"""
219 128 self.abort_queues()
220 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
221 content = dict(status='ok'))
222 # we can know that a message is done if we *don't* use streams, but
223 # use a socket directly with MessageTracker
224 time.sleep(.5)
225 os.kill(os.getpid(), SIGTERM)
226 time.sleep(1)
227 os.kill(os.getpid(), SIGKILL)
228
229 def clear_request(self, stream, idents, parent):
230 """Clear our namespace."""
231 self.user_ns = {}
232 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
233 content = dict(status='ok'))
129 content = dict(parent['content'])
130 msg = self.session.send(self.reply_socket, 'shutdown_reply',
131 content, parent, ident)
132 msg = self.session.send(self.pub_socket, 'shutdown_reply',
133 content, parent, ident)
134 # print >> sys.__stdout__, msg
135 time.sleep(0.1)
136 sys.exit(0)
234 137
235 138 def dispatch_control(self, msg):
236 139 idents,msg = self.session.feed_identities(msg, copy=False)
237 140 msg = self.session.unpack_message(msg, content=True, copy=False)
238 141
239 142 header = msg['header']
240 143 msg_id = header['msg_id']
241 144
242 145 handler = self.control_handlers.get(msg['msg_type'], None)
243 146 if handler is None:
244 147 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
245 148 else:
246 149 handler(self.control_stream, idents, msg)
247 150
248 151
249 152 #-------------------- queue helpers ------------------------------
250 153
251 154 def check_dependencies(self, dependencies):
252 155 if not dependencies:
253 156 return True
254 157 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
255 158 anyorall = dependencies[0]
256 159 dependencies = dependencies[1]
257 160 else:
258 161 anyorall = 'all'
259 162 results = self.client.get_results(dependencies,status_only=True)
260 163 if results['status'] != 'ok':
261 164 return False
262 165
263 166 if anyorall == 'any':
264 167 if not results['completed']:
265 168 return False
266 169 else:
267 170 if results['pending']:
268 171 return False
269 172
270 173 return True
271 174
272 175 def check_aborted(self, msg_id):
273 176 return msg_id in self.aborted
274 177
275 178 #-------------------- queue handlers -----------------------------
276 179
180 def clear_request(self, stream, idents, parent):
181 """Clear our namespace."""
182 self.user_ns = {}
183 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
184 content = dict(status='ok'))
185
277 186 def execute_request(self, stream, ident, parent):
278 187 try:
279 188 code = parent[u'content'][u'code']
280 189 except:
281 190 print("Got bad msg: ", file=sys.__stderr__)
282 191 print(Message(parent), file=sys.__stderr__)
283 192 return
284 193 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
285 # self.pub_stream.send(pyin_msg)
286 self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
194 # self.iopub_stream.send(pyin_msg)
195 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
287 196 try:
288 197 comp_code = self.compiler(code, '<zmq-kernel>')
289 198 # allow for not overriding displayhook
290 199 if hasattr(sys.displayhook, 'set_parent'):
291 200 sys.displayhook.set_parent(parent)
292 201 exec comp_code in self.user_ns, self.user_ns
293 202 except:
294 203 # result = u'error'
295 204 etype, evalue, tb = sys.exc_info()
296 205 tb = traceback.format_exception(etype, evalue, tb)
297 206 exc_content = {
298 207 u'status' : u'error',
299 208 u'traceback' : tb,
300 209 u'etype' : unicode(etype),
301 210 u'evalue' : unicode(evalue)
302 211 }
303 212 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
304 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
213 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
305 214 reply_content = exc_content
306 215 else:
307 216 reply_content = {'status' : 'ok'}
308 217 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
309 218 # self.reply_socket.send(ident, zmq.SNDMORE)
310 219 # self.reply_socket.send_json(reply_msg)
311 220 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
312 221 print(Message(reply_msg), file=sys.__stdout__)
313 222 if reply_msg['content']['status'] == u'error':
314 223 self.abort_queues()
315 224
316 225 def complete_request(self, stream, ident, parent):
317 226 matches = {'matches' : self.complete(parent),
318 227 'status' : 'ok'}
319 228 completion_msg = self.session.send(stream, 'complete_reply',
320 229 matches, parent, ident)
321 230 # print >> sys.__stdout__, completion_msg
322 231
323 232 def complete(self, msg):
324 233 return self.completer.complete(msg.content.line, msg.content.text)
325 234
326 235 def apply_request(self, stream, ident, parent):
327 236 print (parent)
328 237 try:
329 238 content = parent[u'content']
330 239 bufs = parent[u'buffers']
331 240 msg_id = parent['header']['msg_id']
332 241 bound = content.get('bound', False)
333 242 except:
334 243 print("Got bad msg: ", file=sys.__stderr__)
335 244 print(Message(parent), file=sys.__stderr__)
336 245 return
337 246 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
338 # self.pub_stream.send(pyin_msg)
339 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
247 # self.iopub_stream.send(pyin_msg)
248 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
340 249 sub = {'dependencies_met' : True, 'engine' : self.identity}
341 250 try:
342 251 # allow for not overriding displayhook
343 252 if hasattr(sys.displayhook, 'set_parent'):
344 253 sys.displayhook.set_parent(parent)
345 254 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
346 255 if bound:
347 256 working = self.user_ns
348 257 suffix = str(msg_id).replace("-","")
349 258 prefix = "_"
350 259
351 260 else:
352 261 working = dict()
353 262 suffix = prefix = "_" # prevent keyword collisions with lambda
354 263 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
355 264 # if f.fun
356 265 fname = prefix+f.func_name.strip('<>')+suffix
357 266 argname = prefix+"args"+suffix
358 267 kwargname = prefix+"kwargs"+suffix
359 268 resultname = prefix+"result"+suffix
360 269
361 270 ns = { fname : f, argname : args, kwargname : kwargs }
362 271 # print ns
363 272 working.update(ns)
364 273 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
365 274 exec code in working, working
366 275 result = working.get(resultname)
367 276 # clear the namespace
368 277 if bound:
369 278 for key in ns.iterkeys():
370 279 self.user_ns.pop(key)
371 280 else:
372 281 del working
373 282
374 283 packed_result,buf = serialize_object(result)
375 284 result_buf = [packed_result]+buf
376 285 except:
377 286 result = u'error'
378 287 etype, evalue, tb = sys.exc_info()
379 288 tb = traceback.format_exception(etype, evalue, tb)
380 289 exc_content = {
381 290 u'status' : u'error',
382 291 u'traceback' : tb,
383 292 u'etype' : unicode(etype),
384 293 u'evalue' : unicode(evalue)
385 294 }
386 295 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
387 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
296 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
388 297 reply_content = exc_content
389 298 result_buf = []
390 299
391 300 if etype is UnmetDependency:
392 301 sub['dependencies_met'] = False
393 302 else:
394 303 reply_content = {'status' : 'ok'}
395 304 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
396 305 # self.reply_socket.send(ident, zmq.SNDMORE)
397 306 # self.reply_socket.send_json(reply_msg)
398 307 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
399 308 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
400 309 print(Message(reply_msg), file=sys.__stdout__)
401 310 # if reply_msg['content']['status'] == u'error':
402 311 # self.abort_queues()
403 312
404 313 def dispatch_queue(self, stream, msg):
405 314 self.control_stream.flush()
406 315 idents,msg = self.session.feed_identities(msg, copy=False)
407 316 msg = self.session.unpack_message(msg, content=True, copy=False)
408 317
409 318 header = msg['header']
410 319 msg_id = header['msg_id']
411 320 if self.check_aborted(msg_id):
412 321 self.aborted.remove(msg_id)
413 322 # is it safe to assume a msg_id will not be resubmitted?
414 323 reply_type = msg['msg_type'].split('_')[0] + '_reply'
415 324 reply_msg = self.session.send(stream, reply_type,
416 325 content={'status' : 'aborted'}, parent=msg, ident=idents)
417 326 return
418 handler = self.queue_handlers.get(msg['msg_type'], None)
327 handler = self.shell_handlers.get(msg['msg_type'], None)
419 328 if handler is None:
420 329 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
421 330 else:
422 331 handler(stream, idents, msg)
423 332
424 333 def start(self):
425 334 #### stream mode:
426 335 if self.control_stream:
427 336 self.control_stream.on_recv(self.dispatch_control, copy=False)
428 337 self.control_stream.on_err(printer)
429 if self.reply_stream:
430 self.reply_stream.on_recv(lambda msg:
431 self.dispatch_queue(self.reply_stream, msg), copy=False)
432 self.reply_stream.on_err(printer)
433 if self.task_stream:
434 self.task_stream.on_recv(lambda msg:
435 self.dispatch_queue(self.task_stream, msg), copy=False)
436 self.task_stream.on_err(printer)
338
339 for s in self.shell_streams:
340 s.on_recv(lambda msg:
341 self.dispatch_queue(s, msg), copy=False)
342 s.on_err(printer)
343
344 if self.iopub_stream:
345 self.iopub_stream.on_err(printer)
346 self.iopub_stream.on_send(printer)
437 347
438 348 #### while True mode:
439 349 # while True:
440 350 # idle = True
441 351 # try:
442 # msg = self.reply_stream.socket.recv_multipart(
352 # msg = self.shell_stream.socket.recv_multipart(
443 353 # zmq.NOBLOCK, copy=False)
444 354 # except zmq.ZMQError, e:
445 355 # if e.errno != zmq.EAGAIN:
446 356 # raise e
447 357 # else:
448 358 # idle=False
449 # self.dispatch_queue(self.reply_stream, msg)
359 # self.dispatch_queue(self.shell_stream, msg)
450 360 #
451 361 # if not self.task_stream.empty():
452 362 # idle=False
453 363 # msg = self.task_stream.recv_multipart()
454 364 # self.dispatch_queue(self.task_stream, msg)
455 365 # if idle:
456 366 # # don't busywait
457 367 # time.sleep(1e-3)
458 368
459
460 def main():
461 raise Exception("Don't run me anymore")
369 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
370 client_addr=None, loop=None, context=None):
371 # create loop, context, and session:
372 if loop is None:
462 373 loop = ioloop.IOLoop.instance()
463 c = zmq.Context()
464
465 ip = '127.0.0.1'
466 port_base = 5575
467 connection = ('tcp://%s' % ip) + ':%i'
468 rep_conn = connection % port_base
469 pub_conn = connection % (port_base+1)
470
471 print("Starting the kernel...", file=sys.__stdout__)
472 # print >>sys.__stdout__, "XREQ Channel:", rep_conn
473 # print >>sys.__stdout__, "PUB Channel:", pub_conn
474
475 session = StreamSession(username=u'kernel')
476
477 reply_socket = c.socket(zmq.XREQ)
478 reply_socket.connect(rep_conn)
479
480 pub_socket = c.socket(zmq.PUB)
481 pub_socket.connect(pub_conn)
482
483 stdout = OutStream(session, pub_socket, u'stdout')
484 stderr = OutStream(session, pub_socket, u'stderr')
485 sys.stdout = stdout
486 sys.stderr = stderr
487
488 display_hook = DisplayHook(session, pub_socket)
489 sys.displayhook = display_hook
490 reply_stream = zmqstream.ZMQStream(reply_socket,loop)
491 pub_stream = zmqstream.ZMQStream(pub_socket,loop)
492 kernel = Kernel(session, reply_stream, pub_stream)
493
494 # For debugging convenience, put sleep and a string in the namespace, so we
495 # have them every time we start.
496 kernel.user_ns['sleep'] = time.sleep
497 kernel.user_ns['s'] = 'Test string'
374 if context is None:
375 context = zmq.Context()
376 c = context
377 session = StreamSession()
378 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
379
380 # create Control Stream
381 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
382 control_stream.setsockopt(zmq.IDENTITY, identity)
383 control_stream.connect(control_addr)
384
385 # create Shell Streams (MUX, Task, etc.):
386 shell_streams = []
387 for addr in shell_addrs:
388 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
389 stream.setsockopt(zmq.IDENTITY, identity)
390 stream.connect(addr)
391 shell_streams.append(stream)
392
393 # create iopub stream:
394 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
395 iopub_stream.setsockopt(zmq.IDENTITY, identity)
396 iopub_stream.connect(iopub_addr)
397
398 # launch heartbeat
399 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
400 heart.start()
401
402 # create (optional) Client
403 if client_addr:
404 client = Client(client_addr, username=identity)
405 else:
406 client = None
498 407
499 print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__)
408 kernel = Kernel(session=session, control_stream=control_stream,
409 shell_streams=shell_streams, iopub_stream=iopub_stream,
410 client=client)
500 411 kernel.start()
501 loop.start()
502
412 return loop, c
503 413
504 if __name__ == '__main__':
505 main()
General Comments 0
You need to be logged in to leave comments. Login now