##// END OF EJS Templates
cleanup aliases in parallel apps...
MinRK -
Show More
@@ -1,267 +1,266 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The Base Application class for IPython.parallel apps
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * Min RK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import os
27 27 import logging
28 28 import re
29 29 import sys
30 30
31 31 from subprocess import Popen, PIPE
32 32
33 33 from IPython.core import release
34 34 from IPython.core.crashhandler import CrashHandler
35 35 from IPython.core.application import (
36 36 BaseIPythonApplication,
37 37 base_aliases as base_ip_aliases,
38 38 base_flags as base_ip_flags
39 39 )
40 40 from IPython.utils.path import expand_path
41 41
42 42 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
43 43
44 44 #-----------------------------------------------------------------------------
45 45 # Module errors
46 46 #-----------------------------------------------------------------------------
47 47
class PIDFileError(Exception):
    """Raised for pid-file problems (missing, unreadable, or pre-existing)."""
50 50
51 51
52 52 #-----------------------------------------------------------------------------
53 53 # Crash handler for this application
54 54 #-----------------------------------------------------------------------------
55 55
56 56
# Template for the crash-report message shown to the user; the $self.*
# placeholders are interpolated from the handler instance by CrashHandler.
_message_template = """\
Oops, $self.app_name crashed. We do our best to make it stable, but...

A crash report was automatically generated with the following information:
  - A verbatim copy of the crash traceback.
  - Data on your current $self.app_name configuration.

It was left in the file named:
\t'$self.crash_report_fname'
If you can email this file to the developers, the information in it will help
them in understanding and correcting the problem.

You can mail it to: $self.contact_name at $self.contact_email
with the subject '$self.app_name Crash Report'.

If you want to do it now, the following command will work (under Unix):
mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname

To ensure accurate tracking of this issue, please file a report about it at:
$self.bug_tracker
"""
78 78
class ParallelCrashHandler(CrashHandler):
    """sys.excepthook for IPython itself, leaves a detailed report on disk."""

    message_template = _message_template

    def __init__(self, app):
        # Contact info comes from the 'Min' entry of the release metadata.
        contact_name, contact_email = release.authors['Min'][:2]
        bug_tracker = 'http://github.com/ipython/ipython/issues'
        super(ParallelCrashHandler, self).__init__(
            app, contact_name, contact_email, bug_tracker
        )
91 91
92 92
93 93 #-----------------------------------------------------------------------------
94 94 # Main application
95 95 #-----------------------------------------------------------------------------
# Command-line aliases shared by all IPython.parallel apps: start from the
# core IPython aliases, then layer the BaseParallelApplication-specific ones
# on top (so the parallel-specific mappings win on collision).
base_aliases = {}
base_aliases.update(base_ip_aliases)
base_aliases.update({
    'profile_dir' : 'ProfileDir.location',
    'work_dir' : 'BaseParallelApplication.work_dir',
    'log_to_file' : 'BaseParallelApplication.log_to_file',
    'clean_logs' : 'BaseParallelApplication.clean_logs',
    'log_url' : 'BaseParallelApplication.log_url',
})

