##// END OF EJS Templates
stop using deprecated DelayedCallback...
MinRK -
Show More
@@ -1,173 +1,174
1 1 """Manage IPython.parallel clusters in the notebook.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 from tornado import web
20 from zmq.eventloop import ioloop
21 20
22 21 from IPython.config.configurable import LoggingConfigurable
23 from IPython.utils.traitlets import Dict, Instance, CFloat
22 from IPython.utils.traitlets import Dict, Instance, Float
24 23 from IPython.core.profileapp import list_profiles_in
25 24 from IPython.core.profiledir import ProfileDir
26 25 from IPython.utils import py3compat
27 26 from IPython.utils.path import get_ipython_dir
28 27
29 28
30 29 #-----------------------------------------------------------------------------
31 30 # Classes
32 31 #-----------------------------------------------------------------------------
33 32
34 33
35 34
36 35
class ClusterManager(LoggingConfigurable):
    """Manage the lifecycle of IPython.parallel clusters from the notebook.

    Discovers profiles on disk and starts/stops one controller plus a set of
    engines per profile, scheduling launcher startup on the zmq/tornado IOLoop.
    """

    # profile name -> info dict with keys 'profile', 'profile_dir', 'status',
    # and, while a cluster is running, also 'controller_launcher',
    # 'engine_set_launcher' and 'n'.
    profiles = Dict()

    delay = Float(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    # IOLoop used to schedule launcher startup; defaults to the global instance.
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        from zmq.eventloop.ioloop import IOLoop
        return IOLoop.instance()

    def build_launchers(self, profile_dir):
        """Build (controller_launcher, engine_set_launcher, default_n) for a profile dir.

        Uses a throwaway IPClusterStart subclass so the launchers come out fully
        configured from the profile's config files, without the side effects
        (signal handlers, log rewiring) of the real application.
        """
        from IPython.parallel.apps.ipclusterapp import IPClusterStart

        class DummyIPClusterStart(IPClusterStart):
            """Dummy subclass to skip init steps that conflict with global app.

            Instantiating and initializing this class should result in fully configured
            launchers, but no other side effects or state.
            """

            def init_signal(self):
                pass
            def reinit_logging(self):
                pass

        starter = DummyIPClusterStart(log=self.log)
        starter.initialize(['--profile-dir', profile_dir])
        cl = starter.controller_launcher
        esl = starter.engine_launcher
        n = starter.n
        return cl, esl, n

    def get_profile_dir(self, name, path):
        """Return the on-disk location of profile `name` found under `path`."""
        p = ProfileDir.find_profile_dir_by_name(path,name=name)
        return p.location

    def update_profiles(self):
        """List all profiles in the ipython_dir and cwd.

        New profiles are registered as 'stopped'; existing entries are left
        untouched so running-cluster state is preserved.
        """
        for path in [get_ipython_dir(), py3compat.getcwd()]:
            for profile in list_profiles_in(path):
                pd = self.get_profile_dir(profile, path)
                if profile not in self.profiles:
                    self.log.debug("Adding cluster profile '%s'" % profile)
                    self.profiles[profile] = {
                        'profile': profile,
                        'profile_dir': pd,
                        'status': 'stopped'
                    }

    def list_profiles(self):
        """Return a list of info dicts for all known profiles, 'default' first."""
        self.update_profiles()
        # sorted list, but ensure that 'default' always comes first
        default_first = lambda name: name if name != 'default' else ''
        result = [self.profile_info(p) for p in sorted(self.profiles, key=default_first)]
        return result

    def check_profile(self, profile):
        """Raise 404 if `profile` is not a known profile name."""
        if profile not in self.profiles:
            raise web.HTTPError(404, u'profile not found')

    def profile_info(self, profile):
        """Return a JSON-safe info dict (profile, profile_dir, status[, n])."""
        self.check_profile(profile)
        result = {}
        data = self.profiles.get(profile)
        result['profile'] = profile
        result['profile_dir'] = data['profile_dir']
        result['status'] = data['status']
        if 'n' in data:
            result['n'] = data['n']
        return result

    def start_cluster(self, profile, n=None):
        """Start a cluster for a given profile.

        Raises 409 if the cluster is already running.  Startup is asynchronous:
        the controller starts on the next loop iteration and the engines after
        `self.delay` seconds.  on_stop callbacks are cross-registered so that
        stopping either side stops the other and resets the profile's state.
        """
        self.check_profile(profile)
        data = self.profiles[profile]
        if data['status'] == 'running':
            raise web.HTTPError(409, u'cluster already running')
        cl, esl, default_n = self.build_launchers(data['profile_dir'])
        n = n if n is not None else default_n
        def clean_data():
            # drop launcher references and mark the profile stopped
            data.pop('controller_launcher',None)
            data.pop('engine_set_launcher',None)
            data.pop('n',None)
            data['status'] = 'stopped'
        def engines_stopped(r):
            self.log.debug('Engines stopped')
            if cl.running:
                cl.stop()
            clean_data()
        esl.on_stop(engines_stopped)
        def controller_stopped(r):
            self.log.debug('Controller stopped')
            if esl.running:
                esl.stop()
            clean_data()
        cl.on_stop(controller_stopped)
        loop = self.loop

        def start():
            """start the controller, then the engines after a delay"""
            cl.start()
            loop.add_timeout(self.loop.time() + self.delay, lambda : esl.start(n))
        self.loop.add_callback(start)

        self.log.debug('Cluster started')
        data['controller_launcher'] = cl
        data['engine_set_launcher'] = esl
        data['n'] = n
        data['status'] = 'running'
        return self.profile_info(profile)

    def stop_cluster(self, profile):
        """Stop a cluster for a given profile.

        Raises 409 if the cluster is not running.  Actual state cleanup happens
        in the on_stop callbacks registered in start_cluster.
        """
        self.check_profile(profile)
        data = self.profiles[profile]
        if data['status'] == 'stopped':
            raise web.HTTPError(409, u'cluster not running')
        data = self.profiles[profile]
        cl = data['controller_launcher']
        esl = data['engine_set_launcher']
        if cl.running:
            cl.stop()
        if esl.running:
            esl.stop()
        # Return a temp info dict, the real one is updated in the on_stop
        # logic above.
        result = {
            'profile': data['profile'],
            'profile_dir': data['profile_dir'],
            'status': 'stopped'
        }
        return result

    def stop_all_clusters(self):
        """Stop every known cluster (best effort, profile by profile)."""
        for p in self.profiles.keys():
            self.stop_cluster(p)
@@ -1,260 +1,242
1 1 # encoding: utf-8
2 2 """
3 3 The Base Application class for IPython.parallel apps
4
5 Authors:
6
7 * Brian Granger
8 * Min RK
9
10 4 """
11 5
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
18
19 #-----------------------------------------------------------------------------
20 # Imports
21 #-----------------------------------------------------------------------------
22 6
23 7 import os
24 8 import logging
25 9 import re
26 10 import sys
27 11
28 from subprocess import Popen, PIPE
29
30 12 from IPython.config.application import catch_config_error, LevelFormatter
31 13 from IPython.core import release
32 14 from IPython.core.crashhandler import CrashHandler
33 15 from IPython.core.application import (
34 16 BaseIPythonApplication,
35 17 base_aliases as base_ip_aliases,
36 18 base_flags as base_ip_flags
37 19 )
38 20 from IPython.utils.path import expand_path
39 21 from IPython.utils.process import check_pid
40 22 from IPython.utils import py3compat
41 23 from IPython.utils.py3compat import unicode_type
42 24
43 25 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict
44 26
45 27 #-----------------------------------------------------------------------------
46 28 # Module errors
47 29 #-----------------------------------------------------------------------------
48 30
class PIDFileError(Exception):
    """Error raised when a pid file cannot be read, or already exists."""
51 33
52 34
53 35 #-----------------------------------------------------------------------------
54 36 # Crash handler for this application
55 37 #-----------------------------------------------------------------------------
56 38
class ParallelCrashHandler(CrashHandler):
    """sys.excepthook for IPython itself, leaves a detailed report on disk."""

    def __init__(self, app):
        # Route crash reports to the project's maintainer and issue tracker.
        super(ParallelCrashHandler, self).__init__(
            app,
            release.authors['Min'][0],
            release.author_email,
            'https://github.com/ipython/ipython/issues',
        )
67 49
68 50
69 51 #-----------------------------------------------------------------------------
70 52 # Main application
71 53 #-----------------------------------------------------------------------------
# Command-line aliases/flags shared by all parallel apps: start from the core
# IPython set, then add the BaseParallelApplication-specific options.
base_aliases = dict(base_ip_aliases)
base_aliases.update({
    'work-dir' : 'BaseParallelApplication.work_dir',
    'log-to-file' : 'BaseParallelApplication.log_to_file',
    'clean-logs' : 'BaseParallelApplication.clean_logs',
    'log-url' : 'BaseParallelApplication.log_url',
    'cluster-id' : 'BaseParallelApplication.cluster_id',
})

base_flags = {
    'log-to-file' : (
        {'BaseParallelApplication' : {'log_to_file' : True}},
        "send log output to a file"
    )
}
base_flags.update(base_ip_flags)
89 71
class BaseParallelApplication(BaseIPythonApplication):
    """The base Application for IPython.parallel apps

    Principal extensions to BaseIPythonApplication:

    * work_dir
    * remote logging via pyzmq
    * IOLoop instance
    """

    # crashes produce a parallel-specific report (see ParallelCrashHandler)
    crash_handler_class = ParallelCrashHandler

    def _log_level_default(self):
        # temporarily override default_log_level to INFO
        return logging.INFO

    def _log_format_default(self):
        """override default log format to include time"""
        return u"%(asctime)s.%(msecs).03d [%(name)s]%(highlevel)s %(message)s"

    work_dir = Unicode(py3compat.getcwd(), config=True,
        help='Set the working dir for the process.'
    )
    def _work_dir_changed(self, name, old, new):
        # normalize to an absolute, expanded unicode path
        self.work_dir = unicode_type(expand_path(new))

    log_to_file = Bool(config=True,
        help="whether to log to a file")

    clean_logs = Bool(False, config=True,
        help="whether to cleanup old logfiles before starting")

    log_url = Unicode('', config=True,
        help="The ZMQ URL of the iplogger to aggregate logging.")

    cluster_id = Unicode('', config=True,
        help="""String id to add to runtime files, to prevent name collisions when
        using multiple clusters with a single profile simultaneously.

        When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'

        Since this is text inserted into filenames, typical recommendations apply:
        Simple character strings are ideal, and spaces are not recommended (but should
        generally work).
        """
    )
    def _cluster_id_changed(self, name, old, new):
        # app name determines log/pid filenames; suffix it with the cluster id
        self.name = self.__class__.name
        if new:
            self.name += '-%s'%new

    def _config_files_default(self):
        return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']

    # zmq/tornado event loop; defaults to the global instance
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        from zmq.eventloop.ioloop import IOLoop
        return IOLoop.instance()

    aliases = Dict(base_aliases)
    flags = Dict(base_flags)

    @catch_config_error
    def initialize(self, argv=None):
        """initialize the app"""
        super(BaseParallelApplication, self).initialize(argv)
        self.to_work_dir()
        self.reinit_logging()

    def to_work_dir(self):
        """chdir to self.work_dir if it differs from the cwd."""
        wd = self.work_dir
        if unicode_type(wd) != py3compat.getcwd():
            os.chdir(wd)
            self.log.info("Changing to working dir: %s" % wd)
        # This is the working dir by now.
        sys.path.insert(0, '')

    def reinit_logging(self):
        """Reconfigure logging: optionally clean old logs and log to a file."""
        # Remove old log files
        log_dir = self.profile_dir.log_dir
        if self.clean_logs:
            for f in os.listdir(log_dir):
                if re.match(r'%s-\d+\.(log|err|out)' % self.name, f):
                    try:
                        os.remove(os.path.join(log_dir, f))
                    except (OSError, IOError):
                        # probably just conflict from sibling process
                        # already removing it
                        pass
        if self.log_to_file:
            # Start logging to the new log file
            log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
            logfile = os.path.join(log_dir, log_filename)
            open_log_file = open(logfile, 'w')
        else:
            open_log_file = None
        if open_log_file is not None:
            # replace all existing handlers with the file handler
            while self.log.handlers:
                self.log.removeHandler(self.log.handlers[0])
            self._log_handler = logging.StreamHandler(open_log_file)
            self.log.addHandler(self._log_handler)
        else:
            self._log_handler = self.log.handlers[0]
        # Add timestamps to log format:
        self._log_formatter = LevelFormatter(self.log_format,
                                             datefmt=self.log_datefmt)
        self._log_handler.setFormatter(self._log_formatter)
        # do not propagate log messages to root logger
        # ipcluster app will sometimes print duplicate messages during shutdown
        # if this is 1 (default):
        self.log.propagate = False

    def write_pid_file(self, overwrite=False):
        """Create a .pid file in the pid_dir with my pid.

        This must be called after pre_construct, which sets `self.pid_dir`.
        This raises :exc:`PIDFileError` if the pid file exists already.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            pid = self.get_pid_from_file()
            if not overwrite:
                raise PIDFileError(
                    'The pid file [%s] already exists. \nThis could mean that this '
                    'server is already running with [pid=%s].' % (pid_file, pid)
                )
        with open(pid_file, 'w') as f:
            self.log.info("Creating pid file: %s" % pid_file)
            f.write(repr(os.getpid())+'\n')

    def remove_pid_file(self):
        """Remove the pid file.

        This should be called at shutdown by registering a callback with
        :func:`reactor.addSystemEventTrigger`. This needs to return
        ``None``.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            try:
                self.log.info("Removing pid file: %s" % pid_file)
                os.remove(pid_file)
            except:
                # best effort: removal failure must not abort shutdown
                self.log.warn("Error removing the pid file: %s" % pid_file)

    def get_pid_from_file(self):
        """Get the pid from the pid file.

        If the pid file doesn't exist a :exc:`PIDFileError` is raised.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            with open(pid_file, 'r') as f:
                s = f.read().strip()
                try:
                    pid = int(s)
                except:
                    raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
                return pid
        else:
            raise PIDFileError('pid file not found: %s' % pid_file)

    def check_pid(self, pid):
        """Return whether `pid` appears to be a running process.

        Falls back to True (assume running) if the check itself fails.
        """
        try:
            return check_pid(pid)
        except Exception:
            self.log.warn(
                "Could not determine whether pid %i is running. "
                " Making the likely assumption that it is."%pid
            )
            return True
@@ -1,618 +1,596
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 """
4 The ipcluster application.
5
6 Authors:
7
8 * Brian Granger
9 * MinRK
10
11 """
3 """The ipcluster application."""
12 4 from __future__ import print_function
13 5
14 #-----------------------------------------------------------------------------
15 # Copyright (C) 2008-2011 The IPython Development Team
16 #
17 # Distributed under the terms of the BSD License. The full license is in
18 # the file COPYING, distributed as part of this software.
19 #-----------------------------------------------------------------------------
20
21 #-----------------------------------------------------------------------------
22 # Imports
23 #-----------------------------------------------------------------------------
24
25 6 import errno
26 7 import logging
27 8 import os
28 9 import re
29 10 import signal
30 11
31 12 from subprocess import check_call, CalledProcessError, PIPE
32 13 import zmq
33 from zmq.eventloop import ioloop
34 14
35 from IPython.config.application import Application, boolean_flag, catch_config_error
15 from IPython.config.application import catch_config_error
36 16 from IPython.config.loader import Config
37 17 from IPython.core.application import BaseIPythonApplication
38 18 from IPython.core.profiledir import ProfileDir
39 19 from IPython.utils.daemonize import daemonize
40 20 from IPython.utils.importstring import import_item
41 21 from IPython.utils.py3compat import string_types
42 22 from IPython.utils.sysinfo import num_cpus
43 23 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
44 24 DottedObjectName)
45 25
46 26 from IPython.parallel.apps.baseapp import (
47 27 BaseParallelApplication,
48 28 PIDFileError,
49 29 base_flags, base_aliases
50 30 )
51 31
52 32
53 33 #-----------------------------------------------------------------------------
54 34 # Module level variables
55 35 #-----------------------------------------------------------------------------
56 36
57 37
58 38 _description = """Start an IPython cluster for parallel computing.
59 39
60 40 An IPython cluster consists of 1 controller and 1 or more engines.
61 41 This command automates the startup of these processes using a wide range of
62 42 startup methods (SSH, local processes, PBS, mpiexec, SGE, LSF, HTCondor,
63 43 Windows HPC Server 2008). To start a cluster with 4 engines on your
64 44 local host simply do 'ipcluster start --n=4'. For more complex usage
65 45 you will typically do 'ipython profile create mycluster --parallel', then edit
66 46 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
67 47 """
68 48
69 49 _main_examples = """
70 50 ipcluster start --n=4 # start a 4 node cluster on localhost
71 51 ipcluster start -h # show the help string for the start subcmd
72 52
73 53 ipcluster stop -h # show the help string for the stop subcmd
74 54 ipcluster engines -h # show the help string for the engines subcmd
75 55 """
76 56
77 57 _start_examples = """
78 58 ipython profile create mycluster --parallel # create mycluster profile
79 59 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
80 60 """
81 61
82 62 _stop_examples = """
83 63 ipcluster stop --profile=mycluster # stop a running cluster by profile name
84 64 """
85 65
86 66 _engines_examples = """
87 67 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
88 68 """
89 69
90 70
91 71 # Exit codes for ipcluster
92 72
93 73 # This will be the exit code if the ipcluster appears to be running because
94 74 # a .pid file exists
95 75 ALREADY_STARTED = 10
96 76
97 77
98 78 # This will be the exit code if ipcluster stop is run, but there is not .pid
99 79 # file to be found.
100 80 ALREADY_STOPPED = 11
101 81
102 82 # This will be the exit code if ipcluster engines is run, but there is not .pid
103 83 # file to be found.
104 84 NO_CLUSTER = 12
105 85
106 86
107 87 #-----------------------------------------------------------------------------
108 88 # Utilities
109 89 #-----------------------------------------------------------------------------
110 90
def find_launcher_class(clsname, kind):
    """Import and return a launcher class for a given clsname and kind.

    Parameters
    ==========
    clsname : str
        The full name of the launcher class, either with or without the
        module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF, HTCondor
        WindowsHPC).
    kind : str
        Either 'EngineSet' or 'Controller'.
    """
    if '.' in clsname:
        # already a dotted import path: import it directly
        return import_item(clsname)
    # bare name: expand abbreviations like 'MPI' or 'SSH' to the full
    # class name in IPython.parallel.apps.launcher
    name = clsname
    if kind and kind not in name:
        name = name + kind + 'Launcher'
    return import_item('IPython.parallel.apps.launcher.' + name)
