##// END OF EJS Templates
parallel docs, tests, default config updated to newconfig
MinRK -
Show More
@@ -1,544 +1,544
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import logging
22 22 import re
23 23 import shutil
24 24 import sys
25 25
26 26 from subprocess import Popen, PIPE
27 27
28 28 from IPython.config.loader import PyFileConfigLoader, Config
29 29 from IPython.config.configurable import Configurable
30 30 from IPython.config.application import Application
31 31 from IPython.core.crashhandler import CrashHandler
32 32 from IPython.core.newapplication import BaseIPythonApplication
33 33 from IPython.core import release
34 34 from IPython.utils.path import (
35 35 get_ipython_package_dir,
36 36 get_ipython_dir,
37 37 expand_path
38 38 )
39 39 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Module errors
43 43 #-----------------------------------------------------------------------------
44 44
class ClusterDirError(Exception):
    """Raised when a cluster directory (or its parent) cannot be found."""
47 47
48 48
class PIDFileError(Exception):
    """Raised for pid-file problems, e.g. the file already exists or is missing."""
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Class for managing cluster directories
55 55 #-----------------------------------------------------------------------------
56 56
class ClusterDir(Configurable):
    """An object to manage the cluster directory and its resources.

    The cluster directory is used by :command:`ipengine`,
    :command:`ipcontroller` and :command:`ipcluster` to manage the
    configuration, logging and security of these applications.

    This object knows how to find, create and manage these directories. This
    should be used by any code that wants to handle cluster directories.
    """

    # Names of the sub-directories kept inside the cluster directory.
    security_dir_name = Unicode('security')
    log_dir_name = Unicode('log')
    pid_dir_name = Unicode('pid')
    # Full paths of those sub-directories; assigned in _location_changed.
    security_dir = Unicode(u'')
    log_dir = Unicode(u'')
    pid_dir = Unicode(u'')

    auto_create = Bool(False,
        help="""Whether to automatically create the ClusterDirectory if it does
        not exist""")
    overwrite = Bool(False,
        help="""Whether to overwrite existing config files""")
    location = Unicode(u'', config=True,
        help="""Set the cluster dir. This overrides the logic used by the
        `profile` option.""",
        )
    profile = Unicode(u'default', config=True,
        help="""The string name of the profile to be used. This determines the name
        of the cluster dir as: cluster_<profile>. The default profile is named
        'default'. The cluster directory is resolve this way if the
        `cluster_dir` option is not used."""
        )

    _location_isset = Bool(False) # flag for detecting multiply set location
    _new_dir = Bool(False) # flag for whether a new dir was created

    def __init__(self, **kwargs):
        """Create the object and resolve its location.

        ``auto_create`` and ``overwrite`` are applied before the remaining
        keyword arguments because the ``location`` handler reads both of them.
        If no location results from the arguments/config, the location is
        derived from the profile name.
        """
        # make sure auto_create,overwrite are set *before* location
        for name in ('auto_create', 'overwrite'):
            v = kwargs.pop(name, None)
            if v is not None:
                setattr(self, name, v)
        super(ClusterDir, self).__init__(**kwargs)
        if not self.location:
            self._profile_changed('profile', 'default', self.profile)

    def _location_changed(self, name, old, new):
        """Trait-change handler for ``location``.

        Creates the directory when ``auto_create`` is set, copies the default
        config files into it, and sets up the security/log/pid sub-directories.

        Raises
        ------
        RuntimeError
            If the location has already been set once.
        ClusterDirError
            If the directory does not exist and ``auto_create`` is False.
        """
        if self._location_isset:
            raise RuntimeError("Cannot set ClusterDir more than once.")
        self._location_isset = True
        if not os.path.isdir(new):
            if self.auto_create:# or self.config.ClusterDir.auto_create:
                os.makedirs(new)
                self._new_dir = True
            else:
                raise ClusterDirError('Directory not found: %s' % new)

        # ensure config files exist:
        self.copy_all_config_files(overwrite=self.overwrite)
        self.security_dir = os.path.join(new, self.security_dir_name)
        self.log_dir = os.path.join(new, self.log_dir_name)
        self.pid_dir = os.path.join(new, self.pid_dir_name)
        self.check_dirs()

    def _profile_changed(self, name, old, new):
        """Trait-change handler for ``profile``: derive ``location`` from it."""
        if self._location_isset:
            raise RuntimeError("ClusterDir already set. Cannot set by profile.")
        self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)

    def _log_dir_changed(self, name, old, new):
        self.check_log_dir()

    def check_log_dir(self):
        """Create the log directory if it does not exist."""
        if not os.path.isdir(self.log_dir):
            os.mkdir(self.log_dir)

    def _security_dir_changed(self, name, old, new):
        self.check_security_dir()

    def check_security_dir(self):
        """Ensure the security directory exists with mode 0700."""
        if not os.path.isdir(self.security_dir):
            os.mkdir(self.security_dir, 0o700)
        # chmod unconditionally: also tightens a pre-existing directory
        os.chmod(self.security_dir, 0o700)

    def _pid_dir_changed(self, name, old, new):
        self.check_pid_dir()

    def check_pid_dir(self):
        """Ensure the pid directory exists with mode 0700."""
        if not os.path.isdir(self.pid_dir):
            os.mkdir(self.pid_dir, 0o700)
        # chmod unconditionally: also tightens a pre-existing directory
        os.chmod(self.pid_dir, 0o700)

    def check_dirs(self):
        """Ensure the security, log and pid sub-directories all exist."""
        self.check_security_dir()
        self.check_log_dir()
        self.check_pid_dir()

    def copy_config_file(self, config_file, path=None, overwrite=False):
        """Copy a default config file into the active cluster directory.

        Default configuration files are kept in :mod:`IPython.config.default`.
        This function copies these from that location to the working cluster
        directory.

        Parameters
        ----------
        config_file : str
            File name of the config file to copy.
        path : str, optional
            Directory to copy from; defaults to the directory of
            :mod:`IPython.config.default`.
        overwrite : bool
            Replace the destination file if it already exists.
        """
        if path is None:
            import IPython.config.default
            path = os.path.dirname(IPython.config.default.__file__)
        src = os.path.join(path, config_file)
        dst = os.path.join(self.location, config_file)
        if not os.path.isfile(dst) or overwrite:
            shutil.copy(src, dst)

    def copy_all_config_files(self, path=None, overwrite=False):
        """Copy all config files into the active cluster directory."""
        for f in [u'ipcontroller_config.py', u'ipengine_config.py',
                    u'ipcluster_config.py']:
            self.copy_config_file(f, path=path, overwrite=overwrite)

    @classmethod
    def create_cluster_dir(cls, cluster_dir):
        """Create a new cluster directory given a full path.

        Parameters
        ----------
        cluster_dir : str
            The full path to the cluster directory.  If it does exist, it will
            be used.  If not, it will be created.
        """
        # auto_create is required for the "it will be created" contract;
        # without it a missing directory raises ClusterDirError.
        return cls(location=cluster_dir, auto_create=True)

    @classmethod
    def create_cluster_dir_by_profile(cls, path, profile=u'default'):
        """Create a cluster dir by profile name and path.

        Parameters
        ----------
        path : str
            The path (directory) to put the cluster directory in.
        profile : str
            The name of the profile.  The name of the cluster directory will
            be "cluster_<profile>".

        Raises
        ------
        ClusterDirError
            If ``path`` is not an existing directory.
        """
        if not os.path.isdir(path):
            raise ClusterDirError('Directory not found: %s' % path)
        cluster_dir = os.path.join(path, u'cluster_' + profile)
        return cls(location=cluster_dir, auto_create=True)

    @classmethod
    def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
        """Find an existing cluster dir by profile name, return its ClusterDir.

        This searches through a sequence of paths for a cluster dir.  If it
        is not found, a :class:`ClusterDirError` exception will be raised.

        The search path algorithm is:
        1. ``os.getcwd()``
        2. ``ipython_dir``
        3. The directories found in the ``os.pathsep`` separated
           (":" on POSIX) :env:`IPCLUSTER_DIR_PATH` environment variable.

        Parameters
        ----------
        ipython_dir : unicode or str
            The IPython directory to use.
        profile : unicode or str
            The name of the profile.  The name of the cluster directory
            will be "cluster_<profile>".
        """
        dirname = u'cluster_' + profile
        env_value = os.environ.get('IPCLUSTER_DIR_PATH', '')
        # os.pathsep rather than ':' so Windows drive letters are not split
        extra_paths = [p for p in env_value.split(os.pathsep) if p]
        for p in [os.getcwd(), ipython_dir] + extra_paths:
            cluster_dir = os.path.join(p, dirname)
            if os.path.isdir(cluster_dir):
                return cls(location=cluster_dir)
        raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)

    @classmethod
    def find_cluster_dir(cls, cluster_dir):
        """Find an existing cluster dir and return its ClusterDir.

        The directory is not created: a :class:`ClusterDirError` is raised
        if it doesn't exist.

        Parameters
        ----------
        cluster_dir : unicode or str
            The path of the cluster directory.  This is expanded using
            :func:`IPython.utils.path.expand_path`.
        """
        cluster_dir = expand_path(cluster_dir)
        if not os.path.isdir(cluster_dir):
            raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
        return cls(location=cluster_dir)
258 258
259 259 #-----------------------------------------------------------------------------
260 260 # Crash handler for this application
261 261 #-----------------------------------------------------------------------------
262 262
263 263
264 264 _message_template = """\
265 265 Oops, $self.app_name crashed. We do our best to make it stable, but...
266 266
267 267 A crash report was automatically generated with the following information:
268 268 - A verbatim copy of the crash traceback.
269 269 - Data on your current $self.app_name configuration.
270 270
271 271 It was left in the file named:
272 272 \t'$self.crash_report_fname'
273 273 If you can email this file to the developers, the information in it will help
274 274 them in understanding and correcting the problem.
275 275
276 276 You can mail it to: $self.contact_name at $self.contact_email
277 277 with the subject '$self.app_name Crash Report'.
278 278
279 279 If you want to do it now, the following command will work (under Unix):
280 280 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
281 281
282 282 To ensure accurate tracking of this issue, please file a report about it at:
283 283 $self.bug_tracker
284 284 """
285 285
class ClusterDirCrashHandler(CrashHandler):
    """Crash handler for the cluster applications.

    Uses the module's crash message template and passes the 'Min' entry of
    ``release.authors`` as the contact.
    """

    message_template = _message_template

    def __init__(self, app):
        """Configure contact details and bug tracker for ``app``."""
        # release.authors['Min'] is indexed as (name, email)
        contact = release.authors['Min']
        contact_name = contact[0]
        contact_email = contact[1]
        bug_tracker = 'http://github.com/ipython/ipython/issues'
        super(ClusterDirCrashHandler, self).__init__(
            app, contact_name, contact_email, bug_tracker
        )
298 298
299 299
300 300 #-----------------------------------------------------------------------------
301 301 # Main application
302 302 #-----------------------------------------------------------------------------
# Command-line aliases shared by the cluster applications:
# short option name -> "Class.trait" that it sets.
base_aliases = {
    'profile' : "ClusterDir.profile",
    'cluster_dir' : 'ClusterDir.location',
    # the configurable trait is ClusterApplication.auto_create_cluster_dir;
    # no 'ClusterDirApplication' class is defined in this module.
    'auto_create' : 'ClusterApplication.auto_create_cluster_dir',
    'log_level' : 'ClusterApplication.log_level',
    'work_dir' : 'ClusterApplication.work_dir',
    'log_to_file' : 'ClusterApplication.log_to_file',
    'clean_logs' : 'ClusterApplication.clean_logs',
    'log_url' : 'ClusterApplication.log_url',
}

# Command-line flags: flag name -> (config to apply, help text).
base_flags = {
    'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
    'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
    'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
}
# wrap each plain dict in a Config object; iterate over a snapshot because
# the dict's values are reassigned inside the loop
for k,v in list(base_flags.items()):
    base_flags[k] = (Config(v[0]),v[1])
