##// END OF EJS Templates
Changing -1=>0 for bind to random ports.
Brian Granger -
Show More
@@ -1,361 +1,361 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Finish implementing `raw_input`.
6 * Finish implementing `raw_input`.
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 call set_parent on all the PUB objects with the message about to be executed.
8 call set_parent on all the PUB objects with the message about to be executed.
9 * Implement random port and security key logic.
9 * Implement random port and security key logic.
10 * Implement control messages.
10 * Implement control messages.
11 * Implement event loop and poll version.
11 * Implement event loop and poll version.
12 """
12 """
13
13
14 # Standard library imports.
14 # Standard library imports.
15 import __builtin__
15 import __builtin__
16 import sys
16 import sys
17 import time
17 import time
18 import traceback
18 import traceback
19 from code import CommandCompiler
19 from code import CommandCompiler
20
20
21 # System library imports.
21 # System library imports.
22 import zmq
22 import zmq
23
23
24 # Local imports.
24 # Local imports.
25 from IPython.external.argparse import ArgumentParser
25 from IPython.external.argparse import ArgumentParser
26 from session import Session, Message, extract_header
26 from session import Session, Message, extract_header
27 from completer import KernelCompleter
27 from completer import KernelCompleter
28
28
29
29
30 class OutStream(object):
30 class OutStream(object):
31 """A file like object that publishes the stream to a 0MQ PUB socket."""
31 """A file like object that publishes the stream to a 0MQ PUB socket."""
32
32
33 def __init__(self, session, pub_socket, name, max_buffer=200):
33 def __init__(self, session, pub_socket, name, max_buffer=200):
34 self.session = session
34 self.session = session
35 self.pub_socket = pub_socket
35 self.pub_socket = pub_socket
36 self.name = name
36 self.name = name
37 self._buffer = []
37 self._buffer = []
38 self._buffer_len = 0
38 self._buffer_len = 0
39 self.max_buffer = max_buffer
39 self.max_buffer = max_buffer
40 self.parent_header = {}
40 self.parent_header = {}
41
41
42 def set_parent(self, parent):
42 def set_parent(self, parent):
43 self.parent_header = extract_header(parent)
43 self.parent_header = extract_header(parent)
44
44
45 def close(self):
45 def close(self):
46 self.pub_socket = None
46 self.pub_socket = None
47
47
48 def flush(self):
48 def flush(self):
49 if self.pub_socket is None:
49 if self.pub_socket is None:
50 raise ValueError(u'I/O operation on closed file')
50 raise ValueError(u'I/O operation on closed file')
51 else:
51 else:
52 if self._buffer:
52 if self._buffer:
53 data = ''.join(self._buffer)
53 data = ''.join(self._buffer)
54 content = {u'name':self.name, u'data':data}
54 content = {u'name':self.name, u'data':data}
55 msg = self.session.msg(u'stream', content=content,
55 msg = self.session.msg(u'stream', content=content,
56 parent=self.parent_header)
56 parent=self.parent_header)
57 print>>sys.__stdout__, Message(msg)
57 print>>sys.__stdout__, Message(msg)
58 self.pub_socket.send_json(msg)
58 self.pub_socket.send_json(msg)
59 self._buffer_len = 0
59 self._buffer_len = 0
60 self._buffer = []
60 self._buffer = []
61
61
62 def isattr(self):
62 def isattr(self):
63 return False
63 return False
64
64
65 def next(self):
65 def next(self):
66 raise IOError('Read not supported on a write only stream.')
66 raise IOError('Read not supported on a write only stream.')
67
67
68 def read(self, size=None):
68 def read(self, size=None):
69 raise IOError('Read not supported on a write only stream.')
69 raise IOError('Read not supported on a write only stream.')
70
70
71 readline=read
71 readline=read
72
72
73 def write(self, s):
73 def write(self, s):
74 if self.pub_socket is None:
74 if self.pub_socket is None:
75 raise ValueError('I/O operation on closed file')
75 raise ValueError('I/O operation on closed file')
76 else:
76 else:
77 self._buffer.append(s)
77 self._buffer.append(s)
78 self._buffer_len += len(s)
78 self._buffer_len += len(s)
79 self._maybe_send()
79 self._maybe_send()
80
80
81 def _maybe_send(self):
81 def _maybe_send(self):
82 if '\n' in self._buffer[-1]:
82 if '\n' in self._buffer[-1]:
83 self.flush()
83 self.flush()
84 if self._buffer_len > self.max_buffer:
84 if self._buffer_len > self.max_buffer:
85 self.flush()
85 self.flush()
86
86
87 def writelines(self, sequence):
87 def writelines(self, sequence):
88 if self.pub_socket is None:
88 if self.pub_socket is None:
89 raise ValueError('I/O operation on closed file')
89 raise ValueError('I/O operation on closed file')
90 else:
90 else:
91 for s in sequence:
91 for s in sequence:
92 self.write(s)
92 self.write(s)
93
93
94
94
95 class DisplayHook(object):
95 class DisplayHook(object):
96
96
97 def __init__(self, session, pub_socket):
97 def __init__(self, session, pub_socket):
98 self.session = session
98 self.session = session
99 self.pub_socket = pub_socket
99 self.pub_socket = pub_socket
100 self.parent_header = {}
100 self.parent_header = {}
101
101
102 def __call__(self, obj):
102 def __call__(self, obj):
103 if obj is None:
103 if obj is None:
104 return
104 return
105
105
106 __builtin__._ = obj
106 __builtin__._ = obj
107 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
107 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
108 parent=self.parent_header)
108 parent=self.parent_header)
109 self.pub_socket.send_json(msg)
109 self.pub_socket.send_json(msg)
110
110
111 def set_parent(self, parent):
111 def set_parent(self, parent):
112 self.parent_header = extract_header(parent)
112 self.parent_header = extract_header(parent)
113
113
114
114
115 class RawInput(object):
115 class RawInput(object):
116
116
117 def __init__(self, session, socket):
117 def __init__(self, session, socket):
118 self.session = session
118 self.session = session
119 self.socket = socket
119 self.socket = socket
120
120
121 def __call__(self, prompt=None):
121 def __call__(self, prompt=None):
122 msg = self.session.msg(u'raw_input')
122 msg = self.session.msg(u'raw_input')
123 self.socket.send_json(msg)
123 self.socket.send_json(msg)
124 while True:
124 while True:
125 try:
125 try:
126 reply = self.socket.recv_json(zmq.NOBLOCK)
126 reply = self.socket.recv_json(zmq.NOBLOCK)
127 except zmq.ZMQError, e:
127 except zmq.ZMQError, e:
128 if e.errno == zmq.EAGAIN:
128 if e.errno == zmq.EAGAIN:
129 pass
129 pass
130 else:
130 else:
131 raise
131 raise
132 else:
132 else:
133 break
133 break
134 return reply[u'content'][u'data']
134 return reply[u'content'][u'data']
135
135
136
136
137 class Kernel(object):
137 class Kernel(object):
138
138
139 def __init__(self, session, reply_socket, pub_socket):
139 def __init__(self, session, reply_socket, pub_socket):
140 self.session = session
140 self.session = session
141 self.reply_socket = reply_socket
141 self.reply_socket = reply_socket
142 self.pub_socket = pub_socket
142 self.pub_socket = pub_socket
143 self.user_ns = {}
143 self.user_ns = {}
144 self.history = []
144 self.history = []
145 self.compiler = CommandCompiler()
145 self.compiler = CommandCompiler()
146 self.completer = KernelCompleter(self.user_ns)
146 self.completer = KernelCompleter(self.user_ns)
147
147
148 # Build dict of handlers for message types
148 # Build dict of handlers for message types
149 msg_types = [ 'execute_request', 'complete_request',
149 msg_types = [ 'execute_request', 'complete_request',
150 'object_info_request' ]
150 'object_info_request' ]
151 self.handlers = {}
151 self.handlers = {}
152 for msg_type in msg_types:
152 for msg_type in msg_types:
153 self.handlers[msg_type] = getattr(self, msg_type)
153 self.handlers[msg_type] = getattr(self, msg_type)
154
154
155 def abort_queue(self):
155 def abort_queue(self):
156 while True:
156 while True:
157 try:
157 try:
158 ident = self.reply_socket.recv(zmq.NOBLOCK)
158 ident = self.reply_socket.recv(zmq.NOBLOCK)
159 except zmq.ZMQError, e:
159 except zmq.ZMQError, e:
160 if e.errno == zmq.EAGAIN:
160 if e.errno == zmq.EAGAIN:
161 break
161 break
162 else:
162 else:
163 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
163 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
164 msg = self.reply_socket.recv_json()
164 msg = self.reply_socket.recv_json()
165 print>>sys.__stdout__, "Aborting:"
165 print>>sys.__stdout__, "Aborting:"
166 print>>sys.__stdout__, Message(msg)
166 print>>sys.__stdout__, Message(msg)
167 msg_type = msg['msg_type']
167 msg_type = msg['msg_type']
168 reply_type = msg_type.split('_')[0] + '_reply'
168 reply_type = msg_type.split('_')[0] + '_reply'
169 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
169 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
170 print>>sys.__stdout__, Message(reply_msg)
170 print>>sys.__stdout__, Message(reply_msg)
171 self.reply_socket.send(ident,zmq.SNDMORE)
171 self.reply_socket.send(ident,zmq.SNDMORE)
172 self.reply_socket.send_json(reply_msg)
172 self.reply_socket.send_json(reply_msg)
173 # We need to wait a bit for requests to come in. This can probably
173 # We need to wait a bit for requests to come in. This can probably
174 # be set shorter for true asynchronous clients.
174 # be set shorter for true asynchronous clients.
175 time.sleep(0.1)
175 time.sleep(0.1)
176
176
177 def execute_request(self, ident, parent):
177 def execute_request(self, ident, parent):
178 try:
178 try:
179 code = parent[u'content'][u'code']
179 code = parent[u'content'][u'code']
180 except:
180 except:
181 print>>sys.__stderr__, "Got bad msg: "
181 print>>sys.__stderr__, "Got bad msg: "
182 print>>sys.__stderr__, Message(parent)
182 print>>sys.__stderr__, Message(parent)
183 return
183 return
184 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
184 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
185 self.pub_socket.send_json(pyin_msg)
185 self.pub_socket.send_json(pyin_msg)
186 try:
186 try:
187 comp_code = self.compiler(code, '<zmq-kernel>')
187 comp_code = self.compiler(code, '<zmq-kernel>')
188 sys.displayhook.set_parent(parent)
188 sys.displayhook.set_parent(parent)
189 exec comp_code in self.user_ns, self.user_ns
189 exec comp_code in self.user_ns, self.user_ns
190 except:
190 except:
191 result = u'error'
191 result = u'error'
192 etype, evalue, tb = sys.exc_info()
192 etype, evalue, tb = sys.exc_info()
193 tb = traceback.format_exception(etype, evalue, tb)
193 tb = traceback.format_exception(etype, evalue, tb)
194 exc_content = {
194 exc_content = {
195 u'status' : u'error',
195 u'status' : u'error',
196 u'traceback' : tb,
196 u'traceback' : tb,
197 u'etype' : unicode(etype),
197 u'etype' : unicode(etype),
198 u'evalue' : unicode(evalue)
198 u'evalue' : unicode(evalue)
199 }
199 }
200 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
200 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
201 self.pub_socket.send_json(exc_msg)
201 self.pub_socket.send_json(exc_msg)
202 reply_content = exc_content
202 reply_content = exc_content
203 else:
203 else:
204 reply_content = {'status' : 'ok'}
204 reply_content = {'status' : 'ok'}
205 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
205 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
206 print>>sys.__stdout__, Message(reply_msg)
206 print>>sys.__stdout__, Message(reply_msg)
207 self.reply_socket.send(ident, zmq.SNDMORE)
207 self.reply_socket.send(ident, zmq.SNDMORE)
208 self.reply_socket.send_json(reply_msg)
208 self.reply_socket.send_json(reply_msg)
209 if reply_msg['content']['status'] == u'error':
209 if reply_msg['content']['status'] == u'error':
210 self.abort_queue()
210 self.abort_queue()
211
211
212 def complete_request(self, ident, parent):
212 def complete_request(self, ident, parent):
213 matches = {'matches' : self.complete(parent),
213 matches = {'matches' : self.complete(parent),
214 'status' : 'ok'}
214 'status' : 'ok'}
215 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
215 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
216 matches, parent, ident)
216 matches, parent, ident)
217 print >> sys.__stdout__, completion_msg
217 print >> sys.__stdout__, completion_msg
218
218
219 def complete(self, msg):
219 def complete(self, msg):
220 return self.completer.complete(msg.content.line, msg.content.text)
220 return self.completer.complete(msg.content.line, msg.content.text)
221
221
222 def object_info_request(self, ident, parent):
222 def object_info_request(self, ident, parent):
223 context = parent['content']['oname'].split('.')
223 context = parent['content']['oname'].split('.')
224 object_info = self.object_info(context)
224 object_info = self.object_info(context)
225 msg = self.session.send(self.reply_socket, 'object_info_reply',
225 msg = self.session.send(self.reply_socket, 'object_info_reply',
226 object_info, parent, ident)
226 object_info, parent, ident)
227 print >> sys.__stdout__, msg
227 print >> sys.__stdout__, msg
228
228
229 def object_info(self, context):
229 def object_info(self, context):
230 symbol, leftover = self.symbol_from_context(context)
230 symbol, leftover = self.symbol_from_context(context)
231 if symbol is not None and not leftover:
231 if symbol is not None and not leftover:
232 doc = getattr(symbol, '__doc__', '')
232 doc = getattr(symbol, '__doc__', '')
233 else:
233 else:
234 doc = ''
234 doc = ''
235 object_info = dict(docstring = doc)
235 object_info = dict(docstring = doc)
236 return object_info
236 return object_info
237
237
238 def symbol_from_context(self, context):
238 def symbol_from_context(self, context):
239 if not context:
239 if not context:
240 return None, context
240 return None, context
241
241
242 base_symbol_string = context[0]
242 base_symbol_string = context[0]
243 symbol = self.user_ns.get(base_symbol_string, None)
243 symbol = self.user_ns.get(base_symbol_string, None)
244 if symbol is None:
244 if symbol is None:
245 symbol = __builtin__.__dict__.get(base_symbol_string, None)
245 symbol = __builtin__.__dict__.get(base_symbol_string, None)
246 if symbol is None:
246 if symbol is None:
247 return None, context
247 return None, context
248
248
249 context = context[1:]
249 context = context[1:]
250 for i, name in enumerate(context):
250 for i, name in enumerate(context):
251 new_symbol = getattr(symbol, name, None)
251 new_symbol = getattr(symbol, name, None)
252 if new_symbol is None:
252 if new_symbol is None:
253 return symbol, context[i:]
253 return symbol, context[i:]
254 else:
254 else:
255 symbol = new_symbol
255 symbol = new_symbol
256
256
257 return symbol, []
257 return symbol, []
258
258
259 def start(self):
259 def start(self):
260 while True:
260 while True:
261 ident = self.reply_socket.recv()
261 ident = self.reply_socket.recv()
262 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
262 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
263 msg = self.reply_socket.recv_json()
263 msg = self.reply_socket.recv_json()
264 omsg = Message(msg)
264 omsg = Message(msg)
265 print>>sys.__stdout__
265 print>>sys.__stdout__
266 print>>sys.__stdout__, omsg
266 print>>sys.__stdout__, omsg
267 handler = self.handlers.get(omsg.msg_type, None)
267 handler = self.handlers.get(omsg.msg_type, None)
268 if handler is None:
268 if handler is None:
269 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
269 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
270 else:
270 else:
271 handler(ident, omsg)
271 handler(ident, omsg)
272
272
273
273
274 def bind_port(socket, ip, port):
274 def bind_port(socket, ip, port):
275 """ Binds the specified ZMQ socket. If the port is less than zero, a random
275 """ Binds the specified ZMQ socket. If the port is less than zero, a random
276 port is chosen. Returns the port that was bound.
276 port is chosen. Returns the port that was bound.
277 """
277 """
278 connection = 'tcp://%s' % ip
278 connection = 'tcp://%s' % ip
279 if port < 0:
279 if port < 0:
280 port = socket.bind_to_random_port(connection)
280 port = socket.bind_to_random_port(connection)
281 else:
281 else:
282 connection += ':%i' % port
282 connection += ':%i' % port
283 socket.bind(connection)
283 socket.bind(connection)
284 return port
284 return port
285
285
286 def main():
286 def main():
287 """ Main entry point for launching a kernel.
287 """ Main entry point for launching a kernel.
288 """
288 """
289 # Parse command line arguments.
289 # Parse command line arguments.
290 parser = ArgumentParser()
290 parser = ArgumentParser()
291 parser.add_argument('--ip', type=str, default='127.0.0.1',
291 parser.add_argument('--ip', type=str, default='127.0.0.1',
292 help='set the kernel\'s IP address [default: local]')
292 help='set the kernel\'s IP address [default: local]')
293 parser.add_argument('--xrep', type=int, metavar='PORT', default=-1,
293 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
294 help='set the XREP Channel port [default: random]')
294 help='set the XREP Channel port [default: random]')
295 parser.add_argument('--pub', type=int, metavar='PORT', default=-1,
295 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
296 help='set the PUB Channel port [default: random]')
296 help='set the PUB Channel port [default: random]')
297 namespace = parser.parse_args()
297 namespace = parser.parse_args()
298
298
299 # Create context, session, and kernel sockets.
299 # Create context, session, and kernel sockets.
300 print >>sys.__stdout__, "Starting the kernel..."
300 print >>sys.__stdout__, "Starting the kernel..."
301 context = zmq.Context()
301 context = zmq.Context()
302 session = Session(username=u'kernel')
302 session = Session(username=u'kernel')
303
303
304 reply_socket = context.socket(zmq.XREP)
304 reply_socket = context.socket(zmq.XREP)
305 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
305 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
306 print >>sys.__stdout__, "XREP Channel on port", xrep_port
306 print >>sys.__stdout__, "XREP Channel on port", xrep_port
307
307
308 pub_socket = context.socket(zmq.PUB)
308 pub_socket = context.socket(zmq.PUB)
309 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
309 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
310 print >>sys.__stdout__, "PUB Channel on port", pub_port
310 print >>sys.__stdout__, "PUB Channel on port", pub_port
311
311
312 # Redirect input streams and set a display hook.
312 # Redirect input streams and set a display hook.
313 sys.stdout = OutStream(session, pub_socket, u'stdout')
313 sys.stdout = OutStream(session, pub_socket, u'stdout')
314 sys.stderr = OutStream(session, pub_socket, u'stderr')
314 sys.stderr = OutStream(session, pub_socket, u'stderr')
315 sys.displayhook = DisplayHook(session, pub_socket)
315 sys.displayhook = DisplayHook(session, pub_socket)
316
316
317 kernel = Kernel(session, reply_socket, pub_socket)
317 kernel = Kernel(session, reply_socket, pub_socket)
318
318
319 # For debugging convenience, put sleep and a string in the namespace, so we
319 # For debugging convenience, put sleep and a string in the namespace, so we
320 # have them every time we start.
320 # have them every time we start.
321 kernel.user_ns['sleep'] = time.sleep
321 kernel.user_ns['sleep'] = time.sleep
322 kernel.user_ns['s'] = 'Test string'
322 kernel.user_ns['s'] = 'Test string'
323
323
324 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
324 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
325 kernel.start()
325 kernel.start()
326
326
327 def launch_kernel(xrep_port=-1, pub_port=-1):
327 def launch_kernel(xrep_port=0, pub_port=0):
328 """ Launches a localhost kernel, binding to the specified ports. For any
328 """ Launches a localhost kernel, binding to the specified ports. For any
329 port that is left unspecified, a port is chosen by the operating system.
329 port that is left unspecified, a port is chosen by the operating system.
330
330
331 Returns a tuple of form:
331 Returns a tuple of form:
332 (kernel_process [Popen], rep_port [int], sub_port [int])
332 (kernel_process [Popen], rep_port [int], sub_port [int])
333 """
333 """
334 import socket
334 import socket
335 from subprocess import Popen
335 from subprocess import Popen
336
336
337 # Find open ports as necessary.
337 # Find open ports as necessary.
338 ports = []
338 ports = []
339 ports_needed = int(xrep_port < 0) + int(pub_port < 0)
339 ports_needed = int(xrep_port == 0) + int(pub_port == 0)
340 for i in xrange(ports_needed):
340 for i in xrange(ports_needed):
341 sock = socket.socket()
341 sock = socket.socket()
342 sock.bind(('', 0))
342 sock.bind(('', 0))
343 ports.append(sock)
343 ports.append(sock)
344 for i, sock in enumerate(ports):
344 for i, sock in enumerate(ports):
345 port = sock.getsockname()[1]
345 port = sock.getsockname()[1]
346 sock.close()
346 sock.close()
347 ports[i] = port
347 ports[i] = port
348 if xrep_port < 0:
348 if xrep_port == 0:
349 xrep_port = ports.pop()
349 xrep_port = ports.pop()
350 if pub_port < 0:
350 if pub_port == 0:
351 pub_port = ports.pop()
351 pub_port = ports.pop()
352
352
353 # Spawn a kernel.
353 # Spawn a kernel.
354 command = 'from IPython.zmq.kernel import main; main()'
354 command = 'from IPython.zmq.kernel import main; main()'
355 proc = Popen([ sys.executable, '-c', command,
355 proc = Popen([ sys.executable, '-c', command,
356 '--xrep', str(xrep_port), '--pub', str(pub_port) ])
356 '--xrep', str(xrep_port), '--pub', str(pub_port) ])
357 return proc, xrep_port, pub_port
357 return proc, xrep_port, pub_port
358
358
359
359
360 if __name__ == '__main__':
360 if __name__ == '__main__':
361 main()
361 main()
@@ -1,493 +1,493 b''
1 """Kernel frontend classes.
1 """Kernel frontend classes.
2
2
3 TODO: Create logger to handle debugging and console messages.
3 TODO: Create logger to handle debugging and console messages.
4
4
5 """
5 """
6
6
7 # Standard library imports.
7 # Standard library imports.
8 from Queue import Queue, Empty
8 from Queue import Queue, Empty
9 from subprocess import Popen
9 from subprocess import Popen
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12 import traceback
12 import traceback
13
13
14 # System library imports.
14 # System library imports.
15 import zmq
15 import zmq
16 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq import POLLIN, POLLOUT, POLLERR
17 from zmq.eventloop import ioloop
17 from zmq.eventloop import ioloop
18
18
19 # Local imports.
19 # Local imports.
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 Type
21 Type
22 from kernel import launch_kernel
22 from kernel import launch_kernel
23 from session import Session
23 from session import Session
24
24
25 # Constants.
25 # Constants.
26 LOCALHOST = '127.0.0.1'
26 LOCALHOST = '127.0.0.1'
27
27
28
28
29 class MissingHandlerError(Exception):
29 class MissingHandlerError(Exception):
30 pass
30 pass
31
31
32
32
33 class ZmqSocketChannel(Thread):
33 class ZmqSocketChannel(Thread):
34 """ The base class for the channels that use ZMQ sockets.
34 """ The base class for the channels that use ZMQ sockets.
35 """
35 """
36
36
37 def __init__(self, context, session, address=None):
37 def __init__(self, context, session, address=None):
38 super(ZmqSocketChannel, self).__init__()
38 super(ZmqSocketChannel, self).__init__()
39 self.daemon = True
39 self.daemon = True
40
40
41 self.context = context
41 self.context = context
42 self.session = session
42 self.session = session
43 self.address = address
43 self.address = address
44 self.socket = None
44 self.socket = None
45
45
46 def stop(self):
46 def stop(self):
47 """ Stop the thread's activity. Returns when the thread terminates.
47 """ Stop the thread's activity. Returns when the thread terminates.
48 """
48 """
49 self.join()
49 self.join()
50
50
51 # Allow the thread to be started again.
51 # Allow the thread to be started again.
52 # FIXME: Although this works (and there's no reason why it shouldn't),
52 # FIXME: Although this works (and there's no reason why it shouldn't),
53 # it feels wrong. Is there a cleaner way to achieve this?
53 # it feels wrong. Is there a cleaner way to achieve this?
54 Thread.__init__(self)
54 Thread.__init__(self)
55
55
56 def get_address(self):
56 def get_address(self):
57 """ Get the channel's address. By the default, a channel is on
57 """ Get the channel's address. By the default, a channel is on
58 localhost with no port specified (a negative port number).
58 localhost with no port specified (a negative port number).
59 """
59 """
60 return self._address
60 return self._address
61
61
62 def set_adresss(self, address):
62 def set_adresss(self, address):
63 """ Set the channel's address. Should be a tuple of form:
63 """ Set the channel's address. Should be a tuple of form:
64 (ip address [str], port [int]).
64 (ip address [str], port [int]).
65 or None, in which case the address is reset to its default value.
65 or None, in which case the address is reset to its default value.
66 """
66 """
67 # FIXME: Validate address.
67 # FIXME: Validate address.
68 if self.is_alive():
68 if self.is_alive():
69 raise RuntimeError("Cannot set address on a running channel!")
69 raise RuntimeError("Cannot set address on a running channel!")
70 else:
70 else:
71 if address is None:
71 if address is None:
72 address = (LOCALHOST, -1)
72 address = (LOCALHOST, 0)
73 self._address = address
73 self._address = address
74
74
75 address = property(get_address, set_adresss)
75 address = property(get_address, set_adresss)
76
76
77
77
78 class SubSocketChannel(ZmqSocketChannel):
78 class SubSocketChannel(ZmqSocketChannel):
79
79
80 handlers = None
80 handlers = None
81 _overriden_call_handler = None
81 _overriden_call_handler = None
82
82
83 def __init__(self, context, session, address=None):
83 def __init__(self, context, session, address=None):
84 self.handlers = {}
84 self.handlers = {}
85 super(SubSocketChannel, self).__init__(context, session, address)
85 super(SubSocketChannel, self).__init__(context, session, address)
86
86
87 def run(self):
87 def run(self):
88 self.socket = self.context.socket(zmq.SUB)
88 self.socket = self.context.socket(zmq.SUB)
89 self.socket.setsockopt(zmq.SUBSCRIBE,'')
89 self.socket.setsockopt(zmq.SUBSCRIBE,'')
90 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
90 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
91 self.socket.connect('tcp://%s:%i' % self.address)
91 self.socket.connect('tcp://%s:%i' % self.address)
92 self.ioloop = ioloop.IOLoop()
92 self.ioloop = ioloop.IOLoop()
93 self.ioloop.add_handler(self.socket, self._handle_events,
93 self.ioloop.add_handler(self.socket, self._handle_events,
94 POLLIN|POLLERR)
94 POLLIN|POLLERR)
95 self.ioloop.start()
95 self.ioloop.start()
96
96
97 def stop(self):
97 def stop(self):
98 self.ioloop.stop()
98 self.ioloop.stop()
99 super(SubSocketChannel, self).stop()
99 super(SubSocketChannel, self).stop()
100
100
101 def _handle_events(self, socket, events):
101 def _handle_events(self, socket, events):
102 # Turn on and off POLLOUT depending on if we have made a request
102 # Turn on and off POLLOUT depending on if we have made a request
103 if events & POLLERR:
103 if events & POLLERR:
104 self._handle_err()
104 self._handle_err()
105 if events & POLLIN:
105 if events & POLLIN:
106 self._handle_recv()
106 self._handle_recv()
107
107
108 def _handle_err(self):
108 def _handle_err(self):
109 raise zmq.ZmqError()
109 raise zmq.ZmqError()
110
110
111 def _handle_recv(self):
111 def _handle_recv(self):
112 msg = self.socket.recv_json()
112 msg = self.socket.recv_json()
113 self.call_handlers(msg)
113 self.call_handlers(msg)
114
114
115 def override_call_handler(self, func):
115 def override_call_handler(self, func):
116 """Permanently override the call_handler.
116 """Permanently override the call_handler.
117
117
118 The function func will be called as::
118 The function func will be called as::
119
119
120 func(handler, msg)
120 func(handler, msg)
121
121
122 And must call::
122 And must call::
123
123
124 handler(msg)
124 handler(msg)
125
125
126 in the main thread.
126 in the main thread.
127 """
127 """
128 assert callable(func), "not a callable: %r" % func
128 assert callable(func), "not a callable: %r" % func
129 self._overriden_call_handler = func
129 self._overriden_call_handler = func
130
130
131 def call_handlers(self, msg):
131 def call_handlers(self, msg):
132 handler = self.handlers.get(msg['msg_type'], None)
132 handler = self.handlers.get(msg['msg_type'], None)
133 if handler is not None:
133 if handler is not None:
134 try:
134 try:
135 self.call_handler(handler, msg)
135 self.call_handler(handler, msg)
136 except:
136 except:
137 # XXX: This should be logged at least
137 # XXX: This should be logged at least
138 traceback.print_last()
138 traceback.print_last()
139
139
140 def call_handler(self, handler, msg):
140 def call_handler(self, handler, msg):
141 if self._overriden_call_handler is not None:
141 if self._overriden_call_handler is not None:
142 self._overriden_call_handler(handler, msg)
142 self._overriden_call_handler(handler, msg)
143 elif hasattr(self, '_call_handler'):
143 elif hasattr(self, '_call_handler'):
144 call_handler = getattr(self, '_call_handler')
144 call_handler = getattr(self, '_call_handler')
145 call_handler(handler, msg)
145 call_handler(handler, msg)
146 else:
146 else:
147 raise RuntimeError('no handler!')
147 raise RuntimeError('no handler!')
148
148
149 def add_handler(self, callback, msg_type):
149 def add_handler(self, callback, msg_type):
150 """Register a callback for msg type."""
150 """Register a callback for msg type."""
151 self.handlers[msg_type] = callback
151 self.handlers[msg_type] = callback
152
152
153 def remove_handler(self, msg_type):
153 def remove_handler(self, msg_type):
154 """Remove the callback for msg type."""
154 """Remove the callback for msg type."""
155 self.handlers.pop(msg_type, None)
155 self.handlers.pop(msg_type, None)
156
156
157 def flush(self, timeout=1.0):
157 def flush(self, timeout=1.0):
158 """Immediately processes all pending messages on the SUB channel.
158 """Immediately processes all pending messages on the SUB channel.
159
159
160 This method is thread safe.
160 This method is thread safe.
161
161
162 Parameters
162 Parameters
163 ----------
163 ----------
164 timeout : float, optional
164 timeout : float, optional
165 The maximum amount of time to spend flushing, in seconds. The
165 The maximum amount of time to spend flushing, in seconds. The
166 default is one second.
166 default is one second.
167 """
167 """
168 # We do the IOLoop callback process twice to ensure that the IOLoop
168 # We do the IOLoop callback process twice to ensure that the IOLoop
169 # gets to perform at least one full poll.
169 # gets to perform at least one full poll.
170 stop_time = time.time() + timeout
170 stop_time = time.time() + timeout
171 for i in xrange(2):
171 for i in xrange(2):
172 self._flushed = False
172 self._flushed = False
173 self.ioloop.add_callback(self._flush)
173 self.ioloop.add_callback(self._flush)
174 while not self._flushed and time.time() < stop_time:
174 while not self._flushed and time.time() < stop_time:
175 time.sleep(0.01)
175 time.sleep(0.01)
176
176
177 def _flush(self):
177 def _flush(self):
178 """Called in this thread by the IOLoop to indicate that all events have
178 """Called in this thread by the IOLoop to indicate that all events have
179 been processed.
179 been processed.
180 """
180 """
181 self._flushed = True
181 self._flushed = True
182
182
183
183
184 class XReqSocketChannel(ZmqSocketChannel):
184 class XReqSocketChannel(ZmqSocketChannel):
185
185
186 handler_queue = None
186 handler_queue = None
187 command_queue = None
187 command_queue = None
188 handlers = None
188 handlers = None
189 _overriden_call_handler = None
189 _overriden_call_handler = None
190
190
191 def __init__(self, context, session, address=None):
191 def __init__(self, context, session, address=None):
192 self.handlers = {}
192 self.handlers = {}
193 self.handler_queue = Queue()
193 self.handler_queue = Queue()
194 self.command_queue = Queue()
194 self.command_queue = Queue()
195 super(XReqSocketChannel, self).__init__(context, session, address)
195 super(XReqSocketChannel, self).__init__(context, session, address)
196
196
197 def run(self):
197 def run(self):
198 self.socket = self.context.socket(zmq.XREQ)
198 self.socket = self.context.socket(zmq.XREQ)
199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
200 self.socket.connect('tcp://%s:%i' % self.address)
200 self.socket.connect('tcp://%s:%i' % self.address)
201 self.ioloop = ioloop.IOLoop()
201 self.ioloop = ioloop.IOLoop()
202 self.ioloop.add_handler(self.socket, self._handle_events,
202 self.ioloop.add_handler(self.socket, self._handle_events,
203 POLLIN|POLLOUT|POLLERR)
203 POLLIN|POLLOUT|POLLERR)
204 self.ioloop.start()
204 self.ioloop.start()
205
205
206 def stop(self):
206 def stop(self):
207 self.ioloop.stop()
207 self.ioloop.stop()
208 super(XReqSocketChannel, self).stop()
208 super(XReqSocketChannel, self).stop()
209
209
210 def _handle_events(self, socket, events):
210 def _handle_events(self, socket, events):
211 # Turn on and off POLLOUT depending on if we have made a request
211 # Turn on and off POLLOUT depending on if we have made a request
212 if events & POLLERR:
212 if events & POLLERR:
213 self._handle_err()
213 self._handle_err()
214 if events & POLLOUT:
214 if events & POLLOUT:
215 self._handle_send()
215 self._handle_send()
216 if events & POLLIN:
216 if events & POLLIN:
217 self._handle_recv()
217 self._handle_recv()
218
218
219 def _handle_recv(self):
219 def _handle_recv(self):
220 msg = self.socket.recv_json()
220 msg = self.socket.recv_json()
221 self.call_handlers(msg)
221 self.call_handlers(msg)
222
222
223 def _handle_send(self):
223 def _handle_send(self):
224 try:
224 try:
225 msg = self.command_queue.get(False)
225 msg = self.command_queue.get(False)
226 except Empty:
226 except Empty:
227 pass
227 pass
228 else:
228 else:
229 self.socket.send_json(msg)
229 self.socket.send_json(msg)
230
230
231 def _handle_err(self):
231 def _handle_err(self):
232 raise zmq.ZmqError()
232 raise zmq.ZmqError()
233
233
234 def _queue_request(self, msg, callback):
234 def _queue_request(self, msg, callback):
235 handler = self._find_handler(msg['msg_type'], callback)
235 handler = self._find_handler(msg['msg_type'], callback)
236 self.handler_queue.put(handler)
236 self.handler_queue.put(handler)
237 self.command_queue.put(msg)
237 self.command_queue.put(msg)
238
238
239 def execute(self, code, callback=None):
239 def execute(self, code, callback=None):
240 # Create class for content/msg creation. Related to, but possibly
240 # Create class for content/msg creation. Related to, but possibly
241 # not in Session.
241 # not in Session.
242 content = dict(code=code)
242 content = dict(code=code)
243 msg = self.session.msg('execute_request', content)
243 msg = self.session.msg('execute_request', content)
244 self._queue_request(msg, callback)
244 self._queue_request(msg, callback)
245 return msg['header']['msg_id']
245 return msg['header']['msg_id']
246
246
247 def complete(self, text, line, block=None, callback=None):
247 def complete(self, text, line, block=None, callback=None):
248 content = dict(text=text, line=line)
248 content = dict(text=text, line=line)
249 msg = self.session.msg('complete_request', content)
249 msg = self.session.msg('complete_request', content)
250 self._queue_request(msg, callback)
250 self._queue_request(msg, callback)
251 return msg['header']['msg_id']
251 return msg['header']['msg_id']
252
252
253 def object_info(self, oname, callback=None):
253 def object_info(self, oname, callback=None):
254 content = dict(oname=oname)
254 content = dict(oname=oname)
255 msg = self.session.msg('object_info_request', content)
255 msg = self.session.msg('object_info_request', content)
256 self._queue_request(msg, callback)
256 self._queue_request(msg, callback)
257 return msg['header']['msg_id']
257 return msg['header']['msg_id']
258
258
259 def _find_handler(self, name, callback):
259 def _find_handler(self, name, callback):
260 if callback is not None:
260 if callback is not None:
261 return callback
261 return callback
262 handler = self.handlers.get(name)
262 handler = self.handlers.get(name)
263 if handler is None:
263 if handler is None:
264 raise MissingHandlerError(
264 raise MissingHandlerError(
265 'No handler defined for method: %s' % name)
265 'No handler defined for method: %s' % name)
266 return handler
266 return handler
267
267
268 def override_call_handler(self, func):
268 def override_call_handler(self, func):
269 """Permanently override the call_handler.
269 """Permanently override the call_handler.
270
270
271 The function func will be called as::
271 The function func will be called as::
272
272
273 func(handler, msg)
273 func(handler, msg)
274
274
275 And must call::
275 And must call::
276
276
277 handler(msg)
277 handler(msg)
278
278
279 in the main thread.
279 in the main thread.
280 """
280 """
281 assert callable(func), "not a callable: %r" % func
281 assert callable(func), "not a callable: %r" % func
282 self._overriden_call_handler = func
282 self._overriden_call_handler = func
283
283
284 def call_handlers(self, msg):
284 def call_handlers(self, msg):
285 try:
285 try:
286 handler = self.handler_queue.get(False)
286 handler = self.handler_queue.get(False)
287 except Empty:
287 except Empty:
288 print "Message received with no handler!!!"
288 print "Message received with no handler!!!"
289 print msg
289 print msg
290 else:
290 else:
291 self.call_handler(handler, msg)
291 self.call_handler(handler, msg)
292
292
293 def call_handler(self, handler, msg):
293 def call_handler(self, handler, msg):
294 if self._overriden_call_handler is not None:
294 if self._overriden_call_handler is not None:
295 self._overriden_call_handler(handler, msg)
295 self._overriden_call_handler(handler, msg)
296 elif hasattr(self, '_call_handler'):
296 elif hasattr(self, '_call_handler'):
297 call_handler = getattr(self, '_call_handler')
297 call_handler = getattr(self, '_call_handler')
298 call_handler(handler, msg)
298 call_handler(handler, msg)
299 else:
299 else:
300 raise RuntimeError('no handler!')
300 raise RuntimeError('no handler!')
301
301
302
302
303 class RepSocketChannel(ZmqSocketChannel):
303 class RepSocketChannel(ZmqSocketChannel):
304
304
305 def on_raw_input(self):
305 def on_raw_input(self):
306 pass
306 pass
307
307
308
308
309 class KernelManager(HasTraits):
309 class KernelManager(HasTraits):
310 """ Manages a kernel for a frontend.
310 """ Manages a kernel for a frontend.
311
311
312 The SUB channel is for the frontend to receive messages published by the
312 The SUB channel is for the frontend to receive messages published by the
313 kernel.
313 kernel.
314
314
315 The REQ channel is for the frontend to make requests of the kernel.
315 The REQ channel is for the frontend to make requests of the kernel.
316
316
317 The REP channel is for the kernel to request stdin (raw_input) from the
317 The REP channel is for the kernel to request stdin (raw_input) from the
318 frontend.
318 frontend.
319 """
319 """
320
320
321 # Whether the kernel manager is currently listening on its channels.
321 # Whether the kernel manager is currently listening on its channels.
322 is_listening = Bool(False)
322 is_listening = Bool(False)
323
323
324 # The PyZMQ Context to use for communication with the kernel.
324 # The PyZMQ Context to use for communication with the kernel.
325 context = Instance(zmq.Context, ())
325 context = Instance(zmq.Context, ())
326
326
327 # The Session to use for communication with the kernel.
327 # The Session to use for communication with the kernel.
328 session = Instance(Session, ())
328 session = Instance(Session, ())
329
329
330 # The classes to use for the various channels.
330 # The classes to use for the various channels.
331 sub_channel_class = Type(SubSocketChannel)
331 sub_channel_class = Type(SubSocketChannel)
332 xreq_channel_class = Type(XReqSocketChannel)
332 xreq_channel_class = Type(XReqSocketChannel)
333 rep_channel_class = Type(RepSocketChannel)
333 rep_channel_class = Type(RepSocketChannel)
334
334
335 # Protected traits.
335 # Protected traits.
336 _kernel = Instance(Popen)
336 _kernel = Instance(Popen)
337 _sub_channel = Any
337 _sub_channel = Any
338 _xreq_channel = Any
338 _xreq_channel = Any
339 _rep_channel = Any
339 _rep_channel = Any
340
340
341 #--------------------------------------------------------------------------
341 #--------------------------------------------------------------------------
342 # Channel management methods:
342 # Channel management methods:
343 #--------------------------------------------------------------------------
343 #--------------------------------------------------------------------------
344
344
345 def start_listening(self):
345 def start_listening(self):
346 """Starts listening on the specified ports. If already listening, raises
346 """Starts listening on the specified ports. If already listening, raises
347 a RuntimeError.
347 a RuntimeError.
348 """
348 """
349 if self.is_listening:
349 if self.is_listening:
350 raise RuntimeError("Cannot start listening. Already listening!")
350 raise RuntimeError("Cannot start listening. Already listening!")
351 else:
351 else:
352 self.is_listening = True
352 self.is_listening = True
353 self.sub_channel.start()
353 self.sub_channel.start()
354 self.xreq_channel.start()
354 self.xreq_channel.start()
355 self.rep_channel.start()
355 self.rep_channel.start()
356
356
357 @property
357 @property
358 def is_alive(self):
358 def is_alive(self):
359 """ Returns whether the kernel is alive. """
359 """ Returns whether the kernel is alive. """
360 if self.is_listening:
360 if self.is_listening:
361 # TODO: check if alive.
361 # TODO: check if alive.
362 return True
362 return True
363 else:
363 else:
364 return False
364 return False
365
365
366 def stop_listening(self):
366 def stop_listening(self):
367 """Stops listening. If not listening, does nothing. """
367 """Stops listening. If not listening, does nothing. """
368 if self.is_listening:
368 if self.is_listening:
369 self.is_listening = False
369 self.is_listening = False
370 self.sub_channel.stop()
370 self.sub_channel.stop()
371 self.xreq_channel.stop()
371 self.xreq_channel.stop()
372 self.rep_channel.stop()
372 self.rep_channel.stop()
373
373
374 #--------------------------------------------------------------------------
374 #--------------------------------------------------------------------------
375 # Kernel process management methods:
375 # Kernel process management methods:
376 #--------------------------------------------------------------------------
376 #--------------------------------------------------------------------------
377
377
378 def start_kernel(self):
378 def start_kernel(self):
379 """Starts a kernel process and configures the manager to use it.
379 """Starts a kernel process and configures the manager to use it.
380
380
381 If ports have been specified via the address attributes, they are used.
381 If ports have been specified via the address attributes, they are used.
382 Otherwise, open ports are chosen by the OS and the channel port
382 Otherwise, open ports are chosen by the OS and the channel port
383 attributes are configured as appropriate.
383 attributes are configured as appropriate.
384 """
384 """
385 xreq, sub = self.xreq_address, self.sub_address
385 xreq, sub = self.xreq_address, self.sub_address
386 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
386 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
387 raise RuntimeError("Can only launch a kernel on localhost."
387 raise RuntimeError("Can only launch a kernel on localhost."
388 "Make sure that the '*_address' attributes are "
388 "Make sure that the '*_address' attributes are "
389 "configured properly.")
389 "configured properly.")
390
390
391 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
391 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
392 self.set_kernel(kernel)
392 self.set_kernel(kernel)
393 self.xreq_address = (LOCALHOST, xrep)
393 self.xreq_address = (LOCALHOST, xrep)
394 self.sub_address = (LOCALHOST, pub)
394 self.sub_address = (LOCALHOST, pub)
395
395
396 def set_kernel(self, kernel):
396 def set_kernel(self, kernel):
397 """Sets the kernel manager's kernel to an existing kernel process.
397 """Sets the kernel manager's kernel to an existing kernel process.
398
398
399 It is *not* necessary to a set a kernel to communicate with it via the
399 It is *not* necessary to a set a kernel to communicate with it via the
400 channels, and those objects must be configured separately. It
400 channels, and those objects must be configured separately. It
401 *is* necessary to set a kernel if you want to use the manager (or
401 *is* necessary to set a kernel if you want to use the manager (or
402 frontends that use the manager) to signal and/or kill the kernel.
402 frontends that use the manager) to signal and/or kill the kernel.
403
403
404 Parameters:
404 Parameters:
405 -----------
405 -----------
406 kernel : Popen
406 kernel : Popen
407 An existing kernel process.
407 An existing kernel process.
408 """
408 """
409 self._kernel = kernel
409 self._kernel = kernel
410
410
411 @property
411 @property
412 def has_kernel(self):
412 def has_kernel(self):
413 """Returns whether a kernel process has been specified for the kernel
413 """Returns whether a kernel process has been specified for the kernel
414 manager.
414 manager.
415
415
416 A kernel process can be set via 'start_kernel' or 'set_kernel'.
416 A kernel process can be set via 'start_kernel' or 'set_kernel'.
417 """
417 """
418 return self._kernel is not None
418 return self._kernel is not None
419
419
420 def kill_kernel(self):
420 def kill_kernel(self):
421 """ Kill the running kernel. """
421 """ Kill the running kernel. """
422 if self._kernel:
422 if self._kernel:
423 self._kernel.kill()
423 self._kernel.kill()
424 self._kernel = None
424 self._kernel = None
425 else:
425 else:
426 raise RuntimeError("Cannot kill kernel. No kernel is running!")
426 raise RuntimeError("Cannot kill kernel. No kernel is running!")
427
427
428 def signal_kernel(self, signum):
428 def signal_kernel(self, signum):
429 """ Sends a signal to the kernel. """
429 """ Sends a signal to the kernel. """
430 if self._kernel:
430 if self._kernel:
431 self._kernel.send_signal(signum)
431 self._kernel.send_signal(signum)
432 else:
432 else:
433 raise RuntimeError("Cannot signal kernel. No kernel is running!")
433 raise RuntimeError("Cannot signal kernel. No kernel is running!")
434
434
435 #--------------------------------------------------------------------------
435 #--------------------------------------------------------------------------
436 # Channels used for communication with the kernel:
436 # Channels used for communication with the kernel:
437 #--------------------------------------------------------------------------
437 #--------------------------------------------------------------------------
438
438
439 @property
439 @property
440 def sub_channel(self):
440 def sub_channel(self):
441 """Get the SUB socket channel object."""
441 """Get the SUB socket channel object."""
442 if self._sub_channel is None:
442 if self._sub_channel is None:
443 self._sub_channel = self.sub_channel_class(self.context,
443 self._sub_channel = self.sub_channel_class(self.context,
444 self.session)
444 self.session)
445 return self._sub_channel
445 return self._sub_channel
446
446
447 @property
447 @property
448 def xreq_channel(self):
448 def xreq_channel(self):
449 """Get the REQ socket channel object to make requests of the kernel."""
449 """Get the REQ socket channel object to make requests of the kernel."""
450 if self._xreq_channel is None:
450 if self._xreq_channel is None:
451 self._xreq_channel = self.xreq_channel_class(self.context,
451 self._xreq_channel = self.xreq_channel_class(self.context,
452 self.session)
452 self.session)
453 return self._xreq_channel
453 return self._xreq_channel
454
454
455 @property
455 @property
456 def rep_channel(self):
456 def rep_channel(self):
457 """Get the REP socket channel object to handle stdin (raw_input)."""
457 """Get the REP socket channel object to handle stdin (raw_input)."""
458 if self._rep_channel is None:
458 if self._rep_channel is None:
459 self._rep_channel = self.rep_channel_class(self.context,
459 self._rep_channel = self.rep_channel_class(self.context,
460 self.session)
460 self.session)
461 return self._rep_channel
461 return self._rep_channel
462
462
463 #--------------------------------------------------------------------------
463 #--------------------------------------------------------------------------
464 # Delegates for the Channel address attributes:
464 # Delegates for the Channel address attributes:
465 #--------------------------------------------------------------------------
465 #--------------------------------------------------------------------------
466
466
467 def get_sub_address(self):
467 def get_sub_address(self):
468 return self.sub_channel.address
468 return self.sub_channel.address
469
469
470 def set_sub_address(self, address):
470 def set_sub_address(self, address):
471 self.sub_channel.address = address
471 self.sub_channel.address = address
472
472
473 sub_address = property(get_sub_address, set_sub_address,
473 sub_address = property(get_sub_address, set_sub_address,
474 doc="The address used by SUB socket channel.")
474 doc="The address used by SUB socket channel.")
475
475
476 def get_xreq_address(self):
476 def get_xreq_address(self):
477 return self.xreq_channel.address
477 return self.xreq_channel.address
478
478
479 def set_xreq_address(self, address):
479 def set_xreq_address(self, address):
480 self.xreq_channel.address = address
480 self.xreq_channel.address = address
481
481
482 xreq_address = property(get_xreq_address, set_xreq_address,
482 xreq_address = property(get_xreq_address, set_xreq_address,
483 doc="The address used by XREQ socket channel.")
483 doc="The address used by XREQ socket channel.")
484
484
485 def get_rep_address(self):
485 def get_rep_address(self):
486 return self.rep_channel.address
486 return self.rep_channel.address
487
487
488 def set_rep_address(self, address):
488 def set_rep_address(self, address):
489 self.rep_channel.address = address
489 self.rep_channel.address = address
490
490
491 rep_address = property(get_rep_address, set_rep_address,
491 rep_address = property(get_rep_address, set_rep_address,
492 doc="The address used by REP socket channel.")
492 doc="The address used by REP socket channel.")
493
493
General Comments 0
You need to be logged in to leave comments. Login now