132 112
133 113 #-----------------------------------------------------------------------------
134 114 # Main application
135 115 #-----------------------------------------------------------------------------
136 116
137 117 start_help = """Start an IPython cluster for parallel computing
138 118
139 119 Start an ipython cluster by its profile name or cluster
140 120 directory. Cluster directories contain configuration, log and
141 121 security related files and are named using the convention
142 122 'profile_<name>' and should be creating using the 'start'
143 123 subcommand of 'ipcluster'. If your cluster directory is in
144 124 the cwd or the ipython directory, you can simply refer to it
145 125 using its profile name, 'ipcluster start --n=4 --profile=<profile>`,
146 126 otherwise use the 'profile-dir' option.
147 127 """
148 128 stop_help = """Stop a running IPython cluster
149 129
150 130 Stop a running ipython cluster by its profile name or cluster
151 131 directory. Cluster directories are named using the convention
152 132 'profile_<name>'. If your cluster directory is in
153 133 the cwd or the ipython directory, you can simply refer to it
154 134 using its profile name, 'ipcluster stop --profile=<profile>`, otherwise
155 135 use the '--profile-dir' option.
156 136 """
157 137 engines_help = """Start engines connected to an existing IPython cluster
158 138
159 139 Start one or more engines to connect to an existing Cluster
160 140 by profile name or cluster directory.
161 141 Cluster directories contain configuration, log and
162 142 security related files and are named using the convention
163 143 'profile_<name>' and should be creating using the 'start'
164 144 subcommand of 'ipcluster'. If your cluster directory is in
165 145 the cwd or the ipython directory, you can simply refer to it
166 146 using its profile name, 'ipcluster engines --n=4 --profile=<profile>`,
167 147 otherwise use the 'profile-dir' option.
168 148 """
169 149 stop_aliases = dict(
170 150 signal='IPClusterStop.signal',
171 151 )
172 152 stop_aliases.update(base_aliases)
173 153
class IPClusterStop(BaseParallelApplication):
    # the `ipcluster stop` subcommand: signal a running cluster by pid file
    name = u'ipcluster'
    description = stop_help
    examples = _stop_examples

    signal = Integer(signal.SIGINT, config=True,
        help="signal to use for stopping processes.")

    aliases = Dict(stop_aliases)

    def start(self):
        """Start the app for the stop subcommand."""
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Here I exit with a unusual exit status that other processes
            # can watch for to learn how I exited.
            self.remove_pid_file()
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical(
                'Cluster [pid=%r] is not running.' % pid
            )
            self.remove_pid_file()
            # Here I exit with a unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)

        elif os.name=='posix':
            sig = self.signal
            self.log.info(
                "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
            )
            try:
                os.kill(pid, sig)
            except OSError:
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
            self.remove_pid_file()
        elif os.name=='nt':
            # Windows has no POSIX signals; use taskkill instead
            try:
                # kill the whole tree
                p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
            except (CalledProcessError, OSError):
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
            self.remove_pid_file()
225 205
# Aliases/flags for the `ipcluster engines` subcommand: the shared parallel-app
# set plus the engine-specific options.
engine_aliases = dict(base_aliases)
engine_aliases.update(dict(
    n='IPClusterEngines.n',
    engines = 'IPClusterEngines.engine_launcher_class',
    daemonize = 'IPClusterEngines.daemonize',
))

