Show More
@@ -1,148 +1,132 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """A simple engine that talks to a controller over 0MQ. |
|
2 | """A simple engine that talks to a controller over 0MQ. | |
3 | it handles registration, etc. and launches a kernel |
|
3 | it handles registration, etc. and launches a kernel | |
4 | connected to the Controller's queue(s). |
|
4 | connected to the Controller's queue(s). | |
5 | """ |
|
5 | """ | |
6 | from __future__ import print_function |
|
6 | from __future__ import print_function | |
7 | import sys |
|
7 | import sys | |
8 | import time |
|
8 | import time | |
9 | import traceback |
|
9 | import traceback | |
10 | import uuid |
|
10 | import uuid | |
11 | from pprint import pprint |
|
11 | from pprint import pprint | |
12 |
|
12 | |||
13 | import zmq |
|
13 | import zmq | |
14 | from zmq.eventloop import ioloop, zmqstream |
|
14 | from zmq.eventloop import ioloop, zmqstream | |
15 |
|
15 | |||
|
16 | from IPython.utils.traitlets import HasTraits | |||
|
17 | from IPython.utils.localinterfaces import LOCALHOST | |||
|
18 | ||||
16 | from streamsession import Message, StreamSession |
|
19 | from streamsession import Message, StreamSession | |
17 | from client import Client |
|
20 | from client import Client | |
18 |
|
|
21 | from streamkernel import Kernel, make_kernel | |
19 | import heartmonitor |
|
22 | import heartmonitor | |
20 | from entry_point import make_base_argument_parser, connect_logger, parse_url |
|
23 | from entry_point import make_base_argument_parser, connect_logger, parse_url | |
21 | # import taskthread |
|
24 | # import taskthread | |
22 | # from log import logger |
|
25 | # from log import logger | |
23 |
|
26 | |||
24 |
|
27 | |||
25 | def printer(*msg): |
|
28 | def printer(*msg): | |
26 | pprint(msg) |
|
29 | pprint(msg) | |
27 |
|
30 | |||
28 | class Engine(object): |
|
31 | class Engine(object): | |
29 | """IPython engine""" |
|
32 | """IPython engine""" | |
30 |
|
33 | |||
31 | id=None |
|
34 | id=None | |
32 | context=None |
|
35 | context=None | |
33 | loop=None |
|
36 | loop=None | |
34 | session=None |
|
37 | session=None | |
35 | ident=None |
|
38 | ident=None | |
36 | registrar=None |
|
39 | registrar=None | |
37 | heart=None |
|
40 | heart=None | |
38 | kernel=None |
|
41 | kernel=None | |
39 |
|
42 | |||
40 | def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None): |
|
43 | def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None): | |
41 | self.context = context |
|
44 | self.context = context | |
42 | self.loop = loop |
|
45 | self.loop = loop | |
43 | self.session = session |
|
46 | self.session = session | |
44 | self.registrar = registrar |
|
47 | self.registrar = registrar | |
45 | self.client = client |
|
48 | self.client = client | |
46 | self.ident = ident if ident else str(uuid.uuid4()) |
|
49 | self.ident = ident if ident else str(uuid.uuid4()) | |
47 | self.registrar.on_send(printer) |
|
50 | self.registrar.on_send(printer) | |
48 |
|
51 | |||
49 | def register(self): |
|
52 | def register(self): | |
50 |
|
53 | |||
51 | content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) |
|
54 | content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) | |
52 | self.registrar.on_recv(self.complete_registration) |
|
55 | self.registrar.on_recv(self.complete_registration) | |
53 | self.session.send(self.registrar, "registration_request",content=content) |
|
56 | self.session.send(self.registrar, "registration_request",content=content) | |
54 |
|
57 | |||
55 | def complete_registration(self, msg): |
|
58 | def complete_registration(self, msg): | |
56 | # print msg |
|
59 | # print msg | |
57 | idents,msg = self.session.feed_identities(msg) |
|
60 | idents,msg = self.session.feed_identities(msg) | |
58 | msg = Message(self.session.unpack_message(msg)) |
|
61 | msg = Message(self.session.unpack_message(msg)) | |
59 | if msg.content.status == 'ok': |
|
62 | if msg.content.status == 'ok': | |
60 | self.session.username = str(msg.content.id) |
|
63 | self.session.username = str(msg.content.id) | |
61 | queue_addr = msg.content.queue |
|
64 | queue_addr = msg.content.queue | |
62 |
|
|
65 | shell_addrs = [str(queue_addr)] | |
63 | queue = self.context.socket(zmq.PAIR) |
|
66 | control_addr = str(msg.content.control) | |
64 | queue.setsockopt(zmq.IDENTITY, self.ident) |
|
|||
65 | queue.connect(str(queue_addr)) |
|
|||
66 | self.queue = zmqstream.ZMQStream(queue, self.loop) |
|
|||
67 |
|
||||
68 | control_addr = msg.content.control |
|
|||
69 | if control_addr: |
|
|||
70 | control = self.context.socket(zmq.PAIR) |
|
|||
71 | control.setsockopt(zmq.IDENTITY, self.ident) |
|
|||
72 | control.connect(str(control_addr)) |
|
|||
73 | self.control = zmqstream.ZMQStream(control, self.loop) |
|
|||
74 |
|
||||
75 | task_addr = msg.content.task |
|
67 | task_addr = msg.content.task | |
76 | print (task_addr) |
|
|||
77 | if task_addr: |
|
68 | if task_addr: | |
78 | # task as stream: |
|
69 | shell_addrs.append(str(task_addr)) | |
79 | task = self.context.socket(zmq.PAIR) |
|
70 | ||
80 | task.setsockopt(zmq.IDENTITY, self.ident) |
|
71 | hb_addrs = msg.content.heartbeat | |
81 | task.connect(str(task_addr)) |
|
|||
82 | self.task_stream = zmqstream.ZMQStream(task, self.loop) |
|
|||
83 | # TaskThread: |
|
|||
84 | # mon_addr = msg.content.monitor |
|
|||
85 | # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident) |
|
|||
86 | # task.connect_in(str(task_addr)) |
|
|||
87 | # task.connect_out(str(mon_addr)) |
|
|||
88 | # self.task_stream = taskthread.QueueStream(*task.queues) |
|
|||
89 | # task.start() |
|
|||
90 |
|
||||
91 | hbs = msg.content.heartbeat |
|
|||
92 | self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident) |
|
|||
93 | self.heart.start() |
|
|||
94 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() |
|
72 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() | |
95 | # placeholder for now: |
|
73 | ||
96 | pub = self.context.socket(zmq.PUB) |
|
74 | # placeholder for no, since pub isn't hooked up: | |
97 | pub = zmqstream.ZMQStream(pub, self.loop) |
|
75 | sub = self.context.socket(zmq.SUB) | |
98 | # create and start the kernel |
|
76 | sub = zmqstream.ZMQStream(sub, self.loop) | |
99 | self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client) |
|
77 | sub.on_recv(lambda *a: None) | |
100 | self.kernel.start() |
|
78 | port = sub.bind_to_random_port("tcp://%s"%LOCALHOST) | |
|
79 | iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345) | |||
|
80 | ||||
|
81 | make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs, | |||
|
82 | client_addr=None, loop=self.loop, context=self.context) | |||
|
83 | ||||
101 | else: |
|
84 | else: | |
102 | # logger.error("Registration Failed: %s"%msg) |
|
85 | # logger.error("Registration Failed: %s"%msg) | |
103 | raise Exception("Registration Failed: %s"%msg) |
|
86 | raise Exception("Registration Failed: %s"%msg) | |
104 |
|
87 | |||
105 | # logger.info("engine::completed registration with id %s"%self.session.username) |
|
88 | # logger.info("engine::completed registration with id %s"%self.session.username) | |
106 |
|
89 | |||
107 | print (msg) |
|
90 | print (msg) | |
108 |
|
91 | |||
109 | def unregister(self): |
|
92 | def unregister(self): | |
110 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) |
|
93 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) | |
111 | time.sleep(1) |
|
94 | time.sleep(1) | |
112 | sys.exit(0) |
|
95 | sys.exit(0) | |
113 |
|
96 | |||
114 | def start(self): |
|
97 | def start(self): | |
115 | print ("registering") |
|
98 | print ("registering") | |
116 | self.register() |
|
99 | self.register() | |
117 |
|
100 | |||
118 |
|
101 | |||
|
102 | ||||
119 | def main(): |
|
103 | def main(): | |
120 |
|
104 | |||
121 | parser = make_base_argument_parser() |
|
105 | parser = make_base_argument_parser() | |
122 |
|
106 | |||
123 | args = parser.parse_args() |
|
107 | args = parser.parse_args() | |
124 |
|
108 | |||
125 | parse_url(args) |
|
109 | parse_url(args) | |
126 |
|
110 | |||
127 | iface="%s://%s"%(args.transport,args.ip)+':%i' |
|
111 | iface="%s://%s"%(args.transport,args.ip)+':%i' | |
128 |
|
112 | |||
129 | loop = ioloop.IOLoop.instance() |
|
113 | loop = ioloop.IOLoop.instance() | |
130 | session = StreamSession() |
|
114 | session = StreamSession() | |
131 | ctx = zmq.Context() |
|
115 | ctx = zmq.Context() | |
132 |
|
116 | |||
133 | # setup logging |
|
117 | # setup logging | |
134 | connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) |
|
118 | connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) | |
135 |
|
119 | |||
136 | reg_conn = iface % args.regport |
|
120 | reg_conn = iface % args.regport | |
137 | print (reg_conn) |
|
121 | print (reg_conn) | |
138 | print ("Starting the engine...", file=sys.__stderr__) |
|
122 | print ("Starting the engine...", file=sys.__stderr__) | |
139 |
|
123 | |||
140 | reg = ctx.socket(zmq.PAIR) |
|
124 | reg = ctx.socket(zmq.PAIR) | |
141 | reg.connect(reg_conn) |
|
125 | reg.connect(reg_conn) | |
142 | reg = zmqstream.ZMQStream(reg, loop) |
|
126 | reg = zmqstream.ZMQStream(reg, loop) | |
143 | client = Client(reg_conn) |
|
127 | client = Client(reg_conn) | |
144 |
|
128 | |||
145 | e = Engine(ctx, loop, session, reg, client, args.ident) |
|
129 | e = Engine(ctx, loop, session, reg, client, args.ident) | |
146 | dc = ioloop.DelayedCallback(e.start, 100, loop) |
|
130 | dc = ioloop.DelayedCallback(e.start, 100, loop) | |
147 | dc.start() |
|
131 | dc.start() | |
148 | loop.start() No newline at end of file |
|
132 | loop.start() |
@@ -1,505 +1,413 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | """ |
|
2 | """ | |
3 | Kernel adapted from kernel.py to use ZMQ Streams |
|
3 | Kernel adapted from kernel.py to use ZMQ Streams | |
4 | """ |
|
4 | """ | |
5 |
|
5 | |||
|
6 | #----------------------------------------------------------------------------- | |||
|
7 | # Imports | |||
|
8 | #----------------------------------------------------------------------------- | |||
|
9 | ||||
|
10 | # Standard library imports. | |||
6 | from __future__ import print_function |
|
11 | from __future__ import print_function | |
7 | import __builtin__ |
|
12 | import __builtin__ | |
|
13 | from code import CommandCompiler | |||
8 | import os |
|
14 | import os | |
9 | import sys |
|
15 | import sys | |
10 | import time |
|
16 | import time | |
11 | import traceback |
|
17 | import traceback | |
12 | from signal import SIGTERM, SIGKILL |
|
18 | from signal import SIGTERM, SIGKILL | |
13 | from pprint import pprint |
|
19 | from pprint import pprint | |
14 |
|
20 | |||
15 | from code import CommandCompiler |
|
21 | # System library imports. | |
16 |
|
||||
17 | import zmq |
|
22 | import zmq | |
18 | from zmq.eventloop import ioloop, zmqstream |
|
23 | from zmq.eventloop import ioloop, zmqstream | |
19 |
|
24 | |||
|
25 | # Local imports. | |||
|
26 | from IPython.utils.traitlets import HasTraits, Instance, List | |||
20 | from IPython.zmq.completer import KernelCompleter |
|
27 | from IPython.zmq.completer import KernelCompleter | |
21 |
|
28 | |||
22 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ |
|
29 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ | |
23 | unpack_apply_message |
|
30 | unpack_apply_message | |
24 | from dependency import UnmetDependency |
|
31 | from dependency import UnmetDependency | |
|
32 | import heartmonitor | |||
|
33 | from client import Client | |||
25 |
|
34 | |||
26 | def printer(*args): |
|
35 | def printer(*args): | |
27 | pprint(args) |
|
36 | pprint(args) | |
28 |
|
37 | |||
29 | class OutStream(object): |
|
38 | #----------------------------------------------------------------------------- | |
30 | """A file like object that publishes the stream to a 0MQ PUB socket.""" |
|
39 | # Main kernel class | |
31 |
|
40 | #----------------------------------------------------------------------------- | ||
32 | def __init__(self, session, pub_socket, name, max_buffer=200): |
|
|||
33 | self.session = session |
|
|||
34 | self.pub_socket = pub_socket |
|
|||
35 | self.name = name |
|
|||
36 | self._buffer = [] |
|
|||
37 | self._buffer_len = 0 |
|
|||
38 | self.max_buffer = max_buffer |
|
|||
39 | self.parent_header = {} |
|
|||
40 |
|
||||
41 | def set_parent(self, parent): |
|
|||
42 | self.parent_header = extract_header(parent) |
|
|||
43 |
|
||||
44 | def close(self): |
|
|||
45 | self.pub_socket = None |
|
|||
46 |
|
||||
47 | def flush(self): |
|
|||
48 | if self.pub_socket is None: |
|
|||
49 | raise ValueError(u'I/O operation on closed file') |
|
|||
50 | else: |
|
|||
51 | if self._buffer: |
|
|||
52 | data = ''.join(self._buffer) |
|
|||
53 | content = {u'name':self.name, u'data':data} |
|
|||
54 | # msg = self.session.msg(u'stream', content=content, |
|
|||
55 | # parent=self.parent_header) |
|
|||
56 | msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header) |
|
|||
57 | # print>>sys.__stdout__, Message(msg) |
|
|||
58 | # self.pub_socket.send_json(msg) |
|
|||
59 | self._buffer_len = 0 |
|
|||
60 | self._buffer = [] |
|
|||
61 |
|
||||
62 | def isattr(self): |
|
|||
63 | return False |
|
|||
64 |
|
||||
65 | def next(self): |
|
|||
66 | raise IOError('Read not supported on a write only stream.') |
|
|||
67 |
|
||||
68 | def read(self, size=None): |
|
|||
69 | raise IOError('Read not supported on a write only stream.') |
|
|||
70 |
|
||||
71 | readline=read |
|
|||
72 |
|
||||
73 | def write(self, s): |
|
|||
74 | if self.pub_socket is None: |
|
|||
75 | raise ValueError('I/O operation on closed file') |
|
|||
76 | else: |
|
|||
77 | self._buffer.append(s) |
|
|||
78 | self._buffer_len += len(s) |
|
|||
79 | self._maybe_send() |
|
|||
80 |
|
||||
81 | def _maybe_send(self): |
|
|||
82 | if '\n' in self._buffer[-1]: |
|
|||
83 | self.flush() |
|
|||
84 | if self._buffer_len > self.max_buffer: |
|
|||
85 | self.flush() |
|
|||
86 |
|
||||
87 | def writelines(self, sequence): |
|
|||
88 | if self.pub_socket is None: |
|
|||
89 | raise ValueError('I/O operation on closed file') |
|
|||
90 | else: |
|
|||
91 | for s in sequence: |
|
|||
92 | self.write(s) |
|
|||
93 |
|
||||
94 |
|
||||
95 | class DisplayHook(object): |
|
|||
96 |
|
||||
97 | def __init__(self, session, pub_socket): |
|
|||
98 | self.session = session |
|
|||
99 | self.pub_socket = pub_socket |
|
|||
100 | self.parent_header = {} |
|
|||
101 |
|
||||
102 | def __call__(self, obj): |
|
|||
103 | if obj is None: |
|
|||
104 | return |
|
|||
105 |
|
||||
106 | __builtin__._ = obj |
|
|||
107 | # msg = self.session.msg(u'pyout', {u'data':repr(obj)}, |
|
|||
108 | # parent=self.parent_header) |
|
|||
109 | # self.pub_socket.send_json(msg) |
|
|||
110 | self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header) |
|
|||
111 |
|
||||
112 | def set_parent(self, parent): |
|
|||
113 | self.parent_header = extract_header(parent) |
|
|||
114 |
|
41 | |||
|
42 | class Kernel(HasTraits): | |||
115 |
|
43 | |||
116 | class RawInput(object): |
|
44 | #--------------------------------------------------------------------------- | |
|
45 | # Kernel interface | |||
|
46 | #--------------------------------------------------------------------------- | |||
117 |
|
47 | |||
118 | def __init__(self, session, socket): |
|
48 | session = Instance(StreamSession) | |
119 | self.session = session |
|
49 | shell_streams = Instance(list) | |
120 | self.socket = socket |
|
50 | control_stream = Instance(zmqstream.ZMQStream) | |
|
51 | task_stream = Instance(zmqstream.ZMQStream) | |||
|
52 | iopub_stream = Instance(zmqstream.ZMQStream) | |||
|
53 | client = Instance(Client) | |||
121 |
|
54 | |||
122 |
def __ |
|
55 | def __init__(self, **kwargs): | |
123 | msg = self.session.msg(u'raw_input') |
|
56 | super(Kernel, self).__init__(**kwargs) | |
124 | self.socket.send_json(msg) |
|
57 | self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) | |
125 | while True: |
|
|||
126 | try: |
|
|||
127 | reply = self.socket.recv_json(zmq.NOBLOCK) |
|
|||
128 | except zmq.ZMQError as e: |
|
|||
129 | if e.errno == zmq.EAGAIN: |
|
|||
130 | pass |
|
|||
131 | else: |
|
|||
132 | raise |
|
|||
133 | else: |
|
|||
134 | break |
|
|||
135 | return reply[u'content'][u'data'] |
|
|||
136 |
|
||||
137 |
|
||||
138 | class Kernel(object): |
|
|||
139 |
|
||||
140 | def __init__(self, session, control_stream, reply_stream, pub_stream, |
|
|||
141 | task_stream=None, client=None): |
|
|||
142 | self.session = session |
|
|||
143 | self.control_stream = control_stream |
|
|||
144 | # self.control_socket = control_stream.socket |
|
|||
145 | self.reply_stream = reply_stream |
|
|||
146 | self.identity = self.reply_stream.getsockopt(zmq.IDENTITY) |
|
|||
147 | self.task_stream = task_stream |
|
|||
148 | self.pub_stream = pub_stream |
|
|||
149 | self.client = client |
|
|||
150 | self.user_ns = {} |
|
58 | self.user_ns = {} | |
151 | self.history = [] |
|
59 | self.history = [] | |
152 | self.compiler = CommandCompiler() |
|
60 | self.compiler = CommandCompiler() | |
153 | self.completer = KernelCompleter(self.user_ns) |
|
61 | self.completer = KernelCompleter(self.user_ns) | |
154 | self.aborted = set() |
|
62 | self.aborted = set() | |
155 |
|
63 | |||
156 | # Build dict of handlers for message types |
|
64 | # Build dict of handlers for message types | |
157 |
self. |
|
65 | self.shell_handlers = {} | |
158 | self.control_handlers = {} |
|
66 | self.control_handlers = {} | |
159 |
for msg_type in ['execute_request', 'complete_request', 'apply_request' |
|
67 | for msg_type in ['execute_request', 'complete_request', 'apply_request', | |
160 | self.queue_handlers[msg_type] = getattr(self, msg_type) |
|
68 | 'clear_request']: | |
|
69 | self.shell_handlers[msg_type] = getattr(self, msg_type) | |||
161 |
|
70 | |||
162 |
for msg_type in [' |
|
71 | for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys(): | |
163 | self.control_handlers[msg_type] = getattr(self, msg_type) |
|
72 | self.control_handlers[msg_type] = getattr(self, msg_type) | |
164 |
|
73 | |||
165 | #-------------------- control handlers ----------------------------- |
|
74 | #-------------------- control handlers ----------------------------- | |
166 | def abort_queues(self): |
|
75 | def abort_queues(self): | |
167 |
for stream in |
|
76 | for stream in self.shell_streams: | |
168 | if stream: |
|
77 | if stream: | |
169 | self.abort_queue(stream) |
|
78 | self.abort_queue(stream) | |
170 |
|
79 | |||
171 | def abort_queue(self, stream): |
|
80 | def abort_queue(self, stream): | |
172 | while True: |
|
81 | while True: | |
173 | try: |
|
82 | try: | |
174 | msg = self.session.recv(stream, zmq.NOBLOCK,content=True) |
|
83 | msg = self.session.recv(stream, zmq.NOBLOCK,content=True) | |
175 | except zmq.ZMQError as e: |
|
84 | except zmq.ZMQError as e: | |
176 | if e.errno == zmq.EAGAIN: |
|
85 | if e.errno == zmq.EAGAIN: | |
177 | break |
|
86 | break | |
178 | else: |
|
87 | else: | |
179 | return |
|
88 | return | |
180 | else: |
|
89 | else: | |
181 | if msg is None: |
|
90 | if msg is None: | |
182 | return |
|
91 | return | |
183 | else: |
|
92 | else: | |
184 | idents,msg = msg |
|
93 | idents,msg = msg | |
185 |
|
94 | |||
186 | # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." |
|
95 | # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." | |
187 | # msg = self.reply_socket.recv_json() |
|
96 | # msg = self.reply_socket.recv_json() | |
188 | print ("Aborting:", file=sys.__stdout__) |
|
97 | print ("Aborting:", file=sys.__stdout__) | |
189 | print (Message(msg), file=sys.__stdout__) |
|
98 | print (Message(msg), file=sys.__stdout__) | |
190 | msg_type = msg['msg_type'] |
|
99 | msg_type = msg['msg_type'] | |
191 | reply_type = msg_type.split('_')[0] + '_reply' |
|
100 | reply_type = msg_type.split('_')[0] + '_reply' | |
192 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) |
|
101 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | |
193 | # self.reply_socket.send(ident,zmq.SNDMORE) |
|
102 | # self.reply_socket.send(ident,zmq.SNDMORE) | |
194 | # self.reply_socket.send_json(reply_msg) |
|
103 | # self.reply_socket.send_json(reply_msg) | |
195 | reply_msg = self.session.send(stream, reply_type, |
|
104 | reply_msg = self.session.send(stream, reply_type, | |
196 | content={'status' : 'aborted'}, parent=msg, ident=idents)[0] |
|
105 | content={'status' : 'aborted'}, parent=msg, ident=idents)[0] | |
197 | print(Message(reply_msg), file=sys.__stdout__) |
|
106 | print(Message(reply_msg), file=sys.__stdout__) | |
198 | # We need to wait a bit for requests to come in. This can probably |
|
107 | # We need to wait a bit for requests to come in. This can probably | |
199 | # be set shorter for true asynchronous clients. |
|
108 | # be set shorter for true asynchronous clients. | |
200 | time.sleep(0.05) |
|
109 | time.sleep(0.05) | |
201 |
|
110 | |||
202 | def abort_request(self, stream, ident, parent): |
|
111 | def abort_request(self, stream, ident, parent): | |
203 | """abort a specifig msg by id""" |
|
112 | """abort a specifig msg by id""" | |
204 | msg_ids = parent['content'].get('msg_ids', None) |
|
113 | msg_ids = parent['content'].get('msg_ids', None) | |
205 | if isinstance(msg_ids, basestring): |
|
114 | if isinstance(msg_ids, basestring): | |
206 | msg_ids = [msg_ids] |
|
115 | msg_ids = [msg_ids] | |
207 | if not msg_ids: |
|
116 | if not msg_ids: | |
208 | self.abort_queues() |
|
117 | self.abort_queues() | |
209 | for mid in msg_ids: |
|
118 | for mid in msg_ids: | |
210 | self.aborted.add(str(mid)) |
|
119 | self.aborted.add(str(mid)) | |
211 |
|
120 | |||
212 | content = dict(status='ok') |
|
121 | content = dict(status='ok') | |
213 | reply_msg = self.session.send(stream, 'abort_reply', content=content, |
|
122 | reply_msg = self.session.send(stream, 'abort_reply', content=content, | |
214 | parent=parent, ident=ident)[0] |
|
123 | parent=parent, ident=ident)[0] | |
215 | print(Message(reply_msg), file=sys.__stdout__) |
|
124 | print(Message(reply_msg), file=sys.__stdout__) | |
216 |
|
125 | |||
217 |
def |
|
126 | def shutdown_request(self, stream, ident, parent): | |
218 | """kill ourself. This should really be handled in an external process""" |
|
127 | """kill ourself. This should really be handled in an external process""" | |
219 | self.abort_queues() |
|
128 | self.abort_queues() | |
220 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, |
|
129 | content = dict(parent['content']) | |
221 | content = dict(status='ok')) |
|
130 | msg = self.session.send(self.reply_socket, 'shutdown_reply', | |
222 | # we can know that a message is done if we *don't* use streams, but |
|
131 | content, parent, ident) | |
223 | # use a socket directly with MessageTracker |
|
132 | msg = self.session.send(self.pub_socket, 'shutdown_reply', | |
224 | time.sleep(.5) |
|
133 | content, parent, ident) | |
225 | os.kill(os.getpid(), SIGTERM) |
|
134 | # print >> sys.__stdout__, msg | |
226 | time.sleep(1) |
|
135 | time.sleep(0.1) | |
227 | os.kill(os.getpid(), SIGKILL) |
|
136 | sys.exit(0) | |
228 |
|
||||
229 | def clear_request(self, stream, idents, parent): |
|
|||
230 | """Clear our namespace.""" |
|
|||
231 | self.user_ns = {} |
|
|||
232 | msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, |
|
|||
233 | content = dict(status='ok')) |
|
|||
234 |
|
137 | |||
235 | def dispatch_control(self, msg): |
|
138 | def dispatch_control(self, msg): | |
236 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
139 | idents,msg = self.session.feed_identities(msg, copy=False) | |
237 | msg = self.session.unpack_message(msg, content=True, copy=False) |
|
140 | msg = self.session.unpack_message(msg, content=True, copy=False) | |
238 |
|
141 | |||
239 | header = msg['header'] |
|
142 | header = msg['header'] | |
240 | msg_id = header['msg_id'] |
|
143 | msg_id = header['msg_id'] | |
241 |
|
144 | |||
242 | handler = self.control_handlers.get(msg['msg_type'], None) |
|
145 | handler = self.control_handlers.get(msg['msg_type'], None) | |
243 | if handler is None: |
|
146 | if handler is None: | |
244 | print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__) |
|
147 | print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__) | |
245 | else: |
|
148 | else: | |
246 | handler(self.control_stream, idents, msg) |
|
149 | handler(self.control_stream, idents, msg) | |
247 |
|
150 | |||
248 |
|
151 | |||
249 | #-------------------- queue helpers ------------------------------ |
|
152 | #-------------------- queue helpers ------------------------------ | |
250 |
|
153 | |||
251 | def check_dependencies(self, dependencies): |
|
154 | def check_dependencies(self, dependencies): | |
252 | if not dependencies: |
|
155 | if not dependencies: | |
253 | return True |
|
156 | return True | |
254 | if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): |
|
157 | if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): | |
255 | anyorall = dependencies[0] |
|
158 | anyorall = dependencies[0] | |
256 | dependencies = dependencies[1] |
|
159 | dependencies = dependencies[1] | |
257 | else: |
|
160 | else: | |
258 | anyorall = 'all' |
|
161 | anyorall = 'all' | |
259 | results = self.client.get_results(dependencies,status_only=True) |
|
162 | results = self.client.get_results(dependencies,status_only=True) | |
260 | if results['status'] != 'ok': |
|
163 | if results['status'] != 'ok': | |
261 | return False |
|
164 | return False | |
262 |
|
165 | |||
263 | if anyorall == 'any': |
|
166 | if anyorall == 'any': | |
264 | if not results['completed']: |
|
167 | if not results['completed']: | |
265 | return False |
|
168 | return False | |
266 | else: |
|
169 | else: | |
267 | if results['pending']: |
|
170 | if results['pending']: | |
268 | return False |
|
171 | return False | |
269 |
|
172 | |||
270 | return True |
|
173 | return True | |
271 |
|
174 | |||
272 | def check_aborted(self, msg_id): |
|
175 | def check_aborted(self, msg_id): | |
273 | return msg_id in self.aborted |
|
176 | return msg_id in self.aborted | |
274 |
|
177 | |||
275 | #-------------------- queue handlers ----------------------------- |
|
178 | #-------------------- queue handlers ----------------------------- | |
276 |
|
179 | |||
|
180 | def clear_request(self, stream, idents, parent): | |||
|
181 | """Clear our namespace.""" | |||
|
182 | self.user_ns = {} | |||
|
183 | msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, | |||
|
184 | content = dict(status='ok')) | |||
|
185 | ||||
277 | def execute_request(self, stream, ident, parent): |
|
186 | def execute_request(self, stream, ident, parent): | |
278 | try: |
|
187 | try: | |
279 | code = parent[u'content'][u'code'] |
|
188 | code = parent[u'content'][u'code'] | |
280 | except: |
|
189 | except: | |
281 | print("Got bad msg: ", file=sys.__stderr__) |
|
190 | print("Got bad msg: ", file=sys.__stderr__) | |
282 | print(Message(parent), file=sys.__stderr__) |
|
191 | print(Message(parent), file=sys.__stderr__) | |
283 | return |
|
192 | return | |
284 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
193 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
285 | # self.pub_stream.send(pyin_msg) |
|
194 | # self.iopub_stream.send(pyin_msg) | |
286 | self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) |
|
195 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) | |
287 | try: |
|
196 | try: | |
288 | comp_code = self.compiler(code, '<zmq-kernel>') |
|
197 | comp_code = self.compiler(code, '<zmq-kernel>') | |
289 | # allow for not overriding displayhook |
|
198 | # allow for not overriding displayhook | |
290 | if hasattr(sys.displayhook, 'set_parent'): |
|
199 | if hasattr(sys.displayhook, 'set_parent'): | |
291 | sys.displayhook.set_parent(parent) |
|
200 | sys.displayhook.set_parent(parent) | |
292 | exec comp_code in self.user_ns, self.user_ns |
|
201 | exec comp_code in self.user_ns, self.user_ns | |
293 | except: |
|
202 | except: | |
294 | # result = u'error' |
|
203 | # result = u'error' | |
295 | etype, evalue, tb = sys.exc_info() |
|
204 | etype, evalue, tb = sys.exc_info() | |
296 | tb = traceback.format_exception(etype, evalue, tb) |
|
205 | tb = traceback.format_exception(etype, evalue, tb) | |
297 | exc_content = { |
|
206 | exc_content = { | |
298 | u'status' : u'error', |
|
207 | u'status' : u'error', | |
299 | u'traceback' : tb, |
|
208 | u'traceback' : tb, | |
300 | u'etype' : unicode(etype), |
|
209 | u'etype' : unicode(etype), | |
301 | u'evalue' : unicode(evalue) |
|
210 | u'evalue' : unicode(evalue) | |
302 | } |
|
211 | } | |
303 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
212 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
304 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) |
|
213 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) | |
305 | reply_content = exc_content |
|
214 | reply_content = exc_content | |
306 | else: |
|
215 | else: | |
307 | reply_content = {'status' : 'ok'} |
|
216 | reply_content = {'status' : 'ok'} | |
308 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) |
|
217 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | |
309 | # self.reply_socket.send(ident, zmq.SNDMORE) |
|
218 | # self.reply_socket.send(ident, zmq.SNDMORE) | |
310 | # self.reply_socket.send_json(reply_msg) |
|
219 | # self.reply_socket.send_json(reply_msg) | |
311 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident) |
|
220 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident) | |
312 | print(Message(reply_msg), file=sys.__stdout__) |
|
221 | print(Message(reply_msg), file=sys.__stdout__) | |
313 | if reply_msg['content']['status'] == u'error': |
|
222 | if reply_msg['content']['status'] == u'error': | |
314 | self.abort_queues() |
|
223 | self.abort_queues() | |
315 |
|
224 | |||
316 | def complete_request(self, stream, ident, parent): |
|
225 | def complete_request(self, stream, ident, parent): | |
317 | matches = {'matches' : self.complete(parent), |
|
226 | matches = {'matches' : self.complete(parent), | |
318 | 'status' : 'ok'} |
|
227 | 'status' : 'ok'} | |
319 | completion_msg = self.session.send(stream, 'complete_reply', |
|
228 | completion_msg = self.session.send(stream, 'complete_reply', | |
320 | matches, parent, ident) |
|
229 | matches, parent, ident) | |
321 | # print >> sys.__stdout__, completion_msg |
|
230 | # print >> sys.__stdout__, completion_msg | |
322 |
|
231 | |||
323 | def complete(self, msg): |
|
232 | def complete(self, msg): | |
324 | return self.completer.complete(msg.content.line, msg.content.text) |
|
233 | return self.completer.complete(msg.content.line, msg.content.text) | |
325 |
|
234 | |||
326 | def apply_request(self, stream, ident, parent): |
|
235 | def apply_request(self, stream, ident, parent): | |
327 | print (parent) |
|
236 | print (parent) | |
328 | try: |
|
237 | try: | |
329 | content = parent[u'content'] |
|
238 | content = parent[u'content'] | |
330 | bufs = parent[u'buffers'] |
|
239 | bufs = parent[u'buffers'] | |
331 | msg_id = parent['header']['msg_id'] |
|
240 | msg_id = parent['header']['msg_id'] | |
332 | bound = content.get('bound', False) |
|
241 | bound = content.get('bound', False) | |
333 | except: |
|
242 | except: | |
334 | print("Got bad msg: ", file=sys.__stderr__) |
|
243 | print("Got bad msg: ", file=sys.__stderr__) | |
335 | print(Message(parent), file=sys.__stderr__) |
|
244 | print(Message(parent), file=sys.__stderr__) | |
336 | return |
|
245 | return | |
337 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
246 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
338 | # self.pub_stream.send(pyin_msg) |
|
247 | # self.iopub_stream.send(pyin_msg) | |
339 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) |
|
248 | # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) | |
340 | sub = {'dependencies_met' : True, 'engine' : self.identity} |
|
249 | sub = {'dependencies_met' : True, 'engine' : self.identity} | |
341 | try: |
|
250 | try: | |
342 | # allow for not overriding displayhook |
|
251 | # allow for not overriding displayhook | |
343 | if hasattr(sys.displayhook, 'set_parent'): |
|
252 | if hasattr(sys.displayhook, 'set_parent'): | |
344 | sys.displayhook.set_parent(parent) |
|
253 | sys.displayhook.set_parent(parent) | |
345 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns |
|
254 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns | |
346 | if bound: |
|
255 | if bound: | |
347 | working = self.user_ns |
|
256 | working = self.user_ns | |
348 | suffix = str(msg_id).replace("-","") |
|
257 | suffix = str(msg_id).replace("-","") | |
349 | prefix = "_" |
|
258 | prefix = "_" | |
350 |
|
259 | |||
351 | else: |
|
260 | else: | |
352 | working = dict() |
|
261 | working = dict() | |
353 | suffix = prefix = "_" # prevent keyword collisions with lambda |
|
262 | suffix = prefix = "_" # prevent keyword collisions with lambda | |
354 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) |
|
263 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | |
355 | # if f.fun |
|
264 | # if f.fun | |
356 | fname = prefix+f.func_name.strip('<>')+suffix |
|
265 | fname = prefix+f.func_name.strip('<>')+suffix | |
357 | argname = prefix+"args"+suffix |
|
266 | argname = prefix+"args"+suffix | |
358 | kwargname = prefix+"kwargs"+suffix |
|
267 | kwargname = prefix+"kwargs"+suffix | |
359 | resultname = prefix+"result"+suffix |
|
268 | resultname = prefix+"result"+suffix | |
360 |
|
269 | |||
361 | ns = { fname : f, argname : args, kwargname : kwargs } |
|
270 | ns = { fname : f, argname : args, kwargname : kwargs } | |
362 | # print ns |
|
271 | # print ns | |
363 | working.update(ns) |
|
272 | working.update(ns) | |
364 | code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) |
|
273 | code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) | |
365 | exec code in working, working |
|
274 | exec code in working, working | |
366 | result = working.get(resultname) |
|
275 | result = working.get(resultname) | |
367 | # clear the namespace |
|
276 | # clear the namespace | |
368 | if bound: |
|
277 | if bound: | |
369 | for key in ns.iterkeys(): |
|
278 | for key in ns.iterkeys(): | |
370 | self.user_ns.pop(key) |
|
279 | self.user_ns.pop(key) | |
371 | else: |
|
280 | else: | |
372 | del working |
|
281 | del working | |
373 |
|
282 | |||
374 | packed_result,buf = serialize_object(result) |
|
283 | packed_result,buf = serialize_object(result) | |
375 | result_buf = [packed_result]+buf |
|
284 | result_buf = [packed_result]+buf | |
376 | except: |
|
285 | except: | |
377 | result = u'error' |
|
286 | result = u'error' | |
378 | etype, evalue, tb = sys.exc_info() |
|
287 | etype, evalue, tb = sys.exc_info() | |
379 | tb = traceback.format_exception(etype, evalue, tb) |
|
288 | tb = traceback.format_exception(etype, evalue, tb) | |
380 | exc_content = { |
|
289 | exc_content = { | |
381 | u'status' : u'error', |
|
290 | u'status' : u'error', | |
382 | u'traceback' : tb, |
|
291 | u'traceback' : tb, | |
383 | u'etype' : unicode(etype), |
|
292 | u'etype' : unicode(etype), | |
384 | u'evalue' : unicode(evalue) |
|
293 | u'evalue' : unicode(evalue) | |
385 | } |
|
294 | } | |
386 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
295 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |
387 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) |
|
296 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) | |
388 | reply_content = exc_content |
|
297 | reply_content = exc_content | |
389 | result_buf = [] |
|
298 | result_buf = [] | |
390 |
|
299 | |||
391 | if etype is UnmetDependency: |
|
300 | if etype is UnmetDependency: | |
392 | sub['dependencies_met'] = False |
|
301 | sub['dependencies_met'] = False | |
393 | else: |
|
302 | else: | |
394 | reply_content = {'status' : 'ok'} |
|
303 | reply_content = {'status' : 'ok'} | |
395 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) |
|
304 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | |
396 | # self.reply_socket.send(ident, zmq.SNDMORE) |
|
305 | # self.reply_socket.send(ident, zmq.SNDMORE) | |
397 | # self.reply_socket.send_json(reply_msg) |
|
306 | # self.reply_socket.send_json(reply_msg) | |
398 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, |
|
307 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | |
399 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) |
|
308 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) | |
400 | print(Message(reply_msg), file=sys.__stdout__) |
|
309 | print(Message(reply_msg), file=sys.__stdout__) | |
401 | # if reply_msg['content']['status'] == u'error': |
|
310 | # if reply_msg['content']['status'] == u'error': | |
402 | # self.abort_queues() |
|
311 | # self.abort_queues() | |
403 |
|
312 | |||
404 | def dispatch_queue(self, stream, msg): |
|
313 | def dispatch_queue(self, stream, msg): | |
405 | self.control_stream.flush() |
|
314 | self.control_stream.flush() | |
406 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
315 | idents,msg = self.session.feed_identities(msg, copy=False) | |
407 | msg = self.session.unpack_message(msg, content=True, copy=False) |
|
316 | msg = self.session.unpack_message(msg, content=True, copy=False) | |
408 |
|
317 | |||
409 | header = msg['header'] |
|
318 | header = msg['header'] | |
410 | msg_id = header['msg_id'] |
|
319 | msg_id = header['msg_id'] | |
411 | if self.check_aborted(msg_id): |
|
320 | if self.check_aborted(msg_id): | |
412 | self.aborted.remove(msg_id) |
|
321 | self.aborted.remove(msg_id) | |
413 | # is it safe to assume a msg_id will not be resubmitted? |
|
322 | # is it safe to assume a msg_id will not be resubmitted? | |
414 | reply_type = msg['msg_type'].split('_')[0] + '_reply' |
|
323 | reply_type = msg['msg_type'].split('_')[0] + '_reply' | |
415 | reply_msg = self.session.send(stream, reply_type, |
|
324 | reply_msg = self.session.send(stream, reply_type, | |
416 | content={'status' : 'aborted'}, parent=msg, ident=idents) |
|
325 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |
417 | return |
|
326 | return | |
418 |
handler = self. |
|
327 | handler = self.shell_handlers.get(msg['msg_type'], None) | |
419 | if handler is None: |
|
328 | if handler is None: | |
420 | print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__) |
|
329 | print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__) | |
421 | else: |
|
330 | else: | |
422 | handler(stream, idents, msg) |
|
331 | handler(stream, idents, msg) | |
423 |
|
332 | |||
424 | def start(self): |
|
333 | def start(self): | |
425 | #### stream mode: |
|
334 | #### stream mode: | |
426 | if self.control_stream: |
|
335 | if self.control_stream: | |
427 | self.control_stream.on_recv(self.dispatch_control, copy=False) |
|
336 | self.control_stream.on_recv(self.dispatch_control, copy=False) | |
428 | self.control_stream.on_err(printer) |
|
337 | self.control_stream.on_err(printer) | |
429 | if self.reply_stream: |
|
338 | ||
430 |
|
|
339 | for s in self.shell_streams: | |
431 | self.dispatch_queue(self.reply_stream, msg), copy=False) |
|
340 | s.on_recv(lambda msg: | |
432 | self.reply_stream.on_err(printer) |
|
341 | self.dispatch_queue(s, msg), copy=False) | |
433 | if self.task_stream: |
|
342 | s.on_err(printer) | |
434 | self.task_stream.on_recv(lambda msg: |
|
343 | ||
435 | self.dispatch_queue(self.task_stream, msg), copy=False) |
|
344 | if self.iopub_stream: | |
436 |
self. |
|
345 | self.iopub_stream.on_err(printer) | |
|
346 | self.iopub_stream.on_send(printer) | |||
437 |
|
347 | |||
438 | #### while True mode: |
|
348 | #### while True mode: | |
439 | # while True: |
|
349 | # while True: | |
440 | # idle = True |
|
350 | # idle = True | |
441 | # try: |
|
351 | # try: | |
442 |
# msg = self. |
|
352 | # msg = self.shell_stream.socket.recv_multipart( | |
443 | # zmq.NOBLOCK, copy=False) |
|
353 | # zmq.NOBLOCK, copy=False) | |
444 | # except zmq.ZMQError, e: |
|
354 | # except zmq.ZMQError, e: | |
445 | # if e.errno != zmq.EAGAIN: |
|
355 | # if e.errno != zmq.EAGAIN: | |
446 | # raise e |
|
356 | # raise e | |
447 | # else: |
|
357 | # else: | |
448 | # idle=False |
|
358 | # idle=False | |
449 |
# self.dispatch_queue(self. |
|
359 | # self.dispatch_queue(self.shell_stream, msg) | |
450 | # |
|
360 | # | |
451 | # if not self.task_stream.empty(): |
|
361 | # if not self.task_stream.empty(): | |
452 | # idle=False |
|
362 | # idle=False | |
453 | # msg = self.task_stream.recv_multipart() |
|
363 | # msg = self.task_stream.recv_multipart() | |
454 | # self.dispatch_queue(self.task_stream, msg) |
|
364 | # self.dispatch_queue(self.task_stream, msg) | |
455 | # if idle: |
|
365 | # if idle: | |
456 | # # don't busywait |
|
366 | # # don't busywait | |
457 | # time.sleep(1e-3) |
|
367 | # time.sleep(1e-3) | |
458 |
|
368 | |||
459 |
|
369 | def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, | ||
460 | def main(): |
|
370 | client_addr=None, loop=None, context=None): | |
461 | raise Exception("Don't run me anymore") |
|
371 | # create loop, context, and session: | |
|
372 | if loop is None: | |||
462 | loop = ioloop.IOLoop.instance() |
|
373 | loop = ioloop.IOLoop.instance() | |
463 | c = zmq.Context() |
|
374 | if context is None: | |
464 |
|
375 | context = zmq.Context() | ||
465 | ip = '127.0.0.1' |
|
376 | c = context | |
466 | port_base = 5575 |
|
377 | session = StreamSession() | |
467 | connection = ('tcp://%s' % ip) + ':%i' |
|
378 | print (control_addr, shell_addrs, iopub_addr, hb_addrs) | |
468 | rep_conn = connection % port_base |
|
379 | ||
469 | pub_conn = connection % (port_base+1) |
|
380 | # create Control Stream | |
470 |
|
381 | control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) | ||
471 | print("Starting the kernel...", file=sys.__stdout__) |
|
382 | control_stream.setsockopt(zmq.IDENTITY, identity) | |
472 | # print >>sys.__stdout__, "XREQ Channel:", rep_conn |
|
383 | control_stream.connect(control_addr) | |
473 | # print >>sys.__stdout__, "PUB Channel:", pub_conn |
|
384 | ||
474 |
|
385 | # create Shell Streams (MUX, Task, etc.): | ||
475 | session = StreamSession(username=u'kernel') |
|
386 | shell_streams = [] | |
476 |
|
387 | for addr in shell_addrs: | ||
477 | reply_socket = c.socket(zmq.XREQ) |
|
388 | stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) | |
478 | reply_socket.connect(rep_conn) |
|
389 | stream.setsockopt(zmq.IDENTITY, identity) | |
479 |
|
390 | stream.connect(addr) | ||
480 | pub_socket = c.socket(zmq.PUB) |
|
391 | shell_streams.append(stream) | |
481 | pub_socket.connect(pub_conn) |
|
392 | ||
482 |
|
393 | # create iopub stream: | ||
483 | stdout = OutStream(session, pub_socket, u'stdout') |
|
394 | iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop) | |
484 | stderr = OutStream(session, pub_socket, u'stderr') |
|
395 | iopub_stream.setsockopt(zmq.IDENTITY, identity) | |
485 | sys.stdout = stdout |
|
396 | iopub_stream.connect(iopub_addr) | |
486 | sys.stderr = stderr |
|
397 | ||
487 |
|
398 | # launch heartbeat | ||
488 | display_hook = DisplayHook(session, pub_socket) |
|
399 | heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) | |
489 | sys.displayhook = display_hook |
|
400 | heart.start() | |
490 | reply_stream = zmqstream.ZMQStream(reply_socket,loop) |
|
401 | ||
491 | pub_stream = zmqstream.ZMQStream(pub_socket,loop) |
|
402 | # create (optional) Client | |
492 | kernel = Kernel(session, reply_stream, pub_stream) |
|
403 | if client_addr: | |
493 |
|
404 | client = Client(client_addr, username=identity) | ||
494 | # For debugging convenience, put sleep and a string in the namespace, so we |
|
405 | else: | |
495 | # have them every time we start. |
|
406 | client = None | |
496 | kernel.user_ns['sleep'] = time.sleep |
|
|||
497 | kernel.user_ns['s'] = 'Test string' |
|
|||
498 |
|
407 | |||
499 | print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__) |
|
408 | kernel = Kernel(session=session, control_stream=control_stream, | |
|
409 | shell_streams=shell_streams, iopub_stream=iopub_stream, | |||
|
410 | client=client) | |||
500 | kernel.start() |
|
411 | kernel.start() | |
501 | loop.start() |
|
412 | return loop, c | |
502 |
|
||||
503 |
|
413 | |||
504 | if __name__ == '__main__': |
|
|||
505 | main() |
|
General Comments 0
You need to be logged in to leave comments.
Login now