##// END OF EJS Templates
Draft of context closing....
Brian Granger -
Show More
@@ -1,250 +1,253 b''
1 """ Defines helper functions for creating kernel entry points and process
1 """ Defines helper functions for creating kernel entry points and process
2 launchers.
2 launchers.
3 """
3 """
4
4
5 # Standard library imports.
5 # Standard library imports.
6 import atexit
6 import os
7 import os
7 import socket
8 import socket
8 from subprocess import Popen, PIPE
9 from subprocess import Popen, PIPE
9 import sys
10 import sys
10
11
11 # System library imports.
12 # System library imports.
12 import zmq
13 import zmq
13
14
14 # Local imports.
15 # Local imports.
15 from IPython.core.ultratb import FormattedTB
16 from IPython.core.ultratb import FormattedTB
16 from IPython.external.argparse import ArgumentParser
17 from IPython.external.argparse import ArgumentParser
17 from IPython.utils import io
18 from IPython.utils import io
18 from displayhook import DisplayHook
19 from displayhook import DisplayHook
19 from heartbeat import Heartbeat
20 from heartbeat import Heartbeat
20 from iostream import OutStream
21 from iostream import OutStream
21 from parentpoller import ParentPollerUnix, ParentPollerWindows
22 from parentpoller import ParentPollerUnix, ParentPollerWindows
22 from session import Session
23 from session import Session
23
24
24 def bind_port(socket, ip, port):
25 def bind_port(socket, ip, port):
25 """ Binds the specified ZMQ socket. If the port is zero, a random port is
26 """ Binds the specified ZMQ socket. If the port is zero, a random port is
26 chosen. Returns the port that was bound.
27 chosen. Returns the port that was bound.
27 """
28 """
28 connection = 'tcp://%s' % ip
29 connection = 'tcp://%s' % ip
29 if port <= 0:
30 if port <= 0:
30 port = socket.bind_to_random_port(connection)
31 port = socket.bind_to_random_port(connection)
31 else:
32 else:
32 connection += ':%i' % port
33 connection += ':%i' % port
33 socket.bind(connection)
34 socket.bind(connection)
34 return port
35 return port
35
36
36
37
37 def make_argument_parser():
38 def make_argument_parser():
38 """ Creates an ArgumentParser for the generic arguments supported by all
39 """ Creates an ArgumentParser for the generic arguments supported by all
39 kernel entry points.
40 kernel entry points.
40 """
41 """
41 parser = ArgumentParser()
42 parser = ArgumentParser()
42 parser.add_argument('--ip', type=str, default='127.0.0.1',
43 parser.add_argument('--ip', type=str, default='127.0.0.1',
43 help='set the kernel\'s IP address [default: local]')
44 help='set the kernel\'s IP address [default: local]')
44 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
45 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
45 help='set the XREP channel port [default: random]')
46 help='set the XREP channel port [default: random]')
46 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
47 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
47 help='set the PUB channel port [default: random]')
48 help='set the PUB channel port [default: random]')
48 parser.add_argument('--req', type=int, metavar='PORT', default=0,
49 parser.add_argument('--req', type=int, metavar='PORT', default=0,
49 help='set the REQ channel port [default: random]')
50 help='set the REQ channel port [default: random]')
50 parser.add_argument('--hb', type=int, metavar='PORT', default=0,
51 parser.add_argument('--hb', type=int, metavar='PORT', default=0,
51 help='set the heartbeat port [default: random]')
52 help='set the heartbeat port [default: random]')
52
53
53 if sys.platform == 'win32':
54 if sys.platform == 'win32':
54 parser.add_argument('--interrupt', type=int, metavar='HANDLE',
55 parser.add_argument('--interrupt', type=int, metavar='HANDLE',
55 default=0, help='interrupt this process when '
56 default=0, help='interrupt this process when '
56 'HANDLE is signaled')
57 'HANDLE is signaled')
57 parser.add_argument('--parent', type=int, metavar='HANDLE',
58 parser.add_argument('--parent', type=int, metavar='HANDLE',
58 default=0, help='kill this process if the process '
59 default=0, help='kill this process if the process '
59 'with HANDLE dies')
60 'with HANDLE dies')
60 else:
61 else:
61 parser.add_argument('--parent', action='store_true',
62 parser.add_argument('--parent', action='store_true',
62 help='kill this process if its parent dies')
63 help='kill this process if its parent dies')
63
64
64 return parser
65 return parser
65
66
66
67
67 def make_kernel(namespace, kernel_factory,
68 def make_kernel(namespace, kernel_factory,
68 out_stream_factory=None, display_hook_factory=None):
69 out_stream_factory=None, display_hook_factory=None):
69 """ Creates a kernel, redirects stdout/stderr, and installs a display hook
70 """ Creates a kernel, redirects stdout/stderr, and installs a display hook
70 and exception handler.
71 and exception handler.
71 """
72 """
72 # If running under pythonw.exe, the interpreter will crash if more than 4KB
73 # If running under pythonw.exe, the interpreter will crash if more than 4KB
73 # of data is written to stdout or stderr. This is a bug that has been with
74 # of data is written to stdout or stderr. This is a bug that has been with
74 # Python for a very long time; see http://bugs.python.org/issue706263.
75 # Python for a very long time; see http://bugs.python.org/issue706263.
75 if sys.executable.endswith('pythonw.exe'):
76 if sys.executable.endswith('pythonw.exe'):
76 blackhole = file(os.devnull, 'w')
77 blackhole = file(os.devnull, 'w')
77 sys.stdout = sys.stderr = blackhole
78 sys.stdout = sys.stderr = blackhole
78 sys.__stdout__ = sys.__stderr__ = blackhole
79 sys.__stdout__ = sys.__stderr__ = blackhole
79
80
80 # Install minimal exception handling
81 # Install minimal exception handling
81 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
82 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
82 ostream=sys.__stdout__)
83 ostream=sys.__stdout__)
83
84
84 # Create a context, a session, and the kernel sockets.
85 # Create a context, a session, and the kernel sockets.
85 io.raw_print("Starting the kernel at pid:", os.getpid())
86 io.raw_print("Starting the kernel at pid:", os.getpid())
86 context = zmq.Context()
87 context = zmq.Context()
88 # Uncomment this to try closing the context.
89 # atexit.register(context.close)
87 session = Session(username=u'kernel')
90 session = Session(username=u'kernel')
88
91
89 reply_socket = context.socket(zmq.XREP)
92 reply_socket = context.socket(zmq.XREP)
90 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
93 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
91 io.raw_print("XREP Channel on port", xrep_port)
94 io.raw_print("XREP Channel on port", xrep_port)
92
95
93 pub_socket = context.socket(zmq.PUB)
96 pub_socket = context.socket(zmq.PUB)
94 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
97 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
95 io.raw_print("PUB Channel on port", pub_port)
98 io.raw_print("PUB Channel on port", pub_port)
96
99
97 req_socket = context.socket(zmq.XREQ)
100 req_socket = context.socket(zmq.XREQ)
98 req_port = bind_port(req_socket, namespace.ip, namespace.req)
101 req_port = bind_port(req_socket, namespace.ip, namespace.req)
99 io.raw_print("REQ Channel on port", req_port)
102 io.raw_print("REQ Channel on port", req_port)
100
103
101 hb = Heartbeat(context, (namespace.ip, namespace.hb))
104 hb = Heartbeat(context, (namespace.ip, namespace.hb))
102 hb.start()
105 hb.start()
103 hb_port = hb.port
106 hb_port = hb.port
104 io.raw_print("Heartbeat REP Channel on port", hb_port)
107 io.raw_print("Heartbeat REP Channel on port", hb_port)
105
108
106 # Redirect input streams and set a display hook.
109 # Redirect input streams and set a display hook.
107 if out_stream_factory:
110 if out_stream_factory:
108 sys.stdout = out_stream_factory(session, pub_socket, u'stdout')
111 sys.stdout = out_stream_factory(session, pub_socket, u'stdout')
109 sys.stderr = out_stream_factory(session, pub_socket, u'stderr')
112 sys.stderr = out_stream_factory(session, pub_socket, u'stderr')
110 if display_hook_factory:
113 if display_hook_factory:
111 sys.displayhook = display_hook_factory(session, pub_socket)
114 sys.displayhook = display_hook_factory(session, pub_socket)
112
115
113 # Create the kernel.
116 # Create the kernel.
114 kernel = kernel_factory(session=session, reply_socket=reply_socket,
117 kernel = kernel_factory(session=session, reply_socket=reply_socket,
115 pub_socket=pub_socket, req_socket=req_socket)
118 pub_socket=pub_socket, req_socket=req_socket)
116 kernel.record_ports(xrep_port=xrep_port, pub_port=pub_port,
119 kernel.record_ports(xrep_port=xrep_port, pub_port=pub_port,
117 req_port=req_port, hb_port=hb_port)
120 req_port=req_port, hb_port=hb_port)
118 return kernel
121 return kernel
119
122
120
123
121 def start_kernel(namespace, kernel):
124 def start_kernel(namespace, kernel):
122 """ Starts a kernel.
125 """ Starts a kernel.
123 """
126 """
124 # Configure this kernel process to poll the parent process, if necessary.
127 # Configure this kernel process to poll the parent process, if necessary.
125 if sys.platform == 'win32':
128 if sys.platform == 'win32':
126 if namespace.interrupt or namespace.parent:
129 if namespace.interrupt or namespace.parent:
127 poller = ParentPollerWindows(namespace.interrupt, namespace.parent)
130 poller = ParentPollerWindows(namespace.interrupt, namespace.parent)
128 poller.start()
131 poller.start()
129 elif namespace.parent:
132 elif namespace.parent:
130 poller = ParentPollerUnix()
133 poller = ParentPollerUnix()
131 poller.start()
134 poller.start()
132
135
133 # Start the kernel mainloop.
136 # Start the kernel mainloop.
134 kernel.start()
137 kernel.start()
135
138
136
139
137 def make_default_main(kernel_factory):
140 def make_default_main(kernel_factory):
138 """ Creates the simplest possible kernel entry point.
141 """ Creates the simplest possible kernel entry point.
139 """
142 """
140 def main():
143 def main():
141 namespace = make_argument_parser().parse_args()
144 namespace = make_argument_parser().parse_args()
142 kernel = make_kernel(namespace, kernel_factory, OutStream, DisplayHook)
145 kernel = make_kernel(namespace, kernel_factory, OutStream, DisplayHook)
143 start_kernel(namespace, kernel)
146 start_kernel(namespace, kernel)
144 return main
147 return main
145
148
146
149
147 def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
150 def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
148 independent=False, extra_arguments=[]):
151 independent=False, extra_arguments=[]):
149 """ Launches a localhost kernel, binding to the specified ports.
152 """ Launches a localhost kernel, binding to the specified ports.
150
153
151 Parameters
154 Parameters
152 ----------
155 ----------
153 code : str,
156 code : str,
154 A string of Python code that imports and executes a kernel entry point.
157 A string of Python code that imports and executes a kernel entry point.
155
158
156 xrep_port : int, optional
159 xrep_port : int, optional
157 The port to use for XREP channel.
160 The port to use for XREP channel.
158
161
159 pub_port : int, optional
162 pub_port : int, optional
160 The port to use for the SUB channel.
163 The port to use for the SUB channel.
161
164
162 req_port : int, optional
165 req_port : int, optional
163 The port to use for the REQ (raw input) channel.
166 The port to use for the REQ (raw input) channel.
164
167
165 hb_port : int, optional
168 hb_port : int, optional
166 The port to use for the hearbeat REP channel.
169 The port to use for the hearbeat REP channel.
167
170
168 independent : bool, optional (default False)
171 independent : bool, optional (default False)
169 If set, the kernel process is guaranteed to survive if this process
172 If set, the kernel process is guaranteed to survive if this process
170 dies. If not set, an effort is made to ensure that the kernel is killed
173 dies. If not set, an effort is made to ensure that the kernel is killed
171 when this process dies. Note that in this case it is still good practice
174 when this process dies. Note that in this case it is still good practice
172 to kill kernels manually before exiting.
175 to kill kernels manually before exiting.
173
176
174 extra_arguments = list, optional
177 extra_arguments = list, optional
175 A list of extra arguments to pass when executing the launch code.
178 A list of extra arguments to pass when executing the launch code.
176
179
177 Returns
180 Returns
178 -------
181 -------
179 A tuple of form:
182 A tuple of form:
180 (kernel_process, xrep_port, pub_port, req_port)
183 (kernel_process, xrep_port, pub_port, req_port)
181 where kernel_process is a Popen object and the ports are integers.
184 where kernel_process is a Popen object and the ports are integers.
182 """
185 """
183 # Find open ports as necessary.
186 # Find open ports as necessary.
184 ports = []
187 ports = []
185 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + \
188 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + \
186 int(req_port <= 0) + int(hb_port <= 0)
189 int(req_port <= 0) + int(hb_port <= 0)
187 for i in xrange(ports_needed):
190 for i in xrange(ports_needed):
188 sock = socket.socket()
191 sock = socket.socket()
189 sock.bind(('', 0))
192 sock.bind(('', 0))
190 ports.append(sock)
193 ports.append(sock)
191 for i, sock in enumerate(ports):
194 for i, sock in enumerate(ports):
192 port = sock.getsockname()[1]
195 port = sock.getsockname()[1]
193 sock.close()
196 sock.close()
194 ports[i] = port
197 ports[i] = port
195 if xrep_port <= 0:
198 if xrep_port <= 0:
196 xrep_port = ports.pop(0)
199 xrep_port = ports.pop(0)
197 if pub_port <= 0:
200 if pub_port <= 0:
198 pub_port = ports.pop(0)
201 pub_port = ports.pop(0)
199 if req_port <= 0:
202 if req_port <= 0:
200 req_port = ports.pop(0)
203 req_port = ports.pop(0)
201 if hb_port <= 0:
204 if hb_port <= 0:
202 hb_port = ports.pop(0)
205 hb_port = ports.pop(0)
203
206
204 # Build the kernel launch command.
207 # Build the kernel launch command.
205 arguments = [ sys.executable, '-c', code, '--xrep', str(xrep_port),
208 arguments = [ sys.executable, '-c', code, '--xrep', str(xrep_port),
206 '--pub', str(pub_port), '--req', str(req_port),
209 '--pub', str(pub_port), '--req', str(req_port),
207 '--hb', str(hb_port) ]
210 '--hb', str(hb_port) ]
208 arguments.extend(extra_arguments)
211 arguments.extend(extra_arguments)
209
212
210 # Spawn a kernel.
213 # Spawn a kernel.
211 if sys.platform == 'win32':
214 if sys.platform == 'win32':
212 # Create a Win32 event for interrupting the kernel.
215 # Create a Win32 event for interrupting the kernel.
213 interrupt_event = ParentPollerWindows.create_interrupt_event()
216 interrupt_event = ParentPollerWindows.create_interrupt_event()
214 arguments += [ '--interrupt', str(int(interrupt_event)) ]
217 arguments += [ '--interrupt', str(int(interrupt_event)) ]
215
218
216 # If using pythonw, stdin, stdout, and stderr are invalid. Popen will
219 # If using pythonw, stdin, stdout, and stderr are invalid. Popen will
217 # fail unless they are suitably redirected. We don't read from the
220 # fail unless they are suitably redirected. We don't read from the
218 # pipes, but they must exist.
221 # pipes, but they must exist.
219 redirect = PIPE if sys.executable.endswith('pythonw.exe') else None
222 redirect = PIPE if sys.executable.endswith('pythonw.exe') else None
220
223
221 if independent:
224 if independent:
222 proc = Popen(arguments,
225 proc = Popen(arguments,
223 creationflags=512, # CREATE_NEW_PROCESS_GROUP
226 creationflags=512, # CREATE_NEW_PROCESS_GROUP
224 stdout=redirect, stderr=redirect, stdin=redirect)
227 stdout=redirect, stderr=redirect, stdin=redirect)
225 else:
228 else:
226 from _subprocess import DuplicateHandle, GetCurrentProcess, \
229 from _subprocess import DuplicateHandle, GetCurrentProcess, \
227 DUPLICATE_SAME_ACCESS
230 DUPLICATE_SAME_ACCESS
228 pid = GetCurrentProcess()
231 pid = GetCurrentProcess()
229 handle = DuplicateHandle(pid, pid, pid, 0,
232 handle = DuplicateHandle(pid, pid, pid, 0,
230 True, # Inheritable by new processes.
233 True, # Inheritable by new processes.
231 DUPLICATE_SAME_ACCESS)
234 DUPLICATE_SAME_ACCESS)
232 proc = Popen(arguments + ['--parent', str(int(handle))],
235 proc = Popen(arguments + ['--parent', str(int(handle))],
233 stdout=redirect, stderr=redirect, stdin=redirect)
236 stdout=redirect, stderr=redirect, stdin=redirect)
234
237
235 # Attach the interrupt event to the Popen objet so it can be used later.
238 # Attach the interrupt event to the Popen objet so it can be used later.
236 proc.win32_interrupt_event = interrupt_event
239 proc.win32_interrupt_event = interrupt_event
237
240
238 # Clean up pipes created to work around Popen bug.
241 # Clean up pipes created to work around Popen bug.
239 if redirect is not None:
242 if redirect is not None:
240 proc.stdout.close()
243 proc.stdout.close()
241 proc.stderr.close()
244 proc.stderr.close()
242 proc.stdin.close()
245 proc.stdin.close()
243
246
244 else:
247 else:
245 if independent:
248 if independent:
246 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
249 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
247 else:
250 else:
248 proc = Popen(arguments + ['--parent'])
251 proc = Popen(arguments + ['--parent'])
249
252
250 return proc, xrep_port, pub_port, req_port, hb_port
253 return proc, xrep_port, pub_port, req_port, hb_port
@@ -1,899 +1,904 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2010 The IPython Development Team
8 # Copyright (C) 2008-2010 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 from Queue import Queue, Empty
20 from Queue import Queue, Empty
20 from subprocess import Popen
21 from subprocess import Popen
21 import signal
22 import signal
22 import sys
23 import sys
23 from threading import Thread
24 from threading import Thread
24 import time
25 import time
25
26
26 # System library imports.
27 # System library imports.
27 import zmq
28 import zmq
28 from zmq import POLLIN, POLLOUT, POLLERR
29 from zmq import POLLIN, POLLOUT, POLLERR
29 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
30
31
31 # Local imports.
32 # Local imports.
32 from IPython.utils import io
33 from IPython.utils import io
33 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
34 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
34 from session import Session
35 from session import Session
35
36
36 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
37 # Constants and exceptions
38 # Constants and exceptions
38 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
39
40
40 LOCALHOST = '127.0.0.1'
41 LOCALHOST = '127.0.0.1'
41
42
42 class InvalidPortNumber(Exception):
43 class InvalidPortNumber(Exception):
43 pass
44 pass
44
45
45 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
46 # Utility functions
47 # Utility functions
47 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
48
49
49 # some utilities to validate message structure, these might get moved elsewhere
50 # some utilities to validate message structure, these might get moved elsewhere
50 # if they prove to have more generic utility
51 # if they prove to have more generic utility
51
52
52 def validate_string_list(lst):
53 def validate_string_list(lst):
53 """Validate that the input is a list of strings.
54 """Validate that the input is a list of strings.
54
55
55 Raises ValueError if not."""
56 Raises ValueError if not."""
56 if not isinstance(lst, list):
57 if not isinstance(lst, list):
57 raise ValueError('input %r must be a list' % lst)
58 raise ValueError('input %r must be a list' % lst)
58 for x in lst:
59 for x in lst:
59 if not isinstance(x, basestring):
60 if not isinstance(x, basestring):
60 raise ValueError('element %r in list must be a string' % x)
61 raise ValueError('element %r in list must be a string' % x)
61
62
62
63
63 def validate_string_dict(dct):
64 def validate_string_dict(dct):
64 """Validate that the input is a dict with string keys and values.
65 """Validate that the input is a dict with string keys and values.
65
66
66 Raises ValueError if not."""
67 Raises ValueError if not."""
67 for k,v in dct.iteritems():
68 for k,v in dct.iteritems():
68 if not isinstance(k, basestring):
69 if not isinstance(k, basestring):
69 raise ValueError('key %r in dict must be a string' % k)
70 raise ValueError('key %r in dict must be a string' % k)
70 if not isinstance(v, basestring):
71 if not isinstance(v, basestring):
71 raise ValueError('value %r in dict must be a string' % v)
72 raise ValueError('value %r in dict must be a string' % v)
72
73
73
74
74 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
75 # ZMQ Socket Channel classes
76 # ZMQ Socket Channel classes
76 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
77
78
78 class ZmqSocketChannel(Thread):
79 class ZmqSocketChannel(Thread):
79 """The base class for the channels that use ZMQ sockets.
80 """The base class for the channels that use ZMQ sockets.
80 """
81 """
81 context = None
82 context = None
82 session = None
83 session = None
83 socket = None
84 socket = None
84 ioloop = None
85 ioloop = None
85 iostate = None
86 iostate = None
86 _address = None
87 _address = None
87
88
88 def __init__(self, context, session, address):
89 def __init__(self, context, session, address):
89 """Create a channel
90 """Create a channel
90
91
91 Parameters
92 Parameters
92 ----------
93 ----------
93 context : :class:`zmq.Context`
94 context : :class:`zmq.Context`
94 The ZMQ context to use.
95 The ZMQ context to use.
95 session : :class:`session.Session`
96 session : :class:`session.Session`
96 The session to use.
97 The session to use.
97 address : tuple
98 address : tuple
98 Standard (ip, port) tuple that the kernel is listening on.
99 Standard (ip, port) tuple that the kernel is listening on.
99 """
100 """
100 super(ZmqSocketChannel, self).__init__()
101 super(ZmqSocketChannel, self).__init__()
101 self.daemon = True
102 self.daemon = True
102
103
103 self.context = context
104 self.context = context
104 self.session = session
105 self.session = session
105 if address[1] == 0:
106 if address[1] == 0:
106 message = 'The port number for a channel cannot be 0.'
107 message = 'The port number for a channel cannot be 0.'
107 raise InvalidPortNumber(message)
108 raise InvalidPortNumber(message)
108 self._address = address
109 self._address = address
109
110
110 def stop(self):
111 def stop(self):
111 """Stop the channel's activity.
112 """Stop the channel's activity.
112
113
113 This calls :method:`Thread.join` and returns when the thread
114 This calls :method:`Thread.join` and returns when the thread
114 terminates. :class:`RuntimeError` will be raised if
115 terminates. :class:`RuntimeError` will be raised if
115 :method:`self.start` is called again.
116 :method:`self.start` is called again.
116 """
117 """
117 self.join()
118 self.join()
118
119
119 @property
120 @property
120 def address(self):
121 def address(self):
121 """Get the channel's address as an (ip, port) tuple.
122 """Get the channel's address as an (ip, port) tuple.
122
123
123 By the default, the address is (localhost, 0), where 0 means a random
124 By the default, the address is (localhost, 0), where 0 means a random
124 port.
125 port.
125 """
126 """
126 return self._address
127 return self._address
127
128
128 def add_io_state(self, state):
129 def add_io_state(self, state):
129 """Add IO state to the eventloop.
130 """Add IO state to the eventloop.
130
131
131 Parameters
132 Parameters
132 ----------
133 ----------
133 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 The IO state flag to set.
135 The IO state flag to set.
135
136
136 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 """
138 """
138 def add_io_state_callback():
139 def add_io_state_callback():
139 if not self.iostate & state:
140 if not self.iostate & state:
140 self.iostate = self.iostate | state
141 self.iostate = self.iostate | state
141 self.ioloop.update_handler(self.socket, self.iostate)
142 self.ioloop.update_handler(self.socket, self.iostate)
142 self.ioloop.add_callback(add_io_state_callback)
143 self.ioloop.add_callback(add_io_state_callback)
143
144
144 def drop_io_state(self, state):
145 def drop_io_state(self, state):
145 """Drop IO state from the eventloop.
146 """Drop IO state from the eventloop.
146
147
147 Parameters
148 Parameters
148 ----------
149 ----------
149 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 The IO state flag to set.
151 The IO state flag to set.
151
152
152 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 """
154 """
154 def drop_io_state_callback():
155 def drop_io_state_callback():
155 if self.iostate & state:
156 if self.iostate & state:
156 self.iostate = self.iostate & (~state)
157 self.iostate = self.iostate & (~state)
157 self.ioloop.update_handler(self.socket, self.iostate)
158 self.ioloop.update_handler(self.socket, self.iostate)
158 self.ioloop.add_callback(drop_io_state_callback)
159 self.ioloop.add_callback(drop_io_state_callback)
159
160
160
161
161 class XReqSocketChannel(ZmqSocketChannel):
162 class XReqSocketChannel(ZmqSocketChannel):
162 """The XREQ channel for issues request/replies to the kernel.
163 """The XREQ channel for issues request/replies to the kernel.
163 """
164 """
164
165
165 command_queue = None
166 command_queue = None
166
167
167 def __init__(self, context, session, address):
168 def __init__(self, context, session, address):
168 super(XReqSocketChannel, self).__init__(context, session, address)
169 super(XReqSocketChannel, self).__init__(context, session, address)
169 self.command_queue = Queue()
170 self.command_queue = Queue()
170 self.ioloop = ioloop.IOLoop()
171 self.ioloop = ioloop.IOLoop()
171
172
172 def run(self):
173 def run(self):
173 """The thread's main activity. Call start() instead."""
174 """The thread's main activity. Call start() instead."""
174 self.socket = self.context.socket(zmq.XREQ)
175 self.socket = self.context.socket(zmq.XREQ)
175 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
176 self.socket.connect('tcp://%s:%i' % self.address)
177 self.socket.connect('tcp://%s:%i' % self.address)
177 self.iostate = POLLERR|POLLIN
178 self.iostate = POLLERR|POLLIN
178 self.ioloop.add_handler(self.socket, self._handle_events,
179 self.ioloop.add_handler(self.socket, self._handle_events,
179 self.iostate)
180 self.iostate)
180 self.ioloop.start()
181 self.ioloop.start()
181
182
182 def stop(self):
183 def stop(self):
183 self.ioloop.stop()
184 self.ioloop.stop()
184 super(XReqSocketChannel, self).stop()
185 super(XReqSocketChannel, self).stop()
185
186
186 def call_handlers(self, msg):
187 def call_handlers(self, msg):
187 """This method is called in the ioloop thread when a message arrives.
188 """This method is called in the ioloop thread when a message arrives.
188
189
189 Subclasses should override this method to handle incoming messages.
190 Subclasses should override this method to handle incoming messages.
190 It is important to remember that this method is called in the thread
191 It is important to remember that this method is called in the thread
191 so that some logic must be done to ensure that the application leve
192 so that some logic must be done to ensure that the application leve
192 handlers are called in the application thread.
193 handlers are called in the application thread.
193 """
194 """
194 raise NotImplementedError('call_handlers must be defined in a subclass.')
195 raise NotImplementedError('call_handlers must be defined in a subclass.')
195
196
196 def execute(self, code, silent=False,
197 def execute(self, code, silent=False,
197 user_variables=None, user_expressions=None):
198 user_variables=None, user_expressions=None):
198 """Execute code in the kernel.
199 """Execute code in the kernel.
199
200
200 Parameters
201 Parameters
201 ----------
202 ----------
202 code : str
203 code : str
203 A string of Python code.
204 A string of Python code.
204
205
205 silent : bool, optional (default False)
206 silent : bool, optional (default False)
206 If set, the kernel will execute the code as quietly possible.
207 If set, the kernel will execute the code as quietly possible.
207
208
208 user_variables : list, optional
209 user_variables : list, optional
209 A list of variable names to pull from the user's namespace. They
210 A list of variable names to pull from the user's namespace. They
210 will come back as a dict with these names as keys and their
211 will come back as a dict with these names as keys and their
211 :func:`repr` as values.
212 :func:`repr` as values.
212
213
213 user_expressions : dict, optional
214 user_expressions : dict, optional
214 A dict with string keys and to pull from the user's
215 A dict with string keys and to pull from the user's
215 namespace. They will come back as a dict with these names as keys
216 namespace. They will come back as a dict with these names as keys
216 and their :func:`repr` as values.
217 and their :func:`repr` as values.
217
218
218 Returns
219 Returns
219 -------
220 -------
220 The msg_id of the message sent.
221 The msg_id of the message sent.
221 """
222 """
222 if user_variables is None:
223 if user_variables is None:
223 user_variables = []
224 user_variables = []
224 if user_expressions is None:
225 if user_expressions is None:
225 user_expressions = {}
226 user_expressions = {}
226
227
227 # Don't waste network traffic if inputs are invalid
228 # Don't waste network traffic if inputs are invalid
228 if not isinstance(code, basestring):
229 if not isinstance(code, basestring):
229 raise ValueError('code %r must be a string' % code)
230 raise ValueError('code %r must be a string' % code)
230 validate_string_list(user_variables)
231 validate_string_list(user_variables)
231 validate_string_dict(user_expressions)
232 validate_string_dict(user_expressions)
232
233
233 # Create class for content/msg creation. Related to, but possibly
234 # Create class for content/msg creation. Related to, but possibly
234 # not in Session.
235 # not in Session.
235 content = dict(code=code, silent=silent,
236 content = dict(code=code, silent=silent,
236 user_variables=user_variables,
237 user_variables=user_variables,
237 user_expressions=user_expressions)
238 user_expressions=user_expressions)
238 msg = self.session.msg('execute_request', content)
239 msg = self.session.msg('execute_request', content)
239 self._queue_request(msg)
240 self._queue_request(msg)
240 return msg['header']['msg_id']
241 return msg['header']['msg_id']
241
242
242 def complete(self, text, line, cursor_pos, block=None):
243 def complete(self, text, line, cursor_pos, block=None):
243 """Tab complete text in the kernel's namespace.
244 """Tab complete text in the kernel's namespace.
244
245
245 Parameters
246 Parameters
246 ----------
247 ----------
247 text : str
248 text : str
248 The text to complete.
249 The text to complete.
249 line : str
250 line : str
250 The full line of text that is the surrounding context for the
251 The full line of text that is the surrounding context for the
251 text to complete.
252 text to complete.
252 cursor_pos : int
253 cursor_pos : int
253 The position of the cursor in the line where the completion was
254 The position of the cursor in the line where the completion was
254 requested.
255 requested.
255 block : str, optional
256 block : str, optional
256 The full block of code in which the completion is being requested.
257 The full block of code in which the completion is being requested.
257
258
258 Returns
259 Returns
259 -------
260 -------
260 The msg_id of the message sent.
261 The msg_id of the message sent.
261 """
262 """
262 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 msg = self.session.msg('complete_request', content)
264 msg = self.session.msg('complete_request', content)
264 self._queue_request(msg)
265 self._queue_request(msg)
265 return msg['header']['msg_id']
266 return msg['header']['msg_id']
266
267
267 def object_info(self, oname):
268 def object_info(self, oname):
268 """Get metadata information about an object.
269 """Get metadata information about an object.
269
270
270 Parameters
271 Parameters
271 ----------
272 ----------
272 oname : str
273 oname : str
273 A string specifying the object name.
274 A string specifying the object name.
274
275
275 Returns
276 Returns
276 -------
277 -------
277 The msg_id of the message sent.
278 The msg_id of the message sent.
278 """
279 """
279 content = dict(oname=oname)
280 content = dict(oname=oname)
280 msg = self.session.msg('object_info_request', content)
281 msg = self.session.msg('object_info_request', content)
281 self._queue_request(msg)
282 self._queue_request(msg)
282 return msg['header']['msg_id']
283 return msg['header']['msg_id']
283
284
284 def history(self, index=None, raw=False, output=True):
285 def history(self, index=None, raw=False, output=True):
285 """Get the history list.
286 """Get the history list.
286
287
287 Parameters
288 Parameters
288 ----------
289 ----------
289 index : n or (n1, n2) or None
290 index : n or (n1, n2) or None
290 If n, then the last entries. If a tuple, then all in
291 If n, then the last entries. If a tuple, then all in
291 range(n1, n2). If None, then all entries. Raises IndexError if
292 range(n1, n2). If None, then all entries. Raises IndexError if
292 the format of index is incorrect.
293 the format of index is incorrect.
293 raw : bool
294 raw : bool
294 If True, return the raw input.
295 If True, return the raw input.
295 output : bool
296 output : bool
296 If True, then return the output as well.
297 If True, then return the output as well.
297
298
298 Returns
299 Returns
299 -------
300 -------
300 The msg_id of the message sent.
301 The msg_id of the message sent.
301 """
302 """
302 content = dict(index=index, raw=raw, output=output)
303 content = dict(index=index, raw=raw, output=output)
303 msg = self.session.msg('history_request', content)
304 msg = self.session.msg('history_request', content)
304 self._queue_request(msg)
305 self._queue_request(msg)
305 return msg['header']['msg_id']
306 return msg['header']['msg_id']
306
307
307 def shutdown(self):
308 def shutdown(self):
308 """Request an immediate kernel shutdown.
309 """Request an immediate kernel shutdown.
309
310
310 Upon receipt of the (empty) reply, client code can safely assume that
311 Upon receipt of the (empty) reply, client code can safely assume that
311 the kernel has shut down and it's safe to forcefully terminate it if
312 the kernel has shut down and it's safe to forcefully terminate it if
312 it's still alive.
313 it's still alive.
313
314
314 The kernel will send the reply via a function registered with Python's
315 The kernel will send the reply via a function registered with Python's
315 atexit module, ensuring it's truly done as the kernel is done with all
316 atexit module, ensuring it's truly done as the kernel is done with all
316 normal operation.
317 normal operation.
317 """
318 """
318 # Send quit message to kernel. Once we implement kernel-side setattr,
319 # Send quit message to kernel. Once we implement kernel-side setattr,
319 # this should probably be done that way, but for now this will do.
320 # this should probably be done that way, but for now this will do.
320 msg = self.session.msg('shutdown_request', {})
321 msg = self.session.msg('shutdown_request', {})
321 self._queue_request(msg)
322 self._queue_request(msg)
322 return msg['header']['msg_id']
323 return msg['header']['msg_id']
323
324
324 def _handle_events(self, socket, events):
325 def _handle_events(self, socket, events):
325 if events & POLLERR:
326 if events & POLLERR:
326 self._handle_err()
327 self._handle_err()
327 if events & POLLOUT:
328 if events & POLLOUT:
328 self._handle_send()
329 self._handle_send()
329 if events & POLLIN:
330 if events & POLLIN:
330 self._handle_recv()
331 self._handle_recv()
331
332
332 def _handle_recv(self):
333 def _handle_recv(self):
333 msg = self.socket.recv_json()
334 msg = self.socket.recv_json()
334 self.call_handlers(msg)
335 self.call_handlers(msg)
335
336
336 def _handle_send(self):
337 def _handle_send(self):
337 try:
338 try:
338 msg = self.command_queue.get(False)
339 msg = self.command_queue.get(False)
339 except Empty:
340 except Empty:
340 pass
341 pass
341 else:
342 else:
342 self.socket.send_json(msg)
343 self.socket.send_json(msg)
343 if self.command_queue.empty():
344 if self.command_queue.empty():
344 self.drop_io_state(POLLOUT)
345 self.drop_io_state(POLLOUT)
345
346
346 def _handle_err(self):
347 def _handle_err(self):
347 # We don't want to let this go silently, so eventually we should log.
348 # We don't want to let this go silently, so eventually we should log.
348 raise zmq.ZMQError()
349 raise zmq.ZMQError()
349
350
350 def _queue_request(self, msg):
351 def _queue_request(self, msg):
351 self.command_queue.put(msg)
352 self.command_queue.put(msg)
352 self.add_io_state(POLLOUT)
353 self.add_io_state(POLLOUT)
353
354
354
355
355 class SubSocketChannel(ZmqSocketChannel):
356 class SubSocketChannel(ZmqSocketChannel):
356 """The SUB channel which listens for messages that the kernel publishes.
357 """The SUB channel which listens for messages that the kernel publishes.
357 """
358 """
358
359
359 def __init__(self, context, session, address):
360 def __init__(self, context, session, address):
360 super(SubSocketChannel, self).__init__(context, session, address)
361 super(SubSocketChannel, self).__init__(context, session, address)
361 self.ioloop = ioloop.IOLoop()
362 self.ioloop = ioloop.IOLoop()
362
363
363 def run(self):
364 def run(self):
364 """The thread's main activity. Call start() instead."""
365 """The thread's main activity. Call start() instead."""
365 self.socket = self.context.socket(zmq.SUB)
366 self.socket = self.context.socket(zmq.SUB)
366 self.socket.setsockopt(zmq.SUBSCRIBE,'')
367 self.socket.setsockopt(zmq.SUBSCRIBE,'')
367 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
368 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
368 self.socket.connect('tcp://%s:%i' % self.address)
369 self.socket.connect('tcp://%s:%i' % self.address)
369 self.iostate = POLLIN|POLLERR
370 self.iostate = POLLIN|POLLERR
370 self.ioloop.add_handler(self.socket, self._handle_events,
371 self.ioloop.add_handler(self.socket, self._handle_events,
371 self.iostate)
372 self.iostate)
372 self.ioloop.start()
373 self.ioloop.start()
373
374
374 def stop(self):
375 def stop(self):
375 self.ioloop.stop()
376 self.ioloop.stop()
376 super(SubSocketChannel, self).stop()
377 super(SubSocketChannel, self).stop()
377
378
378 def call_handlers(self, msg):
379 def call_handlers(self, msg):
379 """This method is called in the ioloop thread when a message arrives.
380 """This method is called in the ioloop thread when a message arrives.
380
381
381 Subclasses should override this method to handle incoming messages.
382 Subclasses should override this method to handle incoming messages.
382 It is important to remember that this method is called in the thread
383 It is important to remember that this method is called in the thread
383 so that some logic must be done to ensure that the application leve
384 so that some logic must be done to ensure that the application leve
384 handlers are called in the application thread.
385 handlers are called in the application thread.
385 """
386 """
386 raise NotImplementedError('call_handlers must be defined in a subclass.')
387 raise NotImplementedError('call_handlers must be defined in a subclass.')
387
388
388 def flush(self, timeout=1.0):
389 def flush(self, timeout=1.0):
389 """Immediately processes all pending messages on the SUB channel.
390 """Immediately processes all pending messages on the SUB channel.
390
391
391 Callers should use this method to ensure that :method:`call_handlers`
392 Callers should use this method to ensure that :method:`call_handlers`
392 has been called for all messages that have been received on the
393 has been called for all messages that have been received on the
393 0MQ SUB socket of this channel.
394 0MQ SUB socket of this channel.
394
395
395 This method is thread safe.
396 This method is thread safe.
396
397
397 Parameters
398 Parameters
398 ----------
399 ----------
399 timeout : float, optional
400 timeout : float, optional
400 The maximum amount of time to spend flushing, in seconds. The
401 The maximum amount of time to spend flushing, in seconds. The
401 default is one second.
402 default is one second.
402 """
403 """
403 # We do the IOLoop callback process twice to ensure that the IOLoop
404 # We do the IOLoop callback process twice to ensure that the IOLoop
404 # gets to perform at least one full poll.
405 # gets to perform at least one full poll.
405 stop_time = time.time() + timeout
406 stop_time = time.time() + timeout
406 for i in xrange(2):
407 for i in xrange(2):
407 self._flushed = False
408 self._flushed = False
408 self.ioloop.add_callback(self._flush)
409 self.ioloop.add_callback(self._flush)
409 while not self._flushed and time.time() < stop_time:
410 while not self._flushed and time.time() < stop_time:
410 time.sleep(0.01)
411 time.sleep(0.01)
411
412
412 def _handle_events(self, socket, events):
413 def _handle_events(self, socket, events):
413 # Turn on and off POLLOUT depending on if we have made a request
414 # Turn on and off POLLOUT depending on if we have made a request
414 if events & POLLERR:
415 if events & POLLERR:
415 self._handle_err()
416 self._handle_err()
416 if events & POLLIN:
417 if events & POLLIN:
417 self._handle_recv()
418 self._handle_recv()
418
419
419 def _handle_err(self):
420 def _handle_err(self):
420 # We don't want to let this go silently, so eventually we should log.
421 # We don't want to let this go silently, so eventually we should log.
421 raise zmq.ZMQError()
422 raise zmq.ZMQError()
422
423
423 def _handle_recv(self):
424 def _handle_recv(self):
424 # Get all of the messages we can
425 # Get all of the messages we can
425 while True:
426 while True:
426 try:
427 try:
427 msg = self.socket.recv_json(zmq.NOBLOCK)
428 msg = self.socket.recv_json(zmq.NOBLOCK)
428 except zmq.ZMQError:
429 except zmq.ZMQError:
429 # Check the errno?
430 # Check the errno?
430 # Will this trigger POLLERR?
431 # Will this trigger POLLERR?
431 break
432 break
432 else:
433 else:
433 self.call_handlers(msg)
434 self.call_handlers(msg)
434
435
435 def _flush(self):
436 def _flush(self):
436 """Callback for :method:`self.flush`."""
437 """Callback for :method:`self.flush`."""
437 self._flushed = True
438 self._flushed = True
438
439
439
440
440 class RepSocketChannel(ZmqSocketChannel):
441 class RepSocketChannel(ZmqSocketChannel):
441 """A reply channel to handle raw_input requests that the kernel makes."""
442 """A reply channel to handle raw_input requests that the kernel makes."""
442
443
443 msg_queue = None
444 msg_queue = None
444
445
445 def __init__(self, context, session, address):
446 def __init__(self, context, session, address):
446 super(RepSocketChannel, self).__init__(context, session, address)
447 super(RepSocketChannel, self).__init__(context, session, address)
447 self.ioloop = ioloop.IOLoop()
448 self.ioloop = ioloop.IOLoop()
448 self.msg_queue = Queue()
449 self.msg_queue = Queue()
449
450
450 def run(self):
451 def run(self):
451 """The thread's main activity. Call start() instead."""
452 """The thread's main activity. Call start() instead."""
452 self.socket = self.context.socket(zmq.XREQ)
453 self.socket = self.context.socket(zmq.XREQ)
453 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
454 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
454 self.socket.connect('tcp://%s:%i' % self.address)
455 self.socket.connect('tcp://%s:%i' % self.address)
455 self.iostate = POLLERR|POLLIN
456 self.iostate = POLLERR|POLLIN
456 self.ioloop.add_handler(self.socket, self._handle_events,
457 self.ioloop.add_handler(self.socket, self._handle_events,
457 self.iostate)
458 self.iostate)
458 self.ioloop.start()
459 self.ioloop.start()
459
460
460 def stop(self):
461 def stop(self):
461 self.ioloop.stop()
462 self.ioloop.stop()
462 super(RepSocketChannel, self).stop()
463 super(RepSocketChannel, self).stop()
463
464
464 def call_handlers(self, msg):
465 def call_handlers(self, msg):
465 """This method is called in the ioloop thread when a message arrives.
466 """This method is called in the ioloop thread when a message arrives.
466
467
467 Subclasses should override this method to handle incoming messages.
468 Subclasses should override this method to handle incoming messages.
468 It is important to remember that this method is called in the thread
469 It is important to remember that this method is called in the thread
469 so that some logic must be done to ensure that the application leve
470 so that some logic must be done to ensure that the application leve
470 handlers are called in the application thread.
471 handlers are called in the application thread.
471 """
472 """
472 raise NotImplementedError('call_handlers must be defined in a subclass.')
473 raise NotImplementedError('call_handlers must be defined in a subclass.')
473
474
474 def input(self, string):
475 def input(self, string):
475 """Send a string of raw input to the kernel."""
476 """Send a string of raw input to the kernel."""
476 content = dict(value=string)
477 content = dict(value=string)
477 msg = self.session.msg('input_reply', content)
478 msg = self.session.msg('input_reply', content)
478 self._queue_reply(msg)
479 self._queue_reply(msg)
479
480
480 def _handle_events(self, socket, events):
481 def _handle_events(self, socket, events):
481 if events & POLLERR:
482 if events & POLLERR:
482 self._handle_err()
483 self._handle_err()
483 if events & POLLOUT:
484 if events & POLLOUT:
484 self._handle_send()
485 self._handle_send()
485 if events & POLLIN:
486 if events & POLLIN:
486 self._handle_recv()
487 self._handle_recv()
487
488
488 def _handle_recv(self):
489 def _handle_recv(self):
489 msg = self.socket.recv_json()
490 msg = self.socket.recv_json()
490 self.call_handlers(msg)
491 self.call_handlers(msg)
491
492
492 def _handle_send(self):
493 def _handle_send(self):
493 try:
494 try:
494 msg = self.msg_queue.get(False)
495 msg = self.msg_queue.get(False)
495 except Empty:
496 except Empty:
496 pass
497 pass
497 else:
498 else:
498 self.socket.send_json(msg)
499 self.socket.send_json(msg)
499 if self.msg_queue.empty():
500 if self.msg_queue.empty():
500 self.drop_io_state(POLLOUT)
501 self.drop_io_state(POLLOUT)
501
502
502 def _handle_err(self):
503 def _handle_err(self):
503 # We don't want to let this go silently, so eventually we should log.
504 # We don't want to let this go silently, so eventually we should log.
504 raise zmq.ZMQError()
505 raise zmq.ZMQError()
505
506
506 def _queue_reply(self, msg):
507 def _queue_reply(self, msg):
507 self.msg_queue.put(msg)
508 self.msg_queue.put(msg)
508 self.add_io_state(POLLOUT)
509 self.add_io_state(POLLOUT)
509
510
510
511
511 class HBSocketChannel(ZmqSocketChannel):
512 class HBSocketChannel(ZmqSocketChannel):
512 """The heartbeat channel which monitors the kernel heartbeat.
513 """The heartbeat channel which monitors the kernel heartbeat.
513
514
514 Note that the heartbeat channel is paused by default. As long as you start
515 Note that the heartbeat channel is paused by default. As long as you start
515 this channel, the kernel manager will ensure that it is paused and un-paused
516 this channel, the kernel manager will ensure that it is paused and un-paused
516 as appropriate.
517 as appropriate.
517 """
518 """
518
519
519 time_to_dead = 3.0
520 time_to_dead = 3.0
520 socket = None
521 socket = None
521 poller = None
522 poller = None
522 _running = None
523 _running = None
523 _pause = None
524 _pause = None
524
525
525 def __init__(self, context, session, address):
526 def __init__(self, context, session, address):
526 super(HBSocketChannel, self).__init__(context, session, address)
527 super(HBSocketChannel, self).__init__(context, session, address)
527 self._running = False
528 self._running = False
528 self._pause = True
529 self._pause = True
529
530
530 def _create_socket(self):
531 def _create_socket(self):
531 self.socket = self.context.socket(zmq.REQ)
532 self.socket = self.context.socket(zmq.REQ)
532 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
533 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
533 self.socket.connect('tcp://%s:%i' % self.address)
534 self.socket.connect('tcp://%s:%i' % self.address)
534 self.poller = zmq.Poller()
535 self.poller = zmq.Poller()
535 self.poller.register(self.socket, zmq.POLLIN)
536 self.poller.register(self.socket, zmq.POLLIN)
536
537
537 def run(self):
538 def run(self):
538 """The thread's main activity. Call start() instead."""
539 """The thread's main activity. Call start() instead."""
539 self._create_socket()
540 self._create_socket()
540 self._running = True
541 self._running = True
541 while self._running:
542 while self._running:
542 if self._pause:
543 if self._pause:
543 time.sleep(self.time_to_dead)
544 time.sleep(self.time_to_dead)
544 else:
545 else:
545 since_last_heartbeat = 0.0
546 since_last_heartbeat = 0.0
546 request_time = time.time()
547 request_time = time.time()
547 try:
548 try:
548 #io.rprint('Ping from HB channel') # dbg
549 #io.rprint('Ping from HB channel') # dbg
549 self.socket.send_json('ping')
550 self.socket.send_json('ping')
550 except zmq.ZMQError, e:
551 except zmq.ZMQError, e:
551 #io.rprint('*** HB Error:', e) # dbg
552 #io.rprint('*** HB Error:', e) # dbg
552 if e.errno == zmq.EFSM:
553 if e.errno == zmq.EFSM:
553 #io.rprint('sleep...', self.time_to_dead) # dbg
554 #io.rprint('sleep...', self.time_to_dead) # dbg
554 time.sleep(self.time_to_dead)
555 time.sleep(self.time_to_dead)
555 self._create_socket()
556 self._create_socket()
556 else:
557 else:
557 raise
558 raise
558 else:
559 else:
559 while True:
560 while True:
560 try:
561 try:
561 self.socket.recv_json(zmq.NOBLOCK)
562 self.socket.recv_json(zmq.NOBLOCK)
562 except zmq.ZMQError, e:
563 except zmq.ZMQError, e:
563 #io.rprint('*** HB Error 2:', e) # dbg
564 #io.rprint('*** HB Error 2:', e) # dbg
564 if e.errno == zmq.EAGAIN:
565 if e.errno == zmq.EAGAIN:
565 before_poll = time.time()
566 before_poll = time.time()
566 until_dead = self.time_to_dead - (before_poll -
567 until_dead = self.time_to_dead - (before_poll -
567 request_time)
568 request_time)
568
569
569 # When the return value of poll() is an empty
570 # When the return value of poll() is an empty
570 # list, that is when things have gone wrong
571 # list, that is when things have gone wrong
571 # (zeromq bug). As long as it is not an empty
572 # (zeromq bug). As long as it is not an empty
572 # list, poll is working correctly even if it
573 # list, poll is working correctly even if it
573 # returns quickly. Note: poll timeout is in
574 # returns quickly. Note: poll timeout is in
574 # milliseconds.
575 # milliseconds.
575 self.poller.poll(1000*until_dead)
576 self.poller.poll(1000*until_dead)
576
577
577 since_last_heartbeat = time.time()-request_time
578 since_last_heartbeat = time.time()-request_time
578 if since_last_heartbeat > self.time_to_dead:
579 if since_last_heartbeat > self.time_to_dead:
579 self.call_handlers(since_last_heartbeat)
580 self.call_handlers(since_last_heartbeat)
580 break
581 break
581 else:
582 else:
582 # FIXME: We should probably log this instead.
583 # FIXME: We should probably log this instead.
583 raise
584 raise
584 else:
585 else:
585 until_dead = self.time_to_dead - (time.time() -
586 until_dead = self.time_to_dead - (time.time() -
586 request_time)
587 request_time)
587 if until_dead > 0.0:
588 if until_dead > 0.0:
588 #io.rprint('sleep...', self.time_to_dead) # dbg
589 #io.rprint('sleep...', self.time_to_dead) # dbg
589 time.sleep(until_dead)
590 time.sleep(until_dead)
590 break
591 break
591
592
592 def pause(self):
593 def pause(self):
593 """Pause the heartbeat."""
594 """Pause the heartbeat."""
594 self._pause = True
595 self._pause = True
595
596
596 def unpause(self):
597 def unpause(self):
597 """Unpause the heartbeat."""
598 """Unpause the heartbeat."""
598 self._pause = False
599 self._pause = False
599
600
600 def is_beating(self):
601 def is_beating(self):
601 """Is the heartbeat running and not paused."""
602 """Is the heartbeat running and not paused."""
602 if self.is_alive() and not self._pause:
603 if self.is_alive() and not self._pause:
603 return True
604 return True
604 else:
605 else:
605 return False
606 return False
606
607
607 def stop(self):
608 def stop(self):
608 self._running = False
609 self._running = False
609 super(HBSocketChannel, self).stop()
610 super(HBSocketChannel, self).stop()
610
611
611 def call_handlers(self, since_last_heartbeat):
612 def call_handlers(self, since_last_heartbeat):
612 """This method is called in the ioloop thread when a message arrives.
613 """This method is called in the ioloop thread when a message arrives.
613
614
614 Subclasses should override this method to handle incoming messages.
615 Subclasses should override this method to handle incoming messages.
615 It is important to remember that this method is called in the thread
616 It is important to remember that this method is called in the thread
616 so that some logic must be done to ensure that the application leve
617 so that some logic must be done to ensure that the application leve
617 handlers are called in the application thread.
618 handlers are called in the application thread.
618 """
619 """
619 raise NotImplementedError('call_handlers must be defined in a subclass.')
620 raise NotImplementedError('call_handlers must be defined in a subclass.')
620
621
621
622
622 #-----------------------------------------------------------------------------
623 #-----------------------------------------------------------------------------
623 # Main kernel manager class
624 # Main kernel manager class
624 #-----------------------------------------------------------------------------
625 #-----------------------------------------------------------------------------
625
626
626 class KernelManager(HasTraits):
627 class KernelManager(HasTraits):
627 """ Manages a kernel for a frontend.
628 """ Manages a kernel for a frontend.
628
629
629 The SUB channel is for the frontend to receive messages published by the
630 The SUB channel is for the frontend to receive messages published by the
630 kernel.
631 kernel.
631
632
632 The REQ channel is for the frontend to make requests of the kernel.
633 The REQ channel is for the frontend to make requests of the kernel.
633
634
634 The REP channel is for the kernel to request stdin (raw_input) from the
635 The REP channel is for the kernel to request stdin (raw_input) from the
635 frontend.
636 frontend.
636 """
637 """
637 # The PyZMQ Context to use for communication with the kernel.
638 # The PyZMQ Context to use for communication with the kernel.
638 context = Instance(zmq.Context,(),{})
639 context = Instance(zmq.Context,(),{})
639
640
640 # The Session to use for communication with the kernel.
641 # The Session to use for communication with the kernel.
641 session = Instance(Session,(),{})
642 session = Instance(Session,(),{})
642
643
643 # The kernel process with which the KernelManager is communicating.
644 # The kernel process with which the KernelManager is communicating.
644 kernel = Instance(Popen)
645 kernel = Instance(Popen)
645
646
646 # The addresses for the communication channels.
647 # The addresses for the communication channels.
647 xreq_address = TCPAddress((LOCALHOST, 0))
648 xreq_address = TCPAddress((LOCALHOST, 0))
648 sub_address = TCPAddress((LOCALHOST, 0))
649 sub_address = TCPAddress((LOCALHOST, 0))
649 rep_address = TCPAddress((LOCALHOST, 0))
650 rep_address = TCPAddress((LOCALHOST, 0))
650 hb_address = TCPAddress((LOCALHOST, 0))
651 hb_address = TCPAddress((LOCALHOST, 0))
651
652
652 # The classes to use for the various channels.
653 # The classes to use for the various channels.
653 xreq_channel_class = Type(XReqSocketChannel)
654 xreq_channel_class = Type(XReqSocketChannel)
654 sub_channel_class = Type(SubSocketChannel)
655 sub_channel_class = Type(SubSocketChannel)
655 rep_channel_class = Type(RepSocketChannel)
656 rep_channel_class = Type(RepSocketChannel)
656 hb_channel_class = Type(HBSocketChannel)
657 hb_channel_class = Type(HBSocketChannel)
657
658
658 # Protected traits.
659 # Protected traits.
659 _launch_args = Any
660 _launch_args = Any
660 _xreq_channel = Any
661 _xreq_channel = Any
661 _sub_channel = Any
662 _sub_channel = Any
662 _rep_channel = Any
663 _rep_channel = Any
663 _hb_channel = Any
664 _hb_channel = Any
664
665
666 def __init__(self, **kwargs):
667 super(KernelManager, self).__init__(**kwargs)
668 atexit.register(self.context.close)
669
665 #--------------------------------------------------------------------------
670 #--------------------------------------------------------------------------
666 # Channel management methods:
671 # Channel management methods:
667 #--------------------------------------------------------------------------
672 #--------------------------------------------------------------------------
668
673
669 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
674 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
670 """Starts the channels for this kernel.
675 """Starts the channels for this kernel.
671
676
672 This will create the channels if they do not exist and then start
677 This will create the channels if they do not exist and then start
673 them. If port numbers of 0 are being used (random ports) then you
678 them. If port numbers of 0 are being used (random ports) then you
674 must first call :method:`start_kernel`. If the channels have been
679 must first call :method:`start_kernel`. If the channels have been
675 stopped and you call this, :class:`RuntimeError` will be raised.
680 stopped and you call this, :class:`RuntimeError` will be raised.
676 """
681 """
677 if xreq:
682 if xreq:
678 self.xreq_channel.start()
683 self.xreq_channel.start()
679 if sub:
684 if sub:
680 self.sub_channel.start()
685 self.sub_channel.start()
681 if rep:
686 if rep:
682 self.rep_channel.start()
687 self.rep_channel.start()
683 if hb:
688 if hb:
684 self.hb_channel.start()
689 self.hb_channel.start()
685
690
686 def stop_channels(self):
691 def stop_channels(self):
687 """Stops all the running channels for this kernel.
692 """Stops all the running channels for this kernel.
688 """
693 """
689 if self.xreq_channel.is_alive():
694 if self.xreq_channel.is_alive():
690 self.xreq_channel.stop()
695 self.xreq_channel.stop()
691 if self.sub_channel.is_alive():
696 if self.sub_channel.is_alive():
692 self.sub_channel.stop()
697 self.sub_channel.stop()
693 if self.rep_channel.is_alive():
698 if self.rep_channel.is_alive():
694 self.rep_channel.stop()
699 self.rep_channel.stop()
695 if self.hb_channel.is_alive():
700 if self.hb_channel.is_alive():
696 self.hb_channel.stop()
701 self.hb_channel.stop()
697
702
698 @property
703 @property
699 def channels_running(self):
704 def channels_running(self):
700 """Are any of the channels created and running?"""
705 """Are any of the channels created and running?"""
701 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
706 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
702 self.rep_channel.is_alive() or self.hb_channel.is_alive())
707 self.rep_channel.is_alive() or self.hb_channel.is_alive())
703
708
704 #--------------------------------------------------------------------------
709 #--------------------------------------------------------------------------
705 # Kernel process management methods:
710 # Kernel process management methods:
706 #--------------------------------------------------------------------------
711 #--------------------------------------------------------------------------
707
712
708 def start_kernel(self, **kw):
713 def start_kernel(self, **kw):
709 """Starts a kernel process and configures the manager to use it.
714 """Starts a kernel process and configures the manager to use it.
710
715
711 If random ports (port=0) are being used, this method must be called
716 If random ports (port=0) are being used, this method must be called
712 before the channels are created.
717 before the channels are created.
713
718
714 Parameters:
719 Parameters:
715 -----------
720 -----------
716 ipython : bool, optional (default True)
721 ipython : bool, optional (default True)
717 Whether to use an IPython kernel instead of a plain Python kernel.
722 Whether to use an IPython kernel instead of a plain Python kernel.
718 """
723 """
719 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
724 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
720 self.rep_address, self.hb_address
725 self.rep_address, self.hb_address
721 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
726 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
722 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
727 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
723 raise RuntimeError("Can only launch a kernel on localhost."
728 raise RuntimeError("Can only launch a kernel on localhost."
724 "Make sure that the '*_address' attributes are "
729 "Make sure that the '*_address' attributes are "
725 "configured properly.")
730 "configured properly.")
726
731
727 self._launch_args = kw.copy()
732 self._launch_args = kw.copy()
728 if kw.pop('ipython', True):
733 if kw.pop('ipython', True):
729 from ipkernel import launch_kernel
734 from ipkernel import launch_kernel
730 else:
735 else:
731 from pykernel import launch_kernel
736 from pykernel import launch_kernel
732 self.kernel, xrep, pub, req, hb = launch_kernel(
737 self.kernel, xrep, pub, req, hb = launch_kernel(
733 xrep_port=xreq[1], pub_port=sub[1],
738 xrep_port=xreq[1], pub_port=sub[1],
734 req_port=rep[1], hb_port=hb[1], **kw)
739 req_port=rep[1], hb_port=hb[1], **kw)
735 self.xreq_address = (LOCALHOST, xrep)
740 self.xreq_address = (LOCALHOST, xrep)
736 self.sub_address = (LOCALHOST, pub)
741 self.sub_address = (LOCALHOST, pub)
737 self.rep_address = (LOCALHOST, req)
742 self.rep_address = (LOCALHOST, req)
738 self.hb_address = (LOCALHOST, hb)
743 self.hb_address = (LOCALHOST, hb)
739
744
740 def shutdown_kernel(self):
745 def shutdown_kernel(self):
741 """ Attempts to the stop the kernel process cleanly. If the kernel
746 """ Attempts to the stop the kernel process cleanly. If the kernel
742 cannot be stopped, it is killed, if possible.
747 cannot be stopped, it is killed, if possible.
743 """
748 """
744 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
749 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
745 if sys.platform == 'win32':
750 if sys.platform == 'win32':
746 self.kill_kernel()
751 self.kill_kernel()
747 return
752 return
748
753
749 # Pause the heart beat channel if it exists.
754 # Pause the heart beat channel if it exists.
750 if self._hb_channel is not None:
755 if self._hb_channel is not None:
751 self._hb_channel.pause()
756 self._hb_channel.pause()
752
757
753 # Don't send any additional kernel kill messages immediately, to give
758 # Don't send any additional kernel kill messages immediately, to give
754 # the kernel a chance to properly execute shutdown actions. Wait for at
759 # the kernel a chance to properly execute shutdown actions. Wait for at
755 # most 1s, checking every 0.1s.
760 # most 1s, checking every 0.1s.
756 self.xreq_channel.shutdown()
761 self.xreq_channel.shutdown()
757 for i in range(10):
762 for i in range(10):
758 if self.is_alive:
763 if self.is_alive:
759 time.sleep(0.1)
764 time.sleep(0.1)
760 else:
765 else:
761 break
766 break
762 else:
767 else:
763 # OK, we've waited long enough.
768 # OK, we've waited long enough.
764 if self.has_kernel:
769 if self.has_kernel:
765 self.kill_kernel()
770 self.kill_kernel()
766
771
767 def restart_kernel(self, now=False):
772 def restart_kernel(self, now=False):
768 """Restarts a kernel with the same arguments that were used to launch
773 """Restarts a kernel with the same arguments that were used to launch
769 it. If the old kernel was launched with random ports, the same ports
774 it. If the old kernel was launched with random ports, the same ports
770 will be used for the new kernel.
775 will be used for the new kernel.
771
776
772 Parameters
777 Parameters
773 ----------
778 ----------
774 now : bool, optional
779 now : bool, optional
775 If True, the kernel is forcefully restarted *immediately*, without
780 If True, the kernel is forcefully restarted *immediately*, without
776 having a chance to do any cleanup action. Otherwise the kernel is
781 having a chance to do any cleanup action. Otherwise the kernel is
777 given 1s to clean up before a forceful restart is issued.
782 given 1s to clean up before a forceful restart is issued.
778
783
779 In all cases the kernel is restarted, the only difference is whether
784 In all cases the kernel is restarted, the only difference is whether
780 it is given a chance to perform a clean shutdown or not.
785 it is given a chance to perform a clean shutdown or not.
781 """
786 """
782 if self._launch_args is None:
787 if self._launch_args is None:
783 raise RuntimeError("Cannot restart the kernel. "
788 raise RuntimeError("Cannot restart the kernel. "
784 "No previous call to 'start_kernel'.")
789 "No previous call to 'start_kernel'.")
785 else:
790 else:
786 if self.has_kernel:
791 if self.has_kernel:
787 if now:
792 if now:
788 self.kill_kernel()
793 self.kill_kernel()
789 else:
794 else:
790 self.shutdown_kernel()
795 self.shutdown_kernel()
791 self.start_kernel(**self._launch_args)
796 self.start_kernel(**self._launch_args)
792
797
793 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
798 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
794 # unless there is some delay here.
799 # unless there is some delay here.
795 if sys.platform == 'win32':
800 if sys.platform == 'win32':
796 time.sleep(0.2)
801 time.sleep(0.2)
797
802
798 @property
803 @property
799 def has_kernel(self):
804 def has_kernel(self):
800 """Returns whether a kernel process has been specified for the kernel
805 """Returns whether a kernel process has been specified for the kernel
801 manager.
806 manager.
802 """
807 """
803 return self.kernel is not None
808 return self.kernel is not None
804
809
805 def kill_kernel(self):
810 def kill_kernel(self):
806 """ Kill the running kernel. """
811 """ Kill the running kernel. """
807 if self.has_kernel:
812 if self.has_kernel:
808 # Pause the heart beat channel if it exists.
813 # Pause the heart beat channel if it exists.
809 if self._hb_channel is not None:
814 if self._hb_channel is not None:
810 self._hb_channel.pause()
815 self._hb_channel.pause()
811
816
812 # Attempt to kill the kernel.
817 # Attempt to kill the kernel.
813 try:
818 try:
814 self.kernel.kill()
819 self.kernel.kill()
815 except OSError, e:
820 except OSError, e:
816 # In Windows, we will get an Access Denied error if the process
821 # In Windows, we will get an Access Denied error if the process
817 # has already terminated. Ignore it.
822 # has already terminated. Ignore it.
818 if not (sys.platform == 'win32' and e.winerror == 5):
823 if not (sys.platform == 'win32' and e.winerror == 5):
819 raise
824 raise
820 self.kernel = None
825 self.kernel = None
821 else:
826 else:
822 raise RuntimeError("Cannot kill kernel. No kernel is running!")
827 raise RuntimeError("Cannot kill kernel. No kernel is running!")
823
828
824 def interrupt_kernel(self):
829 def interrupt_kernel(self):
825 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
830 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
826 well supported on all platforms.
831 well supported on all platforms.
827 """
832 """
828 if self.has_kernel:
833 if self.has_kernel:
829 if sys.platform == 'win32':
834 if sys.platform == 'win32':
830 from parentpoller import ParentPollerWindows as Poller
835 from parentpoller import ParentPollerWindows as Poller
831 Poller.send_interrupt(self.kernel.win32_interrupt_event)
836 Poller.send_interrupt(self.kernel.win32_interrupt_event)
832 else:
837 else:
833 self.kernel.send_signal(signal.SIGINT)
838 self.kernel.send_signal(signal.SIGINT)
834 else:
839 else:
835 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
840 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
836
841
837 def signal_kernel(self, signum):
842 def signal_kernel(self, signum):
838 """ Sends a signal to the kernel. Note that since only SIGTERM is
843 """ Sends a signal to the kernel. Note that since only SIGTERM is
839 supported on Windows, this function is only useful on Unix systems.
844 supported on Windows, this function is only useful on Unix systems.
840 """
845 """
841 if self.has_kernel:
846 if self.has_kernel:
842 self.kernel.send_signal(signum)
847 self.kernel.send_signal(signum)
843 else:
848 else:
844 raise RuntimeError("Cannot signal kernel. No kernel is running!")
849 raise RuntimeError("Cannot signal kernel. No kernel is running!")
845
850
846 @property
851 @property
847 def is_alive(self):
852 def is_alive(self):
848 """Is the kernel process still running?"""
853 """Is the kernel process still running?"""
849 # FIXME: not using a heartbeat means this method is broken for any
854 # FIXME: not using a heartbeat means this method is broken for any
850 # remote kernel, it's only capable of handling local kernels.
855 # remote kernel, it's only capable of handling local kernels.
851 if self.has_kernel:
856 if self.has_kernel:
852 if self.kernel.poll() is None:
857 if self.kernel.poll() is None:
853 return True
858 return True
854 else:
859 else:
855 return False
860 return False
856 else:
861 else:
857 # We didn't start the kernel with this KernelManager so we don't
862 # We didn't start the kernel with this KernelManager so we don't
858 # know if it is running. We should use a heartbeat for this case.
863 # know if it is running. We should use a heartbeat for this case.
859 return True
864 return True
860
865
861 #--------------------------------------------------------------------------
866 #--------------------------------------------------------------------------
862 # Channels used for communication with the kernel:
867 # Channels used for communication with the kernel:
863 #--------------------------------------------------------------------------
868 #--------------------------------------------------------------------------
864
869
865 @property
870 @property
866 def xreq_channel(self):
871 def xreq_channel(self):
867 """Get the REQ socket channel object to make requests of the kernel."""
872 """Get the REQ socket channel object to make requests of the kernel."""
868 if self._xreq_channel is None:
873 if self._xreq_channel is None:
869 self._xreq_channel = self.xreq_channel_class(self.context,
874 self._xreq_channel = self.xreq_channel_class(self.context,
870 self.session,
875 self.session,
871 self.xreq_address)
876 self.xreq_address)
872 return self._xreq_channel
877 return self._xreq_channel
873
878
874 @property
879 @property
875 def sub_channel(self):
880 def sub_channel(self):
876 """Get the SUB socket channel object."""
881 """Get the SUB socket channel object."""
877 if self._sub_channel is None:
882 if self._sub_channel is None:
878 self._sub_channel = self.sub_channel_class(self.context,
883 self._sub_channel = self.sub_channel_class(self.context,
879 self.session,
884 self.session,
880 self.sub_address)
885 self.sub_address)
881 return self._sub_channel
886 return self._sub_channel
882
887
883 @property
888 @property
884 def rep_channel(self):
889 def rep_channel(self):
885 """Get the REP socket channel object to handle stdin (raw_input)."""
890 """Get the REP socket channel object to handle stdin (raw_input)."""
886 if self._rep_channel is None:
891 if self._rep_channel is None:
887 self._rep_channel = self.rep_channel_class(self.context,
892 self._rep_channel = self.rep_channel_class(self.context,
888 self.session,
893 self.session,
889 self.rep_address)
894 self.rep_address)
890 return self._rep_channel
895 return self._rep_channel
891
896
892 @property
897 @property
893 def hb_channel(self):
898 def hb_channel(self):
894 """Get the REP socket channel object to handle stdin (raw_input)."""
899 """Get the REP socket channel object to handle stdin (raw_input)."""
895 if self._hb_channel is None:
900 if self._hb_channel is None:
896 self._hb_channel = self.hb_channel_class(self.context,
901 self._hb_channel = self.hb_channel_class(self.context,
897 self.session,
902 self.session,
898 self.hb_address)
903 self.hb_address)
899 return self._hb_channel
904 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now