re-enable log forwarding and iplogger
MinRK
@@ -1,539 +1,544 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import logging
22 22 import re
23 23 import shutil
24 24 import sys
25 25
26 26 from subprocess import Popen, PIPE
27 27
28 28 from IPython.config.loader import PyFileConfigLoader, Config
29 29 from IPython.config.configurable import Configurable
30 30 from IPython.config.application import Application
31 31 from IPython.core.crashhandler import CrashHandler
32 32 from IPython.core.newapplication import BaseIPythonApplication
33 33 from IPython.core import release
34 34 from IPython.utils.path import (
35 35 get_ipython_package_dir,
36 36 get_ipython_dir,
37 37 expand_path
38 38 )
39 39 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Module errors
43 43 #-----------------------------------------------------------------------------
44 44
45 45 class ClusterDirError(Exception):
46 46 pass
47 47
48 48
49 49 class PIDFileError(Exception):
50 50 pass
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Class for managing cluster directories
55 55 #-----------------------------------------------------------------------------
56 56
57 57 class ClusterDir(Configurable):
58 58 """An object to manage the cluster directory and its resources.
59 59
60 60 The cluster directory is used by :command:`ipengine`,
61 61 :command:`ipcontroller` and :command:`ipcluster` to manage the
62 62 configuration, logging and security of these applications.
63 63
64 64 This object knows how to find, create and manage these directories. This
65 65 should be used by any code that wants to handle cluster directories.
66 66 """
67 67
68 68 security_dir_name = Unicode('security')
69 69 log_dir_name = Unicode('log')
70 70 pid_dir_name = Unicode('pid')
71 71 security_dir = Unicode(u'')
72 72 log_dir = Unicode(u'')
73 73 pid_dir = Unicode(u'')
74 74
75 75 auto_create = Bool(False,
76 76 help="""Whether to automatically create the ClusterDirectory if it does
77 77 not exist""")
78 78 overwrite = Bool(False,
79 79 help="""Whether to overwrite existing config files""")
80 80 location = Unicode(u'', config=True,
81 81 help="""Set the cluster dir. This overrides the logic used by the
82 82 `profile` option.""",
83 83 )
84 84 profile = Unicode(u'default', config=True,
85 85 help="""The string name of the profile to be used. This determines the name
86 86 of the cluster dir as: cluster_<profile>. The default profile is named
87 87 'default'. The cluster directory is resolved this way if the
88 88 `cluster_dir` option is not used."""
89 89 )
90 90
91 91 _location_isset = Bool(False) # flag for detecting multiply set location
92 92 _new_dir = Bool(False) # flag for whether a new dir was created
93 93
94 94 def __init__(self, **kwargs):
95 95 # make sure auto_create,overwrite are set *before* location
96 96 for name in ('auto_create', 'overwrite'):
97 97 v = kwargs.pop(name, None)
98 98 if v is not None:
99 99 setattr(self, name, v)
100 100 super(ClusterDir, self).__init__(**kwargs)
101 101 if not self.location:
102 102 self._profile_changed('profile', 'default', self.profile)
103 103
104 104 def _location_changed(self, name, old, new):
105 105 if self._location_isset:
106 106 raise RuntimeError("Cannot set ClusterDir more than once.")
107 107 self._location_isset = True
108 108 if not os.path.isdir(new):
109 109 if self.auto_create:# or self.config.ClusterDir.auto_create:
110 110 os.makedirs(new)
111 111 self._new_dir = True
112 112 else:
113 113 raise ClusterDirError('Directory not found: %s' % new)
114 114
115 115 # ensure config files exist:
116 116 self.copy_all_config_files(overwrite=self.overwrite)
117 117 self.security_dir = os.path.join(new, self.security_dir_name)
118 118 self.log_dir = os.path.join(new, self.log_dir_name)
119 119 self.pid_dir = os.path.join(new, self.pid_dir_name)
120 120 self.check_dirs()
121 121
122 122 def _profile_changed(self, name, old, new):
123 123 if self._location_isset:
124 124 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
125 125 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
126 126
127 127 def _log_dir_changed(self, name, old, new):
128 128 self.check_log_dir()
129 129
130 130 def check_log_dir(self):
131 131 if not os.path.isdir(self.log_dir):
132 132 os.mkdir(self.log_dir)
133 133
134 134 def _security_dir_changed(self, name, old, new):
135 135 self.check_security_dir()
136 136
137 137 def check_security_dir(self):
138 138 if not os.path.isdir(self.security_dir):
139 139 os.mkdir(self.security_dir, 0700)
140 140 os.chmod(self.security_dir, 0700)
141 141
142 142 def _pid_dir_changed(self, name, old, new):
143 143 self.check_pid_dir()
144 144
145 145 def check_pid_dir(self):
146 146 if not os.path.isdir(self.pid_dir):
147 147 os.mkdir(self.pid_dir, 0700)
148 148 os.chmod(self.pid_dir, 0700)
149 149
150 150 def check_dirs(self):
151 151 self.check_security_dir()
152 152 self.check_log_dir()
153 153 self.check_pid_dir()
154 154
155 155 def copy_config_file(self, config_file, path=None, overwrite=False):
156 156 """Copy a default config file into the active cluster directory.
157 157
158 158 Default configuration files are kept in :mod:`IPython.config.default`.
159 159 This function copies them from that location to the working cluster
160 160 directory.
161 161 """
162 162 if path is None:
163 163 import IPython.config.default
164 164 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
165 165 path = os.path.sep.join(path)
166 166 src = os.path.join(path, config_file)
167 167 dst = os.path.join(self.location, config_file)
168 168 if not os.path.isfile(dst) or overwrite:
169 169 shutil.copy(src, dst)
170 170
171 171 def copy_all_config_files(self, path=None, overwrite=False):
172 172 """Copy all config files into the active cluster directory."""
173 173 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
174 174 u'ipcluster_config.py']:
175 175 self.copy_config_file(f, path=path, overwrite=overwrite)
176 176
177 177 @classmethod
178 178 def create_cluster_dir(cls, cluster_dir):
179 179 """Create a new cluster directory given a full path.
180 180
181 181 Parameters
182 182 ----------
183 183 cluster_dir : str
184 184 The full path to the cluster directory. If it does exist, it will
185 185 be used. If not, it will be created.
186 186 """
187 187 return ClusterDir(location=cluster_dir)
188 188
189 189 @classmethod
190 190 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
191 191 """Create a cluster dir by profile name and path.
192 192
193 193 Parameters
194 194 ----------
195 195 path : str
196 196 The path (directory) to put the cluster directory in.
197 197 profile : str
198 198 The name of the profile. The name of the cluster directory will
199 199 be "cluster_<profile>".
200 200 """
201 201 if not os.path.isdir(path):
202 202 raise ClusterDirError('Directory not found: %s' % path)
203 203 cluster_dir = os.path.join(path, u'cluster_' + profile)
204 204 return ClusterDir(location=cluster_dir)
205 205
206 206 @classmethod
207 207 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
208 208 """Find an existing cluster dir by profile name, return its ClusterDir.
209 209
210 210 This searches through a sequence of paths for a cluster dir. If it
211 211 is not found, a :class:`ClusterDirError` exception will be raised.
212 212
213 213 The search path algorithm is:
214 214 1. ``os.getcwd()``
215 215 2. ``ipython_dir``
216 216 3. The directories found in the ":" separated
217 217 :env:`IPCLUSTER_DIR_PATH` environment variable.
218 218
219 219 Parameters
220 220 ----------
221 221 ipython_dir : unicode or str
222 222 The IPython directory to use.
223 223 profile : unicode or str
224 224 The name of the profile. The name of the cluster directory
225 225 will be "cluster_<profile>".
226 226 """
227 227 dirname = u'cluster_' + profile
228 228 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
229 229 if cluster_dir_paths:
230 230 cluster_dir_paths = cluster_dir_paths.split(':')
231 231 else:
232 232 cluster_dir_paths = []
233 233 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
234 234 for p in paths:
235 235 cluster_dir = os.path.join(p, dirname)
236 236 if os.path.isdir(cluster_dir):
237 237 return ClusterDir(location=cluster_dir)
238 238 else:
239 239 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
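A short usage sketch (not part of the diff) of the by-profile lookup described above; the profile name 'mycluster' is a hypothetical example, and the call raises ClusterDirError if no matching directory is found in the cwd, the ipython dir, or IPCLUSTER_DIR_PATH:

    from IPython.utils.path import get_ipython_dir
    from IPython.parallel.apps.clusterdir import ClusterDir

    # locate an existing cluster_mycluster directory and inspect its layout
    cd = ClusterDir.find_cluster_dir_by_profile(get_ipython_dir(), profile=u'mycluster')
    print cd.location, cd.security_dir, cd.log_dir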
240 240
241 241 @classmethod
242 242 def find_cluster_dir(cls, cluster_dir):
243 243 """Find/create a cluster dir and return its ClusterDir.
244 244
245 245 This will create the cluster directory if it doesn't exist.
246 246
247 247 Parameters
248 248 ----------
249 249 cluster_dir : unicode or str
250 250 The path of the cluster directory. This is expanded using
251 251 :func:`IPython.utils.path.expand_path`.
252 252 """
253 253 cluster_dir = expand_path(cluster_dir)
254 254 if not os.path.isdir(cluster_dir):
255 255 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
256 256 return ClusterDir(location=cluster_dir)
257 257
258 258
259 259 #-----------------------------------------------------------------------------
260 260 # Crash handler for this application
261 261 #-----------------------------------------------------------------------------
262 262
263 263
264 264 _message_template = """\
265 265 Oops, $self.app_name crashed. We do our best to make it stable, but...
266 266
267 267 A crash report was automatically generated with the following information:
268 268 - A verbatim copy of the crash traceback.
269 269 - Data on your current $self.app_name configuration.
270 270
271 271 It was left in the file named:
272 272 \t'$self.crash_report_fname'
273 273 If you can email this file to the developers, the information in it will help
274 274 them in understanding and correcting the problem.
275 275
276 276 You can mail it to: $self.contact_name at $self.contact_email
277 277 with the subject '$self.app_name Crash Report'.
278 278
279 279 If you want to do it now, the following command will work (under Unix):
280 280 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
281 281
282 282 To ensure accurate tracking of this issue, please file a report about it at:
283 283 $self.bug_tracker
284 284 """
285 285
286 286 class ClusterDirCrashHandler(CrashHandler):
287 287 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
288 288
289 289 message_template = _message_template
290 290
291 291 def __init__(self, app):
292 292 contact_name = release.authors['Min'][0]
293 293 contact_email = release.authors['Min'][1]
294 294 bug_tracker = 'http://github.com/ipython/ipython/issues'
295 295 super(ClusterDirCrashHandler,self).__init__(
296 296 app, contact_name, contact_email, bug_tracker
297 297 )
298 298
299 299
300 300 #-----------------------------------------------------------------------------
301 301 # Main application
302 302 #-----------------------------------------------------------------------------
303 303 base_aliases = {
304 304 'profile' : "ClusterDir.profile",
305 305 'cluster_dir' : 'ClusterDir.location',
306 306 'auto_create' : 'ClusterDirApplication.auto_create',
307 307 'log_level' : 'ClusterApplication.log_level',
308 308 'work_dir' : 'ClusterApplication.work_dir',
309 309 'log_to_file' : 'ClusterApplication.log_to_file',
310 310 'clean_logs' : 'ClusterApplication.clean_logs',
311 311 'log_url' : 'ClusterApplication.log_url',
312 312 }
313 313
314 314 base_flags = {
315 315 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
316 316 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
317 317 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
318 318 }
319 319 for k,v in base_flags.iteritems():
320 320 base_flags[k] = (Config(v[0]),v[1])
321 321
322 322 class ClusterApplication(BaseIPythonApplication):
323 323 """An application that puts everything into a cluster directory.
324 324
325 325 Instead of looking for things in the ipython_dir, this type of application
326 326 will use its own private directory called the "cluster directory"
327 327 for things like config files, log files, etc.
328 328
329 329 The cluster directory is resolved as follows:
330 330
331 331 * If the ``--cluster-dir`` option is given, it is used.
332 332 * If ``--cluster-dir`` is not given, the application directory is
333 333 resolved using the profile name as ``cluster_<profile>``. The search
334 334 path for this directory is then i) the cwd, if it is found there,
335 335 and ii) the ipython_dir otherwise.
336 336
337 337 The config file for the application is to be put in the cluster
338 338 dir and named the value of the ``config_file_name`` class attribute.
339 339 """
340 340
341 341 crash_handler_class = ClusterDirCrashHandler
342 342 auto_create_cluster_dir = Bool(True, config=True,
343 343 help="whether to create the cluster_dir if it doesn't exist")
344 344 cluster_dir = Instance(ClusterDir)
345 345 classes = [ClusterDir]
346 346
347 347 def _log_level_default(self):
348 348 # temporarily override default_log_level to INFO
349 349 return logging.INFO
350 350
351 351 work_dir = Unicode(os.getcwdu(), config=True,
352 352 help='Set the working dir for the process.'
353 353 )
354 354 def _work_dir_changed(self, name, old, new):
355 355 self.work_dir = unicode(expand_path(new))
356 356
357 357 log_to_file = Bool(config=True,
358 358 help="whether to log to a file")
359 359
360 360 clean_logs = Bool(False, shortname='--clean-logs', config=True,
361 361 help="whether to cleanup old logfiles before starting")
362 362
363 363 log_url = Unicode('', shortname='--log-url', config=True,
364 364 help="The ZMQ URL of the iplogger to aggregate logging.")
365 365
366 366 config_file = Unicode(u'', config=True,
367 367 help="""Path to ipcontroller configuration file. The default is to use
368 368 <appname>_config.py, as found by cluster-dir."""
369 369 )
370 370
371 371 loop = Instance('zmq.eventloop.ioloop.IOLoop')
372 372 def _loop_default(self):
373 373 from zmq.eventloop.ioloop import IOLoop
374 374 return IOLoop.instance()
375 375
376 376 aliases = Dict(base_aliases)
377 377 flags = Dict(base_flags)
378 378
379 379 def init_clusterdir(self):
380 380 """This resolves the cluster directory.
381 381
382 382 This tries to find the cluster directory, and if successful, it will
383 383 have:
384 384 * Set ``self.cluster_dir`` to the :class:`ClusterDir` object for
385 385 the application.
386 386 * Created the cluster directory, if ``auto_create_cluster_dir`` is
387 387 ``True`` and it did not already exist.
388 388
389 389 The algorithm used for this is as follows:
390 390 1. Try ``ClusterDir.location`` (the ``cluster_dir`` option).
391 391 2. Try using ``ClusterDir.profile``.
392 392 3. If both of these fail and ``self.auto_create_cluster_dir`` is
393 393 ``True``, then create the new cluster dir in the IPython directory.
394 394 4. If all fails, then raise :class:`ClusterDirError`.
395 395 """
396 396 try:
397 397 self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config)
398 398 except ClusterDirError as e:
399 399 self.log.fatal("Error initializing cluster dir: %s"%e)
400 400 self.log.fatal("A cluster dir must be created before running this command.")
401 401 self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more "
402 402 "information about creating and listing cluster dirs."
403 403 )
404 404 self.exit(1)
405 405
406 406 if self.cluster_dir._new_dir:
407 407 self.log.info('Creating new cluster dir: %s' % \
408 408 self.cluster_dir.location)
409 409 else:
410 410 self.log.info('Using existing cluster dir: %s' % \
411 411 self.cluster_dir.location)
412 412
413 413 def initialize(self, argv=None):
414 414 """initialize the app"""
415 415 self.init_crash_handler()
416 416 self.parse_command_line(argv)
417 417 cl_config = self.config
418 418 self.init_clusterdir()
419 419 if self.config_file:
420 420 self.load_config_file(self.config_file)
421 else:
422 self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location)
421 elif self.default_config_file_name:
422 try:
423 self.load_config_file(self.default_config_file_name,
424 path=self.cluster_dir.location)
425 except IOError:
426 self.log.warn("Warning: Default config file not found")
423 427 # command-line should *override* config file, but command-line is necessary
424 428 # to determine clusterdir, etc.
425 429 self.update_config(cl_config)
426 self.reinit_logging()
427
428 430 self.to_work_dir()
431 self.reinit_logging()
429 432
430 433 def to_work_dir(self):
431 434 wd = self.work_dir
432 435 if unicode(wd) != os.getcwdu():
433 436 os.chdir(wd)
434 437 self.log.info("Changing to working dir: %s" % wd)
438 # This is the working dir by now.
439 sys.path.insert(0, '')
435 440
436 441 def load_config_file(self, filename, path=None):
437 442 """Load a .py based config file by filename and path."""
438 443 # use config.application.Application.load_config
439 444 # instead of inflexible core.newapplication.BaseIPythonApplication.load_config
440 445 return Application.load_config_file(self, filename, path=path)
441 446 #
442 447 # def load_default_config_file(self):
443 448 # """Load a .py based config file by filename and path."""
444 449 # return BaseIPythonApplication.load_config_file(self)
445 450
446 451 # disable URL-logging
447 452 def reinit_logging(self):
448 453 # Remove old log files
449 454 log_dir = self.cluster_dir.log_dir
450 455 if self.clean_logs:
451 456 for f in os.listdir(log_dir):
452 457 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
453 458 os.remove(os.path.join(log_dir, f))
454 459 if self.log_to_file:
455 460 # Start logging to the new log file
456 461 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
457 462 logfile = os.path.join(log_dir, log_filename)
458 463 open_log_file = open(logfile, 'w')
459 464 else:
460 465 open_log_file = None
461 466 if open_log_file is not None:
462 467 self.log.removeHandler(self._log_handler)
463 468 self._log_handler = logging.StreamHandler(open_log_file)
464 469 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
465 470 self._log_handler.setFormatter(self._log_formatter)
466 471 self.log.addHandler(self._log_handler)
467 472
468 473 def write_pid_file(self, overwrite=False):
469 474 """Create a .pid file in the pid_dir with my pid.
470 475
471 476 This must be called after the cluster dir is resolved, since it uses `self.cluster_dir.pid_dir`.
472 477 This raises :exc:`PIDFileError` if the pid file exists already.
473 478 """
474 479 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
475 480 if os.path.isfile(pid_file):
476 481 pid = self.get_pid_from_file()
477 482 if not overwrite:
478 483 raise PIDFileError(
479 484 'The pid file [%s] already exists. \nThis could mean that this '
480 485 'server is already running with [pid=%s].' % (pid_file, pid)
481 486 )
482 487 with open(pid_file, 'w') as f:
483 488 self.log.info("Creating pid file: %s" % pid_file)
484 489 f.write(repr(os.getpid())+'\n')
485 490
486 491 def remove_pid_file(self):
487 492 """Remove the pid file.
488 493
489 494 This should be called at shutdown by registering a cleanup
490 495 callback with the event loop. This needs to return
491 496 ``None``.
492 497 """
493 498 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
494 499 if os.path.isfile(pid_file):
495 500 try:
496 501 self.log.info("Removing pid file: %s" % pid_file)
497 502 os.remove(pid_file)
498 503 except:
499 504 self.log.warn("Error removing the pid file: %s" % pid_file)
500 505
501 506 def get_pid_from_file(self):
502 507 """Get the pid from the pid file.
503 508
504 509 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
505 510 """
506 511 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
507 512 if os.path.isfile(pid_file):
508 513 with open(pid_file, 'r') as f:
509 514 pid = int(f.read().strip())
510 515 return pid
511 516 else:
512 517 raise PIDFileError('pid file not found: %s' % pid_file)
513 518
514 519 def check_pid(self, pid):
515 520 if os.name == 'nt':
516 521 try:
517 522 import ctypes
518 523 # returns 0 if no such process (of ours) exists
519 524 # positive int otherwise
520 525 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
521 526 except Exception:
522 527 self.log.warn(
523 528 "Could not determine whether pid %i is running via `OpenProcess`. "
524 529 " Making the likely assumption that it is."%pid
525 530 )
526 531 return True
527 532 return bool(p)
528 533 else:
529 534 try:
530 535 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
531 536 output,_ = p.communicate()
532 537 except OSError:
533 538 self.log.warn(
534 539 "Could not determine whether pid %i is running via `ps x`. "
535 540 " Making the likely assumption that it is."%pid
536 541 )
537 542 return True
538 543 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
539 544 return pid in pids
@@ -1,401 +1,403 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import os
22 22 import logging
23 23 import socket
24 24 import stat
25 25 import sys
26 26 import uuid
27 27
28 28 from multiprocessing import Process
29 29
30 30 import zmq
31 31 from zmq.devices import ProcessMonitoredQueue
32 32 from zmq.log.handlers import PUBHandler
33 33 from zmq.utils import jsonapi as json
34 34
35 35 from IPython.config.loader import Config
36 36
37 37 from IPython.parallel import factory
38 38
39 39 from IPython.parallel.apps.clusterdir import (
40 40 ClusterDir,
41 41 ClusterApplication,
42 42 base_flags
43 43 # ClusterDirConfigLoader
44 44 )
45 45 from IPython.utils.importstring import import_item
46 46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
47 47
48 48 # from IPython.parallel.controller.controller import ControllerFactory
49 49 from IPython.parallel.streamsession import StreamSession
50 50 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 51 from IPython.parallel.controller.hub import Hub, HubFactory
52 52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 53 from IPython.parallel.controller.sqlitedb import SQLiteDB
54 54
55 55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
56 56
57 57 # conditional import of MongoDB backend class
58 58
59 59 try:
60 60 from IPython.parallel.controller.mongodb import MongoDB
61 61 except ImportError:
62 62 maybe_mongo = []
63 63 else:
64 64 maybe_mongo = [MongoDB]
65 65
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # Module level variables
69 69 #-----------------------------------------------------------------------------
70 70
71 71
72 72 #: The default config file name for this application
73 73 default_config_file_name = u'ipcontroller_config.py'
74 74
75 75
76 76 _description = """Start the IPython controller for parallel computing.
77 77
78 78 The IPython controller provides a gateway between the IPython engines and
79 79 clients. The controller needs to be started before the engines and can be
80 80 configured using command line options or using a cluster directory. Cluster
81 81 directories contain config, log and security files and are usually located in
82 82 your ipython directory and named as "cluster_<profile>". See the --profile
83 83 and --cluster-dir options for details.
84 84 """
85 85
86 86
87 87
88 88
89 89 #-----------------------------------------------------------------------------
90 90 # The main application
91 91 #-----------------------------------------------------------------------------
92 92 flags = {}
93 93 flags.update(base_flags)
94 94 flags.update({
95 95 'usethreads' : ( {'IPControllerApp' : {'usethreads' : True}},
96 96 'Use threads instead of processes for the schedulers'),
97 97 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
98 98 'use the SQLiteDB backend'),
99 99 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
100 100 'use the MongoDB backend'),
101 101 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
102 102 'use the in-memory DictDB backend'),
103 103 })
104 104
105 105 flags.update()
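The 'sqlitedb', 'mongodb' and 'dictdb' flags above are shorthand for setting HubFactory.db_class. A hypothetical ipcontroller_config.py fragment that selects a backend explicitly (using the ``c = get_config()`` convention of IPython config files):

    c = get_config()
    # same effect as passing the 'mongodb' flag on the command line
    c.HubFactory.db_class = 'IPython.parallel.controller.mongodb.MongoDB'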
106 106
107 107 class IPControllerApp(ClusterApplication):
108 108
109 109 name = u'ipcontroller'
110 110 description = _description
111 111 # command_line_loader = IPControllerAppConfigLoader
112 112 default_config_file_name = default_config_file_name
113 113 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
114 114
115 115 auto_create_cluster_dir = Bool(True, config=True,
116 116 help="Whether to create cluster_dir if it exists.")
117 117 reuse_files = Bool(False, config=True,
118 118 help='Whether to reuse existing json connection files [default: False]'
119 119 )
120 120 secure = Bool(True, config=True,
121 121 help='Whether to use exec_keys for extra authentication [default: True]'
122 122 )
123 123 ssh_server = Unicode(u'', config=True,
124 124 help="""ssh url for clients to use when connecting to the Controller
125 125 processes. It should be of the form: [user@]server[:port]. The
126 126 Controller\'s listening addresses must be accessible from the ssh server""",
127 127 )
128 128 location = Unicode(u'', config=True,
129 129 help="""The external IP or domain name of the Controller, used for disambiguating
130 130 engine and client connections.""",
131 131 )
132 132 import_statements = List([], config=True,
133 133 help="import statements to be run at startup. Necessary in some environments"
134 134 )
135 135
136 136 usethreads = Bool(False, config=True,
137 137 help='Use threads instead of processes for the schedulers',
138 138 )
139 139
140 140 # internal
141 141 children = List()
142 142 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
143 143
144 144 def _usethreads_changed(self, name, old, new):
145 145 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
146 146
147 147 aliases = Dict(dict(
148 148 config = 'IPControllerApp.config_file',
149 149 # file = 'IPControllerApp.url_file',
150 150 log_level = 'IPControllerApp.log_level',
151 log_url = 'IPControllerApp.log_url',
151 152 reuse_files = 'IPControllerApp.reuse_files',
152 153 secure = 'IPControllerApp.secure',
153 154 ssh = 'IPControllerApp.ssh_server',
154 155 usethreads = 'IPControllerApp.usethreads',
155 156 import_statements = 'IPControllerApp.import_statements',
156 157 location = 'IPControllerApp.location',
157 158
158 159 ident = 'StreamSession.session',
159 160 user = 'StreamSession.username',
160 161 exec_key = 'StreamSession.keyfile',
161 162
162 163 url = 'HubFactory.url',
163 164 ip = 'HubFactory.ip',
164 165 transport = 'HubFactory.transport',
165 166 port = 'HubFactory.regport',
166 167
167 168 ping = 'HeartMonitor.period',
168 169
169 170 scheme = 'TaskScheduler.scheme_name',
170 171 hwm = 'TaskScheduler.hwm',
171 172
172 173
173 174 profile = "ClusterDir.profile",
174 175 cluster_dir = 'ClusterDir.location',
175 176
176 177 ))
177 178 flags = Dict(flags)
178 179
179 180
180 181 def save_connection_dict(self, fname, cdict):
181 182 """save a connection dict to json file."""
182 183 c = self.config
183 184 url = cdict['url']
184 185 location = cdict['location']
185 186 if not location:
186 187 try:
187 188 proto,ip,port = split_url(url)
188 189 except AssertionError:
189 190 pass
190 191 else:
191 192 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
192 193 cdict['location'] = location
193 194 fname = os.path.join(self.cluster_dir.security_dir, fname)
194 195 with open(fname, 'w') as f:
195 196 f.write(json.dumps(cdict, indent=2))
196 197 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
197 198
198 199 def load_config_from_json(self):
199 200 """load config from existing json connector files."""
200 201 c = self.config
201 202 # load from engine config
202 203 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
203 204 cfg = json.loads(f.read())
204 205 key = c.StreamSession.key = cfg['exec_key']
205 206 xport,addr = cfg['url'].split('://')
206 207 c.HubFactory.engine_transport = xport
207 208 ip,ports = addr.split(':')
208 209 c.HubFactory.engine_ip = ip
209 210 c.HubFactory.regport = int(ports)
210 211 self.location = cfg['location']
211 212
212 213 # load client config
213 214 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
214 215 cfg = json.loads(f.read())
215 216 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
216 217 xport,addr = cfg['url'].split('://')
217 218 c.HubFactory.client_transport = xport
218 219 ip,ports = addr.split(':')
219 220 c.HubFactory.client_ip = ip
220 221 self.ssh_server = cfg['ssh']
221 222 assert int(ports) == c.HubFactory.regport, "regport mismatch"
222 223
223 224 def init_hub(self):
224 # This is the working dir by now.
225 sys.path.insert(0, '')
226 225 c = self.config
227 226
228 227 self.do_import_statements()
229 228 reusing = self.reuse_files
230 229 if reusing:
231 230 try:
232 231 self.load_config_from_json()
233 232 except (AssertionError,IOError):
234 233 reusing=False
235 234 # check again, because reusing may have failed:
236 235 if reusing:
237 236 pass
238 237 elif self.secure:
239 238 key = str(uuid.uuid4())
240 239 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
241 240 # with open(keyfile, 'w') as f:
242 241 # f.write(key)
243 242 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
244 243 c.StreamSession.key = key
245 244 else:
246 245 key = c.StreamSession.key = ''
247 246
248 247 try:
249 248 self.factory = HubFactory(config=c, log=self.log)
250 249 # self.start_logging()
251 250 self.factory.init_hub()
252 251 except:
253 252 self.log.error("Couldn't construct the Controller", exc_info=True)
254 253 self.exit(1)
255 254
256 255 if not reusing:
257 256 # save to new json config files
258 257 f = self.factory
259 258 cdict = {'exec_key' : key,
260 259 'ssh' : self.ssh_server,
261 260 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
262 261 'location' : self.location
263 262 }
264 263 self.save_connection_dict('ipcontroller-client.json', cdict)
265 264 edict = cdict
266 265 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
267 266 self.save_connection_dict('ipcontroller-engine.json', edict)
268 267
269 268 #
270 269 def init_schedulers(self):
271 270 children = self.children
272 mq = import_item(self.mq_class)
271 mq = import_item(str(self.mq_class))
273 272
274 273 hub = self.factory
275 274 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
276 275 # IOPub relay (in a Process)
277 276 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
278 277 q.bind_in(hub.client_info['iopub'])
279 278 q.bind_out(hub.engine_info['iopub'])
280 279 q.setsockopt_out(zmq.SUBSCRIBE, '')
281 280 q.connect_mon(hub.monitor_url)
282 281 q.daemon=True
283 282 children.append(q)
284 283
285 284 # Multiplexer Queue (in a Process)
286 285 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
287 286 q.bind_in(hub.client_info['mux'])
288 287 q.setsockopt_in(zmq.IDENTITY, 'mux')
289 288 q.bind_out(hub.engine_info['mux'])
290 289 q.connect_mon(hub.monitor_url)
291 290 q.daemon=True
292 291 children.append(q)
293 292
294 293 # Control Queue (in a Process)
295 294 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
296 295 q.bind_in(hub.client_info['control'])
297 296 q.setsockopt_in(zmq.IDENTITY, 'control')
298 297 q.bind_out(hub.engine_info['control'])
299 298 q.connect_mon(hub.monitor_url)
300 299 q.daemon=True
301 300 children.append(q)
302 301 try:
303 302 scheme = self.config.TaskScheduler.scheme_name
304 303 except AttributeError:
305 304 scheme = TaskScheduler.scheme_name.get_default_value()
306 305 # Task Queue (in a Process)
307 306 if scheme == 'pure':
308 307 self.log.warn("task::using pure XREQ Task scheduler")
309 308 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
310 309 # q.setsockopt_out(zmq.HWM, hub.hwm)
311 310 q.bind_in(hub.client_info['task'][1])
312 311 q.setsockopt_in(zmq.IDENTITY, 'task')
313 312 q.bind_out(hub.engine_info['task'])
314 313 q.connect_mon(hub.monitor_url)
315 314 q.daemon=True
316 315 children.append(q)
317 316 elif scheme == 'none':
318 317 self.log.warn("task::using no Task scheduler")
319 318
320 319 else:
321 320 self.log.info("task::using Python %s Task scheduler"%scheme)
322 321 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
323 322 hub.monitor_url, hub.client_info['notification'])
324 kwargs = dict(logname=self.log.name, loglevel=self.log_level,
325 config=dict(self.config))
323 kwargs = dict(logname='scheduler', loglevel=self.log_level,
324 log_url = self.log_url, config=dict(self.config))
326 325 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
327 326 q.daemon=True
328 327 children.append(q)
329 328
330 329
331 330 def save_urls(self):
332 331 """save the registration urls to files."""
333 332 c = self.config
334 333
335 334 sec_dir = self.cluster_dir.security_dir
336 335 cf = self.factory
337 336
338 337 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
339 338 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
340 339
341 340 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
342 341 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
343 342
344 343
345 344 def do_import_statements(self):
346 345 statements = self.import_statements
347 346 for s in statements:
348 347 try:
349 348 self.log.msg("Executing statement: '%s'" % s)
350 349 exec s in globals(), locals()
351 350 except:
352 351 self.log.msg("Error running statement: %s" % s)
353 352
354 # def start_logging(self):
355 # super(IPControllerApp, self).start_logging()
356 # if self.config.Global.log_url:
357 # context = self.factory.context
358 # lsock = context.socket(zmq.PUB)
359 # lsock.connect(self.config.Global.log_url)
360 # handler = PUBHandler(lsock)
361 # handler.root_topic = 'controller'
362 # handler.setLevel(self.log_level)
363 # self.log.addHandler(handler)
353 def forward_logging(self):
354 if self.log_url:
355 self.log.info("Forwarding logging to %s"%self.log_url)
356 context = zmq.Context.instance()
357 lsock = context.socket(zmq.PUB)
358 lsock.connect(self.log_url)
359 handler = PUBHandler(lsock)
360 self.log.removeHandler(self._log_handler)
361 handler.root_topic = 'controller'
362 handler.setLevel(self.log_level)
363 self.log.addHandler(handler)
364 self._log_handler = handler
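With forward_logging restored, the controller can ship its log records to a running iplogger. A hypothetical ipcontroller_config.py fragment pointing log_url at the LogWatcher's default listening address (shown later in this diff); the engine app grows the same log_url trait below:

    c = get_config()
    # forward this controller's log records to an iplogger listening here
    c.IPControllerApp.log_url = 'tcp://127.0.0.1:20202'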
364 365 # #
365 366
366 367 def initialize(self, argv=None):
367 368 super(IPControllerApp, self).initialize(argv)
369 self.forward_logging()
368 370 self.init_hub()
369 371 self.init_schedulers()
370 372
371 373 def start(self):
372 374 # Start the subprocesses:
373 375 self.factory.start()
374 376 child_procs = []
375 377 for child in self.children:
376 378 child.start()
377 379 if isinstance(child, ProcessMonitoredQueue):
378 380 child_procs.append(child.launcher)
379 381 elif isinstance(child, Process):
380 382 child_procs.append(child)
381 383 if child_procs:
382 384 signal_children(child_procs)
383 385
384 386 self.write_pid_file(overwrite=True)
385 387
386 388 try:
387 389 self.factory.loop.start()
388 390 except KeyboardInterrupt:
389 391 self.log.critical("Interrupted, Exiting...\n")
390 392
391 393
392 394
393 395 def launch_new_instance():
394 396 """Create and run the IPython controller"""
395 397 app = IPControllerApp()
396 398 app.initialize()
397 399 app.start()
398 400
399 401
400 402 if __name__ == '__main__':
401 403 launch_new_instance()
@@ -1,289 +1,277 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import json
19 19 import os
20 20 import sys
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24
25 25 from IPython.parallel.apps.clusterdir import (
26 26 ClusterApplication,
27 27 ClusterDir,
28 base_aliases,
29 28 # ClusterDirConfigLoader
30 29 )
31 30 from IPython.zmq.log import EnginePUBHandler
32 31
33 32 from IPython.config.configurable import Configurable
34 33 from IPython.parallel.streamsession import StreamSession
35 34 from IPython.parallel.engine.engine import EngineFactory
36 35 from IPython.parallel.engine.streamkernel import Kernel
37 36 from IPython.parallel.util import disambiguate_url
38 37
39 38 from IPython.utils.importstring import import_item
40 from IPython.utils.traitlets import Bool, Unicode, Dict, List, CStr
39 from IPython.utils.traitlets import Bool, Unicode, Dict, List
41 40
42 41
43 42 #-----------------------------------------------------------------------------
44 43 # Module level variables
45 44 #-----------------------------------------------------------------------------
46 45
47 46 #: The default config file name for this application
48 47 default_config_file_name = u'ipengine_config.py'
49 48
50 49 _description = """Start an IPython engine for parallel computing.\n\n
51 50
52 51 IPython engines run in parallel and perform computations on behalf of a client
53 52 and controller. A controller needs to be started before the engines. The
54 53 engine can be configured using command line options or using a cluster
55 54 directory. Cluster directories contain config, log and security files and are
56 55 usually located in your ipython directory and named as "cluster_<profile>".
57 56 See the `profile` and `cluster_dir` options for details.
58 57 """
59 58
60 59
61 60 #-----------------------------------------------------------------------------
62 61 # MPI configuration
63 62 #-----------------------------------------------------------------------------
64 63
65 64 mpi4py_init = """from mpi4py import MPI as mpi
66 65 mpi.size = mpi.COMM_WORLD.Get_size()
67 66 mpi.rank = mpi.COMM_WORLD.Get_rank()
68 67 """
69 68
70 69
71 70 pytrilinos_init = """from PyTrilinos import Epetra
72 71 class SimpleStruct:
73 72 pass
74 73 mpi = SimpleStruct()
75 74 mpi.rank = 0
76 75 mpi.size = 0
77 76 """
78 77
79 78 class MPI(Configurable):
80 79 """Configurable for MPI initialization"""
81 80 use = Unicode('', config=True,
82 81 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
83 82 )
84 83
85 84 def _use_changed(self, name, old, new):
86 85 # load default init script if it's not set
87 86 if not self.init_script:
88 87 self.init_script = self.default_inits.get(new, '')
89 88
90 89 init_script = Unicode('', config=True,
91 90 help="Initialization code for MPI")
92 91
93 92 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
94 93 config=True)
95 94
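A hypothetical ipengine_config.py fragment turning on the mpi4py initialization; the 'mpi' command-line alias defined below maps to the same MPI.use trait:

    c = get_config()
    c.MPI.use = 'mpi4py'    # use the mpi4py_init snippet above at engine startup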
96 95
97 96 #-----------------------------------------------------------------------------
98 97 # Main application
99 98 #-----------------------------------------------------------------------------
100 99
101 100
102 101 class IPEngineApp(ClusterApplication):
103 102
104 103 name = Unicode(u'ipengine')
105 104 description = Unicode(_description)
106 105 default_config_file_name = default_config_file_name
107 106 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
108 107
109 108 auto_create_cluster_dir = Bool(False,
110 109 help="whether to create the cluster_dir if it doesn't exist")
111 110
112 111 startup_script = Unicode(u'', config=True,
113 112 help='specify a script to be run at startup')
114 113 startup_command = Unicode('', config=True,
115 114 help='specify a command to be run at startup')
116 115
117 116 url_file = Unicode(u'', config=True,
118 117 help="""The full location of the file containing the connection information for
119 118 the controller. If this is not given, the file must be in the
120 119 security directory of the cluster directory. This location is
121 120 resolved using the `profile` or `cluster_dir` options.""",
122 121 )
123 122
124 123 url_file_name = Unicode(u'ipcontroller-engine.json')
124 log_url = Unicode('', config=True,
125 help="""The URL for the iploggerapp instance, for forwarding
126 logging to a central location.""")
125 127
126 128 aliases = Dict(dict(
127 129 config = 'IPEngineApp.config_file',
128 130 file = 'IPEngineApp.url_file',
129 131 c = 'IPEngineApp.startup_command',
130 132 s = 'IPEngineApp.startup_script',
131 133
132 134 ident = 'StreamSession.session',
133 135 user = 'StreamSession.username',
134 136 exec_key = 'StreamSession.keyfile',
135 137
136 138 url = 'EngineFactory.url',
137 139 ip = 'EngineFactory.ip',
138 140 transport = 'EngineFactory.transport',
139 141 port = 'EngineFactory.regport',
140 142 location = 'EngineFactory.location',
141 143
142 144 timeout = 'EngineFactory.timeout',
143 145
144 146 profile = "ClusterDir.profile",
145 147 cluster_dir = 'ClusterDir.location',
146 148
147 149 mpi = 'MPI.use',
148 150
149 151 log_level = 'IPEngineApp.log_level',
152 log_url = 'IPEngineApp.log_url'
150 153 ))
151 154
152 155 # def find_key_file(self):
153 156 # """Set the key file.
154 157 #
155 158 # Here we don't try to actually see if it exists for is valid as that
156 159 # is hadled by the connection logic.
157 160 # """
158 161 # config = self.master_config
159 162 # # Find the actual controller key file
160 163 # if not config.Global.key_file:
161 164 # try_this = os.path.join(
162 165 # config.Global.cluster_dir,
163 166 # config.Global.security_dir,
164 167 # config.Global.key_file_name
165 168 # )
166 169 # config.Global.key_file = try_this
167 170
168 171 def find_url_file(self):
169 172 """Set the key file.
170 173
171 174 Here we don't try to actually see if it exists for is valid as that
172 175 is hadled by the connection logic.
173 176 """
174 177 config = self.config
175 178 # Find the actual controller key file
176 179 if not self.url_file:
177 180 self.url_file = os.path.join(
178 181 self.cluster_dir.security_dir,
179 182 self.url_file_name
180 183 )
181 184 def init_engine(self):
182 185 # This is the working dir by now.
183 186 sys.path.insert(0, '')
184 187 config = self.config
185 188 # print config
186 189 self.find_url_file()
187 190
188 191 # if os.path.exists(config.Global.key_file) and config.Global.secure:
189 192 # config.SessionFactory.exec_key = config.Global.key_file
190 193 if os.path.exists(self.url_file):
191 194 with open(self.url_file) as f:
192 195 d = json.loads(f.read())
193 196 for k,v in d.iteritems():
194 197 if isinstance(v, unicode):
195 198 d[k] = v.encode()
196 199 if d['exec_key']:
197 200 config.StreamSession.key = d['exec_key']
198 201 d['url'] = disambiguate_url(d['url'], d['location'])
199 202 config.EngineFactory.url = d['url']
200 203 config.EngineFactory.location = d['location']
201 204
202 205 try:
203 206 exec_lines = config.Kernel.exec_lines
204 207 except AttributeError:
205 208 config.Kernel.exec_lines = []
206 209 exec_lines = config.Kernel.exec_lines
207 210
208 211 if self.startup_script:
209 212 enc = sys.getfilesystemencoding() or 'utf8'
210 213 cmd="execfile(%r)"%self.startup_script.encode(enc)
211 214 exec_lines.append(cmd)
212 215 if self.startup_command:
213 216 exec_lines.append(self.startup_command)
214 217
215 218 # Create the underlying shell class and Engine
216 219 # shell_class = import_item(self.master_config.Global.shell_class)
217 220 # print self.config
218 221 try:
219 222 self.engine = EngineFactory(config=config, log=self.log)
220 223 except:
221 224 self.log.error("Couldn't start the Engine", exc_info=True)
222 225 self.exit(1)
223 226
224 # self.start_logging()
225
226 # Create the service hierarchy
227 # self.main_service = service.MultiService()
228 # self.engine_service.setServiceParent(self.main_service)
229 # self.tub_service = Tub()
230 # self.tub_service.setServiceParent(self.main_service)
231 # # This needs to be called before the connection is initiated
232 # self.main_service.startService()
233
234 # This initiates the connection to the controller and calls
235 # register_engine to tell the controller we are ready to do work
236 # self.engine_connector = EngineConnector(self.tub_service)
237
238 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
239
240 # reactor.callWhenRunning(self.call_connect)
241
242 # def start_logging(self):
243 # super(IPEngineApp, self).start_logging()
244 # if self.master_config.Global.log_url:
245 # context = self.engine.context
246 # lsock = context.socket(zmq.PUB)
247 # lsock.connect(self.master_config.Global.log_url)
248 # handler = EnginePUBHandler(self.engine, lsock)
249 # handler.setLevel(self.log_level)
250 # self.log.addHandler(handler)
227 def forward_logging(self):
228 if self.log_url:
229 self.log.info("Forwarding logging to %s"%self.log_url)
230 context = self.engine.context
231 lsock = context.socket(zmq.PUB)
232 lsock.connect(self.log_url)
233 self.log.removeHandler(self._log_handler)
234 handler = EnginePUBHandler(self.engine, lsock)
235 handler.setLevel(self.log_level)
236 self.log.addHandler(handler)
237 self._log_handler = handler
251 238 #
252 239 def init_mpi(self):
253 240 global mpi
254 241 self.mpi = MPI(config=self.config)
255 242
256 243 mpi_import_statement = self.mpi.init_script
257 244 if mpi_import_statement:
258 245 try:
259 246 self.log.info("Initializing MPI:")
260 247 self.log.info(mpi_import_statement)
261 248 exec mpi_import_statement in globals()
262 249 except:
263 250 mpi = None
264 251 else:
265 252 mpi = None
266 253
267 254 def initialize(self, argv=None):
268 255 super(IPEngineApp, self).initialize(argv)
269 256 self.init_mpi()
270 257 self.init_engine()
258 self.forward_logging()
271 259
272 260 def start(self):
273 261 self.engine.start()
274 262 try:
275 263 self.engine.loop.start()
276 264 except KeyboardInterrupt:
277 265 self.log.critical("Engine Interrupted, shutting down...\n")
278 266
279 267
280 268 def launch_new_instance():
281 269 """Create and run the IPython engine"""
282 270 app = IPEngineApp()
283 271 app.initialize()
284 272 app.start()
285 273
286 274
287 275 if __name__ == '__main__':
288 276 launch_new_instance()
289 277
@@ -1,132 +1,97 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 A simple IPython logger application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 import zmq
22 22
23 from IPython.utils.traitlets import Bool, Dict
24
23 25 from IPython.parallel.apps.clusterdir import (
24 26 ClusterApplication,
25 ClusterDirConfigLoader
27 ClusterDir,
28 base_aliases
26 29 )
27 30 from IPython.parallel.apps.logwatcher import LogWatcher
28 31
29 32 #-----------------------------------------------------------------------------
30 33 # Module level variables
31 34 #-----------------------------------------------------------------------------
32 35
33 36 #: The default config file name for this application
34 37 default_config_file_name = u'iplogger_config.py'
35 38
36 39 _description = """Start an IPython logger for parallel computing.\n\n
37 40
38 41 IPython controllers and engines (and your own processes) can broadcast log messages
39 42 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 43 logger can be configured using command line options or using a cluster
41 44 directory. Cluster directories contain config, log and security files and are
42 45 usually located in your ipython directory and named as "cluster_<profile>".
43 46 See the --profile and --cluster-dir options for details.
44 47 """
45 48
46 #-----------------------------------------------------------------------------
47 # Command line options
48 #-----------------------------------------------------------------------------
49
50
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52
53 def _add_arguments(self):
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 paa = self.parser.add_argument
56 # Controller config
57 paa('--url',
58 type=str, dest='LogWatcher.url',
59 help='The url the LogWatcher will listen on',
60 )
61 # MPI
62 paa('--topics',
63 type=str, dest='LogWatcher.topics', nargs='+',
64 help='What topics to subscribe to',
65 metavar='topics')
66 # Global config
67 paa('--log-to-file',
68 action='store_true', dest='Global.log_to_file',
69 help='Log to a file in the log directory (default is stdout)')
70
71 49
72 50 #-----------------------------------------------------------------------------
73 51 # Main application
74 52 #-----------------------------------------------------------------------------
75
53 aliases = {}
54 aliases.update(base_aliases)
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
76 56
77 57 class IPLoggerApp(ClusterApplication):
78 58
79 59 name = u'iplogger'
80 60 description = _description
81 command_line_loader = IPLoggerAppConfigLoader
82 61 default_config_file_name = default_config_file_name
83 auto_create_cluster_dir = True
84
85 def create_default_config(self):
86 super(IPLoggerApp, self).create_default_config()
87
88 # The engine should not clean logs as we don't want to remove the
89 # active log files of other running engines.
90 self.default_config.Global.clean_logs = False
91
92 # If given, this is the actual location of the logger's URL file.
93 # If not, this is computed using the profile, app_dir and furl_file_name
94 self.default_config.Global.url_file_name = u'iplogger.url'
95 self.default_config.Global.url_file = u''
96
97 def post_load_command_line_config(self):
98 pass
99
100 def pre_construct(self):
101 super(IPLoggerApp, self).pre_construct()
102
103 def construct(self):
104 # This is the working dir by now.
105 sys.path.insert(0, '')
106
107 self.start_logging()
108
62 auto_create_cluster_dir = Bool(False)
63
64 classes = [LogWatcher, ClusterDir]
65 aliases = Dict(aliases)
66
67 def initialize(self, argv=None):
68 super(IPLoggerApp, self).initialize(argv)
69 self.init_watcher()
70
71 def init_watcher(self):
109 72 try:
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
73 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
111 74 except:
112 75 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 76 self.exit(1)
77 self.log.info("Listening for log messages on %r"%self.watcher.url)
114 78
115 79
116 def start_app(self):
80 def start(self):
81 self.watcher.start()
117 82 try:
118 self.watcher.start()
119 83 self.watcher.loop.start()
120 84 except KeyboardInterrupt:
121 85 self.log.critical("Logging Interrupted, shutting down...\n")
122 86
123 87
124 88 def launch_new_instance():
125 89 """Create and run the IPython LogWatcher"""
126 90 app = IPLoggerApp()
91 app.initialize()
127 92 app.start()
128 93
129 94
130 95 if __name__ == '__main__':
131 96 launch_new_instance()
132 97
@@ -1,98 +1,108 @@
1 1 #!/usr/bin/env python
2 2 """A simple logger object that consolidates messages incoming from ipcluster processes."""
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2011 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15
16 16 import logging
17 17 import sys
18 18
19 19 import zmq
20 20 from zmq.eventloop import ioloop, zmqstream
21 21
22 22 from IPython.utils.traitlets import Int, Unicode, Instance, List
23 23
24 24 from IPython.parallel.factory import LoggingFactory
25 25
26 26 #-----------------------------------------------------------------------------
27 27 # Classes
28 28 #-----------------------------------------------------------------------------
29 29
30 30
31 31 class LogWatcher(LoggingFactory):
32 32 """A simple class that receives messages on a SUB socket, as published
33 33 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
34 34
35 35 This can subscribe to multiple topics, but defaults to all topics.
36 36 """
37 37 # configurables
38 topics = List([''], config=True)
39 url = Unicode('tcp://127.0.0.1:20202', config=True)
38 topics = List([''], config=True,
39 help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
40 url = Unicode('tcp://127.0.0.1:20202', config=True,
41 help="ZMQ url on which to listen for log messages")
40 42
41 43 # internals
42 context = Instance(zmq.Context, (), {})
43 44 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
44 loop = Instance('zmq.eventloop.ioloop.IOLoop')
45
46 context = Instance(zmq.Context)
47 def _context_default(self):
48 return zmq.Context.instance()
49
50 loop = Instance(zmq.eventloop.ioloop.IOLoop)
45 51 def _loop_default(self):
46 52 return ioloop.IOLoop.instance()
47 53
48 54 def __init__(self, **kwargs):
49 55 super(LogWatcher, self).__init__(**kwargs)
50 56 s = self.context.socket(zmq.SUB)
51 57 s.bind(self.url)
52 58 self.stream = zmqstream.ZMQStream(s, self.loop)
53 59 self.subscribe()
54 60 self.on_trait_change(self.subscribe, 'topics')
55 61
56 62 def start(self):
57 63 self.stream.on_recv(self.log_message)
58 64
59 65 def stop(self):
60 66 self.stream.stop_on_recv()
61 67
62 68 def subscribe(self):
63 69 """Update our SUB socket's subscriptions."""
64 70 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
65 for topic in self.topics:
66 self.log.debug("Subscribing to: %r"%topic)
67 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
71 if '' in self.topics:
72 self.log.debug("Subscribing to: everything")
73 self.stream.setsockopt(zmq.SUBSCRIBE, '')
74 else:
75 for topic in self.topics:
76 self.log.debug("Subscribing to: %r"%(topic))
77 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
68 78
69 79 def _extract_level(self, topic_str):
70 80 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
71 81 topics = topic_str.split('.')
72 82 for idx,t in enumerate(topics):
73 83 level = getattr(logging, t, None)
74 84 if level is not None:
75 85 break
76 86
77 87 if level is None:
78 88 level = logging.INFO
79 89 else:
80 90 topics.pop(idx)
81 91
82 92 return level, '.'.join(topics)
83 93
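# --- Editor's note: a worked example of the parsing above. The first topic
# --- component that names a logging level is removed and returned numerically:
# ---     _extract_level('engine.0.INFO.extra')   -> (20, 'engine.0.extra')
# ---     _extract_level('controller.heartbeat')  -> (20, 'controller.heartbeat')
# --- (logging.INFO == 20; when no level is found, it defaults to INFO.)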
84 94
85 95 def log_message(self, raw):
86 96 """receive and parse a message, then log it."""
87 97 if len(raw) != 2 or '.' not in raw[0]:
88 98 self.log.error("Invalid log message: %s"%raw)
89 99 return
90 100 else:
91 101 topic, msg = raw
92 102 # don't newline, since log messages always newline:
93 103 topic,level_name = topic.rsplit('.',1)
94 104 level,topic = self._extract_level(topic)
95 105 if msg[-1] == '\n':
96 106 msg = msg[:-1]
97 logging.log(level, "[%s] %s" % (topic, msg))
107 self.log.log(level, "[%s] %s" % (topic, msg))
98 108
@@ -1,677 +1,679 b''
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #----------------------------------------------------------------------
15 15 # Imports
16 16 #----------------------------------------------------------------------
17 17
18 18 from __future__ import print_function
19 19
20 20 import logging
21 21 import sys
22 22
23 23 from datetime import datetime, timedelta
24 24 from random import randint, random
25 25 from types import FunctionType
26 26
27 27 try:
28 28 import numpy
29 29 except ImportError:
30 30 numpy = None
31 31
32 32 import zmq
33 33 from zmq.eventloop import ioloop, zmqstream
34 34
35 35 # local imports
36 36 from IPython.external.decorator import decorator
37 37 from IPython.config.loader import Config
38 38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum
39 39
40 40 from IPython.parallel import error
41 41 from IPython.parallel.factory import SessionFactory
42 42 from IPython.parallel.util import connect_logger, local_logger
43 43
44 44 from .dependency import Dependency
45 45
46 46 @decorator
47 47 def logged(f,self,*args,**kwargs):
48 48 # print ("#--------------------")
49 49 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
50 50 # print ("#--")
51 51 return f(self,*args, **kwargs)
52 52
53 53 #----------------------------------------------------------------------
54 54 # Chooser functions
55 55 #----------------------------------------------------------------------
56 56
57 57 def plainrandom(loads):
58 58 """Plain random pick."""
59 59 n = len(loads)
60 60 return randint(0,n-1)
61 61
62 62 def lru(loads):
63 63 """Always pick the front of the line.
64 64
65 65 The content of `loads` is ignored.
66 66
67 67 Assumes LRU ordering of loads, with oldest first.
68 68 """
69 69 return 0
70 70
71 71 def twobin(loads):
72 72 """Pick two at random, use the LRU of the two.
73 73
74 74 The content of loads is ignored.
75 75
76 76 Assumes LRU ordering of loads, with oldest first.
77 77 """
78 78 n = len(loads)
79 79 a = randint(0,n-1)
80 80 b = randint(0,n-1)
81 81 return min(a,b)
82 82
83 83 def weighted(loads):
84 84 """Pick two at random using inverse load as weight.
85 85
86 86 Return the less loaded of the two.
87 87 """
88 88 # weight 0 a million times more than 1:
89 89 weights = 1./(1e-6+numpy.array(loads))
90 90 sums = weights.cumsum()
91 91 t = sums[-1]
92 92 x = random()*t
93 93 y = random()*t
94 94 idx = 0
95 95 idy = 0
96 96 while sums[idx] < x:
97 97 idx += 1
98 98 while sums[idy] < y:
99 99 idy += 1
100 100 if weights[idy] > weights[idx]:
101 101 return idy
102 102 else:
103 103 return idx
104 104
105 105 def leastload(loads):
106 106 """Always choose the lowest load.
107 107
108 108 If the lowest load occurs more than once, the first
109 109 occurance will be used. If loads has LRU ordering, this means
110 110 the LRU of those with the lowest load is chosen.
111 111 """
112 112 return loads.index(min(loads))
113 113
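# --- Editor's note: a worked example of the chooser functions above, on an
# --- LRU-ordered load list (outstanding tasks per engine, oldest-idle first):
# ---     loads = [2, 0, 1, 0]
# ---     plainrandom(loads) -> a uniformly random index
# ---     lru(loads)         -> 0  (always the head of the line)
# ---     twobin(loads)      -> the lower (longer-idle) of two random indices
# ---     weighted(loads)    -> a random index biased toward light loads (needs numpy)
# ---     leastload(loads)   -> 1  (first occurrence of the minimum load)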
114 114 #---------------------------------------------------------------------
115 115 # Classes
116 116 #---------------------------------------------------------------------
117 117 # store empty default dependency:
118 118 MET = Dependency([])
119 119
120 120 class TaskScheduler(SessionFactory):
121 121 """Python TaskScheduler object.
122 122
123 123 This is the simplest object that supports msg_id based
124 124 DAG dependencies. *Only* task msg_ids are checked, not
125 125 msg_ids of jobs submitted via the MUX queue.
126 126
127 127 """
128 128
129 129 hwm = Int(0, config=True, shortname='hwm',
130 130 help="""specify the High Water Mark (HWM) for the downstream
131 131 socket in the Task scheduler. This is the maximum number
132 132 of allowed outstanding tasks on each engine."""
133 133 )
134 134 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
135 135 'leastload', config=True, shortname='scheme', allow_none=False,
136 136 help="""select the task scheduler scheme [default: leastload]
137 137 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'"""
138 138 )
139 139 def _scheme_name_changed(self, old, new):
140 140 self.log.debug("Using scheme %r"%new)
141 141 self.scheme = globals()[new]
142 142
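# --- Editor's sketch (assumed profile snippet, not part of this diff): both traits
# --- above are configurable, so a cluster profile could select the scheme and cap
# --- outstanding tasks per engine, e.g. in an ipcontroller_config.py:
# ---     c = get_config()
# ---     c.TaskScheduler.scheme_name = 'weighted'
# ---     c.TaskScheduler.hwm = 2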
143 143 # input arguments:
144 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
144 scheme = Instance(FunctionType) # function for determining the destination
145 def _scheme_default(self):
146 return leastload
145 147 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
146 148 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
147 149 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
148 150 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
149 151
150 152 # internals:
151 153 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
152 154 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
153 155 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
154 156 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
155 157 pending = Dict() # dict by engine_uuid of submitted tasks
156 158 completed = Dict() # dict by engine_uuid of completed tasks
157 159 failed = Dict() # dict by engine_uuid of failed tasks
158 160 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
159 161 clients = Dict() # dict by msg_id for who submitted the task
160 162 targets = List() # list of target IDENTs
161 163 loads = List() # list of engine loads
162 164 # full = Set() # set of IDENTs that have HWM outstanding tasks
163 165 all_completed = Set() # set of all completed tasks
164 166 all_failed = Set() # set of all failed tasks
165 167 all_done = Set() # set of all finished tasks=union(completed,failed)
166 168 all_ids = Set() # set of all submitted task IDs
167 169 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
168 170 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
169 171
170 172
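# --- Editor's illustration of the bookkeeping above, with assumed sample values:
# ---     graph     = {'msg-1': {'msg-2'}}    # msg-2 is waiting on msg-1
# ---     depending = {'msg-2': [raw_msg, targets, after, follow, timeout]}
# ---     pending   = {'<engine-uuid>': {'msg-3': (raw_msg, targets, MET, follow, timeout)}}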
171 173 def start(self):
172 174 self.engine_stream.on_recv(self.dispatch_result, copy=False)
173 175 self._notification_handlers = dict(
174 176 registration_notification = self._register_engine,
175 177 unregistration_notification = self._unregister_engine
176 178 )
177 179 self.notifier_stream.on_recv(self.dispatch_notification)
178 180 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2 seconds
179 181 self.auditor.start()
180 182 self.log.info("Scheduler started...%r"%self)
181 183
182 184 def resume_receiving(self):
183 185 """Resume accepting jobs."""
184 186 self.client_stream.on_recv(self.dispatch_submission, copy=False)
185 187
186 188 def stop_receiving(self):
187 189 """Stop accepting jobs while there are no engines.
188 190 Leave them in the ZMQ queue."""
189 191 self.client_stream.on_recv(None)
190 192
191 193 #-----------------------------------------------------------------------
192 194 # [Un]Registration Handling
193 195 #-----------------------------------------------------------------------
194 196
195 197 def dispatch_notification(self, msg):
196 198 """dispatch register/unregister events."""
197 199 idents,msg = self.session.feed_identities(msg)
198 200 msg = self.session.unpack_message(msg)
199 201 msg_type = msg['msg_type']
200 202 handler = self._notification_handlers.get(msg_type, None)
201 203 if handler is None:
202 204 raise Exception("Unhandled message type: %s"%msg_type)
203 205 else:
204 206 try:
205 207 handler(str(msg['content']['queue']))
206 208 except KeyError:
207 209 self.log.error("task::Invalid notification msg: %s"%msg)
208 210
209 211 @logged
210 212 def _register_engine(self, uid):
211 213 """New engine with ident `uid` became available."""
212 214 # head of the line:
213 215 self.targets.insert(0,uid)
214 216 self.loads.insert(0,0)
215 217 # initialize sets
216 218 self.completed[uid] = set()
217 219 self.failed[uid] = set()
218 220 self.pending[uid] = {}
219 221 if len(self.targets) == 1:
220 222 self.resume_receiving()
221 223 # rescan the graph:
222 224 self.update_graph(None)
223 225
224 226 def _unregister_engine(self, uid):
225 227 """Existing engine with ident `uid` became unavailable."""
226 228 if len(self.targets) == 1:
227 229 # this was our only engine
228 230 self.stop_receiving()
229 231
230 232 # handle any potentially finished tasks:
231 233 self.engine_stream.flush()
232 234
233 235 # don't pop destinations, because they might be used later
234 236 # map(self.destinations.pop, self.completed.pop(uid))
235 237 # map(self.destinations.pop, self.failed.pop(uid))
236 238
237 239 # prevent this engine from receiving work
238 240 idx = self.targets.index(uid)
239 241 self.targets.pop(idx)
240 242 self.loads.pop(idx)
241 243
242 244 # wait 5 seconds before cleaning up pending jobs, since the results might
243 245 # still be incoming
244 246 if self.pending[uid]:
245 247 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
246 248 dc.start()
247 249 else:
248 250 self.completed.pop(uid)
249 251 self.failed.pop(uid)
250 252
251 253
252 254 @logged
253 255 def handle_stranded_tasks(self, engine):
254 256 """Deal with jobs resident in an engine that died."""
255 257 lost = self.pending[engine]
256 258 for msg_id in lost.keys():
257 259 if msg_id not in self.pending[engine]:
258 260 # prevent double-handling of messages
259 261 continue
260 262
261 263 raw_msg = lost[msg_id][0]
262 264
263 265 idents,msg = self.session.feed_identities(raw_msg, copy=False)
264 266 msg = self.session.unpack_message(msg, copy=False, content=False)
265 267 parent = msg['header']
266 268 idents = [engine, idents[0]]
267 269
268 270 # build fake error reply
269 271 try:
270 272 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
271 273 except:
272 274 content = error.wrap_exception()
273 275 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
274 276 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
275 277 # and dispatch it
276 278 self.dispatch_result(raw_reply)
277 279
278 280 # finally scrub completed/failed lists
279 281 self.completed.pop(engine)
280 282 self.failed.pop(engine)
281 283
282 284
283 285 #-----------------------------------------------------------------------
284 286 # Job Submission
285 287 #-----------------------------------------------------------------------
286 288 @logged
287 289 def dispatch_submission(self, raw_msg):
288 290 """Dispatch job submission to appropriate handlers."""
289 291 # ensure targets up to date:
290 292 self.notifier_stream.flush()
291 293 try:
292 294 idents, msg = self.session.feed_identities(raw_msg, copy=False)
293 295 msg = self.session.unpack_message(msg, content=False, copy=False)
294 296 except Exception:
295 297 self.log.error("task::Invalid task: %s"%raw_msg, exc_info=True)
296 298 return
297 299
298 300 # send to monitor
299 301 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
300 302
301 303 header = msg['header']
302 304 msg_id = header['msg_id']
303 305 self.all_ids.add(msg_id)
304 306
305 307 # targets
306 308 targets = set(header.get('targets', []))
307 309 retries = header.get('retries', 0)
308 310 self.retries[msg_id] = retries
309 311
310 312 # time dependencies
311 313 after = Dependency(header.get('after', []))
312 314 if after.all:
313 315 if after.success:
314 316 after.difference_update(self.all_completed)
315 317 if after.failure:
316 318 after.difference_update(self.all_failed)
317 319 if after.check(self.all_completed, self.all_failed):
318 320 # recast as empty set, if `after` already met,
319 321 # to prevent unnecessary set comparisons
320 322 after = MET
321 323
322 324 # location dependencies
323 325 follow = Dependency(header.get('follow', []))
324 326
325 327 # turn timeouts into datetime objects:
326 328 timeout = header.get('timeout', None)
327 329 if timeout:
328 330 timeout = datetime.now() + timedelta(0,timeout,0)
329 331
330 332 args = [raw_msg, targets, after, follow, timeout]
331 333
332 334 # validate and reduce dependencies:
333 335 for dep in after,follow:
334 336 # check valid:
335 337 if msg_id in dep or dep.difference(self.all_ids):
336 338 self.depending[msg_id] = args
337 339 return self.fail_unreachable(msg_id, error.InvalidDependency)
338 340 # check if unreachable:
339 341 if dep.unreachable(self.all_completed, self.all_failed):
340 342 self.depending[msg_id] = args
341 343 return self.fail_unreachable(msg_id)
342 344
343 345 if after.check(self.all_completed, self.all_failed):
344 346 # time deps already met, try to run
345 347 if not self.maybe_run(msg_id, *args):
346 348 # can't run yet
347 349 if msg_id not in self.all_failed:
348 350 # could have failed as unreachable
349 351 self.save_unmet(msg_id, *args)
350 352 else:
351 353 self.save_unmet(msg_id, *args)
352 354
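# --- Editor's sketch (assumed values, not part of the changeset): the header fields
# --- consumed by dispatch_submission above. A submitted task might carry:
# ---     {'msg_id':  '<msg-id-3>',
# ---      'after':   ['<msg-id-1>'],   # time dependencies (msg_ids)
# ---      'follow':  ['<msg-id-2>'],   # location dependencies (msg_ids)
# ---      'targets': [],               # restrict to specific engine uuids
# ---      'retries': 1,                # resubmission budget on failure
# ---      'timeout': 30}               # seconds; converted to a datetime above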
353 355 # @logged
354 356 def audit_timeouts(self):
355 357 """Audit all waiting tasks for expired timeouts."""
356 358 now = datetime.now()
357 359 for msg_id in self.depending.keys():
358 360 # must recheck, in case one failure cascaded to another:
359 361 if msg_id in self.depending:
360 362 raw,targets,after,follow,timeout = self.depending[msg_id]
361 363 if timeout and timeout < now:
362 364 self.fail_unreachable(msg_id, error.TaskTimeout)
363 365
364 366 @logged
365 367 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
366 368 """a task has become unreachable, send a reply with an ImpossibleDependency
367 369 error."""
368 370 if msg_id not in self.depending:
369 371 self.log.error("msg %r already failed!"%msg_id)
370 372 return
371 373 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
372 374 for mid in follow.union(after):
373 375 if mid in self.graph:
374 376 self.graph[mid].remove(msg_id)
375 377
376 378 # FIXME: unpacking a message I've already unpacked, but didn't save:
377 379 idents,msg = self.session.feed_identities(raw_msg, copy=False)
378 380 msg = self.session.unpack_message(msg, copy=False, content=False)
379 381 header = msg['header']
380 382
381 383 try:
382 384 raise why()
383 385 except:
384 386 content = error.wrap_exception()
385 387
386 388 self.all_done.add(msg_id)
387 389 self.all_failed.add(msg_id)
388 390
389 391 msg = self.session.send(self.client_stream, 'apply_reply', content,
390 392 parent=header, ident=idents)
391 393 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
392 394
393 395 self.update_graph(msg_id, success=False)
394 396
395 397 @logged
396 398 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
397 399 """check location dependencies, and run if they are met."""
398 400 blacklist = self.blacklist.setdefault(msg_id, set())
399 401 if follow or targets or blacklist or self.hwm:
400 402 # we need a can_run filter
401 403 def can_run(idx):
402 404 # check hwm
403 405 if self.hwm and self.loads[idx] == self.hwm:
404 406 return False
405 407 target = self.targets[idx]
406 408 # check blacklist
407 409 if target in blacklist:
408 410 return False
409 411 # check targets
410 412 if targets and target not in targets:
411 413 return False
412 414 # check follow
413 415 return follow.check(self.completed[target], self.failed[target])
414 416
415 417 indices = filter(can_run, range(len(self.targets)))
416 418
417 419 if not indices:
418 420 # couldn't run
419 421 if follow.all:
420 422 # check follow for impossibility
421 423 dests = set()
422 424 relevant = set()
423 425 if follow.success:
424 426 relevant = self.all_completed
425 427 if follow.failure:
426 428 relevant = relevant.union(self.all_failed)
427 429 for m in follow.intersection(relevant):
428 430 dests.add(self.destinations[m])
429 431 if len(dests) > 1:
430 432 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
431 433 self.fail_unreachable(msg_id)
432 434 return False
433 435 if targets:
434 436 # check blacklist+targets for impossibility
435 437 targets.difference_update(blacklist)
436 438 if not targets or not targets.intersection(self.targets):
437 439 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
438 440 self.fail_unreachable(msg_id)
439 441 return False
440 442 return False
441 443 else:
442 444 indices = None
443 445
444 446 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
445 447 return True
446 448
447 449 @logged
448 450 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
449 451 """Save a message for later submission when its dependencies are met."""
450 452 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
451 453 # track the ids in follow or after, but not those already finished
452 454 for dep_id in after.union(follow).difference(self.all_done):
453 455 if dep_id not in self.graph:
454 456 self.graph[dep_id] = set()
455 457 self.graph[dep_id].add(msg_id)
456 458
457 459 @logged
458 460 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
459 461 """Submit a task to any of a subset of our targets."""
460 462 if indices:
461 463 loads = [self.loads[i] for i in indices]
462 464 else:
463 465 loads = self.loads
464 466 idx = self.scheme(loads)
465 467 if indices:
466 468 idx = indices[idx]
467 469 target = self.targets[idx]
468 470 # print (target, map(str, msg[:3]))
469 471 # send job to the engine
470 472 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
471 473 self.engine_stream.send_multipart(raw_msg, copy=False)
472 474 # update load
473 475 self.add_job(idx)
474 476 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
475 477 # notify Hub
476 478 content = dict(msg_id=msg_id, engine_id=target)
477 479 self.session.send(self.mon_stream, 'task_destination', content=content,
478 480 ident=['tracktask',self.session.session])
479 481
480 482
481 483 #-----------------------------------------------------------------------
482 484 # Result Handling
483 485 #-----------------------------------------------------------------------
484 486 @logged
485 487 def dispatch_result(self, raw_msg):
486 488 """dispatch method for result replies"""
487 489 try:
488 490 idents,msg = self.session.feed_identities(raw_msg, copy=False)
489 491 msg = self.session.unpack_message(msg, content=False, copy=False)
490 492 engine = idents[0]
491 493 try:
492 494 idx = self.targets.index(engine)
493 495 except ValueError:
494 496 pass # skip load-update for dead engines
495 497 else:
496 498 self.finish_job(idx)
497 499 except Exception:
498 500 self.log.error("task::Invalid result: %s"%raw_msg, exc_info=True)
499 501 return
500 502
501 503 header = msg['header']
502 504 parent = msg['parent_header']
503 505 if header.get('dependencies_met', True):
504 506 success = (header['status'] == 'ok')
505 507 msg_id = parent['msg_id']
506 508 retries = self.retries[msg_id]
507 509 if not success and retries > 0:
508 510 # failed
509 511 self.retries[msg_id] = retries - 1
510 512 self.handle_unmet_dependency(idents, parent)
511 513 else:
512 514 del self.retries[msg_id]
513 515 # relay to client and update graph
514 516 self.handle_result(idents, parent, raw_msg, success)
515 517 # send to Hub monitor
516 518 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
517 519 else:
518 520 self.handle_unmet_dependency(idents, parent)
519 521
520 522 @logged
521 523 def handle_result(self, idents, parent, raw_msg, success=True):
522 524 """handle a real task result, either success or failure"""
523 525 # first, relay result to client
524 526 engine = idents[0]
525 527 client = idents[1]
526 528 # swap_ids for XREP-XREP mirror
527 529 raw_msg[:2] = [client,engine]
528 530 # print (map(str, raw_msg[:4]))
529 531 self.client_stream.send_multipart(raw_msg, copy=False)
530 532 # now, update our data structures
531 533 msg_id = parent['msg_id']
532 534 self.blacklist.pop(msg_id, None)
533 535 self.pending[engine].pop(msg_id)
534 536 if success:
535 537 self.completed[engine].add(msg_id)
536 538 self.all_completed.add(msg_id)
537 539 else:
538 540 self.failed[engine].add(msg_id)
539 541 self.all_failed.add(msg_id)
540 542 self.all_done.add(msg_id)
541 543 self.destinations[msg_id] = engine
542 544
543 545 self.update_graph(msg_id, success)
544 546
545 547 @logged
546 548 def handle_unmet_dependency(self, idents, parent):
547 549 """handle an unmet dependency"""
548 550 engine = idents[0]
549 551 msg_id = parent['msg_id']
550 552
551 553 if msg_id not in self.blacklist:
552 554 self.blacklist[msg_id] = set()
553 555 self.blacklist[msg_id].add(engine)
554 556
555 557 args = self.pending[engine].pop(msg_id)
556 558 raw,targets,after,follow,timeout = args
557 559
558 560 if self.blacklist[msg_id] == targets:
559 561 self.depending[msg_id] = args
560 562 self.fail_unreachable(msg_id)
561 563 elif not self.maybe_run(msg_id, *args):
562 564 # resubmit failed
563 565 if msg_id not in self.all_failed:
564 566 # put it back in our dependency tree
565 567 self.save_unmet(msg_id, *args)
566 568
567 569 if self.hwm:
568 570 try:
569 571 idx = self.targets.index(engine)
570 572 except ValueError:
571 573 pass # skip load-update for dead engines
572 574 else:
573 575 if self.loads[idx] == self.hwm-1:
574 576 self.update_graph(None)
575 577
576 578
577 579
578 580 @logged
579 581 def update_graph(self, dep_id=None, success=True):
580 582 """dep_id just finished. Update our dependency
581 583 graph and submit any jobs that just became runnable.
582 584
583 585 Called with dep_id=None to update entire graph for hwm, but without finishing
584 586 a task.
585 587 """
586 588 # print ("\n\n***********")
587 589 # pprint (dep_id)
588 590 # pprint (self.graph)
589 591 # pprint (self.depending)
590 592 # pprint (self.all_completed)
591 593 # pprint (self.all_failed)
592 594 # print ("\n\n***********\n\n")
593 595 # update any jobs that depended on the dependency
594 596 jobs = self.graph.pop(dep_id, [])
595 597
596 598 # recheck *all* jobs if
597 599 # a) we have HWM and an engine just became no longer full
598 600 # or b) dep_id was given as None
599 601 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
600 602 jobs = self.depending.keys()
601 603
602 604 for msg_id in jobs:
603 605 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
604 606
605 607 if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
606 608 self.fail_unreachable(msg_id)
607 609
608 610 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
609 611 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
610 612
611 613 self.depending.pop(msg_id)
612 614 for mid in follow.union(after):
613 615 if mid in self.graph:
614 616 self.graph[mid].remove(msg_id)
615 617
616 618 #----------------------------------------------------------------------
617 619 # methods to be overridden by subclasses
618 620 #----------------------------------------------------------------------
619 621
620 622 def add_job(self, idx):
621 623 """Called after self.targets[idx] just got the job with header.
622 624 Override in subclasses. The default ordering is simple LRU.
623 625 The default loads are the number of outstanding jobs."""
624 626 self.loads[idx] += 1
625 627 for lis in (self.targets, self.loads):
626 628 lis.append(lis.pop(idx))
627 629
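# --- Editor's note: a worked example of the LRU bookkeeping above. After the engine
# --- at index idx takes a job, it moves to the back of the line with its load bumped:
# ---     targets = ['a', 'b', 'c'];  loads = [0, 1, 0]
# ---     add_job(0)   # engine 'a' receives the task
# ---     # now targets == ['b', 'c', 'a'] and loads == [1, 0, 1]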
628 630
629 631 def finish_job(self, idx):
630 632 """Called after self.targets[idx] just finished a job.
631 633 Override in subclasses."""
632 634 self.loads[idx] -= 1
633 635
634 636
635 637
636 638 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
637 log_addr=None, loglevel=logging.DEBUG,
639 log_url=None, loglevel=logging.DEBUG,
638 640 identity=b'task'):
639 641 from zmq.eventloop import ioloop
640 642 from zmq.eventloop.zmqstream import ZMQStream
641 643
642 644 if config:
643 645 # unwrap dict back into Config
644 646 config = Config(config)
645 647
646 648 ctx = zmq.Context()
647 649 loop = ioloop.IOLoop()
648 650 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
649 651 ins.setsockopt(zmq.IDENTITY, identity)
650 652 ins.bind(in_addr)
651 653
652 654 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
653 655 outs.setsockopt(zmq.IDENTITY, identity)
654 656 outs.bind(out_addr)
655 657 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
656 658 mons.connect(mon_addr)
657 659 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
658 660 nots.setsockopt(zmq.SUBSCRIBE, '')
659 661 nots.connect(not_addr)
660 662
661 # scheme = globals().get(scheme, None)
662 # setup logging
663 if log_addr:
664 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
663 # setup logging. Note that these will not work in-process, because they clobber
664 # existing loggers.
665 if log_url:
666 connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
665 667 else:
666 668 local_logger(logname, loglevel)
667 669
668 670 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
669 671 mon_stream=mons, notifier_stream=nots,
670 672 loop=loop, logname=logname,
671 673 config=config)
672 674 scheduler.start()
673 675 try:
674 676 loop.start()
675 677 except KeyboardInterrupt:
676 678 print ("interrupted, exiting...", file=sys.__stderr__)
677 679
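# --- Editor's sketch (not part of this diff): launching a standalone scheduler
# --- process. All addresses below are hypothetical; in practice ipcontroller
# --- computes them and forwards the hub's log_url so scheduler output is forwarded.
def _example_launch():
    launch_scheduler(
        in_addr='tcp://127.0.0.1:10101',    # client-facing XREP
        out_addr='tcp://127.0.0.1:10102',   # engine-facing XREP
        mon_addr='tcp://127.0.0.1:10103',   # hub monitor PUB
        not_addr='tcp://127.0.0.1:10104',   # hub notification SUB
        log_url=None,                       # None -> local_logger; else connect_logger
        loglevel=logging.DEBUG,
    )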