321 321
class ClusterApplication(BaseIPythonApplication):
    """An application that puts everything into a cluster directory.

    Instead of looking for things in the ipython_dir, this type of application
    will use its own private directory called the "cluster directory"
    for things like config files, log files, etc.

    The cluster directory is resolved as follows:

    * If the ``cluster_dir`` option is given, it is used.
    * If ``cluster_dir`` is not given, the cluster directory is
      resolved using the profile name as ``cluster_<profile>`` inside the
      IPython directory (see :class:`ClusterDir`).

    The config file for the application is to be put in the cluster
    dir and named the value of the ``default_config_file_name`` class
    attribute.
    """

    crash_handler_class = ClusterDirCrashHandler
    auto_create_cluster_dir = Bool(True, config=True,
        help="whether to create the cluster_dir if it doesn't exist")
    cluster_dir = Instance(ClusterDir)
    classes = [ClusterDir]

    def _log_level_default(self):
        # temporarily override default_log_level to INFO
        return logging.INFO

    work_dir = Unicode(os.getcwdu(), config=True,
        help='Set the working dir for the process.'
    )
    def _work_dir_changed(self, name, old, new):
        # store the expanded form of whatever was assigned
        self.work_dir = unicode(expand_path(new))

    log_to_file = Bool(config=True,
        help="whether to log to a file")

    clean_logs = Bool(False, shortname='--clean-logs', config=True,
        help="whether to cleanup old logfiles before starting")

    log_url = Unicode('', shortname='--log-url', config=True,
        help="The ZMQ URL of the iplogger to aggregate logging.")

    config_file = Unicode(u'', config=True,
        help="""Path to ipcontroller configuration file.  The default is to use
         <appname>_config.py, as found by cluster-dir."""
    )

    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        # imported lazily, only when the default loop is actually needed
        from zmq.eventloop.ioloop import IOLoop
        return IOLoop.instance()

    aliases = Dict(base_aliases)
    flags = Dict(base_flags)

    def init_clusterdir(self):
        """Resolve the cluster directory and store it on ``self.cluster_dir``.

        A :class:`ClusterDir` is built from the application's config, with
        ``auto_create`` taken from ``self.auto_create_cluster_dir``.  If that
        raises :class:`ClusterDirError`, the error is logged and the
        application exits with status 1.  Otherwise it is logged whether a
        new directory was created or an existing one is used.
        """
        try:
            self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config)
        except ClusterDirError as e:
            self.log.fatal("Error initializing cluster dir: %s"%e)
            self.log.fatal("A cluster dir must be created before running this command.")
            self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more "
            "information about creating and listing cluster dirs."
            )
            self.exit(1)

        if self.cluster_dir._new_dir:
            self.log.info('Creating new cluster dir: %s' % \
                            self.cluster_dir.location)
        else:
            self.log.info('Using existing cluster dir: %s' % \
                            self.cluster_dir.location)

    def initialize(self, argv=None):
        """initialize the app"""
        self.init_crash_handler()
        self.parse_command_line(argv)
        cl_config = self.config
        self.init_clusterdir()
        if self.config_file:
            self.load_config_file(self.config_file)
        elif self.default_config_file_name:
            try:
                self.load_config_file(self.default_config_file_name,
                                        path=self.cluster_dir.location)
            except IOError:
                self.log.warn("Warning: Default config file not found")
        # command-line should *override* config file, but command-line is necessary
        # to determine clusterdir, etc.
        self.update_config(cl_config)
        self.to_work_dir()
        self.reinit_logging()

    def to_work_dir(self):
        """Change into ``self.work_dir`` and put it first on ``sys.path``."""
        wd = self.work_dir
        if unicode(wd) != os.getcwdu():
            os.chdir(wd)
            self.log.info("Changing to working dir: %s" % wd)
        # This is the working dir by now.
        sys.path.insert(0, '')

    def load_config_file(self, filename, path=None):
        """Load a .py based config file by filename and path."""
        # use config.application.Application.load_config
        # instead of inflexible core.newapplication.BaseIPythonApplication.load_config
        return Application.load_config_file(self, filename, path=path)

    # disable URL-logging
    def reinit_logging(self):
        """Optionally clean old log files and redirect logging to a file.

        With ``clean_logs`` set, files in the log directory that look like
        ``<name>-<digits>.(log|err|out)`` are removed.  With ``log_to_file``
        set, the current log handler is replaced by one writing to
        ``<name>-<pid>.log`` in the log directory.
        """
        # Remove old log files
        log_dir = self.cluster_dir.log_dir
        if self.clean_logs:
            # escape the name so it is matched literally, not as a pattern
            old_log = re.compile(r'%s-\d+\.(log|err|out)' % re.escape(self.name))
            for f in os.listdir(log_dir):
                if old_log.match(f):
                    os.remove(os.path.join(log_dir, f))
        if self.log_to_file:
            # Start logging to the new log file
            log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
            logfile = os.path.join(log_dir, log_filename)
            # stays open for the lifetime of the handler created below
            open_log_file = open(logfile, 'w')
        else:
            open_log_file = None
        if open_log_file is not None:
            self.log.removeHandler(self._log_handler)
            self._log_handler = logging.StreamHandler(open_log_file)
            self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
            self._log_handler.setFormatter(self._log_formatter)
            self.log.addHandler(self._log_handler)

    def write_pid_file(self, overwrite=False):
        """Create a .pid file in the pid_dir with my pid.

        This must be called after ``self.cluster_dir`` has been set.
        This raises :exc:`PIDFileError` if the pid file exists already
        and ``overwrite`` is False.
        """
        pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
        # the old pid is only needed for the error message, so an unreadable
        # stale file cannot block an explicit overwrite
        if os.path.isfile(pid_file) and not overwrite:
            pid = self.get_pid_from_file()
            raise PIDFileError(
                'The pid file [%s] already exists. \nThis could mean that this '
                'server is already running with [pid=%s].' % (pid_file, pid)
            )
        with open(pid_file, 'w') as f:
            self.log.info("Creating pid file: %s" % pid_file)
            f.write(repr(os.getpid())+'\n')

    def remove_pid_file(self):
        """Remove the pid file, if it exists.

        Best effort: a failure is logged as a warning, not raised.
        Returns ``None``.
        """
        pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            try:
                self.log.info("Removing pid file: %s" % pid_file)
                os.remove(pid_file)
            except Exception:
                # not a bare except: KeyboardInterrupt/SystemExit must propagate
                self.log.warn("Error removing the pid file: %s" % pid_file)

    def get_pid_from_file(self):
        """Get the pid from the pid file.

        Raises :exc:`PIDFileError` if the pid file doesn't exist or does not
        contain an integer.
        """
        pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
        if not os.path.isfile(pid_file):
            raise PIDFileError('pid file not found: %s' % pid_file)
        with open(pid_file, 'r') as f:
            contents = f.read().strip()
        try:
            return int(contents)
        except ValueError:
            # callers only handle PIDFileError
            raise PIDFileError('pid file does not contain a valid pid: %s' % pid_file)

    def check_pid(self, pid):
        """Return whether a process with this pid appears to be running.

        Uses ``OpenProcess`` on Windows and the output of ``ps x`` elsewhere.
        If the check itself cannot be performed, a warning is logged and
        ``True`` is returned.
        """
        if os.name == 'nt':
            try:
                import ctypes
                kernel32 = ctypes.windll.kernel32
                # returns 0 if no such process (of ours) exists
                # positive int otherwise
                handle = kernel32.OpenProcess(1, 0, pid)
                if handle:
                    # only existence matters; release the handle again
                    kernel32.CloseHandle(handle)
            except Exception:
                self.log.warn(
                    "Could not determine whether pid %i is running via `OpenProcess`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            return bool(handle)
        else:
            try:
                p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
                output,_ = p.communicate()
            except OSError:
                self.log.warn(
                    "Could not determine whether pid %i is running via `ps x`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            # first number on each line of the `ps x` output
            pids = [int(s) for s in re.findall(r'^\W*\d+', output, re.MULTILINE)]
            return pid in pids
@@ -1,537 +1,542
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import errno
19 19 import logging
20 20 import os
21 21 import re
22 22 import signal
23 23
24 24 from subprocess import check_call, CalledProcessError, PIPE
25 25 import zmq
26 26 from zmq.eventloop import ioloop
27 27
28 28 from IPython.config.application import Application, boolean_flag
29 29 from IPython.config.loader import Config
30 30 from IPython.core.newapplication import BaseIPythonApplication
31 31 from IPython.utils.importstring import import_item
32 32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
33 33
34 34 from IPython.parallel.apps.clusterdir import (
35 35 ClusterApplication, ClusterDirError, ClusterDir,
36 36 PIDFileError,
37 37 base_flags, base_aliases
38 38 )
39 39
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Module level variables
43 43 #-----------------------------------------------------------------------------
44 44
45 45
46 46 default_config_file_name = u'ipcluster_config.py'
47 47
48 48
49 _description = """\
50 Start an IPython cluster for parallel computing.\n\n
49 _description = """Start an IPython cluster for parallel computing.
51 50
52 51 An IPython cluster consists of 1 controller and 1 or more engines.
53 52 This command automates the startup of these processes using a wide
54 53 range of startup methods (SSH, local processes, PBS, mpiexec,
55 54 Windows HPC Server 2008). To start a cluster with 4 engines on your
56 55 local host simply do 'ipcluster start n=4'. For more complex usage
57 56 you will typically do 'ipcluster create profile=mycluster', then edit
58 57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
59 58 """
60 59
61 60
# Exit codes for ipcluster

# This will be the exit code if the ipcluster appears to be running because
# a .pid file exists
ALREADY_STARTED = 10


# This will be the exit code if ipcluster stop is run, but there is no .pid
# file to be found.
ALREADY_STOPPED = 11

# This will be the exit code if ipcluster engines is run, but there is no .pid
# file to be found.
NO_CLUSTER = 12
76 75
77 76
78 77 #-----------------------------------------------------------------------------
79 78 # Main application
80 79 #-----------------------------------------------------------------------------
# Help texts for the ipcluster subcommands.  The first line of each is a
# one-line summary, followed by a blank line and the long description.
start_help = """Start an IPython cluster for parallel computing

Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'cluster_<profile>' and should be created using the 'create'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start n=4 profile=<profile>',
otherwise use the 'cluster_dir' option.
"""
stop_help = """Stop a running IPython cluster

Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'cluster_<profile>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop profile=<profile>', otherwise
use the 'cluster_dir' option.
"""
engines_help = """Start engines connected to an existing IPython cluster

Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'cluster_<profile>' and should be created using the 'create'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines n=4 profile=<profile>',
otherwise use the 'cluster_dir' option.
"""
create_help = """Create an ipcluster profile by name

Create an ipython cluster directory by its profile name or
cluster directory path. Cluster directories contain
configuration, log and security related files and are named
using the convention 'cluster_<profile>'. By default they are
located in your ipython directory. Once created, you will
probably need to edit the configuration files in the cluster
directory to configure your cluster. Most users will create a
cluster directory by profile name,
`ipcluster create profile=mycluster`, which will put the directory
in `<ipython_dir>/cluster_mycluster`.
"""
list_help = """List available cluster profiles

List all available clusters, by cluster directory, that can
be found in the current working directory or in the ipython
directory. Cluster directories are named using the convention
'cluster_<profile>'.
"""
127 132
128 133
class IPClusterList(BaseIPythonApplication):
    """Application that prints every cluster directory it can find."""
    name = u'ipcluster-list'
    description = list_help

    # empty aliases
    aliases=Dict()
    flags = Dict(base_flags)

    def _log_level_default(self):
        return logging.INFO

    def list_cluster_dirs(self):
        """Print a start command for each ``cluster_<profile>`` directory.

        Searched, in order and without duplicates: the current working
        directory, ``self.ipython_dir``, and the ``os.pathsep`` separated
        entries of the ``IPCLUSTER_DIR_PATH`` environment variable.  Search
        paths that are not existing directories are skipped.
        """
        prefix = 'cluster_'
        # Find the search paths
        env_value = os.environ.get('IPCLUSTER_DIR_PATH','')
        candidates = [os.getcwd(), self.ipython_dir]
        candidates.extend(p for p in env_value.split(os.pathsep) if p)
        # drop duplicates but keep the search order (a set would scramble it)
        paths = []
        for p in candidates:
            if p not in paths:
                paths.append(p)

        self.log.info('Searching for cluster dirs in paths: %r' % paths)
        for path in paths:
            if not os.path.isdir(path):
                # e.g. a stale IPCLUSTER_DIR_PATH entry; nothing to list
                continue
            for f in sorted(os.listdir(path)):
                full_path = os.path.join(path, f)
                if os.path.isdir(full_path) and f.startswith(prefix):
                    # strip only the prefix: the profile name (or any parent
                    # directory) may itself contain underscores
                    profile = f[len(prefix):]
                    start_cmd = 'ipcluster start profile=%s n=4' % profile
                    print(start_cmd + " ==> " + full_path)

    def start(self):
        """Run the application: list the cluster directories."""
        self.list_cluster_dirs()
165 170
# Flags for 'ipcluster create': the shared base flags plus a boolean
# flag pair built by boolean_flag for IPClusterCreate.reset.
create_flags = {}
create_flags.update(base_flags)
create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
    "reset config files to defaults", "leave existing config files"))
170 175
class IPClusterCreate(ClusterApplication):
    """The 'create' subcommand: set up a cluster directory by profile name."""
    name = u'ipcluster'
    description = create_help
    auto_create_cluster_dir = Bool(True,
        help="whether to create the cluster_dir if it doesn't exist")
    default_config_file_name = default_config_file_name

    reset = Bool(False, config=True,
        help="Whether to reset config files as part of 'create'."
    )

    flags = Dict(create_flags)

    aliases = Dict(dict(profile='ClusterDir.profile'))

    classes = [ClusterDir]

    def init_clusterdir(self):
        """Resolve the cluster dir, then copy the default config files into it.

        Existing config files are overwritten only when ``self.reset`` is set.
        """
        super(IPClusterCreate, self).init_clusterdir()
        self.log.info('Copying default config files to cluster directory '
        '[overwrite=%r]' % (self.reset,))
        self.cluster_dir.copy_all_config_files(overwrite=self.reset)

    def initialize(self, argv=None):
        """Parse the command line and set up the cluster directory."""
        self.parse_command_line(argv)
        self.init_clusterdir()
197 202
# Command-line aliases accepted by 'ipcluster stop'.
stop_aliases = dict(
    signal='IPClusterStop.signal',
    profile='ClusterDir.profile',
    cluster_dir='ClusterDir.location',
)
203 208
class IPClusterStop(ClusterApplication):
    """The 'stop' subcommand: stop the cluster recorded in the pid file."""
    name = u'ipcluster'
    description = stop_help
    auto_create_cluster_dir = Bool(False)
    default_config_file_name = default_config_file_name

    signal = Int(signal.SIGINT, config=True,
        help="signal to use for stopping processes.")

    aliases = Dict(stop_aliases)

    def init_clusterdir(self):
        """Resolve the cluster dir; log and exit with status 1 on failure."""
        try:
            super(IPClusterStop, self).init_clusterdir()
        except ClusterDirError as e:
            self.log.fatal("Failed ClusterDir init: %s"%e)
            self.exit(1)

    def start(self):
        """Start the app for the stop subcommand.

        Exits with ``ALREADY_STOPPED`` if the pid file cannot be read or the
        recorded process is not running.  Otherwise the process is signalled
        with ``self.signal`` (posix) or killed with ``taskkill`` (nt).
        """
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.remove_pid_file()
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical(
                'Cluster [pid=%r] is not running.' % pid
            )
            self.remove_pid_file()
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)

        elif os.name=='posix':
            sig = self.signal
            self.log.info(
                "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
            )
            try:
                os.kill(pid, sig)
            except OSError:
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
                # NOTE(review): pid file removed only on failure here;
                # presumably the signalled process removes its own -- confirm
                self.remove_pid_file()
        elif os.name=='nt':
            try:
                # kill the whole tree
                check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
            except (CalledProcessError, OSError):
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
            # NOTE(review): removed unconditionally after the forced kill -- confirm
            self.remove_pid_file()
263 268
# Aliases for ``ipcluster engines``: every shared base alias plus the
# engine-count and engine-launcher shortcuts.
engine_aliases = dict(base_aliases)
engine_aliases.update({
    'n': 'IPClusterEngines.n',
    'elauncher': 'IPClusterEngines.engine_launcher_class',
})
class IPClusterEngines(ClusterApplication):
    """Application behind ``ipcluster engines``.

    Builds the configured engine-set launcher, schedules the start of
    ``n`` engines on the event loop, and stops them again on SIGINT.
    """

    name = u'ipcluster'
    description = engines_help
    usage = None
    default_config_file_name = default_config_file_name
    default_log_level = logging.INFO
    auto_create_cluster_dir = Bool(False)
    classes = List()
    def _classes_default(self):
        """Return ClusterDir plus every launcher whose name has 'EngineSet'."""
        from IPython.parallel.apps import launcher
        launchers = launcher.all_launchers
        eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
        return [ClusterDir]+eslaunchers

    n = Int(2, config=True,
        help="The number of engines to start.")

    engine_launcher_class = Unicode('LocalEngineSetLauncher',
        config=True,
        help="The class for launching a set of Engines."
        )
    daemonize = Bool(False, config=True,
        help='Daemonize the ipcluster program. This implies --log-to-file')

    def _daemonize_changed(self, name, old, new):
        # daemonizing implies logging to a file
        if new:
            self.log_to_file = True

    aliases = Dict(engine_aliases)
    # flags = Dict(flags)
    _stopping = False

    def initialize(self, argv=None):
        """Run the parent initialization, then hook SIGINT and build launchers."""
        super(IPClusterEngines, self).initialize(argv)
        self.init_signal()
        self.init_launchers()

    def init_launchers(self):
        """Build the engine launcher; stop the event loop when it stops."""
        self.engine_launcher = self.build_launcher(self.engine_launcher_class)
        self.engine_launcher.on_stop(lambda r: self.loop.stop())

    def init_signal(self):
        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

    def build_launcher(self, clsname):
        """import and instantiate a Launcher based on importstring"""
        if '.' not in clsname:
            # not a module, presume it's the raw name in apps.launcher
            clsname = 'IPython.parallel.apps.launcher.'+clsname
        klass = import_item(clsname)

        launcher = klass(
            work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
        )
        return launcher

    def start_engines(self):
        """Ask the engine launcher to start ``self.n`` engines."""
        self.log.info("Starting %i engines"%self.n)
        self.engine_launcher.start(
            self.n,
            cluster_dir=self.cluster_dir.location
        )

    def stop_engines(self):
        """Stop the engine launcher if it is running.

        Returns whatever the launcher's ``stop()`` returns, or None when
        the launcher was not running.
        """
        self.log.info("Stopping Engines...")
        if self.engine_launcher.running:
            return self.engine_launcher.stop()
        return None

    def stop_launchers(self, r=None):
        """Stop the engines once, then stop the event loop 4 seconds later.

        ``r`` is accepted so this can be used as a launcher ``on_stop``
        callback; it is not used.
        """
        if not self._stopping:
            self._stopping = True
            self.log.error("IPython cluster: stopping")
            self.stop_engines()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
            dc.start()

    def sigint_handler(self, signum, frame):
        self.log.debug("SIGINT received, stopping launchers...")
        self.stop_launchers()

    def start_logging(self):
        """Remove old log files of the controller and engines."""
        # ``clean_logs`` is not declared on this class (IPClusterStart adds
        # it), so treat a missing attribute as "do not clean".
        if getattr(self, 'clean_logs', False):
            log_dir = self.cluster_dir.log_dir
            for f in os.listdir(log_dir):
                # ``z?`` accepts both the older ipenginez-/ipcontrollerz-
                # prefix the pattern was written for and names without it.
                if re.match(r'ip(engine|controller)z?-\d+\.(log|err|out)',f):
                    os.remove(os.path.join(log_dir, f))
        # Old log files for ipcluster itself are deliberately not removed:
        # super(IPClusterApp, self).start_logging()

    def start(self):
        """Start the app for the engines subcommand."""
        self.log.info("IPython cluster: started")

        # Now log and daemonize
        self.log.info(
            'Starting engines with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                from twisted.scripts._twistd_unix import daemonize
                daemonize()

        dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
        dc.start()
        # Now write the new pid file AFTER our new forked pid is active.
        # self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # an interrupted system call is a normal way out of the loop
            if e.errno == errno.EINTR:
                pass
            else:
                raise
395 400
# Aliases for ``ipcluster start``: everything the engines subcommand takes,
# plus the controller/engine start delay and the log-cleanup switch.
start_aliases = dict(engine_aliases)
start_aliases.update({
    'delay': 'IPClusterStart.delay',
    'clean_logs': 'IPClusterStart.clean_logs',
})
402 407
class IPClusterStart(IPClusterEngines):
    """Application behind ``ipcluster start``: one controller plus engines."""

    name = u'ipcluster'
    description = start_help
    usage = None
    default_config_file_name = default_config_file_name
    default_log_level = logging.INFO
    auto_create_cluster_dir = Bool(True, config=True,
        help="whether to create the cluster_dir if it doesn't exist")
    classes = List()

    def _classes_default(self):
        """Return ClusterDir followed by every known launcher class."""
        from IPython.parallel.apps import launcher
        return [ClusterDir] + launcher.all_launchers

    clean_logs = Bool(True, config=True,
        help="whether to cleanup old logs before starting")

    delay = CFloat(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    controller_launcher_class = Unicode('LocalControllerLauncher',
        config=True,
        help="The class for launching a Controller."
        )
    reset = Bool(False, config=True,
        help="Whether to reset config files as part of '--create'."
        )

    # flags = Dict(flags)
    aliases = Dict(start_aliases)

    def init_launchers(self):
        """Build the controller and engine launchers.

        When the controller launcher stops, all launchers are stopped.
        """
        build = self.build_launcher
        self.controller_launcher = build(self.controller_launcher_class)
        self.engine_launcher = build(self.engine_launcher_class)
        self.controller_launcher.on_stop(self.stop_launchers)

    def start_controller(self):
        """Start the controller in this app's cluster directory."""
        self.controller_launcher.start(cluster_dir=self.cluster_dir.location)

    def stop_controller(self):
        """Stop the controller launcher if there is one and it is running."""
        launcher = self.controller_launcher
        if not launcher or not launcher.running:
            return None
        return launcher.stop()

    def stop_launchers(self, r=None):
        """Stop the controller first, then let the parent stop the engines."""
        if self._stopping:
            return
        self.stop_controller()
        super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Start the app for the start subcommand."""
        # First see if the cluster is already running
        try:
            old_pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            if not self.check_pid(old_pid):
                # recorded pid did not pass check_pid: discard the pid file
                self.remove_pid_file()
            else:
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.' % old_pid
                )
                # Unusual exit status that other processes can watch for.
                self.exit(ALREADY_STARTED)

        # Now log and daemonize
        self.log.info('Starting ipcluster with [daemon=%r]' % self.daemonize)
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize and os.name == 'posix':
            from twisted.scripts._twistd_unix import daemonize
            daemonize()

        # Controller right away, engines after ``delay`` seconds.
        schedule = (
            (self.start_controller, 0),
            (self.start_engines, 1000*self.delay),
        )
        for callback, delay_ms in schedule:
            ioloop.DelayedCallback(callback, delay_ms, self.loop).start()

        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            if e.errno != errno.EINTR:
                raise
        finally:
            self.remove_pid_file()
501 506
# Import-string prefix shared by all the subcommand application classes.
base = 'IPython.parallel.apps.ipclusterapp.IPCluster'


class IPClusterApp(Application):
    """Top-level ``ipcluster`` command; it only dispatches to a subcommand."""
    name = u'ipcluster'
    description = _description

    # subcommand name -> (import string of its application class, help text)
    subcommands = {
        'create': (base + 'Create', create_help),
        'list': (base + 'List', list_help),
        'start': (base + 'Start', start_help),
        'stop': (base + 'Stop', stop_help),
        'engines': (base + 'Engines', engines_help),
    }

    # no aliases or flags for parent App
    aliases = Dict()
    flags = Dict()

    def start(self):
        """Run the selected subcommand, or print usage and exit with 1."""
        if self.subapp is not None:
            return self.subapp.start()
        print("No subcommand specified! Must specify one of: %s"%(self.subcommands.keys()))
        print("")
        self.print_subcommands()
        self.exit(1)
527 532
def launch_new_instance():
    """Create and run the IPython cluster."""
    cluster_app = IPClusterApp()
    cluster_app.initialize()
    cluster_app.start()


# Script entry point.
if __name__ == '__main__':
    launch_new_instance()
537 542
@@ -1,403 +1,405
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import os
22 22 import logging
23 23 import socket
24 24 import stat
25 25 import sys
26 26 import uuid
27 27
28 28 from multiprocessing import Process
29 29
30 30 import zmq
31 31 from zmq.devices import ProcessMonitoredQueue
32 32 from zmq.log.handlers import PUBHandler
33 33 from zmq.utils import jsonapi as json
34 34
35 35 from IPython.config.loader import Config
36 36
37 37 from IPython.parallel import factory
38 38
39 39 from IPython.parallel.apps.clusterdir import (
40 40 ClusterDir,
41 41 ClusterApplication,
42 42 base_flags
43 43 # ClusterDirConfigLoader
44 44 )
45 45 from IPython.utils.importstring import import_item
46 46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
47 47
48 48 # from IPython.parallel.controller.controller import ControllerFactory
49 49 from IPython.parallel.streamsession import StreamSession
50 50 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 51 from IPython.parallel.controller.hub import Hub, HubFactory
52 52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 53 from IPython.parallel.controller.sqlitedb import SQLiteDB
54 54
55 55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
56 56
57 57 # conditional import of MongoDB backend class
58 58
59 59 try:
60 60 from IPython.parallel.controller.mongodb import MongoDB
61 61 except ImportError:
62 62 maybe_mongo = []
63 63 else:
64 64 maybe_mongo = [MongoDB]
65 65
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # Module level variables
69 69 #-----------------------------------------------------------------------------
70 70
71 71
72 72 #: The default config file name for this application
73 73 default_config_file_name = u'ipcontroller_config.py'
74 74
75 75
76 76 _description = """Start the IPython controller for parallel computing.
77 77
78 78 The IPython controller provides a gateway between the IPython engines and
79 79 clients. The controller needs to be started before the engines and can be
80 80 configured using command line options or using a cluster directory. Cluster
81 81 directories contain config, log and security files and are usually located in
82 your ipython directory and named as "cluster_<profile>". See the --profile
83 and --cluster-dir options for details.
82 your ipython directory and named as "cluster_<profile>". See the `profile`
83 and `cluster_dir` options for details.
84 84 """
85 85
86 86
87 87
88 88
89 89 #-----------------------------------------------------------------------------
90 90 # The main application
91 91 #-----------------------------------------------------------------------------
# Command-line flags for ipcontroller: the shared base flags plus switches
# for the scheduler threading model, the Hub's database backend, and reuse
# of existing connection files.  Each value is (config to apply, help text).
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : Config({'use_threads' : True})},
        'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'})},
        'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'})},
        'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.dictdb.DictDB'})},
        'use the in-memory DictDB backend'),
    'reuse' : ({'IPControllerApp' : Config({'reuse_files' : True})},
        'reuse existing json connection files')
})
106 108
class IPControllerApp(ClusterApplication):
    """The ``ipcontroller`` application.

    Builds the Hub through a HubFactory, writes the json connection files
    for clients and engines, and launches the relay queues and the task
    scheduler as child threads/processes.
    """

    name = u'ipcontroller'
    description = _description
    # command_line_loader = IPControllerAppConfigLoader
    default_config_file_name = default_config_file_name
    classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo

    auto_create_cluster_dir = Bool(True, config=True,
        help="Whether to create cluster_dir if it doesn't exist.")
    reuse_files = Bool(False, config=True,
        help='Whether to reuse existing json connection files [default: False]'
    )
    secure = Bool(True, config=True,
        help='Whether to use exec_keys for extra authentication [default: True]'
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller\'s listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup. Necessary in some environments"
    )

    use_threads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    # internal
    children = List()
    mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')

    def _use_threads_changed(self, name, old, new):
        # keep the monitored-queue class in step with the threading choice
        self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')

    aliases = Dict(dict(
        config = 'IPControllerApp.config_file',
        # file = 'IPControllerApp.url_file',
        log_level = 'IPControllerApp.log_level',
        log_url = 'IPControllerApp.log_url',
        reuse_files = 'IPControllerApp.reuse_files',
        secure = 'IPControllerApp.secure',
        ssh = 'IPControllerApp.ssh_server',
        use_threads = 'IPControllerApp.use_threads',
        import_statements = 'IPControllerApp.import_statements',
        location = 'IPControllerApp.location',

        ident = 'StreamSession.session',
        user = 'StreamSession.username',
        exec_key = 'StreamSession.keyfile',

        url = 'HubFactory.url',
        ip = 'HubFactory.ip',
        transport = 'HubFactory.transport',
        port = 'HubFactory.regport',

        ping = 'HeartMonitor.period',

        scheme = 'TaskScheduler.scheme_name',
        hwm = 'TaskScheduler.hwm',


        profile = "ClusterDir.profile",
        cluster_dir = 'ClusterDir.location',

    ))
    flags = Dict(flags)


    def save_connection_dict(self, fname, cdict):
        """save a connection dict to json file.

        ``fname`` is a file name inside the cluster's security dir.  An
        empty ``cdict['location']`` is filled in (in place) from this
        host's address list when the url can be split.  The file is made
        readable and writable by its owner only.
        """
        url = cdict['url']
        location = cdict['location']
        if not location:
            try:
                # only used as a validity check; the parts are not needed
                split_url(url)
            except AssertionError:
                pass
            else:
                location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
            cdict['location'] = location
        fname = os.path.join(self.cluster_dir.security_dir, fname)
        with open(fname, 'w') as f:
            f.write(json.dumps(cdict, indent=2))
        os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)

    def load_config_from_json(self):
        """load config from existing json connector files.

        Raises IOError if a file cannot be opened and AssertionError if
        the engine and client files disagree on exec_key or port.
        """
        c = self.config
        # load from engine config
        with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
            cfg = json.loads(f.read())
        key = c.StreamSession.key = cfg['exec_key']
        xport,addr = cfg['url'].split('://')
        c.HubFactory.engine_transport = xport
        ip,ports = addr.split(':')
        c.HubFactory.engine_ip = ip
        c.HubFactory.regport = int(ports)
        self.location = cfg['location']

        # load client config
        with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
            cfg = json.loads(f.read())
        assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
        xport,addr = cfg['url'].split('://')
        c.HubFactory.client_transport = xport
        ip,ports = addr.split(':')
        c.HubFactory.client_ip = ip
        self.ssh_server = cfg['ssh']
        assert int(ports) == c.HubFactory.regport, "regport mismatch"

    def init_hub(self):
        """Construct the Hub and, unless reusing, write new connection files."""
        c = self.config

        self.do_import_statements()
        reusing = self.reuse_files
        if reusing:
            try:
                self.load_config_from_json()
            except (AssertionError,IOError):
                reusing=False
        # check again, because reusing may have failed:
        if reusing:
            pass
        elif self.secure:
            key = str(uuid.uuid4())
            # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
            # with open(keyfile, 'w') as f:
            #     f.write(key)
            # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
            c.StreamSession.key = key
        else:
            key = c.StreamSession.key = ''

        try:
            self.factory = HubFactory(config=c, log=self.log)
            # self.start_logging()
            self.factory.init_hub()
        except Exception:
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

        if not reusing:
            # save to new json config files
            f = self.factory
            cdict = {'exec_key' : key,
                    'ssh' : self.ssh_server,
                    'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
                    'location' : self.location
                    }
            self.save_connection_dict('ipcontroller-client.json', cdict)
            # Copy so the engine entry does not alias the client dict, and
            # advertise the engine-side interface: it is what
            # load_config_from_json reads back from this file and what
            # save_urls writes for engines.
            edict = dict(cdict)
            edict['url']="%s://%s:%s"%(f.engine_transport, f.engine_ip, f.regport)
            self.save_connection_dict('ipcontroller-engine.json', edict)

    #
    def init_schedulers(self):
        """Create (but do not start) the relay queues and the task scheduler.

        Every created object is appended to ``self.children``.
        """
        children = self.children
        mq = import_item(str(self.mq_class))

        hub = self.factory
        # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
        # IOPub relay (in a Process)
        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
        q.bind_in(hub.client_info['iopub'])
        q.bind_out(hub.engine_info['iopub'])
        q.setsockopt_out(zmq.SUBSCRIBE, '')
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)

        # Multiplexer Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
        q.bind_in(hub.client_info['mux'])
        q.setsockopt_in(zmq.IDENTITY, 'mux')
        q.bind_out(hub.engine_info['mux'])
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)

        # Control Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
        q.bind_in(hub.client_info['control'])
        q.setsockopt_in(zmq.IDENTITY, 'control')
        q.bind_out(hub.engine_info['control'])
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)
        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            # not configured: fall back to the trait's default
            scheme = TaskScheduler.scheme_name.get_default_value()
        # Task Queue (in a Process)
        if scheme == 'pure':
            self.log.warn("task::using pure XREQ Task scheduler")
            q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
            # q.setsockopt_out(zmq.HWM, hub.hwm)
            q.bind_in(hub.client_info['task'][1])
            q.setsockopt_in(zmq.IDENTITY, 'task')
            q.bind_out(hub.engine_info['task'])
            q.connect_mon(hub.monitor_url)
            q.daemon=True
            children.append(q)
        elif scheme == 'none':
            self.log.warn("task::using no Task scheduler")

        else:
            self.log.info("task::using Python %s Task scheduler"%scheme)
            sargs = (hub.client_info['task'][1], hub.engine_info['task'],
                                hub.monitor_url, hub.client_info['notification'])
            kwargs = dict(logname='scheduler', loglevel=self.log_level,
                            log_url = self.log_url, config=dict(self.config))
            q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
            q.daemon=True
            children.append(q)


    def save_urls(self):
        """save the registration urls to files."""
        sec_dir = self.cluster_dir.security_dir
        cf = self.factory

        with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
            f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))

        with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
            f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))


    def do_import_statements(self):
        """Execute each configured import statement, logging any failure.

        A failing statement is logged with its traceback and the
        remaining statements still run.
        """
        statements = self.import_statements
        for s in statements:
            try:
                self.log.info("Executing statement: '%s'" % s)
                # NOTE(review): executes configured text as code; the
                # configuration must come from a trusted source.
                exec(s, globals(), locals())
            except Exception:
                self.log.error("Error running statement: %s" % s, exc_info=True)

    def forward_logging(self):
        """If ``log_url`` is set, replace the log handler with a zmq PUB one."""
        if self.log_url:
            self.log.info("Forwarding logging to %s"%self.log_url)
            context = zmq.Context.instance()
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.log_url)
            handler = PUBHandler(lsock)
            self.log.removeHandler(self._log_handler)
            handler.root_topic = 'controller'
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)
            self._log_handler = handler
    # #

    def initialize(self, argv=None):
        """Run the parent initialization, then set up logging, Hub and schedulers."""
        super(IPControllerApp, self).initialize(argv)
        self.forward_logging()
        self.init_hub()
        self.init_schedulers()

    def start(self):
        """Start the Hub and all children, write the pid file, run the loop."""
        # Start the subprocesses:
        self.factory.start()
        child_procs = []
        for child in self.children:
            child.start()
            if isinstance(child, ProcessMonitoredQueue):
                child_procs.append(child.launcher)
            elif isinstance(child, Process):
                child_procs.append(child)
        if child_procs:
            signal_children(child_procs)

        self.write_pid_file(overwrite=True)

        try:
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")
393 395
394 396
def launch_new_instance():
    """Create and run the IPython controller"""
    controller_app = IPControllerApp()
    controller_app.initialize()
    controller_app.start()


# Script entry point.
if __name__ == '__main__':
    launch_new_instance()
@@ -1,277 +1,277
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import json
19 19 import os
20 20 import sys
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24
25 25 from IPython.parallel.apps.clusterdir import (
26 26 ClusterApplication,
27 27 ClusterDir,
28 28 # ClusterDirConfigLoader
29 29 )
30 30 from IPython.zmq.log import EnginePUBHandler
31 31
32 32 from IPython.config.configurable import Configurable
33 33 from IPython.parallel.streamsession import StreamSession
34 34 from IPython.parallel.engine.engine import EngineFactory
35 35 from IPython.parallel.engine.streamkernel import Kernel
36 36 from IPython.parallel.util import disambiguate_url
37 37
38 38 from IPython.utils.importstring import import_item
39 39 from IPython.utils.traitlets import Bool, Unicode, Dict, List
40 40
41 41
42 42 #-----------------------------------------------------------------------------
43 43 # Module level variables
44 44 #-----------------------------------------------------------------------------
45 45
46 46 #: The default config file name for this application
47 47 default_config_file_name = u'ipengine_config.py'
48 48
49 _description = """Start an IPython engine for parallel computing.\n\n
49 _description = """Start an IPython engine for parallel computing.
50 50
51 51 IPython engines run in parallel and perform computations on behalf of a client
52 52 and controller. A controller needs to be started before the engines. The
53 53 engine can be configured using command line options or using a cluster
54 54 directory. Cluster directories contain config, log and security files and are
55 55 usually located in your ipython directory and named as "cluster_<profile>".
56 56 See the `profile` and `cluster_dir` options for details.
57 57 """
58 58
59 59
60 60 #-----------------------------------------------------------------------------
61 61 # MPI configuration
62 62 #-----------------------------------------------------------------------------
63 63
64 64 mpi4py_init = """from mpi4py import MPI as mpi
65 65 mpi.size = mpi.COMM_WORLD.Get_size()
66 66 mpi.rank = mpi.COMM_WORLD.Get_rank()
67 67 """
68 68
69 69
70 70 pytrilinos_init = """from PyTrilinos import Epetra
71 71 class SimpleStruct:
72 72 pass
73 73 mpi = SimpleStruct()
74 74 mpi.rank = 0
75 75 mpi.size = 0
76 76 """
77 77
class MPI(Configurable):
    """Configurable for MPI initialization"""
    use = Unicode('', config=True,
        help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
        )

    def _use_changed(self, name, old, new):
        """Pick the default init script for the chosen MPI flavour.

        Named and shaped like the other ``_<trait>_changed(self, name,
        old, new)`` handlers in these apps so that a change to ``use`` is
        actually delivered here.
        """
        # load default init script if it's not set
        if not self.init_script:
            self.init_script = self.default_inits.get(new, '')

    def _on_use_changed(self, old, new):
        """Former spelling of the handler, kept for any direct callers."""
        self._use_changed('use', old, new)

    init_script = Unicode('', config=True,
        help="Initialization code for MPI")

    default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
        config=True)
94 94
95 95
96 96 #-----------------------------------------------------------------------------
97 97 # Main application
98 98 #-----------------------------------------------------------------------------
99 99
100 100
class IPEngineApp(ClusterApplication):
    """The ``ipengine`` application.

    Reads the controller's connection file, runs optional MPI and startup
    code, builds an EngineFactory and runs its event loop.
    """

    # NOTE(review): the sibling apps declare ``name``; confirm whether
    # ``app_name`` is still what the base class reads.
    app_name = Unicode(u'ipengine')
    description = Unicode(_description)
    default_config_file_name = default_config_file_name
    classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])

    auto_create_cluster_dir = Bool(False,
        help="whether to create the cluster_dir if it doesn't exist")

    startup_script = Unicode(u'', config=True,
        help='specify a script to be run at startup')
    startup_command = Unicode('', config=True,
            help='specify a command to be run at startup')

    url_file = Unicode(u'', config=True,
        help="""The full location of the file containing the connection information for
        the controller. If this is not given, the file must be in the
        security directory of the cluster directory.  This location is
        resolved using the `profile` or `cluster_dir` options.""",
        )

    url_file_name = Unicode(u'ipcontroller-engine.json')
    log_url = Unicode('', config=True,
        help="""The URL for the iploggerapp instance, for forwarding
        logging to a central location.""")

    aliases = Dict(dict(
        config = 'IPEngineApp.config_file',
        file = 'IPEngineApp.url_file',
        c = 'IPEngineApp.startup_command',
        s = 'IPEngineApp.startup_script',

        ident = 'StreamSession.session',
        user = 'StreamSession.username',
        exec_key = 'StreamSession.keyfile',

        url = 'EngineFactory.url',
        ip = 'EngineFactory.ip',
        transport = 'EngineFactory.transport',
        port = 'EngineFactory.regport',
        location = 'EngineFactory.location',

        timeout = 'EngineFactory.timeout',

        profile = "ClusterDir.profile",
        cluster_dir = 'ClusterDir.location',

        mpi = 'MPI.use',

        log_level = 'IPEngineApp.log_level',
        log_url = 'IPEngineApp.log_url'
    ))

    def find_url_file(self):
        """Set the url file.

        If ``url_file`` was not given, default it to ``url_file_name``
        inside the cluster's security directory.  Here we don't try to
        actually see if it exists or is valid, as that is handled by the
        connection logic.
        """
        if not self.url_file:
            self.url_file = os.path.join(
                self.cluster_dir.security_dir,
                self.url_file_name
            )

    def init_engine(self):
        """Load connection info, queue startup code and build the engine.

        Logs the error and exits with status 1 if the EngineFactory cannot
        be constructed.
        """
        # This is the working dir by now.
        sys.path.insert(0, '')
        config = self.config
        self.find_url_file()

        # if os.path.exists(config.Global.key_file) and config.Global.secure:
        #     config.SessionFactory.exec_key = config.Global.key_file
        if os.path.exists(self.url_file):
            with open(self.url_file) as f:
                d = json.loads(f.read())
            # json hands back unicode; downstream config gets byte strings
            for k,v in d.iteritems():
                if isinstance(v, unicode):
                    d[k] = v.encode()
            if d['exec_key']:
                config.StreamSession.key = d['exec_key']
            d['url'] = disambiguate_url(d['url'], d['location'])
            config.EngineFactory.url = d['url']
            config.EngineFactory.location = d['location']

        try:
            exec_lines = config.Kernel.exec_lines
        except AttributeError:
            config.Kernel.exec_lines = []
            exec_lines = config.Kernel.exec_lines

        if self.startup_script:
            enc = sys.getfilesystemencoding() or 'utf8'
            cmd="execfile(%r)"%self.startup_script.encode(enc)
            exec_lines.append(cmd)
        if self.startup_command:
            exec_lines.append(self.startup_command)

        # Create the underlying shell class and Engine
        # shell_class = import_item(self.master_config.Global.shell_class)
        try:
            self.engine = EngineFactory(config=config, log=self.log)
        except Exception:
            self.log.error("Couldn't start the Engine", exc_info=True)
            self.exit(1)

    def forward_logging(self):
        """If ``log_url`` is set, replace the log handler with a zmq PUB one."""
        if self.log_url:
            self.log.info("Forwarding logging to %s"%self.log_url)
            context = self.engine.context
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.log_url)
            self.log.removeHandler(self._log_handler)
            handler = EnginePUBHandler(self.engine, lsock)
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)
            self._log_handler = handler
    #
    def init_mpi(self):
        """Run the configured MPI init script in this module's globals.

        The module-level ``mpi`` name is set to None when no script is
        configured or when the script fails.
        """
        global mpi
        self.mpi = MPI(config=self.config)

        mpi_import_statement = self.mpi.init_script
        if mpi_import_statement:
            try:
                self.log.info("Initializing MPI:")
                self.log.info(mpi_import_statement)
                exec(mpi_import_statement, globals())
            except Exception:
                # keep going without MPI, but leave a trace of why
                self.log.error("MPI initialization failed, continuing without MPI",
                    exc_info=True)
                mpi = None
        else:
            mpi = None

    def initialize(self, argv=None):
        """Run the parent initialization, then MPI, engine and log forwarding."""
        super(IPEngineApp, self).initialize(argv)
        self.init_mpi()
        self.init_engine()
        self.forward_logging()

    def start(self):
        """Start the engine and run its event loop until interrupted."""
        self.engine.start()
        try:
            self.engine.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Engine Interrupted, shutting down...\n")
