update parallel apps to use ProfileDir
MinRK
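For orientation, a minimal sketch of how the command-line aliases move in this commit. It only restates the `base_aliases` dicts visible in the diff below; the variable names `old_aliases`/`new_aliases` are illustrative and not part of the change itself.

# Hedged summary of the alias changes shown in this diff; illustrative only.
old_aliases = {
    'profile'     : 'ClusterDir.profile',
    'cluster_dir' : 'ClusterDir.location',
    'log_level'   : 'ClusterApplication.log_level',
    'work_dir'    : 'ClusterApplication.work_dir',
    'log_to_file' : 'ClusterApplication.log_to_file',
    'clean_logs'  : 'ClusterApplication.clean_logs',
    'log_url'     : 'ClusterApplication.log_url',
    'config'      : 'ClusterApplication.config_file',
}

new_aliases = {
    # base_ip_aliases from IPython.core.newapplication are merged in first;
    # elsewhere in the diff 'profile' maps to BaseIPythonApplication.profile.
    'profile_dir' : 'ProfileDir.location',
    'log_level'   : 'BaseParallelApplication.log_level',
    'work_dir'    : 'BaseParallelApplication.work_dir',
    'log_to_file' : 'BaseParallelApplication.log_to_file',
    'clean_logs'  : 'BaseParallelApplication.clean_logs',
    'log_url'     : 'BaseParallelApplication.log_url',
}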
@@ -1,540 +1,257 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython cluster directory
4 The IPython cluster directory
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import logging
21 import logging
22 import re
22 import re
23 import shutil
24 import sys
23 import sys
25
24
26 from subprocess import Popen, PIPE
25 from subprocess import Popen, PIPE
27
26
28 from IPython.config.loader import PyFileConfigLoader, Config
27 from IPython.config.loader import Config
29 from IPython.config.configurable import Configurable
30 from IPython.config.application import Application
31 from IPython.core.crashhandler import CrashHandler
32 from IPython.core.newapplication import BaseIPythonApplication
33 from IPython.core import release
28 from IPython.core import release
34 from IPython.utils.path import (
29 from IPython.core.crashhandler import CrashHandler
35 get_ipython_package_dir,
30 from IPython.core.newapplication import (
36 get_ipython_dir,
31 BaseIPythonApplication,
37 expand_path
32 base_aliases as base_ip_aliases,
33 base_flags as base_ip_flags
38 )
34 )
35 from IPython.utils.path import expand_path
36
39 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
37 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
40
38
41 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
42 # Module errors
40 # Module errors
43 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
44
42
45 class ClusterDirError(Exception):
46 pass
47
48
49 class PIDFileError(Exception):
43 class PIDFileError(Exception):
50 pass
44 pass
51
45
52
46
53 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
54 # Class for managing cluster directories
55 #-----------------------------------------------------------------------------
56
57 class ClusterDir(Configurable):
58 """An object to manage the cluster directory and its resources.
59
60 The cluster directory is used by :command:`ipengine`,
61 :command:`ipcontroller` and :command:`ipcluster` to manage the
62 configuration, logging and security of these applications.
63
64 This object knows how to find, create and manage these directories. This
65 should be used by any code that wants to handle cluster directories.
66 """
67
68 security_dir_name = Unicode('security')
69 log_dir_name = Unicode('log')
70 pid_dir_name = Unicode('pid')
71 security_dir = Unicode(u'')
72 log_dir = Unicode(u'')
73 pid_dir = Unicode(u'')
74
75 auto_create = Bool(False,
76 help="""Whether to automatically create the ClusterDirectory if it does
77 not exist""")
78 overwrite = Bool(False,
79 help="""Whether to overwrite existing config files""")
80 location = Unicode(u'', config=True,
81 help="""Set the cluster dir. This overrides the logic used by the
82 `profile` option.""",
83 )
84 profile = Unicode(u'default', config=True,
85 help="""The string name of the profile to be used. This determines the name
86 of the cluster dir as: cluster_<profile>. The default profile is named
87 'default'. The cluster directory is resolved this way if the
88 `cluster_dir` option is not used."""
89 )
90
91 _location_isset = Bool(False) # flag for detecting multiply set location
92 _new_dir = Bool(False) # flag for whether a new dir was created
93
94 def __init__(self, **kwargs):
95 # make sure auto_create,overwrite are set *before* location
96 for name in ('auto_create', 'overwrite'):
97 v = kwargs.pop(name, None)
98 if v is not None:
99 setattr(self, name, v)
100 super(ClusterDir, self).__init__(**kwargs)
101 if not self.location:
102 self._profile_changed('profile', 'default', self.profile)
103
104 def _location_changed(self, name, old, new):
105 if self._location_isset:
106 raise RuntimeError("Cannot set ClusterDir more than once.")
107 self._location_isset = True
108 if not os.path.isdir(new):
109 if self.auto_create:# or self.config.ClusterDir.auto_create:
110 os.makedirs(new)
111 self._new_dir = True
112 else:
113 raise ClusterDirError('Directory not found: %s' % new)
114
115 # ensure config files exist:
116 self.copy_all_config_files(overwrite=self.overwrite)
117 self.security_dir = os.path.join(new, self.security_dir_name)
118 self.log_dir = os.path.join(new, self.log_dir_name)
119 self.pid_dir = os.path.join(new, self.pid_dir_name)
120 self.check_dirs()
121
122 def _profile_changed(self, name, old, new):
123 if self._location_isset:
124 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
125 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
126
127 def _log_dir_changed(self, name, old, new):
128 self.check_log_dir()
129
130 def check_log_dir(self):
131 if not os.path.isdir(self.log_dir):
132 os.mkdir(self.log_dir)
133
134 def _security_dir_changed(self, name, old, new):
135 self.check_security_dir()
136
137 def check_security_dir(self):
138 if not os.path.isdir(self.security_dir):
139 os.mkdir(self.security_dir, 0700)
140 os.chmod(self.security_dir, 0700)
141
142 def _pid_dir_changed(self, name, old, new):
143 self.check_pid_dir()
144
145 def check_pid_dir(self):
146 if not os.path.isdir(self.pid_dir):
147 os.mkdir(self.pid_dir, 0700)
148 os.chmod(self.pid_dir, 0700)
149
150 def check_dirs(self):
151 self.check_security_dir()
152 self.check_log_dir()
153 self.check_pid_dir()
154
155 def copy_config_file(self, config_file, path=None, overwrite=False):
156 """Copy a default config file into the active cluster directory.
157
158 Default configuration files are kept in :mod:`IPython.config.default`.
159 This function moves these from that location to the working cluster
160 directory.
161 """
162 if path is None:
163 import IPython.config.default
164 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
165 path = os.path.sep.join(path)
166 src = os.path.join(path, config_file)
167 dst = os.path.join(self.location, config_file)
168 if not os.path.isfile(dst) or overwrite:
169 shutil.copy(src, dst)
170
171 def copy_all_config_files(self, path=None, overwrite=False):
172 """Copy all config files into the active cluster directory."""
173 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
174 u'ipcluster_config.py']:
175 self.copy_config_file(f, path=path, overwrite=overwrite)
176
177 @classmethod
178 def create_cluster_dir(cls, cluster_dir):
179 """Create a new cluster directory given a full path.
180
181 Parameters
182 ----------
183 cluster_dir : str
184 The full path to the cluster directory. If it does exist, it will
185 be used. If not, it will be created.
186 """
187 return ClusterDir(location=cluster_dir)
188
189 @classmethod
190 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
191 """Create a cluster dir by profile name and path.
192
193 Parameters
194 ----------
195 path : str
196 The path (directory) to put the cluster directory in.
197 profile : str
198 The name of the profile. The name of the cluster directory will
199 be "cluster_<profile>".
200 """
201 if not os.path.isdir(path):
202 raise ClusterDirError('Directory not found: %s' % path)
203 cluster_dir = os.path.join(path, u'cluster_' + profile)
204 return ClusterDir(location=cluster_dir)
205
206 @classmethod
207 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
208 """Find an existing cluster dir by profile name, return its ClusterDir.
209
210 This searches through a sequence of paths for a cluster dir. If it
211 is not found, a :class:`ClusterDirError` exception will be raised.
212
213 The search path algorithm is:
214 1. ``os.getcwd()``
215 2. ``ipython_dir``
216 3. The directories found in the ":" separated
217 :env:`IPCLUSTER_DIR_PATH` environment variable.
218
219 Parameters
220 ----------
221 ipython_dir : unicode or str
222 The IPython directory to use.
223 profile : unicode or str
224 The name of the profile. The name of the cluster directory
225 will be "cluster_<profile>".
226 """
227 dirname = u'cluster_' + profile
228 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
229 if cluster_dir_paths:
230 cluster_dir_paths = cluster_dir_paths.split(':')
231 else:
232 cluster_dir_paths = []
233 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
234 for p in paths:
235 cluster_dir = os.path.join(p, dirname)
236 if os.path.isdir(cluster_dir):
237 return ClusterDir(location=cluster_dir)
238 else:
239 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
240
241 @classmethod
242 def find_cluster_dir(cls, cluster_dir):
243 """Find/create a cluster dir and return its ClusterDir.
244
245 This will create the cluster directory if it doesn't exist.
246
247 Parameters
248 ----------
249 cluster_dir : unicode or str
250 The path of the cluster directory. This is expanded using
251 :func:`IPython.utils.genutils.expand_path`.
252 """
253 cluster_dir = expand_path(cluster_dir)
254 if not os.path.isdir(cluster_dir):
255 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
256 return ClusterDir(location=cluster_dir)
257
258
259 #-----------------------------------------------------------------------------
260 # Crash handler for this application
48 # Crash handler for this application
261 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
262
50
263
51
264 _message_template = """\
52 _message_template = """\
265 Oops, $self.app_name crashed. We do our best to make it stable, but...
53 Oops, $self.app_name crashed. We do our best to make it stable, but...
266
54
267 A crash report was automatically generated with the following information:
55 A crash report was automatically generated with the following information:
268 - A verbatim copy of the crash traceback.
56 - A verbatim copy of the crash traceback.
269 - Data on your current $self.app_name configuration.
57 - Data on your current $self.app_name configuration.
270
58
271 It was left in the file named:
59 It was left in the file named:
272 \t'$self.crash_report_fname'
60 \t'$self.crash_report_fname'
273 If you can email this file to the developers, the information in it will help
61 If you can email this file to the developers, the information in it will help
274 them in understanding and correcting the problem.
62 them in understanding and correcting the problem.
275
63
276 You can mail it to: $self.contact_name at $self.contact_email
64 You can mail it to: $self.contact_name at $self.contact_email
277 with the subject '$self.app_name Crash Report'.
65 with the subject '$self.app_name Crash Report'.
278
66
279 If you want to do it now, the following command will work (under Unix):
67 If you want to do it now, the following command will work (under Unix):
280 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
68 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
281
69
282 To ensure accurate tracking of this issue, please file a report about it at:
70 To ensure accurate tracking of this issue, please file a report about it at:
283 $self.bug_tracker
71 $self.bug_tracker
284 """
72 """
285
73
286 class ClusterDirCrashHandler(CrashHandler):
74 class ParallelCrashHandler(CrashHandler):
287 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
75 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
288
76
289 message_template = _message_template
77 message_template = _message_template
290
78
291 def __init__(self, app):
79 def __init__(self, app):
292 contact_name = release.authors['Min'][0]
80 contact_name = release.authors['Min'][0]
293 contact_email = release.authors['Min'][1]
81 contact_email = release.authors['Min'][1]
294 bug_tracker = 'http://github.com/ipython/ipython/issues'
82 bug_tracker = 'http://github.com/ipython/ipython/issues'
295 super(ClusterDirCrashHandler,self).__init__(
83 super(ParallelCrashHandler,self).__init__(
296 app, contact_name, contact_email, bug_tracker
84 app, contact_name, contact_email, bug_tracker
297 )
85 )
298
86
299
87
300 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
301 # Main application
89 # Main application
302 #-----------------------------------------------------------------------------
90 #-----------------------------------------------------------------------------
303 base_aliases = {
91 base_aliases = {}
304 'profile' : "ClusterDir.profile",
92 base_aliases.update(base_ip_aliases)
305 'cluster_dir' : 'ClusterDir.location',
93 base_aliases.update({
306 'log_level' : 'ClusterApplication.log_level',
94 'profile_dir' : 'ProfileDir.location',
307 'work_dir' : 'ClusterApplication.work_dir',
95 'log_level' : 'BaseParallelApplication.log_level',
308 'log_to_file' : 'ClusterApplication.log_to_file',
96 'work_dir' : 'BaseParallelApplication.work_dir',
309 'clean_logs' : 'ClusterApplication.clean_logs',
97 'log_to_file' : 'BaseParallelApplication.log_to_file',
310 'log_url' : 'ClusterApplication.log_url',
98 'clean_logs' : 'BaseParallelApplication.clean_logs',
311 'config' : 'ClusterApplication.config_file',
99 'log_url' : 'BaseParallelApplication.log_url',
312 }
100 })
313
101
314 base_flags = {
102 base_flags = {
315 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
103 'log-to-file' : ({'BaseParallelApplication' : Config({
316 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
104 'log_to_file' : True}),
317 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
105 }, "send log output to a file")
318 }
106 }
319 for k,v in base_flags.iteritems():
107 base_flags.update(base_ip_flags)
320 base_flags[k] = (Config(v[0]),v[1])
321
322 class ClusterApplication(BaseIPythonApplication):
323 """An application that puts everything into a cluster directory.
324
325 Instead of looking for things in the ipython_dir, this type of application
326 will use its own private directory called the "cluster directory"
327 for things like config files, log files, etc.
328
108
329 The cluster directory is resolved as follows:
109 class BaseParallelApplication(BaseIPythonApplication):
110 """The base Application for IPython.parallel apps
330
111
331 * If the ``cluster_dir`` option is given, it is used.
112 Principal extensions to BaseIPythonApplication:
332 * If ``cluster_dir`` is not given, the application directory is
333 resolve using the profile name as ``cluster_<profile>``. The search
334 path for this directory is then i) cwd if it is found there
335 and ii) in ipython_dir otherwise.
336
113
337 The config file for the application is to be put in the cluster
114 * work_dir
338 dir and named the value of the ``config_file_name`` class attribute.
115 * remote logging via pyzmq
116 * IOLoop instance
339 """
117 """
340
118
341 crash_handler_class = ClusterDirCrashHandler
119 crash_handler_class = ParallelCrashHandler
342 auto_create_cluster_dir = Bool(True, config=True,
343 help="whether to create the cluster_dir if it doesn't exist")
344 cluster_dir = Instance(ClusterDir)
345 classes = [ClusterDir]
346
120
347 def _log_level_default(self):
121 def _log_level_default(self):
348 # temporarily override default_log_level to INFO
122 # temporarily override default_log_level to INFO
349 return logging.INFO
123 return logging.INFO
350
124
351 work_dir = Unicode(os.getcwdu(), config=True,
125 work_dir = Unicode(os.getcwdu(), config=True,
352 help='Set the working dir for the process.'
126 help='Set the working dir for the process.'
353 )
127 )
354 def _work_dir_changed(self, name, old, new):
128 def _work_dir_changed(self, name, old, new):
355 self.work_dir = unicode(expand_path(new))
129 self.work_dir = unicode(expand_path(new))
356
130
357 log_to_file = Bool(config=True,
131 log_to_file = Bool(config=True,
358 help="whether to log to a file")
132 help="whether to log to a file")
359
133
360 clean_logs = Bool(False, config=True,
134 clean_logs = Bool(False, config=True,
361 help="whether to cleanup old logfiles before starting")
135 help="whether to cleanup old logfiles before starting")
362
136
363 log_url = Unicode('', config=True,
137 log_url = Unicode('', config=True,
364 help="The ZMQ URL of the iplogger to aggregate logging.")
138 help="The ZMQ URL of the iplogger to aggregate logging.")
365
139
366 config_file = Unicode(u'', config=True,
140 def _config_files_default(self):
367 help="""Path to ip<appname> configuration file. The default is to use
141 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
368 <appname>_config.py, as found by cluster-dir."""
369 )
370 def _config_file_paths_default(self):
371 # don't include profile dir
372 return [ os.getcwdu(), self.ipython_dir ]
373
374 def _config_file_changed(self, name, old, new):
375 if os.pathsep in new:
376 path, new = new.rsplit(os.pathsep)
377 self.config_file_paths.insert(0, path)
378 self.config_file_name = new
379
380 config_file_name = Unicode('')
381
142
382 loop = Instance('zmq.eventloop.ioloop.IOLoop')
143 loop = Instance('zmq.eventloop.ioloop.IOLoop')
383 def _loop_default(self):
144 def _loop_default(self):
384 from zmq.eventloop.ioloop import IOLoop
145 from zmq.eventloop.ioloop import IOLoop
385 return IOLoop.instance()
146 return IOLoop.instance()
386
147
387 aliases = Dict(base_aliases)
148 aliases = Dict(base_aliases)
388 flags = Dict(base_flags)
149 flags = Dict(base_flags)
389
150
390 def init_clusterdir(self):
391 """This resolves the cluster directory.
392
393 This tries to find the cluster directory and if successful, it will
394 have done:
395 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
396 the application.
397 * Sets ``self.cluster_dir`` attribute of the application and config
398 objects.
399
400 The algorithm used for this is as follows:
401 1. Try ``Global.cluster_dir``.
402 2. Try using ``Global.profile``.
403 3. If both of these fail and ``self.auto_create_cluster_dir`` is
404 ``True``, then create the new cluster dir in the IPython directory.
405 4. If all fails, then raise :class:`ClusterDirError`.
406 """
407 try:
408 self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config)
409 except ClusterDirError as e:
410 self.log.fatal("Error initializing cluster dir: %s"%e)
411 self.log.fatal("A cluster dir must be created before running this command.")
412 self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more "
413 "information about creating and listing cluster dirs."
414 )
415 self.exit(1)
416
417 if self.cluster_dir._new_dir:
418 self.log.info('Creating new cluster dir: %s' % \
419 self.cluster_dir.location)
420 else:
421 self.log.info('Using existing cluster dir: %s' % \
422 self.cluster_dir.location)
423
424 # insert after cwd:
425 self.config_file_paths.insert(1, self.cluster_dir.location)
426
427 def initialize(self, argv=None):
151 def initialize(self, argv=None):
428 """initialize the app"""
152 """initialize the app"""
429 self.init_crash_handler()
153 super(BaseParallelApplication, self).initialize(argv)
430 self.parse_command_line(argv)
431 cl_config = self.config
432 self.init_clusterdir()
433 self.load_config_file()
434 # command-line should *override* config file, but command-line is necessary
435 # to determine clusterdir, etc.
436 self.update_config(cl_config)
437 self.to_work_dir()
154 self.to_work_dir()
438 self.reinit_logging()
155 self.reinit_logging()
439
156
440 def to_work_dir(self):
157 def to_work_dir(self):
441 wd = self.work_dir
158 wd = self.work_dir
442 if unicode(wd) != os.getcwdu():
159 if unicode(wd) != os.getcwdu():
443 os.chdir(wd)
160 os.chdir(wd)
444 self.log.info("Changing to working dir: %s" % wd)
161 self.log.info("Changing to working dir: %s" % wd)
445 # This is the working dir by now.
162 # This is the working dir by now.
446 sys.path.insert(0, '')
163 sys.path.insert(0, '')
447
164
448 def reinit_logging(self):
165 def reinit_logging(self):
449 # Remove old log files
166 # Remove old log files
450 log_dir = self.cluster_dir.log_dir
167 log_dir = self.profile_dir.log_dir
451 if self.clean_logs:
168 if self.clean_logs:
452 for f in os.listdir(log_dir):
169 for f in os.listdir(log_dir):
453 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
170 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
454 os.remove(os.path.join(log_dir, f))
171 os.remove(os.path.join(log_dir, f))
455 if self.log_to_file:
172 if self.log_to_file:
456 # Start logging to the new log file
173 # Start logging to the new log file
457 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
174 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
458 logfile = os.path.join(log_dir, log_filename)
175 logfile = os.path.join(log_dir, log_filename)
459 open_log_file = open(logfile, 'w')
176 open_log_file = open(logfile, 'w')
460 else:
177 else:
461 open_log_file = None
178 open_log_file = None
462 if open_log_file is not None:
179 if open_log_file is not None:
463 self.log.removeHandler(self._log_handler)
180 self.log.removeHandler(self._log_handler)
464 self._log_handler = logging.StreamHandler(open_log_file)
181 self._log_handler = logging.StreamHandler(open_log_file)
465 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
182 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
466 self._log_handler.setFormatter(self._log_formatter)
183 self._log_handler.setFormatter(self._log_formatter)
467 self.log.addHandler(self._log_handler)
184 self.log.addHandler(self._log_handler)
468
185
469 def write_pid_file(self, overwrite=False):
186 def write_pid_file(self, overwrite=False):
470 """Create a .pid file in the pid_dir with my pid.
187 """Create a .pid file in the pid_dir with my pid.
471
188
472 This must be called after pre_construct, which sets `self.pid_dir`.
189 This must be called after pre_construct, which sets `self.pid_dir`.
473 This raises :exc:`PIDFileError` if the pid file exists already.
190 This raises :exc:`PIDFileError` if the pid file exists already.
474 """
191 """
475 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
192 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
476 if os.path.isfile(pid_file):
193 if os.path.isfile(pid_file):
477 pid = self.get_pid_from_file()
194 pid = self.get_pid_from_file()
478 if not overwrite:
195 if not overwrite:
479 raise PIDFileError(
196 raise PIDFileError(
480 'The pid file [%s] already exists. \nThis could mean that this '
197 'The pid file [%s] already exists. \nThis could mean that this '
481 'server is already running with [pid=%s].' % (pid_file, pid)
198 'server is already running with [pid=%s].' % (pid_file, pid)
482 )
199 )
483 with open(pid_file, 'w') as f:
200 with open(pid_file, 'w') as f:
484 self.log.info("Creating pid file: %s" % pid_file)
201 self.log.info("Creating pid file: %s" % pid_file)
485 f.write(repr(os.getpid())+'\n')
202 f.write(repr(os.getpid())+'\n')
486
203
487 def remove_pid_file(self):
204 def remove_pid_file(self):
488 """Remove the pid file.
205 """Remove the pid file.
489
206
490 This should be called at shutdown by registering a callback with
207 This should be called at shutdown by registering a callback with
491 :func:`reactor.addSystemEventTrigger`. This needs to return
208 :func:`reactor.addSystemEventTrigger`. This needs to return
492 ``None``.
209 ``None``.
493 """
210 """
494 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
211 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
495 if os.path.isfile(pid_file):
212 if os.path.isfile(pid_file):
496 try:
213 try:
497 self.log.info("Removing pid file: %s" % pid_file)
214 self.log.info("Removing pid file: %s" % pid_file)
498 os.remove(pid_file)
215 os.remove(pid_file)
499 except:
216 except:
500 self.log.warn("Error removing the pid file: %s" % pid_file)
217 self.log.warn("Error removing the pid file: %s" % pid_file)
501
218
502 def get_pid_from_file(self):
219 def get_pid_from_file(self):
503 """Get the pid from the pid file.
220 """Get the pid from the pid file.
504
221
505 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
222 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
506 """
223 """
507 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
224 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
508 if os.path.isfile(pid_file):
225 if os.path.isfile(pid_file):
509 with open(pid_file, 'r') as f:
226 with open(pid_file, 'r') as f:
510 pid = int(f.read().strip())
227 pid = int(f.read().strip())
511 return pid
228 return pid
512 else:
229 else:
513 raise PIDFileError('pid file not found: %s' % pid_file)
230 raise PIDFileError('pid file not found: %s' % pid_file)
514
231
515 def check_pid(self, pid):
232 def check_pid(self, pid):
516 if os.name == 'nt':
233 if os.name == 'nt':
517 try:
234 try:
518 import ctypes
235 import ctypes
519 # returns 0 if no such process (of ours) exists
236 # returns 0 if no such process (of ours) exists
520 # positive int otherwise
237 # positive int otherwise
521 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
238 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
522 except Exception:
239 except Exception:
523 self.log.warn(
240 self.log.warn(
524 "Could not determine whether pid %i is running via `OpenProcess`. "
241 "Could not determine whether pid %i is running via `OpenProcess`. "
525 " Making the likely assumption that it is."%pid
242 " Making the likely assumption that it is."%pid
526 )
243 )
527 return True
244 return True
528 return bool(p)
245 return bool(p)
529 else:
246 else:
530 try:
247 try:
531 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
248 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
532 output,_ = p.communicate()
249 output,_ = p.communicate()
533 except OSError:
250 except OSError:
534 self.log.warn(
251 self.log.warn(
535 "Could not determine whether pid %i is running via `ps x`. "
252 "Could not determine whether pid %i is running via `ps x`. "
536 " Making the likely assumption that it is."%pid
253 " Making the likely assumption that it is."%pid
537 )
254 )
538 return True
255 return True
539 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
256 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
540 return pid in pids
257 return pid in pids
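A minimal sketch of how a parallel app could build on the new base class introduced above, assuming the 0.11-era layout this diff comes from. `EchoApp` and its behaviour are hypothetical; `BaseParallelApplication`, `profile_dir`, and the pid/log helpers are the ones defined in the first file.

# Hedged sketch only -- EchoApp is hypothetical; the base class and attributes
# are the ones defined in the module above.
from IPython.parallel.apps.clusterdir import BaseParallelApplication

class EchoApp(BaseParallelApplication):
    name = u'echoapp'

    def start(self):
        # ProfileDir (IPython.core.newapplication) now replaces ClusterDir:
        # log, pid and security paths all hang off self.profile_dir.
        self.log.info("profile dir: %s" % self.profile_dir.location)
        self.log.info("log dir:     %s" % self.profile_dir.log_dir)
        self.write_pid_file(overwrite=True)   # written into profile_dir.pid_dir

if __name__ == '__main__':
    app = EchoApp()
    app.initialize()   # parse argv, cd to work_dir, re-init logging
    app.start()
    app.remove_pid_file()

Run as, e.g., `python echoapp.py profile=mycluster log_to_file=True`, matching the `key=value` alias style used throughout this diff.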
@@ -1,540 +1,521 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import errno
18 import errno
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import signal
22 import signal
23
23
24 from subprocess import check_call, CalledProcessError, PIPE
24 from subprocess import check_call, CalledProcessError, PIPE
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27
27
28 from IPython.config.application import Application, boolean_flag
28 from IPython.config.application import Application, boolean_flag
29 from IPython.config.loader import Config
29 from IPython.config.loader import Config
30 from IPython.core.newapplication import BaseIPythonApplication
30 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
31 from IPython.utils.importstring import import_item
31 from IPython.utils.importstring import import_item
32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
33
33
34 from IPython.parallel.apps.clusterdir import (
34 from IPython.parallel.apps.clusterdir import (
35 ClusterApplication, ClusterDirError, ClusterDir,
35 BaseParallelApplication,
36 PIDFileError,
36 PIDFileError,
37 base_flags, base_aliases
37 base_flags, base_aliases
38 )
38 )
39
39
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Module level variables
42 # Module level variables
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
46 default_config_file_name = u'ipcluster_config.py'
46 default_config_file_name = u'ipcluster_config.py'
47
47
48
48
49 _description = """Start an IPython cluster for parallel computing.
49 _description = """Start an IPython cluster for parallel computing.
50
50
51 An IPython cluster consists of 1 controller and 1 or more engines.
51 An IPython cluster consists of 1 controller and 1 or more engines.
52 This command automates the startup of these processes using a wide
52 This command automates the startup of these processes using a wide
53 range of startup methods (SSH, local processes, PBS, mpiexec,
53 range of startup methods (SSH, local processes, PBS, mpiexec,
54 Windows HPC Server 2008). To start a cluster with 4 engines on your
54 Windows HPC Server 2008). To start a cluster with 4 engines on your
55 local host simply do 'ipcluster start n=4'. For more complex usage
55 local host simply do 'ipcluster start n=4'. For more complex usage
56 you will typically do 'ipcluster create profile=mycluster', then edit
56 you will typically do 'ipcluster create profile=mycluster', then edit
57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
58 """
58 """
59
59
60
60
61 # Exit codes for ipcluster
61 # Exit codes for ipcluster
62
62
63 # This will be the exit code if the ipcluster appears to be running because
63 # This will be the exit code if the ipcluster appears to be running because
64 # a .pid file exists
64 # a .pid file exists
65 ALREADY_STARTED = 10
65 ALREADY_STARTED = 10
66
66
67
67
68 # This will be the exit code if ipcluster stop is run, but there is no .pid
68 # This will be the exit code if ipcluster stop is run, but there is no .pid
69 # file to be found.
69 # file to be found.
70 ALREADY_STOPPED = 11
70 ALREADY_STOPPED = 11
71
71
72 # This will be the exit code if ipcluster engines is run, but there is no .pid
72 # This will be the exit code if ipcluster engines is run, but there is no .pid
73 # file to be found.
73 # file to be found.
74 NO_CLUSTER = 12
74 NO_CLUSTER = 12
75
75
76
76
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78 # Main application
78 # Main application
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80 start_help = """Start an IPython cluster for parallel computing
80 start_help = """Start an IPython cluster for parallel computing
81
81
82 Start an ipython cluster by its profile name or cluster
82 Start an ipython cluster by its profile name or cluster
83 directory. Cluster directories contain configuration, log and
83 directory. Cluster directories contain configuration, log and
84 security related files and are named using the convention
84 security related files and are named using the convention
85 'cluster_<profile>' and should be created using the 'start'
85 'cluster_<profile>' and should be created using the 'start'
86 subcommand of 'ipcluster'. If your cluster directory is in
86 subcommand of 'ipcluster'. If your cluster directory is in
87 the cwd or the ipython directory, you can simply refer to it
87 the cwd or the ipython directory, you can simply refer to it
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
89 otherwise use the 'cluster_dir' option.
89 otherwise use the 'profile_dir' option.
90 """
90 """
91 stop_help = """Stop a running IPython cluster
91 stop_help = """Stop a running IPython cluster
92
92
93 Stop a running ipython cluster by its profile name or cluster
93 Stop a running ipython cluster by its profile name or cluster
94 directory. Cluster directories are named using the convention
94 directory. Cluster directories are named using the convention
95 'cluster_<profile>'. If your cluster directory is in
95 'cluster_<profile>'. If your cluster directory is in
96 the cwd or the ipython directory, you can simply refer to it
96 the cwd or the ipython directory, you can simply refer to it
97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
98 use the 'cluster_dir' option.
98 use the 'profile_dir' option.
99 """
99 """
100 engines_help = """Start engines connected to an existing IPython cluster
100 engines_help = """Start engines connected to an existing IPython cluster
101
101
102 Start one or more engines to connect to an existing Cluster
102 Start one or more engines to connect to an existing Cluster
103 by profile name or cluster directory.
103 by profile name or cluster directory.
104 Cluster directories contain configuration, log and
104 Cluster directories contain configuration, log and
105 security related files and are named using the convention
105 security related files and are named using the convention
106 'cluster_<profile>' and should be created using the 'start'
106 'cluster_<profile>' and should be created using the 'start'
107 subcommand of 'ipcluster'. If your cluster directory is in
107 subcommand of 'ipcluster'. If your cluster directory is in
108 the cwd or the ipython directory, you can simply refer to it
108 the cwd or the ipython directory, you can simply refer to it
109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
110 otherwise use the 'cluster_dir' option.
110 otherwise use the 'profile_dir' option.
111 """
111 """
112 create_help = """Create an ipcluster profile by name
112 create_help = """Create an ipcluster profile by name
113
113
114 Create an ipython cluster directory by its profile name or
114 Create an ipython cluster directory by its profile name or
115 cluster directory path. Cluster directories contain
115 cluster directory path. Cluster directories contain
116 configuration, log and security related files and are named
116 configuration, log and security related files and are named
117 using the convention 'cluster_<profile>'. By default they are
117 using the convention 'cluster_<profile>'. By default they are
118 located in your ipython directory. Once created, you will
118 located in your ipython directory. Once created, you will
119 probably need to edit the configuration files in the cluster
119 probably need to edit the configuration files in the cluster
120 directory to configure your cluster. Most users will create a
120 directory to configure your cluster. Most users will create a
121 cluster directory by profile name,
121 cluster directory by profile name,
122 `ipcluster create profile=mycluster`, which will put the directory
122 `ipcluster create profile=mycluster`, which will put the directory
123 in `<ipython_dir>/cluster_mycluster`.
123 in `<ipython_dir>/cluster_mycluster`.
124 """
124 """
125 list_help = """List available cluster profiles
125 list_help = """List available cluster profiles
126
126
127 List all available clusters, by cluster directory, that can
127 List all available clusters, by cluster directory, that can
128 be found in the current working directory or in the ipython
128 be found in the current working directory or in the ipython
129 directory. Cluster directories are named using the convention
129 directory. Cluster directories are named using the convention
130 'cluster_<profile>'.
130 'cluster_<profile>'.
131 """
131 """
132
132
133
133
134 class IPClusterList(BaseIPythonApplication):
134 class IPClusterList(BaseIPythonApplication):
135 name = u'ipcluster-list'
135 name = u'ipcluster-list'
136 description = list_help
136 description = list_help
137
137
138 # empty aliases
138 # empty aliases
139 aliases=Dict()
139 aliases=Dict()
140 flags = Dict(base_flags)
140 flags = Dict(base_flags)
141
141
142 def _log_level_default(self):
142 def _log_level_default(self):
143 return 20
143 return 20
144
144
145 def list_cluster_dirs(self):
145 def list_profile_dirs(self):
146 # Find the search paths
146 # Find the search paths
147 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
147 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
148 if cluster_dir_paths:
148 if profile_dir_paths:
149 cluster_dir_paths = cluster_dir_paths.split(':')
149 profile_dir_paths = profile_dir_paths.split(':')
150 else:
150 else:
151 cluster_dir_paths = []
151 profile_dir_paths = []
152
152
153 ipython_dir = self.ipython_dir
153 ipython_dir = self.ipython_dir
154
154
155 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
155 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
156 paths = list(set(paths))
156 paths = list(set(paths))
157
157
158 self.log.info('Searching for cluster dirs in paths: %r' % paths)
158 self.log.info('Searching for cluster profiles in paths: %r' % paths)
159 for path in paths:
159 for path in paths:
160 files = os.listdir(path)
160 files = os.listdir(path)
161 for f in files:
161 for f in files:
162 full_path = os.path.join(path, f)
162 full_path = os.path.join(path, f)
163 if os.path.isdir(full_path) and f.startswith('cluster_'):
163 if os.path.isdir(full_path) and f.startswith('profile_') and \
164 profile = full_path.split('_')[-1]
164 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
165 profile = f.split('_')[-1]
165 start_cmd = 'ipcluster start profile=%s n=4' % profile
166 start_cmd = 'ipcluster start profile=%s n=4' % profile
166 print start_cmd + " ==> " + full_path
167 print start_cmd + " ==> " + full_path
167
168
168 def start(self):
169 def start(self):
169 self.list_cluster_dirs()
170 self.list_profile_dirs()
171
172
173 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
170
174
171 create_flags = {}
175 create_flags = {}
172 create_flags.update(base_flags)
176 create_flags.update(base_flags)
173 create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
177 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
174 "reset config files to defaults", "leave existing config files"))
178 "reset config files to defaults", "leave existing config files"))
175
179
176 class IPClusterCreate(ClusterApplication):
180 class IPClusterCreate(BaseParallelApplication):
177 name = u'ipcluster'
181 name = u'ipcluster-create'
178 description = create_help
182 description = create_help
179 auto_create_cluster_dir = Bool(True,
183 auto_create = Bool(True)
180 help="whether to create the cluster_dir if it doesn't exist")
181 config_file_name = Unicode(default_config_file_name)
184 config_file_name = Unicode(default_config_file_name)
182
185
183 reset = Bool(False, config=True,
184 help="Whether to reset config files as part of 'create'."
185 )
186
187 flags = Dict(create_flags)
186 flags = Dict(create_flags)
188
187
189 aliases = Dict(dict(profile='ClusterDir.profile'))
188 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
190
189
191 classes = [ClusterDir]
190 classes = [ProfileDir]
192
191
193 def init_clusterdir(self):
194 super(IPClusterCreate, self).init_clusterdir()
195 self.log.info('Copying default config files to cluster directory '
196 '[overwrite=%r]' % (self.reset,))
197 self.cluster_dir.copy_all_config_files(overwrite=self.reset)
198
199 def initialize(self, argv=None):
200 self.parse_command_line(argv)
201 self.init_clusterdir()
202
192
203 stop_aliases = dict(
193 stop_aliases = dict(
204 signal='IPClusterStop.signal',
194 signal='IPClusterStop.signal',
205 profile='ClusterDir.profile',
195 profile='BaseIPythonApplication.profile',
206 cluster_dir='ClusterDir.location',
196 profile_dir='ProfileDir.location',
207 )
197 )
208
198
209 class IPClusterStop(ClusterApplication):
199 class IPClusterStop(BaseParallelApplication):
210 name = u'ipcluster'
200 name = u'ipcluster'
211 description = stop_help
201 description = stop_help
212 auto_create_cluster_dir = Bool(False)
213 config_file_name = Unicode(default_config_file_name)
202 config_file_name = Unicode(default_config_file_name)
214
203
215 signal = Int(signal.SIGINT, config=True,
204 signal = Int(signal.SIGINT, config=True,
216 help="signal to use for stopping processes.")
205 help="signal to use for stopping processes.")
217
206
218 aliases = Dict(stop_aliases)
207 aliases = Dict(stop_aliases)
219
208
220 def init_clusterdir(self):
221 try:
222 super(IPClusterStop, self).init_clusterdir()
223 except ClusterDirError as e:
224 self.log.fatal("Failed ClusterDir init: %s"%e)
225 self.exit(1)
226
227 def start(self):
209 def start(self):
228 """Start the app for the stop subcommand."""
210 """Start the app for the stop subcommand."""
229 try:
211 try:
230 pid = self.get_pid_from_file()
212 pid = self.get_pid_from_file()
231 except PIDFileError:
213 except PIDFileError:
232 self.log.critical(
214 self.log.critical(
233 'Could not read pid file, cluster is probably not running.'
215 'Could not read pid file, cluster is probably not running.'
234 )
216 )
235 # Here I exit with an unusual exit status that other processes
217 # Here I exit with an unusual exit status that other processes
236 # can watch for to learn how I exited.
218 # can watch for to learn how I exited.
237 self.remove_pid_file()
219 self.remove_pid_file()
238 self.exit(ALREADY_STOPPED)
220 self.exit(ALREADY_STOPPED)
239
221
240 if not self.check_pid(pid):
222 if not self.check_pid(pid):
241 self.log.critical(
223 self.log.critical(
242 'Cluster [pid=%r] is not running.' % pid
224 'Cluster [pid=%r] is not running.' % pid
243 )
225 )
244 self.remove_pid_file()
226 self.remove_pid_file()
245 # Here I exit with an unusual exit status that other processes
227 # Here I exit with an unusual exit status that other processes
246 # can watch for to learn how I exited.
228 # can watch for to learn how I exited.
247 self.exit(ALREADY_STOPPED)
229 self.exit(ALREADY_STOPPED)
248
230
249 elif os.name=='posix':
231 elif os.name=='posix':
250 sig = self.signal
232 sig = self.signal
251 self.log.info(
233 self.log.info(
252 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
234 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
253 )
235 )
254 try:
236 try:
255 os.kill(pid, sig)
237 os.kill(pid, sig)
256 except OSError:
238 except OSError:
257 self.log.error("Stopping cluster failed, assuming already dead.",
239 self.log.error("Stopping cluster failed, assuming already dead.",
258 exc_info=True)
240 exc_info=True)
259 self.remove_pid_file()
241 self.remove_pid_file()
260 elif os.name=='nt':
242 elif os.name=='nt':
261 try:
243 try:
262 # kill the whole tree
244 # kill the whole tree
263 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
245 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
264 except (CalledProcessError, OSError):
246 except (CalledProcessError, OSError):
265 self.log.error("Stopping cluster failed, assuming already dead.",
247 self.log.error("Stopping cluster failed, assuming already dead.",
266 exc_info=True)
248 exc_info=True)
267 self.remove_pid_file()
249 self.remove_pid_file()
268
250
269 engine_aliases = {}
251 engine_aliases = {}
270 engine_aliases.update(base_aliases)
252 engine_aliases.update(base_aliases)
271 engine_aliases.update(dict(
253 engine_aliases.update(dict(
272 n='IPClusterEngines.n',
254 n='IPClusterEngines.n',
273 elauncher = 'IPClusterEngines.engine_launcher_class',
255 elauncher = 'IPClusterEngines.engine_launcher_class',
274 ))
256 ))
275 class IPClusterEngines(ClusterApplication):
257 class IPClusterEngines(BaseParallelApplication):
276
258
277 name = u'ipcluster'
259 name = u'ipcluster'
278 description = engines_help
260 description = engines_help
279 usage = None
261 usage = None
280 config_file_name = Unicode(default_config_file_name)
262 config_file_name = Unicode(default_config_file_name)
281 default_log_level = logging.INFO
263 default_log_level = logging.INFO
282 auto_create_cluster_dir = Bool(False)
283 classes = List()
264 classes = List()
284 def _classes_default(self):
265 def _classes_default(self):
285 from IPython.parallel.apps import launcher
266 from IPython.parallel.apps import launcher
286 launchers = launcher.all_launchers
267 launchers = launcher.all_launchers
287 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
268 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
288 return [ClusterDir]+eslaunchers
269 return [ProfileDir]+eslaunchers
289
270
290 n = Int(2, config=True,
271 n = Int(2, config=True,
291 help="The number of engines to start.")
272 help="The number of engines to start.")
292
273
293 engine_launcher_class = Unicode('LocalEngineSetLauncher',
274 engine_launcher_class = Unicode('LocalEngineSetLauncher',
294 config=True,
275 config=True,
295 help="The class for launching a set of Engines."
276 help="The class for launching a set of Engines."
296 )
277 )
297 daemonize = Bool(False, config=True,
278 daemonize = Bool(False, config=True,
298 help='Daemonize the ipcluster program. This implies --log-to-file')
279 help='Daemonize the ipcluster program. This implies --log-to-file')
299
280
300 def _daemonize_changed(self, name, old, new):
281 def _daemonize_changed(self, name, old, new):
301 if new:
282 if new:
302 self.log_to_file = True
283 self.log_to_file = True
303
284
304 aliases = Dict(engine_aliases)
285 aliases = Dict(engine_aliases)
305 # flags = Dict(flags)
286 # flags = Dict(flags)
306 _stopping = False
287 _stopping = False
307
288
308 def initialize(self, argv=None):
289 def initialize(self, argv=None):
309 super(IPClusterEngines, self).initialize(argv)
290 super(IPClusterEngines, self).initialize(argv)
310 self.init_signal()
291 self.init_signal()
311 self.init_launchers()
292 self.init_launchers()
312
293
313 def init_launchers(self):
294 def init_launchers(self):
314 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
295 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
315 self.engine_launcher.on_stop(lambda r: self.loop.stop())
296 self.engine_launcher.on_stop(lambda r: self.loop.stop())
316
297
317 def init_signal(self):
298 def init_signal(self):
318 # Setup signals
299 # Setup signals
319 signal.signal(signal.SIGINT, self.sigint_handler)
300 signal.signal(signal.SIGINT, self.sigint_handler)
320
301
321 def build_launcher(self, clsname):
302 def build_launcher(self, clsname):
322 """import and instantiate a Launcher based on importstring"""
303 """import and instantiate a Launcher based on importstring"""
323 if '.' not in clsname:
304 if '.' not in clsname:
324 # not a module, presume it's the raw name in apps.launcher
305 # not a module, presume it's the raw name in apps.launcher
325 clsname = 'IPython.parallel.apps.launcher.'+clsname
306 clsname = 'IPython.parallel.apps.launcher.'+clsname
326 # print repr(clsname)
307 # print repr(clsname)
327 klass = import_item(clsname)
308 klass = import_item(clsname)
328
309
329 launcher = klass(
310 launcher = klass(
330 work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
311 work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
331 )
312 )
332 return launcher
313 return launcher
333
314
334 def start_engines(self):
315 def start_engines(self):
335 self.log.info("Starting %i engines"%self.n)
316 self.log.info("Starting %i engines"%self.n)
336 self.engine_launcher.start(
317 self.engine_launcher.start(
337 self.n,
318 self.n,
338 cluster_dir=self.cluster_dir.location
319 profile_dir=self.profile_dir.location
339 )
320 )
340
321
341 def stop_engines(self):
322 def stop_engines(self):
342 self.log.info("Stopping Engines...")
323 self.log.info("Stopping Engines...")
343 if self.engine_launcher.running:
324 if self.engine_launcher.running:
344 d = self.engine_launcher.stop()
325 d = self.engine_launcher.stop()
345 return d
326 return d
346 else:
327 else:
347 return None
328 return None
348
329
349 def stop_launchers(self, r=None):
330 def stop_launchers(self, r=None):
350 if not self._stopping:
331 if not self._stopping:
351 self._stopping = True
332 self._stopping = True
352 self.log.error("IPython cluster: stopping")
333 self.log.error("IPython cluster: stopping")
353 self.stop_engines()
334 self.stop_engines()
354 # Wait a few seconds to let things shut down.
335 # Wait a few seconds to let things shut down.
355 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
336 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
356 dc.start()
337 dc.start()
357
338
358 def sigint_handler(self, signum, frame):
339 def sigint_handler(self, signum, frame):
359 self.log.debug("SIGINT received, stopping launchers...")
340 self.log.debug("SIGINT received, stopping launchers...")
360 self.stop_launchers()
341 self.stop_launchers()
361
342
362 def start_logging(self):
343 def start_logging(self):
363 # Remove old log files of the controller and engine
344 # Remove old log files of the controller and engine
364 if self.clean_logs:
345 if self.clean_logs:
365 log_dir = self.cluster_dir.log_dir
346 log_dir = self.profile_dir.log_dir
366 for f in os.listdir(log_dir):
347 for f in os.listdir(log_dir):
367 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
348 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
368 os.remove(os.path.join(log_dir, f))
349 os.remove(os.path.join(log_dir, f))
369 # This will remove old log files for ipcluster itself
350 # This will remove old log files for ipcluster itself
370 # super(IPClusterApp, self).start_logging()
351 # super(IPBaseParallelApplication, self).start_logging()
371
352
372 def start(self):
353 def start(self):
373 """Start the app for the engines subcommand."""
354 """Start the app for the engines subcommand."""
374 self.log.info("IPython cluster: started")
355 self.log.info("IPython cluster: started")
375 # First see if the cluster is already running
356 # First see if the cluster is already running
376
357
377 # Now log and daemonize
358 # Now log and daemonize
378 self.log.info(
359 self.log.info(
379 'Starting engines with [daemon=%r]' % self.daemonize
360 'Starting engines with [daemon=%r]' % self.daemonize
380 )
361 )
381 # TODO: Get daemonize working on Windows or as a Windows Server.
362 # TODO: Get daemonize working on Windows or as a Windows Server.
382 if self.daemonize:
363 if self.daemonize:
383 if os.name=='posix':
364 if os.name=='posix':
384 from twisted.scripts._twistd_unix import daemonize
365 from twisted.scripts._twistd_unix import daemonize
385 daemonize()
366 daemonize()
386
367
387 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
368 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
388 dc.start()
369 dc.start()
389 # Now write the new pid file AFTER our new forked pid is active.
370 # Now write the new pid file AFTER our new forked pid is active.
390 # self.write_pid_file()
371 # self.write_pid_file()
391 try:
372 try:
392 self.loop.start()
373 self.loop.start()
393 except KeyboardInterrupt:
374 except KeyboardInterrupt:
394 pass
375 pass
395 except zmq.ZMQError as e:
376 except zmq.ZMQError as e:
396 if e.errno == errno.EINTR:
377 if e.errno == errno.EINTR:
397 pass
378 pass
398 else:
379 else:
399 raise
380 raise
400
381
401 start_aliases = {}
382 start_aliases = {}
402 start_aliases.update(engine_aliases)
383 start_aliases.update(engine_aliases)
403 start_aliases.update(dict(
384 start_aliases.update(dict(
404 delay='IPClusterStart.delay',
385 delay='IPClusterStart.delay',
405 clean_logs='IPClusterStart.clean_logs',
386 clean_logs='IPClusterStart.clean_logs',
406 ))
387 ))
407
388
408 class IPClusterStart(IPClusterEngines):
389 class IPClusterStart(IPClusterEngines):
409
390
410 name = u'ipcluster'
391 name = u'ipcluster'
411 description = start_help
392 description = start_help
412 default_log_level = logging.INFO
393 default_log_level = logging.INFO
413 auto_create_cluster_dir = Bool(True, config=True,
394 auto_create = Bool(True, config=True,
414 help="whether to create the cluster_dir if it doesn't exist")
395 help="whether to create the profile_dir if it doesn't exist")
415 classes = List()
396 classes = List()
416 def _classes_default(self,):
397 def _classes_default(self,):
417 from IPython.parallel.apps import launcher
398 from IPython.parallel.apps import launcher
418 return [ClusterDir]+launcher.all_launchers
399 return [ProfileDir]+launcher.all_launchers
419
400
420 clean_logs = Bool(True, config=True,
401 clean_logs = Bool(True, config=True,
421 help="whether to cleanup old logs before starting")
402 help="whether to cleanup old logs before starting")
422
403
423 delay = CFloat(1., config=True,
404 delay = CFloat(1., config=True,
424 help="delay (in s) between starting the controller and the engines")
405 help="delay (in s) between starting the controller and the engines")
425
406
426 controller_launcher_class = Unicode('LocalControllerLauncher',
407 controller_launcher_class = Unicode('LocalControllerLauncher',
427 config=True,
408 config=True,
428 help="The class for launching a Controller."
409 help="The class for launching a Controller."
429 )
410 )
430 reset = Bool(False, config=True,
411 reset = Bool(False, config=True,
431 help="Whether to reset config files as part of '--create'."
412 help="Whether to reset config files as part of '--create'."
432 )
413 )
433
414
434 # flags = Dict(flags)
415 # flags = Dict(flags)
435 aliases = Dict(start_aliases)
416 aliases = Dict(start_aliases)
436
417
437 def init_launchers(self):
418 def init_launchers(self):
438 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
419 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
439 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
420 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
440 self.controller_launcher.on_stop(self.stop_launchers)
421 self.controller_launcher.on_stop(self.stop_launchers)
441
422
442 def start_controller(self):
423 def start_controller(self):
443 self.controller_launcher.start(
424 self.controller_launcher.start(
444 cluster_dir=self.cluster_dir.location
425 profile_dir=self.profile_dir.location
445 )
426 )
446
427
447 def stop_controller(self):
428 def stop_controller(self):
448 # self.log.info("In stop_controller")
429 # self.log.info("In stop_controller")
449 if self.controller_launcher and self.controller_launcher.running:
430 if self.controller_launcher and self.controller_launcher.running:
450 return self.controller_launcher.stop()
431 return self.controller_launcher.stop()
451
432
452 def stop_launchers(self, r=None):
433 def stop_launchers(self, r=None):
453 if not self._stopping:
434 if not self._stopping:
454 self.stop_controller()
435 self.stop_controller()
455 super(IPClusterStart, self).stop_launchers()
436 super(IPClusterStart, self).stop_launchers()
456
437
457 def start(self):
438 def start(self):
458 """Start the app for the start subcommand."""
439 """Start the app for the start subcommand."""
459 # First see if the cluster is already running
440 # First see if the cluster is already running
460 try:
441 try:
461 pid = self.get_pid_from_file()
442 pid = self.get_pid_from_file()
462 except PIDFileError:
443 except PIDFileError:
463 pass
444 pass
464 else:
445 else:
465 if self.check_pid(pid):
446 if self.check_pid(pid):
466 self.log.critical(
447 self.log.critical(
467 'Cluster is already running with [pid=%s]. '
448 'Cluster is already running with [pid=%s]. '
468 'Use "ipcluster stop" to stop the cluster.' % pid
449 'Use "ipcluster stop" to stop the cluster.' % pid
469 )
450 )
470 # Here I exit with an unusual exit status that other processes
451 # Here I exit with an unusual exit status that other processes
471 # can watch for to learn how I exited.
452 # can watch for to learn how I exited.
472 self.exit(ALREADY_STARTED)
453 self.exit(ALREADY_STARTED)
473 else:
454 else:
474 self.remove_pid_file()
455 self.remove_pid_file()
475
456
476
457
477 # Now log and daemonize
458 # Now log and daemonize
478 self.log.info(
459 self.log.info(
479 'Starting ipcluster with [daemon=%r]' % self.daemonize
460 'Starting ipcluster with [daemon=%r]' % self.daemonize
480 )
461 )
481 # TODO: Get daemonize working on Windows or as a Windows Server.
462 # TODO: Get daemonize working on Windows or as a Windows Server.
482 if self.daemonize:
463 if self.daemonize:
483 if os.name=='posix':
464 if os.name=='posix':
484 from twisted.scripts._twistd_unix import daemonize
465 from twisted.scripts._twistd_unix import daemonize
485 daemonize()
466 daemonize()
486
467
487 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
468 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
488 dc.start()
469 dc.start()
489 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
470 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
490 dc.start()
471 dc.start()
491 # Now write the new pid file AFTER our new forked pid is active.
472 # Now write the new pid file AFTER our new forked pid is active.
492 self.write_pid_file()
473 self.write_pid_file()
493 try:
474 try:
494 self.loop.start()
475 self.loop.start()
495 except KeyboardInterrupt:
476 except KeyboardInterrupt:
496 pass
477 pass
497 except zmq.ZMQError as e:
478 except zmq.ZMQError as e:
498 if e.errno == errno.EINTR:
479 if e.errno == errno.EINTR:
499 pass
480 pass
500 else:
481 else:
501 raise
482 raise
502 finally:
483 finally:
503 self.remove_pid_file()
484 self.remove_pid_file()
504
485
505 base='IPython.parallel.apps.ipclusterapp.IPCluster'
486 base='IPython.parallel.apps.ipclusterapp.IPCluster'
506
487
507 class IPClusterApp(Application):
488 class IPBaseParallelApplication(Application):
508 name = u'ipcluster'
489 name = u'ipcluster'
509 description = _description
490 description = _description
510
491
511 subcommands = {'create' : (base+'Create', create_help),
492 subcommands = {'create' : (base+'Create', create_help),
512 'list' : (base+'List', list_help),
493 'list' : (base+'List', list_help),
513 'start' : (base+'Start', start_help),
494 'start' : (base+'Start', start_help),
514 'stop' : (base+'Stop', stop_help),
495 'stop' : (base+'Stop', stop_help),
515 'engines' : (base+'Engines', engines_help),
496 'engines' : (base+'Engines', engines_help),
516 }
497 }
517
498
518 # no aliases or flags for parent App
499 # no aliases or flags for parent App
519 aliases = Dict()
500 aliases = Dict()
520 flags = Dict()
501 flags = Dict()
521
502
522 def start(self):
503 def start(self):
523 if self.subapp is None:
504 if self.subapp is None:
524 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
505 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
525 print
506 print
526 self.print_subcommands()
507 self.print_subcommands()
527 self.exit(1)
508 self.exit(1)
528 else:
509 else:
529 return self.subapp.start()
510 return self.subapp.start()
530
511
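# A hedged usage sketch of the subcommand table above; the profile name
# "mycluster" is illustrative and not taken from this code:
#
#   ipcluster create --profile=mycluster    # write default config files
#   ipcluster start --profile=mycluster     # launch controller, then engines
#   ipcluster engines --profile=mycluster   # add engines to a running cluster
#   ipcluster stop --profile=mycluster      # stop the running cluster
#   ipcluster list                          # list known cluster profiles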
531 def launch_new_instance():
512 def launch_new_instance():
532 """Create and run the IPython cluster."""
513 """Create and run the IPython cluster."""
533 app = IPClusterApp()
514 app = IPBaseParallelApplication()
534 app.initialize()
515 app.initialize()
535 app.start()
516 app.start()
536
517
537
518
538 if __name__ == '__main__':
519 if __name__ == '__main__':
539 launch_new_instance()
520 launch_new_instance()
540
521
@@ -1,404 +1,399 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import copy
21 import os
20 import os
22 import logging
23 import socket
21 import socket
24 import stat
22 import stat
25 import sys
23 import sys
26 import uuid
24 import uuid
27
25
28 from multiprocessing import Process
26 from multiprocessing import Process
29
27
30 import zmq
28 import zmq
31 from zmq.devices import ProcessMonitoredQueue
29 from zmq.devices import ProcessMonitoredQueue
32 from zmq.log.handlers import PUBHandler
30 from zmq.log.handlers import PUBHandler
33 from zmq.utils import jsonapi as json
31 from zmq.utils import jsonapi as json
34
32
35 from IPython.config.loader import Config
33 from IPython.config.loader import Config
36
34 from IPython.core.newapplication import ProfileDir
37 from IPython.parallel import factory
38
35
39 from IPython.parallel.apps.clusterdir import (
36 from IPython.parallel.apps.clusterdir import (
40 ClusterDir,
37 BaseParallelApplication,
41 ClusterApplication,
42 base_flags
38 base_flags
43 # ClusterDirConfigLoader
44 )
39 )
45 from IPython.utils.importstring import import_item
40 from IPython.utils.importstring import import_item
46 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
41 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
47
42
48 # from IPython.parallel.controller.controller import ControllerFactory
43 # from IPython.parallel.controller.controller import ControllerFactory
49 from IPython.parallel.streamsession import StreamSession
44 from IPython.parallel.streamsession import StreamSession
50 from IPython.parallel.controller.heartmonitor import HeartMonitor
45 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 from IPython.parallel.controller.hub import Hub, HubFactory
46 from IPython.parallel.controller.hub import HubFactory
52 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
47 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 from IPython.parallel.controller.sqlitedb import SQLiteDB
48 from IPython.parallel.controller.sqlitedb import SQLiteDB
54
49
55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
50 from IPython.parallel.util import signal_children, split_url
56
51
57 # conditional import of MongoDB backend class
52 # conditional import of MongoDB backend class
58
53
59 try:
54 try:
60 from IPython.parallel.controller.mongodb import MongoDB
55 from IPython.parallel.controller.mongodb import MongoDB
61 except ImportError:
56 except ImportError:
62 maybe_mongo = []
57 maybe_mongo = []
63 else:
58 else:
64 maybe_mongo = [MongoDB]
59 maybe_mongo = [MongoDB]
65
60
66
61
67 #-----------------------------------------------------------------------------
62 #-----------------------------------------------------------------------------
68 # Module level variables
63 # Module level variables
69 #-----------------------------------------------------------------------------
64 #-----------------------------------------------------------------------------
70
65
71
66
72 #: The default config file name for this application
67 #: The default config file name for this application
73 default_config_file_name = u'ipcontroller_config.py'
68 default_config_file_name = u'ipcontroller_config.py'
74
69
75
70
76 _description = """Start the IPython controller for parallel computing.
71 _description = """Start the IPython controller for parallel computing.
77
72
78 The IPython controller provides a gateway between the IPython engines and
73 The IPython controller provides a gateway between the IPython engines and
79 clients. The controller needs to be started before the engines and can be
74 clients. The controller needs to be started before the engines and can be
80 configured using command line options or using a cluster directory. Cluster
75 configured using command line options or using a cluster directory. Cluster
81 directories contain config, log and security files and are usually located in
76 directories contain config, log and security files and are usually located in
82 your ipython directory and named as "cluster_<profile>". See the `profile`
77 your ipython directory and named as "cluster_<profile>". See the `profile`
83 and `cluster_dir` options for details.
78 and `profile_dir` options for details.
84 """
79 """
85
80
86
81
87
82
88
83
89 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
90 # The main application
85 # The main application
91 #-----------------------------------------------------------------------------
86 #-----------------------------------------------------------------------------
92 flags = {}
87 flags = {}
93 flags.update(base_flags)
88 flags.update(base_flags)
94 flags.update({
89 flags.update({
95 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
90 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
96 'Use threads instead of processes for the schedulers'),
91 'Use threads instead of processes for the schedulers'),
97 'sqlitedb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'})},
92 'sqlitedb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'})},
98 'use the SQLiteDB backend'),
93 'use the SQLiteDB backend'),
99 'mongodb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'})},
94 'mongodb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'})},
100 'use the MongoDB backend'),
95 'use the MongoDB backend'),
101 'dictdb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.dictdb.DictDB'})},
96 'dictdb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.dictdb.DictDB'})},
102 'use the in-memory DictDB backend'),
97 'use the in-memory DictDB backend'),
103 'reuse' : ({'IPControllerApp' : Config({'reuse_files' : True})},
98 'reuse' : ({'IPControllerApp' : Config({'reuse_files' : True})},
104 'reuse existing json connection files')
99 'reuse existing json connection files')
105 })
100 })
106
101
107 flags.update()
102 flags.update()
108
103
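# Hedged sketch of what the boolean flags above expand to; the command lines
# are illustrative, while the config assignments are taken from the flags dict:
#
#   ipcontroller --usethreads  ->  c.IPControllerApp.use_threads = True
#   ipcontroller --mongodb     ->  c.HubFactory.db_class = 'IPython.parallel.controller.mongodb.MongoDB'
#   ipcontroller --reuse       ->  c.IPControllerApp.reuse_files = True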
109 class IPControllerApp(ClusterApplication):
104 class IPControllerApp(BaseParallelApplication):
110
105
111 name = u'ipcontroller'
106 name = u'ipcontroller'
112 description = _description
107 description = _description
113 config_file_name = Unicode(default_config_file_name)
108 config_file_name = Unicode(default_config_file_name)
114 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
109 classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
110
111 # change default to True
112 auto_create = Bool(True, config=True,
113 help="""Whether to create profile dir if it doesn't exist""")
115
114
116 auto_create_cluster_dir = Bool(True, config=True,
117 help="Whether to create cluster_dir if it exists.")
118 reuse_files = Bool(False, config=True,
115 reuse_files = Bool(False, config=True,
119 help='Whether to reuse existing json connection files [default: False]'
116 help='Whether to reuse existing json connection files [default: False]'
120 )
117 )
121 secure = Bool(True, config=True,
118 secure = Bool(True, config=True,
122 help='Whether to use exec_keys for extra authentication [default: True]'
119 help='Whether to use exec_keys for extra authentication [default: True]'
123 )
120 )
124 ssh_server = Unicode(u'', config=True,
121 ssh_server = Unicode(u'', config=True,
125 help="""ssh url for clients to use when connecting to the Controller
122 help="""ssh url for clients to use when connecting to the Controller
126 processes. It should be of the form: [user@]server[:port]. The
123 processes. It should be of the form: [user@]server[:port]. The
127 Controller\'s listening addresses must be accessible from the ssh server""",
124 Controller\'s listening addresses must be accessible from the ssh server""",
128 )
125 )
129 location = Unicode(u'', config=True,
126 location = Unicode(u'', config=True,
130 help="""The external IP or domain name of the Controller, used for disambiguating
127 help="""The external IP or domain name of the Controller, used for disambiguating
131 engine and client connections.""",
128 engine and client connections.""",
132 )
129 )
133 import_statements = List([], config=True,
130 import_statements = List([], config=True,
134 help="import statements to be run at startup. Necessary in some environments"
131 help="import statements to be run at startup. Necessary in some environments"
135 )
132 )
136
133
137 use_threads = Bool(False, config=True,
134 use_threads = Bool(False, config=True,
138 help='Use threads instead of processes for the schedulers',
135 help='Use threads instead of processes for the schedulers',
139 )
136 )
140
137
141 # internal
138 # internal
142 children = List()
139 children = List()
143 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
140 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
144
141
145 def _use_threads_changed(self, name, old, new):
142 def _use_threads_changed(self, name, old, new):
146 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
143 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
147
144
148 aliases = Dict(dict(
145 aliases = Dict(dict(
149 config = 'IPControllerApp.config_file',
150 # file = 'IPControllerApp.url_file',
151 log_level = 'IPControllerApp.log_level',
146 log_level = 'IPControllerApp.log_level',
152 log_url = 'IPControllerApp.log_url',
147 log_url = 'IPControllerApp.log_url',
153 reuse_files = 'IPControllerApp.reuse_files',
148 reuse_files = 'IPControllerApp.reuse_files',
154 secure = 'IPControllerApp.secure',
149 secure = 'IPControllerApp.secure',
155 ssh = 'IPControllerApp.ssh_server',
150 ssh = 'IPControllerApp.ssh_server',
156 use_threads = 'IPControllerApp.use_threads',
151 use_threads = 'IPControllerApp.use_threads',
157 import_statements = 'IPControllerApp.import_statements',
152 import_statements = 'IPControllerApp.import_statements',
158 location = 'IPControllerApp.location',
153 location = 'IPControllerApp.location',
159
154
160 ident = 'StreamSession.session',
155 ident = 'StreamSession.session',
161 user = 'StreamSession.username',
156 user = 'StreamSession.username',
162 exec_key = 'StreamSession.keyfile',
157 exec_key = 'StreamSession.keyfile',
163
158
164 url = 'HubFactory.url',
159 url = 'HubFactory.url',
165 ip = 'HubFactory.ip',
160 ip = 'HubFactory.ip',
166 transport = 'HubFactory.transport',
161 transport = 'HubFactory.transport',
167 port = 'HubFactory.regport',
162 port = 'HubFactory.regport',
168
163
169 ping = 'HeartMonitor.period',
164 ping = 'HeartMonitor.period',
170
165
171 scheme = 'TaskScheduler.scheme_name',
166 scheme = 'TaskScheduler.scheme_name',
172 hwm = 'TaskScheduler.hwm',
167 hwm = 'TaskScheduler.hwm',
173
168
174
169
175 profile = "ClusterDir.profile",
170 profile = "BaseIPythonApplication.profile",
176 cluster_dir = 'ClusterDir.location',
171 profile_dir = 'ProfileDir.location',
177
172
178 ))
173 ))
179 flags = Dict(flags)
174 flags = Dict(flags)
180
175
181
176
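# Hedged examples of the aliases above on the command line; the addresses and
# profile name are illustrative values:
#
#   ipcontroller --ip=192.168.1.10 --port=10101      # HubFactory.ip / HubFactory.regport
#   ipcontroller --scheme=leastload --hwm=10         # TaskScheduler options
#   ipcontroller --profile=mycluster --log_level=DEBUG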
182 def save_connection_dict(self, fname, cdict):
177 def save_connection_dict(self, fname, cdict):
183 """save a connection dict to json file."""
178 """save a connection dict to json file."""
184 c = self.config
179 c = self.config
185 url = cdict['url']
180 url = cdict['url']
186 location = cdict['location']
181 location = cdict['location']
187 if not location:
182 if not location:
188 try:
183 try:
189 proto,ip,port = split_url(url)
184 proto,ip,port = split_url(url)
190 except AssertionError:
185 except AssertionError:
191 pass
186 pass
192 else:
187 else:
193 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
188 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
194 cdict['location'] = location
189 cdict['location'] = location
195 fname = os.path.join(self.cluster_dir.security_dir, fname)
190 fname = os.path.join(self.profile_dir.security_dir, fname)
196 with open(fname, 'w') as f:
191 with open(fname, 'w') as f:
197 f.write(json.dumps(cdict, indent=2))
192 f.write(json.dumps(cdict, indent=2))
198 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
193 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
199
194
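# Hedged example of the dict this method serializes; the values below are
# placeholders, but the keys match what init_hub writes and
# load_config_from_json reads back:
#
#   {
#     "exec_key" : "a1b2c3d4-...",           # uuid4 string, or "" when secure=False
#     "ssh"      : "",                        # [user@]server[:port] for ssh tunnels
#     "url"      : "tcp://127.0.0.1:55555",   # registration endpoint, transport://ip:port
#     "location" : "10.0.0.5"                 # external IP/hostname of the controller
#   }
#
# The file is written into profile_dir.security_dir with owner-only (0600)
# permissions, as done above.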
200 def load_config_from_json(self):
195 def load_config_from_json(self):
201 """load config from existing json connector files."""
196 """load config from existing json connector files."""
202 c = self.config
197 c = self.config
203 # load from engine config
198 # load from engine config
204 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
199 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
205 cfg = json.loads(f.read())
200 cfg = json.loads(f.read())
206 key = c.StreamSession.key = cfg['exec_key']
201 key = c.StreamSession.key = cfg['exec_key']
207 xport,addr = cfg['url'].split('://')
202 xport,addr = cfg['url'].split('://')
208 c.HubFactory.engine_transport = xport
203 c.HubFactory.engine_transport = xport
209 ip,ports = addr.split(':')
204 ip,ports = addr.split(':')
210 c.HubFactory.engine_ip = ip
205 c.HubFactory.engine_ip = ip
211 c.HubFactory.regport = int(ports)
206 c.HubFactory.regport = int(ports)
212 self.location = cfg['location']
207 self.location = cfg['location']
213
208
214 # load client config
209 # load client config
215 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
210 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
216 cfg = json.loads(f.read())
211 cfg = json.loads(f.read())
217 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
212 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
218 xport,addr = cfg['url'].split('://')
213 xport,addr = cfg['url'].split('://')
219 c.HubFactory.client_transport = xport
214 c.HubFactory.client_transport = xport
220 ip,ports = addr.split(':')
215 ip,ports = addr.split(':')
221 c.HubFactory.client_ip = ip
216 c.HubFactory.client_ip = ip
222 self.ssh_server = cfg['ssh']
217 self.ssh_server = cfg['ssh']
223 assert int(ports) == c.HubFactory.regport, "regport mismatch"
218 assert int(ports) == c.HubFactory.regport, "regport mismatch"
224
219
225 def init_hub(self):
220 def init_hub(self):
226 c = self.config
221 c = self.config
227
222
228 self.do_import_statements()
223 self.do_import_statements()
229 reusing = self.reuse_files
224 reusing = self.reuse_files
230 if reusing:
225 if reusing:
231 try:
226 try:
232 self.load_config_from_json()
227 self.load_config_from_json()
233 except (AssertionError,IOError):
228 except (AssertionError,IOError):
234 reusing=False
229 reusing=False
235 # check again, because reusing may have failed:
230 # check again, because reusing may have failed:
236 if reusing:
231 if reusing:
237 pass
232 pass
238 elif self.secure:
233 elif self.secure:
239 key = str(uuid.uuid4())
234 key = str(uuid.uuid4())
240 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
235 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
241 # with open(keyfile, 'w') as f:
236 # with open(keyfile, 'w') as f:
242 # f.write(key)
237 # f.write(key)
243 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
238 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
244 c.StreamSession.key = key
239 c.StreamSession.key = key
245 else:
240 else:
246 key = c.StreamSession.key = ''
241 key = c.StreamSession.key = ''
247
242
248 try:
243 try:
249 self.factory = HubFactory(config=c, log=self.log)
244 self.factory = HubFactory(config=c, log=self.log)
250 # self.start_logging()
245 # self.start_logging()
251 self.factory.init_hub()
246 self.factory.init_hub()
252 except:
247 except:
253 self.log.error("Couldn't construct the Controller", exc_info=True)
248 self.log.error("Couldn't construct the Controller", exc_info=True)
254 self.exit(1)
249 self.exit(1)
255
250
256 if not reusing:
251 if not reusing:
257 # save to new json config files
252 # save to new json config files
258 f = self.factory
253 f = self.factory
259 cdict = {'exec_key' : key,
254 cdict = {'exec_key' : key,
260 'ssh' : self.ssh_server,
255 'ssh' : self.ssh_server,
261 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
256 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
262 'location' : self.location
257 'location' : self.location
263 }
258 }
264 self.save_connection_dict('ipcontroller-client.json', cdict)
259 self.save_connection_dict('ipcontroller-client.json', cdict)
265 edict = cdict
260 edict = cdict
266 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
261 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
267 self.save_connection_dict('ipcontroller-engine.json', edict)
262 self.save_connection_dict('ipcontroller-engine.json', edict)
268
263
269 #
264 #
270 def init_schedulers(self):
265 def init_schedulers(self):
271 children = self.children
266 children = self.children
272 mq = import_item(str(self.mq_class))
267 mq = import_item(str(self.mq_class))
273
268
274 hub = self.factory
269 hub = self.factory
275 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
270 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
276 # IOPub relay (in a Process)
271 # IOPub relay (in a Process)
277 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
272 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
278 q.bind_in(hub.client_info['iopub'])
273 q.bind_in(hub.client_info['iopub'])
279 q.bind_out(hub.engine_info['iopub'])
274 q.bind_out(hub.engine_info['iopub'])
280 q.setsockopt_out(zmq.SUBSCRIBE, '')
275 q.setsockopt_out(zmq.SUBSCRIBE, '')
281 q.connect_mon(hub.monitor_url)
276 q.connect_mon(hub.monitor_url)
282 q.daemon=True
277 q.daemon=True
283 children.append(q)
278 children.append(q)
284
279
285 # Multiplexer Queue (in a Process)
280 # Multiplexer Queue (in a Process)
286 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
281 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
287 q.bind_in(hub.client_info['mux'])
282 q.bind_in(hub.client_info['mux'])
288 q.setsockopt_in(zmq.IDENTITY, 'mux')
283 q.setsockopt_in(zmq.IDENTITY, 'mux')
289 q.bind_out(hub.engine_info['mux'])
284 q.bind_out(hub.engine_info['mux'])
290 q.connect_mon(hub.monitor_url)
285 q.connect_mon(hub.monitor_url)
291 q.daemon=True
286 q.daemon=True
292 children.append(q)
287 children.append(q)
293
288
294 # Control Queue (in a Process)
289 # Control Queue (in a Process)
295 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
290 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
296 q.bind_in(hub.client_info['control'])
291 q.bind_in(hub.client_info['control'])
297 q.setsockopt_in(zmq.IDENTITY, 'control')
292 q.setsockopt_in(zmq.IDENTITY, 'control')
298 q.bind_out(hub.engine_info['control'])
293 q.bind_out(hub.engine_info['control'])
299 q.connect_mon(hub.monitor_url)
294 q.connect_mon(hub.monitor_url)
300 q.daemon=True
295 q.daemon=True
301 children.append(q)
296 children.append(q)
302 try:
297 try:
303 scheme = self.config.TaskScheduler.scheme_name
298 scheme = self.config.TaskScheduler.scheme_name
304 except AttributeError:
299 except AttributeError:
305 scheme = TaskScheduler.scheme_name.get_default_value()
300 scheme = TaskScheduler.scheme_name.get_default_value()
306 # Task Queue (in a Process)
301 # Task Queue (in a Process)
307 if scheme == 'pure':
302 if scheme == 'pure':
308 self.log.warn("task::using pure XREQ Task scheduler")
303 self.log.warn("task::using pure XREQ Task scheduler")
309 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
304 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
310 # q.setsockopt_out(zmq.HWM, hub.hwm)
305 # q.setsockopt_out(zmq.HWM, hub.hwm)
311 q.bind_in(hub.client_info['task'][1])
306 q.bind_in(hub.client_info['task'][1])
312 q.setsockopt_in(zmq.IDENTITY, 'task')
307 q.setsockopt_in(zmq.IDENTITY, 'task')
313 q.bind_out(hub.engine_info['task'])
308 q.bind_out(hub.engine_info['task'])
314 q.connect_mon(hub.monitor_url)
309 q.connect_mon(hub.monitor_url)
315 q.daemon=True
310 q.daemon=True
316 children.append(q)
311 children.append(q)
317 elif scheme == 'none':
312 elif scheme == 'none':
318 self.log.warn("task::using no Task scheduler")
313 self.log.warn("task::using no Task scheduler")
319
314
320 else:
315 else:
321 self.log.info("task::using Python %s Task scheduler"%scheme)
316 self.log.info("task::using Python %s Task scheduler"%scheme)
322 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
317 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
323 hub.monitor_url, hub.client_info['notification'])
318 hub.monitor_url, hub.client_info['notification'])
324 kwargs = dict(logname='scheduler', loglevel=self.log_level,
319 kwargs = dict(logname='scheduler', loglevel=self.log_level,
325 log_url = self.log_url, config=dict(self.config))
320 log_url = self.log_url, config=dict(self.config))
326 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
321 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
327 q.daemon=True
322 q.daemon=True
328 children.append(q)
323 children.append(q)
329
324
330
325
331 def save_urls(self):
326 def save_urls(self):
332 """save the registration urls to files."""
327 """save the registration urls to files."""
333 c = self.config
328 c = self.config
334
329
335 sec_dir = self.cluster_dir.security_dir
330 sec_dir = self.profile_dir.security_dir
336 cf = self.factory
331 cf = self.factory
337
332
338 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
333 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
339 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
334 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
340
335
341 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
336 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
342 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
337 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
343
338
344
339
345 def do_import_statements(self):
340 def do_import_statements(self):
346 statements = self.import_statements
341 statements = self.import_statements
347 for s in statements:
342 for s in statements:
348 try:
343 try:
349 self.log.info("Executing statement: '%s'" % s)
344 self.log.info("Executing statement: '%s'" % s)
350 exec s in globals(), locals()
345 exec s in globals(), locals()
351 except:
346 except:
352 self.log.error("Error running statement: %s" % s)
347 self.log.error("Error running statement: %s" % s)
353
348
354 def forward_logging(self):
349 def forward_logging(self):
355 if self.log_url:
350 if self.log_url:
356 self.log.info("Forwarding logging to %s"%self.log_url)
351 self.log.info("Forwarding logging to %s"%self.log_url)
357 context = zmq.Context.instance()
352 context = zmq.Context.instance()
358 lsock = context.socket(zmq.PUB)
353 lsock = context.socket(zmq.PUB)
359 lsock.connect(self.log_url)
354 lsock.connect(self.log_url)
360 handler = PUBHandler(lsock)
355 handler = PUBHandler(lsock)
361 self.log.removeHandler(self._log_handler)
356 self.log.removeHandler(self._log_handler)
362 handler.root_topic = 'controller'
357 handler.root_topic = 'controller'
363 handler.setLevel(self.log_level)
358 handler.setLevel(self.log_level)
364 self.log.addHandler(handler)
359 self.log.addHandler(handler)
365 self._log_handler = handler
360 self._log_handler = handler
366 # #
361 # #
367
362
368 def initialize(self, argv=None):
363 def initialize(self, argv=None):
369 super(IPControllerApp, self).initialize(argv)
364 super(IPControllerApp, self).initialize(argv)
370 self.forward_logging()
365 self.forward_logging()
371 self.init_hub()
366 self.init_hub()
372 self.init_schedulers()
367 self.init_schedulers()
373
368
374 def start(self):
369 def start(self):
375 # Start the subprocesses:
370 # Start the subprocesses:
376 self.factory.start()
371 self.factory.start()
377 child_procs = []
372 child_procs = []
378 for child in self.children:
373 for child in self.children:
379 child.start()
374 child.start()
380 if isinstance(child, ProcessMonitoredQueue):
375 if isinstance(child, ProcessMonitoredQueue):
381 child_procs.append(child.launcher)
376 child_procs.append(child.launcher)
382 elif isinstance(child, Process):
377 elif isinstance(child, Process):
383 child_procs.append(child)
378 child_procs.append(child)
384 if child_procs:
379 if child_procs:
385 signal_children(child_procs)
380 signal_children(child_procs)
386
381
387 self.write_pid_file(overwrite=True)
382 self.write_pid_file(overwrite=True)
388
383
389 try:
384 try:
390 self.factory.loop.start()
385 self.factory.loop.start()
391 except KeyboardInterrupt:
386 except KeyboardInterrupt:
392 self.log.critical("Interrupted, Exiting...\n")
387 self.log.critical("Interrupted, Exiting...\n")
393
388
394
389
395
390
396 def launch_new_instance():
391 def launch_new_instance():
397 """Create and run the IPython controller"""
392 """Create and run the IPython controller"""
398 app = IPControllerApp()
393 app = IPControllerApp()
399 app.initialize()
394 app.initialize()
400 app.start()
395 app.start()
401
396
402
397
403 if __name__ == '__main__':
398 if __name__ == '__main__':
404 launch_new_instance()
399 launch_new_instance()
@@ -1,277 +1,272 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import json
18 import json
19 import os
19 import os
20 import sys
20 import sys
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from IPython.core.newapplication import ProfileDir
25 from IPython.parallel.apps.clusterdir import (
26 from IPython.parallel.apps.clusterdir import (
26 ClusterApplication,
27 BaseParallelApplication,
27 ClusterDir,
28 # ClusterDirConfigLoader
29 )
28 )
30 from IPython.zmq.log import EnginePUBHandler
29 from IPython.zmq.log import EnginePUBHandler
31
30
32 from IPython.config.configurable import Configurable
31 from IPython.config.configurable import Configurable
33 from IPython.parallel.streamsession import StreamSession
32 from IPython.parallel.streamsession import StreamSession
34 from IPython.parallel.engine.engine import EngineFactory
33 from IPython.parallel.engine.engine import EngineFactory
35 from IPython.parallel.engine.streamkernel import Kernel
34 from IPython.parallel.engine.streamkernel import Kernel
36 from IPython.parallel.util import disambiguate_url
35 from IPython.parallel.util import disambiguate_url
37
36
38 from IPython.utils.importstring import import_item
37 from IPython.utils.importstring import import_item
39 from IPython.utils.traitlets import Bool, Unicode, Dict, List
38 from IPython.utils.traitlets import Bool, Unicode, Dict, List
40
39
41
40
42 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
43 # Module level variables
42 # Module level variables
44 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
45
44
46 #: The default config file name for this application
45 #: The default config file name for this application
47 default_config_file_name = u'ipengine_config.py'
46 default_config_file_name = u'ipengine_config.py'
48
47
49 _description = """Start an IPython engine for parallel computing.
48 _description = """Start an IPython engine for parallel computing.
50
49
51 IPython engines run in parallel and perform computations on behalf of a client
50 IPython engines run in parallel and perform computations on behalf of a client
52 and controller. A controller needs to be started before the engines. The
51 and controller. A controller needs to be started before the engines. The
53 engine can be configured using command line options or using a cluster
52 engine can be configured using command line options or using a cluster
54 directory. Cluster directories contain config, log and security files and are
53 directory. Cluster directories contain config, log and security files and are
55 usually located in your ipython directory and named as "cluster_<profile>".
54 usually located in your ipython directory and named as "cluster_<profile>".
56 See the `profile` and `cluster_dir` options for details.
55 See the `profile` and `profile_dir` options for details.
57 """
56 """
58
57
59
58
60 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
61 # MPI configuration
60 # MPI configuration
62 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
63
62
64 mpi4py_init = """from mpi4py import MPI as mpi
63 mpi4py_init = """from mpi4py import MPI as mpi
65 mpi.size = mpi.COMM_WORLD.Get_size()
64 mpi.size = mpi.COMM_WORLD.Get_size()
66 mpi.rank = mpi.COMM_WORLD.Get_rank()
65 mpi.rank = mpi.COMM_WORLD.Get_rank()
67 """
66 """
68
67
69
68
70 pytrilinos_init = """from PyTrilinos import Epetra
69 pytrilinos_init = """from PyTrilinos import Epetra
71 class SimpleStruct:
70 class SimpleStruct:
72 pass
71 pass
73 mpi = SimpleStruct()
72 mpi = SimpleStruct()
74 mpi.rank = 0
73 mpi.rank = 0
75 mpi.size = 0
74 mpi.size = 0
76 """
75 """
77
76
78 class MPI(Configurable):
77 class MPI(Configurable):
79 """Configurable for MPI initialization"""
78 """Configurable for MPI initialization"""
80 use = Unicode('', config=True,
79 use = Unicode('', config=True,
81 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
80 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
82 )
81 )
83
82
84 def _on_use_changed(self, old, new):
83 def _on_use_changed(self, old, new):
85 # load default init script if it's not set
84 # load default init script if it's not set
86 if not self.init_script:
85 if not self.init_script:
87 self.init_script = self.default_inits.get(new, '')
86 self.init_script = self.default_inits.get(new, '')
88
87
89 init_script = Unicode('', config=True,
88 init_script = Unicode('', config=True,
90 help="Initialization code for MPI")
89 help="Initialization code for MPI")
91
90
92 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
91 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
93 config=True)
92 config=True)
94
93
95
94
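# Hedged usage sketch for the MPI configurable above; the values are
# illustrative:
#
#   ipengine --mpi=mpi4py          # via the `mpi` alias defined further down
#   # or in ipengine_config.py:
#   # c.MPI.use = 'mpi4py'         # pick the mpi4py_init snippet above
#   # c.MPI.init_script = "..."    # or supply a custom init script directly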
96 #-----------------------------------------------------------------------------
95 #-----------------------------------------------------------------------------
97 # Main application
96 # Main application
98 #-----------------------------------------------------------------------------
97 #-----------------------------------------------------------------------------
99
98
100
99
101 class IPEngineApp(ClusterApplication):
100 class IPEngineApp(BaseParallelApplication):
102
101
103 app_name = Unicode(u'ipengine')
102 app_name = Unicode(u'ipengine')
104 description = Unicode(_description)
103 description = Unicode(_description)
105 config_file_name = Unicode(default_config_file_name)
104 config_file_name = Unicode(default_config_file_name)
106 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
105 classes = List([ProfileDir, StreamSession, EngineFactory, Kernel, MPI])
107
108 auto_create_cluster_dir = Bool(False,
109 help="whether to create the cluster_dir if it doesn't exist")
110
106
111 startup_script = Unicode(u'', config=True,
107 startup_script = Unicode(u'', config=True,
112 help='specify a script to be run at startup')
108 help='specify a script to be run at startup')
113 startup_command = Unicode('', config=True,
109 startup_command = Unicode('', config=True,
114 help='specify a command to be run at startup')
110 help='specify a command to be run at startup')
115
111
116 url_file = Unicode(u'', config=True,
112 url_file = Unicode(u'', config=True,
117 help="""The full location of the file containing the connection information for
113 help="""The full location of the file containing the connection information for
118 the controller. If this is not given, the file must be in the
114 the controller. If this is not given, the file must be in the
119 security directory of the cluster directory. This location is
115 security directory of the cluster directory. This location is
120 resolved using the `profile` or `cluster_dir` options.""",
116 resolved using the `profile` or `profile_dir` options.""",
121 )
117 )
122
118
123 url_file_name = Unicode(u'ipcontroller-engine.json')
119 url_file_name = Unicode(u'ipcontroller-engine.json')
124 log_url = Unicode('', config=True,
120 log_url = Unicode('', config=True,
125 help="""The URL for the iploggerapp instance, for forwarding
121 help="""The URL for the iploggerapp instance, for forwarding
126 logging to a central location.""")
122 logging to a central location.""")
127
123
128 aliases = Dict(dict(
124 aliases = Dict(dict(
129 config = 'IPEngineApp.config_file',
130 file = 'IPEngineApp.url_file',
125 file = 'IPEngineApp.url_file',
131 c = 'IPEngineApp.startup_command',
126 c = 'IPEngineApp.startup_command',
132 s = 'IPEngineApp.startup_script',
127 s = 'IPEngineApp.startup_script',
133
128
134 ident = 'StreamSession.session',
129 ident = 'StreamSession.session',
135 user = 'StreamSession.username',
130 user = 'StreamSession.username',
136 exec_key = 'StreamSession.keyfile',
131 exec_key = 'StreamSession.keyfile',
137
132
138 url = 'EngineFactory.url',
133 url = 'EngineFactory.url',
139 ip = 'EngineFactory.ip',
134 ip = 'EngineFactory.ip',
140 transport = 'EngineFactory.transport',
135 transport = 'EngineFactory.transport',
141 port = 'EngineFactory.regport',
136 port = 'EngineFactory.regport',
142 location = 'EngineFactory.location',
137 location = 'EngineFactory.location',
143
138
144 timeout = 'EngineFactory.timeout',
139 timeout = 'EngineFactory.timeout',
145
140
146 profile = "ClusterDir.profile",
141 profile = "IPEngineApp.profile",
147 cluster_dir = 'ClusterDir.location',
142 profile_dir = 'ProfileDir.location',
148
143
149 mpi = 'MPI.use',
144 mpi = 'MPI.use',
150
145
151 log_level = 'IPEngineApp.log_level',
146 log_level = 'IPEngineApp.log_level',
152 log_url = 'IPEngineApp.log_url'
147 log_url = 'IPEngineApp.log_url'
153 ))
148 ))
154
149
155 # def find_key_file(self):
150 # def find_key_file(self):
156 # """Set the key file.
151 # """Set the key file.
157 #
152 #
158 # Here we don't try to actually see if it exists for is valid as that
153 # Here we don't try to actually see if it exists for is valid as that
159 # is hadled by the connection logic.
154 # is hadled by the connection logic.
160 # """
155 # """
161 # config = self.master_config
156 # config = self.master_config
162 # # Find the actual controller key file
157 # # Find the actual controller key file
163 # if not config.Global.key_file:
158 # if not config.Global.key_file:
164 # try_this = os.path.join(
159 # try_this = os.path.join(
165 # config.Global.cluster_dir,
160 # config.Global.profile_dir,
166 # config.Global.security_dir,
161 # config.Global.security_dir,
167 # config.Global.key_file_name
162 # config.Global.key_file_name
168 # )
163 # )
169 # config.Global.key_file = try_this
164 # config.Global.key_file = try_this
170
165
171 def find_url_file(self):
166 def find_url_file(self):
172 """Set the url file.
167 """Set the url file.
173
168
174 Here we don't try to actually see if it exists or is valid as that
169 Here we don't try to actually see if it exists or is valid as that
175 is handled by the connection logic.
170 is handled by the connection logic.
176 """
171 """
177 config = self.config
172 config = self.config
178 # Find the actual controller key file
173 # Find the actual controller key file
179 if not self.url_file:
174 if not self.url_file:
180 self.url_file = os.path.join(
175 self.url_file = os.path.join(
181 self.cluster_dir.security_dir,
176 self.profile_dir.security_dir,
182 self.url_file_name
177 self.url_file_name
183 )
178 )
184 def init_engine(self):
179 def init_engine(self):
185 # This is the working dir by now.
180 # This is the working dir by now.
186 sys.path.insert(0, '')
181 sys.path.insert(0, '')
187 config = self.config
182 config = self.config
188 # print config
183 # print config
189 self.find_url_file()
184 self.find_url_file()
190
185
191 # if os.path.exists(config.Global.key_file) and config.Global.secure:
186 # if os.path.exists(config.Global.key_file) and config.Global.secure:
192 # config.SessionFactory.exec_key = config.Global.key_file
187 # config.SessionFactory.exec_key = config.Global.key_file
193 if os.path.exists(self.url_file):
188 if os.path.exists(self.url_file):
194 with open(self.url_file) as f:
189 with open(self.url_file) as f:
195 d = json.loads(f.read())
190 d = json.loads(f.read())
196 for k,v in d.iteritems():
191 for k,v in d.iteritems():
197 if isinstance(v, unicode):
192 if isinstance(v, unicode):
198 d[k] = v.encode()
193 d[k] = v.encode()
199 if d['exec_key']:
194 if d['exec_key']:
200 config.StreamSession.key = d['exec_key']
195 config.StreamSession.key = d['exec_key']
201 d['url'] = disambiguate_url(d['url'], d['location'])
196 d['url'] = disambiguate_url(d['url'], d['location'])
202 config.EngineFactory.url = d['url']
197 config.EngineFactory.url = d['url']
203 config.EngineFactory.location = d['location']
198 config.EngineFactory.location = d['location']
204
199
205 try:
200 try:
206 exec_lines = config.Kernel.exec_lines
201 exec_lines = config.Kernel.exec_lines
207 except AttributeError:
202 except AttributeError:
208 config.Kernel.exec_lines = []
203 config.Kernel.exec_lines = []
209 exec_lines = config.Kernel.exec_lines
204 exec_lines = config.Kernel.exec_lines
210
205
211 if self.startup_script:
206 if self.startup_script:
212 enc = sys.getfilesystemencoding() or 'utf8'
207 enc = sys.getfilesystemencoding() or 'utf8'
213 cmd="execfile(%r)"%self.startup_script.encode(enc)
208 cmd="execfile(%r)"%self.startup_script.encode(enc)
214 exec_lines.append(cmd)
209 exec_lines.append(cmd)
215 if self.startup_command:
210 if self.startup_command:
216 exec_lines.append(self.startup_command)
211 exec_lines.append(self.startup_command)
217
212
218 # Create the underlying shell class and Engine
213 # Create the underlying shell class and Engine
219 # shell_class = import_item(self.master_config.Global.shell_class)
214 # shell_class = import_item(self.master_config.Global.shell_class)
220 # print self.config
215 # print self.config
221 try:
216 try:
222 self.engine = EngineFactory(config=config, log=self.log)
217 self.engine = EngineFactory(config=config, log=self.log)
223 except:
218 except:
224 self.log.error("Couldn't start the Engine", exc_info=True)
219 self.log.error("Couldn't start the Engine", exc_info=True)
225 self.exit(1)
220 self.exit(1)
226
221
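# Hedged usage sketch for the startup hooks handled above; the file name and
# command are illustrative:
#
#   ipengine --s=/path/to/startup.py --c="import numpy"
#
# Both end up in c.Kernel.exec_lines: the script as
# "execfile('/path/to/startup.py')" and the command verbatim, so they run in
# the engine's namespace at startup.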
227 def forward_logging(self):
222 def forward_logging(self):
228 if self.log_url:
223 if self.log_url:
229 self.log.info("Forwarding logging to %s"%self.log_url)
224 self.log.info("Forwarding logging to %s"%self.log_url)
230 context = self.engine.context
225 context = self.engine.context
231 lsock = context.socket(zmq.PUB)
226 lsock = context.socket(zmq.PUB)
232 lsock.connect(self.log_url)
227 lsock.connect(self.log_url)
233 self.log.removeHandler(self._log_handler)
228 self.log.removeHandler(self._log_handler)
234 handler = EnginePUBHandler(self.engine, lsock)
229 handler = EnginePUBHandler(self.engine, lsock)
235 handler.setLevel(self.log_level)
230 handler.setLevel(self.log_level)
236 self.log.addHandler(handler)
231 self.log.addHandler(handler)
237 self._log_handler = handler
232 self._log_handler = handler
238 #
233 #
239 def init_mpi(self):
234 def init_mpi(self):
240 global mpi
235 global mpi
241 self.mpi = MPI(config=self.config)
236 self.mpi = MPI(config=self.config)
242
237
243 mpi_import_statement = self.mpi.init_script
238 mpi_import_statement = self.mpi.init_script
244 if mpi_import_statement:
239 if mpi_import_statement:
245 try:
240 try:
246 self.log.info("Initializing MPI:")
241 self.log.info("Initializing MPI:")
247 self.log.info(mpi_import_statement)
242 self.log.info(mpi_import_statement)
248 exec mpi_import_statement in globals()
243 exec mpi_import_statement in globals()
249 except:
244 except:
250 mpi = None
245 mpi = None
251 else:
246 else:
252 mpi = None
247 mpi = None
253
248
254 def initialize(self, argv=None):
249 def initialize(self, argv=None):
255 super(IPEngineApp, self).initialize(argv)
250 super(IPEngineApp, self).initialize(argv)
256 self.init_mpi()
251 self.init_mpi()
257 self.init_engine()
252 self.init_engine()
258 self.forward_logging()
253 self.forward_logging()
259
254
260 def start(self):
255 def start(self):
261 self.engine.start()
256 self.engine.start()
262 try:
257 try:
263 self.engine.loop.start()
258 self.engine.loop.start()
264 except KeyboardInterrupt:
259 except KeyboardInterrupt:
265 self.log.critical("Engine Interrupted, shutting down...\n")
260 self.log.critical("Engine Interrupted, shutting down...\n")
266
261
267
262
268 def launch_new_instance():
263 def launch_new_instance():
269 """Create and run the IPython engine"""
264 """Create and run the IPython engine"""
270 app = IPEngineApp()
265 app = IPEngineApp()
271 app.initialize()
266 app.initialize()
272 app.start()
267 app.start()
273
268
274
269
275 if __name__ == '__main__':
270 if __name__ == '__main__':
276 launch_new_instance()
271 launch_new_instance()
277
272
@@ -1,97 +1,96 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 A simple IPython logger application
4 A simple IPython logger application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20
20
21 import zmq
21 import zmq
22
22
23 from IPython.core.newapplication import ProfileDir
23 from IPython.utils.traitlets import Bool, Dict, Unicode
24 from IPython.utils.traitlets import Bool, Dict, Unicode
24
25
25 from IPython.parallel.apps.clusterdir import (
26 from IPython.parallel.apps.clusterdir import (
26 ClusterApplication,
27 BaseParallelApplication,
27 ClusterDir,
28 base_aliases
28 base_aliases
29 )
29 )
30 from IPython.parallel.apps.logwatcher import LogWatcher
30 from IPython.parallel.apps.logwatcher import LogWatcher
31
31
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33 # Module level variables
33 # Module level variables
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35
35
36 #: The default config file name for this application
36 #: The default config file name for this application
37 default_config_file_name = u'iplogger_config.py'
37 default_config_file_name = u'iplogger_config.py'
38
38
39 _description = """Start an IPython logger for parallel computing.
39 _description = """Start an IPython logger for parallel computing.
40
40
41 IPython controllers and engines (and your own processes) can broadcast log messages
41 IPython controllers and engines (and your own processes) can broadcast log messages
42 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
42 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
43 logger can be configured using command line options or using a cluster
43 logger can be configured using command line options or using a cluster
44 directory. Cluster directories contain config, log and security files and are
44 directory. Cluster directories contain config, log and security files and are
45 usually located in your ipython directory and named as "cluster_<profile>".
45 usually located in your ipython directory and named as "cluster_<profile>".
46 See the `profile` and `cluster_dir` options for details.
46 See the `profile` and `profile_dir` options for details.
47 """
47 """
48
48
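# A minimal, hedged sketch of the broadcasting described above; the URL and
# topic are illustrative and would normally match LogWatcher.url:

import logging
import zmq
from zmq.log.handlers import PUBHandler

sock = zmq.Context.instance().socket(zmq.PUB)
sock.connect('tcp://127.0.0.1:20202')      # wherever the log watcher listens
handler = PUBHandler(sock)
handler.root_topic = 'myapp'               # becomes the zmq topic prefix
logging.getLogger().addHandler(handler)
logging.getLogger().warn('hello from an external process')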
49
49
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51 # Main application
51 # Main application
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 aliases = {}
53 aliases = {}
54 aliases.update(base_aliases)
54 aliases.update(base_aliases)
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
56
56
57 class IPLoggerApp(ClusterApplication):
57 class IPLoggerApp(BaseParallelApplication):
58
58
59 name = u'iplogger'
59 name = u'iplogger'
60 description = _description
60 description = _description
61 config_file_name = Unicode(default_config_file_name)
61 config_file_name = Unicode(default_config_file_name)
62 auto_create_cluster_dir = Bool(False)
63
62
64 classes = [LogWatcher, ClusterDir]
63 classes = [LogWatcher, ProfileDir]
65 aliases = Dict(aliases)
64 aliases = Dict(aliases)
66
65
67 def initialize(self, argv=None):
66 def initialize(self, argv=None):
68 super(IPLoggerApp, self).initialize(argv)
67 super(IPLoggerApp, self).initialize(argv)
69 self.init_watcher()
68 self.init_watcher()
70
69
71 def init_watcher(self):
70 def init_watcher(self):
72 try:
71 try:
73 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
72 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
74 except:
73 except:
75 self.log.error("Couldn't start the LogWatcher", exc_info=True)
74 self.log.error("Couldn't start the LogWatcher", exc_info=True)
76 self.exit(1)
75 self.exit(1)
77 self.log.info("Listening for log messages on %r"%self.watcher.url)
76 self.log.info("Listening for log messages on %r"%self.watcher.url)
78
77
79
78
80 def start(self):
79 def start(self):
81 self.watcher.start()
80 self.watcher.start()
82 try:
81 try:
83 self.watcher.loop.start()
82 self.watcher.loop.start()
84 except KeyboardInterrupt:
83 except KeyboardInterrupt:
85 self.log.critical("Logging Interrupted, shutting down...\n")
84 self.log.critical("Logging Interrupted, shutting down...\n")
86
85
87
86
88 def launch_new_instance():
87 def launch_new_instance():
89 """Create and run the IPython LogWatcher"""
88 """Create and run the IPython LogWatcher"""
90 app = IPLoggerApp()
89 app = IPLoggerApp()
91 app.initialize()
90 app.initialize()
92 app.start()
91 app.start()
93
92
94
93
95 if __name__ == '__main__':
94 if __name__ == '__main__':
96 launch_new_instance()
95 launch_new_instance()
97
96
@@ -1,1070 +1,1070 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23
23
24 # signal imports, handling various platforms, versions
24 # signal imports, handling various platforms, versions
25
25
26 from signal import SIGINT, SIGTERM
26 from signal import SIGINT, SIGTERM
27 try:
27 try:
28 from signal import SIGKILL
28 from signal import SIGKILL
29 except ImportError:
29 except ImportError:
30 # Windows
30 # Windows
31 SIGKILL=SIGTERM
31 SIGKILL=SIGTERM
32
32
33 try:
33 try:
34 # Windows >= 2.7, 3.2
34 # Windows >= 2.7, 3.2
35 from signal import CTRL_C_EVENT as SIGINT
35 from signal import CTRL_C_EVENT as SIGINT
36 except ImportError:
36 except ImportError:
37 pass
37 pass
38
38
39 from subprocess import Popen, PIPE, STDOUT
39 from subprocess import Popen, PIPE, STDOUT
40 try:
40 try:
41 from subprocess import check_output
41 from subprocess import check_output
42 except ImportError:
42 except ImportError:
43 # pre-2.7, define check_output with Popen
43 # pre-2.7, define check_output with Popen
44 def check_output(*args, **kwargs):
44 def check_output(*args, **kwargs):
45 kwargs.update(dict(stdout=PIPE))
45 kwargs.update(dict(stdout=PIPE))
46 p = Popen(*args, **kwargs)
46 p = Popen(*args, **kwargs)
47 out,err = p.communicate()
47 out,err = p.communicate()
48 return out
48 return out
49
49
50 from zmq.eventloop import ioloop
50 from zmq.eventloop import ioloop
51
51
52 from IPython.external import Itpl
52 from IPython.external import Itpl
53 # from IPython.config.configurable import Configurable
53 # from IPython.config.configurable import Configurable
54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
55 from IPython.utils.path import get_ipython_module_path
55 from IPython.utils.path import get_ipython_module_path
56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
57
57
58 from IPython.parallel.factory import LoggingFactory
58 from IPython.parallel.factory import LoggingFactory
59
59
60 from .win32support import forward_read_events
60 from .win32support import forward_read_events
61
61
62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
63
63
64 WINDOWS = os.name == 'nt'
64 WINDOWS = os.name == 'nt'
65
65
66 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
67 # Paths to the kernel apps
67 # Paths to the kernel apps
68 #-----------------------------------------------------------------------------
68 #-----------------------------------------------------------------------------
69
69
70
70
71 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
71 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
72 'IPython.parallel.apps.ipclusterapp'
72 'IPython.parallel.apps.ipclusterapp'
73 ))
73 ))
74
74
75 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
75 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
76 'IPython.parallel.apps.ipengineapp'
76 'IPython.parallel.apps.ipengineapp'
77 ))
77 ))
78
78
79 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
79 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
80 'IPython.parallel.apps.ipcontrollerapp'
80 'IPython.parallel.apps.ipcontrollerapp'
81 ))
81 ))
82
82
83 #-----------------------------------------------------------------------------
83 #-----------------------------------------------------------------------------
84 # Base launchers and errors
84 # Base launchers and errors
85 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
86
86
87
87
88 class LauncherError(Exception):
88 class LauncherError(Exception):
89 pass
89 pass
90
90
91
91
92 class ProcessStateError(LauncherError):
92 class ProcessStateError(LauncherError):
93 pass
93 pass
94
94
95
95
96 class UnknownStatus(LauncherError):
96 class UnknownStatus(LauncherError):
97 pass
97 pass
98
98
99
99
100 class BaseLauncher(LoggingFactory):
100 class BaseLauncher(LoggingFactory):
101 """An asbtraction for starting, stopping and signaling a process."""
101 """An asbtraction for starting, stopping and signaling a process."""
102
102
103 # In all of the launchers, the work_dir is where child processes will be
103 # In all of the launchers, the work_dir is where child processes will be
104 # run. This will usually be the cluster_dir, but may not be. Any work_dir
104 # run. This will usually be the profile_dir, but may not be. Any work_dir
105 # passed into the __init__ method will override the config value.
105 # passed into the __init__ method will override the config value.
106 # This should not be used to set the work_dir for the actual engine
106 # This should not be used to set the work_dir for the actual engine
107 # and controller. Instead, use their own config files or the
107 # and controller. Instead, use their own config files or the
108 # controller_args, engine_args attributes of the launchers to add
108 # controller_args, engine_args attributes of the launchers to add
109 # the work_dir option.
109 # the work_dir option.
110 work_dir = Unicode(u'.')
110 work_dir = Unicode(u'.')
111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
112
112
113 start_data = Any()
113 start_data = Any()
114 stop_data = Any()
114 stop_data = Any()
115
115
116 def _loop_default(self):
116 def _loop_default(self):
117 return ioloop.IOLoop.instance()
117 return ioloop.IOLoop.instance()
118
118
119 def __init__(self, work_dir=u'.', config=None, **kwargs):
119 def __init__(self, work_dir=u'.', config=None, **kwargs):
120 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
120 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
121 self.state = 'before' # can be before, running, after
121 self.state = 'before' # can be before, running, after
122 self.stop_callbacks = []
122 self.stop_callbacks = []
123 self.start_data = None
123 self.start_data = None
124 self.stop_data = None
124 self.stop_data = None
125
125
126 @property
126 @property
127 def args(self):
127 def args(self):
128 """A list of cmd and args that will be used to start the process.
128 """A list of cmd and args that will be used to start the process.
129
129
130 This is what is passed to :func:`spawnProcess` and the first element
130 This is what is passed to :func:`spawnProcess` and the first element
131 will be the process name.
131 will be the process name.
132 """
132 """
133 return self.find_args()
133 return self.find_args()
134
134
135 def find_args(self):
135 def find_args(self):
136 """The ``.args`` property calls this to find the args list.
136 """The ``.args`` property calls this to find the args list.
137
137
138 Subclasses should implement this to construct the cmd and args.
138 Subclasses should implement this to construct the cmd and args.
139 """
139 """
140 raise NotImplementedError('find_args must be implemented in a subclass')
140 raise NotImplementedError('find_args must be implemented in a subclass')
141
141
142 @property
142 @property
143 def arg_str(self):
143 def arg_str(self):
144 """The string form of the program arguments."""
144 """The string form of the program arguments."""
145 return ' '.join(self.args)
145 return ' '.join(self.args)
146
146
147 @property
147 @property
148 def running(self):
148 def running(self):
149 """Am I running."""
149 """Am I running."""
150 if self.state == 'running':
150 if self.state == 'running':
151 return True
151 return True
152 else:
152 else:
153 return False
153 return False
154
154
155 def start(self):
155 def start(self):
156 """Start the process.
156 """Start the process.
157
157
158 This must return a deferred that fires with information about the
158 This must return a deferred that fires with information about the
159 process starting (like a pid, job id, etc.).
159 process starting (like a pid, job id, etc.).
160 """
160 """
161 raise NotImplementedError('start must be implemented in a subclass')
161 raise NotImplementedError('start must be implemented in a subclass')
162
162
163 def stop(self):
163 def stop(self):
164 """Stop the process and notify observers of stopping.
164 """Stop the process and notify observers of stopping.
165
165
166 This must return a deferred that fires with information about the
166 This must return a deferred that fires with information about the
167 process stopping, like errors that occur while the process is
167 process stopping, like errors that occur while the process is
168 attempting to be shut down. This deferred won't fire when the process
168 attempting to be shut down. This deferred won't fire when the process
169 actually stops. To observe the actual process stopping, see
169 actually stops. To observe the actual process stopping, see
170 :func:`on_stop`.
170 :func:`on_stop`.
171 """
171 """
172 raise NotImplementedError('stop must be implemented in a subclass')
172 raise NotImplementedError('stop must be implemented in a subclass')
173
173
174 def on_stop(self, f):
174 def on_stop(self, f):
175 """Get a deferred that will fire when the process stops.
175 """Get a deferred that will fire when the process stops.
176
176
177 The deferred will fire with data that contains information about
177 The deferred will fire with data that contains information about
178 the exit status of the process.
178 the exit status of the process.
179 """
179 """
180 if self.state=='after':
180 if self.state=='after':
181 return f(self.stop_data)
181 return f(self.stop_data)
182 else:
182 else:
183 self.stop_callbacks.append(f)
183 self.stop_callbacks.append(f)
184
184
185 def notify_start(self, data):
185 def notify_start(self, data):
186 """Call this to trigger startup actions.
186 """Call this to trigger startup actions.
187
187
188 This logs the process startup and sets the state to 'running'. It is
188 This logs the process startup and sets the state to 'running'. It is
189 a pass-through so it can be used as a callback.
189 a pass-through so it can be used as a callback.
190 """
190 """
191
191
192 self.log.info('Process %r started: %r' % (self.args[0], data))
192 self.log.info('Process %r started: %r' % (self.args[0], data))
193 self.start_data = data
193 self.start_data = data
194 self.state = 'running'
194 self.state = 'running'
195 return data
195 return data
196
196
197 def notify_stop(self, data):
197 def notify_stop(self, data):
198 """Call this to trigger process stop actions.
198 """Call this to trigger process stop actions.
199
199
200 This logs the process stopping and sets the state to 'after'. Call
200 This logs the process stopping and sets the state to 'after'. Call
201 this to trigger all the deferreds from :func:`on_stop`."""
201 this to trigger all the deferreds from :func:`on_stop`."""
202
202
203 self.log.info('Process %r stopped: %r' % (self.args[0], data))
203 self.log.info('Process %r stopped: %r' % (self.args[0], data))
204 self.stop_data = data
204 self.stop_data = data
205 self.state = 'after'
205 self.state = 'after'
206 for i in range(len(self.stop_callbacks)):
206 for i in range(len(self.stop_callbacks)):
207 d = self.stop_callbacks.pop()
207 d = self.stop_callbacks.pop()
208 d(data)
208 d(data)
209 return data
209 return data
210
210
211 def signal(self, sig):
211 def signal(self, sig):
212 """Signal the process.
212 """Signal the process.
213
213
214 Return a semi-meaningless deferred after signaling the process.
214 Return a semi-meaningless deferred after signaling the process.
215
215
216 Parameters
216 Parameters
217 ----------
217 ----------
218 sig : str or int
218 sig : str or int
219 'KILL', 'INT', etc., or any signal number
219 'KILL', 'INT', etc., or any signal number
220 """
220 """
221 raise NotImplementedError('signal must be implemented in a subclass')
221 raise NotImplementedError('signal must be implemented in a subclass')
222
222
223
223
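# --- Editor's illustrative sketch (not part of the changeset above) ---------
# A minimal BaseLauncher subclass, showing how find_args(), notify_start()
# and on_stop() fit together.  The command and the data dicts are arbitrary
# examples, not anything the real launchers use.
#
# class ExampleLauncher(BaseLauncher):
#     def find_args(self):
#         return ['sleep', '10']
#     def start(self):
#         # a real launcher would actually spawn self.args here
#         return self.notify_start(dict(pid=None))
#     def stop(self):
#         return self.notify_stop(dict(exit_code=0, pid=None))
#
# launcher = ExampleLauncher(work_dir=u'.')
# launcher.on_stop(lambda data: None)   # callback fires when notify_stop runs
# launcher.start()
# launcher.stop()
# -----------------------------------------------------------------------------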
224 #-----------------------------------------------------------------------------
224 #-----------------------------------------------------------------------------
225 # Local process launchers
225 # Local process launchers
226 #-----------------------------------------------------------------------------
226 #-----------------------------------------------------------------------------
227
227
228
228
229 class LocalProcessLauncher(BaseLauncher):
229 class LocalProcessLauncher(BaseLauncher):
230 """Start and stop an external process in an asynchronous manner.
230 """Start and stop an external process in an asynchronous manner.
231
231
232 This will launch the external process with a working directory of
232 This will launch the external process with a working directory of
233 ``self.work_dir``.
233 ``self.work_dir``.
234 """
234 """
235
235
236 # This is used to construct self.args, which is passed to
236 # This is used to construct self.args, which is passed to
237 # spawnProcess.
237 # spawnProcess.
238 cmd_and_args = List([])
238 cmd_and_args = List([])
239 poll_frequency = Int(100) # in ms
239 poll_frequency = Int(100) # in ms
240
240
241 def __init__(self, work_dir=u'.', config=None, **kwargs):
241 def __init__(self, work_dir=u'.', config=None, **kwargs):
242 super(LocalProcessLauncher, self).__init__(
242 super(LocalProcessLauncher, self).__init__(
243 work_dir=work_dir, config=config, **kwargs
243 work_dir=work_dir, config=config, **kwargs
244 )
244 )
245 self.process = None
245 self.process = None
246 self.start_deferred = None
246 self.start_deferred = None
247 self.poller = None
247 self.poller = None
248
248
249 def find_args(self):
249 def find_args(self):
250 return self.cmd_and_args
250 return self.cmd_and_args
251
251
252 def start(self):
252 def start(self):
253 if self.state == 'before':
253 if self.state == 'before':
254 self.process = Popen(self.args,
254 self.process = Popen(self.args,
255 stdout=PIPE,stderr=PIPE,stdin=PIPE,
255 stdout=PIPE,stderr=PIPE,stdin=PIPE,
256 env=os.environ,
256 env=os.environ,
257 cwd=self.work_dir
257 cwd=self.work_dir
258 )
258 )
259 if WINDOWS:
259 if WINDOWS:
260 self.stdout = forward_read_events(self.process.stdout)
260 self.stdout = forward_read_events(self.process.stdout)
261 self.stderr = forward_read_events(self.process.stderr)
261 self.stderr = forward_read_events(self.process.stderr)
262 else:
262 else:
263 self.stdout = self.process.stdout.fileno()
263 self.stdout = self.process.stdout.fileno()
264 self.stderr = self.process.stderr.fileno()
264 self.stderr = self.process.stderr.fileno()
265 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
265 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
266 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
266 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
267 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
267 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
268 self.poller.start()
268 self.poller.start()
269 self.notify_start(self.process.pid)
269 self.notify_start(self.process.pid)
270 else:
270 else:
271 s = 'The process was already started and has state: %r' % self.state
271 s = 'The process was already started and has state: %r' % self.state
272 raise ProcessStateError(s)
272 raise ProcessStateError(s)
273
273
274 def stop(self):
274 def stop(self):
275 return self.interrupt_then_kill()
275 return self.interrupt_then_kill()
276
276
277 def signal(self, sig):
277 def signal(self, sig):
278 if self.state == 'running':
278 if self.state == 'running':
279 if WINDOWS and sig != SIGINT:
279 if WINDOWS and sig != SIGINT:
280 # use Windows tree-kill for better child cleanup
280 # use Windows tree-kill for better child cleanup
281 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
281 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
282 else:
282 else:
283 self.process.send_signal(sig)
283 self.process.send_signal(sig)
284
284
285 def interrupt_then_kill(self, delay=2.0):
285 def interrupt_then_kill(self, delay=2.0):
286 """Send INT, wait a delay and then send KILL."""
286 """Send INT, wait a delay and then send KILL."""
287 try:
287 try:
288 self.signal(SIGINT)
288 self.signal(SIGINT)
289 except Exception:
289 except Exception:
290 self.log.debug("interrupt failed")
290 self.log.debug("interrupt failed")
291 pass
291 pass
292 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
292 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
293 self.killer.start()
293 self.killer.start()
294
294
295 # callbacks, etc:
295 # callbacks, etc:
296
296
297 def handle_stdout(self, fd, events):
297 def handle_stdout(self, fd, events):
298 if WINDOWS:
298 if WINDOWS:
299 line = self.stdout.recv()
299 line = self.stdout.recv()
300 else:
300 else:
301 line = self.process.stdout.readline()
301 line = self.process.stdout.readline()
302 # a stopped process will be readable but return empty strings
302 # a stopped process will be readable but return empty strings
303 if line:
303 if line:
304 self.log.info(line[:-1])
304 self.log.info(line[:-1])
305 else:
305 else:
306 self.poll()
306 self.poll()
307
307
308 def handle_stderr(self, fd, events):
308 def handle_stderr(self, fd, events):
309 if WINDOWS:
309 if WINDOWS:
310 line = self.stderr.recv()
310 line = self.stderr.recv()
311 else:
311 else:
312 line = self.process.stderr.readline()
312 line = self.process.stderr.readline()
313 # a stopped process will be readable but return empty strings
313 # a stopped process will be readable but return empty strings
314 if line:
314 if line:
315 self.log.error(line[:-1])
315 self.log.error(line[:-1])
316 else:
316 else:
317 self.poll()
317 self.poll()
318
318
319 def poll(self):
319 def poll(self):
320 status = self.process.poll()
320 status = self.process.poll()
321 if status is not None:
321 if status is not None:
322 self.poller.stop()
322 self.poller.stop()
323 self.loop.remove_handler(self.stdout)
323 self.loop.remove_handler(self.stdout)
324 self.loop.remove_handler(self.stderr)
324 self.loop.remove_handler(self.stderr)
325 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
325 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
326 return status
326 return status
327
327
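# --- Editor's illustrative sketch (not part of the changeset above) ---------
# Using LocalProcessLauncher directly: cmd_and_args becomes self.args, the
# child is spawned in work_dir, its stdout/stderr are forwarded to the log,
# and a periodic poll notices the exit.  'echo' is an arbitrary example
# command.
#
# lp = LocalProcessLauncher(work_dir=u'.')
# lp.cmd_and_args = ['echo', 'hello']
# lp.on_stop(lambda data: ioloop.IOLoop.instance().stop())
# lp.start()
# ioloop.IOLoop.instance().start()   # run the loop until the process exits
# -----------------------------------------------------------------------------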
328 class LocalControllerLauncher(LocalProcessLauncher):
328 class LocalControllerLauncher(LocalProcessLauncher):
329 """Launch a controller as a regular external process."""
329 """Launch a controller as a regular external process."""
330
330
331 controller_cmd = List(ipcontroller_cmd_argv, config=True,
331 controller_cmd = List(ipcontroller_cmd_argv, config=True,
332 help="""Popen command to launch ipcontroller.""")
332 help="""Popen command to launch ipcontroller.""")
333 # Command line arguments to ipcontroller.
333 # Command line arguments to ipcontroller.
334 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
334 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
335 help="""command-line args to pass to ipcontroller""")
335 help="""command-line args to pass to ipcontroller""")
336
336
337 def find_args(self):
337 def find_args(self):
338 return self.controller_cmd + self.controller_args
338 return self.controller_cmd + self.controller_args
339
339
340 def start(self, cluster_dir):
340 def start(self, profile_dir):
341 """Start the controller by cluster_dir."""
341 """Start the controller by profile_dir."""
342 self.controller_args.extend(['cluster_dir=%s'%cluster_dir])
342 self.controller_args.extend(['profile_dir=%s'%profile_dir])
343 self.cluster_dir = unicode(cluster_dir)
343 self.profile_dir = unicode(profile_dir)
344 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
344 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
345 return super(LocalControllerLauncher, self).start()
345 return super(LocalControllerLauncher, self).start()
346
346
347
347
348 class LocalEngineLauncher(LocalProcessLauncher):
348 class LocalEngineLauncher(LocalProcessLauncher):
349 """Launch a single engine as a regular externall process."""
349 """Launch a single engine as a regular externall process."""
350
350
351 engine_cmd = List(ipengine_cmd_argv, config=True,
351 engine_cmd = List(ipengine_cmd_argv, config=True,
352 help="""command to launch the Engine.""")
352 help="""command to launch the Engine.""")
353 # Command line arguments for ipengine.
353 # Command line arguments for ipengine.
354 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
354 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
355 help="command-line arguments to pass to ipengine"
355 help="command-line arguments to pass to ipengine"
356 )
356 )
357
357
358 def find_args(self):
358 def find_args(self):
359 return self.engine_cmd + self.engine_args
359 return self.engine_cmd + self.engine_args
360
360
361 def start(self, cluster_dir):
361 def start(self, profile_dir):
362 """Start the engine by cluster_dir."""
362 """Start the engine by profile_dir."""
363 self.engine_args.extend(['cluster_dir=%s'%cluster_dir])
363 self.engine_args.extend(['profile_dir=%s'%profile_dir])
364 self.cluster_dir = unicode(cluster_dir)
364 self.profile_dir = unicode(profile_dir)
365 return super(LocalEngineLauncher, self).start()
365 return super(LocalEngineLauncher, self).start()
366
366
367
367
368 class LocalEngineSetLauncher(BaseLauncher):
368 class LocalEngineSetLauncher(BaseLauncher):
369 """Launch a set of engines as regular external processes."""
369 """Launch a set of engines as regular external processes."""
370
370
371 # Command line arguments for ipengine.
371 # Command line arguments for ipengine.
372 engine_args = List(
372 engine_args = List(
373 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
373 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
374 help="command-line arguments to pass to ipengine"
374 help="command-line arguments to pass to ipengine"
375 )
375 )
376 # launcher class
376 # launcher class
377 launcher_class = LocalEngineLauncher
377 launcher_class = LocalEngineLauncher
378
378
379 launchers = Dict()
379 launchers = Dict()
380 stop_data = Dict()
380 stop_data = Dict()
381
381
382 def __init__(self, work_dir=u'.', config=None, **kwargs):
382 def __init__(self, work_dir=u'.', config=None, **kwargs):
383 super(LocalEngineSetLauncher, self).__init__(
383 super(LocalEngineSetLauncher, self).__init__(
384 work_dir=work_dir, config=config, **kwargs
384 work_dir=work_dir, config=config, **kwargs
385 )
385 )
386 self.stop_data = {}
386 self.stop_data = {}
387
387
388 def start(self, n, cluster_dir):
388 def start(self, n, profile_dir):
389 """Start n engines by profile or cluster_dir."""
389 """Start n engines by profile or profile_dir."""
390 self.cluster_dir = unicode(cluster_dir)
390 self.profile_dir = unicode(profile_dir)
391 dlist = []
391 dlist = []
392 for i in range(n):
392 for i in range(n):
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
394 # Copy the engine args over to each engine launcher.
394 # Copy the engine args over to each engine launcher.
395 el.engine_args = copy.deepcopy(self.engine_args)
395 el.engine_args = copy.deepcopy(self.engine_args)
396 el.on_stop(self._notice_engine_stopped)
396 el.on_stop(self._notice_engine_stopped)
397 d = el.start(cluster_dir)
397 d = el.start(profile_dir)
398 if i==0:
398 if i==0:
399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 self.launchers[i] = el
400 self.launchers[i] = el
401 dlist.append(d)
401 dlist.append(d)
402 self.notify_start(dlist)
402 self.notify_start(dlist)
403 # The consumeErrors here could be dangerous
403 # The consumeErrors here could be dangerous
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 # dfinal.addCallback(self.notify_start)
405 # dfinal.addCallback(self.notify_start)
406 return dlist
406 return dlist
407
407
408 def find_args(self):
408 def find_args(self):
409 return ['engine set']
409 return ['engine set']
410
410
411 def signal(self, sig):
411 def signal(self, sig):
412 dlist = []
412 dlist = []
413 for el in self.launchers.itervalues():
413 for el in self.launchers.itervalues():
414 d = el.signal(sig)
414 d = el.signal(sig)
415 dlist.append(d)
415 dlist.append(d)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 return dlist
417 return dlist
418
418
419 def interrupt_then_kill(self, delay=1.0):
419 def interrupt_then_kill(self, delay=1.0):
420 dlist = []
420 dlist = []
421 for el in self.launchers.itervalues():
421 for el in self.launchers.itervalues():
422 d = el.interrupt_then_kill(delay)
422 d = el.interrupt_then_kill(delay)
423 dlist.append(d)
423 dlist.append(d)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 return dlist
425 return dlist
426
426
427 def stop(self):
427 def stop(self):
428 return self.interrupt_then_kill()
428 return self.interrupt_then_kill()
429
429
430 def _notice_engine_stopped(self, data):
430 def _notice_engine_stopped(self, data):
431 pid = data['pid']
431 pid = data['pid']
432 for idx,el in self.launchers.iteritems():
432 for idx,el in self.launchers.iteritems():
433 if el.process.pid == pid:
433 if el.process.pid == pid:
434 break
434 break
435 self.launchers.pop(idx)
435 self.launchers.pop(idx)
436 self.stop_data[idx] = data
436 self.stop_data[idx] = data
437 if not self.launchers:
437 if not self.launchers:
438 self.notify_stop(self.stop_data)
438 self.notify_stop(self.stop_data)
439
439
440
440
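# --- Editor's illustrative note (not part of the changeset above) -----------
# Each engine gets its own LocalEngineLauncher with a deep copy of
# engine_args, so per-engine mutation of the list cannot leak between
# engines.  Extra ipengine options would typically be set once in config;
# the values shown are just the defaults spelled out (log_level=20 is INFO):
#
# c.LocalEngineSetLauncher.engine_args = ['--log-to-file', 'log_level=20']
# -----------------------------------------------------------------------------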
441 #-----------------------------------------------------------------------------
441 #-----------------------------------------------------------------------------
442 # MPIExec launchers
442 # MPIExec launchers
443 #-----------------------------------------------------------------------------
443 #-----------------------------------------------------------------------------
444
444
445
445
446 class MPIExecLauncher(LocalProcessLauncher):
446 class MPIExecLauncher(LocalProcessLauncher):
447 """Launch an external process using mpiexec."""
447 """Launch an external process using mpiexec."""
448
448
449 mpi_cmd = List(['mpiexec'], config=True,
449 mpi_cmd = List(['mpiexec'], config=True,
450 help="The mpiexec command to use in starting the process."
450 help="The mpiexec command to use in starting the process."
451 )
451 )
452 mpi_args = List([], config=True,
452 mpi_args = List([], config=True,
453 help="The command line arguments to pass to mpiexec."
453 help="The command line arguments to pass to mpiexec."
454 )
454 )
455 program = List(['date'], config=True,
455 program = List(['date'], config=True,
456 help="The program to start via mpiexec.")
456 help="The program to start via mpiexec.")
457 program_args = List([], config=True,
457 program_args = List([], config=True,
458 help="The command line argument to the program."
458 help="The command line argument to the program."
459 )
459 )
460 n = Int(1)
460 n = Int(1)
461
461
462 def find_args(self):
462 def find_args(self):
463 """Build self.args using all the fields."""
463 """Build self.args using all the fields."""
464 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
464 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
465 self.program + self.program_args
465 self.program + self.program_args
466
466
467 def start(self, n):
467 def start(self, n):
468 """Start n instances of the program using mpiexec."""
468 """Start n instances of the program using mpiexec."""
469 self.n = n
469 self.n = n
470 return super(MPIExecLauncher, self).start()
470 return super(MPIExecLauncher, self).start()
471
471
472
472
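# --- Editor's illustrative sketch (not part of the changeset above) ---------
# find_args() above assembles: mpi_cmd + ['-n', n] + mpi_args + program +
# program_args.  In a config file that might be tuned like this; the hostfile
# path is an arbitrary example, not a shipped default:
#
# c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
# c.MPIExecEngineSetLauncher.mpi_args = ['--hostfile', 'hosts.txt']
# -----------------------------------------------------------------------------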
473 class MPIExecControllerLauncher(MPIExecLauncher):
473 class MPIExecControllerLauncher(MPIExecLauncher):
474 """Launch a controller using mpiexec."""
474 """Launch a controller using mpiexec."""
475
475
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 help="Popen command to launch the Contropper"
477 help="Popen command to launch the Contropper"
478 )
478 )
479 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
479 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
480 help="Command line arguments to pass to ipcontroller."
480 help="Command line arguments to pass to ipcontroller."
481 )
481 )
482 n = Int(1)
482 n = Int(1)
483
483
484 def start(self, cluster_dir):
484 def start(self, profile_dir):
485 """Start the controller by cluster_dir."""
485 """Start the controller by profile_dir."""
486 self.controller_args.extend(['cluster_dir=%s'%cluster_dir])
486 self.controller_args.extend(['profile_dir=%s'%profile_dir])
487 self.cluster_dir = unicode(cluster_dir)
487 self.profile_dir = unicode(profile_dir)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 return super(MPIExecControllerLauncher, self).start(1)
489 return super(MPIExecControllerLauncher, self).start(1)
490
490
491 def find_args(self):
491 def find_args(self):
492 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
492 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
493 self.controller_cmd + self.controller_args
493 self.controller_cmd + self.controller_args
494
494
495
495
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
497
497
498 program = List(ipengine_cmd_argv, config=True,
498 program = List(ipengine_cmd_argv, config=True,
499 help="Popen command for ipengine"
499 help="Popen command for ipengine"
500 )
500 )
501 program_args = List(
501 program_args = List(
502 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
502 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
503 help="Command line arguments for ipengine."
503 help="Command line arguments for ipengine."
504 )
504 )
505 n = Int(1)
505 n = Int(1)
506
506
507 def start(self, n, cluster_dir):
507 def start(self, n, profile_dir):
508 """Start n engines by profile or cluster_dir."""
508 """Start n engines by profile or profile_dir."""
509 self.program_args.extend(['cluster_dir=%s'%cluster_dir])
509 self.program_args.extend(['profile_dir=%s'%profile_dir])
510 self.cluster_dir = unicode(cluster_dir)
510 self.profile_dir = unicode(profile_dir)
511 self.n = n
511 self.n = n
512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 return super(MPIExecEngineSetLauncher, self).start(n)
513 return super(MPIExecEngineSetLauncher, self).start(n)
514
514
515 #-----------------------------------------------------------------------------
515 #-----------------------------------------------------------------------------
516 # SSH launchers
516 # SSH launchers
517 #-----------------------------------------------------------------------------
517 #-----------------------------------------------------------------------------
518
518
519 # TODO: Get SSH Launcher working again.
519 # TODO: Get SSH Launcher working again.
520
520
521 class SSHLauncher(LocalProcessLauncher):
521 class SSHLauncher(LocalProcessLauncher):
522 """A minimal launcher for ssh.
522 """A minimal launcher for ssh.
523
523
524 To be useful this will probably have to be extended to use the ``sshx``
524 To be useful this will probably have to be extended to use the ``sshx``
525 idea for environment variables. There could be other things this needs
525 idea for environment variables. There could be other things this needs
526 as well.
526 as well.
527 """
527 """
528
528
529 ssh_cmd = List(['ssh'], config=True,
529 ssh_cmd = List(['ssh'], config=True,
530 help="command for starting ssh")
530 help="command for starting ssh")
531 ssh_args = List(['-tt'], config=True,
531 ssh_args = List(['-tt'], config=True,
532 help="args to pass to ssh")
532 help="args to pass to ssh")
533 program = List(['date'], config=True,
533 program = List(['date'], config=True,
534 help="Program to launch via ssh")
534 help="Program to launch via ssh")
535 program_args = List([], config=True,
535 program_args = List([], config=True,
536 help="args to pass to remote program")
536 help="args to pass to remote program")
537 hostname = Unicode('', config=True,
537 hostname = Unicode('', config=True,
538 help="hostname on which to launch the program")
538 help="hostname on which to launch the program")
539 user = Unicode('', config=True,
539 user = Unicode('', config=True,
540 help="username for ssh")
540 help="username for ssh")
541 location = Unicode('', config=True,
541 location = Unicode('', config=True,
542 help="user@hostname location for ssh in one setting")
542 help="user@hostname location for ssh in one setting")
543
543
544 def _hostname_changed(self, name, old, new):
544 def _hostname_changed(self, name, old, new):
545 if self.user:
545 if self.user:
546 self.location = u'%s@%s' % (self.user, new)
546 self.location = u'%s@%s' % (self.user, new)
547 else:
547 else:
548 self.location = new
548 self.location = new
549
549
550 def _user_changed(self, name, old, new):
550 def _user_changed(self, name, old, new):
551 self.location = u'%s@%s' % (new, self.hostname)
551 self.location = u'%s@%s' % (new, self.hostname)
552
552
553 def find_args(self):
553 def find_args(self):
554 return self.ssh_cmd + self.ssh_args + [self.location] + \
554 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 self.program + self.program_args
555 self.program + self.program_args
556
556
557 def start(self, cluster_dir, hostname=None, user=None):
557 def start(self, profile_dir, hostname=None, user=None):
558 self.cluster_dir = unicode(cluster_dir)
558 self.profile_dir = unicode(profile_dir)
559 if hostname is not None:
559 if hostname is not None:
560 self.hostname = hostname
560 self.hostname = hostname
561 if user is not None:
561 if user is not None:
562 self.user = user
562 self.user = user
563
563
564 return super(SSHLauncher, self).start()
564 return super(SSHLauncher, self).start()
565
565
566 def signal(self, sig):
566 def signal(self, sig):
567 if self.state == 'running':
567 if self.state == 'running':
568 # send escaped ssh connection-closer
568 # send escaped ssh connection-closer
569 self.process.stdin.write('~.')
569 self.process.stdin.write('~.')
570 self.process.stdin.flush()
570 self.process.stdin.flush()
571
571
572
572
573
573
574 class SSHControllerLauncher(SSHLauncher):
574 class SSHControllerLauncher(SSHLauncher):
575
575
576 program = List(ipcontroller_cmd_argv, config=True,
576 program = List(ipcontroller_cmd_argv, config=True,
577 help="remote ipcontroller command.")
577 help="remote ipcontroller command.")
578 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
578 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
579 help="Command line arguments to ipcontroller.")
579 help="Command line arguments to ipcontroller.")
580
580
581
581
582 class SSHEngineLauncher(SSHLauncher):
582 class SSHEngineLauncher(SSHLauncher):
583 program = List(ipengine_cmd_argv, config=True,
583 program = List(ipengine_cmd_argv, config=True,
584 help="remote ipengine command.")
584 help="remote ipengine command.")
585 # Command line arguments for ipengine.
585 # Command line arguments for ipengine.
586 program_args = List(
586 program_args = List(
587 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
587 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
588 help="Command line arguments to ipengine."
588 help="Command line arguments to ipengine."
589 )
589 )
590
590
591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
592 launcher_class = SSHEngineLauncher
592 launcher_class = SSHEngineLauncher
593 engines = Dict(config=True,
593 engines = Dict(config=True,
594 help="""dict of engines to launch. This is a dict by hostname of ints,
594 help="""dict of engines to launch. This is a dict by hostname of ints,
595 corresponding to the number of engines to start on that host.""")
595 corresponding to the number of engines to start on that host.""")
596
596
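# --- Editor's illustrative note (not part of the changeset above) -----------
# The engines dict maps a hostname (optionally 'user@host') to an engine
# count; start() below also accepts a (count, args) tuple per host.  The
# hostnames and counts here are arbitrary examples:
#
# c.SSHEngineSetLauncher.engines = {'node1': 2, 'bob@node2': 4}
# -----------------------------------------------------------------------------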
597 def start(self, n, cluster_dir):
597 def start(self, n, profile_dir):
598 """Start engines by profile or cluster_dir.
598 """Start engines by profile or profile_dir.
599 `n` is ignored, and the `engines` config property is used instead.
599 `n` is ignored, and the `engines` config property is used instead.
600 """
600 """
601
601
602 self.cluster_dir = unicode(cluster_dir)
602 self.profile_dir = unicode(profile_dir)
603 dlist = []
603 dlist = []
604 for host, n in self.engines.iteritems():
604 for host, n in self.engines.iteritems():
605 if isinstance(n, (tuple, list)):
605 if isinstance(n, (tuple, list)):
606 n, args = n
606 n, args = n
607 else:
607 else:
608 args = copy.deepcopy(self.engine_args)
608 args = copy.deepcopy(self.engine_args)
609
609
610 if '@' in host:
610 if '@' in host:
611 user,host = host.split('@',1)
611 user,host = host.split('@',1)
612 else:
612 else:
613 user=None
613 user=None
614 for i in range(n):
614 for i in range(n):
615 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
615 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
616
616
617 # Copy the engine args over to each engine launcher.
617 # Copy the engine args over to each engine launcher.
618 i
618 i
619 el.program_args = args
619 el.program_args = args
620 el.on_stop(self._notice_engine_stopped)
620 el.on_stop(self._notice_engine_stopped)
621 d = el.start(cluster_dir, user=user, hostname=host)
621 d = el.start(profile_dir, user=user, hostname=host)
622 if i==0:
622 if i==0:
623 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
623 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
624 self.launchers[host+str(i)] = el
624 self.launchers[host+str(i)] = el
625 dlist.append(d)
625 dlist.append(d)
626 self.notify_start(dlist)
626 self.notify_start(dlist)
627 return dlist
627 return dlist
628
628
629
629
630
630
631 #-----------------------------------------------------------------------------
631 #-----------------------------------------------------------------------------
632 # Windows HPC Server 2008 scheduler launchers
632 # Windows HPC Server 2008 scheduler launchers
633 #-----------------------------------------------------------------------------
633 #-----------------------------------------------------------------------------
634
634
635
635
636 # This is only used on Windows.
636 # This is only used on Windows.
637 def find_job_cmd():
637 def find_job_cmd():
638 if WINDOWS:
638 if WINDOWS:
639 try:
639 try:
640 return find_cmd('job')
640 return find_cmd('job')
641 except (FindCmdError, ImportError):
641 except (FindCmdError, ImportError):
642 # ImportError will be raised if win32api is not installed
642 # ImportError will be raised if win32api is not installed
643 return 'job'
643 return 'job'
644 else:
644 else:
645 return 'job'
645 return 'job'
646
646
647
647
648 class WindowsHPCLauncher(BaseLauncher):
648 class WindowsHPCLauncher(BaseLauncher):
649
649
650 job_id_regexp = Unicode(r'\d+', config=True,
650 job_id_regexp = Unicode(r'\d+', config=True,
651 help="""A regular expression used to get the job id from the output of the
651 help="""A regular expression used to get the job id from the output of the
652 submit_command. """
652 submit_command. """
653 )
653 )
654 job_file_name = Unicode(u'ipython_job.xml', config=True,
654 job_file_name = Unicode(u'ipython_job.xml', config=True,
655 help="The filename of the instantiated job script.")
655 help="The filename of the instantiated job script.")
656 # The full path to the instantiated job script. This gets made dynamically
656 # The full path to the instantiated job script. This gets made dynamically
657 # by combining the work_dir with the job_file_name.
657 # by combining the work_dir with the job_file_name.
658 job_file = Unicode(u'')
658 job_file = Unicode(u'')
659 scheduler = Unicode('', config=True,
659 scheduler = Unicode('', config=True,
660 help="The hostname of the scheduler to submit the job to.")
660 help="The hostname of the scheduler to submit the job to.")
661 job_cmd = Unicode(find_job_cmd(), config=True,
661 job_cmd = Unicode(find_job_cmd(), config=True,
662 help="The command for submitting jobs.")
662 help="The command for submitting jobs.")
663
663
664 def __init__(self, work_dir=u'.', config=None, **kwargs):
664 def __init__(self, work_dir=u'.', config=None, **kwargs):
665 super(WindowsHPCLauncher, self).__init__(
665 super(WindowsHPCLauncher, self).__init__(
666 work_dir=work_dir, config=config, **kwargs
666 work_dir=work_dir, config=config, **kwargs
667 )
667 )
668
668
669 @property
669 @property
670 def job_file(self):
670 def job_file(self):
671 return os.path.join(self.work_dir, self.job_file_name)
671 return os.path.join(self.work_dir, self.job_file_name)
672
672
673 def write_job_file(self, n):
673 def write_job_file(self, n):
674 raise NotImplementedError("Implement write_job_file in a subclass.")
674 raise NotImplementedError("Implement write_job_file in a subclass.")
675
675
676 def find_args(self):
676 def find_args(self):
677 return [u'job.exe']
677 return [u'job.exe']
678
678
679 def parse_job_id(self, output):
679 def parse_job_id(self, output):
680 """Take the output of the submit command and return the job id."""
680 """Take the output of the submit command and return the job id."""
681 m = re.search(self.job_id_regexp, output)
681 m = re.search(self.job_id_regexp, output)
682 if m is not None:
682 if m is not None:
683 job_id = m.group()
683 job_id = m.group()
684 else:
684 else:
685 raise LauncherError("Job id couldn't be determined: %s" % output)
685 raise LauncherError("Job id couldn't be determined: %s" % output)
686 self.job_id = job_id
686 self.job_id = job_id
687 self.log.info('Job started with job id: %r' % job_id)
687 self.log.info('Job started with job id: %r' % job_id)
688 return job_id
688 return job_id
689
689
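# --- Editor's illustrative note (not part of the changeset above) -----------
# With the default job_id_regexp of r'\d+', submit output containing a job
# number yields that number as the id.  The sample output string below is
# invented for illustration:
#
# >>> WindowsHPCLauncher().parse_job_id("Job has been submitted. ID: 4096")
# '4096'
# -----------------------------------------------------------------------------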
690 def start(self, n):
690 def start(self, n):
691 """Start n copies of the process using the Win HPC job scheduler."""
691 """Start n copies of the process using the Win HPC job scheduler."""
692 self.write_job_file(n)
692 self.write_job_file(n)
693 args = [
693 args = [
694 'submit',
694 'submit',
695 '/jobfile:%s' % self.job_file,
695 '/jobfile:%s' % self.job_file,
696 '/scheduler:%s' % self.scheduler
696 '/scheduler:%s' % self.scheduler
697 ]
697 ]
698 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
698 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
699 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
699 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
700 output = check_output([self.job_cmd]+args,
700 output = check_output([self.job_cmd]+args,
701 env=os.environ,
701 env=os.environ,
702 cwd=self.work_dir,
702 cwd=self.work_dir,
703 stderr=STDOUT
703 stderr=STDOUT
704 )
704 )
705 job_id = self.parse_job_id(output)
705 job_id = self.parse_job_id(output)
706 self.notify_start(job_id)
706 self.notify_start(job_id)
707 return job_id
707 return job_id
708
708
709 def stop(self):
709 def stop(self):
710 args = [
710 args = [
711 'cancel',
711 'cancel',
712 self.job_id,
712 self.job_id,
713 '/scheduler:%s' % self.scheduler
713 '/scheduler:%s' % self.scheduler
714 ]
714 ]
715 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
715 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
716 try:
716 try:
717 output = check_output([self.job_cmd]+args,
717 output = check_output([self.job_cmd]+args,
718 env=os.environ,
718 env=os.environ,
719 cwd=self.work_dir,
719 cwd=self.work_dir,
720 stderr=STDOUT
720 stderr=STDOUT
721 )
721 )
722 except:
722 except:
723 output = 'The job already appears to be stopped: %r' % self.job_id
723 output = 'The job already appears to be stopped: %r' % self.job_id
724 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
724 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
725 return output
725 return output
726
726
727
727
728 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
728 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
729
729
730 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
730 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
731 help="WinHPC xml job file.")
731 help="WinHPC xml job file.")
732 extra_args = List([], config=False,
732 extra_args = List([], config=False,
733 help="extra args to pass to ipcontroller")
733 help="extra args to pass to ipcontroller")
734
734
735 def write_job_file(self, n):
735 def write_job_file(self, n):
736 job = IPControllerJob(config=self.config)
736 job = IPControllerJob(config=self.config)
737
737
738 t = IPControllerTask(config=self.config)
738 t = IPControllerTask(config=self.config)
739 # The tasks work directory is *not* the actual work directory of
739 # The tasks work directory is *not* the actual work directory of
740 # the controller. It is used as the base path for the stdout/stderr
740 # the controller. It is used as the base path for the stdout/stderr
741 # files that the scheduler redirects to.
741 # files that the scheduler redirects to.
742 t.work_directory = self.cluster_dir
742 t.work_directory = self.profile_dir
743 # Add the cluster_dir arg passed in from self.start() via extra_args.
743 # Add the profile_dir arg passed in from self.start() via extra_args.
744 t.controller_args.extend(self.extra_args)
744 t.controller_args.extend(self.extra_args)
745 job.add_task(t)
745 job.add_task(t)
746
746
747 self.log.info("Writing job description file: %s" % self.job_file)
747 self.log.info("Writing job description file: %s" % self.job_file)
748 job.write(self.job_file)
748 job.write(self.job_file)
749
749
750 @property
750 @property
751 def job_file(self):
751 def job_file(self):
752 return os.path.join(self.cluster_dir, self.job_file_name)
752 return os.path.join(self.profile_dir, self.job_file_name)
753
753
754 def start(self, cluster_dir):
754 def start(self, profile_dir):
755 """Start the controller by cluster_dir."""
755 """Start the controller by profile_dir."""
756 self.extra_args = ['cluster_dir=%s'%cluster_dir]
756 self.extra_args = ['profile_dir=%s'%profile_dir]
757 self.cluster_dir = unicode(cluster_dir)
757 self.profile_dir = unicode(profile_dir)
758 return super(WindowsHPCControllerLauncher, self).start(1)
758 return super(WindowsHPCControllerLauncher, self).start(1)
759
759
760
760
761 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
761 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
762
762
763 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
763 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
764 help="jobfile for ipengines job")
764 help="jobfile for ipengines job")
765 extra_args = List([], config=False,
765 extra_args = List([], config=False,
766 help="extra args to pas to ipengine")
766 help="extra args to pas to ipengine")
767
767
768 def write_job_file(self, n):
768 def write_job_file(self, n):
769 job = IPEngineSetJob(config=self.config)
769 job = IPEngineSetJob(config=self.config)
770
770
771 for i in range(n):
771 for i in range(n):
772 t = IPEngineTask(config=self.config)
772 t = IPEngineTask(config=self.config)
773 # The tasks work directory is *not* the actual work directory of
773 # The tasks work directory is *not* the actual work directory of
774 # the engine. It is used as the base path for the stdout/stderr
774 # the engine. It is used as the base path for the stdout/stderr
775 # files that the scheduler redirects to.
775 # files that the scheduler redirects to.
776 t.work_directory = self.cluster_dir
776 t.work_directory = self.profile_dir
777 # Add the cluster_dir arg passed in from self.start() via extra_args.
777 # Add the profile_dir arg passed in from self.start() via extra_args.
778 t.engine_args.extend(self.extra_args)
778 t.engine_args.extend(self.extra_args)
779 job.add_task(t)
779 job.add_task(t)
780
780
781 self.log.info("Writing job description file: %s" % self.job_file)
781 self.log.info("Writing job description file: %s" % self.job_file)
782 job.write(self.job_file)
782 job.write(self.job_file)
783
783
784 @property
784 @property
785 def job_file(self):
785 def job_file(self):
786 return os.path.join(self.cluster_dir, self.job_file_name)
786 return os.path.join(self.profile_dir, self.job_file_name)
787
787
788 def start(self, n, cluster_dir):
788 def start(self, n, profile_dir):
789 """Start the controller by cluster_dir."""
789 """Start the controller by profile_dir."""
790 self.extra_args = ['cluster_dir=%s'%cluster_dir]
790 self.extra_args = ['profile_dir=%s'%profile_dir]
791 self.cluster_dir = unicode(cluster_dir)
791 self.profile_dir = unicode(profile_dir)
792 return super(WindowsHPCEngineSetLauncher, self).start(n)
792 return super(WindowsHPCEngineSetLauncher, self).start(n)
793
793
794
794
795 #-----------------------------------------------------------------------------
795 #-----------------------------------------------------------------------------
796 # Batch (PBS) system launchers
796 # Batch (PBS) system launchers
797 #-----------------------------------------------------------------------------
797 #-----------------------------------------------------------------------------
798
798
799 class BatchSystemLauncher(BaseLauncher):
799 class BatchSystemLauncher(BaseLauncher):
800 """Launch an external process using a batch system.
800 """Launch an external process using a batch system.
801
801
802 This class is designed to work with UNIX batch systems like PBS, LSF,
802 This class is designed to work with UNIX batch systems like PBS, LSF,
803 GridEngine, etc. The overall model is that there are different commands
803 GridEngine, etc. The overall model is that there are different commands
804 like qsub, qdel, etc. that handle the starting and stopping of the process.
804 like qsub, qdel, etc. that handle the starting and stopping of the process.
805
805
806 This class also has the notion of a batch script. The ``batch_template``
806 This class also has the notion of a batch script. The ``batch_template``
807 attribute can be set to a string that is a template for the batch script.
807 attribute can be set to a string that is a template for the batch script.
808 This template is instantiated using Itpl. Thus the template can use
808 This template is instantiated using Itpl. Thus the template can use
809 ${n} for the number of instances. Subclasses can add additional variables
809 ${n} for the number of instances. Subclasses can add additional variables
810 to the template dict.
810 to the template dict.
811 """
811 """
812
812
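# --- Editor's illustrative sketch (not part of the changeset above) ---------
# What a batch_template might look like for a PBS-style subclass; ${n},
# ${queue} and ${profile_dir} are filled in from self.context by Itpl in
# write_batch_script(), and ${profile_dir} is set in start() below.  The
# class name and PBS directives shown are assumptions, not shipped defaults.
#
# c.PBSEngineSetLauncher.batch_template = """#!/bin/sh
# #PBS -q ${queue}
# #PBS -V
# ipengine profile_dir=${profile_dir}
# """
# -----------------------------------------------------------------------------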
813 # Subclasses must fill these in. See PBSEngineSet
813 # Subclasses must fill these in. See PBSEngineSet
814 submit_command = List([''], config=True,
814 submit_command = List([''], config=True,
815 help="The name of the command line program used to submit jobs.")
815 help="The name of the command line program used to submit jobs.")
816 delete_command = List([''], config=True,
816 delete_command = List([''], config=True,
817 help="The name of the command line program used to delete jobs.")
817 help="The name of the command line program used to delete jobs.")
818 job_id_regexp = Unicode('', config=True,
818 job_id_regexp = Unicode('', config=True,
819 help="""A regular expression used to get the job id from the output of the
819 help="""A regular expression used to get the job id from the output of the
820 submit_command.""")
820 submit_command.""")
821 batch_template = Unicode('', config=True,
821 batch_template = Unicode('', config=True,
822 help="The string that is the batch script template itself.")
822 help="The string that is the batch script template itself.")
823 batch_template_file = Unicode(u'', config=True,
823 batch_template_file = Unicode(u'', config=True,
824 help="The file that contains the batch template.")
824 help="The file that contains the batch template.")
825 batch_file_name = Unicode(u'batch_script', config=True,
825 batch_file_name = Unicode(u'batch_script', config=True,
826 help="The filename of the instantiated batch script.")
826 help="The filename of the instantiated batch script.")
827 queue = Unicode(u'', config=True,
827 queue = Unicode(u'', config=True,
828 help="The PBS Queue.")
828 help="The PBS Queue.")
829
829
830 # not configurable, override in subclasses
830 # not configurable, override in subclasses
831 # PBS Job Array regex
831 # PBS Job Array regex
832 job_array_regexp = Unicode('')
832 job_array_regexp = Unicode('')
833 job_array_template = Unicode('')
833 job_array_template = Unicode('')
834 # PBS Queue regex
834 # PBS Queue regex
835 queue_regexp = Unicode('')
835 queue_regexp = Unicode('')
836 queue_template = Unicode('')
836 queue_template = Unicode('')
837 # The default batch template, override in subclasses
837 # The default batch template, override in subclasses
838 default_template = Unicode('')
838 default_template = Unicode('')
839 # The full path to the instantiated batch script.
839 # The full path to the instantiated batch script.
840 batch_file = Unicode(u'')
840 batch_file = Unicode(u'')
841 # the format dict used with batch_template:
841 # the format dict used with batch_template:
842 context = Dict()
842 context = Dict()
843
843
844
844
845 def find_args(self):
845 def find_args(self):
846 return self.submit_command + [self.batch_file]
846 return self.submit_command + [self.batch_file]
847
847
848 def __init__(self, work_dir=u'.', config=None, **kwargs):
848 def __init__(self, work_dir=u'.', config=None, **kwargs):
849 super(BatchSystemLauncher, self).__init__(
849 super(BatchSystemLauncher, self).__init__(
850 work_dir=work_dir, config=config, **kwargs
850 work_dir=work_dir, config=config, **kwargs
851 )
851 )
852 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
852 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
853
853
854 def parse_job_id(self, output):
854 def parse_job_id(self, output):
855 """Take the output of the submit command and return the job id."""
855 """Take the output of the submit command and return the job id."""
856 m = re.search(self.job_id_regexp, output)
856 m = re.search(self.job_id_regexp, output)
857 if m is not None:
857 if m is not None:
858 job_id = m.group()
858 job_id = m.group()
859 else:
859 else:
860 raise LauncherError("Job id couldn't be determined: %s" % output)
860 raise LauncherError("Job id couldn't be determined: %s" % output)
861 self.job_id = job_id
861 self.job_id = job_id
862 self.log.info('Job submitted with job id: %r' % job_id)
862 self.log.info('Job submitted with job id: %r' % job_id)
863 return job_id
863 return job_id
864
864
865 def write_batch_script(self, n):
865 def write_batch_script(self, n):
866 """Instantiate and write the batch script to the work_dir."""
866 """Instantiate and write the batch script to the work_dir."""
867 self.context['n'] = n
867 self.context['n'] = n
868 self.context['queue'] = self.queue
868 self.context['queue'] = self.queue
869 print self.context
869 print self.context
870 # first priority is batch_template if set
870 # first priority is batch_template if set
871 if self.batch_template_file and not self.batch_template:
871 if self.batch_template_file and not self.batch_template:
872 # second priority is batch_template_file
872 # second priority is batch_template_file
873 with open(self.batch_template_file) as f:
873 with open(self.batch_template_file) as f:
874 self.batch_template = f.read()
874 self.batch_template = f.read()
875 if not self.batch_template:
875 if not self.batch_template:
876 # third (last) priority is default_template
876 # third (last) priority is default_template
877 self.batch_template = self.default_template
877 self.batch_template = self.default_template
878
878
879 regex = re.compile(self.job_array_regexp)
879 regex = re.compile(self.job_array_regexp)
880 # print regex.search(self.batch_template)
880 # print regex.search(self.batch_template)
881 if not regex.search(self.batch_template):
881 if not regex.search(self.batch_template):
882 self.log.info("adding job array settings to batch script")
882 self.log.info("adding job array settings to batch script")
883 firstline, rest = self.batch_template.split('\n',1)
883 firstline, rest = self.batch_template.split('\n',1)
884 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
884 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
885
885
886 regex = re.compile(self.queue_regexp)
886 regex = re.compile(self.queue_regexp)
887 # print regex.search(self.batch_template)
887 # print regex.search(self.batch_template)
888 if self.queue and not regex.search(self.batch_template):
888 if self.queue and not regex.search(self.batch_template):
889 self.log.info("adding PBS queue settings to batch script")
889 self.log.info("adding PBS queue settings to batch script")
890 firstline, rest = self.batch_template.split('\n',1)
890 firstline, rest = self.batch_template.split('\n',1)
891 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
891 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
892
892
893 script_as_string = Itpl.itplns(self.batch_template, self.context)
893 script_as_string = Itpl.itplns(self.batch_template, self.context)
894 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
894 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
895
895
896 with open(self.batch_file, 'w') as f:
896 with open(self.batch_file, 'w') as f:
897 f.write(script_as_string)
897 f.write(script_as_string)
898 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
898 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
899
899
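A minimal standalone sketch of the template-selection priority implemented in write_batch_script above (an explicit batch_template first, then batch_template_file, then the class default):

def choose_template(batch_template, batch_template_file, default_template):
    # 1st priority: an explicitly configured batch_template
    if batch_template_file and not batch_template:
        # 2nd priority: read the template from batch_template_file
        with open(batch_template_file) as f:
            batch_template = f.read()
    if not batch_template:
        # 3rd (last) priority: fall back to the class default
        batch_template = default_template
    return batch_template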
900 def start(self, n, cluster_dir):
900 def start(self, n, profile_dir):
901 """Start n copies of the process using a batch system."""
901 """Start n copies of the process using a batch system."""
902 # Here we save profile and cluster_dir in the context so they
902 # Here we save profile and profile_dir in the context so they
903 # can be used in the batch script template as ${profile} and
903 # can be used in the batch script template as ${profile} and
904 # ${cluster_dir}
904 # ${profile_dir}
905 self.context['cluster_dir'] = cluster_dir
905 self.context['profile_dir'] = profile_dir
906 self.cluster_dir = unicode(cluster_dir)
906 self.profile_dir = unicode(profile_dir)
907 self.write_batch_script(n)
907 self.write_batch_script(n)
908 output = check_output(self.args, env=os.environ)
908 output = check_output(self.args, env=os.environ)
909
909
910 job_id = self.parse_job_id(output)
910 job_id = self.parse_job_id(output)
911 self.notify_start(job_id)
911 self.notify_start(job_id)
912 return job_id
912 return job_id
913
913
914 def stop(self):
914 def stop(self):
915 output = check_output(self.delete_command+[self.job_id], env=os.environ)
915 output = check_output(self.delete_command+[self.job_id], env=os.environ)
916 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
916 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
917 return output
917 return output
918
918
919
919
920 class PBSLauncher(BatchSystemLauncher):
920 class PBSLauncher(BatchSystemLauncher):
921 """A BatchSystemLauncher subclass for PBS."""
921 """A BatchSystemLauncher subclass for PBS."""
922
922
923 submit_command = List(['qsub'], config=True,
923 submit_command = List(['qsub'], config=True,
924 help="The PBS submit command ['qsub']")
924 help="The PBS submit command ['qsub']")
925 delete_command = List(['qdel'], config=True,
925 delete_command = List(['qdel'], config=True,
926 help="The PBS delete command ['qsub']")
926 help="The PBS delete command ['qsub']")
927 job_id_regexp = Unicode(r'\d+', config=True,
927 job_id_regexp = Unicode(r'\d+', config=True,
928 help="Regular expresion for identifying the job ID [r'\d+']")
928 help="Regular expresion for identifying the job ID [r'\d+']")
929
929
930 batch_file = Unicode(u'')
930 batch_file = Unicode(u'')
931 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
931 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
932 job_array_template = Unicode('#PBS -t 1-$n')
932 job_array_template = Unicode('#PBS -t 1-$n')
933 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
933 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
934 queue_template = Unicode('#PBS -q $queue')
934 queue_template = Unicode('#PBS -q $queue')
935
935
936
936
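A hypothetical ipcluster_config.py snippet showing how these traits can be set through the config system; the queue name and template path are made up:

c = get_config()
c.PBSLauncher.queue = u'short'
c.PBSLauncher.batch_template_file = u'/home/user/pbs.template'
c.PBSEngineSetLauncher.batch_file_name = u'pbs_engines'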
937 class PBSControllerLauncher(PBSLauncher):
937 class PBSControllerLauncher(PBSLauncher):
938 """Launch a controller using PBS."""
938 """Launch a controller using PBS."""
939
939
940 batch_file_name = Unicode(u'pbs_controller', config=True,
940 batch_file_name = Unicode(u'pbs_controller', config=True,
941 help="batch file name for the controller job.")
941 help="batch file name for the controller job.")
942 default_template= Unicode("""#!/bin/sh
942 default_template= Unicode("""#!/bin/sh
943 #PBS -V
943 #PBS -V
944 #PBS -N ipcontroller
944 #PBS -N ipcontroller
945 %s --log-to-file cluster_dir $cluster_dir
945 %s --log-to-file profile_dir $profile_dir
946 """%(' '.join(ipcontroller_cmd_argv)))
946 """%(' '.join(ipcontroller_cmd_argv)))
947
947
948 def start(self, cluster_dir):
948 def start(self, profile_dir):
949 """Start the controller by profile or cluster_dir."""
949 """Start the controller by profile or profile_dir."""
950 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
950 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
951 return super(PBSControllerLauncher, self).start(1, cluster_dir)
951 return super(PBSControllerLauncher, self).start(1, profile_dir)
952
952
953
953
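For illustration, an approximation of the interpolation Itpl.itplns performs on the controller's default_template, using string.Template as a stand-in; 'ipcontroller' replaces the %s command prefix and the profile path is made up:

from string import Template

script = """#!/bin/sh
#PBS -V
#PBS -N ipcontroller
ipcontroller --log-to-file profile_dir $profile_dir
"""
print(Template(script).safe_substitute(
    profile_dir='/home/user/.ipython/profile_default'))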
954 class PBSEngineSetLauncher(PBSLauncher):
954 class PBSEngineSetLauncher(PBSLauncher):
955 """Launch Engines using PBS"""
955 """Launch Engines using PBS"""
956 batch_file_name = Unicode(u'pbs_engines', config=True,
956 batch_file_name = Unicode(u'pbs_engines', config=True,
957 help="batch file name for the engine(s) job.")
957 help="batch file name for the engine(s) job.")
958 default_template= Unicode(u"""#!/bin/sh
958 default_template= Unicode(u"""#!/bin/sh
959 #PBS -V
959 #PBS -V
960 #PBS -N ipengine
960 #PBS -N ipengine
961 %s cluster_dir $cluster_dir
961 %s profile_dir $profile_dir
962 """%(' '.join(ipengine_cmd_argv)))
962 """%(' '.join(ipengine_cmd_argv)))
963
963
964 def start(self, n, cluster_dir):
964 def start(self, n, profile_dir):
965 """Start n engines by profile or cluster_dir."""
965 """Start n engines by profile or profile_dir."""
966 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
966 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
967 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
967 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
968
968
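As a sketch, the pbs_engines file that write_batch_script could produce for n=4 after injecting the job-array line below the shebang (command prefix and profile path are hypothetical):

expected = """#!/bin/sh
#PBS -t 1-4
#PBS -V
#PBS -N ipengine
ipengine profile_dir /home/user/.ipython/profile_default
"""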
969 #SGE is very similar to PBS
969 #SGE is very similar to PBS
970
970
971 class SGELauncher(PBSLauncher):
971 class SGELauncher(PBSLauncher):
972 """Sun GridEngine is a PBS clone with slightly different syntax"""
972 """Sun GridEngine is a PBS clone with slightly different syntax"""
973 job_array_regexp = Unicode('#$$\W+-t\W+[\w\d\-\$]+')
973 job_array_regexp = Unicode('#$$\W+-t\W+[\w\d\-\$]+')
974 job_array_template = Unicode('#$$ -t 1-$n')
974 job_array_template = Unicode('#$$ -t 1-$n')
975 queue_regexp = Unicode('#$$\W+-q\W+\$?\w+')
975 queue_regexp = Unicode('#$$\W+-q\W+\$?\w+')
976 queue_template = Unicode('#$$ -q $queue')
976 queue_template = Unicode('#$$ -q $queue')
977
977
978 class SGEControllerLauncher(SGELauncher):
978 class SGEControllerLauncher(SGELauncher):
979 """Launch a controller using SGE."""
979 """Launch a controller using SGE."""
980
980
981 batch_file_name = Unicode(u'sge_controller', config=True,
981 batch_file_name = Unicode(u'sge_controller', config=True,
982 help="batch file name for the ipontroller job.")
982 help="batch file name for the ipontroller job.")
983 default_template= Unicode(u"""#$$ -V
983 default_template= Unicode(u"""#$$ -V
984 #$$ -S /bin/sh
984 #$$ -S /bin/sh
985 #$$ -N ipcontroller
985 #$$ -N ipcontroller
986 %s --log-to-file cluster_dir=$cluster_dir
986 %s --log-to-file profile_dir=$profile_dir
987 """%(' '.join(ipcontroller_cmd_argv)))
987 """%(' '.join(ipcontroller_cmd_argv)))
988
988
989 def start(self, cluster_dir):
989 def start(self, profile_dir):
990 """Start the controller by profile or cluster_dir."""
990 """Start the controller by profile or profile_dir."""
991 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
991 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
992 return super(PBSControllerLauncher, self).start(1, cluster_dir)
992 return super(PBSControllerLauncher, self).start(1, profile_dir)
993
993
994 class SGEEngineSetLauncher(SGELauncher):
994 class SGEEngineSetLauncher(SGELauncher):
995 """Launch Engines with SGE"""
995 """Launch Engines with SGE"""
996 batch_file_name = Unicode(u'sge_engines', config=True,
996 batch_file_name = Unicode(u'sge_engines', config=True,
997 help="batch file name for the engine(s) job.")
997 help="batch file name for the engine(s) job.")
998 default_template = Unicode("""#$$ -V
998 default_template = Unicode("""#$$ -V
999 #$$ -S /bin/sh
999 #$$ -S /bin/sh
1000 #$$ -N ipengine
1000 #$$ -N ipengine
1001 %s cluster_dir=$cluster_dir
1001 %s profile_dir=$profile_dir
1002 """%(' '.join(ipengine_cmd_argv)))
1002 """%(' '.join(ipengine_cmd_argv)))
1003
1003
1004 def start(self, n, cluster_dir):
1004 def start(self, n, profile_dir):
1005 """Start n engines by profile or cluster_dir."""
1005 """Start n engines by profile or profile_dir."""
1006 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1006 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1007 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
1007 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1008
1008
1009
1009
1010 #-----------------------------------------------------------------------------
1010 #-----------------------------------------------------------------------------
1011 # A launcher for ipcluster itself!
1011 # A launcher for ipcluster itself!
1012 #-----------------------------------------------------------------------------
1012 #-----------------------------------------------------------------------------
1013
1013
1014
1014
1015 class IPClusterLauncher(LocalProcessLauncher):
1015 class IPClusterLauncher(LocalProcessLauncher):
1016 """Launch the ipcluster program in an external process."""
1016 """Launch the ipcluster program in an external process."""
1017
1017
1018 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1018 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1019 help="Popen command for ipcluster")
1019 help="Popen command for ipcluster")
1020 ipcluster_args = List(
1020 ipcluster_args = List(
1021 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1021 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1022 help="Command line arguments to pass to ipcluster.")
1022 help="Command line arguments to pass to ipcluster.")
1023 ipcluster_subcommand = Unicode('start')
1023 ipcluster_subcommand = Unicode('start')
1024 ipcluster_n = Int(2)
1024 ipcluster_n = Int(2)
1025
1025
1026 def find_args(self):
1026 def find_args(self):
1027 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1027 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1028 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1028 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1029
1029
1030 def start(self):
1030 def start(self):
1031 self.log.info("Starting ipcluster: %r" % self.args)
1031 self.log.info("Starting ipcluster: %r" % self.args)
1032 return super(IPClusterLauncher, self).start()
1032 return super(IPClusterLauncher, self).start()
1033
1033
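With the defaults above, find_args would compose roughly the following argv; 'ipcluster' stands in for ipcluster_cmd_argv, and logging.INFO is 20:

['ipcluster', '--start', 'n=2', '--clean-logs', '--log-to-file', 'log_level=20']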
1034 #-----------------------------------------------------------------------------
1034 #-----------------------------------------------------------------------------
1035 # Collections of launchers
1035 # Collections of launchers
1036 #-----------------------------------------------------------------------------
1036 #-----------------------------------------------------------------------------
1037
1037
1038 local_launchers = [
1038 local_launchers = [
1039 LocalControllerLauncher,
1039 LocalControllerLauncher,
1040 LocalEngineLauncher,
1040 LocalEngineLauncher,
1041 LocalEngineSetLauncher,
1041 LocalEngineSetLauncher,
1042 ]
1042 ]
1043 mpi_launchers = [
1043 mpi_launchers = [
1044 MPIExecLauncher,
1044 MPIExecLauncher,
1045 MPIExecControllerLauncher,
1045 MPIExecControllerLauncher,
1046 MPIExecEngineSetLauncher,
1046 MPIExecEngineSetLauncher,
1047 ]
1047 ]
1048 ssh_launchers = [
1048 ssh_launchers = [
1049 SSHLauncher,
1049 SSHLauncher,
1050 SSHControllerLauncher,
1050 SSHControllerLauncher,
1051 SSHEngineLauncher,
1051 SSHEngineLauncher,
1052 SSHEngineSetLauncher,
1052 SSHEngineSetLauncher,
1053 ]
1053 ]
1054 winhpc_launchers = [
1054 winhpc_launchers = [
1055 WindowsHPCLauncher,
1055 WindowsHPCLauncher,
1056 WindowsHPCControllerLauncher,
1056 WindowsHPCControllerLauncher,
1057 WindowsHPCEngineSetLauncher,
1057 WindowsHPCEngineSetLauncher,
1058 ]
1058 ]
1059 pbs_launchers = [
1059 pbs_launchers = [
1060 PBSLauncher,
1060 PBSLauncher,
1061 PBSControllerLauncher,
1061 PBSControllerLauncher,
1062 PBSEngineSetLauncher,
1062 PBSEngineSetLauncher,
1063 ]
1063 ]
1064 sge_launchers = [
1064 sge_launchers = [
1065 SGELauncher,
1065 SGELauncher,
1066 SGEControllerLauncher,
1066 SGEControllerLauncher,
1067 SGEEngineSetLauncher,
1067 SGEEngineSetLauncher,
1068 ]
1068 ]
1069 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1069 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1070 + pbs_launchers + sge_launchers No newline at end of file
1070 + pbs_launchers + sge_launchers
@@ -1,1356 +1,1356 b''
1 """A semi-synchronous Client for the ZMQ cluster"""
1 """A semi-synchronous Client for the ZMQ cluster"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import json
14 import json
15 import time
15 import time
16 import warnings
16 import warnings
17 from datetime import datetime
17 from datetime import datetime
18 from getpass import getpass
18 from getpass import getpass
19 from pprint import pprint
19 from pprint import pprint
20
20
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 import zmq
23 import zmq
24 # from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
28 Dict, List, Bool, Set)
28 Dict, List, Bool, Set)
29 from IPython.external.decorator import decorator
29 from IPython.external.decorator import decorator
30 from IPython.external.ssh import tunnel
30 from IPython.external.ssh import tunnel
31
31
32 from IPython.parallel import error
32 from IPython.parallel import error
33 from IPython.parallel import streamsession as ss
33 from IPython.parallel import streamsession as ss
34 from IPython.parallel import util
34 from IPython.parallel import util
35
35
36 from .asyncresult import AsyncResult, AsyncHubResult
36 from .asyncresult import AsyncResult, AsyncHubResult
37 from IPython.parallel.apps.clusterdir import ClusterDir, ClusterDirError
37 from IPython.core.newapplication import ProfileDir, ProfileDirError
38 from .view import DirectView, LoadBalancedView
38 from .view import DirectView, LoadBalancedView
39
39
40 #--------------------------------------------------------------------------
40 #--------------------------------------------------------------------------
41 # Decorators for Client methods
41 # Decorators for Client methods
42 #--------------------------------------------------------------------------
42 #--------------------------------------------------------------------------
43
43
44 @decorator
44 @decorator
45 def spin_first(f, self, *args, **kwargs):
45 def spin_first(f, self, *args, **kwargs):
46 """Call spin() to sync state prior to calling the method."""
46 """Call spin() to sync state prior to calling the method."""
47 self.spin()
47 self.spin()
48 return f(self, *args, **kwargs)
48 return f(self, *args, **kwargs)
49
49
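A sketch of how spin_first is meant to be applied to Client methods; the method name and signature here are hypothetical:

@spin_first
def queue_status(self, targets='all', verbose=False):
    """self.spin() has already run, so the state reported here is current"""
    pass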
50
50
51 #--------------------------------------------------------------------------
51 #--------------------------------------------------------------------------
52 # Classes
52 # Classes
53 #--------------------------------------------------------------------------
53 #--------------------------------------------------------------------------
54
54
55 class Metadata(dict):
55 class Metadata(dict):
56 """Subclass of dict for initializing metadata values.
56 """Subclass of dict for initializing metadata values.
57
57
58 Attribute access works on keys.
58 Attribute access works on keys.
59
59
60 These objects have a strict set of keys - errors will raise if you try
60 These objects have a strict set of keys - errors will raise if you try
61 to add new keys.
61 to add new keys.
62 """
62 """
63 def __init__(self, *args, **kwargs):
63 def __init__(self, *args, **kwargs):
64 dict.__init__(self)
64 dict.__init__(self)
65 md = {'msg_id' : None,
65 md = {'msg_id' : None,
66 'submitted' : None,
66 'submitted' : None,
67 'started' : None,
67 'started' : None,
68 'completed' : None,
68 'completed' : None,
69 'received' : None,
69 'received' : None,
70 'engine_uuid' : None,
70 'engine_uuid' : None,
71 'engine_id' : None,
71 'engine_id' : None,
72 'follow' : None,
72 'follow' : None,
73 'after' : None,
73 'after' : None,
74 'status' : None,
74 'status' : None,
75
75
76 'pyin' : None,
76 'pyin' : None,
77 'pyout' : None,
77 'pyout' : None,
78 'pyerr' : None,
78 'pyerr' : None,
79 'stdout' : '',
79 'stdout' : '',
80 'stderr' : '',
80 'stderr' : '',
81 }
81 }
82 self.update(md)
82 self.update(md)
83 self.update(dict(*args, **kwargs))
83 self.update(dict(*args, **kwargs))
84
84
85 def __getattr__(self, key):
85 def __getattr__(self, key):
86 """getattr aliased to getitem"""
86 """getattr aliased to getitem"""
87 if key in self.iterkeys():
87 if key in self.iterkeys():
88 return self[key]
88 return self[key]
89 else:
89 else:
90 raise AttributeError(key)
90 raise AttributeError(key)
91
91
92 def __setattr__(self, key, value):
92 def __setattr__(self, key, value):
93 """setattr aliased to setitem, with strict"""
93 """setattr aliased to setitem, with strict"""
94 if key in self.iterkeys():
94 if key in self.iterkeys():
95 self[key] = value
95 self[key] = value
96 else:
96 else:
97 raise AttributeError(key)
97 raise AttributeError(key)
98
98
99 def __setitem__(self, key, value):
99 def __setitem__(self, key, value):
100 """strict static key enforcement"""
100 """strict static key enforcement"""
101 if key in self.iterkeys():
101 if key in self.iterkeys():
102 dict.__setitem__(self, key, value)
102 dict.__setitem__(self, key, value)
103 else:
103 else:
104 raise KeyError(key)
104 raise KeyError(key)
105
105
106
106
107 class Client(HasTraits):
107 class Client(HasTraits):
108 """A semi-synchronous client to the IPython ZMQ cluster
108 """A semi-synchronous client to the IPython ZMQ cluster
109
109
110 Parameters
110 Parameters
111 ----------
111 ----------
112
112
113 url_or_file : bytes; zmq url or path to ipcontroller-client.json
113 url_or_file : bytes; zmq url or path to ipcontroller-client.json
114 Connection information for the Hub's registration. If a json connector
114 Connection information for the Hub's registration. If a json connector
115 file is given, then likely no further configuration is necessary.
115 file is given, then likely no further configuration is necessary.
116 [Default: use profile]
116 [Default: use profile]
117 profile : bytes
117 profile : bytes
118 The name of the Cluster profile to be used to find connector information.
118 The name of the Cluster profile to be used to find connector information.
119 [Default: 'default']
119 [Default: 'default']
120 context : zmq.Context
120 context : zmq.Context
121 Pass an existing zmq.Context instance, otherwise the client will create its own.
121 Pass an existing zmq.Context instance, otherwise the client will create its own.
122 username : bytes
122 username : bytes
123 set username to be passed to the Session object
123 set username to be passed to the Session object
124 debug : bool
124 debug : bool
125 flag for lots of message printing for debug purposes
125 flag for lots of message printing for debug purposes
126
126
127 #-------------- ssh related args ----------------
127 #-------------- ssh related args ----------------
128 # These are args for configuring the ssh tunnel to be used
128 # These are args for configuring the ssh tunnel to be used
129 # credentials are used to forward connections over ssh to the Controller
129 # credentials are used to forward connections over ssh to the Controller
130 # Note that the ip given in `addr` needs to be relative to sshserver
130 # Note that the ip given in `addr` needs to be relative to sshserver
131 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
131 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
132 # and set sshserver as the same machine the Controller is on. However,
132 # and set sshserver as the same machine the Controller is on. However,
133 # the only requirement is that sshserver is able to see the Controller
133 # the only requirement is that sshserver is able to see the Controller
134 # (i.e. is within the same trusted network).
134 # (i.e. is within the same trusted network).
135
135
136 sshserver : str
136 sshserver : str
137 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
137 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
138 If keyfile or password is specified, and this is not, it will default to
138 If keyfile or password is specified, and this is not, it will default to
139 the ip given in addr.
139 the ip given in addr.
140 sshkey : str; path to public ssh key file
140 sshkey : str; path to public ssh key file
141 This specifies a key to be used in ssh login, default None.
141 This specifies a key to be used in ssh login, default None.
142 Regular default ssh keys will be used without specifying this argument.
142 Regular default ssh keys will be used without specifying this argument.
143 password : str
143 password : str
144 Your ssh password to sshserver. Note that if this is left None,
144 Your ssh password to sshserver. Note that if this is left None,
145 you will be prompted for it if passwordless key based login is unavailable.
145 you will be prompted for it if passwordless key based login is unavailable.
146 paramiko : bool
146 paramiko : bool
147 flag for whether to use paramiko instead of shell ssh for tunneling.
147 flag for whether to use paramiko instead of shell ssh for tunneling.
148 [default: True on win32, False else]
148 [default: True on win32, False else]
149
149
150 ------- exec authentication args -------
150 ------- exec authentication args -------
151 If even localhost is untrusted, you can have some protection against
151 If even localhost is untrusted, you can have some protection against
152 unauthorized execution by using a key. Messages are still sent
152 unauthorized execution by using a key. Messages are still sent
153 as cleartext, so if someone can snoop your loopback traffic this will
153 as cleartext, so if someone can snoop your loopback traffic this will
154 not help against malicious attacks.
154 not help against malicious attacks.
155
155
156 exec_key : str
156 exec_key : str
157 an authentication key or file containing a key
157 an authentication key or file containing a key
158 default: None
158 default: None
159
159
160
160
161 Attributes
161 Attributes
162 ----------
162 ----------
163
163
164 ids : list of int engine IDs
164 ids : list of int engine IDs
165 requesting the ids attribute always synchronizes
165 requesting the ids attribute always synchronizes
166 the registration state. To request ids without synchronization,
166 the registration state. To request ids without synchronization,
167 use the semi-private _ids attribute.
167 use the semi-private _ids attribute.
168
168
169 history : list of msg_ids
169 history : list of msg_ids
170 a list of msg_ids, keeping track of all the execution
170 a list of msg_ids, keeping track of all the execution
171 messages you have submitted in order.
171 messages you have submitted in order.
172
172
173 outstanding : set of msg_ids
173 outstanding : set of msg_ids
174 a set of msg_ids that have been submitted, but whose
174 a set of msg_ids that have been submitted, but whose
175 results have not yet been received.
175 results have not yet been received.
176
176
177 results : dict
177 results : dict
178 a dict of all our results, keyed by msg_id
178 a dict of all our results, keyed by msg_id
179
179
180 block : bool
180 block : bool
181 determines default behavior when block not specified
181 determines default behavior when block not specified
182 in execution methods
182 in execution methods
183
183
184 Methods
184 Methods
185 -------
185 -------
186
186
187 spin
187 spin
188 flushes incoming results and registration state changes
188 flushes incoming results and registration state changes
189 control methods spin, and requesting `ids` also ensures up to date
189 control methods spin, and requesting `ids` also ensures up to date
190
190
191 wait
191 wait
192 wait on one or more msg_ids
192 wait on one or more msg_ids
193
193
194 execution methods
194 execution methods
195 apply
195 apply
196 legacy: execute, run
196 legacy: execute, run
197
197
198 data movement
198 data movement
199 push, pull, scatter, gather
199 push, pull, scatter, gather
200
200
201 query methods
201 query methods
202 queue_status, get_result, purge, result_status
202 queue_status, get_result, purge, result_status
203
203
204 control methods
204 control methods
205 abort, shutdown
205 abort, shutdown
206
206
207 """
207 """
208
208
209
209
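Hypothetical ways to construct a Client based on the parameters documented above; the profile name, file path and ssh server are made up:

from IPython.parallel import Client

rc = Client()                                    # default profile
rc = Client(profile='mycluster')                 # look up connection info by profile
rc = Client('/path/to/ipcontroller-client.json',
            sshserver='user@gateway.example.com')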
210 block = Bool(False)
210 block = Bool(False)
211 outstanding = Set()
211 outstanding = Set()
212 results = Instance('collections.defaultdict', (dict,))
212 results = Instance('collections.defaultdict', (dict,))
213 metadata = Instance('collections.defaultdict', (Metadata,))
213 metadata = Instance('collections.defaultdict', (Metadata,))
214 history = List()
214 history = List()
215 debug = Bool(False)
215 debug = Bool(False)
216 profile=Unicode('default')
216 profile=Unicode('default')
217
217
218 _outstanding_dict = Instance('collections.defaultdict', (set,))
218 _outstanding_dict = Instance('collections.defaultdict', (set,))
219 _ids = List()
219 _ids = List()
220 _connected=Bool(False)
220 _connected=Bool(False)
221 _ssh=Bool(False)
221 _ssh=Bool(False)
222 _context = Instance('zmq.Context')
222 _context = Instance('zmq.Context')
223 _config = Dict()
223 _config = Dict()
224 _engines=Instance(util.ReverseDict, (), {})
224 _engines=Instance(util.ReverseDict, (), {})
225 # _hub_socket=Instance('zmq.Socket')
225 # _hub_socket=Instance('zmq.Socket')
226 _query_socket=Instance('zmq.Socket')
226 _query_socket=Instance('zmq.Socket')
227 _control_socket=Instance('zmq.Socket')
227 _control_socket=Instance('zmq.Socket')
228 _iopub_socket=Instance('zmq.Socket')
228 _iopub_socket=Instance('zmq.Socket')
229 _notification_socket=Instance('zmq.Socket')
229 _notification_socket=Instance('zmq.Socket')
230 _mux_socket=Instance('zmq.Socket')
230 _mux_socket=Instance('zmq.Socket')
231 _task_socket=Instance('zmq.Socket')
231 _task_socket=Instance('zmq.Socket')
232 _task_scheme=Unicode()
232 _task_scheme=Unicode()
233 _closed = False
233 _closed = False
234 _ignored_control_replies=Int(0)
234 _ignored_control_replies=Int(0)
235 _ignored_hub_replies=Int(0)
235 _ignored_hub_replies=Int(0)
236
236
237 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
237 def __init__(self, url_or_file=None, profile='default', profile_dir=None, ipython_dir=None,
238 context=None, username=None, debug=False, exec_key=None,
238 context=None, username=None, debug=False, exec_key=None,
239 sshserver=None, sshkey=None, password=None, paramiko=None,
239 sshserver=None, sshkey=None, password=None, paramiko=None,
240 timeout=10
240 timeout=10
241 ):
241 ):
242 super(Client, self).__init__(debug=debug, profile=profile)
242 super(Client, self).__init__(debug=debug, profile=profile)
243 if context is None:
243 if context is None:
244 context = zmq.Context.instance()
244 context = zmq.Context.instance()
245 self._context = context
245 self._context = context
246
246
247
247
248 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
248 self._setup_profile_dir(profile, profile_dir, ipython_dir)
249 if self._cd is not None:
249 if self._cd is not None:
250 if url_or_file is None:
250 if url_or_file is None:
251 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
251 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
252 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
252 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
253 " Please specify at least one of url_or_file or profile."
253 " Please specify at least one of url_or_file or profile."
254
254
255 try:
255 try:
256 util.validate_url(url_or_file)
256 util.validate_url(url_or_file)
257 except AssertionError:
257 except AssertionError:
258 if not os.path.exists(url_or_file):
258 if not os.path.exists(url_or_file):
259 if self._cd:
259 if self._cd:
260 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
260 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
261 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
261 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
262 with open(url_or_file) as f:
262 with open(url_or_file) as f:
263 cfg = json.loads(f.read())
263 cfg = json.loads(f.read())
264 else:
264 else:
265 cfg = {'url':url_or_file}
265 cfg = {'url':url_or_file}
266
266
267 # sync defaults from args, json:
267 # sync defaults from args, json:
268 if sshserver:
268 if sshserver:
269 cfg['ssh'] = sshserver
269 cfg['ssh'] = sshserver
270 if exec_key:
270 if exec_key:
271 cfg['exec_key'] = exec_key
271 cfg['exec_key'] = exec_key
272 exec_key = cfg['exec_key']
272 exec_key = cfg['exec_key']
273 sshserver=cfg['ssh']
273 sshserver=cfg['ssh']
274 url = cfg['url']
274 url = cfg['url']
275 location = cfg.setdefault('location', None)
275 location = cfg.setdefault('location', None)
276 cfg['url'] = util.disambiguate_url(cfg['url'], location)
276 cfg['url'] = util.disambiguate_url(cfg['url'], location)
277 url = cfg['url']
277 url = cfg['url']
278
278
279 self._config = cfg
279 self._config = cfg
280
280
281 self._ssh = bool(sshserver or sshkey or password)
281 self._ssh = bool(sshserver or sshkey or password)
282 if self._ssh and sshserver is None:
282 if self._ssh and sshserver is None:
283 # default to ssh via localhost
283 # default to ssh via localhost
284 sshserver = url.split('://')[1].split(':')[0]
284 sshserver = url.split('://')[1].split(':')[0]
285 if self._ssh and password is None:
285 if self._ssh and password is None:
286 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
286 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
287 password=False
287 password=False
288 else:
288 else:
289 password = getpass("SSH Password for %s: "%sshserver)
289 password = getpass("SSH Password for %s: "%sshserver)
290 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
290 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
291 if exec_key is not None and os.path.isfile(exec_key):
291 if exec_key is not None and os.path.isfile(exec_key):
292 arg = 'keyfile'
292 arg = 'keyfile'
293 else:
293 else:
294 arg = 'key'
294 arg = 'key'
295 key_arg = {arg:exec_key}
295 key_arg = {arg:exec_key}
296 if username is None:
296 if username is None:
297 self.session = ss.StreamSession(**key_arg)
297 self.session = ss.StreamSession(**key_arg)
298 else:
298 else:
299 self.session = ss.StreamSession(username=username, **key_arg)
299 self.session = ss.StreamSession(username=username, **key_arg)
300 self._query_socket = self._context.socket(zmq.XREQ)
300 self._query_socket = self._context.socket(zmq.XREQ)
301 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
301 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
302 if self._ssh:
302 if self._ssh:
303 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
303 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
304 else:
304 else:
305 self._query_socket.connect(url)
305 self._query_socket.connect(url)
306
306
307 self.session.debug = self.debug
307 self.session.debug = self.debug
308
308
309 self._notification_handlers = {'registration_notification' : self._register_engine,
309 self._notification_handlers = {'registration_notification' : self._register_engine,
310 'unregistration_notification' : self._unregister_engine,
310 'unregistration_notification' : self._unregister_engine,
311 'shutdown_notification' : lambda msg: self.close(),
311 'shutdown_notification' : lambda msg: self.close(),
312 }
312 }
313 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
313 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
314 'apply_reply' : self._handle_apply_reply}
314 'apply_reply' : self._handle_apply_reply}
315 self._connect(sshserver, ssh_kwargs, timeout)
315 self._connect(sshserver, ssh_kwargs, timeout)
316
316
317 def __del__(self):
317 def __del__(self):
318 """cleanup sockets, but _not_ context."""
318 """cleanup sockets, but _not_ context."""
319 self.close()
319 self.close()
320
320
321 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
321 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
322 if ipython_dir is None:
322 if ipython_dir is None:
323 ipython_dir = get_ipython_dir()
323 ipython_dir = get_ipython_dir()
324 if cluster_dir is not None:
324 if profile_dir is not None:
325 try:
325 try:
326 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
326 self._cd = ProfileDir.find_profile_dir(profile_dir)
327 return
327 return
328 except ClusterDirError:
328 except ProfileDirError:
329 pass
329 pass
330 elif profile is not None:
330 elif profile is not None:
331 try:
331 try:
332 self._cd = ClusterDir.find_cluster_dir_by_profile(
332 self._cd = ProfileDir.find_profile_dir_by_name(
333 ipython_dir, profile)
333 ipython_dir, profile)
334 return
334 return
335 except ClusterDirError:
335 except ProfileDirError:
336 pass
336 pass
337 self._cd = None
337 self._cd = None
338
338
339 def _update_engines(self, engines):
339 def _update_engines(self, engines):
340 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
340 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
341 for k,v in engines.iteritems():
341 for k,v in engines.iteritems():
342 eid = int(k)
342 eid = int(k)
343 self._engines[eid] = bytes(v) # force not unicode
343 self._engines[eid] = bytes(v) # force not unicode
344 self._ids.append(eid)
344 self._ids.append(eid)
345 self._ids = sorted(self._ids)
345 self._ids = sorted(self._ids)
346 if sorted(self._engines.keys()) != range(len(self._engines)) and \
346 if sorted(self._engines.keys()) != range(len(self._engines)) and \
347 self._task_scheme == 'pure' and self._task_socket:
347 self._task_scheme == 'pure' and self._task_socket:
348 self._stop_scheduling_tasks()
348 self._stop_scheduling_tasks()
349
349
350 def _stop_scheduling_tasks(self):
350 def _stop_scheduling_tasks(self):
351 """Stop scheduling tasks because an engine has been unregistered
351 """Stop scheduling tasks because an engine has been unregistered
352 from a pure ZMQ scheduler.
352 from a pure ZMQ scheduler.
353 """
353 """
354 self._task_socket.close()
354 self._task_socket.close()
355 self._task_socket = None
355 self._task_socket = None
356 msg = "An engine has been unregistered, and we are using pure " +\
356 msg = "An engine has been unregistered, and we are using pure " +\
357 "ZMQ task scheduling. Task farming will be disabled."
357 "ZMQ task scheduling. Task farming will be disabled."
358 if self.outstanding:
358 if self.outstanding:
359 msg += " If you were running tasks when this happened, " +\
359 msg += " If you were running tasks when this happened, " +\
360 "some `outstanding` msg_ids may never resolve."
360 "some `outstanding` msg_ids may never resolve."
361 warnings.warn(msg, RuntimeWarning)
361 warnings.warn(msg, RuntimeWarning)
362
362
363 def _build_targets(self, targets):
363 def _build_targets(self, targets):
364 """Turn valid target IDs or 'all' into two lists:
364 """Turn valid target IDs or 'all' into two lists:
365 (int_ids, uuids).
365 (int_ids, uuids).
366 """
366 """
367 if not self._ids:
367 if not self._ids:
368 # flush notification socket if no engines yet, just in case
368 # flush notification socket if no engines yet, just in case
369 if not self.ids:
369 if not self.ids:
370 raise error.NoEnginesRegistered("Can't build targets without any engines")
370 raise error.NoEnginesRegistered("Can't build targets without any engines")
371
371
372 if targets is None:
372 if targets is None:
373 targets = self._ids
373 targets = self._ids
374 elif isinstance(targets, str):
374 elif isinstance(targets, str):
375 if targets.lower() == 'all':
375 if targets.lower() == 'all':
376 targets = self._ids
376 targets = self._ids
377 else:
377 else:
378 raise TypeError("%r not valid str target, must be 'all'"%(targets))
378 raise TypeError("%r not valid str target, must be 'all'"%(targets))
379 elif isinstance(targets, int):
379 elif isinstance(targets, int):
380 if targets < 0:
380 if targets < 0:
381 targets = self.ids[targets]
381 targets = self.ids[targets]
382 if targets not in self._ids:
382 if targets not in self._ids:
383 raise IndexError("No such engine: %i"%targets)
383 raise IndexError("No such engine: %i"%targets)
384 targets = [targets]
384 targets = [targets]
385
385
386 if isinstance(targets, slice):
386 if isinstance(targets, slice):
387 indices = range(len(self._ids))[targets]
387 indices = range(len(self._ids))[targets]
388 ids = self.ids
388 ids = self.ids
389 targets = [ ids[i] for i in indices ]
389 targets = [ ids[i] for i in indices ]
390
390
391 if not isinstance(targets, (tuple, list, xrange)):
391 if not isinstance(targets, (tuple, list, xrange)):
392 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
392 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
393
393
394 return [self._engines[t] for t in targets], list(targets)
394 return [self._engines[t] for t in targets], list(targets)
395
395
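A condensed standalone sketch of the normalization _build_targets performs (uuid lookup and error handling omitted; ids plays the role of self._ids):

def normalize_targets(ids, targets):
    if targets is None:
        return list(ids)
    if isinstance(targets, str) and targets.lower() == 'all':
        return list(ids)
    if isinstance(targets, int):
        # negative ints index from the end of the registered ids
        return [ids[targets]] if targets < 0 else [targets]
    if isinstance(targets, slice):
        return [ids[i] for i in range(len(ids))[targets]]
    return list(targets)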
396 def _connect(self, sshserver, ssh_kwargs, timeout):
396 def _connect(self, sshserver, ssh_kwargs, timeout):
397 """setup all our socket connections to the cluster. This is called from
397 """setup all our socket connections to the cluster. This is called from
398 __init__."""
398 __init__."""
399
399
400 # Maybe allow reconnecting?
400 # Maybe allow reconnecting?
401 if self._connected:
401 if self._connected:
402 return
402 return
403 self._connected=True
403 self._connected=True
404
404
405 def connect_socket(s, url):
405 def connect_socket(s, url):
406 url = util.disambiguate_url(url, self._config['location'])
406 url = util.disambiguate_url(url, self._config['location'])
407 if self._ssh:
407 if self._ssh:
408 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
408 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
409 else:
409 else:
410 return s.connect(url)
410 return s.connect(url)
411
411
412 self.session.send(self._query_socket, 'connection_request')
412 self.session.send(self._query_socket, 'connection_request')
413 r,w,x = zmq.select([self._query_socket],[],[], timeout)
413 r,w,x = zmq.select([self._query_socket],[],[], timeout)
414 if not r:
414 if not r:
415 raise error.TimeoutError("Hub connection request timed out")
415 raise error.TimeoutError("Hub connection request timed out")
416 idents,msg = self.session.recv(self._query_socket,mode=0)
416 idents,msg = self.session.recv(self._query_socket,mode=0)
417 if self.debug:
417 if self.debug:
418 pprint(msg)
418 pprint(msg)
419 msg = ss.Message(msg)
419 msg = ss.Message(msg)
420 content = msg.content
420 content = msg.content
421 self._config['registration'] = dict(content)
421 self._config['registration'] = dict(content)
422 if content.status == 'ok':
422 if content.status == 'ok':
423 if content.mux:
423 if content.mux:
424 self._mux_socket = self._context.socket(zmq.XREQ)
424 self._mux_socket = self._context.socket(zmq.XREQ)
425 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
425 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
426 connect_socket(self._mux_socket, content.mux)
426 connect_socket(self._mux_socket, content.mux)
427 if content.task:
427 if content.task:
428 self._task_scheme, task_addr = content.task
428 self._task_scheme, task_addr = content.task
429 self._task_socket = self._context.socket(zmq.XREQ)
429 self._task_socket = self._context.socket(zmq.XREQ)
430 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
430 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
431 connect_socket(self._task_socket, task_addr)
431 connect_socket(self._task_socket, task_addr)
432 if content.notification:
432 if content.notification:
433 self._notification_socket = self._context.socket(zmq.SUB)
433 self._notification_socket = self._context.socket(zmq.SUB)
434 connect_socket(self._notification_socket, content.notification)
434 connect_socket(self._notification_socket, content.notification)
435 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
435 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
436 # if content.query:
436 # if content.query:
437 # self._query_socket = self._context.socket(zmq.XREQ)
437 # self._query_socket = self._context.socket(zmq.XREQ)
438 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
438 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
439 # connect_socket(self._query_socket, content.query)
439 # connect_socket(self._query_socket, content.query)
440 if content.control:
440 if content.control:
441 self._control_socket = self._context.socket(zmq.XREQ)
441 self._control_socket = self._context.socket(zmq.XREQ)
442 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
442 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
443 connect_socket(self._control_socket, content.control)
443 connect_socket(self._control_socket, content.control)
444 if content.iopub:
444 if content.iopub:
445 self._iopub_socket = self._context.socket(zmq.SUB)
445 self._iopub_socket = self._context.socket(zmq.SUB)
446 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
446 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
447 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
447 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
448 connect_socket(self._iopub_socket, content.iopub)
448 connect_socket(self._iopub_socket, content.iopub)
449 self._update_engines(dict(content.engines))
449 self._update_engines(dict(content.engines))
450 else:
450 else:
451 self._connected = False
451 self._connected = False
452 raise Exception("Failed to connect!")
452 raise Exception("Failed to connect!")
453
453
454 #--------------------------------------------------------------------------
454 #--------------------------------------------------------------------------
455 # handlers and callbacks for incoming messages
455 # handlers and callbacks for incoming messages
456 #--------------------------------------------------------------------------
456 #--------------------------------------------------------------------------
457
457
458 def _unwrap_exception(self, content):
458 def _unwrap_exception(self, content):
459 """unwrap exception, and remap engine_id to int."""
459 """unwrap exception, and remap engine_id to int."""
460 e = error.unwrap_exception(content)
460 e = error.unwrap_exception(content)
461 # print e.traceback
461 # print e.traceback
462 if e.engine_info:
462 if e.engine_info:
463 e_uuid = e.engine_info['engine_uuid']
463 e_uuid = e.engine_info['engine_uuid']
464 eid = self._engines[e_uuid]
464 eid = self._engines[e_uuid]
465 e.engine_info['engine_id'] = eid
465 e.engine_info['engine_id'] = eid
466 return e
466 return e
467
467
468 def _extract_metadata(self, header, parent, content):
468 def _extract_metadata(self, header, parent, content):
469 md = {'msg_id' : parent['msg_id'],
469 md = {'msg_id' : parent['msg_id'],
470 'received' : datetime.now(),
470 'received' : datetime.now(),
471 'engine_uuid' : header.get('engine', None),
471 'engine_uuid' : header.get('engine', None),
472 'follow' : parent.get('follow', []),
472 'follow' : parent.get('follow', []),
473 'after' : parent.get('after', []),
473 'after' : parent.get('after', []),
474 'status' : content['status'],
474 'status' : content['status'],
475 }
475 }
476
476
477 if md['engine_uuid'] is not None:
477 if md['engine_uuid'] is not None:
478 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
478 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
479
479
480 if 'date' in parent:
480 if 'date' in parent:
481 md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
481 md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
482 if 'started' in header:
482 if 'started' in header:
483 md['started'] = datetime.strptime(header['started'], util.ISO8601)
483 md['started'] = datetime.strptime(header['started'], util.ISO8601)
484 if 'date' in header:
484 if 'date' in header:
485 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
485 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
486 return md
486 return md
487
487
488 def _register_engine(self, msg):
488 def _register_engine(self, msg):
489 """Register a new engine, and update our connection info."""
489 """Register a new engine, and update our connection info."""
490 content = msg['content']
490 content = msg['content']
491 eid = content['id']
491 eid = content['id']
492 d = {eid : content['queue']}
492 d = {eid : content['queue']}
493 self._update_engines(d)
493 self._update_engines(d)
494
494
495 def _unregister_engine(self, msg):
495 def _unregister_engine(self, msg):
496 """Unregister an engine that has died."""
496 """Unregister an engine that has died."""
497 content = msg['content']
497 content = msg['content']
498 eid = int(content['id'])
498 eid = int(content['id'])
499 if eid in self._ids:
499 if eid in self._ids:
500 self._ids.remove(eid)
500 self._ids.remove(eid)
501 uuid = self._engines.pop(eid)
501 uuid = self._engines.pop(eid)
502
502
503 self._handle_stranded_msgs(eid, uuid)
503 self._handle_stranded_msgs(eid, uuid)
504
504
505 if self._task_socket and self._task_scheme == 'pure':
505 if self._task_socket and self._task_scheme == 'pure':
506 self._stop_scheduling_tasks()
506 self._stop_scheduling_tasks()
507
507
508 def _handle_stranded_msgs(self, eid, uuid):
508 def _handle_stranded_msgs(self, eid, uuid):
509 """Handle messages known to be on an engine when the engine unregisters.
509 """Handle messages known to be on an engine when the engine unregisters.
510
510
511 It is possible that this will fire prematurely - that is, an engine will
511 It is possible that this will fire prematurely - that is, an engine will
512 go down after completing a result, and the client will be notified
512 go down after completing a result, and the client will be notified
513 of the unregistration and later receive the successful result.
513 of the unregistration and later receive the successful result.
514 """
514 """
515
515
516 outstanding = self._outstanding_dict[uuid]
516 outstanding = self._outstanding_dict[uuid]
517
517
518 for msg_id in list(outstanding):
518 for msg_id in list(outstanding):
519 if msg_id in self.results:
519 if msg_id in self.results:
520 # we already
520 # we already have this result, skip it
520 # we already have this result, skip it
521 continue
522 try:
522 try:
523 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
523 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
524 except:
524 except:
525 content = error.wrap_exception()
525 content = error.wrap_exception()
526 # build a fake message:
526 # build a fake message:
527 parent = {}
527 parent = {}
528 header = {}
528 header = {}
529 parent['msg_id'] = msg_id
529 parent['msg_id'] = msg_id
530 header['engine'] = uuid
530 header['engine'] = uuid
531 header['date'] = datetime.now().strftime(util.ISO8601)
531 header['date'] = datetime.now().strftime(util.ISO8601)
532 msg = dict(parent_header=parent, header=header, content=content)
532 msg = dict(parent_header=parent, header=header, content=content)
533 self._handle_apply_reply(msg)
533 self._handle_apply_reply(msg)
534
534
535 def _handle_execute_reply(self, msg):
535 def _handle_execute_reply(self, msg):
536 """Save the reply to an execute_request into our results.
536 """Save the reply to an execute_request into our results.
537
537
538 execute messages are never actually used. apply is used instead.
538 execute messages are never actually used. apply is used instead.
539 """
539 """
540
540
541 parent = msg['parent_header']
541 parent = msg['parent_header']
542 msg_id = parent['msg_id']
542 msg_id = parent['msg_id']
543 if msg_id not in self.outstanding:
543 if msg_id not in self.outstanding:
544 if msg_id in self.history:
544 if msg_id in self.history:
545 print ("got stale result: %s"%msg_id)
545 print ("got stale result: %s"%msg_id)
546 else:
546 else:
547 print ("got unknown result: %s"%msg_id)
547 print ("got unknown result: %s"%msg_id)
548 else:
548 else:
549 self.outstanding.remove(msg_id)
549 self.outstanding.remove(msg_id)
550 self.results[msg_id] = self._unwrap_exception(msg['content'])
550 self.results[msg_id] = self._unwrap_exception(msg['content'])
551
551
552 def _handle_apply_reply(self, msg):
552 def _handle_apply_reply(self, msg):
553 """Save the reply to an apply_request into our results."""
553 """Save the reply to an apply_request into our results."""
554 parent = msg['parent_header']
554 parent = msg['parent_header']
555 msg_id = parent['msg_id']
555 msg_id = parent['msg_id']
556 if msg_id not in self.outstanding:
556 if msg_id not in self.outstanding:
557 if msg_id in self.history:
557 if msg_id in self.history:
558 print ("got stale result: %s"%msg_id)
558 print ("got stale result: %s"%msg_id)
559 print self.results[msg_id]
559 print self.results[msg_id]
560 print msg
560 print msg
561 else:
561 else:
562 print ("got unknown result: %s"%msg_id)
562 print ("got unknown result: %s"%msg_id)
563 else:
563 else:
564 self.outstanding.remove(msg_id)
564 self.outstanding.remove(msg_id)
565 content = msg['content']
565 content = msg['content']
566 header = msg['header']
566 header = msg['header']
567
567
568 # construct metadata:
568 # construct metadata:
569 md = self.metadata[msg_id]
569 md = self.metadata[msg_id]
570 md.update(self._extract_metadata(header, parent, content))
570 md.update(self._extract_metadata(header, parent, content))
571 # is this redundant?
571 # is this redundant?
572 self.metadata[msg_id] = md
572 self.metadata[msg_id] = md
573
573
574 e_outstanding = self._outstanding_dict[md['engine_uuid']]
574 e_outstanding = self._outstanding_dict[md['engine_uuid']]
575 if msg_id in e_outstanding:
575 if msg_id in e_outstanding:
576 e_outstanding.remove(msg_id)
576 e_outstanding.remove(msg_id)
577
577
578 # construct result:
578 # construct result:
579 if content['status'] == 'ok':
579 if content['status'] == 'ok':
580 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
580 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
581 elif content['status'] == 'aborted':
581 elif content['status'] == 'aborted':
582 self.results[msg_id] = error.TaskAborted(msg_id)
582 self.results[msg_id] = error.TaskAborted(msg_id)
583 elif content['status'] == 'resubmitted':
583 elif content['status'] == 'resubmitted':
584 # TODO: handle resubmission
584 # TODO: handle resubmission
585 pass
585 pass
586 else:
586 else:
587 self.results[msg_id] = self._unwrap_exception(content)
587 self.results[msg_id] = self._unwrap_exception(content)
588
588
589 def _flush_notifications(self):
589 def _flush_notifications(self):
590 """Flush notifications of engine registrations waiting
590 """Flush notifications of engine registrations waiting
591 in ZMQ queue."""
591 in ZMQ queue."""
592 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
592 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
593 while msg is not None:
593 while msg is not None:
594 if self.debug:
594 if self.debug:
595 pprint(msg)
595 pprint(msg)
596 msg = msg[-1]
596 msg = msg[-1]
597 msg_type = msg['msg_type']
597 msg_type = msg['msg_type']
598 handler = self._notification_handlers.get(msg_type, None)
598 handler = self._notification_handlers.get(msg_type, None)
599 if handler is None:
599 if handler is None:
600 raise Exception("Unhandled message type: %s"%msg.msg_type)
600 raise Exception("Unhandled message type: %s"%msg.msg_type)
601 else:
601 else:
602 handler(msg)
602 handler(msg)
603 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
603 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
604
604
605 def _flush_results(self, sock):
605 def _flush_results(self, sock):
606 """Flush task or queue results waiting in ZMQ queue."""
606 """Flush task or queue results waiting in ZMQ queue."""
607 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
607 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
608 while msg is not None:
608 while msg is not None:
609 if self.debug:
609 if self.debug:
610 pprint(msg)
610 pprint(msg)
611 msg = msg[-1]
611 msg = msg[-1]
612 msg_type = msg['msg_type']
612 msg_type = msg['msg_type']
613 handler = self._queue_handlers.get(msg_type, None)
613 handler = self._queue_handlers.get(msg_type, None)
614 if handler is None:
614 if handler is None:
615 raise Exception("Unhandled message type: %s"%msg_type)
615 raise Exception("Unhandled message type: %s"%msg_type)
616 else:
616 else:
617 handler(msg)
617 handler(msg)
618 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
618 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
619
619
620 def _flush_control(self, sock):
620 def _flush_control(self, sock):
621 """Flush replies from the control channel waiting
621 """Flush replies from the control channel waiting
622 in the ZMQ queue.
622 in the ZMQ queue.
623
623
624 Currently: ignore them."""
624 Currently: ignore them."""
625 if self._ignored_control_replies <= 0:
625 if self._ignored_control_replies <= 0:
626 return
626 return
627 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
627 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
628 while msg is not None:
628 while msg is not None:
629 self._ignored_control_replies -= 1
629 self._ignored_control_replies -= 1
630 if self.debug:
630 if self.debug:
631 pprint(msg)
631 pprint(msg)
632 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
632 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
633
633
634 def _flush_ignored_control(self):
634 def _flush_ignored_control(self):
635 """flush ignored control replies"""
635 """flush ignored control replies"""
636 while self._ignored_control_replies > 0:
636 while self._ignored_control_replies > 0:
637 self.session.recv(self._control_socket)
637 self.session.recv(self._control_socket)
638 self._ignored_control_replies -= 1
638 self._ignored_control_replies -= 1
639
639
640 def _flush_ignored_hub_replies(self):
640 def _flush_ignored_hub_replies(self):
641 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
641 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
642 while msg is not None:
642 while msg is not None:
643 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
643 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
644
644
645 def _flush_iopub(self, sock):
645 def _flush_iopub(self, sock):
646 """Flush replies from the iopub channel waiting
646 """Flush replies from the iopub channel waiting
647 in the ZMQ queue.
647 in the ZMQ queue.
648 """
648 """
649 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
649 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
650 while msg is not None:
650 while msg is not None:
651 if self.debug:
651 if self.debug:
652 pprint(msg)
652 pprint(msg)
653 msg = msg[-1]
653 msg = msg[-1]
654 parent = msg['parent_header']
654 parent = msg['parent_header']
655 msg_id = parent['msg_id']
655 msg_id = parent['msg_id']
656 content = msg['content']
656 content = msg['content']
657 header = msg['header']
657 header = msg['header']
658 msg_type = msg['msg_type']
658 msg_type = msg['msg_type']
659
659
660 # init metadata:
660 # init metadata:
661 md = self.metadata[msg_id]
661 md = self.metadata[msg_id]
662
662
663 if msg_type == 'stream':
663 if msg_type == 'stream':
664 name = content['name']
664 name = content['name']
665 s = md[name] or ''
665 s = md[name] or ''
666 md[name] = s + content['data']
666 md[name] = s + content['data']
667 elif msg_type == 'pyerr':
667 elif msg_type == 'pyerr':
668 md.update({'pyerr' : self._unwrap_exception(content)})
668 md.update({'pyerr' : self._unwrap_exception(content)})
669 elif msg_type == 'pyin':
669 elif msg_type == 'pyin':
670 md.update({'pyin' : content['code']})
670 md.update({'pyin' : content['code']})
671 else:
671 else:
672 md.update({msg_type : content.get('data', '')})
672 md.update({msg_type : content.get('data', '')})
673
673
674 # redundant?
674 # redundant?
675 self.metadata[msg_id] = md
675 self.metadata[msg_id] = md
676
676
677 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
677 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
678
678
679 #--------------------------------------------------------------------------
679 #--------------------------------------------------------------------------
680 # len, getitem
680 # len, getitem
681 #--------------------------------------------------------------------------
681 #--------------------------------------------------------------------------
682
682
683 def __len__(self):
683 def __len__(self):
684 """len(client) returns # of engines."""
684 """len(client) returns # of engines."""
685 return len(self.ids)
685 return len(self.ids)
686
686
687 def __getitem__(self, key):
687 def __getitem__(self, key):
688 """index access returns DirectView multiplexer objects
688 """index access returns DirectView multiplexer objects
689
689
690 Must be int, slice, or list/tuple/xrange of ints"""
690 Must be int, slice, or list/tuple/xrange of ints"""
691 if not isinstance(key, (int, slice, tuple, list, xrange)):
691 if not isinstance(key, (int, slice, tuple, list, xrange)):
692 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
692 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
693 else:
693 else:
694 return self.direct_view(key)
694 return self.direct_view(key)
695
695
696 #--------------------------------------------------------------------------
696 #--------------------------------------------------------------------------
697 # Begin public methods
697 # Begin public methods
698 #--------------------------------------------------------------------------
698 #--------------------------------------------------------------------------
699
699
700 @property
700 @property
701 def ids(self):
701 def ids(self):
702 """Always up-to-date ids property."""
702 """Always up-to-date ids property."""
703 self._flush_notifications()
703 self._flush_notifications()
704 # always copy:
704 # always copy:
705 return list(self._ids)
705 return list(self._ids)
706
706
707 def close(self):
707 def close(self):
708 if self._closed:
708 if self._closed:
709 return
709 return
710 snames = filter(lambda n: n.endswith('socket'), dir(self))
710 snames = filter(lambda n: n.endswith('socket'), dir(self))
711 for socket in map(lambda name: getattr(self, name), snames):
711 for socket in map(lambda name: getattr(self, name), snames):
712 if isinstance(socket, zmq.Socket) and not socket.closed:
712 if isinstance(socket, zmq.Socket) and not socket.closed:
713 socket.close()
713 socket.close()
714 self._closed = True
714 self._closed = True
715
715
716 def spin(self):
716 def spin(self):
717 """Flush any registration notifications and execution results
717 """Flush any registration notifications and execution results
718 waiting in the ZMQ queue.
718 waiting in the ZMQ queue.
719 """
719 """
720 if self._notification_socket:
720 if self._notification_socket:
721 self._flush_notifications()
721 self._flush_notifications()
722 if self._mux_socket:
722 if self._mux_socket:
723 self._flush_results(self._mux_socket)
723 self._flush_results(self._mux_socket)
724 if self._task_socket:
724 if self._task_socket:
725 self._flush_results(self._task_socket)
725 self._flush_results(self._task_socket)
726 if self._control_socket:
726 if self._control_socket:
727 self._flush_control(self._control_socket)
727 self._flush_control(self._control_socket)
728 if self._iopub_socket:
728 if self._iopub_socket:
729 self._flush_iopub(self._iopub_socket)
729 self._flush_iopub(self._iopub_socket)
730 if self._query_socket:
730 if self._query_socket:
731 self._flush_ignored_hub_replies()
731 self._flush_ignored_hub_replies()
732
732
733 def wait(self, jobs=None, timeout=-1):
733 def wait(self, jobs=None, timeout=-1):
734 """waits on one or more `jobs`, for up to `timeout` seconds.
734 """waits on one or more `jobs`, for up to `timeout` seconds.
735
735
736 Parameters
736 Parameters
737 ----------
737 ----------
738
738
739 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
739 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
740 ints are indices to self.history
740 ints are indices to self.history
741 strs are msg_ids
741 strs are msg_ids
742 default: wait on all outstanding messages
742 default: wait on all outstanding messages
743 timeout : float
743 timeout : float
744 a time in seconds, after which to give up.
744 a time in seconds, after which to give up.
745 default is -1, which means no timeout
745 default is -1, which means no timeout
746
746
747 Returns
747 Returns
748 -------
748 -------
749
749
750 True : when all msg_ids are done
750 True : when all msg_ids are done
751 False : timeout reached, some msg_ids still outstanding
751 False : timeout reached, some msg_ids still outstanding
752 """
752 """
753 tic = time.time()
753 tic = time.time()
754 if jobs is None:
754 if jobs is None:
755 theids = self.outstanding
755 theids = self.outstanding
756 else:
756 else:
757 if isinstance(jobs, (int, str, AsyncResult)):
757 if isinstance(jobs, (int, str, AsyncResult)):
758 jobs = [jobs]
758 jobs = [jobs]
759 theids = set()
759 theids = set()
760 for job in jobs:
760 for job in jobs:
761 if isinstance(job, int):
761 if isinstance(job, int):
762 # index access
762 # index access
763 job = self.history[job]
763 job = self.history[job]
764 elif isinstance(job, AsyncResult):
764 elif isinstance(job, AsyncResult):
765 map(theids.add, job.msg_ids)
765 map(theids.add, job.msg_ids)
766 continue
766 continue
767 theids.add(job)
767 theids.add(job)
768 if not theids.intersection(self.outstanding):
768 if not theids.intersection(self.outstanding):
769 return True
769 return True
770 self.spin()
770 self.spin()
771 while theids.intersection(self.outstanding):
771 while theids.intersection(self.outstanding):
772 if timeout >= 0 and ( time.time()-tic ) > timeout:
772 if timeout >= 0 and ( time.time()-tic ) > timeout:
773 break
773 break
774 time.sleep(1e-3)
774 time.sleep(1e-3)
775 self.spin()
775 self.spin()
776 return len(theids.intersection(self.outstanding)) == 0
776 return len(theids.intersection(self.outstanding)) == 0
777
777
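# Illustrative usage sketch (not part of the diff above): typical use of wait()
# from an interactive session. Client() and apply_async() live elsewhere in
# IPython.parallel and are assumed here, as is a running controller and engines.
#
#   from IPython.parallel import Client
#   rc = Client()
#   ar = rc[:].apply_async(lambda x: x * 2, 21)   # submit to all engines
#   rc.wait([ar], timeout=5.0)   # False if still outstanding after 5 seconds
#   rc.wait()                    # no arguments: block on everything outstanding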
778 #--------------------------------------------------------------------------
778 #--------------------------------------------------------------------------
779 # Control methods
779 # Control methods
780 #--------------------------------------------------------------------------
780 #--------------------------------------------------------------------------
781
781
782 @spin_first
782 @spin_first
783 def clear(self, targets=None, block=None):
783 def clear(self, targets=None, block=None):
784 """Clear the namespace in target(s)."""
784 """Clear the namespace in target(s)."""
785 block = self.block if block is None else block
785 block = self.block if block is None else block
786 targets = self._build_targets(targets)[0]
786 targets = self._build_targets(targets)[0]
787 for t in targets:
787 for t in targets:
788 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
788 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
789 error = False
789 error = False
790 if block:
790 if block:
791 self._flush_ignored_control()
791 self._flush_ignored_control()
792 for i in range(len(targets)):
792 for i in range(len(targets)):
793 idents,msg = self.session.recv(self._control_socket,0)
793 idents,msg = self.session.recv(self._control_socket,0)
794 if self.debug:
794 if self.debug:
795 pprint(msg)
795 pprint(msg)
796 if msg['content']['status'] != 'ok':
796 if msg['content']['status'] != 'ok':
797 error = self._unwrap_exception(msg['content'])
797 error = self._unwrap_exception(msg['content'])
798 else:
798 else:
799 self._ignored_control_replies += len(targets)
799 self._ignored_control_replies += len(targets)
800 if error:
800 if error:
801 raise error
801 raise error
802
802
803
803
804 @spin_first
804 @spin_first
805 def abort(self, jobs=None, targets=None, block=None):
805 def abort(self, jobs=None, targets=None, block=None):
806 """Abort specific jobs from the execution queues of target(s).
806 """Abort specific jobs from the execution queues of target(s).
807
807
808 This is a mechanism to prevent jobs that have already been submitted
808 This is a mechanism to prevent jobs that have already been submitted
809 from executing.
809 from executing.
810
810
811 Parameters
811 Parameters
812 ----------
812 ----------
813
813
814 jobs : msg_id, list of msg_ids, or AsyncResult
814 jobs : msg_id, list of msg_ids, or AsyncResult
815 The jobs to be aborted
815 The jobs to be aborted
816
816
817
817
818 """
818 """
819 block = self.block if block is None else block
819 block = self.block if block is None else block
820 targets = self._build_targets(targets)[0]
820 targets = self._build_targets(targets)[0]
821 msg_ids = []
821 msg_ids = []
822 if isinstance(jobs, (basestring,AsyncResult)):
822 if isinstance(jobs, (basestring,AsyncResult)):
823 jobs = [jobs]
823 jobs = [jobs]
824 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
824 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
825 if bad_ids:
825 if bad_ids:
826 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
826 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
827 for j in jobs:
827 for j in jobs:
828 if isinstance(j, AsyncResult):
828 if isinstance(j, AsyncResult):
829 msg_ids.extend(j.msg_ids)
829 msg_ids.extend(j.msg_ids)
830 else:
830 else:
831 msg_ids.append(j)
831 msg_ids.append(j)
832 content = dict(msg_ids=msg_ids)
832 content = dict(msg_ids=msg_ids)
833 for t in targets:
833 for t in targets:
834 self.session.send(self._control_socket, 'abort_request',
834 self.session.send(self._control_socket, 'abort_request',
835 content=content, ident=t)
835 content=content, ident=t)
836 error = False
836 error = False
837 if block:
837 if block:
838 self._flush_ignored_control()
838 self._flush_ignored_control()
839 for i in range(len(targets)):
839 for i in range(len(targets)):
840 idents,msg = self.session.recv(self._control_socket,0)
840 idents,msg = self.session.recv(self._control_socket,0)
841 if self.debug:
841 if self.debug:
842 pprint(msg)
842 pprint(msg)
843 if msg['content']['status'] != 'ok':
843 if msg['content']['status'] != 'ok':
844 error = self._unwrap_exception(msg['content'])
844 error = self._unwrap_exception(msg['content'])
845 else:
845 else:
846 self._ignored_control_replies += len(targets)
846 self._ignored_control_replies += len(targets)
847 if error:
847 if error:
848 raise error
848 raise error
849
849
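# Illustrative sketch (not part of the diff above): aborting queued work by
# AsyncResult or msg_id, with `rc` a connected Client as in the earlier sketch;
# `expensive_function` is a hypothetical stand-in.
#
#   ar = rc[:].apply_async(expensive_function)
#   rc.abort(jobs=ar)             # prevent it from running if not yet started
#   rc.abort(jobs=ar.msg_ids)     # equivalent: pass the msg_ids directly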
850 @spin_first
850 @spin_first
851 def shutdown(self, targets=None, restart=False, hub=False, block=None):
851 def shutdown(self, targets=None, restart=False, hub=False, block=None):
852 """Terminates one or more engine processes, optionally including the hub."""
852 """Terminates one or more engine processes, optionally including the hub."""
853 block = self.block if block is None else block
853 block = self.block if block is None else block
854 if hub:
854 if hub:
855 targets = 'all'
855 targets = 'all'
856 targets = self._build_targets(targets)[0]
856 targets = self._build_targets(targets)[0]
857 for t in targets:
857 for t in targets:
858 self.session.send(self._control_socket, 'shutdown_request',
858 self.session.send(self._control_socket, 'shutdown_request',
859 content={'restart':restart},ident=t)
859 content={'restart':restart},ident=t)
860 error = False
860 error = False
861 if block or hub:
861 if block or hub:
862 self._flush_ignored_control()
862 self._flush_ignored_control()
863 for i in range(len(targets)):
863 for i in range(len(targets)):
864 idents,msg = self.session.recv(self._control_socket, 0)
864 idents,msg = self.session.recv(self._control_socket, 0)
865 if self.debug:
865 if self.debug:
866 pprint(msg)
866 pprint(msg)
867 if msg['content']['status'] != 'ok':
867 if msg['content']['status'] != 'ok':
868 error = self._unwrap_exception(msg['content'])
868 error = self._unwrap_exception(msg['content'])
869 else:
869 else:
870 self._ignored_control_replies += len(targets)
870 self._ignored_control_replies += len(targets)
871
871
872 if hub:
872 if hub:
873 time.sleep(0.25)
873 time.sleep(0.25)
874 self.session.send(self._query_socket, 'shutdown_request')
874 self.session.send(self._query_socket, 'shutdown_request')
875 idents,msg = self.session.recv(self._query_socket, 0)
875 idents,msg = self.session.recv(self._query_socket, 0)
876 if self.debug:
876 if self.debug:
877 pprint(msg)
877 pprint(msg)
878 if msg['content']['status'] != 'ok':
878 if msg['content']['status'] != 'ok':
879 error = self._unwrap_exception(msg['content'])
879 error = self._unwrap_exception(msg['content'])
880
880
881 if error:
881 if error:
882 raise error
882 raise error
883
883
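# Illustrative sketch (not part of the diff above): shutting down engines and,
# optionally, the hub, with `rc` a connected Client as in the earlier sketch.
#
#   rc.shutdown(targets=[0, 1])               # stop engines 0 and 1
#   rc.shutdown(targets='all', restart=True)  # ask every engine to restart
#   rc.shutdown(hub=True)                     # stop all engines and the hub itself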
884 #--------------------------------------------------------------------------
884 #--------------------------------------------------------------------------
885 # Execution related methods
885 # Execution related methods
886 #--------------------------------------------------------------------------
886 #--------------------------------------------------------------------------
887
887
888 def _maybe_raise(self, result):
888 def _maybe_raise(self, result):
889 """wrapper for maybe raising an exception if apply failed."""
889 """wrapper for maybe raising an exception if apply failed."""
890 if isinstance(result, error.RemoteError):
890 if isinstance(result, error.RemoteError):
891 raise result
891 raise result
892
892
893 return result
893 return result
894
894
895 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
895 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
896 ident=None):
896 ident=None):
897 """construct and send an apply message via a socket.
897 """construct and send an apply message via a socket.
898
898
899 This is the principal method with which all engine execution is performed by views.
899 This is the principal method with which all engine execution is performed by views.
900 """
900 """
901
901
902 assert not self._closed, "cannot use me anymore, I'm closed!"
902 assert not self._closed, "cannot use me anymore, I'm closed!"
903 # defaults:
903 # defaults:
904 args = args if args is not None else []
904 args = args if args is not None else []
905 kwargs = kwargs if kwargs is not None else {}
905 kwargs = kwargs if kwargs is not None else {}
906 subheader = subheader if subheader is not None else {}
906 subheader = subheader if subheader is not None else {}
907
907
908 # validate arguments
908 # validate arguments
909 if not callable(f):
909 if not callable(f):
910 raise TypeError("f must be callable, not %s"%type(f))
910 raise TypeError("f must be callable, not %s"%type(f))
911 if not isinstance(args, (tuple, list)):
911 if not isinstance(args, (tuple, list)):
912 raise TypeError("args must be tuple or list, not %s"%type(args))
912 raise TypeError("args must be tuple or list, not %s"%type(args))
913 if not isinstance(kwargs, dict):
913 if not isinstance(kwargs, dict):
914 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
914 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
915 if not isinstance(subheader, dict):
915 if not isinstance(subheader, dict):
916 raise TypeError("subheader must be dict, not %s"%type(subheader))
916 raise TypeError("subheader must be dict, not %s"%type(subheader))
917
917
918 bufs = util.pack_apply_message(f,args,kwargs)
918 bufs = util.pack_apply_message(f,args,kwargs)
919
919
920 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
920 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
921 subheader=subheader, track=track)
921 subheader=subheader, track=track)
922
922
923 msg_id = msg['msg_id']
923 msg_id = msg['msg_id']
924 self.outstanding.add(msg_id)
924 self.outstanding.add(msg_id)
925 if ident:
925 if ident:
926 # possibly routed to a specific engine
926 # possibly routed to a specific engine
927 if isinstance(ident, list):
927 if isinstance(ident, list):
928 ident = ident[-1]
928 ident = ident[-1]
929 if ident in self._engines.values():
929 if ident in self._engines.values():
930 # save for later, in case of engine death
930 # save for later, in case of engine death
931 self._outstanding_dict[ident].add(msg_id)
931 self._outstanding_dict[ident].add(msg_id)
932 self.history.append(msg_id)
932 self.history.append(msg_id)
933 self.metadata[msg_id]['submitted'] = datetime.now()
933 self.metadata[msg_id]['submitted'] = datetime.now()
934
934
935 return msg
935 return msg
936
936
937 #--------------------------------------------------------------------------
937 #--------------------------------------------------------------------------
938 # construct a View object
938 # construct a View object
939 #--------------------------------------------------------------------------
939 #--------------------------------------------------------------------------
940
940
941 def load_balanced_view(self, targets=None):
941 def load_balanced_view(self, targets=None):
942 """construct a LoadBalancedView object.
942 """construct a LoadBalancedView object.
943
943
944 If no arguments are specified, create a LoadBalancedView
944 If no arguments are specified, create a LoadBalancedView
945 using all engines.
945 using all engines.
946
946
947 Parameters
947 Parameters
948 ----------
948 ----------
949
949
950 targets: list,slice,int,etc. [default: use all engines]
950 targets: list,slice,int,etc. [default: use all engines]
951 The subset of engines across which to load-balance
951 The subset of engines across which to load-balance
952 """
952 """
953 if targets is not None:
953 if targets is not None:
954 targets = self._build_targets(targets)[1]
954 targets = self._build_targets(targets)[1]
955 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
955 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
956
956
957 def direct_view(self, targets='all'):
957 def direct_view(self, targets='all'):
958 """construct a DirectView object.
958 """construct a DirectView object.
959
959
960 If no targets are specified, create a DirectView
960 If no targets are specified, create a DirectView
961 using all engines.
961 using all engines.
962
962
963 Parameters
963 Parameters
964 ----------
964 ----------
965
965
966 targets: list,slice,int,etc. [default: use all engines]
966 targets: list,slice,int,etc. [default: use all engines]
967 The engines to use for the View
967 The engines to use for the View
968 """
968 """
969 single = isinstance(targets, int)
969 single = isinstance(targets, int)
970 targets = self._build_targets(targets)[1]
970 targets = self._build_targets(targets)[1]
971 if single:
971 if single:
972 targets = targets[0]
972 targets = targets[0]
973 return DirectView(client=self, socket=self._mux_socket, targets=targets)
973 return DirectView(client=self, socket=self._mux_socket, targets=targets)
974
974
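# Illustrative sketch (not part of the diff above): the two View constructors
# above plus the index access defined in __getitem__, with `rc` a connected
# Client as in the earlier sketch.
#
#   dv   = rc.direct_view()               # DirectView on all engines
#   dv0  = rc[0]                          # same as rc.direct_view(0)
#   some = rc[::2]                        # DirectView on every other engine
#   lbv  = rc.load_balanced_view()        # LoadBalancedView over all engines
#   lbv2 = rc.load_balanced_view([0, 1])  # load-balance over engines 0 and 1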
975 #--------------------------------------------------------------------------
975 #--------------------------------------------------------------------------
976 # Query methods
976 # Query methods
977 #--------------------------------------------------------------------------
977 #--------------------------------------------------------------------------
978
978
979 @spin_first
979 @spin_first
980 def get_result(self, indices_or_msg_ids=None, block=None):
980 def get_result(self, indices_or_msg_ids=None, block=None):
981 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
981 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
982
982
983 If the client already has the results, no request to the Hub will be made.
983 If the client already has the results, no request to the Hub will be made.
984
984
985 This is a convenient way to construct AsyncResult objects, which are wrappers
985 This is a convenient way to construct AsyncResult objects, which are wrappers
986 that include metadata about execution, and allow for awaiting results that
986 that include metadata about execution, and allow for awaiting results that
987 were not submitted by this Client.
987 were not submitted by this Client.
988
988
989 It can also be a convenient way to retrieve the metadata associated with
989 It can also be a convenient way to retrieve the metadata associated with
990 blocking execution, since it always retrieves the metadata as well as the result.
990 blocking execution, since it always retrieves the metadata as well as the result.
991
991
992 Examples
992 Examples
993 --------
993 --------
994 ::
994 ::
995
995
996 In [10]: r = client.apply()
996 In [10]: r = client.apply()
997
997
998 Parameters
998 Parameters
999 ----------
999 ----------
1000
1000
1001 indices_or_msg_ids : integer history index, str msg_id, or list of either
1001 indices_or_msg_ids : integer history index, str msg_id, or list of either
1002 The indices or msg_ids of the results to be retrieved
1002 The indices or msg_ids of the results to be retrieved
1003
1003
1004 block : bool
1004 block : bool
1005 Whether to wait for the result to be done
1005 Whether to wait for the result to be done
1006
1006
1007 Returns
1007 Returns
1008 -------
1008 -------
1009
1009
1010 AsyncResult
1010 AsyncResult
1011 A single AsyncResult object will always be returned.
1011 A single AsyncResult object will always be returned.
1012
1012
1013 AsyncHubResult
1013 AsyncHubResult
1014 A subclass of AsyncResult that retrieves results from the Hub
1014 A subclass of AsyncResult that retrieves results from the Hub
1015
1015
1016 """
1016 """
1017 block = self.block if block is None else block
1017 block = self.block if block is None else block
1018 if indices_or_msg_ids is None:
1018 if indices_or_msg_ids is None:
1019 indices_or_msg_ids = -1
1019 indices_or_msg_ids = -1
1020
1020
1021 if not isinstance(indices_or_msg_ids, (list,tuple)):
1021 if not isinstance(indices_or_msg_ids, (list,tuple)):
1022 indices_or_msg_ids = [indices_or_msg_ids]
1022 indices_or_msg_ids = [indices_or_msg_ids]
1023
1023
1024 theids = []
1024 theids = []
1025 for id in indices_or_msg_ids:
1025 for id in indices_or_msg_ids:
1026 if isinstance(id, int):
1026 if isinstance(id, int):
1027 id = self.history[id]
1027 id = self.history[id]
1028 if not isinstance(id, str):
1028 if not isinstance(id, str):
1029 raise TypeError("indices must be str or int, not %r"%id)
1029 raise TypeError("indices must be str or int, not %r"%id)
1030 theids.append(id)
1030 theids.append(id)
1031
1031
1032 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1032 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1033 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1033 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1034
1034
1035 if remote_ids:
1035 if remote_ids:
1036 ar = AsyncHubResult(self, msg_ids=theids)
1036 ar = AsyncHubResult(self, msg_ids=theids)
1037 else:
1037 else:
1038 ar = AsyncResult(self, msg_ids=theids)
1038 ar = AsyncResult(self, msg_ids=theids)
1039
1039
1040 if block:
1040 if block:
1041 ar.wait()
1041 ar.wait()
1042
1042
1043 return ar
1043 return ar
1044
1044
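# Illustrative sketch (not part of the diff above): rebuilding AsyncResults from
# history indices or msg_ids, with `rc` a connected Client; apply_sync() and
# AsyncResult.get() are assumed from the rest of IPython.parallel.
#
#   rc[:].apply_sync(lambda: 42)     # a blocking call still records its msg_id
#   ar = rc.get_result(-1)           # AsyncResult for the most recent task
#   ar.get()                         # results, fetched from the Hub if necessary
#   rc.get_result(rc.history[:3], block=True)   # wait on the first three tasks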
1045 @spin_first
1045 @spin_first
1046 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1046 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1047 """Resubmit one or more tasks.
1047 """Resubmit one or more tasks.
1048
1048
1049 In-flight tasks may not be resubmitted.
1049 In-flight tasks may not be resubmitted.
1050
1050
1051 Parameters
1051 Parameters
1052 ----------
1052 ----------
1053
1053
1054 indices_or_msg_ids : integer history index, str msg_id, or list of either
1054 indices_or_msg_ids : integer history index, str msg_id, or list of either
1055 The indices or msg_ids of indices to be retrieved
1055 The indices or msg_ids of indices to be retrieved
1056
1056
1057 block : bool
1057 block : bool
1058 Whether to wait for the result to be done
1058 Whether to wait for the result to be done
1059
1059
1060 Returns
1060 Returns
1061 -------
1061 -------
1062
1062
1063 AsyncHubResult
1063 AsyncHubResult
1064 A subclass of AsyncResult that retrieves results from the Hub
1064 A subclass of AsyncResult that retrieves results from the Hub
1065
1065
1066 """
1066 """
1067 block = self.block if block is None else block
1067 block = self.block if block is None else block
1068 if indices_or_msg_ids is None:
1068 if indices_or_msg_ids is None:
1069 indices_or_msg_ids = -1
1069 indices_or_msg_ids = -1
1070
1070
1071 if not isinstance(indices_or_msg_ids, (list,tuple)):
1071 if not isinstance(indices_or_msg_ids, (list,tuple)):
1072 indices_or_msg_ids = [indices_or_msg_ids]
1072 indices_or_msg_ids = [indices_or_msg_ids]
1073
1073
1074 theids = []
1074 theids = []
1075 for id in indices_or_msg_ids:
1075 for id in indices_or_msg_ids:
1076 if isinstance(id, int):
1076 if isinstance(id, int):
1077 id = self.history[id]
1077 id = self.history[id]
1078 if not isinstance(id, str):
1078 if not isinstance(id, str):
1079 raise TypeError("indices must be str or int, not %r"%id)
1079 raise TypeError("indices must be str or int, not %r"%id)
1080 theids.append(id)
1080 theids.append(id)
1081
1081
1082 for msg_id in theids:
1082 for msg_id in theids:
1083 self.outstanding.discard(msg_id)
1083 self.outstanding.discard(msg_id)
1084 if msg_id in self.history:
1084 if msg_id in self.history:
1085 self.history.remove(msg_id)
1085 self.history.remove(msg_id)
1086 self.results.pop(msg_id, None)
1086 self.results.pop(msg_id, None)
1087 self.metadata.pop(msg_id, None)
1087 self.metadata.pop(msg_id, None)
1088 content = dict(msg_ids = theids)
1088 content = dict(msg_ids = theids)
1089
1089
1090 self.session.send(self._query_socket, 'resubmit_request', content)
1090 self.session.send(self._query_socket, 'resubmit_request', content)
1091
1091
1092 zmq.select([self._query_socket], [], [])
1092 zmq.select([self._query_socket], [], [])
1093 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1093 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1094 if self.debug:
1094 if self.debug:
1095 pprint(msg)
1095 pprint(msg)
1096 content = msg['content']
1096 content = msg['content']
1097 if content['status'] != 'ok':
1097 if content['status'] != 'ok':
1098 raise self._unwrap_exception(content)
1098 raise self._unwrap_exception(content)
1099
1099
1100 ar = AsyncHubResult(self, msg_ids=theids)
1100 ar = AsyncHubResult(self, msg_ids=theids)
1101
1101
1102 if block:
1102 if block:
1103 ar.wait()
1103 ar.wait()
1104
1104
1105 return ar
1105 return ar
1106
1106
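# Illustrative sketch (not part of the diff above): resubmitting finished tasks
# by history index or msg_id, with `rc` a connected Client; `msg_id` stands for
# an existing, non-outstanding task id.
#
#   ar = rc.resubmit(-1)                  # rerun the most recent task
#   ar = rc.resubmit(msg_id, block=True)  # rerun a specific task and wait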
1107 @spin_first
1107 @spin_first
1108 def result_status(self, msg_ids, status_only=True):
1108 def result_status(self, msg_ids, status_only=True):
1109 """Check on the status of the result(s) of the apply request with `msg_ids`.
1109 """Check on the status of the result(s) of the apply request with `msg_ids`.
1110
1110
1111 If status_only is False, then the actual results will be retrieved, else
1111 If status_only is False, then the actual results will be retrieved, else
1112 only the status of the results will be checked.
1112 only the status of the results will be checked.
1113
1113
1114 Parameters
1114 Parameters
1115 ----------
1115 ----------
1116
1116
1117 msg_ids : list of msg_ids
1117 msg_ids : list of msg_ids
1118 if int:
1118 if int:
1119 Passed as index to self.history for convenience.
1119 Passed as index to self.history for convenience.
1120 status_only : bool (default: True)
1120 status_only : bool (default: True)
1121 if False:
1121 if False:
1122 Retrieve the actual results of completed tasks.
1122 Retrieve the actual results of completed tasks.
1123
1123
1124 Returns
1124 Returns
1125 -------
1125 -------
1126
1126
1127 results : dict
1127 results : dict
1128 There will always be the keys 'pending' and 'completed', which will
1128 There will always be the keys 'pending' and 'completed', which will
1129 be lists of msg_ids that are incomplete or complete. If `status_only`
1129 be lists of msg_ids that are incomplete or complete. If `status_only`
1130 is False, then completed results will be keyed by their `msg_id`.
1130 is False, then completed results will be keyed by their `msg_id`.
1131 """
1131 """
1132 if not isinstance(msg_ids, (list,tuple)):
1132 if not isinstance(msg_ids, (list,tuple)):
1133 msg_ids = [msg_ids]
1133 msg_ids = [msg_ids]
1134
1134
1135 theids = []
1135 theids = []
1136 for msg_id in msg_ids:
1136 for msg_id in msg_ids:
1137 if isinstance(msg_id, int):
1137 if isinstance(msg_id, int):
1138 msg_id = self.history[msg_id]
1138 msg_id = self.history[msg_id]
1139 if not isinstance(msg_id, basestring):
1139 if not isinstance(msg_id, basestring):
1140 raise TypeError("msg_ids must be str, not %r"%msg_id)
1140 raise TypeError("msg_ids must be str, not %r"%msg_id)
1141 theids.append(msg_id)
1141 theids.append(msg_id)
1142
1142
1143 completed = []
1143 completed = []
1144 local_results = {}
1144 local_results = {}
1145
1145
1146 # comment this block out to temporarily disable local shortcut:
1146 # comment this block out to temporarily disable local shortcut:
1147 for msg_id in list(theids): # iterate over a copy, since theids is mutated below
1147 for msg_id in list(theids): # iterate over a copy, since theids is mutated below
1148 if msg_id in self.results:
1148 if msg_id in self.results:
1149 completed.append(msg_id)
1149 completed.append(msg_id)
1150 local_results[msg_id] = self.results[msg_id]
1150 local_results[msg_id] = self.results[msg_id]
1151 theids.remove(msg_id)
1151 theids.remove(msg_id)
1152
1152
1153 if theids: # some not locally cached
1153 if theids: # some not locally cached
1154 content = dict(msg_ids=theids, status_only=status_only)
1154 content = dict(msg_ids=theids, status_only=status_only)
1155 msg = self.session.send(self._query_socket, "result_request", content=content)
1155 msg = self.session.send(self._query_socket, "result_request", content=content)
1156 zmq.select([self._query_socket], [], [])
1156 zmq.select([self._query_socket], [], [])
1157 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1157 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1158 if self.debug:
1158 if self.debug:
1159 pprint(msg)
1159 pprint(msg)
1160 content = msg['content']
1160 content = msg['content']
1161 if content['status'] != 'ok':
1161 if content['status'] != 'ok':
1162 raise self._unwrap_exception(content)
1162 raise self._unwrap_exception(content)
1163 buffers = msg['buffers']
1163 buffers = msg['buffers']
1164 else:
1164 else:
1165 content = dict(completed=[],pending=[])
1165 content = dict(completed=[],pending=[])
1166
1166
1167 content['completed'].extend(completed)
1167 content['completed'].extend(completed)
1168
1168
1169 if status_only:
1169 if status_only:
1170 return content
1170 return content
1171
1171
1172 failures = []
1172 failures = []
1173 # load cached results into result:
1173 # load cached results into result:
1174 content.update(local_results)
1174 content.update(local_results)
1175 # update cache with results:
1175 # update cache with results:
1176 for msg_id in sorted(theids):
1176 for msg_id in sorted(theids):
1177 if msg_id in content['completed']:
1177 if msg_id in content['completed']:
1178 rec = content[msg_id]
1178 rec = content[msg_id]
1179 parent = rec['header']
1179 parent = rec['header']
1180 header = rec['result_header']
1180 header = rec['result_header']
1181 rcontent = rec['result_content']
1181 rcontent = rec['result_content']
1182 iodict = rec['io']
1182 iodict = rec['io']
1183 if isinstance(rcontent, str):
1183 if isinstance(rcontent, str):
1184 rcontent = self.session.unpack(rcontent)
1184 rcontent = self.session.unpack(rcontent)
1185
1185
1186 md = self.metadata[msg_id]
1186 md = self.metadata[msg_id]
1187 md.update(self._extract_metadata(header, parent, rcontent))
1187 md.update(self._extract_metadata(header, parent, rcontent))
1188 md.update(iodict)
1188 md.update(iodict)
1189
1189
1190 if rcontent['status'] == 'ok':
1190 if rcontent['status'] == 'ok':
1191 res,buffers = util.unserialize_object(buffers)
1191 res,buffers = util.unserialize_object(buffers)
1192 else:
1192 else:
1193 print rcontent
1193 print rcontent
1194 res = self._unwrap_exception(rcontent)
1194 res = self._unwrap_exception(rcontent)
1195 failures.append(res)
1195 failures.append(res)
1196
1196
1197 self.results[msg_id] = res
1197 self.results[msg_id] = res
1198 content[msg_id] = res
1198 content[msg_id] = res
1199
1199
1200 if len(theids) == 1 and failures:
1200 if len(theids) == 1 and failures:
1201 raise failures[0]
1201 raise failures[0]
1202
1202
1203 error.collect_exceptions(failures, "result_status")
1203 error.collect_exceptions(failures, "result_status")
1204 return content
1204 return content
1205
1205
1206 @spin_first
1206 @spin_first
1207 def queue_status(self, targets='all', verbose=False):
1207 def queue_status(self, targets='all', verbose=False):
1208 """Fetch the status of engine queues.
1208 """Fetch the status of engine queues.
1209
1209
1210 Parameters
1210 Parameters
1211 ----------
1211 ----------
1212
1212
1213 targets : int/str/list of ints/strs
1213 targets : int/str/list of ints/strs
1214 the engines whose states are to be queried.
1214 the engines whose states are to be queried.
1215 default : all
1215 default : all
1216 verbose : bool
1216 verbose : bool
1217 Whether to return lengths only, or lists of ids for each element
1217 Whether to return lengths only, or lists of ids for each element
1218 """
1218 """
1219 engine_ids = self._build_targets(targets)[1]
1219 engine_ids = self._build_targets(targets)[1]
1220 content = dict(targets=engine_ids, verbose=verbose)
1220 content = dict(targets=engine_ids, verbose=verbose)
1221 self.session.send(self._query_socket, "queue_request", content=content)
1221 self.session.send(self._query_socket, "queue_request", content=content)
1222 idents,msg = self.session.recv(self._query_socket, 0)
1222 idents,msg = self.session.recv(self._query_socket, 0)
1223 if self.debug:
1223 if self.debug:
1224 pprint(msg)
1224 pprint(msg)
1225 content = msg['content']
1225 content = msg['content']
1226 status = content.pop('status')
1226 status = content.pop('status')
1227 if status != 'ok':
1227 if status != 'ok':
1228 raise self._unwrap_exception(content)
1228 raise self._unwrap_exception(content)
1229 content = util.rekey(content)
1229 content = util.rekey(content)
1230 if isinstance(targets, int):
1230 if isinstance(targets, int):
1231 return content[targets]
1231 return content[targets]
1232 else:
1232 else:
1233 return content
1233 return content
1234
1234
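# Illustrative sketch (not part of the diff above): checking engine queues, with
# `rc` a connected Client as in the earlier sketch.
#
#   rc.queue_status()                # per-engine status dict, keyed by engine id
#   rc.queue_status(targets=0)       # a single int target returns only that engine's entry
#   rc.queue_status(verbose=True)    # lists of msg_ids instead of just lengths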
1235 @spin_first
1235 @spin_first
1236 def purge_results(self, jobs=[], targets=[]):
1236 def purge_results(self, jobs=[], targets=[]):
1237 """Tell the Hub to forget results.
1237 """Tell the Hub to forget results.
1238
1238
1239 Individual results can be purged by msg_id, or the entire
1239 Individual results can be purged by msg_id, or the entire
1240 history of specific targets can be purged.
1240 history of specific targets can be purged.
1241
1241
1242 Parameters
1242 Parameters
1243 ----------
1243 ----------
1244
1244
1245 jobs : str or list of str or AsyncResult objects
1245 jobs : str or list of str or AsyncResult objects
1246 the msg_ids whose results should be forgotten.
1246 the msg_ids whose results should be forgotten.
1247 targets : int/str/list of ints/strs
1247 targets : int/str/list of ints/strs
1248 The targets, by uuid or int_id, whose entire history is to be purged.
1248 The targets, by uuid or int_id, whose entire history is to be purged.
1249 Use `targets='all'` to scrub everything from the Hub's memory.
1249 Use `targets='all'` to scrub everything from the Hub's memory.
1250
1250
1251 default : None
1251 default : None
1252 """
1252 """
1253 if not targets and not jobs:
1253 if not targets and not jobs:
1254 raise ValueError("Must specify at least one of `targets` and `jobs`")
1254 raise ValueError("Must specify at least one of `targets` and `jobs`")
1255 if targets:
1255 if targets:
1256 targets = self._build_targets(targets)[1]
1256 targets = self._build_targets(targets)[1]
1257
1257
1258 # construct msg_ids from jobs
1258 # construct msg_ids from jobs
1259 msg_ids = []
1259 msg_ids = []
1260 if isinstance(jobs, (basestring,AsyncResult)):
1260 if isinstance(jobs, (basestring,AsyncResult)):
1261 jobs = [jobs]
1261 jobs = [jobs]
1262 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1262 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1263 if bad_ids:
1263 if bad_ids:
1264 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1264 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1265 for j in jobs:
1265 for j in jobs:
1266 if isinstance(j, AsyncResult):
1266 if isinstance(j, AsyncResult):
1267 msg_ids.extend(j.msg_ids)
1267 msg_ids.extend(j.msg_ids)
1268 else:
1268 else:
1269 msg_ids.append(j)
1269 msg_ids.append(j)
1270
1270
1271 content = dict(targets=targets, msg_ids=msg_ids)
1271 content = dict(targets=targets, msg_ids=msg_ids)
1272 self.session.send(self._query_socket, "purge_request", content=content)
1272 self.session.send(self._query_socket, "purge_request", content=content)
1273 idents, msg = self.session.recv(self._query_socket, 0)
1273 idents, msg = self.session.recv(self._query_socket, 0)
1274 if self.debug:
1274 if self.debug:
1275 pprint(msg)
1275 pprint(msg)
1276 content = msg['content']
1276 content = msg['content']
1277 if content['status'] != 'ok':
1277 if content['status'] != 'ok':
1278 raise self._unwrap_exception(content)
1278 raise self._unwrap_exception(content)
1279
1279
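# Illustrative sketch (not part of the diff above): clearing results from the
# Hub's memory, with `rc` a connected Client and `ar` an AsyncResult from an
# earlier submission.
#
#   rc.purge_results(jobs=ar)           # forget the msg_ids of one AsyncResult
#   rc.purge_results(targets=[0, 1])    # drop the full history of engines 0 and 1
#   rc.purge_results(targets='all')     # scrub everything the Hub remembers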
1280 @spin_first
1280 @spin_first
1281 def hub_history(self):
1281 def hub_history(self):
1282 """Get the Hub's history
1282 """Get the Hub's history
1283
1283
1284 Just like the Client, the Hub has a history, which is a list of msg_ids.
1284 Just like the Client, the Hub has a history, which is a list of msg_ids.
1285 This will contain the history of all clients, and, depending on configuration,
1285 This will contain the history of all clients, and, depending on configuration,
1286 may contain history across multiple cluster sessions.
1286 may contain history across multiple cluster sessions.
1287
1287
1288 Any msg_id returned here is a valid argument to `get_result`.
1288 Any msg_id returned here is a valid argument to `get_result`.
1289
1289
1290 Returns
1290 Returns
1291 -------
1291 -------
1292
1292
1293 msg_ids : list of strs
1293 msg_ids : list of strs
1294 list of all msg_ids, ordered by task submission time.
1294 list of all msg_ids, ordered by task submission time.
1295 """
1295 """
1296
1296
1297 self.session.send(self._query_socket, "history_request", content={})
1297 self.session.send(self._query_socket, "history_request", content={})
1298 idents, msg = self.session.recv(self._query_socket, 0)
1298 idents, msg = self.session.recv(self._query_socket, 0)
1299
1299
1300 if self.debug:
1300 if self.debug:
1301 pprint(msg)
1301 pprint(msg)
1302 content = msg['content']
1302 content = msg['content']
1303 if content['status'] != 'ok':
1303 if content['status'] != 'ok':
1304 raise self._unwrap_exception(content)
1304 raise self._unwrap_exception(content)
1305 else:
1305 else:
1306 return content['history']
1306 return content['history']
1307
1307
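# Illustrative sketch (not part of the diff above): combining hub_history() with
# get_result(), with `rc` a connected Client as in the earlier sketch.
#
#   msg_ids = rc.hub_history()        # every msg_id the Hub knows about
#   ar = rc.get_result(msg_ids[-5:])  # AsyncResult for the five most recent tasks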
1308 @spin_first
1308 @spin_first
1309 def db_query(self, query, keys=None):
1309 def db_query(self, query, keys=None):
1310 """Query the Hub's TaskRecord database
1310 """Query the Hub's TaskRecord database
1311
1311
1312 This will return a list of task record dicts that match `query`
1312 This will return a list of task record dicts that match `query`
1313
1313
1314 Parameters
1314 Parameters
1315 ----------
1315 ----------
1316
1316
1317 query : mongodb query dict
1317 query : mongodb query dict
1318 The search dict. See mongodb query docs for details.
1318 The search dict. See mongodb query docs for details.
1319 keys : list of strs [optional]
1319 keys : list of strs [optional]
1320 The subset of keys to be returned. The default is to fetch everything but buffers.
1320 The subset of keys to be returned. The default is to fetch everything but buffers.
1321 'msg_id' will *always* be included.
1321 'msg_id' will *always* be included.
1322 """
1322 """
1323 if isinstance(keys, basestring):
1323 if isinstance(keys, basestring):
1324 keys = [keys]
1324 keys = [keys]
1325 content = dict(query=query, keys=keys)
1325 content = dict(query=query, keys=keys)
1326 self.session.send(self._query_socket, "db_request", content=content)
1326 self.session.send(self._query_socket, "db_request", content=content)
1327 idents, msg = self.session.recv(self._query_socket, 0)
1327 idents, msg = self.session.recv(self._query_socket, 0)
1328 if self.debug:
1328 if self.debug:
1329 pprint(msg)
1329 pprint(msg)
1330 content = msg['content']
1330 content = msg['content']
1331 if content['status'] != 'ok':
1331 if content['status'] != 'ok':
1332 raise self._unwrap_exception(content)
1332 raise self._unwrap_exception(content)
1333
1333
1334 records = content['records']
1334 records = content['records']
1335 buffer_lens = content['buffer_lens']
1335 buffer_lens = content['buffer_lens']
1336 result_buffer_lens = content['result_buffer_lens']
1336 result_buffer_lens = content['result_buffer_lens']
1337 buffers = msg['buffers']
1337 buffers = msg['buffers']
1338 has_bufs = buffer_lens is not None
1338 has_bufs = buffer_lens is not None
1339 has_rbufs = result_buffer_lens is not None
1339 has_rbufs = result_buffer_lens is not None
1340 for i,rec in enumerate(records):
1340 for i,rec in enumerate(records):
1341 # relink buffers
1341 # relink buffers
1342 if has_bufs:
1342 if has_bufs:
1343 blen = buffer_lens[i]
1343 blen = buffer_lens[i]
1344 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1344 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1345 if has_rbufs:
1345 if has_rbufs:
1346 blen = result_buffer_lens[i]
1346 blen = result_buffer_lens[i]
1347 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1347 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1348 # turn timestamps back into times
1348 # turn timestamps back into times
1349 for key in 'submitted started completed resubmitted'.split():
1349 for key in 'submitted started completed resubmitted'.split():
1350 maybedate = rec.get(key, None)
1350 maybedate = rec.get(key, None)
1351 if maybedate and util.ISO8601_RE.match(maybedate):
1351 if maybedate and util.ISO8601_RE.match(maybedate):
1352 rec[key] = datetime.strptime(maybedate, util.ISO8601)
1352 rec[key] = datetime.strptime(maybedate, util.ISO8601)
1353
1353
1354 return records
1354 return records
1355
1355
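# Illustrative sketch (not part of the diff above): querying the Hub's TaskRecord
# database, with `rc` a connected Client and `t` a datetime to compare against;
# the record keys used here match the schema in the sqlite backend later in this
# changeset.
#
#   recs = rc.db_query({'msg_id': {'$in': rc.history}},
#                      keys=['msg_id', 'completed', 'engine_uuid'])
#   recent = rc.db_query({'submitted': {'$gt': t}})   # tasks submitted after t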
1356 __all__ = [ 'Client' ]
1356 __all__ = [ 'Client' ]
@@ -1,333 +1,339 b''
1 """A TaskRecord backend using sqlite3"""
1 """A TaskRecord backend using sqlite3"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2011 The IPython Development Team
3 # Copyright (C) 2011 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 import json
9 import json
10 import os
10 import os
11 import cPickle as pickle
11 import cPickle as pickle
12 from datetime import datetime
12 from datetime import datetime
13
13
14 import sqlite3
14 import sqlite3
15
15
16 from zmq.eventloop import ioloop
16 from zmq.eventloop import ioloop
17
17
18 from IPython.utils.traitlets import Unicode, Instance, List
18 from IPython.utils.traitlets import Unicode, Instance, List
19 from .dictdb import BaseDB
19 from .dictdb import BaseDB
20 from IPython.parallel.util import ISO8601
20 from IPython.parallel.util import ISO8601
21
21
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23 # SQLite operators, adapters, and converters
23 # SQLite operators, adapters, and converters
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25
25
26 operators = {
26 operators = {
27 '$lt' : "<",
27 '$lt' : "<",
28 '$gt' : ">",
28 '$gt' : ">",
29 # null is handled weird with ==,!=
29 # null is handled weird with ==,!=
30 '$eq' : "=",
30 '$eq' : "=",
31 '$ne' : "!=",
31 '$ne' : "!=",
32 '$lte': "<=",
32 '$lte': "<=",
33 '$gte': ">=",
33 '$gte': ">=",
34 '$in' : ('=', ' OR '),
34 '$in' : ('=', ' OR '),
35 '$nin': ('!=', ' AND '),
35 '$nin': ('!=', ' AND '),
36 # '$all': None,
36 # '$all': None,
37 # '$mod': None,
37 # '$mod': None,
38 # '$exists' : None
38 # '$exists' : None
39 }
39 }
40 null_operators = {
40 null_operators = {
41 '=' : "IS NULL",
41 '=' : "IS NULL",
42 '!=' : "IS NOT NULL",
42 '!=' : "IS NOT NULL",
43 }
43 }
44
44
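# Illustrative note (not part of the diff above): these operator tables drive
# _render_expression() below. For example the mongodb-style condition
#
#   {'engine_uuid': {'$in': ['a', 'b']}}
#
# becomes the parametrized SQL fragment
#
#   ( engine_uuid = ? OR engine_uuid = ? )      with args ['a', 'b']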
45 def _adapt_datetime(dt):
45 def _adapt_datetime(dt):
46 return dt.strftime(ISO8601)
46 return dt.strftime(ISO8601)
47
47
48 def _convert_datetime(ds):
48 def _convert_datetime(ds):
49 if ds is None:
49 if ds is None:
50 return ds
50 return ds
51 else:
51 else:
52 return datetime.strptime(ds, ISO8601)
52 return datetime.strptime(ds, ISO8601)
53
53
54 def _adapt_dict(d):
54 def _adapt_dict(d):
55 return json.dumps(d)
55 return json.dumps(d)
56
56
57 def _convert_dict(ds):
57 def _convert_dict(ds):
58 if ds is None:
58 if ds is None:
59 return ds
59 return ds
60 else:
60 else:
61 return json.loads(ds)
61 return json.loads(ds)
62
62
63 def _adapt_bufs(bufs):
63 def _adapt_bufs(bufs):
64 # this is *horrible*
64 # this is *horrible*
65 # copy buffers into single list and pickle it:
65 # copy buffers into single list and pickle it:
66 if bufs and isinstance(bufs[0], (bytes, buffer)):
66 if bufs and isinstance(bufs[0], (bytes, buffer)):
67 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
67 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
68 elif bufs:
68 elif bufs:
69 return bufs
69 return bufs
70 else:
70 else:
71 return None
71 return None
72
72
73 def _convert_bufs(bs):
73 def _convert_bufs(bs):
74 if bs is None:
74 if bs is None:
75 return []
75 return []
76 else:
76 else:
77 return pickle.loads(bytes(bs))
77 return pickle.loads(bytes(bs))
78
78
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80 # SQLiteDB class
80 # SQLiteDB class
81 #-----------------------------------------------------------------------------
81 #-----------------------------------------------------------------------------
82
82
83 class SQLiteDB(BaseDB):
83 class SQLiteDB(BaseDB):
84 """SQLite3 TaskRecord backend."""
84 """SQLite3 TaskRecord backend."""
85
85
86 filename = Unicode('tasks.db', config=True,
86 filename = Unicode('tasks.db', config=True,
87 help="""The filename of the sqlite task database. [default: 'tasks.db']""")
87 help="""The filename of the sqlite task database. [default: 'tasks.db']""")
88 location = Unicode('', config=True,
88 location = Unicode('', config=True,
89 help="""The directory containing the sqlite task database. The default
89 help="""The directory containing the sqlite task database. The default
90 is to use the profile_dir location.""")
90 is to use the profile_dir location.""")
91 table = Unicode("", config=True,
91 table = Unicode("", config=True,
92 help="""The SQLite Table to use for storing tasks for this session. If unspecified,
92 help="""The SQLite Table to use for storing tasks for this session. If unspecified,
93 a new table will be created with the Hub's IDENT. Specifying the table will result
93 a new table will be created with the Hub's IDENT. Specifying the table will result
94 in tasks from previous sessions being available via Clients' db_query and
94 in tasks from previous sessions being available via Clients' db_query and
95 get_result methods.""")
95 get_result methods.""")
96
96
97 _db = Instance('sqlite3.Connection')
97 _db = Instance('sqlite3.Connection')
98 _keys = List(['msg_id' ,
98 _keys = List(['msg_id' ,
99 'header' ,
99 'header' ,
100 'content',
100 'content',
101 'buffers',
101 'buffers',
102 'submitted',
102 'submitted',
103 'client_uuid' ,
103 'client_uuid' ,
104 'engine_uuid' ,
104 'engine_uuid' ,
105 'started',
105 'started',
106 'completed',
106 'completed',
107 'resubmitted',
107 'resubmitted',
108 'result_header' ,
108 'result_header' ,
109 'result_content' ,
109 'result_content' ,
110 'result_buffers' ,
110 'result_buffers' ,
111 'queue' ,
111 'queue' ,
112 'pyin' ,
112 'pyin' ,
113 'pyout',
113 'pyout',
114 'pyerr',
114 'pyerr',
115 'stdout',
115 'stdout',
116 'stderr',
116 'stderr',
117 ])
117 ])
118
118
119 def __init__(self, **kwargs):
119 def __init__(self, **kwargs):
120 super(SQLiteDB, self).__init__(**kwargs)
120 super(SQLiteDB, self).__init__(**kwargs)
121 if not self.table:
121 if not self.table:
122 # use session, and prefix _, since starting with # is illegal
122 # use session, and prefix _, since starting with # is illegal
123 self.table = '_'+self.session.replace('-','_')
123 self.table = '_'+self.session.replace('-','_')
124 if not self.location:
124 if not self.location:
125 if hasattr(self.config.Global, 'cluster_dir'):
125 # get current profile
126 self.location = self.config.Global.cluster_dir
126 from IPython.core.newapplication import BaseIPythonApplication
127 if BaseIPythonApplication.initialized():
128 app = BaseIPythonApplication.instance()
129 if app.profile_dir is not None:
130 self.location = app.profile_dir.location
127 else:
131 else:
128 self.location = '.'
132 self.location = u'.'
133 else:
134 self.location = u'.'
129 self._init_db()
135 self._init_db()
130
136
131 # register db commit as 2s periodic callback
137 # register db commit as 2s periodic callback
132 # to prevent clogging pipes
138 # to prevent clogging pipes
133 # assumes we are being run in a zmq ioloop app
139 # assumes we are being run in a zmq ioloop app
134 loop = ioloop.IOLoop.instance()
140 loop = ioloop.IOLoop.instance()
135 pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
141 pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
136 pc.start()
142 pc.start()
137
143
138 def _defaults(self, keys=None):
144 def _defaults(self, keys=None):
139 """create an empty record"""
145 """create an empty record"""
140 d = {}
146 d = {}
141 keys = self._keys if keys is None else keys
147 keys = self._keys if keys is None else keys
142 for key in keys:
148 for key in keys:
143 d[key] = None
149 d[key] = None
144 return d
150 return d
145
151
146 def _init_db(self):
152 def _init_db(self):
147 """Connect to the database and get new session number."""
153 """Connect to the database and get new session number."""
148 # register adapters
154 # register adapters
149 sqlite3.register_adapter(datetime, _adapt_datetime)
155 sqlite3.register_adapter(datetime, _adapt_datetime)
150 sqlite3.register_converter('datetime', _convert_datetime)
156 sqlite3.register_converter('datetime', _convert_datetime)
151 sqlite3.register_adapter(dict, _adapt_dict)
157 sqlite3.register_adapter(dict, _adapt_dict)
152 sqlite3.register_converter('dict', _convert_dict)
158 sqlite3.register_converter('dict', _convert_dict)
153 sqlite3.register_adapter(list, _adapt_bufs)
159 sqlite3.register_adapter(list, _adapt_bufs)
154 sqlite3.register_converter('bufs', _convert_bufs)
160 sqlite3.register_converter('bufs', _convert_bufs)
155 # connect to the db
161 # connect to the db
156 dbfile = os.path.join(self.location, self.filename)
162 dbfile = os.path.join(self.location, self.filename)
157 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
163 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
158 # isolation_level = None)#,
164 # isolation_level = None)#,
159 cached_statements=64)
165 cached_statements=64)
160 # print dir(self._db)
166 # print dir(self._db)
161
167
162 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
168 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
163 (msg_id text PRIMARY KEY,
169 (msg_id text PRIMARY KEY,
164 header dict text,
170 header dict text,
165 content dict text,
171 content dict text,
166 buffers bufs blob,
172 buffers bufs blob,
167 submitted datetime text,
173 submitted datetime text,
168 client_uuid text,
174 client_uuid text,
169 engine_uuid text,
175 engine_uuid text,
170 started datetime text,
176 started datetime text,
171 completed datetime text,
177 completed datetime text,
172 resubmitted datetime text,
178 resubmitted datetime text,
173 result_header dict text,
179 result_header dict text,
174 result_content dict text,
180 result_content dict text,
175 result_buffers bufs blob,
181 result_buffers bufs blob,
176 queue text,
182 queue text,
177 pyin text,
183 pyin text,
178 pyout text,
184 pyout text,
179 pyerr text,
185 pyerr text,
180 stdout text,
186 stdout text,
181 stderr text)
187 stderr text)
182 """%self.table)
188 """%self.table)
183 self._db.commit()
189 self._db.commit()
184
190
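_init_db leans on sqlite3's adapter/converter machinery: because the connection is opened with detect_types=sqlite3.PARSE_DECLTYPES, the first word of each declared column type (dict, datetime, bufs) selects a registered converter on SELECT, and registered adapters serialize Python values on INSERT. A standalone sketch of the round trip, using json in place of the module's own _adapt_dict/_convert_dict helpers:

    import json
    import sqlite3

    sqlite3.register_adapter(dict, json.dumps)  # dict -> TEXT when binding parameters
    sqlite3.register_converter('dict', lambda b: json.loads(b.decode('utf-8')))  # TEXT -> dict on fetch

    db = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
    db.execute("CREATE TABLE demo (msg_id text PRIMARY KEY, header dict text)")
    db.execute("INSERT INTO demo VALUES (?, ?)", ('abc', {'msg_type': 'execute_request'}))
    row = db.execute("SELECT header FROM demo").fetchone()
    assert row[0] == {'msg_type': 'execute_request'}  # round-tripped back to a dict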
185 def _dict_to_list(self, d):
191 def _dict_to_list(self, d):
186 """turn a mongodb-style record dict into a list."""
192 """turn a mongodb-style record dict into a list."""
187
193
188 return [ d[key] for key in self._keys ]
194 return [ d[key] for key in self._keys ]
189
195
190 def _list_to_dict(self, line, keys=None):
196 def _list_to_dict(self, line, keys=None):
191 """Inverse of dict_to_list"""
197 """Inverse of dict_to_list"""
192 keys = self._keys if keys is None else keys
198 keys = self._keys if keys is None else keys
193 d = self._defaults(keys)
199 d = self._defaults(keys)
194 for key,value in zip(keys, line):
200 for key,value in zip(keys, line):
195 d[key] = value
201 d[key] = value
196
202
197 return d
203 return d
198
204
199 def _render_expression(self, check):
205 def _render_expression(self, check):
200 """Turn a mongodb-style search dict into an SQL query."""
206 """Turn a mongodb-style search dict into an SQL query."""
201 expressions = []
207 expressions = []
202 args = []
208 args = []
203
209
204 skeys = set(check.keys())
210 skeys = set(check.keys())
205 skeys.difference_update(set(self._keys))
211 skeys.difference_update(set(self._keys))
206 skeys.difference_update(set(['buffers', 'result_buffers']))
212 skeys.difference_update(set(['buffers', 'result_buffers']))
207 if skeys:
213 if skeys:
208 raise KeyError("Illegal testing key(s): %s"%skeys)
214 raise KeyError("Illegal testing key(s): %s"%skeys)
209
215
210 for name,sub_check in check.iteritems():
216 for name,sub_check in check.iteritems():
211 if isinstance(sub_check, dict):
217 if isinstance(sub_check, dict):
212 for test,value in sub_check.iteritems():
218 for test,value in sub_check.iteritems():
213 try:
219 try:
214 op = operators[test]
220 op = operators[test]
215 except KeyError:
221 except KeyError:
216 raise KeyError("Unsupported operator: %r"%test)
222 raise KeyError("Unsupported operator: %r"%test)
217 if isinstance(op, tuple):
223 if isinstance(op, tuple):
218 op, join = op
224 op, join = op
219
225
220 if value is None and op in null_operators:
226 if value is None and op in null_operators:
221 expr = "%s %s"%null_operators[op]
227 expr = "%s %s"%null_operators[op]
222 else:
228 else:
223 expr = "%s %s ?"%(name, op)
229 expr = "%s %s ?"%(name, op)
224 if isinstance(value, (tuple,list)):
230 if isinstance(value, (tuple,list)):
225 if op in null_operators and any([v is None for v in value]):
231 if op in null_operators and any([v is None for v in value]):
226 # equality tests don't work with NULL
232 # equality tests don't work with NULL
227 raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test)
233 raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test)
228 expr = '( %s )'%( join.join([expr]*len(value)) )
234 expr = '( %s )'%( join.join([expr]*len(value)) )
229 args.extend(value)
235 args.extend(value)
230 else:
236 else:
231 args.append(value)
237 args.append(value)
232 expressions.append(expr)
238 expressions.append(expr)
233 else:
239 else:
234 # it's an equality check
240 # it's an equality check
235 if sub_check is None:
241 if sub_check is None:
236 expressions.append("%s IS NULL"%name)
242 expressions.append("%s IS NULL"%name)
237 else:
243 else:
238 expressions.append("%s = ?"%name)
244 expressions.append("%s = ?"%name)
239 args.append(sub_check)
245 args.append(sub_check)
240
246
241 expr = " AND ".join(expressions)
247 expr = " AND ".join(expressions)
242 return expr, args
248 return expr, args
243
249
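As a hypothetical illustration (not part of the file), given the operators table defined earlier in this module, a query dict like the one below would render to a parameterized WHERE clause:

    from datetime import datetime

    check = {
        'engine_uuid': 'abcd-1234',                  # plain equality test
        'completed': {'$gt': datetime(2011, 4, 1)},  # mongodb-style comparison, assuming '$gt' maps to '>'
    }
    expr, args = db._render_expression(check)        # db: a SQLiteDB instance
    # expr -> "engine_uuid = ? AND completed > ?"    (clause order follows dict iteration)
    # args -> ['abcd-1234', datetime(2011, 4, 1)]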
244 def add_record(self, msg_id, rec):
250 def add_record(self, msg_id, rec):
245 """Add a new Task Record, by msg_id."""
251 """Add a new Task Record, by msg_id."""
246 d = self._defaults()
252 d = self._defaults()
247 d.update(rec)
253 d.update(rec)
248 d['msg_id'] = msg_id
254 d['msg_id'] = msg_id
249 line = self._dict_to_list(d)
255 line = self._dict_to_list(d)
250 tups = '(%s)'%(','.join(['?']*len(line)))
256 tups = '(%s)'%(','.join(['?']*len(line)))
251 self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
257 self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
252 # self._db.commit()
258 # self._db.commit()
253
259
254 def get_record(self, msg_id):
260 def get_record(self, msg_id):
255 """Get a specific Task Record, by msg_id."""
261 """Get a specific Task Record, by msg_id."""
256 cursor = self._db.execute("""SELECT * FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
262 cursor = self._db.execute("""SELECT * FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
257 line = cursor.fetchone()
263 line = cursor.fetchone()
258 if line is None:
264 if line is None:
259 raise KeyError("No such msg: %r"%msg_id)
265 raise KeyError("No such msg: %r"%msg_id)
260 return self._list_to_dict(line)
266 return self._list_to_dict(line)
261
267
262 def update_record(self, msg_id, rec):
268 def update_record(self, msg_id, rec):
263 """Update the data in an existing record."""
269 """Update the data in an existing record."""
264 query = "UPDATE %s SET "%self.table
270 query = "UPDATE %s SET "%self.table
265 sets = []
271 sets = []
266 keys = sorted(rec.keys())
272 keys = sorted(rec.keys())
267 values = []
273 values = []
268 for key in keys:
274 for key in keys:
269 sets.append('%s = ?'%key)
275 sets.append('%s = ?'%key)
270 values.append(rec[key])
276 values.append(rec[key])
271 query += ', '.join(sets)
277 query += ', '.join(sets)
272 query += ' WHERE msg_id == ?'
278 query += ' WHERE msg_id == ?'
273 values.append(msg_id)
279 values.append(msg_id)
274 self._db.execute(query, values)
280 self._db.execute(query, values)
275 # self._db.commit()
281 # self._db.commit()
276
282
277 def drop_record(self, msg_id):
283 def drop_record(self, msg_id):
278 """Remove a record from the DB."""
284 """Remove a record from the DB."""
279 self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
285 self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
280 # self._db.commit()
286 # self._db.commit()
281
287
282 def drop_matching_records(self, check):
288 def drop_matching_records(self, check):
283 """Remove a record from the DB."""
289 """Remove a record from the DB."""
284 expr,args = self._render_expression(check)
290 expr,args = self._render_expression(check)
285 query = "DELETE FROM %s WHERE %s"%(self.table, expr)
291 query = "DELETE FROM %s WHERE %s"%(self.table, expr)
286 self._db.execute(query,args)
292 self._db.execute(query,args)
287 # self._db.commit()
293 # self._db.commit()
288
294
289 def find_records(self, check, keys=None):
295 def find_records(self, check, keys=None):
290 """Find records matching a query dict, optionally extracting subset of keys.
296 """Find records matching a query dict, optionally extracting subset of keys.
291
297
292 Returns list of matching records.
298 Returns list of matching records.
293
299
294 Parameters
300 Parameters
295 ----------
301 ----------
296
302
297 check: dict
303 check: dict
298 mongodb-style query argument
304 mongodb-style query argument
299 keys: list of strs [optional]
305 keys: list of strs [optional]
300 if specified, the subset of keys to extract. msg_id will *always* be
306 if specified, the subset of keys to extract. msg_id will *always* be
301 included.
307 included.
302 """
308 """
303 if keys:
309 if keys:
304 bad_keys = [ key for key in keys if key not in self._keys ]
310 bad_keys = [ key for key in keys if key not in self._keys ]
305 if bad_keys:
311 if bad_keys:
306 raise KeyError("Bad record key(s): %s"%bad_keys)
312 raise KeyError("Bad record key(s): %s"%bad_keys)
307
313
308 if keys:
314 if keys:
309 # ensure msg_id is present and first:
315 # ensure msg_id is present and first:
310 if 'msg_id' in keys:
316 if 'msg_id' in keys:
311 keys.remove('msg_id')
317 keys.remove('msg_id')
312 keys.insert(0, 'msg_id')
318 keys.insert(0, 'msg_id')
313 req = ', '.join(keys)
319 req = ', '.join(keys)
314 else:
320 else:
315 req = '*'
321 req = '*'
316 expr,args = self._render_expression(check)
322 expr,args = self._render_expression(check)
317 query = """SELECT %s FROM %s WHERE %s"""%(req, self.table, expr)
323 query = """SELECT %s FROM %s WHERE %s"""%(req, self.table, expr)
318 cursor = self._db.execute(query, args)
324 cursor = self._db.execute(query, args)
319 matches = cursor.fetchall()
325 matches = cursor.fetchall()
320 records = []
326 records = []
321 for line in matches:
327 for line in matches:
322 rec = self._list_to_dict(line, keys)
328 rec = self._list_to_dict(line, keys)
323 records.append(rec)
329 records.append(rec)
324 return records
330 return records
325
331
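A usage sketch with made-up values: pull the timing columns for finished tasks that ran on one engine; msg_id comes back even though it is not listed in keys:

    recs = db.find_records(
        {'engine_uuid': 'abcd-1234', 'completed': {'$ne': None}},  # finished tasks on one engine
        keys=['started', 'completed'],
    )
    for rec in recs:
        print("%s took %s" % (rec['msg_id'], rec['completed'] - rec['started']))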
326 def get_history(self):
332 def get_history(self):
327 """get all msg_ids, ordered by time submitted."""
333 """get all msg_ids, ordered by time submitted."""
328 query = """SELECT msg_id FROM %s ORDER by submitted ASC"""%self.table
334 query = """SELECT msg_id FROM %s ORDER by submitted ASC"""%self.table
329 cursor = self._db.execute(query)
335 cursor = self._db.execute(query)
330 # will be a list of length 1 tuples
336 # will be a list of length 1 tuples
331 return [ tup[0] for tup in cursor.fetchall()]
337 return [ tup[0] for tup in cursor.fetchall()]
332
338
333 __all__ = ['SQLiteDB']
\ No newline at end of file
339 __all__ = ['SQLiteDB']