##// END OF EJS Templates
all ipcluster scripts in some degree of working order with new config
MinRK -
Show More
@@ -25,16 +25,18 b' import sys'
25
25
26 from subprocess import Popen, PIPE
26 from subprocess import Popen, PIPE
27
27
28 from IPython.config.loader import PyFileConfigLoader
28 from IPython.config.loader import PyFileConfigLoader, Config
29 from IPython.config.configurable import Configurable
29 from IPython.config.configurable import Configurable
30 from IPython.core.application import Application, BaseAppConfigLoader
30 from IPython.config.application import Application
31 from IPython.core.crashhandler import CrashHandler
31 from IPython.core.crashhandler import CrashHandler
32 from IPython.core.newapplication import BaseIPythonApplication
32 from IPython.core import release
33 from IPython.core import release
33 from IPython.utils.path import (
34 from IPython.utils.path import (
34 get_ipython_package_dir,
35 get_ipython_package_dir,
36 get_ipython_dir,
35 expand_path
37 expand_path
36 )
38 )
37 from IPython.utils.traitlets import Unicode
39 from IPython.utils.traitlets import Unicode, Bool, CStr, Instance, Dict
38
40
39 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
40 # Module errors
42 # Module errors
@@ -69,19 +71,45 b' class ClusterDir(Configurable):'
69 security_dir = Unicode(u'')
71 security_dir = Unicode(u'')
70 log_dir = Unicode(u'')
72 log_dir = Unicode(u'')
71 pid_dir = Unicode(u'')
73 pid_dir = Unicode(u'')
72 location = Unicode(u'')
73
74
74 def __init__(self, location=u''):
75 location = Unicode(u'', config=True,
75 super(ClusterDir, self).__init__(location=location)
76 help="""Set the cluster dir. This overrides the logic used by the
77 `profile` option.""",
78 )
79 profile = Unicode(u'default',
80 help="""The string name of the profile to be used. This determines the name
81 of the cluster dir as: cluster_<profile>. The default profile is named
82 'default'. The cluster directory is resolve this way if the
83 `cluster_dir` option is not used.""", config=True
84 )
85
86 _location_isset = Bool(False) # flag for detecting multiply set location
87 _new_dir = Bool(False) # flag for whether a new dir was created
88
89 def __init__(self, **kwargs):
90 super(ClusterDir, self).__init__(**kwargs)
91 if not self.location:
92 self._profile_changed('profile', 'default', self.profile)
76
93
77 def _location_changed(self, name, old, new):
94 def _location_changed(self, name, old, new):
95 if self._location_isset:
96 raise RuntimeError("Cannot set ClusterDir more than once.")
97 self._location_isset = True
78 if not os.path.isdir(new):
98 if not os.path.isdir(new):
79 os.makedirs(new)
99 os.makedirs(new)
100 self._new_dir = True
101 # ensure config files exist:
102 self.copy_all_config_files(overwrite=False)
80 self.security_dir = os.path.join(new, self.security_dir_name)
103 self.security_dir = os.path.join(new, self.security_dir_name)
81 self.log_dir = os.path.join(new, self.log_dir_name)
104 self.log_dir = os.path.join(new, self.log_dir_name)
82 self.pid_dir = os.path.join(new, self.pid_dir_name)
105 self.pid_dir = os.path.join(new, self.pid_dir_name)
83 self.check_dirs()
106 self.check_dirs()
84
107
108 def _profile_changed(self, name, old, new):
109 if self._location_isset:
110 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
111 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
112
85 def _log_dir_changed(self, name, old, new):
113 def _log_dir_changed(self, name, old, new):
86 self.check_log_dir()
114 self.check_log_dir()
87
115
@@ -110,18 +138,6 b' class ClusterDir(Configurable):'
110 self.check_log_dir()
138 self.check_log_dir()
111 self.check_pid_dir()
139 self.check_pid_dir()
112
140
113 def load_config_file(self, filename):
114 """Load a config file from the top level of the cluster dir.
115
116 Parameters
117 ----------
118 filename : unicode or str
119 The filename only of the config file that must be located in
120 the top-level of the cluster directory.
121 """
122 loader = PyFileConfigLoader(filename, self.location)
123 return loader.load_config()
124
125 def copy_config_file(self, config_file, path=None, overwrite=False):
141 def copy_config_file(self, config_file, path=None, overwrite=False):
126 """Copy a default config file into the active cluster directory.
142 """Copy a default config file into the active cluster directory.
127
143
@@ -227,59 +243,6 b' class ClusterDir(Configurable):'
227
243
228
244
229 #-----------------------------------------------------------------------------
245 #-----------------------------------------------------------------------------
230 # Command line options
231 #-----------------------------------------------------------------------------
232
233 class ClusterDirConfigLoader(BaseAppConfigLoader):
234
235 def _add_cluster_profile(self, parser):
236 paa = parser.add_argument
237 paa('-p', '--profile',
238 dest='Global.profile',type=unicode,
239 help=
240 """The string name of the profile to be used. This determines the name
241 of the cluster dir as: cluster_<profile>. The default profile is named
242 'default'. The cluster directory is resolve this way if the
243 --cluster-dir option is not used.""",
244 metavar='Global.profile')
245
246 def _add_cluster_dir(self, parser):
247 paa = parser.add_argument
248 paa('--cluster-dir',
249 dest='Global.cluster_dir',type=unicode,
250 help="""Set the cluster dir. This overrides the logic used by the
251 --profile option.""",
252 metavar='Global.cluster_dir')
253
254 def _add_work_dir(self, parser):
255 paa = parser.add_argument
256 paa('--work-dir',
257 dest='Global.work_dir',type=unicode,
258 help='Set the working dir for the process.',
259 metavar='Global.work_dir')
260
261 def _add_clean_logs(self, parser):
262 paa = parser.add_argument
263 paa('--clean-logs',
264 dest='Global.clean_logs', action='store_true',
265 help='Delete old log flies before starting.')
266
267 def _add_no_clean_logs(self, parser):
268 paa = parser.add_argument
269 paa('--no-clean-logs',
270 dest='Global.clean_logs', action='store_false',
271 help="Don't Delete old log flies before starting.")
272
273 def _add_arguments(self):
274 super(ClusterDirConfigLoader, self)._add_arguments()
275 self._add_cluster_profile(self.parser)
276 self._add_cluster_dir(self.parser)
277 self._add_work_dir(self.parser)
278 self._add_clean_logs(self.parser)
279 self._add_no_clean_logs(self.parser)
280
281
282 #-----------------------------------------------------------------------------
283 # Crash handler for this application
246 # Crash handler for this application
284 #-----------------------------------------------------------------------------
247 #-----------------------------------------------------------------------------
285
248
@@ -312,8 +275,8 b' class ClusterDirCrashHandler(CrashHandler):'
312 message_template = _message_template
275 message_template = _message_template
313
276
314 def __init__(self, app):
277 def __init__(self, app):
315 contact_name = release.authors['Brian'][0]
278 contact_name = release.authors['Min'][0]
316 contact_email = release.authors['Brian'][1]
279 contact_email = release.authors['Min'][1]
317 bug_tracker = 'http://github.com/ipython/ipython/issues'
280 bug_tracker = 'http://github.com/ipython/ipython/issues'
318 super(ClusterDirCrashHandler,self).__init__(
281 super(ClusterDirCrashHandler,self).__init__(
319 app, contact_name, contact_email, bug_tracker
282 app, contact_name, contact_email, bug_tracker
@@ -323,8 +286,25 b' class ClusterDirCrashHandler(CrashHandler):'
323 #-----------------------------------------------------------------------------
286 #-----------------------------------------------------------------------------
324 # Main application
287 # Main application
325 #-----------------------------------------------------------------------------
288 #-----------------------------------------------------------------------------
326
289 base_aliases = {
327 class ApplicationWithClusterDir(Application):
290 'profile' : "ClusterDir.profile",
291 'cluster_dir' : 'ClusterDir.location',
292 'log_level' : 'Application.log_level',
293 'work_dir' : 'ClusterDirApplicaiton.work_dir',
294 'log_to_file' : 'ClusterDirApplicaiton.log_to_file',
295 'clean_logs' : 'ClusterDirApplicaiton.clean_logs',
296 'log_url' : 'ClusterDirApplicaiton.log_url',
297 }
298
299 base_flags = {
300 'debug' : ( {"Application" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
301 'clean-logs' : ( {"ClusterDirApplication" : {"clean_logs" : True}}, "cleanup old logfiles"),
302 'log-to-file' : ( {"ClusterDirApplication" : {"log_to_file" : True}}, "log to a file")
303 }
304 for k,v in base_flags.iteritems():
305 base_flags[k] = (Config(v[0]),v[1])
306
307 class ClusterDirApplication(BaseIPythonApplication):
328 """An application that puts everything into a cluster directory.
308 """An application that puts everything into a cluster directory.
329
309
330 Instead of looking for things in the ipython_dir, this type of application
310 Instead of looking for things in the ipython_dir, this type of application
@@ -343,22 +323,37 b' class ApplicationWithClusterDir(Application):'
343 dir and named the value of the ``config_file_name`` class attribute.
323 dir and named the value of the ``config_file_name`` class attribute.
344 """
324 """
345
325
346 command_line_loader = ClusterDirConfigLoader
347 crash_handler_class = ClusterDirCrashHandler
326 crash_handler_class = ClusterDirCrashHandler
348 auto_create_cluster_dir = True
327 auto_create_cluster_dir = Bool(True, config=True,
328 help="whether to create the cluster_dir if it doesn't exist")
349 # temporarily override default_log_level to INFO
329 # temporarily override default_log_level to INFO
350 default_log_level = logging.INFO
330 default_log_level = logging.INFO
331 cluster_dir = Instance(ClusterDir)
332
333 work_dir = Unicode(os.getcwdu(), config=True,
334 help='Set the working dir for the process.'
335 )
336 def _work_dir_changed(self, name, old, new):
337 self.work_dir = unicode(expand_path(new))
338
339 log_to_file = Bool(config=True,
340 help="whether to log to a file")
341
342 clean_logs = Bool(True, shortname='--clean-logs', config=True,
343 help="whether to cleanup old logfiles before starting")
351
344
352 def create_default_config(self):
345 log_url = CStr('', shortname='--log-url', config=True,
353 super(ApplicationWithClusterDir, self).create_default_config()
346 help="The ZMQ URL of the iplooger to aggregate logging.")
354 self.default_config.Global.profile = u'default'
355 self.default_config.Global.cluster_dir = u''
356 self.default_config.Global.work_dir = os.getcwd()
357 self.default_config.Global.log_to_file = False
358 self.default_config.Global.log_url = None
359 self.default_config.Global.clean_logs = False
360
347
361 def find_resources(self):
348 config_file = Unicode(u'', config=True,
349 help="""Path to ipcontroller configuration file. The default is to use
350 <appname>_config.py, as found by cluster-dir."""
351 )
352
353 aliases = Dict(base_aliases)
354 flags = Dict(base_flags)
355
356 def init_clusterdir(self):
362 """This resolves the cluster directory.
357 """This resolves the cluster directory.
363
358
364 This tries to find the cluster directory and if successful, it will
359 This tries to find the cluster directory and if successful, it will
@@ -375,121 +370,53 b' class ApplicationWithClusterDir(Application):'
375 ``True``, then create the new cluster dir in the IPython directory.
370 ``True``, then create the new cluster dir in the IPython directory.
376 4. If all fails, then raise :class:`ClusterDirError`.
371 4. If all fails, then raise :class:`ClusterDirError`.
377 """
372 """
378
373 self.cluster_dir = ClusterDir(config=self.config)
379 try:
374 if self.cluster_dir._new_dir:
380 cluster_dir = self.command_line_config.Global.cluster_dir
381 except AttributeError:
382 cluster_dir = self.default_config.Global.cluster_dir
383 cluster_dir = expand_path(cluster_dir)
384 try:
385 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
386 except ClusterDirError:
387 pass
388 else:
389 self.log.info('Using existing cluster dir: %s' % \
390 self.cluster_dir_obj.location
391 )
392 self.finish_cluster_dir()
393 return
394
395 try:
396 self.profile = self.command_line_config.Global.profile
397 except AttributeError:
398 self.profile = self.default_config.Global.profile
399 try:
400 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
401 self.ipython_dir, self.profile)
402 except ClusterDirError:
403 pass
404 else:
405 self.log.info('Using existing cluster dir: %s' % \
406 self.cluster_dir_obj.location
407 )
408 self.finish_cluster_dir()
409 return
410
411 if self.auto_create_cluster_dir:
412 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
413 self.ipython_dir, self.profile
414 )
415 self.log.info('Creating new cluster dir: %s' % \
375 self.log.info('Creating new cluster dir: %s' % \
416 self.cluster_dir_obj.location
376 self.cluster_dir.location)
417 )
418 self.finish_cluster_dir()
419 else:
377 else:
420 raise ClusterDirError('Could not find a valid cluster directory.')
378 self.log.info('Using existing cluster dir: %s' % \
421
379 self.cluster_dir.location)
422 def finish_cluster_dir(self):
423 # Set the cluster directory
424 self.cluster_dir = self.cluster_dir_obj.location
425
426 # These have to be set because they could be different from the one
427 # that we just computed. Because command line has the highest
428 # priority, this will always end up in the master_config.
429 self.default_config.Global.cluster_dir = self.cluster_dir
430 self.command_line_config.Global.cluster_dir = self.cluster_dir
431
432 def find_config_file_name(self):
433 """Find the config file name for this application."""
434 # For this type of Application it should be set as a class attribute.
435 if not hasattr(self, 'default_config_file_name'):
436 self.log.critical("No config filename found")
437 else:
438 self.config_file_name = self.default_config_file_name
439
440 def find_config_file_paths(self):
441 # Set the search path to to the cluster directory. We should NOT
442 # include IPython.config.default here as the default config files
443 # are ALWAYS automatically moved to the cluster directory.
444 conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default')
445 self.config_file_paths = (self.cluster_dir,)
446
447 def pre_construct(self):
448 # The log and security dirs were set earlier, but here we put them
449 # into the config and log them.
450 config = self.master_config
451 sdir = self.cluster_dir_obj.security_dir
452 self.security_dir = config.Global.security_dir = sdir
453 ldir = self.cluster_dir_obj.log_dir
454 self.log_dir = config.Global.log_dir = ldir
455 pdir = self.cluster_dir_obj.pid_dir
456 self.pid_dir = config.Global.pid_dir = pdir
457 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
458 config.Global.work_dir = unicode(expand_path(config.Global.work_dir))
459 # Change to the working directory. We do this just before construct
460 # is called so all the components there have the right working dir.
461 self.to_work_dir()
462
380
463 def to_work_dir(self):
381 def to_work_dir(self):
464 wd = self.master_config.Global.work_dir
382 wd = self.work_dir
465 if unicode(wd) != unicode(os.getcwd()):
383 if unicode(wd) != os.getcwdu():
466 os.chdir(wd)
384 os.chdir(wd)
467 self.log.info("Changing to working dir: %s" % wd)
385 self.log.info("Changing to working dir: %s" % wd)
468
386
469 def start_logging(self):
387 def load_config_file(self, filename, path=None):
470 # Remove old log files
388 """Load a .py based config file by filename and path."""
471 if self.master_config.Global.clean_logs:
389 return Application.load_config_file(self, filename, path=path)
472 log_dir = self.master_config.Global.log_dir
390 #
473 for f in os.listdir(log_dir):
391 # def load_default_config_file(self):
474 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
392 # """Load a .py based config file by filename and path."""
475 # if f.startswith(self.name + u'-') and f.endswith('.log'):
393 # return BaseIPythonApplication.load_config_file(self)
476 os.remove(os.path.join(log_dir, f))
394
477 # Start logging to the new log file
395 # disable URL-logging
478 if self.master_config.Global.log_to_file:
396 # def init_logging(self):
479 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
397 # # Remove old log files
480 logfile = os.path.join(self.log_dir, log_filename)
398 # if self.master_config.Global.clean_logs:
481 open_log_file = open(logfile, 'w')
399 # log_dir = self.master_config.Global.log_dir
482 elif self.master_config.Global.log_url:
400 # for f in os.listdir(log_dir):
483 open_log_file = None
401 # if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
484 else:
402 # # if f.startswith(self.name + u'-') and f.endswith('.log'):
485 open_log_file = sys.stdout
403 # os.remove(os.path.join(log_dir, f))
486 if open_log_file is not None:
404 # # Start logging to the new log file
487 self.log.removeHandler(self._log_handler)
405 # if self.master_config.Global.log_to_file:
488 self._log_handler = logging.StreamHandler(open_log_file)
406 # log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
489 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
407 # logfile = os.path.join(self.log_dir, log_filename)
490 self._log_handler.setFormatter(self._log_formatter)
408 # open_log_file = open(logfile, 'w')
491 self.log.addHandler(self._log_handler)
409 # elif self.master_config.Global.log_url:
492 # log.startLogging(open_log_file)
410 # open_log_file = None
411 # else:
412 # open_log_file = sys.stdout
413 # if open_log_file is not None:
414 # self.log.removeHandler(self._log_handler)
415 # self._log_handler = logging.StreamHandler(open_log_file)
416 # self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
417 # self._log_handler.setFormatter(self._log_formatter)
418 # self.log.addHandler(self._log_handler)
419 # # log.startLogging(open_log_file)
493
420
494 def write_pid_file(self, overwrite=False):
421 def write_pid_file(self, overwrite=False):
495 """Create a .pid file in the pid_dir with my pid.
422 """Create a .pid file in the pid_dir with my pid.
@@ -497,7 +424,7 b' class ApplicationWithClusterDir(Application):'
497 This must be called after pre_construct, which sets `self.pid_dir`.
424 This must be called after pre_construct, which sets `self.pid_dir`.
498 This raises :exc:`PIDFileError` if the pid file exists already.
425 This raises :exc:`PIDFileError` if the pid file exists already.
499 """
426 """
500 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
427 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
501 if os.path.isfile(pid_file):
428 if os.path.isfile(pid_file):
502 pid = self.get_pid_from_file()
429 pid = self.get_pid_from_file()
503 if not overwrite:
430 if not overwrite:
@@ -516,7 +443,7 b' class ApplicationWithClusterDir(Application):'
516 :func:`reactor.addSystemEventTrigger`. This needs to return
443 :func:`reactor.addSystemEventTrigger`. This needs to return
517 ``None``.
444 ``None``.
518 """
445 """
519 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
446 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
520 if os.path.isfile(pid_file):
447 if os.path.isfile(pid_file):
521 try:
448 try:
522 self.log.info("Removing pid file: %s" % pid_file)
449 self.log.info("Removing pid file: %s" % pid_file)
@@ -529,7 +456,7 b' class ApplicationWithClusterDir(Application):'
529
456
530 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
457 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
531 """
458 """
532 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
459 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
533 if os.path.isfile(pid_file):
460 if os.path.isfile(pid_file):
534 with open(pid_file, 'r') as f:
461 with open(pid_file, 'r') as f:
535 pid = int(f.read().strip())
462 pid = int(f.read().strip())
@@ -563,4 +490,3 b' class ApplicationWithClusterDir(Application):'
563 return True
490 return True
564 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
491 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
565 return pid in pids
492 return pid in pids
566 No newline at end of file
@@ -25,12 +25,14 b' from subprocess import check_call, CalledProcessError, PIPE'
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27
27
28 from IPython.external.argparse import ArgumentParser, SUPPRESS
28 from IPython.config.loader import Config
29 from IPython.utils.importstring import import_item
29 from IPython.utils.importstring import import_item
30 from IPython.utils.traitlets import Int, CStr, CUnicode, Str, Bool, CFloat, Dict, List
30
31
31 from IPython.parallel.apps.clusterdir import (
32 from IPython.parallel.apps.clusterdir import (
32 ApplicationWithClusterDir, ClusterDirConfigLoader,
33 ClusterDirApplication, ClusterDirError,
33 ClusterDirError, PIDFileError
34 PIDFileError,
35 base_flags,
34 )
36 )
35
37
36
38
@@ -49,9 +51,9 b' An IPython cluster consists of 1 controller and 1 or more engines.'
49 This command automates the startup of these processes using a wide
51 This command automates the startup of these processes using a wide
50 range of startup methods (SSH, local processes, PBS, mpiexec,
52 range of startup methods (SSH, local processes, PBS, mpiexec,
51 Windows HPC Server 2008). To start a cluster with 4 engines on your
53 Windows HPC Server 2008). To start a cluster with 4 engines on your
52 local host simply do 'ipcluster start -n 4'. For more complex usage
54 local host simply do 'ipcluster start n=4'. For more complex usage
53 you will typically do 'ipcluster create -p mycluster', then edit
55 you will typically do 'ipcluster --create profile=mycluster', then edit
54 configuration files, followed by 'ipcluster start -p mycluster -n 4'.
56 configuration files, followed by 'ipcluster --start -p mycluster -n 4'.
55 """
57 """
56
58
57
59
@@ -72,96 +74,9 b' NO_CLUSTER = 12'
72
74
73
75
74 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
75 # Command line options
77 # Main application
76 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
77
79 start_help = """Start an ipython cluster by its profile name or cluster
78
79 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
80
81 def _add_arguments(self):
82 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
83 # its defaults on self.parser. Instead, we will put those on
84 # default options on our subparsers.
85
86 # This has all the common options that all subcommands use
87 parent_parser1 = ArgumentParser(
88 add_help=False,
89 argument_default=SUPPRESS
90 )
91 self._add_ipython_dir(parent_parser1)
92 self._add_log_level(parent_parser1)
93
94 # This has all the common options that other subcommands use
95 parent_parser2 = ArgumentParser(
96 add_help=False,
97 argument_default=SUPPRESS
98 )
99 self._add_cluster_profile(parent_parser2)
100 self._add_cluster_dir(parent_parser2)
101 self._add_work_dir(parent_parser2)
102 paa = parent_parser2.add_argument
103 paa('--log-to-file',
104 action='store_true', dest='Global.log_to_file',
105 help='Log to a file in the log directory (default is stdout)')
106
107 # Create the object used to create the subparsers.
108 subparsers = self.parser.add_subparsers(
109 dest='Global.subcommand',
110 title='ipcluster subcommands',
111 description=
112 """ipcluster has a variety of subcommands. The general way of
113 running ipcluster is 'ipcluster <cmd> [options]'. To get help
114 on a particular subcommand do 'ipcluster <cmd> -h'."""
115 # help="For more help, type 'ipcluster <cmd> -h'",
116 )
117
118 # The "list" subcommand parser
119 parser_list = subparsers.add_parser(
120 'list',
121 parents=[parent_parser1],
122 argument_default=SUPPRESS,
123 help="List all clusters in cwd and ipython_dir.",
124 description=
125 """List all available clusters, by cluster directory, that can
126 be found in the current working directly or in the ipython
127 directory. Cluster directories are named using the convention
128 'cluster_<profile>'."""
129 )
130
131 # The "create" subcommand parser
132 parser_create = subparsers.add_parser(
133 'create',
134 parents=[parent_parser1, parent_parser2],
135 argument_default=SUPPRESS,
136 help="Create a new cluster directory.",
137 description=
138 """Create an ipython cluster directory by its profile name or
139 cluster directory path. Cluster directories contain
140 configuration, log and security related files and are named
141 using the convention 'cluster_<profile>'. By default they are
142 located in your ipython directory. Once created, you will
143 probably need to edit the configuration files in the cluster
144 directory to configure your cluster. Most users will create a
145 cluster directory by profile name,
146 'ipcluster create -p mycluster', which will put the directory
147 in '<ipython_dir>/cluster_mycluster'.
148 """
149 )
150 paa = parser_create.add_argument
151 paa('--reset-config',
152 dest='Global.reset_config', action='store_true',
153 help=
154 """Recopy the default config files to the cluster directory.
155 You will loose any modifications you have made to these files.""")
156
157 # The "start" subcommand parser
158 parser_start = subparsers.add_parser(
159 'start',
160 parents=[parent_parser1, parent_parser2],
161 argument_default=SUPPRESS,
162 help="Start a cluster.",
163 description=
164 """Start an ipython cluster by its profile name or cluster
165 directory. Cluster directories contain configuration, log and
80 directory. Cluster directories contain configuration, log and
166 security related files and are named using the convention
81 security related files and are named using the convention
167 'cluster_<profile>' and should be creating using the 'start'
82 'cluster_<profile>' and should be creating using the 'start'
@@ -170,121 +85,130 b' class IPClusterAppConfigLoader(ClusterDirConfigLoader):'
170 using its profile name, 'ipcluster start -n 4 -p <profile>`,
85 using its profile name, 'ipcluster start -n 4 -p <profile>`,
171 otherwise use the '--cluster-dir' option.
86 otherwise use the '--cluster-dir' option.
172 """
87 """
173 )
88 stop_help = """Stop a running ipython cluster by its profile name or cluster
174
175 paa = parser_start.add_argument
176 paa('-n', '--number',
177 type=int, dest='Global.n',
178 help='The number of engines to start.',
179 metavar='Global.n')
180 paa('--clean-logs',
181 dest='Global.clean_logs', action='store_true',
182 help='Delete old log flies before starting.')
183 paa('--no-clean-logs',
184 dest='Global.clean_logs', action='store_false',
185 help="Don't delete old log flies before starting.")
186 paa('--daemon',
187 dest='Global.daemonize', action='store_true',
188 help='Daemonize the ipcluster program. This implies --log-to-file')
189 paa('--no-daemon',
190 dest='Global.daemonize', action='store_false',
191 help="Dont't daemonize the ipcluster program.")
192 paa('--delay',
193 type=float, dest='Global.delay',
194 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
195
196 # The "stop" subcommand parser
197 parser_stop = subparsers.add_parser(
198 'stop',
199 parents=[parent_parser1, parent_parser2],
200 argument_default=SUPPRESS,
201 help="Stop a running cluster.",
202 description=
203 """Stop a running ipython cluster by its profile name or cluster
204 directory. Cluster directories are named using the convention
89 directory. Cluster directories are named using the convention
205 'cluster_<profile>'. If your cluster directory is in
90 'cluster_<profile>'. If your cluster directory is in
206 the cwd or the ipython directory, you can simply refer to it
91 the cwd or the ipython directory, you can simply refer to it
207 using its profile name, 'ipcluster stop -p <profile>`, otherwise
92 using its profile name, 'ipcluster stop -p <profile>`, otherwise
208 use the '--cluster-dir' option.
93 use the '--cluster-dir' option.
209 """
94 """
210 )
95 engines_help = """Start one or more engines to connect to an existing Cluster
211 paa = parser_stop.add_argument
212 paa('--signal',
213 dest='Global.signal', type=int,
214 help="The signal number to use in stopping the cluster (default=2).",
215 metavar="Global.signal")
216
217 # the "engines" subcommand parser
218 parser_engines = subparsers.add_parser(
219 'engines',
220 parents=[parent_parser1, parent_parser2],
221 argument_default=SUPPRESS,
222 help="Attach some engines to an existing controller or cluster.",
223 description=
224 """Start one or more engines to connect to an existing Cluster
225 by profile name or cluster directory.
96 by profile name or cluster directory.
226 Cluster directories contain configuration, log and
97 Cluster directories contain configuration, log and
227 security related files and are named using the convention
98 security related files and are named using the convention
228 'cluster_<profile>' and should be creating using the 'start'
99 'cluster_<profile>' and should be creating using the 'start'
229 subcommand of 'ipcluster'. If your cluster directory is in
100 subcommand of 'ipcluster'. If your cluster directory is in
230 the cwd or the ipython directory, you can simply refer to it
101 the cwd or the ipython directory, you can simply refer to it
231 using its profile name, 'ipcluster engines -n 4 -p <profile>`,
102 using its profile name, 'ipcluster --engines -n 4 -p <profile>`,
232 otherwise use the '--cluster-dir' option.
103 otherwise use the 'cluster_dir' option.
233 """
104 """
234 )
105 create_help = """Create an ipython cluster directory by its profile name or
235 paa = parser_engines.add_argument
106 cluster directory path. Cluster directories contain
236 paa('-n', '--number',
107 configuration, log and security related files and are named
237 type=int, dest='Global.n',
108 using the convention 'cluster_<profile>'. By default they are
238 help='The number of engines to start.',
109 located in your ipython directory. Once created, you will
239 metavar='Global.n')
110 probably need to edit the configuration files in the cluster
240 paa('--daemon',
111 directory to configure your cluster. Most users will create a
241 dest='Global.daemonize', action='store_true',
112 cluster directory by profile name,
242 help='Daemonize the ipcluster program. This implies --log-to-file')
113 'ipcluster create -p mycluster', which will put the directory
243 paa('--no-daemon',
114 in '<ipython_dir>/cluster_mycluster'.
244 dest='Global.daemonize', action='store_false',
115 """
245 help="Dont't daemonize the ipcluster program.")
116 list_help = """List all available clusters, by cluster directory, that can
117 be found in the current working directly or in the ipython
118 directory. Cluster directories are named using the convention
119 'cluster_<profile>'."""
246
120
247 #-----------------------------------------------------------------------------
248 # Main application
249 #-----------------------------------------------------------------------------
250
121
122 flags = {}
123 flags.update(base_flags)
124 flags.update({
125 'start' : ({ 'IPClusterApp': Config({'subcommand' : 'start'})} , start_help),
126 'stop' : ({ 'IPClusterApp': Config({'subcommand' : 'stop'})} , stop_help),
127 'create' : ({ 'IPClusterApp': Config({'subcommand' : 'create'})} , create_help),
128 'engines' : ({ 'IPClusterApp': Config({'subcommand' : 'engines'})} , engines_help),
129 'list' : ({ 'IPClusterApp': Config({'subcommand' : 'list'})} , list_help),
251
130
252 class IPClusterApp(ApplicationWithClusterDir):
131 })
132
133 class IPClusterApp(ClusterDirApplication):
253
134
254 name = u'ipcluster'
135 name = u'ipcluster'
255 description = _description
136 description = _description
256 usage = None
137 usage = None
257 command_line_loader = IPClusterAppConfigLoader
258 default_config_file_name = default_config_file_name
138 default_config_file_name = default_config_file_name
259 default_log_level = logging.INFO
139 default_log_level = logging.INFO
260 auto_create_cluster_dir = False
140 auto_create_cluster_dir = False
141 classes = List()
142 def _classes_default(self,):
143 from IPython.parallel.apps import launcher
144 return launcher.all_launchers
145
146 n = Int(0, config=True,
147 help="The number of engines to start.")
148 signal = Int(signal.SIGINT, config=True,
149 help="signal to use for stopping. [default: SIGINT]")
150 delay = CFloat(1., config=True,
151 help="delay (in s) between starting the controller and the engines")
152
153 subcommand = Str('', config=True,
154 help="""ipcluster has a variety of subcommands. The general way of
155 running ipcluster is 'ipcluster --<cmd> [options]'."""
156 )
261
157
262 def create_default_config(self):
158 controller_launcher_class = Str('IPython.parallel.apps.launcher.LocalControllerLauncher',
263 super(IPClusterApp, self).create_default_config()
159 config=True,
264 self.default_config.Global.controller_launcher = \
160 help="The class for launching a Controller."
265 'IPython.parallel.apps.launcher.LocalControllerLauncher'
161 )
266 self.default_config.Global.engine_launcher = \
162 engine_launcher_class = Str('IPython.parallel.apps.launcher.LocalEngineSetLauncher',
267 'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
163 config=True,
268 self.default_config.Global.n = 2
164 help="The class for launching Engines."
269 self.default_config.Global.delay = 2
165 )
270 self.default_config.Global.reset_config = False
166 reset = Bool(False, config=True,
271 self.default_config.Global.clean_logs = True
167 help="Whether to reset config files as part of '--create'."
272 self.default_config.Global.signal = signal.SIGINT
168 )
273 self.default_config.Global.daemonize = False
169 daemonize = Bool(False, config=True,
274
170 help='Daemonize the ipcluster program. This implies --log-to-file')
275 def find_resources(self):
171
276 subcommand = self.command_line_config.Global.subcommand
172 def _daemonize_changed(self, name, old, new):
173 if new:
174 self.log_to_file = True
175
176 def _n_changed(self, name, old, new):
177 # propagate n all over the place...
178 # TODO make this clean
179 # ensure all classes are covered.
180 self.config.LocalEngineSetLauncher.n=new
181 self.config.MPIExecEngineSetLauncher.n=new
182 self.config.SSHEngineSetLauncher.n=new
183 self.config.PBSEngineSetLauncher.n=new
184 self.config.SGEEngineSetLauncher.n=new
185 self.config.WinHPEngineSetLauncher.n=new
186
187 aliases = Dict(dict(
188 n='IPClusterApp.n',
189 signal = 'IPClusterApp.signal',
190 delay = 'IPClusterApp.delay',
191 clauncher = 'IPClusterApp.controller_launcher_class',
192 elauncher = 'IPClusterApp.engine_launcher_class',
193 ))
194 flags = Dict(flags)
195
196 def init_clusterdir(self):
197 subcommand = self.subcommand
277 if subcommand=='list':
198 if subcommand=='list':
278 self.list_cluster_dirs()
199 self.list_cluster_dirs()
279 # Exit immediately because there is nothing left to do.
200 self.exit(0)
280 self.exit()
201 if subcommand=='create':
281 elif subcommand=='create':
202 reset = self.reset_config
282 self.auto_create_cluster_dir = True
203 self.auto_create_cluster_dir = True
283 super(IPClusterApp, self).find_resources()
204 super(IPClusterApp, self).init_clusterdir()
205 self.log.info('Copying default config files to cluster directory '
206 '[overwrite=%r]' % (reset,))
207 self.cluster_dir.copy_all_config_files(overwrite=reset)
284 elif subcommand=='start' or subcommand=='stop':
208 elif subcommand=='start' or subcommand=='stop':
285 self.auto_create_cluster_dir = True
209 self.auto_create_cluster_dir = True
286 try:
210 try:
287 super(IPClusterApp, self).find_resources()
211 super(IPClusterApp, self).init_clusterdir()
288 except ClusterDirError:
212 except ClusterDirError:
289 raise ClusterDirError(
213 raise ClusterDirError(
290 "Could not find a cluster directory. A cluster dir must "
214 "Could not find a cluster directory. A cluster dir must "
@@ -295,7 +219,7 b' class IPClusterApp(ApplicationWithClusterDir):'
295 elif subcommand=='engines':
219 elif subcommand=='engines':
296 self.auto_create_cluster_dir = False
220 self.auto_create_cluster_dir = False
297 try:
221 try:
298 super(IPClusterApp, self).find_resources()
222 super(IPClusterApp, self).init_clusterdir()
299 except ClusterDirError:
223 except ClusterDirError:
300 raise ClusterDirError(
224 raise ClusterDirError(
301 "Could not find a cluster directory. A cluster dir must "
225 "Could not find a cluster directory. A cluster dir must "
@@ -312,9 +236,9 b' class IPClusterApp(ApplicationWithClusterDir):'
312 else:
236 else:
313 cluster_dir_paths = []
237 cluster_dir_paths = []
314 try:
238 try:
315 ipython_dir = self.command_line_config.Global.ipython_dir
239 ipython_dir = self.ipython_dir
316 except AttributeError:
240 except AttributeError:
317 ipython_dir = self.default_config.Global.ipython_dir
241 ipython_dir = self.ipython_dir
318 paths = [os.getcwd(), ipython_dir] + \
242 paths = [os.getcwd(), ipython_dir] + \
319 cluster_dir_paths
243 cluster_dir_paths
320 paths = list(set(paths))
244 paths = list(set(paths))
@@ -326,32 +250,13 b' class IPClusterApp(ApplicationWithClusterDir):'
326 full_path = os.path.join(path, f)
250 full_path = os.path.join(path, f)
327 if os.path.isdir(full_path) and f.startswith('cluster_'):
251 if os.path.isdir(full_path) and f.startswith('cluster_'):
328 profile = full_path.split('_')[-1]
252 profile = full_path.split('_')[-1]
329 start_cmd = 'ipcluster start -p %s -n 4' % profile
253 start_cmd = 'ipcluster --start profile=%s n=4' % profile
330 print start_cmd + " ==> " + full_path
254 print start_cmd + " ==> " + full_path
331
255
332 def pre_construct(self):
256 def init_launchers(self):
333 # IPClusterApp.pre_construct() is where we cd to the working directory.
257 config = self.config
334 super(IPClusterApp, self).pre_construct()
258 subcmd = self.subcommand
335 config = self.master_config
336 try:
337 daemon = config.Global.daemonize
338 if daemon:
339 config.Global.log_to_file = True
340 except AttributeError:
341 pass
342
343 def construct(self):
344 config = self.master_config
345 subcmd = config.Global.subcommand
346 reset = config.Global.reset_config
347 if subcmd == 'list':
348 return
349 if subcmd == 'create':
350 self.log.info('Copying default config files to cluster directory '
351 '[overwrite=%r]' % (reset,))
352 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
353 if subcmd =='start':
259 if subcmd =='start':
354 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
355 self.start_logging()
260 self.start_logging()
356 self.loop = ioloop.IOLoop.instance()
261 self.loop = ioloop.IOLoop.instance()
357 # reactor.callWhenRunning(self.start_launchers)
262 # reactor.callWhenRunning(self.start_launchers)
@@ -366,16 +271,19 b' class IPClusterApp(ApplicationWithClusterDir):'
366 dc.start()
271 dc.start()
367
272
368 def start_launchers(self, controller=True):
273 def start_launchers(self, controller=True):
369 config = self.master_config
274 config = self.config
370
275
371 # Create the launchers. In both bases, we set the work_dir of
276 # Create the launchers. In both bases, we set the work_dir of
372 # the launcher to the cluster_dir. This is where the launcher's
277 # the launcher to the cluster_dir. This is where the launcher's
373 # subprocesses will be launched. It is not where the controller
278 # subprocesses will be launched. It is not where the controller
374 # and engine will be launched.
279 # and engine will be launched.
375 if controller:
280 if controller:
376 cl_class = import_item(config.Global.controller_launcher)
281 clsname = self.controller_launcher_class
282 if '.' not in clsname:
283 clsname = 'IPython.parallel.apps.launcher.'+clsname
284 cl_class = import_item(clsname)
377 self.controller_launcher = cl_class(
285 self.controller_launcher = cl_class(
378 work_dir=self.cluster_dir, config=config,
286 work_dir=self.cluster_dir.location, config=config,
379 logname=self.log.name
287 logname=self.log.name
380 )
288 )
381 # Setup the observing of stopping. If the controller dies, shut
289 # Setup the observing of stopping. If the controller dies, shut
@@ -391,9 +299,15 b' class IPClusterApp(ApplicationWithClusterDir):'
391 else:
299 else:
392 self.controller_launcher = None
300 self.controller_launcher = None
393
301
394 el_class = import_item(config.Global.engine_launcher)
302 clsname = self.engine_launcher_class
303 if '.' not in clsname:
304 # not a module, presume it's the raw name in apps.launcher
305 clsname = 'IPython.parallel.apps.launcher.'+clsname
306 print repr(clsname)
307 el_class = import_item(clsname)
308
395 self.engine_launcher = el_class(
309 self.engine_launcher = el_class(
396 work_dir=self.cluster_dir, config=config, logname=self.log.name
310 work_dir=self.cluster_dir.location, config=config, logname=self.log.name
397 )
311 )
398
312
399 # Setup signals
313 # Setup signals
@@ -403,7 +317,7 b' class IPClusterApp(ApplicationWithClusterDir):'
403 self._stopping = False # Make sure stop_launchers is not called 2x.
317 self._stopping = False # Make sure stop_launchers is not called 2x.
404 if controller:
318 if controller:
405 self.start_controller()
319 self.start_controller()
406 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
320 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay*controller, self.loop)
407 dc.start()
321 dc.start()
408 self.startup_message()
322 self.startup_message()
409
323
@@ -413,19 +327,19 b' class IPClusterApp(ApplicationWithClusterDir):'
413
327
414 def start_controller(self, r=None):
328 def start_controller(self, r=None):
415 # self.log.info("In start_controller")
329 # self.log.info("In start_controller")
416 config = self.master_config
330 config = self.config
417 d = self.controller_launcher.start(
331 d = self.controller_launcher.start(
418 cluster_dir=config.Global.cluster_dir
332 cluster_dir=self.cluster_dir.location
419 )
333 )
420 return d
334 return d
421
335
422 def start_engines(self, r=None):
336 def start_engines(self, r=None):
423 # self.log.info("In start_engines")
337 # self.log.info("In start_engines")
424 config = self.master_config
338 config = self.config
425
339
426 d = self.engine_launcher.start(
340 d = self.engine_launcher.start(
427 config.Global.n,
341 self.n,
428 cluster_dir=config.Global.cluster_dir
342 cluster_dir=self.cluster_dir.location
429 )
343 )
430 return d
344 return d
431
345
@@ -469,18 +383,19 b' class IPClusterApp(ApplicationWithClusterDir):'
469
383
470 def start_logging(self):
384 def start_logging(self):
471 # Remove old log files of the controller and engine
385 # Remove old log files of the controller and engine
472 if self.master_config.Global.clean_logs:
386 if self.clean_logs:
473 log_dir = self.master_config.Global.log_dir
387 log_dir = self.cluster_dir.log_dir
474 for f in os.listdir(log_dir):
388 for f in os.listdir(log_dir):
475 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
389 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
476 os.remove(os.path.join(log_dir, f))
390 os.remove(os.path.join(log_dir, f))
477 # This will remove old log files for ipcluster itself
391 # This will remove old log files for ipcluster itself
478 super(IPClusterApp, self).start_logging()
392 # super(IPClusterApp, self).start_logging()
479
393
480 def start_app(self):
394 def start(self):
481 """Start the application, depending on what subcommand is used."""
395 """Start the application, depending on what subcommand is used."""
482 subcmd = self.master_config.Global.subcommand
396 subcmd = self.subcommand
483 if subcmd=='create' or subcmd=='list':
397 if subcmd=='create':
398 # init_clusterdir step completed create action
484 return
399 return
485 elif subcmd=='start':
400 elif subcmd=='start':
486 self.start_app_start()
401 self.start_app_start()
@@ -488,10 +403,14 b' class IPClusterApp(ApplicationWithClusterDir):'
488 self.start_app_stop()
403 self.start_app_stop()
489 elif subcmd=='engines':
404 elif subcmd=='engines':
490 self.start_app_engines()
405 self.start_app_engines()
406 else:
407 self.log.fatal("one command of '--start', '--stop', '--list', '--create', '--engines'"
408 " must be specified")
409 self.exit(-1)
491
410
492 def start_app_start(self):
411 def start_app_start(self):
493 """Start the app for the start subcommand."""
412 """Start the app for the start subcommand."""
494 config = self.master_config
413 config = self.config
495 # First see if the cluster is already running
414 # First see if the cluster is already running
496 try:
415 try:
497 pid = self.get_pid_from_file()
416 pid = self.get_pid_from_file()
@@ -512,10 +431,10 b' class IPClusterApp(ApplicationWithClusterDir):'
512
431
513 # Now log and daemonize
432 # Now log and daemonize
514 self.log.info(
433 self.log.info(
515 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
434 'Starting ipcluster with [daemon=%r]' % self.daemonize
516 )
435 )
517 # TODO: Get daemonize working on Windows or as a Windows Server.
436 # TODO: Get daemonize working on Windows or as a Windows Server.
518 if config.Global.daemonize:
437 if self.daemonize:
519 if os.name=='posix':
438 if os.name=='posix':
520 from twisted.scripts._twistd_unix import daemonize
439 from twisted.scripts._twistd_unix import daemonize
521 daemonize()
440 daemonize()
@@ -536,15 +455,15 b' class IPClusterApp(ApplicationWithClusterDir):'
536
455
537 def start_app_engines(self):
456 def start_app_engines(self):
538 """Start the app for the start subcommand."""
457 """Start the app for the start subcommand."""
539 config = self.master_config
458 config = self.config
540 # First see if the cluster is already running
459 # First see if the cluster is already running
541
460
542 # Now log and daemonize
461 # Now log and daemonize
543 self.log.info(
462 self.log.info(
544 'Starting engines with [daemon=%r]' % config.Global.daemonize
463 'Starting engines with [daemon=%r]' % self.daemonize
545 )
464 )
546 # TODO: Get daemonize working on Windows or as a Windows Server.
465 # TODO: Get daemonize working on Windows or as a Windows Server.
547 if config.Global.daemonize:
466 if self.daemonize:
548 if os.name=='posix':
467 if os.name=='posix':
549 from twisted.scripts._twistd_unix import daemonize
468 from twisted.scripts._twistd_unix import daemonize
550 daemonize()
469 daemonize()
@@ -564,7 +483,7 b' class IPClusterApp(ApplicationWithClusterDir):'
564
483
565 def start_app_stop(self):
484 def start_app_stop(self):
566 """Start the app for the stop subcommand."""
485 """Start the app for the stop subcommand."""
567 config = self.master_config
486 config = self.config
568 try:
487 try:
569 pid = self.get_pid_from_file()
488 pid = self.get_pid_from_file()
570 except PIDFileError:
489 except PIDFileError:
@@ -586,7 +505,7 b' class IPClusterApp(ApplicationWithClusterDir):'
586 self.exit(ALREADY_STOPPED)
505 self.exit(ALREADY_STOPPED)
587
506
588 elif os.name=='posix':
507 elif os.name=='posix':
589 sig = config.Global.signal
508 sig = self.signal
590 self.log.info(
509 self.log.info(
591 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
510 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
592 )
511 )
@@ -609,6 +528,20 b' class IPClusterApp(ApplicationWithClusterDir):'
609 def launch_new_instance():
528 def launch_new_instance():
610 """Create and run the IPython cluster."""
529 """Create and run the IPython cluster."""
611 app = IPClusterApp()
530 app = IPClusterApp()
531 app.parse_command_line()
532 cl_config = app.config
533 app.init_clusterdir()
534 if app.config_file:
535 app.load_config_file(app.config_file)
536 else:
537 app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
538 # command-line should *override* config file, but command-line is necessary
539 # to determine clusterdir, etc.
540 app.update_config(cl_config)
541
542 app.to_work_dir()
543 app.init_launchers()
544
612 app.start()
545 app.start()
613
546
614
547
@@ -25,7 +25,10 b' import stat'
25 import sys
25 import sys
26 import uuid
26 import uuid
27
27
28 from multiprocessing import Process
29
28 import zmq
30 import zmq
31 from zmq.devices import ProcessMonitoredQueue
29 from zmq.log.handlers import PUBHandler
32 from zmq.log.handlers import PUBHandler
30 from zmq.utils import jsonapi as json
33 from zmq.utils import jsonapi as json
31
34
@@ -34,14 +37,31 b' from IPython.config.loader import Config'
34 from IPython.parallel import factory
37 from IPython.parallel import factory
35
38
36 from IPython.parallel.apps.clusterdir import (
39 from IPython.parallel.apps.clusterdir import (
37 ApplicationWithClusterDir,
40 ClusterDir,
38 ClusterDirConfigLoader
41 ClusterDirApplication,
42 base_flags
43 # ClusterDirConfigLoader
39 )
44 )
40 from IPython.parallel.util import disambiguate_ip_address, split_url
45 from IPython.utils.importstring import import_item
41 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, CStr, Dict
42 from IPython.utils.traitlets import Instance, Unicode
47
48 # from IPython.parallel.controller.controller import ControllerFactory
49 from IPython.parallel.streamsession import StreamSession
50 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 from IPython.parallel.controller.hub import Hub, HubFactory
52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 from IPython.parallel.controller.sqlitedb import SQLiteDB
54
55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
43
56
44 from IPython.parallel.controller.controller import ControllerFactory
57 # conditional import of MongoDB backend class
58
59 try:
60 from IPython.parallel.controller.mongodb import MongoDB
61 except ImportError:
62 maybe_mongo = []
63 else:
64 maybe_mongo = [MongoDB]
45
65
46
66
47 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
@@ -63,234 +83,102 b' your ipython directory and named as "cluster_<profile>". See the --profile'
63 and --cluster-dir options for details.
83 and --cluster-dir options for details.
64 """
84 """
65
85
66 #-----------------------------------------------------------------------------
67 # Default interfaces
68 #-----------------------------------------------------------------------------
69
70 # The default client interfaces for FCClientServiceFactory.interfaces
71 default_client_interfaces = Config()
72 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
73
74 # Make this a dict we can pass to Config.__init__ for the default
75 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
76
77
78
86
79 # The default engine interfaces for FCEngineServiceFactory.interfaces
80 default_engine_interfaces = Config()
81 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
82
83 # Make this a dict we can pass to Config.__init__ for the default
84 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
85
86
87 #-----------------------------------------------------------------------------
88 # Service factories
89 #-----------------------------------------------------------------------------
90
91 #
92 # class FCClientServiceFactory(FCServiceFactory):
93 # """A Foolscap implementation of the client services."""
94 #
95 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
96 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
97 # allow_none=False, config=True)
98 #
99 #
100 # class FCEngineServiceFactory(FCServiceFactory):
101 # """A Foolscap implementation of the engine services."""
102 #
103 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
104 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
105 # allow_none=False, config=True)
106 #
107
108 #-----------------------------------------------------------------------------
109 # Command line options
110 #-----------------------------------------------------------------------------
111
112
113 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
114
115 def _add_arguments(self):
116 super(IPControllerAppConfigLoader, self)._add_arguments()
117 paa = self.parser.add_argument
118
119 ## Hub Config:
120 paa('--mongodb',
121 dest='HubFactory.db_class', action='store_const',
122 const='IPython.parallel.controller.mongodb.MongoDB',
123 help='Use MongoDB for task storage [default: in-memory]')
124 paa('--sqlite',
125 dest='HubFactory.db_class', action='store_const',
126 const='IPython.parallel.controller.sqlitedb.SQLiteDB',
127 help='Use SQLite3 for DB task storage [default: in-memory]')
128 paa('--hb',
129 type=int, dest='HubFactory.hb', nargs=2,
130 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
131 'connections [default: random]',
132 metavar='Hub.hb_ports')
133 paa('--ping',
134 type=int, dest='HubFactory.ping',
135 help='The frequency at which the Hub pings the engines for heartbeats '
136 ' (in ms) [default: 100]',
137 metavar='Hub.ping')
138
139 # Client config
140 paa('--client-ip',
141 type=str, dest='HubFactory.client_ip',
142 help='The IP address or hostname the Hub will listen on for '
143 'client connections. Both engine-ip and client-ip can be set simultaneously '
144 'via --ip [default: loopback]',
145 metavar='Hub.client_ip')
146 paa('--client-transport',
147 type=str, dest='HubFactory.client_transport',
148 help='The ZeroMQ transport the Hub will use for '
149 'client connections. Both engine-transport and client-transport can be set simultaneously '
150 'via --transport [default: tcp]',
151 metavar='Hub.client_transport')
152 paa('--query',
153 type=int, dest='HubFactory.query_port',
154 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
155 metavar='Hub.query_port')
156 paa('--notifier',
157 type=int, dest='HubFactory.notifier_port',
158 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
159 metavar='Hub.notifier_port')
160
161 # Engine config
162 paa('--engine-ip',
163 type=str, dest='HubFactory.engine_ip',
164 help='The IP address or hostname the Hub will listen on for '
165 'engine connections. This applies to the Hub and its schedulers'
166 'engine-ip and client-ip can be set simultaneously '
167 'via --ip [default: loopback]',
168 metavar='Hub.engine_ip')
169 paa('--engine-transport',
170 type=str, dest='HubFactory.engine_transport',
171 help='The ZeroMQ transport the Hub will use for '
172 'client connections. Both engine-transport and client-transport can be set simultaneously '
173 'via --transport [default: tcp]',
174 metavar='Hub.engine_transport')
175
176 # Scheduler config
177 paa('--mux',
178 type=int, dest='ControllerFactory.mux', nargs=2,
179 help='The (2) ports the MUX scheduler will listen on for client,engine '
180 'connections, respectively [default: random]',
181 metavar='Scheduler.mux_ports')
182 paa('--task',
183 type=int, dest='ControllerFactory.task', nargs=2,
184 help='The (2) ports the Task scheduler will listen on for client,engine '
185 'connections, respectively [default: random]',
186 metavar='Scheduler.task_ports')
187 paa('--control',
188 type=int, dest='ControllerFactory.control', nargs=2,
189 help='The (2) ports the Control scheduler will listen on for client,engine '
190 'connections, respectively [default: random]',
191 metavar='Scheduler.control_ports')
192 paa('--iopub',
193 type=int, dest='ControllerFactory.iopub', nargs=2,
194 help='The (2) ports the IOPub scheduler will listen on for client,engine '
195 'connections, respectively [default: random]',
196 metavar='Scheduler.iopub_ports')
197
198 paa('--scheme',
199 type=str, dest='HubFactory.scheme',
200 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
201 help='select the task scheduler scheme [default: Python LRU]',
202 metavar='Scheduler.scheme')
203 paa('--usethreads',
204 dest='ControllerFactory.usethreads', action="store_true",
205 help='Use threads instead of processes for the schedulers',
206 )
207 paa('--hwm',
208 dest='TaskScheduler.hwm', type=int,
209 help='specify the High Water Mark (HWM) '
210 'in the Python scheduler. This is the maximum number '
211 'of allowed outstanding tasks on each engine.',
212 )
213
214 ## Global config
215 paa('--log-to-file',
216 action='store_true', dest='Global.log_to_file',
217 help='Log to a file in the log directory (default is stdout)')
218 paa('--log-url',
219 type=str, dest='Global.log_url',
220 help='Broadcast logs to an iploggerz process [default: disabled]')
221 paa('-r','--reuse-files',
222 action='store_true', dest='Global.reuse_files',
223 help='Try to reuse existing json connection files.')
224 paa('--no-secure',
225 action='store_false', dest='Global.secure',
226 help='Turn off execution keys (default).')
227 paa('--secure',
228 action='store_true', dest='Global.secure',
229 help='Turn on execution keys.')
230 paa('--execkey',
231 type=str, dest='Global.exec_key',
232 help='path to a file containing an execution key.',
233 metavar='keyfile')
234 paa('--ssh',
235 type=str, dest='Global.sshserver',
236 help='ssh url for clients to use when connecting to the Controller '
237 'processes. It should be of the form: [user@]server[:port]. The '
238 'Controller\'s listening addresses must be accessible from the ssh server',
239 metavar='Global.sshserver')
240 paa('--location',
241 type=str, dest='Global.location',
242 help="The external IP or domain name of this machine, used for disambiguating "
243 "engine and client connections.",
244 metavar='Global.location')
245 factory.add_session_arguments(self.parser)
246 factory.add_registration_arguments(self.parser)
247
87
248
88
249 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
250 # The main application
90 # The main application
251 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
252
92 flags = {}
253
93 flags.update(base_flags)
254 class IPControllerApp(ApplicationWithClusterDir):
94 flags.update({
95 'usethreads' : ( {'IPControllerApp' : {'usethreads' : True}},
96 'Use threads instead of processes for the schedulers'),
97 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
98 'use the SQLiteDB backend'),
99 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
100 'use the MongoDB backend'),
101 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
102 'use the in-memory DictDB backend'),
103 })
104
105 flags.update()
106
107 class IPControllerApp(ClusterDirApplication):
255
108
256 name = u'ipcontroller'
109 name = u'ipcontroller'
257 description = _description
110 description = _description
258 command_line_loader = IPControllerAppConfigLoader
111 # command_line_loader = IPControllerAppConfigLoader
259 default_config_file_name = default_config_file_name
112 default_config_file_name = default_config_file_name
260 auto_create_cluster_dir = True
113 auto_create_cluster_dir = True
114 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
261
115
262
116 reuse_files = Bool(False, config=True,
263 def create_default_config(self):
117 help='Whether to reuse existing json connection files [default: False]'
264 super(IPControllerApp, self).create_default_config()
118 )
265 # Don't set defaults for Global.secure or Global.reuse_furls
119 secure = Bool(True, config=True,
266 # as those are set in a component.
120 help='Whether to use exec_keys for extra authentication [default: True]'
267 self.default_config.Global.import_statements = []
121 )
268 self.default_config.Global.clean_logs = True
122 ssh_server = Unicode(u'', config=True,
269 self.default_config.Global.secure = True
123 help="""ssh url for clients to use when connecting to the Controller
270 self.default_config.Global.reuse_files = False
124 processes. It should be of the form: [user@]server[:port]. The
271 self.default_config.Global.exec_key = "exec_key.key"
125 Controller\'s listening addresses must be accessible from the ssh server""",
272 self.default_config.Global.sshserver = None
126 )
273 self.default_config.Global.location = None
127 location = Unicode(u'', config=True,
274
128 help="""The external IP or domain name of the Controller, used for disambiguating
275 def pre_construct(self):
129 engine and client connections.""",
276 super(IPControllerApp, self).pre_construct()
130 )
277 c = self.master_config
131 import_statements = List([], config=True,
278 # The defaults for these are set in FCClientServiceFactory and
132 help="import statements to be run at startup. Necessary in some environments"
279 # FCEngineServiceFactory, so we only set them here if the global
133 )
280 # options have be set to override the class level defaults.
134
135 usethreads = Bool(False, config=True,
136 help='Use threads instead of processes for the schedulers',
137 )
138
139 # internal
140 children = List()
141 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
142
143 def _usethreads_changed(self, name, old, new):
144 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
145
146 aliases = Dict(dict(
147 config = 'IPControllerApp.config_file',
148 # file = 'IPControllerApp.url_file',
149 log_level = 'IPControllerApp.log_level',
150 reuse_files = 'IPControllerApp.reuse_files',
151 secure = 'IPControllerApp.secure',
152 ssh = 'IPControllerApp.ssh_server',
153 usethreads = 'IPControllerApp.usethreads',
154 import_statements = 'IPControllerApp.import_statements',
155 location = 'IPControllerApp.location',
156
157 ident = 'StreamSession.session',
158 user = 'StreamSession.username',
159 exec_key = 'StreamSession.keyfile',
160
161 url = 'HubFactory.url',
162 ip = 'HubFactory.ip',
163 transport = 'HubFactory.transport',
164 port = 'HubFactory.regport',
165
166 ping = 'HeartMonitor.period',
167
168 scheme = 'TaskScheduler.scheme_name',
169 hwm = 'TaskScheduler.hwm',
170
171
172 profile = "ClusterDir.profile",
173 cluster_dir = 'ClusterDir.location',
281
174
282 # if hasattr(c.Global, 'reuse_furls'):
175 ))
283 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
176 flags = Dict(flags)
284 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
285 # del c.Global.reuse_furls
286 # if hasattr(c.Global, 'secure'):
287 # c.FCClientServiceFactory.secure = c.Global.secure
288 # c.FCEngineServiceFactory.secure = c.Global.secure
289 # del c.Global.secure
290
177
178
291 def save_connection_dict(self, fname, cdict):
179 def save_connection_dict(self, fname, cdict):
292 """save a connection dict to json file."""
180 """save a connection dict to json file."""
293 c = self.master_config
181 c = self.config
294 url = cdict['url']
182 url = cdict['url']
295 location = cdict['location']
183 location = cdict['location']
296 if not location:
184 if not location:
@@ -301,43 +189,43 b' class IPControllerApp(ApplicationWithClusterDir):'
301 else:
189 else:
302 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
190 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
303 cdict['location'] = location
191 cdict['location'] = location
304 fname = os.path.join(c.Global.security_dir, fname)
192 fname = os.path.join(self.cluster_dir.security_dir, fname)
305 with open(fname, 'w') as f:
193 with open(fname, 'w') as f:
306 f.write(json.dumps(cdict, indent=2))
194 f.write(json.dumps(cdict, indent=2))
307 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
195 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
308
196
309 def load_config_from_json(self):
197 def load_config_from_json(self):
310 """load config from existing json connector files."""
198 """load config from existing json connector files."""
311 c = self.master_config
199 c = self.config
312 # load from engine config
200 # load from engine config
313 with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
201 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
314 cfg = json.loads(f.read())
202 cfg = json.loads(f.read())
315 key = c.SessionFactory.exec_key = cfg['exec_key']
203 key = c.StreamSession.key = cfg['exec_key']
316 xport,addr = cfg['url'].split('://')
204 xport,addr = cfg['url'].split('://')
317 c.HubFactory.engine_transport = xport
205 c.HubFactory.engine_transport = xport
318 ip,ports = addr.split(':')
206 ip,ports = addr.split(':')
319 c.HubFactory.engine_ip = ip
207 c.HubFactory.engine_ip = ip
320 c.HubFactory.regport = int(ports)
208 c.HubFactory.regport = int(ports)
321 c.Global.location = cfg['location']
209 self.location = cfg['location']
322
210
323 # load client config
211 # load client config
324 with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
212 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
325 cfg = json.loads(f.read())
213 cfg = json.loads(f.read())
326 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
214 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
327 xport,addr = cfg['url'].split('://')
215 xport,addr = cfg['url'].split('://')
328 c.HubFactory.client_transport = xport
216 c.HubFactory.client_transport = xport
329 ip,ports = addr.split(':')
217 ip,ports = addr.split(':')
330 c.HubFactory.client_ip = ip
218 c.HubFactory.client_ip = ip
331 c.Global.sshserver = cfg['ssh']
219 self.ssh_server = cfg['ssh']
332 assert int(ports) == c.HubFactory.regport, "regport mismatch"
220 assert int(ports) == c.HubFactory.regport, "regport mismatch"
333
221
334 def construct(self):
222 def init_hub(self):
335 # This is the working dir by now.
223 # This is the working dir by now.
336 sys.path.insert(0, '')
224 sys.path.insert(0, '')
337 c = self.master_config
225 c = self.config
338
226
339 self.import_statements()
227 self.do_import_statements()
340 reusing = c.Global.reuse_files
228 reusing = self.reuse_files
341 if reusing:
229 if reusing:
342 try:
230 try:
343 self.load_config_from_json()
231 self.load_config_from_json()
@@ -346,21 +234,20 b' class IPControllerApp(ApplicationWithClusterDir):'
346 # check again, because reusing may have failed:
234 # check again, because reusing may have failed:
347 if reusing:
235 if reusing:
348 pass
236 pass
349 elif c.Global.secure:
237 elif self.secure:
350 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
351 key = str(uuid.uuid4())
238 key = str(uuid.uuid4())
352 with open(keyfile, 'w') as f:
239 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
353 f.write(key)
240 # with open(keyfile, 'w') as f:
354 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
241 # f.write(key)
355 c.SessionFactory.exec_key = key
242 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
243 c.StreamSession.key = key
356 else:
244 else:
357 c.SessionFactory.exec_key = ''
245 key = c.StreamSession.key = ''
358 key = None
359
246
360 try:
247 try:
361 self.factory = ControllerFactory(config=c, logname=self.log.name)
248 self.factory = HubFactory(config=c, log=self.log)
362 self.start_logging()
249 # self.start_logging()
363 self.factory.construct()
250 self.factory.init_hub()
364 except:
251 except:
365 self.log.error("Couldn't construct the Controller", exc_info=True)
252 self.log.error("Couldn't construct the Controller", exc_info=True)
366 self.exit(1)
253 self.exit(1)
@@ -369,21 +256,82 b' class IPControllerApp(ApplicationWithClusterDir):'
369 # save to new json config files
256 # save to new json config files
370 f = self.factory
257 f = self.factory
371 cdict = {'exec_key' : key,
258 cdict = {'exec_key' : key,
372 'ssh' : c.Global.sshserver,
259 'ssh' : self.ssh_server,
373 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
260 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
374 'location' : c.Global.location
261 'location' : self.location
375 }
262 }
376 self.save_connection_dict('ipcontroller-client.json', cdict)
263 self.save_connection_dict('ipcontroller-client.json', cdict)
377 edict = cdict
264 edict = cdict
378 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
265 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
379 self.save_connection_dict('ipcontroller-engine.json', edict)
266 self.save_connection_dict('ipcontroller-engine.json', edict)
267
268 #
269 def init_schedulers(self):
270 children = self.children
271 mq = import_item(self.mq_class)
380
272
273 hub = self.factory
274 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
275 # IOPub relay (in a Process)
276 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
277 q.bind_in(hub.client_info['iopub'])
278 q.bind_out(hub.engine_info['iopub'])
279 q.setsockopt_out(zmq.SUBSCRIBE, '')
280 q.connect_mon(hub.monitor_url)
281 q.daemon=True
282 children.append(q)
283
284 # Multiplexer Queue (in a Process)
285 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
286 q.bind_in(hub.client_info['mux'])
287 q.setsockopt_in(zmq.IDENTITY, 'mux')
288 q.bind_out(hub.engine_info['mux'])
289 q.connect_mon(hub.monitor_url)
290 q.daemon=True
291 children.append(q)
292
293 # Control Queue (in a Process)
294 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
295 q.bind_in(hub.client_info['control'])
296 q.setsockopt_in(zmq.IDENTITY, 'control')
297 q.bind_out(hub.engine_info['control'])
298 q.connect_mon(hub.monitor_url)
299 q.daemon=True
300 children.append(q)
301 try:
302 scheme = self.config.TaskScheduler.scheme_name
303 except AttributeError:
304 scheme = TaskScheduler.scheme_name.get_default_value()
305 # Task Queue (in a Process)
306 if scheme == 'pure':
307 self.log.warn("task::using pure XREQ Task scheduler")
308 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
309 # q.setsockopt_out(zmq.HWM, hub.hwm)
310 q.bind_in(hub.client_info['task'][1])
311 q.setsockopt_in(zmq.IDENTITY, 'task')
312 q.bind_out(hub.engine_info['task'])
313 q.connect_mon(hub.monitor_url)
314 q.daemon=True
315 children.append(q)
316 elif scheme == 'none':
317 self.log.warn("task::using no Task scheduler")
318
319 else:
320 self.log.info("task::using Python %s Task scheduler"%scheme)
321 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
322 hub.monitor_url, hub.client_info['notification'])
323 kwargs = dict(logname=self.log.name, loglevel=self.log_level,
324 config=dict(self.config))
325 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
326 q.daemon=True
327 children.append(q)
328
381
329
382 def save_urls(self):
330 def save_urls(self):
383 """save the registration urls to files."""
331 """save the registration urls to files."""
384 c = self.master_config
332 c = self.config
385
333
386 sec_dir = c.Global.security_dir
334 sec_dir = self.cluster_dir.security_dir
387 cf = self.factory
335 cf = self.factory
388
336
389 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
337 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
@@ -393,8 +341,8 b' class IPControllerApp(ApplicationWithClusterDir):'
393 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
341 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
394
342
395
343
396 def import_statements(self):
344 def do_import_statements(self):
397 statements = self.master_config.Global.import_statements
345 statements = self.import_statements
398 for s in statements:
346 for s in statements:
399 try:
347 try:
400 self.log.msg("Executing statement: '%s'" % s)
348 self.log.msg("Executing statement: '%s'" % s)
@@ -402,21 +350,32 b' class IPControllerApp(ApplicationWithClusterDir):'
402 except:
350 except:
403 self.log.msg("Error running statement: %s" % s)
351 self.log.msg("Error running statement: %s" % s)
404
352
405 def start_logging(self):
353 # def start_logging(self):
406 super(IPControllerApp, self).start_logging()
354 # super(IPControllerApp, self).start_logging()
407 if self.master_config.Global.log_url:
355 # if self.config.Global.log_url:
408 context = self.factory.context
356 # context = self.factory.context
409 lsock = context.socket(zmq.PUB)
357 # lsock = context.socket(zmq.PUB)
410 lsock.connect(self.master_config.Global.log_url)
358 # lsock.connect(self.config.Global.log_url)
411 handler = PUBHandler(lsock)
359 # handler = PUBHandler(lsock)
412 handler.root_topic = 'controller'
360 # handler.root_topic = 'controller'
413 handler.setLevel(self.log_level)
361 # handler.setLevel(self.log_level)
414 self.log.addHandler(handler)
362 # self.log.addHandler(handler)
415 #
363 # #
416 def start_app(self):
364 def start(self):
417 # Start the subprocesses:
365 # Start the subprocesses:
418 self.factory.start()
366 self.factory.start()
367 child_procs = []
368 for child in self.children:
369 child.start()
370 if isinstance(child, ProcessMonitoredQueue):
371 child_procs.append(child.launcher)
372 elif isinstance(child, Process):
373 child_procs.append(child)
374 if child_procs:
375 signal_children(child_procs)
376
419 self.write_pid_file(overwrite=True)
377 self.write_pid_file(overwrite=True)
378
420 try:
379 try:
421 self.factory.loop.start()
380 self.factory.loop.start()
422 except KeyboardInterrupt:
381 except KeyboardInterrupt:
@@ -426,6 +385,22 b' class IPControllerApp(ApplicationWithClusterDir):'
426 def launch_new_instance():
385 def launch_new_instance():
427 """Create and run the IPython controller"""
386 """Create and run the IPython controller"""
428 app = IPControllerApp()
387 app = IPControllerApp()
388 app.parse_command_line()
389 cl_config = app.config
390 # app.load_config_file()
391 app.init_clusterdir()
392 if app.config_file:
393 app.load_config_file(app.config_file)
394 else:
395 app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
396 # command-line should *override* config file, but command-line is necessary
397 # to determine clusterdir, etc.
398 app.update_config(cl_config)
399
400 app.to_work_dir()
401 app.init_hub()
402 app.init_schedulers()
403
429 app.start()
404 app.start()
430
405
431
406
@@ -23,17 +23,21 b' import zmq'
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from IPython.parallel.apps.clusterdir import (
25 from IPython.parallel.apps.clusterdir import (
26 ApplicationWithClusterDir,
26 ClusterDirApplication,
27 ClusterDirConfigLoader
27 ClusterDir,
28 base_aliases,
29 # ClusterDirConfigLoader
28 )
30 )
29 from IPython.zmq.log import EnginePUBHandler
31 from IPython.zmq.log import EnginePUBHandler
30
32
31 from IPython.parallel import factory
33 from IPython.config.configurable import Configurable
34 from IPython.parallel.streamsession import StreamSession
32 from IPython.parallel.engine.engine import EngineFactory
35 from IPython.parallel.engine.engine import EngineFactory
33 from IPython.parallel.engine.streamkernel import Kernel
36 from IPython.parallel.engine.streamkernel import Kernel
34 from IPython.parallel.util import disambiguate_url
37 from IPython.parallel.util import disambiguate_url
35
38
36 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.traitlets import Str, Bool, Unicode, Dict, List, CStr
37
41
38
42
39 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
@@ -43,6 +47,20 b' from IPython.utils.importstring import import_item'
43 #: The default config file name for this application
47 #: The default config file name for this application
44 default_config_file_name = u'ipengine_config.py'
48 default_config_file_name = u'ipengine_config.py'
45
49
50 _description = """Start an IPython engine for parallel computing.\n\n
51
52 IPython engines run in parallel and perform computations on behalf of a client
53 and controller. A controller needs to be started before the engines. The
54 engine can be configured using command line options or using a cluster
55 directory. Cluster directories contain config, log and security files and are
56 usually located in your ipython directory and named as "cluster_<profile>".
57 See the `profile` and `cluster_dir` options for details.
58 """
59
60
61 #-----------------------------------------------------------------------------
62 # MPI configuration
63 #-----------------------------------------------------------------------------
46
64
47 mpi4py_init = """from mpi4py import MPI as mpi
65 mpi4py_init = """from mpi4py import MPI as mpi
48 mpi.size = mpi.COMM_WORLD.Get_size()
66 mpi.size = mpi.COMM_WORLD.Get_size()
@@ -58,123 +76,75 b' mpi.rank = 0'
58 mpi.size = 0
76 mpi.size = 0
59 """
77 """
60
78
79 class MPI(Configurable):
80 """Configurable for MPI initialization"""
81 use = Str('', config=True,
82 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
83 )
61
84
62 _description = """Start an IPython engine for parallel computing.\n\n
85 def _on_use_changed(self, old, new):
86 # load default init script if it's not set
87 if not self.init_script:
88 self.init_script = self.default_inits.get(new, '')
89
90 init_script = Str('', config=True,
91 help="Initialization code for MPI")
92
93 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
94 config=True)
63
95
64 IPython engines run in parallel and perform computations on behalf of a client
65 and controller. A controller needs to be started before the engines. The
66 engine can be configured using command line options or using a cluster
67 directory. Cluster directories contain config, log and security files and are
68 usually located in your ipython directory and named as "cluster_<profile>".
69 See the --profile and --cluster-dir options for details.
70 """
71
96
72 #-----------------------------------------------------------------------------
97 #-----------------------------------------------------------------------------
73 # Command line options
98 # Main application
74 #-----------------------------------------------------------------------------
99 #-----------------------------------------------------------------------------
75
100
76
101
77 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
102 class IPEngineApp(ClusterDirApplication):
78
103
79 def _add_arguments(self):
104 app_name = Unicode(u'ipengine')
80 super(IPEngineAppConfigLoader, self)._add_arguments()
105 description = Unicode(_description)
81 paa = self.parser.add_argument
106 default_config_file_name = default_config_file_name
82 # Controller config
107 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
83 paa('--file', '-f',
108
84 type=unicode, dest='Global.url_file',
109 startup_script = Unicode(u'', config=True,
85 help='The full location of the file containing the connection information fo '
110 help='specify a script to be run at startup')
86 'controller. If this is not given, the file must be in the '
111 startup_command = Str('', config=True,
87 'security directory of the cluster directory. This location is '
88 'resolved using the --profile and --app-dir options.',
89 metavar='Global.url_file')
90 # MPI
91 paa('--mpi',
92 type=str, dest='MPI.use',
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
94 metavar='MPI.use')
95 # Global config
96 paa('--log-to-file',
97 action='store_true', dest='Global.log_to_file',
98 help='Log to a file in the log directory (default is stdout)')
99 paa('--log-url',
100 dest='Global.log_url',
101 help="url of ZMQ logger, as started with iploggerz")
102 # paa('--execkey',
103 # type=str, dest='Global.exec_key',
104 # help='path to a file containing an execution key.',
105 # metavar='keyfile')
106 # paa('--no-secure',
107 # action='store_false', dest='Global.secure',
108 # help='Turn off execution keys.')
109 # paa('--secure',
110 # action='store_true', dest='Global.secure',
111 # help='Turn on execution keys (default).')
112 # init command
113 paa('-c',
114 type=str, dest='Global.extra_exec_lines',
115 help='specify a command to be run at startup')
112 help='specify a command to be run at startup')
116 paa('-s',
117 type=unicode, dest='Global.extra_exec_file',
118 help='specify a script to be run at startup')
119
120 factory.add_session_arguments(self.parser)
121 factory.add_registration_arguments(self.parser)
122
113
114 url_file = Unicode(u'', config=True,
115 help="""The full location of the file containing the connection information for
116 the controller. If this is not given, the file must be in the
117 security directory of the cluster directory. This location is
118 resolved using the `profile` or `cluster_dir` options.""",
119 )
123
120
124 #-----------------------------------------------------------------------------
121 url_file_name = Unicode(u'ipcontroller-engine.json')
125 # Main application
126 #-----------------------------------------------------------------------------
127
122
123 aliases = Dict(dict(
124 config = 'IPEngineApp.config_file',
125 file = 'IPEngineApp.url_file',
126 c = 'IPEngineApp.startup_command',
127 s = 'IPEngineApp.startup_script',
128
128
129 class IPEngineApp(ApplicationWithClusterDir):
129 ident = 'StreamSession.session',
130 user = 'StreamSession.username',
131 exec_key = 'StreamSession.keyfile',
130
132
131 name = u'ipengine'
133 url = 'EngineFactory.url',
132 description = _description
134 ip = 'EngineFactory.ip',
133 command_line_loader = IPEngineAppConfigLoader
135 transport = 'EngineFactory.transport',
134 default_config_file_name = default_config_file_name
136 port = 'EngineFactory.regport',
135 auto_create_cluster_dir = True
137 location = 'EngineFactory.location',
136
138
137 def create_default_config(self):
139 timeout = 'EngineFactory.timeout',
138 super(IPEngineApp, self).create_default_config()
140
139
141 profile = "ClusterDir.profile",
140 # The engine should not clean logs as we don't want to remove the
142 cluster_dir = 'ClusterDir.location',
141 # active log files of other running engines.
143
142 self.default_config.Global.clean_logs = False
144 mpi = 'MPI.use',
143 self.default_config.Global.secure = True
145
144
146 log_level = 'IPEngineApp.log_level',
145 # Global config attributes
147 ))
146 self.default_config.Global.exec_lines = []
147 self.default_config.Global.extra_exec_lines = ''
148 self.default_config.Global.extra_exec_file = u''
149
150 # Configuration related to the controller
151 # This must match the filename (path not included) that the controller
152 # used for the FURL file.
153 self.default_config.Global.url_file = u''
154 self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
155 # If given, this is the actual location of the controller's FURL file.
156 # If not, this is computed using the profile, app_dir and furl_file_name
157 # self.default_config.Global.key_file_name = u'exec_key.key'
158 # self.default_config.Global.key_file = u''
159
160 # MPI related config attributes
161 self.default_config.MPI.use = ''
162 self.default_config.MPI.mpi4py = mpi4py_init
163 self.default_config.MPI.pytrilinos = pytrilinos_init
164
165 def post_load_command_line_config(self):
166 pass
167
168 def pre_construct(self):
169 super(IPEngineApp, self).pre_construct()
170 # self.find_cont_url_file()
171 self.find_url_file()
172 if self.master_config.Global.extra_exec_lines:
173 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
174 if self.master_config.Global.extra_exec_file:
175 enc = sys.getfilesystemencoding() or 'utf8'
176 cmd="execfile(%r)"%self.master_config.Global.extra_exec_file.encode(enc)
177 self.master_config.Global.exec_lines.append(cmd)
178
148
179 # def find_key_file(self):
149 # def find_key_file(self):
180 # """Set the key file.
150 # """Set the key file.
@@ -198,49 +168,58 b' class IPEngineApp(ApplicationWithClusterDir):'
198 Here we don't try to actually see if it exists for is valid as that
168 Here we don't try to actually see if it exists for is valid as that
199 is hadled by the connection logic.
169 is hadled by the connection logic.
200 """
170 """
201 config = self.master_config
171 config = self.config
202 # Find the actual controller key file
172 # Find the actual controller key file
203 if not config.Global.url_file:
173 if not self.url_file:
204 try_this = os.path.join(
174 self.url_file = os.path.join(
205 config.Global.cluster_dir,
175 self.cluster_dir.security_dir,
206 config.Global.security_dir,
176 self.url_file_name
207 config.Global.url_file_name
208 )
177 )
209 config.Global.url_file = try_this
210
178
211 def construct(self):
179 def init_engine(self):
212 # This is the working dir by now.
180 # This is the working dir by now.
213 sys.path.insert(0, '')
181 sys.path.insert(0, '')
214 config = self.master_config
182 config = self.config
183 # print config
184 self.find_url_file()
185
215 # if os.path.exists(config.Global.key_file) and config.Global.secure:
186 # if os.path.exists(config.Global.key_file) and config.Global.secure:
216 # config.SessionFactory.exec_key = config.Global.key_file
187 # config.SessionFactory.exec_key = config.Global.key_file
217 if os.path.exists(config.Global.url_file):
188 if os.path.exists(self.url_file):
218 with open(config.Global.url_file) as f:
189 with open(self.url_file) as f:
219 d = json.loads(f.read())
190 d = json.loads(f.read())
220 for k,v in d.iteritems():
191 for k,v in d.iteritems():
221 if isinstance(v, unicode):
192 if isinstance(v, unicode):
222 d[k] = v.encode()
193 d[k] = v.encode()
223 if d['exec_key']:
194 if d['exec_key']:
224 config.SessionFactory.exec_key = d['exec_key']
195 config.StreamSession.key = d['exec_key']
225 d['url'] = disambiguate_url(d['url'], d['location'])
196 d['url'] = disambiguate_url(d['url'], d['location'])
226 config.RegistrationFactory.url=d['url']
197 config.EngineFactory.url = d['url']
227 config.EngineFactory.location = d['location']
198 config.EngineFactory.location = d['location']
228
199
200 try:
201 exec_lines = config.Kernel.exec_lines
202 except AttributeError:
203 config.Kernel.exec_lines = []
204 exec_lines = config.Kernel.exec_lines
229
205
230
206 if self.startup_script:
231 config.Kernel.exec_lines = config.Global.exec_lines
207 enc = sys.getfilesystemencoding() or 'utf8'
232
208 cmd="execfile(%r)"%self.startup_script.encode(enc)
233 self.start_mpi()
209 exec_lines.append(cmd)
210 if self.startup_command:
211 exec_lines.append(self.startup_command)
234
212
235 # Create the underlying shell class and EngineService
213 # Create the underlying shell class and Engine
236 # shell_class = import_item(self.master_config.Global.shell_class)
214 # shell_class = import_item(self.master_config.Global.shell_class)
215 # print self.config
237 try:
216 try:
238 self.engine = EngineFactory(config=config, logname=self.log.name)
217 self.engine = EngineFactory(config=config, log=self.log)
239 except:
218 except:
240 self.log.error("Couldn't start the Engine", exc_info=True)
219 self.log.error("Couldn't start the Engine", exc_info=True)
241 self.exit(1)
220 self.exit(1)
242
221
243 self.start_logging()
222 # self.start_logging()
244
223
245 # Create the service hierarchy
224 # Create the service hierarchy
246 # self.main_service = service.MultiService()
225 # self.main_service = service.MultiService()
@@ -258,22 +237,22 b' class IPEngineApp(ApplicationWithClusterDir):'
258
237
259 # reactor.callWhenRunning(self.call_connect)
238 # reactor.callWhenRunning(self.call_connect)
260
239
261
240 # def start_logging(self):
262 def start_logging(self):
241 # super(IPEngineApp, self).start_logging()
263 super(IPEngineApp, self).start_logging()
242 # if self.master_config.Global.log_url:
264 if self.master_config.Global.log_url:
243 # context = self.engine.context
265 context = self.engine.context
244 # lsock = context.socket(zmq.PUB)
266 lsock = context.socket(zmq.PUB)
245 # lsock.connect(self.master_config.Global.log_url)
267 lsock.connect(self.master_config.Global.log_url)
246 # handler = EnginePUBHandler(self.engine, lsock)
268 handler = EnginePUBHandler(self.engine, lsock)
247 # handler.setLevel(self.log_level)
269 handler.setLevel(self.log_level)
248 # self.log.addHandler(handler)
270 self.log.addHandler(handler)
249 #
271
250 def init_mpi(self):
272 def start_mpi(self):
273 global mpi
251 global mpi
274 mpikey = self.master_config.MPI.use
252 self.mpi = MPI(config=self.config)
275 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
253
276 if mpi_import_statement is not None:
254 mpi_import_statement = self.mpi.init_script
255 if mpi_import_statement:
277 try:
256 try:
278 self.log.info("Initializing MPI:")
257 self.log.info("Initializing MPI:")
279 self.log.info(mpi_import_statement)
258 self.log.info(mpi_import_statement)
@@ -284,7 +263,7 b' class IPEngineApp(ApplicationWithClusterDir):'
284 mpi = None
263 mpi = None
285
264
286
265
287 def start_app(self):
266 def start(self):
288 self.engine.start()
267 self.engine.start()
289 try:
268 try:
290 self.engine.loop.start()
269 self.engine.loop.start()
@@ -293,8 +272,27 b' class IPEngineApp(ApplicationWithClusterDir):'
293
272
294
273
295 def launch_new_instance():
274 def launch_new_instance():
296 """Create and run the IPython controller"""
275 """Create and run the IPython engine"""
297 app = IPEngineApp()
276 app = IPEngineApp()
277 app.parse_command_line()
278 cl_config = app.config
279 app.init_clusterdir()
280 # app.load_config_file()
281 # print app.config
282 if app.config_file:
283 app.load_config_file(app.config_file)
284 else:
285 app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
286
287 # command-line should *override* config file, but command-line is necessary
288 # to determine clusterdir, etc.
289 app.update_config(cl_config)
290
291 # print app.config
292 app.to_work_dir()
293 app.init_mpi()
294 app.init_engine()
295 print app.config
298 app.start()
296 app.start()
299
297
300
298
@@ -21,7 +21,7 b' import sys'
21 import zmq
21 import zmq
22
22
23 from IPython.parallel.apps.clusterdir import (
23 from IPython.parallel.apps.clusterdir import (
24 ApplicationWithClusterDir,
24 ClusterDirApplication,
25 ClusterDirConfigLoader
25 ClusterDirConfigLoader
26 )
26 )
27 from IPython.parallel.apps.logwatcher import LogWatcher
27 from IPython.parallel.apps.logwatcher import LogWatcher
@@ -74,7 +74,7 b' class IPLoggerAppConfigLoader(ClusterDirConfigLoader):'
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
77 class IPLoggerApp(ApplicationWithClusterDir):
77 class IPLoggerApp(ClusterDirApplication):
78
78
79 name = u'iploggerz'
79 name = u'iploggerz'
80 description = _description
80 description = _description
@@ -106,7 +106,7 b' class BaseLauncher(LoggingFactory):'
106 # This should not be used to set the work_dir for the actual engine
106 # This should not be used to set the work_dir for the actual engine
107 # and controller. Instead, use their own config files or the
107 # and controller. Instead, use their own config files or the
108 # controller_args, engine_args attributes of the launchers to add
108 # controller_args, engine_args attributes of the launchers to add
109 # the --work-dir option.
109 # the work_dir option.
110 work_dir = Unicode(u'.')
110 work_dir = Unicode(u'.')
111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
112
112
@@ -328,16 +328,18 b' class LocalProcessLauncher(BaseLauncher):'
328 class LocalControllerLauncher(LocalProcessLauncher):
328 class LocalControllerLauncher(LocalProcessLauncher):
329 """Launch a controller as a regular external process."""
329 """Launch a controller as a regular external process."""
330
330
331 controller_cmd = List(ipcontroller_cmd_argv, config=True)
331 controller_cmd = List(ipcontroller_cmd_argv, config=True,
332 help="""Popen command to launch ipcontroller.""")
332 # Command line arguments to ipcontroller.
333 # Command line arguments to ipcontroller.
333 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
334 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
335 help="""command-line args to pass to ipcontroller""")
334
336
335 def find_args(self):
337 def find_args(self):
336 return self.controller_cmd + self.controller_args
338 return self.controller_cmd + self.controller_args
337
339
338 def start(self, cluster_dir):
340 def start(self, cluster_dir):
339 """Start the controller by cluster_dir."""
341 """Start the controller by cluster_dir."""
340 self.controller_args.extend(['--cluster-dir', cluster_dir])
342 self.controller_args.extend(['cluster_dir=%s'%cluster_dir])
341 self.cluster_dir = unicode(cluster_dir)
343 self.cluster_dir = unicode(cluster_dir)
342 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
344 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
343 return super(LocalControllerLauncher, self).start()
345 return super(LocalControllerLauncher, self).start()
@@ -346,10 +348,11 b' class LocalControllerLauncher(LocalProcessLauncher):'
346 class LocalEngineLauncher(LocalProcessLauncher):
348 class LocalEngineLauncher(LocalProcessLauncher):
347 """Launch a single engine as a regular externall process."""
349 """Launch a single engine as a regular externall process."""
348
350
349 engine_cmd = List(ipengine_cmd_argv, config=True)
351 engine_cmd = List(ipengine_cmd_argv, config=True,
352 help="""command to launch the Engine.""")
350 # Command line arguments for ipengine.
353 # Command line arguments for ipengine.
351 engine_args = List(
354 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
352 ['--log-to-file','--log-level', str(logging.INFO)], config=True
355 help="command-line arguments to pass to ipengine"
353 )
356 )
354
357
355 def find_args(self):
358 def find_args(self):
@@ -357,7 +360,7 b' class LocalEngineLauncher(LocalProcessLauncher):'
357
360
358 def start(self, cluster_dir):
361 def start(self, cluster_dir):
359 """Start the engine by cluster_dir."""
362 """Start the engine by cluster_dir."""
360 self.engine_args.extend(['--cluster-dir', cluster_dir])
363 self.engine_args.extend(['cluster_dir=%s'%cluster_dir])
361 self.cluster_dir = unicode(cluster_dir)
364 self.cluster_dir = unicode(cluster_dir)
362 return super(LocalEngineLauncher, self).start()
365 return super(LocalEngineLauncher, self).start()
363
366
@@ -367,7 +370,8 b' class LocalEngineSetLauncher(BaseLauncher):'
367
370
368 # Command line arguments for ipengine.
371 # Command line arguments for ipengine.
369 engine_args = List(
372 engine_args = List(
370 ['--log-to-file','--log-level', str(logging.INFO)], config=True
373 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
374 help="command-line arguments to pass to ipengine"
371 )
375 )
372 # launcher class
376 # launcher class
373 launcher_class = LocalEngineLauncher
377 launcher_class = LocalEngineLauncher
@@ -442,16 +446,18 b' class LocalEngineSetLauncher(BaseLauncher):'
442 class MPIExecLauncher(LocalProcessLauncher):
446 class MPIExecLauncher(LocalProcessLauncher):
443 """Launch an external process using mpiexec."""
447 """Launch an external process using mpiexec."""
444
448
445 # The mpiexec command to use in starting the process.
449 mpi_cmd = List(['mpiexec'], config=True,
446 mpi_cmd = List(['mpiexec'], config=True)
450 help="The mpiexec command to use in starting the process."
447 # The command line arguments to pass to mpiexec.
451 )
448 mpi_args = List([], config=True)
452 mpi_args = List([], config=True,
449 # The program to start using mpiexec.
453 help="The command line arguments to pass to mpiexec."
450 program = List(['date'], config=True)
454 )
451 # The command line argument to the program.
455 program = List(['date'], config=True,
452 program_args = List([], config=True)
456 help="The program to start via mpiexec.")
453 # The number of instances of the program to start.
457 program_args = List([], config=True,
454 n = Int(1, config=True)
458 help="The command line argument to the program."
459 )
460 n = Int(1)
455
461
456 def find_args(self):
462 def find_args(self):
457 """Build self.args using all the fields."""
463 """Build self.args using all the fields."""
@@ -467,14 +473,17 b' class MPIExecLauncher(LocalProcessLauncher):'
467 class MPIExecControllerLauncher(MPIExecLauncher):
473 class MPIExecControllerLauncher(MPIExecLauncher):
468 """Launch a controller using mpiexec."""
474 """Launch a controller using mpiexec."""
469
475
470 controller_cmd = List(ipcontroller_cmd_argv, config=True)
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
471 # Command line arguments to ipcontroller.
477 help="Popen command to launch the Contropper"
472 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
478 )
473 n = Int(1, config=False)
479 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
480 help="Command line arguments to pass to ipcontroller."
481 )
482 n = Int(1)
474
483
475 def start(self, cluster_dir):
484 def start(self, cluster_dir):
476 """Start the controller by cluster_dir."""
485 """Start the controller by cluster_dir."""
477 self.controller_args.extend(['--cluster-dir', cluster_dir])
486 self.controller_args.extend(['cluster_dir=%s'%cluster_dir])
478 self.cluster_dir = unicode(cluster_dir)
487 self.cluster_dir = unicode(cluster_dir)
479 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
480 return super(MPIExecControllerLauncher, self).start(1)
489 return super(MPIExecControllerLauncher, self).start(1)
@@ -486,16 +495,18 b' class MPIExecControllerLauncher(MPIExecLauncher):'
486
495
487 class MPIExecEngineSetLauncher(MPIExecLauncher):
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
488
497
489 program = List(ipengine_cmd_argv, config=True)
498 program = List(ipengine_cmd_argv, config=True,
490 # Command line arguments for ipengine.
499 help="Popen command for ipengine"
500 )
491 program_args = List(
501 program_args = List(
492 ['--log-to-file','--log-level', str(logging.INFO)], config=True
502 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
503 help="Command line arguments for ipengine."
493 )
504 )
494 n = Int(1, config=True)
505 n = Int(1)
495
506
496 def start(self, n, cluster_dir):
507 def start(self, n, cluster_dir):
497 """Start n engines by profile or cluster_dir."""
508 """Start n engines by profile or cluster_dir."""
498 self.program_args.extend(['--cluster-dir', cluster_dir])
509 self.program_args.extend(['cluster_dir=%s'%cluster_dir])
499 self.cluster_dir = unicode(cluster_dir)
510 self.cluster_dir = unicode(cluster_dir)
500 self.n = n
511 self.n = n
501 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
@@ -515,13 +526,20 b' class SSHLauncher(LocalProcessLauncher):'
515 as well.
526 as well.
516 """
527 """
517
528
518 ssh_cmd = List(['ssh'], config=True)
529 ssh_cmd = List(['ssh'], config=True,
519 ssh_args = List(['-tt'], config=True)
530 help="command for starting ssh")
520 program = List(['date'], config=True)
531 ssh_args = List(['-tt'], config=True,
521 program_args = List([], config=True)
532 help="args to pass to ssh")
522 hostname = CUnicode('', config=True)
533 program = List(['date'], config=True,
523 user = CUnicode('', config=True)
534 help="Program to launch via ssh")
524 location = CUnicode('')
535 program_args = List([], config=True,
536 help="args to pass to remote program")
537 hostname = CUnicode('', config=True,
538 help="hostname on which to launch the program")
539 user = CUnicode('', config=True,
540 help="username for ssh")
541 location = CUnicode('', config=True,
542 help="user@hostname location for ssh in one setting")
525
543
526 def _hostname_changed(self, name, old, new):
544 def _hostname_changed(self, name, old, new):
527 if self.user:
545 if self.user:
@@ -555,21 +573,26 b' class SSHLauncher(LocalProcessLauncher):'
555
573
556 class SSHControllerLauncher(SSHLauncher):
574 class SSHControllerLauncher(SSHLauncher):
557
575
558 program = List(ipcontroller_cmd_argv, config=True)
576 program = List(ipcontroller_cmd_argv, config=True,
559 # Command line arguments to ipcontroller.
577 help="remote ipcontroller command.")
560 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
578 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
579 help="Command line arguments to ipcontroller.")
561
580
562
581
563 class SSHEngineLauncher(SSHLauncher):
582 class SSHEngineLauncher(SSHLauncher):
564 program = List(ipengine_cmd_argv, config=True)
583 program = List(ipengine_cmd_argv, config=True,
584 help="remote ipengine command.")
565 # Command line arguments for ipengine.
585 # Command line arguments for ipengine.
566 program_args = List(
586 program_args = List(
567 ['--log-to-file','--log-level', str(logging.INFO)], config=True
587 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
588 help="Command line arguments to ipengine."
568 )
589 )
569
590
570 class SSHEngineSetLauncher(LocalEngineSetLauncher):
591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
571 launcher_class = SSHEngineLauncher
592 launcher_class = SSHEngineLauncher
572 engines = Dict(config=True)
593 engines = Dict(config=True,
594 help="""dict of engines to launch. This is a dict by hostname of ints,
595 corresponding to the number of engines to start on that host.""")
573
596
574 def start(self, n, cluster_dir):
597 def start(self, n, cluster_dir):
575 """Start engines by profile or cluster_dir.
598 """Start engines by profile or cluster_dir.
@@ -624,17 +647,19 b' def find_job_cmd():'
624
647
625 class WindowsHPCLauncher(BaseLauncher):
648 class WindowsHPCLauncher(BaseLauncher):
626
649
627 # A regular expression used to get the job id from the output of the
650 job_id_regexp = Str(r'\d+', config=True,
628 # submit_command.
651 help="""A regular expression used to get the job id from the output of the
629 job_id_regexp = Str(r'\d+', config=True)
652 submit_command. """
630 # The filename of the instantiated job script.
653 )
631 job_file_name = CUnicode(u'ipython_job.xml', config=True)
654 job_file_name = CUnicode(u'ipython_job.xml', config=True,
655 help="The filename of the instantiated job script.")
632 # The full path to the instantiated job script. This gets made dynamically
656 # The full path to the instantiated job script. This gets made dynamically
633 # by combining the work_dir with the job_file_name.
657 # by combining the work_dir with the job_file_name.
634 job_file = CUnicode(u'')
658 job_file = CUnicode(u'')
635 # The hostname of the scheduler to submit the job to
659 scheduler = CUnicode('', config=True,
636 scheduler = CUnicode('', config=True)
660 help="The hostname of the scheduler to submit the job to.")
637 job_cmd = CUnicode(find_job_cmd(), config=True)
661 job_cmd = CUnicode(find_job_cmd(), config=True,
662 help="The command for submitting jobs.")
638
663
639 def __init__(self, work_dir=u'.', config=None, **kwargs):
664 def __init__(self, work_dir=u'.', config=None, **kwargs):
640 super(WindowsHPCLauncher, self).__init__(
665 super(WindowsHPCLauncher, self).__init__(
@@ -702,8 +727,10 b' class WindowsHPCLauncher(BaseLauncher):'
702
727
703 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
728 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
704
729
705 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
730 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True,
706 extra_args = List([], config=False)
731 help="WinHPC xml job file.")
732 extra_args = List([], config=False,
733 help="extra args to pass to ipcontroller")
707
734
708 def write_job_file(self, n):
735 def write_job_file(self, n):
709 job = IPControllerJob(config=self.config)
736 job = IPControllerJob(config=self.config)
@@ -713,7 +740,7 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
713 # the controller. It is used as the base path for the stdout/stderr
740 # the controller. It is used as the base path for the stdout/stderr
714 # files that the scheduler redirects to.
741 # files that the scheduler redirects to.
715 t.work_directory = self.cluster_dir
742 t.work_directory = self.cluster_dir
716 # Add the --cluster-dir and from self.start().
743 # Add the cluster_dir and from self.start().
717 t.controller_args.extend(self.extra_args)
744 t.controller_args.extend(self.extra_args)
718 job.add_task(t)
745 job.add_task(t)
719
746
@@ -726,15 +753,17 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
726
753
727 def start(self, cluster_dir):
754 def start(self, cluster_dir):
728 """Start the controller by cluster_dir."""
755 """Start the controller by cluster_dir."""
729 self.extra_args = ['--cluster-dir', cluster_dir]
756 self.extra_args = ['cluster_dir=%s'%cluster_dir]
730 self.cluster_dir = unicode(cluster_dir)
757 self.cluster_dir = unicode(cluster_dir)
731 return super(WindowsHPCControllerLauncher, self).start(1)
758 return super(WindowsHPCControllerLauncher, self).start(1)
732
759
733
760
734 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
761 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
735
762
736 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
763 job_file_name = CUnicode(u'ipengineset_job.xml', config=True,
737 extra_args = List([], config=False)
764 help="jobfile for ipengines job")
765 extra_args = List([], config=False,
766 help="extra args to pas to ipengine")
738
767
739 def write_job_file(self, n):
768 def write_job_file(self, n):
740 job = IPEngineSetJob(config=self.config)
769 job = IPEngineSetJob(config=self.config)
@@ -745,7 +774,7 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
745 # the engine. It is used as the base path for the stdout/stderr
774 # the engine. It is used as the base path for the stdout/stderr
746 # files that the scheduler redirects to.
775 # files that the scheduler redirects to.
747 t.work_directory = self.cluster_dir
776 t.work_directory = self.cluster_dir
748 # Add the --cluster-dir and from self.start().
777 # Add the cluster_dir and from self.start().
749 t.engine_args.extend(self.extra_args)
778 t.engine_args.extend(self.extra_args)
750 job.add_task(t)
779 job.add_task(t)
751
780
@@ -758,7 +787,7 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
758
787
759 def start(self, n, cluster_dir):
788 def start(self, n, cluster_dir):
760 """Start the controller by cluster_dir."""
789 """Start the controller by cluster_dir."""
761 self.extra_args = ['--cluster-dir', cluster_dir]
790 self.extra_args = ['cluster_dir=%s'%cluster_dir]
762 self.cluster_dir = unicode(cluster_dir)
791 self.cluster_dir = unicode(cluster_dir)
763 return super(WindowsHPCEngineSetLauncher, self).start(n)
792 return super(WindowsHPCEngineSetLauncher, self).start(n)
764
793
@@ -782,21 +811,21 b' class BatchSystemLauncher(BaseLauncher):'
782 """
811 """
783
812
784 # Subclasses must fill these in. See PBSEngineSet
813 # Subclasses must fill these in. See PBSEngineSet
785 # The name of the command line program used to submit jobs.
814 submit_command = List([''], config=True,
786 submit_command = List([''], config=True)
815 help="The name of the command line program used to submit jobs.")
787 # The name of the command line program used to delete jobs.
816 delete_command = List([''], config=True,
788 delete_command = List([''], config=True)
817 help="The name of the command line program used to delete jobs.")
789 # A regular expression used to get the job id from the output of the
818 job_id_regexp = CUnicode('', config=True,
790 # submit_command.
819 help="""A regular expression used to get the job id from the output of the
791 job_id_regexp = CUnicode('', config=True)
820 submit_command.""")
792 # The string that is the batch script template itself.
821 batch_template = CUnicode('', config=True,
793 batch_template = CUnicode('', config=True)
822 help="The string that is the batch script template itself.")
794 # The file that contains the batch template
823 batch_template_file = CUnicode(u'', config=True,
795 batch_template_file = CUnicode(u'', config=True)
824 help="The file that contains the batch template.")
796 # The filename of the instantiated batch script.
825 batch_file_name = CUnicode(u'batch_script', config=True,
797 batch_file_name = CUnicode(u'batch_script', config=True)
826 help="The filename of the instantiated batch script.")
798 # The PBS Queue
827 queue = CUnicode(u'', config=True,
799 queue = CUnicode(u'', config=True)
828 help="The PBS Queue.")
800
829
801 # not configurable, override in subclasses
830 # not configurable, override in subclasses
802 # PBS Job Array regex
831 # PBS Job Array regex
@@ -891,9 +920,12 b' class BatchSystemLauncher(BaseLauncher):'
891 class PBSLauncher(BatchSystemLauncher):
920 class PBSLauncher(BatchSystemLauncher):
892 """A BatchSystemLauncher subclass for PBS."""
921 """A BatchSystemLauncher subclass for PBS."""
893
922
894 submit_command = List(['qsub'], config=True)
923 submit_command = List(['qsub'], config=True,
895 delete_command = List(['qdel'], config=True)
924 help="The PBS submit command ['qsub']")
896 job_id_regexp = CUnicode(r'\d+', config=True)
925 delete_command = List(['qdel'], config=True,
926 help="The PBS delete command ['qsub']")
927 job_id_regexp = CUnicode(r'\d+', config=True,
928 help="Regular expresion for identifying the job ID [r'\d+']")
897
929
898 batch_file = CUnicode(u'')
930 batch_file = CUnicode(u'')
899 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
931 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
@@ -905,11 +937,12 b' class PBSLauncher(BatchSystemLauncher):'
905 class PBSControllerLauncher(PBSLauncher):
937 class PBSControllerLauncher(PBSLauncher):
906 """Launch a controller using PBS."""
938 """Launch a controller using PBS."""
907
939
908 batch_file_name = CUnicode(u'pbs_controller', config=True)
940 batch_file_name = CUnicode(u'pbs_controller', config=True,
941 help="batch file name for the controller job.")
909 default_template= CUnicode("""#!/bin/sh
942 default_template= CUnicode("""#!/bin/sh
910 #PBS -V
943 #PBS -V
911 #PBS -N ipcontroller
944 #PBS -N ipcontroller
912 %s --log-to-file --cluster-dir $cluster_dir
945 %s --log-to-file cluster_dir $cluster_dir
913 """%(' '.join(ipcontroller_cmd_argv)))
946 """%(' '.join(ipcontroller_cmd_argv)))
914
947
915 def start(self, cluster_dir):
948 def start(self, cluster_dir):
@@ -920,11 +953,12 b' class PBSControllerLauncher(PBSLauncher):'
920
953
921 class PBSEngineSetLauncher(PBSLauncher):
954 class PBSEngineSetLauncher(PBSLauncher):
922 """Launch Engines using PBS"""
955 """Launch Engines using PBS"""
923 batch_file_name = CUnicode(u'pbs_engines', config=True)
956 batch_file_name = CUnicode(u'pbs_engines', config=True,
957 help="batch file name for the engine(s) job.")
924 default_template= CUnicode(u"""#!/bin/sh
958 default_template= CUnicode(u"""#!/bin/sh
925 #PBS -V
959 #PBS -V
926 #PBS -N ipengine
960 #PBS -N ipengine
927 %s --cluster-dir $cluster_dir
961 %s cluster_dir $cluster_dir
928 """%(' '.join(ipengine_cmd_argv)))
962 """%(' '.join(ipengine_cmd_argv)))
929
963
930 def start(self, n, cluster_dir):
964 def start(self, n, cluster_dir):
@@ -944,11 +978,12 b' class SGELauncher(PBSLauncher):'
944 class SGEControllerLauncher(SGELauncher):
978 class SGEControllerLauncher(SGELauncher):
945 """Launch a controller using SGE."""
979 """Launch a controller using SGE."""
946
980
947 batch_file_name = CUnicode(u'sge_controller', config=True)
981 batch_file_name = CUnicode(u'sge_controller', config=True,
982 help="batch file name for the ipontroller job.")
948 default_template= CUnicode(u"""#$$ -V
983 default_template= CUnicode(u"""#$$ -V
949 #$$ -S /bin/sh
984 #$$ -S /bin/sh
950 #$$ -N ipcontroller
985 #$$ -N ipcontroller
951 %s --log-to-file --cluster-dir $cluster_dir
986 %s --log-to-file cluster_dir=$cluster_dir
952 """%(' '.join(ipcontroller_cmd_argv)))
987 """%(' '.join(ipcontroller_cmd_argv)))
953
988
954 def start(self, cluster_dir):
989 def start(self, cluster_dir):
@@ -958,11 +993,12 b' class SGEControllerLauncher(SGELauncher):'
958
993
959 class SGEEngineSetLauncher(SGELauncher):
994 class SGEEngineSetLauncher(SGELauncher):
960 """Launch Engines with SGE"""
995 """Launch Engines with SGE"""
961 batch_file_name = CUnicode(u'sge_engines', config=True)
996 batch_file_name = CUnicode(u'sge_engines', config=True,
997 help="batch file name for the engine(s) job.")
962 default_template = CUnicode("""#$$ -V
998 default_template = CUnicode("""#$$ -V
963 #$$ -S /bin/sh
999 #$$ -S /bin/sh
964 #$$ -N ipengine
1000 #$$ -N ipengine
965 %s --cluster-dir $cluster_dir
1001 %s cluster_dir=$cluster_dir
966 """%(' '.join(ipengine_cmd_argv)))
1002 """%(' '.join(ipengine_cmd_argv)))
967
1003
968 def start(self, n, cluster_dir):
1004 def start(self, n, cluster_dir):
@@ -979,18 +1015,56 b' class SGEEngineSetLauncher(SGELauncher):'
979 class IPClusterLauncher(LocalProcessLauncher):
1015 class IPClusterLauncher(LocalProcessLauncher):
980 """Launch the ipcluster program in an external process."""
1016 """Launch the ipcluster program in an external process."""
981
1017
982 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
1018 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
983 # Command line arguments to pass to ipcluster.
1019 help="Popen command for ipcluster")
984 ipcluster_args = List(
1020 ipcluster_args = List(
985 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
1021 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1022 help="Command line arguments to pass to ipcluster.")
986 ipcluster_subcommand = Str('start')
1023 ipcluster_subcommand = Str('start')
987 ipcluster_n = Int(2)
1024 ipcluster_n = Int(2)
988
1025
989 def find_args(self):
1026 def find_args(self):
990 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1027 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
991 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
1028 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
992
1029
993 def start(self):
1030 def start(self):
994 self.log.info("Starting ipcluster: %r" % self.args)
1031 self.log.info("Starting ipcluster: %r" % self.args)
995 return super(IPClusterLauncher, self).start()
1032 return super(IPClusterLauncher, self).start()
996
1033
1034 #-----------------------------------------------------------------------------
1035 # Collections of launchers
1036 #-----------------------------------------------------------------------------
1037
1038 local_launchers = [
1039 LocalControllerLauncher,
1040 LocalEngineLauncher,
1041 LocalEngineSetLauncher,
1042 ]
1043 mpi_launchers = [
1044 MPIExecLauncher,
1045 MPIExecControllerLauncher,
1046 MPIExecEngineSetLauncher,
1047 ]
1048 ssh_launchers = [
1049 SSHLauncher,
1050 SSHControllerLauncher,
1051 SSHEngineLauncher,
1052 SSHEngineSetLauncher,
1053 ]
1054 winhpc_launchers = [
1055 WindowsHPCLauncher,
1056 WindowsHPCControllerLauncher,
1057 WindowsHPCEngineSetLauncher,
1058 ]
1059 pbs_launchers = [
1060 PBSLauncher,
1061 PBSControllerLauncher,
1062 PBSEngineSetLauncher,
1063 ]
1064 sge_launchers = [
1065 SGELauncher,
1066 SGEControllerLauncher,
1067 SGEEngineSetLauncher,
1068 ]
1069 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1070 + pbs_launchers + sge_launchers No newline at end of file
@@ -296,7 +296,7 b' class Client(HasTraits):'
296 if username is None:
296 if username is None:
297 self.session = ss.StreamSession(**key_arg)
297 self.session = ss.StreamSession(**key_arg)
298 else:
298 else:
299 self.session = ss.StreamSession(username, **key_arg)
299 self.session = ss.StreamSession(username=username, **key_arg)
300 self._query_socket = self._context.socket(zmq.XREQ)
300 self._query_socket = self._context.socket(zmq.XREQ)
301 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
301 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
302 if self._ssh:
302 if self._ssh:
@@ -18,7 +18,7 b' import zmq'
18 from zmq.devices import ProcessDevice, ThreadDevice
18 from zmq.devices import ProcessDevice, ThreadDevice
19 from zmq.eventloop import ioloop, zmqstream
19 from zmq.eventloop import ioloop, zmqstream
20
20
21 from IPython.utils.traitlets import Set, Instance, CFloat, Bool
21 from IPython.utils.traitlets import Set, Instance, CFloat, Bool, CStr
22 from IPython.parallel.factory import LoggingFactory
22 from IPython.parallel.factory import LoggingFactory
23
23
24 class Heart(object):
24 class Heart(object):
@@ -53,14 +53,16 b' class HeartMonitor(LoggingFactory):'
53 pongstream: an XREP stream
53 pongstream: an XREP stream
54 period: the period of the heartbeat in milliseconds"""
54 period: the period of the heartbeat in milliseconds"""
55
55
56 period=CFloat(1000, config=True) # in milliseconds
56 period=CFloat(1000, config=True,
57 help='The frequency at which the Hub pings the engines for heartbeats '
58 ' (in ms) [default: 100]',
59 )
57
60
58 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
61 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
59 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
62 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
60 loop = Instance('zmq.eventloop.ioloop.IOLoop')
63 loop = Instance('zmq.eventloop.ioloop.IOLoop')
61 def _loop_default(self):
64 def _loop_default(self):
62 return ioloop.IOLoop.instance()
65 return ioloop.IOLoop.instance()
63 debug=Bool(False)
64
66
65 # not settable:
67 # not settable:
66 hearts=Set()
68 hearts=Set()
@@ -25,7 +25,9 b' from zmq.eventloop.zmqstream import ZMQStream'
25
25
26 # internal:
26 # internal:
27 from IPython.utils.importstring import import_item
27 from IPython.utils.importstring import import_item
28 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
28 from IPython.utils.traitlets import (
29 HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool, Tuple
30 )
29
31
30 from IPython.parallel import error, util
32 from IPython.parallel import error, util
31 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
33 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
@@ -112,59 +114,71 b' class EngineConnector(HasTraits):'
112 class HubFactory(RegistrationFactory):
114 class HubFactory(RegistrationFactory):
113 """The Configurable for setting up a Hub."""
115 """The Configurable for setting up a Hub."""
114
116
115 # name of a scheduler scheme
116 scheme = Str('leastload', config=True)
117
118 # port-pairs for monitoredqueues:
117 # port-pairs for monitoredqueues:
119 hb = Instance(list, config=True)
118 hb = Tuple(Int,Int,config=True,
119 help="""XREQ/SUB Port pair for Engine heartbeats""")
120 def _hb_default(self):
120 def _hb_default(self):
121 return util.select_random_ports(2)
121 return tuple(util.select_random_ports(2))
122
123 mux = Tuple(Int,Int,config=True,
124 help="""Engine/Client Port pair for MUX queue""")
122
125
123 mux = Instance(list, config=True)
124 def _mux_default(self):
126 def _mux_default(self):
125 return util.select_random_ports(2)
127 return tuple(util.select_random_ports(2))
126
128
127 task = Instance(list, config=True)
129 task = Tuple(Int,Int,config=True,
130 help="""Engine/Client Port pair for Task queue""")
128 def _task_default(self):
131 def _task_default(self):
129 return util.select_random_ports(2)
132 return tuple(util.select_random_ports(2))
133
134 control = Tuple(Int,Int,config=True,
135 help="""Engine/Client Port pair for Control queue""")
130
136
131 control = Instance(list, config=True)
132 def _control_default(self):
137 def _control_default(self):
133 return util.select_random_ports(2)
138 return tuple(util.select_random_ports(2))
139
140 iopub = Tuple(Int,Int,config=True,
141 help="""Engine/Client Port pair for IOPub relay""")
134
142
135 iopub = Instance(list, config=True)
136 def _iopub_default(self):
143 def _iopub_default(self):
137 return util.select_random_ports(2)
144 return tuple(util.select_random_ports(2))
138
145
139 # single ports:
146 # single ports:
140 mon_port = Instance(int, config=True)
147 mon_port = Int(config=True,
148 help="""Monitor (SUB) port for queue traffic""")
149
141 def _mon_port_default(self):
150 def _mon_port_default(self):
142 return util.select_random_ports(1)[0]
151 return util.select_random_ports(1)[0]
143
152
144 notifier_port = Instance(int, config=True)
153 notifier_port = Int(config=True,
154 help="""PUB port for sending engine status notifications""")
155
145 def _notifier_port_default(self):
156 def _notifier_port_default(self):
146 return util.select_random_ports(1)[0]
157 return util.select_random_ports(1)[0]
147
158
148 ping = Int(1000, config=True) # ping frequency
159 engine_ip = CStr('127.0.0.1', config=True,
160 help="IP on which to listen for engine connections. [default: loopback]")
161 engine_transport = CStr('tcp', config=True,
162 help="0MQ transport for engine connections. [default: tcp]")
149
163
150 engine_ip = CStr('127.0.0.1', config=True)
164 client_ip = CStr('127.0.0.1', config=True,
151 engine_transport = CStr('tcp', config=True)
165 help="IP on which to listen for client connections. [default: loopback]")
166 client_transport = CStr('tcp', config=True,
167 help="0MQ transport for client connections. [default : tcp]")
152
168
153 client_ip = CStr('127.0.0.1', config=True)
169 monitor_ip = CStr('127.0.0.1', config=True,
154 client_transport = CStr('tcp', config=True)
170 help="IP on which to listen for monitor messages. [default: loopback]")
155
171 monitor_transport = CStr('tcp', config=True,
156 monitor_ip = CStr('127.0.0.1', config=True)
172 help="0MQ transport for monitor messages. [default : tcp]")
157 monitor_transport = CStr('tcp', config=True)
158
173
159 monitor_url = CStr('')
174 monitor_url = CStr('')
160
175
161 db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True)
176 db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True,
177 help="""The class to use for the DB backend""")
162
178
163 # not configurable
179 # not configurable
164 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
180 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
165 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
181 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
166 subconstructors = List()
167 _constructed = Bool(False)
168
182
169 def _ip_changed(self, name, old, new):
183 def _ip_changed(self, name, old, new):
170 self.engine_ip = new
184 self.engine_ip = new
@@ -186,24 +200,17 b' class HubFactory(RegistrationFactory):'
186 self._update_monitor_url()
200 self._update_monitor_url()
187 # self.on_trait_change(self._sync_ips, 'ip')
201 # self.on_trait_change(self._sync_ips, 'ip')
188 # self.on_trait_change(self._sync_transports, 'transport')
202 # self.on_trait_change(self._sync_transports, 'transport')
189 self.subconstructors.append(self.construct_hub)
203 # self.subconstructors.append(self.construct_hub)
190
204
191
205
192 def construct(self):
206 def construct(self):
193 assert not self._constructed, "already constructed!"
207 self.init_hub()
194
195 for subc in self.subconstructors:
196 subc()
197
198 self._constructed = True
199
200
208
201 def start(self):
209 def start(self):
202 assert self._constructed, "must be constructed by self.construct() first!"
203 self.heartmonitor.start()
210 self.heartmonitor.start()
204 self.log.info("Heartmonitor started")
211 self.log.info("Heartmonitor started")
205
212
206 def construct_hub(self):
213 def init_hub(self):
207 """construct"""
214 """construct"""
208 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
215 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
209 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
216 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
@@ -227,7 +234,7 b' class HubFactory(RegistrationFactory):'
227 hrep = ctx.socket(zmq.XREP)
234 hrep = ctx.socket(zmq.XREP)
228 hrep.bind(engine_iface % self.hb[1])
235 hrep.bind(engine_iface % self.hb[1])
229 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
236 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
230 period=self.ping, logname=self.log.name)
237 config=self.config)
231
238
232 ### Client connections ###
239 ### Client connections ###
233 # Notifier socket
240 # Notifier socket
@@ -248,7 +255,11 b' class HubFactory(RegistrationFactory):'
248 # cdir = self.config.Global.cluster_dir
255 # cdir = self.config.Global.cluster_dir
249 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
256 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
250 time.sleep(.25)
257 time.sleep(.25)
251
258 try:
259 scheme = self.config.TaskScheduler.scheme_name
260 except AttributeError:
261 from .scheduler import TaskScheduler
262 scheme = TaskScheduler.scheme_name.get_default_value()
252 # build connection dicts
263 # build connection dicts
253 self.engine_info = {
264 self.engine_info = {
254 'control' : engine_iface%self.control[1],
265 'control' : engine_iface%self.control[1],
@@ -262,7 +273,7 b' class HubFactory(RegistrationFactory):'
262 self.client_info = {
273 self.client_info = {
263 'control' : client_iface%self.control[0],
274 'control' : client_iface%self.control[0],
264 'mux': client_iface%self.mux[0],
275 'mux': client_iface%self.mux[0],
265 'task' : (self.scheme, client_iface%self.task[0]),
276 'task' : (scheme, client_iface%self.task[0]),
266 'iopub' : client_iface%self.iopub[0],
277 'iopub' : client_iface%self.iopub[0],
267 'notification': client_iface%self.notifier_port
278 'notification': client_iface%self.notifier_port
268 }
279 }
@@ -20,9 +20,20 b' from .dictdb import BaseDB'
20 class MongoDB(BaseDB):
20 class MongoDB(BaseDB):
21 """MongoDB TaskRecord backend."""
21 """MongoDB TaskRecord backend."""
22
22
23 connection_args = List(config=True) # args passed to pymongo.Connection
23 connection_args = List(config=True,
24 connection_kwargs = Dict(config=True) # kwargs passed to pymongo.Connection
24 help="""Positional arguments to be passed to pymongo.Connection. Only
25 database = CUnicode(config=True) # name of the mongodb database
25 necessary if the default mongodb configuration does not point to your
26 mongod instance.""")
27 connection_kwargs = Dict(config=True,
28 help="""Keyword arguments to be passed to pymongo.Connection. Only
29 necessary if the default mongodb configuration does not point to your
30 mongod instance."""
31 )
32 database = CUnicode(config=True,
33 help="""The MongoDB database name to use for storing tasks for this session. If unspecified,
34 a new database will be created with the Hub's IDENT. Specifying the database will result
35 in tasks from previous sessions being available via Clients' db_query and
36 get_result methods.""")
26
37
27 _connection = Instance(Connection) # pymongo connection
38 _connection = Instance(Connection) # pymongo connection
28
39
@@ -35,7 +35,7 b' from zmq.eventloop import ioloop, zmqstream'
35 # local imports
35 # local imports
36 from IPython.external.decorator import decorator
36 from IPython.external.decorator import decorator
37 from IPython.config.loader import Config
37 from IPython.config.loader import Config
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum
39
39
40 from IPython.parallel import error
40 from IPython.parallel import error
41 from IPython.parallel.factory import SessionFactory
41 from IPython.parallel.factory import SessionFactory
@@ -126,7 +126,19 b' class TaskScheduler(SessionFactory):'
126
126
127 """
127 """
128
128
129 hwm = Int(0, config=True) # limit number of outstanding tasks
129 hwm = Int(0, config=True, shortname='hwm',
130 help="""specify the High Water Mark (HWM) for the downstream
131 socket in the Task scheduler. This is the maximum number
132 of allowed outstanding tasks on each engine."""
133 )
134 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
135 'leastload', config=True, shortname='scheme', allow_none=False,
136 help="""select the task scheduler scheme [default: Python LRU]
137 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
138 )
139 def _scheme_name_changed(self, old, new):
140 self.log.debug("Using scheme %r"%new)
141 self.scheme = globals()[new]
130
142
131 # input arguments:
143 # input arguments:
132 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
144 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
@@ -622,7 +634,7 b' class TaskScheduler(SessionFactory):'
622
634
623
635
624 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
636 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
625 log_addr=None, loglevel=logging.DEBUG, scheme='lru',
637 log_addr=None, loglevel=logging.DEBUG,
626 identity=b'task'):
638 identity=b'task'):
627 from zmq.eventloop import ioloop
639 from zmq.eventloop import ioloop
628 from zmq.eventloop.zmqstream import ZMQStream
640 from zmq.eventloop.zmqstream import ZMQStream
@@ -646,7 +658,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='
646 nots.setsockopt(zmq.SUBSCRIBE, '')
658 nots.setsockopt(zmq.SUBSCRIBE, '')
647 nots.connect(not_addr)
659 nots.connect(not_addr)
648
660
649 scheme = globals().get(scheme, None)
661 # scheme = globals().get(scheme, None)
650 # setup logging
662 # setup logging
651 if log_addr:
663 if log_addr:
652 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
664 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
@@ -655,7 +667,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='
655
667
656 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
668 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
657 mon_stream=mons, notifier_stream=nots,
669 mon_stream=mons, notifier_stream=nots,
658 scheme=scheme, loop=loop, logname=logname,
670 loop=loop, logname=logname,
659 config=config)
671 config=config)
660 scheduler.start()
672 scheduler.start()
661 try:
673 try:
@@ -83,9 +83,16 b' def _convert_bufs(bs):'
83 class SQLiteDB(BaseDB):
83 class SQLiteDB(BaseDB):
84 """SQLite3 TaskRecord backend."""
84 """SQLite3 TaskRecord backend."""
85
85
86 filename = CUnicode('tasks.db', config=True)
86 filename = CUnicode('tasks.db', config=True,
87 location = CUnicode('', config=True)
87 help="""The filename of the sqlite task database. [default: 'tasks.db']""")
88 table = CUnicode("", config=True)
88 location = CUnicode('', config=True,
89 help="""The directory containing the sqlite task database. The default
90 is to use the cluster_dir location.""")
91 table = CUnicode("", config=True,
92 help="""The SQLite Table to use for storing tasks for this session. If unspecified,
93 a new table will be created with the Hub's IDENT. Specifying the table will result
94 in tasks from previous sessions being available via Clients' db_query and
95 get_result methods.""")
89
96
90 _db = Instance('sqlite3.Connection')
97 _db = Instance('sqlite3.Connection')
91 _keys = List(['msg_id' ,
98 _keys = List(['msg_id' ,
@@ -33,13 +33,22 b' class EngineFactory(RegistrationFactory):'
33 """IPython engine"""
33 """IPython engine"""
34
34
35 # configurables:
35 # configurables:
36 user_ns=Dict(config=True)
36 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
37 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
37 help="""The OutStream for handling stdout/err.
38 display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
38 Typically 'IPython.zmq.iostream.OutStream'""")
39 location=Str(config=True)
39 display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True,
40 timeout=CFloat(2,config=True)
40 help="""The class for handling displayhook.
41 Typically 'IPython.zmq.displayhook.DisplayHook'""")
42 location=Str(config=True,
43 help="""The location (an IP address) of the controller. This is
44 used for disambiguating URLs, to determine whether
45 loopback should be used to connect or the public address.""")
46 timeout=CFloat(2,config=True,
47 help="""The time (in seconds) to wait for the Controller to respond
48 to registration requests before giving up.""")
41
49
42 # not configurable:
50 # not configurable:
51 user_ns=Dict()
43 id=Int(allow_none=True)
52 id=Int(allow_none=True)
44 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
53 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
45 kernel=Instance(Kernel)
54 kernel=Instance(Kernel)
@@ -47,6 +56,7 b' class EngineFactory(RegistrationFactory):'
47
56
48 def __init__(self, **kwargs):
57 def __init__(self, **kwargs):
49 super(EngineFactory, self).__init__(**kwargs)
58 super(EngineFactory, self).__init__(**kwargs)
59 self.ident = self.session.session
50 ctx = self.context
60 ctx = self.context
51
61
52 reg = ctx.socket(zmq.XREQ)
62 reg = ctx.socket(zmq.XREQ)
@@ -127,7 +137,7 b' class EngineFactory(RegistrationFactory):'
127
137
128 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
138 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
129 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
139 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
130 loop=loop, user_ns = self.user_ns, logname=self.log.name)
140 loop=loop, user_ns = self.user_ns, log=self.log)
131 self.kernel.start()
141 self.kernel.start()
132 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
142 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
133 heart = Heart(*map(str, hb_addrs), heart_id=identity)
143 heart = Heart(*map(str, hb_addrs), heart_id=identity)
@@ -143,7 +153,7 b' class EngineFactory(RegistrationFactory):'
143
153
144
154
145 def abort(self):
155 def abort(self):
146 self.log.fatal("Registration timed out")
156 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
147 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
157 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
148 time.sleep(1)
158 time.sleep(1)
149 sys.exit(255)
159 sys.exit(255)
@@ -28,7 +28,7 b' import zmq'
28 from zmq.eventloop import ioloop, zmqstream
28 from zmq.eventloop import ioloop, zmqstream
29
29
30 # Local imports.
30 # Local imports.
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str, CStr
32 from IPython.zmq.completer import KernelCompleter
32 from IPython.zmq.completer import KernelCompleter
33
33
34 from IPython.parallel.error import wrap_exception
34 from IPython.parallel.error import wrap_exception
@@ -64,9 +64,11 b' class Kernel(SessionFactory):'
64 #---------------------------------------------------------------------------
64 #---------------------------------------------------------------------------
65
65
66 # kwargs:
66 # kwargs:
67 int_id = Int(-1, config=True)
67 exec_lines = List(CStr, config=True,
68 user_ns = Dict(config=True)
68 help="List of lines to execute")
69 exec_lines = List(config=True)
69
70 int_id = Int(-1)
71 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
70
72
71 control_stream = Instance(zmqstream.ZMQStream)
73 control_stream = Instance(zmqstream.ZMQStream)
72 task_stream = Instance(zmqstream.ZMQStream)
74 task_stream = Instance(zmqstream.ZMQStream)
@@ -14,12 +14,10 b''
14
14
15 import logging
15 import logging
16 import os
16 import os
17 import uuid
18
17
19 from zmq.eventloop.ioloop import IOLoop
18 from zmq.eventloop.ioloop import IOLoop
20
19
21 from IPython.config.configurable import Configurable
20 from IPython.config.configurable import Configurable
22 from IPython.utils.importstring import import_item
23 from IPython.utils.traitlets import Str,Int,Instance, CUnicode, CStr
21 from IPython.utils.traitlets import Str,Int,Instance, CUnicode, CStr
24
22
25 import IPython.parallel.streamsession as ss
23 import IPython.parallel.streamsession as ss
@@ -39,13 +37,6 b' class LoggingFactory(Configurable):'
39 class SessionFactory(LoggingFactory):
37 class SessionFactory(LoggingFactory):
40 """The Base factory from which every factory in IPython.parallel inherits"""
38 """The Base factory from which every factory in IPython.parallel inherits"""
41
39
42 packer = Str('',config=True)
43 unpacker = Str('',config=True)
44 ident = CStr('',config=True)
45 def _ident_default(self):
46 return str(uuid.uuid4())
47 username = CUnicode(os.environ.get('USER','username'),config=True)
48 exec_key = CUnicode('',config=True)
49 # not configurable:
40 # not configurable:
50 context = Instance('zmq.Context', (), {})
41 context = Instance('zmq.Context', (), {})
51 session = Instance('IPython.parallel.streamsession.StreamSession')
42 session = Instance('IPython.parallel.streamsession.StreamSession')
@@ -56,33 +47,28 b' class SessionFactory(LoggingFactory):'
56
47
57 def __init__(self, **kwargs):
48 def __init__(self, **kwargs):
58 super(SessionFactory, self).__init__(**kwargs)
49 super(SessionFactory, self).__init__(**kwargs)
59 exec_key = self.exec_key or None
60 # set the packers:
61 if not self.packer:
62 packer_f = unpacker_f = None
63 elif self.packer.lower() == 'json':
64 packer_f = ss.json_packer
65 unpacker_f = ss.json_unpacker
66 elif self.packer.lower() == 'pickle':
67 packer_f = ss.pickle_packer
68 unpacker_f = ss.pickle_unpacker
69 else:
70 packer_f = import_item(self.packer)
71 unpacker_f = import_item(self.unpacker)
72
50
73 # construct the session
51 # construct the session
74 self.session = ss.StreamSession(self.username, self.ident, packer=packer_f, unpacker=unpacker_f, key=exec_key)
52 self.session = ss.StreamSession(**kwargs)
75
53
76
54
77 class RegistrationFactory(SessionFactory):
55 class RegistrationFactory(SessionFactory):
78 """The Base Configurable for objects that involve registration."""
56 """The Base Configurable for objects that involve registration."""
79
57
80 url = Str('', config=True) # url takes precedence over ip,regport,transport
58 url = Str('', config=True,
81 transport = Str('tcp', config=True)
59 help="""The 0MQ url used for registration. This sets transport, ip, and port
82 ip = Str('127.0.0.1', config=True)
60 in one variable. For example: url='tcp://127.0.0.1:12345' or
83 regport = Instance(int, config=True)
61 url='epgm://*:90210'""") # url takes precedence over ip,regport,transport
62 transport = Str('tcp', config=True,
63 help="""The 0MQ transport for communications. This will likely be
64 the default of 'tcp', but other values include 'ipc', 'epgm', 'inproc'.""")
65 ip = Str('127.0.0.1', config=True,
66 help="""The IP address for registration. This is generally either
67 '127.0.0.1' for loopback only or '*' for all interfaces.
68 [default: '127.0.0.1']""")
69 regport = Int(config=True,
70 help="""The port on which the Hub listens for registration.""")
84 def _regport_default(self):
71 def _regport_default(self):
85 # return 10101
86 return select_random_ports(1)[0]
72 return select_random_ports(1)[0]
87
73
88 def __init__(self, **kwargs):
74 def __init__(self, **kwargs):
@@ -107,46 +93,3 b' class RegistrationFactory(SessionFactory):'
107 self.ip = iface[0]
93 self.ip = iface[0]
108 if iface[1]:
94 if iface[1]:
109 self.regport = int(iface[1])
95 self.regport = int(iface[1])
110
111 #-----------------------------------------------------------------------------
112 # argparse argument extenders
113 #-----------------------------------------------------------------------------
114
115
116 def add_session_arguments(parser):
117 paa = parser.add_argument
118 paa('--ident',
119 type=str, dest='SessionFactory.ident',
120 help='set the ZMQ and session identity [default: random uuid]',
121 metavar='identity')
122 # paa('--execkey',
123 # type=str, dest='SessionFactory.exec_key',
124 # help='path to a file containing an execution key.',
125 # metavar='execkey')
126 paa('--packer',
127 type=str, dest='SessionFactory.packer',
128 help='method to serialize messages: {json,pickle} [default: json]',
129 metavar='packer')
130 paa('--unpacker',
131 type=str, dest='SessionFactory.unpacker',
132 help='inverse function of `packer`. Only necessary when using something other than json|pickle',
133 metavar='packer')
134
135 def add_registration_arguments(parser):
136 paa = parser.add_argument
137 paa('--ip',
138 type=str, dest='RegistrationFactory.ip',
139 help="The IP used for registration [default: localhost]",
140 metavar='ip')
141 paa('--transport',
142 type=str, dest='RegistrationFactory.transport',
143 help="The ZeroMQ transport used for registration [default: tcp]",
144 metavar='transport')
145 paa('--url',
146 type=str, dest='RegistrationFactory.url',
147 help='set transport,ip,regport in one go, e.g. tcp://127.0.0.1:10101',
148 metavar='url')
149 paa('--regport',
150 type=int, dest='RegistrationFactory.regport',
151 help="The port used for registration [default: 10101]",
152 metavar='ip')
@@ -25,8 +25,13 b' import zmq'
25 from zmq.utils import jsonapi
25 from zmq.utils import jsonapi
26 from zmq.eventloop.zmqstream import ZMQStream
26 from zmq.eventloop.zmqstream import ZMQStream
27
27
28 from IPython.config.configurable import Configurable
29 from IPython.utils.importstring import import_item
30 from IPython.utils.traitlets import Str, CStr, CUnicode, Bool, Any
31
28 from .util import ISO8601
32 from .util import ISO8601
29
33
34
30 def squash_unicode(obj):
35 def squash_unicode(obj):
31 """coerce unicode back to bytestrings."""
36 """coerce unicode back to bytestrings."""
32 if isinstance(obj,dict):
37 if isinstance(obj,dict):
@@ -113,50 +118,72 b' def extract_header(msg_or_header):'
113 h = dict(h)
118 h = dict(h)
114 return h
119 return h
115
120
116 class StreamSession(object):
121 class StreamSession(Configurable):
117 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
122 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
118 debug=False
123 debug=Bool(False, config=True, help="""Debug output in the StreamSession""")
119 key=None
124 packer = Str('json',config=True,
120
125 help="""The name of the packer for serializing messages.
121 def __init__(self, username=None, session=None, packer=None, unpacker=None, key=None, keyfile=None):
126 Should be one of 'json', 'pickle', or an import name
122 if username is None:
127 for a custom serializer.""")
123 username = os.environ.get('USER','username')
128 def _packer_changed(self, name, old, new):
124 self.username = username
129 if new.lower() == 'json':
125 if session is None:
130 self.pack = json_packer
126 self.session = str(uuid.uuid4())
131 self.unpack = json_unpacker
132 elif new.lower() == 'pickle':
133 self.pack = pickle_packer
134 self.unpack = pickle_unpacker
127 else:
135 else:
128 self.session = session
136 self.pack = import_item(new)
129 self.msg_id = str(uuid.uuid4())
137
130 if packer is None:
138 unpacker = Str('json',config=True,
131 self.pack = default_packer
139 help="""The name of the unpacker for unserializing messages.
140 Only used with custom functions for `packer`.""")
141 def _unpacker_changed(self, name, old, new):
142 if new.lower() == 'json':
143 self.pack = json_packer
144 self.unpack = json_unpacker
145 elif new.lower() == 'pickle':
146 self.pack = pickle_packer
147 self.unpack = pickle_unpacker
132 else:
148 else:
133 if not callable(packer):
149 self.unpack = import_item(new)
134 raise TypeError("packer must be callable, not %s"%type(packer))
135 self.pack = packer
136
150
137 if unpacker is None:
151 session = CStr('',config=True,
138 self.unpack = default_unpacker
152 help="""The UUID identifying this session.""")
139 else:
153 def _session_default(self):
140 if not callable(unpacker):
154 return str(uuid.uuid4())
141 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
155 username = CUnicode(os.environ.get('USER','username'),config=True,
142 self.unpack = unpacker
156 help="""Username for the Session. Default is your system username.""")
157 key = CStr('', config=True,
158 help="""execution key, for extra authentication.""")
159
160 keyfile = CUnicode('', config=True,
161 help="""path to file containing execution key.""")
162 def _keyfile_changed(self, name, old, new):
163 with open(new, 'rb') as f:
164 self.key = f.read().strip()
165
166 pack = Any(default_packer) # the actual packer function
167 def _pack_changed(self, name, old, new):
168 if not callable(new):
169 raise TypeError("packer must be callable, not %s"%type(new))
143
170
144 if key is not None and keyfile is not None:
171 unpack = Any(default_unpacker) # the actual packer function
145 raise TypeError("Must specify key OR keyfile, not both")
172 def _unpack_changed(self, name, old, new):
146 if keyfile is not None:
173 if not callable(new):
147 with open(keyfile) as f:
174 raise TypeError("packer must be callable, not %s"%type(new))
148 self.key = f.read().strip()
175
149 else:
176 def __init__(self, **kwargs):
150 self.key = key
177 super(StreamSession, self).__init__(**kwargs)
151 if isinstance(self.key, unicode):
152 self.key = self.key.encode('utf8')
153 # print key, keyfile, self.key
154 self.none = self.pack({})
178 self.none = self.pack({})
155
179
180 @property
181 def msg_id(self):
182 """always return new uuid"""
183 return str(uuid.uuid4())
184
156 def msg_header(self, msg_type):
185 def msg_header(self, msg_type):
157 h = msg_header(self.msg_id, msg_type, self.username, self.session)
186 return msg_header(self.msg_id, msg_type, self.username, self.session)
158 self.msg_id = str(uuid.uuid4())
159 return h
160
187
161 def msg(self, msg_type, content=None, parent=None, subheader=None):
188 def msg(self, msg_type, content=None, parent=None, subheader=None):
162 msg = {}
189 msg = {}
@@ -171,10 +198,10 b' class StreamSession(object):'
171
198
172 def check_key(self, msg_or_header):
199 def check_key(self, msg_or_header):
173 """Check that a message's header has the right key"""
200 """Check that a message's header has the right key"""
174 if self.key is None:
201 if not self.key:
175 return True
202 return True
176 header = extract_header(msg_or_header)
203 header = extract_header(msg_or_header)
177 return header.get('key', None) == self.key
204 return header.get('key', '') == self.key
178
205
179
206
180 def serialize(self, msg, ident=None):
207 def serialize(self, msg, ident=None):
@@ -200,7 +227,7 b' class StreamSession(object):'
200 elif ident is not None:
227 elif ident is not None:
201 to_send.append(ident)
228 to_send.append(ident)
202 to_send.append(DELIM)
229 to_send.append(DELIM)
203 if self.key is not None:
230 if self.key:
204 to_send.append(self.key)
231 to_send.append(self.key)
205 to_send.append(self.pack(msg['header']))
232 to_send.append(self.pack(msg['header']))
206 to_send.append(self.pack(msg['parent_header']))
233 to_send.append(self.pack(msg['parent_header']))
@@ -297,7 +324,7 b' class StreamSession(object):'
297 if ident is not None:
324 if ident is not None:
298 to_send.extend(ident)
325 to_send.extend(ident)
299 to_send.append(DELIM)
326 to_send.append(DELIM)
300 if self.key is not None:
327 if self.key:
301 to_send.append(self.key)
328 to_send.append(self.key)
302 to_send.extend(msg)
329 to_send.extend(msg)
303 stream.send_multipart(msg, flags, copy=copy)
330 stream.send_multipart(msg, flags, copy=copy)
@@ -345,7 +372,7 b' class StreamSession(object):'
345 msg will be a list of bytes or Messages, unchanged from input
372 msg will be a list of bytes or Messages, unchanged from input
346 msg should be unpackable via self.unpack_message at this point.
373 msg should be unpackable via self.unpack_message at this point.
347 """
374 """
348 ikey = int(self.key is not None)
375 ikey = int(self.key != '')
349 minlen = 3 + ikey
376 minlen = 3 + ikey
350 msg = list(msg)
377 msg = list(msg)
351 idents = []
378 idents = []
@@ -379,7 +406,7 b' class StreamSession(object):'
379 or the non-copying Message object in each place (False)
406 or the non-copying Message object in each place (False)
380
407
381 """
408 """
382 ikey = int(self.key is not None)
409 ikey = int(self.key != '')
383 minlen = 3 + ikey
410 minlen = 3 + ikey
384 message = {}
411 message = {}
385 if not copy:
412 if not copy:
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now