all ipcluster scripts in some degree of working order with new config
MinRK
@@ -25,16 +25,18 b' import sys'
25 25
26 26 from subprocess import Popen, PIPE
27 27
28 from IPython.config.loader import PyFileConfigLoader
28 from IPython.config.loader import PyFileConfigLoader, Config
29 29 from IPython.config.configurable import Configurable
30 from IPython.core.application import Application, BaseAppConfigLoader
30 from IPython.config.application import Application
31 31 from IPython.core.crashhandler import CrashHandler
32 from IPython.core.newapplication import BaseIPythonApplication
32 33 from IPython.core import release
33 34 from IPython.utils.path import (
34 35 get_ipython_package_dir,
36 get_ipython_dir,
35 37 expand_path
36 38 )
37 from IPython.utils.traitlets import Unicode
39 from IPython.utils.traitlets import Unicode, Bool, CStr, Instance, Dict
38 40
39 41 #-----------------------------------------------------------------------------
40 42 # Module errors
@@ -69,19 +71,45 b' class ClusterDir(Configurable):'
69 71 security_dir = Unicode(u'')
70 72 log_dir = Unicode(u'')
71 73 pid_dir = Unicode(u'')
72 location = Unicode(u'')
73 74
74 def __init__(self, location=u''):
75 super(ClusterDir, self).__init__(location=location)
75 location = Unicode(u'', config=True,
76 help="""Set the cluster dir. This overrides the logic used by the
77 `profile` option.""",
78 )
79 profile = Unicode(u'default',
80 help="""The string name of the profile to be used. This determines the name
81 of the cluster dir as: cluster_<profile>. The default profile is named
82 'default'. The cluster directory is resolved this way if the
83 `cluster_dir` option is not used.""", config=True
84 )
85
86 _location_isset = Bool(False) # flag for detecting multiply set location
87 _new_dir = Bool(False) # flag for whether a new dir was created
88
89 def __init__(self, **kwargs):
90 super(ClusterDir, self).__init__(**kwargs)
91 if not self.location:
92 self._profile_changed('profile', 'default', self.profile)
76 93
77 94 def _location_changed(self, name, old, new):
95 if self._location_isset:
96 raise RuntimeError("Cannot set ClusterDir more than once.")
97 self._location_isset = True
78 98 if not os.path.isdir(new):
79 99 os.makedirs(new)
100 self._new_dir = True
101 # ensure config files exist:
102 self.copy_all_config_files(overwrite=False)
80 103 self.security_dir = os.path.join(new, self.security_dir_name)
81 104 self.log_dir = os.path.join(new, self.log_dir_name)
82 105 self.pid_dir = os.path.join(new, self.pid_dir_name)
83 106 self.check_dirs()
84 107
108 def _profile_changed(self, name, old, new):
109 if self._location_isset:
110 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
111 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
112
85 113 def _log_dir_changed(self, name, old, new):
86 114 self.check_log_dir()
87 115
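
The hunk above relies on the traitlets observer convention: a method named `_<trait>_changed` on a `Configurable` is invoked automatically whenever that trait is assigned, which is how setting `profile` cascades into creating and populating the directory. A minimal sketch of the pattern, assuming the same 0.11-era imports this diff already uses (the `MiniDir` class and paths are illustrative):

    from IPython.config.configurable import Configurable
    from IPython.utils.traitlets import Unicode, Bool

    class MiniDir(Configurable):
        location = Unicode(u'', config=True)
        _isset = Bool(False)  # guard, mirroring _location_isset above

        def _location_changed(self, name, old, new):
            # fired automatically whenever `location` is assigned
            if self._isset:
                raise RuntimeError("Cannot set location more than once.")
            self._isset = True

    d = MiniDir()
    d.location = u'/tmp/cluster_default'  # triggers _location_changed
    d.location = u'/elsewhere'            # raises RuntimeError
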
@@ -110,18 +138,6 b' class ClusterDir(Configurable):'
110 138 self.check_log_dir()
111 139 self.check_pid_dir()
112 140
113 def load_config_file(self, filename):
114 """Load a config file from the top level of the cluster dir.
115
116 Parameters
117 ----------
118 filename : unicode or str
119 The filename only of the config file that must be located in
120 the top-level of the cluster directory.
121 """
122 loader = PyFileConfigLoader(filename, self.location)
123 return loader.load_config()
124
125 141 def copy_config_file(self, config_file, path=None, overwrite=False):
126 142 """Copy a default config file into the active cluster directory.
127 143
@@ -227,59 +243,6 b' class ClusterDir(Configurable):'
227 243
228 244
229 245 #-----------------------------------------------------------------------------
230 # Command line options
231 #-----------------------------------------------------------------------------
232
233 class ClusterDirConfigLoader(BaseAppConfigLoader):
234
235 def _add_cluster_profile(self, parser):
236 paa = parser.add_argument
237 paa('-p', '--profile',
238 dest='Global.profile',type=unicode,
239 help=
240 """The string name of the profile to be used. This determines the name
241 of the cluster dir as: cluster_<profile>. The default profile is named
242 'default'. The cluster directory is resolved this way if the
243 --cluster-dir option is not used.""",
244 metavar='Global.profile')
245
246 def _add_cluster_dir(self, parser):
247 paa = parser.add_argument
248 paa('--cluster-dir',
249 dest='Global.cluster_dir',type=unicode,
250 help="""Set the cluster dir. This overrides the logic used by the
251 --profile option.""",
252 metavar='Global.cluster_dir')
253
254 def _add_work_dir(self, parser):
255 paa = parser.add_argument
256 paa('--work-dir',
257 dest='Global.work_dir',type=unicode,
258 help='Set the working dir for the process.',
259 metavar='Global.work_dir')
260
261 def _add_clean_logs(self, parser):
262 paa = parser.add_argument
263 paa('--clean-logs',
264 dest='Global.clean_logs', action='store_true',
264 help='Delete old log files before starting.')
266
267 def _add_no_clean_logs(self, parser):
268 paa = parser.add_argument
269 paa('--no-clean-logs',
270 dest='Global.clean_logs', action='store_false',
271 help="Don't Delete old log flies before starting.")
272
273 def _add_arguments(self):
274 super(ClusterDirConfigLoader, self)._add_arguments()
275 self._add_cluster_profile(self.parser)
276 self._add_cluster_dir(self.parser)
277 self._add_work_dir(self.parser)
278 self._add_clean_logs(self.parser)
279 self._add_no_clean_logs(self.parser)
280
281
282 #-----------------------------------------------------------------------------
283 246 # Crash handler for this application
284 247 #-----------------------------------------------------------------------------
285 248
@@ -312,8 +275,8 b' class ClusterDirCrashHandler(CrashHandler):'
312 275 message_template = _message_template
313 276
314 277 def __init__(self, app):
315 contact_name = release.authors['Brian'][0]
316 contact_email = release.authors['Brian'][1]
278 contact_name = release.authors['Min'][0]
279 contact_email = release.authors['Min'][1]
317 280 bug_tracker = 'http://github.com/ipython/ipython/issues'
318 281 super(ClusterDirCrashHandler,self).__init__(
319 282 app, contact_name, contact_email, bug_tracker
@@ -323,8 +286,25 b' class ClusterDirCrashHandler(CrashHandler):'
323 286 #-----------------------------------------------------------------------------
324 287 # Main application
325 288 #-----------------------------------------------------------------------------
326
327 class ApplicationWithClusterDir(Application):
289 base_aliases = {
290 'profile' : "ClusterDir.profile",
291 'cluster_dir' : 'ClusterDir.location',
292 'log_level' : 'Application.log_level',
293 'work_dir' : 'ClusterDirApplication.work_dir',
294 'log_to_file' : 'ClusterDirApplication.log_to_file',
295 'clean_logs' : 'ClusterDirApplication.clean_logs',
296 'log_url' : 'ClusterDirApplication.log_url',
297 }
298
299 base_flags = {
300 'debug' : ( {"Application" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
301 'clean-logs' : ( {"ClusterDirApplication" : {"clean_logs" : True}}, "cleanup old logfiles"),
302 'log-to-file' : ( {"ClusterDirApplication" : {"log_to_file" : True}}, "log to a file")
303 }
304 for k,v in base_flags.iteritems():
305 base_flags[k] = (Config(v[0]),v[1])
306
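
Roughly what the wrapping loop above produces: each flag's dict fragment becomes a `Config`, so the command-line loader can merge it directly. A hedged illustration (the `raw` dict here is made up for the example; dict access avoids assuming nested-`Config` conversion):

    from IPython.config.loader import Config

    raw = {'debug': ({"Application": {"log_level": 10}}, "set loglevel to DEBUG")}
    for k, v in raw.iteritems():
        raw[k] = (Config(v[0]), v[1])  # same transform as the loop above

    cfg, helpstr = raw['debug']
    print cfg['Application']['log_level']  # 10
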
307 class ClusterDirApplication(BaseIPythonApplication):
328 308 """An application that puts everything into a cluster directory.
329 309
330 310 Instead of looking for things in the ipython_dir, this type of application
@@ -343,22 +323,37 b' class ApplicationWithClusterDir(Application):'
343 323 dir and named the value of the ``config_file_name`` class attribute.
344 324 """
345 325
346 command_line_loader = ClusterDirConfigLoader
347 326 crash_handler_class = ClusterDirCrashHandler
348 auto_create_cluster_dir = True
327 auto_create_cluster_dir = Bool(True, config=True,
328 help="whether to create the cluster_dir if it doesn't exist")
349 329 # temporarily override default_log_level to INFO
350 330 default_log_level = logging.INFO
331 cluster_dir = Instance(ClusterDir)
332
333 work_dir = Unicode(os.getcwdu(), config=True,
334 help='Set the working dir for the process.'
335 )
336 def _work_dir_changed(self, name, old, new):
337 self.work_dir = unicode(expand_path(new))
338
339 log_to_file = Bool(config=True,
340 help="whether to log to a file")
341
342 clean_logs = Bool(True, shortname='--clean-logs', config=True,
343 help="whether to cleanup old logfiles before starting")
351 344
352 def create_default_config(self):
353 super(ApplicationWithClusterDir, self).create_default_config()
354 self.default_config.Global.profile = u'default'
355 self.default_config.Global.cluster_dir = u''
356 self.default_config.Global.work_dir = os.getcwd()
357 self.default_config.Global.log_to_file = False
358 self.default_config.Global.log_url = None
359 self.default_config.Global.clean_logs = False
345 log_url = CStr('', shortname='--log-url', config=True,
346 help="The ZMQ URL of the iplooger to aggregate logging.")
360 347
361 def find_resources(self):
348 config_file = Unicode(u'', config=True,
349 help="""Path to ipcontroller configuration file. The default is to use
350 <appname>_config.py, as found by cluster-dir."""
351 )
352
353 aliases = Dict(base_aliases)
354 flags = Dict(base_flags)
355
356 def init_clusterdir(self):
362 357 """This resolves the cluster directory.
363 358
364 359 This tries to find the cluster directory and if successful, it will
@@ -375,121 +370,53 b' class ApplicationWithClusterDir(Application):'
375 370 ``True``, then create the new cluster dir in the IPython directory.
376 371 4. If all fails, then raise :class:`ClusterDirError`.
377 372 """
378
379 try:
380 cluster_dir = self.command_line_config.Global.cluster_dir
381 except AttributeError:
382 cluster_dir = self.default_config.Global.cluster_dir
383 cluster_dir = expand_path(cluster_dir)
384 try:
385 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
386 except ClusterDirError:
387 pass
388 else:
389 self.log.info('Using existing cluster dir: %s' % \
390 self.cluster_dir_obj.location
391 )
392 self.finish_cluster_dir()
393 return
394
395 try:
396 self.profile = self.command_line_config.Global.profile
397 except AttributeError:
398 self.profile = self.default_config.Global.profile
399 try:
400 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
401 self.ipython_dir, self.profile)
402 except ClusterDirError:
403 pass
404 else:
405 self.log.info('Using existing cluster dir: %s' % \
406 self.cluster_dir_obj.location
407 )
408 self.finish_cluster_dir()
409 return
410
411 if self.auto_create_cluster_dir:
412 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
413 self.ipython_dir, self.profile
414 )
373 self.cluster_dir = ClusterDir(config=self.config)
374 if self.cluster_dir._new_dir:
415 375 self.log.info('Creating new cluster dir: %s' % \
416 self.cluster_dir_obj.location
417 )
418 self.finish_cluster_dir()
376 self.cluster_dir.location)
419 377 else:
420 raise ClusterDirError('Could not find a valid cluster directory.')
421
422 def finish_cluster_dir(self):
423 # Set the cluster directory
424 self.cluster_dir = self.cluster_dir_obj.location
425
426 # These have to be set because they could be different from the one
427 # that we just computed. Because command line has the highest
428 # priority, this will always end up in the master_config.
429 self.default_config.Global.cluster_dir = self.cluster_dir
430 self.command_line_config.Global.cluster_dir = self.cluster_dir
431
432 def find_config_file_name(self):
433 """Find the config file name for this application."""
434 # For this type of Application it should be set as a class attribute.
435 if not hasattr(self, 'default_config_file_name'):
436 self.log.critical("No config filename found")
437 else:
438 self.config_file_name = self.default_config_file_name
439
440 def find_config_file_paths(self):
441 # Set the search path to to the cluster directory. We should NOT
442 # include IPython.config.default here as the default config files
443 # are ALWAYS automatically moved to the cluster directory.
444 conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default')
445 self.config_file_paths = (self.cluster_dir,)
446
447 def pre_construct(self):
448 # The log and security dirs were set earlier, but here we put them
449 # into the config and log them.
450 config = self.master_config
451 sdir = self.cluster_dir_obj.security_dir
452 self.security_dir = config.Global.security_dir = sdir
453 ldir = self.cluster_dir_obj.log_dir
454 self.log_dir = config.Global.log_dir = ldir
455 pdir = self.cluster_dir_obj.pid_dir
456 self.pid_dir = config.Global.pid_dir = pdir
457 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
458 config.Global.work_dir = unicode(expand_path(config.Global.work_dir))
459 # Change to the working directory. We do this just before construct
460 # is called so all the components there have the right working dir.
461 self.to_work_dir()
378 self.log.info('Using existing cluster dir: %s' % \
379 self.cluster_dir.location)
462 380
463 381 def to_work_dir(self):
464 wd = self.master_config.Global.work_dir
465 if unicode(wd) != unicode(os.getcwd()):
382 wd = self.work_dir
383 if unicode(wd) != os.getcwdu():
466 384 os.chdir(wd)
467 385 self.log.info("Changing to working dir: %s" % wd)
468 386
469 def start_logging(self):
470 # Remove old log files
471 if self.master_config.Global.clean_logs:
472 log_dir = self.master_config.Global.log_dir
473 for f in os.listdir(log_dir):
474 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
475 # if f.startswith(self.name + u'-') and f.endswith('.log'):
476 os.remove(os.path.join(log_dir, f))
477 # Start logging to the new log file
478 if self.master_config.Global.log_to_file:
479 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
480 logfile = os.path.join(self.log_dir, log_filename)
481 open_log_file = open(logfile, 'w')
482 elif self.master_config.Global.log_url:
483 open_log_file = None
484 else:
485 open_log_file = sys.stdout
486 if open_log_file is not None:
487 self.log.removeHandler(self._log_handler)
488 self._log_handler = logging.StreamHandler(open_log_file)
489 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
490 self._log_handler.setFormatter(self._log_formatter)
491 self.log.addHandler(self._log_handler)
492 # log.startLogging(open_log_file)
387 def load_config_file(self, filename, path=None):
388 """Load a .py based config file by filename and path."""
389 return Application.load_config_file(self, filename, path=path)
390 #
391 # def load_default_config_file(self):
392 # """Load a .py based config file by filename and path."""
393 # return BaseIPythonApplication.load_config_file(self)
394
395 # disable URL-logging
396 # def init_logging(self):
397 # # Remove old log files
398 # if self.master_config.Global.clean_logs:
399 # log_dir = self.master_config.Global.log_dir
400 # for f in os.listdir(log_dir):
401 # if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
402 # # if f.startswith(self.name + u'-') and f.endswith('.log'):
403 # os.remove(os.path.join(log_dir, f))
404 # # Start logging to the new log file
405 # if self.master_config.Global.log_to_file:
406 # log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
407 # logfile = os.path.join(self.log_dir, log_filename)
408 # open_log_file = open(logfile, 'w')
409 # elif self.master_config.Global.log_url:
410 # open_log_file = None
411 # else:
412 # open_log_file = sys.stdout
413 # if open_log_file is not None:
414 # self.log.removeHandler(self._log_handler)
415 # self._log_handler = logging.StreamHandler(open_log_file)
416 # self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
417 # self._log_handler.setFormatter(self._log_formatter)
418 # self.log.addHandler(self._log_handler)
419 # # log.startLogging(open_log_file)
493 420
494 421 def write_pid_file(self, overwrite=False):
495 422 """Create a .pid file in the pid_dir with my pid.
@@ -497,7 +424,7 b' class ApplicationWithClusterDir(Application):'
497 424 This must be called after pre_construct, which sets `self.pid_dir`.
498 425 This raises :exc:`PIDFileError` if the pid file exists already.
499 426 """
500 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
427 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
501 428 if os.path.isfile(pid_file):
502 429 pid = self.get_pid_from_file()
503 430 if not overwrite:
@@ -516,7 +443,7 b' class ApplicationWithClusterDir(Application):'
516 443 :func:`reactor.addSystemEventTrigger`. This needs to return
517 444 ``None``.
518 445 """
519 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
446 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
520 447 if os.path.isfile(pid_file):
521 448 try:
522 449 self.log.info("Removing pid file: %s" % pid_file)
@@ -529,7 +456,7 b' class ApplicationWithClusterDir(Application):'
529 456
530 457 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
531 458 """
532 pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
459 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
533 460 if os.path.isfile(pid_file):
534 461 with open(pid_file, 'r') as f:
535 462 pid = int(f.read().strip())
@@ -563,4 +490,3 b' class ApplicationWithClusterDir(Application):'
563 490 return True
564 491 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
565 492 return pid in pids
566 No newline at end of file
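
With `ClusterDirConfigLoader` gone, resolving a cluster directory reduces to instantiating the `Configurable`. A hedged usage sketch of the API introduced above (profile name illustrative; note that instantiation creates the directory and copies default config files as a side effect):

    from IPython.parallel.apps.clusterdir import ClusterDir

    # profile-based resolution lands in <ipython_dir>/cluster_mycluster
    cd = ClusterDir(profile=u'mycluster')
    print cd.location
    print cd.security_dir, cd.log_dir, cd.pid_dir
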
@@ -25,12 +25,14 b' from subprocess import check_call, CalledProcessError, PIPE'
25 25 import zmq
26 26 from zmq.eventloop import ioloop
27 27
28 from IPython.external.argparse import ArgumentParser, SUPPRESS
28 from IPython.config.loader import Config
29 29 from IPython.utils.importstring import import_item
30 from IPython.utils.traitlets import Int, CStr, CUnicode, Str, Bool, CFloat, Dict, List
30 31
31 32 from IPython.parallel.apps.clusterdir import (
32 ApplicationWithClusterDir, ClusterDirConfigLoader,
33 ClusterDirError, PIDFileError
33 ClusterDirApplication, ClusterDirError,
34 PIDFileError,
35 base_flags,
34 36 )
35 37
36 38
@@ -49,9 +51,9 b' An IPython cluster consists of 1 controller and 1 or more engines.'
49 51 This command automates the startup of these processes using a wide
50 52 range of startup methods (SSH, local processes, PBS, mpiexec,
51 53 Windows HPC Server 2008). To start a cluster with 4 engines on your
52 local host simply do 'ipcluster start -n 4'. For more complex usage
53 you will typically do 'ipcluster create -p mycluster', then edit
54 configuration files, followed by 'ipcluster start -p mycluster -n 4'.
54 local host simply do 'ipcluster --start n=4'. For more complex usage
55 you will typically do 'ipcluster --create profile=mycluster', then edit
56 configuration files, followed by 'ipcluster --start profile=mycluster n=4'.
55 57 """
56 58
57 59
@@ -72,96 +74,9 b' NO_CLUSTER = 12'
72 74
73 75
74 76 #-----------------------------------------------------------------------------
75 # Command line options
77 # Main application
76 78 #-----------------------------------------------------------------------------
77
78
79 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
80
81 def _add_arguments(self):
82 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
83 # its defaults on self.parser. Instead, we will put those on
84 # default options on our subparsers.
85
86 # This has all the common options that all subcommands use
87 parent_parser1 = ArgumentParser(
88 add_help=False,
89 argument_default=SUPPRESS
90 )
91 self._add_ipython_dir(parent_parser1)
92 self._add_log_level(parent_parser1)
93
94 # This has all the common options that other subcommands use
95 parent_parser2 = ArgumentParser(
96 add_help=False,
97 argument_default=SUPPRESS
98 )
99 self._add_cluster_profile(parent_parser2)
100 self._add_cluster_dir(parent_parser2)
101 self._add_work_dir(parent_parser2)
102 paa = parent_parser2.add_argument
103 paa('--log-to-file',
104 action='store_true', dest='Global.log_to_file',
105 help='Log to a file in the log directory (default is stdout)')
106
107 # Create the object used to create the subparsers.
108 subparsers = self.parser.add_subparsers(
109 dest='Global.subcommand',
110 title='ipcluster subcommands',
111 description=
112 """ipcluster has a variety of subcommands. The general way of
113 running ipcluster is 'ipcluster <cmd> [options]'. To get help
114 on a particular subcommand do 'ipcluster <cmd> -h'."""
115 # help="For more help, type 'ipcluster <cmd> -h'",
116 )
117
118 # The "list" subcommand parser
119 parser_list = subparsers.add_parser(
120 'list',
121 parents=[parent_parser1],
122 argument_default=SUPPRESS,
123 help="List all clusters in cwd and ipython_dir.",
124 description=
125 """List all available clusters, by cluster directory, that can
126 be found in the current working directory or in the ipython
127 directory. Cluster directories are named using the convention
128 'cluster_<profile>'."""
129 )
130
131 # The "create" subcommand parser
132 parser_create = subparsers.add_parser(
133 'create',
134 parents=[parent_parser1, parent_parser2],
135 argument_default=SUPPRESS,
136 help="Create a new cluster directory.",
137 description=
138 """Create an ipython cluster directory by its profile name or
139 cluster directory path. Cluster directories contain
140 configuration, log and security related files and are named
141 using the convention 'cluster_<profile>'. By default they are
142 located in your ipython directory. Once created, you will
143 probably need to edit the configuration files in the cluster
144 directory to configure your cluster. Most users will create a
145 cluster directory by profile name,
146 'ipcluster create -p mycluster', which will put the directory
147 in '<ipython_dir>/cluster_mycluster'.
148 """
149 )
150 paa = parser_create.add_argument
151 paa('--reset-config',
152 dest='Global.reset_config', action='store_true',
153 help=
154 """Recopy the default config files to the cluster directory.
155 You will loose any modifications you have made to these files.""")
156
157 # The "start" subcommand parser
158 parser_start = subparsers.add_parser(
159 'start',
160 parents=[parent_parser1, parent_parser2],
161 argument_default=SUPPRESS,
162 help="Start a cluster.",
163 description=
164 """Start an ipython cluster by its profile name or cluster
79 start_help = """Start an ipython cluster by its profile name or cluster
165 80 directory. Cluster directories contain configuration, log and
166 81 security related files and are named using the convention
167 82 'cluster_<profile>' and should be created using the 'start'
@@ -170,121 +85,130 b' class IPClusterAppConfigLoader(ClusterDirConfigLoader):'
170 85 using its profile name, 'ipcluster start -n 4 -p <profile>',
171 86 otherwise use the '--cluster-dir' option.
172 87 """
173 )
174
175 paa = parser_start.add_argument
176 paa('-n', '--number',
177 type=int, dest='Global.n',
178 help='The number of engines to start.',
179 metavar='Global.n')
180 paa('--clean-logs',
181 dest='Global.clean_logs', action='store_true',
182 help='Delete old log files before starting.')
183 paa('--no-clean-logs',
184 dest='Global.clean_logs', action='store_false',
185 help="Don't delete old log flies before starting.")
186 paa('--daemon',
187 dest='Global.daemonize', action='store_true',
188 help='Daemonize the ipcluster program. This implies --log-to-file')
189 paa('--no-daemon',
190 dest='Global.daemonize', action='store_false',
191 help="Dont't daemonize the ipcluster program.")
192 paa('--delay',
193 type=float, dest='Global.delay',
194 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
195
196 # The "stop" subcommand parser
197 parser_stop = subparsers.add_parser(
198 'stop',
199 parents=[parent_parser1, parent_parser2],
200 argument_default=SUPPRESS,
201 help="Stop a running cluster.",
202 description=
203 """Stop a running ipython cluster by its profile name or cluster
88 stop_help = """Stop a running ipython cluster by its profile name or cluster
204 89 directory. Cluster directories are named using the convention
205 90 'cluster_<profile>'. If your cluster directory is in
206 91 the cwd or the ipython directory, you can simply refer to it
207 92 using its profile name, 'ipcluster stop -p <profile>', otherwise
208 93 use the '--cluster-dir' option.
209 94 """
210 )
211 paa = parser_stop.add_argument
212 paa('--signal',
213 dest='Global.signal', type=int,
214 help="The signal number to use in stopping the cluster (default=2).",
215 metavar="Global.signal")
216
217 # the "engines" subcommand parser
218 parser_engines = subparsers.add_parser(
219 'engines',
220 parents=[parent_parser1, parent_parser2],
221 argument_default=SUPPRESS,
222 help="Attach some engines to an existing controller or cluster.",
223 description=
224 """Start one or more engines to connect to an existing Cluster
95 engines_help = """Start one or more engines to connect to an existing Cluster
225 96 by profile name or cluster directory.
226 97 Cluster directories contain configuration, log and
227 98 security related files and are named using the convention
228 99 'cluster_<profile>' and should be created using the 'start'
229 100 subcommand of 'ipcluster'. If your cluster directory is in
230 101 the cwd or the ipython directory, you can simply refer to it
231 using its profile name, 'ipcluster engines -n 4 -p <profile>`,
232 otherwise use the '--cluster-dir' option.
102 using its profile name, 'ipcluster --engines n=4 profile=<profile>',
103 otherwise use the 'cluster_dir' option.
233 104 """
234 )
235 paa = parser_engines.add_argument
236 paa('-n', '--number',
237 type=int, dest='Global.n',
238 help='The number of engines to start.',
239 metavar='Global.n')
240 paa('--daemon',
241 dest='Global.daemonize', action='store_true',
242 help='Daemonize the ipcluster program. This implies --log-to-file')
243 paa('--no-daemon',
244 dest='Global.daemonize', action='store_false',
245 help="Dont't daemonize the ipcluster program.")
105 create_help = """Create an ipython cluster directory by its profile name or
106 cluster directory path. Cluster directories contain
107 configuration, log and security related files and are named
108 using the convention 'cluster_<profile>'. By default they are
109 located in your ipython directory. Once created, you will
110 probably need to edit the configuration files in the cluster
111 directory to configure your cluster. Most users will create a
112 cluster directory by profile name,
113 'ipcluster --create profile=mycluster', which will put the directory
114 in '<ipython_dir>/cluster_mycluster'.
115 """
116 list_help = """List all available clusters, by cluster directory, that can
117 be found in the current working directory or in the ipython
118 directory. Cluster directories are named using the convention
119 'cluster_<profile>'."""
246 120
247 #-----------------------------------------------------------------------------
248 # Main application
249 #-----------------------------------------------------------------------------
250 121
122 flags = {}
123 flags.update(base_flags)
124 flags.update({
125 'start' : ({ 'IPClusterApp': Config({'subcommand' : 'start'})} , start_help),
126 'stop' : ({ 'IPClusterApp': Config({'subcommand' : 'stop'})} , stop_help),
127 'create' : ({ 'IPClusterApp': Config({'subcommand' : 'create'})} , create_help),
128 'engines' : ({ 'IPClusterApp': Config({'subcommand' : 'engines'})} , engines_help),
129 'list' : ({ 'IPClusterApp': Config({'subcommand' : 'list'})} , list_help),
251 130
252 class IPClusterApp(ApplicationWithClusterDir):
131 })
132
133 class IPClusterApp(ClusterDirApplication):
253 134
254 135 name = u'ipcluster'
255 136 description = _description
256 137 usage = None
257 command_line_loader = IPClusterAppConfigLoader
258 138 default_config_file_name = default_config_file_name
259 139 default_log_level = logging.INFO
260 140 auto_create_cluster_dir = False
141 classes = List()
142 def _classes_default(self):
143 from IPython.parallel.apps import launcher
144 return launcher.all_launchers
145
146 n = Int(0, config=True,
147 help="The number of engines to start.")
148 signal = Int(signal.SIGINT, config=True,
149 help="signal to use for stopping. [default: SIGINT]")
150 delay = CFloat(1., config=True,
151 help="delay (in s) between starting the controller and the engines")
152
153 subcommand = Str('', config=True,
154 help="""ipcluster has a variety of subcommands. The general way of
155 running ipcluster is 'ipcluster --<cmd> [options]'."""
156 )
261 157
262 def create_default_config(self):
263 super(IPClusterApp, self).create_default_config()
264 self.default_config.Global.controller_launcher = \
265 'IPython.parallel.apps.launcher.LocalControllerLauncher'
266 self.default_config.Global.engine_launcher = \
267 'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
268 self.default_config.Global.n = 2
269 self.default_config.Global.delay = 2
270 self.default_config.Global.reset_config = False
271 self.default_config.Global.clean_logs = True
272 self.default_config.Global.signal = signal.SIGINT
273 self.default_config.Global.daemonize = False
274
275 def find_resources(self):
276 subcommand = self.command_line_config.Global.subcommand
158 controller_launcher_class = Str('IPython.parallel.apps.launcher.LocalControllerLauncher',
159 config=True,
160 help="The class for launching a Controller."
161 )
162 engine_launcher_class = Str('IPython.parallel.apps.launcher.LocalEngineSetLauncher',
163 config=True,
164 help="The class for launching Engines."
165 )
166 reset = Bool(False, config=True,
167 help="Whether to reset config files as part of '--create'."
168 )
169 daemonize = Bool(False, config=True,
170 help='Daemonize the ipcluster program. This implies --log-to-file')
171
172 def _daemonize_changed(self, name, old, new):
173 if new:
174 self.log_to_file = True
175
176 def _n_changed(self, name, old, new):
177 # propagate n all over the place...
178 # TODO make this clean
179 # ensure all classes are covered.
180 self.config.LocalEngineSetLauncher.n=new
181 self.config.MPIExecEngineSetLauncher.n=new
182 self.config.SSHEngineSetLauncher.n=new
183 self.config.PBSEngineSetLauncher.n=new
184 self.config.SGEEngineSetLauncher.n=new
185 self.config.WinHPEngineSetLauncher.n=new
186
187 aliases = Dict(dict(
188 n='IPClusterApp.n',
189 signal = 'IPClusterApp.signal',
190 delay = 'IPClusterApp.delay',
191 clauncher = 'IPClusterApp.controller_launcher_class',
192 elauncher = 'IPClusterApp.engine_launcher_class',
193 ))
194 flags = Dict(flags)
195
196 def init_clusterdir(self):
197 subcommand = self.subcommand
277 198 if subcommand=='list':
278 199 self.list_cluster_dirs()
279 # Exit immediately because there is nothing left to do.
280 self.exit()
281 elif subcommand=='create':
200 self.exit(0)
201 if subcommand=='create':
202 reset = self.reset
282 203 self.auto_create_cluster_dir = True
283 super(IPClusterApp, self).find_resources()
204 super(IPClusterApp, self).init_clusterdir()
205 self.log.info('Copying default config files to cluster directory '
206 '[overwrite=%r]' % (reset,))
207 self.cluster_dir.copy_all_config_files(overwrite=reset)
284 208 elif subcommand=='start' or subcommand=='stop':
285 209 self.auto_create_cluster_dir = True
286 210 try:
287 super(IPClusterApp, self).find_resources()
211 super(IPClusterApp, self).init_clusterdir()
288 212 except ClusterDirError:
289 213 raise ClusterDirError(
290 214 "Could not find a cluster directory. A cluster dir must "
@@ -295,7 +219,7 b' class IPClusterApp(ApplicationWithClusterDir):'
295 219 elif subcommand=='engines':
296 220 self.auto_create_cluster_dir = False
297 221 try:
298 super(IPClusterApp, self).find_resources()
222 super(IPClusterApp, self).init_clusterdir()
299 223 except ClusterDirError:
300 224 raise ClusterDirError(
301 225 "Could not find a cluster directory. A cluster dir must "
@@ -312,9 +236,9 b' class IPClusterApp(ApplicationWithClusterDir):'
312 236 else:
313 237 cluster_dir_paths = []
314 238 try:
315 ipython_dir = self.command_line_config.Global.ipython_dir
239 ipython_dir = self.ipython_dir
316 240 except AttributeError:
317 ipython_dir = self.default_config.Global.ipython_dir
241 ipython_dir = self.ipython_dir
318 242 paths = [os.getcwd(), ipython_dir] + \
319 243 cluster_dir_paths
320 244 paths = list(set(paths))
@@ -326,32 +250,13 b' class IPClusterApp(ApplicationWithClusterDir):'
326 250 full_path = os.path.join(path, f)
327 251 if os.path.isdir(full_path) and f.startswith('cluster_'):
328 252 profile = full_path.split('_')[-1]
329 start_cmd = 'ipcluster start -p %s -n 4' % profile
253 start_cmd = 'ipcluster --start profile=%s n=4' % profile
330 254 print start_cmd + " ==> " + full_path
331 255
332 def pre_construct(self):
333 # IPClusterApp.pre_construct() is where we cd to the working directory.
334 super(IPClusterApp, self).pre_construct()
335 config = self.master_config
336 try:
337 daemon = config.Global.daemonize
338 if daemon:
339 config.Global.log_to_file = True
340 except AttributeError:
341 pass
342
343 def construct(self):
344 config = self.master_config
345 subcmd = config.Global.subcommand
346 reset = config.Global.reset_config
347 if subcmd == 'list':
348 return
349 if subcmd == 'create':
350 self.log.info('Copying default config files to cluster directory '
351 '[overwrite=%r]' % (reset,))
352 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
256 def init_launchers(self):
257 config = self.config
258 subcmd = self.subcommand
353 259 if subcmd =='start':
354 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
355 260 self.start_logging()
356 261 self.loop = ioloop.IOLoop.instance()
357 262 # reactor.callWhenRunning(self.start_launchers)
@@ -366,16 +271,19 b' class IPClusterApp(ApplicationWithClusterDir):'
366 271 dc.start()
367 272
368 273 def start_launchers(self, controller=True):
369 config = self.master_config
274 config = self.config
370 275
371 276 # Create the launchers. In both cases, we set the work_dir of
372 277 # the launcher to the cluster_dir. This is where the launcher's
373 278 # subprocesses will be launched. It is not where the controller
374 279 # and engine will be launched.
375 280 if controller:
376 cl_class = import_item(config.Global.controller_launcher)
281 clsname = self.controller_launcher_class
282 if '.' not in clsname:
283 clsname = 'IPython.parallel.apps.launcher.'+clsname
284 cl_class = import_item(clsname)
377 285 self.controller_launcher = cl_class(
378 work_dir=self.cluster_dir, config=config,
286 work_dir=self.cluster_dir.location, config=config,
379 287 logname=self.log.name
380 288 )
381 289 # Setup the observing of stopping. If the controller dies, shut
@@ -391,9 +299,15 b' class IPClusterApp(ApplicationWithClusterDir):'
391 299 else:
392 300 self.controller_launcher = None
393 301
394 el_class = import_item(config.Global.engine_launcher)
302 clsname = self.engine_launcher_class
303 if '.' not in clsname:
304 # not a module, presume it's the raw name in apps.launcher
305 clsname = 'IPython.parallel.apps.launcher.'+clsname
306
307 el_class = import_item(clsname)
308
395 309 self.engine_launcher = el_class(
396 work_dir=self.cluster_dir, config=config, logname=self.log.name
310 work_dir=self.cluster_dir.location, config=config, logname=self.log.name
397 311 )
398 312
399 313 # Setup signals
@@ -403,7 +317,7 b' class IPClusterApp(ApplicationWithClusterDir):'
403 317 self._stopping = False # Make sure stop_launchers is not called 2x.
404 318 if controller:
405 319 self.start_controller()
406 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
320 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay*controller, self.loop)
407 321 dc.start()
408 322 self.startup_message()
409 323
@@ -413,19 +327,19 b' class IPClusterApp(ApplicationWithClusterDir):'
413 327
414 328 def start_controller(self, r=None):
415 329 # self.log.info("In start_controller")
416 config = self.master_config
330 config = self.config
417 331 d = self.controller_launcher.start(
418 cluster_dir=config.Global.cluster_dir
332 cluster_dir=self.cluster_dir.location
419 333 )
420 334 return d
421 335
422 336 def start_engines(self, r=None):
423 337 # self.log.info("In start_engines")
424 config = self.master_config
338 config = self.config
425 339
426 340 d = self.engine_launcher.start(
427 config.Global.n,
428 cluster_dir=config.Global.cluster_dir
341 self.n,
342 cluster_dir=self.cluster_dir.location
429 343 )
430 344 return d
431 345
@@ -469,18 +383,19 b' class IPClusterApp(ApplicationWithClusterDir):'
469 383
470 384 def start_logging(self):
471 385 # Remove old log files of the controller and engine
472 if self.master_config.Global.clean_logs:
473 log_dir = self.master_config.Global.log_dir
386 if self.clean_logs:
387 log_dir = self.cluster_dir.log_dir
474 388 for f in os.listdir(log_dir):
475 389 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
476 390 os.remove(os.path.join(log_dir, f))
477 391 # This will remove old log files for ipcluster itself
478 super(IPClusterApp, self).start_logging()
392 # super(IPClusterApp, self).start_logging()
479 393
480 def start_app(self):
394 def start(self):
481 395 """Start the application, depending on what subcommand is used."""
482 subcmd = self.master_config.Global.subcommand
483 if subcmd=='create' or subcmd=='list':
396 subcmd = self.subcommand
397 if subcmd=='create':
398 # the 'create' action was already handled in init_clusterdir
484 399 return
485 400 elif subcmd=='start':
486 401 self.start_app_start()
@@ -488,10 +403,14 b' class IPClusterApp(ApplicationWithClusterDir):'
488 403 self.start_app_stop()
489 404 elif subcmd=='engines':
490 405 self.start_app_engines()
406 else:
407 self.log.fatal("one command of '--start', '--stop', '--list', '--create', '--engines'"
408 " must be specified")
409 self.exit(-1)
491 410
492 411 def start_app_start(self):
493 412 """Start the app for the start subcommand."""
494 config = self.master_config
413 config = self.config
495 414 # First see if the cluster is already running
496 415 try:
497 416 pid = self.get_pid_from_file()
@@ -512,10 +431,10 b' class IPClusterApp(ApplicationWithClusterDir):'
512 431
513 432 # Now log and daemonize
514 433 self.log.info(
515 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
434 'Starting ipcluster with [daemon=%r]' % self.daemonize
516 435 )
517 436 # TODO: Get daemonize working on Windows or as a Windows Server.
518 if config.Global.daemonize:
437 if self.daemonize:
519 438 if os.name=='posix':
520 439 from twisted.scripts._twistd_unix import daemonize
521 440 daemonize()
@@ -536,15 +455,15 b' class IPClusterApp(ApplicationWithClusterDir):'
536 455
537 456 def start_app_engines(self):
538 457 """Start the app for the start subcommand."""
539 config = self.master_config
458 config = self.config
540 459 # First see if the cluster is already running
541 460
542 461 # Now log and daemonize
543 462 self.log.info(
544 'Starting engines with [daemon=%r]' % config.Global.daemonize
463 'Starting engines with [daemon=%r]' % self.daemonize
545 464 )
546 465 # TODO: Get daemonize working on Windows or as a Windows Server.
547 if config.Global.daemonize:
466 if self.daemonize:
548 467 if os.name=='posix':
549 468 from twisted.scripts._twistd_unix import daemonize
550 469 daemonize()
@@ -564,7 +483,7 b' class IPClusterApp(ApplicationWithClusterDir):'
564 483
565 484 def start_app_stop(self):
566 485 """Start the app for the stop subcommand."""
567 config = self.master_config
486 config = self.config
568 487 try:
569 488 pid = self.get_pid_from_file()
570 489 except PIDFileError:
@@ -586,7 +505,7 b' class IPClusterApp(ApplicationWithClusterDir):'
586 505 self.exit(ALREADY_STOPPED)
587 506
588 507 elif os.name=='posix':
589 sig = config.Global.signal
508 sig = self.signal
590 509 self.log.info(
591 510 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
592 511 )
@@ -609,6 +528,20 b' class IPClusterApp(ApplicationWithClusterDir):'
609 528 def launch_new_instance():
610 529 """Create and run the IPython cluster."""
611 530 app = IPClusterApp()
531 app.parse_command_line()
532 cl_config = app.config
533 app.init_clusterdir()
534 if app.config_file:
535 app.load_config_file(app.config_file)
536 else:
537 app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
538 # command-line should *override* config file, but command-line is necessary
539 # to determine clusterdir, etc.
540 app.update_config(cl_config)
541
542 app.to_work_dir()
543 app.init_launchers()
544
612 545 app.start()
613 546
614 547
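
The explicit boot sequence above replaces the old `master_config` machinery: parse the command line first so the cluster dir can be located, load the file config, then re-apply the command-line config on top so it wins. A sketch of the first step under the transitional `--flag name=value` syntax this series documents (argv illustrative; behavior assumed from the new `Application` parser, run in this module's context):

    app = IPClusterApp()
    # flags set IPClusterApp.subcommand; aliases route name=value onto traits
    app.parse_command_line(['--start', 'profile=mycluster', 'n=4'])
    print app.subcommand  # 'start'
    print app.n           # 4, via the 'n' alias -> IPClusterApp.n
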
@@ -25,7 +25,10 b' import stat'
25 25 import sys
26 26 import uuid
27 27
28 from multiprocessing import Process
29
28 30 import zmq
31 from zmq.devices import ProcessMonitoredQueue
29 32 from zmq.log.handlers import PUBHandler
30 33 from zmq.utils import jsonapi as json
31 34
@@ -34,14 +37,31 b' from IPython.config.loader import Config'
34 37 from IPython.parallel import factory
35 38
36 39 from IPython.parallel.apps.clusterdir import (
37 ApplicationWithClusterDir,
38 ClusterDirConfigLoader
40 ClusterDir,
41 ClusterDirApplication,
42 base_flags
43 # ClusterDirConfigLoader
39 44 )
40 from IPython.parallel.util import disambiguate_ip_address, split_url
41 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
42 from IPython.utils.traitlets import Instance, Unicode
45 from IPython.utils.importstring import import_item
46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, CStr, Dict
47
48 # from IPython.parallel.controller.controller import ControllerFactory
49 from IPython.parallel.streamsession import StreamSession
50 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 from IPython.parallel.controller.hub import Hub, HubFactory
52 from IPython.parallel.controller.scheduler import TaskScheduler, launch_scheduler
53 from IPython.parallel.controller.sqlitedb import SQLiteDB
54
55 from IPython.parallel.util import signal_children, disambiguate_ip_address, split_url
43 56
44 from IPython.parallel.controller.controller import ControllerFactory
57 # conditional import of MongoDB backend class
58
59 try:
60 from IPython.parallel.controller.mongodb import MongoDB
61 except ImportError:
62 maybe_mongo = []
63 else:
64 maybe_mongo = [MongoDB]
45 65
46 66
47 67 #-----------------------------------------------------------------------------
@@ -63,234 +83,102 b' your ipython directory and named as "cluster_<profile>". See the --profile'
63 83 and --cluster-dir options for details.
64 84 """
65 85
66 #-----------------------------------------------------------------------------
67 # Default interfaces
68 #-----------------------------------------------------------------------------
69
70 # The default client interfaces for FCClientServiceFactory.interfaces
71 default_client_interfaces = Config()
72 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
73
74 # Make this a dict we can pass to Config.__init__ for the default
75 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
76
77
78 86
79 # The default engine interfaces for FCEngineServiceFactory.interfaces
80 default_engine_interfaces = Config()
81 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
82
83 # Make this a dict we can pass to Config.__init__ for the default
84 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
85
86
87 #-----------------------------------------------------------------------------
88 # Service factories
89 #-----------------------------------------------------------------------------
90
91 #
92 # class FCClientServiceFactory(FCServiceFactory):
93 # """A Foolscap implementation of the client services."""
94 #
95 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
96 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
97 # allow_none=False, config=True)
98 #
99 #
100 # class FCEngineServiceFactory(FCServiceFactory):
101 # """A Foolscap implementation of the engine services."""
102 #
103 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
104 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
105 # allow_none=False, config=True)
106 #
107
108 #-----------------------------------------------------------------------------
109 # Command line options
110 #-----------------------------------------------------------------------------
111
112
113 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
114
115 def _add_arguments(self):
116 super(IPControllerAppConfigLoader, self)._add_arguments()
117 paa = self.parser.add_argument
118
119 ## Hub Config:
120 paa('--mongodb',
121 dest='HubFactory.db_class', action='store_const',
122 const='IPython.parallel.controller.mongodb.MongoDB',
123 help='Use MongoDB for task storage [default: in-memory]')
124 paa('--sqlite',
125 dest='HubFactory.db_class', action='store_const',
126 const='IPython.parallel.controller.sqlitedb.SQLiteDB',
127 help='Use SQLite3 for DB task storage [default: in-memory]')
128 paa('--hb',
129 type=int, dest='HubFactory.hb', nargs=2,
130 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
131 'connections [default: random]',
132 metavar='Hub.hb_ports')
133 paa('--ping',
134 type=int, dest='HubFactory.ping',
135 help='The frequency at which the Hub pings the engines for heartbeats '
136 ' (in ms) [default: 100]',
137 metavar='Hub.ping')
138
139 # Client config
140 paa('--client-ip',
141 type=str, dest='HubFactory.client_ip',
142 help='The IP address or hostname the Hub will listen on for '
143 'client connections. Both engine-ip and client-ip can be set simultaneously '
144 'via --ip [default: loopback]',
145 metavar='Hub.client_ip')
146 paa('--client-transport',
147 type=str, dest='HubFactory.client_transport',
148 help='The ZeroMQ transport the Hub will use for '
149 'client connections. Both engine-transport and client-transport can be set simultaneously '
150 'via --transport [default: tcp]',
151 metavar='Hub.client_transport')
152 paa('--query',
153 type=int, dest='HubFactory.query_port',
154 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
155 metavar='Hub.query_port')
156 paa('--notifier',
157 type=int, dest='HubFactory.notifier_port',
158 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
159 metavar='Hub.notifier_port')
160
161 # Engine config
162 paa('--engine-ip',
163 type=str, dest='HubFactory.engine_ip',
164 help='The IP address or hostname the Hub will listen on for '
165 'engine connections. This applies to the Hub and its schedulers'
166 'engine-ip and client-ip can be set simultaneously '
167 'via --ip [default: loopback]',
168 metavar='Hub.engine_ip')
169 paa('--engine-transport',
170 type=str, dest='HubFactory.engine_transport',
171 help='The ZeroMQ transport the Hub will use for '
172 'client connections. Both engine-transport and client-transport can be set simultaneously '
173 'via --transport [default: tcp]',
174 metavar='Hub.engine_transport')
175
176 # Scheduler config
177 paa('--mux',
178 type=int, dest='ControllerFactory.mux', nargs=2,
179 help='The (2) ports the MUX scheduler will listen on for client,engine '
180 'connections, respectively [default: random]',
181 metavar='Scheduler.mux_ports')
182 paa('--task',
183 type=int, dest='ControllerFactory.task', nargs=2,
184 help='The (2) ports the Task scheduler will listen on for client,engine '
185 'connections, respectively [default: random]',
186 metavar='Scheduler.task_ports')
187 paa('--control',
188 type=int, dest='ControllerFactory.control', nargs=2,
189 help='The (2) ports the Control scheduler will listen on for client,engine '
190 'connections, respectively [default: random]',
191 metavar='Scheduler.control_ports')
192 paa('--iopub',
193 type=int, dest='ControllerFactory.iopub', nargs=2,
194 help='The (2) ports the IOPub scheduler will listen on for client,engine '
195 'connections, respectively [default: random]',
196 metavar='Scheduler.iopub_ports')
197
198 paa('--scheme',
199 type=str, dest='HubFactory.scheme',
200 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
201 help='select the task scheduler scheme [default: Python LRU]',
202 metavar='Scheduler.scheme')
203 paa('--usethreads',
204 dest='ControllerFactory.usethreads', action="store_true",
205 help='Use threads instead of processes for the schedulers',
206 )
207 paa('--hwm',
208 dest='TaskScheduler.hwm', type=int,
209 help='specify the High Water Mark (HWM) '
210 'in the Python scheduler. This is the maximum number '
211 'of allowed outstanding tasks on each engine.',
212 )
213
214 ## Global config
215 paa('--log-to-file',
216 action='store_true', dest='Global.log_to_file',
217 help='Log to a file in the log directory (default is stdout)')
218 paa('--log-url',
219 type=str, dest='Global.log_url',
220 help='Broadcast logs to an iploggerz process [default: disabled]')
221 paa('-r','--reuse-files',
222 action='store_true', dest='Global.reuse_files',
223 help='Try to reuse existing json connection files.')
224 paa('--no-secure',
225 action='store_false', dest='Global.secure',
226 help='Turn off execution keys (default).')
227 paa('--secure',
228 action='store_true', dest='Global.secure',
229 help='Turn on execution keys.')
230 paa('--execkey',
231 type=str, dest='Global.exec_key',
232 help='path to a file containing an execution key.',
233 metavar='keyfile')
234 paa('--ssh',
235 type=str, dest='Global.sshserver',
236 help='ssh url for clients to use when connecting to the Controller '
237 'processes. It should be of the form: [user@]server[:port]. The '
238 'Controller\'s listening addresses must be accessible from the ssh server',
239 metavar='Global.sshserver')
240 paa('--location',
241 type=str, dest='Global.location',
242 help="The external IP or domain name of this machine, used for disambiguating "
243 "engine and client connections.",
244 metavar='Global.location')
245 factory.add_session_arguments(self.parser)
246 factory.add_registration_arguments(self.parser)
247 87
248 88
249 89 #-----------------------------------------------------------------------------
250 90 # The main application
251 91 #-----------------------------------------------------------------------------
252
253
254 class IPControllerApp(ApplicationWithClusterDir):
92 flags = {}
93 flags.update(base_flags)
94 flags.update({
95 'usethreads' : ( {'IPControllerApp' : {'usethreads' : True}},
96 'Use threads instead of processes for the schedulers'),
97 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
98 'use the SQLiteDB backend'),
99 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
100 'use the MongoDB backend'),
101 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
102 'use the in-memory DictDB backend'),
103 })
104
105
106
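A quick hedged illustration of what these flag fragments select, using plain dict access to stay agnostic about nested-`Config` conversion:

    from IPython.config.loader import Config

    cfg = Config(flags['mongodb'][0])
    print cfg['HubFactory']['db_class']
    # -> 'IPython.parallel.controller.mongodb.MongoDB'
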
107 class IPControllerApp(ClusterDirApplication):
255 108
256 109 name = u'ipcontroller'
257 110 description = _description
258 command_line_loader = IPControllerAppConfigLoader
111 # command_line_loader = IPControllerAppConfigLoader
259 112 default_config_file_name = default_config_file_name
260 113 auto_create_cluster_dir = True
114 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
261 115
262
263 def create_default_config(self):
264 super(IPControllerApp, self).create_default_config()
265 # Don't set defaults for Global.secure or Global.reuse_furls
266 # as those are set in a component.
267 self.default_config.Global.import_statements = []
268 self.default_config.Global.clean_logs = True
269 self.default_config.Global.secure = True
270 self.default_config.Global.reuse_files = False
271 self.default_config.Global.exec_key = "exec_key.key"
272 self.default_config.Global.sshserver = None
273 self.default_config.Global.location = None
274
275 def pre_construct(self):
276 super(IPControllerApp, self).pre_construct()
277 c = self.master_config
278 # The defaults for these are set in FCClientServiceFactory and
279 # FCEngineServiceFactory, so we only set them here if the global
280 # options have be set to override the class level defaults.
116 reuse_files = Bool(False, config=True,
117 help='Whether to reuse existing json connection files [default: False]'
118 )
119 secure = Bool(True, config=True,
120 help='Whether to use exec_keys for extra authentication [default: True]'
121 )
122 ssh_server = Unicode(u'', config=True,
123 help="""ssh url for clients to use when connecting to the Controller
124 processes. It should be of the form: [user@]server[:port]. The
125 Controller's listening addresses must be accessible from the ssh server""",
126 )
127 location = Unicode(u'', config=True,
128 help="""The external IP or domain name of the Controller, used for disambiguating
129 engine and client connections.""",
130 )
131 import_statements = List([], config=True,
132 help="import statements to be run at startup. Necessary in some environments"
133 )
134
135 usethreads = Bool(False, config=True,
136 help='Use threads instead of processes for the schedulers',
137 )
138
139 # internal
140 children = List()
141 mq_class = CStr('zmq.devices.ProcessMonitoredQueue')
142
143 def _usethreads_changed(self, name, old, new):
144 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
145
146 aliases = Dict(dict(
147 config = 'IPControllerApp.config_file',
148 # file = 'IPControllerApp.url_file',
149 log_level = 'IPControllerApp.log_level',
150 reuse_files = 'IPControllerApp.reuse_files',
151 secure = 'IPControllerApp.secure',
152 ssh = 'IPControllerApp.ssh_server',
153 usethreads = 'IPControllerApp.usethreads',
154 import_statements = 'IPControllerApp.import_statements',
155 location = 'IPControllerApp.location',
156
157 ident = 'StreamSession.session',
158 user = 'StreamSession.username',
159 exec_key = 'StreamSession.keyfile',
160
161 url = 'HubFactory.url',
162 ip = 'HubFactory.ip',
163 transport = 'HubFactory.transport',
164 port = 'HubFactory.regport',
165
166 ping = 'HeartMonitor.period',
167
168 scheme = 'TaskScheduler.scheme_name',
169 hwm = 'TaskScheduler.hwm',
170
171
172 profile = "ClusterDir.profile",
173 cluster_dir = 'ClusterDir.location',
281 174
282 # if hasattr(c.Global, 'reuse_furls'):
283 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
284 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
285 # del c.Global.reuse_furls
286 # if hasattr(c.Global, 'secure'):
287 # c.FCClientServiceFactory.secure = c.Global.secure
288 # c.FCEngineServiceFactory.secure = c.Global.secure
289 # del c.Global.secure
175 ))
176 flags = Dict(flags)
290 177
178
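
Given the alias table above, a transitional-style command line maps `name=value` pairs straight onto the app's traits. A sketch (argv and values illustrative; behavior assumed from the new parser, run in this module's context):

    app = IPControllerApp()
    app.parse_command_line(['reuse_files=True', 'ssh=user@gateway'])
    print app.reuse_files  # True
    print app.ssh_server   # u'user@gateway', via the 'ssh' alias
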
291 179 def save_connection_dict(self, fname, cdict):
292 180 """save a connection dict to json file."""
293 c = self.master_config
181 c = self.config
294 182 url = cdict['url']
295 183 location = cdict['location']
296 184 if not location:
@@ -301,43 +189,43 b' class IPControllerApp(ApplicationWithClusterDir):'
301 189 else:
302 190 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
303 191 cdict['location'] = location
304 fname = os.path.join(c.Global.security_dir, fname)
192 fname = os.path.join(self.cluster_dir.security_dir, fname)
305 193 with open(fname, 'w') as f:
306 194 f.write(json.dumps(cdict, indent=2))
307 195 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
308 196
309 197 def load_config_from_json(self):
310 198 """load config from existing json connector files."""
311 c = self.master_config
199 c = self.config
312 200 # load from engine config
313 with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
201 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
314 202 cfg = json.loads(f.read())
315 key = c.SessionFactory.exec_key = cfg['exec_key']
203 key = c.StreamSession.key = cfg['exec_key']
316 204 xport,addr = cfg['url'].split('://')
317 205 c.HubFactory.engine_transport = xport
318 206 ip,ports = addr.split(':')
319 207 c.HubFactory.engine_ip = ip
320 208 c.HubFactory.regport = int(ports)
321 c.Global.location = cfg['location']
209 self.location = cfg['location']
322 210
323 211 # load client config
324 with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
212 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
325 213 cfg = json.loads(f.read())
326 214 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
327 215 xport,addr = cfg['url'].split('://')
328 216 c.HubFactory.client_transport = xport
329 217 ip,ports = addr.split(':')
330 218 c.HubFactory.client_ip = ip
331 c.Global.sshserver = cfg['ssh']
219 self.ssh_server = cfg['ssh']
332 220 assert int(ports) == c.HubFactory.regport, "regport mismatch"
333 221
334 def construct(self):
222 def init_hub(self):
335 223 # This is the working dir by now.
336 224 sys.path.insert(0, '')
337 c = self.master_config
225 c = self.config
338 226
339 self.import_statements()
340 reusing = c.Global.reuse_files
227 self.do_import_statements()
228 reusing = self.reuse_files
341 229 if reusing:
342 230 try:
343 231 self.load_config_from_json()
@@ -346,21 +234,20 b' class IPControllerApp(ApplicationWithClusterDir):'
346 234 # check again, because reusing may have failed:
347 235 if reusing:
348 236 pass
349 elif c.Global.secure:
350 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
237 elif self.secure:
351 238 key = str(uuid.uuid4())
352 with open(keyfile, 'w') as f:
353 f.write(key)
354 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
355 c.SessionFactory.exec_key = key
239 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
240 # with open(keyfile, 'w') as f:
241 # f.write(key)
242 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
243 c.StreamSession.key = key
356 244 else:
357 c.SessionFactory.exec_key = ''
358 key = None
245 key = c.StreamSession.key = ''
359 246
360 247 try:
361 self.factory = ControllerFactory(config=c, logname=self.log.name)
362 self.start_logging()
363 self.factory.construct()
248 self.factory = HubFactory(config=c, log=self.log)
249 # self.start_logging()
250 self.factory.init_hub()
364 251 except:
365 252 self.log.error("Couldn't construct the Controller", exc_info=True)
366 253 self.exit(1)
@@ -369,21 +256,82 b' class IPControllerApp(ApplicationWithClusterDir):'
369 256 # save to new json config files
370 257 f = self.factory
371 258 cdict = {'exec_key' : key,
372 'ssh' : c.Global.sshserver,
259 'ssh' : self.ssh_server,
373 260 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
374 'location' : c.Global.location
261 'location' : self.location
375 262 }
376 263 self.save_connection_dict('ipcontroller-client.json', cdict)
377 264 edict = cdict
378 265 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
379 266 self.save_connection_dict('ipcontroller-engine.json', edict)
267
268 #
269 def init_schedulers(self):
270 children = self.children
271 mq = import_item(self.mq_class)
380 272
273 hub = self.factory
274 # maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
275 # IOPub relay (in a Process)
276 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
277 q.bind_in(hub.client_info['iopub'])
278 q.bind_out(hub.engine_info['iopub'])
279 q.setsockopt_out(zmq.SUBSCRIBE, '')
280 q.connect_mon(hub.monitor_url)
281 q.daemon=True
282 children.append(q)
283
284 # Multiplexer Queue (in a Process)
285 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
286 q.bind_in(hub.client_info['mux'])
287 q.setsockopt_in(zmq.IDENTITY, 'mux')
288 q.bind_out(hub.engine_info['mux'])
289 q.connect_mon(hub.monitor_url)
290 q.daemon=True
291 children.append(q)
292
293 # Control Queue (in a Process)
294 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
295 q.bind_in(hub.client_info['control'])
296 q.setsockopt_in(zmq.IDENTITY, 'control')
297 q.bind_out(hub.engine_info['control'])
298 q.connect_mon(hub.monitor_url)
299 q.daemon=True
300 children.append(q)
301 try:
302 scheme = self.config.TaskScheduler.scheme_name
303 except AttributeError:
304 scheme = TaskScheduler.scheme_name.get_default_value()
305 # Task Queue (in a Process)
306 if scheme == 'pure':
307 self.log.warn("task::using pure XREQ Task scheduler")
308 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
309 # q.setsockopt_out(zmq.HWM, hub.hwm)
310 q.bind_in(hub.client_info['task'][1])
311 q.setsockopt_in(zmq.IDENTITY, 'task')
312 q.bind_out(hub.engine_info['task'])
313 q.connect_mon(hub.monitor_url)
314 q.daemon=True
315 children.append(q)
316 elif scheme == 'none':
317 self.log.warn("task::using no Task scheduler")
318
319 else:
320 self.log.info("task::using Python %s Task scheduler"%scheme)
321 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
322 hub.monitor_url, hub.client_info['notification'])
323 kwargs = dict(logname=self.log.name, loglevel=self.log_level,
324 config=dict(self.config))
325 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
326 q.daemon=True
327 children.append(q)
328
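
Each relay above is a pyzmq MonitoredQueue device; a standalone sketch of one such queue, mirroring the MUX setup (URLs and ports are placeholders):

import zmq
from zmq.devices import ProcessMonitoredQueue

# XREP in, XREP out, with a PUB tap feeding the Hub's monitor socket
q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
q.bind_in('tcp://127.0.0.1:10201')      # client-facing side
q.setsockopt_in(zmq.IDENTITY, 'mux')
q.bind_out('tcp://127.0.0.1:10202')     # engine-facing side
q.connect_mon('tcp://127.0.0.1:10300')  # Hub's monitor (SUB) endpoint
q.daemon = True
q.start()                               # runs in a child process
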
381 329
382 330 def save_urls(self):
383 331 """save the registration urls to files."""
384 c = self.master_config
332 c = self.config
385 333
386 sec_dir = c.Global.security_dir
334 sec_dir = self.cluster_dir.security_dir
387 335 cf = self.factory
388 336
389 337 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
@@ -393,8 +341,8 b' class IPControllerApp(ApplicationWithClusterDir):'
393 341 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
394 342
395 343
396 def import_statements(self):
397 statements = self.master_config.Global.import_statements
344 def do_import_statements(self):
345 statements = self.import_statements
398 346 for s in statements:
399 347 try:
400 348 self.log.info("Executing statement: '%s'" % s)
@@ -402,21 +350,32 b' class IPControllerApp(ApplicationWithClusterDir):'
402 350 except:
403 351 self.log.error("Error running statement: %s" % s)
404 352
405 def start_logging(self):
406 super(IPControllerApp, self).start_logging()
407 if self.master_config.Global.log_url:
408 context = self.factory.context
409 lsock = context.socket(zmq.PUB)
410 lsock.connect(self.master_config.Global.log_url)
411 handler = PUBHandler(lsock)
412 handler.root_topic = 'controller'
413 handler.setLevel(self.log_level)
414 self.log.addHandler(handler)
415 #
416 def start_app(self):
353 # def start_logging(self):
354 # super(IPControllerApp, self).start_logging()
355 # if self.config.Global.log_url:
356 # context = self.factory.context
357 # lsock = context.socket(zmq.PUB)
358 # lsock.connect(self.config.Global.log_url)
359 # handler = PUBHandler(lsock)
360 # handler.root_topic = 'controller'
361 # handler.setLevel(self.log_level)
362 # self.log.addHandler(handler)
363 # #
364 def start(self):
417 365 # Start the subprocesses:
418 366 self.factory.start()
367 child_procs = []
368 for child in self.children:
369 child.start()
370 if isinstance(child, ProcessMonitoredQueue):
371 child_procs.append(child.launcher)
372 elif isinstance(child, Process):
373 child_procs.append(child)
374 if child_procs:
375 signal_children(child_procs)
376
419 377 self.write_pid_file(overwrite=True)
378
420 379 try:
421 380 self.factory.loop.start()
422 381 except KeyboardInterrupt:
@@ -426,6 +385,22 b' class IPControllerApp(ApplicationWithClusterDir):'
426 385 def launch_new_instance():
427 386 """Create and run the IPython controller"""
428 387 app = IPControllerApp()
388 app.parse_command_line()
389 cl_config = app.config
390 # app.load_config_file()
391 app.init_clusterdir()
392 if app.config_file:
393 app.load_config_file(app.config_file)
394 else:
395 app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
396 # command-line should *override* config file, but command-line is necessary
397 # to determine clusterdir, etc.
398 app.update_config(cl_config)
399
400 app.to_work_dir()
401 app.init_hub()
402 app.init_schedulers()
403
429 404 app.start()
430 405
431 406
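
With alias tables like the ones above, the old '--name value' options become 'name=value' assignments on the command line; hypothetical invocations:

# ipcontroller reuse_files=True secure=True log_level=20
# ipcontroller profile=mycluster usethreads=True
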
@@ -23,17 +23,21 b' import zmq'
23 23 from zmq.eventloop import ioloop
24 24
25 25 from IPython.parallel.apps.clusterdir import (
26 ApplicationWithClusterDir,
27 ClusterDirConfigLoader
26 ClusterDirApplication,
27 ClusterDir,
28 base_aliases,
29 # ClusterDirConfigLoader
28 30 )
29 31 from IPython.zmq.log import EnginePUBHandler
30 32
31 from IPython.parallel import factory
33 from IPython.config.configurable import Configurable
34 from IPython.parallel.streamsession import StreamSession
32 35 from IPython.parallel.engine.engine import EngineFactory
33 36 from IPython.parallel.engine.streamkernel import Kernel
34 37 from IPython.parallel.util import disambiguate_url
35 38
36 39 from IPython.utils.importstring import import_item
40 from IPython.utils.traitlets import Str, Bool, Unicode, Dict, List, CStr
37 41
38 42
39 43 #-----------------------------------------------------------------------------
@@ -43,6 +47,20 b' from IPython.utils.importstring import import_item'
43 47 #: The default config file name for this application
44 48 default_config_file_name = u'ipengine_config.py'
45 49
50 _description = """Start an IPython engine for parallel computing.\n\n
51
52 IPython engines run in parallel and perform computations on behalf of a client
53 and controller. A controller needs to be started before the engines. The
54 engine can be configured using command line options or using a cluster
55 directory. Cluster directories contain config, log and security files and are
56 usually located in your ipython directory and named as "cluster_<profile>".
57 See the `profile` and `cluster_dir` options for details.
58 """
59
60
61 #-----------------------------------------------------------------------------
62 # MPI configuration
63 #-----------------------------------------------------------------------------
46 64
47 65 mpi4py_init = """from mpi4py import MPI as mpi
48 66 mpi.size = mpi.COMM_WORLD.Get_size()
@@ -58,123 +76,75 b' mpi.rank = 0'
58 76 mpi.size = 0
59 77 """
60 78
79 class MPI(Configurable):
80 """Configurable for MPI initialization"""
81 use = Str('', config=True,
82 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
83 )
61 84
62 _description = """Start an IPython engine for parallel computing.\n\n
85 def _use_changed(self, name, old, new):
86 # load default init script if it's not set
87 if not self.init_script:
88 self.init_script = self.default_inits.get(new, '')
89
90 init_script = Str('', config=True,
91 help="Initialization code for MPI")
92
93 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
94 config=True)
63 95
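
A usage sketch for the MPI configurable above (assumes the use-changed handler fires when config is applied at construction):

from IPython.config.loader import Config

c = Config()
c.MPI.use = 'mpi4py'
m = MPI(config=c)
# the handler filled init_script from default_inits:
print m.init_script.splitlines()[0]   # -> from mpi4py import MPI as mpi
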
64 IPython engines run in parallel and perform computations on behalf of a client
65 and controller. A controller needs to be started before the engines. The
66 engine can be configured using command line options or using a cluster
67 directory. Cluster directories contain config, log and security files and are
68 usually located in your ipython directory and named as "cluster_<profile>".
69 See the --profile and --cluster-dir options for details.
70 """
71 96
72 97 #-----------------------------------------------------------------------------
73 # Command line options
98 # Main application
74 99 #-----------------------------------------------------------------------------
75 100
76 101
77 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
78
79 def _add_arguments(self):
80 super(IPEngineAppConfigLoader, self)._add_arguments()
81 paa = self.parser.add_argument
82 # Controller config
83 paa('--file', '-f',
84 type=unicode, dest='Global.url_file',
85 help='The full location of the file containing the connection information fo '
86 'controller. If this is not given, the file must be in the '
87 'security directory of the cluster directory. This location is '
88 'resolved using the --profile and --app-dir options.',
89 metavar='Global.url_file')
90 # MPI
91 paa('--mpi',
92 type=str, dest='MPI.use',
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
94 metavar='MPI.use')
95 # Global config
96 paa('--log-to-file',
97 action='store_true', dest='Global.log_to_file',
98 help='Log to a file in the log directory (default is stdout)')
99 paa('--log-url',
100 dest='Global.log_url',
101 help="url of ZMQ logger, as started with iploggerz")
102 # paa('--execkey',
103 # type=str, dest='Global.exec_key',
104 # help='path to a file containing an execution key.',
105 # metavar='keyfile')
106 # paa('--no-secure',
107 # action='store_false', dest='Global.secure',
108 # help='Turn off execution keys.')
109 # paa('--secure',
110 # action='store_true', dest='Global.secure',
111 # help='Turn on execution keys (default).')
112 # init command
113 paa('-c',
114 type=str, dest='Global.extra_exec_lines',
102 class IPEngineApp(ClusterDirApplication):
103
104 app_name = Unicode(u'ipengine')
105 description = Unicode(_description)
106 default_config_file_name = default_config_file_name
107 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
108
109 startup_script = Unicode(u'', config=True,
110 help='specify a script to be run at startup')
111 startup_command = Str('', config=True,
115 112 help='specify a command to be run at startup')
116 paa('-s',
117 type=unicode, dest='Global.extra_exec_file',
118 help='specify a script to be run at startup')
119
120 factory.add_session_arguments(self.parser)
121 factory.add_registration_arguments(self.parser)
122 113
114 url_file = Unicode(u'', config=True,
115 help="""The full location of the file containing the connection information for
116 the controller. If this is not given, the file must be in the
117 security directory of the cluster directory. This location is
118 resolved using the `profile` or `cluster_dir` options.""",
119 )
123 120
124 #-----------------------------------------------------------------------------
125 # Main application
126 #-----------------------------------------------------------------------------
121 url_file_name = Unicode(u'ipcontroller-engine.json')
127 122
123 aliases = Dict(dict(
124 config = 'IPEngineApp.config_file',
125 file = 'IPEngineApp.url_file',
126 c = 'IPEngineApp.startup_command',
127 s = 'IPEngineApp.startup_script',
128 128
129 class IPEngineApp(ApplicationWithClusterDir):
129 ident = 'StreamSession.session',
130 user = 'StreamSession.username',
131 exec_key = 'StreamSession.keyfile',
130 132
131 name = u'ipengine'
132 description = _description
133 command_line_loader = IPEngineAppConfigLoader
134 default_config_file_name = default_config_file_name
135 auto_create_cluster_dir = True
136
137 def create_default_config(self):
138 super(IPEngineApp, self).create_default_config()
139
140 # The engine should not clean logs as we don't want to remove the
141 # active log files of other running engines.
142 self.default_config.Global.clean_logs = False
143 self.default_config.Global.secure = True
144
145 # Global config attributes
146 self.default_config.Global.exec_lines = []
147 self.default_config.Global.extra_exec_lines = ''
148 self.default_config.Global.extra_exec_file = u''
149
150 # Configuration related to the controller
151 # This must match the filename (path not included) that the controller
152 # used for the FURL file.
153 self.default_config.Global.url_file = u''
154 self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
155 # If given, this is the actual location of the controller's FURL file.
156 # If not, this is computed using the profile, app_dir and furl_file_name
157 # self.default_config.Global.key_file_name = u'exec_key.key'
158 # self.default_config.Global.key_file = u''
159
160 # MPI related config attributes
161 self.default_config.MPI.use = ''
162 self.default_config.MPI.mpi4py = mpi4py_init
163 self.default_config.MPI.pytrilinos = pytrilinos_init
164
165 def post_load_command_line_config(self):
166 pass
167
168 def pre_construct(self):
169 super(IPEngineApp, self).pre_construct()
170 # self.find_cont_url_file()
171 self.find_url_file()
172 if self.master_config.Global.extra_exec_lines:
173 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
174 if self.master_config.Global.extra_exec_file:
175 enc = sys.getfilesystemencoding() or 'utf8'
176 cmd="execfile(%r)"%self.master_config.Global.extra_exec_file.encode(enc)
177 self.master_config.Global.exec_lines.append(cmd)
133 url = 'EngineFactory.url',
134 ip = 'EngineFactory.ip',
135 transport = 'EngineFactory.transport',
136 port = 'EngineFactory.regport',
137 location = 'EngineFactory.location',
138
139 timeout = 'EngineFactory.timeout',
140
141 profile = "ClusterDir.profile",
142 cluster_dir = 'ClusterDir.location',
143
144 mpi = 'MPI.use',
145
146 log_level = 'IPEngineApp.log_level',
147 ))
178 148
179 149 # def find_key_file(self):
180 150 # """Set the key file.
@@ -198,49 +168,58 b' class IPEngineApp(ApplicationWithClusterDir):'
198 168 Here we don't try to actually see if it exists or is valid, as that
199 169 is handled by the connection logic.
200 170 """
201 config = self.master_config
171 config = self.config
202 172 # Find the actual controller key file
203 if not config.Global.url_file:
204 try_this = os.path.join(
205 config.Global.cluster_dir,
206 config.Global.security_dir,
207 config.Global.url_file_name
173 if not self.url_file:
174 self.url_file = os.path.join(
175 self.cluster_dir.security_dir,
176 self.url_file_name
208 177 )
209 config.Global.url_file = try_this
210 178
211 def construct(self):
179 def init_engine(self):
212 180 # This is the working dir by now.
213 181 sys.path.insert(0, '')
214 config = self.master_config
182 config = self.config
183 # print config
184 self.find_url_file()
185
215 186 # if os.path.exists(config.Global.key_file) and config.Global.secure:
216 187 # config.SessionFactory.exec_key = config.Global.key_file
217 if os.path.exists(config.Global.url_file):
218 with open(config.Global.url_file) as f:
188 if os.path.exists(self.url_file):
189 with open(self.url_file) as f:
219 190 d = json.loads(f.read())
220 191 for k,v in d.iteritems():
221 192 if isinstance(v, unicode):
222 193 d[k] = v.encode()
223 194 if d['exec_key']:
224 config.SessionFactory.exec_key = d['exec_key']
195 config.StreamSession.key = d['exec_key']
225 196 d['url'] = disambiguate_url(d['url'], d['location'])
226 config.RegistrationFactory.url=d['url']
197 config.EngineFactory.url = d['url']
227 198 config.EngineFactory.location = d['location']
228 199
200 try:
201 exec_lines = config.Kernel.exec_lines
202 except AttributeError:
203 config.Kernel.exec_lines = []
204 exec_lines = config.Kernel.exec_lines
229 205
230
231 config.Kernel.exec_lines = config.Global.exec_lines
232
233 self.start_mpi()
206 if self.startup_script:
207 enc = sys.getfilesystemencoding() or 'utf8'
208 cmd="execfile(%r)"%self.startup_script.encode(enc)
209 exec_lines.append(cmd)
210 if self.startup_command:
211 exec_lines.append(self.startup_command)
234 212
235 # Create the underlying shell class and EngineService
213 # Create the underlying shell class and Engine
236 214 # shell_class = import_item(self.master_config.Global.shell_class)
215 # print self.config
237 216 try:
238 self.engine = EngineFactory(config=config, logname=self.log.name)
217 self.engine = EngineFactory(config=config, log=self.log)
239 218 except:
240 219 self.log.error("Couldn't start the Engine", exc_info=True)
241 220 self.exit(1)
242 221
243 self.start_logging()
222 # self.start_logging()
244 223
245 224 # Create the service hierarchy
246 225 # self.main_service = service.MultiService()
@@ -258,22 +237,22 b' class IPEngineApp(ApplicationWithClusterDir):'
258 237
259 238 # reactor.callWhenRunning(self.call_connect)
260 239
261
262 def start_logging(self):
263 super(IPEngineApp, self).start_logging()
264 if self.master_config.Global.log_url:
265 context = self.engine.context
266 lsock = context.socket(zmq.PUB)
267 lsock.connect(self.master_config.Global.log_url)
268 handler = EnginePUBHandler(self.engine, lsock)
269 handler.setLevel(self.log_level)
270 self.log.addHandler(handler)
271
272 def start_mpi(self):
240 # def start_logging(self):
241 # super(IPEngineApp, self).start_logging()
242 # if self.master_config.Global.log_url:
243 # context = self.engine.context
244 # lsock = context.socket(zmq.PUB)
245 # lsock.connect(self.master_config.Global.log_url)
246 # handler = EnginePUBHandler(self.engine, lsock)
247 # handler.setLevel(self.log_level)
248 # self.log.addHandler(handler)
249 #
250 def init_mpi(self):
273 251 global mpi
274 mpikey = self.master_config.MPI.use
275 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
276 if mpi_import_statement is not None:
252 self.mpi = MPI(config=self.config)
253
254 mpi_import_statement = self.mpi.init_script
255 if mpi_import_statement:
277 256 try:
278 257 self.log.info("Initializing MPI:")
279 258 self.log.info(mpi_import_statement)
@@ -284,7 +263,7 b' class IPEngineApp(ApplicationWithClusterDir):'
284 263 mpi = None
285 264
286 265
287 def start_app(self):
266 def start(self):
288 267 self.engine.start()
289 268 try:
290 269 self.engine.loop.start()
@@ -293,8 +272,27 b' class IPEngineApp(ApplicationWithClusterDir):'
293 272
294 273
295 274 def launch_new_instance():
296 """Create and run the IPython controller"""
275 """Create and run the IPython engine"""
297 276 app = IPEngineApp()
277 app.parse_command_line()
278 cl_config = app.config
279 app.init_clusterdir()
280 # app.load_config_file()
281 # print app.config
282 if app.config_file:
283 app.load_config_file(app.config_file)
284 else:
285 app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
286
287 # command-line should *override* config file, but command-line is necessary
288 # to determine clusterdir, etc.
289 app.update_config(cl_config)
290
291 # print app.config
292 app.to_work_dir()
293 app.init_mpi()
294 app.init_engine()
295 # print app.config
298 296 app.start()
299 297
300 298
@@ -21,7 +21,7 b' import sys'
21 21 import zmq
22 22
23 23 from IPython.parallel.apps.clusterdir import (
24 ApplicationWithClusterDir,
24 ClusterDirApplication,
25 25 ClusterDirConfigLoader
26 26 )
27 27 from IPython.parallel.apps.logwatcher import LogWatcher
@@ -74,7 +74,7 b' class IPLoggerAppConfigLoader(ClusterDirConfigLoader):'
74 74 #-----------------------------------------------------------------------------
75 75
76 76
77 class IPLoggerApp(ApplicationWithClusterDir):
77 class IPLoggerApp(ClusterDirApplication):
78 78
79 79 name = u'iploggerz'
80 80 description = _description
@@ -106,7 +106,7 b' class BaseLauncher(LoggingFactory):'
106 106 # This should not be used to set the work_dir for the actual engine
107 107 # and controller. Instead, use their own config files or the
108 108 # controller_args, engine_args attributes of the launchers to add
109 # the --work-dir option.
109 # the work_dir option.
110 110 work_dir = Unicode(u'.')
111 111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
112 112
@@ -328,16 +328,18 b' class LocalProcessLauncher(BaseLauncher):'
328 328 class LocalControllerLauncher(LocalProcessLauncher):
329 329 """Launch a controller as a regular external process."""
330 330
331 controller_cmd = List(ipcontroller_cmd_argv, config=True)
331 controller_cmd = List(ipcontroller_cmd_argv, config=True,
332 help="""Popen command to launch ipcontroller.""")
332 333 # Command line arguments to ipcontroller.
333 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
334 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
335 help="""command-line args to pass to ipcontroller""")
334 336
335 337 def find_args(self):
336 338 return self.controller_cmd + self.controller_args
337 339
338 340 def start(self, cluster_dir):
339 341 """Start the controller by cluster_dir."""
340 self.controller_args.extend(['--cluster-dir', cluster_dir])
342 self.controller_args.extend(['cluster_dir=%s'%cluster_dir])
341 343 self.cluster_dir = unicode(cluster_dir)
342 344 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
343 345 return super(LocalControllerLauncher, self).start()
@@ -346,10 +348,11 b' class LocalControllerLauncher(LocalProcessLauncher):'
346 348 class LocalEngineLauncher(LocalProcessLauncher):
347 349 """Launch a single engine as a regular external process."""
348 350
349 engine_cmd = List(ipengine_cmd_argv, config=True)
351 engine_cmd = List(ipengine_cmd_argv, config=True,
352 help="""command to launch the Engine.""")
350 353 # Command line arguments for ipengine.
351 engine_args = List(
352 ['--log-to-file','--log-level', str(logging.INFO)], config=True
354 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
355 help="command-line arguments to pass to ipengine"
353 356 )
354 357
355 358 def find_args(self):
@@ -357,7 +360,7 b' class LocalEngineLauncher(LocalProcessLauncher):'
357 360
358 361 def start(self, cluster_dir):
359 362 """Start the engine by cluster_dir."""
360 self.engine_args.extend(['--cluster-dir', cluster_dir])
363 self.engine_args.extend(['cluster_dir=%s'%cluster_dir])
361 364 self.cluster_dir = unicode(cluster_dir)
362 365 return super(LocalEngineLauncher, self).start()
363 366
@@ -367,7 +370,8 b' class LocalEngineSetLauncher(BaseLauncher):'
367 370
368 371 # Command line arguments for ipengine.
369 372 engine_args = List(
370 ['--log-to-file','--log-level', str(logging.INFO)], config=True
373 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
374 help="command-line arguments to pass to ipengine"
371 375 )
372 376 # launcher class
373 377 launcher_class = LocalEngineLauncher
@@ -442,16 +446,18 b' class LocalEngineSetLauncher(BaseLauncher):'
442 446 class MPIExecLauncher(LocalProcessLauncher):
443 447 """Launch an external process using mpiexec."""
444 448
445 # The mpiexec command to use in starting the process.
446 mpi_cmd = List(['mpiexec'], config=True)
447 # The command line arguments to pass to mpiexec.
448 mpi_args = List([], config=True)
449 # The program to start using mpiexec.
450 program = List(['date'], config=True)
451 # The command line argument to the program.
452 program_args = List([], config=True)
453 # The number of instances of the program to start.
454 n = Int(1, config=True)
449 mpi_cmd = List(['mpiexec'], config=True,
450 help="The mpiexec command to use in starting the process."
451 )
452 mpi_args = List([], config=True,
453 help="The command line arguments to pass to mpiexec."
454 )
455 program = List(['date'], config=True,
456 help="The program to start via mpiexec.")
457 program_args = List([], config=True,
458 help="The command line arguments to pass to the program."
459 )
460 n = Int(1)
455 461
456 462 def find_args(self):
457 463 """Build self.args using all the fields."""
@@ -467,14 +473,17 b' class MPIExecLauncher(LocalProcessLauncher):'
467 473 class MPIExecControllerLauncher(MPIExecLauncher):
468 474 """Launch a controller using mpiexec."""
469 475
470 controller_cmd = List(ipcontroller_cmd_argv, config=True)
471 # Command line arguments to ipcontroller.
472 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
473 n = Int(1, config=False)
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 help="Popen command to launch the Controller"
478 )
479 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
480 help="Command line arguments to pass to ipcontroller."
481 )
482 n = Int(1)
474 483
475 484 def start(self, cluster_dir):
476 485 """Start the controller by cluster_dir."""
477 self.controller_args.extend(['--cluster-dir', cluster_dir])
486 self.controller_args.extend(['cluster_dir=%s'%cluster_dir])
478 487 self.cluster_dir = unicode(cluster_dir)
479 488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
480 489 return super(MPIExecControllerLauncher, self).start(1)
@@ -486,16 +495,18 b' class MPIExecControllerLauncher(MPIExecLauncher):'
486 495
487 496 class MPIExecEngineSetLauncher(MPIExecLauncher):
488 497
489 program = List(ipengine_cmd_argv, config=True)
490 # Command line arguments for ipengine.
498 program = List(ipengine_cmd_argv, config=True,
499 help="Popen command for ipengine"
500 )
491 501 program_args = List(
492 ['--log-to-file','--log-level', str(logging.INFO)], config=True
502 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
503 help="Command line arguments for ipengine."
493 504 )
494 n = Int(1, config=True)
505 n = Int(1)
495 506
496 507 def start(self, n, cluster_dir):
497 508 """Start n engines by profile or cluster_dir."""
498 self.program_args.extend(['--cluster-dir', cluster_dir])
509 self.program_args.extend(['cluster_dir=%s'%cluster_dir])
499 510 self.cluster_dir = unicode(cluster_dir)
500 511 self.n = n
501 512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
@@ -515,13 +526,20 b' class SSHLauncher(LocalProcessLauncher):'
515 526 as well.
516 527 """
517 528
518 ssh_cmd = List(['ssh'], config=True)
519 ssh_args = List(['-tt'], config=True)
520 program = List(['date'], config=True)
521 program_args = List([], config=True)
522 hostname = CUnicode('', config=True)
523 user = CUnicode('', config=True)
524 location = CUnicode('')
529 ssh_cmd = List(['ssh'], config=True,
530 help="command for starting ssh")
531 ssh_args = List(['-tt'], config=True,
532 help="args to pass to ssh")
533 program = List(['date'], config=True,
534 help="Program to launch via ssh")
535 program_args = List([], config=True,
536 help="args to pass to remote program")
537 hostname = CUnicode('', config=True,
538 help="hostname on which to launch the program")
539 user = CUnicode('', config=True,
540 help="username for ssh")
541 location = CUnicode('', config=True,
542 help="user@hostname location for ssh in one setting")
525 543
526 544 def _hostname_changed(self, name, old, new):
527 545 if self.user:
@@ -555,21 +573,26 b' class SSHLauncher(LocalProcessLauncher):'
555 573
556 574 class SSHControllerLauncher(SSHLauncher):
557 575
558 program = List(ipcontroller_cmd_argv, config=True)
559 # Command line arguments to ipcontroller.
560 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
576 program = List(ipcontroller_cmd_argv, config=True,
577 help="remote ipcontroller command.")
578 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
579 help="Command line arguments to ipcontroller.")
561 580
562 581
563 582 class SSHEngineLauncher(SSHLauncher):
564 program = List(ipengine_cmd_argv, config=True)
583 program = List(ipengine_cmd_argv, config=True,
584 help="remote ipengine command.")
565 585 # Command line arguments for ipengine.
566 586 program_args = List(
567 ['--log-to-file','--log-level', str(logging.INFO)], config=True
587 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
588 help="Command line arguments to ipengine."
568 589 )
569 590
570 591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
571 592 launcher_class = SSHEngineLauncher
572 engines = Dict(config=True)
593 engines = Dict(config=True,
594 help="""dict of engines to launch. This is a dict by hostname of ints,
595 corresponding to the number of engines to start on that host.""")
573 596
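
A config sketch for the engines dict (hostnames are placeholders):

from IPython.config.loader import Config

c = Config()
c.SSHEngineSetLauncher.engines = {'node1.example.com' : 4,
                                  'me@node2.example.com' : 2}
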
574 597 def start(self, n, cluster_dir):
575 598 """Start engines by profile or cluster_dir.
@@ -624,17 +647,19 b' def find_job_cmd():'
624 647
625 648 class WindowsHPCLauncher(BaseLauncher):
626 649
627 # A regular expression used to get the job id from the output of the
628 # submit_command.
629 job_id_regexp = Str(r'\d+', config=True)
630 # The filename of the instantiated job script.
631 job_file_name = CUnicode(u'ipython_job.xml', config=True)
650 job_id_regexp = Str(r'\d+', config=True,
651 help="""A regular expression used to get the job id from the output of the
652 submit_command. """
653 )
654 job_file_name = CUnicode(u'ipython_job.xml', config=True,
655 help="The filename of the instantiated job script.")
632 656 # The full path to the instantiated job script. This gets made dynamically
633 657 # by combining the work_dir with the job_file_name.
634 658 job_file = CUnicode(u'')
635 # The hostname of the scheduler to submit the job to
636 scheduler = CUnicode('', config=True)
637 job_cmd = CUnicode(find_job_cmd(), config=True)
659 scheduler = CUnicode('', config=True,
660 help="The hostname of the scheduler to submit the job to.")
661 job_cmd = CUnicode(find_job_cmd(), config=True,
662 help="The command for submitting jobs.")
638 663
639 664 def __init__(self, work_dir=u'.', config=None, **kwargs):
640 665 super(WindowsHPCLauncher, self).__init__(
@@ -702,8 +727,10 b' class WindowsHPCLauncher(BaseLauncher):'
702 727
703 728 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
704 729
705 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
706 extra_args = List([], config=False)
730 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True,
731 help="WinHPC xml job file.")
732 extra_args = List([], config=False,
733 help="extra args to pass to ipcontroller")
707 734
708 735 def write_job_file(self, n):
709 736 job = IPControllerJob(config=self.config)
@@ -713,7 +740,7 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
713 740 # the controller. It is used as the base path for the stdout/stderr
714 741 # files that the scheduler redirects to.
715 742 t.work_directory = self.cluster_dir
716 # Add the --cluster-dir and from self.start().
743 # Add the cluster_dir argument set in self.start().
717 744 t.controller_args.extend(self.extra_args)
718 745 job.add_task(t)
719 746
@@ -726,15 +753,17 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
726 753
727 754 def start(self, cluster_dir):
728 755 """Start the controller by cluster_dir."""
729 self.extra_args = ['--cluster-dir', cluster_dir]
756 self.extra_args = ['cluster_dir=%s'%cluster_dir]
730 757 self.cluster_dir = unicode(cluster_dir)
731 758 return super(WindowsHPCControllerLauncher, self).start(1)
732 759
733 760
734 761 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
735 762
736 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
737 extra_args = List([], config=False)
763 job_file_name = CUnicode(u'ipengineset_job.xml', config=True,
764 help="jobfile for ipengines job")
765 extra_args = List([], config=False,
766 help="extra args to pass to ipengine")
738 767
739 768 def write_job_file(self, n):
740 769 job = IPEngineSetJob(config=self.config)
@@ -745,7 +774,7 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
745 774 # the engine. It is used as the base path for the stdout/stderr
746 775 # files that the scheduler redirects to.
747 776 t.work_directory = self.cluster_dir
777 # Add the cluster_dir argument set in self.start().
777 # Add the cluster_dir and from self.start().
749 778 t.engine_args.extend(self.extra_args)
750 779 job.add_task(t)
751 780
@@ -758,7 +787,7 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
758 787
759 788 def start(self, n, cluster_dir):
760 789 """Start the controller by cluster_dir."""
761 self.extra_args = ['--cluster-dir', cluster_dir]
790 self.extra_args = ['cluster_dir=%s'%cluster_dir]
762 791 self.cluster_dir = unicode(cluster_dir)
763 792 return super(WindowsHPCEngineSetLauncher, self).start(n)
764 793
@@ -782,21 +811,21 b' class BatchSystemLauncher(BaseLauncher):'
782 811 """
783 812
784 813 # Subclasses must fill these in. See PBSEngineSet
785 # The name of the command line program used to submit jobs.
786 submit_command = List([''], config=True)
787 # The name of the command line program used to delete jobs.
788 delete_command = List([''], config=True)
789 # A regular expression used to get the job id from the output of the
790 # submit_command.
791 job_id_regexp = CUnicode('', config=True)
792 # The string that is the batch script template itself.
793 batch_template = CUnicode('', config=True)
794 # The file that contains the batch template
795 batch_template_file = CUnicode(u'', config=True)
796 # The filename of the instantiated batch script.
797 batch_file_name = CUnicode(u'batch_script', config=True)
798 # The PBS Queue
799 queue = CUnicode(u'', config=True)
814 submit_command = List([''], config=True,
815 help="The name of the command line program used to submit jobs.")
816 delete_command = List([''], config=True,
817 help="The name of the command line program used to delete jobs.")
818 job_id_regexp = CUnicode('', config=True,
819 help="""A regular expression used to get the job id from the output of the
820 submit_command.""")
821 batch_template = CUnicode('', config=True,
822 help="The string that is the batch script template itself.")
823 batch_template_file = CUnicode(u'', config=True,
824 help="The file that contains the batch template.")
825 batch_file_name = CUnicode(u'batch_script', config=True,
826 help="The filename of the instantiated batch script.")
827 queue = CUnicode(u'', config=True,
828 help="The PBS Queue.")
800 829
801 830 # not configurable, override in subclasses
802 831 # PBS Job Array regex
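
The batch_template is instantiated by substituting context variables; a sketch assuming string.Template-style substitution, which the $cluster_dir placeholders in the default templates below suggest:

from string import Template

tpl = Template("""#!/bin/sh
#PBS -V
#PBS -N ipengine
ipengine cluster_dir=$cluster_dir
""")
with open('pbs_engines', 'w') as f:
    f.write(tpl.substitute(cluster_dir='/home/me/.ipython/cluster_default'))
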
@@ -891,9 +920,12 b' class BatchSystemLauncher(BaseLauncher):'
891 920 class PBSLauncher(BatchSystemLauncher):
892 921 """A BatchSystemLauncher subclass for PBS."""
893 922
894 submit_command = List(['qsub'], config=True)
895 delete_command = List(['qdel'], config=True)
896 job_id_regexp = CUnicode(r'\d+', config=True)
923 submit_command = List(['qsub'], config=True,
924 help="The PBS submit command ['qsub']")
925 delete_command = List(['qdel'], config=True,
926 help="The PBS delete command ['qdel']")
927 job_id_regexp = CUnicode(r'\d+', config=True,
928 help="Regular expression for identifying the job ID [r'\d+']")
897 929
898 930 batch_file = CUnicode(u'')
899 931 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
@@ -905,11 +937,12 b' class PBSLauncher(BatchSystemLauncher):'
905 937 class PBSControllerLauncher(PBSLauncher):
906 938 """Launch a controller using PBS."""
907 939
908 batch_file_name = CUnicode(u'pbs_controller', config=True)
940 batch_file_name = CUnicode(u'pbs_controller', config=True,
941 help="batch file name for the controller job.")
909 942 default_template= CUnicode("""#!/bin/sh
910 943 #PBS -V
911 944 #PBS -N ipcontroller
912 %s --log-to-file --cluster-dir $cluster_dir
945 %s --log-to-file cluster_dir=$cluster_dir
913 946 """%(' '.join(ipcontroller_cmd_argv)))
914 947
915 948 def start(self, cluster_dir):
@@ -920,11 +953,12 b' class PBSControllerLauncher(PBSLauncher):'
920 953
921 954 class PBSEngineSetLauncher(PBSLauncher):
922 955 """Launch Engines using PBS"""
923 batch_file_name = CUnicode(u'pbs_engines', config=True)
956 batch_file_name = CUnicode(u'pbs_engines', config=True,
957 help="batch file name for the engine(s) job.")
924 958 default_template= CUnicode(u"""#!/bin/sh
925 959 #PBS -V
926 960 #PBS -N ipengine
927 %s --cluster-dir $cluster_dir
961 %s cluster_dir=$cluster_dir
928 962 """%(' '.join(ipengine_cmd_argv)))
929 963
930 964 def start(self, n, cluster_dir):
@@ -944,11 +978,12 b' class SGELauncher(PBSLauncher):'
944 978 class SGEControllerLauncher(SGELauncher):
945 979 """Launch a controller using SGE."""
946 980
947 batch_file_name = CUnicode(u'sge_controller', config=True)
981 batch_file_name = CUnicode(u'sge_controller', config=True,
982 help="batch file name for the ipcontroller job.")
948 983 default_template= CUnicode(u"""#$$ -V
949 984 #$$ -S /bin/sh
950 985 #$$ -N ipcontroller
951 %s --log-to-file --cluster-dir $cluster_dir
986 %s --log-to-file cluster_dir=$cluster_dir
952 987 """%(' '.join(ipcontroller_cmd_argv)))
953 988
954 989 def start(self, cluster_dir):
@@ -958,11 +993,12 b' class SGEControllerLauncher(SGELauncher):'
958 993
959 994 class SGEEngineSetLauncher(SGELauncher):
960 995 """Launch Engines with SGE"""
961 batch_file_name = CUnicode(u'sge_engines', config=True)
996 batch_file_name = CUnicode(u'sge_engines', config=True,
997 help="batch file name for the engine(s) job.")
962 998 default_template = CUnicode("""#$$ -V
963 999 #$$ -S /bin/sh
964 1000 #$$ -N ipengine
965 %s --cluster-dir $cluster_dir
1001 %s cluster_dir=$cluster_dir
966 1002 """%(' '.join(ipengine_cmd_argv)))
967 1003
968 1004 def start(self, n, cluster_dir):
@@ -979,18 +1015,56 b' class SGEEngineSetLauncher(SGELauncher):'
979 1015 class IPClusterLauncher(LocalProcessLauncher):
980 1016 """Launch the ipcluster program in an external process."""
981 1017
982 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
983 # Command line arguments to pass to ipcluster.
1018 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1019 help="Popen command for ipcluster")
984 1020 ipcluster_args = List(
985 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
1021 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1022 help="Command line arguments to pass to ipcluster.")
986 1023 ipcluster_subcommand = Str('start')
987 1024 ipcluster_n = Int(2)
988 1025
989 1026 def find_args(self):
990 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
991 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
1027 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1028 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
992 1029
993 1030 def start(self):
994 1031 self.log.info("Starting ipcluster: %r" % self.args)
995 1032 return super(IPClusterLauncher, self).start()
996 1033
1034 #-----------------------------------------------------------------------------
1035 # Collections of launchers
1036 #-----------------------------------------------------------------------------
1037
1038 local_launchers = [
1039 LocalControllerLauncher,
1040 LocalEngineLauncher,
1041 LocalEngineSetLauncher,
1042 ]
1043 mpi_launchers = [
1044 MPIExecLauncher,
1045 MPIExecControllerLauncher,
1046 MPIExecEngineSetLauncher,
1047 ]
1048 ssh_launchers = [
1049 SSHLauncher,
1050 SSHControllerLauncher,
1051 SSHEngineLauncher,
1052 SSHEngineSetLauncher,
1053 ]
1054 winhpc_launchers = [
1055 WindowsHPCLauncher,
1056 WindowsHPCControllerLauncher,
1057 WindowsHPCEngineSetLauncher,
1058 ]
1059 pbs_launchers = [
1060 PBSLauncher,
1061 PBSControllerLauncher,
1062 PBSEngineSetLauncher,
1063 ]
1064 sge_launchers = [
1065 SGELauncher,
1066 SGEControllerLauncher,
1067 SGEEngineSetLauncher,
1068 ]
1069 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1070 + pbs_launchers + sge_launchers
\ No newline at end of file
@@ -296,7 +296,7 b' class Client(HasTraits):'
296 296 if username is None:
297 297 self.session = ss.StreamSession(**key_arg)
298 298 else:
299 self.session = ss.StreamSession(username, **key_arg)
299 self.session = ss.StreamSession(username=username, **key_arg)
300 300 self._query_socket = self._context.socket(zmq.XREQ)
301 301 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
302 302 if self._ssh:
@@ -18,7 +18,7 b' import zmq'
18 18 from zmq.devices import ProcessDevice, ThreadDevice
19 19 from zmq.eventloop import ioloop, zmqstream
20 20
21 from IPython.utils.traitlets import Set, Instance, CFloat, Bool
21 from IPython.utils.traitlets import Set, Instance, CFloat, Bool, CStr
22 22 from IPython.parallel.factory import LoggingFactory
23 23
24 24 class Heart(object):
@@ -53,14 +53,16 b' class HeartMonitor(LoggingFactory):'
53 53 pongstream: an XREP stream
54 54 period: the period of the heartbeat in milliseconds"""
55 55
56 period=CFloat(1000, config=True) # in milliseconds
56 period=CFloat(1000, config=True,
57 help='The period (in ms) at which the Hub pings engines for heartbeats '
58 '[default: 1000]',
59 )
57 60
58 61 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
59 62 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
60 63 loop = Instance('zmq.eventloop.ioloop.IOLoop')
61 64 def _loop_default(self):
62 65 return ioloop.IOLoop.instance()
63 debug=Bool(False)
64 66
65 67 # not settable:
66 68 hearts=Set()
@@ -25,7 +25,9 b' from zmq.eventloop.zmqstream import ZMQStream'
25 25
26 26 # internal:
27 27 from IPython.utils.importstring import import_item
28 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
28 from IPython.utils.traitlets import (
29 HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool, Tuple
30 )
29 31
30 32 from IPython.parallel import error, util
31 33 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
@@ -112,59 +114,71 b' class EngineConnector(HasTraits):'
112 114 class HubFactory(RegistrationFactory):
113 115 """The Configurable for setting up a Hub."""
114 116
115 # name of a scheduler scheme
116 scheme = Str('leastload', config=True)
117
118 117 # port-pairs for monitoredqueues:
119 hb = Instance(list, config=True)
118 hb = Tuple(Int,Int,config=True,
119 help="""XREQ/SUB Port pair for Engine heartbeats""")
120 120 def _hb_default(self):
121 return util.select_random_ports(2)
121 return tuple(util.select_random_ports(2))
122
123 mux = Tuple(Int,Int,config=True,
124 help="""Engine/Client Port pair for MUX queue""")
122 125
123 mux = Instance(list, config=True)
124 126 def _mux_default(self):
125 return util.select_random_ports(2)
127 return tuple(util.select_random_ports(2))
126 128
127 task = Instance(list, config=True)
129 task = Tuple(Int,Int,config=True,
130 help="""Engine/Client Port pair for Task queue""")
128 131 def _task_default(self):
129 return util.select_random_ports(2)
132 return tuple(util.select_random_ports(2))
133
134 control = Tuple(Int,Int,config=True,
135 help="""Engine/Client Port pair for Control queue""")
130 136
131 control = Instance(list, config=True)
132 137 def _control_default(self):
133 return util.select_random_ports(2)
138 return tuple(util.select_random_ports(2))
139
140 iopub = Tuple(Int,Int,config=True,
141 help="""Engine/Client Port pair for IOPub relay""")
134 142
135 iopub = Instance(list, config=True)
136 143 def _iopub_default(self):
137 return util.select_random_ports(2)
144 return tuple(util.select_random_ports(2))
138 145
139 146 # single ports:
140 mon_port = Instance(int, config=True)
147 mon_port = Int(config=True,
148 help="""Monitor (SUB) port for queue traffic""")
149
141 150 def _mon_port_default(self):
142 151 return util.select_random_ports(1)[0]
143 152
144 notifier_port = Instance(int, config=True)
153 notifier_port = Int(config=True,
154 help="""PUB port for sending engine status notifications""")
155
145 156 def _notifier_port_default(self):
146 157 return util.select_random_ports(1)[0]
147 158
148 ping = Int(1000, config=True) # ping frequency
159 engine_ip = CStr('127.0.0.1', config=True,
160 help="IP on which to listen for engine connections. [default: loopback]")
161 engine_transport = CStr('tcp', config=True,
162 help="0MQ transport for engine connections. [default: tcp]")
149 163
150 engine_ip = CStr('127.0.0.1', config=True)
151 engine_transport = CStr('tcp', config=True)
164 client_ip = CStr('127.0.0.1', config=True,
165 help="IP on which to listen for client connections. [default: loopback]")
166 client_transport = CStr('tcp', config=True,
167 help="0MQ transport for client connections. [default: tcp]")
152 168
153 client_ip = CStr('127.0.0.1', config=True)
154 client_transport = CStr('tcp', config=True)
155
156 monitor_ip = CStr('127.0.0.1', config=True)
157 monitor_transport = CStr('tcp', config=True)
169 monitor_ip = CStr('127.0.0.1', config=True,
170 help="IP on which to listen for monitor messages. [default: loopback]")
171 monitor_transport = CStr('tcp', config=True,
172 help="0MQ transport for monitor messages. [default: tcp]")
158 173
159 174 monitor_url = CStr('')
160 175
161 db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True)
176 db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True,
177 help="""The class to use for the DB backend""")
162 178
163 179 # not configurable
164 180 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
165 181 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
166 subconstructors = List()
167 _constructed = Bool(False)
168 182
169 183 def _ip_changed(self, name, old, new):
170 184 self.engine_ip = new
@@ -186,24 +200,17 b' class HubFactory(RegistrationFactory):'
186 200 self._update_monitor_url()
187 201 # self.on_trait_change(self._sync_ips, 'ip')
188 202 # self.on_trait_change(self._sync_transports, 'transport')
189 self.subconstructors.append(self.construct_hub)
203 # self.subconstructors.append(self.construct_hub)
190 204
191 205
192 206 def construct(self):
193 assert not self._constructed, "already constructed!"
194
195 for subc in self.subconstructors:
196 subc()
197
198 self._constructed = True
199
207 self.init_hub()
200 208
201 209 def start(self):
202 assert self._constructed, "must be constructed by self.construct() first!"
203 210 self.heartmonitor.start()
204 211 self.log.info("Heartmonitor started")
205 212
206 def construct_hub(self):
213 def init_hub(self):
207 214 """construct"""
208 215 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
209 216 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
@@ -227,7 +234,7 b' class HubFactory(RegistrationFactory):'
227 234 hrep = ctx.socket(zmq.XREP)
228 235 hrep.bind(engine_iface % self.hb[1])
229 236 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
230 period=self.ping, logname=self.log.name)
237 config=self.config)
231 238
232 239 ### Client connections ###
233 240 # Notifier socket
@@ -248,7 +255,11 b' class HubFactory(RegistrationFactory):'
248 255 # cdir = self.config.Global.cluster_dir
249 256 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
250 257 time.sleep(.25)
251
258 try:
259 scheme = self.config.TaskScheduler.scheme_name
260 except AttributeError:
261 from .scheduler import TaskScheduler
262 scheme = TaskScheduler.scheme_name.get_default_value()
252 263 # build connection dicts
253 264 self.engine_info = {
254 265 'control' : engine_iface%self.control[1],
@@ -262,7 +273,7 b' class HubFactory(RegistrationFactory):'
262 273 self.client_info = {
263 274 'control' : client_iface%self.control[0],
264 275 'mux': client_iface%self.mux[0],
265 'task' : (self.scheme, client_iface%self.task[0]),
276 'task' : (scheme, client_iface%self.task[0]),
266 277 'iopub' : client_iface%self.iopub[0],
267 278 'notification': client_iface%self.notifier_port
268 279 }
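
All of the ports and interfaces above are configurable; a sketch pinning them instead of taking the random-port defaults (values are placeholders):

from IPython.config.loader import Config

c = Config()
c.HubFactory.engine_ip = '0.0.0.0'   # listen on all interfaces for engines
c.HubFactory.regport = 10101         # registration port (inherited from RegistrationFactory)
c.HubFactory.mux = (10201, 10202)    # (client, engine) ends of the MUX queue
c.HubFactory.hb = (10303, 10304)     # heartbeat ping/pong port pair
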
@@ -20,9 +20,20 b' from .dictdb import BaseDB'
20 20 class MongoDB(BaseDB):
21 21 """MongoDB TaskRecord backend."""
22 22
23 connection_args = List(config=True) # args passed to pymongo.Connection
24 connection_kwargs = Dict(config=True) # kwargs passed to pymongo.Connection
25 database = CUnicode(config=True) # name of the mongodb database
23 connection_args = List(config=True,
24 help="""Positional arguments to be passed to pymongo.Connection. Only
25 necessary if the default mongodb configuration does not point to your
26 mongod instance.""")
27 connection_kwargs = Dict(config=True,
28 help="""Keyword arguments to be passed to pymongo.Connection. Only
29 necessary if the default mongodb configuration does not point to your
30 mongod instance."""
31 )
32 database = CUnicode(config=True,
33 help="""The MongoDB database name to use for storing tasks for this session. If unspecified,
34 a new database will be created with the Hub's IDENT. Specifying the database will result
35 in tasks from previous sessions being available via Clients' db_query and
36 get_result methods.""")
26 37
27 38 _connection = Instance(Connection) # pymongo connection
28 39
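
A config sketch pointing the MongoDB backend at a specific mongod (host and database names are placeholders):

from IPython.config.loader import Config

c = Config()
c.MongoDB.connection_kwargs = dict(host='db.example.com', port=27017)
c.MongoDB.database = 'ipython_tasks'  # reuse task records across sessions
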
@@ -35,7 +35,7 b' from zmq.eventloop import ioloop, zmqstream'
35 35 # local imports
36 36 from IPython.external.decorator import decorator
37 37 from IPython.config.loader import Config
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum
39 39
40 40 from IPython.parallel import error
41 41 from IPython.parallel.factory import SessionFactory
@@ -126,7 +126,19 b' class TaskScheduler(SessionFactory):'
126 126
127 127 """
128 128
129 hwm = Int(0, config=True) # limit number of outstanding tasks
129 hwm = Int(0, config=True, shortname='hwm',
130 help="""specify the High Water Mark (HWM) for the downstream
131 socket in the Task scheduler. This is the maximum number
132 of allowed outstanding tasks on each engine."""
133 )
134 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
135 'leastload', config=True, shortname='scheme', allow_none=False,
136 help="""select the task scheduler scheme [default: leastload]
137 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
138 )
139 def _scheme_name_changed(self, old, new):
140 self.log.debug("Using scheme %r"%new)
141 self.scheme = globals()[new]
130 142
131 143 # input arguments:
132 144 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
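
The name-to-function dispatch above assumes the scheme functions are defined at module level in the scheduler module; a standalone sketch of the pattern (toy implementations, not the scheduler's actual ones):

import random

def plainrandom(loads):
    return random.randint(0, len(loads) - 1)

def leastload(loads):
    return loads.index(min(loads))

scheme = globals()['leastload']   # same lookup as _scheme_name_changed
print scheme([3, 0, 2])           # -> 1, the least-loaded engine
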
@@ -622,7 +634,7 b' class TaskScheduler(SessionFactory):'
622 634
623 635
624 636 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
625 log_addr=None, loglevel=logging.DEBUG, scheme='lru',
637 log_addr=None, loglevel=logging.DEBUG,
626 638 identity=b'task'):
627 639 from zmq.eventloop import ioloop
628 640 from zmq.eventloop.zmqstream import ZMQStream
@@ -646,7 +658,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='
646 658 nots.setsockopt(zmq.SUBSCRIBE, '')
647 659 nots.connect(not_addr)
648 660
649 scheme = globals().get(scheme, None)
661 # scheme = globals().get(scheme, None)
650 662 # setup logging
651 663 if log_addr:
652 664 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
@@ -655,7 +667,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='
655 667
656 668 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
657 669 mon_stream=mons, notifier_stream=nots,
658 scheme=scheme, loop=loop, logname=logname,
670 loop=loop, logname=logname,
659 671 config=config)
660 672 scheduler.start()
661 673 try:
@@ -83,9 +83,16 b' def _convert_bufs(bs):'
83 83 class SQLiteDB(BaseDB):
84 84 """SQLite3 TaskRecord backend."""
85 85
86 filename = CUnicode('tasks.db', config=True)
87 location = CUnicode('', config=True)
88 table = CUnicode("", config=True)
86 filename = CUnicode('tasks.db', config=True,
87 help="""The filename of the sqlite task database. [default: 'tasks.db']""")
88 location = CUnicode('', config=True,
89 help="""The directory containing the sqlite task database. The default
90 is to use the cluster_dir location.""")
91 table = CUnicode("", config=True,
92 help="""The SQLite Table to use for storing tasks for this session. If unspecified,
93 a new table will be created with the Hub's IDENT. Specifying the table will result
94 in tasks from previous sessions being available via Clients' db_query and
95 get_result methods.""")
89 96
90 97 _db = Instance('sqlite3.Connection')
91 98 _keys = List(['msg_id' ,
@@ -33,13 +33,22 b' class EngineFactory(RegistrationFactory):'
33 33 """IPython engine"""
34 34
35 35 # configurables:
36 user_ns=Dict(config=True)
37 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
38 display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
39 location=Str(config=True)
40 timeout=CFloat(2,config=True)
36 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
37 help="""The OutStream for handling stdout/err.
38 Typically 'IPython.zmq.iostream.OutStream'""")
39 display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True,
40 help="""The class for handling displayhook.
41 Typically 'IPython.zmq.displayhook.DisplayHook'""")
42 location=Str(config=True,
43 help="""The location (an IP address) of the controller. This is
44 used for disambiguating URLs, to determine whether to connect
45 via loopback or via the public address.""")
46 timeout=CFloat(2,config=True,
47 help="""The time (in seconds) to wait for the Controller to respond
48 to registration requests before giving up.""")
41 49
42 50 # not configurable:
51 user_ns=Dict()
43 52 id=Int(allow_none=True)
44 53 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
45 54 kernel=Instance(Kernel)
@@ -47,6 +56,7 b' class EngineFactory(RegistrationFactory):'
47 56
48 57 def __init__(self, **kwargs):
49 58 super(EngineFactory, self).__init__(**kwargs)
59 self.ident = self.session.session
50 60 ctx = self.context
51 61
52 62 reg = ctx.socket(zmq.XREQ)
@@ -127,7 +137,7 b' class EngineFactory(RegistrationFactory):'
127 137
128 138 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
129 139 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
130 loop=loop, user_ns = self.user_ns, logname=self.log.name)
140 loop=loop, user_ns = self.user_ns, log=self.log)
131 141 self.kernel.start()
132 142 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
133 143 heart = Heart(*map(str, hb_addrs), heart_id=identity)
@@ -143,7 +153,7 b' class EngineFactory(RegistrationFactory):'
143 153
144 154
145 155 def abort(self):
146 self.log.fatal("Registration timed out")
156 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
147 157 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
148 158 time.sleep(1)
149 159 sys.exit(255)
@@ -28,7 +28,7 b' import zmq'
28 28 from zmq.eventloop import ioloop, zmqstream
29 29
30 30 # Local imports.
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str, CStr
32 32 from IPython.zmq.completer import KernelCompleter
33 33
34 34 from IPython.parallel.error import wrap_exception
@@ -64,9 +64,11 b' class Kernel(SessionFactory):'
64 64 #---------------------------------------------------------------------------
65 65
66 66 # kwargs:
67 int_id = Int(-1, config=True)
68 user_ns = Dict(config=True)
69 exec_lines = List(config=True)
67 exec_lines = List(CStr, config=True,
68 help="List of lines to execute")
69
70 int_id = Int(-1)
71 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
70 72
71 73 control_stream = Instance(zmqstream.ZMQStream)
72 74 task_stream = Instance(zmqstream.ZMQStream)
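Reviewer note: exec_lines and user_ns remain configurable while int_id becomes
runtime-only. A sketch of seeding an engine's namespace (values illustrative):

    # hypothetical ipengine_config.py snippet
    c = get_config()
    c.Kernel.exec_lines = ['import numpy', 'from math import pi']
    c.Kernel.user_ns = {'run_id': 42}   # pre-populates the engine namespace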
@@ -14,12 +14,10 b''
14 14
15 15 import logging
16 16 import os
17 import uuid
18 17
19 18 from zmq.eventloop.ioloop import IOLoop
20 19
21 20 from IPython.config.configurable import Configurable
22 from IPython.utils.importstring import import_item
23 21 from IPython.utils.traitlets import Str,Int,Instance, CUnicode, CStr
24 22
25 23 import IPython.parallel.streamsession as ss
@@ -39,13 +37,6 b' class LoggingFactory(Configurable):'
39 37 class SessionFactory(LoggingFactory):
40 38 """The Base factory from which every factory in IPython.parallel inherits"""
41 39
42 packer = Str('',config=True)
43 unpacker = Str('',config=True)
44 ident = CStr('',config=True)
45 def _ident_default(self):
46 return str(uuid.uuid4())
47 username = CUnicode(os.environ.get('USER','username'),config=True)
48 exec_key = CUnicode('',config=True)
49 40 # not configurable:
50 41 context = Instance('zmq.Context', (), {})
51 42 session = Instance('IPython.parallel.streamsession.StreamSession')
@@ -56,33 +47,28 b' class SessionFactory(LoggingFactory):'
56 47
57 48 def __init__(self, **kwargs):
58 49 super(SessionFactory, self).__init__(**kwargs)
59 exec_key = self.exec_key or None
60 # set the packers:
61 if not self.packer:
62 packer_f = unpacker_f = None
63 elif self.packer.lower() == 'json':
64 packer_f = ss.json_packer
65 unpacker_f = ss.json_unpacker
66 elif self.packer.lower() == 'pickle':
67 packer_f = ss.pickle_packer
68 unpacker_f = ss.pickle_unpacker
69 else:
70 packer_f = import_item(self.packer)
71 unpacker_f = import_item(self.unpacker)
72 50
73 51 # construct the session
74 self.session = ss.StreamSession(self.username, self.ident, packer=packer_f, unpacker=unpacker_f, key=exec_key)
52 self.session = ss.StreamSession(**kwargs)
75 53
76 54
77 55 class RegistrationFactory(SessionFactory):
78 56 """The Base Configurable for objects that involve registration."""
79 57
80 url = Str('', config=True) # url takes precedence over ip,regport,transport
81 transport = Str('tcp', config=True)
82 ip = Str('127.0.0.1', config=True)
83 regport = Instance(int, config=True)
58 url = Str('', config=True,
59 help="""The 0MQ url used for registration. This sets transport, ip, and port
60 in one variable. For example: url='tcp://127.0.0.1:12345' or
61 url='epgm://*:90210'""") # url takes precedence over ip,regport,transport
62 transport = Str('tcp', config=True,
63 help="""The 0MQ transport for communications. This will likely be
64 the default of 'tcp', but other values include 'ipc', 'epgm', 'inproc'.""")
65 ip = Str('127.0.0.1', config=True,
66 help="""The IP address for registration. This is generally either
67 '127.0.0.1' for loopback only or '*' for all interfaces.
68 [default: '127.0.0.1']""")
69 regport = Int(config=True,
70 help="""The port on which the Hub listens for registration.""")
84 71 def _regport_default(self):
85 # return 10101
86 72 return select_random_ports(1)[0]
87 73
88 74 def __init__(self, **kwargs):
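Reviewer note: since url takes precedence over the individual pieces, the two
styles below are alternatives, not complements. A sketch (values illustrative):

    c = get_config()
    # all-in-one:
    c.RegistrationFactory.url = 'tcp://127.0.0.1:10101'
    # ...or piecewise (only consulted when url is unset):
    c.RegistrationFactory.transport = 'tcp'
    c.RegistrationFactory.ip = '127.0.0.1'
    c.RegistrationFactory.regport = 10101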
@@ -107,46 +93,3 b' class RegistrationFactory(SessionFactory):'
107 93 self.ip = iface[0]
108 94 if iface[1]:
109 95 self.regport = int(iface[1])
110
111 #-----------------------------------------------------------------------------
112 # argparse argument extenders
113 #-----------------------------------------------------------------------------
114
115
116 def add_session_arguments(parser):
117 paa = parser.add_argument
118 paa('--ident',
119 type=str, dest='SessionFactory.ident',
120 help='set the ZMQ and session identity [default: random uuid]',
121 metavar='identity')
122 # paa('--execkey',
123 # type=str, dest='SessionFactory.exec_key',
124 # help='path to a file containing an execution key.',
125 # metavar='execkey')
126 paa('--packer',
127 type=str, dest='SessionFactory.packer',
128 help='method to serialize messages: {json,pickle} [default: json]',
129 metavar='packer')
130 paa('--unpacker',
131 type=str, dest='SessionFactory.unpacker',
132 help='inverse function of `packer`. Only necessary when using something other than json|pickle',
133 metavar='packer')
134
135 def add_registration_arguments(parser):
136 paa = parser.add_argument
137 paa('--ip',
138 type=str, dest='RegistrationFactory.ip',
139 help="The IP used for registration [default: localhost]",
140 metavar='ip')
141 paa('--transport',
142 type=str, dest='RegistrationFactory.transport',
143 help="The ZeroMQ transport used for registration [default: tcp]",
144 metavar='transport')
145 paa('--url',
146 type=str, dest='RegistrationFactory.url',
147 help='set transport,ip,regport in one go, e.g. tcp://127.0.0.1:10101',
148 metavar='url')
149 paa('--regport',
150 type=int, dest='RegistrationFactory.regport',
151 help="The port used for registration [default: 10101]",
152 metavar='ip')
@@ -25,8 +25,13 b' import zmq'
25 25 from zmq.utils import jsonapi
26 26 from zmq.eventloop.zmqstream import ZMQStream
27 27
28 from IPython.config.configurable import Configurable
29 from IPython.utils.importstring import import_item
30 from IPython.utils.traitlets import Str, CStr, CUnicode, Bool, Any
31
28 32 from .util import ISO8601
29 33
34
30 35 def squash_unicode(obj):
31 36 """coerce unicode back to bytestrings."""
32 37 if isinstance(obj,dict):
@@ -113,50 +118,72 b' def extract_header(msg_or_header):'
113 118 h = dict(h)
114 119 return h
115 120
116 class StreamSession(object):
121 class StreamSession(Configurable):
117 122 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
118 debug=False
119 key=None
120
121 def __init__(self, username=None, session=None, packer=None, unpacker=None, key=None, keyfile=None):
122 if username is None:
123 username = os.environ.get('USER','username')
124 self.username = username
125 if session is None:
126 self.session = str(uuid.uuid4())
123 debug=Bool(False, config=True, help="""Debug output in the StreamSession""")
124 packer = Str('json',config=True,
125 help="""The name of the packer for serializing messages.
126 Should be one of 'json', 'pickle', or an import name
127 for a custom serializer.""")
128 def _packer_changed(self, name, old, new):
129 if new.lower() == 'json':
130 self.pack = json_packer
131 self.unpack = json_unpacker
132 elif new.lower() == 'pickle':
133 self.pack = pickle_packer
134 self.unpack = pickle_unpacker
127 135 else:
128 self.session = session
129 self.msg_id = str(uuid.uuid4())
130 if packer is None:
131 self.pack = default_packer
136 self.pack = import_item(new)
137
138 unpacker = Str('json',config=True,
139 help="""The name of the unpacker for unserializing messages.
140 Only used with custom functions for `packer`.""")
141 def _unpacker_changed(self, name, old, new):
142 if new.lower() == 'json':
143 self.pack = json_packer
144 self.unpack = json_unpacker
145 elif new.lower() == 'pickle':
146 self.pack = pickle_packer
147 self.unpack = pickle_unpacker
132 148 else:
133 if not callable(packer):
134 raise TypeError("packer must be callable, not %s"%type(packer))
135 self.pack = packer
149 self.unpack = import_item(new)
136 150
137 if unpacker is None:
138 self.unpack = default_unpacker
139 else:
140 if not callable(unpacker):
141 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
142 self.unpack = unpacker
151 session = CStr('',config=True,
152 help="""The UUID identifying this session.""")
153 def _session_default(self):
154 return str(uuid.uuid4())
155 username = CUnicode(os.environ.get('USER','username'),config=True,
156 help="""Username for the Session. Default is your system username.""")
157 key = CStr('', config=True,
158 help="""execution key, for extra authentication.""")
159
160 keyfile = CUnicode('', config=True,
161 help="""path to file containing execution key.""")
162 def _keyfile_changed(self, name, old, new):
163 with open(new, 'rb') as f:
164 self.key = f.read().strip()
165
166 pack = Any(default_packer) # the actual packer function
167 def _pack_changed(self, name, old, new):
168 if not callable(new):
169 raise TypeError("packer must be callable, not %s"%type(new))
143 170
144 if key is not None and keyfile is not None:
145 raise TypeError("Must specify key OR keyfile, not both")
146 if keyfile is not None:
147 with open(keyfile) as f:
148 self.key = f.read().strip()
149 else:
150 self.key = key
151 if isinstance(self.key, unicode):
152 self.key = self.key.encode('utf8')
153 # print key, keyfile, self.key
171 unpack = Any(default_unpacker) # the actual unpacker function
172 def _unpack_changed(self, name, old, new):
173 if not callable(new):
174 raise TypeError("unpacker must be callable, not %s"%type(new))
175
176 def __init__(self, **kwargs):
177 super(StreamSession, self).__init__(**kwargs)
154 178 self.none = self.pack({})
155
179
180 @property
181 def msg_id(self):
182 """always return new uuid"""
183 return str(uuid.uuid4())
184
156 185 def msg_header(self, msg_type):
157 h = msg_header(self.msg_id, msg_type, self.username, self.session)
158 self.msg_id = str(uuid.uuid4())
159 return h
186 return msg_header(self.msg_id, msg_type, self.username, self.session)
160 187
161 188 def msg(self, msg_type, content=None, parent=None, subheader=None):
162 189 msg = {}
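Reviewer note: this hunk leans on two traitlets conventions used throughout
the diff: _<name>_default for lazy defaults and _<name>_changed for validation
on assignment. A self-contained sketch of the pattern, not the StreamSession
code itself:

    from IPython.utils.traitlets import HasTraits, Str

    class Demo(HasTraits):
        packer = Str('')
        def _packer_default(self):
            return 'json'                 # computed only when left unset
        def _packer_changed(self, name, old, new):
            if new not in ('json', 'pickle'):
                raise ValueError("unknown packer: %r" % new)

    d = Demo()
    print d.packer      # 'json', via _packer_default
    d.packer = 'pickle' # fires _packer_changed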
@@ -171,10 +198,10 b' class StreamSession(object):'
171 198
172 199 def check_key(self, msg_or_header):
173 200 """Check that a message's header has the right key"""
174 if self.key is None:
201 if not self.key:
175 202 return True
176 203 header = extract_header(msg_or_header)
177 return header.get('key', None) == self.key
204 return header.get('key', '') == self.key
178 205
179 206
180 207 def serialize(self, msg, ident=None):
@@ -200,7 +227,7 b' class StreamSession(object):'
200 227 elif ident is not None:
201 228 to_send.append(ident)
202 229 to_send.append(DELIM)
203 if self.key is not None:
230 if self.key:
204 231 to_send.append(self.key)
205 232 to_send.append(self.pack(msg['header']))
206 233 to_send.append(self.pack(msg['parent_header']))
@@ -297,7 +324,7 b' class StreamSession(object):'
297 324 if ident is not None:
298 325 to_send.extend(ident)
299 326 to_send.append(DELIM)
300 if self.key is not None:
327 if self.key:
301 328 to_send.append(self.key)
302 329 to_send.extend(msg)
303 330 stream.send_multipart(to_send, flags, copy=copy)
@@ -345,7 +372,7 b' class StreamSession(object):'
345 372 msg will be a list of bytes or Messages, unchanged from input
346 373 msg should be unpackable via self.unpack_message at this point.
347 374 """
348 ikey = int(self.key is not None)
375 ikey = int(self.key != '')
349 376 minlen = 3 + ikey
350 377 msg = list(msg)
351 378 idents = []
@@ -379,7 +406,7 b' class StreamSession(object):'
379 406 or the non-copying Message object in each place (False)
380 407
381 408 """
382 ikey = int(self.key is not None)
409 ikey = int(self.key != '')
383 410 minlen = 3 + ikey
384 411 message = {}
385 412 if not copy:
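Reviewer note: the ikey/minlen arithmetic in these last two hunks encodes the
wire framing. A hedged sketch of the framing being assumed; the DELIM value
and the helper function are illustrative, not the actual implementation:

    DELIM = '<IDS|MSG>'   # assumed delimiter frame between idents and message

    def split_frames(frames, key=''):
        """separate routing idents from the message body."""
        ikey = int(key != '')    # 1 if a key frame is expected
        minlen = 3 + ikey        # header, parent_header, content (+ key)
        idx = frames.index(DELIM)
        idents, msg = frames[:idx], frames[idx+1:]
        if len(msg) < minlen:
            raise ValueError("malformed message, expected >= %i frames" % minlen)
        if ikey and msg[0] != key:
            raise KeyError("invalid session key")
        return idents, msg[ikey:]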