Show More
@@ -13,9 +13,12 b' from pprint import pprint' | |||||
13 | import zmq |
|
13 | import zmq | |
14 | from zmq.eventloop import ioloop, zmqstream |
|
14 | from zmq.eventloop import ioloop, zmqstream | |
15 |
|
15 | |||
|
16 | from IPython.utils.traitlets import HasTraits | |||
|
17 | from IPython.utils.localinterfaces import LOCALHOST | |||
|
18 | ||||
16 | from streamsession import Message, StreamSession |
|
19 | from streamsession import Message, StreamSession | |
17 | from client import Client |
|
20 | from client import Client | |
18 |
|
|
21 | from streamkernel import Kernel, make_kernel | |
19 | import heartmonitor |
|
22 | import heartmonitor | |
20 | from entry_point import make_base_argument_parser, connect_logger, parse_url |
|
23 | from entry_point import make_base_argument_parser, connect_logger, parse_url | |
21 | # import taskthread |
|
24 | # import taskthread | |
@@ -59,45 +62,25 b' class Engine(object):' | |||||
59 | if msg.content.status == 'ok': |
|
62 | if msg.content.status == 'ok': | |
60 | self.session.username = str(msg.content.id) |
|
63 | self.session.username = str(msg.content.id) | |
61 | queue_addr = msg.content.queue |
|
64 | queue_addr = msg.content.queue | |
62 |
|
|
65 | shell_addrs = [str(queue_addr)] | |
63 | queue = self.context.socket(zmq.PAIR) |
|
66 | control_addr = str(msg.content.control) | |
64 | queue.setsockopt(zmq.IDENTITY, self.ident) |
|
|||
65 | queue.connect(str(queue_addr)) |
|
|||
66 | self.queue = zmqstream.ZMQStream(queue, self.loop) |
|
|||
67 |
|
||||
68 | control_addr = msg.content.control |
|
|||
69 | if control_addr: |
|
|||
70 | control = self.context.socket(zmq.PAIR) |
|
|||
71 | control.setsockopt(zmq.IDENTITY, self.ident) |
|
|||
72 | control.connect(str(control_addr)) |
|
|||
73 | self.control = zmqstream.ZMQStream(control, self.loop) |
|
|||
74 |
|
||||
75 | task_addr = msg.content.task |
|
67 | task_addr = msg.content.task | |
76 | print (task_addr) |
|
|||
77 | if task_addr: |
|
68 | if task_addr: | |
78 | # task as stream: |
|
69 | shell_addrs.append(str(task_addr)) | |
79 | task = self.context.socket(zmq.PAIR) |
|
|||
80 | task.setsockopt(zmq.IDENTITY, self.ident) |
|
|||
81 | task.connect(str(task_addr)) |
|
|||
82 | self.task_stream = zmqstream.ZMQStream(task, self.loop) |
|
|||
83 | # TaskThread: |
|
|||
84 | # mon_addr = msg.content.monitor |
|
|||
85 | # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident) |
|
|||
86 | # task.connect_in(str(task_addr)) |
|
|||
87 | # task.connect_out(str(mon_addr)) |
|
|||
88 | # self.task_stream = taskthread.QueueStream(*task.queues) |
|
|||
89 | # task.start() |
|
|||
90 |
|
70 | |||
91 | hbs = msg.content.heartbeat |
|
71 | hb_addrs = msg.content.heartbeat | |
92 | self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident) |
|
|||
93 | self.heart.start() |
|
|||
94 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() |
|
72 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() | |
95 | # placeholder for now: |
|
73 | ||
96 | pub = self.context.socket(zmq.PUB) |
|
74 | # placeholder for no, since pub isn't hooked up: | |
97 | pub = zmqstream.ZMQStream(pub, self.loop) |
|
75 | sub = self.context.socket(zmq.SUB) | |
98 | # create and start the kernel |
|
76 | sub = zmqstream.ZMQStream(sub, self.loop) | |
99 | self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client) |
|
77 | sub.on_recv(lambda *a: None) | |
100 | self.kernel.start() |
|
78 | port = sub.bind_to_random_port("tcp://%s"%LOCALHOST) | |
|
79 | iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345) | |||
|
80 | ||||
|
81 | make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs, | |||
|
82 | client_addr=None, loop=self.loop, context=self.context) | |||
|
83 | ||||
101 | else: |
|
84 | else: | |
102 | # logger.error("Registration Failed: %s"%msg) |
|
85 | # logger.error("Registration Failed: %s"%msg) | |
103 | raise Exception("Registration Failed: %s"%msg) |
|
86 | raise Exception("Registration Failed: %s"%msg) | |
@@ -114,6 +97,7 b' class Engine(object):' | |||||
114 | def start(self): |
|
97 | def start(self): | |
115 | print ("registering") |
|
98 | print ("registering") | |
116 | self.register() |
|
99 | self.register() | |
|
100 | ||||
117 |
|
101 | |||
118 |
|
102 | |||
119 | def main(): |
|
103 | def main(): |
@@ -3,8 +3,14 b'' | |||||
3 | Kernel adapted from kernel.py to use ZMQ Streams |
|
3 | Kernel adapted from kernel.py to use ZMQ Streams | |
4 | """ |
|
4 | """ | |
5 |
|
5 | |||
|
6 | #----------------------------------------------------------------------------- | |||
|
7 | # Imports | |||
|
8 | #----------------------------------------------------------------------------- | |||
|
9 | ||||
|
10 | # Standard library imports. | |||
6 | from __future__ import print_function |
|
11 | from __future__ import print_function | |
7 | import __builtin__ |
|
12 | import __builtin__ | |
|
13 | from code import CommandCompiler | |||
8 | import os |
|
14 | import os | |
9 | import sys |
|
15 | import sys | |
10 | import time |
|
16 | import time | |
@@ -12,141 +18,43 b' import traceback' | |||||
12 | from signal import SIGTERM, SIGKILL |
|
18 | from signal import SIGTERM, SIGKILL | |
13 | from pprint import pprint |
|
19 | from pprint import pprint | |
14 |
|
20 | |||
15 | from code import CommandCompiler |
|
21 | # System library imports. | |
16 |
|
||||
17 | import zmq |
|
22 | import zmq | |
18 | from zmq.eventloop import ioloop, zmqstream |
|
23 | from zmq.eventloop import ioloop, zmqstream | |
19 |
|
24 | |||
|
25 | # Local imports. | |||
|
26 | from IPython.utils.traitlets import HasTraits, Instance, List | |||
20 | from IPython.zmq.completer import KernelCompleter |
|
27 | from IPython.zmq.completer import KernelCompleter | |
21 |
|
28 | |||
22 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ |
|
29 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ | |
23 | unpack_apply_message |
|
30 | unpack_apply_message | |
24 | from dependency import UnmetDependency |
|
31 | from dependency import UnmetDependency | |
|
32 | import heartmonitor | |||
|
33 | from client import Client | |||
25 |
|
34 | |||
26 | def printer(*args): |
|
35 | def printer(*args): | |
27 | pprint(args) |
|
36 | pprint(args) | |
28 |
|
37 | |||
29 | class OutStream(object): |
|
38 | #----------------------------------------------------------------------------- | |
30 | """A file like object that publishes the stream to a 0MQ PUB socket.""" |
|
39 | # Main kernel class | |
31 |
|
40 | #----------------------------------------------------------------------------- | ||
32 | def __init__(self, session, pub_socket, name, max_buffer=200): |
|
|||
33 | self.session = session |
|
|||
34 | self.pub_socket = pub_socket |
|
|||
35 | self.name = name |
|
|||
36 | self._buffer = [] |
|
|||
37 | self._buffer_len = 0 |
|
|||
38 | self.max_buffer = max_buffer |
|
|||
39 | self.parent_header = {} |
|
|||
40 |
|
||||
41 | def set_parent(self, parent): |
|
|||
42 | self.parent_header = extract_header(parent) |
|
|||
43 |
|
||||
44 | def close(self): |
|
|||
45 | self.pub_socket = None |
|
|||
46 |
|
||||
47 | def flush(self): |
|
|||
48 | if self.pub_socket is None: |
|
|||
49 | raise ValueError(u'I/O operation on closed file') |
|
|||
50 | else: |
|
|||
51 | if self._buffer: |
|
|||
52 | data = ''.join(self._buffer) |
|
|||
53 | content = {u'name':self.name, u'data':data} |
|
|||
54 | # msg = self.session.msg(u'stream', content=content, |
|
|||
55 | # parent=self.parent_header) |
|
|||
56 | msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header) |
|
|||
57 | # print>>sys.__stdout__, Message(msg) |
|
|||
58 | # self.pub_socket.send_json(msg) |
|
|||
59 | self._buffer_len = 0 |
|
|||
60 | self._buffer = [] |
|
|||
61 |
|
||||
62 | def isattr(self): |
|
|||
63 | return False |
|
|||
64 |
|
||||
65 | def next(self): |
|
|||
66 | raise IOError('Read not supported on a write only stream.') |
|
|||
67 |
|
||||
68 | def read(self, size=None): |
|
|||
69 | raise IOError('Read not supported on a write only stream.') |
|
|||
70 |
|
||||
71 | readline=read |
|
|||
72 |
|
||||
73 | def write(self, s): |
|
|||
74 | if self.pub_socket is None: |
|
|||
75 | raise ValueError('I/O operation on closed file') |
|
|||
76 | else: |
|
|||
77 | self._buffer.append(s) |
|
|||
78 | self._buffer_len += len(s) |
|
|||
79 | self._maybe_send() |
|
|||
80 |
|
||||
81 | def _maybe_send(self): |
|
|||
82 | if '\n' in self._buffer[-1]: |
|
|||
83 | self.flush() |
|
|||
84 | if self._buffer_len > self.max_buffer: |
|
|||
85 | self.flush() |
|
|||
86 |
|
||||
87 | def writelines(self, sequence): |
|
|||
88 | if self.pub_socket is None: |
|
|||
89 | raise ValueError('I/O operation on closed file') |
|
|||
90 | else: |
|
|||
91 | for s in sequence: |
|
|||
92 | self.write(s) |
|
|||
93 |
|
||||
94 |
|
||||
95 | class DisplayHook(object): |
|
|||
96 |
|
||||
97 | def __init__(self, session, pub_socket): |
|
|||
98 | self.session = session |
|
|||
99 | self.pub_socket = pub_socket |
|
|||
100 | self.parent_header = {} |
|
|||
101 |
|
||||
102 | def __call__(self, obj): |
|
|||
103 | if obj is None: |
|
|||
104 | return |
|
|||
105 |
|
||||
106 | __builtin__._ = obj |
|
|||
107 | # msg = self.session.msg(u'pyout', {u'data':repr(obj)}, |
|
|||
108 | # parent=self.parent_header) |
|
|||
109 | # self.pub_socket.send_json(msg) |
|
|||
110 | self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header) |
|
|||
111 |
|
||||
112 | def set_parent(self, parent): |
|
|||
113 | self.parent_header = extract_header(parent) |
|
|||
114 |
|
||||
115 |
|
||||
116 | class RawInput(object): |
|
|||
117 |
|
||||
118 | def __init__(self, session, socket): |
|
|||
119 | self.session = session |
|
|||
120 | self.socket = socket |
|
|||
121 |
|
||||
122 | def __call__(self, prompt=None): |
|
|||
123 | msg = self.session.msg(u'raw_input') |
|
|||
124 | self.socket.send_json(msg) |
|
|||
125 | while True: |
|
|||
126 | try: |
|
|||
127 | reply = self.socket.recv_json(zmq.NOBLOCK) |
|
|||
128 | except zmq.ZMQError as e: |
|
|||
129 | if e.errno == zmq.EAGAIN: |
|
|||
130 | pass |
|
|||
131 | else: |
|
|||
132 | raise |
|
|||
133 | else: |
|
|||
134 | break |
|
|||
135 | return reply[u'content'][u'data'] |
|
|||
136 |
|
41 | |||
|
42 | class Kernel(HasTraits): | |||
137 |
|
43 | |||
138 | class Kernel(object): |
|
44 | #--------------------------------------------------------------------------- | |
|
45 | # Kernel interface | |||
|
46 | #--------------------------------------------------------------------------- | |||
139 |
|
47 | |||
140 | def __init__(self, session, control_stream, reply_stream, pub_stream, |
|
48 | session = Instance(StreamSession) | |
141 | task_stream=None, client=None): |
|
49 | shell_streams = Instance(list) | |
142 | self.session = session |
|
50 | control_stream = Instance(zmqstream.ZMQStream) | |
143 | self.control_stream = control_stream |
|
51 | task_stream = Instance(zmqstream.ZMQStream) | |
144 | # self.control_socket = control_stream.socket |
|
52 | iopub_stream = Instance(zmqstream.ZMQStream) | |
145 | self.reply_stream = reply_stream |
|
53 | client = Instance(Client) | |
146 | self.identity = self.reply_stream.getsockopt(zmq.IDENTITY) |
|
54 | ||
147 | self.task_stream = task_stream |
|
55 | def __init__(self, **kwargs): | |
148 | self.pub_stream = pub_stream |
|
56 | super(Kernel, self).__init__(**kwargs) | |
149 | self.client = client |
|
57 | self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) | |
150 | self.user_ns = {} |
|
58 | self.user_ns = {} | |
151 | self.history = [] |
|
59 | self.history = [] | |
152 | self.compiler = CommandCompiler() |
|
60 | self.compiler = CommandCompiler() | |
@@ -154,17 +62,18 b' class Kernel(object):' | |||||
154 | self.aborted = set() |
|
62 | self.aborted = set() | |
155 |
|
63 | |||
156 | # Build dict of handlers for message types |
|
64 | # Build dict of handlers for message types | |
157 |
self. |
|
65 | self.shell_handlers = {} | |
158 | self.control_handlers = {} |
|
66 | self.control_handlers = {} | |
159 |
for msg_type in ['execute_request', 'complete_request', 'apply_request' |
|
67 | for msg_type in ['execute_request', 'complete_request', 'apply_request', | |
160 | self.queue_handlers[msg_type] = getattr(self, msg_type) |
|
68 | 'clear_request']: | |
|
69 | self.shell_handlers[msg_type] = getattr(self, msg_type) | |||
161 |
|
70 | |||
162 |
for msg_type in [' |
|
71 | for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys(): | |
163 | self.control_handlers[msg_type] = getattr(self, msg_type) |
|
72 | self.control_handlers[msg_type] = getattr(self, msg_type) | |
164 |
|
73 | |||
165 | #-------------------- control handlers ----------------------------- |
|
74 | #-------------------- control handlers ----------------------------- | |
166 | def abort_queues(self): |
|
75 | def abort_queues(self): | |
167 |
for stream in |
|
76 | for stream in self.shell_streams: | |
168 | if stream: |
|
77 | if stream: | |
169 | self.abort_queue(stream) |
|
78 | self.abort_queue(stream) | |
170 |
|
79 | |||
@@ -214,23 +123,17 b' class Kernel(object):' | |||||
214 | parent=parent, ident=ident)[0] |
|
123 | parent=parent, ident=ident)[0] | |
215 | print(Message(reply_msg), file=sys.__stdout__) |
|
124 | print(Message(reply_msg), file=sys.__stdout__) | |
216 |
|
125 | |||
217 |
def |
|
126 | def shutdown_request(self, stream, ident, parent): | |
218 | """kill ourself. This should really be handled in an external process""" |
|
127 | """kill ourself. This should really be handled in an external process""" | |
219 | self.abort_queues() |
|
128 | self.abort_queues() | |
220 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, |
|
129 | content = dict(parent['content']) | |
221 | content = dict(status='ok')) |
|
130 | msg = self.session.send(self.reply_socket, 'shutdown_reply', | |
222 | # we can know that a message is done if we *don't* use streams, but |
|
131 | content, parent, ident) | |
223 | # use a socket directly with MessageTracker |
|
132 | msg = self.session.send(self.pub_socket, 'shutdown_reply', | |
224 | time.sleep(.5) |
|
133 | content, parent, ident) | |
225 | os.kill(os.getpid(), SIGTERM) |
|
134 | # print >> sys.__stdout__, msg | |
226 | time.sleep(1) |
|
135 | time.sleep(0.1) | |
227 | os.kill(os.getpid(), SIGKILL) |
|
136 | sys.exit(0) | |
228 |
|
||||
229 | def clear_request(self, stream, idents, parent): |
|
|||
230 | """Clear our namespace.""" |
|
|||
231 | self.user_ns = {} |
|
|||
232 | msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, |
|
|||
233 | content = dict(status='ok')) |
|
|||
234 |
|
137 | |||
235 | def dispatch_control(self, msg): |
|
138 | def dispatch_control(self, msg): | |
236 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
139 | idents,msg = self.session.feed_identities(msg, copy=False) | |
@@ -274,6 +177,12 b' class Kernel(object):' | |||||
274 |
|
177 | |||
275 | #-------------------- queue handlers ----------------------------- |
|
178 | #-------------------- queue handlers ----------------------------- | |
276 |
|
179 | |||
|
180 | def clear_request(self, stream, idents, parent): | |||
|
181 | """Clear our namespace.""" | |||
|
182 | self.user_ns = {} | |||
|
183 | msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, | |||
|
184 | content = dict(status='ok')) | |||
|
185 | ||||
277 | def execute_request(self, stream, ident, parent): |
|
186 | def execute_request(self, stream, ident, parent): | |
278 | try: |
|
187 | try: | |
279 | code = parent[u'content'][u'code'] |
|
188 | code = parent[u'content'][u'code'] | |
@@ -282,8 +191,8 b' class Kernel(object):' | |||||
282 | print(Message(parent), file=sys.__stderr__) |
|
191 | print(Message(parent), file=sys.__stderr__) | |
283 | return |
|
192 | return | |
284 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
193 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
285 | # self.pub_stream.send(pyin_msg) |
|
194 | # self.iopub_stream.send(pyin_msg) | |
286 | self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) |
|
195 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) | |
287 | try: |
|
196 | try: | |
288 | comp_code = self.compiler(code, '<zmq-kernel>') |
|
197 | comp_code = self.compiler(code, '<zmq-kernel>') | |
289 | # allow for not overriding displayhook |
|
198 | # allow for not overriding displayhook | |
@@ -301,7 +210,7 b' class Kernel(object):' | |||||
301 | u'evalue' : unicode(evalue) |
|
210 | u'evalue' : unicode(evalue) | |
302 | } |
|
211 | } | |
303 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
212 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
304 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) |
|
213 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) | |
305 | reply_content = exc_content |
|
214 | reply_content = exc_content | |
306 | else: |
|
215 | else: | |
307 | reply_content = {'status' : 'ok'} |
|
216 | reply_content = {'status' : 'ok'} | |
@@ -335,8 +244,8 b' class Kernel(object):' | |||||
335 | print(Message(parent), file=sys.__stderr__) |
|
244 | print(Message(parent), file=sys.__stderr__) | |
336 | return |
|
245 | return | |
337 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
246 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
338 | # self.pub_stream.send(pyin_msg) |
|
247 | # self.iopub_stream.send(pyin_msg) | |
339 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) |
|
248 | # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) | |
340 | sub = {'dependencies_met' : True, 'engine' : self.identity} |
|
249 | sub = {'dependencies_met' : True, 'engine' : self.identity} | |
341 | try: |
|
250 | try: | |
342 | # allow for not overriding displayhook |
|
251 | # allow for not overriding displayhook | |
@@ -384,7 +293,7 b' class Kernel(object):' | |||||
384 | u'evalue' : unicode(evalue) |
|
293 | u'evalue' : unicode(evalue) | |
385 | } |
|
294 | } | |
386 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
295 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
387 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) |
|
296 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) | |
388 | reply_content = exc_content |
|
297 | reply_content = exc_content | |
389 | result_buf = [] |
|
298 | result_buf = [] | |
390 |
|
299 | |||
@@ -415,7 +324,7 b' class Kernel(object):' | |||||
415 | reply_msg = self.session.send(stream, reply_type, |
|
324 | reply_msg = self.session.send(stream, reply_type, | |
416 | content={'status' : 'aborted'}, parent=msg, ident=idents) |
|
325 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |
417 | return |
|
326 | return | |
418 |
handler = self. |
|
327 | handler = self.shell_handlers.get(msg['msg_type'], None) | |
419 | if handler is None: |
|
328 | if handler is None: | |
420 | print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__) |
|
329 | print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__) | |
421 | else: |
|
330 | else: | |
@@ -426,27 +335,28 b' class Kernel(object):' | |||||
426 | if self.control_stream: |
|
335 | if self.control_stream: | |
427 | self.control_stream.on_recv(self.dispatch_control, copy=False) |
|
336 | self.control_stream.on_recv(self.dispatch_control, copy=False) | |
428 | self.control_stream.on_err(printer) |
|
337 | self.control_stream.on_err(printer) | |
429 | if self.reply_stream: |
|
338 | ||
430 |
|
|
339 | for s in self.shell_streams: | |
431 | self.dispatch_queue(self.reply_stream, msg), copy=False) |
|
340 | s.on_recv(lambda msg: | |
432 | self.reply_stream.on_err(printer) |
|
341 | self.dispatch_queue(s, msg), copy=False) | |
433 | if self.task_stream: |
|
342 | s.on_err(printer) | |
434 | self.task_stream.on_recv(lambda msg: |
|
343 | ||
435 | self.dispatch_queue(self.task_stream, msg), copy=False) |
|
344 | if self.iopub_stream: | |
436 |
self. |
|
345 | self.iopub_stream.on_err(printer) | |
|
346 | self.iopub_stream.on_send(printer) | |||
437 |
|
347 | |||
438 | #### while True mode: |
|
348 | #### while True mode: | |
439 | # while True: |
|
349 | # while True: | |
440 | # idle = True |
|
350 | # idle = True | |
441 | # try: |
|
351 | # try: | |
442 |
# msg = self. |
|
352 | # msg = self.shell_stream.socket.recv_multipart( | |
443 | # zmq.NOBLOCK, copy=False) |
|
353 | # zmq.NOBLOCK, copy=False) | |
444 | # except zmq.ZMQError, e: |
|
354 | # except zmq.ZMQError, e: | |
445 | # if e.errno != zmq.EAGAIN: |
|
355 | # if e.errno != zmq.EAGAIN: | |
446 | # raise e |
|
356 | # raise e | |
447 | # else: |
|
357 | # else: | |
448 | # idle=False |
|
358 | # idle=False | |
449 |
# self.dispatch_queue(self. |
|
359 | # self.dispatch_queue(self.shell_stream, msg) | |
450 | # |
|
360 | # | |
451 | # if not self.task_stream.empty(): |
|
361 | # if not self.task_stream.empty(): | |
452 | # idle=False |
|
362 | # idle=False | |
@@ -456,50 +366,48 b' class Kernel(object):' | |||||
456 | # # don't busywait |
|
366 | # # don't busywait | |
457 | # time.sleep(1e-3) |
|
367 | # time.sleep(1e-3) | |
458 |
|
368 | |||
459 |
|
369 | def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, | ||
460 | def main(): |
|
370 | client_addr=None, loop=None, context=None): | |
461 | raise Exception("Don't run me anymore") |
|
371 | # create loop, context, and session: | |
462 | loop = ioloop.IOLoop.instance() |
|
372 | if loop is None: | |
463 | c = zmq.Context() |
|
373 | loop = ioloop.IOLoop.instance() | |
464 |
|
374 | if context is None: | ||
465 | ip = '127.0.0.1' |
|
375 | context = zmq.Context() | |
466 | port_base = 5575 |
|
376 | c = context | |
467 | connection = ('tcp://%s' % ip) + ':%i' |
|
377 | session = StreamSession() | |
468 | rep_conn = connection % port_base |
|
378 | print (control_addr, shell_addrs, iopub_addr, hb_addrs) | |
469 | pub_conn = connection % (port_base+1) |
|
379 | ||
470 |
|
380 | # create Control Stream | ||
471 | print("Starting the kernel...", file=sys.__stdout__) |
|
381 | control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) | |
472 | # print >>sys.__stdout__, "XREQ Channel:", rep_conn |
|
382 | control_stream.setsockopt(zmq.IDENTITY, identity) | |
473 | # print >>sys.__stdout__, "PUB Channel:", pub_conn |
|
383 | control_stream.connect(control_addr) | |
474 |
|
384 | |||
475 | session = StreamSession(username=u'kernel') |
|
385 | # create Shell Streams (MUX, Task, etc.): | |
476 |
|
386 | shell_streams = [] | ||
477 | reply_socket = c.socket(zmq.XREQ) |
|
387 | for addr in shell_addrs: | |
478 | reply_socket.connect(rep_conn) |
|
388 | stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) | |
479 |
|
389 | stream.setsockopt(zmq.IDENTITY, identity) | ||
480 | pub_socket = c.socket(zmq.PUB) |
|
390 | stream.connect(addr) | |
481 | pub_socket.connect(pub_conn) |
|
391 | shell_streams.append(stream) | |
482 |
|
||||
483 | stdout = OutStream(session, pub_socket, u'stdout') |
|
|||
484 | stderr = OutStream(session, pub_socket, u'stderr') |
|
|||
485 | sys.stdout = stdout |
|
|||
486 | sys.stderr = stderr |
|
|||
487 |
|
||||
488 | display_hook = DisplayHook(session, pub_socket) |
|
|||
489 | sys.displayhook = display_hook |
|
|||
490 | reply_stream = zmqstream.ZMQStream(reply_socket,loop) |
|
|||
491 | pub_stream = zmqstream.ZMQStream(pub_socket,loop) |
|
|||
492 | kernel = Kernel(session, reply_stream, pub_stream) |
|
|||
493 |
|
||||
494 | # For debugging convenience, put sleep and a string in the namespace, so we |
|
|||
495 | # have them every time we start. |
|
|||
496 | kernel.user_ns['sleep'] = time.sleep |
|
|||
497 | kernel.user_ns['s'] = 'Test string' |
|
|||
498 |
|
392 | |||
499 | print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__) |
|
393 | # create iopub stream: | |
|
394 | iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop) | |||
|
395 | iopub_stream.setsockopt(zmq.IDENTITY, identity) | |||
|
396 | iopub_stream.connect(iopub_addr) | |||
|
397 | ||||
|
398 | # launch heartbeat | |||
|
399 | heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) | |||
|
400 | heart.start() | |||
|
401 | ||||
|
402 | # create (optional) Client | |||
|
403 | if client_addr: | |||
|
404 | client = Client(client_addr, username=identity) | |||
|
405 | else: | |||
|
406 | client = None | |||
|
407 | ||||
|
408 | kernel = Kernel(session=session, control_stream=control_stream, | |||
|
409 | shell_streams=shell_streams, iopub_stream=iopub_stream, | |||
|
410 | client=client) | |||
500 | kernel.start() |
|
411 | kernel.start() | |
501 | loop.start() |
|
412 | return loop, c | |
502 |
|
||||
503 |
|
413 | |||
504 | if __name__ == '__main__': |
|
|||
505 | main() |
|
General Comments 0
You need to be logged in to leave comments.
Login now