Show More
@@ -1,148 +1,132 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | """A simple engine that talks to a controller over 0MQ. |
|
3 | 3 | it handles registration, etc. and launches a kernel |
|
4 | 4 | connected to the Controller's queue(s). |
|
5 | 5 | """ |
|
6 | 6 | from __future__ import print_function |
|
7 | 7 | import sys |
|
8 | 8 | import time |
|
9 | 9 | import traceback |
|
10 | 10 | import uuid |
|
11 | 11 | from pprint import pprint |
|
12 | 12 | |
|
13 | 13 | import zmq |
|
14 | 14 | from zmq.eventloop import ioloop, zmqstream |
|
15 | 15 | |
|
16 | from IPython.utils.traitlets import HasTraits | |
|
17 | from IPython.utils.localinterfaces import LOCALHOST | |
|
18 | ||
|
16 | 19 | from streamsession import Message, StreamSession |
|
17 | 20 | from client import Client |
|
18 |
|
|
|
21 | from streamkernel import Kernel, make_kernel | |
|
19 | 22 | import heartmonitor |
|
20 | 23 | from entry_point import make_base_argument_parser, connect_logger, parse_url |
|
21 | 24 | # import taskthread |
|
22 | 25 | # from log import logger |
|
23 | 26 | |
|
24 | 27 | |
|
25 | 28 | def printer(*msg): |
|
26 | 29 | pprint(msg) |
|
27 | 30 | |
|
28 | 31 | class Engine(object): |
|
29 | 32 | """IPython engine""" |
|
30 | 33 | |
|
31 | 34 | id=None |
|
32 | 35 | context=None |
|
33 | 36 | loop=None |
|
34 | 37 | session=None |
|
35 | 38 | ident=None |
|
36 | 39 | registrar=None |
|
37 | 40 | heart=None |
|
38 | 41 | kernel=None |
|
39 | 42 | |
|
40 | 43 | def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None): |
|
41 | 44 | self.context = context |
|
42 | 45 | self.loop = loop |
|
43 | 46 | self.session = session |
|
44 | 47 | self.registrar = registrar |
|
45 | 48 | self.client = client |
|
46 | 49 | self.ident = ident if ident else str(uuid.uuid4()) |
|
47 | 50 | self.registrar.on_send(printer) |
|
48 | 51 | |
|
49 | 52 | def register(self): |
|
50 | 53 | |
|
51 | 54 | content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) |
|
52 | 55 | self.registrar.on_recv(self.complete_registration) |
|
53 | 56 | self.session.send(self.registrar, "registration_request",content=content) |
|
54 | 57 | |
|
55 | 58 | def complete_registration(self, msg): |
|
56 | 59 | # print msg |
|
57 | 60 | idents,msg = self.session.feed_identities(msg) |
|
58 | 61 | msg = Message(self.session.unpack_message(msg)) |
|
59 | 62 | if msg.content.status == 'ok': |
|
60 | 63 | self.session.username = str(msg.content.id) |
|
61 | 64 | queue_addr = msg.content.queue |
|
62 |
|
|
|
63 | queue = self.context.socket(zmq.PAIR) | |
|
64 | queue.setsockopt(zmq.IDENTITY, self.ident) | |
|
65 | queue.connect(str(queue_addr)) | |
|
66 | self.queue = zmqstream.ZMQStream(queue, self.loop) | |
|
67 | ||
|
68 | control_addr = msg.content.control | |
|
69 | if control_addr: | |
|
70 | control = self.context.socket(zmq.PAIR) | |
|
71 | control.setsockopt(zmq.IDENTITY, self.ident) | |
|
72 | control.connect(str(control_addr)) | |
|
73 | self.control = zmqstream.ZMQStream(control, self.loop) | |
|
74 | ||
|
65 | shell_addrs = [str(queue_addr)] | |
|
66 | control_addr = str(msg.content.control) | |
|
75 | 67 | task_addr = msg.content.task |
|
76 | print (task_addr) | |
|
77 | 68 | if task_addr: |
|
78 | # task as stream: | |
|
79 | task = self.context.socket(zmq.PAIR) | |
|
80 | task.setsockopt(zmq.IDENTITY, self.ident) | |
|
81 | task.connect(str(task_addr)) | |
|
82 | self.task_stream = zmqstream.ZMQStream(task, self.loop) | |
|
83 | # TaskThread: | |
|
84 | # mon_addr = msg.content.monitor | |
|
85 | # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident) | |
|
86 | # task.connect_in(str(task_addr)) | |
|
87 | # task.connect_out(str(mon_addr)) | |
|
88 | # self.task_stream = taskthread.QueueStream(*task.queues) | |
|
89 | # task.start() | |
|
90 | ||
|
91 | hbs = msg.content.heartbeat | |
|
92 | self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident) | |
|
93 | self.heart.start() | |
|
69 | shell_addrs.append(str(task_addr)) | |
|
70 | ||
|
71 | hb_addrs = msg.content.heartbeat | |
|
94 | 72 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() |
|
95 | # placeholder for now: | |
|
96 | pub = self.context.socket(zmq.PUB) | |
|
97 | pub = zmqstream.ZMQStream(pub, self.loop) | |
|
98 | # create and start the kernel | |
|
99 | self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client) | |
|
100 | self.kernel.start() | |
|
73 | ||
|
74 | # placeholder for no, since pub isn't hooked up: | |
|
75 | sub = self.context.socket(zmq.SUB) | |
|
76 | sub = zmqstream.ZMQStream(sub, self.loop) | |
|
77 | sub.on_recv(lambda *a: None) | |
|
78 | port = sub.bind_to_random_port("tcp://%s"%LOCALHOST) | |
|
79 | iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345) | |
|
80 | ||
|
81 | make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs, | |
|
82 | client_addr=None, loop=self.loop, context=self.context) | |
|
83 | ||
|
101 | 84 | else: |
|
102 | 85 | # logger.error("Registration Failed: %s"%msg) |
|
103 | 86 | raise Exception("Registration Failed: %s"%msg) |
|
104 | 87 | |
|
105 | 88 | # logger.info("engine::completed registration with id %s"%self.session.username) |
|
106 | 89 | |
|
107 | 90 | print (msg) |
|
108 | 91 | |
|
109 | 92 | def unregister(self): |
|
110 | 93 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) |
|
111 | 94 | time.sleep(1) |
|
112 | 95 | sys.exit(0) |
|
113 | 96 | |
|
114 | 97 | def start(self): |
|
115 | 98 | print ("registering") |
|
116 | 99 | self.register() |
|
117 | 100 | |
|
118 | 101 | |
|
102 | ||
|
119 | 103 | def main(): |
|
120 | 104 | |
|
121 | 105 | parser = make_base_argument_parser() |
|
122 | 106 | |
|
123 | 107 | args = parser.parse_args() |
|
124 | 108 | |
|
125 | 109 | parse_url(args) |
|
126 | 110 | |
|
127 | 111 | iface="%s://%s"%(args.transport,args.ip)+':%i' |
|
128 | 112 | |
|
129 | 113 | loop = ioloop.IOLoop.instance() |
|
130 | 114 | session = StreamSession() |
|
131 | 115 | ctx = zmq.Context() |
|
132 | 116 | |
|
133 | 117 | # setup logging |
|
134 | 118 | connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) |
|
135 | 119 | |
|
136 | 120 | reg_conn = iface % args.regport |
|
137 | 121 | print (reg_conn) |
|
138 | 122 | print ("Starting the engine...", file=sys.__stderr__) |
|
139 | 123 | |
|
140 | 124 | reg = ctx.socket(zmq.PAIR) |
|
141 | 125 | reg.connect(reg_conn) |
|
142 | 126 | reg = zmqstream.ZMQStream(reg, loop) |
|
143 | 127 | client = Client(reg_conn) |
|
144 | 128 | |
|
145 | 129 | e = Engine(ctx, loop, session, reg, client, args.ident) |
|
146 | 130 | dc = ioloop.DelayedCallback(e.start, 100, loop) |
|
147 | 131 | dc.start() |
|
148 | 132 | loop.start() No newline at end of file |
@@ -1,505 +1,413 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | """ |
|
3 | 3 | Kernel adapted from kernel.py to use ZMQ Streams |
|
4 | 4 | """ |
|
5 | 5 | |
|
6 | #----------------------------------------------------------------------------- | |
|
7 | # Imports | |
|
8 | #----------------------------------------------------------------------------- | |
|
9 | ||
|
10 | # Standard library imports. | |
|
6 | 11 | from __future__ import print_function |
|
7 | 12 | import __builtin__ |
|
13 | from code import CommandCompiler | |
|
8 | 14 | import os |
|
9 | 15 | import sys |
|
10 | 16 | import time |
|
11 | 17 | import traceback |
|
12 | 18 | from signal import SIGTERM, SIGKILL |
|
13 | 19 | from pprint import pprint |
|
14 | 20 | |
|
15 | from code import CommandCompiler | |
|
16 | ||
|
21 | # System library imports. | |
|
17 | 22 | import zmq |
|
18 | 23 | from zmq.eventloop import ioloop, zmqstream |
|
19 | 24 | |
|
25 | # Local imports. | |
|
26 | from IPython.utils.traitlets import HasTraits, Instance, List | |
|
20 | 27 | from IPython.zmq.completer import KernelCompleter |
|
21 | 28 | |
|
22 | 29 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ |
|
23 | 30 | unpack_apply_message |
|
24 | 31 | from dependency import UnmetDependency |
|
32 | import heartmonitor | |
|
33 | from client import Client | |
|
25 | 34 | |
|
26 | 35 | def printer(*args): |
|
27 | 36 | pprint(args) |
|
28 | 37 | |
|
29 | class OutStream(object): | |
|
30 | """A file like object that publishes the stream to a 0MQ PUB socket.""" | |
|
31 | ||
|
32 | def __init__(self, session, pub_socket, name, max_buffer=200): | |
|
33 | self.session = session | |
|
34 | self.pub_socket = pub_socket | |
|
35 | self.name = name | |
|
36 | self._buffer = [] | |
|
37 | self._buffer_len = 0 | |
|
38 | self.max_buffer = max_buffer | |
|
39 | self.parent_header = {} | |
|
40 | ||
|
41 | def set_parent(self, parent): | |
|
42 | self.parent_header = extract_header(parent) | |
|
43 | ||
|
44 | def close(self): | |
|
45 | self.pub_socket = None | |
|
46 | ||
|
47 | def flush(self): | |
|
48 | if self.pub_socket is None: | |
|
49 | raise ValueError(u'I/O operation on closed file') | |
|
50 | else: | |
|
51 | if self._buffer: | |
|
52 | data = ''.join(self._buffer) | |
|
53 | content = {u'name':self.name, u'data':data} | |
|
54 | # msg = self.session.msg(u'stream', content=content, | |
|
55 | # parent=self.parent_header) | |
|
56 | msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header) | |
|
57 | # print>>sys.__stdout__, Message(msg) | |
|
58 | # self.pub_socket.send_json(msg) | |
|
59 | self._buffer_len = 0 | |
|
60 | self._buffer = [] | |
|
61 | ||
|
62 | def isattr(self): | |
|
63 | return False | |
|
64 | ||
|
65 | def next(self): | |
|
66 | raise IOError('Read not supported on a write only stream.') | |
|
67 | ||
|
68 | def read(self, size=None): | |
|
69 | raise IOError('Read not supported on a write only stream.') | |
|
70 | ||
|
71 | readline=read | |
|
72 | ||
|
73 | def write(self, s): | |
|
74 | if self.pub_socket is None: | |
|
75 | raise ValueError('I/O operation on closed file') | |
|
76 | else: | |
|
77 | self._buffer.append(s) | |
|
78 | self._buffer_len += len(s) | |
|
79 | self._maybe_send() | |
|
80 | ||
|
81 | def _maybe_send(self): | |
|
82 | if '\n' in self._buffer[-1]: | |
|
83 | self.flush() | |
|
84 | if self._buffer_len > self.max_buffer: | |
|
85 | self.flush() | |
|
86 | ||
|
87 | def writelines(self, sequence): | |
|
88 | if self.pub_socket is None: | |
|
89 | raise ValueError('I/O operation on closed file') | |
|
90 | else: | |
|
91 | for s in sequence: | |
|
92 | self.write(s) | |
|
93 | ||
|
94 | ||
|
95 | class DisplayHook(object): | |
|
96 | ||
|
97 | def __init__(self, session, pub_socket): | |
|
98 | self.session = session | |
|
99 | self.pub_socket = pub_socket | |
|
100 | self.parent_header = {} | |
|
101 | ||
|
102 | def __call__(self, obj): | |
|
103 | if obj is None: | |
|
104 | return | |
|
105 | ||
|
106 | __builtin__._ = obj | |
|
107 | # msg = self.session.msg(u'pyout', {u'data':repr(obj)}, | |
|
108 | # parent=self.parent_header) | |
|
109 | # self.pub_socket.send_json(msg) | |
|
110 | self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header) | |
|
111 | ||
|
112 | def set_parent(self, parent): | |
|
113 | self.parent_header = extract_header(parent) | |
|
38 | #----------------------------------------------------------------------------- | |
|
39 | # Main kernel class | |
|
40 | #----------------------------------------------------------------------------- | |
|
114 | 41 | |
|
42 | class Kernel(HasTraits): | |
|
115 | 43 | |
|
116 | class RawInput(object): | |
|
44 | #--------------------------------------------------------------------------- | |
|
45 | # Kernel interface | |
|
46 | #--------------------------------------------------------------------------- | |
|
117 | 47 | |
|
118 | def __init__(self, session, socket): | |
|
119 | self.session = session | |
|
120 | self.socket = socket | |
|
48 | session = Instance(StreamSession) | |
|
49 | shell_streams = Instance(list) | |
|
50 | control_stream = Instance(zmqstream.ZMQStream) | |
|
51 | task_stream = Instance(zmqstream.ZMQStream) | |
|
52 | iopub_stream = Instance(zmqstream.ZMQStream) | |
|
53 | client = Instance(Client) | |
|
121 | 54 | |
|
122 |
def __ |
|
|
123 | msg = self.session.msg(u'raw_input') | |
|
124 | self.socket.send_json(msg) | |
|
125 | while True: | |
|
126 | try: | |
|
127 | reply = self.socket.recv_json(zmq.NOBLOCK) | |
|
128 | except zmq.ZMQError as e: | |
|
129 | if e.errno == zmq.EAGAIN: | |
|
130 | pass | |
|
131 | else: | |
|
132 | raise | |
|
133 | else: | |
|
134 | break | |
|
135 | return reply[u'content'][u'data'] | |
|
136 | ||
|
137 | ||
|
138 | class Kernel(object): | |
|
139 | ||
|
140 | def __init__(self, session, control_stream, reply_stream, pub_stream, | |
|
141 | task_stream=None, client=None): | |
|
142 | self.session = session | |
|
143 | self.control_stream = control_stream | |
|
144 | # self.control_socket = control_stream.socket | |
|
145 | self.reply_stream = reply_stream | |
|
146 | self.identity = self.reply_stream.getsockopt(zmq.IDENTITY) | |
|
147 | self.task_stream = task_stream | |
|
148 | self.pub_stream = pub_stream | |
|
149 | self.client = client | |
|
55 | def __init__(self, **kwargs): | |
|
56 | super(Kernel, self).__init__(**kwargs) | |
|
57 | self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) | |
|
150 | 58 | self.user_ns = {} |
|
151 | 59 | self.history = [] |
|
152 | 60 | self.compiler = CommandCompiler() |
|
153 | 61 | self.completer = KernelCompleter(self.user_ns) |
|
154 | 62 | self.aborted = set() |
|
155 | 63 | |
|
156 | 64 | # Build dict of handlers for message types |
|
157 |
self. |
|
|
65 | self.shell_handlers = {} | |
|
158 | 66 | self.control_handlers = {} |
|
159 |
for msg_type in ['execute_request', 'complete_request', 'apply_request' |
|
|
160 | self.queue_handlers[msg_type] = getattr(self, msg_type) | |
|
67 | for msg_type in ['execute_request', 'complete_request', 'apply_request', | |
|
68 | 'clear_request']: | |
|
69 | self.shell_handlers[msg_type] = getattr(self, msg_type) | |
|
161 | 70 | |
|
162 |
for msg_type in [' |
|
|
71 | for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys(): | |
|
163 | 72 | self.control_handlers[msg_type] = getattr(self, msg_type) |
|
164 | 73 | |
|
165 | 74 | #-------------------- control handlers ----------------------------- |
|
166 | 75 | def abort_queues(self): |
|
167 |
for stream in |
|
|
76 | for stream in self.shell_streams: | |
|
168 | 77 | if stream: |
|
169 | 78 | self.abort_queue(stream) |
|
170 | 79 | |
|
171 | 80 | def abort_queue(self, stream): |
|
172 | 81 | while True: |
|
173 | 82 | try: |
|
174 | 83 | msg = self.session.recv(stream, zmq.NOBLOCK,content=True) |
|
175 | 84 | except zmq.ZMQError as e: |
|
176 | 85 | if e.errno == zmq.EAGAIN: |
|
177 | 86 | break |
|
178 | 87 | else: |
|
179 | 88 | return |
|
180 | 89 | else: |
|
181 | 90 | if msg is None: |
|
182 | 91 | return |
|
183 | 92 | else: |
|
184 | 93 | idents,msg = msg |
|
185 | 94 | |
|
186 | 95 | # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." |
|
187 | 96 | # msg = self.reply_socket.recv_json() |
|
188 | 97 | print ("Aborting:", file=sys.__stdout__) |
|
189 | 98 | print (Message(msg), file=sys.__stdout__) |
|
190 | 99 | msg_type = msg['msg_type'] |
|
191 | 100 | reply_type = msg_type.split('_')[0] + '_reply' |
|
192 | 101 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) |
|
193 | 102 | # self.reply_socket.send(ident,zmq.SNDMORE) |
|
194 | 103 | # self.reply_socket.send_json(reply_msg) |
|
195 | 104 | reply_msg = self.session.send(stream, reply_type, |
|
196 | 105 | content={'status' : 'aborted'}, parent=msg, ident=idents)[0] |
|
197 | 106 | print(Message(reply_msg), file=sys.__stdout__) |
|
198 | 107 | # We need to wait a bit for requests to come in. This can probably |
|
199 | 108 | # be set shorter for true asynchronous clients. |
|
200 | 109 | time.sleep(0.05) |
|
201 | 110 | |
|
202 | 111 | def abort_request(self, stream, ident, parent): |
|
203 | 112 | """abort a specifig msg by id""" |
|
204 | 113 | msg_ids = parent['content'].get('msg_ids', None) |
|
205 | 114 | if isinstance(msg_ids, basestring): |
|
206 | 115 | msg_ids = [msg_ids] |
|
207 | 116 | if not msg_ids: |
|
208 | 117 | self.abort_queues() |
|
209 | 118 | for mid in msg_ids: |
|
210 | 119 | self.aborted.add(str(mid)) |
|
211 | 120 | |
|
212 | 121 | content = dict(status='ok') |
|
213 | 122 | reply_msg = self.session.send(stream, 'abort_reply', content=content, |
|
214 | 123 | parent=parent, ident=ident)[0] |
|
215 | 124 | print(Message(reply_msg), file=sys.__stdout__) |
|
216 | 125 | |
|
217 |
def |
|
|
126 | def shutdown_request(self, stream, ident, parent): | |
|
218 | 127 | """kill ourself. This should really be handled in an external process""" |
|
219 | 128 | self.abort_queues() |
|
220 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, | |
|
221 | content = dict(status='ok')) | |
|
222 | # we can know that a message is done if we *don't* use streams, but | |
|
223 | # use a socket directly with MessageTracker | |
|
224 | time.sleep(.5) | |
|
225 | os.kill(os.getpid(), SIGTERM) | |
|
226 | time.sleep(1) | |
|
227 | os.kill(os.getpid(), SIGKILL) | |
|
228 | ||
|
229 | def clear_request(self, stream, idents, parent): | |
|
230 | """Clear our namespace.""" | |
|
231 | self.user_ns = {} | |
|
232 | msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, | |
|
233 | content = dict(status='ok')) | |
|
129 | content = dict(parent['content']) | |
|
130 | msg = self.session.send(self.reply_socket, 'shutdown_reply', | |
|
131 | content, parent, ident) | |
|
132 | msg = self.session.send(self.pub_socket, 'shutdown_reply', | |
|
133 | content, parent, ident) | |
|
134 | # print >> sys.__stdout__, msg | |
|
135 | time.sleep(0.1) | |
|
136 | sys.exit(0) | |
|
234 | 137 | |
|
235 | 138 | def dispatch_control(self, msg): |
|
236 | 139 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
237 | 140 | msg = self.session.unpack_message(msg, content=True, copy=False) |
|
238 | 141 | |
|
239 | 142 | header = msg['header'] |
|
240 | 143 | msg_id = header['msg_id'] |
|
241 | 144 | |
|
242 | 145 | handler = self.control_handlers.get(msg['msg_type'], None) |
|
243 | 146 | if handler is None: |
|
244 | 147 | print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__) |
|
245 | 148 | else: |
|
246 | 149 | handler(self.control_stream, idents, msg) |
|
247 | 150 | |
|
248 | 151 | |
|
249 | 152 | #-------------------- queue helpers ------------------------------ |
|
250 | 153 | |
|
251 | 154 | def check_dependencies(self, dependencies): |
|
252 | 155 | if not dependencies: |
|
253 | 156 | return True |
|
254 | 157 | if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): |
|
255 | 158 | anyorall = dependencies[0] |
|
256 | 159 | dependencies = dependencies[1] |
|
257 | 160 | else: |
|
258 | 161 | anyorall = 'all' |
|
259 | 162 | results = self.client.get_results(dependencies,status_only=True) |
|
260 | 163 | if results['status'] != 'ok': |
|
261 | 164 | return False |
|
262 | 165 | |
|
263 | 166 | if anyorall == 'any': |
|
264 | 167 | if not results['completed']: |
|
265 | 168 | return False |
|
266 | 169 | else: |
|
267 | 170 | if results['pending']: |
|
268 | 171 | return False |
|
269 | 172 | |
|
270 | 173 | return True |
|
271 | 174 | |
|
272 | 175 | def check_aborted(self, msg_id): |
|
273 | 176 | return msg_id in self.aborted |
|
274 | 177 | |
|
275 | 178 | #-------------------- queue handlers ----------------------------- |
|
276 | 179 | |
|
180 | def clear_request(self, stream, idents, parent): | |
|
181 | """Clear our namespace.""" | |
|
182 | self.user_ns = {} | |
|
183 | msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, | |
|
184 | content = dict(status='ok')) | |
|
185 | ||
|
277 | 186 | def execute_request(self, stream, ident, parent): |
|
278 | 187 | try: |
|
279 | 188 | code = parent[u'content'][u'code'] |
|
280 | 189 | except: |
|
281 | 190 | print("Got bad msg: ", file=sys.__stderr__) |
|
282 | 191 | print(Message(parent), file=sys.__stderr__) |
|
283 | 192 | return |
|
284 | 193 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
285 | # self.pub_stream.send(pyin_msg) | |
|
286 | self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | |
|
194 | # self.iopub_stream.send(pyin_msg) | |
|
195 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) | |
|
287 | 196 | try: |
|
288 | 197 | comp_code = self.compiler(code, '<zmq-kernel>') |
|
289 | 198 | # allow for not overriding displayhook |
|
290 | 199 | if hasattr(sys.displayhook, 'set_parent'): |
|
291 | 200 | sys.displayhook.set_parent(parent) |
|
292 | 201 | exec comp_code in self.user_ns, self.user_ns |
|
293 | 202 | except: |
|
294 | 203 | # result = u'error' |
|
295 | 204 | etype, evalue, tb = sys.exc_info() |
|
296 | 205 | tb = traceback.format_exception(etype, evalue, tb) |
|
297 | 206 | exc_content = { |
|
298 | 207 | u'status' : u'error', |
|
299 | 208 | u'traceback' : tb, |
|
300 | 209 | u'etype' : unicode(etype), |
|
301 | 210 | u'evalue' : unicode(evalue) |
|
302 | 211 | } |
|
303 | 212 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
304 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) | |
|
213 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) | |
|
305 | 214 | reply_content = exc_content |
|
306 | 215 | else: |
|
307 | 216 | reply_content = {'status' : 'ok'} |
|
308 | 217 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) |
|
309 | 218 | # self.reply_socket.send(ident, zmq.SNDMORE) |
|
310 | 219 | # self.reply_socket.send_json(reply_msg) |
|
311 | 220 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident) |
|
312 | 221 | print(Message(reply_msg), file=sys.__stdout__) |
|
313 | 222 | if reply_msg['content']['status'] == u'error': |
|
314 | 223 | self.abort_queues() |
|
315 | 224 | |
|
316 | 225 | def complete_request(self, stream, ident, parent): |
|
317 | 226 | matches = {'matches' : self.complete(parent), |
|
318 | 227 | 'status' : 'ok'} |
|
319 | 228 | completion_msg = self.session.send(stream, 'complete_reply', |
|
320 | 229 | matches, parent, ident) |
|
321 | 230 | # print >> sys.__stdout__, completion_msg |
|
322 | 231 | |
|
323 | 232 | def complete(self, msg): |
|
324 | 233 | return self.completer.complete(msg.content.line, msg.content.text) |
|
325 | 234 | |
|
326 | 235 | def apply_request(self, stream, ident, parent): |
|
327 | 236 | print (parent) |
|
328 | 237 | try: |
|
329 | 238 | content = parent[u'content'] |
|
330 | 239 | bufs = parent[u'buffers'] |
|
331 | 240 | msg_id = parent['header']['msg_id'] |
|
332 | 241 | bound = content.get('bound', False) |
|
333 | 242 | except: |
|
334 | 243 | print("Got bad msg: ", file=sys.__stderr__) |
|
335 | 244 | print(Message(parent), file=sys.__stderr__) |
|
336 | 245 | return |
|
337 | 246 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
338 | # self.pub_stream.send(pyin_msg) | |
|
339 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | |
|
247 | # self.iopub_stream.send(pyin_msg) | |
|
248 | # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) | |
|
340 | 249 | sub = {'dependencies_met' : True, 'engine' : self.identity} |
|
341 | 250 | try: |
|
342 | 251 | # allow for not overriding displayhook |
|
343 | 252 | if hasattr(sys.displayhook, 'set_parent'): |
|
344 | 253 | sys.displayhook.set_parent(parent) |
|
345 | 254 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns |
|
346 | 255 | if bound: |
|
347 | 256 | working = self.user_ns |
|
348 | 257 | suffix = str(msg_id).replace("-","") |
|
349 | 258 | prefix = "_" |
|
350 | 259 | |
|
351 | 260 | else: |
|
352 | 261 | working = dict() |
|
353 | 262 | suffix = prefix = "_" # prevent keyword collisions with lambda |
|
354 | 263 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) |
|
355 | 264 | # if f.fun |
|
356 | 265 | fname = prefix+f.func_name.strip('<>')+suffix |
|
357 | 266 | argname = prefix+"args"+suffix |
|
358 | 267 | kwargname = prefix+"kwargs"+suffix |
|
359 | 268 | resultname = prefix+"result"+suffix |
|
360 | 269 | |
|
361 | 270 | ns = { fname : f, argname : args, kwargname : kwargs } |
|
362 | 271 | # print ns |
|
363 | 272 | working.update(ns) |
|
364 | 273 | code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) |
|
365 | 274 | exec code in working, working |
|
366 | 275 | result = working.get(resultname) |
|
367 | 276 | # clear the namespace |
|
368 | 277 | if bound: |
|
369 | 278 | for key in ns.iterkeys(): |
|
370 | 279 | self.user_ns.pop(key) |
|
371 | 280 | else: |
|
372 | 281 | del working |
|
373 | 282 | |
|
374 | 283 | packed_result,buf = serialize_object(result) |
|
375 | 284 | result_buf = [packed_result]+buf |
|
376 | 285 | except: |
|
377 | 286 | result = u'error' |
|
378 | 287 | etype, evalue, tb = sys.exc_info() |
|
379 | 288 | tb = traceback.format_exception(etype, evalue, tb) |
|
380 | 289 | exc_content = { |
|
381 | 290 | u'status' : u'error', |
|
382 | 291 | u'traceback' : tb, |
|
383 | 292 | u'etype' : unicode(etype), |
|
384 | 293 | u'evalue' : unicode(evalue) |
|
385 | 294 | } |
|
386 | 295 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
387 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) | |
|
296 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) | |
|
388 | 297 | reply_content = exc_content |
|
389 | 298 | result_buf = [] |
|
390 | 299 | |
|
391 | 300 | if etype is UnmetDependency: |
|
392 | 301 | sub['dependencies_met'] = False |
|
393 | 302 | else: |
|
394 | 303 | reply_content = {'status' : 'ok'} |
|
395 | 304 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) |
|
396 | 305 | # self.reply_socket.send(ident, zmq.SNDMORE) |
|
397 | 306 | # self.reply_socket.send_json(reply_msg) |
|
398 | 307 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, |
|
399 | 308 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) |
|
400 | 309 | print(Message(reply_msg), file=sys.__stdout__) |
|
401 | 310 | # if reply_msg['content']['status'] == u'error': |
|
402 | 311 | # self.abort_queues() |
|
403 | 312 | |
|
404 | 313 | def dispatch_queue(self, stream, msg): |
|
405 | 314 | self.control_stream.flush() |
|
406 | 315 | idents,msg = self.session.feed_identities(msg, copy=False) |
|
407 | 316 | msg = self.session.unpack_message(msg, content=True, copy=False) |
|
408 | 317 | |
|
409 | 318 | header = msg['header'] |
|
410 | 319 | msg_id = header['msg_id'] |
|
411 | 320 | if self.check_aborted(msg_id): |
|
412 | 321 | self.aborted.remove(msg_id) |
|
413 | 322 | # is it safe to assume a msg_id will not be resubmitted? |
|
414 | 323 | reply_type = msg['msg_type'].split('_')[0] + '_reply' |
|
415 | 324 | reply_msg = self.session.send(stream, reply_type, |
|
416 | 325 | content={'status' : 'aborted'}, parent=msg, ident=idents) |
|
417 | 326 | return |
|
418 |
handler = self. |
|
|
327 | handler = self.shell_handlers.get(msg['msg_type'], None) | |
|
419 | 328 | if handler is None: |
|
420 | 329 | print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__) |
|
421 | 330 | else: |
|
422 | 331 | handler(stream, idents, msg) |
|
423 | 332 | |
|
424 | 333 | def start(self): |
|
425 | 334 | #### stream mode: |
|
426 | 335 | if self.control_stream: |
|
427 | 336 | self.control_stream.on_recv(self.dispatch_control, copy=False) |
|
428 | 337 | self.control_stream.on_err(printer) |
|
429 | if self.reply_stream: | |
|
430 |
|
|
|
431 | self.dispatch_queue(self.reply_stream, msg), copy=False) | |
|
432 | self.reply_stream.on_err(printer) | |
|
433 | if self.task_stream: | |
|
434 | self.task_stream.on_recv(lambda msg: | |
|
435 | self.dispatch_queue(self.task_stream, msg), copy=False) | |
|
436 |
self. |
|
|
338 | ||
|
339 | for s in self.shell_streams: | |
|
340 | s.on_recv(lambda msg: | |
|
341 | self.dispatch_queue(s, msg), copy=False) | |
|
342 | s.on_err(printer) | |
|
343 | ||
|
344 | if self.iopub_stream: | |
|
345 | self.iopub_stream.on_err(printer) | |
|
346 | self.iopub_stream.on_send(printer) | |
|
437 | 347 | |
|
438 | 348 | #### while True mode: |
|
439 | 349 | # while True: |
|
440 | 350 | # idle = True |
|
441 | 351 | # try: |
|
442 |
# msg = self. |
|
|
352 | # msg = self.shell_stream.socket.recv_multipart( | |
|
443 | 353 | # zmq.NOBLOCK, copy=False) |
|
444 | 354 | # except zmq.ZMQError, e: |
|
445 | 355 | # if e.errno != zmq.EAGAIN: |
|
446 | 356 | # raise e |
|
447 | 357 | # else: |
|
448 | 358 | # idle=False |
|
449 |
# self.dispatch_queue(self. |
|
|
359 | # self.dispatch_queue(self.shell_stream, msg) | |
|
450 | 360 | # |
|
451 | 361 | # if not self.task_stream.empty(): |
|
452 | 362 | # idle=False |
|
453 | 363 | # msg = self.task_stream.recv_multipart() |
|
454 | 364 | # self.dispatch_queue(self.task_stream, msg) |
|
455 | 365 | # if idle: |
|
456 | 366 | # # don't busywait |
|
457 | 367 | # time.sleep(1e-3) |
|
458 | 368 | |
|
459 | ||
|
460 | def main(): | |
|
461 | raise Exception("Don't run me anymore") | |
|
369 | def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, | |
|
370 | client_addr=None, loop=None, context=None): | |
|
371 | # create loop, context, and session: | |
|
372 | if loop is None: | |
|
462 | 373 | loop = ioloop.IOLoop.instance() |
|
463 | c = zmq.Context() | |
|
464 | ||
|
465 | ip = '127.0.0.1' | |
|
466 | port_base = 5575 | |
|
467 | connection = ('tcp://%s' % ip) + ':%i' | |
|
468 | rep_conn = connection % port_base | |
|
469 | pub_conn = connection % (port_base+1) | |
|
470 | ||
|
471 | print("Starting the kernel...", file=sys.__stdout__) | |
|
472 | # print >>sys.__stdout__, "XREQ Channel:", rep_conn | |
|
473 | # print >>sys.__stdout__, "PUB Channel:", pub_conn | |
|
474 | ||
|
475 | session = StreamSession(username=u'kernel') | |
|
476 | ||
|
477 | reply_socket = c.socket(zmq.XREQ) | |
|
478 | reply_socket.connect(rep_conn) | |
|
479 | ||
|
480 | pub_socket = c.socket(zmq.PUB) | |
|
481 | pub_socket.connect(pub_conn) | |
|
482 | ||
|
483 | stdout = OutStream(session, pub_socket, u'stdout') | |
|
484 | stderr = OutStream(session, pub_socket, u'stderr') | |
|
485 | sys.stdout = stdout | |
|
486 | sys.stderr = stderr | |
|
487 | ||
|
488 | display_hook = DisplayHook(session, pub_socket) | |
|
489 | sys.displayhook = display_hook | |
|
490 | reply_stream = zmqstream.ZMQStream(reply_socket,loop) | |
|
491 | pub_stream = zmqstream.ZMQStream(pub_socket,loop) | |
|
492 | kernel = Kernel(session, reply_stream, pub_stream) | |
|
493 | ||
|
494 | # For debugging convenience, put sleep and a string in the namespace, so we | |
|
495 | # have them every time we start. | |
|
496 | kernel.user_ns['sleep'] = time.sleep | |
|
497 | kernel.user_ns['s'] = 'Test string' | |
|
374 | if context is None: | |
|
375 | context = zmq.Context() | |
|
376 | c = context | |
|
377 | session = StreamSession() | |
|
378 | print (control_addr, shell_addrs, iopub_addr, hb_addrs) | |
|
379 | ||
|
380 | # create Control Stream | |
|
381 | control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) | |
|
382 | control_stream.setsockopt(zmq.IDENTITY, identity) | |
|
383 | control_stream.connect(control_addr) | |
|
384 | ||
|
385 | # create Shell Streams (MUX, Task, etc.): | |
|
386 | shell_streams = [] | |
|
387 | for addr in shell_addrs: | |
|
388 | stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) | |
|
389 | stream.setsockopt(zmq.IDENTITY, identity) | |
|
390 | stream.connect(addr) | |
|
391 | shell_streams.append(stream) | |
|
392 | ||
|
393 | # create iopub stream: | |
|
394 | iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop) | |
|
395 | iopub_stream.setsockopt(zmq.IDENTITY, identity) | |
|
396 | iopub_stream.connect(iopub_addr) | |
|
397 | ||
|
398 | # launch heartbeat | |
|
399 | heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) | |
|
400 | heart.start() | |
|
401 | ||
|
402 | # create (optional) Client | |
|
403 | if client_addr: | |
|
404 | client = Client(client_addr, username=identity) | |
|
405 | else: | |
|
406 | client = None | |
|
498 | 407 | |
|
499 | print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__) | |
|
408 | kernel = Kernel(session=session, control_stream=control_stream, | |
|
409 | shell_streams=shell_streams, iopub_stream=iopub_stream, | |
|
410 | client=client) | |
|
500 | 411 | kernel.start() |
|
501 | loop.start() | |
|
502 | ||
|
412 | return loop, c | |
|
503 | 413 | |
|
504 | if __name__ == '__main__': | |
|
505 | main() |
General Comments 0
You need to be logged in to leave comments.
Login now