267 267
def launch_new_instance():
    """Create and run the IPython engine"""
    engine_app = IPEngineApp()
    engine_app.initialize()
    engine_app.start()


# Script entry point.
if __name__ == '__main__':
    launch_new_instance()
277 277
@@ -1,97 +1,97
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 A simple IPython logger application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 import zmq
22 22
23 23 from IPython.utils.traitlets import Bool, Dict
24 24
25 25 from IPython.parallel.apps.clusterdir import (
26 26 ClusterApplication,
27 27 ClusterDir,
28 28 base_aliases
29 29 )
30 30 from IPython.parallel.apps.logwatcher import LogWatcher
31 31
32 32 #-----------------------------------------------------------------------------
33 33 # Module level variables
34 34 #-----------------------------------------------------------------------------
35 35
36 36 #: The default config file name for this application
default_config_file_name = u'iplogger_config.py'

_description = """Start an IPython logger for parallel computing.

IPython controllers and engines (and your own processes) can broadcast log messages
by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
logger can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
usually located in your ipython directory and named as "cluster_<profile>".
See the `profile` and `cluster_dir` options for details.
"""


#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
# start from a copy of the shared aliases, then add the logger-specific ones
aliases = dict(base_aliases)
aliases.update(url='LogWatcher.url', topics='LogWatcher.topics')
56 56
class IPLoggerApp(ClusterApplication):
    """Application that runs a LogWatcher and its event loop."""

    # was u'iploggerz': a stray trailing 'z' that did not match the
    # 'iplogger' prefix used by default_config_file_name
    name = u'iplogger'
    description = _description
    default_config_file_name = default_config_file_name
    auto_create_cluster_dir = Bool(False)

    classes = [LogWatcher, ClusterDir]
    aliases = Dict(aliases)

    def initialize(self, argv=None):
        """Run the base-class initialization, then create the LogWatcher."""
        super(IPLoggerApp, self).initialize(argv)
        self.init_watcher()

    def init_watcher(self):
        """Create ``self.watcher`` from the loaded config.

        If constructing the LogWatcher raises, the error is logged with its
        traceback and ``self.exit(1)`` is called.
        """
        try:
            self.watcher = LogWatcher(config=self.config, logname=self.log.name)
        except Exception:
            # narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are not reported as a watcher start-up failure
            self.log.error("Couldn't start the LogWatcher", exc_info=True)
            self.exit(1)
        self.log.info("Listening for log messages on %r"%self.watcher.url)

    def start(self):
        """Start the watcher and run its event loop.

        A ``KeyboardInterrupt`` raised by the loop is caught and logged at
        critical level instead of being propagated.
        """
        self.watcher.start()
        try:
            self.watcher.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Logging Interrupted, shutting down...\n")
87 87
def launch_new_instance():
    """Create an IPLoggerApp, initialize it and start it."""
    logger_app = IPLoggerApp()
    logger_app.initialize()
    logger_app.start()


# allow this module to be run directly as a script
if __name__ == '__main__':
    launch_new_instance()
97 97
@@ -1,166 +1,165
1 1 #!/usr/bin/env python
2 2 """A simple engine that talks to a controller over 0MQ.
3 3 it handles registration, etc. and launches a kernel
4 4 connected to the Controller's Schedulers.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010-2011 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 from __future__ import print_function
14 14
15 15 import sys
16 16 import time
17 17
18 18 import zmq
19 19 from zmq.eventloop import ioloop, zmqstream
20 20
21 21 # internal
22 22 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode
23 23 # from IPython.utils.localinterfaces import LOCALHOST
24 24
25 25 from IPython.parallel.controller.heartmonitor import Heart
26 26 from IPython.parallel.factory import RegistrationFactory
27 27 from IPython.parallel.streamsession import Message
28 28 from IPython.parallel.util import disambiguate_url
29 29
30 30 from .streamkernel import Kernel
31 31
class EngineFactory(RegistrationFactory):
    """IPython engine

    Registers with a controller over an XREQ socket and, once the
    registration reply arrives, connects the shell, control and iopub
    streams, starts a Kernel on them and starts the heartbeat.
    """

    # configurables:
    out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
        help="""The OutStream for handling stdout/err.
        Typically 'IPython.zmq.iostream.OutStream'""")
    display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True,
        help="""The class for handling displayhook.
        Typically 'IPython.zmq.displayhook.DisplayHook'""")
    location=Unicode(config=True,
        help="""The location (an IP address) of the controller.  This is
        used for disambiguating URLs, to determine whether
        loopback should be used to connect or the public address.""")
    timeout=CFloat(2,config=True,
        help="""The time (in seconds) to wait for the Controller to respond
        to registration requests before giving up.""")

    # not configurable:
    user_ns=Dict()
    id=Int(allow_none=True)
    registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
    kernel=Instance(Kernel)

    def __init__(self, **kwargs):
        """Create the registration stream, connected to ``self.url``."""
        super(EngineFactory, self).__init__(**kwargs)
        self.ident = self.session.session

        # XREQ socket carrying our session id as its zmq identity
        reg_socket = self.context.socket(zmq.XREQ)
        reg_socket.setsockopt(zmq.IDENTITY, self.ident)
        reg_socket.connect(self.url)
        self.registrar = zmqstream.ZMQStream(reg_socket, self.loop)

    def register(self):
        """send the registration_request"""
        self.log.info("registering")
        ident = self.ident
        # the same identity is announced for all three channels
        content = dict(queue=ident, heartbeat=ident, control=ident)
        self.registrar.on_recv(self.complete_registration)
        self.session.send(self.registrar, "registration_request",content=content)

    def complete_registration(self, msg):
        """Handle the controller's reply to the registration request.

        Cancels the pending abort callback and unpacks the reply.  On a
        status other than ``'ok'`` the failure is logged and an
        ``Exception`` is raised.  Otherwise the engine id is stored, the
        streams are connected to the addresses given in the reply,
        ``sys.stdout``/``sys.stderr``/``sys.displayhook`` are replaced when
        the corresponding factories are set, and the Kernel and Heart are
        started.
        """
        self._abort_dc.stop()
        ctx = self.context
        loop = self.loop
        identity = self.ident

        idents, frames = self.session.feed_identities(msg)
        reply = Message(self.session.unpack_message(frames))

        if reply.content.status != 'ok':
            self.log.fatal("Registration Failed: %s"%reply)
            raise Exception("Registration Failed: %s"%reply)

        self.id = int(reply.content.id)

        def resolve(addr):
            # pick loopback or public address relative to the controller
            return disambiguate_url(addr, self.location)

        def new_stream(socket_type):
            # every stream's socket carries the engine's zmq identity
            stream = zmqstream.ZMQStream(ctx.socket(socket_type), loop)
            stream.setsockopt(zmq.IDENTITY, identity)
            return stream

        # create Shell Streams (MUX, Task, etc.):
        shell_addrs = [ str(reply.content.mux) ]
        task_addr = reply.content.task
        if task_addr:
            shell_addrs.append(str(task_addr))

        # single-socket model: one XREP stream is connected to every shell
        # address, rather than one stream per address
        shell_stream = new_stream(zmq.XREP)
        shell_streams = [shell_stream]
        for addr in shell_addrs:
            shell_stream.connect(resolve(addr))

        # control stream:
        control_addr = str(reply.content.control)
        control_stream = new_stream(zmq.XREP)
        control_stream.connect(resolve(control_addr))

        # create iopub stream:
        iopub_addr = reply.content.iopub
        iopub_stream = new_stream(zmq.PUB)
        iopub_stream.connect(resolve(iopub_addr))

        # heartbeat addresses are read now and used after the kernel starts
        hb_addrs = reply.content.heartbeat

        # Redirect output streams and set a display hook.
        if self.out_stream_factory:
            sys.stdout = out = self.out_stream_factory(self.session, iopub_stream, u'stdout')
            out.topic = 'engine.%i.stdout'%self.id
            sys.stderr = err = self.out_stream_factory(self.session, iopub_stream, u'stderr')
            err.topic = 'engine.%i.stderr'%self.id
        if self.display_hook_factory:
            sys.displayhook = hook = self.display_hook_factory(self.session, iopub_stream)
            hook.topic = 'engine.%i.pyout'%self.id

        self.kernel = Kernel(
            config=self.config, int_id=self.id, ident=self.ident,
            session=self.session, control_stream=control_stream,
            shell_streams=shell_streams, iopub_stream=iopub_stream,
            loop=loop, user_ns=self.user_ns, log=self.log,
        )
        self.kernel.start()

        # launch heartbeat
        heart_urls = [ str(resolve(addr)) for addr in hb_addrs ]
        heart = Heart(*heart_urls, heart_id=identity)
        heart.start()

        self.log.info("Completed registration with id %i"%self.id)

    def abort(self):
        """Give up on registration: log, send an unregistration request, exit 255."""
        self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
        self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
        time.sleep(1)
        sys.exit(255)

    def start(self):
        """Schedule the registration request and the registration timeout."""
        ioloop.DelayedCallback(self.register, 0, self.loop).start()
        # abort() fires after ``timeout`` seconds unless complete_registration()
        # stops this callback first
        abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
        self._abort_dc = abort_dc
        abort_dc.start()
166 165
@@ -1,107 +1,107
1 1 """toplevel setup/teardown for parallel tests."""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import tempfile
16 16 import time
17 17 from subprocess import Popen
18 18
19 19 from IPython.utils.path import get_ipython_dir
20 20 from IPython.parallel import Client
21 21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
22 22 ipengine_cmd_argv,
23 23 ipcontroller_cmd_argv,
24 24 SIGKILL)
25 25
26 26 # globals
# every launcher started by setup()/add_engines(); teardown() pops and stops them
launchers = []
# sink for the child processes' stdout/stderr; closed by teardown()
blackhole = open(os.devnull, 'w')

# Launcher class
class TestProcessLauncher(LocalProcessLauncher):
    """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""

    def start(self):
        """Spawn ``self.args`` with its output discarded.

        Raises ``ProcessStateError`` if the launcher is not in the
        ``'before'`` state, i.e. it was already started.
        """
        if self.state != 'before':
            # ProcessStateError is not among this module's imports, so the
            # original raise failed with NameError; import it where needed
            from IPython.parallel.apps.launcher import ProcessStateError
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

        self.process = Popen(self.args,
            stdout=blackhole, stderr=blackhole,
            env=os.environ,
            cwd=self.work_dir
        )
        self.notify_start(self.process.pid)
        self.poll = self.process.poll