engine_flags = dict(base_flags)
engine_flags.update(dict(
    daemonize=(
        {'IPClusterEngines' : {'daemonize' : True}},
        """run the cluster into the background (not available on Windows)""",
    )
))
class IPClusterEngines(BaseParallelApplication):
    # the `ipcluster engines` subcommand: start engines for an existing cluster

    name = u'ipcluster'
    description = engines_help
    examples = _engines_examples
    usage = None
    default_log_level = logging.INFO
    classes = List()
    def _classes_default(self):
        # expose ProfileDir plus every EngineSet launcher as configurables
        from IPython.parallel.apps import launcher
        launchers = launcher.all_launchers
        eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
        return [ProfileDir]+eslaunchers

    n = Integer(num_cpus(), config=True,
        help="""The number of engines to start. The default is to use one for each
        CPU on your machine""")

    engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
    def _engine_launcher_changed(self, name, old, new):
        # forward string values of the deprecated trait to the new one
        if isinstance(new, string_types):
            self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
                    " use engine_launcher_class" % self.__class__.__name__)
            self.engine_launcher_class = new
    engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
        config=True,
        help="""The class for launching a set of Engines. Change this value
        to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        You can also write your own launcher, and specify it's absolute import path,
        as in 'mymodule.launcher.FTLEnginesLauncher`.

        IPython's bundled examples include:

            Local : start engines locally as subprocesses [default]
            MPI : use mpiexec to launch engines in an MPI environment
            PBS : use PBS (qsub) to submit engines to a batch queue
            SGE : use SGE (qsub) to submit engines to a batch queue
            LSF : use LSF (bsub) to submit engines to a batch queue
            SSH : use SSH to start the controller
                        Note that SSH does *not* move the connection files
                        around, so you will likely have to do this manually
                        unless the machines are on a shared file system.
            HTCondor : use HTCondor to submit engines to a batch queue
            WindowsHPC : use Windows HPC

        If you are using one of IPython's builtin launchers, you can specify just the
        prefix, e.g:

            c.IPClusterEngines.engine_launcher_class = 'SSH'

        or:

            ipcluster start --engines=MPI

        """
        )
    daemonize = Bool(False, config=True,
        help="""Daemonize the ipcluster program. This implies --log-to-file.
        Not available on Windows.
        """)

    def _daemonize_changed(self, name, old, new):
        # a daemon has no terminal, so force logging to a file
        if new:
            self.log_to_file = True

    early_shutdown = Integer(30, config=True, help="The timeout (in seconds)")
    _stopping = False

    aliases = Dict(engine_aliases)
    flags = Dict(engine_flags)

    @catch_config_error
    def initialize(self, argv=None):
        """initialize the app: config, signal handlers, then launchers"""
        super(IPClusterEngines, self).initialize(argv)
        self.init_signal()
        self.init_launchers()

    def init_launchers(self):
        """instantiate the engine-set launcher from its configured class name"""
        self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')

    def init_signal(self):
        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

    def build_launcher(self, clsname, kind=None):
        """import and instantiate a Launcher based on importstring"""
        try:
            klass = find_launcher_class(clsname, kind)
        except (ImportError, KeyError):
            self.log.fatal("Could not import launcher class: %r"%clsname)
            self.exit(1)

        launcher = klass(
            work_dir=u'.', parent=self, log=self.log,
            profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
        )
        return launcher

    def engines_started_ok(self):
        """called after early_shutdown elapses without the engines dying"""
        self.log.info("Engines appear to have started successfully")
        # disable the early-shutdown error path from here on
        self.early_shutdown = 0

    def start_engines(self):
        """start the engine launcher and arm the early-shutdown watchdog"""
        # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
        n = getattr(self.engine_launcher, 'engine_count', self.n)
        self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
        try:
            self.engine_launcher.start(self.n)
        except:
            self.log.exception("Engine start failed")
            raise
        self.engine_launcher.on_stop(self.engines_stopped_early)
        if self.early_shutdown:
            self.loop.add_timeout(self.loop.time() + self.early_shutdown, self.engines_started_ok)

    def engines_stopped_early(self, r):
        """engines stopped before early_shutdown elapsed: likely a connect failure"""
        if self.early_shutdown and not self._stopping:
            self.log.error("""
            Engines shutdown early, they probably failed to connect.

            Check the engine log files for output.

            If your controller and engines are not on the same machine, you probably
            have to instruct the controller to listen on an interface other than localhost.

            You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.

            Be sure to read our security docs before instructing your controller to listen on
            a public interface.
            """)
            self.stop_launchers()

        return self.engines_stopped(r)

    def engines_stopped(self, r):
        return self.loop.stop()

    def stop_engines(self):
        """stop the engine launcher if it is running; returns its stop result or None"""
        if self.engine_launcher.running:
            self.log.info("Stopping Engines...")
            d = self.engine_launcher.stop()
            return d
        else:
            return None

    def stop_launchers(self, r=None):
        """initiate shutdown once; stop engines, then stop the loop after a grace period"""
        if not self._stopping:
            self._stopping = True
            self.log.error("IPython cluster: stopping")
            self.stop_engines()
            # Wait a few seconds to let things shut down.
            self.loop.add_timeout(self.loop.time() + 3, self.loop.stop)

    def sigint_handler(self, signum, frame):
        self.log.debug("SIGINT received, stopping launchers...")
        self.stop_launchers()

    def start_logging(self):
        # Remove old log files of the controller and engine
        if self.clean_logs:
            log_dir = self.profile_dir.log_dir
            for f in os.listdir(log_dir):
                if re.match(r'ip(engine|controller)-.+\.(log|err|out)',f):
                    os.remove(os.path.join(log_dir, f))

    def start(self):
        """Start the app for the engines subcommand."""
        self.log.info("IPython cluster: started")
        # First see if the cluster is already running

        # Now log and daemonize
        self.log.info(
            'Starting engines with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                daemonize()

        # defer engine startup until the loop is running
        self.loop.add_callback(self.start_engines)
        # Now write the new pid file AFTER our new forked pid is active.
        # self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # EINTR (interrupted system call) is an expected shutdown path
            if e.errno == errno.EINTR:
                pass
            else:
                raise
439 417 start_aliases = {}
440 418 start_aliases.update(engine_aliases)
441 419 start_aliases.update(dict(
442 420 delay='IPClusterStart.delay',
443 421 controller = 'IPClusterStart.controller_launcher_class',
444 422 ))
445 423 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
446 424
447 425 class IPClusterStart(IPClusterEngines):
448 426
449 427 name = u'ipcluster'
450 428 description = start_help
451 429 examples = _start_examples
452 430 default_log_level = logging.INFO
453 431 auto_create = Bool(True, config=True,
454 432 help="whether to create the profile_dir if it doesn't exist")
455 433 classes = List()
456 434 def _classes_default(self,):
457 435 from IPython.parallel.apps import launcher
458 436 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
459 437
460 438 clean_logs = Bool(True, config=True,
461 439 help="whether to cleanup old logs before starting")
462 440
463 441 delay = CFloat(1., config=True,
464 442 help="delay (in s) between starting the controller and the engines")
465 443
466 444 controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
467 445 def _controller_launcher_changed(self, name, old, new):
468 446 if isinstance(new, string_types):
469 447 # old 0.11-style config
470 448 self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
471 449 " use controller_launcher_class" % self.__class__.__name__)
472 450 self.controller_launcher_class = new
473 451 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
474 452 config=True,
475 453 help="""The class for launching a Controller. Change this value if you want
476 454 your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.
477 455
478 456 Each launcher class has its own set of configuration options, for making sure
479 457 it will work in your environment.
480 458
481 459 Note that using a batch launcher for the controller *does not* put it
482 460 in the same batch job as the engines, so they will still start separately.
483 461
484 462 IPython's bundled examples include:
485 463
486 464 Local : start engines locally as subprocesses
487 465 MPI : use mpiexec to launch the controller in an MPI universe
488 466 PBS : use PBS (qsub) to submit the controller to a batch queue
489 467 SGE : use SGE (qsub) to submit the controller to a batch queue
490 468 LSF : use LSF (bsub) to submit the controller to a batch queue
491 469 HTCondor : use HTCondor to submit the controller to a batch queue
492 470 SSH : use SSH to start the controller
493 471 WindowsHPC : use Windows HPC
494 472
495 473 If you are using one of IPython's builtin launchers, you can specify just the
496 474 prefix, e.g:
497 475
498 476 c.IPClusterStart.controller_launcher_class = 'SSH'
499 477
500 478 or:
501 479
502 480 ipcluster start --controller=MPI
503 481
504 482 """
505 483 )
506 484 reset = Bool(False, config=True,
507 485 help="Whether to reset config files as part of '--create'."
508 486 )
509 487
510 488 # flags = Dict(flags)
511 489 aliases = Dict(start_aliases)
512 490
513 491 def init_launchers(self):
514 492 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
515 493 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
516 494
517 495 def engines_stopped(self, r):
518 496 """prevent parent.engines_stopped from stopping everything on engine shutdown"""
519 497 pass
520 498
521 499 def start_controller(self):
522 500 self.log.info("Starting Controller with %s", self.controller_launcher_class)
523 501 self.controller_launcher.on_stop(self.stop_launchers)
524 502 try:
525 503 self.controller_launcher.start()
526 504 except:
527 505 self.log.exception("Controller start failed")
528 506 raise
529 507
530 508 def stop_controller(self):
531 509 # self.log.info("In stop_controller")
532 510 if self.controller_launcher and self.controller_launcher.running:
533 511 return self.controller_launcher.stop()
534 512
535 513 def stop_launchers(self, r=None):
536 514 if not self._stopping:
537 515 self.stop_controller()
538 516 super(IPClusterStart, self).stop_launchers()
539 517
540 518 def start(self):
541 519 """Start the app for the start subcommand."""
542 520 # First see if the cluster is already running
543 521 try:
544 522 pid = self.get_pid_from_file()
545 523 except PIDFileError:
546 524 pass
547 525 else:
548 526 if self.check_pid(pid):
549 527 self.log.critical(
550 528 'Cluster is already running with [pid=%s]. '
551 529 'use "ipcluster stop" to stop the cluster.' % pid
552 530 )
553 531 # Here I exit with a unusual exit status that other processes
554 532 # can watch for to learn how I existed.
555 533 self.exit(ALREADY_STARTED)
556 534 else:
557 535 self.remove_pid_file()
558 536
559 537
560 538 # Now log and daemonize
561 539 self.log.info(
562 540 'Starting ipcluster with [daemon=%r]' % self.daemonize
563 541 )
564 542 # TODO: Get daemonize working on Windows or as a Windows Server.
565 543 if self.daemonize:
566 544 if os.name=='posix':
567 545 daemonize()
568 546
569 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
570 dc.start()
571 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
572 dc.start()
547 def start():
548 self.start_controller()
549 self.loop.add_timeout(self.loop.time() + self.delay, self.start_engines)
550 self.loop.add_callback(start)
573 551 # Now write the new pid file AFTER our new forked pid is active.
574 552 self.write_pid_file()
575 553 try:
576 554 self.loop.start()
577 555 except KeyboardInterrupt:
578 556 pass
579 557 except zmq.ZMQError as e:
580 558 if e.errno == errno.EINTR:
581 559 pass
582 560 else:
583 561 raise
584 562 finally:
585 563 self.remove_pid_file()
586 564
587 565 base='IPython.parallel.apps.ipclusterapp.IPCluster'
588 566
589 567 class IPClusterApp(BaseIPythonApplication):
590 568 name = u'ipcluster'
591 569 description = _description
592 570 examples = _main_examples
593 571
594 572 subcommands = {
595 573 'start' : (base+'Start', start_help),
596 574 'stop' : (base+'Stop', stop_help),
597 575 'engines' : (base+'Engines', engines_help),
598 576 }
599 577
600 578 # no aliases or flags for parent App
601 579 aliases = Dict()
602 580 flags = Dict()
603 581
604 582 def start(self):
605 583 if self.subapp is None:
606 584 print("No subcommand specified. Must specify one of: %s"%(self.subcommands.keys()))
607 585 print()
608 586 self.print_description()
609 587 self.print_subcommands()
610 588 self.exit(1)
611 589 else:
612 590 return self.subapp.start()
613 591
614 592 launch_new_instance = IPClusterApp.launch_instance
615 593
616 594 if __name__ == '__main__':
617 595 launch_new_instance()
618 596
@@ -1,1447 +1,1446
1 1 # encoding: utf-8
2 2 """Facilities for launching IPython processes asynchronously."""
3 3
4 4 # Copyright (c) IPython Development Team.
5 5 # Distributed under the terms of the Modified BSD License.
6 6
7 7 import copy
8 8 import logging
9 9 import os
10 10 import pipes
11 11 import stat
12 12 import sys
13 13 import time
14 14
15 15 # signal imports, handling various platforms, versions
16 16
17 17 from signal import SIGINT, SIGTERM
18 18 try:
19 19 from signal import SIGKILL
20 20 except ImportError:
21 21 # Windows
22 22 SIGKILL=SIGTERM
23 23
24 24 try:
25 25 # Windows >= 2.7, 3.2
26 26 from signal import CTRL_C_EVENT as SIGINT
27 27 except ImportError:
28 28 pass
29 29
30 30 from subprocess import Popen, PIPE, STDOUT
31 31 try:
32 32 from subprocess import check_output
33 33 except ImportError:
34 34 # pre-2.7, define check_output with Popen
35 35 def check_output(*args, **kwargs):
36 36 kwargs.update(dict(stdout=PIPE))
37 37 p = Popen(*args, **kwargs)
38 38 out,err = p.communicate()
39 39 return out
40 40
41 41 from zmq.eventloop import ioloop
42 42
43 43 from IPython.config.application import Application
44 44 from IPython.config.configurable import LoggingConfigurable
45 45 from IPython.utils.text import EvalFormatter
46 46 from IPython.utils.traitlets import (
47 47 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
48 48 )
49 49 from IPython.utils.encoding import DEFAULT_ENCODING
50 50 from IPython.utils.path import get_home_dir, ensure_dir_exists
51 51 from IPython.utils.process import find_cmd, FindCmdError
52 52 from IPython.utils.py3compat import iteritems, itervalues
53 53
54 54 from .win32support import forward_read_events
55 55
56 56 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
57 57
58 58 WINDOWS = os.name == 'nt'
59 59
60 60 #-----------------------------------------------------------------------------
61 61 # Paths to the kernel apps
62 62 #-----------------------------------------------------------------------------
63 63
64 64 ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]
65 65
66 66 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]
67 67
68 68 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]
69 69
70 70 if WINDOWS and sys.version_info < (3,):
71 71 # `python -m package` doesn't work on Windows Python 2,
72 72 # but `python -m module` does.
73 73 ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipengineapp"]
74 74 ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.apps.ipcontrollerapp"]
75 75
76 76 #-----------------------------------------------------------------------------
77 77 # Base launchers and errors
78 78 #-----------------------------------------------------------------------------
79 79
80 80 class LauncherError(Exception):
81 81 pass
82 82
83 83
84 84 class ProcessStateError(LauncherError):
85 85 pass
86 86
87 87
88 88 class UnknownStatus(LauncherError):
89 89 pass
90 90
91 91
92 92 class BaseLauncher(LoggingConfigurable):
93 93 """An asbtraction for starting, stopping and signaling a process."""
94 94
95 95 # In all of the launchers, the work_dir is where child processes will be
96 96 # run. This will usually be the profile_dir, but may not be. any work_dir
97 97 # passed into the __init__ method will override the config value.
98 98 # This should not be used to set the work_dir for the actual engine
99 99 # and controller. Instead, use their own config files or the
100 100 # controller_args, engine_args attributes of the launchers to add
101 101 # the work_dir option.
102 102 work_dir = Unicode(u'.')
103 103 loop = Instance('zmq.eventloop.ioloop.IOLoop')
104 104
105 105 start_data = Any()
106 106 stop_data = Any()
107 107
108 108 def _loop_default(self):
109 109 return ioloop.IOLoop.instance()
110 110
111 111 def __init__(self, work_dir=u'.', config=None, **kwargs):
112 112 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
113 113 self.state = 'before' # can be before, running, after
114 114 self.stop_callbacks = []
115 115 self.start_data = None
116 116 self.stop_data = None
117 117
118 118 @property
119 119 def args(self):
120 120 """A list of cmd and args that will be used to start the process.
121 121
122 122 This is what is passed to :func:`spawnProcess` and the first element
123 123 will be the process name.
124 124 """
125 125 return self.find_args()
126 126
127 127 def find_args(self):
128 128 """The ``.args`` property calls this to find the args list.
129 129
130 130 Subcommand should implement this to construct the cmd and args.
131 131 """
132 132 raise NotImplementedError('find_args must be implemented in a subclass')
133 133
134 134 @property
135 135 def arg_str(self):
136 136 """The string form of the program arguments."""
137 137 return ' '.join(self.args)
138 138
139 139 @property
140 140 def running(self):
141 141 """Am I running."""
142 142 if self.state == 'running':
143 143 return True
144 144 else:
145 145 return False
146 146
147 147 def start(self):
148 148 """Start the process."""
149 149 raise NotImplementedError('start must be implemented in a subclass')
150 150
151 151 def stop(self):
152 152 """Stop the process and notify observers of stopping.
153 153
154 154 This method will return None immediately.
155 155 To observe the actual process stopping, see :meth:`on_stop`.
156 156 """
157 157 raise NotImplementedError('stop must be implemented in a subclass')
158 158
159 159 def on_stop(self, f):
160 160 """Register a callback to be called with this Launcher's stop_data
161 161 when the process actually finishes.
162 162 """
163 163 if self.state=='after':
164 164 return f(self.stop_data)
165 165 else:
166 166 self.stop_callbacks.append(f)
167 167
168 168 def notify_start(self, data):
169 169 """Call this to trigger startup actions.
170 170
171 171 This logs the process startup and sets the state to 'running'. It is
172 172 a pass-through so it can be used as a callback.
173 173 """
174 174
175 175 self.log.debug('Process %r started: %r', self.args[0], data)
176 176 self.start_data = data
177 177 self.state = 'running'
178 178 return data
179 179
180 180 def notify_stop(self, data):
181 181 """Call this to trigger process stop actions.
182 182
183 183 This logs the process stopping and sets the state to 'after'. Call
184 184 this to trigger callbacks registered via :meth:`on_stop`."""
185 185
186 186 self.log.debug('Process %r stopped: %r', self.args[0], data)
187 187 self.stop_data = data
188 188 self.state = 'after'
189 189 for i in range(len(self.stop_callbacks)):
190 190 d = self.stop_callbacks.pop()
191 191 d(data)
192 192 return data
193 193
194 194 def signal(self, sig):
195 195 """Signal the process.
196 196
197 197 Parameters
198 198 ----------
199 199 sig : str or int
200 200 'KILL', 'INT', etc., or any signal number
201 201 """
202 202 raise NotImplementedError('signal must be implemented in a subclass')
203 203
204 204 class ClusterAppMixin(HasTraits):
205 205 """MixIn for cluster args as traits"""
206 206 profile_dir=Unicode('')
207 207 cluster_id=Unicode('')
208 208
209 209 @property
210 210 def cluster_args(self):
211 211 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
212 212
213 213 class ControllerMixin(ClusterAppMixin):
214 214 controller_cmd = List(ipcontroller_cmd_argv, config=True,
215 215 help="""Popen command to launch ipcontroller.""")
216 216 # Command line arguments to ipcontroller.
217 217 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
218 218 help="""command-line args to pass to ipcontroller""")
219 219
220 220 class EngineMixin(ClusterAppMixin):
221 221 engine_cmd = List(ipengine_cmd_argv, config=True,
222 222 help="""command to launch the Engine.""")
223 223 # Command line arguments for ipengine.
224 224 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
225 225 help="command-line arguments to pass to ipengine"
226 226 )
227 227
228 228
229 229 #-----------------------------------------------------------------------------
230 230 # Local process launchers
231 231 #-----------------------------------------------------------------------------
232 232
233 233
234 234 class LocalProcessLauncher(BaseLauncher):
235 235 """Start and stop an external process in an asynchronous manner.
236 236
237 237 This will launch the external process with a working directory of
238 238 ``self.work_dir``.
239 239 """
240 240
241 241 # This is used to to construct self.args, which is passed to
242 242 # spawnProcess.
243 243 cmd_and_args = List([])
244 244 poll_frequency = Integer(100) # in ms
245 245
246 246 def __init__(self, work_dir=u'.', config=None, **kwargs):
247 247 super(LocalProcessLauncher, self).__init__(
248 248 work_dir=work_dir, config=config, **kwargs
249 249 )
250 250 self.process = None
251 251 self.poller = None
252 252
253 253 def find_args(self):
254 254 return self.cmd_and_args
255 255
256 256 def start(self):
257 257 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
258 258 if self.state == 'before':
259 259 self.process = Popen(self.args,
260 260 stdout=PIPE,stderr=PIPE,stdin=PIPE,
261 261 env=os.environ,
262 262 cwd=self.work_dir
263 263 )
264 264 if WINDOWS:
265 265 self.stdout = forward_read_events(self.process.stdout)
266 266 self.stderr = forward_read_events(self.process.stderr)
267 267 else:
268 268 self.stdout = self.process.stdout.fileno()
269 269 self.stderr = self.process.stderr.fileno()
270 270 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
271 271 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
272 272 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
273 273 self.poller.start()
274 274 self.notify_start(self.process.pid)
275 275 else:
276 276 s = 'The process was already started and has state: %r' % self.state
277 277 raise ProcessStateError(s)
278 278
279 279 def stop(self):
280 280 return self.interrupt_then_kill()
281 281
282 282 def signal(self, sig):
283 283 if self.state == 'running':
284 284 if WINDOWS and sig != SIGINT:
285 285 # use Windows tree-kill for better child cleanup
286 286 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
287 287 else:
288 288 self.process.send_signal(sig)
289 289
290 290 def interrupt_then_kill(self, delay=2.0):
291 291 """Send INT, wait a delay and then send KILL."""
292 292 try:
293 293 self.signal(SIGINT)
294 294 except Exception:
295 295 self.log.debug("interrupt failed")
296 296 pass
297 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
298 self.killer.start()
297 self.killer = self.loop.add_timeout(self.loop.time() + delay, lambda : self.signal(SIGKILL))
299 298
300 299 # callbacks, etc:
301 300
302 301 def handle_stdout(self, fd, events):
303 302 if WINDOWS:
304 303 line = self.stdout.recv()
305 304 else:
306 305 line = self.process.stdout.readline()
307 306 # a stopped process will be readable but return empty strings
308 307 if line:
309 308 self.log.debug(line[:-1])
310 309 else:
311 310 self.poll()
312 311
313 312 def handle_stderr(self, fd, events):
314 313 if WINDOWS:
315 314 line = self.stderr.recv()
316 315 else:
317 316 line = self.process.stderr.readline()
318 317 # a stopped process will be readable but return empty strings
319 318 if line:
320 319 self.log.debug(line[:-1])
321 320 else:
322 321 self.poll()
323 322
324 323 def poll(self):
325 324 status = self.process.poll()
326 325 if status is not None:
327 326 self.poller.stop()
328 327 self.loop.remove_handler(self.stdout)
329 328 self.loop.remove_handler(self.stderr)
330 329 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
331 330 return status
332 331
333 332 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
334 333 """Launch a controller as a regular external process."""
335 334
336 335 def find_args(self):
337 336 return self.controller_cmd + self.cluster_args + self.controller_args
338 337
339 338 def start(self):
340 339 """Start the controller by profile_dir."""
341 340 return super(LocalControllerLauncher, self).start()
342 341
343 342
344 343 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
345 344 """Launch a single engine as a regular externall process."""
346 345
347 346 def find_args(self):
348 347 return self.engine_cmd + self.cluster_args + self.engine_args
349 348
350 349
351 350 class LocalEngineSetLauncher(LocalEngineLauncher):
352 351 """Launch a set of engines as regular external processes."""
353 352
354 353 delay = CFloat(0.1, config=True,
355 354 help="""delay (in seconds) between starting each engine after the first.
356 355 This can help force the engines to get their ids in order, or limit
357 356 process flood when starting many engines."""
358 357 )
359 358
360 359 # launcher class
361 360 launcher_class = LocalEngineLauncher
362 361
363 362 launchers = Dict()
364 363 stop_data = Dict()
365 364
366 365 def __init__(self, work_dir=u'.', config=None, **kwargs):
367 366 super(LocalEngineSetLauncher, self).__init__(
368 367 work_dir=work_dir, config=config, **kwargs
369 368 )
370 369 self.stop_data = {}
371 370
372 371 def start(self, n):
373 372 """Start n engines by profile or profile_dir."""
374 373 dlist = []
375 374 for i in range(n):
376 375 if i > 0:
377 376 time.sleep(self.delay)
378 377 el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
379 378 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
380 379 )
381 380
382 381 # Copy the engine args over to each engine launcher.
383 382 el.engine_cmd = copy.deepcopy(self.engine_cmd)
384 383 el.engine_args = copy.deepcopy(self.engine_args)
385 384 el.on_stop(self._notice_engine_stopped)
386 385 d = el.start()
387 386 self.launchers[i] = el
388 387 dlist.append(d)
389 388 self.notify_start(dlist)
390 389 return dlist
391 390
392 391 def find_args(self):
393 392 return ['engine set']
394 393
395 394 def signal(self, sig):
396 395 dlist = []
397 396 for el in itervalues(self.launchers):
398 397 d = el.signal(sig)
399 398 dlist.append(d)
400 399 return dlist
401 400
402 401 def interrupt_then_kill(self, delay=1.0):
403 402 dlist = []
404 403 for el in itervalues(self.launchers):
405 404 d = el.interrupt_then_kill(delay)
406 405 dlist.append(d)
407 406 return dlist
408 407
409 408 def stop(self):
410 409 return self.interrupt_then_kill()
411 410
412 411 def _notice_engine_stopped(self, data):
413 412 pid = data['pid']
414 413 for idx,el in iteritems(self.launchers):
415 414 if el.process.pid == pid:
416 415 break
417 416 self.launchers.pop(idx)
418 417 self.stop_data[idx] = data
419 418 if not self.launchers:
420 419 self.notify_stop(self.stop_data)
421 420
422 421
423 422 #-----------------------------------------------------------------------------
424 423 # MPI launchers
425 424 #-----------------------------------------------------------------------------
426 425
427 426
428 427 class MPILauncher(LocalProcessLauncher):
429 428 """Launch an external process using mpiexec."""
430 429
431 430 mpi_cmd = List(['mpiexec'], config=True,
432 431 help="The mpiexec command to use in starting the process."
433 432 )
434 433 mpi_args = List([], config=True,
435 434 help="The command line arguments to pass to mpiexec."
436 435 )
437 436 program = List(['date'],
438 437 help="The program to start via mpiexec.")
439 438 program_args = List([],
440 439 help="The command line argument to the program."
441 440 )
442 441 n = Integer(1)
443 442
444 443 def __init__(self, *args, **kwargs):
445 444 # deprecation for old MPIExec names:
446 445 config = kwargs.get('config', {})
447 446 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
448 447 deprecated = config.get(oldname)
449 448 if deprecated:
450 449 newname = oldname.replace('MPIExec', 'MPI')
451 450 config[newname].update(deprecated)
452 451 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
453 452
454 453 super(MPILauncher, self).__init__(*args, **kwargs)
455 454
456 455 def find_args(self):
457 456 """Build self.args using all the fields."""
458 457 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
459 458 self.program + self.program_args
460 459
461 460 def start(self, n):
462 461 """Start n instances of the program using mpiexec."""
463 462 self.n = n
464 463 return super(MPILauncher, self).start()
465 464
466 465
467 466 class MPIControllerLauncher(MPILauncher, ControllerMixin):
468 467 """Launch a controller using mpiexec."""
469 468
470 469 # alias back to *non-configurable* program[_args] for use in find_args()
471 470 # this way all Controller/EngineSetLaunchers have the same form, rather
472 471 # than *some* having `program_args` and others `controller_args`
473 472 @property
474 473 def program(self):
475 474 return self.controller_cmd
476 475
477 476 @property
478 477 def program_args(self):
479 478 return self.cluster_args + self.controller_args
480 479
481 480 def start(self):
482 481 """Start the controller by profile_dir."""
483 482 return super(MPIControllerLauncher, self).start(1)
484 483
485 484
486 485 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
487 486 """Launch engines using mpiexec"""
488 487
489 488 # alias back to *non-configurable* program[_args] for use in find_args()
490 489 # this way all Controller/EngineSetLaunchers have the same form, rather
491 490 # than *some* having `program_args` and others `controller_args`
492 491 @property
493 492 def program(self):
494 493 return self.engine_cmd
495 494
496 495 @property
497 496 def program_args(self):
498 497 return self.cluster_args + self.engine_args
499 498
500 499 def start(self, n):
501 500 """Start n engines by profile or profile_dir."""
502 501 self.n = n
503 502 return super(MPIEngineSetLauncher, self).start(n)
504 503
505 504 # deprecated MPIExec names
506 505 class DeprecatedMPILauncher(object):
507 506 def warn(self):
508 507 oldname = self.__class__.__name__
509 508 newname = oldname.replace('MPIExec', 'MPI')
510 509 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
511 510
512 511 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
513 512 """Deprecated, use MPILauncher"""
514 513 def __init__(self, *args, **kwargs):
515 514 super(MPIExecLauncher, self).__init__(*args, **kwargs)
516 515 self.warn()
517 516
518 517 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
519 518 """Deprecated, use MPIControllerLauncher"""
520 519 def __init__(self, *args, **kwargs):
521 520 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
522 521 self.warn()
523 522
524 523 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
525 524 """Deprecated, use MPIEngineSetLauncher"""
526 525 def __init__(self, *args, **kwargs):
527 526 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
528 527 self.warn()
529 528
530 529
531 530 #-----------------------------------------------------------------------------
532 531 # SSH launchers
533 532 #-----------------------------------------------------------------------------
534 533
535 534 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
536 535
537 536 class SSHLauncher(LocalProcessLauncher):
538 537 """A minimal launcher for ssh.
539 538
540 539 To be useful this will probably have to be extended to use the ``sshx``
541 540 idea for environment variables. There could be other things this needs
542 541 as well.
543 542 """
544 543
545 544 ssh_cmd = List(['ssh'], config=True,
546 545 help="command for starting ssh")
547 546 ssh_args = List(['-tt'], config=True,
548 547 help="args to pass to ssh")
549 548 scp_cmd = List(['scp'], config=True,
550 549 help="command for sending files")
551 550 program = List(['date'],
552 551 help="Program to launch via ssh")
553 552 program_args = List([],
554 553 help="args to pass to remote program")
555 554 hostname = Unicode('', config=True,
556 555 help="hostname on which to launch the program")
557 556 user = Unicode('', config=True,
558 557 help="username for ssh")
559 558 location = Unicode('', config=True,
560 559 help="user@hostname location for ssh in one setting")
561 560 to_fetch = List([], config=True,
562 561 help="List of (remote, local) files to fetch after starting")
563 562 to_send = List([], config=True,
564 563 help="List of (local, remote) files to send before starting")
565 564
566 565 def _hostname_changed(self, name, old, new):
567 566 if self.user:
568 567 self.location = u'%s@%s' % (self.user, new)
569 568 else:
570 569 self.location = new
571 570
572 571 def _user_changed(self, name, old, new):
573 572 self.location = u'%s@%s' % (new, self.hostname)
574 573
575 574 def find_args(self):
576 575 return self.ssh_cmd + self.ssh_args + [self.location] + \
577 576 list(map(pipes.quote, self.program + self.program_args))
578 577
579 578 def _send_file(self, local, remote):
580 579 """send a single file"""
581 580 full_remote = "%s:%s" % (self.location, remote)
582 581 for i in range(10):
583 582 if not os.path.exists(local):
584 583 self.log.debug("waiting for %s" % local)
585 584 time.sleep(1)
586 585 else:
587 586 break
588 587 remote_dir = os.path.dirname(remote)
589 588 self.log.info("ensuring remote %s:%s/ exists", self.location, remote_dir)
590 589 check_output(self.ssh_cmd + self.ssh_args + \
591 590 [self.location, 'mkdir', '-p', '--', remote_dir]
592 591 )
593 592 self.log.info("sending %s to %s", local, full_remote)
594 593 check_output(self.scp_cmd + [local, full_remote])
595 594
596 595 def send_files(self):
597 596 """send our files (called before start)"""
598 597 if not self.to_send:
599 598 return
600 599 for local_file, remote_file in self.to_send:
601 600 self._send_file(local_file, remote_file)
602 601
603 602 def _fetch_file(self, remote, local):
604 603 """fetch a single file"""
605 604 full_remote = "%s:%s" % (self.location, remote)
606 605 self.log.info("fetching %s from %s", local, full_remote)
607 606 for i in range(10):
608 607 # wait up to 10s for remote file to exist
609 608 check = check_output(self.ssh_cmd + self.ssh_args + \
610 609 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
611 610 check = check.decode(DEFAULT_ENCODING, 'replace').strip()
612 611 if check == u'no':
613 612 time.sleep(1)
614 613 elif check == u'yes':
615 614 break
616 615 local_dir = os.path.dirname(local)
617 616 ensure_dir_exists(local_dir, 775)
618 617 check_output(self.scp_cmd + [full_remote, local])
619 618
620 619 def fetch_files(self):
621 620 """fetch remote files (called after start)"""
622 621 if not self.to_fetch:
623 622 return
624 623 for remote_file, local_file in self.to_fetch:
625 624 self._fetch_file(remote_file, local_file)
626 625
627 626 def start(self, hostname=None, user=None):
628 627 if hostname is not None:
629 628 self.hostname = hostname
630 629 if user is not None:
631 630 self.user = user
632 631
633 632 self.send_files()
634 633 super(SSHLauncher, self).start()
635 634 self.fetch_files()
636 635
637 636 def signal(self, sig):
638 637 if self.state == 'running':
639 638 # send escaped ssh connection-closer
640 639 self.process.stdin.write('~.')
641 640 self.process.stdin.flush()
642 641
class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
    """SSH launcher that tracks the cluster profile dir on the *remote* host."""

    remote_profile_dir = Unicode('', config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """)

    def _profile_dir_changed(self, name, old, new):
        # Only derive the remote dir when the user has not pinned it explicitly.
        if not self.remote_profile_dir:
            # trigger remote_profile_dir_default logic again,
            # in case it was already triggered before profile_dir was set
            self.remote_profile_dir = self._strip_home(new)

    @staticmethod
    def _strip_home(path):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
        home = get_home_dir()
        if not home.endswith('/'):
            home = home+'/'

        if path.startswith(home):
            return path[len(home):]
        else:
            return path

    def _remote_profile_dir_default(self):
        # default: local profile_dir expressed relative to the remote $HOME
        return self._strip_home(self.profile_dir)

    def _cluster_id_changed(self, name, old, new):
        # cluster_id would change the remote file layout; unsupported over SSH
        if new:
            raise ValueError("cluster id not supported by SSH launchers")

    @property
    def cluster_args(self):
        # command-line args that point the remote process at its profile dir
        return ['--profile-dir', self.remote_profile_dir]
679 678
class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
    """Launch ipcontroller on a remote host over SSH."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _controller_cmd_default(self):
        return ['ipcontroller']

    @property
    def program(self):
        return self.controller_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.controller_args

    def _to_fetch_default(self):
        # fetch the connection files the remote controller writes, so local
        # clients/engines can connect
        return [
            (os.path.join(self.remote_profile_dir, 'security', cf),
             os.path.join(self.profile_dir, 'security', cf),)
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]
703 702
class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
    """Launch a single ipengine on a remote host over SSH."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`

    def _engine_cmd_default(self):
        return ['ipengine']

    @property
    def program(self):
        return self.engine_cmd

    @property
    def program_args(self):
        return self.cluster_args + self.engine_args

    def _to_send_default(self):
        # engines need the controller's connection files before they can start
        return [
            (os.path.join(self.profile_dir, 'security', cf),
             os.path.join(self.remote_profile_dir, 'security', cf))
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]
727 726
728 727
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines across one or more hosts via SSH."""

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def _engine_cmd_default(self):
        return ['ipengine']

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        count = 0
        for n in itervalues(self.engines):
            # each value may be a plain int, or a (count, args) pair
            if isinstance(n, (tuple,list)):
                n,args = n
            count += n
        return count

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.
        """

        dlist = []
        for host, n in iteritems(self.engines):
            # value may carry per-host engine args
            if isinstance(n, (tuple, list)):
                n, args = n
            else:
                args = copy.deepcopy(self.engine_args)

            # allow user@host syntax
            if '@' in host:
                user,host = host.split('@',1)
            else:
                user=None
            for i in range(n):
                # stagger engine starts on the same host
                if i > 0:
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )
                if i > 0:
                    # only send files for the first engine on each host
                    el.to_send = []

                # Copy the engine args over to each engine launcher.
                el.engine_cmd = self.engine_cmd
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers[ "%s/%i" % (host,i) ] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
783 782
784 783
class SSHProxyEngineSetLauncher(SSHClusterLauncher):
    """Launcher for calling
    `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    # number of engines requested; set by start()
    n = Integer()
    ipcluster_cmd = List(['ipcluster'], config=True)

    @property
    def program(self):
        return self.ipcluster_cmd + ['engines']

    @property
    def program_args(self):
        return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]

    def _to_send_default(self):
        # engines need the controller's connection files before they can start
        return [
            (os.path.join(self.profile_dir, 'security', cf),
             os.path.join(self.remote_profile_dir, 'security', cf))
            for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
        ]

    def start(self, n):
        """Start `n` engines via a single remote `ipcluster engines` call."""
        self.n = n
        # FIX: propagate super().start()'s result, like every other launcher's
        # start(); previously the value was silently dropped.
        return super(SSHProxyEngineSetLauncher, self).start()
813 812
814 813
815 814 #-----------------------------------------------------------------------------
816 815 # Windows HPC Server 2008 scheduler launchers
817 816 #-----------------------------------------------------------------------------
818 817
819 818
820 819 # This is only used on Windows.
def find_job_cmd():
    """Locate the Windows HPC ``job`` command, falling back to the bare name."""
    # Non-Windows platforms never have the HPC scheduler tools installed.
    if not WINDOWS:
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError is raised when win32api is not installed
        return 'job'
830 829
831 830
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher driving the Windows HPC 2008 ``job`` scheduler command."""

    job_id_regexp = CRegExp(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): this trait is shadowed by the `job_file` property defined
    # below, so the trait itself is effectively dead — confirm before removing.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        # delegates straight to BaseLauncher; kept for signature documentation
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        # full path of the XML job description, under work_dir
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job; best-effort if it is already gone."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
            output = output.decode(DEFAULT_ENCODING, 'replace')
        except:
            output = u'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
912 911
913 912
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch an ipcontroller job through the Windows HPC scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a single-task controller job description to `job_file`."""
        job = IPControllerJob(parent=self)

        t = IPControllerTask(parent=self)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the profile_dir and from self.start().
        t.controller_args.extend(self.cluster_args)
        t.controller_args.extend(self.controller_args)
        job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # unlike the base class, the job file lives in the profile dir
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir."""
        return super(WindowsHPCControllerLauncher, self).start(1)
944 943
945 944
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of ipengine tasks as a single Windows HPC job."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pass to ipengine")  # FIX: help text typo "pas"

    def write_job_file(self, n):
        """Write a job description with one IPEngineTask per engine."""
        job = IPEngineSetJob(parent=self)

        for i in range(n):
            t = IPEngineTask(parent=self)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the profile_dir and from self.start().
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # unlike the base class, the job file lives in the profile dir
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start `n` engines by profile_dir."""
        # (docstring previously said "Start the controller" — copy-paste error)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
977 976
978 977
979 978 #-----------------------------------------------------------------------------
980 979 # Batch (PBS) system launchers
981 980 #-----------------------------------------------------------------------------
982 981
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
    def _profile_dir_changed(self, name, old, new):
        # mirror trait changes into the batch-template formatting context
        self.context[name] = new
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        # seed the context so {profile_dir} always formats, even when unset
        self.context['profile_dir'] = ''
        return ''
    def _cluster_id_default(self):
        # seed the context so {cluster_id} always formats, even when unset
        self.context['cluster_id'] = ''
        return ''
995 994
996 995
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc. The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = CRegExp('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    job_id_regexp_group = Integer(0, config=True,
        help="""The group we wish to match in job_id_regexp (0 to match all)""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        # keep the template-formatting context in sync with the trait
        self.context[name] = new

    n = Integer(1)
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = CRegExp('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = CRegExp('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()

    def _context_default(self):
        """load the default context with the default values for the basic keys

        because the _trait_changed methods only load the context if they
        are set to something other than the default value.
        """
        return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')

    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})

    def find_args(self):
        # full submit command, e.g. ['qsub', '/path/to/batch_script']
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group(self.job_id_regexp_group)
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r', job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template
            # add jobarray or queue lines to user-specified template
            # note that this is *only* when user did not specify a template.
            self._insert_queue_in_script()
            self._insert_job_array_in_script()
        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.debug('Writing batch script: %s', self.batch_file)
        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # make the script executable for the submitting user
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def _insert_queue_in_script(self):
        """Inserts a queue if required into the batch script.
        """
        if self.queue and not self.queue_regexp.search(self.batch_template):
            self.log.debug("adding PBS queue settings to batch script")
            # directives must follow the shebang, so insert after line one
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.queue_template, rest])

    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.
        """
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            # directives must follow the shebang, so insert after line one
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])

    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)
        output = output.decode(DEFAULT_ENCODING, 'replace')

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Delete the submitted job; best-effort on failure."""
        try:
            p = Popen(self.delete_command+[self.job_id], env=os.environ,
                stdout=PIPE, stderr=PIPE)
            out, err = p.communicate()
            output = out + err
        except:
            self.log.exception("Problem stopping cluster with command: %s" %
                (self.delete_command + [self.job_id]))
            # NOTE(review): on Python 3 this str has no .decode, unlike the
            # bytes produced above — the error path would raise; confirm.
            output = ""
        output = output.decode(DEFAULT_ENCODING, 'replace')
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
1144 1143
1145 1144
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    # FIX: help previously said ['qsub'] for the delete command
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help="Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # PBS directives for job arrays and queue selection
    job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1161 1160
1162 1161
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # template interpolates the local ipcontroller argv at class-definition time
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(PBSControllerLauncher, self).start(1)
1177 1176
1178 1177
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch Engines using PBS"""
    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # template interpolates the local ipengine argv at class-definition time
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1188 1187
1189 1188
1190 1189 #SGE is very similar to PBS
1191 1190
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    # SGE uses '#$' directive markers instead of '#PBS'
    job_array_regexp = CRegExp('#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1198 1197
1199 1198
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")  # FIX: typo "ipontroller"
    # template interpolates the local ipcontroller argv at class-definition time
    default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(SGEControllerLauncher, self).start(1)
1214 1213
1215 1214
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch Engines with SGE"""
    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # template interpolates the local ipengine argv at class-definition time
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1225 1224
1226 1225
1227 1226 # LSF launchers
1228 1227
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    # FIX: help strings previously said "PBS" (copy-paste from PBSLauncher)
    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help="Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # LSF '#BSUB' directives for job arrays and queue selection
    job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using LSF batch system.
        This cant inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives :
        bsub < script
        """
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        # bsub reads the script from stdin; run through the shell for the '<'
        piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
        self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
        p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
        output,err = p.communicate()
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1262 1261
1263 1262
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller using LSF."""

    batch_file_name = Unicode(u'lsf_controller', config=True,
        help="batch file name for the controller job.")
    # %%J escapes LSF's job-id placeholder through the %-interpolation below
    default_template= Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(LSFControllerLauncher, self).start(1)
1279 1278
1280 1279
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch Engines using LSF"""
    batch_file_name = Unicode(u'lsf_engines', config=True,
        help="batch file name for the engine(s) job.")
    # %%J escapes LSF's job-id placeholder through the %-interpolation below
    default_template= Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1290 1289
1291 1290
1292 1291
class HTCondorLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for HTCondor.

    HTCondor requires that we launch the ipengine/ipcontroller scripts rather
    than the python instance but otherwise is very similar to PBS. This is because
    HTCondor destroys sys.executable when launching remote processes - a launched
    python process depends on sys.executable to effectively evaluate its
    module search paths. Without it, regardless of which python interpreter you launch
    you will only get the built-in module search paths.

    We use the ip{cluster, engine, controller} scripts as our executable to circumvent
    this - the mechanism of shebanged scripts means that the python binary will be
    launched with argv[0] set to the *location of the ip{cluster, engine, controller}
    scripts on the remote node*. This means you need to take care that:

    a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
       of the python environment you wish to execute code in having top precedence.
    b. This functionality is untested on Windows.

    If you need different behavior, consider making your own template.
    """

    submit_command = List(['condor_submit'], config=True,
        help="The HTCondor submit command ['condor_submit']")
    delete_command = List(['condor_rm'], config=True,
        help="The HTCondor delete command ['condor_rm']")
    job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
        help="Regular expression for identifying the job ID [r'(\d+)\.$']")
    job_id_regexp_group = Integer(1, config=True,
        help="""The group we wish to match in job_id_regexp [1]""")

    job_array_regexp = CRegExp('queue\W+\$')
    job_array_template = Unicode('queue {n}')


    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.
        """
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            #HTCondor requires that the job array goes at the bottom of the script
            self.batch_template = '\n'.join([self.batch_template,
                self.job_array_template])

    def _insert_queue_in_script(self):
        """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
        specified in the script.
        """
        pass
1342 1341
1343 1342
class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
    """Launch a controller using HTCondor."""

    batch_file_name = Unicode(u'htcondor_controller', config=True,
        help="batch file name for the controller job.")
    # raw string: the template is an HTCondor submit description, not shell
    default_template = Unicode(r"""
universe = vanilla
executable = ipcontroller
# by default we expect a shared file system
transfer_executable = False
arguments = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
""")

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(HTCondorControllerLauncher, self).start(1)
1360 1359
1361 1360
class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
    """Launch Engines using HTCondor"""
    batch_file_name = Unicode(u'htcondor_engines', config=True,
        help="batch file name for the engine(s) job.")
    # HTCondor submit description; the 'queue {n}' line is appended by
    # _insert_job_array_in_script when absent
    default_template = Unicode("""
universe = vanilla
executable = ipengine
# by default we expect a shared file system
transfer_executable = False
arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
""")
1373 1372
1374 1373
1375 1374 #-----------------------------------------------------------------------------
1376 1375 # A launcher for ipcluster itself!
1377 1376 #-----------------------------------------------------------------------------
1378 1377
1379 1378
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    ipcluster_args = List(
        ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcluster.")
    # subcommand, profile name, and engine count used to build the argv
    ipcluster_subcommand = Unicode('start')
    profile = Unicode('default')
    n = Integer(2)

    def find_args(self):
        # full argv: ipcluster <subcommand> --n=<n> --profile=<profile> <args>
        return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
            ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
            self.ipcluster_args

    def start(self):
        return super(IPClusterLauncher, self).start()
1399 1398
1400 1399 #-----------------------------------------------------------------------------
1401 1400 # Collections of launchers
1402 1401 #-----------------------------------------------------------------------------
1403 1402
# Registries of launcher classes, grouped by backend, for lookup/config help.
local_launchers = [
    LocalControllerLauncher,
    LocalEngineLauncher,
    LocalEngineSetLauncher,
]
mpi_launchers = [
    MPILauncher,
    MPIControllerLauncher,
    MPIEngineSetLauncher,
]
ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
    SSHProxyEngineSetLauncher,
]
winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
pbs_launchers = [
    PBSLauncher,
    PBSControllerLauncher,
    PBSEngineSetLauncher,
]
sge_launchers = [
    SGELauncher,
    SGEControllerLauncher,
    SGEEngineSetLauncher,
]
lsf_launchers = [
    LSFLauncher,
    LSFControllerLauncher,
    LSFEngineSetLauncher,
]
htcondor_launchers = [
    HTCondorLauncher,
    HTCondorControllerLauncher,
    HTCondorEngineSetLauncher,
]
# flat list of every launcher this module provides
all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
                + pbs_launchers + sge_launchers + lsf_launchers + htcondor_launchers
@@ -1,1449 +1,1438
1 1 """The IPython Controller Hub with 0MQ
2 2
3 3 This is the master object that handles connections from engines and clients,
4 4 and monitors traffic through the various queues.
5 5 """
6 6
7 7 # Copyright (c) IPython Development Team.
8 8 # Distributed under the terms of the Modified BSD License.
9 9
10 10 from __future__ import print_function
11 11
12 12 import json
13 13 import os
14 14 import sys
15 15 import time
16 16 from datetime import datetime
17 17
18 18 import zmq
19 from zmq.eventloop import ioloop
20 19 from zmq.eventloop.zmqstream import ZMQStream
21 20
22 21 # internal:
23 22 from IPython.utils.importstring import import_item
24 23 from IPython.utils.jsonutil import extract_dates
25 24 from IPython.utils.localinterfaces import localhost
26 25 from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems
27 26 from IPython.utils.traitlets import (
28 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
27 HasTraits, Any, Instance, Integer, Unicode, Dict, Set, Tuple, DottedObjectName
29 28 )
30 29
31 30 from IPython.parallel import error, util
32 31 from IPython.parallel.factory import RegistrationFactory
33 32
34 33 from IPython.kernel.zmq.session import SessionFactory
35 34
36 35 from .heartmonitor import HeartMonitor
37 36
38 #-----------------------------------------------------------------------------
39 # Code
40 #-----------------------------------------------------------------------------
41 37
42 38 def _passer(*args, **kwargs):
43 39 return
44 40
45 41 def _printer(*args, **kwargs):
46 42 print (args)
47 43 print (kwargs)
48 44
def empty_record():
    """Return a fresh dict with every TaskRecord key set to its empty value.

    All fields default to None except the stdout/stderr streams, which
    accumulate text and therefore start as empty strings.
    """
    keys = (
        'msg_id', 'header', 'metadata', 'content', 'buffers', 'submitted',
        'client_uuid', 'engine_uuid', 'started', 'completed', 'resubmitted',
        'received', 'result_header', 'result_metadata', 'result_content',
        'result_buffers', 'queue', 'execute_input', 'execute_result', 'error',
    )
    record = dict.fromkeys(keys)
    record['stdout'] = ''
    record['stderr'] = ''
    return record
75 71
def init_record(msg):
    """Initialize a TaskRecord dict from a request message.

    Copies identifying fields and payload out of *msg*; result/state
    fields start as None (or '' for the output streams) and are filled
    in as the task progresses.
    """
    header = msg['header']
    record = {
        'msg_id': header['msg_id'],
        'header': header,
        'content': msg['content'],
        'metadata': msg['metadata'],
        'buffers': msg['buffers'],
        'submitted': header['date'],
        'stdout': '',
        'stderr': '',
    }
    # everything else is unknown until the task is routed / completed
    for key in ('client_uuid', 'engine_uuid', 'started', 'completed',
                'resubmitted', 'received', 'result_header', 'result_metadata',
                'result_content', 'result_buffers', 'queue', 'execute_input',
                'execute_result', 'error'):
        record[key] = None
    return record
103 99
104 100
class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (unicode): engine UUID
    pending: set of msg_ids
    stallback: tornado timeout for stalled registration
    """

    # engine ID assigned by the Hub at registration time
    id = Integer(0)
    # engine UUID as reported by the engine
    uuid = Unicode()
    # msg_ids currently outstanding on this engine
    pending = Set()
    # opaque tornado timeout handle used to abort a stalled registration
    # (Any because the handle type is an implementation detail of the loop)
    stallback = Any()
118 114
119 115
# short case-insensitive aliases for the supported DB backends;
# resolved (via .lower()) in HubFactory.init_hub before importing the class
_db_shortcuts = {
    'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
    'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
    'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
    'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
}
126 122
class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub.

    Chooses ports for all the Hub's sockets (random free ports by
    default), builds the engine/client connection dicts, and constructs
    the Hub itself in init_hub().
    """

    # port-pairs for monitoredqueues:
    hb = Tuple(Integer,Integer,config=True,
        help="""PUB/ROUTER Port pair for Engine heartbeats""")
    def _hb_default(self):
        return tuple(util.select_random_ports(2))

    mux = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for MUX queue""")

    def _mux_default(self):
        return tuple(util.select_random_ports(2))

    task = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for Task queue""")
    def _task_default(self):
        return tuple(util.select_random_ports(2))

    control = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for Control queue""")

    def _control_default(self):
        return tuple(util.select_random_ports(2))

    iopub = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for IOPub relay""")

    def _iopub_default(self):
        return tuple(util.select_random_ports(2))

    # single ports:
    mon_port = Integer(config=True,
        help="""Monitor (SUB) port for queue traffic""")

    def _mon_port_default(self):
        return util.select_random_ports(1)[0]

    notifier_port = Integer(config=True,
        help="""PUB port for sending engine status notifications""")

    def _notifier_port_default(self):
        return util.select_random_ports(1)[0]

    engine_ip = Unicode(config=True,
        help="IP on which to listen for engine connections. [default: loopback]")
    def _engine_ip_default(self):
        return localhost()
    engine_transport = Unicode('tcp', config=True,
        help="0MQ transport for engine connections. [default: tcp]")

    client_ip = Unicode(config=True,
        help="IP on which to listen for client connections. [default: loopback]")
    client_transport = Unicode('tcp', config=True,
        help="0MQ transport for client connections. [default : tcp]")

    monitor_ip = Unicode(config=True,
        help="IP on which to listen for monitor messages. [default: loopback]")
    monitor_transport = Unicode('tcp', config=True,
        help="0MQ transport for monitor messages. [default : tcp]")

    # client and monitor IPs share the same loopback default as engine_ip
    _client_ip_default = _monitor_ip_default = _engine_ip_default


    monitor_url = Unicode('')

    db_class = DottedObjectName('NoDB',
        config=True, help="""The class to use for the DB backend

        Options include:

        SQLiteDB: SQLite
        MongoDB : use MongoDB
        DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
        NoDB : disable database altogether (default)

        """)

    registration_timeout = Integer(0, config=True,
        help="Engine registration timeout in seconds [default: max(30,"
            "10*heartmonitor.period)]" )

    def _registration_timeout_default(self):
        if self.heartmonitor is None:
            # early initialization, this value will be ignored
            return 0
        # heartmonitor period is in milliseconds, so 10x in seconds is .01
        return max(30, int(.01 * self.heartmonitor.period))

    # not configurable
    db = Instance('IPython.parallel.controller.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')

    def _ip_changed(self, name, old, new):
        # setting the generic `ip` fans out to all three listener IPs
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        # recompute the full zmq url for the monitor SUB socket
        self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        # setting the generic `transport` fans out to all three transports
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()


    def construct(self):
        self.init_hub()

    def start(self):
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def client_url(self, channel):
        """return full zmq url for a named client channel"""
        return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])

    def engine_url(self, channel):
        """return full zmq url for a named engine channel"""
        return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])

    def init_hub(self):
        """construct Hub object"""

        ctx = self.context
        loop = self.loop
        if 'TaskScheduler.scheme_name' in self.config:
            scheme = self.config.TaskScheduler.scheme_name
        else:
            from .scheduler import TaskScheduler
            scheme = TaskScheduler.scheme_name.get_default_value()

        # build connection dicts
        engine = self.engine_info = {
            'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
            'registration' : self.regport,
            'control' : self.control[1],
            'mux' : self.mux[1],
            'hb_ping' : self.hb[0],
            'hb_pong' : self.hb[1],
            'task' : self.task[1],
            'iopub' : self.iopub[1],
            }

        client = self.client_info = {
            'interface' : "%s://%s" % (self.client_transport, self.client_ip),
            'registration' : self.regport,
            'control' : self.control[0],
            'mux' : self.mux[0],
            'task' : self.task[0],
            'task_scheme' : scheme,
            'iopub' : self.iopub[0],
            'notification' : self.notifier_port,
            }

        self.log.debug("Hub engine addrs: %s", self.engine_info)
        self.log.debug("Hub client addrs: %s", self.client_info)

        # Registrar socket
        q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
        util.set_hwm(q, 0)
        q.bind(self.client_url('registration'))
        self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
        if self.client_ip != self.engine_ip:
            # engines use a different interface: bind there too
            q.bind(self.engine_url('registration'))
            self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))

        ### Engine connections ###

        # heartbeat
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(self.engine_url('hb_ping'))
        hrep = ctx.socket(zmq.ROUTER)
        util.set_hwm(hrep, 0)
        hrep.bind(self.engine_url('hb_pong'))
        self.heartmonitor = HeartMonitor(loop=loop, parent=self, log=self.log,
                                pingstream=ZMQStream(hpub,loop),
                                pongstream=ZMQStream(hrep,loop)
                            )

        ### Client connections ###

        # Notifier socket
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(self.client_url('notification'))

        ### build and launch the queues ###

        # monitor socket
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b"")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db
        db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
        self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
        self.db = import_item(str(db_class))(session=self.session.session,
                                            parent=self, log=self.log)
        # brief pause to let the db backend finish setting up
        time.sleep(.25)

        # resubmit stream
        r = ZMQStream(ctx.socket(zmq.DEALER), loop)
        url = util.disambiguate_url(self.client_url('task'))
        r.connect(url)

        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, resubmit=r, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                log=self.log, registration_timeout=self.registration_timeout)
349 342
350 343
class Hub(SessionFactory):
    """The IPython Controller Hub with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: Session object
    <removed> context: zmq context for creating new connections (?)
    queue: ZMQStream for monitoring the command queue (SUB)
    query: ZMQStream for engine registration and client queries requests (ROUTER)
    heartbeat: HeartMonitor object checking the pulse of the engines
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
        NotImplemented
    engine_info: dict of zmq connection information for engines to connect
        to the queues.
    client_info: dict of zmq connection information for engines to connect
        to the queues.
    """

    # file for persisting engine state -- presumably read/written elsewhere
    # in this class; confirm against the rest of the file
    engine_state_file = Unicode()

    # internal data structures:
    ids=Set() # engine IDs
    keytable=Dict() # eid -> engine uuid
    by_ident=Dict() # engine uuid (bytes) -> eid
    engines=Dict() # eid -> engine connector object (has .uuid)
    clients=Dict()
    hearts=Dict() # heart ident -> eid
    pending=Set() # all pending msg_ids
    queues=Dict() # pending msg_ids keyed by engine_id
    tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
    completed=Dict() # completed msg_ids keyed by engine_id
    all_completed=Set() # set of all completed msg_ids
    dead_engines=Set() # uuids of engines that have died / been unregistered
    unassigned=Set() # set of task msg_ids not yet assigned a destination
    incoming_registrations=Dict() # registrations awaiting a first heartbeat, keyed by heart ident
    registration_timeout=Integer() # seconds before a stalled registration is abandoned
    _idcounter=Integer(0) # monotonically increasing engine-id counter

    # objects from constructor:
    query=Instance(ZMQStream)
    monitor=Instance(ZMQStream)
    notifier=Instance(ZMQStream)
    resubmit=Instance(ZMQStream)
    heartmonitor=Instance(HeartMonitor)
    db=Instance(object)
    client_info=Dict()
    engine_info=Dict()
401 394
    def __init__(self, **kwargs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        # monitor traffic is routed by its topic prefix (first frame)
        self.monitor_handlers = {b'in' : self.save_queue_request,
                                b'out': self.save_queue_result,
                                b'intask': self.save_task_request,
                                b'outtask': self.save_task_result,
                                b'tracktask': self.save_task_destination,
                                b'incontrol': _passer,
                                b'outcontrol': _passer,
                                b'iopub': self.save_iopub_message,
                                }

        # client queries are routed by their msg_type
        self.query_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'history_request': self.get_history,
                                'db_request': self.db_query,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                'shutdown_request': self.shutdown_request,
                                'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
                                }

        # ignore resubmit replies
        self.resubmit.on_recv(lambda msg: None, copy=False)

        self.log.info("hub::created hub")
453 446
454 447 @property
455 448 def _next_id(self):
456 449 """gemerate a new ID.
457 450
458 451 No longer reuse old ids, just count from 0."""
459 452 newid = self._idcounter
460 453 self._idcounter += 1
461 454 return newid
462 455 # newid = 0
463 456 # incoming = [id[0] for id in itervalues(self.incoming_registrations)]
464 457 # # print newid, self.ids, self.incoming_registrations
465 458 # while newid in self.ids or newid in incoming:
466 459 # newid += 1
467 460 # return newid
468 461
469 462 #-----------------------------------------------------------------------------
470 463 # message validation
471 464 #-----------------------------------------------------------------------------
472 465
473 466 def _validate_targets(self, targets):
474 467 """turn any valid targets argument into a list of integer ids"""
475 468 if targets is None:
476 469 # default to all
477 470 return self.ids
478 471
479 472 if isinstance(targets, (int,str,unicode_type)):
480 473 # only one target specified
481 474 targets = [targets]
482 475 _targets = []
483 476 for t in targets:
484 477 # map raw identities to ids
485 478 if isinstance(t, (str,unicode_type)):
486 479 t = self.by_ident.get(cast_bytes(t), t)
487 480 _targets.append(t)
488 481 targets = _targets
489 482 bad_targets = [ t for t in targets if t not in self.ids ]
490 483 if bad_targets:
491 484 raise IndexError("No Such Engine: %r" % bad_targets)
492 485 if not targets:
493 486 raise IndexError("No Engines Registered")
494 487 return targets
495 488
496 489 #-----------------------------------------------------------------------------
497 490 # dispatch methods (1 per stream)
498 491 #-----------------------------------------------------------------------------
499 492
500 493
501 494 @util.log_errors
502 495 def dispatch_monitor_traffic(self, msg):
503 496 """all ME and Task queue messages come through here, as well as
504 497 IOPub traffic."""
505 498 self.log.debug("monitor traffic: %r", msg[0])
506 499 switch = msg[0]
507 500 try:
508 501 idents, msg = self.session.feed_identities(msg[1:])
509 502 except ValueError:
510 503 idents=[]
511 504 if not idents:
512 505 self.log.error("Monitor message without topic: %r", msg)
513 506 return
514 507 handler = self.monitor_handlers.get(switch, None)
515 508 if handler is not None:
516 509 handler(idents, msg)
517 510 else:
518 511 self.log.error("Unrecognized monitor topic: %r", switch)
519 512
520 513
    @util.log_errors
    def dispatch_query(self, msg):
        """Route registration requests and queries from clients."""
        try:
            idents, msg = self.session.feed_identities(msg)
        except ValueError:
            idents = []
        if not idents:
            self.log.error("Bad Query Message: %r", msg)
            return
        client_id = idents[0]
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            # reply with a wrapped exception so the client is not left hanging
            content = error.wrap_exception()
            self.log.error("Bad Query Message: %r", msg, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return
        # print client_id, header, parent, content
        #switch on message type:
        msg_type = msg['header']['msg_type']
        self.log.info("client::client %r requested %r", client_id, msg_type)
        handler = self.query_handlers.get(msg_type, None)
        try:
            # assert inside try so an unknown msg_type becomes a hub_error reply
            assert handler is not None, "Bad Message Type: %r" % msg_type
        except:
            content = error.wrap_exception()
            self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return

        else:
            handler(idents, msg)
556 549
557 550 def dispatch_db(self, msg):
558 551 """"""
559 552 raise NotImplementedError
560 553
561 554 #---------------------------------------------------------------------------
562 555 # handler methods (1 per event)
563 556 #---------------------------------------------------------------------------
564 557
565 558 #----------------------- Heartbeat --------------------------------------
566 559
567 560 def handle_new_heart(self, heart):
568 561 """handler to attach to heartbeater.
569 562 Called when a new heart starts to beat.
570 563 Triggers completion of registration."""
571 564 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
572 565 if heart not in self.incoming_registrations:
573 566 self.log.info("heartbeat::ignoring new heart: %r", heart)
574 567 else:
575 568 self.finish_registration(heart)
576 569
577 570
578 571 def handle_heart_failure(self, heart):
579 572 """handler to attach to heartbeater.
580 573 called when a previously registered heart fails to respond to beat request.
581 574 triggers unregistration"""
582 575 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
583 576 eid = self.hearts.get(heart, None)
584 577 uuid = self.engines[eid].uuid
585 578 if eid is None or self.keytable[eid] in self.dead_engines:
586 579 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
587 580 else:
588 581 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
589 582
590 583 #----------------------- MUX Queue Traffic ------------------------------
591 584
    def save_queue_request(self, idents, msg):
        """Record a request submitted via the MUX queue in the db."""
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return
        queue_id, client_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::target %r not registered", queue_id)
            self.log.debug("queue:: valid are: %r", self.by_ident.keys())
            return
        record = init_record(msg)
        msg_id = record['msg_id']
        self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
        # Unicode in records
        record['engine_uuid'] = queue_id.decode('ascii')
        record['client_uuid'] = msg['header']['session']
        record['queue'] = 'mux'

        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            for key,evalue in iteritems(existing):
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    # keep existing values the new record lacks
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            # no existing record: create a new one
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)


        self.pending.add(msg_id)
        self.queues[eid].append(msg_id)
638 631
    def save_queue_result(self, idents, msg):
        """Record the reply to a MUX-queue request in the db."""
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return

        client_id, queue_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::engine %r sent invalid message to %r: %r",
                    queue_id, client_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
            return

        parent = msg['parent_header']
        if not parent:
            # a reply without a parent header cannot be matched to a request
            return
        msg_id = parent['msg_id']
        if msg_id in self.pending:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            self.queues[eid].remove(msg_id)
            self.completed[eid].append(msg_id)
            self.log.info("queue::request %r completed on %s", msg_id, eid)
        elif msg_id not in self.all_completed:
            # it could be a result from a dead engine that died before delivering the
            # result
            self.log.warn("queue:: unknown msg finished %r", msg_id)
            return
        # update record anyway, because the unregistration could have been premature
        rheader = msg['header']
        md = msg['metadata']
        completed = rheader['date']
        started = extract_dates(md.get('started', None))
        result = {
            'result_header' : rheader,
            'result_metadata': md,
            'result_content': msg['content'],
            'received': datetime.now(),
            'started' : started,
            'completed' : completed
        }

        result['result_buffers'] = msg['buffers']
        try:
            self.db.update_record(msg_id, result)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
692 685
693 686 #--------------------- Task Queue Traffic ------------------------------
694 687
    def save_task_request(self, idents, msg):
        """Save the submission of a task."""
        client_id = idents[0]

        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::client %r sent invalid task message: %r",
                    client_id, msg, exc_info=True)
            return
        record = init_record(msg)

        record['client_uuid'] = msg['header']['session']
        record['queue'] = 'task'
        header = msg['header']
        msg_id = header['msg_id']
        self.pending.add(msg_id)
        self.unassigned.add(msg_id)
        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            if existing['resubmitted']:
                for key in ('submitted', 'client_uuid', 'buffers'):
                    # don't clobber these keys on resubmit
                    # submitted and client_uuid should be different
                    # and buffers might be big, and shouldn't have changed
                    record.pop(key)
                # still check content,header which should not change
                # but are not expensive to compare as buffers

            for key,evalue in iteritems(existing):
                if key.endswith('buffers'):
                    # don't compare buffers
                    continue
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    # keep existing values the new record lacks
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            # no existing record: create a new one
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)
        except Exception:
            self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
745 738
    def save_task_result(self, idents, msg):
        """save the result of a completed task."""
        client_id = idents[0]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::invalid task result message send to %r: %r",
                    client_id, msg, exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            # print msg
            self.log.warn("Task %r had no parent!", msg)
            return
        msg_id = parent['msg_id']
        if msg_id in self.unassigned:
            self.unassigned.remove(msg_id)

        header = msg['header']
        md = msg['metadata']
        engine_uuid = md.get('engine', u'')
        eid = self.by_ident.get(cast_bytes(engine_uuid), None)

        status = md.get('status', None)

        if msg_id in self.pending:
            self.log.info("task::task %r finished on %s", msg_id, eid)
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            if eid is not None:
                # aborted tasks don't count as completed on the engine
                if status != 'aborted':
                    self.completed[eid].append(msg_id)
                if msg_id in self.tasks[eid]:
                    self.tasks[eid].remove(msg_id)
            completed = header['date']
            started = extract_dates(md.get('started', None))
            result = {
                'result_header' : header,
                'result_metadata': msg['metadata'],
                'result_content': msg['content'],
                'started' : started,
                'completed' : completed,
                'received' : datetime.now(),
                'engine_uuid': engine_uuid,
            }

            result['result_buffers'] = msg['buffers']
            try:
                self.db.update_record(msg_id, result)
            except Exception:
                self.log.error("DB Error saving task request %r", msg_id, exc_info=True)

        else:
            self.log.debug("task::unknown task %r finished", msg_id)
801 794
    def save_task_destination(self, idents, msg):
        """Record which engine a task was assigned to."""
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            self.log.error("task::invalid task tracking message", exc_info=True)
            return
        content = msg['content']
        # print (content)
        msg_id = content['msg_id']
        engine_uuid = content['engine_id']
        eid = self.by_ident[cast_bytes(engine_uuid)]

        self.log.info("task::task %r arrived on %r", msg_id, eid)
        if msg_id in self.unassigned:
            self.unassigned.remove(msg_id)
        # else:
        #     self.log.debug("task::task %r not listed as MIA?!"%(msg_id))

        self.tasks[eid].append(msg_id)
        # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
        try:
            self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
        except Exception:
            self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
826 819
827 820
828 821 def mia_task_request(self, idents, msg):
829 822 raise NotImplementedError
830 823 client_id = idents[0]
831 824 # content = dict(mia=self.mia,status='ok')
832 825 # self.session.send('mia_reply', content=content, idents=client_id)
833 826
834 827
835 828 #--------------------- IOPub Traffic ------------------------------
836 829
    def save_iopub_message(self, topics, msg):
        """save an iopub message into the db"""
        # print (topics)
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            self.log.error("iopub::invalid IOPub message", exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            self.log.debug("iopub::IOPub message lacks parent: %r", msg)
            return
        msg_id = parent['msg_id']
        msg_type = msg['header']['msg_type']
        content = msg['content']

        # ensure msg_id is in db
        try:
            rec = self.db.get_record(msg_id)
        except KeyError:
            rec = None

        # build the partial update `d` according to the iopub msg_type
        # stream
        d = {}
        if msg_type == 'stream':
            name = content['name']
            # append to any previously-saved output for this stream
            s = '' if rec is None else rec[name]
            d[name] = s + content['data']

        elif msg_type == 'error':
            d['error'] = content
        elif msg_type == 'execute_input':
            d['execute_input'] = content['code']
        elif msg_type in ('display_data', 'execute_result'):
            d[msg_type] = content
        elif msg_type == 'status':
            # status messages carry no persistent data
            pass
        elif msg_type == 'data_pub':
            self.log.info("ignored data_pub message for %s" % msg_id)
        else:
            self.log.warn("unhandled iopub msg_type: %r", msg_type)

        if not d:
            return

        if rec is None:
            # new record
            rec = empty_record()
            rec['msg_id'] = msg_id
            rec.update(d)
            d = rec
            update_record = self.db.add_record
        else:
            update_record = self.db.update_record

        try:
            update_record(msg_id, d)
        except Exception:
            self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
897 890
898 891
899 892
900 893 #-------------------------------------------------------------------------
901 894 # Registration requests
902 895 #-------------------------------------------------------------------------
903 896
    def connection_request(self, client_id, msg):
        """Reply with connection addresses for clients."""
        self.log.info("client::client %r connected", client_id)
        content = dict(status='ok')
        # build a JSON-safe engine map, excluding engines known to be dead
        jsonable = {}
        for k,v in iteritems(self.keytable):
            if v not in self.dead_engines:
                jsonable[str(k)] = v
        content['engines'] = jsonable
        self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
914 907
    def register_engine(self, reg, msg):
        """Register a new engine.

        Replies with an engine id, then either finishes registration
        immediately (heart already beating) or schedules a purge of the
        pending registration after ``registration_timeout``.
        """
        content = msg['content']
        try:
            uuid = content['uuid']
        except KeyError:
            self.log.error("registration::queue not specified", exc_info=True)
            return

        eid = self._next_id

        self.log.debug("registration::register_engine(%i, %r)", eid, uuid)

        content = dict(id=eid,status='ok',hb_period=self.heartmonitor.period)
        # check if requesting available IDs:
        if cast_bytes(uuid) in self.by_ident:
            # raise-and-catch so wrap_exception can capture a traceback
            try:
                raise KeyError("uuid %r in use" % uuid)
            except:
                content = error.wrap_exception()
                self.log.error("uuid %r in use", uuid, exc_info=True)
        else:
            # also reject uuids colliding with a registration in progress
            for h, ec in iteritems(self.incoming_registrations):
                if uuid == h:
                    try:
                        raise KeyError("heart_id %r in use" % uuid)
                    except:
                        self.log.error("heart_id %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break
                elif uuid == ec.uuid:
                    try:
                        raise KeyError("uuid %r in use" % uuid)
                    except:
                        self.log.error("uuid %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break

        msg = self.session.send(self.query, "registration_reply",
                content=content,
                ident=reg)

        heart = cast_bytes(uuid)

        if content['status'] == 'ok':
            if heart in self.heartmonitor.hearts:
                # already beating
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
                self.finish_registration(heart)
            else:
                purge = lambda : self._purge_stalled_registration(heart)
                # stash the timeout handle so finish_registration can cancel it
                t = self.loop.add_timeout(
                    self.loop.time() + self.registration_timeout,
                    purge,
                )
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=t)
        else:
            self.log.error("registration::registration %i failed: %r", eid, content['evalue'])

        return eid
973 968
    def unregister_engine(self, ident, msg):
        """Unregister an engine that explicitly requested to leave.

        Marks the engine dead immediately, and deals with its outstanding
        messages after ``registration_timeout`` so late results can arrive.
        """
        try:
            eid = msg['content']['id']
        except:
            self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
            return
        self.log.info("registration::unregister_engine(%r)", eid)

        uuid = self.keytable[eid]
        content=dict(id=eid, uuid=uuid)
        # tombstone, rather than removing mappings, so records stay resolvable
        self.dead_engines.add(uuid)

        # give in-flight results a grace period before declaring them stranded
        self.loop.add_timeout(
            self.loop.time() + self.registration_timeout,
            lambda : self._handle_stranded_msgs(eid, uuid),
        )
        ############## TODO: HANDLE IT ################

        self._save_engine_state()

        if self.notifier:
            self.session.send(self.notifier, "unregistration_notification", content=content)
1002 992
    def _handle_stranded_msgs(self, eid, uuid):
        """Handle messages known to be on an engine when the engine unregisters.

        It is possible that this will fire prematurely - that is, an engine will
        go down after completing a result, and the client will be notified
        that the result failed and later receive the actual result.
        """

        outstanding = self.queues[eid]

        for msg_id in outstanding:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            # raise-and-catch so wrap_exception captures a real traceback
            try:
                raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
            except:
                content = error.wrap_exception()
            # build a fake header:
            header = {}
            header['engine'] = uuid
            header['date'] = datetime.now()
            rec = dict(result_content=content, result_header=header, result_buffers=[])
            rec['completed'] = header['date']
            rec['engine_uuid'] = uuid
            try:
                self.db.update_record(msg_id, rec)
            except Exception:
                self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1031 1021
1032 1022
    def finish_registration(self, heart):
        """Second half of engine registration, called after our HeartMonitor
        has received a beat from the Engine's Heart."""
        try:
            ec = self.incoming_registrations.pop(heart)
        except KeyError:
            self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
            return
        self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
        if ec.stallback is not None:
            # cancel the pending purge scheduled in register_engine
            self.loop.remove_timeout(ec.stallback)
        eid = ec.id
        # wire the engine into all of the hub's lookup tables
        self.ids.add(eid)
        self.keytable[eid] = ec.uuid
        self.engines[eid] = ec
        self.by_ident[cast_bytes(ec.uuid)] = ec.id
        self.queues[eid] = list()
        self.tasks[eid] = list()
        self.completed[eid] = list()
        self.hearts[heart] = eid
        content = dict(id=eid, uuid=self.engines[eid].uuid)
        if self.notifier:
            self.session.send(self.notifier, "registration_notification", content=content)
        self.log.info("engine::Engine Connected: %i", eid)

        self._save_engine_state()
1059 1049
1060 1050 def _purge_stalled_registration(self, heart):
1061 1051 if heart in self.incoming_registrations:
1062 1052 ec = self.incoming_registrations.pop(heart)
1063 1053 self.log.info("registration::purging stalled registration: %i", ec.id)
1064 1054 else:
1065 1055 pass
1066 1056
1067 1057 #-------------------------------------------------------------------------
1068 1058 # Engine State
1069 1059 #-------------------------------------------------------------------------
1070 1060
1071 1061
    def _cleanup_engine_state_file(self):
        """cleanup engine state mapping: remove the JSON state file, if any"""

        if os.path.exists(self.engine_state_file):
            self.log.debug("cleaning up engine state: %s", self.engine_state_file)
            try:
                os.remove(self.engine_state_file)
            except IOError:
                # best-effort cleanup; a stale file is only a nuisance
                self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1081 1071
1082 1072
1083 1073 def _save_engine_state(self):
1084 1074 """save engine mapping to JSON file"""
1085 1075 if not self.engine_state_file:
1086 1076 return
1087 1077 self.log.debug("save engine state to %s" % self.engine_state_file)
1088 1078 state = {}
1089 1079 engines = {}
1090 1080 for eid, ec in iteritems(self.engines):
1091 1081 if ec.uuid not in self.dead_engines:
1092 1082 engines[eid] = ec.uuid
1093 1083
1094 1084 state['engines'] = engines
1095 1085
1096 1086 state['next_id'] = self._idcounter
1097 1087
1098 1088 with open(self.engine_state_file, 'w') as f:
1099 1089 json.dump(state, f)
1100 1090
1101 1091
    def _load_engine_state(self):
        """load engine mapping from JSON file written by _save_engine_state"""
        if not os.path.exists(self.engine_state_file):
            return

        self.log.info("loading engine state from %s" % self.engine_state_file)

        with open(self.engine_state_file) as f:
            state = json.load(f)

        # suppress registration notifications while re-adopting engines
        save_notifier = self.notifier
        self.notifier = None
        for eid, uuid in iteritems(state['engines']):
            heart = uuid.encode('ascii')
            # start with this heart as current and beating:
            self.heartmonitor.responses.add(heart)
            self.heartmonitor.hearts.add(heart)

            self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
            self.finish_registration(heart)

        self.notifier = save_notifier

        self._idcounter = state['next_id']
1126 1116
1127 1117 #-------------------------------------------------------------------------
1128 1118 # Client Requests
1129 1119 #-------------------------------------------------------------------------
1130 1120
    def shutdown_request(self, client_id, msg):
        """handle shutdown request: ack the client, notify everyone,
        then exit after a 1s delay so the replies can flush."""
        self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
        # also notify other clients of shutdown
        self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
        self.loop.add_timeout(self.loop.time() + 1, self._shutdown)
1138 1127
    def _shutdown(self):
        """Terminate the Hub process (scheduled by shutdown_request)."""
        self.log.info("hub::hub shutting down.")
        # brief pause to let outgoing ZMQ messages flush before exit
        time.sleep(0.1)
        sys.exit(0)
1143 1132
1144 1133
1145 1134 def check_load(self, client_id, msg):
1146 1135 content = msg['content']
1147 1136 try:
1148 1137 targets = content['targets']
1149 1138 targets = self._validate_targets(targets)
1150 1139 except:
1151 1140 content = error.wrap_exception()
1152 1141 self.session.send(self.query, "hub_error",
1153 1142 content=content, ident=client_id)
1154 1143 return
1155 1144
1156 1145 content = dict(status='ok')
1157 1146 # loads = {}
1158 1147 for t in targets:
1159 1148 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1160 1149 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1161 1150
1162 1151
    def queue_status(self, client_id, msg):
        """Return the Queue status of one or more targets.

        If verbose, return the msg_ids, else return len of each type.

        Keys:

        * queue (pending MUX jobs)
        * tasks (pending Task jobs)
        * completed (finished jobs from both queues)
        """
        content = msg['content']
        targets = content['targets']
        try:
            targets = self._validate_targets(targets)
        except:
            content = error.wrap_exception()
            self.session.send(self.query, "hub_error",
                    content=content, ident=client_id)
            return
        verbose = content.get('verbose', False)
        content = dict(status='ok')
        for t in targets:
            queue = self.queues[t]
            completed = self.completed[t]
            tasks = self.tasks[t]
            # non-verbose replies carry counts instead of msg_id lists
            if not verbose:
                queue = len(queue)
                completed = len(completed)
                tasks = len(tasks)
            content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
        content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
        # print (content)
        self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1197 1186
    def purge_results(self, client_id, msg):
        """Purge results from memory. This method is more valuable before we move
        to a DB based message storage mechanism.

        Accepts msg_ids == 'all' (drop everything completed), a list of
        msg_ids (which must not be pending), and/or engine_ids.
        """
        content = msg['content']
        self.log.info("Dropping records with %s", content)
        msg_ids = content.get('msg_ids', [])
        reply = dict(status='ok')
        if msg_ids == 'all':
            try:
                self.db.drop_matching_records(dict(completed={'$ne':None}))
            except Exception:
                reply = error.wrap_exception()
                self.log.exception("Error dropping records")
        else:
            pending = [m for m in msg_ids if (m in self.pending)]
            if pending:
                # refuse to purge results that are still in flight
                try:
                    raise IndexError("msg pending: %r" % pending[0])
                except:
                    reply = error.wrap_exception()
                    self.log.exception("Error dropping records")
            else:
                try:
                    self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
                except Exception:
                    reply = error.wrap_exception()
                    self.log.exception("Error dropping records")

        if reply['status'] == 'ok':
            eids = content.get('engine_ids', [])
            for eid in eids:
                if eid not in self.engines:
                    try:
                        raise IndexError("No such engine: %i" % eid)
                    except:
                        reply = error.wrap_exception()
                        self.log.exception("Error dropping records")
                    break
                uid = self.engines[eid].uuid
                try:
                    # drop only completed records for this engine
                    self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
                except Exception:
                    reply = error.wrap_exception()
                    self.log.exception("Error dropping records")
                    break

        self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1245 1234
    def resubmit_task(self, client_id, msg):
        """Resubmit one or more tasks.

        Each task is resubmitted under a *new* msg_id; the reply maps
        original msg_ids to their resubmitted ids.
        """
        def finish(reply):
            self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)

        content = msg['content']
        msg_ids = content['msg_ids']
        reply = dict(status='ok')
        try:
            records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
                'header', 'content', 'buffers'])
        except Exception:
            self.log.error('db::db error finding tasks to resubmit', exc_info=True)
            return finish(error.wrap_exception())

        # validate msg_ids
        found_ids = [ rec['msg_id'] for rec in records ]
        pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
        if len(records) > len(msg_ids):
            try:
                raise RuntimeError("DB appears to be in an inconsistent state."
                    "More matching records were found than should exist")
            except Exception:
                self.log.exception("Failed to resubmit task")
                return finish(error.wrap_exception())
        elif len(records) < len(msg_ids):
            missing = [ m for m in msg_ids if m not in found_ids ]
            try:
                raise KeyError("No such msg(s): %r" % missing)
            except KeyError:
                self.log.exception("Failed to resubmit task")
                return finish(error.wrap_exception())
        elif pending_ids:
            pass
            # no need to raise on resubmit of pending task, now that we
            # resubmit under new ID, but do we want to raise anyway?
            # msg_id = invalid_ids[0]
            # try:
            #     raise ValueError("Task(s) %r appears to be inflight" % )
            # except Exception:
            #     return finish(error.wrap_exception())

        # mapping of original IDs to resubmitted IDs
        resubmitted = {}

        # send the messages
        for rec in records:
            header = rec['header']
            msg = self.session.msg(header['msg_type'], parent=header)
            msg_id = msg['msg_id']
            msg['content'] = rec['content']

            # use the old header, but update msg_id and timestamp
            fresh = msg['header']
            header['msg_id'] = fresh['msg_id']
            header['date'] = fresh['date']
            msg['header'] = header

            self.session.send(self.resubmit, msg, buffers=rec['buffers'])

            resubmitted[rec['msg_id']] = msg_id
            self.pending.add(msg_id)
            msg['buffers'] = rec['buffers']
            try:
                self.db.add_record(msg_id, init_record(msg))
            except Exception:
                self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
                return finish(error.wrap_exception())

        finish(dict(status='ok', resubmitted=resubmitted))

        # store the new IDs in the Task DB
        for msg_id, resubmit_id in iteritems(resubmitted):
            try:
                self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
            except Exception:
                self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1323 1312
1324 1313
1325 1314 def _extract_record(self, rec):
1326 1315 """decompose a TaskRecord dict into subsection of reply for get_result"""
1327 1316 io_dict = {}
1328 1317 for key in ('execute_input', 'execute_result', 'error', 'stdout', 'stderr'):
1329 1318 io_dict[key] = rec[key]
1330 1319 content = {
1331 1320 'header': rec['header'],
1332 1321 'metadata': rec['metadata'],
1333 1322 'result_metadata': rec['result_metadata'],
1334 1323 'result_header' : rec['result_header'],
1335 1324 'result_content': rec['result_content'],
1336 1325 'received' : rec['received'],
1337 1326 'io' : io_dict,
1338 1327 }
1339 1328 if rec['result_buffers']:
1340 1329 buffers = list(map(bytes, rec['result_buffers']))
1341 1330 else:
1342 1331 buffers = []
1343 1332
1344 1333 return content, buffers
1345 1334
1346 1335 def get_results(self, client_id, msg):
1347 1336 """Get the result of 1 or more messages."""
1348 1337 content = msg['content']
1349 1338 msg_ids = sorted(set(content['msg_ids']))
1350 1339 statusonly = content.get('status_only', False)
1351 1340 pending = []
1352 1341 completed = []
1353 1342 content = dict(status='ok')
1354 1343 content['pending'] = pending
1355 1344 content['completed'] = completed
1356 1345 buffers = []
1357 1346 if not statusonly:
1358 1347 try:
1359 1348 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1360 1349 # turn match list into dict, for faster lookup
1361 1350 records = {}
1362 1351 for rec in matches:
1363 1352 records[rec['msg_id']] = rec
1364 1353 except Exception:
1365 1354 content = error.wrap_exception()
1366 1355 self.log.exception("Failed to get results")
1367 1356 self.session.send(self.query, "result_reply", content=content,
1368 1357 parent=msg, ident=client_id)
1369 1358 return
1370 1359 else:
1371 1360 records = {}
1372 1361 for msg_id in msg_ids:
1373 1362 if msg_id in self.pending:
1374 1363 pending.append(msg_id)
1375 1364 elif msg_id in self.all_completed:
1376 1365 completed.append(msg_id)
1377 1366 if not statusonly:
1378 1367 c,bufs = self._extract_record(records[msg_id])
1379 1368 content[msg_id] = c
1380 1369 buffers.extend(bufs)
1381 1370 elif msg_id in records:
1382 1371 if rec['completed']:
1383 1372 completed.append(msg_id)
1384 1373 c,bufs = self._extract_record(records[msg_id])
1385 1374 content[msg_id] = c
1386 1375 buffers.extend(bufs)
1387 1376 else:
1388 1377 pending.append(msg_id)
1389 1378 else:
1390 1379 try:
1391 1380 raise KeyError('No such message: '+msg_id)
1392 1381 except:
1393 1382 content = error.wrap_exception()
1394 1383 break
1395 1384 self.session.send(self.query, "result_reply", content=content,
1396 1385 parent=msg, ident=client_id,
1397 1386 buffers=buffers)
1398 1387
1399 1388 def get_history(self, client_id, msg):
1400 1389 """Get a list of all msg_ids in our DB records"""
1401 1390 try:
1402 1391 msg_ids = self.db.get_history()
1403 1392 except Exception as e:
1404 1393 content = error.wrap_exception()
1405 1394 self.log.exception("Failed to get history")
1406 1395 else:
1407 1396 content = dict(status='ok', history=msg_ids)
1408 1397
1409 1398 self.session.send(self.query, "history_reply", content=content,
1410 1399 parent=msg, ident=client_id)
1411 1400
    def db_query(self, client_id, msg):
        """Perform a raw query on the task record database.

        Buffers are stripped out of the records and sent as a flat list,
        with per-record lengths (buffer_lens/result_buffer_lens) so the
        client can reassemble them.
        """
        content = msg['content']
        query = extract_dates(content.get('query', {}))
        keys = content.get('keys', None)
        buffers = []
        empty = list()
        try:
            records = self.db.find_records(query, keys)
        except Exception as e:
            content = error.wrap_exception()
            self.log.exception("DB query failed")
        else:
            # extract buffers from reply content:
            if keys is not None:
                # only track lengths for buffer keys the client asked for
                buffer_lens = [] if 'buffers' in keys else None
                result_buffer_lens = [] if 'result_buffers' in keys else None
            else:
                buffer_lens = None
                result_buffer_lens = None

            for rec in records:
                # buffers may be None, so double check
                b = rec.pop('buffers', empty) or empty
                if buffer_lens is not None:
                    buffer_lens.append(len(b))
                    buffers.extend(b)
                rb = rec.pop('result_buffers', empty) or empty
                if result_buffer_lens is not None:
                    result_buffer_lens.append(len(rb))
                    buffers.extend(rb)
            content = dict(status='ok', records=records, buffer_lens=buffer_lens,
                                    result_buffer_lens=result_buffer_lens)
        # self.log.debug (content)
        self.session.send(self.query, "db_reply", content=content,
                                            parent=msg, ident=client_id,
                                            buffers=buffers)
1449 1438
@@ -1,848 +1,849
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6 """
7 7
8 8 # Copyright (c) IPython Development Team.
9 9 # Distributed under the terms of the Modified BSD License.
10 10
11 11 import logging
12 12 import sys
13 13 import time
14 14
15 15 from collections import deque
16 16 from datetime import datetime
17 17 from random import randint, random
18 18 from types import FunctionType
19 19
20 20 try:
21 21 import numpy
22 22 except ImportError:
23 23 numpy = None
24 24
25 25 import zmq
26 26 from zmq.eventloop import ioloop, zmqstream
27 27
28 28 # local imports
29 29 from IPython.external.decorator import decorator
30 30 from IPython.config.application import Application
31 31 from IPython.config.loader import Config
32 32 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
33 33 from IPython.utils.py3compat import cast_bytes
34 34
35 35 from IPython.parallel import error, util
36 36 from IPython.parallel.factory import SessionFactory
37 37 from IPython.parallel.util import connect_logger, local_logger
38 38
39 39 from .dependency import Dependency
40 40
@decorator
def logged(f,self,*args,**kwargs):
    """Decorator: debug-log every call to the wrapped method (name + args)
    before delegating to it."""
    # print ("#--------------------")
    self.log.debug("scheduler::%s(*%s,**%s)", f.__name__, args, kwargs)
    # print ("#--")
    return f(self,*args, **kwargs)
47 47
48 48 #----------------------------------------------------------------------
49 49 # Chooser functions
50 50 #----------------------------------------------------------------------
51 51
def plainrandom(loads):
    """Plain random pick."""
    return randint(0, len(loads) - 1)
56 56
def lru(loads):
    """Always pick the front of the line.

    The content of `loads` is ignored.

    Assumes LRU ordering of loads, with oldest first.
    """
    # index 0 is the least-recently-used engine by construction
    return 0
65 65
def twobin(loads):
    """Pick two at random, use the LRU of the two.

    The content of loads is ignored.

    Assumes LRU ordering of loads, with oldest first.
    """
    hi = len(loads) - 1
    first, second = randint(0, hi), randint(0, hi)
    # lower index == older == less recently used
    return min(first, second)
77 77
def weighted(loads):
    """Pick two at random using inverse load as weight.

    Return the less loaded of the two.
    """
    # weight 0 a million times more than 1:
    weights = 1./(1e-6+numpy.array(loads))
    sums = weights.cumsum()
    total = sums[-1]

    def first_reaching(sample):
        # index of the first cumulative weight >= sample
        i = 0
        while sums[i] < sample:
            i += 1
        return i

    idx = first_reaching(random()*total)
    idy = first_reaching(random()*total)
    return idy if weights[idy] > weights[idx] else idx
99 99
def leastload(loads):
    """Always choose the lowest load.

    If the lowest load occurs more than once, the first
    occurance will be used. If loads has LRU ordering, this means
    the LRU of those with the lowest load is chosen.
    """
    # min over indices: ties resolve to the smallest (first) index
    return min(range(len(loads)), key=lambda i: loads[i])
108 108
109 109 #---------------------------------------------------------------------
110 110 # Classes
111 111 #---------------------------------------------------------------------
112 112
113 113
114 114 # store empty default dependency:
115 115 MET = Dependency([])
116 116
117 117
class Job(object):
    """Simple container for a job"""
    def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
                    targets, after, follow, timeout):
        self.msg_id = msg_id
        self.raw_msg = raw_msg
        self.idents = idents
        self.msg = msg
        self.header = header
        self.metadata = metadata
        self.targets = targets
        self.after = after
        self.follow = follow
        self.timeout = timeout

        self.removed = False # used for lazy-delete from sorted queue
        self.timestamp = time.time()  # submission time; defines queue order
        self.timeout_id = 0
        self.blacklist = set()  # engines this job must not be sent to

    def __lt__(self, other):
        # order jobs by submission time (used by sorting on Python 3)
        return self.timestamp < other.timestamp

    def __cmp__(self, other):
        # Python 2 only: `cmp` and `__cmp__` do not exist on Python 3,
        # where ordering falls back to __lt__ above
        return cmp(self.timestamp, other.timestamp)

    @property
    def dependents(self):
        # all msg_ids this job waits on, in either dependency mode
        return self.follow.union(self.after)
147 147
148 148
149 149 class TaskScheduler(SessionFactory):
150 150 """Python TaskScheduler object.
151 151
152 152 This is the simplest object that supports msg_id based
153 153 DAG dependencies. *Only* task msg_ids are checked, not
154 154 msg_ids of jobs submitted via the MUX queue.
155 155
156 156 """
157 157
158 158 hwm = Integer(1, config=True,
159 159 help="""specify the High Water Mark (HWM) for the downstream
160 160 socket in the Task scheduler. This is the maximum number
161 161 of allowed outstanding tasks on each engine.
162 162
163 163 The default (1) means that only one task can be outstanding on each
164 164 engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
165 165 engines continue to be assigned tasks while they are working,
166 166 effectively hiding network latency behind computation, but can result
167 167 in an imbalance of work when submitting many heterogenous tasks all at
168 168 once. Any positive value greater than one is a compromise between the
169 169 two.
170 170
171 171 """
172 172 )
173 173 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
174 174 'leastload', config=True, allow_none=False,
175 175 help="""select the task scheduler scheme [default: Python LRU]
176 176 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
177 177 )
    def _scheme_name_changed(self, old, new):
        """Trait observer: swap in the chooser function named by scheme_name."""
        self.log.debug("Using scheme %r"%new)
        # chooser functions (leastload, lru, ...) live at module level
        self.scheme = globals()[new]
181 181
182 182 # input arguments:
183 183 scheme = Instance(FunctionType) # function for determining the destination
    def _scheme_default(self):
        """Default chooser function for the scheme trait."""
        return leastload
186 186 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
187 187 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
188 188 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
189 189 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
190 190 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
191 191
192 192 # internals:
193 193 queue = Instance(deque) # sorted list of Jobs
    def _queue_default(self):
        """Default for the job queue trait: an empty deque."""
        return deque()
196 196 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
197 197 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
198 198 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
199 199 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
200 200 pending = Dict() # dict by engine_uuid of submitted tasks
201 201 completed = Dict() # dict by engine_uuid of completed tasks
202 202 failed = Dict() # dict by engine_uuid of failed tasks
203 203 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
204 204 clients = Dict() # dict by msg_id for who submitted the task
205 205 targets = List() # list of target IDENTs
206 206 loads = List() # list of engine loads
207 207 # full = Set() # set of IDENTs that have HWM outstanding tasks
208 208 all_completed = Set() # set of all completed tasks
209 209 all_failed = Set() # set of all failed tasks
210 210 all_done = Set() # set of all finished tasks=union(completed,failed)
211 211 all_ids = Set() # set of all submitted task IDs
212 212
213 213 ident = CBytes() # ZMQ identity. This should just be self.session.session
214 214 # but ensure Bytes
    def _ident_default(self):
        """Default ZMQ identity: the session id as bytes."""
        return self.session.bsession
217 217
    def start(self):
        """Wire up all stream handlers and announce ourselves to the Hub."""
        self.query_stream.on_recv(self.dispatch_query_reply)
        self.session.send(self.query_stream, "connection_request", {})

        self.engine_stream.on_recv(self.dispatch_result, copy=False)
        self.client_stream.on_recv(self.dispatch_submission, copy=False)

        self._notification_handlers = dict(
            registration_notification = self._register_engine,
            unregistration_notification = self._unregister_engine
        )
        self.notifier_stream.on_recv(self.dispatch_notification)
        self.log.info("Scheduler started [%s]" % self.scheme_name)
231 231
    def resume_receiving(self):
        """Resume accepting jobs."""
        self.client_stream.on_recv(self.dispatch_submission, copy=False)
235 235
    def stop_receiving(self):
        """Stop accepting jobs while there are no engines.
        Leave them in the ZMQ queue."""
        # clearing the callback leaves submissions buffered in ZMQ
        self.client_stream.on_recv(None)
240 240
241 241 #-----------------------------------------------------------------------
242 242 # [Un]Registration Handling
243 243 #-----------------------------------------------------------------------
244 244
245 245
    def dispatch_query_reply(self, msg):
        """handle reply to our initial connection request:
        register every engine the Hub already knows about."""
        try:
            idents,msg = self.session.feed_identities(msg)
        except ValueError:
            self.log.warn("task::Invalid Message: %r",msg)
            return
        try:
            msg = self.session.unserialize(msg)
        except ValueError:
            self.log.warn("task::Unauthorized message from: %r"%idents)
            return

        content = msg['content']
        for uuid in content.get('engines', {}).values():
            self._register_engine(cast_bytes(uuid))
262 262
263 263
@util.log_errors
def dispatch_notification(self, msg):
    """dispatch register/unregister events."""
    try:
        idents, msg = self.session.feed_identities(msg)
    except ValueError:
        self.log.warn("task::Invalid Message: %r", msg)
        return
    try:
        msg = self.session.unserialize(msg)
    except ValueError:
        self.log.warn("task::Unauthorized message from: %r" % idents)
        return

    msg_type = msg['header']['msg_type']
    handler = self._notification_handlers.get(msg_type, None)
    if handler is None:
        # unknown notification type: log and drop
        self.log.error("Unhandled message type: %r" % msg_type)
        return
    try:
        handler(cast_bytes(msg['content']['uuid']))
    except Exception:
        self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
288 288
289 289 def _register_engine(self, uid):
290 290 """New engine with ident `uid` became available."""
291 291 # head of the line:
292 292 self.targets.insert(0,uid)
293 293 self.loads.insert(0,0)
294 294
295 295 # initialize sets
296 296 self.completed[uid] = set()
297 297 self.failed[uid] = set()
298 298 self.pending[uid] = {}
299 299
300 300 # rescan the graph:
301 301 self.update_graph(None)
302 302
303 303 def _unregister_engine(self, uid):
304 304 """Existing engine with ident `uid` became unavailable."""
305 305 if len(self.targets) == 1:
306 306 # this was our only engine
307 307 pass
308 308
309 309 # handle any potentially finished tasks:
310 310 self.engine_stream.flush()
311 311
312 312 # don't pop destinations, because they might be used later
313 313 # map(self.destinations.pop, self.completed.pop(uid))
314 314 # map(self.destinations.pop, self.failed.pop(uid))
315 315
316 316 # prevent this engine from receiving work
317 317 idx = self.targets.index(uid)
318 318 self.targets.pop(idx)
319 319 self.loads.pop(idx)
320 320
321 321 # wait 5 seconds before cleaning up pending jobs, since the results might
322 322 # still be incoming
323 323 if self.pending[uid]:
324 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
325 dc.start()
324 self.loop.add_timeout(self.loop.time() + 5,
325 lambda : self.handle_stranded_tasks(uid),
326 )
326 327 else:
327 328 self.completed.pop(uid)
328 329 self.failed.pop(uid)
329 330
330 331
def handle_stranded_tasks(self, engine):
    """Deal with jobs resident in an engine that died.

    For every task still pending on `engine`, fabricate an 'apply_reply'
    carrying an EngineError and dispatch it as if the engine had replied,
    then scrub the engine's completed/failed bookkeeping.
    """
    lost = self.pending[engine]
    # FIX: iterate over a snapshot of the keys.  dispatch_result() below
    # (via handle_result) pops entries from self.pending[engine] while we
    # are looping, which raises RuntimeError on Python 3 if we iterate
    # the live dict view.
    for msg_id in list(lost.keys()):
        if msg_id not in self.pending[engine]:
            # prevent double-handling of messages
            continue

        raw_msg = lost[msg_id].raw_msg
        idents, msg = self.session.feed_identities(raw_msg, copy=False)
        parent = self.session.unpack(msg[1].bytes)
        idents = [engine, idents[0]]

        # build fake error reply
        try:
            raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
        except:
            content = error.wrap_exception()
        # build fake metadata
        md = dict(
            status=u'error',
            engine=engine.decode('ascii'),
            date=datetime.now(),
        )
        msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
        raw_reply = list(map(zmq.Message, self.session.serialize(msg, ident=idents)))
        # and dispatch it
        self.dispatch_result(raw_reply)

    # finally scrub completed/failed lists
    self.completed.pop(engine)
    self.failed.pop(engine)
363 364
364 365
365 366 #-----------------------------------------------------------------------
366 367 # Job Submission
367 368 #-----------------------------------------------------------------------
368 369
369 370
@util.log_errors
def dispatch_submission(self, raw_msg):
    """Dispatch job submission to appropriate handlers.

    Unpacks the task message, records its dependency information, and
    either runs it immediately, queues it until its dependencies are met,
    or fails it as unreachable.
    """
    # ensure targets up to date:
    self.notifier_stream.flush()
    try:
        idents, msg = self.session.feed_identities(raw_msg, copy=False)
        msg = self.session.unserialize(msg, content=False, copy=False)
    except Exception:
        # FIX: corrected 'Invaid' -> 'Invalid' typo in the log message
        self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
        return

    # send to monitor
    self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)

    header = msg['header']
    md = msg['metadata']
    msg_id = header['msg_id']
    self.all_ids.add(msg_id)

    # get targets as a set of bytes objects
    # from a list of unicode objects
    targets = md.get('targets', [])
    targets = set(map(cast_bytes, targets))

    retries = md.get('retries', 0)
    self.retries[msg_id] = retries

    # time dependencies
    after = md.get('after', None)
    if after:
        after = Dependency(after)
        if after.all:
            # prune already-finished ids so later checks are cheaper
            if after.success:
                after = Dependency(after.difference(self.all_completed),
                            success=after.success,
                            failure=after.failure,
                            all=after.all,
                )
            if after.failure:
                after = Dependency(after.difference(self.all_failed),
                            success=after.success,
                            failure=after.failure,
                            all=after.all,
                )
        if after.check(self.all_completed, self.all_failed):
            # recast as empty set, if `after` already met,
            # to prevent unnecessary set comparisons
            after = MET
    else:
        after = MET

    # location dependencies
    follow = Dependency(md.get('follow', []))

    timeout = md.get('timeout', None)
    if timeout:
        timeout = float(timeout)

    job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
            header=header, targets=targets, after=after, follow=follow,
            timeout=timeout, metadata=md,
    )
    # validate and reduce dependencies:
    for dep in after, follow:
        if not dep:  # empty dependency
            continue
        # check valid:
        if msg_id in dep or dep.difference(self.all_ids):
            self.queue_map[msg_id] = job
            return self.fail_unreachable(msg_id, error.InvalidDependency)
        # check if unreachable:
        if dep.unreachable(self.all_completed, self.all_failed):
            self.queue_map[msg_id] = job
            return self.fail_unreachable(msg_id)

    if after.check(self.all_completed, self.all_failed):
        # time deps already met, try to run
        if not self.maybe_run(job):
            # can't run yet
            if msg_id not in self.all_failed:
                # could have failed as unreachable
                self.save_unmet(job)
    else:
        self.save_unmet(job)
456 457
def job_timeout(self, job, timeout_id):
    """callback for a job's timeout.

    The job may or may not have been run at this point.
    """
    if job.timeout_id != timeout_id:
        # a newer timeout was armed since this one; ignore the stale call
        return
    current = time.time()
    if job.timeout >= (current + 1):
        self.log.warn("task %s timeout fired prematurely: %s > %s",
            job.msg_id, job.timeout, current
        )
    if job.msg_id in self.queue_map:
        # still waiting, but ran out of time
        self.log.info("task %r timed out", job.msg_id)
        self.fail_unreachable(job.msg_id, error.TaskTimeout)
474 475
def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
    """a task has become unreachable, send a reply with an ImpossibleDependency
    error."""
    if msg_id not in self.queue_map:
        self.log.error("task %r already failed!", msg_id)
        return
    job = self.queue_map.pop(msg_id)
    # lazy-delete from the queue
    job.removed = True
    # drop this task from its dependents' reverse-dependency sets
    for dependent in job.dependents:
        if dependent in self.graph:
            self.graph[dependent].remove(msg_id)

    # raise/catch so wrap_exception captures a real traceback
    try:
        raise why()
    except:
        content = error.wrap_exception()
    self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename'])

    self.all_done.add(msg_id)
    self.all_failed.add(msg_id)

    msg = self.session.send(self.client_stream, 'apply_reply', content,
                                                parent=job.header, ident=job.idents)
    self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)

    self.update_graph(msg_id, success=False)
502 503
def available_engines(self):
    """return a list of available engine indices based on HWM"""
    if not self.hwm:
        # no high-water mark: every engine can take more work
        return list(range(len(self.targets)))
    # otherwise, only engines below the per-engine load limit
    return [idx for idx, load in enumerate(self.loads) if load < self.hwm]
512 513
def maybe_run(self, job):
    """check location dependencies, and run if they are met."""
    msg_id = job.msg_id
    self.log.debug("Attempting to assign task %s", msg_id)
    available = self.available_engines()
    if not available:
        # no engines, definitely can't run
        return False

    if job.follow or job.targets or job.blacklist or self.hwm:
        # the job constrains which engines may take it; filter candidates
        def eligible(idx):
            # respect the high-water mark
            if self.hwm and self.loads[idx] == self.hwm:
                return False
            target = self.targets[idx]
            # engines that already rejected this job are out
            if target in job.blacklist:
                return False
            # explicit targets restrict the pool
            if job.targets and target not in job.targets:
                return False
            # finally, location (follow) dependencies
            return job.follow.check(self.completed[target], self.failed[target])

        candidates = [idx for idx in available if eligible(idx)]

        if not candidates:
            # couldn't run right now; see if it can *ever* run
            if job.follow.all:
                # follow=all: impossible if dependencies live on >1 engine
                dests = set()
                relevant = set()
                if job.follow.success:
                    relevant = self.all_completed
                if job.follow.failure:
                    relevant = relevant.union(self.all_failed)
                for m in job.follow.intersection(relevant):
                    dests.add(self.destinations[m])
                if len(dests) > 1:
                    self.queue_map[msg_id] = job
                    self.fail_unreachable(msg_id)
                    return False
            if job.targets:
                # impossible if every allowed target has blacklisted it
                job.targets.difference_update(job.blacklist)
                if not job.targets or not job.targets.intersection(self.targets):
                    self.queue_map[msg_id] = job
                    self.fail_unreachable(msg_id)
                    return False
            return False
    else:
        # unconstrained job: let the scheme pick from all engines
        candidates = None

    self.submit_task(job, candidates)
    return True
569 570
def save_unmet(self, job):
    """Save a message for later submission when its dependencies are met."""
    msg_id = job.msg_id
    self.log.debug("Adding task %s to the queue", msg_id)
    self.queue_map[msg_id] = job
    self.queue.append(job)
    # track the ids in follow or after, but not those already finished
    for dep_id in job.after.union(job.follow).difference(self.all_done):
        if dep_id not in self.graph:
            self.graph[dep_id] = set()
        self.graph[dep_id].add(msg_id)

    # schedule timeout callback
    if job.timeout:
        # bump timeout_id so stale callbacks are ignored by job_timeout
        timeout_id = job.timeout_id = job.timeout_id + 1
        # FIX: use the loop's own clock for the deadline — tornado's
        # add_timeout interprets a numeric deadline against IOLoop.time(),
        # and _unregister_engine above already uses self.loop.time()
        self.loop.add_timeout(self.loop.time() + job.timeout,
            lambda : self.job_timeout(job, timeout_id)
        )
588 589
589 590
def submit_task(self, job, indices=None):
    """Submit a task to any of a subset of our targets."""
    # restrict the load vector if a candidate subset was given
    if indices:
        loads = [self.loads[i] for i in indices]
    else:
        loads = self.loads
    # the scheme picks an index into `loads`; map it back if filtered
    idx = self.scheme(loads)
    if indices:
        idx = indices[idx]
    target = self.targets[idx]
    # send job to the engine: identity frame, then the raw message
    self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
    self.engine_stream.send_multipart(job.raw_msg, copy=False)
    # update load
    self.add_job(idx)
    self.pending[target][job.msg_id] = job
    # notify Hub
    content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
    self.session.send(self.mon_stream, 'task_destination', content=content,
                    ident=[b'tracktask',self.ident])
611 612
612 613
613 614 #-----------------------------------------------------------------------
614 615 # Result Handling
615 616 #-----------------------------------------------------------------------
616 617
617 618
@util.log_errors
def dispatch_result(self, raw_msg):
    """dispatch method for result replies"""
    try:
        idents, msg = self.session.feed_identities(raw_msg, copy=False)
        msg = self.session.unserialize(msg, content=False, copy=False)
        engine = idents[0]
        try:
            idx = self.targets.index(engine)
        except ValueError:
            pass  # skip load-update for dead engines
        else:
            self.finish_job(idx)
    except Exception:
        self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
        return

    md = msg['metadata']
    parent = msg['parent_header']
    if not md.get('dependencies_met', True):
        # the engine rejected the task; reroute it
        self.handle_unmet_dependency(idents, parent)
        return

    success = (md['status'] == 'ok')
    msg_id = parent['msg_id']
    retries = self.retries[msg_id]
    if not success and retries > 0:
        # failed, but retries remain: treat like an unmet dependency
        self.retries[msg_id] = retries - 1
        self.handle_unmet_dependency(idents, parent)
    else:
        del self.retries[msg_id]
        # relay to client and update graph
        self.handle_result(idents, parent, raw_msg, success)
        # send to Hub monitor
        self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
653 654
def handle_result(self, idents, parent, raw_msg, success=True):
    """handle a real task result, either success or failure"""
    # first, relay result to client
    engine, client = idents[0], idents[1]
    # swap_ids for ROUTER-ROUTER mirror
    raw_msg[:2] = [client, engine]
    self.client_stream.send_multipart(raw_msg, copy=False)
    # now, update our data structures
    msg_id = parent['msg_id']
    self.pending[engine].pop(msg_id)
    if success:
        self.completed[engine].add(msg_id)
        self.all_completed.add(msg_id)
    else:
        self.failed[engine].add(msg_id)
        self.all_failed.add(msg_id)
    self.all_done.add(msg_id)
    # remember where it ran, for follow-dependencies
    self.destinations[msg_id] = engine

    self.update_graph(msg_id, success)
676 677
def handle_unmet_dependency(self, idents, parent):
    """handle an unmet dependency"""
    engine = idents[0]
    msg_id = parent['msg_id']

    # the engine rejected this task; never send it there again
    job = self.pending[engine].pop(msg_id)
    job.blacklist.add(engine)

    if job.blacklist == job.targets:
        # every allowed target has rejected it: it can never run
        self.queue_map[msg_id] = job
        self.fail_unreachable(msg_id)
    elif not self.maybe_run(job):
        # resubmit failed
        if msg_id not in self.all_failed:
            # put it back in our dependency tree
            self.save_unmet(job)

    if self.hwm:
        try:
            idx = self.targets.index(engine)
        except ValueError:
            pass  # skip load-update for dead engines
        else:
            if self.loads[idx] == self.hwm-1:
                # engine just dropped below the HWM: recheck queued work
                self.update_graph(None)
702 703
def update_graph(self, dep_id=None, success=True):
    """dep_id just finished. Update our dependency
    graph and submit any jobs that just became runnable.

    Called with dep_id=None to update entire graph for hwm, but without finishing a task.
    """
    # jobs that were waiting on the finished dependency
    msg_ids = self.graph.pop(dep_id, [])

    # recheck *all* jobs if
    # a) we have HWM and an engine just become no longer full
    # or b) dep_id was given as None
    if dep_id is None or self.hwm and any([load==self.hwm-1 for load in self.loads]):
        jobs = self.queue
        using_queue = True
    else:
        using_queue = False
        jobs = deque(sorted( self.queue_map[msg_id] for msg_id in msg_ids ))

    to_restore = []
    while jobs:
        job = jobs.popleft()
        if job.removed:
            # lazily-deleted entry: skip it
            continue
        msg_id = job.msg_id

        put_it_back = True

        if job.after.unreachable(self.all_completed, self.all_failed)\
                or job.follow.unreachable(self.all_completed, self.all_failed):
            self.fail_unreachable(msg_id)
            put_it_back = False

        elif job.after.check(self.all_completed, self.all_failed):  # time deps met, maybe run
            if self.maybe_run(job):
                put_it_back = False
                self.queue_map.pop(msg_id)
                for mid in job.dependents:
                    if mid in self.graph:
                        self.graph[mid].remove(msg_id)

                # abort the loop if we just filled up all of our engines.
                # avoids an O(N) operation in situation of full queue,
                # where graph update is triggered as soon as an engine becomes
                # non-full, and all tasks after the first are checked,
                # even though they can't run.
                if not self.available_engines():
                    break

        if using_queue and put_it_back:
            # popped a job from the queue but it neither ran nor failed,
            # so we need to put it back when we are done
            # make sure to_restore preserves the same ordering
            to_restore.append(job)

    # put back any tasks we popped but didn't run
    if using_queue:
        self.queue.extendleft(to_restore)
769 770
770 771 #----------------------------------------------------------------------
771 772 # methods to be overridden by subclasses
772 773 #----------------------------------------------------------------------
773 774
def add_job(self, idx):
    """Called after self.targets[idx] just got the job with header.
    Override with subclasses. The default ordering is simple LRU.
    The default loads are the number of outstanding jobs."""
    self.loads[idx] += 1
    # rotate the chosen engine to the back of the line (LRU ordering)
    for seq in (self.targets, self.loads):
        seq.append(seq.pop(idx))
781 782
782 783
def finish_job(self, idx):
    """Called after self.targets[idx] just finished a job.
    Override with subclasses."""
    # one fewer outstanding task on this engine
    self.loads[idx] -= 1
787 788
788 789
789 790
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
                        logname='root', log_url=None, loglevel=logging.DEBUG,
                        identity=b'task', in_thread=False):
    """Build the 0MQ plumbing for a TaskScheduler and start it.

    Binds the client- and engine-facing ROUTER sockets, connects the
    monitor/notifier/query channels, sets up logging, and runs the loop
    (unless running inside the Hub's thread).
    """
    ZMQStream = zmqstream.ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a process, don't use instance()
        # for safety with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()

    # client-facing ROUTER
    ins = ZMQStream(ctx.socket(zmq.ROUTER), loop)
    util.set_hwm(ins, 0)
    ins.setsockopt(zmq.IDENTITY, identity + b'_in')
    ins.bind(in_addr)

    # engine-facing ROUTER
    outs = ZMQStream(ctx.socket(zmq.ROUTER), loop)
    util.set_hwm(outs, 0)
    outs.setsockopt(zmq.IDENTITY, identity + b'_out')
    outs.bind(out_addr)

    # monitor channel to the Hub
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
    util.set_hwm(mons, 0)
    mons.connect(mon_addr)

    # engine (un)registration notifications
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB), loop)
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    # Hub query channel
    querys = ZMQStream(ctx.socket(zmq.DEALER), loop)
    querys.connect(reg_addr)

    # setup logging.
    if in_thread:
        log = Application.instance().log
    else:
        if log_url:
            log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
        else:
            log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            query_stream=querys,
                            loop=loop, log=log,
                            config=config)
    scheduler.start()
    if not in_thread:
        try:
            loop.start()
        except KeyboardInterrupt:
            scheduler.log.critical("Interrupted, exiting...")
848 849
@@ -1,301 +1,301
1 1 """A simple engine that talks to a controller over 0MQ.
2 2 it handles registration, etc. and launches a kernel
3 3 connected to the Controller's Schedulers.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9 from __future__ import print_function
10 10
11 11 import sys
12 12 import time
13 13 from getpass import getpass
14 14
15 15 import zmq
16 16 from zmq.eventloop import ioloop, zmqstream
17 17
18 18 from IPython.utils.localinterfaces import localhost
19 19 from IPython.utils.traitlets import (
20 20 Instance, Dict, Integer, Type, Float, Unicode, CBytes, Bool
21 21 )
22 22 from IPython.utils.py3compat import cast_bytes
23 23
24 24 from IPython.parallel.controller.heartmonitor import Heart
25 25 from IPython.parallel.factory import RegistrationFactory
26 26 from IPython.parallel.util import disambiguate_url
27 27
28 from IPython.kernel.zmq.session import Message
29 28 from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel
30 29 from IPython.kernel.zmq.kernelapp import IPKernelApp
31 30
32 31 class EngineFactory(RegistrationFactory):
33 32 """IPython engine"""
34 33
35 34 # configurables:
36 35 out_stream_factory=Type('IPython.kernel.zmq.iostream.OutStream', config=True,
37 36 help="""The OutStream for handling stdout/err.
38 37 Typically 'IPython.kernel.zmq.iostream.OutStream'""")
39 38 display_hook_factory=Type('IPython.kernel.zmq.displayhook.ZMQDisplayHook', config=True,
40 39 help="""The class for handling displayhook.
41 40 Typically 'IPython.kernel.zmq.displayhook.ZMQDisplayHook'""")
42 41 location=Unicode(config=True,
43 42 help="""The location (an IP address) of the controller. This is
44 43 used for disambiguating URLs, to determine whether
45 44 loopback should be used to connect or the public address.""")
46 45 timeout=Float(5.0, config=True,
47 46 help="""The time (in seconds) to wait for the Controller to respond
48 47 to registration requests before giving up.""")
49 48 max_heartbeat_misses=Integer(50, config=True,
50 49 help="""The maximum number of times a check for the heartbeat ping of a
51 50 controller can be missed before shutting down the engine.
52 51
53 52 If set to 0, the check is disabled.""")
54 53 sshserver=Unicode(config=True,
55 54 help="""The SSH server to use for tunneling connections to the Controller.""")
56 55 sshkey=Unicode(config=True,
57 56 help="""The SSH private key file to use when tunneling connections to the Controller.""")
58 57 paramiko=Bool(sys.platform == 'win32', config=True,
59 58 help="""Whether to use paramiko instead of openssh for tunnels.""")
60 59
@property
def tunnel_mod(self):
    """Lazily import and return pyzmq's ssh tunnel module."""
    from zmq.ssh import tunnel
    return tunnel
65 64
66 65
67 66 # not configurable:
68 67 connection_info = Dict()
69 68 user_ns = Dict()
70 69 id = Integer(allow_none=True)
71 70 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
72 71 kernel = Instance(Kernel)
73 72 hb_check_period=Integer()
74 73
75 74 # States for the heartbeat monitoring
76 75 # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that
77 76 # during the first check no "missed" ping is reported. Must be floats for Python 3 compatibility.
78 77 _hb_last_pinged = 0.0
79 78 _hb_last_monitored = 0.0
80 79 _hb_missed_beats = 0
81 80 # The zmq Stream which receives the pings from the Heart
82 81 _hb_listener = None
83 82
84 83 bident = CBytes()
85 84 ident = Unicode()
def _ident_changed(self, name, old, new):
    # keep the bytes identity in sync with the unicode one
    self.bident = cast_bytes(new)
88 87 using_ssh=Bool(False)
89 88
90 89
def __init__(self, **kwargs):
    """Initialize the factory and seed our identity from the session."""
    super(EngineFactory, self).__init__(**kwargs)
    self.ident = self.session.session
94 93
def init_connector(self):
    """construct connection function, which handles tunnels."""
    self.using_ssh = bool(self.sshkey or self.sshserver)

    if self.sshkey and not self.sshserver:
        # We are using ssh directly to the controller, tunneling localhost to localhost
        self.sshserver = self.url.split('://')[1].split(':')[0]

    # only prompt for a password when passwordless ssh is unavailable
    if self.using_ssh:
        if self.tunnel_mod.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
            password = False
        else:
            password = getpass("SSH Password for %s: "%self.sshserver)
    else:
        password = False

    def connect(s, url):
        """connect socket `s` to `url`, tunneling over ssh when configured"""
        url = disambiguate_url(url, self.location)
        if not self.using_ssh:
            return s.connect(url)
        self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
        return self.tunnel_mod.tunnel_connection(s, url, self.sshserver,
                    keyfile=self.sshkey, paramiko=self.paramiko,
                    password=password,
        )

    def maybe_tunnel(url):
        """like connect, but don't complete the connection (for use by heartbeat)"""
        url = disambiguate_url(url, self.location)
        if self.using_ssh:
            self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
            url, tunnelobj = self.tunnel_mod.open_tunnel(url, self.sshserver,
                        keyfile=self.sshkey, paramiko=self.paramiko,
                        password=password,
            )
        return str(url)

    return connect, maybe_tunnel
133 132
def register(self):
    """send the registration_request"""
    self.log.info("Registering with controller at %s"%self.url)
    ctx = self.context
    connect, maybe_tunnel = self.init_connector()

    # DEALER socket carrying our identity, wrapped in a ZMQStream
    reg = ctx.socket(zmq.DEALER)
    reg.setsockopt(zmq.IDENTITY, self.bident)
    connect(reg, self.url)
    self.registrar = zmqstream.ZMQStream(reg, self.loop)

    # complete_registration() fires when the Hub replies
    content = dict(uuid=self.ident)
    self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
    self.session.send(self.registrar, "registration_request", content=content)
150 149
151 150 def _report_ping(self, msg):
152 151 """Callback for when the heartmonitor.Heart receives a ping"""
153 152 #self.log.debug("Received a ping: %s", msg)
154 153 self._hb_last_pinged = time.time()
155 154
def complete_registration(self, msg, connect, maybe_tunnel):
    """Handle the Hub's reply to our registration request.

    On success: cancel the abort timeout, start the heartbeat, connect
    the shell/control/iopub channels, and launch the kernel.  On failure:
    log and raise.
    """
    # registration answered in time: cancel the abort timeout
    self.loop.remove_timeout(self._abort_dc)
    ctx = self.context
    loop = self.loop
    identity = self.bident
    idents, msg = self.session.feed_identities(msg)
    msg = self.session.unserialize(msg)
    content = msg['content']
    info = self.connection_info

    def url(key):
        """get zmq url for given channel"""
        return str(info["interface"] + ":%i" % info[key])

    if content['status'] != 'ok':
        # guard clause: registration rejected by the Hub
        self.log.fatal("Registration Failed: %s"%msg)
        raise Exception("Registration Failed: %s"%msg)

    self.id = int(content['id'])

    # launch heartbeat
    # possibly forward hb ports with tunnels
    hb_ping = maybe_tunnel(url('hb_ping'))
    hb_pong = maybe_tunnel(url('hb_pong'))

    hb_monitor = None
    if self.max_heartbeat_misses > 0:
        # Add a monitor socket which will record the last time a ping was seen
        mon = self.context.socket(zmq.SUB)
        mport = mon.bind_to_random_port('tcp://%s' % localhost())
        mon.setsockopt(zmq.SUBSCRIBE, b"")
        self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
        self._hb_listener.on_recv(self._report_ping)

        hb_monitor = "tcp://%s:%i" % (localhost(), mport)

    heart = Heart(hb_ping, hb_pong, hb_monitor, heart_id=identity)
    heart.start()

    # create Shell Connections (MUX, Task, etc.):
    shell_addrs = url('mux'), url('task')

    # Use only one shell stream for mux and tasks
    stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
    stream.setsockopt(zmq.IDENTITY, identity)
    shell_streams = [stream]
    for addr in shell_addrs:
        connect(stream, addr)

    # control stream:
    control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
    control_stream.setsockopt(zmq.IDENTITY, identity)
    connect(control_stream, url('control'))

    # create iopub stream:
    iopub_socket = ctx.socket(zmq.PUB)
    iopub_socket.setsockopt(zmq.IDENTITY, identity)
    connect(iopub_socket, url('iopub'))

    # disable history:
    self.config.HistoryManager.hist_file = ':memory:'

    # Redirect input streams and set a display hook.
    if self.out_stream_factory:
        sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
        sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
        sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
        sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
    if self.display_hook_factory:
        sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
        sys.displayhook.topic = cast_bytes('engine.%i.execute_result' % self.id)

    self.kernel = Kernel(parent=self, int_id=self.id, ident=self.ident, session=self.session,
            control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
            loop=loop, user_ns=self.user_ns, log=self.log)

    self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)

    # periodically check the heartbeat pings of the controller
    # Should be started here and not in "start()" so that the right period can be taken
    # from the hubs HeartBeatMonitor.period
    if self.max_heartbeat_misses > 0:
        # Use a slightly bigger check period than the hub signal period to not warn unnecessary
        self.hb_check_period = int(content['hb_period'])+10
        self.log.info("Starting to monitor the heartbeat signal from the hub every %i ms." , self.hb_check_period)
        self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop)
        self._hb_reporter.start()
    else:
        self.log.info("Monitoring of the heartbeat signal from the hub is not enabled.")

    # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
    app = IPKernelApp(parent=self, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
    app.init_profile_dir()
    app.init_code()

    self.kernel.start()

    self.log.info("Completed registration with id %i"%self.id)
260 259
261 260
def abort(self):
    """Registration timed out: warn, unregister, and exit the process."""
    self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
    if self.url.startswith('127.'):
        # loopback controller address: the usual cause is engines on
        # other machines that cannot reach it
        self.log.fatal("""
If the controller and engines are not on the same machine,
you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
    c.HubFactory.ip='*' # for all interfaces, internal and external
    c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
or tunnel connections via ssh.
""")
    self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
    time.sleep(1)
    sys.exit(255)
275 274
276 275 def _hb_monitor(self):
277 276 """Callback to monitor the heartbeat from the controller"""
278 277 self._hb_listener.flush()
279 278 if self._hb_last_monitored > self._hb_last_pinged:
280 279 self._hb_missed_beats += 1
281 280 self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats)
282 281 else:
283 282 #self.log.debug("Heartbeat received (after missing %s beats).", self._hb_missed_beats)
284 283 self._hb_missed_beats = 0
285 284
286 285 if self._hb_missed_beats >= self.max_heartbeat_misses:
287 286 self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.",
288 287 self.max_heartbeat_misses, self.hb_check_period)
289 288 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
290 289 self.loop.stop()
291 290
292 291 self._hb_last_monitored = time.time()
293 292
294 293
def start(self):
    """Begin registration with the controller.

    Schedules the registration request on the IOLoop and arms an abort
    timeout in case the controller never responds.

    FIX: the timeout handle returned by add_timeout must be stored as
    self._abort_dc — complete_registration() cancels it with
    self.loop.remove_timeout(self._abort_dc), which previously raised
    AttributeError because the handle was discarded.
    """
    loop = self.loop
    def _start():
        self.register()
        self._abort_dc = loop.add_timeout(loop.time() + self.timeout, self.abort)
    self.loop.add_callback(_start)
300 300
301 301
General Comments 0
You need to be logged in to leave comments. Login now