##// END OF EJS Templates
Add message to make it easier to connect a new client to a running kernel
Fernando Perez -
Show More
@@ -1,253 +1,259 b''
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3 """
4 4
5 5 # Standard library imports.
6 6 import atexit
7 7 import os
8 8 import socket
9 9 from subprocess import Popen, PIPE
10 10 import sys
11 11
12 12 # System library imports.
13 13 import zmq
14 14
15 15 # Local imports.
16 16 from IPython.core.ultratb import FormattedTB
17 17 from IPython.external.argparse import ArgumentParser
18 18 from IPython.utils import io
19 19 from displayhook import DisplayHook
20 20 from heartbeat import Heartbeat
21 21 from iostream import OutStream
22 22 from parentpoller import ParentPollerUnix, ParentPollerWindows
23 23 from session import Session
24 24
25 25 def bind_port(socket, ip, port):
26 26 """ Binds the specified ZMQ socket. If the port is zero, a random port is
27 27 chosen. Returns the port that was bound.
28 28 """
29 29 connection = 'tcp://%s' % ip
30 30 if port <= 0:
31 31 port = socket.bind_to_random_port(connection)
32 32 else:
33 33 connection += ':%i' % port
34 34 socket.bind(connection)
35 35 return port
36 36
37 37
38 38 def make_argument_parser():
39 39 """ Creates an ArgumentParser for the generic arguments supported by all
40 40 kernel entry points.
41 41 """
42 42 parser = ArgumentParser()
43 43 parser.add_argument('--ip', type=str, default='127.0.0.1',
44 44 help='set the kernel\'s IP address [default: local]')
45 45 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
46 46 help='set the XREP channel port [default: random]')
47 47 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
48 48 help='set the PUB channel port [default: random]')
49 49 parser.add_argument('--req', type=int, metavar='PORT', default=0,
50 50 help='set the REQ channel port [default: random]')
51 51 parser.add_argument('--hb', type=int, metavar='PORT', default=0,
52 52 help='set the heartbeat port [default: random]')
53 53
54 54 if sys.platform == 'win32':
55 55 parser.add_argument('--interrupt', type=int, metavar='HANDLE',
56 56 default=0, help='interrupt this process when '
57 57 'HANDLE is signaled')
58 58 parser.add_argument('--parent', type=int, metavar='HANDLE',
59 59 default=0, help='kill this process if the process '
60 60 'with HANDLE dies')
61 61 else:
62 62 parser.add_argument('--parent', action='store_true',
63 63 help='kill this process if its parent dies')
64 64
65 65 return parser
66 66
67 67
68 68 def make_kernel(namespace, kernel_factory,
69 69 out_stream_factory=None, display_hook_factory=None):
70 70 """ Creates a kernel, redirects stdout/stderr, and installs a display hook
71 71 and exception handler.
72 72 """
73 73 # If running under pythonw.exe, the interpreter will crash if more than 4KB
74 74 # of data is written to stdout or stderr. This is a bug that has been with
75 75 # Python for a very long time; see http://bugs.python.org/issue706263.
76 76 if sys.executable.endswith('pythonw.exe'):
77 77 blackhole = file(os.devnull, 'w')
78 78 sys.stdout = sys.stderr = blackhole
79 79 sys.__stdout__ = sys.__stderr__ = blackhole
80 80
81 81 # Install minimal exception handling
82 82 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
83 83 ostream=sys.__stdout__)
84 84
85 85 # Create a context, a session, and the kernel sockets.
86 86 io.raw_print("Starting the kernel at pid:", os.getpid())
87 87 context = zmq.Context()
88 88 # Uncomment this to try closing the context.
89 89 # atexit.register(context.close)
90 90 session = Session(username=u'kernel')
91 91
92 92 reply_socket = context.socket(zmq.XREP)
93 93 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
94 94 io.raw_print("XREP Channel on port", xrep_port)
95 95
96 96 pub_socket = context.socket(zmq.PUB)
97 97 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
98 98 io.raw_print("PUB Channel on port", pub_port)
99 99
100 100 req_socket = context.socket(zmq.XREQ)
101 101 req_port = bind_port(req_socket, namespace.ip, namespace.req)
102 102 io.raw_print("REQ Channel on port", req_port)
103 103
104 104 hb = Heartbeat(context, (namespace.ip, namespace.hb))
105 105 hb.start()
106 106 hb_port = hb.port
107 107 io.raw_print("Heartbeat REP Channel on port", hb_port)
108 108
109 # Helper to make it easier to connect to an existing kernel, until we have
110 # single-port connection negotiation fully implemented.
111 io.raw_print("To connect another client to this kernel, use:")
112 io.raw_print("-e --xreq {0} --sub {1} --rep {2} --hb {3}".format(
113 xrep_port, pub_port, req_port, hb_port))
114
109 115 # Redirect input streams and set a display hook.
110 116 if out_stream_factory:
111 117 sys.stdout = out_stream_factory(session, pub_socket, u'stdout')
112 118 sys.stderr = out_stream_factory(session, pub_socket, u'stderr')
113 119 if display_hook_factory:
114 120 sys.displayhook = display_hook_factory(session, pub_socket)
115 121
116 122 # Create the kernel.
117 123 kernel = kernel_factory(session=session, reply_socket=reply_socket,
118 124 pub_socket=pub_socket, req_socket=req_socket)
119 125 kernel.record_ports(xrep_port=xrep_port, pub_port=pub_port,
120 126 req_port=req_port, hb_port=hb_port)
121 127 return kernel
122 128
123 129
124 130 def start_kernel(namespace, kernel):
125 131 """ Starts a kernel.
126 132 """
127 133 # Configure this kernel process to poll the parent process, if necessary.
128 134 if sys.platform == 'win32':
129 135 if namespace.interrupt or namespace.parent:
130 136 poller = ParentPollerWindows(namespace.interrupt, namespace.parent)
131 137 poller.start()
132 138 elif namespace.parent:
133 139 poller = ParentPollerUnix()
134 140 poller.start()
135 141
136 142 # Start the kernel mainloop.
137 143 kernel.start()
138 144
139 145
140 146 def make_default_main(kernel_factory):
141 147 """ Creates the simplest possible kernel entry point.
142 148 """
143 149 def main():
144 150 namespace = make_argument_parser().parse_args()
145 151 kernel = make_kernel(namespace, kernel_factory, OutStream, DisplayHook)
146 152 start_kernel(namespace, kernel)
147 153 return main
148 154
149 155
150 156 def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
151 157 independent=False, extra_arguments=[]):
152 158 """ Launches a localhost kernel, binding to the specified ports.
153 159
154 160 Parameters
155 161 ----------
156 162 code : str,
157 163 A string of Python code that imports and executes a kernel entry point.
158 164
159 165 xrep_port : int, optional
160 166 The port to use for XREP channel.
161 167
162 168 pub_port : int, optional
163 169 The port to use for the SUB channel.
164 170
165 171 req_port : int, optional
166 172 The port to use for the REQ (raw input) channel.
167 173
168 174 hb_port : int, optional
169 175 The port to use for the hearbeat REP channel.
170 176
171 177 independent : bool, optional (default False)
172 178 If set, the kernel process is guaranteed to survive if this process
173 179 dies. If not set, an effort is made to ensure that the kernel is killed
174 180 when this process dies. Note that in this case it is still good practice
175 181 to kill kernels manually before exiting.
176 182
177 183 extra_arguments = list, optional
178 184 A list of extra arguments to pass when executing the launch code.
179 185
180 186 Returns
181 187 -------
182 188 A tuple of form:
183 189 (kernel_process, xrep_port, pub_port, req_port)
184 190 where kernel_process is a Popen object and the ports are integers.
185 191 """
186 192 # Find open ports as necessary.
187 193 ports = []
188 194 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + \
189 195 int(req_port <= 0) + int(hb_port <= 0)
190 196 for i in xrange(ports_needed):
191 197 sock = socket.socket()
192 198 sock.bind(('', 0))
193 199 ports.append(sock)
194 200 for i, sock in enumerate(ports):
195 201 port = sock.getsockname()[1]
196 202 sock.close()
197 203 ports[i] = port
198 204 if xrep_port <= 0:
199 205 xrep_port = ports.pop(0)
200 206 if pub_port <= 0:
201 207 pub_port = ports.pop(0)
202 208 if req_port <= 0:
203 209 req_port = ports.pop(0)
204 210 if hb_port <= 0:
205 211 hb_port = ports.pop(0)
206 212
207 213 # Build the kernel launch command.
208 214 arguments = [ sys.executable, '-c', code, '--xrep', str(xrep_port),
209 215 '--pub', str(pub_port), '--req', str(req_port),
210 216 '--hb', str(hb_port) ]
211 217 arguments.extend(extra_arguments)
212 218
213 219 # Spawn a kernel.
214 220 if sys.platform == 'win32':
215 221 # Create a Win32 event for interrupting the kernel.
216 222 interrupt_event = ParentPollerWindows.create_interrupt_event()
217 223 arguments += [ '--interrupt', str(int(interrupt_event)) ]
218 224
219 225 # If using pythonw, stdin, stdout, and stderr are invalid. Popen will
220 226 # fail unless they are suitably redirected. We don't read from the
221 227 # pipes, but they must exist.
222 228 redirect = PIPE if sys.executable.endswith('pythonw.exe') else None
223 229
224 230 if independent:
225 231 proc = Popen(arguments,
226 232 creationflags=512, # CREATE_NEW_PROCESS_GROUP
227 233 stdout=redirect, stderr=redirect, stdin=redirect)
228 234 else:
229 235 from _subprocess import DuplicateHandle, GetCurrentProcess, \
230 236 DUPLICATE_SAME_ACCESS
231 237 pid = GetCurrentProcess()
232 238 handle = DuplicateHandle(pid, pid, pid, 0,
233 239 True, # Inheritable by new processes.
234 240 DUPLICATE_SAME_ACCESS)
235 241 proc = Popen(arguments + ['--parent', str(int(handle))],
236 242 stdout=redirect, stderr=redirect, stdin=redirect)
237 243
238 244 # Attach the interrupt event to the Popen objet so it can be used later.
239 245 proc.win32_interrupt_event = interrupt_event
240 246
241 247 # Clean up pipes created to work around Popen bug.
242 248 if redirect is not None:
243 249 proc.stdout.close()
244 250 proc.stderr.close()
245 251 proc.stdin.close()
246 252
247 253 else:
248 254 if independent:
249 255 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
250 256 else:
251 257 proc = Popen(arguments + ['--parent'])
252 258
253 259 return proc, xrep_port, pub_port, req_port, hb_port
General Comments 0
You need to be logged in to leave comments. Login now