remove remaining references to deprecated XREP/XREQ names...
MinRK
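For context: libzmq deprecated the XREP/XREQ socket-type names in favor of ROUTER/DEALER. In pyzmq the old names are plain aliases of the new constants, so every hunk below is a pure rename with no behavioral change. A minimal sketch (assumes a pyzmq build that still exports the deprecated names):

```python
import zmq

# XREP/XREQ are aliases of ROUTER/DEALER, so the renames in this
# changeset do not alter any socket behavior.
assert zmq.XREP == zmq.ROUTER
assert zmq.XREQ == zmq.DEALER

ctx = zmq.Context.instance()
router = ctx.socket(zmq.ROUTER)  # previously written as ctx.socket(zmq.XREP)
dealer = ctx.socket(zmq.DEALER)  # previously written as ctx.socket(zmq.XREQ)
```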
@@ -1,356 +1,356 @@
1 1 """ A minimal application base mixin for all ZMQ based IPython frontends.
2 2
3 3 This is not a complete console app, as subprocess will not be able to receive
4 4 input, there is no real readline support, among other limitations. This is a
5 5 refactoring of what used to be the IPython/frontend/qt/console/qtconsoleapp.py
6 6
7 7 Authors:
8 8
9 9 * Evan Patterson
10 10 * Min RK
11 11 * Erik Tollerud
12 12 * Fernando Perez
13 13 * Bussonnier Matthias
14 14 * Thomas Kluyver
15 15 * Paul Ivanov
16 16
17 17 """
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 # stdlib imports
24 24 import atexit
25 25 import json
26 26 import os
27 27 import signal
28 28 import sys
29 29 import uuid
30 30
31 31
32 32 # Local imports
33 33 from IPython.config.application import boolean_flag
34 34 from IPython.config.configurable import Configurable
35 35 from IPython.core.profiledir import ProfileDir
36 36 from IPython.lib.kernel import tunnel_to_kernel, find_connection_file, swallow_argv
37 37 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
38 38 from IPython.utils.path import filefind
39 39 from IPython.utils.py3compat import str_to_bytes
40 40 from IPython.utils.traitlets import (
41 41 Dict, List, Unicode, CUnicode, Int, CBool, Any
42 42 )
43 43 from IPython.zmq.ipkernel import (
44 44 flags as ipkernel_flags,
45 45 aliases as ipkernel_aliases,
46 46 IPKernelApp
47 47 )
48 48 from IPython.zmq.session import Session, default_secure
49 49 from IPython.zmq.zmqshell import ZMQInteractiveShell
50 50
51 51 #-----------------------------------------------------------------------------
52 52 # Network Constants
53 53 #-----------------------------------------------------------------------------
54 54
55 55 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
56 56
57 57 #-----------------------------------------------------------------------------
58 58 # Globals
59 59 #-----------------------------------------------------------------------------
60 60
61 61
62 62 #-----------------------------------------------------------------------------
63 63 # Aliases and Flags
64 64 #-----------------------------------------------------------------------------
65 65
66 66 flags = dict(ipkernel_flags)
67 67
68 68 # the flags that are specific to the frontend
69 69 # these must be scrubbed before being passed to the kernel,
70 70 # or it will raise an error on unrecognized flags
71 71 app_flags = {
72 72 'existing' : ({'IPythonConsoleApp' : {'existing' : 'kernel*.json'}},
73 73 "Connect to an existing kernel. If no argument specified, guess most recent"),
74 74 }
75 75 app_flags.update(boolean_flag(
76 76 'confirm-exit', 'IPythonConsoleApp.confirm_exit',
77 77 """Set to display confirmation dialog on exit. You can always use 'exit' or 'quit',
78 78 to force a direct exit without any confirmation.
79 79 """,
80 80 """Don't prompt the user when exiting. This will terminate the kernel
81 81 if it is owned by the frontend, and leave it alive if it is external.
82 82 """
83 83 ))
84 84 flags.update(app_flags)
85 85
86 86 aliases = dict(ipkernel_aliases)
87 87
88 88 # also scrub aliases from the frontend
89 89 app_aliases = dict(
90 90 hb = 'IPythonConsoleApp.hb_port',
91 91 shell = 'IPythonConsoleApp.shell_port',
92 92 iopub = 'IPythonConsoleApp.iopub_port',
93 93 stdin = 'IPythonConsoleApp.stdin_port',
94 94 ip = 'IPythonConsoleApp.ip',
95 95 existing = 'IPythonConsoleApp.existing',
96 96 f = 'IPythonConsoleApp.connection_file',
97 97
98 98
99 99 ssh = 'IPythonConsoleApp.sshserver',
100 100 )
101 101 aliases.update(app_aliases)
102 102
103 103 #-----------------------------------------------------------------------------
104 104 # Classes
105 105 #-----------------------------------------------------------------------------
106 106
107 107 #-----------------------------------------------------------------------------
108 108 # IPythonConsole
109 109 #-----------------------------------------------------------------------------
110 110
111 111 classes = [IPKernelApp, ZMQInteractiveShell, ProfileDir, Session]
112 112
113 113 try:
114 114 from IPython.zmq.pylab.backend_inline import InlineBackend
115 115 except ImportError:
116 116 pass
117 117 else:
118 118 classes.append(InlineBackend)
119 119
120 120 class IPythonConsoleApp(Configurable):
121 121 name = 'ipython-console-mixin'
122 122 default_config_file_name='ipython_config.py'
123 123
124 124 description = """
125 125 The IPython Mixin Console.
126 126
127 127 This class contains the common portions of console client (QtConsole,
128 128 ZMQ-based terminal console, etc). It is not a full console, in that
129 129 launched terminal subprocesses will not be able to accept input.
130 130
131 131 The Console using this mixin supports various extra features beyond
132 132 the single-process Terminal IPython shell, such as connecting to
133 133 an existing kernel via:
134 134
135 135 ipython <appname> --existing
136 136
137 137 as well as tunneling via SSH
138 138
139 139 """
140 140
141 141 classes = classes
142 142 flags = Dict(flags)
143 143 aliases = Dict(aliases)
144 144 kernel_manager_class = BlockingKernelManager
145 145
146 146 kernel_argv = List(Unicode)
147 147 # frontend flags&aliases to be stripped when building kernel_argv
148 148 frontend_flags = Any(app_flags)
149 149 frontend_aliases = Any(app_aliases)
150 150
151 151 # create requested profiles by default, if they don't exist:
152 152 auto_create = CBool(True)
153 153 # connection info:
154 154 ip = Unicode(LOCALHOST, config=True,
155 155 help="""Set the kernel\'s IP address [default localhost].
156 156 If the IP address is something other than localhost, then
157 157 Consoles on other machines will be able to connect
158 158 to the Kernel, so be careful!"""
159 159 )
160 160
161 161 sshserver = Unicode('', config=True,
162 162 help="""The SSH server to use to connect to the kernel.""")
163 163 sshkey = Unicode('', config=True,
164 164 help="""Path to the ssh key to use for logging in to the ssh server.""")
165 165
166 166 hb_port = Int(0, config=True,
167 167 help="set the heartbeat port [default: random]")
168 168 shell_port = Int(0, config=True,
169 help="set the shell (XREP) port [default: random]")
169 help="set the shell (ROUTER) port [default: random]")
170 170 iopub_port = Int(0, config=True,
171 171 help="set the iopub (PUB) port [default: random]")
172 172 stdin_port = Int(0, config=True,
173 help="set the stdin (XREQ) port [default: random]")
173 help="set the stdin (DEALER) port [default: random]")
174 174 connection_file = Unicode('', config=True,
175 175 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
176 176
177 177 This file will contain the IP, ports, and authentication key needed to connect
178 178 clients to this kernel. By default, this file will be created in the security-dir
179 179 of the current profile, but can be specified by absolute path.
180 180 """)
181 181 def _connection_file_default(self):
182 182 return 'kernel-%i.json' % os.getpid()
183 183
184 184 existing = CUnicode('', config=True,
185 185 help="""Connect to an already running kernel""")
186 186
187 187 confirm_exit = CBool(True, config=True,
188 188 help="""
189 189 Set to display confirmation dialog on exit. You can always use 'exit' or 'quit'
190 190 to force a direct exit without any confirmation.""",
191 191 )
192 192
193 193
194 194 def build_kernel_argv(self, argv=None):
195 195 """build argv to be passed to kernel subprocess"""
196 196 if argv is None:
197 197 argv = sys.argv[1:]
198 198 self.kernel_argv = swallow_argv(argv, self.frontend_aliases, self.frontend_flags)
199 199 # kernel should inherit default config file from frontend
200 200 self.kernel_argv.append("--KernelApp.parent_appname='%s'"%self.name)
201 201
202 202 def init_connection_file(self):
203 203 """find the connection file, and load the info if found.
204 204
205 205 The current working directory and the current profile's security
206 206 directory will be searched for the file if it is not given by
207 207 absolute path.
208 208
209 209 When attempting to connect to an existing kernel and the `--existing`
210 210 argument does not match an existing file, it will be interpreted as a
211 211 fileglob, and the matching file in the current profile's security dir
212 212 with the latest access time will be used.
213 213
214 214 After this method is called, self.connection_file contains the *full path*
215 215 to the connection file, never just its name.
216 216 """
217 217 if self.existing:
218 218 try:
219 219 cf = find_connection_file(self.existing)
220 220 except Exception:
221 221 self.log.critical("Could not find existing kernel connection file %s", self.existing)
222 222 self.exit(1)
223 223 self.log.info("Connecting to existing kernel: %s" % cf)
224 224 self.connection_file = cf
225 225 else:
226 226 # not existing, check if we are going to write the file
227 227 # and ensure that self.connection_file is a full path, not just the shortname
228 228 try:
229 229 cf = find_connection_file(self.connection_file)
230 230 except Exception:
231 231 # file might not exist
232 232 if self.connection_file == os.path.basename(self.connection_file):
233 233 # just shortname, put it in security dir
234 234 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
235 235 else:
236 236 cf = self.connection_file
237 237 self.connection_file = cf
238 238
239 239 # should load_connection_file only be used for existing?
240 240 # as it is now, this allows reusing ports if an existing
241 241 # file is requested
242 242 try:
243 243 self.load_connection_file()
244 244 except Exception:
245 245 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
246 246 self.exit(1)
247 247
248 248 def load_connection_file(self):
249 249 """load ip/port/hmac config from JSON connection file"""
250 250 # this is identical to KernelApp.load_connection_file
251 251 # perhaps it can be centralized somewhere?
252 252 try:
253 253 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
254 254 except IOError:
255 255 self.log.debug("Connection File not found: %s", self.connection_file)
256 256 return
257 257 self.log.debug(u"Loading connection file %s", fname)
258 258 with open(fname) as f:
259 259 s = f.read()
260 260 cfg = json.loads(s)
261 261 if self.ip == LOCALHOST and 'ip' in cfg:
262 262 # not overridden by config or cl_args
263 263 self.ip = cfg['ip']
264 264 for channel in ('hb', 'shell', 'iopub', 'stdin'):
265 265 name = channel + '_port'
266 266 if getattr(self, name) == 0 and name in cfg:
267 267 # not overridden by config or cl_args
268 268 setattr(self, name, cfg[name])
269 269 if 'key' in cfg:
270 270 self.config.Session.key = str_to_bytes(cfg['key'])
271 271
272 272 def init_ssh(self):
273 273 """set up ssh tunnels, if needed."""
274 274 if not self.sshserver and not self.sshkey:
275 275 return
276 276
277 277 if self.sshkey and not self.sshserver:
278 278 # specifying just the key implies that we are connecting directly
279 279 self.sshserver = self.ip
280 280 self.ip = LOCALHOST
281 281
282 282 # build connection dict for tunnels:
283 283 info = dict(ip=self.ip,
284 284 shell_port=self.shell_port,
285 285 iopub_port=self.iopub_port,
286 286 stdin_port=self.stdin_port,
287 287 hb_port=self.hb_port
288 288 )
289 289
290 290 self.log.info("Forwarding connections to %s via %s"%(self.ip, self.sshserver))
291 291
292 292 # tunnels return a new set of ports, which will be on localhost:
293 293 self.ip = LOCALHOST
294 294 try:
295 295 newports = tunnel_to_kernel(info, self.sshserver, self.sshkey)
296 296 except:
297 297 # even catch KeyboardInterrupt
298 298 self.log.error("Could not setup tunnels", exc_info=True)
299 299 self.exit(1)
300 300
301 301 self.shell_port, self.iopub_port, self.stdin_port, self.hb_port = newports
302 302
303 303 cf = self.connection_file
304 304 base,ext = os.path.splitext(cf)
305 305 base = os.path.basename(base)
306 306 self.connection_file = base + '-ssh' + ext  # base is already a basename
307 307 self.log.critical("To connect another client via this tunnel, use:")
308 308 self.log.critical("--existing %s" % self.connection_file)
309 309
310 310 def _new_connection_file(self):
311 311 cf = ''
312 312 while not cf:
313 313 # we don't need a 128b id to distinguish kernels, use more readable
314 314 # 48b node segment (12 hex chars). Users running more than 32k simultaneous
315 315 # kernels can subclass.
316 316 ident = str(uuid.uuid4()).split('-')[-1]
317 317 cf = os.path.join(self.profile_dir.security_dir, 'kernel-%s.json' % ident)
318 318 # only keep if it's actually new. Protect against unlikely collision
319 319 # in 48b random search space
320 320 cf = cf if not os.path.exists(cf) else ''
321 321 return cf
322 322
323 323 def init_kernel_manager(self):
324 324 # Don't let Qt or ZMQ swallow KeyboardInterrupts.
325 325 signal.signal(signal.SIGINT, signal.SIG_DFL)
326 326
327 327 # Create a KernelManager and start a kernel.
328 328 self.kernel_manager = self.kernel_manager_class(
329 329 ip=self.ip,
330 330 shell_port=self.shell_port,
331 331 iopub_port=self.iopub_port,
332 332 stdin_port=self.stdin_port,
333 333 hb_port=self.hb_port,
334 334 connection_file=self.connection_file,
335 335 config=self.config,
336 336 )
337 337 # start the kernel
338 338 if not self.existing:
339 339 self.kernel_manager.start_kernel(extra_arguments=self.kernel_argv)
340 340 elif self.sshserver:
341 341 # ssh, write new connection file
342 342 self.kernel_manager.write_connection_file()
343 343 atexit.register(self.kernel_manager.cleanup_connection_file)
344 344 self.kernel_manager.start_channels()
345 345
346 346
347 347 def initialize(self, argv=None):
348 348 """
349 349 Classes which mix this class in should call:
350 350 IPythonConsoleApp.initialize(self,argv)
351 351 """
352 352 self.init_connection_file()
353 353 default_secure(self.config)
354 354 self.init_ssh()
355 355 self.init_kernel_manager()
356 356
@@ -1,324 +1,324 @@
1 1 """A kernel manager for multiple kernels.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import os
20 20 import signal
21 21 import sys
22 22 import uuid
23 23
24 24 import zmq
25 25 from zmq.eventloop.zmqstream import ZMQStream
26 26
27 27 from tornado import web
28 28
29 29 from IPython.config.configurable import LoggingConfigurable
30 30 from IPython.utils.importstring import import_item
31 31 from IPython.utils.traitlets import (
32 32 Instance, Dict, List, Unicode, Float, Integer, Any, DottedObjectName,
33 33 )
34 34 #-----------------------------------------------------------------------------
35 35 # Classes
36 36 #-----------------------------------------------------------------------------
37 37
38 38 class DuplicateKernelError(Exception):
39 39 pass
40 40
41 41
42 42 class MultiKernelManager(LoggingConfigurable):
43 43 """A class for managing multiple kernels."""
44 44
45 45 kernel_manager_class = DottedObjectName(
46 46 "IPython.zmq.kernelmanager.KernelManager", config=True,
47 47 help="""The kernel manager class. This is configurable to allow
48 48 subclassing of the KernelManager for customized behavior.
49 49 """
50 50 )
51 51 def _kernel_manager_class_changed(self, name, old, new):
52 52 self.kernel_manager_factory = import_item(new)
53 53
54 54 kernel_manager_factory = Any(help="this is kernel_manager_class after import")
55 55 def _kernel_manager_factory_default(self):
56 56 return import_item(self.kernel_manager_class)
57 57
58 58 context = Instance('zmq.Context')
59 59 def _context_default(self):
60 60 return zmq.Context.instance()
61 61
62 62 connection_dir = Unicode('')
63 63
64 64 _kernels = Dict()
65 65
66 66 @property
67 67 def kernel_ids(self):
68 68 """Return a list of the kernel ids of the active kernels."""
69 69 return self._kernels.keys()
70 70
71 71 def __len__(self):
72 72 """Return the number of running kernels."""
73 73 return len(self.kernel_ids)
74 74
75 75 def __contains__(self, kernel_id):
76 76 if kernel_id in self.kernel_ids:
77 77 return True
78 78 else:
79 79 return False
80 80
81 81 def start_kernel(self, **kwargs):
82 82 """Start a new kernel."""
83 83 kernel_id = unicode(uuid.uuid4())
84 84 # use base KernelManager for each Kernel
85 85 km = self.kernel_manager_factory(connection_file=os.path.join(
86 86 self.connection_dir, "kernel-%s.json" % kernel_id),
87 87 config=self.config,
88 88 )
89 89 km.start_kernel(**kwargs)
90 90 self._kernels[kernel_id] = km
91 91 return kernel_id
92 92
93 93 def kill_kernel(self, kernel_id):
94 94 """Kill a kernel by its kernel uuid.
95 95
96 96 Parameters
97 97 ==========
98 98 kernel_id : uuid
99 99 The id of the kernel to kill.
100 100 """
101 101 self.get_kernel(kernel_id).kill_kernel()
102 102 del self._kernels[kernel_id]
103 103
104 104 def interrupt_kernel(self, kernel_id):
105 105 """Interrupt (SIGINT) the kernel by its uuid.
106 106
107 107 Parameters
108 108 ==========
109 109 kernel_id : uuid
110 110 The id of the kernel to interrupt.
111 111 """
112 112 return self.get_kernel(kernel_id).interrupt_kernel()
113 113
114 114 def signal_kernel(self, kernel_id, signum):
115 115 """ Sends a signal to the kernel by its uuid.
116 116
117 117 Note that since only SIGTERM is supported on Windows, this function
118 118 is only useful on Unix systems.
119 119
120 120 Parameters
121 121 ==========
122 122 kernel_id : uuid
123 123 The id of the kernel to signal.
124 124 """
125 125 return self.get_kernel(kernel_id).signal_kernel(signum)
126 126
127 127 def get_kernel(self, kernel_id):
128 128 """Get the single KernelManager object for a kernel by its uuid.
129 129
130 130 Parameters
131 131 ==========
132 132 kernel_id : uuid
133 133 The id of the kernel.
134 134 """
135 135 km = self._kernels.get(kernel_id)
136 136 if km is not None:
137 137 return km
138 138 else:
139 139 raise KeyError("Kernel with id not found: %s" % kernel_id)
140 140
141 141 def get_kernel_ports(self, kernel_id):
142 142 """Return a dictionary of ports for a kernel.
143 143
144 144 Parameters
145 145 ==========
146 146 kernel_id : uuid
147 147 The id of the kernel.
148 148
149 149 Returns
150 150 =======
151 151 port_dict : dict
152 152 A dict of key, value pairs where the keys are the names
153 153 (shell_port, iopub_port, stdin_port, hb_port) and the values are the
154 154 integer port numbers for those channels.
155 155 """
156 156 # this will raise a KeyError if not found:
157 157 km = self.get_kernel(kernel_id)
158 158 return dict(shell_port=km.shell_port,
159 159 iopub_port=km.iopub_port,
160 160 stdin_port=km.stdin_port,
161 161 hb_port=km.hb_port,
162 162 )
163 163
164 164 def get_kernel_ip(self, kernel_id):
165 165 """Return ip address for a kernel.
166 166
167 167 Parameters
168 168 ==========
169 169 kernel_id : uuid
170 170 The id of the kernel.
171 171
172 172 Returns
173 173 =======
174 174 ip : str
175 175 The ip address of the kernel.
176 176 """
177 177 return self.get_kernel(kernel_id).ip
178 178
179 179 def create_connected_stream(self, ip, port, socket_type):
180 180 sock = self.context.socket(socket_type)
181 181 addr = "tcp://%s:%i" % (ip, port)
182 182 self.log.info("Connecting to: %s" % addr)
183 183 sock.connect(addr)
184 184 return ZMQStream(sock)
185 185
186 186 def create_iopub_stream(self, kernel_id):
187 187 ip = self.get_kernel_ip(kernel_id)
188 188 ports = self.get_kernel_ports(kernel_id)
189 189 iopub_stream = self.create_connected_stream(ip, ports['iopub_port'], zmq.SUB)
190 190 iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
191 191 return iopub_stream
192 192
193 193 def create_shell_stream(self, kernel_id):
194 194 ip = self.get_kernel_ip(kernel_id)
195 195 ports = self.get_kernel_ports(kernel_id)
196 shell_stream = self.create_connected_stream(ip, ports['shell_port'], zmq.XREQ)
196 shell_stream = self.create_connected_stream(ip, ports['shell_port'], zmq.DEALER)
197 197 return shell_stream
198 198
199 199 def create_hb_stream(self, kernel_id):
200 200 ip = self.get_kernel_ip(kernel_id)
201 201 ports = self.get_kernel_ports(kernel_id)
202 202 hb_stream = self.create_connected_stream(ip, ports['hb_port'], zmq.REQ)
203 203 return hb_stream
204 204
205 205
206 206 class MappingKernelManager(MultiKernelManager):
207 207 """A KernelManager that handles notebok mapping and HTTP error handling"""
208 208
209 209 kernel_argv = List(Unicode)
210 210
211 211 time_to_dead = Float(3.0, config=True, help="""Kernel heartbeat interval in seconds.""")
212 212 first_beat = Float(5.0, config=True, help="Delay (in seconds) before sending first heartbeat.")
213 213
214 214 max_msg_size = Integer(65536, config=True, help="""
215 215 The max raw message size accepted from the browser
216 216 over a WebSocket connection.
217 217 """)
218 218
219 219 _notebook_mapping = Dict()
220 220
221 221 #-------------------------------------------------------------------------
222 222 # Methods for managing kernels and sessions
223 223 #-------------------------------------------------------------------------
224 224
225 225 def kernel_for_notebook(self, notebook_id):
226 226 """Return the kernel_id for a notebook_id or None."""
227 227 return self._notebook_mapping.get(notebook_id)
228 228
229 229 def set_kernel_for_notebook(self, notebook_id, kernel_id):
230 230 """Associate a notebook with a kernel."""
231 231 if notebook_id is not None:
232 232 self._notebook_mapping[notebook_id] = kernel_id
233 233
234 234 def notebook_for_kernel(self, kernel_id):
235 235 """Return the notebook_id for a kernel_id or None."""
236 236 notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
237 237 if len(notebook_ids) == 1:
238 238 return notebook_ids[0]
239 239 else:
240 240 return None
241 241
242 242 def delete_mapping_for_kernel(self, kernel_id):
243 243 """Remove the kernel/notebook mapping for kernel_id."""
244 244 notebook_id = self.notebook_for_kernel(kernel_id)
245 245 if notebook_id is not None:
246 246 del self._notebook_mapping[notebook_id]
247 247
248 248 def start_kernel(self, notebook_id=None):
249 249 """Start a kernel for a notebok an return its kernel_id.
250 250
251 251 Parameters
252 252 ----------
253 253 notebook_id : uuid
254 254 The uuid of the notebook to associate the new kernel with. If this
255 255 is not None, the new kernel will be associated with the notebook and
256 256 reused whenever the notebook requests a kernel.
257 257 """
258 258 kernel_id = self.kernel_for_notebook(notebook_id)
259 259 if kernel_id is None:
260 260 kwargs = dict()
261 261 kwargs['extra_arguments'] = self.kernel_argv
262 262 kernel_id = super(MappingKernelManager, self).start_kernel(**kwargs)
263 263 self.set_kernel_for_notebook(notebook_id, kernel_id)
264 264 self.log.info("Kernel started: %s" % kernel_id)
265 265 self.log.debug("Kernel args: %r" % kwargs)
266 266 else:
267 267 self.log.info("Using existing kernel: %s" % kernel_id)
268 268 return kernel_id
269 269
270 270 def kill_kernel(self, kernel_id):
271 271 """Kill a kernel and remove its notebook association."""
272 272 self._check_kernel_id(kernel_id)
273 273 super(MappingKernelManager, self).kill_kernel(kernel_id)
274 274 self.delete_mapping_for_kernel(kernel_id)
275 275 self.log.info("Kernel killed: %s" % kernel_id)
276 276
277 277 def interrupt_kernel(self, kernel_id):
278 278 """Interrupt a kernel."""
279 279 self._check_kernel_id(kernel_id)
280 280 super(MappingKernelManager, self).interrupt_kernel(kernel_id)
281 281 self.log.info("Kernel interrupted: %s" % kernel_id)
282 282
283 283 def restart_kernel(self, kernel_id):
284 284 """Restart a kernel while keeping clients connected."""
285 285 self._check_kernel_id(kernel_id)
286 286 km = self.get_kernel(kernel_id)
287 287 km.restart_kernel(now=True)
288 288 self.log.info("Kernel restarted: %s" % kernel_id)
289 289 return kernel_id
290 290
291 291 # the following remains, in case the KM restart machinery is
292 292 # somehow unacceptable
293 293 # Get the notebook_id to preserve the kernel/notebook association.
294 294 notebook_id = self.notebook_for_kernel(kernel_id)
295 295 # Create the new kernel first so we can move the clients over.
296 296 new_kernel_id = self.start_kernel()
297 297 # Now kill the old kernel.
298 298 self.kill_kernel(kernel_id)
299 299 # Now save the new kernel/notebook association. We have to save it
300 300 # after the old kernel is killed as that will delete the mapping.
301 301 self.set_kernel_for_notebook(notebook_id, new_kernel_id)
302 302 self.log.info("Kernel restarted: %s" % new_kernel_id)
303 303 return new_kernel_id
304 304
305 305 def create_iopub_stream(self, kernel_id):
306 306 """Create a new iopub stream."""
307 307 self._check_kernel_id(kernel_id)
308 308 return super(MappingKernelManager, self).create_iopub_stream(kernel_id)
309 309
310 310 def create_shell_stream(self, kernel_id):
311 311 """Create a new shell stream."""
312 312 self._check_kernel_id(kernel_id)
313 313 return super(MappingKernelManager, self).create_shell_stream(kernel_id)
314 314
315 315 def create_hb_stream(self, kernel_id):
316 316 """Create a new hb stream."""
317 317 self._check_kernel_id(kernel_id)
318 318 return super(MappingKernelManager, self).create_hb_stream(kernel_id)
319 319
320 320 def _check_kernel_id(self, kernel_id):
321 321 """Check a that a kernel_id exists and raise 404 if not."""
322 322 if kernel_id not in self:
323 323 raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id)
324 324
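Taken together, the base class above gives a small kernel-lifecycle API. A hedged usage sketch (the import path is an assumption about where this file lives in the notebook server; the calls mirror the methods in the diff):

```python
from IPython.frontend.html.notebook.kernelmanager import MultiKernelManager  # assumed path

mgr = MultiKernelManager(connection_dir='/tmp')
kernel_id = mgr.start_kernel()              # returns a new uuid string
print(mgr.get_kernel_ports(kernel_id))      # shell/iopub/stdin/hb port dict
shell = mgr.create_shell_stream(kernel_id)  # now a zmq.DEALER stream (was zmq.XREQ)
hb = mgr.create_hb_stream(kernel_id)        # zmq.REQ stream for heartbeats
mgr.kill_kernel(kernel_id)                  # kill the kernel and drop its record
```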
@@ -1,96 +1,96 @@
1 1 """Unfinished code for ZMQ/HTTP bridging. We use WebSockets instead.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import json
20 20 import logging
21 21
22 22 from tornado import web
23 23
24 24 #-----------------------------------------------------------------------------
25 25 # Code
26 26 #-----------------------------------------------------------------------------
27 27
28 28 class ZMQHandler(web.RequestHandler):
29 29
30 30 def get_stream(self):
31 31 """Get the ZMQStream for this request."""
32 32 raise NotImplementedError('Implement get_stream() in a subclass.')
33 33
34 34 def _save_method_args(self, *args, **kwargs):
35 35 """Save the args and kwargs to get/post/put/delete for future use.
36 36
37 37 These arguments are not saved in the request or handler objects, but
38 38 are often needed by methods such as get_stream().
39 39 """
40 40 self._method_args = args
41 41 self._method_kwargs = kwargs
42 42
43 43 def _handle_msgs(self, msg):
44 44 msgs = [msg]
45 45 stream = self.get_stream()
46 46 stream.on_recv(lambda m: msgs.append(json.loads(m)))
47 47 stream.flush()
48 48 stream.stop_on_recv()
49 49 logging.info("Reply: %r" % msgs)
50 50 self.write(json.dumps(msgs))
51 51 self.finish()
52 52
53 53
54 54 class ZMQPubHandler(ZMQHandler):
55 55
56 56 SUPPORTED_METHODS = ("POST",)
57 57
58 58 def post(self, *args, **kwargs):
59 59 self._save_method_args(*args, **kwargs)
60 60 try:
61 61 msg = json.loads(self.request.body)
62 62 except:
63 63 self.send_error(status_code=415)
64 64 else:
65 65 logging.info("Request: %r" % msg)
66 66 self.get_stream().send_json(msg)
67 67
68 68
69 69 class ZMQSubHandler(ZMQHandler):
70 70
71 71 SUPPORTED_METHODS = ("GET",)
72 72
73 73 @web.asynchronous
74 74 def get(self, *args, **kwargs):
75 75 self._save_method_args(*args, **kwargs)
76 76 self.get_stream().on_recv(self._handle_msgs)
77 77
78 78
79 class ZMQXReqHandler(ZMQHandler):
79 class ZMQDealerHandler(ZMQHandler):
80 80
81 81 SUPPORTED_METHODS = ("POST",)
82 82
83 83 @web.asynchronous
84 84 def post(self, *args, **kwargs):
85 85 self._save_method_args(*args, **kwargs)
86 86 logging.info("request: %r" % self.request)
87 87 try:
88 88 msg = json.loads(self.request.body)
89 89 except:
90 90 self.send_error(status_code=415)
91 91 else:
92 92 logging.info("Reply: %r" % msg)
93 93 stream = self.get_stream()
94 94 stream.send_json(msg)
95 95 stream.on_recv(self._handle_msgs)
96 96
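Stripped of tornado, the renamed `ZMQDealerHandler` is just the DEALER side of a DEALER↔ROUTER exchange: it `send_json`s the POST body and collects whatever comes back. A self-contained sketch of that socket pattern (the ROUTER echo stands in for a kernel; the payload is illustrative):

```python
import json
import zmq

ctx = zmq.Context.instance()

# Stand-in for the kernel side: a ROUTER that echoes whatever arrives.
router = ctx.socket(zmq.ROUTER)
port = router.bind_to_random_port("tcp://127.0.0.1")

dealer = ctx.socket(zmq.DEALER)  # the handler's get_stream() wraps one of these
dealer.connect("tcp://127.0.0.1:%i" % port)

dealer.send_json({"content": {"code": "1+1"}})  # what stream.send_json(msg) does
ident, body = router.recv_multipart()           # ROUTER sees [identity, payload]
router.send_multipart([ident, body])            # route the reply back
print(json.loads(dealer.recv()))                # what _handle_msgs would collect
```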
@@ -1,491 +1,491 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import json
27 27 import os
28 28 import socket
29 29 import stat
30 30 import sys
31 31
32 32 from multiprocessing import Process
33 33 from signal import signal, SIGINT, SIGABRT, SIGTERM
34 34
35 35 import zmq
36 36 from zmq.devices import ProcessMonitoredQueue
37 37 from zmq.log.handlers import PUBHandler
38 38
39 39 from IPython.core.profiledir import ProfileDir
40 40
41 41 from IPython.parallel.apps.baseapp import (
42 42 BaseParallelApplication,
43 43 base_aliases,
44 44 base_flags,
45 45 catch_config_error,
46 46 )
47 47 from IPython.utils.importstring import import_item
48 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
49 49
50 50 from IPython.zmq.session import (
51 51 Session, session_aliases, session_flags, default_secure
52 52 )
53 53
54 54 from IPython.parallel.controller.heartmonitor import HeartMonitor
55 55 from IPython.parallel.controller.hub import HubFactory
56 56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
57 57 from IPython.parallel.controller.sqlitedb import SQLiteDB
58 58
59 59 from IPython.parallel.util import split_url, disambiguate_url
60 60
61 61 # conditional import of MongoDB backend class
62 62
63 63 try:
64 64 from IPython.parallel.controller.mongodb import MongoDB
65 65 except ImportError:
66 66 maybe_mongo = []
67 67 else:
68 68 maybe_mongo = [MongoDB]
69 69
70 70
71 71 #-----------------------------------------------------------------------------
72 72 # Module level variables
73 73 #-----------------------------------------------------------------------------
74 74
75 75
76 76 #: The default config file name for this application
77 77 default_config_file_name = u'ipcontroller_config.py'
78 78
79 79
80 80 _description = """Start the IPython controller for parallel computing.
81 81
82 82 The IPython controller provides a gateway between the IPython engines and
83 83 clients. The controller needs to be started before the engines and can be
84 84 configured using command line options or using a cluster directory. Cluster
85 85 directories contain config, log and security files and are usually located in
86 86 your ipython directory and named as "profile_name". See the `profile`
87 87 and `profile-dir` options for details.
88 88 """
89 89
90 90 _examples = """
91 91 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
92 92 ipcontroller --scheme=pure # use the pure zeromq scheduler
93 93 """
94 94
95 95
96 96 #-----------------------------------------------------------------------------
97 97 # The main application
98 98 #-----------------------------------------------------------------------------
99 99 flags = {}
100 100 flags.update(base_flags)
101 101 flags.update({
102 102 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
103 103 'Use threads instead of processes for the schedulers'),
104 104 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
105 105 'use the SQLiteDB backend'),
106 106 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
107 107 'use the MongoDB backend'),
108 108 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
109 109 'use the in-memory DictDB backend'),
110 110 'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
111 111 """use dummy DB backend, which doesn't store any information.
112 112
113 113 This can be used to prevent growth of the memory footprint of the Hub
114 114 in cases where its record-keeping is not required. Requesting results
115 115 of tasks submitted by other clients, db_queries, and task resubmission
116 116 will not be available."""),
117 117 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
118 118 'reuse existing json connection files')
119 119 })
120 120
121 121 flags.update(session_flags)
122 122
123 123 aliases = dict(
124 124 ssh = 'IPControllerApp.ssh_server',
125 125 enginessh = 'IPControllerApp.engine_ssh_server',
126 126 location = 'IPControllerApp.location',
127 127
128 128 url = 'HubFactory.url',
129 129 ip = 'HubFactory.ip',
130 130 transport = 'HubFactory.transport',
131 131 port = 'HubFactory.regport',
132 132
133 133 ping = 'HeartMonitor.period',
134 134
135 135 scheme = 'TaskScheduler.scheme_name',
136 136 hwm = 'TaskScheduler.hwm',
137 137 )
138 138 aliases.update(base_aliases)
139 139 aliases.update(session_aliases)
140 140
141 141
142 142 class IPControllerApp(BaseParallelApplication):
143 143
144 144 name = u'ipcontroller'
145 145 description = _description
146 146 examples = _examples
147 147 config_file_name = Unicode(default_config_file_name)
148 148 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
149 149
150 150 # change default to True
151 151 auto_create = Bool(True, config=True,
152 152 help="""Whether to create profile dir if it doesn't exist.""")
153 153
154 154 reuse_files = Bool(False, config=True,
155 155 help="""Whether to reuse existing json connection files.
156 156 If False, connection files will be removed on a clean exit.
157 157 """
158 158 )
159 159 ssh_server = Unicode(u'', config=True,
160 160 help="""ssh url for clients to use when connecting to the Controller
161 161 processes. It should be of the form: [user@]server[:port]. The
162 162 Controller's listening addresses must be accessible from the ssh server""",
163 163 )
164 164 engine_ssh_server = Unicode(u'', config=True,
165 165 help="""ssh url for engines to use when connecting to the Controller
166 166 processes. It should be of the form: [user@]server[:port]. The
167 167 Controller's listening addresses must be accessible from the ssh server""",
168 168 )
169 169 location = Unicode(u'', config=True,
170 170 help="""The external IP or domain name of the Controller, used for disambiguating
171 171 engine and client connections.""",
172 172 )
173 173 import_statements = List([], config=True,
174 174 help="import statements to be run at startup. Necessary in some environments"
175 175 )
176 176
177 177 use_threads = Bool(False, config=True,
178 178 help='Use threads instead of processes for the schedulers',
179 179 )
180 180
181 181 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
182 182 help="JSON filename where engine connection info will be stored.")
183 183 client_json_file = Unicode('ipcontroller-client.json', config=True,
184 184 help="JSON filename where client connection info will be stored.")
185 185
186 186 def _cluster_id_changed(self, name, old, new):
187 187 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
188 188 self.engine_json_file = "%s-engine.json" % self.name
189 189 self.client_json_file = "%s-client.json" % self.name
190 190
191 191
192 192 # internal
193 193 children = List()
194 194 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
195 195
196 196 def _use_threads_changed(self, name, old, new):
197 197 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
198 198
199 199 write_connection_files = Bool(True,
200 200 help="""Whether to write connection files to disk.
201 201 True in all cases other than runs with `reuse_files=True` *after the first*
202 202 """
203 203 )
204 204
205 205 aliases = Dict(aliases)
206 206 flags = Dict(flags)
207 207
208 208
209 209 def save_connection_dict(self, fname, cdict):
210 210 """save a connection dict to json file."""
211 211 c = self.config
212 212 url = cdict['url']
213 213 location = cdict['location']
214 214 if not location:
215 215 try:
216 216 proto,ip,port = split_url(url)
217 217 except AssertionError:
218 218 pass
219 219 else:
220 220 try:
221 221 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
222 222 except (socket.gaierror, IndexError):
223 223 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
224 224 " You may need to specify '--location=<external_ip_address>' to help"
225 225 " IPython decide when to connect via loopback.")
226 226 location = '127.0.0.1'
227 227 cdict['location'] = location
228 228 fname = os.path.join(self.profile_dir.security_dir, fname)
229 229 self.log.info("writing connection info to %s", fname)
230 230 with open(fname, 'w') as f:
231 231 f.write(json.dumps(cdict, indent=2))
232 232 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
233 233
234 234 def load_config_from_json(self):
235 235 """load config from existing json connector files."""
236 236 c = self.config
237 237 self.log.debug("loading config from JSON")
238 238 # load from engine config
239 239 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
240 240 self.log.info("loading connection info from %s", fname)
241 241 with open(fname) as f:
242 242 cfg = json.loads(f.read())
243 243 key = cfg['exec_key']
244 244 # json gives unicode, Session.key wants bytes
245 245 c.Session.key = key.encode('ascii')
246 246 xport,addr = cfg['url'].split('://')
247 247 c.HubFactory.engine_transport = xport
248 248 ip,ports = addr.split(':')
249 249 c.HubFactory.engine_ip = ip
250 250 c.HubFactory.regport = int(ports)
251 251 self.location = cfg['location']
252 252 if not self.engine_ssh_server:
253 253 self.engine_ssh_server = cfg['ssh']
254 254 # load client config
255 255 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
256 256 self.log.info("loading connection info from %s", fname)
257 257 with open(fname) as f:
258 258 cfg = json.loads(f.read())
259 259 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
260 260 xport,addr = cfg['url'].split('://')
261 261 c.HubFactory.client_transport = xport
262 262 ip,ports = addr.split(':')
263 263 c.HubFactory.client_ip = ip
264 264 if not self.ssh_server:
265 265 self.ssh_server = cfg['ssh']
266 266 assert int(ports) == c.HubFactory.regport, "regport mismatch"
267 267
268 268 def cleanup_connection_files(self):
269 269 if self.reuse_files:
270 270 self.log.debug("leaving JSON connection files for reuse")
271 271 return
272 272 self.log.debug("cleaning up JSON connection files")
273 273 for f in (self.client_json_file, self.engine_json_file):
274 274 f = os.path.join(self.profile_dir.security_dir, f)
275 275 try:
276 276 os.remove(f)
277 277 except Exception as e:
278 278 self.log.error("Failed to cleanup connection file: %s", e)
279 279 else:
280 280 self.log.debug(u"removed %s", f)
281 281
282 282 def load_secondary_config(self):
283 283 """secondary config, loading from JSON and setting defaults"""
284 284 if self.reuse_files:
285 285 try:
286 286 self.load_config_from_json()
287 287 except (AssertionError,IOError) as e:
288 288 self.log.error("Could not load config from JSON: %s" % e)
289 289 else:
290 290 # successfully loaded config from JSON, and reuse=True
291 291 # no need to write back the same file
292 292 self.write_connection_files = False
293 293
294 294 # switch Session.key default to secure
295 295 default_secure(self.config)
296 296 self.log.debug("Config changed")
297 297 self.log.debug(repr(self.config))
298 298
299 299 def init_hub(self):
300 300 c = self.config
301 301
302 302 self.do_import_statements()
303 303
304 304 try:
305 305 self.factory = HubFactory(config=c, log=self.log)
306 306 # self.start_logging()
307 307 self.factory.init_hub()
308 308 except TraitError:
309 309 raise
310 310 except Exception:
311 311 self.log.error("Couldn't construct the Controller", exc_info=True)
312 312 self.exit(1)
313 313
314 314 if self.write_connection_files:
315 315 # save to new json config files
316 316 f = self.factory
317 317 cdict = {'exec_key' : f.session.key.decode('ascii'),
318 318 'ssh' : self.ssh_server,
319 319 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
320 320 'location' : self.location
321 321 }
322 322 self.save_connection_dict(self.client_json_file, cdict)
323 323 edict = cdict
324 324 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
325 325 edict['ssh'] = self.engine_ssh_server
326 326 self.save_connection_dict(self.engine_json_file, edict)
327 327
328 328 def init_schedulers(self):
329 329 children = self.children
330 330 mq = import_item(str(self.mq_class))
331 331
332 332 hub = self.factory
333 333 # disambiguate url, in case of *
334 334 monitor_url = disambiguate_url(hub.monitor_url)
335 335 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
336 336 # IOPub relay (in a Process)
337 337 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
338 338 q.bind_in(hub.client_info['iopub'])
339 339 q.bind_out(hub.engine_info['iopub'])
340 340 q.setsockopt_out(zmq.SUBSCRIBE, b'')
341 341 q.connect_mon(monitor_url)
342 342 q.daemon=True
343 343 children.append(q)
344 344
345 345 # Multiplexer Queue (in a Process)
346 346 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
347 347 q.bind_in(hub.client_info['mux'])
348 348 q.setsockopt_in(zmq.IDENTITY, b'mux')
349 349 q.bind_out(hub.engine_info['mux'])
350 350 q.connect_mon(monitor_url)
351 351 q.daemon=True
352 352 children.append(q)
353 353
354 354 # Control Queue (in a Process)
355 355 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
356 356 q.bind_in(hub.client_info['control'])
357 357 q.setsockopt_in(zmq.IDENTITY, b'control')
358 358 q.bind_out(hub.engine_info['control'])
359 359 q.connect_mon(monitor_url)
360 360 q.daemon=True
361 361 children.append(q)
362 362 try:
363 363 scheme = self.config.TaskScheduler.scheme_name
364 364 except AttributeError:
365 365 scheme = TaskScheduler.scheme_name.get_default_value()
366 366 # Task Queue (in a Process)
367 367 if scheme == 'pure':
368 self.log.warn("task::using pure XREQ Task scheduler")
368 self.log.warn("task::using pure DEALER Task scheduler")
369 369 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
370 370 # q.setsockopt_out(zmq.HWM, hub.hwm)
371 371 q.bind_in(hub.client_info['task'][1])
372 372 q.setsockopt_in(zmq.IDENTITY, b'task')
373 373 q.bind_out(hub.engine_info['task'])
374 374 q.connect_mon(monitor_url)
375 375 q.daemon=True
376 376 children.append(q)
377 377 elif scheme == 'none':
378 378 self.log.warn("task::using no Task scheduler")
379 379
380 380 else:
381 381 self.log.info("task::using Python %s Task scheduler"%scheme)
382 382 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
383 383 monitor_url, disambiguate_url(hub.client_info['notification']))
384 384 kwargs = dict(logname='scheduler', loglevel=self.log_level,
385 385 log_url = self.log_url, config=dict(self.config))
386 386 if 'Process' in self.mq_class:
387 387 # run the Python scheduler in a Process
388 388 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
389 389 q.daemon=True
390 390 children.append(q)
391 391 else:
392 392 # single-threaded Controller
393 393 kwargs['in_thread'] = True
394 394 launch_scheduler(*sargs, **kwargs)
395 395
396 396 def terminate_children(self):
397 397 child_procs = []
398 398 for child in self.children:
399 399 if isinstance(child, ProcessMonitoredQueue):
400 400 child_procs.append(child.launcher)
401 401 elif isinstance(child, Process):
402 402 child_procs.append(child)
403 403 if child_procs:
404 404 self.log.critical("terminating children...")
405 405 for child in child_procs:
406 406 try:
407 407 child.terminate()
408 408 except OSError:
409 409 # already dead
410 410 pass
411 411
412 412 def handle_signal(self, sig, frame):
413 413 self.log.critical("Received signal %i, shutting down", sig)
414 414 self.terminate_children()
415 415 self.loop.stop()
416 416
417 417 def init_signal(self):
418 418 for sig in (SIGINT, SIGABRT, SIGTERM):
419 419 signal(sig, self.handle_signal)
420 420
421 421 def do_import_statements(self):
422 422 statements = self.import_statements
423 423 for s in statements:
424 424 try:
425 425 self.log.msg("Executing statement: '%s'" % s)
426 426 exec s in globals(), locals()
427 427 except:
428 428 self.log.msg("Error running statement: %s" % s)
429 429
430 430 def forward_logging(self):
431 431 if self.log_url:
432 432 self.log.info("Forwarding logging to %s"%self.log_url)
433 433 context = zmq.Context.instance()
434 434 lsock = context.socket(zmq.PUB)
435 435 lsock.connect(self.log_url)
436 436 handler = PUBHandler(lsock)
437 437 handler.root_topic = 'controller'
438 438 handler.setLevel(self.log_level)
439 439 self.log.addHandler(handler)
440 440
441 441 @catch_config_error
442 442 def initialize(self, argv=None):
443 443 super(IPControllerApp, self).initialize(argv)
444 444 self.forward_logging()
445 445 self.load_secondary_config()
446 446 self.init_hub()
447 447 self.init_schedulers()
448 448
449 449 def start(self):
450 450 # Start the subprocesses:
451 451 self.factory.start()
452 452 # children must be started before signals are setup,
453 453 # otherwise signal-handling will fire multiple times
454 454 for child in self.children:
455 455 child.start()
456 456 self.init_signal()
457 457
458 458 self.write_pid_file(overwrite=True)
459 459
460 460 try:
461 461 self.factory.loop.start()
462 462 except KeyboardInterrupt:
463 463 self.log.critical("Interrupted, Exiting...\n")
464 464 finally:
465 465 self.cleanup_connection_files()
466 466
467 467
468 468
469 469 def launch_new_instance():
470 470 """Create and run the IPython controller"""
471 471 if sys.platform == 'win32':
472 472 # make sure we don't get called from a multiprocessing subprocess
473 473 # this can result in infinite Controllers being started on Windows
474 474 # which doesn't have a proper fork, so multiprocessing is wonky
475 475
476 476 # this only comes up when IPython has been installed using vanilla
477 477 # setuptools, and *not* distribute.
478 478 import multiprocessing
479 479 p = multiprocessing.current_process()
480 480 # the main process has name 'MainProcess'
481 481 # subprocesses will have names like 'Process-1'
482 482 if p.name != 'MainProcess':
483 483 # we are a subprocess, don't start another Controller!
484 484 return
485 485 app = IPControllerApp.instance()
486 486 app.initialize()
487 487 app.start()
488 488
489 489
490 490 if __name__ == '__main__':
491 491 launch_new_instance()
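The scheduler wiring in `init_schedulers` is easier to read in isolation: each queue is a MonitoredQueue device, a two-way forwarder whose extra PUB socket copies every message to the Hub's monitor. A hedged standalone sketch of the 'pure' ROUTER/DEALER task queue (thread variant; all addresses illustrative):

```python
import zmq
from zmq.devices import ThreadMonitoredQueue

# ROUTER facing clients, DEALER facing engines, PUB feeding the monitor;
# b'intask'/b'outtask' prefix the monitored copies by direction, exactly
# as the scheme == 'pure' branch above configures it.
q = ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
q.bind_in("tcp://127.0.0.1:10201")      # client-facing port (illustrative)
q.setsockopt_in(zmq.IDENTITY, b'task')
q.bind_out("tcp://127.0.0.1:10202")     # engine-facing port (illustrative)
q.connect_mon("tcp://127.0.0.1:10203")  # Hub monitor endpoint (illustrative)
q.daemon = True
q.start()                               # runs the forwarder in a thread
```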
@@ -1,182 +1,182 @@
1 1 #!/usr/bin/env python
2 2 """
3 A multi-heart Heartbeat system using PUB and XREP sockets. Pings are sent out on the PUB,
4 and hearts are tracked based on their XREQ identities.
3 A multi-heart Heartbeat system using PUB and ROUTER sockets. Pings are sent out on the PUB,
4 and hearts are tracked based on their DEALER identities.
5 5
6 6 Authors:
7 7
8 8 * Min RK
9 9 """
10 10 #-----------------------------------------------------------------------------
11 11 # Copyright (C) 2010-2011 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-----------------------------------------------------------------------------
16 16
17 17 from __future__ import print_function
18 18 import time
19 19 import uuid
20 20
21 21 import zmq
22 22 from zmq.devices import ThreadDevice
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.config.configurable import LoggingConfigurable
26 26 from IPython.utils.py3compat import str_to_bytes
27 27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
28 28
29 29 from IPython.parallel.util import log_errors
30 30
31 31 class Heart(object):
32 32 """A basic heart object for responding to a HeartMonitor.
33 33 This is a simple wrapper with defaults for the most common
34 34 Device model for responding to heartbeats.
35 35
36 36 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
37 SUB/XREQ for in/out.
37 SUB/DEALER for in/out.
38 38
39 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 You can specify the DEALER's IDENTITY via the optional heart_id argument."""
40 40 device=None
41 41 id=None
42 42 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
43 43 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
44 44 # do not allow the device to share global Context.instance,
45 45 # which is the default behavior in pyzmq > 2.1.10
46 46 self.device.context_factory = zmq.Context
47 47
48 48 self.device.daemon=True
49 49 self.device.connect_in(in_addr)
50 50 self.device.connect_out(out_addr)
51 51 if in_type == zmq.SUB:
52 52 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
53 53 if heart_id is None:
54 54 heart_id = uuid.uuid4().bytes
55 55 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
56 56 self.id = heart_id
57 57
58 58 def start(self):
59 59 return self.device.start()
60 60
61 61
62 62 class HeartMonitor(LoggingConfigurable):
63 63 """A basic HeartMonitor class
64 64 pingstream: a PUB stream
65 pongstream: an XREP stream
65 pongstream: an XREP stream
65 pongstream: a ROUTER stream
66 66 period: the period of the heartbeat in milliseconds"""
67 67
68 68 period = Integer(3000, config=True,
69 69 help='The frequency at which the Hub pings the engines for heartbeats '
70 70 '(in ms)',
71 71 )
72 72
73 73 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
74 74 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
75 75 loop = Instance('zmq.eventloop.ioloop.IOLoop')
76 76 def _loop_default(self):
77 77 return ioloop.IOLoop.instance()
78 78
79 79 # not settable:
80 80 hearts=Set()
81 81 responses=Set()
82 82 on_probation=Set()
83 83 last_ping=CFloat(0)
84 84 _new_handlers = Set()
85 85 _failure_handlers = Set()
86 86 lifetime = CFloat(0)
87 87 tic = CFloat(0)
88 88
89 89 def __init__(self, **kwargs):
90 90 super(HeartMonitor, self).__init__(**kwargs)
91 91
92 92 self.pongstream.on_recv(self.handle_pong)
93 93
94 94 def start(self):
95 95 self.tic = time.time()
96 96 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
97 97 self.caller.start()
98 98
99 99 def add_new_heart_handler(self, handler):
100 100 """add a new handler for new hearts"""
101 101 self.log.debug("heartbeat::new_heart_handler: %s", handler)
102 102 self._new_handlers.add(handler)
103 103
104 104 def add_heart_failure_handler(self, handler):
105 105 """add a new handler for heart failure"""
106 106 self.log.debug("heartbeat::new heart failure handler: %s", handler)
107 107 self._failure_handlers.add(handler)
108 108
109 109 def beat(self):
110 110 self.pongstream.flush()
111 111 self.last_ping = self.lifetime
112 112
113 113 toc = time.time()
114 114 self.lifetime += toc-self.tic
115 115 self.tic = toc
116 116 self.log.debug("heartbeat::sending %s", self.lifetime)
117 117 goodhearts = self.hearts.intersection(self.responses)
118 118 missed_beats = self.hearts.difference(goodhearts)
119 119 heartfailures = self.on_probation.intersection(missed_beats)
120 120 newhearts = self.responses.difference(goodhearts)
121 121 map(self.handle_new_heart, newhearts)
122 122 map(self.handle_heart_failure, heartfailures)
123 123 self.on_probation = missed_beats.intersection(self.hearts)
124 124 self.responses = set()
125 125 # print self.on_probation, self.hearts
126 126 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
127 127 self.pingstream.send(str_to_bytes(str(self.lifetime)))
128 128 # flush stream to force immediate socket send
129 129 self.pingstream.flush()
130 130
131 131 def handle_new_heart(self, heart):
132 132 if self._new_handlers:
133 133 for handler in self._new_handlers:
134 134 handler(heart)
135 135 else:
136 136 self.log.info("heartbeat::yay, got new heart %s!", heart)
137 137 self.hearts.add(heart)
138 138
139 139 def handle_heart_failure(self, heart):
140 140 if self._failure_handlers:
141 141 for handler in self._failure_handlers:
142 142 try:
143 143 handler(heart)
144 144 except Exception as e:
145 145 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
146 146 pass
147 147 else:
148 148 self.log.info("heartbeat::Heart %s failed :(", heart)
149 149 self.hearts.remove(heart)
150 150
151 151
152 152 @log_errors
153 153 def handle_pong(self, msg):
154 154 "a heart just beat"
155 155 current = str_to_bytes(str(self.lifetime))
156 156 last = str_to_bytes(str(self.last_ping))
157 157 if msg[1] == current:
158 158 delta = time.time()-self.tic
159 159 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
160 160 self.responses.add(msg[0])
161 161 elif msg[1] == last:
162 162 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
163 163 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
164 164 self.responses.add(msg[0])
165 165 else:
166 166 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
167 167
168 168
169 169 if __name__ == '__main__':
170 170 loop = ioloop.IOLoop.instance()
171 171 context = zmq.Context()
172 172 pub = context.socket(zmq.PUB)
173 173 pub.bind('tcp://127.0.0.1:5555')
174 xrep = context.socket(zmq.ROUTER)
175 xrep.bind('tcp://127.0.0.1:5556')
174 router = context.socket(zmq.ROUTER)
175 router.bind('tcp://127.0.0.1:5556')
176 176
177 177 outstream = zmqstream.ZMQStream(pub, loop)
178 instream = zmqstream.ZMQStream(xrep, loop)
178 instream = zmqstream.ZMQStream(router, loop)
179 179
180 180 hb = HeartMonitor(loop, outstream, instream)
181 181
182 182 loop.start()
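To exercise the demo in the `__main__` block above, each engine-side responder is a `Heart`: a SUB→DEALER forwarder whose DEALER IDENTITY is what the monitor tracks. A hedged sketch reusing the demo's ports (import path assumed):

```python
from IPython.parallel.controller.heartmonitor import Heart  # assumed path

# Connects SUB to the monitor's PUB (pings) and DEALER to its ROUTER (pongs);
# the DEALER IDENTITY below is what shows up in HeartMonitor.hearts.
heart = Heart('tcp://127.0.0.1:5555',
              'tcp://127.0.0.1:5556',
              heart_id=b'engine-0')  # illustrative identity
heart.start()
```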
@@ -1,1314 +1,1314 @@
1 1 """The IPython Controller Hub with 0MQ
2 2 This is the master object that handles connections from engines and clients,
3 3 and monitors traffic through the various queues.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
21 21 import sys
22 22 import time
23 23 from datetime import datetime
24 24
25 25 import zmq
26 26 from zmq.eventloop import ioloop
27 27 from zmq.eventloop.zmqstream import ZMQStream
28 28
29 29 # internal:
30 30 from IPython.utils.importstring import import_item
31 31 from IPython.utils.py3compat import cast_bytes
32 32 from IPython.utils.traitlets import (
33 33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
34 34 )
35 35
36 36 from IPython.parallel import error, util
37 37 from IPython.parallel.factory import RegistrationFactory
38 38
39 39 from IPython.zmq.session import SessionFactory
40 40
41 41 from .heartmonitor import HeartMonitor
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Code
45 45 #-----------------------------------------------------------------------------
46 46
47 47 def _passer(*args, **kwargs):
48 48 return
49 49
50 50 def _printer(*args, **kwargs):
51 51 print (args)
52 52 print (kwargs)
53 53
54 54 def empty_record():
55 55 """Return an empty dict with all record keys."""
56 56 return {
57 57 'msg_id' : None,
58 58 'header' : None,
59 59 'content': None,
60 60 'buffers': None,
61 61 'submitted': None,
62 62 'client_uuid' : None,
63 63 'engine_uuid' : None,
64 64 'started': None,
65 65 'completed': None,
66 66 'resubmitted': None,
67 67 'received': None,
68 68 'result_header' : None,
69 69 'result_content' : None,
70 70 'result_buffers' : None,
71 71 'queue' : None,
72 72 'pyin' : None,
73 73 'pyout': None,
74 74 'pyerr': None,
75 75 'stdout': '',
76 76 'stderr': '',
77 77 }
78 78
79 79 def init_record(msg):
80 80 """Initialize a TaskRecord based on a request."""
81 81 header = msg['header']
82 82 return {
83 83 'msg_id' : header['msg_id'],
84 84 'header' : header,
85 85 'content': msg['content'],
86 86 'buffers': msg['buffers'],
87 87 'submitted': header['date'],
88 88 'client_uuid' : None,
89 89 'engine_uuid' : None,
90 90 'started': None,
91 91 'completed': None,
92 92 'resubmitted': None,
93 93 'received': None,
94 94 'result_header' : None,
95 95 'result_content' : None,
96 96 'result_buffers' : None,
97 97 'queue' : None,
98 98 'pyin' : None,
99 99 'pyout': None,
100 100 'pyerr': None,
101 101 'stdout': '',
102 102 'stderr': '',
103 103 }
104 104
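# Editorial note: the two builders above define the full TaskRecord schema;
# the handlers later in this file fill it in incrementally. Roughly:
#
#     # submit:  init_record(msg)                   -> msg_id, header, content, buffers, submitted
#     # routing: save_queue_request / save_task_request -> client_uuid, engine_uuid, queue
#     # results: save_queue_result / save_task_result   -> result_*, started, completed, received
#     # iopub:   save_iopub_message                 -> stdout/stderr appended, pyin/pyout/pyerr set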
105 105
106 106 class EngineConnector(HasTraits):
107 107 """A simple object for accessing the various zmq connections of an object.
108 108 Attributes are:
109 109 id (int): engine ID
110 110 uuid (str): uuid (unused?)
111 queue (str): identity of queue's XREQ socket
112 registration (str): identity of registration XREQ socket
113 heartbeat (str): identity of heartbeat XREQ socket
111 queue (str): identity of queue's DEALER socket
112 registration (str): identity of registration DEALER socket
113 heartbeat (str): identity of heartbeat DEALER socket
114 114 """
115 115 id=Integer(0)
116 116 queue=CBytes()
117 117 control=CBytes()
118 118 registration=CBytes()
119 119 heartbeat=CBytes()
120 120 pending=Set()
121 121
122 122 class HubFactory(RegistrationFactory):
123 123 """The Configurable for setting up a Hub."""
124 124
125 125 # port-pairs for monitoredqueues:
126 126 hb = Tuple(Integer,Integer,config=True,
127 help="""XREQ/SUB Port pair for Engine heartbeats""")
127 help="""DEALER/SUB Port pair for Engine heartbeats""")
128 128 def _hb_default(self):
129 129 return tuple(util.select_random_ports(2))
130 130
131 131 mux = Tuple(Integer,Integer,config=True,
132 132 help="""Engine/Client Port pair for MUX queue""")
133 133
134 134 def _mux_default(self):
135 135 return tuple(util.select_random_ports(2))
136 136
137 137 task = Tuple(Integer,Integer,config=True,
138 138 help="""Engine/Client Port pair for Task queue""")
139 139 def _task_default(self):
140 140 return tuple(util.select_random_ports(2))
141 141
142 142 control = Tuple(Integer,Integer,config=True,
143 143 help="""Engine/Client Port pair for Control queue""")
144 144
145 145 def _control_default(self):
146 146 return tuple(util.select_random_ports(2))
147 147
148 148 iopub = Tuple(Integer,Integer,config=True,
149 149 help="""Engine/Client Port pair for IOPub relay""")
150 150
151 151 def _iopub_default(self):
152 152 return tuple(util.select_random_ports(2))
153 153
154 154 # single ports:
155 155 mon_port = Integer(config=True,
156 156 help="""Monitor (SUB) port for queue traffic""")
157 157
158 158 def _mon_port_default(self):
159 159 return util.select_random_ports(1)[0]
160 160
161 161 notifier_port = Integer(config=True,
162 162 help="""PUB port for sending engine status notifications""")
163 163
164 164 def _notifier_port_default(self):
165 165 return util.select_random_ports(1)[0]
166 166
167 167 engine_ip = Unicode('127.0.0.1', config=True,
168 168 help="IP on which to listen for engine connections. [default: loopback]")
169 169 engine_transport = Unicode('tcp', config=True,
170 170 help="0MQ transport for engine connections. [default: tcp]")
171 171
172 172 client_ip = Unicode('127.0.0.1', config=True,
173 173 help="IP on which to listen for client connections. [default: loopback]")
174 174 client_transport = Unicode('tcp', config=True,
175 175 help="0MQ transport for client connections. [default: tcp]")
176 176
177 177 monitor_ip = Unicode('127.0.0.1', config=True,
178 178 help="IP on which to listen for monitor messages. [default: loopback]")
179 179 monitor_transport = Unicode('tcp', config=True,
180 180 help="0MQ transport for monitor messages. [default: tcp]")
181 181
182 182 monitor_url = Unicode('')
183 183
184 184 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
185 185 config=True, help="""The class to use for the DB backend""")
186 186
187 187 # not configurable
188 188 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
189 189 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
190 190
191 191 def _ip_changed(self, name, old, new):
192 192 self.engine_ip = new
193 193 self.client_ip = new
194 194 self.monitor_ip = new
195 195 self._update_monitor_url()
196 196
197 197 def _update_monitor_url(self):
198 198 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
199 199
200 200 def _transport_changed(self, name, old, new):
201 201 self.engine_transport = new
202 202 self.client_transport = new
203 203 self.monitor_transport = new
204 204 self._update_monitor_url()
205 205
206 206 def __init__(self, **kwargs):
207 207 super(HubFactory, self).__init__(**kwargs)
208 208 self._update_monitor_url()
209 209
210 210
211 211 def construct(self):
212 212 self.init_hub()
213 213
214 214 def start(self):
215 215 self.heartmonitor.start()
216 216 self.log.info("Heartmonitor started")
217 217
218 218 def init_hub(self):
219 219 """construct"""
220 220 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
221 221 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
222 222
223 223 ctx = self.context
224 224 loop = self.loop
225 225
226 226 # Registrar socket
227 227 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
228 228 q.bind(client_iface % self.regport)
229 229 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
230 230 if self.client_ip != self.engine_ip:
231 231 q.bind(engine_iface % self.regport)
232 232 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
233 233
234 234 ### Engine connections ###
235 235
236 236 # heartbeat
237 237 hpub = ctx.socket(zmq.PUB)
238 238 hpub.bind(engine_iface % self.hb[0])
239 239 hrep = ctx.socket(zmq.ROUTER)
240 240 hrep.bind(engine_iface % self.hb[1])
241 241 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
242 242 pingstream=ZMQStream(hpub,loop),
243 243 pongstream=ZMQStream(hrep,loop)
244 244 )
245 245
246 246 ### Client connections ###
247 247 # Notifier socket
248 248 n = ZMQStream(ctx.socket(zmq.PUB), loop)
249 249 n.bind(client_iface%self.notifier_port)
250 250
251 251 ### build and launch the queues ###
252 252
253 253 # monitor socket
254 254 sub = ctx.socket(zmq.SUB)
255 255 sub.setsockopt(zmq.SUBSCRIBE, b"")
256 256 sub.bind(self.monitor_url)
257 257 sub.bind('inproc://monitor')
258 258 sub = ZMQStream(sub, loop)
259 259
260 260 # connect the db
261 261 self.log.info('Hub using DB backend: %r'%(self.db_class.split('.')[-1]))
262 262 # cdir = self.config.Global.cluster_dir
263 263 self.db = import_item(str(self.db_class))(session=self.session.session,
264 264 config=self.config, log=self.log)
265 265 time.sleep(.25)
266 266 try:
267 267 scheme = self.config.TaskScheduler.scheme_name
268 268 except AttributeError:
269 269 from .scheduler import TaskScheduler
270 270 scheme = TaskScheduler.scheme_name.get_default_value()
271 271 # build connection dicts
272 272 self.engine_info = {
273 273 'control' : engine_iface%self.control[1],
274 274 'mux': engine_iface%self.mux[1],
275 275 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
276 276 'task' : engine_iface%self.task[1],
277 277 'iopub' : engine_iface%self.iopub[1],
278 278 # 'monitor' : engine_iface%self.mon_port,
279 279 }
280 280
281 281 self.client_info = {
282 282 'control' : client_iface%self.control[0],
283 283 'mux': client_iface%self.mux[0],
284 284 'task' : (scheme, client_iface%self.task[0]),
285 285 'iopub' : client_iface%self.iopub[0],
286 286 'notification': client_iface%self.notifier_port
287 287 }
288 288 self.log.debug("Hub engine addrs: %s", self.engine_info)
289 289 self.log.debug("Hub client addrs: %s", self.client_info)
290 290
291 291 # resubmit stream
292 292 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
293 293 url = util.disambiguate_url(self.client_info['task'][-1])
294 294 r.setsockopt(zmq.IDENTITY, self.session.bsession)
295 295 r.connect(url)
296 296
297 297 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
298 298 query=q, notifier=n, resubmit=r, db=self.db,
299 299 engine_info=self.engine_info, client_info=self.client_info,
300 300 log=self.log)
301 301
302 302
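# Editorial note: a minimal sketch of standing up a Hub with this factory
# (assumed defaults: loopback IPs, random ports, DictDB; ipcontroller performs
# these same steps with more configuration):
#
#     from zmq.eventloop import ioloop
#
#     loop = ioloop.IOLoop.instance()
#     factory = HubFactory(loop=loop)
#     factory.construct()   # init_hub(): binds sockets, builds HeartMonitor and Hub
#     factory.start()       # starts the heartmonitor
#     loop.start()
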
303 303 class Hub(SessionFactory):
304 304 """The IPython Controller Hub with 0MQ connections
305 305
306 306 Parameters
307 307 ==========
308 308 loop: zmq IOLoop instance
309 309 session: Session object
310 310 <removed> context: zmq context for creating new connections (?)
311 311 queue: ZMQStream for monitoring the command queue (SUB)
312 query: ZMQStream for engine registration and client queries requests (XREP)
312 query: ZMQStream for engine registration and client query requests (ROUTER)
313 313 heartbeat: HeartMonitor object checking the pulse of the engines
314 314 notifier: ZMQStream for broadcasting engine registration changes (PUB)
315 315 db: connection to db for out of memory logging of commands
316 316 NotImplemented
317 317 engine_info: dict of zmq connection information for engines to connect
318 318 to the queues.
319 319 client_info: dict of zmq connection information for clients to connect
320 320 to the queues.
321 321 """
322 322 # internal data structures:
323 323 ids=Set() # engine IDs
324 324 keytable=Dict()
325 325 by_ident=Dict()
326 326 engines=Dict()
327 327 clients=Dict()
328 328 hearts=Dict()
329 329 pending=Set()
330 330 queues=Dict() # pending msg_ids keyed by engine_id
331 331 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
332 332 completed=Dict() # completed msg_ids keyed by engine_id
333 333 all_completed=Set() # set of all completed msg_ids
334 334 dead_engines=Set() # uuids of engines that have unregistered or died
335 335 unassigned=Set() # set of task msg_ids not yet assigned a destination
336 336 incoming_registrations=Dict()
337 337 registration_timeout=Integer()
338 338 _idcounter=Integer(0)
339 339
340 340 # objects from constructor:
341 341 query=Instance(ZMQStream)
342 342 monitor=Instance(ZMQStream)
343 343 notifier=Instance(ZMQStream)
344 344 resubmit=Instance(ZMQStream)
345 345 heartmonitor=Instance(HeartMonitor)
346 346 db=Instance(object)
347 347 client_info=Dict()
348 348 engine_info=Dict()
349 349
350 350
351 351 def __init__(self, **kwargs):
352 352 """
353 353 # universal:
354 354 loop: IOLoop for creating future connections
355 355 session: streamsession for sending serialized data
356 356 # engine:
357 357 queue: ZMQStream for monitoring queue messages
358 358 query: ZMQStream for engine+client registration and client requests
359 359 heartbeat: HeartMonitor object for tracking engines
360 360 # extra:
361 361 db: ZMQStream for db connection (NotImplemented)
362 362 engine_info: zmq address/protocol dict for engine connections
363 363 client_info: zmq address/protocol dict for client connections
364 364 """
365 365
366 366 super(Hub, self).__init__(**kwargs)
367 367 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
368 368
369 369 # validate connection dicts:
370 370 for k,v in self.client_info.iteritems():
371 371 if k == 'task':
372 372 util.validate_url_container(v[1])
373 373 else:
374 374 util.validate_url_container(v)
375 375 # util.validate_url_container(self.client_info)
376 376 util.validate_url_container(self.engine_info)
377 377
378 378 # register our callbacks
379 379 self.query.on_recv(self.dispatch_query)
380 380 self.monitor.on_recv(self.dispatch_monitor_traffic)
381 381
382 382 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
383 383 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
384 384
385 385 self.monitor_handlers = {b'in' : self.save_queue_request,
386 386 b'out': self.save_queue_result,
387 387 b'intask': self.save_task_request,
388 388 b'outtask': self.save_task_result,
389 389 b'tracktask': self.save_task_destination,
390 390 b'incontrol': _passer,
391 391 b'outcontrol': _passer,
392 392 b'iopub': self.save_iopub_message,
393 393 }
394 394
395 395 self.query_handlers = {'queue_request': self.queue_status,
396 396 'result_request': self.get_results,
397 397 'history_request': self.get_history,
398 398 'db_request': self.db_query,
399 399 'purge_request': self.purge_results,
400 400 'load_request': self.check_load,
401 401 'resubmit_request': self.resubmit_task,
402 402 'shutdown_request': self.shutdown_request,
403 403 'registration_request' : self.register_engine,
404 404 'unregistration_request' : self.unregister_engine,
405 405 'connection_request': self.connection_request,
406 406 }
407 407
408 408 # ignore resubmit replies
409 409 self.resubmit.on_recv(lambda msg: None, copy=False)
410 410
411 411 self.log.info("hub::created hub")
412 412
413 413 @property
414 414 def _next_id(self):
415 415 """gemerate a new ID.
416 416
417 417 No longer reuse old ids, just count from 0."""
418 418 newid = self._idcounter
419 419 self._idcounter += 1
420 420 return newid
421 421 # newid = 0
422 422 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
423 423 # # print newid, self.ids, self.incoming_registrations
424 424 # while newid in self.ids or newid in incoming:
425 425 # newid += 1
426 426 # return newid
427 427
428 428 #-----------------------------------------------------------------------------
429 429 # message validation
430 430 #-----------------------------------------------------------------------------
431 431
432 432 def _validate_targets(self, targets):
433 433 """turn any valid targets argument into a list of integer ids"""
434 434 if targets is None:
435 435 # default to all
436 436 return self.ids
437 437
438 438 if isinstance(targets, (int,str,unicode)):
439 439 # only one target specified
440 440 targets = [targets]
441 441 _targets = []
442 442 for t in targets:
443 443 # map raw identities to ids
444 444 if isinstance(t, (str,unicode)):
445 445 t = self.by_ident.get(cast_bytes(t), t)
446 446 _targets.append(t)
447 447 targets = _targets
448 448 bad_targets = [ t for t in targets if t not in self.ids ]
449 449 if bad_targets:
450 450 raise IndexError("No Such Engine: %r" % bad_targets)
451 451 if not targets:
452 452 raise IndexError("No Engines Registered")
453 453 return targets
454 454
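# Editorial note: for illustration, with engines 0-2 registered
# (hypothetical identities):
#
#     # _validate_targets(None)        -> self.ids (all engines)
#     # _validate_targets(1)           -> [1]
#     # _validate_targets(['abc123'])  -> [eid]  (raw identity mapped via by_ident)
#     # _validate_targets([99])        -> IndexError: No Such Engine: [99]
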
455 455 #-----------------------------------------------------------------------------
456 456 # dispatch methods (1 per stream)
457 457 #-----------------------------------------------------------------------------
458 458
459 459
460 460 @util.log_errors
461 461 def dispatch_monitor_traffic(self, msg):
462 462 """all ME and Task queue messages come through here, as well as
463 463 IOPub traffic."""
464 464 self.log.debug("monitor traffic: %r", msg[0])
465 465 switch = msg[0]
466 466 try:
467 467 idents, msg = self.session.feed_identities(msg[1:])
468 468 except ValueError:
469 469 idents=[]
470 470 if not idents:
471 471 self.log.error("Monitor message without topic: %r", msg)
472 472 return
473 473 handler = self.monitor_handlers.get(switch, None)
474 474 if handler is not None:
475 475 handler(idents, msg)
476 476 else:
477 477 self.log.error("Unrecognized monitor topic: %r", switch)
478 478
479 479
480 480 @util.log_errors
481 481 def dispatch_query(self, msg):
482 482 """Route registration requests and queries from clients."""
483 483 try:
484 484 idents, msg = self.session.feed_identities(msg)
485 485 except ValueError:
486 486 idents = []
487 487 if not idents:
488 488 self.log.error("Bad Query Message: %r", msg)
489 489 return
490 490 client_id = idents[0]
491 491 try:
492 492 msg = self.session.unserialize(msg, content=True)
493 493 except Exception:
494 494 content = error.wrap_exception()
495 495 self.log.error("Bad Query Message: %r", msg, exc_info=True)
496 496 self.session.send(self.query, "hub_error", ident=client_id,
497 497 content=content)
498 498 return
499 499 # print client_id, header, parent, content
500 500 #switch on message type:
501 501 msg_type = msg['header']['msg_type']
502 502 self.log.info("client::client %r requested %r", client_id, msg_type)
503 503 handler = self.query_handlers.get(msg_type, None)
504 504 try:
505 505 assert handler is not None, "Bad Message Type: %r" % msg_type
506 506 except:
507 507 content = error.wrap_exception()
508 508 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
509 509 self.session.send(self.query, "hub_error", ident=client_id,
510 510 content=content)
511 511 return
512 512
513 513 else:
514 514 handler(idents, msg)
515 515
516 516 def dispatch_db(self, msg):
517 517 """"""
518 518 raise NotImplementedError
519 519
520 520 #---------------------------------------------------------------------------
521 521 # handler methods (1 per event)
522 522 #---------------------------------------------------------------------------
523 523
524 524 #----------------------- Heartbeat --------------------------------------
525 525
526 526 def handle_new_heart(self, heart):
527 527 """handler to attach to heartbeater.
528 528 Called when a new heart starts to beat.
529 529 Triggers completion of registration."""
530 530 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
531 531 if heart not in self.incoming_registrations:
532 532 self.log.info("heartbeat::ignoring new heart: %r", heart)
533 533 else:
534 534 self.finish_registration(heart)
535 535
536 536
537 537 def handle_heart_failure(self, heart):
538 538 """handler to attach to heartbeater.
539 539 called when a previously registered heart fails to respond to beat request.
540 540 triggers unregistration"""
541 541 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
542 542 eid = self.hearts.get(heart, None)
543 543 if eid is None or self.keytable[eid] in self.dead_engines:
544 544 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
545 545 else:
546 546 queue = self.engines[eid].queue
547 547 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
548 548
549 549 #----------------------- MUX Queue Traffic ------------------------------
550 550
551 551 def save_queue_request(self, idents, msg):
552 552 if len(idents) < 2:
553 553 self.log.error("invalid identity prefix: %r", idents)
554 554 return
555 555 queue_id, client_id = idents[:2]
556 556 try:
557 557 msg = self.session.unserialize(msg)
558 558 except Exception:
559 559 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
560 560 return
561 561
562 562 eid = self.by_ident.get(queue_id, None)
563 563 if eid is None:
564 564 self.log.error("queue::target %r not registered", queue_id)
565 565 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
566 566 return
567 567 record = init_record(msg)
568 568 msg_id = record['msg_id']
569 569 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
570 570 # Unicode in records
571 571 record['engine_uuid'] = queue_id.decode('ascii')
572 572 record['client_uuid'] = client_id.decode('ascii')
573 573 record['queue'] = 'mux'
574 574
575 575 try:
576 576 # it's possible iopub arrived first:
577 577 existing = self.db.get_record(msg_id)
578 578 for key,evalue in existing.iteritems():
579 579 rvalue = record.get(key, None)
580 580 if evalue and rvalue and evalue != rvalue:
581 581 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
582 582 elif evalue and not rvalue:
583 583 record[key] = evalue
584 584 try:
585 585 self.db.update_record(msg_id, record)
586 586 except Exception:
587 587 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
588 588 except KeyError:
589 589 try:
590 590 self.db.add_record(msg_id, record)
591 591 except Exception:
592 592 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
593 593
594 594
595 595 self.pending.add(msg_id)
596 596 self.queues[eid].append(msg_id)
597 597
598 598 def save_queue_result(self, idents, msg):
599 599 if len(idents) < 2:
600 600 self.log.error("invalid identity prefix: %r", idents)
601 601 return
602 602
603 603 client_id, queue_id = idents[:2]
604 604 try:
605 605 msg = self.session.unserialize(msg)
606 606 except Exception:
607 607 self.log.error("queue::engine %r sent invalid message to %r: %r",
608 608 queue_id, client_id, msg, exc_info=True)
609 609 return
610 610
611 611 eid = self.by_ident.get(queue_id, None)
612 612 if eid is None:
613 613 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
614 614 return
615 615
616 616 parent = msg['parent_header']
617 617 if not parent:
618 618 return
619 619 msg_id = parent['msg_id']
620 620 if msg_id in self.pending:
621 621 self.pending.remove(msg_id)
622 622 self.all_completed.add(msg_id)
623 623 self.queues[eid].remove(msg_id)
624 624 self.completed[eid].append(msg_id)
625 625 self.log.info("queue::request %r completed on %s", msg_id, eid)
626 626 elif msg_id not in self.all_completed:
627 627 # it could be a result from a dead engine that died before delivering the
628 628 # result
629 629 self.log.warn("queue:: unknown msg finished %r", msg_id)
630 630 return
631 631 # update record anyway, because the unregistration could have been premature
632 632 rheader = msg['header']
633 633 completed = rheader['date']
634 634 started = rheader.get('started', None)
635 635 result = {
636 636 'result_header' : rheader,
637 637 'result_content': msg['content'],
638 638 'received': datetime.now(),
639 639 'started' : started,
640 640 'completed' : completed
641 641 }
642 642
643 643 result['result_buffers'] = msg['buffers']
644 644 try:
645 645 self.db.update_record(msg_id, result)
646 646 except Exception:
647 647 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
648 648
649 649
650 650 #--------------------- Task Queue Traffic ------------------------------
651 651
652 652 def save_task_request(self, idents, msg):
653 653 """Save the submission of a task."""
654 654 client_id = idents[0]
655 655
656 656 try:
657 657 msg = self.session.unserialize(msg)
658 658 except Exception:
659 659 self.log.error("task::client %r sent invalid task message: %r",
660 660 client_id, msg, exc_info=True)
661 661 return
662 662 record = init_record(msg)
663 663
664 664 record['client_uuid'] = client_id.decode('ascii')
665 665 record['queue'] = 'task'
666 666 header = msg['header']
667 667 msg_id = header['msg_id']
668 668 self.pending.add(msg_id)
669 669 self.unassigned.add(msg_id)
670 670 try:
671 671 # it's possible iopub arrived first:
672 672 existing = self.db.get_record(msg_id)
673 673 if existing['resubmitted']:
674 674 for key in ('submitted', 'client_uuid', 'buffers'):
675 675 # don't clobber these keys on resubmit
676 676 # submitted and client_uuid should be different
677 677 # and buffers might be big, and shouldn't have changed
678 678 record.pop(key)
679 679 # still check content and header, which should not change
680 680 # and are not as expensive to compare as buffers
681 681
682 682 for key,evalue in existing.iteritems():
683 683 if key.endswith('buffers'):
684 684 # don't compare buffers
685 685 continue
686 686 rvalue = record.get(key, None)
687 687 if evalue and rvalue and evalue != rvalue:
688 688 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
689 689 elif evalue and not rvalue:
690 690 record[key] = evalue
691 691 try:
692 692 self.db.update_record(msg_id, record)
693 693 except Exception:
694 694 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
695 695 except KeyError:
696 696 try:
697 697 self.db.add_record(msg_id, record)
698 698 except Exception:
699 699 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
700 700 except Exception:
701 701 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
702 702
703 703 def save_task_result(self, idents, msg):
704 704 """save the result of a completed task."""
705 705 client_id = idents[0]
706 706 try:
707 707 msg = self.session.unserialize(msg)
708 708 except Exception:
709 709 self.log.error("task::invalid task result message send to %r: %r",
710 710 client_id, msg, exc_info=True)
711 711 return
712 712
713 713 parent = msg['parent_header']
714 714 if not parent:
715 715 # print msg
716 716 self.log.warn("Task %r had no parent!", msg)
717 717 return
718 718 msg_id = parent['msg_id']
719 719 if msg_id in self.unassigned:
720 720 self.unassigned.remove(msg_id)
721 721
722 722 header = msg['header']
723 723 engine_uuid = header.get('engine', u'')
724 724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
725 725
726 726 status = header.get('status', None)
727 727
728 728 if msg_id in self.pending:
729 729 self.log.info("task::task %r finished on %s", msg_id, eid)
730 730 self.pending.remove(msg_id)
731 731 self.all_completed.add(msg_id)
732 732 if eid is not None:
733 733 if status != 'aborted':
734 734 self.completed[eid].append(msg_id)
735 735 if msg_id in self.tasks[eid]:
736 736 self.tasks[eid].remove(msg_id)
737 737 completed = header['date']
738 738 started = header.get('started', None)
739 739 result = {
740 740 'result_header' : header,
741 741 'result_content': msg['content'],
742 742 'started' : started,
743 743 'completed' : completed,
744 744 'received' : datetime.now(),
745 745 'engine_uuid': engine_uuid,
746 746 }
747 747
748 748 result['result_buffers'] = msg['buffers']
749 749 try:
750 750 self.db.update_record(msg_id, result)
751 751 except Exception:
752 752 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
753 753
754 754 else:
755 755 self.log.debug("task::unknown task %r finished", msg_id)
756 756
757 757 def save_task_destination(self, idents, msg):
758 758 try:
759 759 msg = self.session.unserialize(msg, content=True)
760 760 except Exception:
761 761 self.log.error("task::invalid task tracking message", exc_info=True)
762 762 return
763 763 content = msg['content']
764 764 # print (content)
765 765 msg_id = content['msg_id']
766 766 engine_uuid = content['engine_id']
767 767 eid = self.by_ident[cast_bytes(engine_uuid)]
768 768
769 769 self.log.info("task::task %r arrived on %r", msg_id, eid)
770 770 if msg_id in self.unassigned:
771 771 self.unassigned.remove(msg_id)
772 772 # else:
773 773 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
774 774
775 775 self.tasks[eid].append(msg_id)
776 776 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
777 777 try:
778 778 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
779 779 except Exception:
780 780 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
781 781
782 782
783 783 def mia_task_request(self, idents, msg):
784 784 raise NotImplementedError
785 785 client_id = idents[0]
786 786 # content = dict(mia=self.mia,status='ok')
787 787 # self.session.send('mia_reply', content=content, idents=client_id)
788 788
789 789
790 790 #--------------------- IOPub Traffic ------------------------------
791 791
792 792 def save_iopub_message(self, topics, msg):
793 793 """save an iopub message into the db"""
794 794 # print (topics)
795 795 try:
796 796 msg = self.session.unserialize(msg, content=True)
797 797 except Exception:
798 798 self.log.error("iopub::invalid IOPub message", exc_info=True)
799 799 return
800 800
801 801 parent = msg['parent_header']
802 802 if not parent:
803 803 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
804 804 return
805 805 msg_id = parent['msg_id']
806 806 msg_type = msg['header']['msg_type']
807 807 content = msg['content']
808 808
809 809 # ensure msg_id is in db
810 810 try:
811 811 rec = self.db.get_record(msg_id)
812 812 except KeyError:
813 813 rec = empty_record()
814 814 rec['msg_id'] = msg_id
815 815 self.db.add_record(msg_id, rec)
816 816 # stream
817 817 d = {}
818 818 if msg_type == 'stream':
819 819 name = content['name']
820 820 s = rec[name] or ''
821 821 d[name] = s + content['data']
822 822
823 823 elif msg_type == 'pyerr':
824 824 d['pyerr'] = content
825 825 elif msg_type == 'pyin':
826 826 d['pyin'] = content['code']
827 827 else:
828 828 d[msg_type] = content.get('data', '')
829 829
830 830 try:
831 831 self.db.update_record(msg_id, d)
832 832 except Exception:
833 833 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
834 834
835 835
836 836
837 837 #-------------------------------------------------------------------------
838 838 # Registration requests
839 839 #-------------------------------------------------------------------------
840 840
841 841 def connection_request(self, client_id, msg):
842 842 """Reply with connection addresses for clients."""
843 843 self.log.info("client::client %r connected", client_id)
844 844 content = dict(status='ok')
845 845 content.update(self.client_info)
846 846 jsonable = {}
847 847 for k,v in self.keytable.iteritems():
848 848 if v not in self.dead_engines:
849 849 jsonable[str(k)] = v.decode('ascii')
850 850 content['engines'] = jsonable
851 851 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
852 852
853 853 def register_engine(self, reg, msg):
854 854 """Register a new engine."""
855 855 content = msg['content']
856 856 try:
857 857 queue = cast_bytes(content['queue'])
858 858 except KeyError:
859 859 self.log.error("registration::queue not specified", exc_info=True)
860 860 return
861 861 heart = content.get('heartbeat', None)
862 862 if heart:
863 863 heart = cast_bytes(heart)
864 864 """register a new engine, and create the socket(s) necessary"""
865 865 eid = self._next_id
866 866 # print (eid, queue, reg, heart)
867 867
868 868 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
869 869
870 870 content = dict(id=eid,status='ok')
871 871 content.update(self.engine_info)
872 872 # check if requesting available IDs:
873 873 if queue in self.by_ident:
874 874 try:
875 875 raise KeyError("queue_id %r in use" % queue)
876 876 except:
877 877 content = error.wrap_exception()
878 878 self.log.error("queue_id %r in use", queue, exc_info=True)
879 879 elif heart in self.hearts: # need to check unique hearts?
880 880 try:
881 881 raise KeyError("heart_id %r in use" % heart)
882 882 except:
883 883 self.log.error("heart_id %r in use", heart, exc_info=True)
884 884 content = error.wrap_exception()
885 885 else:
886 886 for h, pack in self.incoming_registrations.iteritems():
887 887 if heart == h:
888 888 try:
889 889 raise KeyError("heart_id %r in use" % heart)
890 890 except:
891 891 self.log.error("heart_id %r in use", heart, exc_info=True)
892 892 content = error.wrap_exception()
893 893 break
894 894 elif queue == pack[1]:
895 895 try:
896 896 raise KeyError("queue_id %r in use" % queue)
897 897 except:
898 898 self.log.error("queue_id %r in use", queue, exc_info=True)
899 899 content = error.wrap_exception()
900 900 break
901 901
902 902 msg = self.session.send(self.query, "registration_reply",
903 903 content=content,
904 904 ident=reg)
905 905
906 906 if content['status'] == 'ok':
907 907 if heart in self.heartmonitor.hearts:
908 908 # already beating
909 909 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
910 910 self.finish_registration(heart)
911 911 else:
912 912 purge = lambda : self._purge_stalled_registration(heart)
913 913 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
914 914 dc.start()
915 915 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
916 916 else:
917 917 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
918 918 return eid
919 919
920 920 def unregister_engine(self, ident, msg):
921 921 """Unregister an engine that explicitly requested to leave."""
922 922 try:
923 923 eid = msg['content']['id']
924 924 except:
925 925 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
926 926 return
927 927 self.log.info("registration::unregister_engine(%r)", eid)
928 928 # print (eid)
929 929 uuid = self.keytable[eid]
930 930 content=dict(id=eid, queue=uuid.decode('ascii'))
931 931 self.dead_engines.add(uuid)
932 932 # self.ids.remove(eid)
933 933 # uuid = self.keytable.pop(eid)
934 934 #
935 935 # ec = self.engines.pop(eid)
936 936 # self.hearts.pop(ec.heartbeat)
937 937 # self.by_ident.pop(ec.queue)
938 938 # self.completed.pop(eid)
939 939 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
940 940 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
941 941 dc.start()
942 942 ############## TODO: HANDLE IT ################
943 943
944 944 if self.notifier:
945 945 self.session.send(self.notifier, "unregistration_notification", content=content)
946 946
947 947 def _handle_stranded_msgs(self, eid, uuid):
948 948 """Handle messages known to be on an engine when the engine unregisters.
949 949
950 950 It is possible that this will fire prematurely - that is, an engine will
951 951 go down after completing a result, and the client will be notified
952 952 that the result failed and later receive the actual result.
953 953 """
954 954
955 955 outstanding = self.queues[eid]
956 956
957 957 for msg_id in outstanding:
958 958 self.pending.remove(msg_id)
959 959 self.all_completed.add(msg_id)
960 960 try:
961 961 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
962 962 except:
963 963 content = error.wrap_exception()
964 964 # build a fake header:
965 965 header = {}
966 966 header['engine'] = uuid
967 967 header['date'] = datetime.now()
968 968 rec = dict(result_content=content, result_header=header, result_buffers=[])
969 969 rec['completed'] = header['date']
970 970 rec['engine_uuid'] = uuid
971 971 try:
972 972 self.db.update_record(msg_id, rec)
973 973 except Exception:
974 974 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
975 975
976 976
977 977 def finish_registration(self, heart):
978 978 """Second half of engine registration, called after our HeartMonitor
979 979 has received a beat from the Engine's Heart."""
980 980 try:
981 981 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
982 982 except KeyError:
983 983 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
984 984 return
985 985 self.log.info("registration::finished registering engine %i:%r", eid, queue)
986 986 if purge is not None:
987 987 purge.stop()
988 988 control = queue
989 989 self.ids.add(eid)
990 990 self.keytable[eid] = queue
991 991 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
992 992 control=control, heartbeat=heart)
993 993 self.by_ident[queue] = eid
994 994 self.queues[eid] = list()
995 995 self.tasks[eid] = list()
996 996 self.completed[eid] = list()
997 997 self.hearts[heart] = eid
998 998 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
999 999 if self.notifier:
1000 1000 self.session.send(self.notifier, "registration_notification", content=content)
1001 1001 self.log.info("engine::Engine Connected: %i", eid)
1002 1002
1003 1003 def _purge_stalled_registration(self, heart):
1004 1004 if heart in self.incoming_registrations:
1005 1005 eid = self.incoming_registrations.pop(heart)[0]
1006 1006 self.log.info("registration::purging stalled registration: %i", eid)
1007 1007 else:
1008 1008 pass
1009 1009
1010 1010 #-------------------------------------------------------------------------
1011 1011 # Client Requests
1012 1012 #-------------------------------------------------------------------------
1013 1013
1014 1014 def shutdown_request(self, client_id, msg):
1015 1015 """handle shutdown request."""
1016 1016 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1017 1017 # also notify other clients of shutdown
1018 1018 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1019 1019 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1020 1020 dc.start()
1021 1021
1022 1022 def _shutdown(self):
1023 1023 self.log.info("hub::hub shutting down.")
1024 1024 time.sleep(0.1)
1025 1025 sys.exit(0)
1026 1026
1027 1027
1028 1028 def check_load(self, client_id, msg):
1029 1029 content = msg['content']
1030 1030 try:
1031 1031 targets = content['targets']
1032 1032 targets = self._validate_targets(targets)
1033 1033 except:
1034 1034 content = error.wrap_exception()
1035 1035 self.session.send(self.query, "hub_error",
1036 1036 content=content, ident=client_id)
1037 1037 return
1038 1038
1039 1039 content = dict(status='ok')
1040 1040 # loads = {}
1041 1041 for t in targets:
1042 1042 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1043 1043 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1044 1044
1045 1045
1046 1046 def queue_status(self, client_id, msg):
1047 1047 """Return the Queue status of one or more targets.
1048 1048 if verbose: return the msg_ids
1049 1049 else: return len of each type.
1050 1050 keys: queue (pending MUX jobs)
1051 1051 tasks (pending Task jobs)
1052 1052 completed (finished jobs from both queues)"""
1053 1053 content = msg['content']
1054 1054 targets = content['targets']
1055 1055 try:
1056 1056 targets = self._validate_targets(targets)
1057 1057 except:
1058 1058 content = error.wrap_exception()
1059 1059 self.session.send(self.query, "hub_error",
1060 1060 content=content, ident=client_id)
1061 1061 return
1062 1062 verbose = content.get('verbose', False)
1063 1063 content = dict(status='ok')
1064 1064 for t in targets:
1065 1065 queue = self.queues[t]
1066 1066 completed = self.completed[t]
1067 1067 tasks = self.tasks[t]
1068 1068 if not verbose:
1069 1069 queue = len(queue)
1070 1070 completed = len(completed)
1071 1071 tasks = len(tasks)
1072 1072 content[str(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
1073 1073 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1074 1074 # print (content)
1075 1075 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1076 1076
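# Editorial note: for illustration, a non-verbose reply assembled by the loop
# above might carry content like (hypothetical counts):
#
#     # {'status': 'ok',
#     #  '0': {'queue': 2, 'completed': 10, 'tasks': 1},
#     #  '1': {'queue': 0, 'completed': 12, 'tasks': 3},
#     #  'unassigned': 0}
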
1077 1077 def purge_results(self, client_id, msg):
1078 1078 """Purge results from memory. This method is more valuable before we move
1079 1079 to a DB based message storage mechanism."""
1080 1080 content = msg['content']
1081 1081 self.log.info("Dropping records with %s", content)
1082 1082 msg_ids = content.get('msg_ids', [])
1083 1083 reply = dict(status='ok')
1084 1084 if msg_ids == 'all':
1085 1085 try:
1086 1086 self.db.drop_matching_records(dict(completed={'$ne':None}))
1087 1087 except Exception:
1088 1088 reply = error.wrap_exception()
1089 1089 else:
1090 1090 pending = filter(lambda m: m in self.pending, msg_ids)
1091 1091 if pending:
1092 1092 try:
1093 1093 raise IndexError("msg pending: %r" % pending[0])
1094 1094 except:
1095 1095 reply = error.wrap_exception()
1096 1096 else:
1097 1097 try:
1098 1098 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1099 1099 except Exception:
1100 1100 reply = error.wrap_exception()
1101 1101
1102 1102 if reply['status'] == 'ok':
1103 1103 eids = content.get('engine_ids', [])
1104 1104 for eid in eids:
1105 1105 if eid not in self.engines:
1106 1106 try:
1107 1107 raise IndexError("No such engine: %i" % eid)
1108 1108 except:
1109 1109 reply = error.wrap_exception()
1110 1110 break
1111 1111 uid = self.engines[eid].queue
1112 1112 try:
1113 1113 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1114 1114 except Exception:
1115 1115 reply = error.wrap_exception()
1116 1116 break
1117 1117
1118 1118 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1119 1119
1120 1120 def resubmit_task(self, client_id, msg):
1121 1121 """Resubmit one or more tasks."""
1122 1122 def finish(reply):
1123 1123 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1124 1124
1125 1125 content = msg['content']
1126 1126 msg_ids = content['msg_ids']
1127 1127 reply = dict(status='ok')
1128 1128 try:
1129 1129 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1130 1130 'header', 'content', 'buffers'])
1131 1131 except Exception:
1132 1132 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1133 1133 return finish(error.wrap_exception())
1134 1134
1135 1135 # validate msg_ids
1136 1136 found_ids = [ rec['msg_id'] for rec in records ]
1137 1137 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1138 1138 if len(records) > len(msg_ids):
1139 1139 try:
1140 1140 raise RuntimeError("DB appears to be in an inconsistent state. "
1141 1141 "More matching records were found than should exist")
1142 1142 except Exception:
1143 1143 return finish(error.wrap_exception())
1144 1144 elif len(records) < len(msg_ids):
1145 1145 missing = [ m for m in msg_ids if m not in found_ids ]
1146 1146 try:
1147 1147 raise KeyError("No such msg(s): %r" % missing)
1148 1148 except KeyError:
1149 1149 return finish(error.wrap_exception())
1150 1150 elif pending_ids:
1151 1151 pass
1152 1152 # no need to raise on resubmit of pending task, now that we
1153 1153 # resubmit under new ID, but do we want to raise anyway?
1154 1154 # msg_id = invalid_ids[0]
1155 1155 # try:
1156 1156 # raise ValueError("Task(s) %r appears to be inflight" % )
1157 1157 # except Exception:
1158 1158 # return finish(error.wrap_exception())
1159 1159
1160 1160 # mapping of original IDs to resubmitted IDs
1161 1161 resubmitted = {}
1162 1162
1163 1163 # send the messages
1164 1164 for rec in records:
1165 1165 header = rec['header']
1166 1166 msg = self.session.msg(header['msg_type'], parent=header)
1167 1167 msg_id = msg['msg_id']
1168 1168 msg['content'] = rec['content']
1169 1169
1170 1170 # use the old header, but update msg_id and timestamp
1171 1171 fresh = msg['header']
1172 1172 header['msg_id'] = fresh['msg_id']
1173 1173 header['date'] = fresh['date']
1174 1174 msg['header'] = header
1175 1175
1176 1176 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1177 1177
1178 1178 resubmitted[rec['msg_id']] = msg_id
1179 1179 self.pending.add(msg_id)
1180 1180 msg['buffers'] = rec['buffers']
1181 1181 try:
1182 1182 self.db.add_record(msg_id, init_record(msg))
1183 1183 except Exception:
1184 1184 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1185 1185
1186 1186 finish(dict(status='ok', resubmitted=resubmitted))
1187 1187
1188 1188 # store the new IDs in the Task DB
1189 1189 for msg_id, resubmit_id in resubmitted.iteritems():
1190 1190 try:
1191 1191 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1192 1192 except Exception:
1193 1193 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1194 1194
1195 1195
1196 1196 def _extract_record(self, rec):
1197 1197 """decompose a TaskRecord dict into subsection of reply for get_result"""
1198 1198 io_dict = {}
1199 1199 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1200 1200 io_dict[key] = rec[key]
1201 1201 content = { 'result_content': rec['result_content'],
1202 1202 'header': rec['header'],
1203 1203 'result_header' : rec['result_header'],
1204 1204 'received' : rec['received'],
1205 1205 'io' : io_dict,
1206 1206 }
1207 1207 if rec['result_buffers']:
1208 1208 buffers = map(bytes, rec['result_buffers'])
1209 1209 else:
1210 1210 buffers = []
1211 1211
1212 1212 return content, buffers
1213 1213
1214 1214 def get_results(self, client_id, msg):
1215 1215 """Get the result of 1 or more messages."""
1216 1216 content = msg['content']
1217 1217 msg_ids = sorted(set(content['msg_ids']))
1218 1218 statusonly = content.get('status_only', False)
1219 1219 pending = []
1220 1220 completed = []
1221 1221 content = dict(status='ok')
1222 1222 content['pending'] = pending
1223 1223 content['completed'] = completed
1224 1224 buffers = []
1225 1225 if not statusonly:
1226 1226 try:
1227 1227 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1228 1228 # turn match list into dict, for faster lookup
1229 1229 records = {}
1230 1230 for rec in matches:
1231 1231 records[rec['msg_id']] = rec
1232 1232 except Exception:
1233 1233 content = error.wrap_exception()
1234 1234 self.session.send(self.query, "result_reply", content=content,
1235 1235 parent=msg, ident=client_id)
1236 1236 return
1237 1237 else:
1238 1238 records = {}
1239 1239 for msg_id in msg_ids:
1240 1240 if msg_id in self.pending:
1241 1241 pending.append(msg_id)
1242 1242 elif msg_id in self.all_completed:
1243 1243 completed.append(msg_id)
1244 1244 if not statusonly:
1245 1245 c,bufs = self._extract_record(records[msg_id])
1246 1246 content[msg_id] = c
1247 1247 buffers.extend(bufs)
1248 1248 elif msg_id in records:
1249 1249 if records[msg_id]['completed']:
1250 1250 completed.append(msg_id)
1251 1251 c,bufs = self._extract_record(records[msg_id])
1252 1252 content[msg_id] = c
1253 1253 buffers.extend(bufs)
1254 1254 else:
1255 1255 pending.append(msg_id)
1256 1256 else:
1257 1257 try:
1258 1258 raise KeyError('No such message: '+msg_id)
1259 1259 except:
1260 1260 content = error.wrap_exception()
1261 1261 break
1262 1262 self.session.send(self.query, "result_reply", content=content,
1263 1263 parent=msg, ident=client_id,
1264 1264 buffers=buffers)
1265 1265
1266 1266 def get_history(self, client_id, msg):
1267 1267 """Get a list of all msg_ids in our DB records"""
1268 1268 try:
1269 1269 msg_ids = self.db.get_history()
1270 1270 except Exception as e:
1271 1271 content = error.wrap_exception()
1272 1272 else:
1273 1273 content = dict(status='ok', history=msg_ids)
1274 1274
1275 1275 self.session.send(self.query, "history_reply", content=content,
1276 1276 parent=msg, ident=client_id)
1277 1277
1278 1278 def db_query(self, client_id, msg):
1279 1279 """Perform a raw query on the task record database."""
1280 1280 content = msg['content']
1281 1281 query = content.get('query', {})
1282 1282 keys = content.get('keys', None)
1283 1283 buffers = []
1284 1284 empty = list()
1285 1285 try:
1286 1286 records = self.db.find_records(query, keys)
1287 1287 except Exception as e:
1288 1288 content = error.wrap_exception()
1289 1289 else:
1290 1290 # extract buffers from reply content:
1291 1291 if keys is not None:
1292 1292 buffer_lens = [] if 'buffers' in keys else None
1293 1293 result_buffer_lens = [] if 'result_buffers' in keys else None
1294 1294 else:
1295 1295 buffer_lens = None
1296 1296 result_buffer_lens = None
1297 1297
1298 1298 for rec in records:
1299 1299 # buffers may be None, so double check
1300 1300 b = rec.pop('buffers', empty) or empty
1301 1301 if buffer_lens is not None:
1302 1302 buffer_lens.append(len(b))
1303 1303 buffers.extend(b)
1304 1304 rb = rec.pop('result_buffers', empty) or empty
1305 1305 if result_buffer_lens is not None:
1306 1306 result_buffer_lens.append(len(rb))
1307 1307 buffers.extend(rb)
1308 1308 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1309 1309 result_buffer_lens=result_buffer_lens)
1310 1310 # self.log.debug (content)
1311 1311 self.session.send(self.query, "db_reply", content=content,
1312 1312 parent=msg, ident=client_id,
1313 1313 buffers=buffers)
1314 1314
@@ -1,768 +1,768 b''
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6
7 7 Authors:
8 8
9 9 * Min RK
10 10 """
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2010-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #----------------------------------------------------------------------
19 19 # Imports
20 20 #----------------------------------------------------------------------
21 21
22 22 from __future__ import print_function
23 23
24 24 import logging
25 25 import sys
26 26 import time
27 27
28 28 from datetime import datetime, timedelta
29 29 from random import randint, random
30 30 from types import FunctionType
31 31
32 32 try:
33 33 import numpy
34 34 except ImportError:
35 35 numpy = None
36 36
37 37 import zmq
38 38 from zmq.eventloop import ioloop, zmqstream
39 39
40 40 # local imports
41 41 from IPython.external.decorator import decorator
42 42 from IPython.config.application import Application
43 43 from IPython.config.loader import Config
44 44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
45 45 from IPython.utils.py3compat import cast_bytes
46 46
47 47 from IPython.parallel import error, util
48 48 from IPython.parallel.factory import SessionFactory
49 49 from IPython.parallel.util import connect_logger, local_logger
50 50
51 51 from .dependency import Dependency
52 52
53 53 @decorator
54 54 def logged(f,self,*args,**kwargs):
55 55 # print ("#--------------------")
56 56 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
57 57 # print ("#--")
58 58 return f(self,*args, **kwargs)
59 59
60 60 #----------------------------------------------------------------------
61 61 # Chooser functions
62 62 #----------------------------------------------------------------------
63 63
64 64 def plainrandom(loads):
65 65 """Plain random pick."""
66 66 n = len(loads)
67 67 return randint(0,n-1)
68 68
69 69 def lru(loads):
70 70 """Always pick the front of the line.
71 71
72 72 The content of `loads` is ignored.
73 73
74 74 Assumes LRU ordering of loads, with oldest first.
75 75 """
76 76 return 0
77 77
78 78 def twobin(loads):
79 79 """Pick two at random, use the LRU of the two.
80 80
81 81 The content of loads is ignored.
82 82
83 83 Assumes LRU ordering of loads, with oldest first.
84 84 """
85 85 n = len(loads)
86 86 a = randint(0,n-1)
87 87 b = randint(0,n-1)
88 88 return min(a,b)
89 89
90 90 def weighted(loads):
91 91 """Pick two at random using inverse load as weight.
92 92
93 93 Return the less loaded of the two.
94 94 """
95 95 # weight 0 a million times more than 1:
96 96 weights = 1./(1e-6+numpy.array(loads))
97 97 sums = weights.cumsum()
98 98 t = sums[-1]
99 99 x = random()*t
100 100 y = random()*t
101 101 idx = 0
102 102 idy = 0
103 103 while sums[idx] < x:
104 104 idx += 1
105 105 while sums[idy] < y:
106 106 idy += 1
107 107 if weights[idy] > weights[idx]:
108 108 return idy
109 109 else:
110 110 return idx
111 111
112 112 def leastload(loads):
113 113 """Always choose the lowest load.
114 114
115 115 If the lowest load occurs more than once, the first
116 116 occurrence will be used. If loads has LRU ordering, this means
117 117 the LRU of those with the lowest load is chosen.
118 118 """
119 119 return loads.index(min(loads))
120 120
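# Editorial note: a worked example of the choosers above on a hypothetical
# LRU-ordered load list (oldest engine first):
#
#     loads = [2, 0, 5]
#     lru(loads)          # -> 0: always the head of the line
#     leastload(loads)    # -> 1: index of the minimum load
#     twobin(loads)       # -> min of two random indices, e.g. min(0, 2) -> 0
#     weighted(loads)     # -> usually 1: inverse-load weighting (requires numpy)
#     plainrandom(loads)  # -> uniform over 0..2
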
121 121 #---------------------------------------------------------------------
122 122 # Classes
123 123 #---------------------------------------------------------------------
124 124
125 125
126 126 # store empty default dependency:
127 127 MET = Dependency([])
128 128
129 129
130 130 class Job(object):
131 131 """Simple container for a job"""
132 132 def __init__(self, msg_id, raw_msg, idents, msg, header, targets, after, follow, timeout):
133 133 self.msg_id = msg_id
134 134 self.raw_msg = raw_msg
135 135 self.idents = idents
136 136 self.msg = msg
137 137 self.header = header
138 138 self.targets = targets
139 139 self.after = after
140 140 self.follow = follow
141 141 self.timeout = timeout
142 142
143 143
144 144 self.timestamp = time.time()
145 145 self.blacklist = set()
146 146
147 147 @property
148 148 def dependents(self):
149 149 return self.follow.union(self.after)
150 150
151 151 class TaskScheduler(SessionFactory):
152 152 """Python TaskScheduler object.
153 153
154 154 This is the simplest object that supports msg_id based
155 155 DAG dependencies. *Only* task msg_ids are checked, not
156 156 msg_ids of jobs submitted via the MUX queue.
157 157
158 158 """
159 159
160 160 hwm = Integer(1, config=True,
161 161 help="""specify the High Water Mark (HWM) for the downstream
162 162 socket in the Task scheduler. This is the maximum number
163 163 of allowed outstanding tasks on each engine.
164 164
165 165 The default (1) means that only one task can be outstanding on each
166 166 engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
167 167 engines continue to be assigned tasks while they are working,
168 168 effectively hiding network latency behind computation, but this can result
169 169 in an imbalance of work when submitting many heterogeneous tasks all at
170 170 once. Any positive value greater than one is a compromise between the
171 171 two.
172 172
173 173 """
174 174 )
175 175 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
176 176 'leastload', config=True, allow_none=False,
177 177 help="""select the task scheduler scheme [default: Python LRU]
178 178 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
179 179 )
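# Editorial note: both traits above are configurable; a sketch of the
# corresponding controller config file entries (values are examples only):
#
#     c = get_config()
#     c.TaskScheduler.hwm = 0                    # no per-engine limit on outstanding tasks
#     c.TaskScheduler.scheme_name = 'weighted'   # requires numpy (optional import above)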
180 180 def _scheme_name_changed(self, old, new):
181 181 self.log.debug("Using scheme %r"%new)
182 182 self.scheme = globals()[new]
183 183
184 184 # input arguments:
185 185 scheme = Instance(FunctionType) # function for determining the destination
186 186 def _scheme_default(self):
187 187 return leastload
188 188 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
189 189 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
190 190 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
191 191 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
192 192
193 193 # internals:
194 194 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
195 195 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
196 196 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
197 197 depending = Dict() # dict by msg_id of Jobs
198 198 pending = Dict() # dict by engine_uuid of submitted tasks
199 199 completed = Dict() # dict by engine_uuid of completed tasks
200 200 failed = Dict() # dict by engine_uuid of failed tasks
201 201 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
202 202 clients = Dict() # dict by msg_id for who submitted the task
203 203 targets = List() # list of target IDENTs
204 204 loads = List() # list of engine loads
205 205 # full = Set() # set of IDENTs that have HWM outstanding tasks
206 206 all_completed = Set() # set of all completed tasks
207 207 all_failed = Set() # set of all failed tasks
208 208 all_done = Set() # set of all finished tasks=union(completed,failed)
209 209 all_ids = Set() # set of all submitted task IDs
210 210
211 211 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
212 212
213 213 ident = CBytes() # ZMQ identity. This should just be self.session.session
214 214 # but ensure Bytes
215 215 def _ident_default(self):
216 216 return self.session.bsession
217 217
218 218 def start(self):
219 219 self.engine_stream.on_recv(self.dispatch_result, copy=False)
220 220 self.client_stream.on_recv(self.dispatch_submission, copy=False)
221 221
222 222 self._notification_handlers = dict(
223 223 registration_notification = self._register_engine,
224 224 unregistration_notification = self._unregister_engine
225 225 )
226 226 self.notifier_stream.on_recv(self.dispatch_notification)
227 227 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2s (0.5 Hz)
228 228 self.auditor.start()
229 229 self.log.info("Scheduler started [%s]"%self.scheme_name)
230 230
231 231 def resume_receiving(self):
232 232 """Resume accepting jobs."""
233 233 self.client_stream.on_recv(self.dispatch_submission, copy=False)
234 234
235 235 def stop_receiving(self):
236 236 """Stop accepting jobs while there are no engines.
237 237 Leave them in the ZMQ queue."""
238 238 self.client_stream.on_recv(None)
239 239
240 240 #-----------------------------------------------------------------------
241 241 # [Un]Registration Handling
242 242 #-----------------------------------------------------------------------
243 243
244 244
245 245 @util.log_errors
246 246 def dispatch_notification(self, msg):
247 247 """dispatch register/unregister events."""
248 248 try:
249 249 idents,msg = self.session.feed_identities(msg)
250 250 except ValueError:
251 251 self.log.warn("task::Invalid Message: %r",msg)
252 252 return
253 253 try:
254 254 msg = self.session.unserialize(msg)
255 255 except ValueError:
256 256 self.log.warn("task::Unauthorized message from: %r"%idents)
257 257 return
258 258
259 259 msg_type = msg['header']['msg_type']
260 260
261 261 handler = self._notification_handlers.get(msg_type, None)
262 262 if handler is None:
263 263 self.log.error("Unhandled message type: %r"%msg_type)
264 264 else:
265 265 try:
266 266 handler(cast_bytes(msg['content']['queue']))
267 267 except Exception:
268 268 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
269 269
270 270 def _register_engine(self, uid):
271 271 """New engine with ident `uid` became available."""
272 272 # head of the line:
273 273 self.targets.insert(0,uid)
274 274 self.loads.insert(0,0)
275 275
276 276 # initialize sets
277 277 self.completed[uid] = set()
278 278 self.failed[uid] = set()
279 279 self.pending[uid] = {}
280 280
281 281 # rescan the graph:
282 282 self.update_graph(None)
283 283
284 284 def _unregister_engine(self, uid):
285 285 """Existing engine with ident `uid` became unavailable."""
286 286 if len(self.targets) == 1:
287 287 # this was our only engine
288 288 pass
289 289
290 290 # handle any potentially finished tasks:
291 291 self.engine_stream.flush()
292 292
293 293 # don't pop destinations, because they might be used later
294 294 # map(self.destinations.pop, self.completed.pop(uid))
295 295 # map(self.destinations.pop, self.failed.pop(uid))
296 296
297 297 # prevent this engine from receiving work
298 298 idx = self.targets.index(uid)
299 299 self.targets.pop(idx)
300 300 self.loads.pop(idx)
301 301
302 302 # wait 5 seconds before cleaning up pending jobs, since the results might
303 303 # still be incoming
304 304 if self.pending[uid]:
305 305 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
306 306 dc.start()
307 307 else:
308 308 self.completed.pop(uid)
309 309 self.failed.pop(uid)
310 310
311 311
312 312 def handle_stranded_tasks(self, engine):
313 313 """Deal with jobs resident in an engine that died."""
314 314 lost = self.pending[engine]
315 315 for msg_id in lost.keys():
316 316 if msg_id not in self.pending[engine]:
317 317 # prevent double-handling of messages
318 318 continue
319 319
320 320 raw_msg = lost[msg_id].raw_msg
321 321 idents,msg = self.session.feed_identities(raw_msg, copy=False)
322 322 parent = self.session.unpack(msg[1].bytes)
323 323 idents = [engine, idents[0]]
324 324
325 325 # build fake error reply
326 326 try:
327 327 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
328 328 except:
329 329 content = error.wrap_exception()
330 330 # build fake header
331 331 header = dict(
332 332 status='error',
333 333 engine=engine,
334 334 date=datetime.now(),
335 335 )
336 336 msg = self.session.msg('apply_reply', content, parent=parent, subheader=header)
337 337 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
338 338 # and dispatch it
339 339 self.dispatch_result(raw_reply)
340 340
341 341 # finally scrub completed/failed lists
342 342 self.completed.pop(engine)
343 343 self.failed.pop(engine)
344 344
345 345
346 346 #-----------------------------------------------------------------------
347 347 # Job Submission
348 348 #-----------------------------------------------------------------------
349 349
350 350
351 351 @util.log_errors
352 352 def dispatch_submission(self, raw_msg):
353 353 """Dispatch job submission to appropriate handlers."""
354 354 # ensure targets up to date:
355 355 self.notifier_stream.flush()
356 356 try:
357 357 idents, msg = self.session.feed_identities(raw_msg, copy=False)
358 358 msg = self.session.unserialize(msg, content=False, copy=False)
359 359 except Exception:
360 360 self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
361 361 return
362 362
363 363
364 364 # send to monitor
365 365 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
366 366
367 367 header = msg['header']
368 368 msg_id = header['msg_id']
369 369 self.all_ids.add(msg_id)
370 370
371 371 # get targets as a set of bytes objects
372 372 # from a list of unicode objects
373 373 targets = header.get('targets', [])
374 374 targets = map(cast_bytes, targets)
375 375 targets = set(targets)
376 376
377 377 retries = header.get('retries', 0)
378 378 self.retries[msg_id] = retries
379 379
380 380 # time dependencies
381 381 after = header.get('after', None)
382 382 if after:
383 383 after = Dependency(after)
384 384 if after.all:
385 385 if after.success:
386 386 after = Dependency(after.difference(self.all_completed),
387 387 success=after.success,
388 388 failure=after.failure,
389 389 all=after.all,
390 390 )
391 391 if after.failure:
392 392 after = Dependency(after.difference(self.all_failed),
393 393 success=after.success,
394 394 failure=after.failure,
395 395 all=after.all,
396 396 )
397 397 if after.check(self.all_completed, self.all_failed):
398 398 # recast as empty set, if `after` already met,
399 399 # to prevent unnecessary set comparisons
400 400 after = MET
401 401 else:
402 402 after = MET
403 403
404 404 # location dependencies
405 405 follow = Dependency(header.get('follow', []))
406 406
407 407 # turn timeouts into datetime objects:
408 408 timeout = header.get('timeout', None)
409 409 if timeout:
410 410 # cast to float, because jsonlib returns floats as decimal.Decimal,
411 411 # which timedelta does not accept
412 412 timeout = datetime.now() + timedelta(0,float(timeout),0)
413 413
414 414 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
415 415 header=header, targets=targets, after=after, follow=follow,
416 416 timeout=timeout,
417 417 )
418 418
419 419 # validate and reduce dependencies:
420 420 for dep in after,follow:
421 421 if not dep: # empty dependency
422 422 continue
423 423 # check valid:
424 424 if msg_id in dep or dep.difference(self.all_ids):
425 425 self.depending[msg_id] = job
426 426 return self.fail_unreachable(msg_id, error.InvalidDependency)
427 427 # check if unreachable:
428 428 if dep.unreachable(self.all_completed, self.all_failed):
429 429 self.depending[msg_id] = job
430 430 return self.fail_unreachable(msg_id)
431 431
432 432 if after.check(self.all_completed, self.all_failed):
433 433 # time deps already met, try to run
434 434 if not self.maybe_run(job):
435 435 # can't run yet
436 436 if msg_id not in self.all_failed:
437 437 # could have failed as unreachable
438 438 self.save_unmet(job)
439 439 else:
440 440 self.save_unmet(job)
441 441
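The 'after', 'follow', 'timeout', and 'retries' headers consumed above are normally filled in on the client side; a hedged sketch using the era's IPython.parallel load-balanced view (assumes a running cluster):

    from IPython.parallel import Client

    rc = Client()
    view = rc.load_balanced_view()
    ar1 = view.apply_async(pow, 2, 10)
    # only run after ar1 finishes successfully; give up after 30s;
    # retry once if the engine dies mid-task
    view.set_flags(after=[ar1], timeout=30, retries=1)
    ar2 = view.apply_async(pow, 2, 20)
    print(ar2.get())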
442 442 def audit_timeouts(self):
443 443 """Audit all waiting tasks for expired timeouts."""
444 444 now = datetime.now()
445 445 for msg_id in self.depending.keys():
446 446 # must recheck, in case one failure cascaded to another:
447 447 if msg_id in self.depending:
448 448 job = self.depending[msg_id]
449 449 if job.timeout and job.timeout < now:
450 450 self.fail_unreachable(msg_id, error.TaskTimeout)
451 451
452 452 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
453 453 """a task has become unreachable, send a reply with an ImpossibleDependency
454 454 error."""
455 455 if msg_id not in self.depending:
456 456 self.log.error("msg %r already failed!", msg_id)
457 457 return
458 458 job = self.depending.pop(msg_id)
459 459 for mid in job.dependents:
460 460 if mid in self.graph:
461 461 self.graph[mid].remove(msg_id)
462 462
463 463 try:
464 464 raise why()
465 465 except:
466 466 content = error.wrap_exception()
467 467
468 468 self.all_done.add(msg_id)
469 469 self.all_failed.add(msg_id)
470 470
471 471 msg = self.session.send(self.client_stream, 'apply_reply', content,
472 472 parent=job.header, ident=job.idents)
473 473 self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)
474 474
475 475 self.update_graph(msg_id, success=False)
476 476
477 477 def maybe_run(self, job):
478 478 """check location dependencies, and run if they are met."""
479 479 msg_id = job.msg_id
480 480 self.log.debug("Attempting to assign task %s", msg_id)
481 481 if not self.targets:
482 482 # no engines, definitely can't run
483 483 return False
484 484
485 485 if job.follow or job.targets or job.blacklist or self.hwm:
486 486 # we need a can_run filter
487 487 def can_run(idx):
488 488 # check hwm
489 489 if self.hwm and self.loads[idx] == self.hwm:
490 490 return False
491 491 target = self.targets[idx]
492 492 # check blacklist
493 493 if target in job.blacklist:
494 494 return False
495 495 # check targets
496 496 if job.targets and target not in job.targets:
497 497 return False
498 498 # check follow
499 499 return job.follow.check(self.completed[target], self.failed[target])
500 500
501 501 indices = filter(can_run, range(len(self.targets)))
502 502
503 503 if not indices:
504 504 # couldn't run
505 505 if job.follow.all:
506 506 # check follow for impossibility
507 507 dests = set()
508 508 relevant = set()
509 509 if job.follow.success:
510 510 relevant = self.all_completed
511 511 if job.follow.failure:
512 512 relevant = relevant.union(self.all_failed)
513 513 for m in job.follow.intersection(relevant):
514 514 dests.add(self.destinations[m])
515 515 if len(dests) > 1:
516 516 self.depending[msg_id] = job
517 517 self.fail_unreachable(msg_id)
518 518 return False
519 519 if job.targets:
520 520 # check blacklist+targets for impossibility
521 521 job.targets.difference_update(job.blacklist)
522 522 if not job.targets or not job.targets.intersection(self.targets):
523 523 self.depending[msg_id] = job
524 524 self.fail_unreachable(msg_id)
525 525 return False
526 526 return False
527 527 else:
528 528 indices = None
529 529
530 530 self.submit_task(job, indices)
531 531 return True
532 532
533 533 def save_unmet(self, job):
534 534 """Save a message for later submission when its dependencies are met."""
535 535 msg_id = job.msg_id
536 536 self.depending[msg_id] = job
537 537 # track the ids in follow or after, but not those already finished
538 538 for dep_id in job.after.union(job.follow).difference(self.all_done):
539 539 if dep_id not in self.graph:
540 540 self.graph[dep_id] = set()
541 541 self.graph[dep_id].add(msg_id)
542 542
543 543 def submit_task(self, job, indices=None):
544 544 """Submit a task to any of a subset of our targets."""
545 545 if indices:
546 546 loads = [self.loads[i] for i in indices]
547 547 else:
548 548 loads = self.loads
549 549 idx = self.scheme(loads)
550 550 if indices:
551 551 idx = indices[idx]
552 552 target = self.targets[idx]
553 553 # print (target, map(str, msg[:3]))
554 554 # send job to the engine
555 555 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
556 556 self.engine_stream.send_multipart(job.raw_msg, copy=False)
557 557 # update load
558 558 self.add_job(idx)
559 559 self.pending[target][job.msg_id] = job
560 560 # notify Hub
561 561 content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
562 562 self.session.send(self.mon_stream, 'task_destination', content=content,
563 563 ident=[b'tracktask',self.ident])
564 564
565 565
566 566 #-----------------------------------------------------------------------
567 567 # Result Handling
568 568 #-----------------------------------------------------------------------
569 569
570 570
571 571 @util.log_errors
572 572 def dispatch_result(self, raw_msg):
573 573 """dispatch method for result replies"""
574 574 try:
575 575 idents,msg = self.session.feed_identities(raw_msg, copy=False)
576 576 msg = self.session.unserialize(msg, content=False, copy=False)
577 577 engine = idents[0]
578 578 try:
579 579 idx = self.targets.index(engine)
580 580 except ValueError:
581 581 pass # skip load-update for dead engines
582 582 else:
583 583 self.finish_job(idx)
584 584 except Exception:
585 585 self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
586 586 return
587 587
588 588 header = msg['header']
589 589 parent = msg['parent_header']
590 590 if header.get('dependencies_met', True):
591 591 success = (header['status'] == 'ok')
592 592 msg_id = parent['msg_id']
593 593 retries = self.retries[msg_id]
594 594 if not success and retries > 0:
595 595 # failed
596 596 self.retries[msg_id] = retries - 1
597 597 self.handle_unmet_dependency(idents, parent)
598 598 else:
599 599 del self.retries[msg_id]
600 600 # relay to client and update graph
601 601 self.handle_result(idents, parent, raw_msg, success)
602 602 # send to Hub monitor
603 603 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
604 604 else:
605 605 self.handle_unmet_dependency(idents, parent)
606 606
607 607 def handle_result(self, idents, parent, raw_msg, success=True):
608 608 """handle a real task result, either success or failure"""
609 609 # first, relay result to client
610 610 engine = idents[0]
611 611 client = idents[1]
612 # swap_ids for XREP-XREP mirror
612 # swap_ids for ROUTER-ROUTER mirror
613 613 raw_msg[:2] = [client,engine]
614 614 # print (map(str, raw_msg[:4]))
615 615 self.client_stream.send_multipart(raw_msg, copy=False)
616 616 # now, update our data structures
617 617 msg_id = parent['msg_id']
618 618 self.pending[engine].pop(msg_id)
619 619 if success:
620 620 self.completed[engine].add(msg_id)
621 621 self.all_completed.add(msg_id)
622 622 else:
623 623 self.failed[engine].add(msg_id)
624 624 self.all_failed.add(msg_id)
625 625 self.all_done.add(msg_id)
626 626 self.destinations[msg_id] = engine
627 627
628 628 self.update_graph(msg_id, success)
629 629
630 630 def handle_unmet_dependency(self, idents, parent):
631 631 """handle an unmet dependency"""
632 632 engine = idents[0]
633 633 msg_id = parent['msg_id']
634 634
635 635 job = self.pending[engine].pop(msg_id)
636 636 job.blacklist.add(engine)
637 637
638 638 if job.blacklist == job.targets:
639 639 self.depending[msg_id] = job
640 640 self.fail_unreachable(msg_id)
641 641 elif not self.maybe_run(job):
642 642 # resubmit failed
643 643 if msg_id not in self.all_failed:
644 644 # put it back in our dependency tree
645 645 self.save_unmet(job)
646 646
647 647 if self.hwm:
648 648 try:
649 649 idx = self.targets.index(engine)
650 650 except ValueError:
651 651 pass # skip load-update for dead engines
652 652 else:
653 653 if self.loads[idx] == self.hwm-1:
654 654 self.update_graph(None)
655 655
656 656
657 657
658 658 def update_graph(self, dep_id=None, success=True):
659 659 """dep_id just finished. Update our dependency
660 660 graph and submit any jobs that just became runable.
661 661
662 662 Called with dep_id=None to update entire graph for hwm, but without finishing
663 663 a task.
664 664 """
665 665 # print ("\n\n***********")
666 666 # pprint (dep_id)
667 667 # pprint (self.graph)
668 668 # pprint (self.depending)
669 669 # pprint (self.all_completed)
670 670 # pprint (self.all_failed)
671 671 # print ("\n\n***********\n\n")
672 672 # update any jobs that depended on the dependency
673 673 jobs = self.graph.pop(dep_id, [])
674 674
675 675 # recheck *all* jobs if
676 676 # a) we have HWM and an engine just became no longer full
677 677 # or b) dep_id was given as None
678 678
679 679 if dep_id is None or (self.hwm and any(load == self.hwm - 1 for load in self.loads)):
680 680 jobs = self.depending.keys()
681 681
682 682 for msg_id in sorted(jobs, key=lambda msg_id: self.depending[msg_id].timestamp):
683 683 job = self.depending[msg_id]
684 684
685 685 if job.after.unreachable(self.all_completed, self.all_failed)\
686 686 or job.follow.unreachable(self.all_completed, self.all_failed):
687 687 self.fail_unreachable(msg_id)
688 688
689 689 elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
690 690 if self.maybe_run(job):
691 691
692 692 self.depending.pop(msg_id)
693 693 for mid in job.dependents:
694 694 if mid in self.graph:
695 695 self.graph[mid].remove(msg_id)
696 696
697 697 #----------------------------------------------------------------------
698 698 # methods to be overridden by subclasses
699 699 #----------------------------------------------------------------------
700 700
701 701 def add_job(self, idx):
702 702 """Called after self.targets[idx] just got the job with header.
703 703 Override with subclasses. The default ordering is simple LRU.
704 704 The default loads are the number of outstanding jobs."""
705 705 self.loads[idx] += 1
706 706 for lis in (self.targets, self.loads):
707 707 lis.append(lis.pop(idx))
708 708
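The rotation in add_job is easiest to see with concrete lists; a small worked example (plain Python, no scheduler state involved):

    targets = ['a', 'b', 'c']
    loads = [1, 0, 2]
    idx = 1                      # engine 'b' just received a job
    loads[idx] += 1
    for lis in (targets, loads):
        lis.append(lis.pop(idx))
    # targets == ['a', 'c', 'b'], loads == [1, 2, 1]
    # 'b' rotates to the back, so equally loaded engines are used round-robin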
709 709
710 710 def finish_job(self, idx):
711 711 """Called after self.targets[idx] just finished a job.
712 712 Override with subclasses."""
713 713 self.loads[idx] -= 1
714 714
715 715
716 716
717 717 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
718 718 logname='root', log_url=None, loglevel=logging.DEBUG,
719 719 identity=b'task', in_thread=False):
720 720
721 721 ZMQStream = zmqstream.ZMQStream
722 722
723 723 if config:
724 724 # unwrap dict back into Config
725 725 config = Config(config)
726 726
727 727 if in_thread:
728 728 # use instance() to get the same Context/Loop as our parent
729 729 ctx = zmq.Context.instance()
730 730 loop = ioloop.IOLoop.instance()
731 731 else:
732 732 # in a process, don't use instance()
733 733 # for safety with multiprocessing
734 734 ctx = zmq.Context()
735 735 loop = ioloop.IOLoop()
736 736 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
737 737 ins.setsockopt(zmq.IDENTITY, identity)
738 738 ins.bind(in_addr)
739 739
740 740 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
741 741 outs.setsockopt(zmq.IDENTITY, identity)
742 742 outs.bind(out_addr)
743 743 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
744 744 mons.connect(mon_addr)
745 745 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
746 746 nots.setsockopt(zmq.SUBSCRIBE, b'')
747 747 nots.connect(not_addr)
748 748
749 749 # setup logging.
750 750 if in_thread:
751 751 log = Application.instance().log
752 752 else:
753 753 if log_url:
754 754 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
755 755 else:
756 756 log = local_logger(logname, loglevel)
757 757
758 758 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
759 759 mon_stream=mons, notifier_stream=nots,
760 760 loop=loop, log=log,
761 761 config=config)
762 762 scheduler.start()
763 763 if not in_thread:
764 764 try:
765 765 loop.start()
766 766 except KeyboardInterrupt:
767 767 scheduler.log.critical("Interrupted, exiting...")
768 768
@@ -1,68 +1,68 b''
1 1 #-----------------------------------------------------------------------------
2 2 # Copyright (C) 2010-2011 The IPython Development Team
3 3 #
4 4 # Distributed under the terms of the BSD License. The full license is in
5 5 # the file COPYING.txt, distributed as part of this software.
6 6 #-----------------------------------------------------------------------------
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Verify zmq version dependency >= 2.1.4
10 10 #-----------------------------------------------------------------------------
11 11
12 12 import warnings
13 13 from distutils.version import LooseVersion as V
14 14
15 15
16 16 def patch_pyzmq():
17 17 """backport a few patches from newer pyzmq
18 18
19 19 These can be removed as we bump our minimum pyzmq version
20 20 """
21 21
22 22 import zmq
23 23
24 24 # ioloop.install, introduced in pyzmq 2.1.7
25 25 from zmq.eventloop import ioloop
26 26
27 27 def install():
28 28 import tornado.ioloop
29 29 tornado.ioloop.IOLoop = ioloop.IOLoop
30 30
31 31 if not hasattr(ioloop, 'install'):
32 32 ioloop.install = install
33 33
34 34 # fix missing DEALER/ROUTER aliases in pyzmq < 2.1.9
35 35 if not hasattr(zmq, 'DEALER'):
36 36 zmq.DEALER = zmq.XREQ
37 37 if not hasattr(zmq, 'ROUTER'):
38 38 zmq.ROUTER = zmq.XREP
39 39
40 40 # fallback on stdlib json if jsonlib is selected, because jsonlib breaks things.
41 41 # jsonlib support is removed from pyzmq >= 2.2.0
42 42
43 43 from zmq.utils import jsonapi
44 44 if jsonapi.jsonmod.__name__ == 'jsonlib':
45 45 import json
46 46 jsonapi.jsonmod = json
47 47
48 48
49 49 def check_for_zmq(minimum_version, module='IPython.zmq'):
50 50 try:
51 51 import zmq
52 52 except ImportError:
53 53 raise ImportError("%s requires pyzmq >= %s"%(module, minimum_version))
54 54
55 55 pyzmq_version = zmq.__version__
56 56
57 57 if 'dev' not in pyzmq_version and V(pyzmq_version) < V(minimum_version):
58 58 raise ImportError("%s requires pyzmq >= %s, but you have %s"%(
59 59 module, minimum_version, pyzmq_version))
60 60
61 61 if V(zmq.zmq_version()) >= V('4.0.0'):
62 62 warnings.warn("""libzmq 4 detected.
63 63 It is unlikely that IPython's zmq code will work properly.
64 64 Please install libzmq stable, which is 2.1.x or 2.2.x""",
65 65 RuntimeWarning)
66 66
67 67 check_for_zmq('2.1.4')
68 68 patch_pyzmq()
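Once patch_pyzmq() has run, code elsewhere in the tree can use the new-style socket names on any supported pyzmq; a quick sanity check (sketch):

    import zmq

    ctx = zmq.Context.instance()
    s = ctx.socket(zmq.ROUTER)   # resolves via the alias on pyzmq < 2.1.9
    s.close()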
@@ -1,208 +1,208 b''
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3 """
4 4
5 5 # Standard library imports.
6 6 import atexit
7 7 import json
8 8 import os
9 9 import socket
10 10 from subprocess import Popen, PIPE
11 11 import sys
12 12 import tempfile
13 13
14 14 # System library imports
15 15
16 16 # IPython imports
17 17 from IPython.utils.localinterfaces import LOCALHOST
18 18 from IPython.utils.py3compat import bytes_to_str
19 19
20 20 # Local imports.
21 21 from parentpoller import ParentPollerWindows
22 22
23 23 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
24 24 ip=LOCALHOST, key=b''):
25 25 """Generates a JSON config file, including the selection of random ports.
26 26
27 27 Parameters
28 28 ----------
29 29
30 30 fname : unicode
31 31 The path to the file to write
32 32
33 33 shell_port : int, optional
34 The port to use for XREP channel.
34 The port to use for the ROUTER channel.
35 35
36 36 iopub_port : int, optional
37 37 The port to use for the SUB channel.
38 38
39 39 stdin_port : int, optional
40 40 The port to use for the REQ (raw input) channel.
41 41
42 42 hb_port : int, optional
43 43 The port to use for the heartbeat REP channel.
44 44
45 45 ip : str, optional
46 46 The ip address the kernel will bind to.
47 47
48 48 key : str, optional
49 49 The Session key used for HMAC authentication.
50 50
51 51 """
52 52 # default to temporary connector file
53 53 if not fname:
54 54 fname = tempfile.mktemp('.json')
55 55
56 56 # Find open ports as necessary.
57 57 ports = []
58 58 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
59 59 int(stdin_port <= 0) + int(hb_port <= 0)
60 60 for i in xrange(ports_needed):
61 61 sock = socket.socket()
62 62 sock.bind(('', 0))
63 63 ports.append(sock)
64 64 for i, sock in enumerate(ports):
65 65 port = sock.getsockname()[1]
66 66 sock.close()
67 67 ports[i] = port
68 68 if shell_port <= 0:
69 69 shell_port = ports.pop(0)
70 70 if iopub_port <= 0:
71 71 iopub_port = ports.pop(0)
72 72 if stdin_port <= 0:
73 73 stdin_port = ports.pop(0)
74 74 if hb_port <= 0:
75 75 hb_port = ports.pop(0)
76 76
77 77 cfg = dict( shell_port=shell_port,
78 78 iopub_port=iopub_port,
79 79 stdin_port=stdin_port,
80 80 hb_port=hb_port,
81 81 )
82 82 cfg['ip'] = ip
83 83 cfg['key'] = bytes_to_str(key)
84 84
85 85 with open(fname, 'w') as f:
86 86 f.write(json.dumps(cfg, indent=2))
87 87
88 88 return fname, cfg
89 89
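For illustration, calling the helper with a mix of fixed and random ports (values arbitrary):

    fname, cfg = write_connection_file(shell_port=53001,   # fixed
                                       iopub_port=0,       # 0 => pick a random free port
                                       key=b'secret-hmac-key')
    print(fname)              # e.g. /tmp/tmpXXXXXX.json
    print(cfg['iopub_port'])  # filled in with the randomly chosen port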
90 90
91 91 def base_launch_kernel(code, fname, stdin=None, stdout=None, stderr=None,
92 92 executable=None, independent=False, extra_arguments=[]):
93 93 """ Launches a localhost kernel, binding to the specified ports.
94 94
95 95 Parameters
96 96 ----------
97 97 code : str,
98 98 A string of Python code that imports and executes a kernel entry point.
99 99
100 100 stdin, stdout, stderr : optional (default None)
101 101 Standards streams, as defined in subprocess.Popen.
102 102
103 103 fname : unicode, optional
104 104 The JSON connector file, containing ip/port/hmac key information.
105 105
109 109 executable : str, optional (default sys.executable)
110 110 The Python executable to use for the kernel process.
111 111
112 112 independent : bool, optional (default False)
113 113 If set, the kernel process is guaranteed to survive if this process
114 114 dies. If not set, an effort is made to ensure that the kernel is killed
115 115 when this process dies. Note that in this case it is still good practice
116 116 to kill kernels manually before exiting.
117 117
118 118 extra_arguments : list, optional
119 119 A list of extra arguments to pass when executing the launch code.
120 120
121 121 Returns
122 122 -------
123 123 A tuple of form:
124 124 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
125 125 where kernel_process is a Popen object and the ports are integers.
126 126 """
127 127
128 128 # Build the kernel launch command.
129 129 if executable is None:
130 130 executable = sys.executable
131 131 arguments = [ executable, '-c', code, '-f', fname ]
132 132 arguments.extend(extra_arguments)
133 133
134 134 # Popen will fail (sometimes with a deadlock) if stdin, stdout, and stderr
135 135 # are invalid. Unfortunately, there is in general no way to detect whether
136 136 # they are valid. The following two blocks redirect them to (temporary)
137 137 # pipes in certain important cases.
138 138
139 139 # If this process has been backgrounded, our stdin is invalid. Since there
140 140 # is no compelling reason for the kernel to inherit our stdin anyway, we'll
141 141 # play it safe and always redirect.
142 142 redirect_in = True
143 143 _stdin = PIPE if stdin is None else stdin
144 144
145 145 # If this process is running on pythonw, we know that stdin, stdout, and
146 146 # stderr are all invalid.
147 147 redirect_out = sys.executable.endswith('pythonw.exe')
148 148 if redirect_out:
149 149 _stdout = PIPE if stdout is None else stdout
150 150 _stderr = PIPE if stderr is None else stderr
151 151 else:
152 152 _stdout, _stderr = stdout, stderr
153 153
154 154 # Spawn a kernel.
155 155 if sys.platform == 'win32':
156 156 # Create a Win32 event for interrupting the kernel.
157 157 interrupt_event = ParentPollerWindows.create_interrupt_event()
158 158 arguments += [ '--interrupt=%i'%interrupt_event ]
159 159
160 160 # If the kernel is running on pythonw and stdout/stderr have not been
161 161 # redirected, it will crash when more than 4KB of data is written to
162 162 # stdout or stderr. This is a bug that has been with Python for a very
163 163 # long time; see http://bugs.python.org/issue706263.
164 164 # A cleaner solution to this problem would be to pass os.devnull to
165 165 # Popen directly. Unfortunately, that does not work.
166 166 if executable.endswith('pythonw.exe'):
167 167 if stdout is None:
168 168 arguments.append('--no-stdout')
169 169 if stderr is None:
170 170 arguments.append('--no-stderr')
171 171
172 172 # Launch the kernel process.
173 173 if independent:
174 174 proc = Popen(arguments,
175 175 creationflags=512, # CREATE_NEW_PROCESS_GROUP
176 176 stdin=_stdin, stdout=_stdout, stderr=_stderr)
177 177 else:
178 178 from _subprocess import DuplicateHandle, GetCurrentProcess, \
179 179 DUPLICATE_SAME_ACCESS
180 180 pid = GetCurrentProcess()
181 181 handle = DuplicateHandle(pid, pid, pid, 0,
182 182 True, # Inheritable by new processes.
183 183 DUPLICATE_SAME_ACCESS)
184 184 proc = Popen(arguments + ['--parent=%i'%int(handle)],
185 185 stdin=_stdin, stdout=_stdout, stderr=_stderr)
186 186
187 187 # Attach the interrupt event to the Popen object so it can be used later.
188 188 proc.win32_interrupt_event = interrupt_event
189 189
190 190 else:
191 191 if independent:
192 192 proc = Popen(arguments, preexec_fn=lambda: os.setsid(),
193 193 stdin=_stdin, stdout=_stdout, stderr=_stderr)
194 194 else:
195 195 proc = Popen(arguments + ['--parent=1'],
196 196 stdin=_stdin, stdout=_stdout, stderr=_stderr)
197 197
198 198 # Clean up pipes created to work around Popen bug.
199 199 if redirect_in:
200 200 if stdin is None:
201 201 proc.stdin.close()
202 202 if redirect_out:
203 203 if stdout is None:
204 204 proc.stdout.close()
205 205 if stderr is None:
206 206 proc.stderr.close()
207 207
208 208 return proc
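A hedged sketch of how a caller might combine this with write_connection_file above (the entry-point code string is an assumption):

    fname, cfg = write_connection_file()
    code = 'from IPython.zmq.ipkernel import main; main()'  # assumed entry point
    proc = base_launch_kernel(code, fname)
    print(proc.pid)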
@@ -1,333 +1,333 b''
1 1 """An Application for launching a kernel
2 2
3 3 Authors
4 4 -------
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING.txt, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports
19 19 import atexit
20 20 import json
21 21 import os
22 22 import sys
23 23 import signal
24 24
25 25 # System library imports
26 26 import zmq
27 27 from zmq.eventloop import ioloop
28 28
29 29 # IPython imports
30 30 from IPython.core.ultratb import FormattedTB
31 31 from IPython.core.application import (
32 32 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
33 33 )
34 34 from IPython.utils import io
35 35 from IPython.utils.localinterfaces import LOCALHOST
36 36 from IPython.utils.path import filefind
37 37 from IPython.utils.py3compat import str_to_bytes
38 38 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Integer, Bool,
39 39 DottedObjectName)
40 40 from IPython.utils.importstring import import_item
41 41 # local imports
42 42 from IPython.zmq.entry_point import write_connection_file
43 43 from IPython.zmq.heartbeat import Heartbeat
44 44 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
45 45 from IPython.zmq.session import (
46 46 Session, session_flags, session_aliases, default_secure,
47 47 )
48 48
49 49
50 50 #-----------------------------------------------------------------------------
51 51 # Flags and Aliases
52 52 #-----------------------------------------------------------------------------
53 53
54 54 kernel_aliases = dict(base_aliases)
55 55 kernel_aliases.update({
56 56 'ip' : 'KernelApp.ip',
57 57 'hb' : 'KernelApp.hb_port',
58 58 'shell' : 'KernelApp.shell_port',
59 59 'iopub' : 'KernelApp.iopub_port',
60 60 'stdin' : 'KernelApp.stdin_port',
61 61 'f' : 'KernelApp.connection_file',
62 62 'parent': 'KernelApp.parent',
63 63 })
64 64 if sys.platform.startswith('win'):
65 65 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
66 66
67 67 kernel_flags = dict(base_flags)
68 68 kernel_flags.update({
69 69 'no-stdout' : (
70 70 {'KernelApp' : {'no_stdout' : True}},
71 71 "redirect stdout to the null device"),
72 72 'no-stderr' : (
73 73 {'KernelApp' : {'no_stderr' : True}},
74 74 "redirect stderr to the null device"),
75 75 })
76 76
77 77 # inherit flags&aliases for Sessions
78 78 kernel_aliases.update(session_aliases)
79 79 kernel_flags.update(session_flags)
80 80
81 81
82 82
83 83 #-----------------------------------------------------------------------------
84 84 # Application class for starting a Kernel
85 85 #-----------------------------------------------------------------------------
86 86
87 87 class KernelApp(BaseIPythonApplication):
88 88 name='ipkernel'
89 89 aliases = Dict(kernel_aliases)
90 90 flags = Dict(kernel_flags)
91 91 classes = [Session]
92 92 # the kernel class, as an importstring
93 93 kernel_class = DottedObjectName('IPython.zmq.ipkernel.Kernel')
94 94 kernel = Any()
95 95 poller = Any() # don't restrict this even though current pollers are all Threads
96 96 heartbeat = Instance(Heartbeat)
97 97 session = Instance('IPython.zmq.session.Session')
98 98 ports = Dict()
99 99 _full_connection_file = Unicode()
100 100
101 101 # inherit config file name from parent:
102 102 parent_appname = Unicode(config=True)
103 103 def _parent_appname_changed(self, name, old, new):
104 104 if self.config_file_specified:
105 105 # it was manually specified, ignore
106 106 return
107 107 self.config_file_name = new.replace('-','_') + u'_config.py'
108 108 # don't let this count as specifying the config file
109 109 self.config_file_specified = False
110 110
111 111 # connection info:
112 112 ip = Unicode(LOCALHOST, config=True,
113 113 help="Set the IP or interface on which the kernel will listen.")
114 114 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
115 shell_port = Integer(0, config=True, help="set the shell (XREP) port [default: random]")
115 shell_port = Integer(0, config=True, help="set the shell (ROUTER) port [default: random]")
116 116 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
117 stdin_port = Integer(0, config=True, help="set the stdin (XREQ) port [default: random]")
117 stdin_port = Integer(0, config=True, help="set the stdin (ROUTER) port [default: random]")
118 118 connection_file = Unicode('', config=True,
119 119 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
120 120
121 121 This file will contain the IP, ports, and authentication key needed to connect
122 122 clients to this kernel. By default, this file will be created in the security-dir
123 123 of the current profile, but can be specified by absolute path.
124 124 """)
125 125
126 126 # streams, etc.
127 127 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
128 128 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
129 129 outstream_class = DottedObjectName('IPython.zmq.iostream.OutStream',
130 130 config=True, help="The importstring for the OutStream factory")
131 131 displayhook_class = DottedObjectName('IPython.zmq.displayhook.ZMQDisplayHook',
132 132 config=True, help="The importstring for the DisplayHook factory")
133 133
134 134 # polling
135 135 parent = Integer(0, config=True,
136 136 help="""kill this process if its parent dies. On Windows, the argument
137 137 specifies the HANDLE of the parent process, otherwise it is simply boolean.
138 138 """)
139 139 interrupt = Integer(0, config=True,
140 140 help="""ONLY USED ON WINDOWS
141 141 Interrupt this process when the parent is signalled.
142 142 """)
143 143
144 144 def init_crash_handler(self):
145 145 # Install minimal exception handling
146 146 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
147 147 ostream=sys.__stdout__)
148 148
149 149 def init_poller(self):
150 150 if sys.platform == 'win32':
151 151 if self.interrupt or self.parent:
152 152 self.poller = ParentPollerWindows(self.interrupt, self.parent)
153 153 elif self.parent:
154 154 self.poller = ParentPollerUnix()
155 155
156 156 def _bind_socket(self, s, port):
157 157 iface = 'tcp://%s' % self.ip
158 158 if port <= 0:
159 159 port = s.bind_to_random_port(iface)
160 160 else:
161 161 s.bind(iface + ':%i'%port)
162 162 return port
163 163
164 164 def load_connection_file(self):
165 165 """load ip/port/hmac config from JSON connection file"""
166 166 try:
167 167 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
168 168 except IOError:
169 169 self.log.debug("Connection file not found: %s", self.connection_file)
170 170 # This means I own it, so I will clean it up:
171 171 atexit.register(self.cleanup_connection_file)
172 172 return
173 173 self.log.debug(u"Loading connection file %s", fname)
174 174 with open(fname) as f:
175 175 s = f.read()
176 176 cfg = json.loads(s)
177 177 if self.ip == LOCALHOST and 'ip' in cfg:
178 178 # not overridden by config or cl_args
179 179 self.ip = cfg['ip']
180 180 for channel in ('hb', 'shell', 'iopub', 'stdin'):
181 181 name = channel + '_port'
182 182 if getattr(self, name) == 0 and name in cfg:
183 183 # not overridden by config or cl_args
184 184 setattr(self, name, cfg[name])
185 185 if 'key' in cfg:
186 186 self.config.Session.key = str_to_bytes(cfg['key'])
187 187
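The file consumed here mirrors what write_connection_file produces; its contents look roughly like this (ports and key illustrative), shown as the equivalent Python dict:

    cfg = {
        'shell_port': 53001,
        'iopub_port': 53002,
        'stdin_port': 53003,
        'hb_port': 53004,
        'ip': '127.0.0.1',
        'key': 'a0436f6c-1916-498b-8eb9-e81ab9368e84',
    }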
188 188 def write_connection_file(self):
189 189 """write connection info to JSON file"""
190 190 if os.path.basename(self.connection_file) == self.connection_file:
191 191 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
192 192 else:
193 193 cf = self.connection_file
194 194 write_connection_file(cf, ip=self.ip, key=self.session.key,
195 195 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
196 196 iopub_port=self.iopub_port)
197 197
198 198 self._full_connection_file = cf
199 199
200 200 def cleanup_connection_file(self):
201 201 cf = self._full_connection_file
202 202 self.log.debug("cleaning up connection file: %r", cf)
203 203 try:
204 204 os.remove(cf)
205 205 except (IOError, OSError):
206 206 pass
207 207
208 208 def init_connection_file(self):
209 209 if not self.connection_file:
210 210 self.connection_file = "kernel-%s.json"%os.getpid()
211 211 try:
212 212 self.load_connection_file()
213 213 except Exception:
214 214 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
215 215 self.exit(1)
216 216
217 217 def init_sockets(self):
218 218 # Create a context, a session, and the kernel sockets.
219 219 self.log.info("Starting the kernel at pid: %i", os.getpid())
220 220 context = zmq.Context.instance()
221 221 # Uncomment this to try closing the context.
222 222 # atexit.register(context.term)
223 223
224 224 self.shell_socket = context.socket(zmq.ROUTER)
225 225 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
226 226 self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port)
227 227
228 228 self.iopub_socket = context.socket(zmq.PUB)
229 229 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
230 230 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
231 231
232 232 self.stdin_socket = context.socket(zmq.ROUTER)
233 233 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
234 234 self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port)
235 235
236 236 def init_heartbeat(self):
237 237 """start the heart beating"""
238 238 # heartbeat doesn't share context, because it mustn't be blocked
239 239 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
240 240 hb_ctx = zmq.Context()
241 241 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
242 242 self.hb_port = self.heartbeat.port
243 243 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
244 244 self.heartbeat.start()
245 245
246 246 # Helper to make it easier to connect to an existing kernel.
247 247 # set log-level to critical, to make sure it is output
248 248 self.log.critical("To connect another client to this kernel, use:")
249 249
250 250 def log_connection_info(self):
251 251 """display connection info, and store ports"""
252 252 basename = os.path.basename(self.connection_file)
253 253 if basename == self.connection_file or \
254 254 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
255 255 # use shortname
256 256 tail = basename
257 257 if self.profile != 'default':
258 258 tail += " --profile %s" % self.profile
259 259 else:
260 260 tail = self.connection_file
261 261 self.log.critical("--existing %s", tail)
262 262
263 263
264 264 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
265 265 stdin=self.stdin_port, hb=self.hb_port)
266 266
267 267 def init_session(self):
268 268 """create our session object"""
269 269 default_secure(self.config)
270 270 self.session = Session(config=self.config, username=u'kernel')
271 271
272 272 def init_blackhole(self):
273 273 """redirects stdout/stderr to devnull if necessary"""
274 274 if self.no_stdout or self.no_stderr:
275 275 blackhole = open(os.devnull, 'w')
276 276 if self.no_stdout:
277 277 sys.stdout = sys.__stdout__ = blackhole
278 278 if self.no_stderr:
279 279 sys.stderr = sys.__stderr__ = blackhole
280 280
281 281 def init_io(self):
282 282 """Redirect input streams and set a display hook."""
283 283 if self.outstream_class:
284 284 outstream_factory = import_item(str(self.outstream_class))
285 285 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
286 286 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
287 287 if self.displayhook_class:
288 288 displayhook_factory = import_item(str(self.displayhook_class))
289 289 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
290 290
291 291 def init_signal(self):
292 292 signal.signal(signal.SIGINT, signal.SIG_IGN)
293 293
294 294 def init_kernel(self):
295 295 """Create the Kernel object itself"""
296 296 kernel_factory = import_item(str(self.kernel_class))
297 297 self.kernel = kernel_factory(config=self.config, session=self.session,
298 298 shell_socket=self.shell_socket,
299 299 iopub_socket=self.iopub_socket,
300 300 stdin_socket=self.stdin_socket,
301 301 log=self.log
302 302 )
303 303 self.kernel.record_ports(self.ports)
304 304
305 305 @catch_config_error
306 306 def initialize(self, argv=None):
307 307 super(KernelApp, self).initialize(argv)
308 308 self.init_blackhole()
309 309 self.init_connection_file()
310 310 self.init_session()
311 311 self.init_poller()
312 312 self.init_sockets()
313 313 self.init_heartbeat()
314 314 # writing/displaying connection info must be *after* init_sockets/heartbeat
315 315 self.log_connection_info()
316 316 self.write_connection_file()
317 317 self.init_io()
318 318 self.init_signal()
319 319 self.init_kernel()
320 320 # flush stdout/stderr, so that anything written to these streams during
321 321 # initialization do not get associated with the first execution request
322 322 sys.stdout.flush()
323 323 sys.stderr.flush()
324 324
325 325 def start(self):
326 326 if self.poller is not None:
327 327 self.poller.start()
328 328 self.kernel.start()
329 329 try:
330 330 ioloop.IOLoop.instance().start()
331 331 except KeyboardInterrupt:
332 332 pass
333 333
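Tying the pieces together, the standard launch sequence is essentially (a sketch of what a kernel entry point does):

    app = KernelApp.instance()
    app.initialize()  # parse argv, load/write connection file, bind sockets
    app.start()       # start poller and kernel, then run the zmq event loop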
@@ -1,994 +1,994 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 import errno
21 21 import json
22 22 from subprocess import Popen
23 23 import os
24 24 import signal
25 25 import sys
26 26 from threading import Thread
27 27 import time
28 28
29 29 # System library imports.
30 30 import zmq
31 31 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
32 32 # during garbage collection of threads at exit:
33 33 from zmq import ZMQError
34 34 from zmq.eventloop import ioloop, zmqstream
35 35
36 36 # Local imports.
37 37 from IPython.config.loader import Config
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
44 44 from session import Session
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Constants and exceptions
48 48 #-----------------------------------------------------------------------------
49 49
50 50 class InvalidPortNumber(Exception):
51 51 pass
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Utility functions
55 55 #-----------------------------------------------------------------------------
56 56
57 57 # some utilities to validate message structure, these might get moved elsewhere
58 58 # if they prove to have more generic utility
59 59
60 60 def validate_string_list(lst):
61 61 """Validate that the input is a list of strings.
62 62
63 63 Raises ValueError if not."""
64 64 if not isinstance(lst, list):
65 65 raise ValueError('input %r must be a list' % lst)
66 66 for x in lst:
67 67 if not isinstance(x, basestring):
68 68 raise ValueError('element %r in list must be a string' % x)
69 69
70 70
71 71 def validate_string_dict(dct):
72 72 """Validate that the input is a dict with string keys and values.
73 73
74 74 Raises ValueError if not."""
75 75 for k,v in dct.iteritems():
76 76 if not isinstance(k, basestring):
77 77 raise ValueError('key %r in dict must be a string' % k)
78 78 if not isinstance(v, basestring):
79 79 raise ValueError('value %r in dict must be a string' % v)
80 80
81 81
82 82 #-----------------------------------------------------------------------------
83 83 # ZMQ Socket Channel classes
84 84 #-----------------------------------------------------------------------------
85 85
86 86 class ZMQSocketChannel(Thread):
87 87 """The base class for the channels that use ZMQ sockets.
88 88 """
89 89 context = None
90 90 session = None
91 91 socket = None
92 92 ioloop = None
93 93 stream = None
94 94 _address = None
95 95 _exiting = False
96 96
97 97 def __init__(self, context, session, address):
98 98 """Create a channel
99 99
100 100 Parameters
101 101 ----------
102 102 context : :class:`zmq.Context`
103 103 The ZMQ context to use.
104 104 session : :class:`session.Session`
105 105 The session to use.
106 106 address : tuple
107 107 Standard (ip, port) tuple that the kernel is listening on.
108 108 """
109 109 super(ZMQSocketChannel, self).__init__()
110 110 self.daemon = True
111 111
112 112 self.context = context
113 113 self.session = session
114 114 if address[1] == 0:
115 115 message = 'The port number for a channel cannot be 0.'
116 116 raise InvalidPortNumber(message)
117 117 self._address = address
118 118 atexit.register(self._notice_exit)
119 119
120 120 def _notice_exit(self):
121 121 self._exiting = True
122 122
123 123 def _run_loop(self):
124 124 """Run my loop, ignoring EINTR events in the poller"""
125 125 while True:
126 126 try:
127 127 self.ioloop.start()
128 128 except ZMQError as e:
129 129 if e.errno == errno.EINTR:
130 130 continue
131 131 else:
132 132 raise
133 133 except Exception:
134 134 if self._exiting:
135 135 break
136 136 else:
137 137 raise
138 138 else:
139 139 break
140 140
141 141 def stop(self):
142 142 """Stop the channel's activity.
143 143
144 144 This calls :method:`Thread.join` and returns when the thread
145 145 terminates. :class:`RuntimeError` will be raised if
146 146 :method:`self.start` is called again.
147 147 """
148 148 self.join()
149 149
150 150 @property
151 151 def address(self):
152 152 """Get the channel's address as an (ip, port) tuple.
153 153
154 154 By default, the address is (localhost, 0), where 0 means a random
155 155 port.
156 156 """
157 157 return self._address
158 158
159 159 def _queue_send(self, msg):
160 160 """Queue a message to be sent from the IOLoop's thread.
161 161
162 162 Parameters
163 163 ----------
164 164 msg : message to send
165 165
166 166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 167 thread control of the action.
168 168 """
169 169 def thread_send():
170 170 self.session.send(self.stream, msg)
171 171 self.ioloop.add_callback(thread_send)
172 172
173 173 def _handle_recv(self, msg):
174 174 """callback for stream.on_recv
175 175
176 176 unpacks message, and calls handlers with it.
177 177 """
178 178 ident,smsg = self.session.feed_identities(msg)
179 179 self.call_handlers(self.session.unserialize(smsg))
180 180
181 181
182 182
183 183 class ShellSocketChannel(ZMQSocketChannel):
184 """The XREQ channel for issues request/replies to the kernel.
184 """The DEALER channel for issues request/replies to the kernel.
185 185 """
186 186
187 187 command_queue = None
188 188 # flag for whether execute requests should be allowed to call raw_input:
189 189 allow_stdin = True
190 190
191 191 def __init__(self, context, session, address):
192 192 super(ShellSocketChannel, self).__init__(context, session, address)
193 193 self.ioloop = ioloop.IOLoop()
194 194
195 195 def run(self):
196 196 """The thread's main activity. Call start() instead."""
197 197 self.socket = self.context.socket(zmq.DEALER)
198 198 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 199 self.socket.connect('tcp://%s:%i' % self.address)
200 200 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 201 self.stream.on_recv(self._handle_recv)
202 202 self._run_loop()
203 203 try:
204 204 self.socket.close()
205 205 except:
206 206 pass
207 207
208 208 def stop(self):
209 209 self.ioloop.stop()
210 210 super(ShellSocketChannel, self).stop()
211 211
212 212 def call_handlers(self, msg):
213 213 """This method is called in the ioloop thread when a message arrives.
214 214
215 215 Subclasses should override this method to handle incoming messages.
216 216 It is important to remember that this method is called in the channel's
217 217 thread, so some logic must be done to ensure that the application-level
218 218 handlers are called in the application thread.
219 219 """
220 220 raise NotImplementedError('call_handlers must be defined in a subclass.')
221 221
222 222 def execute(self, code, silent=False,
223 223 user_variables=None, user_expressions=None, allow_stdin=None):
224 224 """Execute code in the kernel.
225 225
226 226 Parameters
227 227 ----------
228 228 code : str
229 229 A string of Python code.
230 230
231 231 silent : bool, optional (default False)
232 232 If set, the kernel will execute the code as quietly as possible.
233 233
234 234 user_variables : list, optional
235 235 A list of variable names to pull from the user's namespace. They
236 236 will come back as a dict with these names as keys and their
237 237 :func:`repr` as values.
238 238
239 239 user_expressions : dict, optional
240 240 A dict with string keys and expressions (as strings) to evaluate in the
241 241 user's namespace. They will come back as a dict with these names as keys
242 242 and their :func:`repr` as values.
243 243
244 244 allow_stdin : bool, optional
245 245 Flag for whether the kernel may request input from the client via
246 246 raw_input (stdin) requests while this code executes. If unspecified,
247 247 the channel's allow_stdin attribute is used (True by default, per the
248 248 class definition above).
249 249
250 250 Returns
251 251 -------
252 252 The msg_id of the message sent.
253 253 """
254 254 if user_variables is None:
255 255 user_variables = []
256 256 if user_expressions is None:
257 257 user_expressions = {}
258 258 if allow_stdin is None:
259 259 allow_stdin = self.allow_stdin
260 260
261 261
262 262 # Don't waste network traffic if inputs are invalid
263 263 if not isinstance(code, basestring):
264 264 raise ValueError('code %r must be a string' % code)
265 265 validate_string_list(user_variables)
266 266 validate_string_dict(user_expressions)
267 267
268 268 # Create class for content/msg creation. Related to, but possibly
269 269 # not in Session.
270 270 content = dict(code=code, silent=silent,
271 271 user_variables=user_variables,
272 272 user_expressions=user_expressions,
273 273 allow_stdin=allow_stdin,
274 274 )
275 275 msg = self.session.msg('execute_request', content)
276 276 self._queue_send(msg)
277 277 return msg['header']['msg_id']
278 278
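call_handlers must be overridden before the channel is useful; a minimal sketch of a subclass driving execute (address and handler behavior are assumptions):

    class PrintingShellChannel(ShellSocketChannel):
        def call_handlers(self, msg):
            # runs in the channel's thread; real frontends marshal this
            # back to the application thread
            print(msg['header']['msg_type'])

    channel = PrintingShellChannel(zmq.Context.instance(), Session(),
                                   ('127.0.0.1', 53001))
    channel.start()
    msg_id = channel.execute('a = 2 + 2', user_variables=['a'])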
279 279 def complete(self, text, line, cursor_pos, block=None):
280 280 """Tab complete text in the kernel's namespace.
281 281
282 282 Parameters
283 283 ----------
284 284 text : str
285 285 The text to complete.
286 286 line : str
287 287 The full line of text that is the surrounding context for the
288 288 text to complete.
289 289 cursor_pos : int
290 290 The position of the cursor in the line where the completion was
291 291 requested.
292 292 block : str, optional
293 293 The full block of code in which the completion is being requested.
294 294
295 295 Returns
296 296 -------
297 297 The msg_id of the message sent.
298 298 """
299 299 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
300 300 msg = self.session.msg('complete_request', content)
301 301 self._queue_send(msg)
302 302 return msg['header']['msg_id']
303 303
304 304 def object_info(self, oname, detail_level=0):
305 305 """Get metadata information about an object.
306 306
307 307 Parameters
308 308 ----------
309 309 oname : str
310 310 A string specifying the object name.
311 311 detail_level : int, optional
312 312 The level of detail for the introspection (0-2)
313 313
314 314 Returns
315 315 -------
316 316 The msg_id of the message sent.
317 317 """
318 318 content = dict(oname=oname, detail_level=detail_level)
319 319 msg = self.session.msg('object_info_request', content)
320 320 self._queue_send(msg)
321 321 return msg['header']['msg_id']
322 322
323 323 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
324 324 """Get entries from the history list.
325 325
326 326 Parameters
327 327 ----------
328 328 raw : bool
329 329 If True, return the raw input.
330 330 output : bool
331 331 If True, then return the output as well.
332 332 hist_access_type : str
333 333 'range' (fill in session, start and stop params), 'tail' (fill in n)
334 334 or 'search' (fill in pattern param).
335 335
336 336 session : int
337 337 For a range request, the session from which to get lines. Session
338 338 numbers are positive integers; negative ones count back from the
339 339 current session.
340 340 start : int
341 341 The first line number of a history range.
342 342 stop : int
343 343 The final (excluded) line number of a history range.
344 344
345 345 n : int
346 346 The number of lines of history to get for a tail request.
347 347
348 348 pattern : str
349 349 The glob-syntax pattern for a search request.
350 350
351 351 Returns
352 352 -------
353 353 The msg_id of the message sent.
354 354 """
355 355 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
356 356 **kwargs)
357 357 msg = self.session.msg('history_request', content)
358 358 self._queue_send(msg)
359 359 return msg['header']['msg_id']
360 360
361 361 def shutdown(self, restart=False):
362 362 """Request an immediate kernel shutdown.
363 363
364 364 Upon receipt of the (empty) reply, client code can safely assume that
365 365 the kernel has shut down and it's safe to forcefully terminate it if
366 366 it's still alive.
367 367
368 368 The kernel will send the reply via a function registered with Python's
369 369 atexit module, ensuring it's truly done as the kernel is done with all
370 370 normal operation.
371 371 """
372 372 # Send quit message to kernel. Once we implement kernel-side setattr,
373 373 # this should probably be done that way, but for now this will do.
374 374 msg = self.session.msg('shutdown_request', {'restart':restart})
375 375 self._queue_send(msg)
376 376 return msg['header']['msg_id']
377 377
378 378
379 379
380 380 class SubSocketChannel(ZMQSocketChannel):
381 381 """The SUB channel which listens for messages that the kernel publishes.
382 382 """
383 383
384 384 def __init__(self, context, session, address):
385 385 super(SubSocketChannel, self).__init__(context, session, address)
386 386 self.ioloop = ioloop.IOLoop()
387 387
388 388 def run(self):
389 389 """The thread's main activity. Call start() instead."""
390 390 self.socket = self.context.socket(zmq.SUB)
391 391 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
392 392 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
393 393 self.socket.connect('tcp://%s:%i' % self.address)
394 394 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
395 395 self.stream.on_recv(self._handle_recv)
396 396 self._run_loop()
397 397 try:
398 398 self.socket.close()
399 399 except:
400 400 pass
401 401
402 402 def stop(self):
403 403 self.ioloop.stop()
404 404 super(SubSocketChannel, self).stop()
405 405
406 406 def call_handlers(self, msg):
407 407 """This method is called in the ioloop thread when a message arrives.
408 408
409 409 Subclasses should override this method to handle incoming messages.
410 410 It is important to remember that this method is called in the ioloop
411 411 thread, so some logic must be done to ensure that the application-level
412 412 handlers are called in the application thread.
413 413 """
414 414 raise NotImplementedError('call_handlers must be defined in a subclass.')
415 415
416 416 def flush(self, timeout=1.0):
417 417 """Immediately processes all pending messages on the SUB channel.
418 418
419 419 Callers should use this method to ensure that :meth:`call_handlers`
420 420 has been called for all messages that have been received on the
421 421 0MQ SUB socket of this channel.
422 422
423 423 This method is thread safe.
424 424
425 425 Parameters
426 426 ----------
427 427 timeout : float, optional
428 428 The maximum amount of time to spend flushing, in seconds. The
429 429 default is one second.
430 430 """
431 431 # We do the IOLoop callback process twice to ensure that the IOLoop
432 432 # gets to perform at least one full poll.
433 433 stop_time = time.time() + timeout
434 434 for i in xrange(2):
435 435 self._flushed = False
436 436 self.ioloop.add_callback(self._flush)
437 437 while not self._flushed and time.time() < stop_time:
438 438 time.sleep(0.01)
439 439
440 440 def _flush(self):
441 441 """Callback for :method:`self.flush`."""
442 442 self.stream.flush()
443 443 self._flushed = True
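# Hedged usage sketch (not part of the original file): a frontend that
# wants to drain pending published messages synchronously, e.g. right
# after sending an execute request, could call (with `km` a running
# KernelManager):
#
#   km.sub_channel.flush(timeout=0.5)
#
# When flush() returns, call_handlers() has run in the channel thread for
# every message that had already arrived on the SUB socket.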
444 444
445 445
446 446 class StdInSocketChannel(ZMQSocketChannel):
447 447 """A reply channel to handle raw_input requests that the kernel makes."""
448 448
449 449 msg_queue = None
450 450
451 451 def __init__(self, context, session, address):
452 452 super(StdInSocketChannel, self).__init__(context, session, address)
453 453 self.ioloop = ioloop.IOLoop()
454 454
455 455 def run(self):
456 456 """The thread's main activity. Call start() instead."""
457 457 self.socket = self.context.socket(zmq.DEALER)
458 458 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
459 459 self.socket.connect('tcp://%s:%i' % self.address)
460 460 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
461 461 self.stream.on_recv(self._handle_recv)
462 462 self._run_loop()
463 463 try:
464 464 self.socket.close()
465 465 except:
466 466 pass
467 467
468 468
469 469 def stop(self):
470 470 self.ioloop.stop()
471 471 super(StdInSocketChannel, self).stop()
472 472
473 473 def call_handlers(self, msg):
474 474 """This method is called in the ioloop thread when a message arrives.
475 475
476 476 Subclasses should override this method to handle incoming messages.
477 477 It is important to remember that this method is called in the ioloop
478 478 thread, so some logic must be done to ensure that the application-level
479 479 handlers are called in the application thread.
480 480 """
481 481 raise NotImplementedError('call_handlers must be defined in a subclass.')
482 482
483 483 def input(self, string):
484 484 """Send a string of raw input to the kernel."""
485 485 content = dict(value=string)
486 486 msg = self.session.msg('input_reply', content)
487 487 self._queue_send(msg)
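# Hedged sketch (not part of the original file): a minimal frontend
# subclass that answers the kernel's raw_input requests. A real frontend
# would relay the prompt to the application thread instead of blocking
# the ioloop thread as this toy version does:
#
#   class PromptingStdInChannel(StdInSocketChannel):
#       def call_handlers(self, msg):
#           if msg['header']['msg_type'] == 'input_request':
#               self.input(raw_input(msg['content']['prompt']))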
488 488
489 489
490 490 class HBSocketChannel(ZMQSocketChannel):
491 491 """The heartbeat channel which monitors the kernel heartbeat.
492 492
493 493 Note that the heartbeat channel is paused by default. Once you start
494 494 this channel, the kernel manager will ensure that it is paused and
495 495 unpaused as appropriate.
496 496 """
497 497
498 498 time_to_dead = 3.0
499 499 socket = None
500 500 poller = None
501 501 _running = None
502 502 _pause = None
503 503 _beating = None
504 504
505 505 def __init__(self, context, session, address):
506 506 super(HBSocketChannel, self).__init__(context, session, address)
507 507 self._running = False
508 508 self._pause = True
509 509 self.poller = zmq.Poller()
510 510
511 511 def _create_socket(self):
512 512 if self.socket is not None:
513 513 # close previous socket, before opening a new one
514 514 self.poller.unregister(self.socket)
515 515 self.socket.close()
516 516 self.socket = self.context.socket(zmq.REQ)
517 517 self.socket.setsockopt(zmq.LINGER, 0)
518 518 self.socket.connect('tcp://%s:%i' % self.address)
519 519
520 520 self.poller.register(self.socket, zmq.POLLIN)
521 521
522 522 def _poll(self, start_time):
523 523 """poll for heartbeat replies until we reach self.time_to_dead
524 524
525 525 Ignores interrupts, and returns the result of poll(), which
526 526 will be an empty list if no messages arrived before the timeout,
527 527 or the event tuple if there is a message to receive.
528 528 """
529 529
530 530 until_dead = self.time_to_dead - (time.time() - start_time)
531 531 # ensure poll at least once
532 532 until_dead = max(until_dead, 1e-3)
533 533 events = []
534 534 while True:
535 535 try:
536 536 events = self.poller.poll(1000 * until_dead)
537 537 except ZMQError as e:
538 538 if e.errno == errno.EINTR:
539 539 # ignore interrupts during heartbeat
540 540 # this may never actually happen
541 541 until_dead = self.time_to_dead - (time.time() - start_time)
542 542 until_dead = max(until_dead, 1e-3)
543 543 pass
544 544 else:
545 545 raise
546 546 except Exception:
547 547 if self._exiting:
548 548 break
549 549 else:
550 550 raise
551 551 else:
552 552 break
553 553 return events
554 554
555 555 def run(self):
556 556 """The thread's main activity. Call start() instead."""
557 557 self._create_socket()
558 558 self._running = True
559 559 self._beating = True
560 560
561 561 while self._running:
562 562 if self._pause:
563 563 # just sleep, and skip the rest of the loop
564 564 time.sleep(self.time_to_dead)
565 565 continue
566 566
567 567 since_last_heartbeat = 0.0
568 568 # io.rprint('Ping from HB channel') # dbg
569 569 # no need to catch EFSM here, because the previous event was
570 570 # either a recv or connect, which cannot be followed by EFSM
571 571 self.socket.send(b'ping')
572 572 request_time = time.time()
573 573 ready = self._poll(request_time)
574 574 if ready:
575 575 self._beating = True
576 576 # the poll above guarantees we have something to recv
577 577 self.socket.recv()
578 578 # sleep the remainder of the cycle
579 579 remainder = self.time_to_dead - (time.time() - request_time)
580 580 if remainder > 0:
581 581 time.sleep(remainder)
582 582 continue
583 583 else:
584 584 # nothing was received within the time limit, signal heart failure
585 585 self._beating = False
586 586 since_last_heartbeat = time.time() - request_time
587 587 self.call_handlers(since_last_heartbeat)
588 588 # and close/reopen the socket, because the REQ/REP cycle has been broken
589 589 self._create_socket()
590 590 continue
591 591 try:
592 592 self.socket.close()
593 593 except:
594 594 pass
595 595
596 596 def pause(self):
597 597 """Pause the heartbeat."""
598 598 self._pause = True
599 599
600 600 def unpause(self):
601 601 """Unpause the heartbeat."""
602 602 self._pause = False
603 603
604 604 def is_beating(self):
605 605 """Is the heartbeat running and responsive (and not paused)."""
606 606 if self.is_alive() and not self._pause and self._beating:
607 607 return True
608 608 else:
609 609 return False
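# Hedged usage sketch (not part of the original file): the channel starts
# paused, so a frontend that wants liveness monitoring unpauses it and
# then polls is_beating(), with `km` a running KernelManager:
#
#   km.hb_channel.unpause()
#   # ... later ...
#   if not km.hb_channel.is_beating():
#       pass  # no pong within time_to_dead seconds: treat kernel as dead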
610 610
611 611 def stop(self):
612 612 self._running = False
613 613 super(HBSocketChannel, self).stop()
614 614
615 615 def call_handlers(self, since_last_heartbeat):
616 616 """This method is called in the ioloop thread when a message arrives.
617 617
618 618 Subclasses should override this method to handle incoming messages.
619 619 It is important to remember that this method is called in the thread
620 620 so that some logic must be done to ensure that the application level
621 621 handlers are called in the application thread.
622 622 """
623 623 raise NotImplementedError('call_handlers must be defined in a subclass.')
624 624
625 625
626 626 #-----------------------------------------------------------------------------
627 627 # Main kernel manager class
628 628 #-----------------------------------------------------------------------------
629 629
630 630 class KernelManager(HasTraits):
631 631 """ Manages a kernel for a frontend.
632 632
633 633 The SUB channel is for the frontend to receive messages published by the
634 634 kernel.
635 635
636 636 The REQ channel is for the frontend to make requests of the kernel.
637 637
638 638 The REP channel is for the kernel to request stdin (raw_input) from the
639 639 frontend.
640 640 """
641 641 # config object for passing to child configurables
642 642 config = Instance(Config)
643 643
644 644 # The PyZMQ Context to use for communication with the kernel.
645 645 context = Instance(zmq.Context)
646 646 def _context_default(self):
647 647 return zmq.Context.instance()
648 648
649 649 # The Session to use for communication with the kernel.
650 650 session = Instance(Session)
651 651
652 652 # The kernel process with which the KernelManager is communicating.
653 653 kernel = Instance(Popen)
654 654
655 655 # The addresses for the communication channels.
656 656 connection_file = Unicode('')
657 657 ip = Unicode(LOCALHOST)
658 658 def _ip_changed(self, name, old, new):
659 659 if new == '*':
660 660 self.ip = '0.0.0.0'
661 661 shell_port = Integer(0)
662 662 iopub_port = Integer(0)
663 663 stdin_port = Integer(0)
664 664 hb_port = Integer(0)
665 665
666 666 # The classes to use for the various channels.
667 667 shell_channel_class = Type(ShellSocketChannel)
668 668 sub_channel_class = Type(SubSocketChannel)
669 669 stdin_channel_class = Type(StdInSocketChannel)
670 670 hb_channel_class = Type(HBSocketChannel)
671 671
672 672 # Protected traits.
673 673 _launch_args = Any
674 674 _shell_channel = Any
675 675 _sub_channel = Any
676 676 _stdin_channel = Any
677 677 _hb_channel = Any
678 678 _connection_file_written = Bool(False)
679 679
680 680 def __init__(self, **kwargs):
681 681 super(KernelManager, self).__init__(**kwargs)
682 682 if self.session is None:
683 683 self.session = Session(config=self.config)
684 684
685 685 def __del__(self):
686 686 self.cleanup_connection_file()
687 687
688 688
689 689 #--------------------------------------------------------------------------
690 690 # Channel management methods:
691 691 #--------------------------------------------------------------------------
692 692
693 693 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
694 694 """Starts the channels for this kernel.
695 695
696 696 This will create the channels if they do not exist and then start
697 697 them. If port numbers of 0 are being used (random ports) then you
698 698 must first call :meth:`start_kernel`. If the channels have been
699 699 stopped and you call this, :class:`RuntimeError` will be raised.
700 700 """
701 701 if shell:
702 702 self.shell_channel.start()
703 703 if sub:
704 704 self.sub_channel.start()
705 705 if stdin:
706 706 self.stdin_channel.start()
707 707 self.shell_channel.allow_stdin = True
708 708 else:
709 709 self.shell_channel.allow_stdin = False
710 710 if hb:
711 711 self.hb_channel.start()
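# Hedged usage sketch (not part of the original file): the typical
# lifecycle for a frontend that owns its kernel. Random (port=0) ports
# are assumed, so start_kernel() must run before start_channels():
#
#   km = KernelManager()
#   km.start_kernel()       # allocates ports, writes the connection file
#   km.start_channels()     # spawns the shell/sub/stdin/hb threads
#   # ... use the kernel ...
#   km.stop_channels()
#   km.shutdown_kernel()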
712 712
713 713 def stop_channels(self):
714 714 """Stops all the running channels for this kernel.
715 715 """
716 716 if self.shell_channel.is_alive():
717 717 self.shell_channel.stop()
718 718 if self.sub_channel.is_alive():
719 719 self.sub_channel.stop()
720 720 if self.stdin_channel.is_alive():
721 721 self.stdin_channel.stop()
722 722 if self.hb_channel.is_alive():
723 723 self.hb_channel.stop()
724 724
725 725 @property
726 726 def channels_running(self):
727 727 """Are any of the channels created and running?"""
728 728 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
729 729 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
730 730
731 731 #--------------------------------------------------------------------------
732 732 # Kernel process management methods:
733 733 #--------------------------------------------------------------------------
734 734
735 735 def cleanup_connection_file(self):
736 736 """cleanup connection file *if we wrote it*
737 737
738 738 Will not raise if the connection file was already removed somehow.
739 739 """
740 740 if self._connection_file_written:
741 741 # cleanup connection files on full shutdown of kernel we started
742 742 self._connection_file_written = False
743 743 try:
744 744 os.remove(self.connection_file)
745 745 except OSError:
746 746 pass
747 747
748 748 def load_connection_file(self):
749 749 """load connection info from JSON dict in self.connection_file"""
750 750 with open(self.connection_file) as f:
751 751 cfg = json.loads(f.read())
752 752
753 753 self.ip = cfg['ip']
754 754 self.shell_port = cfg['shell_port']
755 755 self.stdin_port = cfg['stdin_port']
756 756 self.iopub_port = cfg['iopub_port']
757 757 self.hb_port = cfg['hb_port']
758 758 self.session.key = str_to_bytes(cfg['key'])
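# Illustrative connection file (hedged; all values here are made up):
# the JSON read above needs at least these keys:
#
#   {
#     "ip": "127.0.0.1",
#     "shell_port": 53794,
#     "iopub_port": 53795,
#     "stdin_port": 53796,
#     "hb_port": 53797,
#     "key": "c0b9b375-071a-4a4b-8eff-f18a3c41f70d"
#   }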
759 759
760 760 def write_connection_file(self):
761 761 """write connection info to JSON dict in self.connection_file"""
762 762 if self._connection_file_written:
763 763 return
764 764 self.connection_file, cfg = write_connection_file(self.connection_file,
765 765 ip=self.ip, key=self.session.key,
766 766 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
767 767 shell_port=self.shell_port, hb_port=self.hb_port)
768 768 # write_connection_file also sets default ports:
769 769 self.shell_port = cfg['shell_port']
770 770 self.stdin_port = cfg['stdin_port']
771 771 self.iopub_port = cfg['iopub_port']
772 772 self.hb_port = cfg['hb_port']
773 773
774 774 self._connection_file_written = True
775 775
776 776 def start_kernel(self, **kw):
777 777 """Starts a kernel process and configures the manager to use it.
778 778
779 779 If random ports (port=0) are being used, this method must be called
780 780 before the channels are created.
781 781
782 782 Parameters
783 783 ----------
784 784 launcher : callable, optional (default None)
785 785 A custom function for launching the kernel process (generally a
786 786 wrapper around ``entry_point.base_launch_kernel``). In most cases,
787 787 it should not be necessary to use this parameter.
788 788
789 789 **kw : optional
790 790 See respective options for IPython and Python kernels.
791 791 """
792 792 if self.ip not in LOCAL_IPS:
793 793 raise RuntimeError("Can only launch a kernel on a local interface. "
794 794 "Make sure that the '*_address' attributes are "
795 795 "configured properly. "
796 796 "Currently valid addresses are: %s"%LOCAL_IPS
797 797 )
798 798
799 799 # write connection file / get default ports
800 800 self.write_connection_file()
801 801
802 802 self._launch_args = kw.copy()
803 803 launch_kernel = kw.pop('launcher', None)
804 804 if launch_kernel is None:
805 805 from ipkernel import launch_kernel
806 806 self.kernel = launch_kernel(fname=self.connection_file, **kw)
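# Hedged sketch (not part of the original file): connecting to an
# *existing* kernel instead of launching one. The file name here is
# hypothetical:
#
#   km = KernelManager(connection_file='kernel-1234.json')
#   km.load_connection_file()
#   km.start_channels()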
807 807
808 808 def shutdown_kernel(self, restart=False):
809 809 """ Attempts to the stop the kernel process cleanly. If the kernel
810 810 cannot be stopped, it is killed, if possible.
811 811 """
812 812 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
813 813 if sys.platform == 'win32':
814 814 self.kill_kernel()
815 815 return
816 816
817 817 # Pause the heart beat channel if it exists.
818 818 if self._hb_channel is not None:
819 819 self._hb_channel.pause()
820 820
821 821 # Don't send any additional kernel kill messages immediately, to give
822 822 # the kernel a chance to properly execute shutdown actions. Wait for at
823 823 # most 1s, checking every 0.1s.
824 824 self.shell_channel.shutdown(restart=restart)
825 825 for i in range(10):
826 826 if self.is_alive:
827 827 time.sleep(0.1)
828 828 else:
829 829 break
830 830 else:
831 831 # OK, we've waited long enough.
832 832 if self.has_kernel:
833 833 self.kill_kernel()
834 834
835 835 if not restart and self._connection_file_written:
836 836 # cleanup connection files on full shutdown of kernel we started
837 837 self._connection_file_written = False
838 838 try:
839 839 os.remove(self.connection_file)
840 840 except OSError:
841 841 pass
842 842
843 843 def restart_kernel(self, now=False, **kw):
844 844 """Restarts a kernel with the arguments that were used to launch it.
845 845
846 846 If the old kernel was launched with random ports, the same ports will be
847 847 used for the new kernel.
848 848
849 849 Parameters
850 850 ----------
851 851 now : bool, optional
852 852 If True, the kernel is forcefully restarted *immediately*, without
853 853 having a chance to do any cleanup action. Otherwise the kernel is
854 854 given 1s to clean up before a forceful restart is issued.
855 855
856 856 In all cases the kernel is restarted, the only difference is whether
857 857 it is given a chance to perform a clean shutdown or not.
858 858
859 859 **kw : optional
860 860 Any options specified here will replace those used to launch the
861 861 kernel.
862 862 """
863 863 if self._launch_args is None:
864 864 raise RuntimeError("Cannot restart the kernel. "
865 865 "No previous call to 'start_kernel'.")
866 866 else:
867 867 # Stop currently running kernel.
868 868 if self.has_kernel:
869 869 if now:
870 870 self.kill_kernel()
871 871 else:
872 872 self.shutdown_kernel(restart=True)
873 873
874 874 # Start new kernel.
875 875 self._launch_args.update(kw)
876 876 self.start_kernel(**self._launch_args)
877 877
878 878 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
879 879 # unless there is some delay here.
880 880 if sys.platform == 'win32':
881 881 time.sleep(0.2)
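# Hedged usage sketch (not part of the original file), with `km` a
# KernelManager that has already called start_kernel():
#
#   km.restart_kernel()          # graceful: up to 1s for a clean shutdown
#   km.restart_kernel(now=True)  # forceful: immediate kill, then relaunch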
882 882
883 883 @property
884 884 def has_kernel(self):
885 885 """Returns whether a kernel process has been specified for the kernel
886 886 manager.
887 887 """
888 888 return self.kernel is not None
889 889
890 890 def kill_kernel(self):
891 891 """ Kill the running kernel. """
892 892 if self.has_kernel:
893 893 # Pause the heart beat channel if it exists.
894 894 if self._hb_channel is not None:
895 895 self._hb_channel.pause()
896 896
897 897 # Attempt to kill the kernel.
898 898 try:
899 899 self.kernel.kill()
900 900 except OSError as e:
901 901 # In Windows, we will get an Access Denied error if the process
902 902 # has already terminated. Ignore it.
903 903 if sys.platform == 'win32':
904 904 if e.winerror != 5:
905 905 raise
906 906 # On Unix, we may get an ESRCH error if the process has already
907 907 # terminated. Ignore it.
908 908 else:
909 909 from errno import ESRCH
910 910 if e.errno != ESRCH:
911 911 raise
912 912 self.kernel = None
913 913 else:
914 914 raise RuntimeError("Cannot kill kernel. No kernel is running!")
915 915
916 916 def interrupt_kernel(self):
917 917 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
918 918 well supported on all platforms.
919 919 """
920 920 if self.has_kernel:
921 921 if sys.platform == 'win32':
922 922 from parentpoller import ParentPollerWindows as Poller
923 923 Poller.send_interrupt(self.kernel.win32_interrupt_event)
924 924 else:
925 925 self.kernel.send_signal(signal.SIGINT)
926 926 else:
927 927 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
928 928
929 929 def signal_kernel(self, signum):
930 930 """ Sends a signal to the kernel. Note that since only SIGTERM is
931 931 supported on Windows, this function is only useful on Unix systems.
932 932 """
933 933 if self.has_kernel:
934 934 self.kernel.send_signal(signum)
935 935 else:
936 936 raise RuntimeError("Cannot signal kernel. No kernel is running!")
937 937
938 938 @property
939 939 def is_alive(self):
940 940 """Is the kernel process still running?"""
941 941 if self.has_kernel:
942 942 if self.kernel.poll() is None:
943 943 return True
944 944 else:
945 945 return False
946 946 elif self._hb_channel is not None:
947 947 # We didn't start the kernel with this KernelManager so we
948 948 # use the heartbeat.
949 949 return self._hb_channel.is_beating()
950 950 else:
951 951 # no heartbeat and not local, we can't tell if it's running,
952 952 # so naively return True
953 953 return True
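# Hedged usage sketch (not part of the original file): note that is_alive
# is a property, not a method, so callers poll it without parentheses:
#
#   while km.is_alive:
#       time.sleep(0.1)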
954 954
955 955 #--------------------------------------------------------------------------
956 956 # Channels used for communication with the kernel:
957 957 #--------------------------------------------------------------------------
958 958
959 959 @property
960 960 def shell_channel(self):
961 961 """Get the REQ socket channel object to make requests of the kernel."""
962 962 if self._shell_channel is None:
963 963 self._shell_channel = self.shell_channel_class(self.context,
964 964 self.session,
965 965 (self.ip, self.shell_port))
966 966 return self._shell_channel
967 967
968 968 @property
969 969 def sub_channel(self):
970 970 """Get the SUB socket channel object."""
971 971 if self._sub_channel is None:
972 972 self._sub_channel = self.sub_channel_class(self.context,
973 973 self.session,
974 974 (self.ip, self.iopub_port))
975 975 return self._sub_channel
976 976
977 977 @property
978 978 def stdin_channel(self):
979 979 """Get the REP socket channel object to handle stdin (raw_input)."""
980 980 if self._stdin_channel is None:
981 981 self._stdin_channel = self.stdin_channel_class(self.context,
982 982 self.session,
983 983 (self.ip, self.stdin_port))
984 984 return self._stdin_channel
985 985
986 986 @property
987 987 def hb_channel(self):
988 988 """Get the heartbeat socket channel object to check that the
989 989 kernel is alive."""
990 990 if self._hb_channel is None:
991 991 self._hb_channel = self.hb_channel_class(self.context,
992 992 self.session,
993 993 (self.ip, self.hb_port))
994 994 return self._hb_channel