json/jsonapi cleanup...
MinRK
IPython/parallel/apps/ipcontrollerapp.py
@@ -1,449 +1,445 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 import json
26 27 import os
27 28 import socket
28 29 import stat
29 30 import sys
30 31
31 32 from multiprocessing import Process
32 33
33 34 import zmq
34 35 from zmq.devices import ProcessMonitoredQueue
35 36 from zmq.log.handlers import PUBHandler
36 37
37 # Note: use our own import to work around jsonlib api mismatch. When these
38 # changes propagate to zmq, revert back to the following line instead:
39 #from zmq.utils import jsonapi as json
40 from IPython.zmq import jsonapi as json
41
42 38 from IPython.core.profiledir import ProfileDir
43 39
44 40 from IPython.parallel.apps.baseapp import (
45 41 BaseParallelApplication,
46 42 base_aliases,
47 43 base_flags,
48 44 catch_config_error,
49 45 )
50 46 from IPython.utils.importstring import import_item
51 47 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
52 48
53 49 from IPython.zmq.session import (
54 50 Session, session_aliases, session_flags, default_secure
55 51 )
56 52
57 53 from IPython.parallel.controller.heartmonitor import HeartMonitor
58 54 from IPython.parallel.controller.hub import HubFactory
59 55 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
60 56 from IPython.parallel.controller.sqlitedb import SQLiteDB
61 57
62 58 from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
63 59
64 60 # conditional import of MongoDB backend class
65 61
66 62 try:
67 63 from IPython.parallel.controller.mongodb import MongoDB
68 64 except ImportError:
69 65 maybe_mongo = []
70 66 else:
71 67 maybe_mongo = [MongoDB]
72 68
73 69
74 70 #-----------------------------------------------------------------------------
75 71 # Module level variables
76 72 #-----------------------------------------------------------------------------
77 73
78 74
79 75 #: The default config file name for this application
80 76 default_config_file_name = u'ipcontroller_config.py'
81 77
82 78
83 79 _description = """Start the IPython controller for parallel computing.
84 80
85 81 The IPython controller provides a gateway between the IPython engines and
86 82 clients. The controller needs to be started before the engines and can be
87 83 configured using command line options or using a cluster directory. Cluster
88 84 directories contain config, log and security files and are usually located in
89 85 your ipython directory and named as "profile_name". See the `profile`
90 86 and `profile-dir` options for details.
91 87 """
92 88
93 89 _examples = """
94 90 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
95 91 ipcontroller --scheme=pure # use the pure zeromq scheduler
96 92 """
97 93
98 94
99 95 #-----------------------------------------------------------------------------
100 96 # The main application
101 97 #-----------------------------------------------------------------------------
102 98 flags = {}
103 99 flags.update(base_flags)
104 100 flags.update({
105 101 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
106 102 'Use threads instead of processes for the schedulers'),
107 103 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
108 104 'use the SQLiteDB backend'),
109 105 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
110 106 'use the MongoDB backend'),
111 107 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
112 108 'use the in-memory DictDB backend'),
113 109 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
114 110 'reuse existing json connection files')
115 111 })
116 112
117 113 flags.update(session_flags)
118 114
119 115 aliases = dict(
120 116 ssh = 'IPControllerApp.ssh_server',
121 117 enginessh = 'IPControllerApp.engine_ssh_server',
122 118 location = 'IPControllerApp.location',
123 119
124 120 url = 'HubFactory.url',
125 121 ip = 'HubFactory.ip',
126 122 transport = 'HubFactory.transport',
127 123 port = 'HubFactory.regport',
128 124
129 125 ping = 'HeartMonitor.period',
130 126
131 127 scheme = 'TaskScheduler.scheme_name',
132 128 hwm = 'TaskScheduler.hwm',
133 129 )
134 130 aliases.update(base_aliases)
135 131 aliases.update(session_aliases)
136 132
137 133
138 134 class IPControllerApp(BaseParallelApplication):
139 135
140 136 name = u'ipcontroller'
141 137 description = _description
142 138 examples = _examples
143 139 config_file_name = Unicode(default_config_file_name)
144 140 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
145 141
146 142 # change default to True
147 143 auto_create = Bool(True, config=True,
148 144 help="""Whether to create profile dir if it doesn't exist.""")
149 145
150 146 reuse_files = Bool(False, config=True,
151 147 help='Whether to reuse existing json connection files.'
152 148 )
153 149 ssh_server = Unicode(u'', config=True,
154 150 help="""ssh url for clients to use when connecting to the Controller
155 151 processes. It should be of the form: [user@]server[:port]. The
156 152 Controller's listening addresses must be accessible from the ssh server""",
157 153 )
158 154 engine_ssh_server = Unicode(u'', config=True,
159 155 help="""ssh url for engines to use when connecting to the Controller
160 156 processes. It should be of the form: [user@]server[:port]. The
161 157 Controller's listening addresses must be accessible from the ssh server""",
162 158 )
163 159 location = Unicode(u'', config=True,
164 160 help="""The external IP or domain name of the Controller, used for disambiguating
165 161 engine and client connections.""",
166 162 )
167 163 import_statements = List([], config=True,
168 164 help="import statements to be run at startup. Necessary in some environments"
169 165 )
170 166
171 167 use_threads = Bool(False, config=True,
172 168 help='Use threads instead of processes for the schedulers',
173 169 )
174 170
175 171 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
176 172 help="JSON filename where engine connection info will be stored.")
177 173 client_json_file = Unicode('ipcontroller-client.json', config=True,
178 174 help="JSON filename where client connection info will be stored.")
179 175
180 176 def _cluster_id_changed(self, name, old, new):
181 177 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
182 178 self.engine_json_file = "%s-engine.json" % self.name
183 179 self.client_json_file = "%s-client.json" % self.name
184 180
185 181
186 182 # internal
187 183 children = List()
188 184 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
189 185
190 186 def _use_threads_changed(self, name, old, new):
191 187 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
192 188
193 189 aliases = Dict(aliases)
194 190 flags = Dict(flags)
195 191
196 192
197 193 def save_connection_dict(self, fname, cdict):
198 194 """save a connection dict to json file."""
199 195 c = self.config
200 196 url = cdict['url']
201 197 location = cdict['location']
202 198 if not location:
203 199 try:
204 200 proto,ip,port = split_url(url)
205 201 except AssertionError:
206 202 pass
207 203 else:
208 204 try:
209 205 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
210 206 except (socket.gaierror, IndexError):
211 207 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
212 208 " You may need to specify '--location=<external_ip_address>' to help"
213 209 " IPython decide when to connect via loopback.")
214 210 location = '127.0.0.1'
215 211 cdict['location'] = location
216 212 fname = os.path.join(self.profile_dir.security_dir, fname)
217 with open(fname, 'wb') as f:
213 with open(fname, 'w') as f:
218 214 f.write(json.dumps(cdict, indent=2))
219 215 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
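# A minimal standalone sketch of the pattern above: dump a connection dict
# with stdlib json (which this commit switches to) and restrict the file to
# its owner. The filename and dict contents here are placeholders.
import json
import os
import stat
import tempfile

fname = os.path.join(tempfile.mkdtemp(), 'ipcontroller-client.json')
cdict = {'url': 'tcp://127.0.0.1:12345', 'location': '127.0.0.1'}
with open(fname, 'w') as f:
    f.write(json.dumps(cdict, indent=2))
os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR)  # rw for owner only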
220 216
221 217 def load_config_from_json(self):
222 218 """load config from existing json connector files."""
223 219 c = self.config
224 220 self.log.debug("loading config from JSON")
225 221 # load from engine config
226 222 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
227 223 cfg = json.loads(f.read())
228 224 key = c.Session.key = asbytes(cfg['exec_key'])
229 225 xport,addr = cfg['url'].split('://')
230 226 c.HubFactory.engine_transport = xport
231 227 ip,ports = addr.split(':')
232 228 c.HubFactory.engine_ip = ip
233 229 c.HubFactory.regport = int(ports)
234 230 self.location = cfg['location']
235 231 if not self.engine_ssh_server:
236 232 self.engine_ssh_server = cfg['ssh']
237 233 # load client config
238 234 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
239 235 cfg = json.loads(f.read())
240 236 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
241 237 xport,addr = cfg['url'].split('://')
242 238 c.HubFactory.client_transport = xport
243 239 ip,ports = addr.split(':')
244 240 c.HubFactory.client_ip = ip
245 241 if not self.ssh_server:
246 242 self.ssh_server = cfg['ssh']
247 243 assert int(ports) == c.HubFactory.regport, "regport mismatch"
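# Sketch of the url parsing used in load_config_from_json above, assuming a
# "transport://ip:port" string like those stored in the connection files.
url = 'tcp://127.0.0.1:12345'
transport, addr = url.split('://')
ip, port = addr.split(':')
assert (transport, ip, int(port)) == ('tcp', '127.0.0.1', 12345)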
248 244
249 245 def load_secondary_config(self):
250 246 """secondary config, loading from JSON and setting defaults"""
251 247 if self.reuse_files:
252 248 try:
253 249 self.load_config_from_json()
254 250 except (AssertionError,IOError) as e:
255 251 self.log.error("Could not load config from JSON: %s" % e)
256 252 self.reuse_files=False
257 253 # switch Session.key default to secure
258 254 default_secure(self.config)
259 255 self.log.debug("Config changed")
260 256 self.log.debug(repr(self.config))
261 257
262 258 def init_hub(self):
263 259 c = self.config
264 260
265 261 self.do_import_statements()
266 262
267 263 try:
268 264 self.factory = HubFactory(config=c, log=self.log)
269 265 # self.start_logging()
270 266 self.factory.init_hub()
271 267 except TraitError:
272 268 raise
273 269 except Exception:
274 270 self.log.error("Couldn't construct the Controller", exc_info=True)
275 271 self.exit(1)
276 272
277 273 if not self.reuse_files:
278 274 # save to new json config files
279 275 f = self.factory
280 276 cdict = {'exec_key' : f.session.key.decode('ascii'),
281 277 'ssh' : self.ssh_server,
282 278 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
283 279 'location' : self.location
284 280 }
285 281 self.save_connection_dict(self.client_json_file, cdict)
286 282 edict = cdict
287 283 edict['url']="%s://%s:%s"%(f.client_transport, f.client_ip, f.regport)
288 284 edict['ssh'] = self.engine_ssh_server
289 285 self.save_connection_dict(self.engine_json_file, edict)
290 286
291 287 #
292 288 def init_schedulers(self):
293 289 children = self.children
294 290 mq = import_item(str(self.mq_class))
295 291
296 292 hub = self.factory
297 293 # disambiguate url, in case of *
298 294 monitor_url = disambiguate_url(hub.monitor_url)
299 295 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
300 296 # IOPub relay (in a Process)
301 297 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
302 298 q.bind_in(hub.client_info['iopub'])
303 299 q.bind_out(hub.engine_info['iopub'])
304 300 q.setsockopt_out(zmq.SUBSCRIBE, b'')
305 301 q.connect_mon(monitor_url)
306 302 q.daemon=True
307 303 children.append(q)
308 304
309 305 # Multiplexer Queue (in a Process)
310 306 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
311 307 q.bind_in(hub.client_info['mux'])
312 308 q.setsockopt_in(zmq.IDENTITY, b'mux')
313 309 q.bind_out(hub.engine_info['mux'])
314 310 q.connect_mon(monitor_url)
315 311 q.daemon=True
316 312 children.append(q)
317 313
318 314 # Control Queue (in a Process)
319 315 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
320 316 q.bind_in(hub.client_info['control'])
321 317 q.setsockopt_in(zmq.IDENTITY, b'control')
322 318 q.bind_out(hub.engine_info['control'])
323 319 q.connect_mon(monitor_url)
324 320 q.daemon=True
325 321 children.append(q)
326 322 try:
327 323 scheme = self.config.TaskScheduler.scheme_name
328 324 except AttributeError:
329 325 scheme = TaskScheduler.scheme_name.get_default_value()
330 326 # Task Queue (in a Process)
331 327 if scheme == 'pure':
332 328 self.log.warn("task::using pure XREQ Task scheduler")
333 329 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
334 330 # q.setsockopt_out(zmq.HWM, hub.hwm)
335 331 q.bind_in(hub.client_info['task'][1])
336 332 q.setsockopt_in(zmq.IDENTITY, b'task')
337 333 q.bind_out(hub.engine_info['task'])
338 334 q.connect_mon(monitor_url)
339 335 q.daemon=True
340 336 children.append(q)
341 337 elif scheme == 'none':
342 338 self.log.warn("task::using no Task scheduler")
343 339
344 340 else:
345 341 self.log.info("task::using Python %s Task scheduler"%scheme)
346 342 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
347 343 monitor_url, disambiguate_url(hub.client_info['notification']))
348 344 kwargs = dict(logname='scheduler', loglevel=self.log_level,
349 345 log_url = self.log_url, config=dict(self.config))
350 346 if 'Process' in self.mq_class:
351 347 # run the Python scheduler in a Process
352 348 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
353 349 q.daemon=True
354 350 children.append(q)
355 351 else:
356 352 # single-threaded Controller
357 353 kwargs['in_thread'] = True
358 354 launch_scheduler(*sargs, **kwargs)
359 355
360 356
361 357 def save_urls(self):
362 358 """save the registration urls to files."""
363 359 c = self.config
364 360
365 361 sec_dir = self.profile_dir.security_dir
366 362 cf = self.factory
367 363
368 364 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
369 365 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
370 366
371 367 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
372 368 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
373 369
374 370
375 371 def do_import_statements(self):
376 372 statements = self.import_statements
377 373 for s in statements:
378 374 try:
379 375 self.log.info("Executing statement: '%s'" % s)
380 376 exec s in globals(), locals()
381 377 except:
382 378 self.log.error("Error running statement: %s" % s)
383 379
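# Standalone sketch of do_import_statements above: each configured statement
# is executed in a namespace; the statements here are illustrative only.
statements = ['import math', 'from os.path import join']
ns = {}
for s in statements:
    try:
        exec(s, ns)  # parenthesized form also parses under Python 2
    except Exception as e:
        print("Error running statement %r: %s" % (s, e))
assert 'math' in ns and 'join' in ns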
384 380 def forward_logging(self):
385 381 if self.log_url:
386 382 self.log.info("Forwarding logging to %s"%self.log_url)
387 383 context = zmq.Context.instance()
388 384 lsock = context.socket(zmq.PUB)
389 385 lsock.connect(self.log_url)
390 386 handler = PUBHandler(lsock)
391 387 self.log.removeHandler(self._log_handler)
392 388 handler.root_topic = 'controller'
393 389 handler.setLevel(self.log_level)
394 390 self.log.addHandler(handler)
395 391 self._log_handler = handler
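# Sketch of the zmq log forwarding in forward_logging above: a PUBHandler
# publishes log records on a PUB socket; a collector would subscribe on the
# other end. The address is a placeholder for log_url.
import logging
import zmq
from zmq.log.handlers import PUBHandler

context = zmq.Context.instance()
lsock = context.socket(zmq.PUB)
lsock.connect('tcp://127.0.0.1:7777')  # placeholder log_url
handler = PUBHandler(lsock)
handler.root_topic = 'controller'
handler.setLevel(logging.INFO)
logging.getLogger('sketch').addHandler(handler)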
396 392
397 393 @catch_config_error
398 394 def initialize(self, argv=None):
399 395 super(IPControllerApp, self).initialize(argv)
400 396 self.forward_logging()
401 397 self.load_secondary_config()
402 398 self.init_hub()
403 399 self.init_schedulers()
404 400
405 401 def start(self):
406 402 # Start the subprocesses:
407 403 self.factory.start()
408 404 child_procs = []
409 405 for child in self.children:
410 406 child.start()
411 407 if isinstance(child, ProcessMonitoredQueue):
412 408 child_procs.append(child.launcher)
413 409 elif isinstance(child, Process):
414 410 child_procs.append(child)
415 411 if child_procs:
416 412 signal_children(child_procs)
417 413
418 414 self.write_pid_file(overwrite=True)
419 415
420 416 try:
421 417 self.factory.loop.start()
422 418 except KeyboardInterrupt:
423 419 self.log.critical("Interrupted, Exiting...\n")
424 420
425 421
426 422
427 423 def launch_new_instance():
428 424 """Create and run the IPython controller"""
429 425 if sys.platform == 'win32':
430 426 # make sure we don't get called from a multiprocessing subprocess
431 427 # this can result in infinite Controllers being started on Windows
432 428 # which doesn't have a proper fork, so multiprocessing is wonky
433 429
434 430 # this only comes up when IPython has been installed using vanilla
435 431 # setuptools, and *not* distribute.
436 432 import multiprocessing
437 433 p = multiprocessing.current_process()
438 434 # the main process has name 'MainProcess'
439 435 # subprocesses will have names like 'Process-1'
440 436 if p.name != 'MainProcess':
441 437 # we are a subprocess, don't start another Controller!
442 438 return
443 439 app = IPControllerApp.instance()
444 440 app.initialize()
445 441 app.start()
446 442
447 443
448 444 if __name__ == '__main__':
449 445 launch_new_instance()
IPython/parallel/controller/scheduler.py
@@ -1,714 +1,716 @@
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6
7 7 Authors:
8 8
9 9 * Min RK
10 10 """
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2010-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #----------------------------------------------------------------------
19 19 # Imports
20 20 #----------------------------------------------------------------------
21 21
22 22 from __future__ import print_function
23 23
24 24 import logging
25 25 import sys
26 26
27 27 from datetime import datetime, timedelta
28 28 from random import randint, random
29 29 from types import FunctionType
30 30
31 31 try:
32 32 import numpy
33 33 except ImportError:
34 34 numpy = None
35 35
36 36 import zmq
37 37 from zmq.eventloop import ioloop, zmqstream
38 38
39 39 # local imports
40 40 from IPython.external.decorator import decorator
41 41 from IPython.config.application import Application
42 42 from IPython.config.loader import Config
43 43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44 44
45 45 from IPython.parallel import error
46 46 from IPython.parallel.factory import SessionFactory
47 47 from IPython.parallel.util import connect_logger, local_logger, asbytes
48 48
49 49 from .dependency import Dependency
50 50
51 51 @decorator
52 52 def logged(f,self,*args,**kwargs):
53 53 # print ("#--------------------")
54 54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 55 # print ("#--")
56 56 return f(self,*args, **kwargs)
57 57
58 58 #----------------------------------------------------------------------
59 59 # Chooser functions
60 60 #----------------------------------------------------------------------
61 61
62 62 def plainrandom(loads):
63 63 """Plain random pick."""
64 64 n = len(loads)
65 65 return randint(0,n-1)
66 66
67 67 def lru(loads):
68 68 """Always pick the front of the line.
69 69
70 70 The content of `loads` is ignored.
71 71
72 72 Assumes LRU ordering of loads, with oldest first.
73 73 """
74 74 return 0
75 75
76 76 def twobin(loads):
77 77 """Pick two at random, use the LRU of the two.
78 78
79 79 The content of loads is ignored.
80 80
81 81 Assumes LRU ordering of loads, with oldest first.
82 82 """
83 83 n = len(loads)
84 84 a = randint(0,n-1)
85 85 b = randint(0,n-1)
86 86 return min(a,b)
87 87
88 88 def weighted(loads):
89 89 """Pick two at random using inverse load as weight.
90 90
91 91 Return the less loaded of the two.
92 92 """
93 93 # weight 0 a million times more than 1:
94 94 weights = 1./(1e-6+numpy.array(loads))
95 95 sums = weights.cumsum()
96 96 t = sums[-1]
97 97 x = random()*t
98 98 y = random()*t
99 99 idx = 0
100 100 idy = 0
101 101 while sums[idx] < x:
102 102 idx += 1
103 103 while sums[idy] < y:
104 104 idy += 1
105 105 if weights[idy] > weights[idx]:
106 106 return idy
107 107 else:
108 108 return idx
109 109
110 110 def leastload(loads):
111 111 """Always choose the lowest load.
112 112
113 113 If the lowest load occurs more than once, the first
114 114 occurance will be used. If loads has LRU ordering, this means
115 115 the LRU of those with the lowest load is chosen.
116 116 """
117 117 return loads.index(min(loads))
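# Quick illustration of the chooser contract shared by the schemes above:
# each maps a list of engine loads to the index of the chosen engine. The
# sample loads are made up; weighted() additionally requires numpy.
loads = [3, 0, 1, 2]
assert leastload(loads) == 1          # lowest load wins
assert lru(loads) == 0                # always the front of the LRU line
assert 0 <= plainrandom(loads) < 4    # uniform random index
assert 0 <= twobin(loads) < 4         # LRU of two random picks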
118 118
119 119 #---------------------------------------------------------------------
120 120 # Classes
121 121 #---------------------------------------------------------------------
122 122 # store empty default dependency:
123 123 MET = Dependency([])
124 124
125 125 class TaskScheduler(SessionFactory):
126 126 """Python TaskScheduler object.
127 127
128 128 This is the simplest object that supports msg_id based
129 129 DAG dependencies. *Only* task msg_ids are checked, not
130 130 msg_ids of jobs submitted via the MUX queue.
131 131
132 132 """
133 133
134 134 hwm = Integer(0, config=True, shortname='hwm',
135 135 help="""specify the High Water Mark (HWM) for the downstream
136 136 socket in the Task scheduler. This is the maximum number
137 137 of allowed outstanding tasks on each engine."""
138 138 )
139 139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
140 140 'leastload', config=True, shortname='scheme', allow_none=False,
141 141 help="""select the task scheduler scheme [default: Python LRU]
142 142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
143 143 )
144 144 def _scheme_name_changed(self, old, new):
145 145 self.log.debug("Using scheme %r"%new)
146 146 self.scheme = globals()[new]
147 147
148 148 # input arguments:
149 149 scheme = Instance(FunctionType) # function for determining the destination
150 150 def _scheme_default(self):
151 151 return leastload
152 152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
153 153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
154 154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
155 155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
156 156
157 157 # internals:
158 158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
159 159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
160 160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
161 161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
162 162 pending = Dict() # dict by engine_uuid of submitted tasks
163 163 completed = Dict() # dict by engine_uuid of completed tasks
164 164 failed = Dict() # dict by engine_uuid of failed tasks
165 165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
166 166 clients = Dict() # dict by msg_id for who submitted the task
167 167 targets = List() # list of target IDENTs
168 168 loads = List() # list of engine loads
169 169 # full = Set() # set of IDENTs that have HWM outstanding tasks
170 170 all_completed = Set() # set of all completed tasks
171 171 all_failed = Set() # set of all failed tasks
172 172 all_done = Set() # set of all finished tasks=union(completed,failed)
173 173 all_ids = Set() # set of all submitted task IDs
174 174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
175 175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
176 176
177 177 ident = CBytes() # ZMQ identity. This should just be self.session.session
178 178 # but ensure Bytes
179 179 def _ident_default(self):
180 180 return self.session.bsession
181 181
182 182 def start(self):
183 183 self.engine_stream.on_recv(self.dispatch_result, copy=False)
184 184 self._notification_handlers = dict(
185 185 registration_notification = self._register_engine,
186 186 unregistration_notification = self._unregister_engine
187 187 )
188 188 self.notifier_stream.on_recv(self.dispatch_notification)
189 189 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
190 190 self.auditor.start()
191 191 self.log.info("Scheduler started [%s]"%self.scheme_name)
192 192
193 193 def resume_receiving(self):
194 194 """Resume accepting jobs."""
195 195 self.client_stream.on_recv(self.dispatch_submission, copy=False)
196 196
197 197 def stop_receiving(self):
198 198 """Stop accepting jobs while there are no engines.
199 199 Leave them in the ZMQ queue."""
200 200 self.client_stream.on_recv(None)
201 201
202 202 #-----------------------------------------------------------------------
203 203 # [Un]Registration Handling
204 204 #-----------------------------------------------------------------------
205 205
206 206 def dispatch_notification(self, msg):
207 207 """dispatch register/unregister events."""
208 208 try:
209 209 idents,msg = self.session.feed_identities(msg)
210 210 except ValueError:
211 211 self.log.warn("task::Invalid Message: %r",msg)
212 212 return
213 213 try:
214 214 msg = self.session.unserialize(msg)
215 215 except ValueError:
216 216 self.log.warn("task::Unauthorized message from: %r"%idents)
217 217 return
218 218
219 219 msg_type = msg['header']['msg_type']
220 220
221 221 handler = self._notification_handlers.get(msg_type, None)
222 222 if handler is None:
223 223 self.log.error("Unhandled message type: %r"%msg_type)
224 224 else:
225 225 try:
226 226 handler(asbytes(msg['content']['queue']))
227 227 except Exception:
228 228 self.log.error("task::Invalid notification msg: %r",msg)
229 229
230 230 def _register_engine(self, uid):
231 231 """New engine with ident `uid` became available."""
232 232 # head of the line:
233 233 self.targets.insert(0,uid)
234 234 self.loads.insert(0,0)
235 235
236 236 # initialize sets
237 237 self.completed[uid] = set()
238 238 self.failed[uid] = set()
239 239 self.pending[uid] = {}
240 240 if len(self.targets) == 1:
241 241 self.resume_receiving()
242 242 # rescan the graph:
243 243 self.update_graph(None)
244 244
245 245 def _unregister_engine(self, uid):
246 246 """Existing engine with ident `uid` became unavailable."""
247 247 if len(self.targets) == 1:
248 248 # this was our only engine
249 249 self.stop_receiving()
250 250
251 251 # handle any potentially finished tasks:
252 252 self.engine_stream.flush()
253 253
254 254 # don't pop destinations, because they might be used later
255 255 # map(self.destinations.pop, self.completed.pop(uid))
256 256 # map(self.destinations.pop, self.failed.pop(uid))
257 257
258 258 # prevent this engine from receiving work
259 259 idx = self.targets.index(uid)
260 260 self.targets.pop(idx)
261 261 self.loads.pop(idx)
262 262
263 263 # wait 5 seconds before cleaning up pending jobs, since the results might
264 264 # still be incoming
265 265 if self.pending[uid]:
266 266 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
267 267 dc.start()
268 268 else:
269 269 self.completed.pop(uid)
270 270 self.failed.pop(uid)
271 271
272 272
273 273 def handle_stranded_tasks(self, engine):
274 274 """Deal with jobs resident in an engine that died."""
275 275 lost = self.pending[engine]
276 276 for msg_id in lost.keys():
277 277 if msg_id not in self.pending[engine]:
278 278 # prevent double-handling of messages
279 279 continue
280 280
281 281 raw_msg = lost[msg_id][0]
282 282 idents,msg = self.session.feed_identities(raw_msg, copy=False)
283 283 parent = self.session.unpack(msg[1].bytes)
284 284 idents = [engine, idents[0]]
285 285
286 286 # build fake error reply
287 287 try:
288 288 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
289 289 except:
290 290 content = error.wrap_exception()
291 291 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
292 292 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
293 293 # and dispatch it
294 294 self.dispatch_result(raw_reply)
295 295
296 296 # finally scrub completed/failed lists
297 297 self.completed.pop(engine)
298 298 self.failed.pop(engine)
299 299
300 300
301 301 #-----------------------------------------------------------------------
302 302 # Job Submission
303 303 #-----------------------------------------------------------------------
304 304 def dispatch_submission(self, raw_msg):
305 305 """Dispatch job submission to appropriate handlers."""
306 306 # ensure targets up to date:
307 307 self.notifier_stream.flush()
308 308 try:
309 309 idents, msg = self.session.feed_identities(raw_msg, copy=False)
310 310 msg = self.session.unserialize(msg, content=False, copy=False)
311 311 except Exception:
312 312 self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
313 313 return
314 314
315 315
316 316 # send to monitor
317 317 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
318 318
319 319 header = msg['header']
320 320 msg_id = header['msg_id']
321 321 self.all_ids.add(msg_id)
322 322
323 323 # get targets as a set of bytes objects
324 324 # from a list of unicode objects
325 325 targets = header.get('targets', [])
326 326 targets = map(asbytes, targets)
327 327 targets = set(targets)
328 328
329 329 retries = header.get('retries', 0)
330 330 self.retries[msg_id] = retries
331 331
332 332 # time dependencies
333 333 after = header.get('after', None)
334 334 if after:
335 335 after = Dependency(after)
336 336 if after.all:
337 337 if after.success:
338 338 after = Dependency(after.difference(self.all_completed),
339 339 success=after.success,
340 340 failure=after.failure,
341 341 all=after.all,
342 342 )
343 343 if after.failure:
344 344 after = Dependency(after.difference(self.all_failed),
345 345 success=after.success,
346 346 failure=after.failure,
347 347 all=after.all,
348 348 )
349 349 if after.check(self.all_completed, self.all_failed):
350 350 # recast as empty set, if `after` already met,
351 351 # to prevent unnecessary set comparisons
352 352 after = MET
353 353 else:
354 354 after = MET
355 355
356 356 # location dependencies
357 357 follow = Dependency(header.get('follow', []))
358 358
359 359 # turn timeouts into datetime objects:
360 360 timeout = header.get('timeout', None)
361 361 if timeout:
362 timeout = datetime.now() + timedelta(0,timeout,0)
362 # cast to float, because jsonlib returns floats as decimal.Decimal,
363 # which timedelta does not accept
364 timeout = datetime.now() + timedelta(0,float(timeout),0)
363 365
364 366 args = [raw_msg, targets, after, follow, timeout]
365 367
366 368 # validate and reduce dependencies:
367 369 for dep in after,follow:
368 370 if not dep: # empty dependency
369 371 continue
370 372 # check valid:
371 373 if msg_id in dep or dep.difference(self.all_ids):
372 374 self.depending[msg_id] = args
373 375 return self.fail_unreachable(msg_id, error.InvalidDependency)
374 376 # check if unreachable:
375 377 if dep.unreachable(self.all_completed, self.all_failed):
376 378 self.depending[msg_id] = args
377 379 return self.fail_unreachable(msg_id)
378 380
379 381 if after.check(self.all_completed, self.all_failed):
380 382 # time deps already met, try to run
381 383 if not self.maybe_run(msg_id, *args):
382 384 # can't run yet
383 385 if msg_id not in self.all_failed:
384 386 # could have failed as unreachable
385 387 self.save_unmet(msg_id, *args)
386 388 else:
387 389 self.save_unmet(msg_id, *args)
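# Why the float() cast above matters: jsonlib can deserialize JSON numbers
# as decimal.Decimal, and timedelta rejects a Decimal seconds component.
# A minimal reproduction:
from datetime import datetime, timedelta
from decimal import Decimal

timeout = Decimal('5.0')  # what jsonlib may hand back for a JSON float
# datetime.now() + timedelta(0, timeout, 0)   # would raise TypeError
deadline = datetime.now() + timedelta(0, float(timeout), 0)  # works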
388 390
389 391 def audit_timeouts(self):
390 392 """Audit all waiting tasks for expired timeouts."""
391 393 now = datetime.now()
392 394 for msg_id in self.depending.keys():
393 395 # must recheck, in case one failure cascaded to another:
394 396 if msg_id in self.depending:
395 397 raw,targets,after,follow,timeout = self.depending[msg_id]
396 398 if timeout and timeout < now:
397 399 self.fail_unreachable(msg_id, error.TaskTimeout)
398 400
399 401 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
400 402 """a task has become unreachable, send a reply with an ImpossibleDependency
401 403 error."""
402 404 if msg_id not in self.depending:
403 405 self.log.error("msg %r already failed!", msg_id)
404 406 return
405 407 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
406 408 for mid in follow.union(after):
407 409 if mid in self.graph:
408 410 self.graph[mid].remove(msg_id)
409 411
410 412 # FIXME: unpacking a message I've already unpacked, but didn't save:
411 413 idents,msg = self.session.feed_identities(raw_msg, copy=False)
412 414 header = self.session.unpack(msg[1].bytes)
413 415
414 416 try:
415 417 raise why()
416 418 except:
417 419 content = error.wrap_exception()
418 420
419 421 self.all_done.add(msg_id)
420 422 self.all_failed.add(msg_id)
421 423
422 424 msg = self.session.send(self.client_stream, 'apply_reply', content,
423 425 parent=header, ident=idents)
424 426 self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)
425 427
426 428 self.update_graph(msg_id, success=False)
427 429
428 430 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
429 431 """check location dependencies, and run if they are met."""
430 432 blacklist = self.blacklist.setdefault(msg_id, set())
431 433 if follow or targets or blacklist or self.hwm:
432 434 # we need a can_run filter
433 435 def can_run(idx):
434 436 # check hwm
435 437 if self.hwm and self.loads[idx] == self.hwm:
436 438 return False
437 439 target = self.targets[idx]
438 440 # check blacklist
439 441 if target in blacklist:
440 442 return False
441 443 # check targets
442 444 if targets and target not in targets:
443 445 return False
444 446 # check follow
445 447 return follow.check(self.completed[target], self.failed[target])
446 448
447 449 indices = filter(can_run, range(len(self.targets)))
448 450
449 451 if not indices:
450 452 # couldn't run
451 453 if follow.all:
452 454 # check follow for impossibility
453 455 dests = set()
454 456 relevant = set()
455 457 if follow.success:
456 458 relevant = self.all_completed
457 459 if follow.failure:
458 460 relevant = relevant.union(self.all_failed)
459 461 for m in follow.intersection(relevant):
460 462 dests.add(self.destinations[m])
461 463 if len(dests) > 1:
462 464 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
463 465 self.fail_unreachable(msg_id)
464 466 return False
465 467 if targets:
466 468 # check blacklist+targets for impossibility
467 469 targets.difference_update(blacklist)
468 470 if not targets or not targets.intersection(self.targets):
469 471 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
470 472 self.fail_unreachable(msg_id)
471 473 return False
472 474 return False
473 475 else:
474 476 indices = None
475 477
476 478 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
477 479 return True
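# Standalone model of the can_run filter above: keep engine indices that are
# under the HWM, not blacklisted for this task, and inside the requested
# target set. All values here are made up.
hwm = 2
loads = [2, 1, 0]
engines = [b'a', b'b', b'c']
blacklist = set([b'b'])
targets = set([b'b', b'c'])

def can_run(idx):
    if hwm and loads[idx] == hwm:
        return False  # engine already at the high-water mark
    target = engines[idx]
    if target in blacklist:
        return False  # task already hit UnmetDependency here
    if targets and target not in targets:
        return False  # caller restricted the destination set
    return True

assert list(filter(can_run, range(len(engines)))) == [2]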
478 480
479 481 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
480 482 """Save a message for later submission when its dependencies are met."""
481 483 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
482 484 # track the ids in follow or after, but not those already finished
483 485 for dep_id in after.union(follow).difference(self.all_done):
484 486 if dep_id not in self.graph:
485 487 self.graph[dep_id] = set()
486 488 self.graph[dep_id].add(msg_id)
487 489
488 490 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
489 491 """Submit a task to any of a subset of our targets."""
490 492 if indices:
491 493 loads = [self.loads[i] for i in indices]
492 494 else:
493 495 loads = self.loads
494 496 idx = self.scheme(loads)
495 497 if indices:
496 498 idx = indices[idx]
497 499 target = self.targets[idx]
498 500 # print (target, map(str, msg[:3]))
499 501 # send job to the engine
500 502 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
501 503 self.engine_stream.send_multipart(raw_msg, copy=False)
502 504 # update load
503 505 self.add_job(idx)
504 506 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
505 507 # notify Hub
506 508 content = dict(msg_id=msg_id, engine_id=target.decode('ascii'))
507 509 self.session.send(self.mon_stream, 'task_destination', content=content,
508 510 ident=[b'tracktask',self.ident])
509 511
510 512
511 513 #-----------------------------------------------------------------------
512 514 # Result Handling
513 515 #-----------------------------------------------------------------------
514 516 def dispatch_result(self, raw_msg):
515 517 """dispatch method for result replies"""
516 518 try:
517 519 idents,msg = self.session.feed_identities(raw_msg, copy=False)
518 520 msg = self.session.unserialize(msg, content=False, copy=False)
519 521 engine = idents[0]
520 522 try:
521 523 idx = self.targets.index(engine)
522 524 except ValueError:
523 525 pass # skip load-update for dead engines
524 526 else:
525 527 self.finish_job(idx)
526 528 except Exception:
527 529 self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
528 530 return
529 531
530 532 header = msg['header']
531 533 parent = msg['parent_header']
532 534 if header.get('dependencies_met', True):
533 535 success = (header['status'] == 'ok')
534 536 msg_id = parent['msg_id']
535 537 retries = self.retries[msg_id]
536 538 if not success and retries > 0:
537 539 # failed
538 540 self.retries[msg_id] = retries - 1
539 541 self.handle_unmet_dependency(idents, parent)
540 542 else:
541 543 del self.retries[msg_id]
542 544 # relay to client and update graph
543 545 self.handle_result(idents, parent, raw_msg, success)
544 546 # send to Hub monitor
545 547 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
546 548 else:
547 549 self.handle_unmet_dependency(idents, parent)
548 550
549 551 def handle_result(self, idents, parent, raw_msg, success=True):
550 552 """handle a real task result, either success or failure"""
551 553 # first, relay result to client
552 554 engine = idents[0]
553 555 client = idents[1]
554 556 # swap_ids for XREP-XREP mirror
555 557 raw_msg[:2] = [client,engine]
556 558 # print (map(str, raw_msg[:4]))
557 559 self.client_stream.send_multipart(raw_msg, copy=False)
558 560 # now, update our data structures
559 561 msg_id = parent['msg_id']
560 562 self.blacklist.pop(msg_id, None)
561 563 self.pending[engine].pop(msg_id)
562 564 if success:
563 565 self.completed[engine].add(msg_id)
564 566 self.all_completed.add(msg_id)
565 567 else:
566 568 self.failed[engine].add(msg_id)
567 569 self.all_failed.add(msg_id)
568 570 self.all_done.add(msg_id)
569 571 self.destinations[msg_id] = engine
570 572
571 573 self.update_graph(msg_id, success)
572 574
573 575 def handle_unmet_dependency(self, idents, parent):
574 576 """handle an unmet dependency"""
575 577 engine = idents[0]
576 578 msg_id = parent['msg_id']
577 579
578 580 if msg_id not in self.blacklist:
579 581 self.blacklist[msg_id] = set()
580 582 self.blacklist[msg_id].add(engine)
581 583
582 584 args = self.pending[engine].pop(msg_id)
583 585 raw,targets,after,follow,timeout = args
584 586
585 587 if self.blacklist[msg_id] == targets:
586 588 self.depending[msg_id] = args
587 589 self.fail_unreachable(msg_id)
588 590 elif not self.maybe_run(msg_id, *args):
589 591 # resubmit failed
590 592 if msg_id not in self.all_failed:
591 593 # put it back in our dependency tree
592 594 self.save_unmet(msg_id, *args)
593 595
594 596 if self.hwm:
595 597 try:
596 598 idx = self.targets.index(engine)
597 599 except ValueError:
598 600 pass # skip load-update for dead engines
599 601 else:
600 602 if self.loads[idx] == self.hwm-1:
601 603 self.update_graph(None)
602 604
603 605
604 606
605 607 def update_graph(self, dep_id=None, success=True):
606 608 """dep_id just finished. Update our dependency
607 609 graph and submit any jobs that just became runnable.
608 610
609 611 Called with dep_id=None to update entire graph for hwm, but without finishing
610 612 a task.
611 613 """
612 614 # print ("\n\n***********")
613 615 # pprint (dep_id)
614 616 # pprint (self.graph)
615 617 # pprint (self.depending)
616 618 # pprint (self.all_completed)
617 619 # pprint (self.all_failed)
618 620 # print ("\n\n***********\n\n")
619 621 # update any jobs that depended on the dependency
620 622 jobs = self.graph.pop(dep_id, [])
621 623
622 624 # recheck *all* jobs if
623 625 # a) we have HWM and an engine just become no longer full
624 626 # or b) dep_id was given as None
625 627 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
626 628 jobs = self.depending.keys()
627 629
628 630 for msg_id in jobs:
629 631 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
630 632
631 633 if after.unreachable(self.all_completed, self.all_failed)\
632 634 or follow.unreachable(self.all_completed, self.all_failed):
633 635 self.fail_unreachable(msg_id)
634 636
635 637 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
636 638 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
637 639
638 640 self.depending.pop(msg_id)
639 641 for mid in follow.union(after):
640 642 if mid in self.graph:
641 643 self.graph[mid].remove(msg_id)
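# Tiny model of the dependency bookkeeping above: self.graph maps a msg_id
# to the set of msg_ids waiting on it; finishing a task pops its entry and
# releases the waiters for rechecking. Sample ids are made up.
graph = {'a': set(['b', 'c']), 'b': set(['c'])}
jobs = graph.pop('a', [])
assert jobs == set(['b', 'c'])     # 'b' and 'c' just became candidates
assert graph == {'b': set(['c'])}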
642 644
643 645 #----------------------------------------------------------------------
644 646 # methods to be overridden by subclasses
645 647 #----------------------------------------------------------------------
646 648
647 649 def add_job(self, idx):
648 650 """Called after self.targets[idx] just got the job with header.
649 651 Override in subclasses. The default ordering is simple LRU.
650 652 The default loads are the number of outstanding jobs."""
651 653 self.loads[idx] += 1
652 654 for lis in (self.targets, self.loads):
653 655 lis.append(lis.pop(idx))
654 656
655 657
656 658 def finish_job(self, idx):
657 659 """Called after self.targets[idx] just finished a job.
658 660 Override in subclasses."""
659 661 self.loads[idx] -= 1
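# The default LRU bookkeeping above rotates the engine that just received a
# job to the back of the line, keeping the least-recently-used engine at
# index 0. A standalone illustration:
targets, loads = ['a', 'b', 'c'], [0, 0, 0]
idx = 0                  # engine 'a' just got a job
loads[idx] += 1
for lis in (targets, loads):
    lis.append(lis.pop(idx))
assert targets == ['b', 'c', 'a'] and loads == [0, 0, 1]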
660 662
661 663
662 664
663 665 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
664 666 logname='root', log_url=None, loglevel=logging.DEBUG,
665 667 identity=b'task', in_thread=False):
666 668
667 669 ZMQStream = zmqstream.ZMQStream
668 670
669 671 if config:
670 672 # unwrap dict back into Config
671 673 config = Config(config)
672 674
673 675 if in_thread:
674 676 # use instance() to get the same Context/Loop as our parent
675 677 ctx = zmq.Context.instance()
676 678 loop = ioloop.IOLoop.instance()
677 679 else:
678 680 # in a process, don't use instance()
679 681 # for safety with multiprocessing
680 682 ctx = zmq.Context()
681 683 loop = ioloop.IOLoop()
682 684 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
683 685 ins.setsockopt(zmq.IDENTITY, identity)
684 686 ins.bind(in_addr)
685 687
686 688 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
687 689 outs.setsockopt(zmq.IDENTITY, identity)
688 690 outs.bind(out_addr)
689 691 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
690 692 mons.connect(mon_addr)
691 693 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
692 694 nots.setsockopt(zmq.SUBSCRIBE, b'')
693 695 nots.connect(not_addr)
694 696
695 697 # setup logging.
696 698 if in_thread:
697 699 log = Application.instance().log
698 700 else:
699 701 if log_url:
700 702 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
701 703 else:
702 704 log = local_logger(logname, loglevel)
703 705
704 706 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
705 707 mon_stream=mons, notifier_stream=nots,
706 708 loop=loop, log=log,
707 709 config=config)
708 710 scheduler.start()
709 711 if not in_thread:
710 712 try:
711 713 loop.start()
712 714 except KeyboardInterrupt:
713 715 print ("interrupted, exiting...", file=sys.__stderr__)
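# Sketch of the socket wiring in launch_scheduler above, with plain pyzmq
# and placeholder addresses: a ROUTER facing clients, a ROUTER facing
# engines, a PUB monitor, and a SUB for registration notifications.
import zmq

ctx = zmq.Context()
ins = ctx.socket(zmq.ROUTER)
ins.setsockopt(zmq.IDENTITY, b'task')
ins.bind('inproc://client_side')
outs = ctx.socket(zmq.ROUTER)
outs.setsockopt(zmq.IDENTITY, b'task')
outs.bind('inproc://engine_side')
mons = ctx.socket(zmq.PUB)
mons.connect('tcp://127.0.0.1:7001')   # placeholder mon_addr
nots = ctx.socket(zmq.SUB)
nots.setsockopt(zmq.SUBSCRIBE, b'')
nots.connect('tcp://127.0.0.1:7002')   # placeholder not_addr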
714 716
IPython/zmq/entry_point.py
@@ -1,212 +1,208 @@
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3 """
4 4
5 5 # Standard library imports.
6 6 import atexit
7 import json
7 8 import os
8 9 import socket
9 10 from subprocess import Popen, PIPE
10 11 import sys
11 12 import tempfile
12 13
13 14 # System library imports
14 15
15 # Note: use our own import to work around jsonlib api mismatch. When these
16 # changes propagate to zmq, revert back to the following line instead:
17 #from zmq.utils import jsonapi as json
18 from IPython.zmq import jsonapi as json
19
20 16 # IPython imports
21 17 from IPython.utils.localinterfaces import LOCALHOST
22 18 from IPython.utils.py3compat import bytes_to_str
23 19
24 20 # Local imports.
25 21 from parentpoller import ParentPollerWindows
26 22
27 23 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
28 24 ip=LOCALHOST, key=b''):
29 25 """Generates a JSON config file, including the selection of random ports.
30 26
31 27 Parameters
32 28 ----------
33 29
34 30 fname : unicode
35 31 The path to the file to write
36 32
37 33 shell_port : int, optional
38 34 The port to use for XREP channel.
39 35
40 36 iopub_port : int, optional
41 37 The port to use for the SUB channel.
42 38
43 39 stdin_port : int, optional
44 40 The port to use for the REQ (raw input) channel.
45 41
46 42 hb_port : int, optional
47 43 The port to use for the heartbeat REP channel.
48 44
49 45 ip : str, optional
50 46 The ip address the kernel will bind to.
51 47
52 48 key : str, optional
53 49 The Session key used for HMAC authentication.
54 50
55 51 """
56 52 # default to temporary connector file
57 53 if not fname:
58 54 fname = tempfile.mktemp('.json')
59 55
60 56 # Find open ports as necessary.
61 57 ports = []
62 58 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
63 59 int(stdin_port <= 0) + int(hb_port <= 0)
64 60 for i in xrange(ports_needed):
65 61 sock = socket.socket()
66 62 sock.bind(('', 0))
67 63 ports.append(sock)
68 64 for i, sock in enumerate(ports):
69 65 port = sock.getsockname()[1]
70 66 sock.close()
71 67 ports[i] = port
72 68 if shell_port <= 0:
73 69 shell_port = ports.pop(0)
74 70 if iopub_port <= 0:
75 71 iopub_port = ports.pop(0)
76 72 if stdin_port <= 0:
77 73 stdin_port = ports.pop(0)
78 74 if hb_port <= 0:
79 75 hb_port = ports.pop(0)
80 76
81 77 cfg = dict( shell_port=shell_port,
82 78 iopub_port=iopub_port,
83 79 stdin_port=stdin_port,
84 80 hb_port=hb_port,
85 81 )
86 82 cfg['ip'] = ip
87 83 cfg['key'] = bytes_to_str(key)
88 84
89 with open(fname, 'wb') as f:
85 with open(fname, 'w') as f:
90 86 f.write(json.dumps(cfg, indent=2))
91 87
92 88 return fname, cfg
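# The port-finding trick used above, in isolation: bind to port 0 so the OS
# assigns a free ephemeral port, record it, then close the probe socket.
import socket

sock = socket.socket()
sock.bind(('', 0))
port = sock.getsockname()[1]
sock.close()
assert port > 0  # this port is now free for the kernel to bind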
93 89
94 90
95 91 def base_launch_kernel(code, fname, stdin=None, stdout=None, stderr=None,
96 92 executable=None, independent=False, extra_arguments=[]):
97 93 """ Launches a localhost kernel, binding to the specified ports.
98 94
99 95 Parameters
100 96 ----------
101 97 code : str,
102 98 A string of Python code that imports and executes a kernel entry point.
103 99
104 100 stdin, stdout, stderr : optional (default None)
105 101 Standards streams, as defined in subprocess.Popen.
106 102
107 103 fname : unicode, optional
108 104 The JSON connector file, containing ip/port/hmac key information.
109 105
110 106 key : str, optional
111 107 The Session key used for HMAC authentication.
112 108
113 109 executable : str, optional (default sys.executable)
114 110 The Python executable to use for the kernel process.
115 111
116 112 independent : bool, optional (default False)
117 113 If set, the kernel process is guaranteed to survive if this process
118 114 dies. If not set, an effort is made to ensure that the kernel is killed
119 115 when this process dies. Note that in this case it is still good practice
120 116 to kill kernels manually before exiting.
121 117
122 118 extra_arguments : list, optional
123 119 A list of extra arguments to pass when executing the launch code.
124 120
125 121 Returns
126 122 -------
127 123 A tuple of form:
128 124 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
129 125 where kernel_process is a Popen object and the ports are integers.
130 126 """
131 127
132 128 # Build the kernel launch command.
133 129 if executable is None:
134 130 executable = sys.executable
135 131 arguments = [ executable, '-c', code, '-f', fname ]
136 132 arguments.extend(extra_arguments)
137 133
138 134 # Popen will fail (sometimes with a deadlock) if stdin, stdout, and stderr
139 135 # are invalid. Unfortunately, there is in general no way to detect whether
140 136 # they are valid. The following two blocks redirect them to (temporary)
141 137 # pipes in certain important cases.
142 138
143 139 # If this process has been backgrounded, our stdin is invalid. Since there
144 140 # is no compelling reason for the kernel to inherit our stdin anyway, we'll
145 141 play it safe and always redirect.
146 142 redirect_in = True
147 143 _stdin = PIPE if stdin is None else stdin
148 144
149 145 # If this process is running on pythonw, we know that stdin, stdout, and
150 146 # stderr are all invalid.
151 147 redirect_out = sys.executable.endswith('pythonw.exe')
152 148 if redirect_out:
153 149 _stdout = PIPE if stdout is None else stdout
154 150 _stderr = PIPE if stderr is None else stderr
155 151 else:
156 152 _stdout, _stderr = stdout, stderr
157 153
158 154 # Spawn a kernel.
159 155 if sys.platform == 'win32':
160 156 # Create a Win32 event for interrupting the kernel.
161 157 interrupt_event = ParentPollerWindows.create_interrupt_event()
162 158 arguments += [ '--interrupt=%i'%interrupt_event ]
163 159
164 160 # If the kernel is running on pythonw and stdout/stderr have not been
165 161 # redirected, it will crash when more than 4KB of data is written to
166 162 # stdout or stderr. This is a bug that has been with Python for a very
167 163 # long time; see http://bugs.python.org/issue706263.
168 164 # A cleaner solution to this problem would be to pass os.devnull to
169 165 # Popen directly. Unfortunately, that does not work.
170 166 if executable.endswith('pythonw.exe'):
171 167 if stdout is None:
172 168 arguments.append('--no-stdout')
173 169 if stderr is None:
174 170 arguments.append('--no-stderr')
175 171
176 172 # Launch the kernel process.
177 173 if independent:
178 174 proc = Popen(arguments,
179 175 creationflags=512, # CREATE_NEW_PROCESS_GROUP
180 176 stdin=_stdin, stdout=_stdout, stderr=_stderr)
181 177 else:
182 178 from _subprocess import DuplicateHandle, GetCurrentProcess, \
183 179 DUPLICATE_SAME_ACCESS
184 180 pid = GetCurrentProcess()
185 181 handle = DuplicateHandle(pid, pid, pid, 0,
186 182 True, # Inheritable by new processes.
187 183 DUPLICATE_SAME_ACCESS)
188 184 proc = Popen(arguments + ['--parent=%i'%int(handle)],
189 185 stdin=_stdin, stdout=_stdout, stderr=_stderr)
190 186
191 187 # Attach the interrupt event to the Popen object so it can be used later.
192 188 proc.win32_interrupt_event = interrupt_event
193 189
194 190 else:
195 191 if independent:
196 192 proc = Popen(arguments, preexec_fn=lambda: os.setsid(),
197 193 stdin=_stdin, stdout=_stdout, stderr=_stderr)
198 194 else:
199 195 proc = Popen(arguments + ['--parent=1'],
200 196 stdin=_stdin, stdout=_stdout, stderr=_stderr)
201 197
202 198 # Clean up pipes created to work around Popen bug.
203 199 if redirect_in:
204 200 if stdin is None:
205 201 proc.stdin.close()
206 202 if redirect_out:
207 203 if stdout is None:
208 204 proc.stdout.close()
209 205 if stderr is None:
210 206 proc.stderr.close()
211 207
212 208 return proc
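# Minimal sketch of the launch pattern in base_launch_kernel above: run a
# Python one-liner in a subprocess with pipes redirected. The child command
# is a stand-in, not a real kernel entry point.
import sys
from subprocess import Popen, PIPE

proc = Popen([sys.executable, '-c', 'print("kernel placeholder")'],
             stdin=PIPE, stdout=PIPE, stderr=PIPE)
out, err = proc.communicate()
assert b'kernel placeholder' in out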
NO CONTENT: file was removed