##// END OF EJS Templates
update parallel apps to use ProfileDir
MinRK -
Show More
@@ -20,243 +20,31 b' from __future__ import with_statement'
20 20 import os
21 21 import logging
22 22 import re
23 import shutil
24 23 import sys
25 24
26 25 from subprocess import Popen, PIPE
27 26
28 from IPython.config.loader import PyFileConfigLoader, Config
29 from IPython.config.configurable import Configurable
30 from IPython.config.application import Application
31 from IPython.core.crashhandler import CrashHandler
32 from IPython.core.newapplication import BaseIPythonApplication
27 from IPython.config.loader import Config
33 28 from IPython.core import release
34 from IPython.utils.path import (
35 get_ipython_package_dir,
36 get_ipython_dir,
37 expand_path
29 from IPython.core.crashhandler import CrashHandler
30 from IPython.core.newapplication import (
31 BaseIPythonApplication,
32 base_aliases as base_ip_aliases,
33 base_flags as base_ip_flags
38 34 )
35 from IPython.utils.path import expand_path
36
39 37 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
40 38
41 39 #-----------------------------------------------------------------------------
42 40 # Module errors
43 41 #-----------------------------------------------------------------------------
44 42
45 class ClusterDirError(Exception):
46 pass
47
48
49 43 class PIDFileError(Exception):
50 44 pass
51 45
52 46
53 47 #-----------------------------------------------------------------------------
54 # Class for managing cluster directories
55 #-----------------------------------------------------------------------------
56
57 class ClusterDir(Configurable):
58 """An object to manage the cluster directory and its resources.
59
60 The cluster directory is used by :command:`ipengine`,
61 :command:`ipcontroller` and :command:`ipclsuter` to manage the
62 configuration, logging and security of these applications.
63
64 This object knows how to find, create and manage these directories. This
65 should be used by any code that want's to handle cluster directories.
66 """
67
68 security_dir_name = Unicode('security')
69 log_dir_name = Unicode('log')
70 pid_dir_name = Unicode('pid')
71 security_dir = Unicode(u'')
72 log_dir = Unicode(u'')
73 pid_dir = Unicode(u'')
74
75 auto_create = Bool(False,
76 help="""Whether to automatically create the ClusterDirectory if it does
77 not exist""")
78 overwrite = Bool(False,
79 help="""Whether to overwrite existing config files""")
80 location = Unicode(u'', config=True,
81 help="""Set the cluster dir. This overrides the logic used by the
82 `profile` option.""",
83 )
84 profile = Unicode(u'default', config=True,
85 help="""The string name of the profile to be used. This determines the name
86 of the cluster dir as: cluster_<profile>. The default profile is named
87 'default'. The cluster directory is resolve this way if the
88 `cluster_dir` option is not used."""
89 )
90
91 _location_isset = Bool(False) # flag for detecting multiply set location
92 _new_dir = Bool(False) # flag for whether a new dir was created
93
94 def __init__(self, **kwargs):
95 # make sure auto_create,overwrite are set *before* location
96 for name in ('auto_create', 'overwrite'):
97 v = kwargs.pop(name, None)
98 if v is not None:
99 setattr(self, name, v)
100 super(ClusterDir, self).__init__(**kwargs)
101 if not self.location:
102 self._profile_changed('profile', 'default', self.profile)
103
104 def _location_changed(self, name, old, new):
105 if self._location_isset:
106 raise RuntimeError("Cannot set ClusterDir more than once.")
107 self._location_isset = True
108 if not os.path.isdir(new):
109 if self.auto_create:# or self.config.ClusterDir.auto_create:
110 os.makedirs(new)
111 self._new_dir = True
112 else:
113 raise ClusterDirError('Directory not found: %s' % new)
114
115 # ensure config files exist:
116 self.copy_all_config_files(overwrite=self.overwrite)
117 self.security_dir = os.path.join(new, self.security_dir_name)
118 self.log_dir = os.path.join(new, self.log_dir_name)
119 self.pid_dir = os.path.join(new, self.pid_dir_name)
120 self.check_dirs()
121
122 def _profile_changed(self, name, old, new):
123 if self._location_isset:
124 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
125 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
126
127 def _log_dir_changed(self, name, old, new):
128 self.check_log_dir()
129
130 def check_log_dir(self):
131 if not os.path.isdir(self.log_dir):
132 os.mkdir(self.log_dir)
133
134 def _security_dir_changed(self, name, old, new):
135 self.check_security_dir()
136
137 def check_security_dir(self):
138 if not os.path.isdir(self.security_dir):
139 os.mkdir(self.security_dir, 0700)
140 os.chmod(self.security_dir, 0700)
141
142 def _pid_dir_changed(self, name, old, new):
143 self.check_pid_dir()
144
145 def check_pid_dir(self):
146 if not os.path.isdir(self.pid_dir):
147 os.mkdir(self.pid_dir, 0700)
148 os.chmod(self.pid_dir, 0700)
149
150 def check_dirs(self):
151 self.check_security_dir()
152 self.check_log_dir()
153 self.check_pid_dir()
154
155 def copy_config_file(self, config_file, path=None, overwrite=False):
156 """Copy a default config file into the active cluster directory.
157
158 Default configuration files are kept in :mod:`IPython.config.default`.
159 This function moves these from that location to the working cluster
160 directory.
161 """
162 if path is None:
163 import IPython.config.default
164 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
165 path = os.path.sep.join(path)
166 src = os.path.join(path, config_file)
167 dst = os.path.join(self.location, config_file)
168 if not os.path.isfile(dst) or overwrite:
169 shutil.copy(src, dst)
170
171 def copy_all_config_files(self, path=None, overwrite=False):
172 """Copy all config files into the active cluster directory."""
173 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
174 u'ipcluster_config.py']:
175 self.copy_config_file(f, path=path, overwrite=overwrite)
176
177 @classmethod
178 def create_cluster_dir(csl, cluster_dir):
179 """Create a new cluster directory given a full path.
180
181 Parameters
182 ----------
183 cluster_dir : str
184 The full path to the cluster directory. If it does exist, it will
185 be used. If not, it will be created.
186 """
187 return ClusterDir(location=cluster_dir)
188
189 @classmethod
190 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
191 """Create a cluster dir by profile name and path.
192
193 Parameters
194 ----------
195 path : str
196 The path (directory) to put the cluster directory in.
197 profile : str
198 The name of the profile. The name of the cluster directory will
199 be "cluster_<profile>".
200 """
201 if not os.path.isdir(path):
202 raise ClusterDirError('Directory not found: %s' % path)
203 cluster_dir = os.path.join(path, u'cluster_' + profile)
204 return ClusterDir(location=cluster_dir)
205
206 @classmethod
207 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
208 """Find an existing cluster dir by profile name, return its ClusterDir.
209
210 This searches through a sequence of paths for a cluster dir. If it
211 is not found, a :class:`ClusterDirError` exception will be raised.
212
213 The search path algorithm is:
214 1. ``os.getcwd()``
215 2. ``ipython_dir``
216 3. The directories found in the ":" separated
217 :env:`IPCLUSTER_DIR_PATH` environment variable.
218
219 Parameters
220 ----------
221 ipython_dir : unicode or str
222 The IPython directory to use.
223 profile : unicode or str
224 The name of the profile. The name of the cluster directory
225 will be "cluster_<profile>".
226 """
227 dirname = u'cluster_' + profile
228 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
229 if cluster_dir_paths:
230 cluster_dir_paths = cluster_dir_paths.split(':')
231 else:
232 cluster_dir_paths = []
233 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
234 for p in paths:
235 cluster_dir = os.path.join(p, dirname)
236 if os.path.isdir(cluster_dir):
237 return ClusterDir(location=cluster_dir)
238 else:
239 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
240
241 @classmethod
242 def find_cluster_dir(cls, cluster_dir):
243 """Find/create a cluster dir and return its ClusterDir.
244
245 This will create the cluster directory if it doesn't exist.
246
247 Parameters
248 ----------
249 cluster_dir : unicode or str
250 The path of the cluster directory. This is expanded using
251 :func:`IPython.utils.genutils.expand_path`.
252 """
253 cluster_dir = expand_path(cluster_dir)
254 if not os.path.isdir(cluster_dir):
255 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
256 return ClusterDir(location=cluster_dir)
257
258
259 #-----------------------------------------------------------------------------
260 48 # Crash handler for this application
261 49 #-----------------------------------------------------------------------------
262 50
@@ -283,7 +71,7 b' To ensure accurate tracking of this issue, please file a report about it at:'
283 71 $self.bug_tracker
284 72 """
285 73
286 class ClusterDirCrashHandler(CrashHandler):
74 class ParallelCrashHandler(CrashHandler):
287 75 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
288 76
289 77 message_template = _message_template
@@ -292,7 +80,7 b' class ClusterDirCrashHandler(CrashHandler):'
292 80 contact_name = release.authors['Min'][0]
293 81 contact_email = release.authors['Min'][1]
294 82 bug_tracker = 'http://github.com/ipython/ipython/issues'
295 super(ClusterDirCrashHandler,self).__init__(
83 super(ParallelCrashHandler,self).__init__(
296 84 app, contact_name, contact_email, bug_tracker
297 85 )
298 86
@@ -300,49 +88,35 b' class ClusterDirCrashHandler(CrashHandler):'
300 88 #-----------------------------------------------------------------------------
301 89 # Main application
302 90 #-----------------------------------------------------------------------------
303 base_aliases = {
304 'profile' : "ClusterDir.profile",
305 'cluster_dir' : 'ClusterDir.location',
306 'log_level' : 'ClusterApplication.log_level',
307 'work_dir' : 'ClusterApplication.work_dir',
308 'log_to_file' : 'ClusterApplication.log_to_file',
309 'clean_logs' : 'ClusterApplication.clean_logs',
310 'log_url' : 'ClusterApplication.log_url',
311 'config' : 'ClusterApplication.config_file',
312 }
91 base_aliases = {}
92 base_aliases.update(base_ip_aliases)
93 base_aliases.update({
94 'profile_dir' : 'ProfileDir.location',
95 'log_level' : 'BaseParallelApplication.log_level',
96 'work_dir' : 'BaseParallelApplication.work_dir',
97 'log_to_file' : 'BaseParallelApplication.log_to_file',
98 'clean_logs' : 'BaseParallelApplication.clean_logs',
99 'log_url' : 'BaseParallelApplication.log_url',
100 })
313 101
314 102 base_flags = {
315 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
316 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
317 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
103 'log-to-file' : ({'BaseParallelApplication' : Config({
104 'log_to_file' : True}),
105 }, "send log output to a file")
318 106 }
319 for k,v in base_flags.iteritems():
320 base_flags[k] = (Config(v[0]),v[1])
321
322 class ClusterApplication(BaseIPythonApplication):
323 """An application that puts everything into a cluster directory.
324
325 Instead of looking for things in the ipython_dir, this type of application
326 will use its own private directory called the "cluster directory"
327 for things like config files, log files, etc.
328
329 The cluster directory is resolved as follows:
107 base_flags.update(base_ip_flags)
330 108
331 * If the ``cluster_dir`` option is given, it is used.
332 * If ``cluster_dir`` is not given, the application directory is
333 resolve using the profile name as ``cluster_<profile>``. The search
334 path for this directory is then i) cwd if it is found there
335 and ii) in ipython_dir otherwise.
336
337 The config file for the application is to be put in the cluster
338 dir and named the value of the ``config_file_name`` class attribute.
109 class BaseParallelApplication(BaseIPythonApplication):
110 """The base Application for IPython.parallel apps
111
112 Principle extensions to BaseIPyythonApplication:
113
114 * work_dir
115 * remote logging via pyzmq
116 * IOLoop instance
339 117 """
340 118
341 crash_handler_class = ClusterDirCrashHandler
342 auto_create_cluster_dir = Bool(True, config=True,
343 help="whether to create the cluster_dir if it doesn't exist")
344 cluster_dir = Instance(ClusterDir)
345 classes = [ClusterDir]
119 crash_handler_class = ParallelCrashHandler
346 120
347 121 def _log_level_default(self):
348 122 # temporarily override default_log_level to INFO
@@ -363,21 +137,8 b' class ClusterApplication(BaseIPythonApplication):'
363 137 log_url = Unicode('', config=True,
364 138 help="The ZMQ URL of the iplogger to aggregate logging.")
365 139
366 config_file = Unicode(u'', config=True,
367 help="""Path to ip<appname> configuration file. The default is to use
368 <appname>_config.py, as found by cluster-dir."""
369 )
370 def _config_file_paths_default(self):
371 # don't include profile dir
372 return [ os.getcwdu(), self.ipython_dir ]
373
374 def _config_file_changed(self, name, old, new):
375 if os.pathsep in new:
376 path, new = new.rsplit(os.pathsep)
377 self.config_file_paths.insert(0, path)
378 self.config_file_name = new
379
380 config_file_name = Unicode('')
140 def _config_files_default(self):
141 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
381 142
382 143 loop = Instance('zmq.eventloop.ioloop.IOLoop')
383 144 def _loop_default(self):
@@ -386,54 +147,10 b' class ClusterApplication(BaseIPythonApplication):'
386 147
387 148 aliases = Dict(base_aliases)
388 149 flags = Dict(base_flags)
389
390 def init_clusterdir(self):
391 """This resolves the cluster directory.
392
393 This tries to find the cluster directory and if successful, it will
394 have done:
395 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
396 the application.
397 * Sets ``self.cluster_dir`` attribute of the application and config
398 objects.
399
400 The algorithm used for this is as follows:
401 1. Try ``Global.cluster_dir``.
402 2. Try using ``Global.profile``.
403 3. If both of these fail and ``self.auto_create_cluster_dir`` is
404 ``True``, then create the new cluster dir in the IPython directory.
405 4. If all fails, then raise :class:`ClusterDirError`.
406 """
407 try:
408 self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config)
409 except ClusterDirError as e:
410 self.log.fatal("Error initializing cluster dir: %s"%e)
411 self.log.fatal("A cluster dir must be created before running this command.")
412 self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more "
413 "information about creating and listing cluster dirs."
414 )
415 self.exit(1)
416
417 if self.cluster_dir._new_dir:
418 self.log.info('Creating new cluster dir: %s' % \
419 self.cluster_dir.location)
420 else:
421 self.log.info('Using existing cluster dir: %s' % \
422 self.cluster_dir.location)
423
424 # insert after cwd:
425 self.config_file_paths.insert(1, self.cluster_dir.location)
426 150
427 151 def initialize(self, argv=None):
428 152 """initialize the app"""
429 self.init_crash_handler()
430 self.parse_command_line(argv)
431 cl_config = self.config
432 self.init_clusterdir()
433 self.load_config_file()
434 # command-line should *override* config file, but command-line is necessary
435 # to determine clusterdir, etc.
436 self.update_config(cl_config)
153 super(BaseParallelApplication, self).initialize(argv)
437 154 self.to_work_dir()
438 155 self.reinit_logging()
439 156
@@ -447,7 +164,7 b' class ClusterApplication(BaseIPythonApplication):'
447 164
448 165 def reinit_logging(self):
449 166 # Remove old log files
450 log_dir = self.cluster_dir.log_dir
167 log_dir = self.profile_dir.log_dir
451 168 if self.clean_logs:
452 169 for f in os.listdir(log_dir):
453 170 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
@@ -472,7 +189,7 b' class ClusterApplication(BaseIPythonApplication):'
472 189 This must be called after pre_construct, which sets `self.pid_dir`.
473 190 This raises :exc:`PIDFileError` if the pid file exists already.
474 191 """
475 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
192 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
476 193 if os.path.isfile(pid_file):
477 194 pid = self.get_pid_from_file()
478 195 if not overwrite:
@@ -491,7 +208,7 b' class ClusterApplication(BaseIPythonApplication):'
491 208 :func:`reactor.addSystemEventTrigger`. This needs to return
492 209 ``None``.
493 210 """
494 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
211 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
495 212 if os.path.isfile(pid_file):
496 213 try:
497 214 self.log.info("Removing pid file: %s" % pid_file)
@@ -504,7 +221,7 b' class ClusterApplication(BaseIPythonApplication):'
504 221
505 222 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
506 223 """
507 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
224 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
508 225 if os.path.isfile(pid_file):
509 226 with open(pid_file, 'r') as f:
510 227 pid = int(f.read().strip())
@@ -27,12 +27,12 b' from zmq.eventloop import ioloop'
27 27
28 28 from IPython.config.application import Application, boolean_flag
29 29 from IPython.config.loader import Config
30 from IPython.core.newapplication import BaseIPythonApplication
30 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
31 31 from IPython.utils.importstring import import_item
32 32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
33 33
34 34 from IPython.parallel.apps.clusterdir import (
35 ClusterApplication, ClusterDirError, ClusterDir,
35 BaseParallelApplication,
36 36 PIDFileError,
37 37 base_flags, base_aliases
38 38 )
@@ -86,7 +86,7 b' security related files and are named using the convention'
86 86 subcommand of 'ipcluster'. If your cluster directory is in
87 87 the cwd or the ipython directory, you can simply refer to it
88 88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
89 otherwise use the 'cluster_dir' option.
89 otherwise use the 'profile_dir' option.
90 90 """
91 91 stop_help = """Stop a running IPython cluster
92 92
@@ -95,7 +95,7 b' directory. Cluster directories are named using the convention'
95 95 'cluster_<profile>'. If your cluster directory is in
96 96 the cwd or the ipython directory, you can simply refer to it
97 97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
98 use the 'cluster_dir' option.
98 use the 'profile_dir' option.
99 99 """
100 100 engines_help = """Start engines connected to an existing IPython cluster
101 101
@@ -107,7 +107,7 b' security related files and are named using the convention'
107 107 subcommand of 'ipcluster'. If your cluster directory is in
108 108 the cwd or the ipython directory, you can simply refer to it
109 109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
110 otherwise use the 'cluster_dir' option.
110 otherwise use the 'profile_dir' option.
111 111 """
112 112 create_help = """Create an ipcluster profile by name
113 113
@@ -142,74 +142,63 b' class IPClusterList(BaseIPythonApplication):'
142 142 def _log_level_default(self):
143 143 return 20
144 144
145 def list_cluster_dirs(self):
145 def list_profile_dirs(self):
146 146 # Find the search paths
147 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
148 if cluster_dir_paths:
149 cluster_dir_paths = cluster_dir_paths.split(':')
147 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
148 if profile_dir_paths:
149 profile_dir_paths = profile_dir_paths.split(':')
150 150 else:
151 cluster_dir_paths = []
151 profile_dir_paths = []
152 152
153 153 ipython_dir = self.ipython_dir
154 154
155 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
155 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
156 156 paths = list(set(paths))
157 157
158 self.log.info('Searching for cluster dirs in paths: %r' % paths)
158 self.log.info('Searching for cluster profiles in paths: %r' % paths)
159 159 for path in paths:
160 160 files = os.listdir(path)
161 161 for f in files:
162 162 full_path = os.path.join(path, f)
163 if os.path.isdir(full_path) and f.startswith('cluster_'):
164 profile = full_path.split('_')[-1]
163 if os.path.isdir(full_path) and f.startswith('profile_') and \
164 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
165 profile = f.split('_')[-1]
165 166 start_cmd = 'ipcluster start profile=%s n=4' % profile
166 167 print start_cmd + " ==> " + full_path
167 168
168 169 def start(self):
169 self.list_cluster_dirs()
170 self.list_profile_dirs()
171
172
173 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
170 174
171 175 create_flags = {}
172 176 create_flags.update(base_flags)
173 create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
177 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
174 178 "reset config files to defaults", "leave existing config files"))
175 179
176 class IPClusterCreate(ClusterApplication):
177 name = u'ipcluster'
180 class IPClusterCreate(BaseParallelApplication):
181 name = u'ipcluster-create'
178 182 description = create_help
179 auto_create_cluster_dir = Bool(True,
180 help="whether to create the cluster_dir if it doesn't exist")
183 auto_create = Bool(True)
181 184 config_file_name = Unicode(default_config_file_name)
182 185
183 reset = Bool(False, config=True,
184 help="Whether to reset config files as part of 'create'."
185 )
186
187 186 flags = Dict(create_flags)
188 187
189 aliases = Dict(dict(profile='ClusterDir.profile'))
188 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
190 189
191 classes = [ClusterDir]
190 classes = [ProfileDir]
192 191
193 def init_clusterdir(self):
194 super(IPClusterCreate, self).init_clusterdir()
195 self.log.info('Copying default config files to cluster directory '
196 '[overwrite=%r]' % (self.reset,))
197 self.cluster_dir.copy_all_config_files(overwrite=self.reset)
198
199 def initialize(self, argv=None):
200 self.parse_command_line(argv)
201 self.init_clusterdir()
202 192
203 193 stop_aliases = dict(
204 194 signal='IPClusterStop.signal',
205 profile='ClusterDir.profile',
206 cluster_dir='ClusterDir.location',
195 profile='BaseIPythonApplication.profile',
196 profile_dir='ProfileDir.location',
207 197 )
208 198
209 class IPClusterStop(ClusterApplication):
199 class IPClusterStop(BaseParallelApplication):
210 200 name = u'ipcluster'
211 201 description = stop_help
212 auto_create_cluster_dir = Bool(False)
213 202 config_file_name = Unicode(default_config_file_name)
214 203
215 204 signal = Int(signal.SIGINT, config=True,
@@ -217,13 +206,6 b' class IPClusterStop(ClusterApplication):'
217 206
218 207 aliases = Dict(stop_aliases)
219 208
220 def init_clusterdir(self):
221 try:
222 super(IPClusterStop, self).init_clusterdir()
223 except ClusterDirError as e:
224 self.log.fatal("Failed ClusterDir init: %s"%e)
225 self.exit(1)
226
227 209 def start(self):
228 210 """Start the app for the stop subcommand."""
229 211 try:
@@ -272,20 +254,19 b' engine_aliases.update(dict('
272 254 n='IPClusterEngines.n',
273 255 elauncher = 'IPClusterEngines.engine_launcher_class',
274 256 ))
275 class IPClusterEngines(ClusterApplication):
257 class IPClusterEngines(BaseParallelApplication):
276 258
277 259 name = u'ipcluster'
278 260 description = engines_help
279 261 usage = None
280 262 config_file_name = Unicode(default_config_file_name)
281 263 default_log_level = logging.INFO
282 auto_create_cluster_dir = Bool(False)
283 264 classes = List()
284 265 def _classes_default(self):
285 266 from IPython.parallel.apps import launcher
286 267 launchers = launcher.all_launchers
287 268 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
288 return [ClusterDir]+eslaunchers
269 return [ProfileDir]+eslaunchers
289 270
290 271 n = Int(2, config=True,
291 272 help="The number of engines to start.")
@@ -327,7 +308,7 b' class IPClusterEngines(ClusterApplication):'
327 308 klass = import_item(clsname)
328 309
329 310 launcher = klass(
330 work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
311 work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
331 312 )
332 313 return launcher
333 314
@@ -335,7 +316,7 b' class IPClusterEngines(ClusterApplication):'
335 316 self.log.info("Starting %i engines"%self.n)
336 317 self.engine_launcher.start(
337 318 self.n,
338 cluster_dir=self.cluster_dir.location
319 profile_dir=self.profile_dir.location
339 320 )
340 321
341 322 def stop_engines(self):
@@ -362,12 +343,12 b' class IPClusterEngines(ClusterApplication):'
362 343 def start_logging(self):
363 344 # Remove old log files of the controller and engine
364 345 if self.clean_logs:
365 log_dir = self.cluster_dir.log_dir
346 log_dir = self.profile_dir.log_dir
366 347 for f in os.listdir(log_dir):
367 348 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
368 349 os.remove(os.path.join(log_dir, f))
369 350 # This will remove old log files for ipcluster itself
370 # super(IPClusterApp, self).start_logging()
351 # super(IPBaseParallelApplication, self).start_logging()
371 352
372 353 def start(self):
373 354 """Start the app for the engines subcommand."""
@@ -410,12 +391,12 b' class IPClusterStart(IPClusterEngines):'
410 391 name = u'ipcluster'
411 392 description = start_help
412 393 default_log_level = logging.INFO
413 auto_create_cluster_dir = Bool(True, config=True,
414 help="whether to create the cluster_dir if it doesn't exist")
394 auto_create = Bool(True, config=True,
395 help="whether to create the profile_dir if it doesn't exist")
415 396 classes = List()
416 397 def _classes_default(self,):
417 398 from IPython.parallel.apps import launcher
418 return [ClusterDir]+launcher.all_launchers
399 return [ProfileDir]+launcher.all_launchers
419 400
420 401 clean_logs = Bool(True, config=True,
421 402 help="whether to cleanup old logs before starting")
@@ -441,7 +422,7 b' class IPClusterStart(IPClusterEngines):'
441 422
442 423 def start_controller(self):
443 424 self.controller_launcher.start(
444 cluster_dir=self.cluster_dir.location
425 profile_dir=self.profile_dir.location
445 426 )
446 427
447 428 def stop_controller(self):
@@ -504,7 +485,7 b' class IPClusterStart(IPClusterEngines):'
504 485
505 486 base='IPython.parallel.apps.ipclusterapp.IPCluster'
506 487
507 class IPClusterApp(Application):
488 class IPBaseParallelApplication(Application):
508 489 name = u'ipcluster'
509 490 description = _description
510 491
@@ -530,7 +511,7 b' class IPClusterApp(Application):'
530 511
531 512 def launch_new_instance():
532 513 """Create and run the IPython cluster."""
533 app = IPClusterApp()
514 app = IPBaseParallelApplication()
534 515 app.initialize()
535 516 app.start()
536 517
@@ -17,9 +17,7 b' The IPython controller application.'
17 17
18 18 from __future__ import with_statement
19 19
20 import copy
21 20 import os
22 import logging
23 21 import socket
24 22 import stat
25 23 import sys
@@ -33,14 +31,11 b' from zmq.log.handlers import PUBHandler'
33 31 from zmq.utils import jsonapi as json
34 32
35 33 from IPython.config.loader import Config
36
37 from IPython.parallel import factory
34 from IPython.core.newapplication import ProfileDir
38 35
39 36 from IPython.parallel.apps.clusterdir import (
40 ClusterDir,
41 ClusterApplication,
37 BaseParallelApplication,
42 38 base_flags
43 # ClusterDirConfigLoader
44 39 )
45 40 from IPython.utils.importstring import import_item
46 41 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
@@ -48,11 +43,11 b' from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict'
48 43 # from IPython.parallel.controller.controller import ControllerFactory
49 44 from IPython.parallel.streamsession import StreamSession
50 45 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 from IPython.parallel.controller.hub import Hub, HubFactory
46 from IPython.parallel.controller.hub import HubFactory
52 47 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 48 from IPython.parallel.controller.sqlitedb import SQLiteDB
54 49
55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
50 from IPython.parallel.util import signal_children, split_url
56 51
57 52 # conditional import of MongoDB backend class
58 53
@@ -80,7 +75,7 b' clients. The controller needs to be started before the engines and can be'
80 75 configured using command line options or using a cluster directory. Cluster
81 76 directories contain config, log and security files and are usually located in
82 77 your ipython directory and named as "cluster_<profile>". See the `profile`
83 and `cluster_dir` options for details.
78 and `profile_dir` options for details.
84 79 """
85 80
86 81
@@ -106,15 +101,17 b' flags.update({'
106 101
107 102 flags.update()
108 103
109 class IPControllerApp(ClusterApplication):
104 class IPControllerApp(BaseParallelApplication):
110 105
111 106 name = u'ipcontroller'
112 107 description = _description
113 108 config_file_name = Unicode(default_config_file_name)
114 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
109 classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
110
111 # change default to True
112 auto_create = Bool(True, config=True,
113 help="""Whether to create profile dir if it doesn't exist""")
115 114
116 auto_create_cluster_dir = Bool(True, config=True,
117 help="Whether to create cluster_dir if it exists.")
118 115 reuse_files = Bool(False, config=True,
119 116 help='Whether to reuse existing json connection files [default: False]'
120 117 )
@@ -146,8 +143,6 b' class IPControllerApp(ClusterApplication):'
146 143 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
147 144
148 145 aliases = Dict(dict(
149 config = 'IPControllerApp.config_file',
150 # file = 'IPControllerApp.url_file',
151 146 log_level = 'IPControllerApp.log_level',
152 147 log_url = 'IPControllerApp.log_url',
153 148 reuse_files = 'IPControllerApp.reuse_files',
@@ -172,8 +167,8 b' class IPControllerApp(ClusterApplication):'
172 167 hwm = 'TaskScheduler.hwm',
173 168
174 169
175 profile = "ClusterDir.profile",
176 cluster_dir = 'ClusterDir.location',
170 profile = "BaseIPythonApplication.profile",
171 profile_dir = 'ProfileDir.location',
177 172
178 173 ))
179 174 flags = Dict(flags)
@@ -192,7 +187,7 b' class IPControllerApp(ClusterApplication):'
192 187 else:
193 188 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
194 189 cdict['location'] = location
195 fname = os.path.join(self.cluster_dir.security_dir, fname)
190 fname = os.path.join(self.profile_dir.security_dir, fname)
196 191 with open(fname, 'w') as f:
197 192 f.write(json.dumps(cdict, indent=2))
198 193 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
@@ -201,7 +196,7 b' class IPControllerApp(ClusterApplication):'
201 196 """load config from existing json connector files."""
202 197 c = self.config
203 198 # load from engine config
204 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
199 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
205 200 cfg = json.loads(f.read())
206 201 key = c.StreamSession.key = cfg['exec_key']
207 202 xport,addr = cfg['url'].split('://')
@@ -212,7 +207,7 b' class IPControllerApp(ClusterApplication):'
212 207 self.location = cfg['location']
213 208
214 209 # load client config
215 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
210 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
216 211 cfg = json.loads(f.read())
217 212 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
218 213 xport,addr = cfg['url'].split('://')
@@ -237,7 +232,7 b' class IPControllerApp(ClusterApplication):'
237 232 pass
238 233 elif self.secure:
239 234 key = str(uuid.uuid4())
240 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
235 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
241 236 # with open(keyfile, 'w') as f:
242 237 # f.write(key)
243 238 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
@@ -332,7 +327,7 b' class IPControllerApp(ClusterApplication):'
332 327 """save the registration urls to files."""
333 328 c = self.config
334 329
335 sec_dir = self.cluster_dir.security_dir
330 sec_dir = self.profile_dir.security_dir
336 331 cf = self.factory
337 332
338 333 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
@@ -22,10 +22,9 b' import sys'
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24
25 from IPython.core.newapplication import ProfileDir
25 26 from IPython.parallel.apps.clusterdir import (
26 ClusterApplication,
27 ClusterDir,
28 # ClusterDirConfigLoader
27 BaseParallelApplication,
29 28 )
30 29 from IPython.zmq.log import EnginePUBHandler
31 30
@@ -53,7 +52,7 b' and controller. A controller needs to be started before the engines. The'
53 52 engine can be configured using command line options or using a cluster
54 53 directory. Cluster directories contain config, log and security files and are
55 54 usually located in your ipython directory and named as "cluster_<profile>".
56 See the `profile` and `cluster_dir` options for details.
55 See the `profile` and `profile_dir` options for details.
57 56 """
58 57
59 58
@@ -98,16 +97,13 b' class MPI(Configurable):'
98 97 #-----------------------------------------------------------------------------
99 98
100 99
101 class IPEngineApp(ClusterApplication):
100 class IPEngineApp(BaseParallelApplication):
102 101
103 102 app_name = Unicode(u'ipengine')
104 103 description = Unicode(_description)
105 104 config_file_name = Unicode(default_config_file_name)
106 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
105 classes = List([ProfileDir, StreamSession, EngineFactory, Kernel, MPI])
107 106
108 auto_create_cluster_dir = Bool(False,
109 help="whether to create the cluster_dir if it doesn't exist")
110
111 107 startup_script = Unicode(u'', config=True,
112 108 help='specify a script to be run at startup')
113 109 startup_command = Unicode('', config=True,
@@ -117,7 +113,7 b' class IPEngineApp(ClusterApplication):'
117 113 help="""The full location of the file containing the connection information for
118 114 the controller. If this is not given, the file must be in the
119 115 security directory of the cluster directory. This location is
120 resolved using the `profile` or `cluster_dir` options.""",
116 resolved using the `profile` or `profile_dir` options.""",
121 117 )
122 118
123 119 url_file_name = Unicode(u'ipcontroller-engine.json')
@@ -126,7 +122,6 b' class IPEngineApp(ClusterApplication):'
126 122 logging to a central location.""")
127 123
128 124 aliases = Dict(dict(
129 config = 'IPEngineApp.config_file',
130 125 file = 'IPEngineApp.url_file',
131 126 c = 'IPEngineApp.startup_command',
132 127 s = 'IPEngineApp.startup_script',
@@ -143,8 +138,8 b' class IPEngineApp(ClusterApplication):'
143 138
144 139 timeout = 'EngineFactory.timeout',
145 140
146 profile = "ClusterDir.profile",
147 cluster_dir = 'ClusterDir.location',
141 profile = "IPEngineApp.profile",
142 profile_dir = 'ProfileDir.location',
148 143
149 144 mpi = 'MPI.use',
150 145
@@ -162,7 +157,7 b' class IPEngineApp(ClusterApplication):'
162 157 # # Find the actual controller key file
163 158 # if not config.Global.key_file:
164 159 # try_this = os.path.join(
165 # config.Global.cluster_dir,
160 # config.Global.profile_dir,
166 161 # config.Global.security_dir,
167 162 # config.Global.key_file_name
168 163 # )
@@ -178,7 +173,7 b' class IPEngineApp(ClusterApplication):'
178 173 # Find the actual controller key file
179 174 if not self.url_file:
180 175 self.url_file = os.path.join(
181 self.cluster_dir.security_dir,
176 self.profile_dir.security_dir,
182 177 self.url_file_name
183 178 )
184 179 def init_engine(self):
@@ -20,11 +20,11 b' import sys'
20 20
21 21 import zmq
22 22
23 from IPython.core.newapplication import ProfileDir
23 24 from IPython.utils.traitlets import Bool, Dict, Unicode
24 25
25 26 from IPython.parallel.apps.clusterdir import (
26 ClusterApplication,
27 ClusterDir,
27 BaseParallelApplication,
28 28 base_aliases
29 29 )
30 30 from IPython.parallel.apps.logwatcher import LogWatcher
@@ -43,7 +43,7 b' by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The'
43 43 logger can be configured using command line options or using a cluster
44 44 directory. Cluster directories contain config, log and security files and are
45 45 usually located in your ipython directory and named as "cluster_<profile>".
46 See the `profile` and `cluster_dir` options for details.
46 See the `profile` and `profile_dir` options for details.
47 47 """
48 48
49 49
@@ -54,14 +54,13 b' aliases = {}'
54 54 aliases.update(base_aliases)
55 55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
56 56
57 class IPLoggerApp(ClusterApplication):
57 class IPLoggerApp(BaseParallelApplication):
58 58
59 59 name = u'iploggerz'
60 60 description = _description
61 61 config_file_name = Unicode(default_config_file_name)
62 auto_create_cluster_dir = Bool(False)
63 62
64 classes = [LogWatcher, ClusterDir]
63 classes = [LogWatcher, ProfileDir]
65 64 aliases = Dict(aliases)
66 65
67 66 def initialize(self, argv=None):
@@ -101,7 +101,7 b' class BaseLauncher(LoggingFactory):'
101 101 """An asbtraction for starting, stopping and signaling a process."""
102 102
103 103 # In all of the launchers, the work_dir is where child processes will be
104 # run. This will usually be the cluster_dir, but may not be. any work_dir
104 # run. This will usually be the profile_dir, but may not be. any work_dir
105 105 # passed into the __init__ method will override the config value.
106 106 # This should not be used to set the work_dir for the actual engine
107 107 # and controller. Instead, use their own config files or the
@@ -337,10 +337,10 b' class LocalControllerLauncher(LocalProcessLauncher):'
337 337 def find_args(self):
338 338 return self.controller_cmd + self.controller_args
339 339
340 def start(self, cluster_dir):
341 """Start the controller by cluster_dir."""
342 self.controller_args.extend(['cluster_dir=%s'%cluster_dir])
343 self.cluster_dir = unicode(cluster_dir)
340 def start(self, profile_dir):
341 """Start the controller by profile_dir."""
342 self.controller_args.extend(['profile_dir=%s'%profile_dir])
343 self.profile_dir = unicode(profile_dir)
344 344 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
345 345 return super(LocalControllerLauncher, self).start()
346 346
@@ -358,10 +358,10 b' class LocalEngineLauncher(LocalProcessLauncher):'
358 358 def find_args(self):
359 359 return self.engine_cmd + self.engine_args
360 360
361 def start(self, cluster_dir):
362 """Start the engine by cluster_dir."""
363 self.engine_args.extend(['cluster_dir=%s'%cluster_dir])
364 self.cluster_dir = unicode(cluster_dir)
361 def start(self, profile_dir):
362 """Start the engine by profile_dir."""
363 self.engine_args.extend(['profile_dir=%s'%profile_dir])
364 self.profile_dir = unicode(profile_dir)
365 365 return super(LocalEngineLauncher, self).start()
366 366
367 367
@@ -385,16 +385,16 b' class LocalEngineSetLauncher(BaseLauncher):'
385 385 )
386 386 self.stop_data = {}
387 387
388 def start(self, n, cluster_dir):
389 """Start n engines by profile or cluster_dir."""
390 self.cluster_dir = unicode(cluster_dir)
388 def start(self, n, profile_dir):
389 """Start n engines by profile or profile_dir."""
390 self.profile_dir = unicode(profile_dir)
391 391 dlist = []
392 392 for i in range(n):
393 393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
394 394 # Copy the engine args over to each engine launcher.
395 395 el.engine_args = copy.deepcopy(self.engine_args)
396 396 el.on_stop(self._notice_engine_stopped)
397 d = el.start(cluster_dir)
397 d = el.start(profile_dir)
398 398 if i==0:
399 399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 400 self.launchers[i] = el
@@ -481,10 +481,10 b' class MPIExecControllerLauncher(MPIExecLauncher):'
481 481 )
482 482 n = Int(1)
483 483
484 def start(self, cluster_dir):
485 """Start the controller by cluster_dir."""
486 self.controller_args.extend(['cluster_dir=%s'%cluster_dir])
487 self.cluster_dir = unicode(cluster_dir)
484 def start(self, profile_dir):
485 """Start the controller by profile_dir."""
486 self.controller_args.extend(['profile_dir=%s'%profile_dir])
487 self.profile_dir = unicode(profile_dir)
488 488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 489 return super(MPIExecControllerLauncher, self).start(1)
490 490
@@ -504,10 +504,10 b' class MPIExecEngineSetLauncher(MPIExecLauncher):'
504 504 )
505 505 n = Int(1)
506 506
507 def start(self, n, cluster_dir):
508 """Start n engines by profile or cluster_dir."""
509 self.program_args.extend(['cluster_dir=%s'%cluster_dir])
510 self.cluster_dir = unicode(cluster_dir)
507 def start(self, n, profile_dir):
508 """Start n engines by profile or profile_dir."""
509 self.program_args.extend(['profile_dir=%s'%profile_dir])
510 self.profile_dir = unicode(profile_dir)
511 511 self.n = n
512 512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 513 return super(MPIExecEngineSetLauncher, self).start(n)
@@ -554,8 +554,8 b' class SSHLauncher(LocalProcessLauncher):'
554 554 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 555 self.program + self.program_args
556 556
557 def start(self, cluster_dir, hostname=None, user=None):
558 self.cluster_dir = unicode(cluster_dir)
557 def start(self, profile_dir, hostname=None, user=None):
558 self.profile_dir = unicode(profile_dir)
559 559 if hostname is not None:
560 560 self.hostname = hostname
561 561 if user is not None:
@@ -594,12 +594,12 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
594 594 help="""dict of engines to launch. This is a dict by hostname of ints,
595 595 corresponding to the number of engines to start on that host.""")
596 596
597 def start(self, n, cluster_dir):
598 """Start engines by profile or cluster_dir.
597 def start(self, n, profile_dir):
598 """Start engines by profile or profile_dir.
599 599 `n` is ignored, and the `engines` config property is used instead.
600 600 """
601 601
602 self.cluster_dir = unicode(cluster_dir)
602 self.profile_dir = unicode(profile_dir)
603 603 dlist = []
604 604 for host, n in self.engines.iteritems():
605 605 if isinstance(n, (tuple, list)):
@@ -618,7 +618,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
618 618 i
619 619 el.program_args = args
620 620 el.on_stop(self._notice_engine_stopped)
621 d = el.start(cluster_dir, user=user, hostname=host)
621 d = el.start(profile_dir, user=user, hostname=host)
622 622 if i==0:
623 623 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
624 624 self.launchers[host+str(i)] = el
@@ -739,8 +739,8 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
739 739 # The tasks work directory is *not* the actual work directory of
740 740 # the controller. It is used as the base path for the stdout/stderr
741 741 # files that the scheduler redirects to.
742 t.work_directory = self.cluster_dir
743 # Add the cluster_dir and from self.start().
742 t.work_directory = self.profile_dir
743 # Add the profile_dir and from self.start().
744 744 t.controller_args.extend(self.extra_args)
745 745 job.add_task(t)
746 746
@@ -749,12 +749,12 b' class WindowsHPCControllerLauncher(WindowsHPCLauncher):'
749 749
750 750 @property
751 751 def job_file(self):
752 return os.path.join(self.cluster_dir, self.job_file_name)
752 return os.path.join(self.profile_dir, self.job_file_name)
753 753
754 def start(self, cluster_dir):
755 """Start the controller by cluster_dir."""
756 self.extra_args = ['cluster_dir=%s'%cluster_dir]
757 self.cluster_dir = unicode(cluster_dir)
754 def start(self, profile_dir):
755 """Start the controller by profile_dir."""
756 self.extra_args = ['profile_dir=%s'%profile_dir]
757 self.profile_dir = unicode(profile_dir)
758 758 return super(WindowsHPCControllerLauncher, self).start(1)
759 759
760 760
@@ -773,8 +773,8 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
773 773 # The tasks work directory is *not* the actual work directory of
774 774 # the engine. It is used as the base path for the stdout/stderr
775 775 # files that the scheduler redirects to.
776 t.work_directory = self.cluster_dir
777 # Add the cluster_dir and from self.start().
776 t.work_directory = self.profile_dir
777 # Add the profile_dir and from self.start().
778 778 t.engine_args.extend(self.extra_args)
779 779 job.add_task(t)
780 780
@@ -783,12 +783,12 b' class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):'
783 783
784 784 @property
785 785 def job_file(self):
786 return os.path.join(self.cluster_dir, self.job_file_name)
786 return os.path.join(self.profile_dir, self.job_file_name)
787 787
788 def start(self, n, cluster_dir):
789 """Start the controller by cluster_dir."""
790 self.extra_args = ['cluster_dir=%s'%cluster_dir]
791 self.cluster_dir = unicode(cluster_dir)
788 def start(self, n, profile_dir):
789 """Start the controller by profile_dir."""
790 self.extra_args = ['profile_dir=%s'%profile_dir]
791 self.profile_dir = unicode(profile_dir)
792 792 return super(WindowsHPCEngineSetLauncher, self).start(n)
793 793
794 794
@@ -897,13 +897,13 b' class BatchSystemLauncher(BaseLauncher):'
897 897 f.write(script_as_string)
898 898 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
899 899
900 def start(self, n, cluster_dir):
900 def start(self, n, profile_dir):
901 901 """Start n copies of the process using a batch system."""
902 # Here we save profile and cluster_dir in the context so they
902 # Here we save profile and profile_dir in the context so they
903 903 # can be used in the batch script template as ${profile} and
904 # ${cluster_dir}
905 self.context['cluster_dir'] = cluster_dir
906 self.cluster_dir = unicode(cluster_dir)
904 # ${profile_dir}
905 self.context['profile_dir'] = profile_dir
906 self.profile_dir = unicode(profile_dir)
907 907 self.write_batch_script(n)
908 908 output = check_output(self.args, env=os.environ)
909 909
@@ -942,13 +942,13 b' class PBSControllerLauncher(PBSLauncher):'
942 942 default_template= Unicode("""#!/bin/sh
943 943 #PBS -V
944 944 #PBS -N ipcontroller
945 %s --log-to-file cluster_dir $cluster_dir
945 %s --log-to-file profile_dir $profile_dir
946 946 """%(' '.join(ipcontroller_cmd_argv)))
947 947
948 def start(self, cluster_dir):
949 """Start the controller by profile or cluster_dir."""
948 def start(self, profile_dir):
949 """Start the controller by profile or profile_dir."""
950 950 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
951 return super(PBSControllerLauncher, self).start(1, cluster_dir)
951 return super(PBSControllerLauncher, self).start(1, profile_dir)
952 952
953 953
954 954 class PBSEngineSetLauncher(PBSLauncher):
@@ -958,13 +958,13 b' class PBSEngineSetLauncher(PBSLauncher):'
958 958 default_template= Unicode(u"""#!/bin/sh
959 959 #PBS -V
960 960 #PBS -N ipengine
961 %s cluster_dir $cluster_dir
961 %s profile_dir $profile_dir
962 962 """%(' '.join(ipengine_cmd_argv)))
963 963
964 def start(self, n, cluster_dir):
965 """Start n engines by profile or cluster_dir."""
964 def start(self, n, profile_dir):
965 """Start n engines by profile or profile_dir."""
966 966 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
967 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
967 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
968 968
969 969 #SGE is very similar to PBS
970 970
@@ -983,13 +983,13 b' class SGEControllerLauncher(SGELauncher):'
983 983 default_template= Unicode(u"""#$$ -V
984 984 #$$ -S /bin/sh
985 985 #$$ -N ipcontroller
986 %s --log-to-file cluster_dir=$cluster_dir
986 %s --log-to-file profile_dir=$profile_dir
987 987 """%(' '.join(ipcontroller_cmd_argv)))
988 988
989 def start(self, cluster_dir):
990 """Start the controller by profile or cluster_dir."""
989 def start(self, profile_dir):
990 """Start the controller by profile or profile_dir."""
991 991 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
992 return super(PBSControllerLauncher, self).start(1, cluster_dir)
992 return super(PBSControllerLauncher, self).start(1, profile_dir)
993 993
994 994 class SGEEngineSetLauncher(SGELauncher):
995 995 """Launch Engines with SGE"""
@@ -998,13 +998,13 b' class SGEEngineSetLauncher(SGELauncher):'
998 998 default_template = Unicode("""#$$ -V
999 999 #$$ -S /bin/sh
1000 1000 #$$ -N ipengine
1001 %s cluster_dir=$cluster_dir
1001 %s profile_dir=$profile_dir
1002 1002 """%(' '.join(ipengine_cmd_argv)))
1003 1003
1004 def start(self, n, cluster_dir):
1005 """Start n engines by profile or cluster_dir."""
1004 def start(self, n, profile_dir):
1005 """Start n engines by profile or profile_dir."""
1006 1006 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1007 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
1007 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1008 1008
1009 1009
1010 1010 #-----------------------------------------------------------------------------
@@ -34,7 +34,7 b' from IPython.parallel import streamsession as ss'
34 34 from IPython.parallel import util
35 35
36 36 from .asyncresult import AsyncResult, AsyncHubResult
37 from IPython.parallel.apps.clusterdir import ClusterDir, ClusterDirError
37 from IPython.core.newapplication import ProfileDir, ProfileDirError
38 38 from .view import DirectView, LoadBalancedView
39 39
40 40 #--------------------------------------------------------------------------
@@ -234,7 +234,7 b' class Client(HasTraits):'
234 234 _ignored_control_replies=Int(0)
235 235 _ignored_hub_replies=Int(0)
236 236
237 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
237 def __init__(self, url_or_file=None, profile='default', profile_dir=None, ipython_dir=None,
238 238 context=None, username=None, debug=False, exec_key=None,
239 239 sshserver=None, sshkey=None, password=None, paramiko=None,
240 240 timeout=10
@@ -245,7 +245,7 b' class Client(HasTraits):'
245 245 self._context = context
246 246
247 247
248 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
248 self._setup_profile_dir(profile, profile_dir, ipython_dir)
249 249 if self._cd is not None:
250 250 if url_or_file is None:
251 251 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
@@ -318,21 +318,21 b' class Client(HasTraits):'
318 318 """cleanup sockets, but _not_ context."""
319 319 self.close()
320 320
321 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
321 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
322 322 if ipython_dir is None:
323 323 ipython_dir = get_ipython_dir()
324 if cluster_dir is not None:
324 if profile_dir is not None:
325 325 try:
326 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
326 self._cd = ProfileDir.find_profile_dir(profile_dir)
327 327 return
328 except ClusterDirError:
328 except ProfileDirError:
329 329 pass
330 330 elif profile is not None:
331 331 try:
332 self._cd = ClusterDir.find_cluster_dir_by_profile(
332 self._cd = ProfileDir.find_profile_dir_by_name(
333 333 ipython_dir, profile)
334 334 return
335 except ClusterDirError:
335 except ProfileDirError:
336 336 pass
337 337 self._cd = None
338 338
@@ -122,10 +122,16 b' class SQLiteDB(BaseDB):'
122 122 # use session, and prefix _, since starting with # is illegal
123 123 self.table = '_'+self.session.replace('-','_')
124 124 if not self.location:
125 if hasattr(self.config.Global, 'cluster_dir'):
126 self.location = self.config.Global.cluster_dir
125 # get current profile
126 from IPython.core.newapplication import BaseIPythonApplication
127 if BaseIPythonApplication.initialized():
128 app = BaseIPythonApplication.instance()
129 if app.profile_dir is not None:
130 self.location = app.profile_dir.location
131 else:
132 self.location = u'.'
127 133 else:
128 self.location = '.'
134 self.location = u'.'
129 135 self._init_db()
130 136
131 137 # register db commit as 2s periodic callback
General Comments 0
You need to be logged in to leave comments. Login now