update parallel apps to use ProfileDir
MinRK
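Reviewer note: the substance of this change is swapping the ad-hoc cluster directory for IPython's standard profile machinery; `cluster_<profile>` directories give way to `profile_<profile>` ones, and apps read `self.profile_dir.security_dir`/`log_dir`/`pid_dir` instead of `self.cluster_dir.*`. A minimal runnable sketch of the layout the apps below rely on (the class here is a hypothetical stand-in, not the real ProfileDir):

    import os

    class FakeProfileDir(object):
        # hypothetical stand-in: mirrors only the attributes this diff uses
        def __init__(self, location):
            self.location = location
            self.security_dir = os.path.join(location, 'security')
            self.log_dir = os.path.join(location, 'log')
            self.pid_dir = os.path.join(location, 'pid')

    pd = FakeProfileDir(os.path.expanduser('~/.ipython/profile_mycluster'))
    print pd.pid_dir  # where write_pid_file() in the first hunk puts '<name>.pid'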
@@ -1,540 +1,257 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython cluster directory
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import logging
22 22 import re
23 import shutil
24 23 import sys
25 24
26 25 from subprocess import Popen, PIPE
27 26
28 from IPython.config.loader import PyFileConfigLoader, Config
29 from IPython.config.configurable import Configurable
30 from IPython.config.application import Application
31 from IPython.core.crashhandler import CrashHandler
32 from IPython.core.newapplication import BaseIPythonApplication
27 from IPython.config.loader import Config
33 28 from IPython.core import release
34 from IPython.utils.path import (
35 get_ipython_package_dir,
36 get_ipython_dir,
37 expand_path
29 from IPython.core.crashhandler import CrashHandler
30 from IPython.core.newapplication import (
31 BaseIPythonApplication,
32 base_aliases as base_ip_aliases,
33 base_flags as base_ip_flags
38 34 )
35 from IPython.utils.path import expand_path
36
39 37 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
40 38
41 39 #-----------------------------------------------------------------------------
42 40 # Module errors
43 41 #-----------------------------------------------------------------------------
44 42
45 class ClusterDirError(Exception):
46 pass
47
48
49 43 class PIDFileError(Exception):
50 44 pass
51 45
52 46
53 47 #-----------------------------------------------------------------------------
54 # Class for managing cluster directories
55 #-----------------------------------------------------------------------------
56
57 class ClusterDir(Configurable):
58 """An object to manage the cluster directory and its resources.
59
60 The cluster directory is used by :command:`ipengine`,
61 :command:`ipcontroller` and :command:`ipcluster` to manage the
62 configuration, logging and security of these applications.
63
64 This object knows how to find, create and manage these directories. This
65 should be used by any code that wants to handle cluster directories.
66 """
67
68 security_dir_name = Unicode('security')
69 log_dir_name = Unicode('log')
70 pid_dir_name = Unicode('pid')
71 security_dir = Unicode(u'')
72 log_dir = Unicode(u'')
73 pid_dir = Unicode(u'')
74
75 auto_create = Bool(False,
76 help="""Whether to automatically create the ClusterDirectory if it does
77 not exist""")
78 overwrite = Bool(False,
79 help="""Whether to overwrite existing config files""")
80 location = Unicode(u'', config=True,
81 help="""Set the cluster dir. This overrides the logic used by the
82 `profile` option.""",
83 )
84 profile = Unicode(u'default', config=True,
85 help="""The string name of the profile to be used. This determines the name
86 of the cluster dir as: cluster_<profile>. The default profile is named
87 'default'. The cluster directory is resolved this way if the
88 `cluster_dir` option is not used."""
89 )
90
91 _location_isset = Bool(False) # flag for detecting multiply set location
92 _new_dir = Bool(False) # flag for whether a new dir was created
93
94 def __init__(self, **kwargs):
95 # make sure auto_create,overwrite are set *before* location
96 for name in ('auto_create', 'overwrite'):
97 v = kwargs.pop(name, None)
98 if v is not None:
99 setattr(self, name, v)
100 super(ClusterDir, self).__init__(**kwargs)
101 if not self.location:
102 self._profile_changed('profile', 'default', self.profile)
103
104 def _location_changed(self, name, old, new):
105 if self._location_isset:
106 raise RuntimeError("Cannot set ClusterDir more than once.")
107 self._location_isset = True
108 if not os.path.isdir(new):
109 if self.auto_create:# or self.config.ClusterDir.auto_create:
110 os.makedirs(new)
111 self._new_dir = True
112 else:
113 raise ClusterDirError('Directory not found: %s' % new)
114
115 # ensure config files exist:
116 self.copy_all_config_files(overwrite=self.overwrite)
117 self.security_dir = os.path.join(new, self.security_dir_name)
118 self.log_dir = os.path.join(new, self.log_dir_name)
119 self.pid_dir = os.path.join(new, self.pid_dir_name)
120 self.check_dirs()
121
122 def _profile_changed(self, name, old, new):
123 if self._location_isset:
124 raise RuntimeError("ClusterDir already set. Cannot set by profile.")
125 self.location = os.path.join(get_ipython_dir(), 'cluster_'+new)
126
127 def _log_dir_changed(self, name, old, new):
128 self.check_log_dir()
129
130 def check_log_dir(self):
131 if not os.path.isdir(self.log_dir):
132 os.mkdir(self.log_dir)
133
134 def _security_dir_changed(self, name, old, new):
135 self.check_security_dir()
136
137 def check_security_dir(self):
138 if not os.path.isdir(self.security_dir):
139 os.mkdir(self.security_dir, 0700)
140 os.chmod(self.security_dir, 0700)
141
142 def _pid_dir_changed(self, name, old, new):
143 self.check_pid_dir()
144
145 def check_pid_dir(self):
146 if not os.path.isdir(self.pid_dir):
147 os.mkdir(self.pid_dir, 0700)
148 os.chmod(self.pid_dir, 0700)
149
150 def check_dirs(self):
151 self.check_security_dir()
152 self.check_log_dir()
153 self.check_pid_dir()
154
155 def copy_config_file(self, config_file, path=None, overwrite=False):
156 """Copy a default config file into the active cluster directory.
157
158 Default configuration files are kept in :mod:`IPython.config.default`.
159 This function copies them from that location to the working cluster
160 directory.
161 """
162 if path is None:
163 import IPython.config.default
164 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
165 path = os.path.sep.join(path)
166 src = os.path.join(path, config_file)
167 dst = os.path.join(self.location, config_file)
168 if not os.path.isfile(dst) or overwrite:
169 shutil.copy(src, dst)
170
171 def copy_all_config_files(self, path=None, overwrite=False):
172 """Copy all config files into the active cluster directory."""
173 for f in [u'ipcontroller_config.py', u'ipengine_config.py',
174 u'ipcluster_config.py']:
175 self.copy_config_file(f, path=path, overwrite=overwrite)
176
177 @classmethod
178 def create_cluster_dir(cls, cluster_dir):
179 """Create a new cluster directory given a full path.
180
181 Parameters
182 ----------
183 cluster_dir : str
184 The full path to the cluster directory. If it does exist, it will
185 be used. If not, it will be created.
186 """
187 return ClusterDir(location=cluster_dir)
188
189 @classmethod
190 def create_cluster_dir_by_profile(cls, path, profile=u'default'):
191 """Create a cluster dir by profile name and path.
192
193 Parameters
194 ----------
195 path : str
196 The path (directory) to put the cluster directory in.
197 profile : str
198 The name of the profile. The name of the cluster directory will
199 be "cluster_<profile>".
200 """
201 if not os.path.isdir(path):
202 raise ClusterDirError('Directory not found: %s' % path)
203 cluster_dir = os.path.join(path, u'cluster_' + profile)
204 return ClusterDir(location=cluster_dir)
205
206 @classmethod
207 def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
208 """Find an existing cluster dir by profile name, return its ClusterDir.
209
210 This searches through a sequence of paths for a cluster dir. If it
211 is not found, a :class:`ClusterDirError` exception will be raised.
212
213 The search path algorithm is:
214 1. ``os.getcwd()``
215 2. ``ipython_dir``
216 3. The directories found in the ":" separated
217 :env:`IPCLUSTER_DIR_PATH` environment variable.
218
219 Parameters
220 ----------
221 ipython_dir : unicode or str
222 The IPython directory to use.
223 profile : unicode or str
224 The name of the profile. The name of the cluster directory
225 will be "cluster_<profile>".
226 """
227 dirname = u'cluster_' + profile
228 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
229 if cluster_dir_paths:
230 cluster_dir_paths = cluster_dir_paths.split(':')
231 else:
232 cluster_dir_paths = []
233 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
234 for p in paths:
235 cluster_dir = os.path.join(p, dirname)
236 if os.path.isdir(cluster_dir):
237 return ClusterDir(location=cluster_dir)
238 else:
239 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
240
241 @classmethod
242 def find_cluster_dir(cls, cluster_dir):
243 """Find/create a cluster dir and return its ClusterDir.
244
245 This will create the cluster directory if it doesn't exist.
246
247 Parameters
248 ----------
249 cluster_dir : unicode or str
250 The path of the cluster directory. This is expanded using
251 :func:`IPython.utils.path.expand_path`.
252 """
253 cluster_dir = expand_path(cluster_dir)
254 if not os.path.isdir(cluster_dir):
255 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
256 return ClusterDir(location=cluster_dir)
257
258
259 #-----------------------------------------------------------------------------
260 48 # Crash handler for this application
261 49 #-----------------------------------------------------------------------------
262 50
263 51
264 52 _message_template = """\
265 53 Oops, $self.app_name crashed. We do our best to make it stable, but...
266 54
267 55 A crash report was automatically generated with the following information:
268 56 - A verbatim copy of the crash traceback.
269 57 - Data on your current $self.app_name configuration.
270 58
271 59 It was left in the file named:
272 60 \t'$self.crash_report_fname'
273 61 If you can email this file to the developers, the information in it will help
274 62 them in understanding and correcting the problem.
275 63
276 64 You can mail it to: $self.contact_name at $self.contact_email
277 65 with the subject '$self.app_name Crash Report'.
278 66
279 67 If you want to do it now, the following command will work (under Unix):
280 68 mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
281 69
282 70 To ensure accurate tracking of this issue, please file a report about it at:
283 71 $self.bug_tracker
284 72 """
285 73
286 class ClusterDirCrashHandler(CrashHandler):
74 class ParallelCrashHandler(CrashHandler):
287 75 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
288 76
289 77 message_template = _message_template
290 78
291 79 def __init__(self, app):
292 80 contact_name = release.authors['Min'][0]
293 81 contact_email = release.authors['Min'][1]
294 82 bug_tracker = 'http://github.com/ipython/ipython/issues'
295 super(ClusterDirCrashHandler,self).__init__(
83 super(ParallelCrashHandler,self).__init__(
296 84 app, contact_name, contact_email, bug_tracker
297 85 )
298 86
299 87
300 88 #-----------------------------------------------------------------------------
301 89 # Main application
302 90 #-----------------------------------------------------------------------------
303 base_aliases = {
304 'profile' : "ClusterDir.profile",
305 'cluster_dir' : 'ClusterDir.location',
306 'log_level' : 'ClusterApplication.log_level',
307 'work_dir' : 'ClusterApplication.work_dir',
308 'log_to_file' : 'ClusterApplication.log_to_file',
309 'clean_logs' : 'ClusterApplication.clean_logs',
310 'log_url' : 'ClusterApplication.log_url',
311 'config' : 'ClusterApplication.config_file',
312 }
91 base_aliases = {}
92 base_aliases.update(base_ip_aliases)
93 base_aliases.update({
94 'profile_dir' : 'ProfileDir.location',
95 'log_level' : 'BaseParallelApplication.log_level',
96 'work_dir' : 'BaseParallelApplication.work_dir',
97 'log_to_file' : 'BaseParallelApplication.log_to_file',
98 'clean_logs' : 'BaseParallelApplication.clean_logs',
99 'log_url' : 'BaseParallelApplication.log_url',
100 })
313 101
314 102 base_flags = {
315 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
316 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
317 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
103 'log-to-file' : ({'BaseParallelApplication' : Config({
104 'log_to_file' : True}),
105 }, "send log output to a file")
318 106 }
319 for k,v in base_flags.iteritems():
320 base_flags[k] = (Config(v[0]),v[1])
321
322 class ClusterApplication(BaseIPythonApplication):
323 """An application that puts everything into a cluster directory.
324
325 Instead of looking for things in the ipython_dir, this type of application
326 will use its own private directory called the "cluster directory"
327 for things like config files, log files, etc.
328
329 The cluster directory is resolved as follows:
107 base_flags.update(base_ip_flags)
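Reviewer note: a sketch of how an alias table like the one above maps `name=value` arguments onto config (the `name=value` style follows the help strings in this diff; this resolver is a hypothetical simplification, not IPython's real loader):

    # each alias names a '<Configurable>.<trait>' target
    aliases = {
        'profile_dir' : 'ProfileDir.location',
        'log_level'   : 'BaseParallelApplication.log_level',
    }

    def resolve(argv, aliases):
        # 'profile_dir=/tmp/p' -> {'ProfileDir': {'location': '/tmp/p'}}
        config = {}
        for arg in argv:
            name, _, value = arg.partition('=')
            cls, trait = aliases[name].split('.')
            config.setdefault(cls, {})[trait] = value
        return config

    print resolve(['profile_dir=/tmp/p', 'log_level=10'], aliases)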
330 108
331 * If the ``cluster_dir`` option is given, it is used.
332 * If ``cluster_dir`` is not given, the application directory is
333 resolve using the profile name as ``cluster_<profile>``. The search
334 path for this directory is then i) cwd if it is found there
335 and ii) in ipython_dir otherwise.
336
337 The config file for the application is to be put in the cluster
338 dir and named the value of the ``config_file_name`` class attribute.
109 class BaseParallelApplication(BaseIPythonApplication):
110 """The base Application for IPython.parallel apps
111
112 Principal extensions to BaseIPythonApplication:
113
114 * work_dir
115 * remote logging via pyzmq
116 * IOLoop instance
339 117 """
340 118
341 crash_handler_class = ClusterDirCrashHandler
342 auto_create_cluster_dir = Bool(True, config=True,
343 help="whether to create the cluster_dir if it doesn't exist")
344 cluster_dir = Instance(ClusterDir)
345 classes = [ClusterDir]
119 crash_handler_class = ParallelCrashHandler
346 120
347 121 def _log_level_default(self):
348 122 # temporarily override default_log_level to INFO
349 123 return logging.INFO
350 124
351 125 work_dir = Unicode(os.getcwdu(), config=True,
352 126 help='Set the working dir for the process.'
353 127 )
354 128 def _work_dir_changed(self, name, old, new):
355 129 self.work_dir = unicode(expand_path(new))
356 130
357 131 log_to_file = Bool(config=True,
358 132 help="whether to log to a file")
359 133
360 134 clean_logs = Bool(False, config=True,
361 135 help="whether to cleanup old logfiles before starting")
362 136
363 137 log_url = Unicode('', config=True,
364 138 help="The ZMQ URL of the iplogger to aggregate logging.")
365 139
366 config_file = Unicode(u'', config=True,
367 help="""Path to ip<appname> configuration file. The default is to use
368 <appname>_config.py, as found by cluster-dir."""
369 )
370 def _config_file_paths_default(self):
371 # don't include profile dir
372 return [ os.getcwdu(), self.ipython_dir ]
373
374 def _config_file_changed(self, name, old, new):
375 if os.pathsep in new:
376 path, new = new.rsplit(os.pathsep, 1)
377 self.config_file_paths.insert(0, path)
378 self.config_file_name = new
379
380 config_file_name = Unicode('')
140 def _config_files_default(self):
141 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
381 142
382 143 loop = Instance('zmq.eventloop.ioloop.IOLoop')
383 144 def _loop_default(self):
384 145 from zmq.eventloop.ioloop import IOLoop
385 146 return IOLoop.instance()
386 147
387 148 aliases = Dict(base_aliases)
388 149 flags = Dict(base_flags)
389
390 def init_clusterdir(self):
391 """This resolves the cluster directory.
392
393 This tries to find the cluster directory and if successful, it will
394 have done:
395 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
396 the application.
397 * Sets ``self.cluster_dir`` attribute of the application and config
398 objects.
399
400 The algorithm used for this is as follows:
401 1. Try ``Global.cluster_dir``.
402 2. Try using ``Global.profile``.
403 3. If both of these fail and ``self.auto_create_cluster_dir`` is
404 ``True``, then create the new cluster dir in the IPython directory.
405 4. If all fails, then raise :class:`ClusterDirError`.
406 """
407 try:
408 self.cluster_dir = ClusterDir(auto_create=self.auto_create_cluster_dir, config=self.config)
409 except ClusterDirError as e:
410 self.log.fatal("Error initializing cluster dir: %s"%e)
411 self.log.fatal("A cluster dir must be created before running this command.")
412 self.log.fatal("Do 'ipcluster create -h' or 'ipcluster list -h' for more "
413 "information about creating and listing cluster dirs."
414 )
415 self.exit(1)
416
417 if self.cluster_dir._new_dir:
418 self.log.info('Creating new cluster dir: %s' % \
419 self.cluster_dir.location)
420 else:
421 self.log.info('Using existing cluster dir: %s' % \
422 self.cluster_dir.location)
423
424 # insert after cwd:
425 self.config_file_paths.insert(1, self.cluster_dir.location)
426 150
427 151 def initialize(self, argv=None):
428 152 """initialize the app"""
429 self.init_crash_handler()
430 self.parse_command_line(argv)
431 cl_config = self.config
432 self.init_clusterdir()
433 self.load_config_file()
434 # command-line should *override* config file, but command-line is necessary
435 # to determine clusterdir, etc.
436 self.update_config(cl_config)
153 super(BaseParallelApplication, self).initialize(argv)
437 154 self.to_work_dir()
438 155 self.reinit_logging()
439 156
440 157 def to_work_dir(self):
441 158 wd = self.work_dir
442 159 if unicode(wd) != os.getcwdu():
443 160 os.chdir(wd)
444 161 self.log.info("Changing to working dir: %s" % wd)
445 162 # This is the working dir by now.
446 163 sys.path.insert(0, '')
447 164
448 165 def reinit_logging(self):
449 166 # Remove old log files
450 log_dir = self.cluster_dir.log_dir
167 log_dir = self.profile_dir.log_dir
451 168 if self.clean_logs:
452 169 for f in os.listdir(log_dir):
453 170 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
454 171 os.remove(os.path.join(log_dir, f))
455 172 if self.log_to_file:
456 173 # Start logging to the new log file
457 174 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
458 175 logfile = os.path.join(log_dir, log_filename)
459 176 open_log_file = open(logfile, 'w')
460 177 else:
461 178 open_log_file = None
462 179 if open_log_file is not None:
463 180 self.log.removeHandler(self._log_handler)
464 181 self._log_handler = logging.StreamHandler(open_log_file)
465 182 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
466 183 self._log_handler.setFormatter(self._log_formatter)
467 184 self.log.addHandler(self._log_handler)
468 185
469 186 def write_pid_file(self, overwrite=False):
470 187 """Create a .pid file in the pid_dir with my pid.
471 188
472 189 This must be called after the profile dir is set, as it uses `profile_dir.pid_dir`.
473 190 This raises :exc:`PIDFileError` if the pid file exists already.
474 191 """
475 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
192 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
476 193 if os.path.isfile(pid_file):
477 194 pid = self.get_pid_from_file()
478 195 if not overwrite:
479 196 raise PIDFileError(
480 197 'The pid file [%s] already exists. \nThis could mean that this '
481 198 'server is already running with [pid=%s].' % (pid_file, pid)
482 199 )
483 200 with open(pid_file, 'w') as f:
484 201 self.log.info("Creating pid file: %s" % pid_file)
485 202 f.write(repr(os.getpid())+'\n')
486 203
487 204 def remove_pid_file(self):
488 205 """Remove the pid file.
489 206
490 207 This should be called at shutdown by registering a shutdown
491 208 callback with the event loop. This needs to return
492 209 ``None``.
493 210 """
494 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
211 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
495 212 if os.path.isfile(pid_file):
496 213 try:
497 214 self.log.info("Removing pid file: %s" % pid_file)
498 215 os.remove(pid_file)
499 216 except:
500 217 self.log.warn("Error removing the pid file: %s" % pid_file)
501 218
502 219 def get_pid_from_file(self):
503 220 """Get the pid from the pid file.
504 221
505 222 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
506 223 """
507 pid_file = os.path.join(self.cluster_dir.pid_dir, self.name + u'.pid')
224 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
508 225 if os.path.isfile(pid_file):
509 226 with open(pid_file, 'r') as f:
510 227 pid = int(f.read().strip())
511 228 return pid
512 229 else:
513 230 raise PIDFileError('pid file not found: %s' % pid_file)
514 231
515 232 def check_pid(self, pid):
516 233 if os.name == 'nt':
517 234 try:
518 235 import ctypes
519 236 # returns 0 if no such process (of ours) exists
520 237 # positive int otherwise
521 238 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
522 239 except Exception:
523 240 self.log.warn(
524 241 "Could not determine whether pid %i is running via `OpenProcess`. "
525 242 " Making the likely assumption that it is."%pid
526 243 )
527 244 return True
528 245 return bool(p)
529 246 else:
530 247 try:
531 248 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
532 249 output,_ = p.communicate()
533 250 except OSError:
534 251 self.log.warn(
535 252 "Could not determine whether pid %i is running via `ps x`. "
536 253 " Making the likely assumption that it is."%pid
537 254 )
538 255 return True
539 256 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
540 257 return pid in pids
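Reviewer note: the pid-file protocol above (write_pid_file / get_pid_from_file / remove_pid_file) reduces to the following standalone sketch; the directory here is a stand-in for `profile_dir.pid_dir`:

    from __future__ import with_statement
    import os

    pid_dir = '/tmp/pids'                    # stand-in for profile_dir.pid_dir
    pid_file = os.path.join(pid_dir, 'ipcluster.pid')

    if not os.path.isdir(pid_dir):
        os.makedirs(pid_dir)
    if os.path.isfile(pid_file):
        # write_pid_file raises PIDFileError here unless overwrite=True
        raise Exception('pid file [%s] already exists' % pid_file)
    with open(pid_file, 'w') as f:           # write_pid_file
        f.write(repr(os.getpid()) + '\n')
    with open(pid_file, 'r') as f:           # get_pid_from_file
        pid = int(f.read().strip())
    os.remove(pid_file)                      # remove_pid_file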
@@ -1,540 +1,521 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import errno
19 19 import logging
20 20 import os
21 21 import re
22 22 import signal
23 23
24 24 from subprocess import check_call, CalledProcessError, PIPE
25 25 import zmq
26 26 from zmq.eventloop import ioloop
27 27
28 28 from IPython.config.application import Application, boolean_flag
29 29 from IPython.config.loader import Config
30 from IPython.core.newapplication import BaseIPythonApplication
30 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
31 31 from IPython.utils.importstring import import_item
32 32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
33 33
34 34 from IPython.parallel.apps.clusterdir import (
35 ClusterApplication, ClusterDirError, ClusterDir,
35 BaseParallelApplication,
36 36 PIDFileError,
37 37 base_flags, base_aliases
38 38 )
39 39
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Module level variables
43 43 #-----------------------------------------------------------------------------
44 44
45 45
46 46 default_config_file_name = u'ipcluster_config.py'
47 47
48 48
49 49 _description = """Start an IPython cluster for parallel computing.
50 50
51 51 An IPython cluster consists of 1 controller and 1 or more engines.
52 52 This command automates the startup of these processes using a wide
53 53 range of startup methods (SSH, local processes, PBS, mpiexec,
54 54 Windows HPC Server 2008). To start a cluster with 4 engines on your
55 55 local host simply do 'ipcluster start n=4'. For more complex usage
56 56 you will typically do 'ipcluster create profile=mycluster', then edit
57 57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
58 58 """
59 59
60 60
61 61 # Exit codes for ipcluster
62 62
63 63 # This will be the exit code if the ipcluster appears to be running because
64 64 # a .pid file exists
65 65 ALREADY_STARTED = 10
66 66
67 67
68 68 # This will be the exit code if ipcluster stop is run, but there is not .pid
69 69 # file to be found.
70 70 ALREADY_STOPPED = 11
71 71
72 72 # This will be the exit code if ipcluster engines is run, but there is not .pid
73 73 # file to be found.
74 74 NO_CLUSTER = 12
75 75
76 76
77 77 #-----------------------------------------------------------------------------
78 78 # Main application
79 79 #-----------------------------------------------------------------------------
80 80 start_help = """Start an IPython cluster for parallel computing
81 81
82 82 Start an ipython cluster by its profile name or cluster
83 83 directory. Cluster directories contain configuration, log and
84 84 security related files and are named using the convention
85 85 'cluster_<profile>' and should be created using the 'create'
86 86 subcommand of 'ipcluster'. If your cluster directory is in
87 87 the cwd or the ipython directory, you can simply refer to it
88 88 using its profile name, 'ipcluster start n=4 profile=<profile>',
89 otherwise use the 'cluster_dir' option.
89 otherwise use the 'profile_dir' option.
90 90 """
91 91 stop_help = """Stop a running IPython cluster
92 92
93 93 Stop a running ipython cluster by its profile name or cluster
94 94 directory. Cluster directories are named using the convention
95 95 'cluster_<profile>'. If your cluster directory is in
96 96 the cwd or the ipython directory, you can simply refer to it
97 97 using its profile name, 'ipcluster stop profile=<profile>', otherwise
98 use the 'cluster_dir' option.
98 use the 'profile_dir' option.
99 99 """
100 100 engines_help = """Start engines connected to an existing IPython cluster
101 101
102 102 Start one or more engines to connect to an existing Cluster
103 103 by profile name or cluster directory.
104 104 Cluster directories contain configuration, log and
105 105 security related files and are named using the convention
106 106 'cluster_<profile>' and should be created using the 'create'
107 107 subcommand of 'ipcluster'. If your cluster directory is in
108 108 the cwd or the ipython directory, you can simply refer to it
109 109 using its profile name, 'ipcluster engines n=4 profile=<profile>',
110 otherwise use the 'cluster_dir' option.
110 otherwise use the 'profile_dir' option.
111 111 """
112 112 create_help = """Create an ipcluster profile by name
113 113
114 114 Create an ipython cluster directory by its profile name or
115 115 cluster directory path. Cluster directories contain
116 116 configuration, log and security related files and are named
117 117 using the convention 'cluster_<profile>'. By default they are
118 118 located in your ipython directory. Once created, you will
119 119 probably need to edit the configuration files in the cluster
120 120 directory to configure your cluster. Most users will create a
121 121 cluster directory by profile name,
122 122 `ipcluster create profile=mycluster`, which will put the directory
123 123 in `<ipython_dir>/cluster_mycluster`.
124 124 """
125 125 list_help = """List available cluster profiles
126 126
127 127 List all available clusters, by cluster directory, that can
128 128 be found in the current working directory or in the ipython
129 129 directory. Cluster directories are named using the convention
130 130 'cluster_<profile>'.
131 131 """
132 132
133 133
134 134 class IPClusterList(BaseIPythonApplication):
135 135 name = u'ipcluster-list'
136 136 description = list_help
137 137
138 138 # empty aliases
139 139 aliases=Dict()
140 140 flags = Dict(base_flags)
141 141
142 142 def _log_level_default(self):
143 143 return 20
144 144
145 def list_cluster_dirs(self):
145 def list_profile_dirs(self):
146 146 # Find the search paths
147 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
148 if cluster_dir_paths:
149 cluster_dir_paths = cluster_dir_paths.split(':')
147 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
148 if profile_dir_paths:
149 profile_dir_paths = profile_dir_paths.split(':')
150 150 else:
151 cluster_dir_paths = []
151 profile_dir_paths = []
152 152
153 153 ipython_dir = self.ipython_dir
154 154
155 paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
155 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
156 156 paths = list(set(paths))
157 157
158 self.log.info('Searching for cluster dirs in paths: %r' % paths)
158 self.log.info('Searching for cluster profiles in paths: %r' % paths)
159 159 for path in paths:
160 160 files = os.listdir(path)
161 161 for f in files:
162 162 full_path = os.path.join(path, f)
163 if os.path.isdir(full_path) and f.startswith('cluster_'):
164 profile = full_path.split('_')[-1]
163 if os.path.isdir(full_path) and f.startswith('profile_') and \
164 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
165 profile = f.split('_')[-1]
165 166 start_cmd = 'ipcluster start profile=%s n=4' % profile
166 167 print start_cmd + " ==> " + full_path
167 168
168 169 def start(self):
169 self.list_cluster_dirs()
170 self.list_profile_dirs()
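Reviewer note: the new listing rule above boils down to one predicate; a sketch using only the checks present in this hunk:

    import os

    def is_cluster_profile(parent, name):
        # a 'profile_*' directory counts only if it already carries
        # a parallel config file
        full = os.path.join(parent, name)
        return (os.path.isdir(full) and name.startswith('profile_')
                and os.path.isfile(os.path.join(full, 'ipcontroller_config.py')))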
171
172
173 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
170 174
171 175 create_flags = {}
172 176 create_flags.update(base_flags)
173 create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
177 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
174 178 "reset config files to defaults", "leave existing config files"))
175 179
176 class IPClusterCreate(ClusterApplication):
177 name = u'ipcluster'
180 class IPClusterCreate(BaseParallelApplication):
181 name = u'ipcluster-create'
178 182 description = create_help
179 auto_create_cluster_dir = Bool(True,
180 help="whether to create the cluster_dir if it doesn't exist")
183 auto_create = Bool(True)
181 184 config_file_name = Unicode(default_config_file_name)
182 185
183 reset = Bool(False, config=True,
184 help="Whether to reset config files as part of 'create'."
185 )
186
187 186 flags = Dict(create_flags)
188 187
189 aliases = Dict(dict(profile='ClusterDir.profile'))
188 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
190 189
191 classes = [ClusterDir]
190 classes = [ProfileDir]
192 191
193 def init_clusterdir(self):
194 super(IPClusterCreate, self).init_clusterdir()
195 self.log.info('Copying default config files to cluster directory '
196 '[overwrite=%r]' % (self.reset,))
197 self.cluster_dir.copy_all_config_files(overwrite=self.reset)
198
199 def initialize(self, argv=None):
200 self.parse_command_line(argv)
201 self.init_clusterdir()
202 192
203 193 stop_aliases = dict(
204 194 signal='IPClusterStop.signal',
205 profile='ClusterDir.profile',
206 cluster_dir='ClusterDir.location',
195 profile='BaseIPythonApplication.profile',
196 profile_dir='ProfileDir.location',
207 197 )
208 198
209 class IPClusterStop(ClusterApplication):
199 class IPClusterStop(BaseParallelApplication):
210 200 name = u'ipcluster'
211 201 description = stop_help
212 auto_create_cluster_dir = Bool(False)
213 202 config_file_name = Unicode(default_config_file_name)
214 203
215 204 signal = Int(signal.SIGINT, config=True,
216 205 help="signal to use for stopping processes.")
217 206
218 207 aliases = Dict(stop_aliases)
219 208
220 def init_clusterdir(self):
221 try:
222 super(IPClusterStop, self).init_clusterdir()
223 except ClusterDirError as e:
224 self.log.fatal("Failed ClusterDir init: %s"%e)
225 self.exit(1)
226
227 209 def start(self):
228 210 """Start the app for the stop subcommand."""
229 211 try:
230 212 pid = self.get_pid_from_file()
231 213 except PIDFileError:
232 214 self.log.critical(
233 215 'Could not read pid file, cluster is probably not running.'
234 216 )
235 217 # Here I exit with an unusual exit status that other processes
236 218 # can watch for to learn how I exited.
237 219 self.remove_pid_file()
238 220 self.exit(ALREADY_STOPPED)
239 221
240 222 if not self.check_pid(pid):
241 223 self.log.critical(
242 224 'Cluster [pid=%r] is not running.' % pid
243 225 )
244 226 self.remove_pid_file()
245 227 # Here I exit with an unusual exit status that other processes
246 228 # can watch for to learn how I exited.
247 229 self.exit(ALREADY_STOPPED)
248 230
249 231 elif os.name=='posix':
250 232 sig = self.signal
251 233 self.log.info(
252 234 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
253 235 )
254 236 try:
255 237 os.kill(pid, sig)
256 238 except OSError:
257 239 self.log.error("Stopping cluster failed, assuming already dead.",
258 240 exc_info=True)
259 241 self.remove_pid_file()
260 242 elif os.name=='nt':
261 243 try:
262 244 # kill the whole tree
263 245 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
264 246 except (CalledProcessError, OSError):
265 247 self.log.error("Stopping cluster failed, assuming already dead.",
266 248 exc_info=True)
267 249 self.remove_pid_file()
268 250
269 251 engine_aliases = {}
270 252 engine_aliases.update(base_aliases)
271 253 engine_aliases.update(dict(
272 254 n='IPClusterEngines.n',
273 255 elauncher = 'IPClusterEngines.engine_launcher_class',
274 256 ))
275 class IPClusterEngines(ClusterApplication):
257 class IPClusterEngines(BaseParallelApplication):
276 258
277 259 name = u'ipcluster'
278 260 description = engines_help
279 261 usage = None
280 262 config_file_name = Unicode(default_config_file_name)
281 263 default_log_level = logging.INFO
282 auto_create_cluster_dir = Bool(False)
283 264 classes = List()
284 265 def _classes_default(self):
285 266 from IPython.parallel.apps import launcher
286 267 launchers = launcher.all_launchers
287 268 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
288 return [ClusterDir]+eslaunchers
269 return [ProfileDir]+eslaunchers
289 270
290 271 n = Int(2, config=True,
291 272 help="The number of engines to start.")
292 273
293 274 engine_launcher_class = Unicode('LocalEngineSetLauncher',
294 275 config=True,
295 276 help="The class for launching a set of Engines."
296 277 )
297 278 daemonize = Bool(False, config=True,
298 279 help='Daemonize the ipcluster program. This implies --log-to-file')
299 280
300 281 def _daemonize_changed(self, name, old, new):
301 282 if new:
302 283 self.log_to_file = True
303 284
304 285 aliases = Dict(engine_aliases)
305 286 # flags = Dict(flags)
306 287 _stopping = False
307 288
308 289 def initialize(self, argv=None):
309 290 super(IPClusterEngines, self).initialize(argv)
310 291 self.init_signal()
311 292 self.init_launchers()
312 293
313 294 def init_launchers(self):
314 295 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
315 296 self.engine_launcher.on_stop(lambda r: self.loop.stop())
316 297
317 298 def init_signal(self):
318 299 # Setup signals
319 300 signal.signal(signal.SIGINT, self.sigint_handler)
320 301
321 302 def build_launcher(self, clsname):
322 303 """import and instantiate a Launcher based on importstring"""
323 304 if '.' not in clsname:
324 305 # not a module, presume it's the raw name in apps.launcher
325 306 clsname = 'IPython.parallel.apps.launcher.'+clsname
326 307 # print repr(clsname)
327 308 klass = import_item(clsname)
328 309
329 310 launcher = klass(
330 work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
311 work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
331 312 )
332 313 return launcher
333 314
334 315 def start_engines(self):
335 316 self.log.info("Starting %i engines"%self.n)
336 317 self.engine_launcher.start(
337 318 self.n,
338 cluster_dir=self.cluster_dir.location
319 profile_dir=self.profile_dir.location
339 320 )
340 321
341 322 def stop_engines(self):
342 323 self.log.info("Stopping Engines...")
343 324 if self.engine_launcher.running:
344 325 d = self.engine_launcher.stop()
345 326 return d
346 327 else:
347 328 return None
348 329
349 330 def stop_launchers(self, r=None):
350 331 if not self._stopping:
351 332 self._stopping = True
352 333 self.log.error("IPython cluster: stopping")
353 334 self.stop_engines()
354 335 # Wait a few seconds to let things shut down.
355 336 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
356 337 dc.start()
357 338
358 339 def sigint_handler(self, signum, frame):
359 340 self.log.debug("SIGINT received, stopping launchers...")
360 341 self.stop_launchers()
361 342
362 343 def start_logging(self):
363 344 # Remove old log files of the controller and engine
364 345 if self.clean_logs:
365 log_dir = self.cluster_dir.log_dir
346 log_dir = self.profile_dir.log_dir
366 347 for f in os.listdir(log_dir):
367 348 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
368 349 os.remove(os.path.join(log_dir, f))
369 350 # This will remove old log files for ipcluster itself
370 # super(IPClusterApp, self).start_logging()
351 # super(IPBaseParallelApplication, self).start_logging()
371 352
372 353 def start(self):
373 354 """Start the app for the engines subcommand."""
374 355 self.log.info("IPython cluster: started")
375 356 # First see if the cluster is already running
376 357
377 358 # Now log and daemonize
378 359 self.log.info(
379 360 'Starting engines with [daemon=%r]' % self.daemonize
380 361 )
381 362 # TODO: Get daemonize working on Windows or as a Windows Server.
382 363 if self.daemonize:
383 364 if os.name=='posix':
384 365 from twisted.scripts._twistd_unix import daemonize
385 366 daemonize()
386 367
387 368 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
388 369 dc.start()
389 370 # Now write the new pid file AFTER our new forked pid is active.
390 371 # self.write_pid_file()
391 372 try:
392 373 self.loop.start()
393 374 except KeyboardInterrupt:
394 375 pass
395 376 except zmq.ZMQError as e:
396 377 if e.errno == errno.EINTR:
397 378 pass
398 379 else:
399 380 raise
400 381
401 382 start_aliases = {}
402 383 start_aliases.update(engine_aliases)
403 384 start_aliases.update(dict(
404 385 delay='IPClusterStart.delay',
405 386 clean_logs='IPClusterStart.clean_logs',
406 387 ))
407 388
408 389 class IPClusterStart(IPClusterEngines):
409 390
410 391 name = u'ipcluster'
411 392 description = start_help
412 393 default_log_level = logging.INFO
413 auto_create_cluster_dir = Bool(True, config=True,
414 help="whether to create the cluster_dir if it doesn't exist")
394 auto_create = Bool(True, config=True,
395 help="whether to create the profile_dir if it doesn't exist")
415 396 classes = List()
416 397 def _classes_default(self,):
417 398 from IPython.parallel.apps import launcher
418 return [ClusterDir]+launcher.all_launchers
399 return [ProfileDir]+launcher.all_launchers
419 400
420 401 clean_logs = Bool(True, config=True,
421 402 help="whether to cleanup old logs before starting")
422 403
423 404 delay = CFloat(1., config=True,
424 405 help="delay (in s) between starting the controller and the engines")
425 406
426 407 controller_launcher_class = Unicode('LocalControllerLauncher',
427 408 config=True,
428 409 help="The class for launching a Controller."
429 410 )
430 411 reset = Bool(False, config=True,
431 412 help="Whether to reset config files as part of '--create'."
432 413 )
433 414
434 415 # flags = Dict(flags)
435 416 aliases = Dict(start_aliases)
436 417
437 418 def init_launchers(self):
438 419 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
439 420 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
440 421 self.controller_launcher.on_stop(self.stop_launchers)
441 422
442 423 def start_controller(self):
443 424 self.controller_launcher.start(
444 cluster_dir=self.cluster_dir.location
425 profile_dir=self.profile_dir.location
445 426 )
446 427
447 428 def stop_controller(self):
448 429 # self.log.info("In stop_controller")
449 430 if self.controller_launcher and self.controller_launcher.running:
450 431 return self.controller_launcher.stop()
451 432
452 433 def stop_launchers(self, r=None):
453 434 if not self._stopping:
454 435 self.stop_controller()
455 436 super(IPClusterStart, self).stop_launchers()
456 437
457 438 def start(self):
458 439 """Start the app for the start subcommand."""
459 440 # First see if the cluster is already running
460 441 try:
461 442 pid = self.get_pid_from_file()
462 443 except PIDFileError:
463 444 pass
464 445 else:
465 446 if self.check_pid(pid):
466 447 self.log.critical(
467 448 'Cluster is already running with [pid=%s]. '
468 449 'use "ipcluster stop" to stop the cluster.' % pid
469 450 )
470 451 # Here I exit with an unusual exit status that other processes
471 452 # can watch for to learn how I exited.
472 453 self.exit(ALREADY_STARTED)
473 454 else:
474 455 self.remove_pid_file()
475 456
476 457
477 458 # Now log and daemonize
478 459 self.log.info(
479 460 'Starting ipcluster with [daemon=%r]' % self.daemonize
480 461 )
481 462 # TODO: Get daemonize working on Windows or as a Windows Server.
482 463 if self.daemonize:
483 464 if os.name=='posix':
484 465 from twisted.scripts._twistd_unix import daemonize
485 466 daemonize()
486 467
487 468 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
488 469 dc.start()
489 470 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
490 471 dc.start()
491 472 # Now write the new pid file AFTER our new forked pid is active.
492 473 self.write_pid_file()
493 474 try:
494 475 self.loop.start()
495 476 except KeyboardInterrupt:
496 477 pass
497 478 except zmq.ZMQError as e:
498 479 if e.errno == errno.EINTR:
499 480 pass
500 481 else:
501 482 raise
502 483 finally:
503 484 self.remove_pid_file()
504 485
505 486 base='IPython.parallel.apps.ipclusterapp.IPCluster'
506 487
507 class IPClusterApp(Application):
488 class IPBaseParallelApplication(Application):
508 489 name = u'ipcluster'
509 490 description = _description
510 491
511 492 subcommands = {'create' : (base+'Create', create_help),
512 493 'list' : (base+'List', list_help),
513 494 'start' : (base+'Start', start_help),
514 495 'stop' : (base+'Stop', stop_help),
515 496 'engines' : (base+'Engines', engines_help),
516 497 }
517 498
518 499 # no aliases or flags for parent App
519 500 aliases = Dict()
520 501 flags = Dict()
521 502
522 503 def start(self):
523 504 if self.subapp is None:
524 505 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
525 506 print
526 507 self.print_subcommands()
527 508 self.exit(1)
528 509 else:
529 510 return self.subapp.start()
530 511
531 512 def launch_new_instance():
532 513 """Create and run the IPython cluster."""
533 app = IPClusterApp()
514 app = IPBaseParallelApplication()
534 515 app.initialize()
535 516 app.start()
536 517
537 518
538 519 if __name__ == '__main__':
539 520 launch_new_instance()
540 521
@@ -1,404 +1,399 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 import copy
21 20 import os
22 import logging
23 21 import socket
24 22 import stat
25 23 import sys
26 24 import uuid
27 25
28 26 from multiprocessing import Process
29 27
30 28 import zmq
31 29 from zmq.devices import ProcessMonitoredQueue
32 30 from zmq.log.handlers import PUBHandler
33 31 from zmq.utils import jsonapi as json
34 32
35 33 from IPython.config.loader import Config
36
37 from IPython.parallel import factory
34 from IPython.core.newapplication import ProfileDir
38 35
39 36 from IPython.parallel.apps.clusterdir import (
40 ClusterDir,
41 ClusterApplication,
37 BaseParallelApplication,
42 38 base_flags
43 # ClusterDirConfigLoader
44 39 )
45 40 from IPython.utils.importstring import import_item
46 41 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
47 42
48 43 # from IPython.parallel.controller.controller import ControllerFactory
49 44 from IPython.parallel.streamsession import StreamSession
50 45 from IPython.parallel.controller.heartmonitor import HeartMonitor
51 from IPython.parallel.controller.hub import Hub, HubFactory
46 from IPython.parallel.controller.hub import HubFactory
52 47 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
53 48 from IPython.parallel.controller.sqlitedb import SQLiteDB
54 49
55 from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
50 from IPython.parallel.util import signal_children, split_url
56 51
57 52 # conditional import of MongoDB backend class
58 53
59 54 try:
60 55 from IPython.parallel.controller.mongodb import MongoDB
61 56 except ImportError:
62 57 maybe_mongo = []
63 58 else:
64 59 maybe_mongo = [MongoDB]
65 60
66 61
67 62 #-----------------------------------------------------------------------------
68 63 # Module level variables
69 64 #-----------------------------------------------------------------------------
70 65
71 66
72 67 #: The default config file name for this application
73 68 default_config_file_name = u'ipcontroller_config.py'
74 69
75 70
76 71 _description = """Start the IPython controller for parallel computing.
77 72
78 73 The IPython controller provides a gateway between the IPython engines and
79 74 clients. The controller needs to be started before the engines and can be
80 75 configured using command line options or using a cluster directory. Cluster
81 76 directories contain config, log and security files and are usually located in
82 77 your ipython directory and named as "cluster_<profile>". See the `profile`
83 and `cluster_dir` options for details.
78 and `profile_dir` options for details.
84 79 """
85 80
86 81
87 82
88 83
89 84 #-----------------------------------------------------------------------------
90 85 # The main application
91 86 #-----------------------------------------------------------------------------
92 87 flags = {}
93 88 flags.update(base_flags)
94 89 flags.update({
95 90 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
96 91 'Use threads instead of processes for the schedulers'),
97 92 'sqlitedb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'})},
98 93 'use the SQLiteDB backend'),
99 94 'mongodb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'})},
100 95 'use the MongoDB backend'),
101 96 'dictdb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.dictdb.DictDB'})},
102 97 'use the in-memory DictDB backend'),
103 98 'reuse' : ({'IPControllerApp' : Config({'reuse_files' : True})},
104 99 'reuse existing json connection files')
105 100 })
106 101
107 102 flags.update()
108 103
109 class IPControllerApp(ClusterApplication):
104 class IPControllerApp(BaseParallelApplication):
110 105
111 106 name = u'ipcontroller'
112 107 description = _description
113 108 config_file_name = Unicode(default_config_file_name)
114 classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
109 classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
110
111 # change default to True
112 auto_create = Bool(True, config=True,
113 help="""Whether to create profile dir if it doesn't exist""")
115 114
116 auto_create_cluster_dir = Bool(True, config=True,
117 help="Whether to create cluster_dir if it exists.")
118 115 reuse_files = Bool(False, config=True,
119 116 help='Whether to reuse existing json connection files [default: False]'
120 117 )
121 118 secure = Bool(True, config=True,
122 119 help='Whether to use exec_keys for extra authentication [default: True]'
123 120 )
124 121 ssh_server = Unicode(u'', config=True,
125 122 help="""ssh url for clients to use when connecting to the Controller
126 123 processes. It should be of the form: [user@]server[:port]. The
127 124 Controller\'s listening addresses must be accessible from the ssh server""",
128 125 )
129 126 location = Unicode(u'', config=True,
130 127 help="""The external IP or domain name of the Controller, used for disambiguating
131 128 engine and client connections.""",
132 129 )
133 130 import_statements = List([], config=True,
134 131 help="import statements to be run at startup. Necessary in some environments"
135 132 )
136 133
137 134 use_threads = Bool(False, config=True,
138 135 help='Use threads instead of processes for the schedulers',
139 136 )
140 137
141 138 # internal
142 139 children = List()
143 140 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
144 141
145 142 def _use_threads_changed(self, name, old, new):
146 143 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
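Reviewer note: the `use_threads` trigger above selects the MonitoredQueue flavor purely by name; a runnable sketch of the interpolation:

    for use_threads in (False, True):
        print 'zmq.devices.%sMonitoredQueue' % ('Thread' if use_threads else 'Process')
    # -> zmq.devices.ProcessMonitoredQueue, then zmq.devices.ThreadMonitoredQueue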
147 144
148 145 aliases = Dict(dict(
149 config = 'IPControllerApp.config_file',
150 # file = 'IPControllerApp.url_file',
151 146 log_level = 'IPControllerApp.log_level',
152 147 log_url = 'IPControllerApp.log_url',
153 148 reuse_files = 'IPControllerApp.reuse_files',
154 149 secure = 'IPControllerApp.secure',
155 150 ssh = 'IPControllerApp.ssh_server',
156 151 use_threads = 'IPControllerApp.use_threads',
157 152 import_statements = 'IPControllerApp.import_statements',
158 153 location = 'IPControllerApp.location',
159 154
160 155 ident = 'StreamSession.session',
161 156 user = 'StreamSession.username',
162 157 exec_key = 'StreamSession.keyfile',
163 158
164 159 url = 'HubFactory.url',
165 160 ip = 'HubFactory.ip',
166 161 transport = 'HubFactory.transport',
167 162 port = 'HubFactory.regport',
168 163
169 164 ping = 'HeartMonitor.period',
170 165
171 166 scheme = 'TaskScheduler.scheme_name',
172 167 hwm = 'TaskScheduler.hwm',
173 168
174 169
175 profile = "ClusterDir.profile",
176 cluster_dir = 'ClusterDir.location',
170 profile = "BaseIPythonApplication.profile",
171 profile_dir = 'ProfileDir.location',
177 172
178 173 ))
179 174 flags = Dict(flags)
180 175
181 176
182 177 def save_connection_dict(self, fname, cdict):
183 178 """save a connection dict to json file."""
184 179 c = self.config
185 180 url = cdict['url']
186 181 location = cdict['location']
187 182 if not location:
188 183 try:
189 184 proto,ip,port = split_url(url)
190 185 except AssertionError:
191 186 pass
192 187 else:
193 188 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
194 189 cdict['location'] = location
195 fname = os.path.join(self.cluster_dir.security_dir, fname)
190 fname = os.path.join(self.profile_dir.security_dir, fname)
196 191 with open(fname, 'w') as f:
197 192 f.write(json.dumps(cdict, indent=2))
198 193 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
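Reviewer note: for reference, the shape of the file this method writes; the keys come from the `cdict` built in `init_hub` below, while the values here are invented placeholders:

    {
      "exec_key": "01234567-89ab-cdef-0123-456789abcdef",
      "ssh": "",
      "url": "tcp://127.0.0.1:10101",
      "location": "10.0.0.1"
    }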
199 194
200 195 def load_config_from_json(self):
201 196 """load config from existing json connector files."""
202 197 c = self.config
203 198 # load from engine config
204 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
199 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
205 200 cfg = json.loads(f.read())
206 201 key = c.StreamSession.key = cfg['exec_key']
207 202 xport,addr = cfg['url'].split('://')
208 203 c.HubFactory.engine_transport = xport
209 204 ip,ports = addr.split(':')
210 205 c.HubFactory.engine_ip = ip
211 206 c.HubFactory.regport = int(ports)
212 207 self.location = cfg['location']
213 208
214 209 # load client config
215 with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
210 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
216 211 cfg = json.loads(f.read())
217 212 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
218 213 xport,addr = cfg['url'].split('://')
219 214 c.HubFactory.client_transport = xport
220 215 ip,ports = addr.split(':')
221 216 c.HubFactory.client_ip = ip
222 217 self.ssh_server = cfg['ssh']
223 218 assert int(ports) == c.HubFactory.regport, "regport mismatch"
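Reviewer note: the url unpacking in `load_config_from_json` above, as a standalone sketch (the url value is hypothetical):

    url = 'tcp://127.0.0.1:10101'
    xport, addr = url.split('://')     # transport, e.g. 'tcp'
    ip, ports = addr.split(':')
    print xport, ip, int(ports)        # -> tcp 127.0.0.1 10101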
224 219
225 220 def init_hub(self):
226 221 c = self.config
227 222
228 223 self.do_import_statements()
229 224 reusing = self.reuse_files
230 225 if reusing:
231 226 try:
232 227 self.load_config_from_json()
233 228 except (AssertionError,IOError):
234 229 reusing=False
235 230 # check again, because reusing may have failed:
236 231 if reusing:
237 232 pass
238 233 elif self.secure:
239 234 key = str(uuid.uuid4())
240 # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
235 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
241 236 # with open(keyfile, 'w') as f:
242 237 # f.write(key)
243 238 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
244 239 c.StreamSession.key = key
245 240 else:
246 241 key = c.StreamSession.key = ''
247 242
248 243 try:
249 244 self.factory = HubFactory(config=c, log=self.log)
250 245 # self.start_logging()
251 246 self.factory.init_hub()
252 247 except:
253 248 self.log.error("Couldn't construct the Controller", exc_info=True)
254 249 self.exit(1)
255 250
256 251 if not reusing:
257 252 # save to new json config files
258 253 f = self.factory
259 254 cdict = {'exec_key' : key,
260 255 'ssh' : self.ssh_server,
261 256 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
262 257 'location' : self.location
263 258 }
264 259 self.save_connection_dict('ipcontroller-client.json', cdict)
265 260 edict = cdict
266 261 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
267 262 self.save_connection_dict('ipcontroller-engine.json', edict)
268 263
269 264 #
270 265 def init_schedulers(self):
271 266 children = self.children
272 267 mq = import_item(str(self.mq_class))
273 268
274 269 hub = self.factory
275 270 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
276 271 # IOPub relay (in a Process)
277 272 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
278 273 q.bind_in(hub.client_info['iopub'])
279 274 q.bind_out(hub.engine_info['iopub'])
280 275 q.setsockopt_out(zmq.SUBSCRIBE, '')
281 276 q.connect_mon(hub.monitor_url)
282 277 q.daemon=True
283 278 children.append(q)
284 279
285 280 # Multiplexer Queue (in a Process)
286 281 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
287 282 q.bind_in(hub.client_info['mux'])
288 283 q.setsockopt_in(zmq.IDENTITY, 'mux')
289 284 q.bind_out(hub.engine_info['mux'])
290 285 q.connect_mon(hub.monitor_url)
291 286 q.daemon=True
292 287 children.append(q)
293 288
294 289 # Control Queue (in a Process)
295 290 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
296 291 q.bind_in(hub.client_info['control'])
297 292 q.setsockopt_in(zmq.IDENTITY, 'control')
298 293 q.bind_out(hub.engine_info['control'])
299 294 q.connect_mon(hub.monitor_url)
300 295 q.daemon=True
301 296 children.append(q)
302 297 try:
303 298 scheme = self.config.TaskScheduler.scheme_name
304 299 except AttributeError:
305 300 scheme = TaskScheduler.scheme_name.get_default_value()
306 301 # Task Queue (in a Process)
307 302 if scheme == 'pure':
308 303 self.log.warn("task::using pure XREQ Task scheduler")
309 304 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
310 305 # q.setsockopt_out(zmq.HWM, hub.hwm)
311 306 q.bind_in(hub.client_info['task'][1])
312 307 q.setsockopt_in(zmq.IDENTITY, 'task')
313 308 q.bind_out(hub.engine_info['task'])
314 309 q.connect_mon(hub.monitor_url)
315 310 q.daemon=True
316 311 children.append(q)
317 312 elif scheme == 'none':
318 313 self.log.warn("task::using no Task scheduler")
319 314
320 315 else:
321 316 self.log.info("task::using Python %s Task scheduler"%scheme)
322 317 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
323 318 hub.monitor_url, hub.client_info['notification'])
324 319 kwargs = dict(logname='scheduler', loglevel=self.log_level,
325 320 log_url = self.log_url, config=dict(self.config))
326 321 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
327 322 q.daemon=True
328 323 children.append(q)
329 324
330 325
331 326 def save_urls(self):
332 327 """save the registration urls to files."""
333 328 c = self.config
334 329
335 sec_dir = self.cluster_dir.security_dir
330 sec_dir = self.profile_dir.security_dir
336 331 cf = self.factory
337 332
338 333 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
339 334 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
340 335
341 336 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
342 337 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
343 338
344 339
345 340 def do_import_statements(self):
346 341 statements = self.import_statements
347 342 for s in statements:
348 343 try:
349 344 self.log.msg("Executing statement: '%s'" % s)
350 345 exec s in globals(), locals()
351 346 except:
352 347 self.log.msg("Error running statement: %s" % s)
353 348
354 349 def forward_logging(self):
355 350 if self.log_url:
356 351 self.log.info("Forwarding logging to %s"%self.log_url)
357 352 context = zmq.Context.instance()
358 353 lsock = context.socket(zmq.PUB)
359 354 lsock.connect(self.log_url)
360 355 handler = PUBHandler(lsock)
361 356 self.log.removeHandler(self._log_handler)
362 357 handler.root_topic = 'controller'
363 358 handler.setLevel(self.log_level)
364 359 self.log.addHandler(handler)
365 360 self._log_handler = handler
366 361 # #
367 362
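A sketch of wiring this up from the controller's config (address invented; assumes log_url is exposed as a configurable trait on IPControllerApp, as it is on IPEngineApp below):

    # ipcontroller_config.py -- hypothetical log forwarding setup
    c = get_config()
    c.IPControllerApp.log_url = 'tcp://127.0.0.1:20202'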
368 363 def initialize(self, argv=None):
369 364 super(IPControllerApp, self).initialize(argv)
370 365 self.forward_logging()
371 366 self.init_hub()
372 367 self.init_schedulers()
373 368
374 369 def start(self):
375 370 # Start the subprocesses:
376 371 self.factory.start()
377 372 child_procs = []
378 373 for child in self.children:
379 374 child.start()
380 375 if isinstance(child, ProcessMonitoredQueue):
381 376 child_procs.append(child.launcher)
382 377 elif isinstance(child, Process):
383 378 child_procs.append(child)
384 379 if child_procs:
385 380 signal_children(child_procs)
386 381
387 382 self.write_pid_file(overwrite=True)
388 383
389 384 try:
390 385 self.factory.loop.start()
391 386 except KeyboardInterrupt:
392 387 self.log.critical("Interrupted, Exiting...\n")
393 388
394 389
395 390
396 391 def launch_new_instance():
397 392 """Create and run the IPython controller"""
398 393 app = IPControllerApp()
399 394 app.initialize()
400 395 app.start()
401 396
402 397
403 398 if __name__ == '__main__':
404 399 launch_new_instance()
@@ -1,277 +1,272 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import json
19 19 import os
20 20 import sys
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24
25 from IPython.core.newapplication import ProfileDir
25 26 from IPython.parallel.apps.clusterdir import (
26 ClusterApplication,
27 ClusterDir,
28 # ClusterDirConfigLoader
27 BaseParallelApplication,
29 28 )
30 29 from IPython.zmq.log import EnginePUBHandler
31 30
32 31 from IPython.config.configurable import Configurable
33 32 from IPython.parallel.streamsession import StreamSession
34 33 from IPython.parallel.engine.engine import EngineFactory
35 34 from IPython.parallel.engine.streamkernel import Kernel
36 35 from IPython.parallel.util import disambiguate_url
37 36
38 37 from IPython.utils.importstring import import_item
39 38 from IPython.utils.traitlets import Bool, Unicode, Dict, List
40 39
41 40
42 41 #-----------------------------------------------------------------------------
43 42 # Module level variables
44 43 #-----------------------------------------------------------------------------
45 44
46 45 #: The default config file name for this application
47 46 default_config_file_name = u'ipengine_config.py'
48 47
49 48 _description = """Start an IPython engine for parallel computing.
50 49
51 50 IPython engines run in parallel and perform computations on behalf of a client
52 51 and controller. A controller needs to be started before the engines. The
53 52 engine can be configured using command line options or using a profile
54 53 directory. Profile directories contain config, log and security files and are
55 54 usually located in your ipython directory and named as "profile_<profile>".
56 See the `profile` and `cluster_dir` options for details.
55 See the `profile` and `profile_dir` options for details.
57 56 """
58 57
59 58
60 59 #-----------------------------------------------------------------------------
61 60 # MPI configuration
62 61 #-----------------------------------------------------------------------------
63 62
64 63 mpi4py_init = """from mpi4py import MPI as mpi
65 64 mpi.size = mpi.COMM_WORLD.Get_size()
66 65 mpi.rank = mpi.COMM_WORLD.Get_rank()
67 66 """
68 67
69 68
70 69 pytrilinos_init = """from PyTrilinos import Epetra
71 70 class SimpleStruct:
72 71 pass
73 72 mpi = SimpleStruct()
74 73 mpi.rank = 0
75 74 mpi.size = 0
76 75 """
77 76
78 77 class MPI(Configurable):
79 78 """Configurable for MPI initialization"""
80 79 use = Unicode('', config=True,
81 80 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
82 81 )
83 82
84 83 def _on_use_changed(self, old, new):
85 84 # load default init script if it's not set
86 85 if not self.init_script:
87 86 self.init_script = self.default_inits.get(new, '')
88 87
89 88 init_script = Unicode('', config=True,
90 89 help="Initialization code for MPI")
91 90
92 91 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
93 92 config=True)
94 93
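A minimal sketch of enabling MPI through this configurable; setting `use` fires _on_use_changed, which loads mpi4py_init because init_script is left unset (file-based config shown; the `mpi` command-line alias defined further down maps to the same trait):

    # ipengine_config.py -- hypothetical user configuration
    c = get_config()
    c.MPI.use = 'mpi4py'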
95 94
96 95 #-----------------------------------------------------------------------------
97 96 # Main application
98 97 #-----------------------------------------------------------------------------
99 98
100 99
101 class IPEngineApp(ClusterApplication):
100 class IPEngineApp(BaseParallelApplication):
102 101
103 102 app_name = Unicode(u'ipengine')
104 103 description = Unicode(_description)
105 104 config_file_name = Unicode(default_config_file_name)
106 classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
105 classes = List([ProfileDir, StreamSession, EngineFactory, Kernel, MPI])
107 106
108 auto_create_cluster_dir = Bool(False,
109 help="whether to create the cluster_dir if it doesn't exist")
110
111 107 startup_script = Unicode(u'', config=True,
112 108 help='specify a script to be run at startup')
113 109 startup_command = Unicode('', config=True,
114 110 help='specify a command to be run at startup')
115 111
116 112 url_file = Unicode(u'', config=True,
117 113 help="""The full location of the file containing the connection information for
118 114 the controller. If this is not given, the file must be in the
119 115 security directory of the profile directory. This location is
120 resolved using the `profile` or `cluster_dir` options.""",
116 resolved using the `profile` or `profile_dir` options.""",
121 117 )
122 118
123 119 url_file_name = Unicode(u'ipcontroller-engine.json')
124 120 log_url = Unicode('', config=True,
125 121 help="""The URL for the iploggerapp instance, for forwarding
126 122 logging to a central location.""")
127 123
128 124 aliases = Dict(dict(
129 config = 'IPEngineApp.config_file',
130 125 file = 'IPEngineApp.url_file',
131 126 c = 'IPEngineApp.startup_command',
132 127 s = 'IPEngineApp.startup_script',
133 128
134 129 ident = 'StreamSession.session',
135 130 user = 'StreamSession.username',
136 131 exec_key = 'StreamSession.keyfile',
137 132
138 133 url = 'EngineFactory.url',
139 134 ip = 'EngineFactory.ip',
140 135 transport = 'EngineFactory.transport',
141 136 port = 'EngineFactory.regport',
142 137 location = 'EngineFactory.location',
143 138
144 139 timeout = 'EngineFactory.timeout',
145 140
146 profile = "ClusterDir.profile",
147 cluster_dir = 'ClusterDir.location',
141 profile = "IPEngineApp.profile",
142 profile_dir = 'ProfileDir.location',
148 143
149 144 mpi = 'MPI.use',
150 145
151 146 log_level = 'IPEngineApp.log_level',
152 147 log_url = 'IPEngineApp.log_url'
153 148 ))
154 149
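Taken together, these aliases permit invocations like the following (all values illustrative):

    ipengine file=/path/to/ipcontroller-engine.json mpi=mpi4py log_level=10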
155 150 # def find_key_file(self):
156 151 # """Set the key file.
157 152 #
158 153 # Here we don't try to actually see if it exists for is valid as that
159 154 # is hadled by the connection logic.
160 155 # """
161 156 # config = self.master_config
162 157 # # Find the actual controller key file
163 158 # if not config.Global.key_file:
164 159 # try_this = os.path.join(
165 # config.Global.cluster_dir,
160 # config.Global.profile_dir,
166 161 # config.Global.security_dir,
167 162 # config.Global.key_file_name
168 163 # )
169 164 # config.Global.key_file = try_this
170 165
171 166 def find_url_file(self):
172 167 """Set the key file.
173 168
174 169 Here we don't try to actually see if it exists for is valid as that
175 170 is hadled by the connection logic.
176 171 """
177 172 config = self.config
178 173 # Find the actual controller key file
179 174 if not self.url_file:
180 175 self.url_file = os.path.join(
181 self.cluster_dir.security_dir,
176 self.profile_dir.security_dir,
182 177 self.url_file_name
183 178 )
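With the defaults above, the url file resolves to <profile_dir>/security/ipcontroller-engine.json, the same location where the controller writes its connection files.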
184 179 def init_engine(self):
185 180 # This is the working dir by now.
186 181 sys.path.insert(0, '')
187 182 config = self.config
188 183 # print config
189 184 self.find_url_file()
190 185
191 186 # if os.path.exists(config.Global.key_file) and config.Global.secure:
192 187 # config.SessionFactory.exec_key = config.Global.key_file
193 188 if os.path.exists(self.url_file):
194 189 with open(self.url_file) as f:
195 190 d = json.loads(f.read())
196 191 for k,v in d.iteritems():
197 192 if isinstance(v, unicode):
198 193 d[k] = v.encode()
199 194 if d['exec_key']:
200 195 config.StreamSession.key = d['exec_key']
201 196 d['url'] = disambiguate_url(d['url'], d['location'])
202 197 config.EngineFactory.url = d['url']
203 198 config.EngineFactory.location = d['location']
204 199
205 200 try:
206 201 exec_lines = config.Kernel.exec_lines
207 202 except AttributeError:
208 203 config.Kernel.exec_lines = []
209 204 exec_lines = config.Kernel.exec_lines
210 205
211 206 if self.startup_script:
212 207 enc = sys.getfilesystemencoding() or 'utf8'
213 208 cmd="execfile(%r)"%self.startup_script.encode(enc)
214 209 exec_lines.append(cmd)
215 210 if self.startup_command:
216 211 exec_lines.append(self.startup_command)
217 212
218 213 # Create the underlying shell class and Engine
219 214 # shell_class = import_item(self.master_config.Global.shell_class)
220 215 # print self.config
221 216 try:
222 217 self.engine = EngineFactory(config=config, log=self.log)
223 218 except:
224 219 self.log.error("Couldn't start the Engine", exc_info=True)
225 220 self.exit(1)
226 221
227 222 def forward_logging(self):
228 223 if self.log_url:
229 224 self.log.info("Forwarding logging to %s"%self.log_url)
230 225 context = self.engine.context
231 226 lsock = context.socket(zmq.PUB)
232 227 lsock.connect(self.log_url)
233 228 self.log.removeHandler(self._log_handler)
234 229 handler = EnginePUBHandler(self.engine, lsock)
235 230 handler.setLevel(self.log_level)
236 231 self.log.addHandler(handler)
237 232 self._log_handler = handler
238 233 #
239 234 def init_mpi(self):
240 235 global mpi
241 236 self.mpi = MPI(config=self.config)
242 237
243 238 mpi_import_statement = self.mpi.init_script
244 239 if mpi_import_statement:
245 240 try:
246 241 self.log.info("Initializing MPI:")
247 242 self.log.info(mpi_import_statement)
248 243 exec mpi_import_statement in globals()
249 244 except:
250 245 mpi = None
251 246 else:
252 247 mpi = None
253 248
254 249 def initialize(self, argv=None):
255 250 super(IPEngineApp, self).initialize(argv)
256 251 self.init_mpi()
257 252 self.init_engine()
258 253 self.forward_logging()
259 254
260 255 def start(self):
261 256 self.engine.start()
262 257 try:
263 258 self.engine.loop.start()
264 259 except KeyboardInterrupt:
265 260 self.log.critical("Engine Interrupted, shutting down...\n")
266 261
267 262
268 263 def launch_new_instance():
269 264 """Create and run the IPython engine"""
270 265 app = IPEngineApp()
271 266 app.initialize()
272 267 app.start()
273 268
274 269
275 270 if __name__ == '__main__':
276 271 launch_new_instance()
277 272
@@ -1,97 +1,96 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 A simple IPython logger application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 import zmq
22 22
23 from IPython.core.newapplication import ProfileDir
23 24 from IPython.utils.traitlets import Bool, Dict, Unicode
24 25
25 26 from IPython.parallel.apps.clusterdir import (
26 ClusterApplication,
27 ClusterDir,
27 BaseParallelApplication,
28 28 base_aliases
29 29 )
30 30 from IPython.parallel.apps.logwatcher import LogWatcher
31 31
32 32 #-----------------------------------------------------------------------------
33 33 # Module level variables
34 34 #-----------------------------------------------------------------------------
35 35
36 36 #: The default config file name for this application
37 37 default_config_file_name = u'iplogger_config.py'
38 38
39 39 _description = """Start an IPython logger for parallel computing.
40 40
41 41 IPython controllers and engines (and your own processes) can broadcast log messages
42 42 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
43 43 logger can be configured using command line options or using a profile
44 44 directory. Profile directories contain config, log and security files and are
45 45 usually located in your ipython directory and named as "profile_<profile>".
46 See the `profile` and `cluster_dir` options for details.
46 See the `profile` and `profile_dir` options for details.
47 47 """
48 48
49 49
50 50 #-----------------------------------------------------------------------------
51 51 # Main application
52 52 #-----------------------------------------------------------------------------
53 53 aliases = {}
54 54 aliases.update(base_aliases)
55 55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
56 56
57 class IPLoggerApp(ClusterApplication):
57 class IPLoggerApp(BaseParallelApplication):
58 58
59 59 name = u'iplogger'
60 60 description = _description
61 61 config_file_name = Unicode(default_config_file_name)
62 auto_create_cluster_dir = Bool(False)
63 62
64 classes = [LogWatcher, ClusterDir]
63 classes = [LogWatcher, ProfileDir]
65 64 aliases = Dict(aliases)
66 65
67 66 def initialize(self, argv=None):
68 67 super(IPLoggerApp, self).initialize(argv)
69 68 self.init_watcher()
70 69
71 70 def init_watcher(self):
72 71 try:
73 72 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
74 73 except:
75 74 self.log.error("Couldn't start the LogWatcher", exc_info=True)
76 75 self.exit(1)
77 76 self.log.info("Listening for log messages on %r"%self.watcher.url)
78 77
79 78
80 79 def start(self):
81 80 self.watcher.start()
82 81 try:
83 82 self.watcher.loop.start()
84 83 except KeyboardInterrupt:
85 84 self.log.critical("Logging Interrupted, shutting down...\n")
86 85
87 86
88 87 def launch_new_instance():
89 88 """Create and run the IPython LogWatcher"""
90 89 app = IPLoggerApp()
91 90 app.initialize()
92 91 app.start()
93 92
94 93
95 94 if __name__ == '__main__':
96 95 launch_new_instance()
97 96
@@ -1,1070 +1,1070 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import os
21 21 import re
22 22 import stat
23 23
24 24 # signal imports, handling various platforms, versions
25 25
26 26 from signal import SIGINT, SIGTERM
27 27 try:
28 28 from signal import SIGKILL
29 29 except ImportError:
30 30 # Windows
31 31 SIGKILL=SIGTERM
32 32
33 33 try:
34 34 # Windows >= 2.7, 3.2
35 35 from signal import CTRL_C_EVENT as SIGINT
36 36 except ImportError:
37 37 pass
38 38
39 39 from subprocess import Popen, PIPE, STDOUT
40 40 try:
41 41 from subprocess import check_output
42 42 except ImportError:
43 43 # pre-2.7, define check_output with Popen
44 44 def check_output(*args, **kwargs):
45 45 kwargs.update(dict(stdout=PIPE))
46 46 p = Popen(*args, **kwargs)
47 47 out,err = p.communicate()
48 48 return out
49 49
50 50 from zmq.eventloop import ioloop
51 51
52 52 from IPython.external import Itpl
53 53 # from IPython.config.configurable import Configurable
54 54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
55 55 from IPython.utils.path import get_ipython_module_path
56 56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
57 57
58 58 from IPython.parallel.factory import LoggingFactory
59 59
60 60 from .win32support import forward_read_events
61 61
62 62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
63 63
64 64 WINDOWS = os.name == 'nt'
65 65
66 66 #-----------------------------------------------------------------------------
67 67 # Paths to the kernel apps
68 68 #-----------------------------------------------------------------------------
69 69
70 70
71 71 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
72 72 'IPython.parallel.apps.ipclusterapp'
73 73 ))
74 74
75 75 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
76 76 'IPython.parallel.apps.ipengineapp'
77 77 ))
78 78
79 79 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
80 80 'IPython.parallel.apps.ipcontrollerapp'
81 81 ))
82 82
83 83 #-----------------------------------------------------------------------------
84 84 # Base launchers and errors
85 85 #-----------------------------------------------------------------------------
86 86
87 87
88 88 class LauncherError(Exception):
89 89 pass
90 90
91 91
92 92 class ProcessStateError(LauncherError):
93 93 pass
94 94
95 95
96 96 class UnknownStatus(LauncherError):
97 97 pass
98 98
99 99
100 100 class BaseLauncher(LoggingFactory):
101 101 """An asbtraction for starting, stopping and signaling a process."""
102 102
103 103 # In all of the launchers, the work_dir is where child processes will be
104 # run. This will usually be the cluster_dir, but may not be. any work_dir
104 # run. This will usually be the profile_dir, but may not be. Any work_dir
105 105 # passed into the __init__ method will override the config value.
106 106 # This should not be used to set the work_dir for the actual engine
107 107 # and controller. Instead, use their own config files or the
108 108 # controller_args, engine_args attributes of the launchers to add
109 109 # the work_dir option.
110 110 work_dir = Unicode(u'.')
111 111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
112 112
113 113 start_data = Any()
114 114 stop_data = Any()
115 115
116 116 def _loop_default(self):
117 117 return ioloop.IOLoop.instance()
118 118
119 119 def __init__(self, work_dir=u'.', config=None, **kwargs):
120 120 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
121 121 self.state = 'before' # can be before, running, after
122 122 self.stop_callbacks = []
123 123 self.start_data = None
124 124 self.stop_data = None
125 125
126 126 @property
127 127 def args(self):
128 128 """A list of cmd and args that will be used to start the process.
129 129
130 130 This is what is passed to :func:`spawnProcess` and the first element
131 131 will be the process name.
132 132 """
133 133 return self.find_args()
134 134
135 135 def find_args(self):
136 136 """The ``.args`` property calls this to find the args list.
137 137
138 138 Subclasses should implement this to construct the cmd and args.
139 139 """
140 140 raise NotImplementedError('find_args must be implemented in a subclass')
141 141
142 142 @property
143 143 def arg_str(self):
144 144 """The string form of the program arguments."""
145 145 return ' '.join(self.args)
146 146
147 147 @property
148 148 def running(self):
149 149 """Am I running."""
150 150 if self.state == 'running':
151 151 return True
152 152 else:
153 153 return False
154 154
155 155 def start(self):
156 156 """Start the process.
157 157
158 158 This must return a deferred that fires with information about the
159 159 process starting (like a pid, job id, etc.).
160 160 """
161 161 raise NotImplementedError('start must be implemented in a subclass')
162 162
163 163 def stop(self):
164 164 """Stop the process and notify observers of stopping.
165 165
166 166 This must return a deferred that fires with information about the
167 167 processing stopping, like errors that occur while the process is
168 168 attempting to be shut down. This deferred won't fire when the process
169 169 actually stops. To observe the actual process stopping, see
170 170 :func:`on_stop`.
171 171 """
172 172 raise NotImplementedError('stop must be implemented in a subclass')
173 173
174 174 def on_stop(self, f):
175 175 """Get a deferred that will fire when the process stops.
176 176
177 177 The deferred will fire with data that contains information about
178 178 the exit status of the process.
179 179 """
180 180 if self.state=='after':
181 181 return f(self.stop_data)
182 182 else:
183 183 self.stop_callbacks.append(f)
184 184
185 185 def notify_start(self, data):
186 186 """Call this to trigger startup actions.
187 187
188 188 This logs the process startup and sets the state to 'running'. It is
189 189 a pass-through so it can be used as a callback.
190 190 """
191 191
192 192 self.log.info('Process %r started: %r' % (self.args[0], data))
193 193 self.start_data = data
194 194 self.state = 'running'
195 195 return data
196 196
197 197 def notify_stop(self, data):
198 198 """Call this to trigger process stop actions.
199 199
200 200 This logs the process stopping and sets the state to 'after'. Call
201 201 this to trigger all the callbacks registered with :func:`on_stop`."""
202 202
203 203 self.log.info('Process %r stopped: %r' % (self.args[0], data))
204 204 self.stop_data = data
205 205 self.state = 'after'
206 206 for i in range(len(self.stop_callbacks)):
207 207 d = self.stop_callbacks.pop()
208 208 d(data)
209 209 return data
210 210
211 211 def signal(self, sig):
212 212 """Signal the process.
213 213
214 214 Return a semi-meaningless deferred after signaling the process.
215 215
216 216 Parameters
217 217 ----------
218 218 sig : str or int
219 219 'KILL', 'INT', etc., or any signal number
220 220 """
221 221 raise NotImplementedError('signal must be implemented in a subclass')
222 222
223 223
224 224 #-----------------------------------------------------------------------------
225 225 # Local process launchers
226 226 #-----------------------------------------------------------------------------
227 227
228 228
229 229 class LocalProcessLauncher(BaseLauncher):
230 230 """Start and stop an external process in an asynchronous manner.
231 231
232 232 This will launch the external process with a working directory of
233 233 ``self.work_dir``.
234 234 """
235 235
236 236 # This is used to construct self.args, which is passed to
237 237 # spawnProcess.
238 238 cmd_and_args = List([])
239 239 poll_frequency = Int(100) # in ms
240 240
241 241 def __init__(self, work_dir=u'.', config=None, **kwargs):
242 242 super(LocalProcessLauncher, self).__init__(
243 243 work_dir=work_dir, config=config, **kwargs
244 244 )
245 245 self.process = None
246 246 self.start_deferred = None
247 247 self.poller = None
248 248
249 249 def find_args(self):
250 250 return self.cmd_and_args
251 251
252 252 def start(self):
253 253 if self.state == 'before':
254 254 self.process = Popen(self.args,
255 255 stdout=PIPE,stderr=PIPE,stdin=PIPE,
256 256 env=os.environ,
257 257 cwd=self.work_dir
258 258 )
259 259 if WINDOWS:
260 260 self.stdout = forward_read_events(self.process.stdout)
261 261 self.stderr = forward_read_events(self.process.stderr)
262 262 else:
263 263 self.stdout = self.process.stdout.fileno()
264 264 self.stderr = self.process.stderr.fileno()
265 265 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
266 266 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
267 267 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
268 268 self.poller.start()
269 269 self.notify_start(self.process.pid)
270 270 else:
271 271 s = 'The process was already started and has state: %r' % self.state
272 272 raise ProcessStateError(s)
273 273
274 274 def stop(self):
275 275 return self.interrupt_then_kill()
276 276
277 277 def signal(self, sig):
278 278 if self.state == 'running':
279 279 if WINDOWS and sig != SIGINT:
280 280 # use Windows tree-kill for better child cleanup
281 281 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
282 282 else:
283 283 self.process.send_signal(sig)
284 284
285 285 def interrupt_then_kill(self, delay=2.0):
286 286 """Send INT, wait a delay and then send KILL."""
287 287 try:
288 288 self.signal(SIGINT)
289 289 except Exception:
290 290 self.log.debug("interrupt failed")
292 292 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
293 293 self.killer.start()
294 294
295 295 # callbacks, etc:
296 296
297 297 def handle_stdout(self, fd, events):
298 298 if WINDOWS:
299 299 line = self.stdout.recv()
300 300 else:
301 301 line = self.process.stdout.readline()
302 302 # a stopped process will be readable but return empty strings
303 303 if line:
304 304 self.log.info(line[:-1])
305 305 else:
306 306 self.poll()
307 307
308 308 def handle_stderr(self, fd, events):
309 309 if WINDOWS:
310 310 line = self.stderr.recv()
311 311 else:
312 312 line = self.process.stderr.readline()
313 313 # a stopped process will be readable but return empty strings
314 314 if line:
315 315 self.log.error(line[:-1])
316 316 else:
317 317 self.poll()
318 318
319 319 def poll(self):
320 320 status = self.process.poll()
321 321 if status is not None:
322 322 self.poller.stop()
323 323 self.loop.remove_handler(self.stdout)
324 324 self.loop.remove_handler(self.stderr)
325 325 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
326 326 return status
327 327
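A minimal usage sketch for the class above (hypothetical command; Python 2, matching this file): register an on_stop callback, start the process, then run the loop so poll() can fire notify_stop.

    launcher = LocalProcessLauncher(work_dir=u'.')
    launcher.cmd_and_args = ['sleep', '2']

    def report(data):
        # data is the dict built in poll(): {'exit_code': status, 'pid': pid}
        print data

    launcher.on_stop(report)
    launcher.start()
    launcher.loop.start()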
328 328 class LocalControllerLauncher(LocalProcessLauncher):
329 329 """Launch a controller as a regular external process."""
330 330
331 331 controller_cmd = List(ipcontroller_cmd_argv, config=True,
332 332 help="""Popen command to launch ipcontroller.""")
333 333 # Command line arguments to ipcontroller.
334 334 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
335 335 help="""command-line args to pass to ipcontroller""")
336 336
337 337 def find_args(self):
338 338 return self.controller_cmd + self.controller_args
339 339
340 def start(self, cluster_dir):
341 """Start the controller by cluster_dir."""
342 self.controller_args.extend(['cluster_dir=%s'%cluster_dir])
343 self.cluster_dir = unicode(cluster_dir)
340 def start(self, profile_dir):
341 """Start the controller by profile_dir."""
342 self.controller_args.extend(['profile_dir=%s'%profile_dir])
343 self.profile_dir = unicode(profile_dir)
344 344 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
345 345 return super(LocalControllerLauncher, self).start()
346 346
347 347
348 348 class LocalEngineLauncher(LocalProcessLauncher):
349 349 """Launch a single engine as a regular externall process."""
350 350
351 351 engine_cmd = List(ipengine_cmd_argv, config=True,
352 352 help="""command to launch the Engine.""")
353 353 # Command line arguments for ipengine.
354 354 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
355 355 help="command-line arguments to pass to ipengine"
356 356 )
357 357
358 358 def find_args(self):
359 359 return self.engine_cmd + self.engine_args
360 360
361 def start(self, cluster_dir):
362 """Start the engine by cluster_dir."""
363 self.engine_args.extend(['cluster_dir=%s'%cluster_dir])
364 self.cluster_dir = unicode(cluster_dir)
361 def start(self, profile_dir):
362 """Start the engine by profile_dir."""
363 self.engine_args.extend(['profile_dir=%s'%profile_dir])
364 self.profile_dir = unicode(profile_dir)
365 365 return super(LocalEngineLauncher, self).start()
366 366
367 367
368 368 class LocalEngineSetLauncher(BaseLauncher):
369 369 """Launch a set of engines as regular external processes."""
370 370
371 371 # Command line arguments for ipengine.
372 372 engine_args = List(
373 373 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
374 374 help="command-line arguments to pass to ipengine"
375 375 )
376 376 # launcher class
377 377 launcher_class = LocalEngineLauncher
378 378
379 379 launchers = Dict()
380 380 stop_data = Dict()
381 381
382 382 def __init__(self, work_dir=u'.', config=None, **kwargs):
383 383 super(LocalEngineSetLauncher, self).__init__(
384 384 work_dir=work_dir, config=config, **kwargs
385 385 )
386 386 self.stop_data = {}
387 387
388 def start(self, n, cluster_dir):
389 """Start n engines by profile or cluster_dir."""
390 self.cluster_dir = unicode(cluster_dir)
388 def start(self, n, profile_dir):
389 """Start n engines by profile or profile_dir."""
390 self.profile_dir = unicode(profile_dir)
391 391 dlist = []
392 392 for i in range(n):
393 393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
394 394 # Copy the engine args over to each engine launcher.
395 395 el.engine_args = copy.deepcopy(self.engine_args)
396 396 el.on_stop(self._notice_engine_stopped)
397 d = el.start(cluster_dir)
397 d = el.start(profile_dir)
398 398 if i==0:
399 399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 400 self.launchers[i] = el
401 401 dlist.append(d)
402 402 self.notify_start(dlist)
403 403 # The consumeErrors here could be dangerous
404 404 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 405 # dfinal.addCallback(self.notify_start)
406 406 return dlist
407 407
408 408 def find_args(self):
409 409 return ['engine set']
410 410
411 411 def signal(self, sig):
412 412 dlist = []
413 413 for el in self.launchers.itervalues():
414 414 d = el.signal(sig)
415 415 dlist.append(d)
416 416 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 417 return dlist
418 418
419 419 def interrupt_then_kill(self, delay=1.0):
420 420 dlist = []
421 421 for el in self.launchers.itervalues():
422 422 d = el.interrupt_then_kill(delay)
423 423 dlist.append(d)
424 424 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 425 return dlist
426 426
427 427 def stop(self):
428 428 return self.interrupt_then_kill()
429 429
430 430 def _notice_engine_stopped(self, data):
431 431 pid = data['pid']
432 432 for idx,el in self.launchers.iteritems():
433 433 if el.process.pid == pid:
434 434 break
435 435 self.launchers.pop(idx)
436 436 self.stop_data[idx] = data
437 437 if not self.launchers:
438 438 self.notify_stop(self.stop_data)
439 439
440 440
441 441 #-----------------------------------------------------------------------------
442 442 # MPIExec launchers
443 443 #-----------------------------------------------------------------------------
444 444
445 445
446 446 class MPIExecLauncher(LocalProcessLauncher):
447 447 """Launch an external process using mpiexec."""
448 448
449 449 mpi_cmd = List(['mpiexec'], config=True,
450 450 help="The mpiexec command to use in starting the process."
451 451 )
452 452 mpi_args = List([], config=True,
453 453 help="The command line arguments to pass to mpiexec."
454 454 )
455 455 program = List(['date'], config=True,
456 456 help="The program to start via mpiexec.")
457 457 program_args = List([], config=True,
458 458 help="The command line argument to the program."
459 459 )
460 460 n = Int(1)
461 461
462 462 def find_args(self):
463 463 """Build self.args using all the fields."""
464 464 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
465 465 self.program + self.program_args
466 466
467 467 def start(self, n):
468 468 """Start n instances of the program using mpiexec."""
469 469 self.n = n
470 470 return super(MPIExecLauncher, self).start()
471 471
472 472
473 473 class MPIExecControllerLauncher(MPIExecLauncher):
474 474 """Launch a controller using mpiexec."""
475 475
476 476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 477 help="Popen command to launch the Contropper"
478 478 )
479 479 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
480 480 help="Command line arguments to pass to ipcontroller."
481 481 )
482 482 n = Int(1)
483 483
484 def start(self, cluster_dir):
485 """Start the controller by cluster_dir."""
486 self.controller_args.extend(['cluster_dir=%s'%cluster_dir])
487 self.cluster_dir = unicode(cluster_dir)
484 def start(self, profile_dir):
485 """Start the controller by profile_dir."""
486 self.controller_args.extend(['profile_dir=%s'%profile_dir])
487 self.profile_dir = unicode(profile_dir)
488 488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 489 return super(MPIExecControllerLauncher, self).start(1)
490 490
491 491 def find_args(self):
492 492 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
493 493 self.controller_cmd + self.controller_args
494 494
495 495
496 496 class MPIExecEngineSetLauncher(MPIExecLauncher):
497 497
498 498 program = List(ipengine_cmd_argv, config=True,
499 499 help="Popen command for ipengine"
500 500 )
501 501 program_args = List(
502 502 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
503 503 help="Command line arguments for ipengine."
504 504 )
505 505 n = Int(1)
506 506
507 def start(self, n, cluster_dir):
508 """Start n engines by profile or cluster_dir."""
509 self.program_args.extend(['cluster_dir=%s'%cluster_dir])
510 self.cluster_dir = unicode(cluster_dir)
507 def start(self, n, profile_dir):
508 """Start n engines by profile or profile_dir."""
509 self.program_args.extend(['profile_dir=%s'%profile_dir])
510 self.profile_dir = unicode(profile_dir)
511 511 self.n = n
512 512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 513 return super(MPIExecEngineSetLauncher, self).start(n)
514 514
515 515 #-----------------------------------------------------------------------------
516 516 # SSH launchers
517 517 #-----------------------------------------------------------------------------
518 518
519 519 # TODO: Get SSH Launcher working again.
520 520
521 521 class SSHLauncher(LocalProcessLauncher):
522 522 """A minimal launcher for ssh.
523 523
524 524 To be useful this will probably have to be extended to use the ``sshx``
525 525 idea for environment variables. There could be other things this needs
526 526 as well.
527 527 """
528 528
529 529 ssh_cmd = List(['ssh'], config=True,
530 530 help="command for starting ssh")
531 531 ssh_args = List(['-tt'], config=True,
532 532 help="args to pass to ssh")
533 533 program = List(['date'], config=True,
534 534 help="Program to launch via ssh")
535 535 program_args = List([], config=True,
536 536 help="args to pass to remote program")
537 537 hostname = Unicode('', config=True,
538 538 help="hostname on which to launch the program")
539 539 user = Unicode('', config=True,
540 540 help="username for ssh")
541 541 location = Unicode('', config=True,
542 542 help="user@hostname location for ssh in one setting")
543 543
544 544 def _hostname_changed(self, name, old, new):
545 545 if self.user:
546 546 self.location = u'%s@%s' % (self.user, new)
547 547 else:
548 548 self.location = new
549 549
550 550 def _user_changed(self, name, old, new):
551 551 self.location = u'%s@%s' % (new, self.hostname)
552 552
553 553 def find_args(self):
554 554 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 555 self.program + self.program_args
556 556
557 def start(self, cluster_dir, hostname=None, user=None):
558 self.cluster_dir = unicode(cluster_dir)
557 def start(self, profile_dir, hostname=None, user=None):
558 self.profile_dir = unicode(profile_dir)
559 559 if hostname is not None:
560 560 self.hostname = hostname
561 561 if user is not None:
562 562 self.user = user
563 563
564 564 return super(SSHLauncher, self).start()
565 565
566 566 def signal(self, sig):
567 567 if self.state == 'running':
568 568 # send escaped ssh connection-closer
569 569 self.process.stdin.write('~.')
570 570 self.process.stdin.flush()
571 571
572 572
573 573
574 574 class SSHControllerLauncher(SSHLauncher):
575 575
576 576 program = List(ipcontroller_cmd_argv, config=True,
577 577 help="remote ipcontroller command.")
578 578 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
579 579 help="Command line arguments to ipcontroller.")
580 580
581 581
582 582 class SSHEngineLauncher(SSHLauncher):
583 583 program = List(ipengine_cmd_argv, config=True,
584 584 help="remote ipengine command.")
585 585 # Command line arguments for ipengine.
586 586 program_args = List(
587 587 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
588 588 help="Command line arguments to ipengine."
589 589 )
590 590
591 591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
592 592 launcher_class = SSHEngineLauncher
593 593 engines = Dict(config=True,
594 594 help="""dict of engines to launch. This is a dict by hostname of ints,
595 595 corresponding to the number of engines to start on that host.""")
596 596
597 def start(self, n, cluster_dir):
598 """Start engines by profile or cluster_dir.
597 def start(self, n, profile_dir):
598 """Start engines by profile or profile_dir.
599 599 `n` is ignored, and the `engines` config property is used instead.
600 600 """
601 601
602 self.cluster_dir = unicode(cluster_dir)
602 self.profile_dir = unicode(profile_dir)
603 603 dlist = []
604 604 for host, n in self.engines.iteritems():
605 605 if isinstance(n, (tuple, list)):
606 606 n, args = n
607 607 else:
608 608 args = copy.deepcopy(self.engine_args)
609 609
610 610 if '@' in host:
611 611 user,host = host.split('@',1)
612 612 else:
613 613 user=None
614 614 for i in range(n):
615 615 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
616 616
617 617 # Copy the engine args over to each engine launcher.
619 619 el.program_args = args
620 620 el.on_stop(self._notice_engine_stopped)
621 d = el.start(cluster_dir, user=user, hostname=host)
621 d = el.start(profile_dir, user=user, hostname=host)
622 622 if i==0:
623 623 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
624 624 self.launchers[host+str(i)] = el
625 625 dlist.append(d)
626 626 self.notify_start(dlist)
627 627 return dlist
628 628
629 629
630 630
631 631 #-----------------------------------------------------------------------------
632 632 # Windows HPC Server 2008 scheduler launchers
633 633 #-----------------------------------------------------------------------------
634 634
635 635
636 636 # This is only used on Windows.
637 637 def find_job_cmd():
638 638 if WINDOWS:
639 639 try:
640 640 return find_cmd('job')
641 641 except (FindCmdError, ImportError):
642 642 # ImportError will be raised if win32api is not installed
643 643 return 'job'
644 644 else:
645 645 return 'job'
646 646
647 647
648 648 class WindowsHPCLauncher(BaseLauncher):
649 649
650 650 job_id_regexp = Unicode(r'\d+', config=True,
651 651 help="""A regular expression used to get the job id from the output of the
652 652 submit_command. """
653 653 )
654 654 job_file_name = Unicode(u'ipython_job.xml', config=True,
655 655 help="The filename of the instantiated job script.")
656 656 # The full path to the instantiated job script. This gets made dynamically
657 657 # by combining the work_dir with the job_file_name.
658 658 job_file = Unicode(u'')
659 659 scheduler = Unicode('', config=True,
660 660 help="The hostname of the scheduler to submit the job to.")
661 661 job_cmd = Unicode(find_job_cmd(), config=True,
662 662 help="The command for submitting jobs.")
663 663
664 664 def __init__(self, work_dir=u'.', config=None, **kwargs):
665 665 super(WindowsHPCLauncher, self).__init__(
666 666 work_dir=work_dir, config=config, **kwargs
667 667 )
668 668
669 669 @property
670 670 def job_file(self):
671 671 return os.path.join(self.work_dir, self.job_file_name)
672 672
673 673 def write_job_file(self, n):
674 674 raise NotImplementedError("Implement write_job_file in a subclass.")
675 675
676 676 def find_args(self):
677 677 return [u'job.exe']
678 678
679 679 def parse_job_id(self, output):
680 680 """Take the output of the submit command and return the job id."""
681 681 m = re.search(self.job_id_regexp, output)
682 682 if m is not None:
683 683 job_id = m.group()
684 684 else:
685 685 raise LauncherError("Job id couldn't be determined: %s" % output)
686 686 self.job_id = job_id
687 687 self.log.info('Job started with job id: %r' % job_id)
688 688 return job_id
689 689
690 690 def start(self, n):
691 691 """Start n copies of the process using the Win HPC job scheduler."""
692 692 self.write_job_file(n)
693 693 args = [
694 694 'submit',
695 695 '/jobfile:%s' % self.job_file,
696 696 '/scheduler:%s' % self.scheduler
697 697 ]
698 698 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
699 699 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
700 700 output = check_output([self.job_cmd]+args,
701 701 env=os.environ,
702 702 cwd=self.work_dir,
703 703 stderr=STDOUT
704 704 )
705 705 job_id = self.parse_job_id(output)
706 706 self.notify_start(job_id)
707 707 return job_id
708 708
709 709 def stop(self):
710 710 args = [
711 711 'cancel',
712 712 self.job_id,
713 713 '/scheduler:%s' % self.scheduler
714 714 ]
715 715 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
716 716 try:
717 717 output = check_output([self.job_cmd]+args,
718 718 env=os.environ,
719 719 cwd=self.work_dir,
720 720 stderr=STDOUT
721 721 )
722 722 except:
723 723 output = 'The job already appears to be stopped: %r' % self.job_id
724 724 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
725 725 return output
726 726
727 727
728 728 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
729 729
730 730 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
731 731 help="WinHPC xml job file.")
732 732 extra_args = List([], config=False,
733 733 help="extra args to pass to ipcontroller")
734 734
735 735 def write_job_file(self, n):
736 736 job = IPControllerJob(config=self.config)
737 737
738 738 t = IPControllerTask(config=self.config)
739 739 # The task's work directory is *not* the actual work directory of
740 740 # the controller. It is used as the base path for the stdout/stderr
741 741 # files that the scheduler redirects to.
742 t.work_directory = self.cluster_dir
743 # Add the cluster_dir and from self.start().
742 t.work_directory = self.profile_dir
743 # Add the profile_dir argument from self.start().
744 744 t.controller_args.extend(self.extra_args)
745 745 job.add_task(t)
746 746
747 747 self.log.info("Writing job description file: %s" % self.job_file)
748 748 job.write(self.job_file)
749 749
750 750 @property
751 751 def job_file(self):
752 return os.path.join(self.cluster_dir, self.job_file_name)
752 return os.path.join(self.profile_dir, self.job_file_name)
753 753
754 def start(self, cluster_dir):
755 """Start the controller by cluster_dir."""
756 self.extra_args = ['cluster_dir=%s'%cluster_dir]
757 self.cluster_dir = unicode(cluster_dir)
754 def start(self, profile_dir):
755 """Start the controller by profile_dir."""
756 self.extra_args = ['profile_dir=%s'%profile_dir]
757 self.profile_dir = unicode(profile_dir)
758 758 return super(WindowsHPCControllerLauncher, self).start(1)
759 759
760 760
761 761 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
762 762
763 763 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
764 764 help="jobfile for ipengines job")
765 765 extra_args = List([], config=False,
766 766 help="extra args to pas to ipengine")
767 767
768 768 def write_job_file(self, n):
769 769 job = IPEngineSetJob(config=self.config)
770 770
771 771 for i in range(n):
772 772 t = IPEngineTask(config=self.config)
773 773 # The task's work directory is *not* the actual work directory of
774 774 # the engine. It is used as the base path for the stdout/stderr
775 775 # files that the scheduler redirects to.
776 t.work_directory = self.cluster_dir
777 # Add the cluster_dir and from self.start().
776 t.work_directory = self.profile_dir
777 # Add the profile_dir argument from self.start().
778 778 t.engine_args.extend(self.extra_args)
779 779 job.add_task(t)
780 780
781 781 self.log.info("Writing job description file: %s" % self.job_file)
782 782 job.write(self.job_file)
783 783
784 784 @property
785 785 def job_file(self):
786 return os.path.join(self.cluster_dir, self.job_file_name)
786 return os.path.join(self.profile_dir, self.job_file_name)
787 787
788 def start(self, n, cluster_dir):
789 """Start the controller by cluster_dir."""
790 self.extra_args = ['cluster_dir=%s'%cluster_dir]
791 self.cluster_dir = unicode(cluster_dir)
788 def start(self, n, profile_dir):
789 """Start the controller by profile_dir."""
790 self.extra_args = ['profile_dir=%s'%profile_dir]
791 self.profile_dir = unicode(profile_dir)
792 792 return super(WindowsHPCEngineSetLauncher, self).start(n)
793 793
794 794
795 795 #-----------------------------------------------------------------------------
796 796 # Batch (PBS) system launchers
797 797 #-----------------------------------------------------------------------------
798 798
799 799 class BatchSystemLauncher(BaseLauncher):
800 800 """Launch an external process using a batch system.
801 801
802 802 This class is designed to work with UNIX batch systems like PBS, LSF,
803 803 GridEngine, etc. The overall model is that there are different commands
804 804 like qsub, qdel, etc. that handle the starting and stopping of the process.
805 805
806 806 This class also has the notion of a batch script. The ``batch_template``
807 807 attribute can be set to a string that is a template for the batch script.
808 808 This template is instantiated using Itpl. Thus the template can use
809 809 ${n} for the number of instances. Subclasses can add additional variables
810 810 to the template dict.
811 811 """
812 812
813 813 # Subclasses must fill these in. See PBSEngineSet
814 814 submit_command = List([''], config=True,
815 815 help="The name of the command line program used to submit jobs.")
816 816 delete_command = List([''], config=True,
817 817 help="The name of the command line program used to delete jobs.")
818 818 job_id_regexp = Unicode('', config=True,
819 819 help="""A regular expression used to get the job id from the output of the
820 820 submit_command.""")
821 821 batch_template = Unicode('', config=True,
822 822 help="The string that is the batch script template itself.")
823 823 batch_template_file = Unicode(u'', config=True,
824 824 help="The file that contains the batch template.")
825 825 batch_file_name = Unicode(u'batch_script', config=True,
826 826 help="The filename of the instantiated batch script.")
827 827 queue = Unicode(u'', config=True,
828 828 help="The PBS Queue.")
829 829
830 830 # not configurable, override in subclasses
831 831 # PBS Job Array regex
832 832 job_array_regexp = Unicode('')
833 833 job_array_template = Unicode('')
834 834 # PBS Queue regex
835 835 queue_regexp = Unicode('')
836 836 queue_template = Unicode('')
837 837 # The default batch template, override in subclasses
838 838 default_template = Unicode('')
839 839 # The full path to the instantiated batch script.
840 840 batch_file = Unicode(u'')
841 841 # the format dict used with batch_template:
842 842 context = Dict()
843 843
844 844
845 845 def find_args(self):
846 846 return self.submit_command + [self.batch_file]
847 847
848 848 def __init__(self, work_dir=u'.', config=None, **kwargs):
849 849 super(BatchSystemLauncher, self).__init__(
850 850 work_dir=work_dir, config=config, **kwargs
851 851 )
852 852 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
853 853
854 854 def parse_job_id(self, output):
855 855 """Take the output of the submit command and return the job id."""
856 856 m = re.search(self.job_id_regexp, output)
857 857 if m is not None:
858 858 job_id = m.group()
859 859 else:
860 860 raise LauncherError("Job id couldn't be determined: %s" % output)
861 861 self.job_id = job_id
862 862 self.log.info('Job submitted with job id: %r' % job_id)
863 863 return job_id
864 864
865 865 def write_batch_script(self, n):
866 866 """Instantiate and write the batch script to the work_dir."""
867 867 self.context['n'] = n
868 868 self.context['queue'] = self.queue
870 870 # first priority is batch_template if set
871 871 if self.batch_template_file and not self.batch_template:
872 872 # second priority is batch_template_file
873 873 with open(self.batch_template_file) as f:
874 874 self.batch_template = f.read()
875 875 if not self.batch_template:
876 876 # third (last) priority is default_template
877 877 self.batch_template = self.default_template
878 878
879 879 regex = re.compile(self.job_array_regexp)
880 880 # print regex.search(self.batch_template)
881 881 if not regex.search(self.batch_template):
882 882 self.log.info("adding job array settings to batch script")
883 883 firstline, rest = self.batch_template.split('\n',1)
884 884 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
885 885
886 886 regex = re.compile(self.queue_regexp)
887 887 # print regex.search(self.batch_template)
888 888 if self.queue and not regex.search(self.batch_template):
889 889 self.log.info("adding PBS queue settings to batch script")
890 890 firstline, rest = self.batch_template.split('\n',1)
891 891 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
892 892
893 893 script_as_string = Itpl.itplns(self.batch_template, self.context)
894 894 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
895 895
896 896 with open(self.batch_file, 'w') as f:
897 897 f.write(script_as_string)
898 898 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
899 899
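A hypothetical config supplying a custom template from disk; per the priority logic above, the file is only read when batch_template itself is unset:

    c = get_config()
    c.PBSEngineSetLauncher.batch_template_file = u'/home/user/pbs.engines.template'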
900 def start(self, n, cluster_dir):
900 def start(self, n, profile_dir):
901 901 """Start n copies of the process using a batch system."""
902 # Here we save profile and cluster_dir in the context so they
902 # Here we save profile and profile_dir in the context so they
903 903 # can be used in the batch script template as ${profile} and
904 # ${cluster_dir}
905 self.context['cluster_dir'] = cluster_dir
906 self.cluster_dir = unicode(cluster_dir)
904 # ${profile_dir}
905 self.context['profile_dir'] = profile_dir
906 self.profile_dir = unicode(profile_dir)
907 907 self.write_batch_script(n)
908 908 output = check_output(self.args, env=os.environ)
909 909
910 910 job_id = self.parse_job_id(output)
911 911 self.notify_start(job_id)
912 912 return job_id
913 913
914 914 def stop(self):
915 915 output = check_output(self.delete_command+[self.job_id], env=os.environ)
916 916 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
917 917 return output
918 918
919 919
920 920 class PBSLauncher(BatchSystemLauncher):
921 921 """A BatchSystemLauncher subclass for PBS."""
922 922
923 923 submit_command = List(['qsub'], config=True,
924 924 help="The PBS submit command ['qsub']")
925 925 delete_command = List(['qdel'], config=True,
926 926 help="The PBS delete command ['qsub']")
927 927 job_id_regexp = Unicode(r'\d+', config=True,
928 928 help="Regular expresion for identifying the job ID [r'\d+']")
929 929
930 930 batch_file = Unicode(u'')
931 931 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
932 932 job_array_template = Unicode('#PBS -t 1-$n')
933 933 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
934 934 queue_template = Unicode('#PBS -q $queue')
935 935
936 936
937 937 class PBSControllerLauncher(PBSLauncher):
938 938 """Launch a controller using PBS."""
939 939
940 940 batch_file_name = Unicode(u'pbs_controller', config=True,
941 941 help="batch file name for the controller job.")
942 942 default_template= Unicode("""#!/bin/sh
943 943 #PBS -V
944 944 #PBS -N ipcontroller
945 %s --log-to-file cluster_dir $cluster_dir
945 %s --log-to-file profile_dir=$profile_dir
946 946 """%(' '.join(ipcontroller_cmd_argv)))
947 947
948 def start(self, cluster_dir):
949 """Start the controller by profile or cluster_dir."""
948 def start(self, profile_dir):
949 """Start the controller by profile or profile_dir."""
950 950 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
951 return super(PBSControllerLauncher, self).start(1, cluster_dir)
951 return super(PBSControllerLauncher, self).start(1, profile_dir)
952 952
953 953
954 954 class PBSEngineSetLauncher(PBSLauncher):
955 955 """Launch Engines using PBS"""
956 956 batch_file_name = Unicode(u'pbs_engines', config=True,
957 957 help="batch file name for the engine(s) job.")
958 958 default_template= Unicode(u"""#!/bin/sh
959 959 #PBS -V
960 960 #PBS -N ipengine
961 %s cluster_dir $cluster_dir
961 %s profile_dir=$profile_dir
962 962 """%(' '.join(ipengine_cmd_argv)))
963 963
964 def start(self, n, cluster_dir):
965 """Start n engines by profile or cluster_dir."""
964 def start(self, n, profile_dir):
965 """Start n engines by profile or profile_dir."""
966 966 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
967 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
967 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
968 968
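For illustration, with n=4 and no queue configured, write_batch_script splices the job array line in after the shebang and the engine template above renders to roughly (interpreter and paths invented):

    #!/bin/sh
    #PBS -t 1-4
    #PBS -V
    #PBS -N ipengine
    /usr/bin/python /path/to/ipengineapp.py profile_dir=/home/user/.ipython/profile_default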
969 969 #SGE is very similar to PBS
970 970
971 971 class SGELauncher(PBSLauncher):
972 972 """Sun GridEngine is a PBS clone with slightly different syntax"""
973 973 job_array_regexp = Unicode('#$$\W+-t\W+[\w\d\-\$]+')
974 974 job_array_template = Unicode('#$$ -t 1-$n')
975 975 queue_regexp = Unicode('#$$\W+-q\W+\$?\w+')
976 976 queue_template = Unicode('#$$ -q $queue')
977 977
978 978 class SGEControllerLauncher(SGELauncher):
979 979 """Launch a controller using SGE."""
980 980
981 981 batch_file_name = Unicode(u'sge_controller', config=True,
982 982 help="batch file name for the ipontroller job.")
983 983 default_template= Unicode(u"""#$$ -V
984 984 #$$ -S /bin/sh
985 985 #$$ -N ipcontroller
986 %s --log-to-file cluster_dir=$cluster_dir
986 %s --log-to-file profile_dir=$profile_dir
987 987 """%(' '.join(ipcontroller_cmd_argv)))
988 988
989 def start(self, cluster_dir):
990 """Start the controller by profile or cluster_dir."""
989 def start(self, profile_dir):
990 """Start the controller by profile or profile_dir."""
991 991 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
992 return super(PBSControllerLauncher, self).start(1, cluster_dir)
992 return super(SGEControllerLauncher, self).start(1, profile_dir)
993 993
994 994 class SGEEngineSetLauncher(SGELauncher):
995 995 """Launch Engines with SGE"""
996 996 batch_file_name = Unicode(u'sge_engines', config=True,
997 997 help="batch file name for the engine(s) job.")
998 998 default_template = Unicode("""#$$ -V
999 999 #$$ -S /bin/sh
1000 1000 #$$ -N ipengine
1001 %s cluster_dir=$cluster_dir
1001 %s profile_dir=$profile_dir
1002 1002 """%(' '.join(ipengine_cmd_argv)))
1003 1003
1004 def start(self, n, cluster_dir):
1005 """Start n engines by profile or cluster_dir."""
1004 def start(self, n, profile_dir):
1005 """Start n engines by profile or profile_dir."""
1006 1006 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1007 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
1007 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1008 1008
1009 1009
1010 1010 #-----------------------------------------------------------------------------
1011 1011 # A launcher for ipcluster itself!
1012 1012 #-----------------------------------------------------------------------------
1013 1013
1014 1014
1015 1015 class IPClusterLauncher(LocalProcessLauncher):
1016 1016 """Launch the ipcluster program in an external process."""
1017 1017
1018 1018 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1019 1019 help="Popen command for ipcluster")
1020 1020 ipcluster_args = List(
1021 1021 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1022 1022 help="Command line arguments to pass to ipcluster.")
1023 1023 ipcluster_subcommand = Unicode('start')
1024 1024 ipcluster_n = Int(2)
1025 1025
1026 1026 def find_args(self):
1027 1027 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1028 1028 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1029 1029
1030 1030 def start(self):
1031 1031 self.log.info("Starting ipcluster: %r" % self.args)
1032 1032 return super(IPClusterLauncher, self).start()
1033 1033
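Note: with the default traits above, find_args() concatenates the ipcluster argv, the subcommand flag, the engine count, and the pass-through arguments, in that order; logging.INFO renders as 20. Roughly (leading argv elided; bare instantiation is illustrative):

    launcher = IPClusterLauncher()
    launcher.find_args()
    # -> [..., '--start', 'n=2', '--clean-logs', '--log-to-file', 'log_level=20']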
1034 1034 #-----------------------------------------------------------------------------
1035 1035 # Collections of launchers
1036 1036 #-----------------------------------------------------------------------------
1037 1037
1038 1038 local_launchers = [
1039 1039 LocalControllerLauncher,
1040 1040 LocalEngineLauncher,
1041 1041 LocalEngineSetLauncher,
1042 1042 ]
1043 1043 mpi_launchers = [
1044 1044 MPIExecLauncher,
1045 1045 MPIExecControllerLauncher,
1046 1046 MPIExecEngineSetLauncher,
1047 1047 ]
1048 1048 ssh_launchers = [
1049 1049 SSHLauncher,
1050 1050 SSHControllerLauncher,
1051 1051 SSHEngineLauncher,
1052 1052 SSHEngineSetLauncher,
1053 1053 ]
1054 1054 winhpc_launchers = [
1055 1055 WindowsHPCLauncher,
1056 1056 WindowsHPCControllerLauncher,
1057 1057 WindowsHPCEngineSetLauncher,
1058 1058 ]
1059 1059 pbs_launchers = [
1060 1060 PBSLauncher,
1061 1061 PBSControllerLauncher,
1062 1062 PBSEngineSetLauncher,
1063 1063 ]
1064 1064 sge_launchers = [
1065 1065 SGELauncher,
1066 1066 SGEControllerLauncher,
1067 1067 SGEEngineSetLauncher,
1068 1068 ]
1069 1069 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1070 1070 + pbs_launchers + sge_launchers No newline at end of file
@@ -1,1356 +1,1356 b''
1 1 """A semi-synchronous Client for the ZMQ cluster"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import os
14 14 import json
15 15 import time
16 16 import warnings
17 17 from datetime import datetime
18 18 from getpass import getpass
19 19 from pprint import pprint
20 20
21 21 pjoin = os.path.join
22 22
23 23 import zmq
24 24 # from zmq.eventloop import ioloop, zmqstream
25 25
26 26 from IPython.utils.path import get_ipython_dir
27 27 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
28 28 Dict, List, Bool, Set)
29 29 from IPython.external.decorator import decorator
30 30 from IPython.external.ssh import tunnel
31 31
32 32 from IPython.parallel import error
33 33 from IPython.parallel import streamsession as ss
34 34 from IPython.parallel import util
35 35
36 36 from .asyncresult import AsyncResult, AsyncHubResult
37 from IPython.parallel.apps.clusterdir import ClusterDir, ClusterDirError
37 from IPython.core.newapplication import ProfileDir, ProfileDirError
38 38 from .view import DirectView, LoadBalancedView
39 39
40 40 #--------------------------------------------------------------------------
41 41 # Decorators for Client methods
42 42 #--------------------------------------------------------------------------
43 43
44 44 @decorator
45 45 def spin_first(f, self, *args, **kwargs):
46 46 """Call spin() to sync state prior to calling the method."""
47 47 self.spin()
48 48 return f(self, *args, **kwargs)
49 49
50 50
51 51 #--------------------------------------------------------------------------
52 52 # Classes
53 53 #--------------------------------------------------------------------------
54 54
55 55 class Metadata(dict):
56 56 """Subclass of dict for initializing metadata values.
57 57
58 58 Attribute access works on keys.
59 59
60 60 These objects have a strict set of keys - errors will raise if you try
61 61 to add new keys.
62 62 """
63 63 def __init__(self, *args, **kwargs):
64 64 dict.__init__(self)
65 65 md = {'msg_id' : None,
66 66 'submitted' : None,
67 67 'started' : None,
68 68 'completed' : None,
69 69 'received' : None,
70 70 'engine_uuid' : None,
71 71 'engine_id' : None,
72 72 'follow' : None,
73 73 'after' : None,
74 74 'status' : None,
75 75
76 76 'pyin' : None,
77 77 'pyout' : None,
78 78 'pyerr' : None,
79 79 'stdout' : '',
80 80 'stderr' : '',
81 81 }
82 82 self.update(md)
83 83 self.update(dict(*args, **kwargs))
84 84
85 85 def __getattr__(self, key):
86 86 """getattr aliased to getitem"""
87 87 if key in self.iterkeys():
88 88 return self[key]
89 89 else:
90 90 raise AttributeError(key)
91 91
92 92 def __setattr__(self, key, value):
93 93 """setattr aliased to setitem, with strict"""
94 94 if key in self.iterkeys():
95 95 self[key] = value
96 96 else:
97 97 raise AttributeError(key)
98 98
99 99 def __setitem__(self, key, value):
100 100 """strict static key enforcement"""
101 101 if key in self.iterkeys():
102 102 dict.__setitem__(self, key, value)
103 103 else:
104 104 raise KeyError(key)
105 105
106 106
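Note: a quick illustration of the strict-key behavior the Metadata docstring promises (msg_id value made up):

    md = Metadata(msg_id='abc123')
    md.status = 'ok'          # attribute access aliases item access
    md['status']              # -> 'ok'
    # md['not_a_key'] = 1     # would raise KeyError: the key set is fixed
    # md.not_a_key = 1        # would raise AttributeError for the same reason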
107 107 class Client(HasTraits):
108 108 """A semi-synchronous client to the IPython ZMQ cluster
109 109
110 110 Parameters
111 111 ----------
112 112
113 113 url_or_file : bytes; zmq url or path to ipcontroller-client.json
114 114 Connection information for the Hub's registration. If a json connector
115 115 file is given, then likely no further configuration is necessary.
116 116 [Default: use profile]
117 117 profile : bytes
118 118 The name of the Cluster profile to be used to find connector information.
119 119 [Default: 'default']
120 120 context : zmq.Context
121 121 Pass an existing zmq.Context instance, otherwise the client will create its own.
122 122 username : bytes
123 123 set username to be passed to the Session object
124 124 debug : bool
125 125 flag for lots of message printing for debug purposes
126 126
127 127 #-------------- ssh related args ----------------
128 128 # These are args for configuring the ssh tunnel to be used
129 129 # credentials are used to forward connections over ssh to the Controller
130 130 # Note that the ip given in `addr` needs to be relative to sshserver
131 131 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
132 132 # and set sshserver as the same machine the Controller is on. However,
133 133 # the only requirement is that sshserver is able to see the Controller
134 134 # (i.e. is within the same trusted network).
135 135
136 136 sshserver : str
137 137 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
138 138 If keyfile or password is specified, and this is not, it will default to
139 139 the ip given in addr.
140 140 sshkey : str; path to public ssh key file
141 141 This specifies a key to be used in ssh login, default None.
142 142 Regular default ssh keys will be used without specifying this argument.
143 143 password : str
144 144 Your ssh password to sshserver. Note that if this is left None,
145 145 you will be prompted for it if passwordless key based login is unavailable.
146 146 paramiko : bool
147 147 flag for whether to use paramiko instead of shell ssh for tunneling.
148 148 [default: True on win32, False else]
149 149
150 150 ------- exec authentication args -------
151 151 If even localhost is untrusted, you can have some protection against
152 152 unauthorized execution by using a key. Messages are still sent
153 153 as cleartext, so if someone can snoop your loopback traffic this will
154 154 not help against malicious attacks.
155 155
156 156 exec_key : str
157 157 an authentication key or file containing a key
158 158 default: None
159 159
160 160
161 161 Attributes
162 162 ----------
163 163
164 164 ids : list of int engine IDs
165 165 requesting the ids attribute always synchronizes
166 166 the registration state. To request ids without synchronization,
167 167 use semi-private _ids attributes.
168 168
169 169 history : list of msg_ids
170 170 a list of msg_ids, keeping track of all the execution
171 171 messages you have submitted in order.
172 172
173 173 outstanding : set of msg_ids
174 174 a set of msg_ids that have been submitted, but whose
175 175 results have not yet been received.
176 176
177 177 results : dict
178 178 a dict of all our results, keyed by msg_id
179 179
180 180 block : bool
181 181 determines default behavior when block not specified
182 182 in execution methods
183 183
184 184 Methods
185 185 -------
186 186
187 187 spin
188 188 flushes incoming results and registration state changes
189 189 control methods spin, and requesting `ids` also ensures up to date
190 190
191 191 wait
192 192 wait on one or more msg_ids
193 193
194 194 execution methods
195 195 apply
196 196 legacy: execute, run
197 197
198 198 data movement
199 199 push, pull, scatter, gather
200 200
201 201 query methods
202 202 queue_status, get_result, purge, result_status
203 203
204 204 control methods
205 205 abort, shutdown
206 206
207 207 """
208 208
209 209
210 210 block = Bool(False)
211 211 outstanding = Set()
212 212 results = Instance('collections.defaultdict', (dict,))
213 213 metadata = Instance('collections.defaultdict', (Metadata,))
214 214 history = List()
215 215 debug = Bool(False)
216 216 profile=Unicode('default')
217 217
218 218 _outstanding_dict = Instance('collections.defaultdict', (set,))
219 219 _ids = List()
220 220 _connected=Bool(False)
221 221 _ssh=Bool(False)
222 222 _context = Instance('zmq.Context')
223 223 _config = Dict()
224 224 _engines=Instance(util.ReverseDict, (), {})
225 225 # _hub_socket=Instance('zmq.Socket')
226 226 _query_socket=Instance('zmq.Socket')
227 227 _control_socket=Instance('zmq.Socket')
228 228 _iopub_socket=Instance('zmq.Socket')
229 229 _notification_socket=Instance('zmq.Socket')
230 230 _mux_socket=Instance('zmq.Socket')
231 231 _task_socket=Instance('zmq.Socket')
232 232 _task_scheme=Unicode()
233 233 _closed = False
234 234 _ignored_control_replies=Int(0)
235 235 _ignored_hub_replies=Int(0)
236 236
237 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
237 def __init__(self, url_or_file=None, profile='default', profile_dir=None, ipython_dir=None,
238 238 context=None, username=None, debug=False, exec_key=None,
239 239 sshserver=None, sshkey=None, password=None, paramiko=None,
240 240 timeout=10
241 241 ):
242 242 super(Client, self).__init__(debug=debug, profile=profile)
243 243 if context is None:
244 244 context = zmq.Context.instance()
245 245 self._context = context
246 246
247 247
248 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
248 self._setup_profile_dir(profile, profile_dir, ipython_dir)
249 249 if self._cd is not None:
250 250 if url_or_file is None:
251 251 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
252 252 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
253 253 " Please specify at least one of url_or_file or profile."
254 254
255 255 try:
256 256 util.validate_url(url_or_file)
257 257 except AssertionError:
258 258 if not os.path.exists(url_or_file):
259 259 if self._cd:
260 260 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
261 261 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
262 262 with open(url_or_file) as f:
263 263 cfg = json.loads(f.read())
264 264 else:
265 265 cfg = {'url':url_or_file}
266 266
267 267 # sync defaults from args, json:
268 268 if sshserver:
269 269 cfg['ssh'] = sshserver
270 270 if exec_key:
271 271 cfg['exec_key'] = exec_key
272 272 exec_key = cfg['exec_key']
273 273 sshserver=cfg['ssh']
274 274 url = cfg['url']
275 275 location = cfg.setdefault('location', None)
276 276 cfg['url'] = util.disambiguate_url(cfg['url'], location)
277 277 url = cfg['url']
278 278
279 279 self._config = cfg
280 280
281 281 self._ssh = bool(sshserver or sshkey or password)
282 282 if self._ssh and sshserver is None:
283 283 # default to ssh via localhost
284 284 sshserver = url.split('://')[1].split(':')[0]
285 285 if self._ssh and password is None:
286 286 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
287 287 password=False
288 288 else:
289 289 password = getpass("SSH Password for %s: "%sshserver)
290 290 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
291 291 if exec_key is not None and os.path.isfile(exec_key):
292 292 arg = 'keyfile'
293 293 else:
294 294 arg = 'key'
295 295 key_arg = {arg:exec_key}
296 296 if username is None:
297 297 self.session = ss.StreamSession(**key_arg)
298 298 else:
299 299 self.session = ss.StreamSession(username=username, **key_arg)
300 300 self._query_socket = self._context.socket(zmq.XREQ)
301 301 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
302 302 if self._ssh:
303 303 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
304 304 else:
305 305 self._query_socket.connect(url)
306 306
307 307 self.session.debug = self.debug
308 308
309 309 self._notification_handlers = {'registration_notification' : self._register_engine,
310 310 'unregistration_notification' : self._unregister_engine,
311 311 'shutdown_notification' : lambda msg: self.close(),
312 312 }
313 313 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
314 314 'apply_reply' : self._handle_apply_reply}
315 315 self._connect(sshserver, ssh_kwargs, timeout)
316 316
317 317 def __del__(self):
318 318 """cleanup sockets, but _not_ context."""
319 319 self.close()
320 320
321 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
321 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
322 322 if ipython_dir is None:
323 323 ipython_dir = get_ipython_dir()
324 if cluster_dir is not None:
324 if profile_dir is not None:
325 325 try:
326 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
326 self._cd = ProfileDir.find_profile_dir(profile_dir)
327 327 return
328 except ClusterDirError:
328 except ProfileDirError:
329 329 pass
330 330 elif profile is not None:
331 331 try:
332 self._cd = ClusterDir.find_cluster_dir_by_profile(
332 self._cd = ProfileDir.find_profile_dir_by_name(
333 333 ipython_dir, profile)
334 334 return
335 except ClusterDirError:
335 except ProfileDirError:
336 336 pass
337 337 self._cd = None
338 338
339 339 def _update_engines(self, engines):
340 340 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
341 341 for k,v in engines.iteritems():
342 342 eid = int(k)
343 343 self._engines[eid] = bytes(v) # force not unicode
344 344 self._ids.append(eid)
345 345 self._ids = sorted(self._ids)
346 346 if sorted(self._engines.keys()) != range(len(self._engines)) and \
347 347 self._task_scheme == 'pure' and self._task_socket:
348 348 self._stop_scheduling_tasks()
349 349
350 350 def _stop_scheduling_tasks(self):
351 351 """Stop scheduling tasks because an engine has been unregistered
352 352 from a pure ZMQ scheduler.
353 353 """
354 354 self._task_socket.close()
355 355 self._task_socket = None
356 356 msg = "An engine has been unregistered, and we are using pure " +\
357 357 "ZMQ task scheduling. Task farming will be disabled."
358 358 if self.outstanding:
359 359 msg += " If you were running tasks when this happened, " +\
360 360 "some `outstanding` msg_ids may never resolve."
361 361 warnings.warn(msg, RuntimeWarning)
362 362
363 363 def _build_targets(self, targets):
364 364 """Turn valid target IDs or 'all' into two lists:
365 365 (int_ids, uuids).
366 366 """
367 367 if not self._ids:
368 368 # flush notification socket if no engines yet, just in case
369 369 if not self.ids:
370 370 raise error.NoEnginesRegistered("Can't build targets without any engines")
371 371
372 372 if targets is None:
373 373 targets = self._ids
374 374 elif isinstance(targets, str):
375 375 if targets.lower() == 'all':
376 376 targets = self._ids
377 377 else:
378 378 raise TypeError("%r not valid str target, must be 'all'"%(targets))
379 379 elif isinstance(targets, int):
380 380 if targets < 0:
381 381 targets = self.ids[targets]
382 382 if targets not in self._ids:
383 383 raise IndexError("No such engine: %i"%targets)
384 384 targets = [targets]
385 385
386 386 if isinstance(targets, slice):
387 387 indices = range(len(self._ids))[targets]
388 388 ids = self.ids
389 389 targets = [ ids[i] for i in indices ]
390 390
391 391 if not isinstance(targets, (tuple, list, xrange)):
392 392 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
393 393
394 394 return [self._engines[t] for t in targets], list(targets)
395 395
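Note: _build_targets is private, but it is the funnel for every targets= argument in the public API; each of the following normalizes to a ([uuid, ...], [int_id, ...]) pair:

    client._build_targets('all')           # every registered engine
    client._build_targets(0)               # single id -> ([uuid], [0])
    client._build_targets(-1)              # negative ints index self.ids
    client._build_targets(slice(0, 4, 2))  # slices expand against self.ids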
396 396 def _connect(self, sshserver, ssh_kwargs, timeout):
397 397 """setup all our socket connections to the cluster. This is called from
398 398 __init__."""
399 399
400 400 # Maybe allow reconnecting?
401 401 if self._connected:
402 402 return
403 403 self._connected=True
404 404
405 405 def connect_socket(s, url):
406 406 url = util.disambiguate_url(url, self._config['location'])
407 407 if self._ssh:
408 408 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
409 409 else:
410 410 return s.connect(url)
411 411
412 412 self.session.send(self._query_socket, 'connection_request')
413 413 r,w,x = zmq.select([self._query_socket],[],[], timeout)
414 414 if not r:
415 415 raise error.TimeoutError("Hub connection request timed out")
416 416 idents,msg = self.session.recv(self._query_socket,mode=0)
417 417 if self.debug:
418 418 pprint(msg)
419 419 msg = ss.Message(msg)
420 420 content = msg.content
421 421 self._config['registration'] = dict(content)
422 422 if content.status == 'ok':
423 423 if content.mux:
424 424 self._mux_socket = self._context.socket(zmq.XREQ)
425 425 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
426 426 connect_socket(self._mux_socket, content.mux)
427 427 if content.task:
428 428 self._task_scheme, task_addr = content.task
429 429 self._task_socket = self._context.socket(zmq.XREQ)
430 430 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
431 431 connect_socket(self._task_socket, task_addr)
432 432 if content.notification:
433 433 self._notification_socket = self._context.socket(zmq.SUB)
434 434 connect_socket(self._notification_socket, content.notification)
435 435 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
436 436 # if content.query:
437 437 # self._query_socket = self._context.socket(zmq.XREQ)
438 438 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
439 439 # connect_socket(self._query_socket, content.query)
440 440 if content.control:
441 441 self._control_socket = self._context.socket(zmq.XREQ)
442 442 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
443 443 connect_socket(self._control_socket, content.control)
444 444 if content.iopub:
445 445 self._iopub_socket = self._context.socket(zmq.SUB)
446 446 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
447 447 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
448 448 connect_socket(self._iopub_socket, content.iopub)
449 449 self._update_engines(dict(content.engines))
450 450 else:
451 451 self._connected = False
452 452 raise Exception("Failed to connect!")
453 453
454 454 #--------------------------------------------------------------------------
455 455 # handlers and callbacks for incoming messages
456 456 #--------------------------------------------------------------------------
457 457
458 458 def _unwrap_exception(self, content):
459 459 """unwrap exception, and remap engine_id to int."""
460 460 e = error.unwrap_exception(content)
461 461 # print e.traceback
462 462 if e.engine_info:
463 463 e_uuid = e.engine_info['engine_uuid']
464 464 eid = self._engines[e_uuid]
465 465 e.engine_info['engine_id'] = eid
466 466 return e
467 467
468 468 def _extract_metadata(self, header, parent, content):
469 469 md = {'msg_id' : parent['msg_id'],
470 470 'received' : datetime.now(),
471 471 'engine_uuid' : header.get('engine', None),
472 472 'follow' : parent.get('follow', []),
473 473 'after' : parent.get('after', []),
474 474 'status' : content['status'],
475 475 }
476 476
477 477 if md['engine_uuid'] is not None:
478 478 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
479 479
480 480 if 'date' in parent:
481 481 md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
482 482 if 'started' in header:
483 483 md['started'] = datetime.strptime(header['started'], util.ISO8601)
484 484 if 'date' in header:
485 485 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
486 486 return md
487 487
488 488 def _register_engine(self, msg):
489 489 """Register a new engine, and update our connection info."""
490 490 content = msg['content']
491 491 eid = content['id']
492 492 d = {eid : content['queue']}
493 493 self._update_engines(d)
494 494
495 495 def _unregister_engine(self, msg):
496 496 """Unregister an engine that has died."""
497 497 content = msg['content']
498 498 eid = int(content['id'])
499 499 if eid in self._ids:
500 500 self._ids.remove(eid)
501 501 uuid = self._engines.pop(eid)
502 502
503 503 self._handle_stranded_msgs(eid, uuid)
504 504
505 505 if self._task_socket and self._task_scheme == 'pure':
506 506 self._stop_scheduling_tasks()
507 507
508 508 def _handle_stranded_msgs(self, eid, uuid):
509 509 """Handle messages known to be on an engine when the engine unregisters.
510 510
511 511 It is possible that this will fire prematurely - that is, an engine will
512 512 go down after completing a result, and the client will be notified
513 513 of the unregistration and later receive the successful result.
514 514 """
515 515
516 516 outstanding = self._outstanding_dict[uuid]
517 517
518 518 for msg_id in list(outstanding):
519 519 if msg_id in self.results:
520 520 # we already have this result
521 521 continue
522 522 try:
523 523 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
524 524 except:
525 525 content = error.wrap_exception()
526 526 # build a fake message:
527 527 parent = {}
528 528 header = {}
529 529 parent['msg_id'] = msg_id
530 530 header['engine'] = uuid
531 531 header['date'] = datetime.now().strftime(util.ISO8601)
532 532 msg = dict(parent_header=parent, header=header, content=content)
533 533 self._handle_apply_reply(msg)
534 534
535 535 def _handle_execute_reply(self, msg):
536 536 """Save the reply to an execute_request into our results.
537 537
538 538 execute messages are never actually used. apply is used instead.
539 539 """
540 540
541 541 parent = msg['parent_header']
542 542 msg_id = parent['msg_id']
543 543 if msg_id not in self.outstanding:
544 544 if msg_id in self.history:
545 545 print ("got stale result: %s"%msg_id)
546 546 else:
547 547 print ("got unknown result: %s"%msg_id)
548 548 else:
549 549 self.outstanding.remove(msg_id)
550 550 self.results[msg_id] = self._unwrap_exception(msg['content'])
551 551
552 552 def _handle_apply_reply(self, msg):
553 553 """Save the reply to an apply_request into our results."""
554 554 parent = msg['parent_header']
555 555 msg_id = parent['msg_id']
556 556 if msg_id not in self.outstanding:
557 557 if msg_id in self.history:
558 558 print ("got stale result: %s"%msg_id)
559 559 print self.results[msg_id]
560 560 print msg
561 561 else:
562 562 print ("got unknown result: %s"%msg_id)
563 563 else:
564 564 self.outstanding.remove(msg_id)
565 565 content = msg['content']
566 566 header = msg['header']
567 567
568 568 # construct metadata:
569 569 md = self.metadata[msg_id]
570 570 md.update(self._extract_metadata(header, parent, content))
571 571 # is this redundant?
572 572 self.metadata[msg_id] = md
573 573
574 574 e_outstanding = self._outstanding_dict[md['engine_uuid']]
575 575 if msg_id in e_outstanding:
576 576 e_outstanding.remove(msg_id)
577 577
578 578 # construct result:
579 579 if content['status'] == 'ok':
580 580 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
581 581 elif content['status'] == 'aborted':
582 582 self.results[msg_id] = error.TaskAborted(msg_id)
583 583 elif content['status'] == 'resubmitted':
584 584 # TODO: handle resubmission
585 585 pass
586 586 else:
587 587 self.results[msg_id] = self._unwrap_exception(content)
588 588
589 589 def _flush_notifications(self):
590 590 """Flush notifications of engine registrations waiting
591 591 in ZMQ queue."""
592 592 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
593 593 while msg is not None:
594 594 if self.debug:
595 595 pprint(msg)
596 596 msg = msg[-1]
597 597 msg_type = msg['msg_type']
598 598 handler = self._notification_handlers.get(msg_type, None)
599 599 if handler is None:
600 600 raise Exception("Unhandled message type: %s"%msg.msg_type)
601 601 else:
602 602 handler(msg)
603 603 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
604 604
605 605 def _flush_results(self, sock):
606 606 """Flush task or queue results waiting in ZMQ queue."""
607 607 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
608 608 while msg is not None:
609 609 if self.debug:
610 610 pprint(msg)
611 611 msg = msg[-1]
612 612 msg_type = msg['msg_type']
613 613 handler = self._queue_handlers.get(msg_type, None)
614 614 if handler is None:
615 615 raise Exception("Unhandled message type: %s"%msg.msg_type)
616 616 else:
617 617 handler(msg)
618 618 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
619 619
620 620 def _flush_control(self, sock):
621 621 """Flush replies from the control channel waiting
622 622 in the ZMQ queue.
623 623
624 624 Currently: ignore them."""
625 625 if self._ignored_control_replies <= 0:
626 626 return
627 627 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
628 628 while msg is not None:
629 629 self._ignored_control_replies -= 1
630 630 if self.debug:
631 631 pprint(msg)
632 632 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
633 633
634 634 def _flush_ignored_control(self):
635 635 """flush ignored control replies"""
636 636 while self._ignored_control_replies > 0:
637 637 self.session.recv(self._control_socket)
638 638 self._ignored_control_replies -= 1
639 639
640 640 def _flush_ignored_hub_replies(self):
641 641 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
642 642 while msg is not None:
643 643 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
644 644
645 645 def _flush_iopub(self, sock):
646 646 """Flush replies from the iopub channel waiting
647 647 in the ZMQ queue.
648 648 """
649 649 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
650 650 while msg is not None:
651 651 if self.debug:
652 652 pprint(msg)
653 653 msg = msg[-1]
654 654 parent = msg['parent_header']
655 655 msg_id = parent['msg_id']
656 656 content = msg['content']
657 657 header = msg['header']
658 658 msg_type = msg['msg_type']
659 659
660 660 # init metadata:
661 661 md = self.metadata[msg_id]
662 662
663 663 if msg_type == 'stream':
664 664 name = content['name']
665 665 s = md[name] or ''
666 666 md[name] = s + content['data']
667 667 elif msg_type == 'pyerr':
668 668 md.update({'pyerr' : self._unwrap_exception(content)})
669 669 elif msg_type == 'pyin':
670 670 md.update({'pyin' : content['code']})
671 671 else:
672 672 md.update({msg_type : content.get('data', '')})
673 673
674 674 # redundant?
675 675 self.metadata[msg_id] = md
676 676
677 677 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
678 678
679 679 #--------------------------------------------------------------------------
680 680 # len, getitem
681 681 #--------------------------------------------------------------------------
682 682
683 683 def __len__(self):
684 684 """len(client) returns # of engines."""
685 685 return len(self.ids)
686 686
687 687 def __getitem__(self, key):
688 688 """index access returns DirectView multiplexer objects
689 689
690 690 Must be int, slice, or list/tuple/xrange of ints"""
691 691 if not isinstance(key, (int, slice, tuple, list, xrange)):
692 692 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
693 693 else:
694 694 return self.direct_view(key)
695 695
696 696 #--------------------------------------------------------------------------
697 697 # Begin public methods
698 698 #--------------------------------------------------------------------------
699 699
700 700 @property
701 701 def ids(self):
702 702 """Always up-to-date ids property."""
703 703 self._flush_notifications()
704 704 # always copy:
705 705 return list(self._ids)
706 706
707 707 def close(self):
708 708 if self._closed:
709 709 return
710 710 snames = filter(lambda n: n.endswith('socket'), dir(self))
711 711 for socket in map(lambda name: getattr(self, name), snames):
712 712 if isinstance(socket, zmq.Socket) and not socket.closed:
713 713 socket.close()
714 714 self._closed = True
715 715
716 716 def spin(self):
717 717 """Flush any registration notifications and execution results
718 718 waiting in the ZMQ queue.
719 719 """
720 720 if self._notification_socket:
721 721 self._flush_notifications()
722 722 if self._mux_socket:
723 723 self._flush_results(self._mux_socket)
724 724 if self._task_socket:
725 725 self._flush_results(self._task_socket)
726 726 if self._control_socket:
727 727 self._flush_control(self._control_socket)
728 728 if self._iopub_socket:
729 729 self._flush_iopub(self._iopub_socket)
730 730 if self._query_socket:
731 731 self._flush_ignored_hub_replies()
732 732
733 733 def wait(self, jobs=None, timeout=-1):
734 734 """waits on one or more `jobs`, for up to `timeout` seconds.
735 735
736 736 Parameters
737 737 ----------
738 738
739 739 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
740 740 ints are indices to self.history
741 741 strs are msg_ids
742 742 default: wait on all outstanding messages
743 743 timeout : float
744 744 a time in seconds, after which to give up.
745 745 default is -1, which means no timeout
746 746
747 747 Returns
748 748 -------
749 749
750 750 True : when all msg_ids are done
751 751 False : timeout reached, some msg_ids still outstanding
752 752 """
753 753 tic = time.time()
754 754 if jobs is None:
755 755 theids = self.outstanding
756 756 else:
757 757 if isinstance(jobs, (int, str, AsyncResult)):
758 758 jobs = [jobs]
759 759 theids = set()
760 760 for job in jobs:
761 761 if isinstance(job, int):
762 762 # index access
763 763 job = self.history[job]
764 764 elif isinstance(job, AsyncResult):
765 765 map(theids.add, job.msg_ids)
766 766 continue
767 767 theids.add(job)
768 768 if not theids.intersection(self.outstanding):
769 769 return True
770 770 self.spin()
771 771 while theids.intersection(self.outstanding):
772 772 if timeout >= 0 and ( time.time()-tic ) > timeout:
773 773 break
774 774 time.sleep(1e-3)
775 775 self.spin()
776 776 return len(theids.intersection(self.outstanding)) == 0
777 777
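Note: wait() is the blocking primitive the AsyncResult machinery builds on. A sketch (submission goes through a view, the supported path to apply):

    import os
    ar = client[:].apply_async(os.getpid)  # hypothetical submission
    done = client.wait([ar], timeout=5.0)  # True iff finished within 5s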
778 778 #--------------------------------------------------------------------------
779 779 # Control methods
780 780 #--------------------------------------------------------------------------
781 781
782 782 @spin_first
783 783 def clear(self, targets=None, block=None):
784 784 """Clear the namespace in target(s)."""
785 785 block = self.block if block is None else block
786 786 targets = self._build_targets(targets)[0]
787 787 for t in targets:
788 788 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
789 789 error = False
790 790 if block:
791 791 self._flush_ignored_control()
792 792 for i in range(len(targets)):
793 793 idents,msg = self.session.recv(self._control_socket,0)
794 794 if self.debug:
795 795 pprint(msg)
796 796 if msg['content']['status'] != 'ok':
797 797 error = self._unwrap_exception(msg['content'])
798 798 else:
799 799 self._ignored_control_replies += len(targets)
800 800 if error:
801 801 raise error
802 802
803 803
804 804 @spin_first
805 805 def abort(self, jobs=None, targets=None, block=None):
806 806 """Abort specific jobs from the execution queues of target(s).
807 807
808 808 This is a mechanism to prevent jobs that have already been submitted
809 809 from executing.
810 810
811 811 Parameters
812 812 ----------
813 813
814 814 jobs : msg_id, list of msg_ids, or AsyncResult
815 815 The jobs to be aborted
816 816
817 817
818 818 """
819 819 block = self.block if block is None else block
820 820 targets = self._build_targets(targets)[0]
821 821 msg_ids = []
822 822 if isinstance(jobs, (basestring,AsyncResult)):
823 823 jobs = [jobs]
824 824 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
825 825 if bad_ids:
826 826 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
827 827 for j in jobs:
828 828 if isinstance(j, AsyncResult):
829 829 msg_ids.extend(j.msg_ids)
830 830 else:
831 831 msg_ids.append(j)
832 832 content = dict(msg_ids=msg_ids)
833 833 for t in targets:
834 834 self.session.send(self._control_socket, 'abort_request',
835 835 content=content, ident=t)
836 836 error = False
837 837 if block:
838 838 self._flush_ignored_control()
839 839 for i in range(len(targets)):
840 840 idents,msg = self.session.recv(self._control_socket,0)
841 841 if self.debug:
842 842 pprint(msg)
843 843 if msg['content']['status'] != 'ok':
844 844 error = self._unwrap_exception(msg['content'])
845 845 else:
846 846 self._ignored_control_replies += len(targets)
847 847 if error:
848 848 raise error
849 849
850 850 @spin_first
851 851 def shutdown(self, targets=None, restart=False, hub=False, block=None):
852 852 """Terminates one or more engine processes, optionally including the hub."""
853 853 block = self.block if block is None else block
854 854 if hub:
855 855 targets = 'all'
856 856 targets = self._build_targets(targets)[0]
857 857 for t in targets:
858 858 self.session.send(self._control_socket, 'shutdown_request',
859 859 content={'restart':restart},ident=t)
860 860 error = False
861 861 if block or hub:
862 862 self._flush_ignored_control()
863 863 for i in range(len(targets)):
864 864 idents,msg = self.session.recv(self._control_socket, 0)
865 865 if self.debug:
866 866 pprint(msg)
867 867 if msg['content']['status'] != 'ok':
868 868 error = self._unwrap_exception(msg['content'])
869 869 else:
870 870 self._ignored_control_replies += len(targets)
871 871
872 872 if hub:
873 873 time.sleep(0.25)
874 874 self.session.send(self._query_socket, 'shutdown_request')
875 875 idents,msg = self.session.recv(self._query_socket, 0)
876 876 if self.debug:
877 877 pprint(msg)
878 878 if msg['content']['status'] != 'ok':
879 879 error = self._unwrap_exception(msg['content'])
880 880
881 881 if error:
882 882 raise error
883 883
884 884 #--------------------------------------------------------------------------
885 885 # Execution related methods
886 886 #--------------------------------------------------------------------------
887 887
888 888 def _maybe_raise(self, result):
889 889 """wrapper for maybe raising an exception if apply failed."""
890 890 if isinstance(result, error.RemoteError):
891 891 raise result
892 892
893 893 return result
894 894
895 895 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
896 896 ident=None):
897 897 """construct and send an apply message via a socket.
898 898
899 899 This is the principal method with which all engine execution is performed by views.
900 900 """
901 901
902 902 assert not self._closed, "cannot use me anymore, I'm closed!"
903 903 # defaults:
904 904 args = args if args is not None else []
905 905 kwargs = kwargs if kwargs is not None else {}
906 906 subheader = subheader if subheader is not None else {}
907 907
908 908 # validate arguments
909 909 if not callable(f):
910 910 raise TypeError("f must be callable, not %s"%type(f))
911 911 if not isinstance(args, (tuple, list)):
912 912 raise TypeError("args must be tuple or list, not %s"%type(args))
913 913 if not isinstance(kwargs, dict):
914 914 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
915 915 if not isinstance(subheader, dict):
916 916 raise TypeError("subheader must be dict, not %s"%type(subheader))
917 917
918 918 bufs = util.pack_apply_message(f,args,kwargs)
919 919
920 920 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
921 921 subheader=subheader, track=track)
922 922
923 923 msg_id = msg['msg_id']
924 924 self.outstanding.add(msg_id)
925 925 if ident:
926 926 # possibly routed to a specific engine
927 927 if isinstance(ident, list):
928 928 ident = ident[-1]
929 929 if ident in self._engines.values():
930 930 # save for later, in case of engine death
931 931 self._outstanding_dict[ident].add(msg_id)
932 932 self.history.append(msg_id)
933 933 self.metadata[msg_id]['submitted'] = datetime.now()
934 934
935 935 return msg
936 936
937 937 #--------------------------------------------------------------------------
938 938 # construct a View object
939 939 #--------------------------------------------------------------------------
940 940
941 941 def load_balanced_view(self, targets=None):
942 942 """construct a DirectView object.
943 943
944 944 If no arguments are specified, create a LoadBalancedView
945 945 using all engines.
946 946
947 947 Parameters
948 948 ----------
949 949
950 950 targets: list,slice,int,etc. [default: use all engines]
951 951 The subset of engines across which to load-balance
952 952 """
953 953 if targets is not None:
954 954 targets = self._build_targets(targets)[1]
955 955 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
956 956
957 957 def direct_view(self, targets='all'):
958 958 """construct a DirectView object.
959 959
960 960 If no targets are specified, create a DirectView
961 961 using all engines.
962 962
963 963 Parameters
964 964 ----------
965 965
966 966 targets: list,slice,int,etc. [default: use all engines]
967 967 The engines to use for the View
968 968 """
969 969 single = isinstance(targets, int)
970 970 targets = self._build_targets(targets)[1]
971 971 if single:
972 972 targets = targets[0]
973 973 return DirectView(client=self, socket=self._mux_socket, targets=targets)
974 974
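Note: both view constructors are also reachable through indexing (__getitem__ above); equivalent spellings:

    dv = client.direct_view('all')           # same engines as client[:]
    e0 = client.direct_view(0)               # single-engine view, like client[0]
    lv = client.load_balanced_view()         # load-balanced across all engines
    lv2 = client.load_balanced_view([0, 1])  # restrict the balanced pool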
975 975 #--------------------------------------------------------------------------
976 976 # Query methods
977 977 #--------------------------------------------------------------------------
978 978
979 979 @spin_first
980 980 def get_result(self, indices_or_msg_ids=None, block=None):
981 981 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
982 982
983 983 If the client already has the results, no request to the Hub will be made.
984 984
985 985 This is a convenient way to construct AsyncResult objects, which are wrappers
986 986 that include metadata about execution, and allow for awaiting results that
987 987 were not submitted by this Client.
988 988
989 989 It can also be a convenient way to retrieve the metadata associated with
990 990 blocking execution, since it always retrieves the metadata along with the result.
991 991
992 992 Examples
993 993 --------
994 994 ::
995 995
996 996 In [10]: ar = client.get_result(msg_id) # msg_id from an earlier submission
997 997
998 998 Parameters
999 999 ----------
1000 1000
1001 1001 indices_or_msg_ids : integer history index, str msg_id, or list of either
1002 1002 The indices or msg_ids of indices to be retrieved
1003 1003
1004 1004 block : bool
1005 1005 Whether to wait for the result to be done
1006 1006
1007 1007 Returns
1008 1008 -------
1009 1009
1010 1010 AsyncResult
1011 1011 A single AsyncResult object will always be returned.
1012 1012
1013 1013 AsyncHubResult
1014 1014 A subclass of AsyncResult that retrieves results from the Hub
1015 1015
1016 1016 """
1017 1017 block = self.block if block is None else block
1018 1018 if indices_or_msg_ids is None:
1019 1019 indices_or_msg_ids = -1
1020 1020
1021 1021 if not isinstance(indices_or_msg_ids, (list,tuple)):
1022 1022 indices_or_msg_ids = [indices_or_msg_ids]
1023 1023
1024 1024 theids = []
1025 1025 for id in indices_or_msg_ids:
1026 1026 if isinstance(id, int):
1027 1027 id = self.history[id]
1028 1028 if not isinstance(id, str):
1029 1029 raise TypeError("indices must be str or int, not %r"%id)
1030 1030 theids.append(id)
1031 1031
1032 1032 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1033 1033 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1034 1034
1035 1035 if remote_ids:
1036 1036 ar = AsyncHubResult(self, msg_ids=theids)
1037 1037 else:
1038 1038 ar = AsyncResult(self, msg_ids=theids)
1039 1039
1040 1040 if block:
1041 1041 ar.wait()
1042 1042
1043 1043 return ar
1044 1044
1045 1045 @spin_first
1046 1046 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1047 1047 """Resubmit one or more tasks.
1048 1048
1049 1049 in-flight tasks may not be resubmitted.
1050 1050
1051 1051 Parameters
1052 1052 ----------
1053 1053
1054 1054 indices_or_msg_ids : integer history index, str msg_id, or list of either
1055 1055 The indices or msg_ids of indices to be retrieved
1056 1056
1057 1057 block : bool
1058 1058 Whether to wait for the result to be done
1059 1059
1060 1060 Returns
1061 1061 -------
1062 1062
1063 1063 AsyncHubResult
1064 1064 A subclass of AsyncResult that retrieves results from the Hub
1065 1065
1066 1066 """
1067 1067 block = self.block if block is None else block
1068 1068 if indices_or_msg_ids is None:
1069 1069 indices_or_msg_ids = -1
1070 1070
1071 1071 if not isinstance(indices_or_msg_ids, (list,tuple)):
1072 1072 indices_or_msg_ids = [indices_or_msg_ids]
1073 1073
1074 1074 theids = []
1075 1075 for id in indices_or_msg_ids:
1076 1076 if isinstance(id, int):
1077 1077 id = self.history[id]
1078 1078 if not isinstance(id, str):
1079 1079 raise TypeError("indices must be str or int, not %r"%id)
1080 1080 theids.append(id)
1081 1081
1082 1082 for msg_id in theids:
1083 1083 self.outstanding.discard(msg_id)
1084 1084 if msg_id in self.history:
1085 1085 self.history.remove(msg_id)
1086 1086 self.results.pop(msg_id, None)
1087 1087 self.metadata.pop(msg_id, None)
1088 1088 content = dict(msg_ids = theids)
1089 1089
1090 1090 self.session.send(self._query_socket, 'resubmit_request', content)
1091 1091
1092 1092 zmq.select([self._query_socket], [], [])
1093 1093 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1094 1094 if self.debug:
1095 1095 pprint(msg)
1096 1096 content = msg['content']
1097 1097 if content['status'] != 'ok':
1098 1098 raise self._unwrap_exception(content)
1099 1099
1100 1100 ar = AsyncHubResult(self, msg_ids=theids)
1101 1101
1102 1102 if block:
1103 1103 ar.wait()
1104 1104
1105 1105 return ar
1106 1106
1107 1107 @spin_first
1108 1108 def result_status(self, msg_ids, status_only=True):
1109 1109 """Check on the status of the result(s) of the apply request with `msg_ids`.
1110 1110
1111 1111 If status_only is False, then the actual results will be retrieved, else
1112 1112 only the status of the results will be checked.
1113 1113
1114 1114 Parameters
1115 1115 ----------
1116 1116
1117 1117 msg_ids : list of msg_ids
1118 1118 if int:
1119 1119 Passed as index to self.history for convenience.
1120 1120 status_only : bool (default: True)
1121 1121 if False:
1122 1122 Retrieve the actual results of completed tasks.
1123 1123
1124 1124 Returns
1125 1125 -------
1126 1126
1127 1127 results : dict
1128 1128 There will always be the keys 'pending' and 'completed', which will
1129 1129 be lists of msg_ids that are incomplete or complete. If `status_only`
1130 1130 is False, then completed results will be keyed by their `msg_id`.
1131 1131 """
1132 1132 if not isinstance(msg_ids, (list,tuple)):
1133 1133 msg_ids = [msg_ids]
1134 1134
1135 1135 theids = []
1136 1136 for msg_id in msg_ids:
1137 1137 if isinstance(msg_id, int):
1138 1138 msg_id = self.history[msg_id]
1139 1139 if not isinstance(msg_id, basestring):
1140 1140 raise TypeError("msg_ids must be str, not %r"%msg_id)
1141 1141 theids.append(msg_id)
1142 1142
1143 1143 completed = []
1144 1144 local_results = {}
1145 1145
1146 1146 # comment this block out to temporarily disable local shortcut:
1147 1147 for msg_id in theids:
1148 1148 if msg_id in self.results:
1149 1149 completed.append(msg_id)
1150 1150 local_results[msg_id] = self.results[msg_id]
1151 1151 theids.remove(msg_id)
1152 1152
1153 1153 if theids: # some not locally cached
1154 1154 content = dict(msg_ids=theids, status_only=status_only)
1155 1155 msg = self.session.send(self._query_socket, "result_request", content=content)
1156 1156 zmq.select([self._query_socket], [], [])
1157 1157 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1158 1158 if self.debug:
1159 1159 pprint(msg)
1160 1160 content = msg['content']
1161 1161 if content['status'] != 'ok':
1162 1162 raise self._unwrap_exception(content)
1163 1163 buffers = msg['buffers']
1164 1164 else:
1165 1165 content = dict(completed=[],pending=[])
1166 1166
1167 1167 content['completed'].extend(completed)
1168 1168
1169 1169 if status_only:
1170 1170 return content
1171 1171
1172 1172 failures = []
1173 1173 # load cached results into result:
1174 1174 content.update(local_results)
1175 1175 # update cache with results:
1176 1176 for msg_id in sorted(theids):
1177 1177 if msg_id in content['completed']:
1178 1178 rec = content[msg_id]
1179 1179 parent = rec['header']
1180 1180 header = rec['result_header']
1181 1181 rcontent = rec['result_content']
1182 1182 iodict = rec['io']
1183 1183 if isinstance(rcontent, str):
1184 1184 rcontent = self.session.unpack(rcontent)
1185 1185
1186 1186 md = self.metadata[msg_id]
1187 1187 md.update(self._extract_metadata(header, parent, rcontent))
1188 1188 md.update(iodict)
1189 1189
1190 1190 if rcontent['status'] == 'ok':
1191 1191 res,buffers = util.unserialize_object(buffers)
1192 1192 else:
1193 1193 print rcontent
1194 1194 res = self._unwrap_exception(rcontent)
1195 1195 failures.append(res)
1196 1196
1197 1197 self.results[msg_id] = res
1198 1198 content[msg_id] = res
1199 1199
1200 1200 if len(theids) == 1 and failures:
1201 1201 raise failures[0]
1202 1202
1203 1203 error.collect_exceptions(failures, "result_status")
1204 1204 return content
1205 1205
1206 1206 @spin_first
1207 1207 def queue_status(self, targets='all', verbose=False):
1208 1208 """Fetch the status of engine queues.
1209 1209
1210 1210 Parameters
1211 1211 ----------
1212 1212
1213 1213 targets : int/str/list of ints/strs
1214 1214 the engines whose states are to be queried.
1215 1215 default : all
1216 1216 verbose : bool
1217 1217 Whether to return lengths only, or lists of ids for each element
1218 1218 """
1219 1219 engine_ids = self._build_targets(targets)[1]
1220 1220 content = dict(targets=engine_ids, verbose=verbose)
1221 1221 self.session.send(self._query_socket, "queue_request", content=content)
1222 1222 idents,msg = self.session.recv(self._query_socket, 0)
1223 1223 if self.debug:
1224 1224 pprint(msg)
1225 1225 content = msg['content']
1226 1226 status = content.pop('status')
1227 1227 if status != 'ok':
1228 1228 raise self._unwrap_exception(content)
1229 1229 content = util.rekey(content)
1230 1230 if isinstance(targets, int):
1231 1231 return content[targets]
1232 1232 else:
1233 1233 return content
1234 1234
1235 1235 @spin_first
1236 1236 def purge_results(self, jobs=[], targets=[]):
1237 1237 """Tell the Hub to forget results.
1238 1238
1239 1239 Individual results can be purged by msg_id, or the entire
1240 1240 history of specific targets can be purged.
1241 1241
1242 1242 Parameters
1243 1243 ----------
1244 1244
1245 1245 jobs : str or list of str or AsyncResult objects
1246 1246 the msg_ids whose results should be forgotten.
1247 1247 targets : int/str/list of ints/strs
1248 1248 The targets, by uuid or int_id, whose entire history is to be purged.
1249 1249 Use `targets='all'` to scrub everything from the Hub's memory.
1250 1250
1251 1251 default : None
1252 1252 """
1253 1253 if not targets and not jobs:
1254 1254 raise ValueError("Must specify at least one of `targets` and `jobs`")
1255 1255 if targets:
1256 1256 targets = self._build_targets(targets)[1]
1257 1257
1258 1258 # construct msg_ids from jobs
1259 1259 msg_ids = []
1260 1260 if isinstance(jobs, (basestring,AsyncResult)):
1261 1261 jobs = [jobs]
1262 1262 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1263 1263 if bad_ids:
1264 1264 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1265 1265 for j in jobs:
1266 1266 if isinstance(j, AsyncResult):
1267 1267 msg_ids.extend(j.msg_ids)
1268 1268 else:
1269 1269 msg_ids.append(j)
1270 1270
1271 1271 content = dict(targets=targets, msg_ids=msg_ids)
1272 1272 self.session.send(self._query_socket, "purge_request", content=content)
1273 1273 idents, msg = self.session.recv(self._query_socket, 0)
1274 1274 if self.debug:
1275 1275 pprint(msg)
1276 1276 content = msg['content']
1277 1277 if content['status'] != 'ok':
1278 1278 raise self._unwrap_exception(content)
1279 1279
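Note: usage sketch for the two purge modes (msg_id stands in for a str from an earlier submission):

    client.purge_results(jobs=[msg_id])    # forget specific results
    client.purge_results(targets=[0, 1])   # drop these engines' full histories
    client.purge_results(targets='all')    # scrub everything from the Hub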
1280 1280 @spin_first
1281 1281 def hub_history(self):
1282 1282 """Get the Hub's history
1283 1283
1284 1284 Just like the Client, the Hub has a history, which is a list of msg_ids.
1285 1285 This will contain the history of all clients, and, depending on configuration,
1286 1286 may contain history across multiple cluster sessions.
1287 1287
1288 1288 Any msg_id returned here is a valid argument to `get_result`.
1289 1289
1290 1290 Returns
1291 1291 -------
1292 1292
1293 1293 msg_ids : list of strs
1294 1294 list of all msg_ids, ordered by task submission time.
1295 1295 """
1296 1296
1297 1297 self.session.send(self._query_socket, "history_request", content={})
1298 1298 idents, msg = self.session.recv(self._query_socket, 0)
1299 1299
1300 1300 if self.debug:
1301 1301 pprint(msg)
1302 1302 content = msg['content']
1303 1303 if content['status'] != 'ok':
1304 1304 raise self._unwrap_exception(content)
1305 1305 else:
1306 1306 return content['history']
1307 1307
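Note: as the docstring says, any msg_id from hub_history() feeds straight into get_result:

    msg_ids = client.hub_history()
    if msg_ids:
        ar = client.get_result(msg_ids[-1])  # most recently submitted task
        ar.wait()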
1308 1308 @spin_first
1309 1309 def db_query(self, query, keys=None):
1310 1310 """Query the Hub's TaskRecord database
1311 1311
1312 1312 This will return a list of task record dicts that match `query`
1313 1313
1314 1314 Parameters
1315 1315 ----------
1316 1316
1317 1317 query : mongodb query dict
1318 1318 The search dict. See mongodb query docs for details.
1319 1319 keys : list of strs [optional]
1320 1320 The subset of keys to be returned. The default is to fetch everything but buffers.
1321 1321 'msg_id' will *always* be included.
1322 1322 """
1323 1323 if isinstance(keys, basestring):
1324 1324 keys = [keys]
1325 1325 content = dict(query=query, keys=keys)
1326 1326 self.session.send(self._query_socket, "db_request", content=content)
1327 1327 idents, msg = self.session.recv(self._query_socket, 0)
1328 1328 if self.debug:
1329 1329 pprint(msg)
1330 1330 content = msg['content']
1331 1331 if content['status'] != 'ok':
1332 1332 raise self._unwrap_exception(content)
1333 1333
1334 1334 records = content['records']
1335 1335 buffer_lens = content['buffer_lens']
1336 1336 result_buffer_lens = content['result_buffer_lens']
1337 1337 buffers = msg['buffers']
1338 1338 has_bufs = buffer_lens is not None
1339 1339 has_rbufs = result_buffer_lens is not None
1340 1340 for i,rec in enumerate(records):
1341 1341 # relink buffers
1342 1342 if has_bufs:
1343 1343 blen = buffer_lens[i]
1344 1344 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1345 1345 if has_rbufs:
1346 1346 blen = result_buffer_lens[i]
1347 1347 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1348 1348 # turn timestamps back into times
1349 1349 for key in 'submitted started completed resubmitted'.split():
1350 1350 maybedate = rec.get(key, None)
1351 1351 if maybedate and util.ISO8601_RE.match(maybedate):
1352 1352 rec[key] = datetime.strptime(maybedate, util.ISO8601)
1353 1353
1354 1354 return records
1355 1355
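Note: a hedged query sketch (euuid stands in for an engine uuid); the {'$ne': None} test exercises the IS NOT NULL handling in the sqlite backend below:

    recs = client.db_query({'engine_uuid': euuid,
                            'completed': {'$ne': None}},
                           keys=['msg_id', 'completed'])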
1356 1356 __all__ = [ 'Client' ]
@@ -1,333 +1,339 b''
1 1 """A TaskRecord backend using sqlite3"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2011 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 import json
10 10 import os
11 11 import cPickle as pickle
12 12 from datetime import datetime
13 13
14 14 import sqlite3
15 15
16 16 from zmq.eventloop import ioloop
17 17
18 18 from IPython.utils.traitlets import Unicode, Instance, List
19 19 from .dictdb import BaseDB
20 20 from IPython.parallel.util import ISO8601
21 21
22 22 #-----------------------------------------------------------------------------
23 23 # SQLite operators, adapters, and converters
24 24 #-----------------------------------------------------------------------------
25 25
26 26 operators = {
27 27 '$lt' : "<",
28 28 '$gt' : ">",
29 29 # null is handled weird with ==,!=
30 30 '$eq' : "=",
31 31 '$ne' : "!=",
32 32 '$lte': "<=",
33 33 '$gte': ">=",
34 34 '$in' : ('=', ' OR '),
35 35 '$nin': ('!=', ' AND '),
36 36 # '$all': None,
37 37 # '$mod': None,
38 38 # '$exists' : None
39 39 }
40 40 null_operators = {
41 41 '=' : "IS NULL",
42 42 '!=' : "IS NOT NULL",
43 43 }
44 44
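Note: these tables drive _render_expression further down. A sketch of the translation (t, a, b are hypothetical values):

    check = {'submitted': {'$gte': t}}    # -> "submitted >= ?", args [t]
    check = {'completed': {'$ne': None}}  # -> "completed IS NOT NULL" via null_operators
    check = {'engine_uuid': {'$in': [a, b]}}
    # -> roughly "(engine_uuid = ? OR engine_uuid = ?)", args [a, b]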
45 45 def _adapt_datetime(dt):
46 46 return dt.strftime(ISO8601)
47 47
48 48 def _convert_datetime(ds):
49 49 if ds is None:
50 50 return ds
51 51 else:
52 52 return datetime.strptime(ds, ISO8601)
53 53
54 54 def _adapt_dict(d):
55 55 return json.dumps(d)
56 56
57 57 def _convert_dict(ds):
58 58 if ds is None:
59 59 return ds
60 60 else:
61 61 return json.loads(ds)
62 62
63 63 def _adapt_bufs(bufs):
64 64 # this is *horrible*
65 65 # copy buffers into single list and pickle it:
66 66 if bufs and isinstance(bufs[0], (bytes, buffer)):
67 67 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
68 68 elif bufs:
69 69 return bufs
70 70 else:
71 71 return None
72 72
73 73 def _convert_bufs(bs):
74 74 if bs is None:
75 75 return []
76 76 else:
77 77 return pickle.loads(bytes(bs))
78 78
79 79 #-----------------------------------------------------------------------------
80 80 # SQLiteDB class
81 81 #-----------------------------------------------------------------------------
82 82
83 83 class SQLiteDB(BaseDB):
84 84 """SQLite3 TaskRecord backend."""
85 85
86 86 filename = Unicode('tasks.db', config=True,
87 87 help="""The filename of the sqlite task database. [default: 'tasks.db']""")
88 88 location = Unicode('', config=True,
89 89 help="""The directory containing the sqlite task database. The default
90 90 is to use the profile_dir location.""")
91 91 table = Unicode("", config=True,
92 92 help="""The SQLite Table to use for storing tasks for this session. If unspecified,
93 93 a new table will be created with the Hub's IDENT. Specifying the table will result
94 94 in tasks from previous sessions being available via Clients' db_query and
95 95 get_result methods.""")
96 96
97 97 _db = Instance('sqlite3.Connection')
98 98 _keys = List(['msg_id' ,
99 99 'header' ,
100 100 'content',
101 101 'buffers',
102 102 'submitted',
103 103 'client_uuid' ,
104 104 'engine_uuid' ,
105 105 'started',
106 106 'completed',
107 107 'resubmitted',
108 108 'result_header' ,
109 109 'result_content' ,
110 110 'result_buffers' ,
111 111 'queue' ,
112 112 'pyin' ,
113 113 'pyout',
114 114 'pyerr',
115 115 'stdout',
116 116 'stderr',
117 117 ])
118 118
119 119 def __init__(self, **kwargs):
120 120 super(SQLiteDB, self).__init__(**kwargs)
121 121 if not self.table:
122 122 # use session, and prefix _, since starting with # is illegal
123 123 self.table = '_'+self.session.replace('-','_')
124 124 if not self.location:
125 if hasattr(self.config.Global, 'cluster_dir'):
126 self.location = self.config.Global.cluster_dir
125 # get current profile
126 from IPython.core.newapplication import BaseIPythonApplication
127 if BaseIPythonApplication.initialized():
128 app = BaseIPythonApplication.instance()
129 if app.profile_dir is not None:
130 self.location = app.profile_dir.location
131 else:
132 self.location = u'.'
127 133 else:
128 self.location = '.'
134 self.location = u'.'
129 135 self._init_db()
130 136
131 137 # register db commit as a 2s periodic callback,
132 138 # rather than committing on every operation (which would be slow)
133 139 # assumes we are being run in a zmq ioloop app
134 140 loop = ioloop.IOLoop.instance()
135 141 pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
136 142 pc.start()
137 143
138 144 def _defaults(self, keys=None):
139 145 """create an empty record"""
140 146 d = {}
141 147 keys = self._keys if keys is None else keys
142 148 for key in keys:
143 149 d[key] = None
144 150 return d
145 151
146 152 def _init_db(self):
147 153 """Connect to the database and get new session number."""
148 154 # register adapters
149 155 sqlite3.register_adapter(datetime, _adapt_datetime)
150 156 sqlite3.register_converter('datetime', _convert_datetime)
151 157 sqlite3.register_adapter(dict, _adapt_dict)
152 158 sqlite3.register_converter('dict', _convert_dict)
153 159 sqlite3.register_adapter(list, _adapt_bufs)
154 160 sqlite3.register_converter('bufs', _convert_bufs)
155 161 # connect to the db
156 162 dbfile = os.path.join(self.location, self.filename)
157 163 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
159 165 cached_statements=64)
161 167
162 168 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
163 169 (msg_id text PRIMARY KEY,
164 170 header dict text,
165 171 content dict text,
166 172 buffers bufs blob,
167 173 submitted datetime text,
168 174 client_uuid text,
169 175 engine_uuid text,
170 176 started datetime text,
171 177 completed datetime text,
172 178 resubmitted datetime text,
173 179 result_header dict text,
174 180 result_content dict text,
175 181 result_buffers bufs blob,
176 182 queue text,
177 183 pyin text,
178 184 pyout text,
179 185 pyerr text,
180 186 stdout text,
181 187 stderr text)
182 188 """%self.table)
183 189 self._db.commit()
184 190
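The two-word column declarations above ('header dict text', 'buffers bufs blob', 'submitted datetime text') are deliberate: with PARSE_DECLTYPES, sqlite3 looks up converters by the first word of a column's declared type, so 'dict', 'bufs', and 'datetime' select the converters registered in _init_db, while the trailing 'text'/'blob' merely documents the storage class. A minimal sketch of the same mechanism, assuming the register_* calls in _init_db have already run (the table and value are hypothetical):

    demo = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
    demo.execute("CREATE TABLE t (header dict text)")
    demo.execute("INSERT INTO t VALUES (?)", ({'a': 1},))  # stored via _adapt_dict
    print demo.execute("SELECT header FROM t").fetchone()[0]  # {u'a': 1} -- a dict again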
185 191 def _dict_to_list(self, d):
186 192 """turn a mongodb-style record dict into a list."""
187 193
188 194 return [ d[key] for key in self._keys ]
189 195
190 196 def _list_to_dict(self, line, keys=None):
191 197 """Inverse of dict_to_list"""
192 198 keys = self._keys if keys is None else keys
193 199 d = self._defaults(keys)
194 200 for key,value in zip(keys, line):
195 201 d[key] = value
196 202
197 203 return d
198 204
199 205 def _render_expression(self, check):
200 206 """Turn a mongodb-style search dict into an SQL query."""
201 207 expressions = []
202 208 args = []
203 209
204 210 skeys = set(check.keys())
205 211 skeys.difference_update(set(self._keys))
206 212 skeys.difference_update(set(['buffers', 'result_buffers']))
207 213 if skeys:
208 214 raise KeyError("Illegal testing key(s): %s"%skeys)
209 215
210 216 for name,sub_check in check.iteritems():
211 217 if isinstance(sub_check, dict):
212 218 for test,value in sub_check.iteritems():
213 219 try:
214 220 op = operators[test]
215 221 except KeyError:
216 222 raise KeyError("Unsupported operator: %r"%test)
217 223 if isinstance(op, tuple):
218 224 op, join = op
219 225
220 226 if value is None and op in null_operators:
221 227 expr = "%s %s"%null_operators[op]
222 228 else:
223 229 expr = "%s %s ?"%(name, op)
224 230 if isinstance(value, (tuple,list)):
225 231 if op in null_operators and any([v is None for v in value]):
226 232 # equality tests don't work with NULL
227 233 raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test)
228 234 expr = '( %s )'%( join.join([expr]*len(value)) )
229 235 args.extend(value)
230 236 else:
231 237 args.append(value)
232 238 expressions.append(expr)
233 239 else:
234 240 # it's an equality check
235 241 if sub_check is None:
236 242 expressions.append("%s IS NULL")
237 243 else:
238 244 expressions.append("%s = ?"%name)
239 245 args.append(sub_check)
240 246
241 247 expr = " AND ".join(expressions)
242 248 return expr, args
243 249
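For reference, a worked example of the rendering above; the uuid and datetimes are invented, db is a SQLiteDB instance, and clause order follows Python 2's arbitrary dict iteration order:

    t0, t1 = datetime(2011, 1, 1), datetime(2011, 2, 1)
    expr, args = db._render_expression({
        'engine_uuid' : 'abc123',                  # plain equality
        'completed'   : {'$gte': t0, '$lt': t1},   # range test
    })
    # expr -> "engine_uuid = ? AND completed >= ? AND completed < ?" (in some order)
    # args -> ['abc123', t0, t1]                   (placeholders in matching order)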
244 250 def add_record(self, msg_id, rec):
245 251 """Add a new Task Record, by msg_id."""
246 252 d = self._defaults()
247 253 d.update(rec)
248 254 d['msg_id'] = msg_id
249 255 line = self._dict_to_list(d)
250 256 tups = '(%s)'%(','.join(['?']*len(line)))
251 257 self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
252 258 # self._db.commit()  (deferred to the 2s periodic commit set up in __init__)
253 259
254 260 def get_record(self, msg_id):
255 261 """Get a specific Task Record, by msg_id."""
256 262 cursor = self._db.execute("""SELECT * FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
257 263 line = cursor.fetchone()
258 264 if line is None:
259 265 raise KeyError("No such msg: %r"%msg_id)
260 266 return self._list_to_dict(line)
261 267
262 268 def update_record(self, msg_id, rec):
263 269 """Update the data in an existing record."""
264 270 query = "UPDATE %s SET "%self.table
265 271 sets = []
266 272 keys = sorted(rec.keys())
267 273 values = []
268 274 for key in keys:
269 275 sets.append('%s = ?'%key)
270 276 values.append(rec[key])
271 277 query += ', '.join(sets)
272 278 query += ' WHERE msg_id == ?'
273 279 values.append(msg_id)
274 280 self._db.execute(query, values)
275 281 # self._db.commit()
276 282
277 283 def drop_record(self, msg_id):
278 284 """Remove a record from the DB."""
279 285 self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
280 286 # self._db.commit()
281 287
282 288 def drop_matching_records(self, check):
283 289 """Remove a record from the DB."""
284 290 expr,args = self._render_expression(check)
285 291 query = "DELETE FROM %s WHERE %s"%(self.table, expr)
286 292 self._db.execute(query,args)
287 293 # self._db.commit()
288 294
289 295 def find_records(self, check, keys=None):
290 296 """Find records matching a query dict, optionally extracting subset of keys.
291 297
292 298 Returns list of matching records.
293 299
294 300 Parameters
295 301 ----------
296 302
297 303 check: dict
298 304 mongodb-style query argument
299 305 keys: list of strs [optional]
300 306 if specified, the subset of keys to extract. msg_id will *always* be
301 307 included.
302 308 """
303 309 if keys:
304 310 bad_keys = [ key for key in keys if key not in self._keys ]
305 311 if bad_keys:
306 312 raise KeyError("Bad record key(s): %s"%bad_keys)
307 313
308 314 if keys:
309 315 # ensure msg_id is present and first:
310 316 if 'msg_id' in keys:
311 317 keys.remove('msg_id')
312 318 keys.insert(0, 'msg_id')
313 319 req = ', '.join(keys)
314 320 else:
315 321 req = '*'
316 322 expr,args = self._render_expression(check)
317 323 query = """SELECT %s FROM %s WHERE %s"""%(req, self.table, expr)
318 324 cursor = self._db.execute(query, args)
319 325 matches = cursor.fetchall()
320 326 records = []
321 327 for line in matches:
322 328 rec = self._list_to_dict(line, keys)
323 329 records.append(rec)
324 330 return records
325 331
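A hypothetical usage sketch tying find_records to the operator tables (the uuid is invented; note that msg_id is always prepended to the requested keys):

    recs = db.find_records(
        {'engine_uuid': 'abc123', 'completed': {'$ne': None}},  # finished tasks only
        keys=['started', 'completed'],
    )
    for rec in recs:
        print rec['msg_id'], rec['completed'] - rec['started']  # per-task runtime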
326 332 def get_history(self):
327 333 """get all msg_ids, ordered by time submitted."""
328 334 query = """SELECT msg_id FROM %s ORDER by submitted ASC"""%self.table
329 335 cursor = self._db.execute(query)
330 336 # will be a list of length 1 tuples
331 337 return [ tup[0] for tup in cursor.fetchall()]
332 338
333 339 __all__ = ['SQLiteDB'] No newline at end of file
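Finally, a sketch of how this backend is typically selected in an ipcontroller config file. HubFactory.db_class is the trait the controller uses to pick a TaskRecord backend; the overrides mirror the config=True traits above, and the table name is made up:

    # ipcontroller_config.py (sketch)
    c = get_config()

    # use SQLiteDB instead of the default in-memory DictDB
    c.HubFactory.db_class = 'IPython.parallel.controller.sqlitedb.SQLiteDB'

    # optional overrides
    c.SQLiteDB.filename = 'tasks.db'
    c.SQLiteDB.table = 'previous_session'  # reuse task records across restarts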