45 45
46 46 # nose setup/teardown
47 47
def setup():
    """Start the test controller for profile 'iptest', then one engine.

    Waits for the controller's engine and client JSON connection files to
    appear.  Raises ``RuntimeError`` if the controller process exits first
    or the files are still missing after 10 seconds.
    """
    controller = TestProcessLauncher()
    controller.cmd_and_args = ipcontroller_cmd_argv + \
        ['profile=iptest', 'log_level=50', '--reuse']
    controller.start()
    launchers.append(controller)

    security_dir = os.path.join(get_ipython_dir(), 'cluster_iptest', 'security')
    engine_json = os.path.join(security_dir, 'ipcontroller-engine.json')
    client_json = os.path.join(security_dir, 'ipcontroller-client.json')

    began = time.time()
    while not (os.path.exists(engine_json) and os.path.exists(client_json)):
        if controller.poll() is not None:
            # show the controller's exit status before failing
            print(controller.poll())
            raise RuntimeError("The test controller failed to start.")
        elif time.time()-began > 10:
            raise RuntimeError("Timeout waiting for the test controller to start.")
        time.sleep(0.1)
    add_engines(1)
66 66
def add_engines(n=1, profile='iptest'):
    """Launch ``n`` more engines for ``profile`` and wait until they connect.

    Each launcher is appended to the module-level ``launchers`` list.
    Returns the list of new launchers.  Raises ``RuntimeError`` if one of
    the new engine processes exits, or if the client does not see all of
    them within 10 seconds.
    """
    rc = Client(profile=profile)
    try:
        base = len(rc)
        eps = []
        for i in range(n):
            ep = TestProcessLauncher()
            ep.cmd_and_args = ipengine_cmd_argv + ['profile=%s'%profile, 'log_level=50']
            ep.start()
            launchers.append(ep)
            eps.append(ep)
        tic = time.time()
        while len(rc) < base+n:
            if any(ep.poll() is not None for ep in eps):
                raise RuntimeError("A test engine failed to start.")
            elif time.time()-tic > 10:
                raise RuntimeError("Timeout waiting for engines to connect.")
            time.sleep(.1)
            rc.spin()
    finally:
        # close the client on the error paths too, not only on success
        rc.close()
    return eps
87 87
def teardown():
    """Stop every launched test process, then close the output sink.

    Each launcher still running is asked to stop.  If it is still running
    after a short grace period it is sent SIGKILL.
    """
    time.sleep(1)
    while launchers:
        proc = launchers.pop()
        if proc.poll() is None:
            try:
                proc.stop()
            except Exception as err:
                print(err)
        if proc.poll() is None:
            # give the process a moment to exit on its own
            time.sleep(.25)
        if proc.poll() is None:
            try:
                print('cleaning up test process...')
                proc.signal(SIGKILL)
            except:
                print(' '.join(["couldn't shutdown process: ", str(proc)]))
    blackhole.close()
