##// END OF EJS Templates
Parallel kernel/engine startup looks a bit more like pykernel
MinRK -
Show More
@@ -13,9 +13,12 b' from pprint import pprint'
13 import zmq
13 import zmq
14 from zmq.eventloop import ioloop, zmqstream
14 from zmq.eventloop import ioloop, zmqstream
15
15
16 from IPython.utils.traitlets import HasTraits
17 from IPython.utils.localinterfaces import LOCALHOST
18
16 from streamsession import Message, StreamSession
19 from streamsession import Message, StreamSession
17 from client import Client
20 from client import Client
18 import streamkernel as kernel
21 from streamkernel import Kernel, make_kernel
19 import heartmonitor
22 import heartmonitor
20 from entry_point import make_base_argument_parser, connect_logger, parse_url
23 from entry_point import make_base_argument_parser, connect_logger, parse_url
21 # import taskthread
24 # import taskthread
@@ -59,45 +62,25 b' class Engine(object):'
59 if msg.content.status == 'ok':
62 if msg.content.status == 'ok':
60 self.session.username = str(msg.content.id)
63 self.session.username = str(msg.content.id)
61 queue_addr = msg.content.queue
64 queue_addr = msg.content.queue
62 if queue_addr:
65 shell_addrs = [str(queue_addr)]
63 queue = self.context.socket(zmq.PAIR)
66 control_addr = str(msg.content.control)
64 queue.setsockopt(zmq.IDENTITY, self.ident)
65 queue.connect(str(queue_addr))
66 self.queue = zmqstream.ZMQStream(queue, self.loop)
67
68 control_addr = msg.content.control
69 if control_addr:
70 control = self.context.socket(zmq.PAIR)
71 control.setsockopt(zmq.IDENTITY, self.ident)
72 control.connect(str(control_addr))
73 self.control = zmqstream.ZMQStream(control, self.loop)
74
75 task_addr = msg.content.task
67 task_addr = msg.content.task
76 print (task_addr)
77 if task_addr:
68 if task_addr:
78 # task as stream:
69 shell_addrs.append(str(task_addr))
79 task = self.context.socket(zmq.PAIR)
80 task.setsockopt(zmq.IDENTITY, self.ident)
81 task.connect(str(task_addr))
82 self.task_stream = zmqstream.ZMQStream(task, self.loop)
83 # TaskThread:
84 # mon_addr = msg.content.monitor
85 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
86 # task.connect_in(str(task_addr))
87 # task.connect_out(str(mon_addr))
88 # self.task_stream = taskthread.QueueStream(*task.queues)
89 # task.start()
90
70
91 hbs = msg.content.heartbeat
71 hb_addrs = msg.content.heartbeat
92 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
93 self.heart.start()
94 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
72 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
95 # placeholder for now:
73
96 pub = self.context.socket(zmq.PUB)
74 # placeholder for no, since pub isn't hooked up:
97 pub = zmqstream.ZMQStream(pub, self.loop)
75 sub = self.context.socket(zmq.SUB)
98 # create and start the kernel
76 sub = zmqstream.ZMQStream(sub, self.loop)
99 self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
77 sub.on_recv(lambda *a: None)
100 self.kernel.start()
78 port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
79 iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
80
81 make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs,
82 client_addr=None, loop=self.loop, context=self.context)
83
101 else:
84 else:
102 # logger.error("Registration Failed: %s"%msg)
85 # logger.error("Registration Failed: %s"%msg)
103 raise Exception("Registration Failed: %s"%msg)
86 raise Exception("Registration Failed: %s"%msg)
@@ -114,6 +97,7 b' class Engine(object):'
114 def start(self):
97 def start(self):
115 print ("registering")
98 print ("registering")
116 self.register()
99 self.register()
100
117
101
118
102
119 def main():
103 def main():
@@ -3,8 +3,14 b''
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
4 """
5
5
6 #-----------------------------------------------------------------------------
7 # Imports
8 #-----------------------------------------------------------------------------
9
10 # Standard library imports.
6 from __future__ import print_function
11 from __future__ import print_function
7 import __builtin__
12 import __builtin__
13 from code import CommandCompiler
8 import os
14 import os
9 import sys
15 import sys
10 import time
16 import time
@@ -12,141 +18,43 b' import traceback'
12 from signal import SIGTERM, SIGKILL
18 from signal import SIGTERM, SIGKILL
13 from pprint import pprint
19 from pprint import pprint
14
20
15 from code import CommandCompiler
21 # System library imports.
16
17 import zmq
22 import zmq
18 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
19
24
25 # Local imports.
26 from IPython.utils.traitlets import HasTraits, Instance, List
20 from IPython.zmq.completer import KernelCompleter
27 from IPython.zmq.completer import KernelCompleter
21
28
22 from streamsession import StreamSession, Message, extract_header, serialize_object,\
29 from streamsession import StreamSession, Message, extract_header, serialize_object,\
23 unpack_apply_message
30 unpack_apply_message
24 from dependency import UnmetDependency
31 from dependency import UnmetDependency
32 import heartmonitor
33 from client import Client
25
34
26 def printer(*args):
35 def printer(*args):
27 pprint(args)
36 pprint(args)
28
37
29 class OutStream(object):
38 #-----------------------------------------------------------------------------
30 """A file like object that publishes the stream to a 0MQ PUB socket."""
39 # Main kernel class
31
40 #-----------------------------------------------------------------------------
32 def __init__(self, session, pub_socket, name, max_buffer=200):
33 self.session = session
34 self.pub_socket = pub_socket
35 self.name = name
36 self._buffer = []
37 self._buffer_len = 0
38 self.max_buffer = max_buffer
39 self.parent_header = {}
40
41 def set_parent(self, parent):
42 self.parent_header = extract_header(parent)
43
44 def close(self):
45 self.pub_socket = None
46
47 def flush(self):
48 if self.pub_socket is None:
49 raise ValueError(u'I/O operation on closed file')
50 else:
51 if self._buffer:
52 data = ''.join(self._buffer)
53 content = {u'name':self.name, u'data':data}
54 # msg = self.session.msg(u'stream', content=content,
55 # parent=self.parent_header)
56 msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header)
57 # print>>sys.__stdout__, Message(msg)
58 # self.pub_socket.send_json(msg)
59 self._buffer_len = 0
60 self._buffer = []
61
62 def isattr(self):
63 return False
64
65 def next(self):
66 raise IOError('Read not supported on a write only stream.')
67
68 def read(self, size=None):
69 raise IOError('Read not supported on a write only stream.')
70
71 readline=read
72
73 def write(self, s):
74 if self.pub_socket is None:
75 raise ValueError('I/O operation on closed file')
76 else:
77 self._buffer.append(s)
78 self._buffer_len += len(s)
79 self._maybe_send()
80
81 def _maybe_send(self):
82 if '\n' in self._buffer[-1]:
83 self.flush()
84 if self._buffer_len > self.max_buffer:
85 self.flush()
86
87 def writelines(self, sequence):
88 if self.pub_socket is None:
89 raise ValueError('I/O operation on closed file')
90 else:
91 for s in sequence:
92 self.write(s)
93
94
95 class DisplayHook(object):
96
97 def __init__(self, session, pub_socket):
98 self.session = session
99 self.pub_socket = pub_socket
100 self.parent_header = {}
101
102 def __call__(self, obj):
103 if obj is None:
104 return
105
106 __builtin__._ = obj
107 # msg = self.session.msg(u'pyout', {u'data':repr(obj)},
108 # parent=self.parent_header)
109 # self.pub_socket.send_json(msg)
110 self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header)
111
112 def set_parent(self, parent):
113 self.parent_header = extract_header(parent)
114
115
116 class RawInput(object):
117
118 def __init__(self, session, socket):
119 self.session = session
120 self.socket = socket
121
122 def __call__(self, prompt=None):
123 msg = self.session.msg(u'raw_input')
124 self.socket.send_json(msg)
125 while True:
126 try:
127 reply = self.socket.recv_json(zmq.NOBLOCK)
128 except zmq.ZMQError as e:
129 if e.errno == zmq.EAGAIN:
130 pass
131 else:
132 raise
133 else:
134 break
135 return reply[u'content'][u'data']
136
41
42 class Kernel(HasTraits):
137
43
138 class Kernel(object):
44 #---------------------------------------------------------------------------
45 # Kernel interface
46 #---------------------------------------------------------------------------
139
47
140 def __init__(self, session, control_stream, reply_stream, pub_stream,
48 session = Instance(StreamSession)
141 task_stream=None, client=None):
49 shell_streams = Instance(list)
142 self.session = session
50 control_stream = Instance(zmqstream.ZMQStream)
143 self.control_stream = control_stream
51 task_stream = Instance(zmqstream.ZMQStream)
144 # self.control_socket = control_stream.socket
52 iopub_stream = Instance(zmqstream.ZMQStream)
145 self.reply_stream = reply_stream
53 client = Instance(Client)
146 self.identity = self.reply_stream.getsockopt(zmq.IDENTITY)
54
147 self.task_stream = task_stream
55 def __init__(self, **kwargs):
148 self.pub_stream = pub_stream
56 super(Kernel, self).__init__(**kwargs)
149 self.client = client
57 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
150 self.user_ns = {}
58 self.user_ns = {}
151 self.history = []
59 self.history = []
152 self.compiler = CommandCompiler()
60 self.compiler = CommandCompiler()
@@ -154,17 +62,18 b' class Kernel(object):'
154 self.aborted = set()
62 self.aborted = set()
155
63
156 # Build dict of handlers for message types
64 # Build dict of handlers for message types
157 self.queue_handlers = {}
65 self.shell_handlers = {}
158 self.control_handlers = {}
66 self.control_handlers = {}
159 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
67 for msg_type in ['execute_request', 'complete_request', 'apply_request',
160 self.queue_handlers[msg_type] = getattr(self, msg_type)
68 'clear_request']:
69 self.shell_handlers[msg_type] = getattr(self, msg_type)
161
70
162 for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys():
71 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
163 self.control_handlers[msg_type] = getattr(self, msg_type)
72 self.control_handlers[msg_type] = getattr(self, msg_type)
164
73
165 #-------------------- control handlers -----------------------------
74 #-------------------- control handlers -----------------------------
166 def abort_queues(self):
75 def abort_queues(self):
167 for stream in (self.task_stream, self.reply_stream):
76 for stream in self.shell_streams:
168 if stream:
77 if stream:
169 self.abort_queue(stream)
78 self.abort_queue(stream)
170
79
@@ -214,23 +123,17 b' class Kernel(object):'
214 parent=parent, ident=ident)[0]
123 parent=parent, ident=ident)[0]
215 print(Message(reply_msg), file=sys.__stdout__)
124 print(Message(reply_msg), file=sys.__stdout__)
216
125
217 def kill_request(self, stream, idents, parent):
126 def shutdown_request(self, stream, ident, parent):
218 """kill ourself. This should really be handled in an external process"""
127 """kill ourself. This should really be handled in an external process"""
219 self.abort_queues()
128 self.abort_queues()
220 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
129 content = dict(parent['content'])
221 content = dict(status='ok'))
130 msg = self.session.send(self.reply_socket, 'shutdown_reply',
222 # we can know that a message is done if we *don't* use streams, but
131 content, parent, ident)
223 # use a socket directly with MessageTracker
132 msg = self.session.send(self.pub_socket, 'shutdown_reply',
224 time.sleep(.5)
133 content, parent, ident)
225 os.kill(os.getpid(), SIGTERM)
134 # print >> sys.__stdout__, msg
226 time.sleep(1)
135 time.sleep(0.1)
227 os.kill(os.getpid(), SIGKILL)
136 sys.exit(0)
228
229 def clear_request(self, stream, idents, parent):
230 """Clear our namespace."""
231 self.user_ns = {}
232 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
233 content = dict(status='ok'))
234
137
235 def dispatch_control(self, msg):
138 def dispatch_control(self, msg):
236 idents,msg = self.session.feed_identities(msg, copy=False)
139 idents,msg = self.session.feed_identities(msg, copy=False)
@@ -274,6 +177,12 b' class Kernel(object):'
274
177
275 #-------------------- queue handlers -----------------------------
178 #-------------------- queue handlers -----------------------------
276
179
180 def clear_request(self, stream, idents, parent):
181 """Clear our namespace."""
182 self.user_ns = {}
183 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
184 content = dict(status='ok'))
185
277 def execute_request(self, stream, ident, parent):
186 def execute_request(self, stream, ident, parent):
278 try:
187 try:
279 code = parent[u'content'][u'code']
188 code = parent[u'content'][u'code']
@@ -282,8 +191,8 b' class Kernel(object):'
282 print(Message(parent), file=sys.__stderr__)
191 print(Message(parent), file=sys.__stderr__)
283 return
192 return
284 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
193 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
285 # self.pub_stream.send(pyin_msg)
194 # self.iopub_stream.send(pyin_msg)
286 self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
195 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
287 try:
196 try:
288 comp_code = self.compiler(code, '<zmq-kernel>')
197 comp_code = self.compiler(code, '<zmq-kernel>')
289 # allow for not overriding displayhook
198 # allow for not overriding displayhook
@@ -301,7 +210,7 b' class Kernel(object):'
301 u'evalue' : unicode(evalue)
210 u'evalue' : unicode(evalue)
302 }
211 }
303 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
212 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
304 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
213 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
305 reply_content = exc_content
214 reply_content = exc_content
306 else:
215 else:
307 reply_content = {'status' : 'ok'}
216 reply_content = {'status' : 'ok'}
@@ -335,8 +244,8 b' class Kernel(object):'
335 print(Message(parent), file=sys.__stderr__)
244 print(Message(parent), file=sys.__stderr__)
336 return
245 return
337 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
246 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
338 # self.pub_stream.send(pyin_msg)
247 # self.iopub_stream.send(pyin_msg)
339 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
248 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
340 sub = {'dependencies_met' : True, 'engine' : self.identity}
249 sub = {'dependencies_met' : True, 'engine' : self.identity}
341 try:
250 try:
342 # allow for not overriding displayhook
251 # allow for not overriding displayhook
@@ -384,7 +293,7 b' class Kernel(object):'
384 u'evalue' : unicode(evalue)
293 u'evalue' : unicode(evalue)
385 }
294 }
386 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
295 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
387 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
296 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
388 reply_content = exc_content
297 reply_content = exc_content
389 result_buf = []
298 result_buf = []
390
299
@@ -415,7 +324,7 b' class Kernel(object):'
415 reply_msg = self.session.send(stream, reply_type,
324 reply_msg = self.session.send(stream, reply_type,
416 content={'status' : 'aborted'}, parent=msg, ident=idents)
325 content={'status' : 'aborted'}, parent=msg, ident=idents)
417 return
326 return
418 handler = self.queue_handlers.get(msg['msg_type'], None)
327 handler = self.shell_handlers.get(msg['msg_type'], None)
419 if handler is None:
328 if handler is None:
420 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
329 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
421 else:
330 else:
@@ -426,27 +335,28 b' class Kernel(object):'
426 if self.control_stream:
335 if self.control_stream:
427 self.control_stream.on_recv(self.dispatch_control, copy=False)
336 self.control_stream.on_recv(self.dispatch_control, copy=False)
428 self.control_stream.on_err(printer)
337 self.control_stream.on_err(printer)
429 if self.reply_stream:
338
430 self.reply_stream.on_recv(lambda msg:
339 for s in self.shell_streams:
431 self.dispatch_queue(self.reply_stream, msg), copy=False)
340 s.on_recv(lambda msg:
432 self.reply_stream.on_err(printer)
341 self.dispatch_queue(s, msg), copy=False)
433 if self.task_stream:
342 s.on_err(printer)
434 self.task_stream.on_recv(lambda msg:
343
435 self.dispatch_queue(self.task_stream, msg), copy=False)
344 if self.iopub_stream:
436 self.task_stream.on_err(printer)
345 self.iopub_stream.on_err(printer)
346 self.iopub_stream.on_send(printer)
437
347
438 #### while True mode:
348 #### while True mode:
439 # while True:
349 # while True:
440 # idle = True
350 # idle = True
441 # try:
351 # try:
442 # msg = self.reply_stream.socket.recv_multipart(
352 # msg = self.shell_stream.socket.recv_multipart(
443 # zmq.NOBLOCK, copy=False)
353 # zmq.NOBLOCK, copy=False)
444 # except zmq.ZMQError, e:
354 # except zmq.ZMQError, e:
445 # if e.errno != zmq.EAGAIN:
355 # if e.errno != zmq.EAGAIN:
446 # raise e
356 # raise e
447 # else:
357 # else:
448 # idle=False
358 # idle=False
449 # self.dispatch_queue(self.reply_stream, msg)
359 # self.dispatch_queue(self.shell_stream, msg)
450 #
360 #
451 # if not self.task_stream.empty():
361 # if not self.task_stream.empty():
452 # idle=False
362 # idle=False
@@ -456,50 +366,48 b' class Kernel(object):'
456 # # don't busywait
366 # # don't busywait
457 # time.sleep(1e-3)
367 # time.sleep(1e-3)
458
368
459
369 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
460 def main():
370 client_addr=None, loop=None, context=None):
461 raise Exception("Don't run me anymore")
371 # create loop, context, and session:
462 loop = ioloop.IOLoop.instance()
372 if loop is None:
463 c = zmq.Context()
373 loop = ioloop.IOLoop.instance()
464
374 if context is None:
465 ip = '127.0.0.1'
375 context = zmq.Context()
466 port_base = 5575
376 c = context
467 connection = ('tcp://%s' % ip) + ':%i'
377 session = StreamSession()
468 rep_conn = connection % port_base
378 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
469 pub_conn = connection % (port_base+1)
379
470
380 # create Control Stream
471 print("Starting the kernel...", file=sys.__stdout__)
381 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
472 # print >>sys.__stdout__, "XREQ Channel:", rep_conn
382 control_stream.setsockopt(zmq.IDENTITY, identity)
473 # print >>sys.__stdout__, "PUB Channel:", pub_conn
383 control_stream.connect(control_addr)
474
384
475 session = StreamSession(username=u'kernel')
385 # create Shell Streams (MUX, Task, etc.):
476
386 shell_streams = []
477 reply_socket = c.socket(zmq.XREQ)
387 for addr in shell_addrs:
478 reply_socket.connect(rep_conn)
388 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
479
389 stream.setsockopt(zmq.IDENTITY, identity)
480 pub_socket = c.socket(zmq.PUB)
390 stream.connect(addr)
481 pub_socket.connect(pub_conn)
391 shell_streams.append(stream)
482
483 stdout = OutStream(session, pub_socket, u'stdout')
484 stderr = OutStream(session, pub_socket, u'stderr')
485 sys.stdout = stdout
486 sys.stderr = stderr
487
488 display_hook = DisplayHook(session, pub_socket)
489 sys.displayhook = display_hook
490 reply_stream = zmqstream.ZMQStream(reply_socket,loop)
491 pub_stream = zmqstream.ZMQStream(pub_socket,loop)
492 kernel = Kernel(session, reply_stream, pub_stream)
493
494 # For debugging convenience, put sleep and a string in the namespace, so we
495 # have them every time we start.
496 kernel.user_ns['sleep'] = time.sleep
497 kernel.user_ns['s'] = 'Test string'
498
392
499 print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__)
393 # create iopub stream:
394 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
395 iopub_stream.setsockopt(zmq.IDENTITY, identity)
396 iopub_stream.connect(iopub_addr)
397
398 # launch heartbeat
399 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
400 heart.start()
401
402 # create (optional) Client
403 if client_addr:
404 client = Client(client_addr, username=identity)
405 else:
406 client = None
407
408 kernel = Kernel(session=session, control_stream=control_stream,
409 shell_streams=shell_streams, iopub_stream=iopub_stream,
410 client=client)
500 kernel.start()
411 kernel.start()
501 loop.start()
412 return loop, c
502
503
413
504 if __name__ == '__main__':
505 main()
General Comments 0
You need to be logged in to leave comments. Login now