##// END OF EJS Templates
Parallel kernel/engine startup looks a bit more like pykernel
MinRK -
Show More
@@ -1,148 +1,132 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple engine that talks to a controller over 0MQ.
2 """A simple engine that talks to a controller over 0MQ.
3 it handles registration, etc. and launches a kernel
3 it handles registration, etc. and launches a kernel
4 connected to the Controller's queue(s).
4 connected to the Controller's queue(s).
5 """
5 """
6 from __future__ import print_function
6 from __future__ import print_function
7 import sys
7 import sys
8 import time
8 import time
9 import traceback
9 import traceback
10 import uuid
10 import uuid
11 from pprint import pprint
11 from pprint import pprint
12
12
13 import zmq
13 import zmq
14 from zmq.eventloop import ioloop, zmqstream
14 from zmq.eventloop import ioloop, zmqstream
15
15
16 from IPython.utils.traitlets import HasTraits
17 from IPython.utils.localinterfaces import LOCALHOST
18
16 from streamsession import Message, StreamSession
19 from streamsession import Message, StreamSession
17 from client import Client
20 from client import Client
18 import streamkernel as kernel
21 from streamkernel import Kernel, make_kernel
19 import heartmonitor
22 import heartmonitor
20 from entry_point import make_base_argument_parser, connect_logger, parse_url
23 from entry_point import make_base_argument_parser, connect_logger, parse_url
21 # import taskthread
24 # import taskthread
22 # from log import logger
25 # from log import logger
23
26
24
27
25 def printer(*msg):
28 def printer(*msg):
26 pprint(msg)
29 pprint(msg)
27
30
28 class Engine(object):
31 class Engine(object):
29 """IPython engine"""
32 """IPython engine"""
30
33
31 id=None
34 id=None
32 context=None
35 context=None
33 loop=None
36 loop=None
34 session=None
37 session=None
35 ident=None
38 ident=None
36 registrar=None
39 registrar=None
37 heart=None
40 heart=None
38 kernel=None
41 kernel=None
39
42
40 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
43 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
41 self.context = context
44 self.context = context
42 self.loop = loop
45 self.loop = loop
43 self.session = session
46 self.session = session
44 self.registrar = registrar
47 self.registrar = registrar
45 self.client = client
48 self.client = client
46 self.ident = ident if ident else str(uuid.uuid4())
49 self.ident = ident if ident else str(uuid.uuid4())
47 self.registrar.on_send(printer)
50 self.registrar.on_send(printer)
48
51
49 def register(self):
52 def register(self):
50
53
51 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
54 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
52 self.registrar.on_recv(self.complete_registration)
55 self.registrar.on_recv(self.complete_registration)
53 self.session.send(self.registrar, "registration_request",content=content)
56 self.session.send(self.registrar, "registration_request",content=content)
54
57
55 def complete_registration(self, msg):
58 def complete_registration(self, msg):
56 # print msg
59 # print msg
57 idents,msg = self.session.feed_identities(msg)
60 idents,msg = self.session.feed_identities(msg)
58 msg = Message(self.session.unpack_message(msg))
61 msg = Message(self.session.unpack_message(msg))
59 if msg.content.status == 'ok':
62 if msg.content.status == 'ok':
60 self.session.username = str(msg.content.id)
63 self.session.username = str(msg.content.id)
61 queue_addr = msg.content.queue
64 queue_addr = msg.content.queue
62 if queue_addr:
65 shell_addrs = [str(queue_addr)]
63 queue = self.context.socket(zmq.PAIR)
66 control_addr = str(msg.content.control)
64 queue.setsockopt(zmq.IDENTITY, self.ident)
65 queue.connect(str(queue_addr))
66 self.queue = zmqstream.ZMQStream(queue, self.loop)
67
68 control_addr = msg.content.control
69 if control_addr:
70 control = self.context.socket(zmq.PAIR)
71 control.setsockopt(zmq.IDENTITY, self.ident)
72 control.connect(str(control_addr))
73 self.control = zmqstream.ZMQStream(control, self.loop)
74
75 task_addr = msg.content.task
67 task_addr = msg.content.task
76 print (task_addr)
77 if task_addr:
68 if task_addr:
78 # task as stream:
69 shell_addrs.append(str(task_addr))
79 task = self.context.socket(zmq.PAIR)
80 task.setsockopt(zmq.IDENTITY, self.ident)
81 task.connect(str(task_addr))
82 self.task_stream = zmqstream.ZMQStream(task, self.loop)
83 # TaskThread:
84 # mon_addr = msg.content.monitor
85 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
86 # task.connect_in(str(task_addr))
87 # task.connect_out(str(mon_addr))
88 # self.task_stream = taskthread.QueueStream(*task.queues)
89 # task.start()
90
70
91 hbs = msg.content.heartbeat
71 hb_addrs = msg.content.heartbeat
92 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
93 self.heart.start()
94 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
72 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
95 # placeholder for now:
73
96 pub = self.context.socket(zmq.PUB)
74 # placeholder for no, since pub isn't hooked up:
97 pub = zmqstream.ZMQStream(pub, self.loop)
75 sub = self.context.socket(zmq.SUB)
98 # create and start the kernel
76 sub = zmqstream.ZMQStream(sub, self.loop)
99 self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
77 sub.on_recv(lambda *a: None)
100 self.kernel.start()
78 port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
79 iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
80
81 make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs,
82 client_addr=None, loop=self.loop, context=self.context)
83
101 else:
84 else:
102 # logger.error("Registration Failed: %s"%msg)
85 # logger.error("Registration Failed: %s"%msg)
103 raise Exception("Registration Failed: %s"%msg)
86 raise Exception("Registration Failed: %s"%msg)
104
87
105 # logger.info("engine::completed registration with id %s"%self.session.username)
88 # logger.info("engine::completed registration with id %s"%self.session.username)
106
89
107 print (msg)
90 print (msg)
108
91
109 def unregister(self):
92 def unregister(self):
110 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
93 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
111 time.sleep(1)
94 time.sleep(1)
112 sys.exit(0)
95 sys.exit(0)
113
96
114 def start(self):
97 def start(self):
115 print ("registering")
98 print ("registering")
116 self.register()
99 self.register()
100
117
101
118
102
119 def main():
103 def main():
120
104
121 parser = make_base_argument_parser()
105 parser = make_base_argument_parser()
122
106
123 args = parser.parse_args()
107 args = parser.parse_args()
124
108
125 parse_url(args)
109 parse_url(args)
126
110
127 iface="%s://%s"%(args.transport,args.ip)+':%i'
111 iface="%s://%s"%(args.transport,args.ip)+':%i'
128
112
129 loop = ioloop.IOLoop.instance()
113 loop = ioloop.IOLoop.instance()
130 session = StreamSession()
114 session = StreamSession()
131 ctx = zmq.Context()
115 ctx = zmq.Context()
132
116
133 # setup logging
117 # setup logging
134 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
118 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
135
119
136 reg_conn = iface % args.regport
120 reg_conn = iface % args.regport
137 print (reg_conn)
121 print (reg_conn)
138 print ("Starting the engine...", file=sys.__stderr__)
122 print ("Starting the engine...", file=sys.__stderr__)
139
123
140 reg = ctx.socket(zmq.PAIR)
124 reg = ctx.socket(zmq.PAIR)
141 reg.connect(reg_conn)
125 reg.connect(reg_conn)
142 reg = zmqstream.ZMQStream(reg, loop)
126 reg = zmqstream.ZMQStream(reg, loop)
143 client = Client(reg_conn)
127 client = Client(reg_conn)
144
128
145 e = Engine(ctx, loop, session, reg, client, args.ident)
129 e = Engine(ctx, loop, session, reg, client, args.ident)
146 dc = ioloop.DelayedCallback(e.start, 100, loop)
130 dc = ioloop.DelayedCallback(e.start, 100, loop)
147 dc.start()
131 dc.start()
148 loop.start() No newline at end of file
132 loop.start()
@@ -1,505 +1,413 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
4 """
5
5
6 #-----------------------------------------------------------------------------
7 # Imports
8 #-----------------------------------------------------------------------------
9
10 # Standard library imports.
6 from __future__ import print_function
11 from __future__ import print_function
7 import __builtin__
12 import __builtin__
13 from code import CommandCompiler
8 import os
14 import os
9 import sys
15 import sys
10 import time
16 import time
11 import traceback
17 import traceback
12 from signal import SIGTERM, SIGKILL
18 from signal import SIGTERM, SIGKILL
13 from pprint import pprint
19 from pprint import pprint
14
20
15 from code import CommandCompiler
21 # System library imports.
16
17 import zmq
22 import zmq
18 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
19
24
25 # Local imports.
26 from IPython.utils.traitlets import HasTraits, Instance, List
20 from IPython.zmq.completer import KernelCompleter
27 from IPython.zmq.completer import KernelCompleter
21
28
22 from streamsession import StreamSession, Message, extract_header, serialize_object,\
29 from streamsession import StreamSession, Message, extract_header, serialize_object,\
23 unpack_apply_message
30 unpack_apply_message
24 from dependency import UnmetDependency
31 from dependency import UnmetDependency
32 import heartmonitor
33 from client import Client
25
34
26 def printer(*args):
35 def printer(*args):
27 pprint(args)
36 pprint(args)
28
37
29 class OutStream(object):
38 #-----------------------------------------------------------------------------
30 """A file like object that publishes the stream to a 0MQ PUB socket."""
39 # Main kernel class
31
40 #-----------------------------------------------------------------------------
32 def __init__(self, session, pub_socket, name, max_buffer=200):
33 self.session = session
34 self.pub_socket = pub_socket
35 self.name = name
36 self._buffer = []
37 self._buffer_len = 0
38 self.max_buffer = max_buffer
39 self.parent_header = {}
40
41 def set_parent(self, parent):
42 self.parent_header = extract_header(parent)
43
44 def close(self):
45 self.pub_socket = None
46
47 def flush(self):
48 if self.pub_socket is None:
49 raise ValueError(u'I/O operation on closed file')
50 else:
51 if self._buffer:
52 data = ''.join(self._buffer)
53 content = {u'name':self.name, u'data':data}
54 # msg = self.session.msg(u'stream', content=content,
55 # parent=self.parent_header)
56 msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header)
57 # print>>sys.__stdout__, Message(msg)
58 # self.pub_socket.send_json(msg)
59 self._buffer_len = 0
60 self._buffer = []
61
62 def isattr(self):
63 return False
64
65 def next(self):
66 raise IOError('Read not supported on a write only stream.')
67
68 def read(self, size=None):
69 raise IOError('Read not supported on a write only stream.')
70
71 readline=read
72
73 def write(self, s):
74 if self.pub_socket is None:
75 raise ValueError('I/O operation on closed file')
76 else:
77 self._buffer.append(s)
78 self._buffer_len += len(s)
79 self._maybe_send()
80
81 def _maybe_send(self):
82 if '\n' in self._buffer[-1]:
83 self.flush()
84 if self._buffer_len > self.max_buffer:
85 self.flush()
86
87 def writelines(self, sequence):
88 if self.pub_socket is None:
89 raise ValueError('I/O operation on closed file')
90 else:
91 for s in sequence:
92 self.write(s)
93
94
95 class DisplayHook(object):
96
97 def __init__(self, session, pub_socket):
98 self.session = session
99 self.pub_socket = pub_socket
100 self.parent_header = {}
101
102 def __call__(self, obj):
103 if obj is None:
104 return
105
106 __builtin__._ = obj
107 # msg = self.session.msg(u'pyout', {u'data':repr(obj)},
108 # parent=self.parent_header)
109 # self.pub_socket.send_json(msg)
110 self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header)
111
112 def set_parent(self, parent):
113 self.parent_header = extract_header(parent)
114
115
116 class RawInput(object):
117
118 def __init__(self, session, socket):
119 self.session = session
120 self.socket = socket
121
122 def __call__(self, prompt=None):
123 msg = self.session.msg(u'raw_input')
124 self.socket.send_json(msg)
125 while True:
126 try:
127 reply = self.socket.recv_json(zmq.NOBLOCK)
128 except zmq.ZMQError as e:
129 if e.errno == zmq.EAGAIN:
130 pass
131 else:
132 raise
133 else:
134 break
135 return reply[u'content'][u'data']
136
41
42 class Kernel(HasTraits):
137
43
138 class Kernel(object):
44 #---------------------------------------------------------------------------
45 # Kernel interface
46 #---------------------------------------------------------------------------
139
47
140 def __init__(self, session, control_stream, reply_stream, pub_stream,
48 session = Instance(StreamSession)
141 task_stream=None, client=None):
49 shell_streams = Instance(list)
142 self.session = session
50 control_stream = Instance(zmqstream.ZMQStream)
143 self.control_stream = control_stream
51 task_stream = Instance(zmqstream.ZMQStream)
144 # self.control_socket = control_stream.socket
52 iopub_stream = Instance(zmqstream.ZMQStream)
145 self.reply_stream = reply_stream
53 client = Instance(Client)
146 self.identity = self.reply_stream.getsockopt(zmq.IDENTITY)
54
147 self.task_stream = task_stream
55 def __init__(self, **kwargs):
148 self.pub_stream = pub_stream
56 super(Kernel, self).__init__(**kwargs)
149 self.client = client
57 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
150 self.user_ns = {}
58 self.user_ns = {}
151 self.history = []
59 self.history = []
152 self.compiler = CommandCompiler()
60 self.compiler = CommandCompiler()
153 self.completer = KernelCompleter(self.user_ns)
61 self.completer = KernelCompleter(self.user_ns)
154 self.aborted = set()
62 self.aborted = set()
155
63
156 # Build dict of handlers for message types
64 # Build dict of handlers for message types
157 self.queue_handlers = {}
65 self.shell_handlers = {}
158 self.control_handlers = {}
66 self.control_handlers = {}
159 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
67 for msg_type in ['execute_request', 'complete_request', 'apply_request',
160 self.queue_handlers[msg_type] = getattr(self, msg_type)
68 'clear_request']:
69 self.shell_handlers[msg_type] = getattr(self, msg_type)
161
70
162 for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys():
71 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
163 self.control_handlers[msg_type] = getattr(self, msg_type)
72 self.control_handlers[msg_type] = getattr(self, msg_type)
164
73
165 #-------------------- control handlers -----------------------------
74 #-------------------- control handlers -----------------------------
166 def abort_queues(self):
75 def abort_queues(self):
167 for stream in (self.task_stream, self.reply_stream):
76 for stream in self.shell_streams:
168 if stream:
77 if stream:
169 self.abort_queue(stream)
78 self.abort_queue(stream)
170
79
171 def abort_queue(self, stream):
80 def abort_queue(self, stream):
172 while True:
81 while True:
173 try:
82 try:
174 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
83 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
175 except zmq.ZMQError as e:
84 except zmq.ZMQError as e:
176 if e.errno == zmq.EAGAIN:
85 if e.errno == zmq.EAGAIN:
177 break
86 break
178 else:
87 else:
179 return
88 return
180 else:
89 else:
181 if msg is None:
90 if msg is None:
182 return
91 return
183 else:
92 else:
184 idents,msg = msg
93 idents,msg = msg
185
94
186 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
95 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
187 # msg = self.reply_socket.recv_json()
96 # msg = self.reply_socket.recv_json()
188 print ("Aborting:", file=sys.__stdout__)
97 print ("Aborting:", file=sys.__stdout__)
189 print (Message(msg), file=sys.__stdout__)
98 print (Message(msg), file=sys.__stdout__)
190 msg_type = msg['msg_type']
99 msg_type = msg['msg_type']
191 reply_type = msg_type.split('_')[0] + '_reply'
100 reply_type = msg_type.split('_')[0] + '_reply'
192 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
101 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
193 # self.reply_socket.send(ident,zmq.SNDMORE)
102 # self.reply_socket.send(ident,zmq.SNDMORE)
194 # self.reply_socket.send_json(reply_msg)
103 # self.reply_socket.send_json(reply_msg)
195 reply_msg = self.session.send(stream, reply_type,
104 reply_msg = self.session.send(stream, reply_type,
196 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
105 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
197 print(Message(reply_msg), file=sys.__stdout__)
106 print(Message(reply_msg), file=sys.__stdout__)
198 # We need to wait a bit for requests to come in. This can probably
107 # We need to wait a bit for requests to come in. This can probably
199 # be set shorter for true asynchronous clients.
108 # be set shorter for true asynchronous clients.
200 time.sleep(0.05)
109 time.sleep(0.05)
201
110
202 def abort_request(self, stream, ident, parent):
111 def abort_request(self, stream, ident, parent):
203 """abort a specifig msg by id"""
112 """abort a specifig msg by id"""
204 msg_ids = parent['content'].get('msg_ids', None)
113 msg_ids = parent['content'].get('msg_ids', None)
205 if isinstance(msg_ids, basestring):
114 if isinstance(msg_ids, basestring):
206 msg_ids = [msg_ids]
115 msg_ids = [msg_ids]
207 if not msg_ids:
116 if not msg_ids:
208 self.abort_queues()
117 self.abort_queues()
209 for mid in msg_ids:
118 for mid in msg_ids:
210 self.aborted.add(str(mid))
119 self.aborted.add(str(mid))
211
120
212 content = dict(status='ok')
121 content = dict(status='ok')
213 reply_msg = self.session.send(stream, 'abort_reply', content=content,
122 reply_msg = self.session.send(stream, 'abort_reply', content=content,
214 parent=parent, ident=ident)[0]
123 parent=parent, ident=ident)[0]
215 print(Message(reply_msg), file=sys.__stdout__)
124 print(Message(reply_msg), file=sys.__stdout__)
216
125
217 def kill_request(self, stream, idents, parent):
126 def shutdown_request(self, stream, ident, parent):
218 """kill ourself. This should really be handled in an external process"""
127 """kill ourself. This should really be handled in an external process"""
219 self.abort_queues()
128 self.abort_queues()
220 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
129 content = dict(parent['content'])
221 content = dict(status='ok'))
130 msg = self.session.send(self.reply_socket, 'shutdown_reply',
222 # we can know that a message is done if we *don't* use streams, but
131 content, parent, ident)
223 # use a socket directly with MessageTracker
132 msg = self.session.send(self.pub_socket, 'shutdown_reply',
224 time.sleep(.5)
133 content, parent, ident)
225 os.kill(os.getpid(), SIGTERM)
134 # print >> sys.__stdout__, msg
226 time.sleep(1)
135 time.sleep(0.1)
227 os.kill(os.getpid(), SIGKILL)
136 sys.exit(0)
228
229 def clear_request(self, stream, idents, parent):
230 """Clear our namespace."""
231 self.user_ns = {}
232 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
233 content = dict(status='ok'))
234
137
235 def dispatch_control(self, msg):
138 def dispatch_control(self, msg):
236 idents,msg = self.session.feed_identities(msg, copy=False)
139 idents,msg = self.session.feed_identities(msg, copy=False)
237 msg = self.session.unpack_message(msg, content=True, copy=False)
140 msg = self.session.unpack_message(msg, content=True, copy=False)
238
141
239 header = msg['header']
142 header = msg['header']
240 msg_id = header['msg_id']
143 msg_id = header['msg_id']
241
144
242 handler = self.control_handlers.get(msg['msg_type'], None)
145 handler = self.control_handlers.get(msg['msg_type'], None)
243 if handler is None:
146 if handler is None:
244 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
147 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
245 else:
148 else:
246 handler(self.control_stream, idents, msg)
149 handler(self.control_stream, idents, msg)
247
150
248
151
249 #-------------------- queue helpers ------------------------------
152 #-------------------- queue helpers ------------------------------
250
153
251 def check_dependencies(self, dependencies):
154 def check_dependencies(self, dependencies):
252 if not dependencies:
155 if not dependencies:
253 return True
156 return True
254 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
157 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
255 anyorall = dependencies[0]
158 anyorall = dependencies[0]
256 dependencies = dependencies[1]
159 dependencies = dependencies[1]
257 else:
160 else:
258 anyorall = 'all'
161 anyorall = 'all'
259 results = self.client.get_results(dependencies,status_only=True)
162 results = self.client.get_results(dependencies,status_only=True)
260 if results['status'] != 'ok':
163 if results['status'] != 'ok':
261 return False
164 return False
262
165
263 if anyorall == 'any':
166 if anyorall == 'any':
264 if not results['completed']:
167 if not results['completed']:
265 return False
168 return False
266 else:
169 else:
267 if results['pending']:
170 if results['pending']:
268 return False
171 return False
269
172
270 return True
173 return True
271
174
272 def check_aborted(self, msg_id):
175 def check_aborted(self, msg_id):
273 return msg_id in self.aborted
176 return msg_id in self.aborted
274
177
275 #-------------------- queue handlers -----------------------------
178 #-------------------- queue handlers -----------------------------
276
179
180 def clear_request(self, stream, idents, parent):
181 """Clear our namespace."""
182 self.user_ns = {}
183 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
184 content = dict(status='ok'))
185
277 def execute_request(self, stream, ident, parent):
186 def execute_request(self, stream, ident, parent):
278 try:
187 try:
279 code = parent[u'content'][u'code']
188 code = parent[u'content'][u'code']
280 except:
189 except:
281 print("Got bad msg: ", file=sys.__stderr__)
190 print("Got bad msg: ", file=sys.__stderr__)
282 print(Message(parent), file=sys.__stderr__)
191 print(Message(parent), file=sys.__stderr__)
283 return
192 return
284 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
193 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
285 # self.pub_stream.send(pyin_msg)
194 # self.iopub_stream.send(pyin_msg)
286 self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
195 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
287 try:
196 try:
288 comp_code = self.compiler(code, '<zmq-kernel>')
197 comp_code = self.compiler(code, '<zmq-kernel>')
289 # allow for not overriding displayhook
198 # allow for not overriding displayhook
290 if hasattr(sys.displayhook, 'set_parent'):
199 if hasattr(sys.displayhook, 'set_parent'):
291 sys.displayhook.set_parent(parent)
200 sys.displayhook.set_parent(parent)
292 exec comp_code in self.user_ns, self.user_ns
201 exec comp_code in self.user_ns, self.user_ns
293 except:
202 except:
294 # result = u'error'
203 # result = u'error'
295 etype, evalue, tb = sys.exc_info()
204 etype, evalue, tb = sys.exc_info()
296 tb = traceback.format_exception(etype, evalue, tb)
205 tb = traceback.format_exception(etype, evalue, tb)
297 exc_content = {
206 exc_content = {
298 u'status' : u'error',
207 u'status' : u'error',
299 u'traceback' : tb,
208 u'traceback' : tb,
300 u'etype' : unicode(etype),
209 u'etype' : unicode(etype),
301 u'evalue' : unicode(evalue)
210 u'evalue' : unicode(evalue)
302 }
211 }
303 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
212 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
304 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
213 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
305 reply_content = exc_content
214 reply_content = exc_content
306 else:
215 else:
307 reply_content = {'status' : 'ok'}
216 reply_content = {'status' : 'ok'}
308 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
217 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
309 # self.reply_socket.send(ident, zmq.SNDMORE)
218 # self.reply_socket.send(ident, zmq.SNDMORE)
310 # self.reply_socket.send_json(reply_msg)
219 # self.reply_socket.send_json(reply_msg)
311 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
220 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
312 print(Message(reply_msg), file=sys.__stdout__)
221 print(Message(reply_msg), file=sys.__stdout__)
313 if reply_msg['content']['status'] == u'error':
222 if reply_msg['content']['status'] == u'error':
314 self.abort_queues()
223 self.abort_queues()
315
224
316 def complete_request(self, stream, ident, parent):
225 def complete_request(self, stream, ident, parent):
317 matches = {'matches' : self.complete(parent),
226 matches = {'matches' : self.complete(parent),
318 'status' : 'ok'}
227 'status' : 'ok'}
319 completion_msg = self.session.send(stream, 'complete_reply',
228 completion_msg = self.session.send(stream, 'complete_reply',
320 matches, parent, ident)
229 matches, parent, ident)
321 # print >> sys.__stdout__, completion_msg
230 # print >> sys.__stdout__, completion_msg
322
231
323 def complete(self, msg):
232 def complete(self, msg):
324 return self.completer.complete(msg.content.line, msg.content.text)
233 return self.completer.complete(msg.content.line, msg.content.text)
325
234
326 def apply_request(self, stream, ident, parent):
235 def apply_request(self, stream, ident, parent):
327 print (parent)
236 print (parent)
328 try:
237 try:
329 content = parent[u'content']
238 content = parent[u'content']
330 bufs = parent[u'buffers']
239 bufs = parent[u'buffers']
331 msg_id = parent['header']['msg_id']
240 msg_id = parent['header']['msg_id']
332 bound = content.get('bound', False)
241 bound = content.get('bound', False)
333 except:
242 except:
334 print("Got bad msg: ", file=sys.__stderr__)
243 print("Got bad msg: ", file=sys.__stderr__)
335 print(Message(parent), file=sys.__stderr__)
244 print(Message(parent), file=sys.__stderr__)
336 return
245 return
337 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
246 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
338 # self.pub_stream.send(pyin_msg)
247 # self.iopub_stream.send(pyin_msg)
339 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
248 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
340 sub = {'dependencies_met' : True, 'engine' : self.identity}
249 sub = {'dependencies_met' : True, 'engine' : self.identity}
341 try:
250 try:
342 # allow for not overriding displayhook
251 # allow for not overriding displayhook
343 if hasattr(sys.displayhook, 'set_parent'):
252 if hasattr(sys.displayhook, 'set_parent'):
344 sys.displayhook.set_parent(parent)
253 sys.displayhook.set_parent(parent)
345 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
254 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
346 if bound:
255 if bound:
347 working = self.user_ns
256 working = self.user_ns
348 suffix = str(msg_id).replace("-","")
257 suffix = str(msg_id).replace("-","")
349 prefix = "_"
258 prefix = "_"
350
259
351 else:
260 else:
352 working = dict()
261 working = dict()
353 suffix = prefix = "_" # prevent keyword collisions with lambda
262 suffix = prefix = "_" # prevent keyword collisions with lambda
354 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
263 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
355 # if f.fun
264 # if f.fun
356 fname = prefix+f.func_name.strip('<>')+suffix
265 fname = prefix+f.func_name.strip('<>')+suffix
357 argname = prefix+"args"+suffix
266 argname = prefix+"args"+suffix
358 kwargname = prefix+"kwargs"+suffix
267 kwargname = prefix+"kwargs"+suffix
359 resultname = prefix+"result"+suffix
268 resultname = prefix+"result"+suffix
360
269
361 ns = { fname : f, argname : args, kwargname : kwargs }
270 ns = { fname : f, argname : args, kwargname : kwargs }
362 # print ns
271 # print ns
363 working.update(ns)
272 working.update(ns)
364 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
273 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
365 exec code in working, working
274 exec code in working, working
366 result = working.get(resultname)
275 result = working.get(resultname)
367 # clear the namespace
276 # clear the namespace
368 if bound:
277 if bound:
369 for key in ns.iterkeys():
278 for key in ns.iterkeys():
370 self.user_ns.pop(key)
279 self.user_ns.pop(key)
371 else:
280 else:
372 del working
281 del working
373
282
374 packed_result,buf = serialize_object(result)
283 packed_result,buf = serialize_object(result)
375 result_buf = [packed_result]+buf
284 result_buf = [packed_result]+buf
376 except:
285 except:
377 result = u'error'
286 result = u'error'
378 etype, evalue, tb = sys.exc_info()
287 etype, evalue, tb = sys.exc_info()
379 tb = traceback.format_exception(etype, evalue, tb)
288 tb = traceback.format_exception(etype, evalue, tb)
380 exc_content = {
289 exc_content = {
381 u'status' : u'error',
290 u'status' : u'error',
382 u'traceback' : tb,
291 u'traceback' : tb,
383 u'etype' : unicode(etype),
292 u'etype' : unicode(etype),
384 u'evalue' : unicode(evalue)
293 u'evalue' : unicode(evalue)
385 }
294 }
386 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
295 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
387 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
296 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
388 reply_content = exc_content
297 reply_content = exc_content
389 result_buf = []
298 result_buf = []
390
299
391 if etype is UnmetDependency:
300 if etype is UnmetDependency:
392 sub['dependencies_met'] = False
301 sub['dependencies_met'] = False
393 else:
302 else:
394 reply_content = {'status' : 'ok'}
303 reply_content = {'status' : 'ok'}
395 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
304 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
396 # self.reply_socket.send(ident, zmq.SNDMORE)
305 # self.reply_socket.send(ident, zmq.SNDMORE)
397 # self.reply_socket.send_json(reply_msg)
306 # self.reply_socket.send_json(reply_msg)
398 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
307 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
399 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
308 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
400 print(Message(reply_msg), file=sys.__stdout__)
309 print(Message(reply_msg), file=sys.__stdout__)
401 # if reply_msg['content']['status'] == u'error':
310 # if reply_msg['content']['status'] == u'error':
402 # self.abort_queues()
311 # self.abort_queues()
403
312
404 def dispatch_queue(self, stream, msg):
313 def dispatch_queue(self, stream, msg):
405 self.control_stream.flush()
314 self.control_stream.flush()
406 idents,msg = self.session.feed_identities(msg, copy=False)
315 idents,msg = self.session.feed_identities(msg, copy=False)
407 msg = self.session.unpack_message(msg, content=True, copy=False)
316 msg = self.session.unpack_message(msg, content=True, copy=False)
408
317
409 header = msg['header']
318 header = msg['header']
410 msg_id = header['msg_id']
319 msg_id = header['msg_id']
411 if self.check_aborted(msg_id):
320 if self.check_aborted(msg_id):
412 self.aborted.remove(msg_id)
321 self.aborted.remove(msg_id)
413 # is it safe to assume a msg_id will not be resubmitted?
322 # is it safe to assume a msg_id will not be resubmitted?
414 reply_type = msg['msg_type'].split('_')[0] + '_reply'
323 reply_type = msg['msg_type'].split('_')[0] + '_reply'
415 reply_msg = self.session.send(stream, reply_type,
324 reply_msg = self.session.send(stream, reply_type,
416 content={'status' : 'aborted'}, parent=msg, ident=idents)
325 content={'status' : 'aborted'}, parent=msg, ident=idents)
417 return
326 return
418 handler = self.queue_handlers.get(msg['msg_type'], None)
327 handler = self.shell_handlers.get(msg['msg_type'], None)
419 if handler is None:
328 if handler is None:
420 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
329 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
421 else:
330 else:
422 handler(stream, idents, msg)
331 handler(stream, idents, msg)
423
332
424 def start(self):
333 def start(self):
425 #### stream mode:
334 #### stream mode:
426 if self.control_stream:
335 if self.control_stream:
427 self.control_stream.on_recv(self.dispatch_control, copy=False)
336 self.control_stream.on_recv(self.dispatch_control, copy=False)
428 self.control_stream.on_err(printer)
337 self.control_stream.on_err(printer)
429 if self.reply_stream:
338
430 self.reply_stream.on_recv(lambda msg:
339 for s in self.shell_streams:
431 self.dispatch_queue(self.reply_stream, msg), copy=False)
340 s.on_recv(lambda msg:
432 self.reply_stream.on_err(printer)
341 self.dispatch_queue(s, msg), copy=False)
433 if self.task_stream:
342 s.on_err(printer)
434 self.task_stream.on_recv(lambda msg:
343
435 self.dispatch_queue(self.task_stream, msg), copy=False)
344 if self.iopub_stream:
436 self.task_stream.on_err(printer)
345 self.iopub_stream.on_err(printer)
346 self.iopub_stream.on_send(printer)
437
347
438 #### while True mode:
348 #### while True mode:
439 # while True:
349 # while True:
440 # idle = True
350 # idle = True
441 # try:
351 # try:
442 # msg = self.reply_stream.socket.recv_multipart(
352 # msg = self.shell_stream.socket.recv_multipart(
443 # zmq.NOBLOCK, copy=False)
353 # zmq.NOBLOCK, copy=False)
444 # except zmq.ZMQError, e:
354 # except zmq.ZMQError, e:
445 # if e.errno != zmq.EAGAIN:
355 # if e.errno != zmq.EAGAIN:
446 # raise e
356 # raise e
447 # else:
357 # else:
448 # idle=False
358 # idle=False
449 # self.dispatch_queue(self.reply_stream, msg)
359 # self.dispatch_queue(self.shell_stream, msg)
450 #
360 #
451 # if not self.task_stream.empty():
361 # if not self.task_stream.empty():
452 # idle=False
362 # idle=False
453 # msg = self.task_stream.recv_multipart()
363 # msg = self.task_stream.recv_multipart()
454 # self.dispatch_queue(self.task_stream, msg)
364 # self.dispatch_queue(self.task_stream, msg)
455 # if idle:
365 # if idle:
456 # # don't busywait
366 # # don't busywait
457 # time.sleep(1e-3)
367 # time.sleep(1e-3)
458
368
459
369 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
460 def main():
370 client_addr=None, loop=None, context=None):
461 raise Exception("Don't run me anymore")
371 # create loop, context, and session:
462 loop = ioloop.IOLoop.instance()
372 if loop is None:
463 c = zmq.Context()
373 loop = ioloop.IOLoop.instance()
464
374 if context is None:
465 ip = '127.0.0.1'
375 context = zmq.Context()
466 port_base = 5575
376 c = context
467 connection = ('tcp://%s' % ip) + ':%i'
377 session = StreamSession()
468 rep_conn = connection % port_base
378 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
469 pub_conn = connection % (port_base+1)
379
470
380 # create Control Stream
471 print("Starting the kernel...", file=sys.__stdout__)
381 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
472 # print >>sys.__stdout__, "XREQ Channel:", rep_conn
382 control_stream.setsockopt(zmq.IDENTITY, identity)
473 # print >>sys.__stdout__, "PUB Channel:", pub_conn
383 control_stream.connect(control_addr)
474
384
475 session = StreamSession(username=u'kernel')
385 # create Shell Streams (MUX, Task, etc.):
476
386 shell_streams = []
477 reply_socket = c.socket(zmq.XREQ)
387 for addr in shell_addrs:
478 reply_socket.connect(rep_conn)
388 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
479
389 stream.setsockopt(zmq.IDENTITY, identity)
480 pub_socket = c.socket(zmq.PUB)
390 stream.connect(addr)
481 pub_socket.connect(pub_conn)
391 shell_streams.append(stream)
482
483 stdout = OutStream(session, pub_socket, u'stdout')
484 stderr = OutStream(session, pub_socket, u'stderr')
485 sys.stdout = stdout
486 sys.stderr = stderr
487
488 display_hook = DisplayHook(session, pub_socket)
489 sys.displayhook = display_hook
490 reply_stream = zmqstream.ZMQStream(reply_socket,loop)
491 pub_stream = zmqstream.ZMQStream(pub_socket,loop)
492 kernel = Kernel(session, reply_stream, pub_stream)
493
494 # For debugging convenience, put sleep and a string in the namespace, so we
495 # have them every time we start.
496 kernel.user_ns['sleep'] = time.sleep
497 kernel.user_ns['s'] = 'Test string'
498
392
499 print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__)
393 # create iopub stream:
394 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
395 iopub_stream.setsockopt(zmq.IDENTITY, identity)
396 iopub_stream.connect(iopub_addr)
397
398 # launch heartbeat
399 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
400 heart.start()
401
402 # create (optional) Client
403 if client_addr:
404 client = Client(client_addr, username=identity)
405 else:
406 client = None
407
408 kernel = Kernel(session=session, control_stream=control_stream,
409 shell_streams=shell_streams, iopub_stream=iopub_stream,
410 client=client)
500 kernel.start()
411 kernel.start()
501 loop.start()
412 return loop, c
502
503
413
504 if __name__ == '__main__':
505 main()
General Comments 0
You need to be logged in to leave comments. Login now