107 107
@@ -1,111 +1,111
1 1 """test building messages with streamsession"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import uuid
16 16 import zmq
17 17
18 18 from zmq.tests import BaseZMQTestCase
19 19 from zmq.eventloop.zmqstream import ZMQStream
20 20 # from IPython.zmq.tests import SessionTestCase
21 21 from IPython.parallel import streamsession as ss
22 22
class SessionTestCase(BaseZMQTestCase):
    """Base test case that provides a fresh StreamSession as ``self.session``."""

    def setUp(self):
        super(SessionTestCase, self).setUp()
        self.session = ss.StreamSession()
28 28
29 29 class TestSession(SessionTestCase):
30 30
def test_msg(self):
    """message format"""
    msg = self.session.msg('execute')
    expected_keys = set(['header', 'msg_id', 'parent_header', 'msg_type', 'content'])
    self.assertEquals(set(msg.keys()), expected_keys)
    # these three entries must be dicts
    for key in ('content', 'header', 'parent_header'):
        self.assertTrue(isinstance(msg[key], dict))
    self.assertEquals(msg['msg_type'], 'execute')
41 41
42 42
43 43
def test_args(self):
    """initialization arguments for StreamSession"""
    default_user = os.environ.get('USER', 'username')

    session = self.session
    self.assertTrue(session.pack is ss.default_packer)
    self.assertTrue(session.unpack is ss.default_unpacker)
    self.assertEquals(session.username, default_user)

    session = ss.StreamSession()
    self.assertEquals(session.username, os.environ.get('USER', 'username'))

    # a non-callable pack/unpack must be rejected
    for kwarg in ('pack', 'unpack'):
        self.assertRaises(TypeError, ss.StreamSession, **{kwarg: 'hi'})

    session_id = str(uuid.uuid4())
    session = ss.StreamSession(username='carrot', session=session_id)
    self.assertEquals(session.session, session_id)
    self.assertEquals(session.username, 'carrot')
60 60
61 61 def test_tracking(self):
62 62 """test tracking messages"""
63 63 a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
64 64 s = self.session
65 65 stream = ZMQStream(a)
66 66 msg = s.send(a, 'hello', track=False)
67 67 self.assertTrue(msg['tracker'] is None)
68 68 msg = s.send(a, 'hello', track=True)
69 69 self.assertTrue(isinstance(msg['tracker'], zmq.MessageTracker))
70 70 M = zmq.Message(b'hi there', track=True)
71 71 msg = s.send(a, 'hello', buffers=[M], track=True)
72 72 t = msg['tracker']
73 73 self.assertTrue(isinstance(t, zmq.MessageTracker))
74 74 self.assertRaises(zmq.NotDone, t.wait, .1)
75 75 del M
76 76 t.wait(1) # this will raise
77 77
78 78
79 79 # def test_rekey(self):
80 80 # """rekeying dict around json str keys"""
81 81 # d = {'0': uuid.uuid4(), 0:uuid.uuid4()}
82 82 # self.assertRaises(KeyError, ss.rekey, d)
83 83 #
84 84 # d = {'0': uuid.uuid4(), 1:uuid.uuid4(), 'asdf':uuid.uuid4()}
85 85 # d2 = {0:d['0'],1:d[1],'asdf':d['asdf']}
86 86 # rd = ss.rekey(d)
87 87 # self.assertEquals(d2,rd)
88 88 #
89 89 # d = {'1.5':uuid.uuid4(),'1':uuid.uuid4()}
90 90 # d2 = {1.5:d['1.5'],1:d['1']}
91 91 # rd = ss.rekey(d)
92 92 # self.assertEquals(d2,rd)
93 93 #
94 94 # d = {'1.0':uuid.uuid4(),'1':uuid.uuid4()}
95 95 # self.assertRaises(KeyError, ss.rekey, d)
96 96 #
def test_unique_msg_ids(self):
    """test that messages receive unique ids"""
    seen = set()
    for _ in range(2**12):
        msg_id = self.session.msg_header('test')['msg_id']
        self.assertTrue(msg_id not in seen)
        seen.add(msg_id)
105 105
106 106 def test_feed_identities(self):
107 107 """scrub the front for zmq IDENTITIES"""
108 108 theids = "engine client other".split()
109 109 content = dict(code='whoda',stuff=object())
110 110 themsg = self.session.msg('execute',content=content)
111 111 pmsg = theids
@@ -1,253 +1,253
1 1 .. _ip1par:
2 2
3 3 ============================
4 4 Overview and getting started
5 5 ============================
6 6
7 7 Introduction
8 8 ============
9 9
10 10 This section gives an overview of IPython's sophisticated and powerful
11 11 architecture for parallel and distributed computing. This architecture
12 12 abstracts out parallelism in a very general way, which enables IPython to
13 13 support many different styles of parallelism including:
14 14
15 15 * Single program, multiple data (SPMD) parallelism.
16 16 * Multiple program, multiple data (MPMD) parallelism.
17 17 * Message passing using MPI.
18 18 * Task farming.
19 19 * Data parallel.
20 20 * Combinations of these approaches.
21 21 * Custom user defined approaches.
22 22
23 23 Most importantly, IPython enables all types of parallel applications to
24 24 be developed, executed, debugged and monitored *interactively*. Hence,
25 25 the ``I`` in IPython. The following are some example usage cases for IPython:
26 26
27 27 * Quickly parallelize algorithms that are embarrassingly parallel
28 28 using a number of simple approaches. Many simple things can be
29 29 parallelized interactively in one or two lines of code.
30 30
31 31 * Steer traditional MPI applications on a supercomputer from an
32 32 IPython session on your laptop.
33 33
34 34 * Analyze and visualize large datasets (that could be remote and/or
35 35 distributed) interactively using IPython and tools like
36 36 matplotlib/TVTK.
37 37
38 38 * Develop, test and debug new parallel algorithms
39 39 (that may use MPI) interactively.
40 40
41 41 * Tie together multiple MPI jobs running on different systems into
42 42 one giant distributed and parallel system.
43 43
44 44 * Start a parallel job on your cluster and then have a remote
45 45 collaborator connect to it and pull back data into their
46 46 local IPython session for plotting and analysis.
47 47
48 48 * Run a set of tasks on a set of CPUs using dynamic load balancing.
49 49
50 50 Architecture overview
51 51 =====================
52 52
53 53 The IPython architecture consists of four components:
54 54
55 55 * The IPython engine.
56 56 * The IPython hub.
57 57 * The IPython schedulers.
58 58 * The controller client.
59 59
60 60 These components live in the :mod:`IPython.parallel` package and are
61 61 installed with IPython. They do, however, have additional dependencies
62 62 that must be installed. For more information, see our
63 63 :ref:`installation documentation <install_index>`.
64 64
65 65 .. TODO: include zmq in install_index
66 66
67 67 IPython engine
68 68 ---------------
69 69
70 70 The IPython engine is a Python instance that takes Python commands over a
71 71 network connection. Eventually, the IPython engine will be a full IPython
72 72 interpreter, but for now, it is a regular Python interpreter. The engine
73 73 can also handle incoming and outgoing Python objects sent over a network
74 74 connection. When multiple engines are started, parallel and distributed
75 75 computing becomes possible. An important feature of an IPython engine is
76 76 that it blocks while user code is being executed. Read on for how the
77 77 IPython controller solves this problem to expose a clean asynchronous API
78 78 to the user.
79 79
80 80 IPython controller
81 81 ------------------
82 82
83 83 The IPython controller processes provide an interface for working with a set of engines.
84 84 At a general level, the controller is a collection of processes to which IPython engines
85 85 and clients can connect. The controller is composed of a :class:`Hub` and a collection of
86 86 :class:`Schedulers`. These Schedulers are typically run in separate processes on the
87 87 same machine as the Hub, but can be run anywhere from local threads to remote machines.
88 88
89 89 The controller also provides a single point of contact for users who wish to
90 90 utilize the engines connected to the controller. There are different ways of
91 91 working with a controller. In IPython, all of these models are implemented via
92 92 the client's :meth:`.View.apply` method, with various arguments, or
93 93 constructing :class:`.View` objects to represent subsets of engines. The two
94 94 primary models for interacting with engines are:
95 95
96 96 * A **Direct** interface, where engines are addressed explicitly.
97 97 * A **LoadBalanced** interface, where the Scheduler is trusted with assigning work to
98 98 appropriate engines.
99 99
100 100 Advanced users can readily extend the View models to enable other
101 101 styles of parallelism.
102 102
103 103 .. note::
104 104
105 105 A single controller and set of engines can be used with multiple models
106 106 simultaneously. This opens the door for lots of interesting things.
107 107
108 108
109 109 The Hub
110 110 *******
111 111
112 112 The center of an IPython cluster is the Hub. This is the process that keeps
113 113 track of engine connections, schedulers, clients, as well as all task requests and
114 114 results. The primary role of the Hub is to facilitate queries of the cluster state, and
115 115 minimize the necessary information required to establish the many connections involved in
116 116 connecting new clients and engines.
117 117
118 118
119 119 Schedulers
120 120 **********
121 121
122 122 All actions that can be performed on the engine go through a Scheduler. While the engines
123 123 themselves block when user code is run, the schedulers hide that from the user to provide
124 124 a fully asynchronous interface to a set of engines.
125 125
126 126
127 127 IPython client and views
128 128 ------------------------
129 129
130 130 There is one primary object, the :class:`~.parallel.Client`, for connecting to a cluster.
131 131 For each execution model, there is a corresponding :class:`~.parallel.View`. These views
132 132 allow users to interact with a set of engines through the interface. Here are the two default
133 133 views:
134 134
135 135 * The :class:`DirectView` class for explicit addressing.
136 136 * The :class:`LoadBalancedView` class for destination-agnostic scheduling.
137 137
138 138 Security
139 139 --------
140 140
141 141 IPython uses ZeroMQ for networking, which has provided many advantages, but
142 142 one of the setbacks is its utter lack of security [ZeroMQ]_. By default, no IPython
143 143 connections are encrypted, but open ports only listen on localhost. The only
144 144 source of security for IPython is via ssh-tunnel. IPython supports both shell
145 145 (`openssh`) and `paramiko` based tunnels for connections. There is a key necessary
146 146 to submit requests, but due to the lack of encryption, it does not provide
147 147 significant security if loopback traffic is compromised.
148 148
149 149 In our architecture, the controller is the only process that listens on
150 150 network ports, and is thus the main point of vulnerability. The standard model
151 151 for secure connections is to designate that the controller listen on
152 152 localhost, and use ssh-tunnels to connect clients and/or
153 153 engines.
154 154
155 155 To connect and authenticate to the controller an engine or client needs
156 156 some information that the controller has stored in a JSON file.
157 157 Thus, the JSON files need to be copied to a location where
158 158 the clients and engines can find them. Typically, this is the
159 159 :file:`~/.ipython/cluster_default/security` directory on the host where the
160 160 client/engine is running (which could be a different host than the controller).
161 161 Once the JSON files are copied over, everything should work fine.
162 162
163 163 Currently, there are two JSON files that the controller creates:
164 164
165 165 ipcontroller-engine.json
166 166 This JSON file has the information necessary for an engine to connect
167 167 to a controller.
168 168
169 169 ipcontroller-client.json
170 170 The client's connection information. This may not differ from the engine's,
171 171 but since the controller may listen on different ports for clients and
172 172 engines, it is stored separately.
173 173
174 174 More details of how these JSON files are used are given below.
175 175
176 176 A detailed description of the security model and its implementation in IPython
177 177 can be found :ref:`here <parallelsecurity>`.
178 178
179 179 .. warning::
180 180
181 181 Even at its most secure, the Controller listens on ports on localhost, and
182 182 every time you make a tunnel, you open a localhost port on the connecting
183 183 machine that points to the Controller. If localhost on the Controller's
184 184 machine, or the machine of any client or engine, is untrusted, then your
185 185 Controller is insecure. There is no way around this with ZeroMQ.
186 186
187 187
188 188
189 189 Getting Started
190 190 ===============
191 191
192 192 To use IPython for parallel computing, you need to start one instance of the
193 193 controller and one or more instances of the engine. Initially, it is best to
194 194 simply start a controller and engines on a single host using the
195 195 :command:`ipcluster` command. To start a controller and 4 engines on your
196 196 localhost, just do::
197 197
198 $ ipcluster start -n 4
198 $ ipcluster start n=4
199 199
200 200 More details about starting the IPython controller and engines can be found
201 201 :ref:`here <parallel_process>`.
202 202
203 203 Once you have started the IPython controller and one or more engines, you
204 204 are ready to use the engines to do something useful. To make sure
205 205 everything is working correctly, try the following commands:
206 206
207 207 .. sourcecode:: ipython
208 208
209 209 In [1]: from IPython.parallel import Client
210 210
211 211 In [2]: c = Client()
212 212
213 213 In [4]: c.ids
214 214 Out[4]: set([0, 1, 2, 3])
215 215
216 216 In [5]: c[:].apply_sync(lambda : "Hello, World")
217 217 Out[5]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ]
218 218
219 219
220 220 When a client is created with no arguments, the client tries to find the corresponding JSON file
221 221 in the local `~/.ipython/cluster_default/security` directory. Or if you specified a profile,
222 222 you can use that with the Client. This should cover most cases:
223 223
224 224 .. sourcecode:: ipython
225 225
226 226 In [2]: c = Client(profile='myprofile')
227 227
228 228 If you have put the JSON file in a different location or it has a different name, create the
229 229 client like this:
230 230
231 231 .. sourcecode:: ipython
232 232
233 233 In [2]: c = Client('/path/to/my/ipcontroller-client.json')
234 234
235 235 Remember, a client needs to be able to see the Hub's ports to connect. So if they are on a
236 236 different machine, you may need to use an ssh server to tunnel access to that machine,
237 237 then you would connect to it with:
238 238
239 239 .. sourcecode:: ipython
240 240
241 241 In [2]: c = Client(sshserver='myhub.example.com')
242 242
243 243 Where 'myhub.example.com' is the url or IP address of the machine on
244 244 which the Hub process is running (or another machine that has direct access to the Hub's ports).
245 245
246 246 The SSH server may already be specified in ipcontroller-client.json, if the controller was
247 247 instructed to use one at its launch time.
248 248
249 249 You are now ready to learn more about the :ref:`Direct
250 250 <parallel_multiengine>` and :ref:`LoadBalanced <parallel_task>` interfaces to the
251 251 controller.
252 252
253 253 .. [ZeroMQ] ZeroMQ. http://www.zeromq.org
@@ -1,156 +1,156
1 1 .. _parallelmpi:
2 2
3 3 =======================
4 4 Using MPI with IPython
5 5 =======================
6 6
7 7 .. note::
8 8
9 9 Not adapted to zmq yet
10 10 This is out of date wrt ipcluster in general as well
11 11
12 12 Often, a parallel algorithm will require moving data between the engines. One
13 13 way of accomplishing this is by doing a pull and then a push using the
14 14 multiengine client. However, this will be slow as all the data has to go
15 15 through the controller to the client and then back through the controller, to
16 16 its final destination.
17 17
18 18 A much better way of moving data between engines is to use a message passing
19 19 library, such as the Message Passing Interface (MPI) [MPI]_. IPython's
20 20 parallel computing architecture has been designed from the ground up to
21 21 integrate with MPI. This document describes how to use MPI with IPython.
22 22
23 23 Additional installation requirements
24 24 ====================================
25 25
26 26 If you want to use MPI with IPython, you will need to install:
27 27
28 28 * A standard MPI implementation such as OpenMPI [OpenMPI]_ or MPICH.
29 29 * The mpi4py [mpi4py]_ package.
30 30
31 31 .. note::
32 32
33 33 The mpi4py package is not a strict requirement. However, you need to
34 34 have *some* way of calling MPI from Python. You also need some way of
35 35 making sure that :func:`MPI_Init` is called when the IPython engines start
36 36 up. There are a number of ways of doing this and a good number of
37 37 associated subtleties. We highly recommend just using mpi4py as it
38 38 takes care of most of these problems. If you want to do something
39 39 different, let us know and we can help you get started.
40 40
41 41 Starting the engines with MPI enabled
42 42 =====================================
43 43
44 44 To use code that calls MPI, there are typically two things that MPI requires.
45 45
46 46 1. The process that wants to call MPI must be started using
47 47 :command:`mpiexec` or a batch system (like PBS) that has MPI support.
48 48 2. Once the process starts, it must call :func:`MPI_Init`.
49 49
50 50 There are a couple of ways that you can start the IPython engines and get
51 51 these things to happen.
52 52
53 53 Automatic starting using :command:`mpiexec` and :command:`ipcluster`
54 54 --------------------------------------------------------------------
55 55
56 The easiest approach is to use the `mpiexec` mode of :command:`ipcluster`,
56 The easiest approach is to use the `MPIExec` Launchers in :command:`ipcluster`,
57 57 which will first start a controller and then a set of engines using
58 58 :command:`mpiexec`::
59 59
60 $ ipcluster mpiexec -n 4
60 $ ipcluster start n=4 elauncher=MPIExecEngineSetLauncher
61 61
62 62 This approach is best as interrupting :command:`ipcluster` will automatically
63 63 stop and clean up the controller and engines.
64 64
65 65 Manual starting using :command:`mpiexec`
66 66 ----------------------------------------
67 67
68 68 If you want to start the IPython engines using the :command:`mpiexec`, just
69 69 do::
70 70
71 $ mpiexec -n 4 ipengine --mpi=mpi4py
71 $ mpiexec -n 4 ipengine mpi=mpi4py
72 72
73 73 This requires that you already have a controller running and that the FURL
74 74 files for the engines are in place. We also have built in support for
75 75 PyTrilinos [PyTrilinos]_, which can be used (assuming it is installed) by
76 76 starting the engines with::
77 77
78 $ mpiexec -n 4 ipengine --mpi=pytrilinos
78 $ mpiexec -n 4 ipengine mpi=pytrilinos
79 79
80 80 Automatic starting using PBS and :command:`ipcluster`
81 81 ------------------------------------------------------
82 82
83 83 The :command:`ipcluster` command also has built-in integration with PBS. For
84 84 more information on this approach, see our documentation on :ref:`ipcluster
85 85 <parallel_process>`.
86 86
87 87 Actually using MPI
88 88 ==================
89 89
90 90 Once the engines are running with MPI enabled, you are ready to go. You can
91 91 now call any code that uses MPI in the IPython engines. And, all of this can
92 92 be done interactively. Here we show a simple example that uses mpi4py
93 93 [mpi4py]_ version 1.1.0 or later.
94 94
95 95 First, let's define a simple function that uses MPI to calculate the sum of a
96 96 distributed array. Save the following text in a file called :file:`psum.py`:
97 97
98 98 .. sourcecode:: python
99 99
100 100 from mpi4py import MPI
101 101 import numpy as np
102 102
103 103 def psum(a):
104 104 s = np.sum(a)
105 105 rcvBuf = np.array(0.0,'d')
106 106 MPI.COMM_WORLD.Allreduce([s, MPI.DOUBLE],
107 107 [rcvBuf, MPI.DOUBLE],
108 108 op=MPI.SUM)
109 109 return rcvBuf
110 110
111 111 Now, start an IPython cluster::
112 112
113 $ ipcluster start -p mpi -n 4
113 $ ipcluster start profile=mpi n=4
114 114
115 115 .. note::
116 116
117 117 It is assumed here that the mpi profile has been set up, as described :ref:`here
118 118 <parallel_process>`.
119 119
120 120 Finally, connect to the cluster and use this function interactively. In this
121 121 case, we create a random array on each engine and sum up all the random arrays
122 122 using our :func:`psum` function:
123 123
124 124 .. sourcecode:: ipython
125 125
126 126 In [1]: from IPython.parallel import Client
127 127
128 128 In [2]: %load_ext parallelmagic
129 129
130 130 In [3]: c = Client(profile='mpi')
131 131
132 132 In [4]: view = c[:]
133 133
134 134 In [5]: view.activate()
135 135
136 136 # run the contents of the file on each engine:
137 137 In [6]: view.run('psum.py')
138 138
139 139 In [6]: px a = np.random.rand(100)
140 140 Parallel execution on engines: [0,1,2,3]
141 141
142 142 In [8]: px s = psum(a)
143 143 Parallel execution on engines: [0,1,2,3]
144 144
145 145 In [9]: view['s']
146 146 Out[9]: [187.451545803,187.451545803,187.451545803,187.451545803]
147 147
148 148 Any Python code that makes calls to MPI can be used in this manner, including
149 149 compiled C, C++ and Fortran libraries that have been exposed to Python.
150 150
151 151 .. [MPI] Message Passing Interface. http://www-unix.mcs.anl.gov/mpi/
152 152 .. [mpi4py] MPI for Python. mpi4py: http://mpi4py.scipy.org/
153 153 .. [OpenMPI] Open MPI. http://www.open-mpi.org/
154 154 .. [PyTrilinos] PyTrilinos. http://trilinos.sandia.gov/packages/pytrilinos/
155 155
156 156
@@ -1,843 +1,843
1 1 .. _parallel_multiengine:
2 2
3 3 ==========================
4 4 IPython's Direct interface
5 5 ==========================
6 6
7 7 The direct, or multiengine, interface represents one possible way of working with a set of
8 8 IPython engines. The basic idea behind the multiengine interface is that the
9 9 capabilities of each engine are directly and explicitly exposed to the user.
10 10 Thus, in the multiengine interface, each engine is given an id that is used to
11 11 identify the engine and give it work to do. This interface is very intuitive
12 12 and is designed with interactive usage in mind, and is the best place for
13 13 new users of IPython to begin.
14 14
15 15 Starting the IPython controller and engines
16 16 ===========================================
17 17
18 18 To follow along with this tutorial, you will need to start the IPython
19 19 controller and four IPython engines. The simplest way of doing this is to use
20 20 the :command:`ipcluster` command::
21 21
22 $ ipcluster start -n 4
22 $ ipcluster start n=4
23 23
24 24 For more detailed information about starting the controller and engines, see
25 25 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
26 26
27 27 Creating a ``Client`` instance
28 28 ==============================
29 29
30 30 The first step is to import the IPython :mod:`IPython.parallel`
31 31 module and then create a :class:`.Client` instance:
32 32
33 33 .. sourcecode:: ipython
34 34
35 35 In [1]: from IPython.parallel import Client
36 36
37 37 In [2]: rc = Client()
38 38
39 39 This form assumes that the default connection information (stored in
40 40 :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/cluster_default/security`) is
41 41 accurate. If the controller was started on a remote machine, you must copy that connection
42 42 file to the client machine, or enter its contents as arguments to the Client constructor:
43 43
44 44 .. sourcecode:: ipython
45 45
46 46 # If you have copied the json connector file from the controller:
47 47 In [2]: rc = Client('/path/to/ipcontroller-client.json')
48 48 # or to connect with a specific profile you have set up:
49 49 In [3]: rc = Client(profile='mpi')
50 50
51 51
52 52 To make sure there are engines connected to the controller, users can get a list
53 53 of engine ids:
54 54
55 55 .. sourcecode:: ipython
56 56
57 57 In [3]: rc.ids
58 58 Out[3]: [0, 1, 2, 3]
59 59
60 60 Here we see that there are four engines ready to do work for us.
61 61
62 62 For direct execution, we will make use of a :class:`DirectView` object, which can be
63 63 constructed via list-access to the client:
64 64
65 65 .. sourcecode:: ipython
66 66
67 67 In [4]: dview = rc[:] # use all engines
68 68
69 69 .. seealso::
70 70
71 71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
72 72
73 73
74 74 Quick and easy parallelism
75 75 ==========================
76 76
77 77 In many cases, you simply want to apply a Python function to a sequence of
78 78 objects, but *in parallel*. The client interface provides a simple way
79 79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
80 80
81 81 Parallel map
82 82 ------------
83 83
84 84 Python's builtin :func:`map` function allows a function to be applied to a
85 85 sequence element-by-element. This type of code is typically trivial to
86 86 parallelize. In fact, since IPython's interface is all about functions anyway,
87 87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
88 88 DirectView's :meth:`map` method:
89 89
90 90 .. sourcecode:: ipython
91 91
92 92 In [62]: serial_result = map(lambda x:x**10, range(32))
93 93
94 94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
95 95
96 96 In [67]: serial_result==parallel_result
97 97 Out[67]: True
98 98
99 99
100 100 .. note::
101 101
102 102 The :class:`DirectView`'s version of :meth:`map` does
103 103 not do dynamic load balancing. For a load balanced version, use a
104 104 :class:`LoadBalancedView`.
105 105
106 106 .. seealso::
107 107
108 108 :meth:`map` is implemented via :class:`ParallelFunction`.
109 109
110 110 Remote function decorators
111 111 --------------------------
112 112
113 113 Remote functions are just like normal functions, but when they are called,
114 114 they execute on one or more engines, rather than locally. IPython provides
115 115 two decorators:
116 116
117 117 .. sourcecode:: ipython
118 118
119 119 In [10]: @dview.remote(block=True)
120 120 ...: def getpid():
121 121 ...: import os
122 122 ...: return os.getpid()
123 123 ...:
124 124
125 125 In [11]: getpid()
126 126 Out[11]: [12345, 12346, 12347, 12348]
127 127
128 128 The ``@parallel`` decorator creates parallel functions, that break up element-wise
129 129 operations and distribute them, reconstructing the result.
130 130
131 131 .. sourcecode:: ipython
132 132
133 133 In [12]: import numpy as np
134 134
135 135 In [13]: A = np.random.random((64,48))
136 136
137 137 In [14]: @dview.parallel(block=True)
138 138 ...: def pmul(A,B):
139 139 ...: return A*B
140 140
141 141 In [15]: C_local = A*A
142 142
143 143 In [16]: C_remote = pmul(A,A)
144 144
145 145 In [17]: (C_local == C_remote).all()
146 146 Out[17]: True
147 147
148 148 .. seealso::
149 149
150 150 See the docstrings for the :func:`parallel` and :func:`remote` decorators for
151 151 options.
152 152
153 153 Calling Python functions
154 154 ========================
155 155
156 156 The most basic type of operation that can be performed on the engines is to
157 157 execute Python code or call Python functions. Executing Python code can be
158 158 done in blocking or non-blocking mode (non-blocking is default) using the
159 159 :meth:`.View.execute` method, and calling functions can be done via the
160 160 :meth:`.View.apply` method.
161 161
162 162 apply
163 163 -----
164 164
165 165 The main method for doing remote execution (in fact, all methods that
166 166 communicate with the engines are built on top of it), is :meth:`View.apply`.
167 167
168 168 We strive to provide the cleanest interface we can, so `apply` has the following
169 169 signature:
170 170
171 171 .. sourcecode:: python
172 172
173 173 view.apply(f, *args, **kwargs)
174 174
175 175 There are various ways to call functions with IPython, and these flags are set as
176 176 attributes of the View. The ``DirectView`` has just two of these flags:
177 177
178 178 dv.block : bool
179 179 whether to wait for the result, or return an :class:`AsyncResult` object
180 180 immediately
181 181 dv.track : bool
182 182 whether to instruct pyzmq to track when zeromq is done sending the message.
183 183 This is primarily useful for non-copying sends of numpy arrays that you plan to
184 184 edit in-place. You need to know when it becomes safe to edit the buffer
185 185 without corrupting the message.
186 186
187 187
188 188 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
189 189
190 190 .. sourcecode:: ipython
191 191
192 192 In [4]: view = rc[1:3]
193 193 Out[4]: <DirectView [1, 2]>
194 194
195 195 In [5]: view.apply<tab>
196 196 view.apply view.apply_async view.apply_sync
197 197
198 198 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
199 199
200 200 Blocking execution
201 201 ------------------
202 202
203 203 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
204 204 these examples) submits the command to the controller, which places the
205 205 command in the engines' queues for execution. The :meth:`apply` call then
206 206 blocks until the engines are done executing the command:
207 207
208 208 .. sourcecode:: ipython
209 209
210 210 In [2]: dview = rc[:] # A DirectView of all engines
211 211 In [3]: dview.block=True
212 212 In [4]: dview['a'] = 5
213 213
214 214 In [5]: dview['b'] = 10
215 215
216 216 In [6]: dview.apply(lambda x: a+b+x, 27)
217 217 Out[6]: [42, 42, 42, 42]
218 218
219 219 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
220 220 method:
221 221
222 222 In [7]: dview.block=False
223 223
224 224 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
225 225 Out[8]: [42, 42, 42, 42]
226 226
227 227 Python commands can be executed as strings on specific engines by using a View's ``execute``
228 228 method:
229 229
230 230 .. sourcecode:: ipython
231 231
232 232 In [6]: rc[::2].execute('c=a+b')
233 233
234 234 In [7]: rc[1::2].execute('c=a-b')
235 235
236 236 In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
237 237 Out[8]: [15, -5, 15, -5]
238 238
239 239
240 240 Non-blocking execution
241 241 ----------------------
242 242
243 243 In non-blocking mode, :meth:`apply` submits the command to be executed and
244 244 then returns a :class:`AsyncResult` object immediately. The
245 245 :class:`AsyncResult` object gives you a way of getting a result at a later
246 246 time through its :meth:`get` method.
247 247
248 248 .. Note::
249 249
250 250 The :class:`AsyncResult` object provides a superset of the interface in
251 251 :py:class:`multiprocessing.pool.AsyncResult`. See the
252 252 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
253 253 for more.
254 254
255 255
256 256 This allows you to quickly submit long running commands without blocking your
257 257 local Python/IPython session:
258 258
259 259 .. sourcecode:: ipython
260 260
261 261 # define our function
262 262 In [6]: def wait(t):
263 263 ...: import time
264 264 ...: tic = time.time()
265 265 ...: time.sleep(t)
266 266 ...: return time.time()-tic
267 267
268 268 # In non-blocking mode
269 269 In [7]: ar = dview.apply_async(wait, 2)
270 270
271 271 # Now block for the result
272 272 In [8]: ar.get()
273 273 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
274 274
275 275 # Again in non-blocking mode
276 276 In [9]: ar = dview.apply_async(wait, 10)
277 277
278 278 # Poll to see if the result is ready
279 279 In [10]: ar.ready()
280 280 Out[10]: False
281 281
282 282 # ask for the result, but wait a maximum of 1 second:
283 283 In [45]: ar.get(1)
284 284 ---------------------------------------------------------------------------
285 285 TimeoutError Traceback (most recent call last)
286 286 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
287 287 ----> 1 ar.get(1)
288 288
289 289 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
290 290 62 raise self._exception
291 291 63 else:
292 292 ---> 64 raise error.TimeoutError("Result not ready.")
293 293 65
294 294 66 def ready(self):
295 295
296 296 TimeoutError: Result not ready.
297 297
298 298 .. Note::
299 299
300 300 Note the import inside the function. This is a common model, to ensure
301 301 that the appropriate modules are imported where the task is run. You can
302 302 also manually import modules into the engine(s) namespace(s) via
303 303 :meth:`view.execute('import numpy')`.
304 304
305 305 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
306 306 are done. For this, there is the method :meth:`wait`. This method takes a
307 307 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
308 308 and blocks until all of the associated results are ready:
309 309
310 310 .. sourcecode:: ipython
311 311
312 312 In [72]: dview.block=False
313 313
314 314 # A trivial list of AsyncResults objects
315 315 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
316 316
317 317 # Wait until all of them are done
318 318 In [74]: dview.wait(pr_list)
319 319
320 320 # Then, their results are ready using get() or the `.r` attribute
321 321 In [75]: pr_list[0].get()
322 322 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
323 323
324 324
325 325
326 326 The ``block`` and ``targets`` keyword arguments and attributes
327 327 --------------------------------------------------------------
328 328
329 329 Most DirectView methods (excluding :meth:`apply` and :meth:`map`) accept ``block`` and
330 330 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
331 331 blocking mode and which engines the command is applied to. The :class:`View` class also has
332 332 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
333 333 arguments are not provided. Thus the following logic is used for :attr:`block` and :attr:`targets`:
334 334
335 335 * If no keyword argument is provided, the instance attributes are used.
336 336 * Keyword arguments, if provided, override the instance attributes for
337 337 the duration of a single call.
338 338
339 339 The following examples demonstrate how to use the instance attributes:
340 340
341 341 .. sourcecode:: ipython
342 342
343 343 In [16]: dview.targets = [0,2]
344 344
345 345 In [17]: dview.block = False
346 346
347 347 In [18]: ar = dview.apply(lambda : 10)
348 348
349 349 In [19]: ar.get()
350 350 Out[19]: [10, 10]
351 351
352 352 In [20]: dview.targets = dview.client.ids # all engines (4)
353 353
354 354 In [21]: dview.block = True
355 355
356 356 In [22]: dview.apply(lambda : 42)
357 357 Out[22]: [42, 42, 42, 42]
358 358
359 359 The :attr:`block` and :attr:`targets` instance attributes of the
360 360 :class:`.DirectView` also determine the behavior of the parallel magic commands.
361 361
362 362 Parallel magic commands
363 363 -----------------------
364 364
365 365 .. warning::
366 366
367 367 The magics have not been changed to work with the zeromq system. The
368 368 magics do work, but *do not* print stdin/out like they used to in IPython.kernel.
369 369
370 370 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
371 371 that make it more pleasant to execute Python commands on the engines
372 372 interactively. These are simply shortcuts to :meth:`execute` and
373 373 :meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single
374 374 Python command on the engines specified by the :attr:`targets` attribute of the
375 375 :class:`DirectView` instance:
376 376
377 377 .. sourcecode:: ipython
378 378
379 379 # load the parallel magic extension:
380 380 In [21]: %load_ext parallelmagic
381 381
382 382 # Create a DirectView for all targets
383 383 In [22]: dv = rc[:]
384 384
385 385 # Make this DirectView active for parallel magic commands
386 386 In [23]: dv.activate()
387 387
388 388 In [24]: dv.block=True
389 389
390 390 In [25]: import numpy
391 391
392 392 In [26]: %px import numpy
393 393 Parallel execution on engines: [0, 1, 2, 3]
394 394
395 395 In [27]: %px a = numpy.random.rand(2,2)
396 396 Parallel execution on engines: [0, 1, 2, 3]
397 397
398 398 In [28]: %px ev = numpy.linalg.eigvals(a)
399 399 Parallel execution on engines: [0, 1, 2, 3]
400 400
401 401 In [28]: dv['ev']
402 402 Out[28]: [ array([ 1.09522024, -0.09645227]),
403 403 array([ 1.21435496, -0.35546712]),
404 404 array([ 0.72180653, 0.07133042]),
405 405 array([ 1.46384341e+00, 1.04353244e-04])
406 406 ]
407 407
408 408 The ``%result`` magic gets the most recent result, or takes an argument
409 409 specifying the index of the result to be requested. It is simply a shortcut to the
410 410 :meth:`get_result` method:
411 411
412 412 .. sourcecode:: ipython
413 413
414 414 In [29]: dv.apply_async(lambda : ev)
415 415
416 416 In [30]: %result
417 417 Out[30]: [ [ 1.28167017 0.14197338],
418 418 [-0.14093616 1.27877273],
419 419 [-0.37023573 1.06779409],
420 420 [ 0.83664764 -0.25602658] ]
421 421
422 422 The ``%autopx`` magic switches to a mode where everything you type is executed
423 423 on the engines given by the :attr:`targets` attribute:
424 424
425 425 .. sourcecode:: ipython
426 426
427 427 In [30]: dv.block=False
428 428
429 429 In [31]: %autopx
430 430 Auto Parallel Enabled
431 431 Type %autopx to disable
432 432
433 433 In [32]: max_evals = []
434 434 <IPython.parallel.AsyncResult object at 0x17b8a70>
435 435
436 436 In [33]: for i in range(100):
437 437 ....: a = numpy.random.rand(10,10)
438 438 ....: a = a+a.transpose()
439 439 ....: evals = numpy.linalg.eigvals(a)
440 440 ....: max_evals.append(evals[0].real)
441 441 ....:
442 442 ....:
443 443 <IPython.parallel.AsyncResult object at 0x17af8f0>
444 444
445 445 In [34]: %autopx
446 446 Auto Parallel Disabled
447 447
448 448 In [35]: dv.block=True
449 449
450 450 In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
451 451 Parallel execution on engines: [0, 1, 2, 3]
452 452
453 453 In [37]: dv['ans']
454 454 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
455 455 'Average max eigenvalue is: 10.2076902286',
456 456 'Average max eigenvalue is: 10.1891484655',
457 457 'Average max eigenvalue is: 10.1158837784',]
458 458
459 459
460 460 Moving Python objects around
461 461 ============================
462 462
463 463 In addition to calling functions and executing code on engines, you can
464 464 transfer Python objects to and from your IPython session and the engines. In
465 465 IPython, these operations are called :meth:`push` (sending an object to the
466 466 engines) and :meth:`pull` (getting an object from the engines).
467 467
468 468 Basic push and pull
469 469 -------------------
470 470
471 471 Here are some examples of how you use :meth:`push` and :meth:`pull`:
472 472
473 473 .. sourcecode:: ipython
474 474
475 475 In [38]: dview.push(dict(a=1.03234,b=3453))
476 476 Out[38]: [None,None,None,None]
477 477
478 478 In [39]: dview.pull('a')
479 479 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
480 480
481 481 In [40]: dview.pull('b', targets=0)
482 482 Out[40]: 3453
483 483
484 484 In [41]: dview.pull(('a','b'))
485 485 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
486 486
487 487 In [43]: dview.push(dict(c='speed'))
488 488 Out[43]: [None,None,None,None]
489 489
490 490 In non-blocking mode :meth:`push` and :meth:`pull` also return
491 491 :class:`AsyncResult` objects:
492 492
493 493 .. sourcecode:: ipython
494 494
495 495 In [48]: ar = dview.pull('a', block=False)
496 496
497 497 In [49]: ar.get()
498 498 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
499 499
500 500
501 501 Dictionary interface
502 502 --------------------
503 503
504 504 Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
505 505 dictionary-style access by key and methods such as :meth:`get` and
506 506 :meth:`update` for convenience. This makes the remote namespaces of the engines
507 507 appear as a local dictionary. Underneath, these methods call :meth:`apply`:
508 508
509 509 .. sourcecode:: ipython
510 510
511 511 In [51]: dview['a']=['foo','bar']
512 512
513 513 In [52]: dview['a']
514 514 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
515 515
516 516 Scatter and gather
517 517 ------------------
518 518
519 519 Sometimes it is useful to partition a sequence and push the partitions to
520 520 different engines. In MPI language, this is known as scatter/gather and we
521 521 follow that terminology. However, it is important to remember that in
522 522 IPython's :class:`Client` class, :meth:`scatter` is from the
523 523 interactive IPython session to the engines and :meth:`gather` is from the
524 524 engines back to the interactive IPython session. For scatter/gather operations
525 525 between engines, MPI should be used:
526 526
527 527 .. sourcecode:: ipython
528 528
529 529 In [58]: dview.scatter('a',range(16))
530 530 Out[58]: [None,None,None,None]
531 531
532 532 In [59]: dview['a']
533 533 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
534 534
535 535 In [60]: dview.gather('a')
536 536 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
537 537
538 538 Other things to look at
539 539 =======================
540 540
541 541 How to do parallel list comprehensions
542 542 --------------------------------------
543 543
544 544 In many cases list comprehensions are nicer than using the map function. While
545 545 we don't have fully parallel list comprehensions, it is simple to get the
546 546 basic effect using :meth:`scatter` and :meth:`gather`:
547 547
548 548 .. sourcecode:: ipython
549 549
550 550 In [66]: dview.scatter('x',range(64))
551 551
552 552 In [67]: %px y = [i**10 for i in x]
553 553 Parallel execution on engines: [0, 1, 2, 3]
554 554 Out[67]:
555 555
556 556 In [68]: y = dview.gather('y')
557 557
558 558 In [69]: print y
559 559 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
560 560
561 561 Remote imports
562 562 --------------
563 563
564 564 Sometimes you will want to import packages both in your interactive session
565 565 and on your remote engines. This can be done with the :class:`ContextManager`
566 566 created by a DirectView's :meth:`sync_imports` method:
567 567
568 568 .. sourcecode:: ipython
569 569
570 570 In [69]: with dview.sync_imports():
571 571 ...: import numpy
572 572 importing numpy on engine(s)
573 573
574 574 Any imports made inside the block will also be performed on the view's engines.
575 575 sync_imports also takes a `local` boolean flag that defaults to True, which specifies
576 576 whether the local imports should also be performed. However, support for `local=False`
577 577 has not been implemented, so only packages that can be imported locally will work
578 578 this way.
579 579
580 580 You can also specify imports via the ``@require`` decorator. This is a decorator
581 581 designed for use in Dependencies, but can be used to handle remote imports as well.
582 582 Modules or module names passed to ``@require`` will be imported before the decorated
583 583 function is called. If they cannot be imported, the decorated function will never
584 584 execute, and will fail with an UnmetDependencyError.
585 585
586 586 .. sourcecode:: ipython
587 587
588 588 In [69]: from IPython.parallel import require
589 589
590 590 In [70]: @require('re')
591 591 ...: def findall(pat, x):
592 592 ...: # re is guaranteed to be available
593 593 ...: return re.findall(pat, x)
594 594
595 595 # you can also pass modules themselves, that you already have locally:
596 596 In [71]: @require(time)
597 597 ...: def wait(t):
598 598 ...: time.sleep(t)
599 599 ...: return t
600 600
601 601
602 602 Parallel exceptions
603 603 -------------------
604 604
605 605 In the multiengine interface, parallel commands can raise Python exceptions,
606 606 just like serial commands. But, it is a little subtle, because a single
607 607 parallel command can actually raise multiple exceptions (one for each engine
608 608 the command was run on). To express this idea, we have a
609 609 :exc:`CompositeError` exception class that will be raised in most cases. The
610 610 :exc:`CompositeError` class is a special type of exception that wraps one or
611 611 more other types of exceptions. Here is how it works:
612 612
613 613 .. sourcecode:: ipython
614 614
615 615 In [76]: dview.block=True
616 616
617 617 In [77]: dview.execute('1/0')
618 618 ---------------------------------------------------------------------------
619 619 CompositeError Traceback (most recent call last)
620 620 /home/you/<ipython-input-10-15c2c22dec39> in <module>()
621 621 ----> 1 dview.execute('1/0', block=True)
622 622
623 623 /path/to/site-packages/IPython/parallel/view.py in execute(self, code, block)
624 624 460 default: self.block
625 625 461 """
626 626 --> 462 return self.apply_with_flags(util._execute, args=(code,), block=block)
627 627 463
628 628 464 def run(self, filename, block=None):
629 629
630 630 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
631 631
632 632 /path/to/site-packages/IPython/parallel/view.py in sync_results(f, self, *args, **kwargs)
633 633 46 def sync_results(f, self, *args, **kwargs):
634 634 47 """sync relevant results from self.client to our results attribute."""
635 635 ---> 48 ret = f(self, *args, **kwargs)
636 636 49 delta = self.outstanding.difference(self.client.outstanding)
637 637 50 completed = self.outstanding.intersection(delta)
638 638
639 639 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
640 640
641 641 /path/to/site-packages/IPython/parallel/view.py in save_ids(f, self, *args, **kwargs)
642 642 35 n_previous = len(self.client.history)
643 643 36 try:
644 644 ---> 37 ret = f(self, *args, **kwargs)
645 645 38 finally:
646 646 39 nmsgs = len(self.client.history) - n_previous
647 647
648 648 /path/to/site-packages/IPython/parallel/view.py in apply_with_flags(self, f, args, kwargs, block, track)
649 649 398 if block:
650 650 399 try:
651 651 --> 400 return ar.get()
652 652 401 except KeyboardInterrupt:
653 653 402 pass
654 654
655 655 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
656 656 87 return self._result
657 657 88 else:
658 658 ---> 89 raise self._exception
659 659 90 else:
660 660 91 raise error.TimeoutError("Result not ready.")
661 661
662 662 CompositeError: one or more exceptions from call to method: _execute
663 663 [0:apply]: ZeroDivisionError: integer division or modulo by zero
664 664 [1:apply]: ZeroDivisionError: integer division or modulo by zero
665 665 [2:apply]: ZeroDivisionError: integer division or modulo by zero
666 666 [3:apply]: ZeroDivisionError: integer division or modulo by zero
667 667
668 668
669 669 Notice how the error message printed when :exc:`CompositeError` is raised has
670 670 information about the individual exceptions that were raised on each engine.
671 671 If you want, you can even raise one of these original exceptions:
672 672
673 673 .. sourcecode:: ipython
674 674
675 675 In [80]: try:
676 676 ....: dview.execute('1/0')
677 677 ....: except client.CompositeError, e:
678 678 ....: e.raise_exception()
679 679 ....:
680 680 ....:
681 681 ---------------------------------------------------------------------------
682 682 ZeroDivisionError Traceback (most recent call last)
683 683
684 684 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
685 685
686 686 /ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
687 687 156 raise IndexError("an exception with index %i does not exist"%excid)
688 688 157 else:
689 689 --> 158 raise et, ev, etb
690 690 159
691 691 160 def collect_exceptions(rlist, method):
692 692
693 693 ZeroDivisionError: integer division or modulo by zero
694 694
695 695 If you are working in IPython, you can simply type ``%debug`` after one of
696 696 these :exc:`CompositeError` exceptions is raised, and inspect the exception
697 697 instance:
698 698
699 699 .. sourcecode:: ipython
700 700
701 701 In [81]: dview.execute('1/0')
702 702 ---------------------------------------------------------------------------
703 703 CompositeError Traceback (most recent call last)
704 704 /home/you/<ipython-input-10-15c2c22dec39> in <module>()
705 705 ----> 1 dview.execute('1/0', block=True)
706 706
707 707 /path/to/site-packages/IPython/parallel/view.py in execute(self, code, block)
708 708 460 default: self.block
709 709 461 """
710 710 --> 462 return self.apply_with_flags(util._execute, args=(code,), block=block)
711 711 463
712 712 464 def run(self, filename, block=None):
713 713
714 714 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
715 715
716 716 /path/to/site-packages/IPython/parallel/view.py in sync_results(f, self, *args, **kwargs)
717 717 46 def sync_results(f, self, *args, **kwargs):
718 718 47 """sync relevant results from self.client to our results attribute."""
719 719 ---> 48 ret = f(self, *args, **kwargs)
720 720 49 delta = self.outstanding.difference(self.client.outstanding)
721 721 50 completed = self.outstanding.intersection(delta)
722 722
723 723 /home/you/<string> in apply_with_flags(self, f, args, kwargs, block, track)
724 724
725 725 /path/to/site-packages/IPython/parallel/view.py in save_ids(f, self, *args, **kwargs)
726 726 35 n_previous = len(self.client.history)
727 727 36 try:
728 728 ---> 37 ret = f(self, *args, **kwargs)
729 729 38 finally:
730 730 39 nmsgs = len(self.client.history) - n_previous
731 731
732 732 /path/to/site-packages/IPython/parallel/view.py in apply_with_flags(self, f, args, kwargs, block, track)
733 733 398 if block:
734 734 399 try:
735 735 --> 400 return ar.get()
736 736 401 except KeyboardInterrupt:
737 737 402 pass
738 738
739 739 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
740 740 87 return self._result
741 741 88 else:
742 742 ---> 89 raise self._exception
743 743 90 else:
744 744 91 raise error.TimeoutError("Result not ready.")
745 745
746 746 CompositeError: one or more exceptions from call to method: _execute
747 747 [0:apply]: ZeroDivisionError: integer division or modulo by zero
748 748 [1:apply]: ZeroDivisionError: integer division or modulo by zero
749 749 [2:apply]: ZeroDivisionError: integer division or modulo by zero
750 750 [3:apply]: ZeroDivisionError: integer division or modulo by zero
751 751
752 752 In [82]: %debug
753 753 > /path/to/site-packages/IPython/parallel/asyncresult.py(80)get()
754 754 79 else:
755 755 ---> 80 raise self._exception
756 756 81 else:
757 757
758 758
759 759 # With the debugger running, e is the exception instance. We can tab complete
760 760 # on it and see the extra methods that are available.
761 761 ipdb> e.
762 762 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
763 763 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
764 764 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
765 765 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
766 766 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
767 767 ipdb> e.print_tracebacks()
768 768 [0:apply]:
769 769 Traceback (most recent call last):
770 770 File "/path/to/site-packages/IPython/parallel/streamkernel.py", line 332, in apply_request
771 771 exec code in working, working
772 772 File "<string>", line 1, in <module>
773 773 File "/path/to/site-packages/IPython/parallel/client.py", line 69, in _execute
774 774 exec code in globals()
775 775 File "<string>", line 1, in <module>
776 776 ZeroDivisionError: integer division or modulo by zero
777 777
778 778
779 779 [1:apply]:
780 780 Traceback (most recent call last):
781 781 File "/path/to/site-packages/IPython/parallel/streamkernel.py", line 332, in apply_request
782 782 exec code in working, working
783 783 File "<string>", line 1, in <module>
784 784 File "/path/to/site-packages/IPython/parallel/client.py", line 69, in _execute
785 785 exec code in globals()
786 786 File "<string>", line 1, in <module>
787 787 ZeroDivisionError: integer division or modulo by zero
788 788
789 789
790 790 [2:apply]:
791 791 Traceback (most recent call last):
792 792 File "/path/to/site-packages/IPython/parallel/streamkernel.py", line 332, in apply_request
793 793 exec code in working, working
794 794 File "<string>", line 1, in <module>
795 795 File "/path/to/site-packages/IPython/parallel/client.py", line 69, in _execute
796 796 exec code in globals()
797 797 File "<string>", line 1, in <module>
798 798 ZeroDivisionError: integer division or modulo by zero
799 799
800 800
801 801 [3:apply]:
802 802 Traceback (most recent call last):
803 803 File "/path/to/site-packages/IPython/parallel/streamkernel.py", line 332, in apply_request
804 804 exec code in working, working
805 805 File "<string>", line 1, in <module>
806 806 File "/path/to/site-packages/IPython/parallel/client.py", line 69, in _execute
807 807 exec code in globals()
808 808 File "<string>", line 1, in <module>
809 809 ZeroDivisionError: integer division or modulo by zero
810 810
811 811
812 812 .. note::
813 813
814 814 TODO: The above tracebacks are not up to date
815 815
816 816
817 817 All of this same error handling magic even works in non-blocking mode:
818 818
819 819 .. sourcecode:: ipython
820 820
821 821 In [83]: dview.block=False
822 822
823 823 In [84]: ar = dview.execute('1/0')
824 824
825 825 In [85]: ar.get()
826 826 ---------------------------------------------------------------------------
827 827 CompositeError Traceback (most recent call last)
828 828 /Users/minrk/<ipython-input-3-8531eb3d26fb> in <module>()
829 829 ----> 1 ar.get()
830 830
831 831 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
832 832 78 return self._result
833 833 79 else:
834 834 ---> 80 raise self._exception
835 835 81 else:
836 836 82 raise error.TimeoutError("Result not ready.")
837 837
838 838 CompositeError: one or more exceptions from call to method: _execute
839 839 [0:apply]: ZeroDivisionError: integer division or modulo by zero
840 840 [1:apply]: ZeroDivisionError: integer division or modulo by zero
841 841 [2:apply]: ZeroDivisionError: integer division or modulo by zero
842 842 [3:apply]: ZeroDivisionError: integer division or modulo by zero
843 843
@@ -1,506 +1,507
1 1 .. _parallel_process:
2 2
3 3 ===========================================
4 4 Starting the IPython controller and engines
5 5 ===========================================
6 6
7 7 To use IPython for parallel computing, you need to start one instance of
8 8 the controller and one or more instances of the engine. The controller
9 9 and each engine can run on different machines or on the same machine.
10 10 Because of this, there are many different possibilities.
11 11
12 12 Broadly speaking, there are two ways of going about starting a controller and engines:
13 13
14 14 * In an automated manner using the :command:`ipcluster` command.
15 15 * In a more manual way using the :command:`ipcontroller` and
16 16 :command:`ipengine` commands.
17 17
18 18 This document describes both of these methods. We recommend that new users
19 19 start with the :command:`ipcluster` command as it simplifies many common usage
20 20 cases.
21 21
22 22 General considerations
23 23 ======================
24 24
25 25 Before delving into the details about how you can start a controller and
26 26 engines using the various methods, we outline some of the general issues that
27 27 come up when starting the controller and engines. These things come up no
28 28 matter which method you use to start your IPython cluster.
29 29
30 30 Let's say that you want to start the controller on ``host0`` and engines on
31 31 hosts ``host1``-``hostn``. The following steps are then required:
32 32
33 33 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
34 34 ``host0``.
35 35 2. Move the JSON file (:file:`ipcontroller-engine.json`) created by the
36 36 controller from ``host0`` to hosts ``host1``-``hostn``.
37 37 3. Start the engines on hosts ``host1``-``hostn`` by running
38 38 :command:`ipengine`. This command has to be told where the JSON file
39 39 (:file:`ipcontroller-engine.json`) is located.
40 40
41 41 At this point, the controller and engines will be connected. By default, the JSON files
42 42 created by the controller are put into the :file:`~/.ipython/cluster_default/security`
43 43 directory. If the engines share a filesystem with the controller, step 2 can be skipped as
44 44 the engines will automatically look at that location.
45 45
46 46 The final step required to actually use the running controller from a client is to move
47 47 the JSON file :file:`ipcontroller-client.json` from ``host0`` to any host where clients
48 48 will be run. If this file is put into the :file:`~/.ipython/cluster_default/security`
49 49 directory of the client's host, it will be found automatically. Otherwise, the full path
50 50 to it has to be passed to the client's constructor.
51 51
52 52 Using :command:`ipcluster`
53 53 ===========================
54 54
55 55 The :command:`ipcluster` command provides a simple way of starting a
56 56 controller and engines in the following situations:
57 57
58 58 1. When the controller and engines are all run on localhost. This is useful
59 59 for testing or running on a multicore computer.
60 2. When engines are started using the :command:`mpirun` command that comes
60 2. When engines are started using the :command:`mpiexec` command that comes
61 61 with most MPI [MPI]_ implementations
62 62 3. When engines are started using the PBS [PBS]_ batch system
63 63 (or other `qsub` systems, such as SGE).
64 64 4. When the controller is started on localhost and the engines are started on
65 65 remote nodes using :command:`ssh`.
66 66 5. When engines are started using the Windows HPC Server batch system.
67 67
68 68 .. note::
69 69
70 70 Currently :command:`ipcluster` requires that the
71 71 :file:`~/.ipython/cluster_<profile>/security` directory live on a shared filesystem that is
72 72 seen by both the controller and engines. If you don't have a shared file
73 73 system you will need to use :command:`ipcontroller` and
74 74 :command:`ipengine` directly.
75 75
76 76 Under the hood, :command:`ipcluster` just uses :command:`ipcontroller`
77 77 and :command:`ipengine` to perform the steps described above.
78 78
79 79 The simplest way to use ipcluster requires no configuration, and will
80 80 launch a controller and a number of engines on the local machine. For instance,
81 81 to start one controller and 4 engines on localhost, just do::
82 82
83 $ ipcluster start -n 4
83 $ ipcluster start n=4
84 84
85 To see other command line options for the local mode, do::
85 To see other command line options, do::
86 86
87 87 $ ipcluster -h
88 88
89 89
90 90 Configuring an IPython cluster
91 91 ==============================
92 92
93 93 Cluster configurations are stored as `profiles`. You can create a new profile with::
94 94
95 $ ipcluster create -p myprofile
95 $ ipcluster create profile=myprofile
96 96
97 97 This will create the directory :file:`IPYTHONDIR/cluster_myprofile`, and populate it
98 98 with the default configuration files for the three IPython cluster commands. Once
99 99 you edit those files, you can continue to call ipcluster/ipcontroller/ipengine
100 with no arguments beyond ``-p myprofile``, and any configuration will be maintained.
100 with no arguments beyond ``profile=myprofile``, and any configuration will be maintained.
101 101
102 102 There is no limit to the number of profiles you can have, so you can maintain a profile for each
103 103 of your common use cases. The default profile will be used whenever the
104 104 profile argument is not specified, so edit :file:`IPYTHONDIR/cluster_default/*_config.py` to
105 105 represent your most common use case.
106 106
107 107 The configuration files are loaded with commented-out settings and explanations,
108 108 which should cover most of the available possibilities.
109 109
110 110 Using various batch systems with :command:`ipcluster`
111 111 ------------------------------------------------------
112 112
113 113 :command:`ipcluster` has a notion of Launchers that can start controllers
114 114 and engines with various remote execution schemes. Currently supported
115 models include `mpiexec`, PBS-style (Torque, SGE), and Windows HPC Server.
115 models include :command:`ssh`, :command:`mpiexec`, PBS-style (Torque, SGE),
116 and Windows HPC Server.
116 117
117 118 .. note::
118 119
119 120 The Launchers and configuration are designed in such a way that advanced
120 121 users can subclass and configure them to fit their own system that we
121 122 have not yet supported (such as Condor)
122 123
123 124 Using :command:`ipcluster` in mpiexec/mpirun mode
124 125 --------------------------------------------------
125 126
126 127
127 128 The mpiexec/mpirun mode is useful if you:
128 129
129 130 1. Have MPI installed.
130 131 2. Your systems are configured to use the :command:`mpiexec` or
131 132 :command:`mpirun` commands to start MPI processes.
132 133
133 134 If these are satisfied, you can create a new profile::
134 135
135 $ ipcluster create -p mpi
136 $ ipcluster create profile=mpi
136 137
137 138 and edit the file :file:`IPYTHONDIR/cluster_mpi/ipcluster_config.py`.
138 139
139 140 There, instruct ipcluster to use the MPIExec launchers by adding the lines:
140 141
141 142 .. sourcecode:: python
142 143
143 c.Global.engine_launcher = 'IPython.parallel.apps.launcher.MPIExecEngineSetLauncher'
144 c.IPClusterEnginesApp.engine_launcher = 'IPython.parallel.apps.launcher.MPIExecEngineSetLauncher'
144 145
145 146 If the default MPI configuration is correct, then you can now start your cluster, with::
146 147
147 $ ipcluster start -n 4 -p mpi
148 $ ipcluster start n=4 profile=mpi
148 149
149 150 This does the following:
150 151
151 152 1. Starts the IPython controller on current host.
152 153 2. Uses :command:`mpiexec` to start 4 engines.
153 154
154 155 If you have a reason to also start the Controller with mpi, you can specify:
155 156
156 157 .. sourcecode:: python
157 158
158 c.Global.controller_launcher = 'IPython.parallel.apps.launcher.MPIExecControllerLauncher'
159 c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.MPIExecControllerLauncher'
159 160
160 161 .. note::
161 162
162 163 The Controller *will not* be in the same MPI universe as the engines, so there is not
163 164 much reason to do this unless sysadmins demand it.
164 165
165 166 On newer MPI implementations (such as OpenMPI), this will work even if you
166 167 don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI
167 168 implementations actually require each process to call :func:`MPI_Init` upon
168 169 starting. The easiest way of having this done is to install the mpi4py
169 170 [mpi4py]_ package and then specify the ``c.MPI.use`` option in :file:`ipengine_config.py`:
170 171
171 172 .. sourcecode:: python
172 173
173 174 c.MPI.use = 'mpi4py'
174 175
175 176 Unfortunately, even this won't work for some MPI implementations. If you are
176 177 having problems with this, you will likely have to use a custom Python
177 178 executable that itself calls :func:`MPI_Init` at the appropriate time.
178 179 Fortunately, mpi4py comes with such a custom Python executable that is easy to
179 180 install and use. However, this custom Python executable approach will not work
180 181 with :command:`ipcluster` currently.
181 182
182 183 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
183 184
184 185
185 186 Using :command:`ipcluster` in PBS mode
186 187 ---------------------------------------
187 188
188 189 The PBS mode uses the Portable Batch System [PBS]_ to start the engines.
189 190
190 191 As usual, we will start by creating a fresh profile::
191 192
192 $ ipcluster create -p pbs
193 $ ipcluster create profile=pbs
193 194
194 195 And in :file:`ipcluster_config.py`, we will select the PBS launchers for the controller
195 196 and engines:
196 197
197 198 .. sourcecode:: python
198 199
199 200 c.Global.controller_launcher = 'IPython.parallel.apps.launcher.PBSControllerLauncher'
200 201 c.Global.engine_launcher = 'IPython.parallel.apps.launcher.PBSEngineSetLauncher'
201 202
202 203 IPython does provide simple default batch templates for PBS and SGE, but you may need
203 204 to specify your own. Here is a sample PBS script template:
204 205
205 206 .. sourcecode:: bash
206 207
207 208 #PBS -N ipython
208 209 #PBS -j oe
209 210 #PBS -l walltime=00:10:00
210 211 #PBS -l nodes=${n/4}:ppn=4
211 212 #PBS -q $queue
212 213
213 214 cd $$PBS_O_WORKDIR
214 215 export PATH=$$HOME/usr/local/bin
215 216 export PYTHONPATH=$$HOME/usr/local/lib/python2.7/site-packages
216 /usr/local/bin/mpiexec -n ${n} ipengine --cluster_dir=${cluster_dir}
217 /usr/local/bin/mpiexec -n ${n} ipengine cluster_dir=${cluster_dir}
217 218
218 219 There are a few important points about this template:
219 220
220 221 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
221 222 template engine.
222 223
223 224 2. Instead of putting in the actual number of engines, use the notation
224 225 ``${n}`` to indicate the number of engines to be started. You can also use
225 226 expressions like ``${n/4}`` in the template to indicate the number of
226 227 nodes. There will always be a ${n} and ${cluster_dir} variable passed to the template.
227 228 These allow the batch system to know how many engines, and where the configuration
228 229 files reside. The same is true for the batch queue, with the template variable ``$queue``.
229 230
230 231 3. Because ``$`` is a special character used by the template engine, you must
231 232 escape any ``$`` by using ``$$``. This is important when referring to
232 233 environment variables in the template, or in SGE, where the config lines start
233 234 with ``#$``, which will have to be ``#$$``.
234 235
235 236 4. Any options to :command:`ipengine` can be given in the batch script
236 237 template, or in :file:`ipengine_config.py`.
237 238
238 239 5. Depending on the configuration of your system, you may have to set
239 240 environment variables in the script template.
240 241
241 242 The controller template should be similar, but simpler:
242 243
243 244 .. sourcecode:: bash
244 245
245 246 #PBS -N ipython
246 247 #PBS -j oe
247 248 #PBS -l walltime=00:10:00
248 249 #PBS -l nodes=1:ppn=4
249 250 #PBS -q $queue
250 251
251 252 cd $$PBS_O_WORKDIR
252 253 export PATH=$$HOME/usr/local/bin
253 254 export PYTHONPATH=$$HOME/usr/local/lib/python2.7/site-packages
254 ipcontroller --cluster_dir=${cluster_dir}
255 ipcontroller cluster_dir=${cluster_dir}
255 256
256 257
257 258 Once you have created these scripts, save them with names like
258 259 :file:`pbs.engine.template`. Now you can load them into the :file:`ipcluster_config` with:
259 260
260 261 .. sourcecode:: python
261 262
262 263 c.PBSEngineSetLauncher.batch_template_file = "pbs.engine.template"
263 264
264 265 c.PBSControllerLauncher.batch_template_file = "pbs.controller.template"
265 266
266 267
267 268 Alternately, you can just define the templates as strings inside :file:`ipcluster_config`.
268 269
269 270 Whether you are using your own templates or our defaults, the extra configurables available are
270 271 the number of engines to launch (``$n``), and the batch system queue to which the jobs are to be
271 272 submitted (``$queue``). These are configurables, and can be specified in
272 273 :file:`ipcluster_config`:
273 274
274 275 .. sourcecode:: python
275 276
276 277 c.PBSLauncher.queue = 'veryshort.q'
277 278 c.PBSEngineSetLauncher.n = 64
278 279
279 280 Note that assuming you are running PBS on a multi-node cluster, the Controller's default behavior
280 281 of listening only on localhost is likely too restrictive. In this case, also assuming the
281 282 nodes are safely behind a firewall, you can simply instruct the Controller to listen for
282 283 connections on all its interfaces, by adding in :file:`ipcontroller_config`:
283 284
284 285 .. sourcecode:: python
285 286
286 287 c.RegistrationFactory.ip = '*'
287 288
288 289 You can now run the cluster with::
289 290
290 $ ipcluster start -p pbs -n 128
291 $ ipcluster start profile=pbs n=128
291 292
292 293 Additional configuration options can be found in the PBS section of :file:`ipcluster_config`.
293 294
294 295 .. note::
295 296
296 297 Due to the flexibility of configuration, the PBS launchers work with simple changes
297 298 to the template for other :command:`qsub`-using systems, such as Sun Grid Engine,
298 299 and with further configuration in similar batch systems like Condor.
299 300
300 301
301 302 Using :command:`ipcluster` in SSH mode
302 303 ---------------------------------------
303 304
304 305
305 306 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
306 307 nodes and :command:`ipcontroller` can be run remotely as well, or on localhost.
307 308
308 309 .. note::
309 310
310 311 When using this mode it is highly recommended that you have set up SSH keys
311 312 and are using ssh-agent [SSH]_ for password-less logins.
312 313
313 314 As usual, we start by creating a clean profile::
314 315
315 $ ipcluster create -p ssh
316 $ ipcluster create profile=ssh
316 317
317 318 To use this mode, select the SSH launchers in :file:`ipcluster_config.py`:
318 319
319 320 .. sourcecode:: python
320 321
321 322 c.Global.engine_launcher = 'IPython.parallel.apps.launcher.SSHEngineSetLauncher'
322 323 # and if the Controller is also to be remote:
323 324 c.Global.controller_launcher = 'IPython.parallel.apps.launcher.SSHControllerLauncher'
324 325
325 326
326 327 The controller's remote location and configuration can be specified:
327 328
328 329 .. sourcecode:: python
329 330
330 331 # Set the user and hostname for the controller
331 332 # c.SSHControllerLauncher.hostname = 'controller.example.com'
332 333 # c.SSHControllerLauncher.user = os.environ.get('USER','username')
333 334
334 335 # Set the arguments to be passed to ipcontroller
335 336 # note that remotely launched ipcontroller will not get the contents of
336 337 # the local ipcontroller_config.py unless it resides on the *remote host*
337 # in the location specified by the --cluster_dir argument.
338 # in the location specified by the `cluster_dir` argument.
338 339 # c.SSHControllerLauncher.program_args = ['-r', '-ip', '0.0.0.0', '--cluster_dir', '/path/to/cd']
339 340
340 341 .. note::
341 342
342 343 SSH mode does not do any file movement, so you will need to distribute configuration
343 344 files manually. To aid in this, the `reuse_files` flag defaults to True for ssh-launched
344 345 Controllers, so you will only need to do this once, unless you override this flag back
345 346 to False.
346 347
347 348 Engines are specified in a dictionary, by hostname and the number of engines to be run
348 349 on that host.
349 350
350 351 .. sourcecode:: python
351 352
352 353 c.SSHEngineSetLauncher.engines = { 'host1.example.com' : 2,
353 354 'host2.example.com' : 5,
354 'host3.example.com' : (1, ['--cluster_dir', '/home/different/location']),
355 'host3.example.com' : (1, ['cluster_dir=/home/different/location']),
355 356 'host4.example.com' : 8 }
356 357
357 358 * The `engines` dict, where the keys are the host we want to run engines on and
358 359 the value is the number of engines to run on that host.
359 360 * on host3, the value is a tuple, where the number of engines is first, and the arguments
360 361 to be passed to :command:`ipengine` are the second element.
361 362
362 363 For engines without explicitly specified arguments, the default arguments are set in
363 364 a single location:
364 365
365 366 .. sourcecode:: python
366 367
367 368 c.SSHEngineSetLauncher.engine_args = ['--cluster_dir', '/path/to/cluster_ssh']
368 369
369 370 Current limitations of the SSH mode of :command:`ipcluster` are:
370 371
371 372 * Untested on Windows. Would require a working :command:`ssh` on Windows.
372 373 Also, we are using shell scripts to setup and execute commands on remote
373 374 hosts.
374 375 * No file movement - configuration files must be distributed to the remote hosts manually.
375 376
376 377 Using the :command:`ipcontroller` and :command:`ipengine` commands
377 378 ====================================================================
378 379
379 380 It is also possible to use the :command:`ipcontroller` and :command:`ipengine`
380 381 commands to start your controller and engines. This approach gives you full
381 382 control over all aspects of the startup process.
382 383
383 384 Starting the controller and engine on your local machine
384 385 --------------------------------------------------------
385 386
386 387 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
387 388 local machine, do the following.
388 389
389 390 First start the controller::
390 391
391 392 $ ipcontroller
392 393
393 394 Next, start however many instances of the engine you want using (repeatedly)
394 395 the command::
395 396
396 397 $ ipengine
397 398
398 399 The engines should start and automatically connect to the controller using the
399 400 JSON files in :file:`~/.ipython/cluster_default/security`. You are now ready to use the
400 401 controller and engines from IPython.
401 402
402 403 .. warning::
403 404
404 405 The order of the above operations may be important. You *must*
405 406 start the controller before the engines, unless you are reusing connection
406 407 information (via `--reuse`), in which case ordering is not important.
407 408
408 409 .. note::
409 410
410 411 On some platforms (OS X), to put the controller and engine into the
411 412 background you may need to give these commands in the form ``(ipcontroller
412 413 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
413 414 properly.
414 415
415 416 Starting the controller and engines on different hosts
416 417 ------------------------------------------------------
417 418
418 419 When the controller and engines are running on different hosts, things are
419 420 slightly more complicated, but the underlying ideas are the same:
420 421
421 422 1. Start the controller on a host using :command:`ipcontroller`.
422 423 2. Copy :file:`ipcontroller-engine.json` from :file:`~/.ipython/cluster_<profile>/security` on
423 424 the controller's host to the host where the engines will run.
424 425 3. Use :command:`ipengine` on the engine's hosts to start the engines.
425 426
426 427 The only thing you have to be careful of is to tell :command:`ipengine` where
427 428 the :file:`ipcontroller-engine.json` file is located. There are two ways you
428 429 can do this:
429 430
430 431 * Put :file:`ipcontroller-engine.json` in the :file:`~/.ipython/cluster_<profile>/security`
431 432 directory on the engine's host, where it will be found automatically.
432 433 * Call :command:`ipengine` with the ``--file=full_path_to_the_file``
433 434 flag.
434 435
435 436 The ``--file`` flag works like this::
436 437
437 438 $ ipengine --file=/path/to/my/ipcontroller-engine.json
438 439
439 440 .. note::
440 441
441 442 If the controller's and engine's hosts all have a shared file system
442 443 (:file:`~/.ipython/cluster_<profile>/security` is the same on all of them), then things
443 444 will just work!
444 445
445 446 Make JSON files persistent
446 447 --------------------------
447 448
448 449 At first glance it may seem that managing the JSON files is a bit
449 450 annoying. Going back to the house and key analogy, copying the JSON around
450 451 each time you start the controller is like having to make a new key every time
451 452 you want to unlock the door and enter your house. As with your house, you want
452 453 to be able to create the key (or JSON file) once, and then simply use it at
453 454 any point in the future.
454 455
455 To do this, the only thing you have to do is specify the `-r` flag, so that
456 To do this, the only thing you have to do is specify the `--reuse` flag, so that
456 457 the connection information in the JSON files remains accurate::
457 458
458 $ ipcontroller -r
459 $ ipcontroller --reuse
459 460
460 461 Then, just copy the JSON files over the first time and you are set. You can
461 462 start and stop the controller and engines as many times as you want in the
462 463 future, just make sure to tell the controller to reuse the file.
463 464
464 465 .. note::
465 466
466 467 You may ask the question: what ports does the controller listen on if you
467 468 don't tell it to use specific ones? The default is to use high random port
468 469 numbers. We do this for two reasons: i) to increase security through
469 470 obscurity and ii) to allow multiple controllers on a given host to start and
470 471 automatically use different ports.
471 472
472 473 Log files
473 474 ---------
474 475
475 476 All of the components of IPython have log files associated with them.
476 477 These log files can be extremely useful in debugging problems with
477 478 IPython and can be found in the directory :file:`~/.ipython/cluster_<profile>/log`.
478 479 Sending the log files to us will often help us to debug any problems.
479 480
480 481
481 482 Configuring `ipcontroller`
482 483 ---------------------------
483 484
484 485 Ports and addresses
485 486 *******************
486 487
487 488
488 489 Database Backend
489 490 ****************
490 491
491 492
492 493 .. seealso::
493 494
494 495
495 496
496 497 Configuring `ipengine`
497 498 -----------------------
498 499
499 500 .. note::
500 501
501 502 TODO
502 503
503 504
504 505
505 506 .. [PBS] Portable Batch System. http://www.openpbs.org/
506 507 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/ssh-agent
@@ -1,442 +1,442
1 1 .. _parallel_task:
2 2
3 3 ==========================
4 4 The IPython task interface
5 5 ==========================
6 6
7 7 The task interface to the cluster presents the engines as a fault tolerant,
8 8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 9 the task interface the user has no direct access to individual engines. By
10 10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 11 simpler and more powerful.
12 12
13 13 Best of all, the user can use both of these interfaces running at the same time
14 14 to take advantage of their respective strengths. When the user can break up
15 15 the user's work into segments that do not depend on previous execution, the
16 16 task interface is ideal. But it also has more power and flexibility, allowing
17 17 the user to guide the distribution of jobs, without having to assign tasks to
18 18 engines explicitly.
19 19
20 20 Starting the IPython controller and engines
21 21 ===========================================
22 22
23 23 To follow along with this tutorial, you will need to start the IPython
24 24 controller and four IPython engines. The simplest way of doing this is to use
25 25 the :command:`ipcluster` command::
26 26
27 $ ipcluster start -n 4
27 $ ipcluster start n=4
28 28
29 29 For more detailed information about starting the controller and engines, see
30 30 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
31 31
32 32 Creating a ``Client`` instance
33 33 ==============================
34 34
35 35 The first step is to import the IPython :mod:`IPython.parallel`
36 36 module and then create a :class:`.Client` instance, and we will also be using
37 37 a :class:`LoadBalancedView`, here called `lview`:
38 38
39 39 .. sourcecode:: ipython
40 40
41 41 In [1]: from IPython.parallel import Client
42 42
43 43 In [2]: rc = Client()
44 44
45 45
46 46 This form assumes that the controller was started on localhost with default
47 47 configuration. If not, the location of the controller must be given as an
48 48 argument to the constructor:
49 49
50 50 .. sourcecode:: ipython
51 51
52 52 # for a visible LAN controller listening on an external port:
53 53 In [2]: rc = Client('tcp://192.168.1.16:10101')
54 54 # or to connect with a specific profile you have set up:
55 55 In [3]: rc = Client(profile='mpi')
56 56
57 57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
58 58 be constructed via the client's :meth:`load_balanced_view` method:
59 59
60 60 .. sourcecode:: ipython
61 61
62 62 In [4]: lview = rc.load_balanced_view() # default load-balanced view
63 63
64 64 .. seealso::
65 65
66 66 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
67 67
68 68
69 69 Quick and easy parallelism
70 70 ==========================
71 71
72 72 In many cases, you simply want to apply a Python function to a sequence of
73 73 objects, but *in parallel*. Like the multiengine interface, these can be
74 74 implemented via the task interface. The exact same tools can perform these
75 75 actions in load-balanced ways as well as multiplexed ways: a parallel version
76 76 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
77 77 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
78 78 execution time per item varies significantly, you should use the versions in
79 79 the task interface.
80 80
81 81 Parallel map
82 82 ------------
83 83
84 84 To load-balance :meth:`map`, simply use a LoadBalancedView:
85 85
86 86 .. sourcecode:: ipython
87 87
88 88 In [62]: lview.block = True
89 89
90 90 In [63]: serial_result = map(lambda x:x**10, range(32))
91 91
92 92 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
93 93
94 94 In [65]: serial_result==parallel_result
95 95 Out[65]: True
96 96
97 97 Parallel function decorator
98 98 ---------------------------
99 99
100 100 Parallel functions are just like normal functions, but they can be called on
101 101 sequences and *in parallel*. The multiengine interface provides a decorator
102 102 that turns any Python function into a parallel function:
103 103
104 104 .. sourcecode:: ipython
105 105
106 106 In [10]: @lview.parallel()
107 107 ....: def f(x):
108 108 ....: return 10.0*x**4
109 109 ....:
110 110
111 111 In [11]: f.map(range(32)) # this is done in parallel
112 112 Out[11]: [0.0,10.0,160.0,...]
113 113
114 114 .. _parallel_dependencies:
115 115
116 116 Dependencies
117 117 ============
118 118
119 119 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
120 120 may want to associate some kind of `Dependency` that describes when, where, or whether
121 121 a task can be run. In IPython, we provide two types of dependencies:
122 122 `Functional Dependencies`_ and `Graph Dependencies`_
123 123
124 124 .. note::
125 125
126 126 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
127 127 and you will see errors or warnings if you try to use dependencies with the pure
128 128 scheduler.
129 129
130 130 Functional Dependencies
131 131 -----------------------
132 132
133 133 Functional dependencies are used to determine whether a given engine is capable of running
134 134 a particular task. This is implemented via a special :class:`Exception` class,
135 135 :class:`UnmetDependency`, found in `IPython.parallel.error`. Its use is very simple:
136 136 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
137 137 the error up to the client like any other error, catches the error, and submits the task
138 138 to a different engine. This will repeat indefinitely, and a task will never be submitted
139 139 to a given engine a second time.
140 140
141 141 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
142 142 some decorators for facilitating this behavior.
143 143
144 144 There are two decorators and a class used for functional dependencies:
145 145
146 146 .. sourcecode:: ipython
147 147
148 148 In [9]: from IPython.parallel import depend, require, dependent
149 149
150 150 @require
151 151 ********
152 152
153 153 The simplest sort of dependency is requiring that a Python module is available. The
154 154 ``@require`` decorator lets you define a function that will only run on engines where names
155 155 you specify are importable:
156 156
157 157 .. sourcecode:: ipython
158 158
159 159 In [10]: @require('numpy', 'zmq')
160 160 ...: def myfunc():
161 161 ...: return dostuff()
162 162
163 163 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
164 164 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
165 165
166 166 @depend
167 167 *******
168 168
169 169 The ``@depend`` decorator lets you decorate any function with any *other* function to
170 170 evaluate the dependency. The dependency function will be called at the start of the task,
171 171 and if it returns ``False``, then the dependency will be considered unmet, and the task
172 172 will be assigned to another engine. If the dependency returns *anything other than
173 173 ``False``*, the rest of the task will continue.
174 174
175 175 .. sourcecode:: ipython
176 176
177 177 In [10]: def platform_specific(plat):
178 178 ...: import sys
179 179 ...: return sys.platform == plat
180 180
181 181 In [11]: @depend(platform_specific, 'darwin')
182 182 ...: def mactask():
183 183 ...: do_mac_stuff()
184 184
185 185 In [12]: @depend(platform_specific, 'nt')
186 186 ...: def wintask():
187 187 ...: do_windows_stuff()
188 188
189 189 In this case, any time you apply ``mactask``, it will only run on an OSX machine.
190 190 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
191 191 signature.
192 192
193 193 dependents
194 194 **********
195 195
196 196 You don't have to use the decorators on your tasks, if for instance you may want
197 197 to run tasks with a single function but varying dependencies, you can directly construct
198 198 the :class:`dependent` object that the decorators use:
199 199
200 200 .. sourcecode:: ipython
201 201
202 202 In [13]: def mytask(*args):
203 203 ...: dostuff()
204 204
205 205 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
206 206 # this is the same as decorating the declaration of mytask with @depend
207 207 # but you can do it again:
208 208
209 209 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
210 210
211 211 # in general:
212 212 In [16]: t = dependent(f, g, *dargs, **dkwargs)
213 213
214 214 # is equivalent to:
215 215 In [17]: @depend(g, *dargs, **dkwargs)
216 216 ...: def t(a,b,c):
217 217 ...: # contents of f
218 218
219 219 Graph Dependencies
220 220 ------------------
221 221
222 222 Sometimes you want to restrict the time and/or location to run a given task as a function
223 223 of the time and/or location of other tasks. This is implemented via a subclass of
224 224 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
225 225 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
226 226 has been met.
227 227
228 228 The switches we provide for interpreting whether a given dependency set has been met:
229 229
230 230 any|all
231 231 Whether the dependency is considered met if *any* of the dependencies are done, or
232 232 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
233 233 boolean attribute, which defaults to ``True``.
234 234
235 235 success [default: True]
236 236 Whether to consider tasks that succeeded as fulfilling dependencies.
237 237
238 238 failure [default : False]
239 239 Whether to consider tasks that failed as fulfilling dependencies.
240 240 Using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
241 241 only when tasks have failed.
242 242
243 243 Sometimes you want to run a task after another, but only if that task succeeded. In this case,
244 244 ``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may
245 245 not care whether the task succeeds, and always want the second task to run, in which case you
246 246 should use `success=failure=True`. The default behavior is to only use successes.
247 247
248 248 There are other switches for interpretation that are made at the *task* level. These are
249 249 specified via keyword arguments to the client's :meth:`apply` method.
250 250
251 251 after,follow
252 252 You may want to run a task *after* a given set of dependencies have been run and/or
253 253 run it *where* another set of dependencies are met. To support this, every task has an
254 254 `after` dependency to restrict time, and a `follow` dependency to restrict
255 255 destination.
256 256
257 257 timeout
258 258 You may also want to set a time-limit for how long the scheduler should wait before a
259 259 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
260 260 indicates that the task should never timeout. If the timeout is reached, and the
261 261 scheduler still hasn't been able to assign the task to an engine, the task will fail
262 262 with a :class:`DependencyTimeout`.
263 263
264 264 .. note::
265 265
266 266 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
267 267 task to run after a job submitted via the MUX interface.
268 268
269 269 The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
270 270 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
271 271 `follow` and `after` keywords to :meth:`client.apply`:
272 272
273 273 .. sourcecode:: ipython
274 274
275 275 In [14]: client.block=False
276 276
277 277 In [15]: ar = lview.apply(f, args, kwargs)
278 278
279 279 In [16]: ar2 = lview.apply(f2)
280 280
281 281 In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2])
282 282
283 283 In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
284 284
285 285
286 286 .. seealso::
287 287
288 288 Some parallel workloads can be described as a `Directed Acyclic Graph
289 289 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
290 290 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
291 291 onto task dependencies.
292 292
293 293
294 294
295 295
296 296 Impossible Dependencies
297 297 ***********************
298 298
299 299 The schedulers do perform some analysis on graph dependencies to determine whether they
300 300 are not possible to be met. If the scheduler does discover that a dependency cannot be
301 301 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
302 302 scheduler realizes that a task can never be run, it won't sit indefinitely in the
303 303 scheduler clogging the pipeline.
304 304
305 305 The basic cases that are checked:
306 306
307 307 * depending on nonexistent messages
308 308 * `follow` dependencies were run on more than one machine and `all=True`
309 309 * any dependencies failed and `all=True,success=True,failure=False`
310 310 * all dependencies failed and `all=False,success=True,failure=False`
311 311
312 312 .. warning::
313 313
314 314 This analysis has not been proven to be rigorous, so it is likely possible for tasks
315 315 to become impossible to run in obscure situations, so a timeout may be a good choice.
316 316
317 317
318 318 Retries and Resubmit
319 319 ====================
320 320
321 321 Retries
322 322 -------
323 323
324 324 Another flag for tasks is `retries`. This is an integer, specifying how many times
325 325 a task should be resubmitted after failure. This is useful for tasks that should still run
326 326 if their engine was shutdown, or may have some statistical chance of failing. The default
327 327 is to not retry tasks.
328 328
329 329 Resubmit
330 330 --------
331 331
332 332 Sometimes you may want to re-run a task. This could be because it failed for some reason, and
333 333 you have fixed the error, or because you want to restore the cluster to an interrupted state.
334 334 For this, the :class:`Client` has a :meth:`rc.resubmit` method. This simply takes one or more
335 335 msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit
336 336 a task that is pending - only those that have finished, either successful or unsuccessful.
337 337
338 338 .. _parallel_schedulers:
339 339
340 340 Schedulers
341 341 ==========
342 342
343 343 There are a variety of valid ways to determine where jobs should be assigned in a
344 344 load-balancing situation. In IPython, we support several standard schemes, and
345 even make it easy to define your own. The scheme can be selected via the ``--scheme``
346 argument to :command:`ipcontroller`, or in the :attr:`HubFactory.scheme` attribute
345 even make it easy to define your own. The scheme can be selected via the ``scheme``
346 argument to :command:`ipcontroller`, or in the :attr:`TaskScheduler.schemename` attribute
347 347 of a controller config object.
348 348
349 349 The built-in routing schemes:
350 350
351 351 To select one of these schemes, simply do::
352 352
353 $ ipcontroller --scheme <schemename>
353 $ ipcontroller scheme=<schemename>
354 354 for instance:
355 $ ipcontroller --scheme lru
355 $ ipcontroller scheme=lru
356 356
357 357 lru: Least Recently Used
358 358
359 359 Always assign work to the least-recently-used engine. A close relative of
360 360 round-robin, it will be fair with respect to the number of tasks, agnostic
361 361 with respect to runtime of each task.
362 362
363 363 plainrandom: Plain Random
364 364
365 365 Randomly picks an engine on which to run.
366 366
367 367 twobin: Two-Bin Random
368 368
369 369 **Requires numpy**
370 370
371 371 Pick two engines at random, and use the LRU of the two. This is known to be better
372 372 than plain random in many cases, but requires a small amount of computation.
373 373
374 374 leastload: Least Load
375 375
376 376 **This is the default scheme**
377 377
378 378 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
379 379
380 380 weighted: Weighted Two-Bin Random
381 381
382 382 **Requires numpy**
383 383
384 384 Pick two engines at random using the number of outstanding tasks as inverse weights,
385 385 and use the one with the lower load.
386 386
387 387
388 388 Pure ZMQ Scheduler
389 389 ------------------
390 390
391 391 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
392 392 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
393 393 load-balancing. This scheduler does not support any of the advanced features of the Python
394 394 :class:`.Scheduler`.
395 395
396 396 Disabled features when using the ZMQ Scheduler:
397 397
398 398 * Engine unregistration
399 399 Task farming will be disabled if an engine unregisters.
400 400 Further, if an engine is unregistered during computation, the scheduler may not recover.
401 401 * Dependencies
402 402 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
403 403 based on message content.
404 404 * Early destination notification
405 405 The Python schedulers know which engine gets which task, and notify the Hub. This
406 406 allows graceful handling of Engines coming and going. There is no way to know
407 407 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
408 408 engine until they *finish*. This makes recovery from engine shutdown very difficult.
409 409
410 410
411 411 .. note::
412 412
413 413 TODO: performance comparisons
414 414
415 415
416 416
417 417
418 418 More details
419 419 ============
420 420
421 421 The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
422 422 of flexibility in how tasks are defined and run. The next places to look are
423 423 in the following classes:
424 424
425 425 * :class:`~IPython.parallel.client.view.LoadBalancedView`
426 426 * :class:`~IPython.parallel.client.asyncresult.AsyncResult`
427 427 * :meth:`~IPython.parallel.client.view.LoadBalancedView.apply`
428 428 * :mod:`~IPython.parallel.controller.dependency`
429 429
430 430 The following is an overview of how to use these classes together:
431 431
432 432 1. Create a :class:`Client` and :class:`LoadBalancedView`
433 433 2. Define some functions to be run as tasks
434 434 3. Submit your tasks using the :meth:`apply` method of your
435 435 :class:`LoadBalancedView` instance.
436 436 4. Use :meth:`Client.get_result` to get the results of the
437 437 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
438 438 for and then receive the results.
439 439
440 440 .. seealso::
441 441
442 442 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
@@ -1,334 +1,334
1 1 ============================================
2 2 Getting started with Windows HPC Server 2008
3 3 ============================================
4 4
5 5 .. note::
6 6
7 7 Not adapted to zmq yet
8 8
9 9 Introduction
10 10 ============
11 11
12 12 The Python programming language is an increasingly popular language for
13 13 numerical computing. This is due to a unique combination of factors. First,
14 14 Python is a high-level and *interactive* language that is well matched to
15 15 interactive numerical work. Second, it is easy (often times trivial) to
16 16 integrate legacy C/C++/Fortran code into Python. Third, a large number of
17 17 high-quality open source projects provide all the needed building blocks for
18 18 numerical computing: numerical arrays (NumPy), algorithms (SciPy), 2D/3D
19 19 Visualization (Matplotlib, Mayavi, Chaco), Symbolic Mathematics (Sage, Sympy)
20 20 and others.
21 21
22 22 The IPython project is a core part of this open-source toolchain and is
23 23 focused on creating a comprehensive environment for interactive and
24 24 exploratory computing in the Python programming language. It enables all of
25 25 the above tools to be used interactively and consists of two main components:
26 26
27 27 * An enhanced interactive Python shell with support for interactive plotting
28 28 and visualization.
29 29 * An architecture for interactive parallel computing.
30 30
31 31 With these components, it is possible to perform all aspects of a parallel
32 32 computation interactively. This type of workflow is particularly relevant in
33 33 scientific and numerical computing where algorithms, code and data are
34 34 continually evolving as the user/developer explores a problem. The broad
35 35 trends in computing (commodity clusters, multicore, cloud computing, etc.)
36 36 make these capabilities of IPython particularly relevant.
37 37
38 38 While IPython is a cross platform tool, it has particularly strong support for
39 39 Windows based compute clusters running Windows HPC Server 2008. This document
40 40 describes how to get started with IPython on Windows HPC Server 2008. The
41 41 content and emphasis here is practical: installing IPython, configuring
42 42 IPython to use the Windows job scheduler and running example parallel programs
43 43 interactively. A more complete description of IPython's parallel computing
44 44 capabilities can be found in IPython's online documentation
45 45 (http://ipython.scipy.org/moin/Documentation).
46 46
47 47 Setting up your Windows cluster
48 48 ===============================
49 49
50 50 This document assumes that you already have a cluster running Windows
51 51 HPC Server 2008. Here is a broad overview of what is involved with setting up
52 52 such a cluster:
53 53
54 54 1. Install Windows Server 2008 on the head and compute nodes in the cluster.
55 55 2. Setup the network configuration on each host. Each host should have a
56 56 static IP address.
57 57 3. On the head node, activate the "Active Directory Domain Services" role
58 58 and make the head node the domain controller.
59 59 4. Join the compute nodes to the newly created Active Directory (AD) domain.
60 60 5. Setup user accounts in the domain with shared home directories.
61 61 6. Install the HPC Pack 2008 on the head node to create a cluster.
62 62 7. Install the HPC Pack 2008 on the compute nodes.
63 63
64 64 More details about installing and configuring Windows HPC Server 2008 can be
65 65 found on the Windows HPC Home Page (http://www.microsoft.com/hpc). Regardless
66 66 of what steps you follow to set up your cluster, the remainder of this
67 67 document will assume that:
68 68
69 69 * There are domain users that can log on to the AD domain and submit jobs
70 70 to the cluster scheduler.
71 71 * These domain users have shared home directories. While shared home
72 72 directories are not required to use IPython, they make it much easier to
73 73 use IPython.
74 74
75 75 Installation of IPython and its dependencies
76 76 ============================================
77 77
78 78 IPython and all of its dependencies are freely available and open source.
79 79 These packages provide a powerful and cost-effective approach to numerical and
80 80 scientific computing on Windows. The following dependencies are needed to run
81 81 IPython on Windows:
82 82
83 83 * Python 2.6 or 2.7 (http://www.python.org)
84 84 * pywin32 (http://sourceforge.net/projects/pywin32/)
85 85 * PyReadline (https://launchpad.net/pyreadline)
86 86 * pyzmq (http://github.com/zeromq/pyzmq/downloads)
87 87 * IPython (http://ipython.scipy.org)
88 88
89 89 In addition, the following dependencies are needed to run the demos described
90 90 in this document.
91 91
92 92 * NumPy and SciPy (http://www.scipy.org)
93 93 * Matplotlib (http://matplotlib.sourceforge.net/)
94 94
95 95 The easiest way of obtaining these dependencies is through the Enthought
96 96 Python Distribution (EPD) (http://www.enthought.com/products/epd.php). EPD is
97 97 produced by Enthought, Inc. and contains all of these packages and others in a
98 98 single installer and is available free for academic users. While it is also
99 99 possible to download and install each package individually, this is a tedious
100 100 process. Thus, we highly recommend using EPD to install these packages on
101 101 Windows.
102 102
103 103 Regardless of how you install the dependencies, here are the steps you will
104 104 need to follow:
105 105
106 106 1. Install all of the packages listed above, either individually or using EPD
107 107 on the head node, compute nodes and user workstations.
108 108
109 109 2. Make sure that :file:`C:\\Python27` and :file:`C:\\Python27\\Scripts` are
110 110 in the system :envvar:`%PATH%` variable on each node.
111 111
112 112 3. Install the latest development version of IPython. This can be done by
113 113 downloading the development version from the IPython website
114 114 (http://ipython.scipy.org) and following the installation instructions.
115 115
116 116 Further details about installing IPython or its dependencies can be found in
117 117 the online IPython documentation (http://ipython.scipy.org/moin/Documentation).
118 118 Once you are finished with the installation, you can try IPython out by
119 119 opening a Windows Command Prompt and typing ``ipython``. This will
120 120 start IPython's interactive shell and you should see something like the
121 121 following screenshot:
122 122
123 123 .. image:: ipython_shell.*
124 124
125 125 Starting an IPython cluster
126 126 ===========================
127 127
128 128 To use IPython's parallel computing capabilities, you will need to start an
129 129 IPython cluster. An IPython cluster consists of one controller and multiple
130 130 engines:
131 131
132 132 IPython controller
133 133 The IPython controller manages the engines and acts as a gateway between
134 134 the engines and the client, which runs in the user's interactive IPython
135 135 session. The controller is started using the :command:`ipcontroller`
136 136 command.
137 137
138 138 IPython engine
139 139 IPython engines run a user's Python code in parallel on the compute nodes.
140 140 Engines are started using the :command:`ipengine` command.
141 141
142 142 Once these processes are started, a user can run Python code interactively and
143 143 in parallel on the engines from within the IPython shell using an appropriate
144 144 client. This includes the ability to interact with, plot and visualize data
145 145 from the engines.
146 146
147 147 IPython has a command line program called :command:`ipcluster` that automates
148 148 all aspects of starting the controller and engines on the compute nodes.
149 149 :command:`ipcluster` has full support for the Windows HPC job scheduler,
150 150 meaning that :command:`ipcluster` can use this job scheduler to start the
151 151 controller and engines. In our experience, the Windows HPC job scheduler is
152 152 particularly well suited for interactive applications, such as IPython. Once
153 153 :command:`ipcluster` is configured properly, a user can start an IPython
154 154 cluster from their local workstation almost instantly, without having to log
155 155 on to the head node (as is typically required by Unix based job schedulers).
156 156 This enables a user to move seamlessly between serial and parallel
157 157 computations.
158 158
159 159 In this section we show how to use :command:`ipcluster` to start an IPython
160 160 cluster using the Windows HPC Server 2008 job scheduler. To make sure that
161 161 :command:`ipcluster` is installed and working properly, you should first try
162 162 to start an IPython cluster on your local host. To do this, open a Windows
163 163 Command Prompt and type the following command::
164 164
165 ipcluster start -n 2
165 ipcluster start n=2
166 166
167 167 You should see a number of messages printed to the screen, ending with
168 168 "IPython cluster: started". The result should look something like the following
169 169 screenshot:
170 170
171 171 .. image:: ipcluster_start.*
172 172
173 173 At this point, the controller and two engines are running on your local host.
174 174 This configuration is useful for testing and for situations where you want to
175 175 take advantage of multiple cores on your local computer.
176 176
177 177 Now that we have confirmed that :command:`ipcluster` is working properly, we
178 178 describe how to configure and run an IPython cluster on an actual compute
179 179 cluster running Windows HPC Server 2008. Here is an outline of the needed
180 180 steps:
181 181
182 1. Create a cluster profile using: ``ipcluster create -p mycluster``
182 1. Create a cluster profile using: ``ipcluster create profile=mycluster``
183 183
184 184 2. Edit configuration files in the directory :file:`.ipython\\cluster_mycluster`
185 185
186 3. Start the cluster using: ``ipcluser start -p mycluster -n 32``
186 3. Start the cluster using: ``ipcluster start profile=mycluster n=32``
187 187
188 188 Creating a cluster profile
189 189 --------------------------
190 190
191 191 In most cases, you will have to create a cluster profile to use IPython on a
192 192 cluster. A cluster profile is a name (like "mycluster") that is associated
193 193 with a particular cluster configuration. The profile name is used by
194 194 :command:`ipcluster` when working with the cluster.
195 195
196 196 Associated with each cluster profile is a cluster directory. This cluster
197 197 directory is a specially named directory (typically located in the
198 198 :file:`.ipython` subdirectory of your home directory) that contains the
199 199 configuration files for a particular cluster profile, as well as log files and
200 200 security keys. The naming convention for cluster directories is:
201 201 :file:`cluster_<profile name>`. Thus, the cluster directory for a profile named
202 202 "foo" would be :file:`.ipython\\cluster_foo`.
203 203
204 204 To create a new cluster profile (named "mycluster") and the associated cluster
205 205 directory, type the following command at the Windows Command Prompt::
206 206
207 ipcluster create -p mycluster
207 ipcluster create profile=mycluster
208 208
209 209 The output of this command is shown in the screenshot below. Notice how
210 210 :command:`ipcluster` prints out the location of the newly created cluster
211 211 directory.
212 212
213 213 .. image:: ipcluster_create.*
214 214
215 215 Configuring a cluster profile
216 216 -----------------------------
217 217
218 218 Next, you will need to configure the newly created cluster profile by editing
219 219 the following configuration files in the cluster directory:
220 220
221 221 * :file:`ipcluster_config.py`
222 222 * :file:`ipcontroller_config.py`
223 223 * :file:`ipengine_config.py`
224 224
225 225 When :command:`ipcluster` is run, these configuration files are used to
226 226 determine how the engines and controller will be started. In most cases,
227 227 you will only have to set a few of the attributes in these files.
228 228
229 229 To configure :command:`ipcluster` to use the Windows HPC job scheduler, you
230 230 will need to edit the following attributes in the file
231 231 :file:`ipcluster_config.py`::
232 232
233 233 # Set these at the top of the file to tell ipcluster to use the
234 234 # Windows HPC job scheduler.
235 235 c.Global.controller_launcher = \
236 236 'IPython.parallel.apps.launcher.WindowsHPCControllerLauncher'
237 237 c.Global.engine_launcher = \
238 238 'IPython.parallel.apps.launcher.WindowsHPCEngineSetLauncher'
239 239
240 240 # Set these to the host name of the scheduler (head node) of your cluster.
241 241 c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
242 242 c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
243 243
244 244 There are a number of other configuration attributes that can be set, but
245 245 in most cases these will be sufficient to get you started.
246 246
247 247 .. warning::
248 248 If any of your configuration attributes involve specifying the location
249 249 of shared directories or files, you must make sure that you use UNC paths
250 250 like :file:`\\\\host\\share`. It is also important that you specify
251 251 these paths using raw Python strings: ``r'\\host\share'`` to make sure
252 252 that the backslashes are properly escaped.
253 253
254 254 Starting the cluster profile
255 255 ----------------------------
256 256
257 257 Once a cluster profile has been configured, starting an IPython cluster using
258 258 the profile is simple::
259 259
260 ipcluster start -p mycluster -n 32
260 ipcluster start profile=mycluster n=32
261 261
262 262 The ``n`` argument tells :command:`ipcluster` how many engines to start (in
263 263 this case 32). Stopping the cluster is as simple as typing Control-C.
264 264
265 265 Using the HPC Job Manager
266 266 -------------------------
267 267
268 268 When ``ipcluster start`` is run the first time, :command:`ipcluster` creates
269 269 two XML job description files in the cluster directory:
270 270
271 271 * :file:`ipcontroller_job.xml`
272 272 * :file:`ipengineset_job.xml`
273 273
274 274 Once these files have been created, they can be imported into the HPC Job
275 275 Manager application. Then, the controller and engines for that profile can be
276 276 started using the HPC Job Manager directly, without using :command:`ipcluster`.
277 277 However, anytime the cluster profile is re-configured, ``ipcluster start``
278 278 must be run again to regenerate the XML job description files. The
279 279 following screenshot shows what the HPC Job Manager interface looks like
280 280 with a running IPython cluster.
281 281
282 282 .. image:: hpc_job_manager.*
283 283
284 284 Performing a simple interactive parallel computation
285 285 ====================================================
286 286
287 287 Once you have started your IPython cluster, you can start to use it. To do
288 288 this, open up a new Windows Command Prompt and start up IPython's interactive
289 289 shell by typing::
290 290
291 291 ipython
292 292
293 293 Then you can create a :class:`MultiEngineClient` instance for your profile and
294 294 use the resulting instance to do a simple interactive parallel computation. In
295 295 the code and screenshot that follows, we take a simple Python function and
296 296 apply it to each element of an array of integers in parallel using the
297 297 :meth:`MultiEngineClient.map` method:
298 298
299 299 .. sourcecode:: ipython
300 300
301 301 In [1]: from IPython.parallel import *
302 302
303 303 In [2]: mec = MultiEngineClient(profile='mycluster')
304 304
305 305 In [3]: mec.get_ids()
306 306 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
307 307
308 308 In [4]: def f(x):
309 309 ...: return x**10
310 310
311 311 In [5]: mec.map(f, range(15)) # f is applied in parallel
312 312 Out[5]:
313 313 [0,
314 314 1,
315 315 1024,
316 316 59049,
317 317 1048576,
318 318 9765625,
319 319 60466176,
320 320 282475249,
321 321 1073741824,
322 322 3486784401L,
323 323 10000000000L,
324 324 25937424601L,
325 325 61917364224L,
326 326 137858491849L,
327 327 289254654976L]
328 328
329 329 The :meth:`map` method has the same signature as Python's builtin :func:`map`
330 330 function, but runs the calculation in parallel. More involved examples of using
331 331 :class:`MultiEngineClient` are provided in the examples that follow.
332 332
333 333 .. image:: mec_simple.*
334 334
General Comments 0
You need to be logged in to leave comments. Login now