##// END OF EJS Templates
Parallel kernel/engine startup looks a bit more like pykernel
MinRK -
Show More
@@ -13,9 +13,12 b' from pprint import pprint'
13 13 import zmq
14 14 from zmq.eventloop import ioloop, zmqstream
15 15
16 from IPython.utils.traitlets import HasTraits
17 from IPython.utils.localinterfaces import LOCALHOST
18
16 19 from streamsession import Message, StreamSession
17 20 from client import Client
18 import streamkernel as kernel
21 from streamkernel import Kernel, make_kernel
19 22 import heartmonitor
20 23 from entry_point import make_base_argument_parser, connect_logger, parse_url
21 24 # import taskthread
@@ -59,45 +62,25 b' class Engine(object):'
59 62 if msg.content.status == 'ok':
60 63 self.session.username = str(msg.content.id)
61 64 queue_addr = msg.content.queue
62 if queue_addr:
63 queue = self.context.socket(zmq.PAIR)
64 queue.setsockopt(zmq.IDENTITY, self.ident)
65 queue.connect(str(queue_addr))
66 self.queue = zmqstream.ZMQStream(queue, self.loop)
67
68 control_addr = msg.content.control
69 if control_addr:
70 control = self.context.socket(zmq.PAIR)
71 control.setsockopt(zmq.IDENTITY, self.ident)
72 control.connect(str(control_addr))
73 self.control = zmqstream.ZMQStream(control, self.loop)
74
65 shell_addrs = [str(queue_addr)]
66 control_addr = str(msg.content.control)
75 67 task_addr = msg.content.task
76 print (task_addr)
77 68 if task_addr:
78 # task as stream:
79 task = self.context.socket(zmq.PAIR)
80 task.setsockopt(zmq.IDENTITY, self.ident)
81 task.connect(str(task_addr))
82 self.task_stream = zmqstream.ZMQStream(task, self.loop)
83 # TaskThread:
84 # mon_addr = msg.content.monitor
85 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
86 # task.connect_in(str(task_addr))
87 # task.connect_out(str(mon_addr))
88 # self.task_stream = taskthread.QueueStream(*task.queues)
89 # task.start()
69 shell_addrs.append(str(task_addr))
90 70
91 hbs = msg.content.heartbeat
92 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
93 self.heart.start()
71 hb_addrs = msg.content.heartbeat
94 72 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
95 # placeholder for now:
96 pub = self.context.socket(zmq.PUB)
97 pub = zmqstream.ZMQStream(pub, self.loop)
98 # create and start the kernel
99 self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
100 self.kernel.start()
73
74 # placeholder for no, since pub isn't hooked up:
75 sub = self.context.socket(zmq.SUB)
76 sub = zmqstream.ZMQStream(sub, self.loop)
77 sub.on_recv(lambda *a: None)
78 port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
79 iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
80
81 make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs,
82 client_addr=None, loop=self.loop, context=self.context)
83
101 84 else:
102 85 # logger.error("Registration Failed: %s"%msg)
103 86 raise Exception("Registration Failed: %s"%msg)
@@ -114,6 +97,7 b' class Engine(object):'
114 97 def start(self):
115 98 print ("registering")
116 99 self.register()
100
117 101
118 102
119 103 def main():
@@ -3,8 +3,14 b''
3 3 Kernel adapted from kernel.py to use ZMQ Streams
4 4 """
5 5
6 #-----------------------------------------------------------------------------
7 # Imports
8 #-----------------------------------------------------------------------------
9
10 # Standard library imports.
6 11 from __future__ import print_function
7 12 import __builtin__
13 from code import CommandCompiler
8 14 import os
9 15 import sys
10 16 import time
@@ -12,141 +18,43 b' import traceback'
12 18 from signal import SIGTERM, SIGKILL
13 19 from pprint import pprint
14 20
15 from code import CommandCompiler
16
21 # System library imports.
17 22 import zmq
18 23 from zmq.eventloop import ioloop, zmqstream
19 24
25 # Local imports.
26 from IPython.utils.traitlets import HasTraits, Instance, List
20 27 from IPython.zmq.completer import KernelCompleter
21 28
22 29 from streamsession import StreamSession, Message, extract_header, serialize_object,\
23 30 unpack_apply_message
24 31 from dependency import UnmetDependency
32 import heartmonitor
33 from client import Client
25 34
26 35 def printer(*args):
27 36 pprint(args)
28 37
29 class OutStream(object):
30 """A file like object that publishes the stream to a 0MQ PUB socket."""
31
32 def __init__(self, session, pub_socket, name, max_buffer=200):
33 self.session = session
34 self.pub_socket = pub_socket
35 self.name = name
36 self._buffer = []
37 self._buffer_len = 0
38 self.max_buffer = max_buffer
39 self.parent_header = {}
40
41 def set_parent(self, parent):
42 self.parent_header = extract_header(parent)
43
44 def close(self):
45 self.pub_socket = None
46
47 def flush(self):
48 if self.pub_socket is None:
49 raise ValueError(u'I/O operation on closed file')
50 else:
51 if self._buffer:
52 data = ''.join(self._buffer)
53 content = {u'name':self.name, u'data':data}
54 # msg = self.session.msg(u'stream', content=content,
55 # parent=self.parent_header)
56 msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header)
57 # print>>sys.__stdout__, Message(msg)
58 # self.pub_socket.send_json(msg)
59 self._buffer_len = 0
60 self._buffer = []
61
62 def isattr(self):
63 return False
64
65 def next(self):
66 raise IOError('Read not supported on a write only stream.')
67
68 def read(self, size=None):
69 raise IOError('Read not supported on a write only stream.')
70
71 readline=read
72
73 def write(self, s):
74 if self.pub_socket is None:
75 raise ValueError('I/O operation on closed file')
76 else:
77 self._buffer.append(s)
78 self._buffer_len += len(s)
79 self._maybe_send()
80
81 def _maybe_send(self):
82 if '\n' in self._buffer[-1]:
83 self.flush()
84 if self._buffer_len > self.max_buffer:
85 self.flush()
86
87 def writelines(self, sequence):
88 if self.pub_socket is None:
89 raise ValueError('I/O operation on closed file')
90 else:
91 for s in sequence:
92 self.write(s)
93
94
95 class DisplayHook(object):
96
97 def __init__(self, session, pub_socket):
98 self.session = session
99 self.pub_socket = pub_socket
100 self.parent_header = {}
101
102 def __call__(self, obj):
103 if obj is None:
104 return
105
106 __builtin__._ = obj
107 # msg = self.session.msg(u'pyout', {u'data':repr(obj)},
108 # parent=self.parent_header)
109 # self.pub_socket.send_json(msg)
110 self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header)
111
112 def set_parent(self, parent):
113 self.parent_header = extract_header(parent)
114
115
116 class RawInput(object):
117
118 def __init__(self, session, socket):
119 self.session = session
120 self.socket = socket
121
122 def __call__(self, prompt=None):
123 msg = self.session.msg(u'raw_input')
124 self.socket.send_json(msg)
125 while True:
126 try:
127 reply = self.socket.recv_json(zmq.NOBLOCK)
128 except zmq.ZMQError as e:
129 if e.errno == zmq.EAGAIN:
130 pass
131 else:
132 raise
133 else:
134 break
135 return reply[u'content'][u'data']
38 #-----------------------------------------------------------------------------
39 # Main kernel class
40 #-----------------------------------------------------------------------------
136 41
42 class Kernel(HasTraits):
137 43
138 class Kernel(object):
44 #---------------------------------------------------------------------------
45 # Kernel interface
46 #---------------------------------------------------------------------------
139 47
140 def __init__(self, session, control_stream, reply_stream, pub_stream,
141 task_stream=None, client=None):
142 self.session = session
143 self.control_stream = control_stream
144 # self.control_socket = control_stream.socket
145 self.reply_stream = reply_stream
146 self.identity = self.reply_stream.getsockopt(zmq.IDENTITY)
147 self.task_stream = task_stream
148 self.pub_stream = pub_stream
149 self.client = client
48 session = Instance(StreamSession)
49 shell_streams = Instance(list)
50 control_stream = Instance(zmqstream.ZMQStream)
51 task_stream = Instance(zmqstream.ZMQStream)
52 iopub_stream = Instance(zmqstream.ZMQStream)
53 client = Instance(Client)
54
55 def __init__(self, **kwargs):
56 super(Kernel, self).__init__(**kwargs)
57 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
150 58 self.user_ns = {}
151 59 self.history = []
152 60 self.compiler = CommandCompiler()
@@ -154,17 +62,18 b' class Kernel(object):'
154 62 self.aborted = set()
155 63
156 64 # Build dict of handlers for message types
157 self.queue_handlers = {}
65 self.shell_handlers = {}
158 66 self.control_handlers = {}
159 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
160 self.queue_handlers[msg_type] = getattr(self, msg_type)
67 for msg_type in ['execute_request', 'complete_request', 'apply_request',
68 'clear_request']:
69 self.shell_handlers[msg_type] = getattr(self, msg_type)
161 70
162 for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys():
71 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
163 72 self.control_handlers[msg_type] = getattr(self, msg_type)
164 73
165 74 #-------------------- control handlers -----------------------------
166 75 def abort_queues(self):
167 for stream in (self.task_stream, self.reply_stream):
76 for stream in self.shell_streams:
168 77 if stream:
169 78 self.abort_queue(stream)
170 79
@@ -214,23 +123,17 b' class Kernel(object):'
214 123 parent=parent, ident=ident)[0]
215 124 print(Message(reply_msg), file=sys.__stdout__)
216 125
217 def kill_request(self, stream, idents, parent):
126 def shutdown_request(self, stream, ident, parent):
218 127 """kill ourself. This should really be handled in an external process"""
219 128 self.abort_queues()
220 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
221 content = dict(status='ok'))
222 # we can know that a message is done if we *don't* use streams, but
223 # use a socket directly with MessageTracker
224 time.sleep(.5)
225 os.kill(os.getpid(), SIGTERM)
226 time.sleep(1)
227 os.kill(os.getpid(), SIGKILL)
228
229 def clear_request(self, stream, idents, parent):
230 """Clear our namespace."""
231 self.user_ns = {}
232 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
233 content = dict(status='ok'))
129 content = dict(parent['content'])
130 msg = self.session.send(self.reply_socket, 'shutdown_reply',
131 content, parent, ident)
132 msg = self.session.send(self.pub_socket, 'shutdown_reply',
133 content, parent, ident)
134 # print >> sys.__stdout__, msg
135 time.sleep(0.1)
136 sys.exit(0)
234 137
235 138 def dispatch_control(self, msg):
236 139 idents,msg = self.session.feed_identities(msg, copy=False)
@@ -274,6 +177,12 b' class Kernel(object):'
274 177
275 178 #-------------------- queue handlers -----------------------------
276 179
180 def clear_request(self, stream, idents, parent):
181 """Clear our namespace."""
182 self.user_ns = {}
183 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
184 content = dict(status='ok'))
185
277 186 def execute_request(self, stream, ident, parent):
278 187 try:
279 188 code = parent[u'content'][u'code']
@@ -282,8 +191,8 b' class Kernel(object):'
282 191 print(Message(parent), file=sys.__stderr__)
283 192 return
284 193 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
285 # self.pub_stream.send(pyin_msg)
286 self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
194 # self.iopub_stream.send(pyin_msg)
195 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
287 196 try:
288 197 comp_code = self.compiler(code, '<zmq-kernel>')
289 198 # allow for not overriding displayhook
@@ -301,7 +210,7 b' class Kernel(object):'
301 210 u'evalue' : unicode(evalue)
302 211 }
303 212 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
304 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
213 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
305 214 reply_content = exc_content
306 215 else:
307 216 reply_content = {'status' : 'ok'}
@@ -335,8 +244,8 b' class Kernel(object):'
335 244 print(Message(parent), file=sys.__stderr__)
336 245 return
337 246 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
338 # self.pub_stream.send(pyin_msg)
339 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
247 # self.iopub_stream.send(pyin_msg)
248 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
340 249 sub = {'dependencies_met' : True, 'engine' : self.identity}
341 250 try:
342 251 # allow for not overriding displayhook
@@ -384,7 +293,7 b' class Kernel(object):'
384 293 u'evalue' : unicode(evalue)
385 294 }
386 295 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
387 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
296 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
388 297 reply_content = exc_content
389 298 result_buf = []
390 299
@@ -415,7 +324,7 b' class Kernel(object):'
415 324 reply_msg = self.session.send(stream, reply_type,
416 325 content={'status' : 'aborted'}, parent=msg, ident=idents)
417 326 return
418 handler = self.queue_handlers.get(msg['msg_type'], None)
327 handler = self.shell_handlers.get(msg['msg_type'], None)
419 328 if handler is None:
420 329 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
421 330 else:
@@ -426,27 +335,28 b' class Kernel(object):'
426 335 if self.control_stream:
427 336 self.control_stream.on_recv(self.dispatch_control, copy=False)
428 337 self.control_stream.on_err(printer)
429 if self.reply_stream:
430 self.reply_stream.on_recv(lambda msg:
431 self.dispatch_queue(self.reply_stream, msg), copy=False)
432 self.reply_stream.on_err(printer)
433 if self.task_stream:
434 self.task_stream.on_recv(lambda msg:
435 self.dispatch_queue(self.task_stream, msg), copy=False)
436 self.task_stream.on_err(printer)
338
339 for s in self.shell_streams:
340 s.on_recv(lambda msg:
341 self.dispatch_queue(s, msg), copy=False)
342 s.on_err(printer)
343
344 if self.iopub_stream:
345 self.iopub_stream.on_err(printer)
346 self.iopub_stream.on_send(printer)
437 347
438 348 #### while True mode:
439 349 # while True:
440 350 # idle = True
441 351 # try:
442 # msg = self.reply_stream.socket.recv_multipart(
352 # msg = self.shell_stream.socket.recv_multipart(
443 353 # zmq.NOBLOCK, copy=False)
444 354 # except zmq.ZMQError, e:
445 355 # if e.errno != zmq.EAGAIN:
446 356 # raise e
447 357 # else:
448 358 # idle=False
449 # self.dispatch_queue(self.reply_stream, msg)
359 # self.dispatch_queue(self.shell_stream, msg)
450 360 #
451 361 # if not self.task_stream.empty():
452 362 # idle=False
@@ -456,50 +366,48 b' class Kernel(object):'
456 366 # # don't busywait
457 367 # time.sleep(1e-3)
458 368
459
460 def main():
461 raise Exception("Don't run me anymore")
462 loop = ioloop.IOLoop.instance()
463 c = zmq.Context()
464
465 ip = '127.0.0.1'
466 port_base = 5575
467 connection = ('tcp://%s' % ip) + ':%i'
468 rep_conn = connection % port_base
469 pub_conn = connection % (port_base+1)
470
471 print("Starting the kernel...", file=sys.__stdout__)
472 # print >>sys.__stdout__, "XREQ Channel:", rep_conn
473 # print >>sys.__stdout__, "PUB Channel:", pub_conn
474
475 session = StreamSession(username=u'kernel')
476
477 reply_socket = c.socket(zmq.XREQ)
478 reply_socket.connect(rep_conn)
479
480 pub_socket = c.socket(zmq.PUB)
481 pub_socket.connect(pub_conn)
482
483 stdout = OutStream(session, pub_socket, u'stdout')
484 stderr = OutStream(session, pub_socket, u'stderr')
485 sys.stdout = stdout
486 sys.stderr = stderr
487
488 display_hook = DisplayHook(session, pub_socket)
489 sys.displayhook = display_hook
490 reply_stream = zmqstream.ZMQStream(reply_socket,loop)
491 pub_stream = zmqstream.ZMQStream(pub_socket,loop)
492 kernel = Kernel(session, reply_stream, pub_stream)
493
494 # For debugging convenience, put sleep and a string in the namespace, so we
495 # have them every time we start.
496 kernel.user_ns['sleep'] = time.sleep
497 kernel.user_ns['s'] = 'Test string'
369 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
370 client_addr=None, loop=None, context=None):
371 # create loop, context, and session:
372 if loop is None:
373 loop = ioloop.IOLoop.instance()
374 if context is None:
375 context = zmq.Context()
376 c = context
377 session = StreamSession()
378 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
379
380 # create Control Stream
381 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
382 control_stream.setsockopt(zmq.IDENTITY, identity)
383 control_stream.connect(control_addr)
384
385 # create Shell Streams (MUX, Task, etc.):
386 shell_streams = []
387 for addr in shell_addrs:
388 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
389 stream.setsockopt(zmq.IDENTITY, identity)
390 stream.connect(addr)
391 shell_streams.append(stream)
498 392
499 print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__)
393 # create iopub stream:
394 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
395 iopub_stream.setsockopt(zmq.IDENTITY, identity)
396 iopub_stream.connect(iopub_addr)
397
398 # launch heartbeat
399 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
400 heart.start()
401
402 # create (optional) Client
403 if client_addr:
404 client = Client(client_addr, username=identity)
405 else:
406 client = None
407
408 kernel = Kernel(session=session, control_stream=control_stream,
409 shell_streams=shell_streams, iopub_stream=iopub_stream,
410 client=client)
500 411 kernel.start()
501 loop.start()
502
412 return loop, c
503 413
504 if __name__ == '__main__':
505 main()
General Comments 0
You need to be logged in to leave comments. Login now