##// END OF EJS Templates
use App.instance() in launch_new_instance (parallel apps)...
MinRK -
Show More
@@ -1,521 +1,521 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import errno
18 import errno
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import signal
22 import signal
23
23
24 from subprocess import check_call, CalledProcessError, PIPE
24 from subprocess import check_call, CalledProcessError, PIPE
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27
27
28 from IPython.config.application import Application, boolean_flag
28 from IPython.config.application import Application, boolean_flag
29 from IPython.config.loader import Config
29 from IPython.config.loader import Config
30 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
30 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
31 from IPython.utils.importstring import import_item
31 from IPython.utils.importstring import import_item
32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
33
33
34 from IPython.parallel.apps.baseapp import (
34 from IPython.parallel.apps.baseapp import (
35 BaseParallelApplication,
35 BaseParallelApplication,
36 PIDFileError,
36 PIDFileError,
37 base_flags, base_aliases
37 base_flags, base_aliases
38 )
38 )
39
39
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Module level variables
42 # Module level variables
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
46 default_config_file_name = u'ipcluster_config.py'
46 default_config_file_name = u'ipcluster_config.py'
47
47
48
48
49 _description = """Start an IPython cluster for parallel computing.
49 _description = """Start an IPython cluster for parallel computing.
50
50
51 An IPython cluster consists of 1 controller and 1 or more engines.
51 An IPython cluster consists of 1 controller and 1 or more engines.
52 This command automates the startup of these processes using a wide
52 This command automates the startup of these processes using a wide
53 range of startup methods (SSH, local processes, PBS, mpiexec,
53 range of startup methods (SSH, local processes, PBS, mpiexec,
54 Windows HPC Server 2008). To start a cluster with 4 engines on your
54 Windows HPC Server 2008). To start a cluster with 4 engines on your
55 local host simply do 'ipcluster start n=4'. For more complex usage
55 local host simply do 'ipcluster start n=4'. For more complex usage
56 you will typically do 'ipcluster create profile=mycluster', then edit
56 you will typically do 'ipcluster create profile=mycluster', then edit
57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
58 """
58 """
59
59
60
60
61 # Exit codes for ipcluster
61 # Exit codes for ipcluster
62
62
63 # This will be the exit code if the ipcluster appears to be running because
63 # This will be the exit code if the ipcluster appears to be running because
64 # a .pid file exists
64 # a .pid file exists
65 ALREADY_STARTED = 10
65 ALREADY_STARTED = 10
66
66
67
67
68 # This will be the exit code if ipcluster stop is run, but there is not .pid
68 # This will be the exit code if ipcluster stop is run, but there is not .pid
69 # file to be found.
69 # file to be found.
70 ALREADY_STOPPED = 11
70 ALREADY_STOPPED = 11
71
71
72 # This will be the exit code if ipcluster engines is run, but there is not .pid
72 # This will be the exit code if ipcluster engines is run, but there is not .pid
73 # file to be found.
73 # file to be found.
74 NO_CLUSTER = 12
74 NO_CLUSTER = 12
75
75
76
76
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78 # Main application
78 # Main application
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80 start_help = """Start an IPython cluster for parallel computing
80 start_help = """Start an IPython cluster for parallel computing
81
81
82 Start an ipython cluster by its profile name or cluster
82 Start an ipython cluster by its profile name or cluster
83 directory. Cluster directories contain configuration, log and
83 directory. Cluster directories contain configuration, log and
84 security related files and are named using the convention
84 security related files and are named using the convention
85 'cluster_<profile>' and should be creating using the 'start'
85 'cluster_<profile>' and should be creating using the 'start'
86 subcommand of 'ipcluster'. If your cluster directory is in
86 subcommand of 'ipcluster'. If your cluster directory is in
87 the cwd or the ipython directory, you can simply refer to it
87 the cwd or the ipython directory, you can simply refer to it
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
89 otherwise use the 'profile_dir' option.
89 otherwise use the 'profile_dir' option.
90 """
90 """
91 stop_help = """Stop a running IPython cluster
91 stop_help = """Stop a running IPython cluster
92
92
93 Stop a running ipython cluster by its profile name or cluster
93 Stop a running ipython cluster by its profile name or cluster
94 directory. Cluster directories are named using the convention
94 directory. Cluster directories are named using the convention
95 'cluster_<profile>'. If your cluster directory is in
95 'cluster_<profile>'. If your cluster directory is in
96 the cwd or the ipython directory, you can simply refer to it
96 the cwd or the ipython directory, you can simply refer to it
97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
98 use the 'profile_dir' option.
98 use the 'profile_dir' option.
99 """
99 """
100 engines_help = """Start engines connected to an existing IPython cluster
100 engines_help = """Start engines connected to an existing IPython cluster
101
101
102 Start one or more engines to connect to an existing Cluster
102 Start one or more engines to connect to an existing Cluster
103 by profile name or cluster directory.
103 by profile name or cluster directory.
104 Cluster directories contain configuration, log and
104 Cluster directories contain configuration, log and
105 security related files and are named using the convention
105 security related files and are named using the convention
106 'cluster_<profile>' and should be creating using the 'start'
106 'cluster_<profile>' and should be creating using the 'start'
107 subcommand of 'ipcluster'. If your cluster directory is in
107 subcommand of 'ipcluster'. If your cluster directory is in
108 the cwd or the ipython directory, you can simply refer to it
108 the cwd or the ipython directory, you can simply refer to it
109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
110 otherwise use the 'profile_dir' option.
110 otherwise use the 'profile_dir' option.
111 """
111 """
112 create_help = """Create an ipcluster profile by name
112 create_help = """Create an ipcluster profile by name
113
113
114 Create an ipython cluster directory by its profile name or
114 Create an ipython cluster directory by its profile name or
115 cluster directory path. Cluster directories contain
115 cluster directory path. Cluster directories contain
116 configuration, log and security related files and are named
116 configuration, log and security related files and are named
117 using the convention 'cluster_<profile>'. By default they are
117 using the convention 'cluster_<profile>'. By default they are
118 located in your ipython directory. Once created, you will
118 located in your ipython directory. Once created, you will
119 probably need to edit the configuration files in the cluster
119 probably need to edit the configuration files in the cluster
120 directory to configure your cluster. Most users will create a
120 directory to configure your cluster. Most users will create a
121 cluster directory by profile name,
121 cluster directory by profile name,
122 `ipcluster create profile=mycluster`, which will put the directory
122 `ipcluster create profile=mycluster`, which will put the directory
123 in `<ipython_dir>/cluster_mycluster`.
123 in `<ipython_dir>/cluster_mycluster`.
124 """
124 """
125 list_help = """List available cluster profiles
125 list_help = """List available cluster profiles
126
126
127 List all available clusters, by cluster directory, that can
127 List all available clusters, by cluster directory, that can
128 be found in the current working directly or in the ipython
128 be found in the current working directly or in the ipython
129 directory. Cluster directories are named using the convention
129 directory. Cluster directories are named using the convention
130 'cluster_<profile>'.
130 'cluster_<profile>'.
131 """
131 """
132
132
133
133
134 class IPClusterList(BaseIPythonApplication):
134 class IPClusterList(BaseIPythonApplication):
135 name = u'ipcluster-list'
135 name = u'ipcluster-list'
136 description = list_help
136 description = list_help
137
137
138 # empty aliases
138 # empty aliases
139 aliases=Dict()
139 aliases=Dict()
140 flags = Dict(base_flags)
140 flags = Dict(base_flags)
141
141
142 def _log_level_default(self):
142 def _log_level_default(self):
143 return 20
143 return 20
144
144
145 def list_profile_dirs(self):
145 def list_profile_dirs(self):
146 # Find the search paths
146 # Find the search paths
147 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
147 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
148 if profile_dir_paths:
148 if profile_dir_paths:
149 profile_dir_paths = profile_dir_paths.split(':')
149 profile_dir_paths = profile_dir_paths.split(':')
150 else:
150 else:
151 profile_dir_paths = []
151 profile_dir_paths = []
152
152
153 ipython_dir = self.ipython_dir
153 ipython_dir = self.ipython_dir
154
154
155 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
155 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
156 paths = list(set(paths))
156 paths = list(set(paths))
157
157
158 self.log.info('Searching for cluster profiles in paths: %r' % paths)
158 self.log.info('Searching for cluster profiles in paths: %r' % paths)
159 for path in paths:
159 for path in paths:
160 files = os.listdir(path)
160 files = os.listdir(path)
161 for f in files:
161 for f in files:
162 full_path = os.path.join(path, f)
162 full_path = os.path.join(path, f)
163 if os.path.isdir(full_path) and f.startswith('profile_') and \
163 if os.path.isdir(full_path) and f.startswith('profile_') and \
164 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
164 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
165 profile = f.split('_')[-1]
165 profile = f.split('_')[-1]
166 start_cmd = 'ipcluster start profile=%s n=4' % profile
166 start_cmd = 'ipcluster start profile=%s n=4' % profile
167 print start_cmd + " ==> " + full_path
167 print start_cmd + " ==> " + full_path
168
168
169 def start(self):
169 def start(self):
170 self.list_profile_dirs()
170 self.list_profile_dirs()
171
171
172
172
173 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
173 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
174
174
175 create_flags = {}
175 create_flags = {}
176 create_flags.update(base_flags)
176 create_flags.update(base_flags)
177 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
177 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
178 "reset config files to defaults", "leave existing config files"))
178 "reset config files to defaults", "leave existing config files"))
179
179
180 class IPClusterCreate(BaseParallelApplication):
180 class IPClusterCreate(BaseParallelApplication):
181 name = u'ipcluster-create'
181 name = u'ipcluster-create'
182 description = create_help
182 description = create_help
183 auto_create = Bool(True)
183 auto_create = Bool(True)
184 config_file_name = Unicode(default_config_file_name)
184 config_file_name = Unicode(default_config_file_name)
185
185
186 flags = Dict(create_flags)
186 flags = Dict(create_flags)
187
187
188 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
188 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
189
189
190 classes = [ProfileDir]
190 classes = [ProfileDir]
191
191
192
192
193 stop_aliases = dict(
193 stop_aliases = dict(
194 signal='IPClusterStop.signal',
194 signal='IPClusterStop.signal',
195 profile='BaseIPythonApplication.profile',
195 profile='BaseIPythonApplication.profile',
196 profile_dir='ProfileDir.location',
196 profile_dir='ProfileDir.location',
197 )
197 )
198
198
199 class IPClusterStop(BaseParallelApplication):
199 class IPClusterStop(BaseParallelApplication):
200 name = u'ipcluster'
200 name = u'ipcluster'
201 description = stop_help
201 description = stop_help
202 config_file_name = Unicode(default_config_file_name)
202 config_file_name = Unicode(default_config_file_name)
203
203
204 signal = Int(signal.SIGINT, config=True,
204 signal = Int(signal.SIGINT, config=True,
205 help="signal to use for stopping processes.")
205 help="signal to use for stopping processes.")
206
206
207 aliases = Dict(stop_aliases)
207 aliases = Dict(stop_aliases)
208
208
209 def start(self):
209 def start(self):
210 """Start the app for the stop subcommand."""
210 """Start the app for the stop subcommand."""
211 try:
211 try:
212 pid = self.get_pid_from_file()
212 pid = self.get_pid_from_file()
213 except PIDFileError:
213 except PIDFileError:
214 self.log.critical(
214 self.log.critical(
215 'Could not read pid file, cluster is probably not running.'
215 'Could not read pid file, cluster is probably not running.'
216 )
216 )
217 # Here I exit with a unusual exit status that other processes
217 # Here I exit with a unusual exit status that other processes
218 # can watch for to learn how I existed.
218 # can watch for to learn how I existed.
219 self.remove_pid_file()
219 self.remove_pid_file()
220 self.exit(ALREADY_STOPPED)
220 self.exit(ALREADY_STOPPED)
221
221
222 if not self.check_pid(pid):
222 if not self.check_pid(pid):
223 self.log.critical(
223 self.log.critical(
224 'Cluster [pid=%r] is not running.' % pid
224 'Cluster [pid=%r] is not running.' % pid
225 )
225 )
226 self.remove_pid_file()
226 self.remove_pid_file()
227 # Here I exit with a unusual exit status that other processes
227 # Here I exit with a unusual exit status that other processes
228 # can watch for to learn how I existed.
228 # can watch for to learn how I existed.
229 self.exit(ALREADY_STOPPED)
229 self.exit(ALREADY_STOPPED)
230
230
231 elif os.name=='posix':
231 elif os.name=='posix':
232 sig = self.signal
232 sig = self.signal
233 self.log.info(
233 self.log.info(
234 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
234 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
235 )
235 )
236 try:
236 try:
237 os.kill(pid, sig)
237 os.kill(pid, sig)
238 except OSError:
238 except OSError:
239 self.log.error("Stopping cluster failed, assuming already dead.",
239 self.log.error("Stopping cluster failed, assuming already dead.",
240 exc_info=True)
240 exc_info=True)
241 self.remove_pid_file()
241 self.remove_pid_file()
242 elif os.name=='nt':
242 elif os.name=='nt':
243 try:
243 try:
244 # kill the whole tree
244 # kill the whole tree
245 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
245 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
246 except (CalledProcessError, OSError):
246 except (CalledProcessError, OSError):
247 self.log.error("Stopping cluster failed, assuming already dead.",
247 self.log.error("Stopping cluster failed, assuming already dead.",
248 exc_info=True)
248 exc_info=True)
249 self.remove_pid_file()
249 self.remove_pid_file()
250
250
251 engine_aliases = {}
251 engine_aliases = {}
252 engine_aliases.update(base_aliases)
252 engine_aliases.update(base_aliases)
253 engine_aliases.update(dict(
253 engine_aliases.update(dict(
254 n='IPClusterEngines.n',
254 n='IPClusterEngines.n',
255 elauncher = 'IPClusterEngines.engine_launcher_class',
255 elauncher = 'IPClusterEngines.engine_launcher_class',
256 ))
256 ))
257 class IPClusterEngines(BaseParallelApplication):
257 class IPClusterEngines(BaseParallelApplication):
258
258
259 name = u'ipcluster'
259 name = u'ipcluster'
260 description = engines_help
260 description = engines_help
261 usage = None
261 usage = None
262 config_file_name = Unicode(default_config_file_name)
262 config_file_name = Unicode(default_config_file_name)
263 default_log_level = logging.INFO
263 default_log_level = logging.INFO
264 classes = List()
264 classes = List()
265 def _classes_default(self):
265 def _classes_default(self):
266 from IPython.parallel.apps import launcher
266 from IPython.parallel.apps import launcher
267 launchers = launcher.all_launchers
267 launchers = launcher.all_launchers
268 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
268 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
269 return [ProfileDir]+eslaunchers
269 return [ProfileDir]+eslaunchers
270
270
271 n = Int(2, config=True,
271 n = Int(2, config=True,
272 help="The number of engines to start.")
272 help="The number of engines to start.")
273
273
274 engine_launcher_class = Unicode('LocalEngineSetLauncher',
274 engine_launcher_class = Unicode('LocalEngineSetLauncher',
275 config=True,
275 config=True,
276 help="The class for launching a set of Engines."
276 help="The class for launching a set of Engines."
277 )
277 )
278 daemonize = Bool(False, config=True,
278 daemonize = Bool(False, config=True,
279 help='Daemonize the ipcluster program. This implies --log-to-file')
279 help='Daemonize the ipcluster program. This implies --log-to-file')
280
280
281 def _daemonize_changed(self, name, old, new):
281 def _daemonize_changed(self, name, old, new):
282 if new:
282 if new:
283 self.log_to_file = True
283 self.log_to_file = True
284
284
285 aliases = Dict(engine_aliases)
285 aliases = Dict(engine_aliases)
286 # flags = Dict(flags)
286 # flags = Dict(flags)
287 _stopping = False
287 _stopping = False
288
288
289 def initialize(self, argv=None):
289 def initialize(self, argv=None):
290 super(IPClusterEngines, self).initialize(argv)
290 super(IPClusterEngines, self).initialize(argv)
291 self.init_signal()
291 self.init_signal()
292 self.init_launchers()
292 self.init_launchers()
293
293
294 def init_launchers(self):
294 def init_launchers(self):
295 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
295 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
296 self.engine_launcher.on_stop(lambda r: self.loop.stop())
296 self.engine_launcher.on_stop(lambda r: self.loop.stop())
297
297
298 def init_signal(self):
298 def init_signal(self):
299 # Setup signals
299 # Setup signals
300 signal.signal(signal.SIGINT, self.sigint_handler)
300 signal.signal(signal.SIGINT, self.sigint_handler)
301
301
302 def build_launcher(self, clsname):
302 def build_launcher(self, clsname):
303 """import and instantiate a Launcher based on importstring"""
303 """import and instantiate a Launcher based on importstring"""
304 if '.' not in clsname:
304 if '.' not in clsname:
305 # not a module, presume it's the raw name in apps.launcher
305 # not a module, presume it's the raw name in apps.launcher
306 clsname = 'IPython.parallel.apps.launcher.'+clsname
306 clsname = 'IPython.parallel.apps.launcher.'+clsname
307 # print repr(clsname)
307 # print repr(clsname)
308 klass = import_item(clsname)
308 klass = import_item(clsname)
309
309
310 launcher = klass(
310 launcher = klass(
311 work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
311 work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
312 )
312 )
313 return launcher
313 return launcher
314
314
315 def start_engines(self):
315 def start_engines(self):
316 self.log.info("Starting %i engines"%self.n)
316 self.log.info("Starting %i engines"%self.n)
317 self.engine_launcher.start(
317 self.engine_launcher.start(
318 self.n,
318 self.n,
319 profile_dir=self.profile_dir.location
319 profile_dir=self.profile_dir.location
320 )
320 )
321
321
322 def stop_engines(self):
322 def stop_engines(self):
323 self.log.info("Stopping Engines...")
323 self.log.info("Stopping Engines...")
324 if self.engine_launcher.running:
324 if self.engine_launcher.running:
325 d = self.engine_launcher.stop()
325 d = self.engine_launcher.stop()
326 return d
326 return d
327 else:
327 else:
328 return None
328 return None
329
329
330 def stop_launchers(self, r=None):
330 def stop_launchers(self, r=None):
331 if not self._stopping:
331 if not self._stopping:
332 self._stopping = True
332 self._stopping = True
333 self.log.error("IPython cluster: stopping")
333 self.log.error("IPython cluster: stopping")
334 self.stop_engines()
334 self.stop_engines()
335 # Wait a few seconds to let things shut down.
335 # Wait a few seconds to let things shut down.
336 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
336 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
337 dc.start()
337 dc.start()
338
338
339 def sigint_handler(self, signum, frame):
339 def sigint_handler(self, signum, frame):
340 self.log.debug("SIGINT received, stopping launchers...")
340 self.log.debug("SIGINT received, stopping launchers...")
341 self.stop_launchers()
341 self.stop_launchers()
342
342
343 def start_logging(self):
343 def start_logging(self):
344 # Remove old log files of the controller and engine
344 # Remove old log files of the controller and engine
345 if self.clean_logs:
345 if self.clean_logs:
346 log_dir = self.profile_dir.log_dir
346 log_dir = self.profile_dir.log_dir
347 for f in os.listdir(log_dir):
347 for f in os.listdir(log_dir):
348 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
348 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
349 os.remove(os.path.join(log_dir, f))
349 os.remove(os.path.join(log_dir, f))
350 # This will remove old log files for ipcluster itself
350 # This will remove old log files for ipcluster itself
351 # super(IPBaseParallelApplication, self).start_logging()
351 # super(IPBaseParallelApplication, self).start_logging()
352
352
353 def start(self):
353 def start(self):
354 """Start the app for the engines subcommand."""
354 """Start the app for the engines subcommand."""
355 self.log.info("IPython cluster: started")
355 self.log.info("IPython cluster: started")
356 # First see if the cluster is already running
356 # First see if the cluster is already running
357
357
358 # Now log and daemonize
358 # Now log and daemonize
359 self.log.info(
359 self.log.info(
360 'Starting engines with [daemon=%r]' % self.daemonize
360 'Starting engines with [daemon=%r]' % self.daemonize
361 )
361 )
362 # TODO: Get daemonize working on Windows or as a Windows Server.
362 # TODO: Get daemonize working on Windows or as a Windows Server.
363 if self.daemonize:
363 if self.daemonize:
364 if os.name=='posix':
364 if os.name=='posix':
365 from twisted.scripts._twistd_unix import daemonize
365 from twisted.scripts._twistd_unix import daemonize
366 daemonize()
366 daemonize()
367
367
368 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
368 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
369 dc.start()
369 dc.start()
370 # Now write the new pid file AFTER our new forked pid is active.
370 # Now write the new pid file AFTER our new forked pid is active.
371 # self.write_pid_file()
371 # self.write_pid_file()
372 try:
372 try:
373 self.loop.start()
373 self.loop.start()
374 except KeyboardInterrupt:
374 except KeyboardInterrupt:
375 pass
375 pass
376 except zmq.ZMQError as e:
376 except zmq.ZMQError as e:
377 if e.errno == errno.EINTR:
377 if e.errno == errno.EINTR:
378 pass
378 pass
379 else:
379 else:
380 raise
380 raise
381
381
382 start_aliases = {}
382 start_aliases = {}
383 start_aliases.update(engine_aliases)
383 start_aliases.update(engine_aliases)
384 start_aliases.update(dict(
384 start_aliases.update(dict(
385 delay='IPClusterStart.delay',
385 delay='IPClusterStart.delay',
386 clean_logs='IPClusterStart.clean_logs',
386 clean_logs='IPClusterStart.clean_logs',
387 ))
387 ))
388
388
389 class IPClusterStart(IPClusterEngines):
389 class IPClusterStart(IPClusterEngines):
390
390
391 name = u'ipcluster'
391 name = u'ipcluster'
392 description = start_help
392 description = start_help
393 default_log_level = logging.INFO
393 default_log_level = logging.INFO
394 auto_create = Bool(True, config=True,
394 auto_create = Bool(True, config=True,
395 help="whether to create the profile_dir if it doesn't exist")
395 help="whether to create the profile_dir if it doesn't exist")
396 classes = List()
396 classes = List()
397 def _classes_default(self,):
397 def _classes_default(self,):
398 from IPython.parallel.apps import launcher
398 from IPython.parallel.apps import launcher
399 return [ProfileDir]+launcher.all_launchers
399 return [ProfileDir]+launcher.all_launchers
400
400
401 clean_logs = Bool(True, config=True,
401 clean_logs = Bool(True, config=True,
402 help="whether to cleanup old logs before starting")
402 help="whether to cleanup old logs before starting")
403
403
404 delay = CFloat(1., config=True,
404 delay = CFloat(1., config=True,
405 help="delay (in s) between starting the controller and the engines")
405 help="delay (in s) between starting the controller and the engines")
406
406
407 controller_launcher_class = Unicode('LocalControllerLauncher',
407 controller_launcher_class = Unicode('LocalControllerLauncher',
408 config=True,
408 config=True,
409 help="The class for launching a Controller."
409 help="The class for launching a Controller."
410 )
410 )
411 reset = Bool(False, config=True,
411 reset = Bool(False, config=True,
412 help="Whether to reset config files as part of '--create'."
412 help="Whether to reset config files as part of '--create'."
413 )
413 )
414
414
415 # flags = Dict(flags)
415 # flags = Dict(flags)
416 aliases = Dict(start_aliases)
416 aliases = Dict(start_aliases)
417
417
418 def init_launchers(self):
418 def init_launchers(self):
419 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
419 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
420 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
420 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
421 self.controller_launcher.on_stop(self.stop_launchers)
421 self.controller_launcher.on_stop(self.stop_launchers)
422
422
423 def start_controller(self):
423 def start_controller(self):
424 self.controller_launcher.start(
424 self.controller_launcher.start(
425 profile_dir=self.profile_dir.location
425 profile_dir=self.profile_dir.location
426 )
426 )
427
427
428 def stop_controller(self):
428 def stop_controller(self):
429 # self.log.info("In stop_controller")
429 # self.log.info("In stop_controller")
430 if self.controller_launcher and self.controller_launcher.running:
430 if self.controller_launcher and self.controller_launcher.running:
431 return self.controller_launcher.stop()
431 return self.controller_launcher.stop()
432
432
433 def stop_launchers(self, r=None):
433 def stop_launchers(self, r=None):
434 if not self._stopping:
434 if not self._stopping:
435 self.stop_controller()
435 self.stop_controller()
436 super(IPClusterStart, self).stop_launchers()
436 super(IPClusterStart, self).stop_launchers()
437
437
438 def start(self):
438 def start(self):
439 """Start the app for the start subcommand."""
439 """Start the app for the start subcommand."""
440 # First see if the cluster is already running
440 # First see if the cluster is already running
441 try:
441 try:
442 pid = self.get_pid_from_file()
442 pid = self.get_pid_from_file()
443 except PIDFileError:
443 except PIDFileError:
444 pass
444 pass
445 else:
445 else:
446 if self.check_pid(pid):
446 if self.check_pid(pid):
447 self.log.critical(
447 self.log.critical(
448 'Cluster is already running with [pid=%s]. '
448 'Cluster is already running with [pid=%s]. '
449 'use "ipcluster stop" to stop the cluster.' % pid
449 'use "ipcluster stop" to stop the cluster.' % pid
450 )
450 )
451 # Here I exit with a unusual exit status that other processes
451 # Here I exit with a unusual exit status that other processes
452 # can watch for to learn how I existed.
452 # can watch for to learn how I existed.
453 self.exit(ALREADY_STARTED)
453 self.exit(ALREADY_STARTED)
454 else:
454 else:
455 self.remove_pid_file()
455 self.remove_pid_file()
456
456
457
457
458 # Now log and daemonize
458 # Now log and daemonize
459 self.log.info(
459 self.log.info(
460 'Starting ipcluster with [daemon=%r]' % self.daemonize
460 'Starting ipcluster with [daemon=%r]' % self.daemonize
461 )
461 )
462 # TODO: Get daemonize working on Windows or as a Windows Server.
462 # TODO: Get daemonize working on Windows or as a Windows Server.
463 if self.daemonize:
463 if self.daemonize:
464 if os.name=='posix':
464 if os.name=='posix':
465 from twisted.scripts._twistd_unix import daemonize
465 from twisted.scripts._twistd_unix import daemonize
466 daemonize()
466 daemonize()
467
467
468 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
468 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
469 dc.start()
469 dc.start()
470 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
470 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
471 dc.start()
471 dc.start()
472 # Now write the new pid file AFTER our new forked pid is active.
472 # Now write the new pid file AFTER our new forked pid is active.
473 self.write_pid_file()
473 self.write_pid_file()
474 try:
474 try:
475 self.loop.start()
475 self.loop.start()
476 except KeyboardInterrupt:
476 except KeyboardInterrupt:
477 pass
477 pass
478 except zmq.ZMQError as e:
478 except zmq.ZMQError as e:
479 if e.errno == errno.EINTR:
479 if e.errno == errno.EINTR:
480 pass
480 pass
481 else:
481 else:
482 raise
482 raise
483 finally:
483 finally:
484 self.remove_pid_file()
484 self.remove_pid_file()
485
485
486 base='IPython.parallel.apps.ipclusterapp.IPCluster'
486 base='IPython.parallel.apps.ipclusterapp.IPCluster'
487
487
488 class IPBaseParallelApplication(Application):
488 class IPBaseParallelApplication(Application):
489 name = u'ipcluster'
489 name = u'ipcluster'
490 description = _description
490 description = _description
491
491
492 subcommands = {'create' : (base+'Create', create_help),
492 subcommands = {'create' : (base+'Create', create_help),
493 'list' : (base+'List', list_help),
493 'list' : (base+'List', list_help),
494 'start' : (base+'Start', start_help),
494 'start' : (base+'Start', start_help),
495 'stop' : (base+'Stop', stop_help),
495 'stop' : (base+'Stop', stop_help),
496 'engines' : (base+'Engines', engines_help),
496 'engines' : (base+'Engines', engines_help),
497 }
497 }
498
498
499 # no aliases or flags for parent App
499 # no aliases or flags for parent App
500 aliases = Dict()
500 aliases = Dict()
501 flags = Dict()
501 flags = Dict()
502
502
503 def start(self):
503 def start(self):
504 if self.subapp is None:
504 if self.subapp is None:
505 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
505 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
506 print
506 print
507 self.print_subcommands()
507 self.print_subcommands()
508 self.exit(1)
508 self.exit(1)
509 else:
509 else:
510 return self.subapp.start()
510 return self.subapp.start()
511
511
512 def launch_new_instance():
512 def launch_new_instance():
513 """Create and run the IPython cluster."""
513 """Create and run the IPython cluster."""
514 app = IPBaseParallelApplication()
514 app = IPBaseParallelApplication.instance()
515 app.initialize()
515 app.initialize()
516 app.start()
516 app.start()
517
517
518
518
519 if __name__ == '__main__':
519 if __name__ == '__main__':
520 launch_new_instance()
520 launch_new_instance()
521
521
@@ -1,398 +1,398 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import socket
21 import socket
22 import stat
22 import stat
23 import sys
23 import sys
24 import uuid
24 import uuid
25
25
26 from multiprocessing import Process
26 from multiprocessing import Process
27
27
28 import zmq
28 import zmq
29 from zmq.devices import ProcessMonitoredQueue
29 from zmq.devices import ProcessMonitoredQueue
30 from zmq.log.handlers import PUBHandler
30 from zmq.log.handlers import PUBHandler
31 from zmq.utils import jsonapi as json
31 from zmq.utils import jsonapi as json
32
32
33 from IPython.core.newapplication import ProfileDir
33 from IPython.core.newapplication import ProfileDir
34
34
35 from IPython.parallel.apps.baseapp import (
35 from IPython.parallel.apps.baseapp import (
36 BaseParallelApplication,
36 BaseParallelApplication,
37 base_flags
37 base_flags
38 )
38 )
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
40 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
41
41
42 # from IPython.parallel.controller.controller import ControllerFactory
42 # from IPython.parallel.controller.controller import ControllerFactory
43 from IPython.parallel.streamsession import StreamSession
43 from IPython.parallel.streamsession import StreamSession
44 from IPython.parallel.controller.heartmonitor import HeartMonitor
44 from IPython.parallel.controller.heartmonitor import HeartMonitor
45 from IPython.parallel.controller.hub import HubFactory
45 from IPython.parallel.controller.hub import HubFactory
46 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
46 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
47 from IPython.parallel.controller.sqlitedb import SQLiteDB
47 from IPython.parallel.controller.sqlitedb import SQLiteDB
48
48
49 from IPython.parallel.util import signal_children, split_url
49 from IPython.parallel.util import signal_children, split_url
50
50
51 # conditional import of MongoDB backend class
51 # conditional import of MongoDB backend class
52
52
53 try:
53 try:
54 from IPython.parallel.controller.mongodb import MongoDB
54 from IPython.parallel.controller.mongodb import MongoDB
55 except ImportError:
55 except ImportError:
56 maybe_mongo = []
56 maybe_mongo = []
57 else:
57 else:
58 maybe_mongo = [MongoDB]
58 maybe_mongo = [MongoDB]
59
59
60
60
61 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
62 # Module level variables
62 # Module level variables
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64
64
65
65
66 #: The default config file name for this application
66 #: The default config file name for this application
67 default_config_file_name = u'ipcontroller_config.py'
67 default_config_file_name = u'ipcontroller_config.py'
68
68
69
69
70 _description = """Start the IPython controller for parallel computing.
70 _description = """Start the IPython controller for parallel computing.
71
71
72 The IPython controller provides a gateway between the IPython engines and
72 The IPython controller provides a gateway between the IPython engines and
73 clients. The controller needs to be started before the engines and can be
73 clients. The controller needs to be started before the engines and can be
74 configured using command line options or using a cluster directory. Cluster
74 configured using command line options or using a cluster directory. Cluster
75 directories contain config, log and security files and are usually located in
75 directories contain config, log and security files and are usually located in
76 your ipython directory and named as "cluster_<profile>". See the `profile`
76 your ipython directory and named as "cluster_<profile>". See the `profile`
77 and `profile_dir` options for details.
77 and `profile_dir` options for details.
78 """
78 """
79
79
80
80
81
81
82
82
83 #-----------------------------------------------------------------------------
83 #-----------------------------------------------------------------------------
84 # The main application
84 # The main application
85 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
86 flags = {}
86 flags = {}
87 flags.update(base_flags)
87 flags.update(base_flags)
88 flags.update({
88 flags.update({
89 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
89 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
90 'Use threads instead of processes for the schedulers'),
90 'Use threads instead of processes for the schedulers'),
91 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
91 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
92 'use the SQLiteDB backend'),
92 'use the SQLiteDB backend'),
93 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
93 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
94 'use the MongoDB backend'),
94 'use the MongoDB backend'),
95 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
95 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
96 'use the in-memory DictDB backend'),
96 'use the in-memory DictDB backend'),
97 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
97 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
98 'reuse existing json connection files')
98 'reuse existing json connection files')
99 })
99 })
100
100
101 flags.update()
101 flags.update()
102
102
103 class IPControllerApp(BaseParallelApplication):
103 class IPControllerApp(BaseParallelApplication):
104
104
105 name = u'ipcontroller'
105 name = u'ipcontroller'
106 description = _description
106 description = _description
107 config_file_name = Unicode(default_config_file_name)
107 config_file_name = Unicode(default_config_file_name)
108 classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
108 classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
109
109
110 # change default to True
110 # change default to True
111 auto_create = Bool(True, config=True,
111 auto_create = Bool(True, config=True,
112 help="""Whether to create profile dir if it doesn't exist""")
112 help="""Whether to create profile dir if it doesn't exist""")
113
113
114 reuse_files = Bool(False, config=True,
114 reuse_files = Bool(False, config=True,
115 help='Whether to reuse existing json connection files [default: False]'
115 help='Whether to reuse existing json connection files [default: False]'
116 )
116 )
117 secure = Bool(True, config=True,
117 secure = Bool(True, config=True,
118 help='Whether to use exec_keys for extra authentication [default: True]'
118 help='Whether to use exec_keys for extra authentication [default: True]'
119 )
119 )
120 ssh_server = Unicode(u'', config=True,
120 ssh_server = Unicode(u'', config=True,
121 help="""ssh url for clients to use when connecting to the Controller
121 help="""ssh url for clients to use when connecting to the Controller
122 processes. It should be of the form: [user@]server[:port]. The
122 processes. It should be of the form: [user@]server[:port]. The
123 Controller\'s listening addresses must be accessible from the ssh server""",
123 Controller\'s listening addresses must be accessible from the ssh server""",
124 )
124 )
125 location = Unicode(u'', config=True,
125 location = Unicode(u'', config=True,
126 help="""The external IP or domain name of the Controller, used for disambiguating
126 help="""The external IP or domain name of the Controller, used for disambiguating
127 engine and client connections.""",
127 engine and client connections.""",
128 )
128 )
129 import_statements = List([], config=True,
129 import_statements = List([], config=True,
130 help="import statements to be run at startup. Necessary in some environments"
130 help="import statements to be run at startup. Necessary in some environments"
131 )
131 )
132
132
133 use_threads = Bool(False, config=True,
133 use_threads = Bool(False, config=True,
134 help='Use threads instead of processes for the schedulers',
134 help='Use threads instead of processes for the schedulers',
135 )
135 )
136
136
137 # internal
137 # internal
138 children = List()
138 children = List()
139 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
139 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
140
140
141 def _use_threads_changed(self, name, old, new):
141 def _use_threads_changed(self, name, old, new):
142 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
142 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
143
143
144 aliases = Dict(dict(
144 aliases = Dict(dict(
145 log_level = 'IPControllerApp.log_level',
145 log_level = 'IPControllerApp.log_level',
146 log_url = 'IPControllerApp.log_url',
146 log_url = 'IPControllerApp.log_url',
147 reuse_files = 'IPControllerApp.reuse_files',
147 reuse_files = 'IPControllerApp.reuse_files',
148 secure = 'IPControllerApp.secure',
148 secure = 'IPControllerApp.secure',
149 ssh = 'IPControllerApp.ssh_server',
149 ssh = 'IPControllerApp.ssh_server',
150 use_threads = 'IPControllerApp.use_threads',
150 use_threads = 'IPControllerApp.use_threads',
151 import_statements = 'IPControllerApp.import_statements',
151 import_statements = 'IPControllerApp.import_statements',
152 location = 'IPControllerApp.location',
152 location = 'IPControllerApp.location',
153
153
154 ident = 'StreamSession.session',
154 ident = 'StreamSession.session',
155 user = 'StreamSession.username',
155 user = 'StreamSession.username',
156 exec_key = 'StreamSession.keyfile',
156 exec_key = 'StreamSession.keyfile',
157
157
158 url = 'HubFactory.url',
158 url = 'HubFactory.url',
159 ip = 'HubFactory.ip',
159 ip = 'HubFactory.ip',
160 transport = 'HubFactory.transport',
160 transport = 'HubFactory.transport',
161 port = 'HubFactory.regport',
161 port = 'HubFactory.regport',
162
162
163 ping = 'HeartMonitor.period',
163 ping = 'HeartMonitor.period',
164
164
165 scheme = 'TaskScheduler.scheme_name',
165 scheme = 'TaskScheduler.scheme_name',
166 hwm = 'TaskScheduler.hwm',
166 hwm = 'TaskScheduler.hwm',
167
167
168
168
169 profile = "BaseIPythonApplication.profile",
169 profile = "BaseIPythonApplication.profile",
170 profile_dir = 'ProfileDir.location',
170 profile_dir = 'ProfileDir.location',
171
171
172 ))
172 ))
173 flags = Dict(flags)
173 flags = Dict(flags)
174
174
175
175
176 def save_connection_dict(self, fname, cdict):
176 def save_connection_dict(self, fname, cdict):
177 """save a connection dict to json file."""
177 """save a connection dict to json file."""
178 c = self.config
178 c = self.config
179 url = cdict['url']
179 url = cdict['url']
180 location = cdict['location']
180 location = cdict['location']
181 if not location:
181 if not location:
182 try:
182 try:
183 proto,ip,port = split_url(url)
183 proto,ip,port = split_url(url)
184 except AssertionError:
184 except AssertionError:
185 pass
185 pass
186 else:
186 else:
187 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
187 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
188 cdict['location'] = location
188 cdict['location'] = location
189 fname = os.path.join(self.profile_dir.security_dir, fname)
189 fname = os.path.join(self.profile_dir.security_dir, fname)
190 with open(fname, 'w') as f:
190 with open(fname, 'w') as f:
191 f.write(json.dumps(cdict, indent=2))
191 f.write(json.dumps(cdict, indent=2))
192 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
192 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
193
193
194 def load_config_from_json(self):
194 def load_config_from_json(self):
195 """load config from existing json connector files."""
195 """load config from existing json connector files."""
196 c = self.config
196 c = self.config
197 # load from engine config
197 # load from engine config
198 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
198 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
199 cfg = json.loads(f.read())
199 cfg = json.loads(f.read())
200 key = c.StreamSession.key = cfg['exec_key']
200 key = c.StreamSession.key = cfg['exec_key']
201 xport,addr = cfg['url'].split('://')
201 xport,addr = cfg['url'].split('://')
202 c.HubFactory.engine_transport = xport
202 c.HubFactory.engine_transport = xport
203 ip,ports = addr.split(':')
203 ip,ports = addr.split(':')
204 c.HubFactory.engine_ip = ip
204 c.HubFactory.engine_ip = ip
205 c.HubFactory.regport = int(ports)
205 c.HubFactory.regport = int(ports)
206 self.location = cfg['location']
206 self.location = cfg['location']
207
207
208 # load client config
208 # load client config
209 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
209 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
210 cfg = json.loads(f.read())
210 cfg = json.loads(f.read())
211 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
211 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
212 xport,addr = cfg['url'].split('://')
212 xport,addr = cfg['url'].split('://')
213 c.HubFactory.client_transport = xport
213 c.HubFactory.client_transport = xport
214 ip,ports = addr.split(':')
214 ip,ports = addr.split(':')
215 c.HubFactory.client_ip = ip
215 c.HubFactory.client_ip = ip
216 self.ssh_server = cfg['ssh']
216 self.ssh_server = cfg['ssh']
217 assert int(ports) == c.HubFactory.regport, "regport mismatch"
217 assert int(ports) == c.HubFactory.regport, "regport mismatch"
218
218
219 def init_hub(self):
219 def init_hub(self):
220 c = self.config
220 c = self.config
221
221
222 self.do_import_statements()
222 self.do_import_statements()
223 reusing = self.reuse_files
223 reusing = self.reuse_files
224 if reusing:
224 if reusing:
225 try:
225 try:
226 self.load_config_from_json()
226 self.load_config_from_json()
227 except (AssertionError,IOError):
227 except (AssertionError,IOError):
228 reusing=False
228 reusing=False
229 # check again, because reusing may have failed:
229 # check again, because reusing may have failed:
230 if reusing:
230 if reusing:
231 pass
231 pass
232 elif self.secure:
232 elif self.secure:
233 key = str(uuid.uuid4())
233 key = str(uuid.uuid4())
234 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
234 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
235 # with open(keyfile, 'w') as f:
235 # with open(keyfile, 'w') as f:
236 # f.write(key)
236 # f.write(key)
237 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
237 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
238 c.StreamSession.key = key
238 c.StreamSession.key = key
239 else:
239 else:
240 key = c.StreamSession.key = ''
240 key = c.StreamSession.key = ''
241
241
242 try:
242 try:
243 self.factory = HubFactory(config=c, log=self.log)
243 self.factory = HubFactory(config=c, log=self.log)
244 # self.start_logging()
244 # self.start_logging()
245 self.factory.init_hub()
245 self.factory.init_hub()
246 except:
246 except:
247 self.log.error("Couldn't construct the Controller", exc_info=True)
247 self.log.error("Couldn't construct the Controller", exc_info=True)
248 self.exit(1)
248 self.exit(1)
249
249
250 if not reusing:
250 if not reusing:
251 # save to new json config files
251 # save to new json config files
252 f = self.factory
252 f = self.factory
253 cdict = {'exec_key' : key,
253 cdict = {'exec_key' : key,
254 'ssh' : self.ssh_server,
254 'ssh' : self.ssh_server,
255 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
255 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
256 'location' : self.location
256 'location' : self.location
257 }
257 }
258 self.save_connection_dict('ipcontroller-client.json', cdict)
258 self.save_connection_dict('ipcontroller-client.json', cdict)
259 edict = cdict
259 edict = cdict
260 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
260 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
261 self.save_connection_dict('ipcontroller-engine.json', edict)
261 self.save_connection_dict('ipcontroller-engine.json', edict)
262
262
263 #
263 #
264 def init_schedulers(self):
264 def init_schedulers(self):
265 children = self.children
265 children = self.children
266 mq = import_item(str(self.mq_class))
266 mq = import_item(str(self.mq_class))
267
267
268 hub = self.factory
268 hub = self.factory
269 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
269 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
270 # IOPub relay (in a Process)
270 # IOPub relay (in a Process)
271 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
271 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
272 q.bind_in(hub.client_info['iopub'])
272 q.bind_in(hub.client_info['iopub'])
273 q.bind_out(hub.engine_info['iopub'])
273 q.bind_out(hub.engine_info['iopub'])
274 q.setsockopt_out(zmq.SUBSCRIBE, '')
274 q.setsockopt_out(zmq.SUBSCRIBE, '')
275 q.connect_mon(hub.monitor_url)
275 q.connect_mon(hub.monitor_url)
276 q.daemon=True
276 q.daemon=True
277 children.append(q)
277 children.append(q)
278
278
279 # Multiplexer Queue (in a Process)
279 # Multiplexer Queue (in a Process)
280 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
280 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
281 q.bind_in(hub.client_info['mux'])
281 q.bind_in(hub.client_info['mux'])
282 q.setsockopt_in(zmq.IDENTITY, 'mux')
282 q.setsockopt_in(zmq.IDENTITY, 'mux')
283 q.bind_out(hub.engine_info['mux'])
283 q.bind_out(hub.engine_info['mux'])
284 q.connect_mon(hub.monitor_url)
284 q.connect_mon(hub.monitor_url)
285 q.daemon=True
285 q.daemon=True
286 children.append(q)
286 children.append(q)
287
287
288 # Control Queue (in a Process)
288 # Control Queue (in a Process)
289 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
289 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
290 q.bind_in(hub.client_info['control'])
290 q.bind_in(hub.client_info['control'])
291 q.setsockopt_in(zmq.IDENTITY, 'control')
291 q.setsockopt_in(zmq.IDENTITY, 'control')
292 q.bind_out(hub.engine_info['control'])
292 q.bind_out(hub.engine_info['control'])
293 q.connect_mon(hub.monitor_url)
293 q.connect_mon(hub.monitor_url)
294 q.daemon=True
294 q.daemon=True
295 children.append(q)
295 children.append(q)
296 try:
296 try:
297 scheme = self.config.TaskScheduler.scheme_name
297 scheme = self.config.TaskScheduler.scheme_name
298 except AttributeError:
298 except AttributeError:
299 scheme = TaskScheduler.scheme_name.get_default_value()
299 scheme = TaskScheduler.scheme_name.get_default_value()
300 # Task Queue (in a Process)
300 # Task Queue (in a Process)
301 if scheme == 'pure':
301 if scheme == 'pure':
302 self.log.warn("task::using pure XREQ Task scheduler")
302 self.log.warn("task::using pure XREQ Task scheduler")
303 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
303 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
304 # q.setsockopt_out(zmq.HWM, hub.hwm)
304 # q.setsockopt_out(zmq.HWM, hub.hwm)
305 q.bind_in(hub.client_info['task'][1])
305 q.bind_in(hub.client_info['task'][1])
306 q.setsockopt_in(zmq.IDENTITY, 'task')
306 q.setsockopt_in(zmq.IDENTITY, 'task')
307 q.bind_out(hub.engine_info['task'])
307 q.bind_out(hub.engine_info['task'])
308 q.connect_mon(hub.monitor_url)
308 q.connect_mon(hub.monitor_url)
309 q.daemon=True
309 q.daemon=True
310 children.append(q)
310 children.append(q)
311 elif scheme == 'none':
311 elif scheme == 'none':
312 self.log.warn("task::using no Task scheduler")
312 self.log.warn("task::using no Task scheduler")
313
313
314 else:
314 else:
315 self.log.info("task::using Python %s Task scheduler"%scheme)
315 self.log.info("task::using Python %s Task scheduler"%scheme)
316 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
316 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
317 hub.monitor_url, hub.client_info['notification'])
317 hub.monitor_url, hub.client_info['notification'])
318 kwargs = dict(logname='scheduler', loglevel=self.log_level,
318 kwargs = dict(logname='scheduler', loglevel=self.log_level,
319 log_url = self.log_url, config=dict(self.config))
319 log_url = self.log_url, config=dict(self.config))
320 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
320 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
321 q.daemon=True
321 q.daemon=True
322 children.append(q)
322 children.append(q)
323
323
324
324
325 def save_urls(self):
325 def save_urls(self):
326 """save the registration urls to files."""
326 """save the registration urls to files."""
327 c = self.config
327 c = self.config
328
328
329 sec_dir = self.profile_dir.security_dir
329 sec_dir = self.profile_dir.security_dir
330 cf = self.factory
330 cf = self.factory
331
331
332 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
332 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
333 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
333 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
334
334
335 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
335 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
336 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
336 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
337
337
338
338
339 def do_import_statements(self):
339 def do_import_statements(self):
340 statements = self.import_statements
340 statements = self.import_statements
341 for s in statements:
341 for s in statements:
342 try:
342 try:
343 self.log.msg("Executing statement: '%s'" % s)
343 self.log.msg("Executing statement: '%s'" % s)
344 exec s in globals(), locals()
344 exec s in globals(), locals()
345 except:
345 except:
346 self.log.msg("Error running statement: %s" % s)
346 self.log.msg("Error running statement: %s" % s)
347
347
348 def forward_logging(self):
348 def forward_logging(self):
349 if self.log_url:
349 if self.log_url:
350 self.log.info("Forwarding logging to %s"%self.log_url)
350 self.log.info("Forwarding logging to %s"%self.log_url)
351 context = zmq.Context.instance()
351 context = zmq.Context.instance()
352 lsock = context.socket(zmq.PUB)
352 lsock = context.socket(zmq.PUB)
353 lsock.connect(self.log_url)
353 lsock.connect(self.log_url)
354 handler = PUBHandler(lsock)
354 handler = PUBHandler(lsock)
355 self.log.removeHandler(self._log_handler)
355 self.log.removeHandler(self._log_handler)
356 handler.root_topic = 'controller'
356 handler.root_topic = 'controller'
357 handler.setLevel(self.log_level)
357 handler.setLevel(self.log_level)
358 self.log.addHandler(handler)
358 self.log.addHandler(handler)
359 self._log_handler = handler
359 self._log_handler = handler
360 # #
360 # #
361
361
362 def initialize(self, argv=None):
362 def initialize(self, argv=None):
363 super(IPControllerApp, self).initialize(argv)
363 super(IPControllerApp, self).initialize(argv)
364 self.forward_logging()
364 self.forward_logging()
365 self.init_hub()
365 self.init_hub()
366 self.init_schedulers()
366 self.init_schedulers()
367
367
368 def start(self):
368 def start(self):
369 # Start the subprocesses:
369 # Start the subprocesses:
370 self.factory.start()
370 self.factory.start()
371 child_procs = []
371 child_procs = []
372 for child in self.children:
372 for child in self.children:
373 child.start()
373 child.start()
374 if isinstance(child, ProcessMonitoredQueue):
374 if isinstance(child, ProcessMonitoredQueue):
375 child_procs.append(child.launcher)
375 child_procs.append(child.launcher)
376 elif isinstance(child, Process):
376 elif isinstance(child, Process):
377 child_procs.append(child)
377 child_procs.append(child)
378 if child_procs:
378 if child_procs:
379 signal_children(child_procs)
379 signal_children(child_procs)
380
380
381 self.write_pid_file(overwrite=True)
381 self.write_pid_file(overwrite=True)
382
382
383 try:
383 try:
384 self.factory.loop.start()
384 self.factory.loop.start()
385 except KeyboardInterrupt:
385 except KeyboardInterrupt:
386 self.log.critical("Interrupted, Exiting...\n")
386 self.log.critical("Interrupted, Exiting...\n")
387
387
388
388
389
389
390 def launch_new_instance():
390 def launch_new_instance():
391 """Create and run the IPython controller"""
391 """Create and run the IPython controller"""
392 app = IPControllerApp()
392 app = IPControllerApp.instance()
393 app.initialize()
393 app.initialize()
394 app.start()
394 app.start()
395
395
396
396
397 if __name__ == '__main__':
397 if __name__ == '__main__':
398 launch_new_instance()
398 launch_new_instance()
@@ -1,270 +1,270 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import json
18 import json
19 import os
19 import os
20 import sys
20 import sys
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from IPython.core.newapplication import ProfileDir
25 from IPython.core.newapplication import ProfileDir
26 from IPython.parallel.apps.baseapp import BaseParallelApplication
26 from IPython.parallel.apps.baseapp import BaseParallelApplication
27 from IPython.zmq.log import EnginePUBHandler
27 from IPython.zmq.log import EnginePUBHandler
28
28
29 from IPython.config.configurable import Configurable
29 from IPython.config.configurable import Configurable
30 from IPython.parallel.streamsession import StreamSession
30 from IPython.parallel.streamsession import StreamSession
31 from IPython.parallel.engine.engine import EngineFactory
31 from IPython.parallel.engine.engine import EngineFactory
32 from IPython.parallel.engine.streamkernel import Kernel
32 from IPython.parallel.engine.streamkernel import Kernel
33 from IPython.parallel.util import disambiguate_url
33 from IPython.parallel.util import disambiguate_url
34
34
35 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
36 from IPython.utils.traitlets import Bool, Unicode, Dict, List
36 from IPython.utils.traitlets import Bool, Unicode, Dict, List
37
37
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Module level variables
40 # Module level variables
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 #: The default config file name for this application
43 #: The default config file name for this application
44 default_config_file_name = u'ipengine_config.py'
44 default_config_file_name = u'ipengine_config.py'
45
45
46 _description = """Start an IPython engine for parallel computing.
46 _description = """Start an IPython engine for parallel computing.
47
47
48 IPython engines run in parallel and perform computations on behalf of a client
48 IPython engines run in parallel and perform computations on behalf of a client
49 and controller. A controller needs to be started before the engines. The
49 and controller. A controller needs to be started before the engines. The
50 engine can be configured using command line options or using a cluster
50 engine can be configured using command line options or using a cluster
51 directory. Cluster directories contain config, log and security files and are
51 directory. Cluster directories contain config, log and security files and are
52 usually located in your ipython directory and named as "cluster_<profile>".
52 usually located in your ipython directory and named as "cluster_<profile>".
53 See the `profile` and `profile_dir` options for details.
53 See the `profile` and `profile_dir` options for details.
54 """
54 """
55
55
56
56
57 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
58 # MPI configuration
58 # MPI configuration
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60
60
61 mpi4py_init = """from mpi4py import MPI as mpi
61 mpi4py_init = """from mpi4py import MPI as mpi
62 mpi.size = mpi.COMM_WORLD.Get_size()
62 mpi.size = mpi.COMM_WORLD.Get_size()
63 mpi.rank = mpi.COMM_WORLD.Get_rank()
63 mpi.rank = mpi.COMM_WORLD.Get_rank()
64 """
64 """
65
65
66
66
67 pytrilinos_init = """from PyTrilinos import Epetra
67 pytrilinos_init = """from PyTrilinos import Epetra
68 class SimpleStruct:
68 class SimpleStruct:
69 pass
69 pass
70 mpi = SimpleStruct()
70 mpi = SimpleStruct()
71 mpi.rank = 0
71 mpi.rank = 0
72 mpi.size = 0
72 mpi.size = 0
73 """
73 """
74
74
75 class MPI(Configurable):
75 class MPI(Configurable):
76 """Configurable for MPI initialization"""
76 """Configurable for MPI initialization"""
77 use = Unicode('', config=True,
77 use = Unicode('', config=True,
78 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
78 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
79 )
79 )
80
80
81 def _on_use_changed(self, old, new):
81 def _on_use_changed(self, old, new):
82 # load default init script if it's not set
82 # load default init script if it's not set
83 if not self.init_script:
83 if not self.init_script:
84 self.init_script = self.default_inits.get(new, '')
84 self.init_script = self.default_inits.get(new, '')
85
85
86 init_script = Unicode('', config=True,
86 init_script = Unicode('', config=True,
87 help="Initialization code for MPI")
87 help="Initialization code for MPI")
88
88
89 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
89 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
90 config=True)
90 config=True)
91
91
92
92
93 #-----------------------------------------------------------------------------
93 #-----------------------------------------------------------------------------
94 # Main application
94 # Main application
95 #-----------------------------------------------------------------------------
95 #-----------------------------------------------------------------------------
96
96
97
97
98 class IPEngineApp(BaseParallelApplication):
98 class IPEngineApp(BaseParallelApplication):
99
99
100 app_name = Unicode(u'ipengine')
100 app_name = Unicode(u'ipengine')
101 description = Unicode(_description)
101 description = Unicode(_description)
102 config_file_name = Unicode(default_config_file_name)
102 config_file_name = Unicode(default_config_file_name)
103 classes = List([ProfileDir, StreamSession, EngineFactory, Kernel, MPI])
103 classes = List([ProfileDir, StreamSession, EngineFactory, Kernel, MPI])
104
104
105 startup_script = Unicode(u'', config=True,
105 startup_script = Unicode(u'', config=True,
106 help='specify a script to be run at startup')
106 help='specify a script to be run at startup')
107 startup_command = Unicode('', config=True,
107 startup_command = Unicode('', config=True,
108 help='specify a command to be run at startup')
108 help='specify a command to be run at startup')
109
109
110 url_file = Unicode(u'', config=True,
110 url_file = Unicode(u'', config=True,
111 help="""The full location of the file containing the connection information for
111 help="""The full location of the file containing the connection information for
112 the controller. If this is not given, the file must be in the
112 the controller. If this is not given, the file must be in the
113 security directory of the cluster directory. This location is
113 security directory of the cluster directory. This location is
114 resolved using the `profile` or `profile_dir` options.""",
114 resolved using the `profile` or `profile_dir` options.""",
115 )
115 )
116
116
117 url_file_name = Unicode(u'ipcontroller-engine.json')
117 url_file_name = Unicode(u'ipcontroller-engine.json')
118 log_url = Unicode('', config=True,
118 log_url = Unicode('', config=True,
119 help="""The URL for the iploggerapp instance, for forwarding
119 help="""The URL for the iploggerapp instance, for forwarding
120 logging to a central location.""")
120 logging to a central location.""")
121
121
122 aliases = Dict(dict(
122 aliases = Dict(dict(
123 file = 'IPEngineApp.url_file',
123 file = 'IPEngineApp.url_file',
124 c = 'IPEngineApp.startup_command',
124 c = 'IPEngineApp.startup_command',
125 s = 'IPEngineApp.startup_script',
125 s = 'IPEngineApp.startup_script',
126
126
127 ident = 'StreamSession.session',
127 ident = 'StreamSession.session',
128 user = 'StreamSession.username',
128 user = 'StreamSession.username',
129 exec_key = 'StreamSession.keyfile',
129 exec_key = 'StreamSession.keyfile',
130
130
131 url = 'EngineFactory.url',
131 url = 'EngineFactory.url',
132 ip = 'EngineFactory.ip',
132 ip = 'EngineFactory.ip',
133 transport = 'EngineFactory.transport',
133 transport = 'EngineFactory.transport',
134 port = 'EngineFactory.regport',
134 port = 'EngineFactory.regport',
135 location = 'EngineFactory.location',
135 location = 'EngineFactory.location',
136
136
137 timeout = 'EngineFactory.timeout',
137 timeout = 'EngineFactory.timeout',
138
138
139 profile = "IPEngineApp.profile",
139 profile = "IPEngineApp.profile",
140 profile_dir = 'ProfileDir.location',
140 profile_dir = 'ProfileDir.location',
141
141
142 mpi = 'MPI.use',
142 mpi = 'MPI.use',
143
143
144 log_level = 'IPEngineApp.log_level',
144 log_level = 'IPEngineApp.log_level',
145 log_url = 'IPEngineApp.log_url'
145 log_url = 'IPEngineApp.log_url'
146 ))
146 ))
147
147
148 # def find_key_file(self):
148 # def find_key_file(self):
149 # """Set the key file.
149 # """Set the key file.
150 #
150 #
151 # Here we don't try to actually see if it exists for is valid as that
151 # Here we don't try to actually see if it exists for is valid as that
152 # is hadled by the connection logic.
152 # is hadled by the connection logic.
153 # """
153 # """
154 # config = self.master_config
154 # config = self.master_config
155 # # Find the actual controller key file
155 # # Find the actual controller key file
156 # if not config.Global.key_file:
156 # if not config.Global.key_file:
157 # try_this = os.path.join(
157 # try_this = os.path.join(
158 # config.Global.profile_dir,
158 # config.Global.profile_dir,
159 # config.Global.security_dir,
159 # config.Global.security_dir,
160 # config.Global.key_file_name
160 # config.Global.key_file_name
161 # )
161 # )
162 # config.Global.key_file = try_this
162 # config.Global.key_file = try_this
163
163
164 def find_url_file(self):
164 def find_url_file(self):
165 """Set the key file.
165 """Set the key file.
166
166
167 Here we don't try to actually see if it exists for is valid as that
167 Here we don't try to actually see if it exists for is valid as that
168 is hadled by the connection logic.
168 is hadled by the connection logic.
169 """
169 """
170 config = self.config
170 config = self.config
171 # Find the actual controller key file
171 # Find the actual controller key file
172 if not self.url_file:
172 if not self.url_file:
173 self.url_file = os.path.join(
173 self.url_file = os.path.join(
174 self.profile_dir.security_dir,
174 self.profile_dir.security_dir,
175 self.url_file_name
175 self.url_file_name
176 )
176 )
177 def init_engine(self):
177 def init_engine(self):
178 # This is the working dir by now.
178 # This is the working dir by now.
179 sys.path.insert(0, '')
179 sys.path.insert(0, '')
180 config = self.config
180 config = self.config
181 # print config
181 # print config
182 self.find_url_file()
182 self.find_url_file()
183
183
184 # if os.path.exists(config.Global.key_file) and config.Global.secure:
184 # if os.path.exists(config.Global.key_file) and config.Global.secure:
185 # config.SessionFactory.exec_key = config.Global.key_file
185 # config.SessionFactory.exec_key = config.Global.key_file
186 if os.path.exists(self.url_file):
186 if os.path.exists(self.url_file):
187 with open(self.url_file) as f:
187 with open(self.url_file) as f:
188 d = json.loads(f.read())
188 d = json.loads(f.read())
189 for k,v in d.iteritems():
189 for k,v in d.iteritems():
190 if isinstance(v, unicode):
190 if isinstance(v, unicode):
191 d[k] = v.encode()
191 d[k] = v.encode()
192 if d['exec_key']:
192 if d['exec_key']:
193 config.StreamSession.key = d['exec_key']
193 config.StreamSession.key = d['exec_key']
194 d['url'] = disambiguate_url(d['url'], d['location'])
194 d['url'] = disambiguate_url(d['url'], d['location'])
195 config.EngineFactory.url = d['url']
195 config.EngineFactory.url = d['url']
196 config.EngineFactory.location = d['location']
196 config.EngineFactory.location = d['location']
197
197
198 try:
198 try:
199 exec_lines = config.Kernel.exec_lines
199 exec_lines = config.Kernel.exec_lines
200 except AttributeError:
200 except AttributeError:
201 config.Kernel.exec_lines = []
201 config.Kernel.exec_lines = []
202 exec_lines = config.Kernel.exec_lines
202 exec_lines = config.Kernel.exec_lines
203
203
204 if self.startup_script:
204 if self.startup_script:
205 enc = sys.getfilesystemencoding() or 'utf8'
205 enc = sys.getfilesystemencoding() or 'utf8'
206 cmd="execfile(%r)"%self.startup_script.encode(enc)
206 cmd="execfile(%r)"%self.startup_script.encode(enc)
207 exec_lines.append(cmd)
207 exec_lines.append(cmd)
208 if self.startup_command:
208 if self.startup_command:
209 exec_lines.append(self.startup_command)
209 exec_lines.append(self.startup_command)
210
210
211 # Create the underlying shell class and Engine
211 # Create the underlying shell class and Engine
212 # shell_class = import_item(self.master_config.Global.shell_class)
212 # shell_class = import_item(self.master_config.Global.shell_class)
213 # print self.config
213 # print self.config
214 try:
214 try:
215 self.engine = EngineFactory(config=config, log=self.log)
215 self.engine = EngineFactory(config=config, log=self.log)
216 except:
216 except:
217 self.log.error("Couldn't start the Engine", exc_info=True)
217 self.log.error("Couldn't start the Engine", exc_info=True)
218 self.exit(1)
218 self.exit(1)
219
219
220 def forward_logging(self):
220 def forward_logging(self):
221 if self.log_url:
221 if self.log_url:
222 self.log.info("Forwarding logging to %s"%self.log_url)
222 self.log.info("Forwarding logging to %s"%self.log_url)
223 context = self.engine.context
223 context = self.engine.context
224 lsock = context.socket(zmq.PUB)
224 lsock = context.socket(zmq.PUB)
225 lsock.connect(self.log_url)
225 lsock.connect(self.log_url)
226 self.log.removeHandler(self._log_handler)
226 self.log.removeHandler(self._log_handler)
227 handler = EnginePUBHandler(self.engine, lsock)
227 handler = EnginePUBHandler(self.engine, lsock)
228 handler.setLevel(self.log_level)
228 handler.setLevel(self.log_level)
229 self.log.addHandler(handler)
229 self.log.addHandler(handler)
230 self._log_handler = handler
230 self._log_handler = handler
231 #
231 #
232 def init_mpi(self):
232 def init_mpi(self):
233 global mpi
233 global mpi
234 self.mpi = MPI(config=self.config)
234 self.mpi = MPI(config=self.config)
235
235
236 mpi_import_statement = self.mpi.init_script
236 mpi_import_statement = self.mpi.init_script
237 if mpi_import_statement:
237 if mpi_import_statement:
238 try:
238 try:
239 self.log.info("Initializing MPI:")
239 self.log.info("Initializing MPI:")
240 self.log.info(mpi_import_statement)
240 self.log.info(mpi_import_statement)
241 exec mpi_import_statement in globals()
241 exec mpi_import_statement in globals()
242 except:
242 except:
243 mpi = None
243 mpi = None
244 else:
244 else:
245 mpi = None
245 mpi = None
246
246
247 def initialize(self, argv=None):
247 def initialize(self, argv=None):
248 super(IPEngineApp, self).initialize(argv)
248 super(IPEngineApp, self).initialize(argv)
249 self.init_mpi()
249 self.init_mpi()
250 self.init_engine()
250 self.init_engine()
251 self.forward_logging()
251 self.forward_logging()
252
252
253 def start(self):
253 def start(self):
254 self.engine.start()
254 self.engine.start()
255 try:
255 try:
256 self.engine.loop.start()
256 self.engine.loop.start()
257 except KeyboardInterrupt:
257 except KeyboardInterrupt:
258 self.log.critical("Engine Interrupted, shutting down...\n")
258 self.log.critical("Engine Interrupted, shutting down...\n")
259
259
260
260
261 def launch_new_instance():
261 def launch_new_instance():
262 """Create and run the IPython engine"""
262 """Create and run the IPython engine"""
263 app = IPEngineApp()
263 app = IPEngineApp.instance()
264 app.initialize()
264 app.initialize()
265 app.start()
265 app.start()
266
266
267
267
268 if __name__ == '__main__':
268 if __name__ == '__main__':
269 launch_new_instance()
269 launch_new_instance()
270
270
@@ -1,96 +1,96 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 A simple IPython logger application
4 A simple IPython logger application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20
20
21 import zmq
21 import zmq
22
22
23 from IPython.core.newapplication import ProfileDir
23 from IPython.core.newapplication import ProfileDir
24 from IPython.utils.traitlets import Bool, Dict, Unicode
24 from IPython.utils.traitlets import Bool, Dict, Unicode
25
25
26 from IPython.parallel.apps.baseapp import (
26 from IPython.parallel.apps.baseapp import (
27 BaseParallelApplication,
27 BaseParallelApplication,
28 base_aliases
28 base_aliases
29 )
29 )
30 from IPython.parallel.apps.logwatcher import LogWatcher
30 from IPython.parallel.apps.logwatcher import LogWatcher
31
31
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33 # Module level variables
33 # Module level variables
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35
35
36 #: The default config file name for this application
36 #: The default config file name for this application
37 default_config_file_name = u'iplogger_config.py'
37 default_config_file_name = u'iplogger_config.py'
38
38
39 _description = """Start an IPython logger for parallel computing.
39 _description = """Start an IPython logger for parallel computing.
40
40
41 IPython controllers and engines (and your own processes) can broadcast log messages
41 IPython controllers and engines (and your own processes) can broadcast log messages
42 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
42 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
43 logger can be configured using command line options or using a cluster
43 logger can be configured using command line options or using a cluster
44 directory. Cluster directories contain config, log and security files and are
44 directory. Cluster directories contain config, log and security files and are
45 usually located in your ipython directory and named as "cluster_<profile>".
45 usually located in your ipython directory and named as "cluster_<profile>".
46 See the `profile` and `profile_dir` options for details.
46 See the `profile` and `profile_dir` options for details.
47 """
47 """
48
48
49
49
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51 # Main application
51 # Main application
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 aliases = {}
53 aliases = {}
54 aliases.update(base_aliases)
54 aliases.update(base_aliases)
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
56
56
57 class IPLoggerApp(BaseParallelApplication):
57 class IPLoggerApp(BaseParallelApplication):
58
58
59 name = u'iploggerz'
59 name = u'iploggerz'
60 description = _description
60 description = _description
61 config_file_name = Unicode(default_config_file_name)
61 config_file_name = Unicode(default_config_file_name)
62
62
63 classes = [LogWatcher, ProfileDir]
63 classes = [LogWatcher, ProfileDir]
64 aliases = Dict(aliases)
64 aliases = Dict(aliases)
65
65
66 def initialize(self, argv=None):
66 def initialize(self, argv=None):
67 super(IPLoggerApp, self).initialize(argv)
67 super(IPLoggerApp, self).initialize(argv)
68 self.init_watcher()
68 self.init_watcher()
69
69
70 def init_watcher(self):
70 def init_watcher(self):
71 try:
71 try:
72 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
72 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
73 except:
73 except:
74 self.log.error("Couldn't start the LogWatcher", exc_info=True)
74 self.log.error("Couldn't start the LogWatcher", exc_info=True)
75 self.exit(1)
75 self.exit(1)
76 self.log.info("Listening for log messages on %r"%self.watcher.url)
76 self.log.info("Listening for log messages on %r"%self.watcher.url)
77
77
78
78
79 def start(self):
79 def start(self):
80 self.watcher.start()
80 self.watcher.start()
81 try:
81 try:
82 self.watcher.loop.start()
82 self.watcher.loop.start()
83 except KeyboardInterrupt:
83 except KeyboardInterrupt:
84 self.log.critical("Logging Interrupted, shutting down...\n")
84 self.log.critical("Logging Interrupted, shutting down...\n")
85
85
86
86
87 def launch_new_instance():
87 def launch_new_instance():
88 """Create and run the IPython LogWatcher"""
88 """Create and run the IPython LogWatcher"""
89 app = IPLoggerApp()
89 app = IPLoggerApp.instance()
90 app.initialize()
90 app.initialize()
91 app.start()
91 app.start()
92
92
93
93
94 if __name__ == '__main__':
94 if __name__ == '__main__':
95 launch_new_instance()
95 launch_new_instance()
96
96
General Comments 0
You need to be logged in to leave comments. Login now