# Command-line flags shared by all IPython.parallel apps.
base_flags = {
    'log-to-file' : (
        {'BaseParallelApplication' : {'log_to_file' : True}},
        "send log output to a file"
    )
}
# NOTE(review): updating with base_ip_flags *after* defining the local flags
# means core IPython flags win on any name collision -- confirm intended.
base_flags.update(base_ip_flags)
114 113
class BaseParallelApplication(BaseIPythonApplication):
    """The base Application for IPython.parallel apps

    Principal extensions to BaseIPythonApplication:

    * work_dir
    * remote logging via pyzmq
    * IOLoop instance
    """

    # Use the parallel-specific crash handler defined above.
    crash_handler_class = ParallelCrashHandler

    def _log_level_default(self):
        # temporarily override default_log_level to INFO
        return logging.INFO

    # Working directory for the process; assignment triggers
    # _work_dir_changed below.
    work_dir = Unicode(os.getcwdu(), config=True,
        help='Set the working dir for the process.'
    )
    def _work_dir_changed(self, name, old, new):
        # Normalize the new value to an expanded unicode path.
        self.work_dir = unicode(expand_path(new))

    log_to_file = Bool(config=True,
        help="whether to log to a file")

    clean_logs = Bool(False, config=True,
        help="whether to cleanup old logfiles before starting")

    log_url = Unicode('', config=True,
        help="The ZMQ URL of the iplogger to aggregate logging.")

    def _config_files_default(self):
        # Load all three parallel apps' config files by default.
        return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']

    # Lazily-created zmq IOLoop shared by the app.
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        from zmq.eventloop.ioloop import IOLoop
        return IOLoop.instance()

    aliases = Dict(base_aliases)
    flags = Dict(base_flags)

    def initialize(self, argv=None):
        """initialize the app"""
        super(BaseParallelApplication, self).initialize(argv)
        self.to_work_dir()
        self.reinit_logging()

    def to_work_dir(self):
        """chdir into self.work_dir (if it differs from the cwd) and make
        the new cwd importable."""
        wd = self.work_dir
        if unicode(wd) != os.getcwdu():
            os.chdir(wd)
            self.log.info("Changing to working dir: %s" % wd)
        # This is the working dir by now.
        sys.path.insert(0, '')

    def reinit_logging(self):
        """Re-point logging at a per-pid file if log_to_file is set."""
        # Remove old log files
        log_dir = self.profile_dir.log_dir
        if self.clean_logs:
            for f in os.listdir(log_dir):
                if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
                    os.remove(os.path.join(log_dir, f))
        if self.log_to_file:
            # Start logging to the new log file
            log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
            logfile = os.path.join(log_dir, log_filename)
            open_log_file = open(logfile, 'w')
        else:
            open_log_file = None
        if open_log_file is not None:
            # Swap the previously-installed handler for one writing to the
            # file.  NOTE(review): assumes self._log_handler was set by the
            # base class's logging setup -- confirm.
            self.log.removeHandler(self._log_handler)
            self._log_handler = logging.StreamHandler(open_log_file)
            self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
            self._log_handler.setFormatter(self._log_formatter)
            self.log.addHandler(self._log_handler)

    def write_pid_file(self, overwrite=False):
        """Create a .pid file in the pid_dir with my pid.

        This must be called after pre_construct, which sets `self.pid_dir`.
        This raises :exc:`PIDFileError` if the pid file exists already.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            pid = self.get_pid_from_file()
            if not overwrite:
                raise PIDFileError(
                    'The pid file [%s] already exists. \nThis could mean that this '
                    'server is already running with [pid=%s].' % (pid_file, pid)
                )
        with open(pid_file, 'w') as f:
            self.log.info("Creating pid file: %s" % pid_file)
            f.write(repr(os.getpid())+'\n')

    def remove_pid_file(self):
        """Remove the pid file.

        This should be called at shutdown by registering a callback with
        :func:`reactor.addSystemEventTrigger`. This needs to return
        ``None``.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            try:
                self.log.info("Removing pid file: %s" % pid_file)
                os.remove(pid_file)
            except:
                # best-effort: failure to remove must not abort shutdown
                self.log.warn("Error removing the pid file: %s" % pid_file)

    def get_pid_from_file(self):
        """Get the pid from the pid file.

        If the pid file doesn't exist a :exc:`PIDFileError` is raised.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            with open(pid_file, 'r') as f:
                s = f.read().strip()
                try:
                    pid = int(s)
                except:
                    raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
                return pid
        else:
            raise PIDFileError('pid file not found: %s' % pid_file)

    def check_pid(self, pid):
        """Return True if `pid` appears to be a running process.

        On any failure to perform the check, assume it is running.
        """
        if os.name == 'nt':
            try:
                import ctypes
                # returns 0 if no such process (of ours) exists
                # positive int otherwise
                p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
            except Exception:
                self.log.warn(
                    "Could not determine whether pid %i is running via `OpenProcess`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            return bool(p)
        else:
            try:
                p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
                output,_ = p.communicate()
            except OSError:
                self.log.warn(
                    "Could not determine whether pid %i is running via `ps x`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            # Extract the leading pid from each `ps x` line, test membership.
            pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
            return pid in pids
@@ -1,458 +1,459 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import errno
25 25 import logging
26 26 import os
27 27 import re
28 28 import signal
29 29
30 30 from subprocess import check_call, CalledProcessError, PIPE
31 31 import zmq
32 32 from zmq.eventloop import ioloop
33 33
34 34 from IPython.config.application import Application, boolean_flag
35 35 from IPython.config.loader import Config
36 36 from IPython.core.application import BaseIPythonApplication
37 37 from IPython.core.profiledir import ProfileDir
38 38 from IPython.utils.daemonize import daemonize
39 39 from IPython.utils.importstring import import_item
40 40 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
41 41 DottedObjectName)
42 42
43 43 from IPython.parallel.apps.baseapp import (
44 44 BaseParallelApplication,
45 45 PIDFileError,
46 46 base_flags, base_aliases
47 47 )
48 48
49 49
50 50 #-----------------------------------------------------------------------------
51 51 # Module level variables
52 52 #-----------------------------------------------------------------------------
53 53
54 54
# Default config file loaded by the ipcluster subcommand apps.
default_config_file_name = u'ipcluster_config.py'


_description = """Start an IPython cluster for parallel computing.

An IPython cluster consists of 1 controller and 1 or more engines.
This command automates the startup of these processes using a wide
range of startup methods (SSH, local processes, PBS, mpiexec,
Windows HPC Server 2008). To start a cluster with 4 engines on your
local host simply do 'ipcluster start n=4'. For more complex usage
you will typically do 'ipcluster create profile=mycluster', then edit
configuration files, followed by 'ipcluster start profile=mycluster n=4'.
"""


# Exit codes for ipcluster

# This will be the exit code if the ipcluster appears to be running because
# a .pid file exists
ALREADY_STARTED = 10


# This will be the exit code if ipcluster stop is run, but there is not .pid
# file to be found.
ALREADY_STOPPED = 11

# This will be the exit code if ipcluster engines is run, but there is not .pid
# file to be found.
NO_CLUSTER = 12


#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
start_help = """Start an IPython cluster for parallel computing

Start an ipython cluster by its profile name or cluster
directory. Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be creating using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start n=4 profile=<profile>`,
otherwise use the 'profile_dir' option.
"""
stop_help = """Stop a running IPython cluster

Stop a running ipython cluster by its profile name or cluster
directory. Cluster directories are named using the convention
'profile_<name>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop profile=<profile>`, otherwise
use the 'profile_dir' option.
"""
engines_help = """Start engines connected to an existing IPython cluster

Start one or more engines to connect to an existing Cluster
by profile name or cluster directory.
Cluster directories contain configuration, log and
security related files and are named using the convention
'profile_<name>' and should be creating using the 'start'
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines n=4 profile=<profile>`,
otherwise use the 'profile_dir' option.
"""
# Aliases for 'ipcluster stop': the shared base aliases plus the stop signal.
stop_aliases = dict(
    signal='IPClusterStop.signal',
)
stop_aliases.update(base_aliases)
125 125
class IPClusterStop(BaseParallelApplication):
    """Application for the ``ipcluster stop`` subcommand.

    Reads the cluster pid file and signals (posix) or taskkills (nt)
    the running cluster process.
    """
    name = u'ipcluster'
    description = stop_help
    config_file_name = Unicode(default_config_file_name)

    signal = Int(signal.SIGINT, config=True,
        help="signal to use for stopping processes.")

    aliases = Dict(stop_aliases)

    def start(self):
        """Start the app for the stop subcommand."""
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Could not read pid file, cluster is probably not running.'
            )
            # Here I exit with a unusual exit status that other processes
            # can watch for to learn how I exited.
            self.remove_pid_file()
            self.exit(ALREADY_STOPPED)

        if not self.check_pid(pid):
            self.log.critical(
                'Cluster [pid=%r] is not running.' % pid
            )
            # stale pid file: clean it up before exiting
            self.remove_pid_file()
            # Here I exit with a unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)

        elif os.name=='posix':
            sig = self.signal
            self.log.info(
                "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
            )
            try:
                os.kill(pid, sig)
            except OSError:
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
            self.remove_pid_file()
        elif os.name=='nt':
            try:
                # kill the whole tree
                p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
            except (CalledProcessError, OSError):
                self.log.error("Stopping cluster failed, assuming already dead.",
                    exc_info=True)
            self.remove_pid_file()
177 177
# Aliases/flags for IPClusterEngines: the shared base set plus the
# engine-specific options (count, launcher class, daemonize).
engine_aliases = {}
engine_aliases.update(base_aliases)
engine_aliases.update(dict(
    n='IPClusterEngines.n',
    engines = 'IPClusterEngines.engine_launcher_class',
    daemonize = 'IPClusterEngines.daemonize',
))
engine_flags = {}
engine_flags.update(base_flags)

engine_flags.update(dict(
    daemonize=(
        {'IPClusterEngines' : {'daemonize' : True}},
        """run the cluster into the background (not available on Windows)""",
    )
))
class IPClusterEngines(BaseParallelApplication):
    """Application for the ``ipcluster engines`` subcommand.

    Launches a set of engines (via a configurable launcher class) that
    connect to an already-running cluster.
    """

    name = u'ipcluster'
    description = engines_help
    usage = None
    config_file_name = Unicode(default_config_file_name)
    default_log_level = logging.INFO
    classes = List()
    def _classes_default(self):
        # Configurable classes: the profile dir plus every EngineSet launcher.
        from IPython.parallel.apps import launcher
        launchers = launcher.all_launchers
        eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
        return [ProfileDir]+eslaunchers

    n = Int(2, config=True,
        help="The number of engines to start.")

    engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
        config=True,
        help="The class for launching a set of Engines."
        )
    daemonize = Bool(False, config=True,
        help="""Daemonize the ipcluster program. This implies --log-to-file.
        Not available on Windows.
        """)

    def _daemonize_changed(self, name, old, new):
        # daemonizing detaches from the terminal, so force file logging
        if new:
            self.log_to_file = True

    aliases = Dict(engine_aliases)
    flags = Dict(engine_flags)
    # guard so stop_launchers only runs its shutdown sequence once
    _stopping = False

    def initialize(self, argv=None):
        super(IPClusterEngines, self).initialize(argv)
        self.init_signal()
        self.init_launchers()

    def init_launchers(self):
        self.engine_launcher = self.build_launcher(self.engine_launcher_class)
        # stop the event loop once the engine set stops
        self.engine_launcher.on_stop(lambda r: self.loop.stop())

    def init_signal(self):
        # Setup signals
        signal.signal(signal.SIGINT, self.sigint_handler)

    def build_launcher(self, clsname):
        """import and instantiate a Launcher based on importstring"""
        if '.' not in clsname:
            # not a module, presume it's the raw name in apps.launcher
            clsname = 'IPython.parallel.apps.launcher.'+clsname
        # print repr(clsname)
        klass = import_item(clsname)

        launcher = klass(
            work_dir=self.profile_dir.location, config=self.config, log=self.log
        )
        return launcher

    def start_engines(self):
        self.log.info("Starting %i engines"%self.n)
        self.engine_launcher.start(
            self.n,
            self.profile_dir.location
        )

    def stop_engines(self):
        # Returns the launcher's stop result, or None if it wasn't running.
        self.log.info("Stopping Engines...")
        if self.engine_launcher.running:
            d = self.engine_launcher.stop()
            return d
        else:
            return None

    def stop_launchers(self, r=None):
        if not self._stopping:
            self._stopping = True
            self.log.error("IPython cluster: stopping")
            self.stop_engines()
            # Wait a few seconds to let things shut down.
            dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
            dc.start()

    def sigint_handler(self, signum, frame):
        self.log.debug("SIGINT received, stopping launchers...")
        self.stop_launchers()

    def start_logging(self):
        # Remove old log files of the controller and engine
        if self.clean_logs:
            log_dir = self.profile_dir.log_dir
            for f in os.listdir(log_dir):
                if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
                    os.remove(os.path.join(log_dir, f))
        # This will remove old log files for ipcluster itself
        # super(IPBaseParallelApplication, self).start_logging()

    def start(self):
        """Start the app for the engines subcommand."""
        self.log.info("IPython cluster: started")
        # First see if the cluster is already running

        # Now log and daemonize
        self.log.info(
            'Starting engines with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                daemonize()

        dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
        dc.start()
        # Now write the new pid file AFTER our new forked pid is active.
        # self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # EINTR (e.g. from a signal during shutdown) is expected
            if e.errno == errno.EINTR:
                pass
            else:
                raise
319 319
# Aliases for 'ipcluster start': the engine aliases plus the start-specific
# controller/delay/log options.
start_aliases = {}
start_aliases.update(engine_aliases)
start_aliases.update(dict(
    delay='IPClusterStart.delay',
    clean_logs='IPClusterStart.clean_logs',
    controller = 'IPClusterStart.controller_launcher_class',
))
326 327
class IPClusterStart(IPClusterEngines):
    """Application for the ``ipcluster start`` subcommand.

    Extends IPClusterEngines with a controller launcher, a start delay
    between controller and engines, and pid-file management.
    """

    name = u'ipcluster'
    description = start_help
    default_log_level = logging.INFO
    auto_create = Bool(True, config=True,
        help="whether to create the profile_dir if it doesn't exist")
    classes = List()
    def _classes_default(self,):
        from IPython.parallel.apps import launcher
        return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers

    clean_logs = Bool(True, config=True,
        help="whether to cleanup old logs before starting")

    delay = CFloat(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    controller_launcher_class = DottedObjectName('LocalControllerLauncher',
        config=True,
        help="The class for launching a Controller."
        )
    reset = Bool(False, config=True,
        help="Whether to reset config files as part of '--create'."
        )

    # flags = Dict(flags)
    aliases = Dict(start_aliases)

    def init_launchers(self):
        # a controller launcher in addition to the inherited engine launcher
        self.controller_launcher = self.build_launcher(self.controller_launcher_class)
        self.engine_launcher = self.build_launcher(self.engine_launcher_class)
        self.controller_launcher.on_stop(self.stop_launchers)

    def start_controller(self):
        self.controller_launcher.start(
            self.profile_dir.location
        )

    def stop_controller(self):
        # self.log.info("In stop_controller")
        if self.controller_launcher and self.controller_launcher.running:
            return self.controller_launcher.stop()

    def stop_launchers(self, r=None):
        if not self._stopping:
            # stop the controller first; the base class then stops engines
            self.stop_controller()
            super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Start the app for the start subcommand."""
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            if self.check_pid(pid):
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.' % pid
                )
                # Here I exit with a unusual exit status that other processes
                # can watch for to learn how I exited.
                self.exit(ALREADY_STARTED)
            else:
                self.remove_pid_file()


        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                daemonize()

        # schedule controller immediately, engines after `delay` seconds
        dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
        dc.start()
        dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
        dc.start()
        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            if e.errno == errno.EINTR:
                pass
            else:
                raise
        finally:
            self.remove_pid_file()
422 423
# Common prefix for the subcommand class import-strings below.
base='IPython.parallel.apps.ipclusterapp.IPCluster'

class IPClusterApp(Application):
    """Parent application dispatching to the start/stop/engines subcommands."""
    name = u'ipcluster'
    description = _description

    # subcommand name -> (import string of the app class, help text)
    subcommands = {
        'start' : (base+'Start', start_help),
        'stop' : (base+'Stop', stop_help),
        'engines' : (base+'Engines', engines_help),
    }

    # no aliases or flags for parent App
    aliases = Dict()
    flags = Dict()

    def start(self):
        # A subcommand is required; print usage and exit(1) otherwise.
        if self.subapp is None:
            print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
            print
            self.print_description()
            self.print_subcommands()
            self.exit(1)
        else:
            return self.subapp.start()
448 449
def launch_new_instance():
    """Create and run the IPython cluster."""
    application = IPClusterApp.instance()
    application.initialize()
    application.start()


if __name__ == '__main__':
    launch_new_instance()
458 459
@@ -1,428 +1,423 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import os
27 27 import socket
28 28 import stat
29 29 import sys
30 30 import uuid
31 31
32 32 from multiprocessing import Process
33 33
34 34 import zmq
35 35 from zmq.devices import ProcessMonitoredQueue
36 36 from zmq.log.handlers import PUBHandler
37 37 from zmq.utils import jsonapi as json
38 38
39 39 from IPython.config.application import boolean_flag
40 40 from IPython.core.profiledir import ProfileDir
41 41
42 42 from IPython.parallel.apps.baseapp import (
43 43 BaseParallelApplication,
44 base_flags
44 base_aliases,
45 base_flags,
45 46 )
46 47 from IPython.utils.importstring import import_item
47 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
48 49
49 50 # from IPython.parallel.controller.controller import ControllerFactory
50 51 from IPython.zmq.session import Session
51 52 from IPython.parallel.controller.heartmonitor import HeartMonitor
52 53 from IPython.parallel.controller.hub import HubFactory
53 54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
54 55 from IPython.parallel.controller.sqlitedb import SQLiteDB
55 56
56 57 from IPython.parallel.util import signal_children, split_url
57 58
58 59 # conditional import of MongoDB backend class
59 60
60 61 try:
61 62 from IPython.parallel.controller.mongodb import MongoDB
62 63 except ImportError:
63 64 maybe_mongo = []
64 65 else:
65 66 maybe_mongo = [MongoDB]
66 67
67 68
68 69 #-----------------------------------------------------------------------------
69 70 # Module level variables
70 71 #-----------------------------------------------------------------------------
71 72
72 73
#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'


#: Help text shown by ``ipcontroller --help``; also used as the app description.
_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "profile_name". See the `profile`
and `profile_dir` options for details.
"""
86 87
87 88
88 89
89 90
90 91 #-----------------------------------------------------------------------------
91 92 # The main application
92 93 #-----------------------------------------------------------------------------
# Command-line flags: start from the flags shared by all parallel apps,
# then add controller-specific switches.
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
        'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
        'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
        'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
        'use the in-memory DictDB backend'),
    'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
        'reuse existing json connection files')
})

# --secure / --no-secure toggle HMAC digests on message authentication.
flags.update(boolean_flag('secure', 'IPControllerApp.secure',
    "Use HMAC digests for authentication of messages.",
    "Don't authenticate messages."
))

# Command-line aliases: short names mapped onto configurable trait paths,
# merged with the aliases common to all parallel apps.
aliases = dict(
    reuse_files = 'IPControllerApp.reuse_files',
    secure = 'IPControllerApp.secure',
    ssh = 'IPControllerApp.ssh_server',
    use_threads = 'IPControllerApp.use_threads',
    location = 'IPControllerApp.location',

    ident = 'Session.session',
    user = 'Session.username',
    exec_key = 'Session.keyfile',

    url = 'HubFactory.url',
    ip = 'HubFactory.ip',
    transport = 'HubFactory.transport',
    port = 'HubFactory.regport',

    ping = 'HeartMonitor.period',

    scheme = 'TaskScheduler.scheme_name',
    hwm = 'TaskScheduler.hwm',
)
aliases.update(base_aliases)
112 135
class IPControllerApp(BaseParallelApplication):
    """The IPython controller application: hosts the Hub and the schedulers."""

    name = u'ipcontroller'
    description = _description
    config_file_name = Unicode(default_config_file_name)
    classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo

    # change default to True
    auto_create = Bool(True, config=True,
        help="""Whether to create profile dir if it doesn't exist.""")

    reuse_files = Bool(False, config=True,
        help='Whether to reuse existing json connection files.'
    )
    secure = Bool(True, config=True,
        help='Whether to use HMAC digests for extra message authentication.'
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup. Necessary in some environments"
    )

    use_threads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    # internal
    # children: the queue devices / scheduler processes started by this app
    children = List()
    mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')

    def _use_threads_changed(self, name, old, new):
        # keep the monitored-queue class in sync with the use_threads trait
        self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')

    # expose the module-level aliases/flags as this app's command-line options
    aliases = Dict(aliases)
    flags = Dict(flags)
184 179
185 180
186 181 def save_connection_dict(self, fname, cdict):
187 182 """save a connection dict to json file."""
188 183 c = self.config
189 184 url = cdict['url']
190 185 location = cdict['location']
191 186 if not location:
192 187 try:
193 188 proto,ip,port = split_url(url)
194 189 except AssertionError:
195 190 pass
196 191 else:
197 192 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
198 193 cdict['location'] = location
199 194 fname = os.path.join(self.profile_dir.security_dir, fname)
200 195 with open(fname, 'wb') as f:
201 196 f.write(json.dumps(cdict, indent=2))
202 197 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
203 198
    def load_config_from_json(self):
        """load config from existing json connector files.

        Raises IOError if a file is missing, or AssertionError if the engine
        and client files disagree — init_hub catches both and falls back to
        generating fresh connection info.
        """
        c = self.config
        # load from engine config
        with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
            cfg = json.loads(f.read())
        key = c.Session.key = cfg['exec_key']
        # url has the form transport://ip:port
        xport,addr = cfg['url'].split('://')
        c.HubFactory.engine_transport = xport
        ip,ports = addr.split(':')
        c.HubFactory.engine_ip = ip
        c.HubFactory.regport = int(ports)
        self.location = cfg['location']

        # load client config
        with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
            cfg = json.loads(f.read())
        assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
        xport,addr = cfg['url'].split('://')
        c.HubFactory.client_transport = xport
        ip,ports = addr.split(':')
        c.HubFactory.client_ip = ip
        self.ssh_server = cfg['ssh']
        assert int(ports) == c.HubFactory.regport, "regport mismatch"
228 223
    def init_hub(self):
        """Construct the HubFactory, reusing or generating connection info.

        When reuse_files is set and the existing json files load cleanly, the
        previous session key and ports are kept; otherwise new ones are
        created and written out.
        """
        c = self.config

        self.do_import_statements()
        reusing = self.reuse_files
        if reusing:
            try:
                self.load_config_from_json()
            except (AssertionError,IOError):
                # missing or inconsistent connection files: start fresh
                reusing=False
        # check again, because reusing may have failed:
        if reusing:
            pass
        elif self.secure:
            # fresh random key for HMAC message signing
            key = str(uuid.uuid4())
            # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
            # with open(keyfile, 'w') as f:
            #     f.write(key)
            # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
            c.Session.key = key
        else:
            key = c.Session.key = ''

        try:
            self.factory = HubFactory(config=c, log=self.log)
            # self.start_logging()
            self.factory.init_hub()
        except:
            # NOTE(review): bare except also swallows SystemExit/KeyboardInterrupt
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

        if not reusing:
            # save to new json config files
            f = self.factory
            cdict = {'exec_key' : key,
                    'ssh' : self.ssh_server,
                    'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
                    'location' : self.location
                    }
            self.save_connection_dict('ipcontroller-client.json', cdict)
            # NOTE(review): edict aliases cdict (safe only because the client
            # file is already written), and the engine url is built from the
            # *client* transport/ip — confirm engines are meant to register on
            # the client-facing address.
            edict = cdict
            edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
            self.save_connection_dict('ipcontroller-engine.json', edict)
272 267
273 268 #
    def init_schedulers(self):
        """Create the iopub/mux/control queues and the task scheduler.

        Each queue device is appended to self.children; start() launches them.
        """
        children = self.children
        # resolve the monitored-queue class (Thread or Process flavor,
        # chosen via the use_threads trait)
        mq = import_item(str(self.mq_class))

        hub = self.factory
        # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
        # IOPub relay (in a Process)
        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
        q.bind_in(hub.client_info['iopub'])
        q.bind_out(hub.engine_info['iopub'])
        # subscribe to everything the engines publish
        q.setsockopt_out(zmq.SUBSCRIBE, '')
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)

        # Multiplexer Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
        q.bind_in(hub.client_info['mux'])
        q.setsockopt_in(zmq.IDENTITY, 'mux')
        q.bind_out(hub.engine_info['mux'])
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)

        # Control Queue (in a Process)
        q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
        q.bind_in(hub.client_info['control'])
        q.setsockopt_in(zmq.IDENTITY, 'control')
        q.bind_out(hub.engine_info['control'])
        q.connect_mon(hub.monitor_url)
        q.daemon=True
        children.append(q)
        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            # scheme not configured: use the trait's default
            scheme = TaskScheduler.scheme_name.get_default_value()
        # Task Queue (in a Process)
        if scheme == 'pure':
            self.log.warn("task::using pure XREQ Task scheduler")
            q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
            # q.setsockopt_out(zmq.HWM, hub.hwm)
            q.bind_in(hub.client_info['task'][1])
            q.setsockopt_in(zmq.IDENTITY, 'task')
            q.bind_out(hub.engine_info['task'])
            q.connect_mon(hub.monitor_url)
            q.daemon=True
            children.append(q)
        elif scheme == 'none':
            self.log.warn("task::using no Task scheduler")

        else:
            self.log.info("task::using Python %s Task scheduler"%scheme)
            sargs = (hub.client_info['task'][1], hub.engine_info['task'],
                     hub.monitor_url, hub.client_info['notification'])
            kwargs = dict(logname='scheduler', loglevel=self.log_level,
                          log_url = self.log_url, config=dict(self.config))
            if 'Process' in self.mq_class:
                # run the Python scheduler in a Process
                q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
                q.daemon=True
                children.append(q)
            else:
                # single-threaded Controller
                kwargs['in_thread'] = True
                launch_scheduler(*sargs, **kwargs)
339 334
340 335
341 336 def save_urls(self):
342 337 """save the registration urls to files."""
343 338 c = self.config
344 339
345 340 sec_dir = self.profile_dir.security_dir
346 341 cf = self.factory
347 342
348 343 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
349 344 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
350 345
351 346 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
352 347 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
353 348
354 349
355 350 def do_import_statements(self):
356 351 statements = self.import_statements
357 352 for s in statements:
358 353 try:
359 354 self.log.msg("Executing statement: '%s'" % s)
360 355 exec s in globals(), locals()
361 356 except:
362 357 self.log.msg("Error running statement: %s" % s)
363 358
364 359 def forward_logging(self):
365 360 if self.log_url:
366 361 self.log.info("Forwarding logging to %s"%self.log_url)
367 362 context = zmq.Context.instance()
368 363 lsock = context.socket(zmq.PUB)
369 364 lsock.connect(self.log_url)
370 365 handler = PUBHandler(lsock)
371 366 self.log.removeHandler(self._log_handler)
372 367 handler.root_topic = 'controller'
373 368 handler.setLevel(self.log_level)
374 369 self.log.addHandler(handler)
375 370 self._log_handler = handler
376 371 # #
377 372
    def initialize(self, argv=None):
        """Parse argv/config, then wire up logging, the Hub, and schedulers.

        Order matters: logging is forwarded before the hub/schedulers are
        created so their startup messages reach the log url.
        """
        super(IPControllerApp, self).initialize(argv)
        self.forward_logging()
        self.init_hub()
        self.init_schedulers()

    def start(self):
        """Start the Hub and all child queues, then run the event loop."""
        # Start the subprocesses:
        self.factory.start()
        child_procs = []
        for child in self.children:
            child.start()
            # collect actual OS processes so signals can be forwarded to them
            if isinstance(child, ProcessMonitoredQueue):
                child_procs.append(child.launcher)
            elif isinstance(child, Process):
                child_procs.append(child)
        if child_procs:
            signal_children(child_procs)

        self.write_pid_file(overwrite=True)

        try:
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")
403 398
404 399
405 400
def launch_new_instance():
    """Create and run the IPython controller"""
    if sys.platform == 'win32':
        # Guard against re-entry from a multiprocessing child: Windows lacks a
        # real fork, so IPython installed via vanilla setuptools (not
        # distribute) can re-import this module in subprocesses and spawn
        # Controllers without bound.
        import multiprocessing
        # the main process is named 'MainProcess'; children are 'Process-N'
        if multiprocessing.current_process().name != 'MainProcess':
            # we are a subprocess, don't start another Controller!
            return
    app = IPControllerApp.instance()
    app.initialize()
    app.start()


if __name__ == '__main__':
    launch_new_instance()
@@ -1,276 +1,276 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27
28 28 import zmq
29 29 from zmq.eventloop import ioloop
30 30
31 31 from IPython.core.profiledir import ProfileDir
32 from IPython.parallel.apps.baseapp import BaseParallelApplication
32 from IPython.parallel.apps.baseapp import (
33 BaseParallelApplication,
34 base_aliases,
35 base_flags,
36 )
33 37 from IPython.zmq.log import EnginePUBHandler
34 38
35 39 from IPython.config.configurable import Configurable
36 40 from IPython.zmq.session import Session
37 41 from IPython.parallel.engine.engine import EngineFactory
38 42 from IPython.parallel.engine.streamkernel import Kernel
39 43 from IPython.parallel.util import disambiguate_url
40 44
41 45 from IPython.utils.importstring import import_item
42 46 from IPython.utils.traitlets import Bool, Unicode, Dict, List
43 47
44 48
45 49 #-----------------------------------------------------------------------------
46 50 # Module level variables
47 51 #-----------------------------------------------------------------------------
48 52
#: The default config file name for this application
default_config_file_name = u'ipengine_config.py'

#: Help text shown by ``ipengine --help``; also used as the app description.
_description = """Start an IPython engine for parallel computing.

IPython engines run in parallel and perform computations on behalf of a client
and controller. A controller needs to be started before the engines. The
engine can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
usually located in your ipython directory and named as "profile_name".
See the `profile` and `profile_dir` options for details.
"""


#-----------------------------------------------------------------------------
# MPI configuration
#-----------------------------------------------------------------------------

#: Init snippet exec'd when ``MPI.use = 'mpi4py'``; binds the global ``mpi``.
mpi4py_init = """from mpi4py import MPI as mpi
mpi.size = mpi.COMM_WORLD.Get_size()
mpi.rank = mpi.COMM_WORLD.Get_rank()
"""


#: Init snippet exec'd when ``MPI.use = 'pytrilinos'``.
#: NOTE(review): ``pass`` appears unindented inside the snippet — that would
#: raise IndentationError if exec'd; confirm against the upstream source.
pytrilinos_init = """from PyTrilinos import Epetra
class SimpleStruct:
pass
mpi = SimpleStruct()
mpi.rank = 0
mpi.size = 0
"""
80 84
class MPI(Configurable):
    """Configurable for MPI initialization"""
    # which MPI binding to initialize; keys into default_inits
    use = Unicode('', config=True,
        help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
    )

    def _on_use_changed(self, old, new):
        # load default init script if it's not set
        if not self.init_script:
            self.init_script = self.default_inits.get(new, '')

    # code exec'd at startup to set up MPI; filled from default_inits
    # when `use` changes, unless explicitly configured
    init_script = Unicode('', config=True,
        help="Initialization code for MPI")

    # map of known MPI flavors to their init snippets
    default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
        config=True)
97 101
98 102
99 103 #-----------------------------------------------------------------------------
100 104 # Main application
101 105 #-----------------------------------------------------------------------------
# Command-line aliases for the engine app: short names mapped onto
# configurable trait paths, merged with the aliases common to all
# parallel apps.
aliases = dict(
    file = 'IPEngineApp.url_file',
    c = 'IPEngineApp.startup_command',
    s = 'IPEngineApp.startup_script',

    ident = 'Session.session',
    user = 'Session.username',
    exec_key = 'Session.keyfile',

    url = 'EngineFactory.url',
    ip = 'EngineFactory.ip',
    transport = 'EngineFactory.transport',
    port = 'EngineFactory.regport',
    location = 'EngineFactory.location',

    timeout = 'EngineFactory.timeout',

    mpi = 'MPI.use',

)
aliases.update(base_aliases)
103 127
class IPEngineApp(BaseParallelApplication):
    """The IPython engine application: connects a kernel to a controller."""

    name = Unicode(u'ipengine')
    description = Unicode(_description)
    config_file_name = Unicode(default_config_file_name)
    classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])

    startup_script = Unicode(u'', config=True,
        help='specify a script to be run at startup')
    startup_command = Unicode('', config=True,
        help='specify a command to be run at startup')

    url_file = Unicode(u'', config=True,
        help="""The full location of the file containing the connection information for
        the controller. If this is not given, the file must be in the
        security directory of the cluster directory. This location is
        resolved using the `profile` or `profile_dir` options.""",
    )

    # default basename looked up in the security dir when url_file is unset
    url_file_name = Unicode(u'ipcontroller-engine.json')
    log_url = Unicode('', config=True,
        help="""The URL for the iploggerapp instance, for forwarding
        logging to a central location.""")

    # expose the module-level aliases as this app's command-line options
    aliases = Dict(aliases)
153 153
154 154 # def find_key_file(self):
155 155 # """Set the key file.
156 156 #
157 157 # Here we don't try to actually see if it exists for is valid as that
158 158 # is hadled by the connection logic.
159 159 # """
160 160 # config = self.master_config
161 161 # # Find the actual controller key file
162 162 # if not config.Global.key_file:
163 163 # try_this = os.path.join(
164 164 # config.Global.profile_dir,
165 165 # config.Global.security_dir,
166 166 # config.Global.key_file_name
167 167 # )
168 168 # config.Global.key_file = try_this
169 169
170 170 def find_url_file(self):
171 171 """Set the key file.
172 172
173 173 Here we don't try to actually see if it exists for is valid as that
174 174 is hadled by the connection logic.
175 175 """
176 176 config = self.config
177 177 # Find the actual controller key file
178 178 if not self.url_file:
179 179 self.url_file = os.path.join(
180 180 self.profile_dir.security_dir,
181 181 self.url_file_name
182 182 )
    def init_engine(self):
        """Load connection info, set up startup code, build the EngineFactory."""
        # This is the working dir by now.
        sys.path.insert(0, '')
        config = self.config
        # print config
        self.find_url_file()

        # if os.path.exists(config.Global.key_file) and config.Global.secure:
        #     config.SessionFactory.exec_key = config.Global.key_file
        if os.path.exists(self.url_file):
            with open(self.url_file) as f:
                d = json.loads(f.read())
            # py2: coerce unicode values to byte strings before handing
            # them to zmq/config
            for k,v in d.iteritems():
                if isinstance(v, unicode):
                    d[k] = v.encode()
            if d['exec_key']:
                config.Session.key = d['exec_key']
            # pick a reachable address variant for the controller url
            d['url'] = disambiguate_url(d['url'], d['location'])
            config.EngineFactory.url = d['url']
            config.EngineFactory.location = d['location']

        try:
            exec_lines = config.Kernel.exec_lines
        except AttributeError:
            # not configured yet: create the list so we can append below
            config.Kernel.exec_lines = []
            exec_lines = config.Kernel.exec_lines

        if self.startup_script:
            enc = sys.getfilesystemencoding() or 'utf8'
            cmd="execfile(%r)"%self.startup_script.encode(enc)
            exec_lines.append(cmd)
        if self.startup_command:
            exec_lines.append(self.startup_command)

        # Create the underlying shell class and Engine
        # shell_class = import_item(self.master_config.Global.shell_class)
        # print self.config
        try:
            self.engine = EngineFactory(config=config, log=self.log)
        except:
            # NOTE(review): bare except also swallows SystemExit/KeyboardInterrupt
            self.log.error("Couldn't start the Engine", exc_info=True)
            self.exit(1)
225 225
226 226 def forward_logging(self):
227 227 if self.log_url:
228 228 self.log.info("Forwarding logging to %s"%self.log_url)
229 229 context = self.engine.context
230 230 lsock = context.socket(zmq.PUB)
231 231 lsock.connect(self.log_url)
232 232 self.log.removeHandler(self._log_handler)
233 233 handler = EnginePUBHandler(self.engine, lsock)
234 234 handler.setLevel(self.log_level)
235 235 self.log.addHandler(handler)
236 236 self._log_handler = handler
237 237 #
    def init_mpi(self):
        """Run the configured MPI init snippet, binding the module-global `mpi`."""
        global mpi
        self.mpi = MPI(config=self.config)

        mpi_import_statement = self.mpi.init_script
        if mpi_import_statement:
            try:
                self.log.info("Initializing MPI:")
                self.log.info(mpi_import_statement)
                # snippet comes from trusted user config; it assigns `mpi`
                # in this module's globals
                exec mpi_import_statement in globals()
            except:
                # any failure disables MPI rather than aborting startup
                mpi = None
        else:
            mpi = None
252 252
    def initialize(self, argv=None):
        """Parse argv/config, then set up MPI, the engine, and log forwarding.

        MPI is initialized first; forward_logging runs last because it needs
        the engine's zmq context.
        """
        super(IPEngineApp, self).initialize(argv)
        self.init_mpi()
        self.init_engine()
        self.forward_logging()

    def start(self):
        """Start the engine and block on its event loop until interrupted."""
        self.engine.start()
        try:
            self.engine.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Engine Interrupted, shutting down...\n")
264 264 self.log.critical("Engine Interrupted, shutting down...\n")
265 265
266 266
def launch_new_instance():
    """Create and run the IPython engine"""
    engine_app = IPEngineApp.instance()
    engine_app.initialize()
    engine_app.start()


if __name__ == '__main__':
    launch_new_instance()
276 276
General Comments 0
You need to be logged in to leave comments. Login now