##// END OF EJS Templates
rename clusterdir to more descriptive baseapp...
MinRK -
Show More
1 NO CONTENT: file renamed from IPython/parallel/apps/clusterdir.py to IPython/parallel/apps/baseapp.py
NO CONTENT: file renamed from IPython/parallel/apps/clusterdir.py to IPython/parallel/apps/baseapp.py
@@ -1,521 +1,521 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import errno
18 import errno
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import signal
22 import signal
23
23
24 from subprocess import check_call, CalledProcessError, PIPE
24 from subprocess import check_call, CalledProcessError, PIPE
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27
27
28 from IPython.config.application import Application, boolean_flag
28 from IPython.config.application import Application, boolean_flag
29 from IPython.config.loader import Config
29 from IPython.config.loader import Config
30 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
30 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
31 from IPython.utils.importstring import import_item
31 from IPython.utils.importstring import import_item
32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
33
33
34 from IPython.parallel.apps.clusterdir import (
34 from IPython.parallel.apps.baseapp import (
35 BaseParallelApplication,
35 BaseParallelApplication,
36 PIDFileError,
36 PIDFileError,
37 base_flags, base_aliases
37 base_flags, base_aliases
38 )
38 )
39
39
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Module level variables
42 # Module level variables
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
46 default_config_file_name = u'ipcluster_config.py'
46 default_config_file_name = u'ipcluster_config.py'
47
47
48
48
49 _description = """Start an IPython cluster for parallel computing.
49 _description = """Start an IPython cluster for parallel computing.
50
50
51 An IPython cluster consists of 1 controller and 1 or more engines.
51 An IPython cluster consists of 1 controller and 1 or more engines.
52 This command automates the startup of these processes using a wide
52 This command automates the startup of these processes using a wide
53 range of startup methods (SSH, local processes, PBS, mpiexec,
53 range of startup methods (SSH, local processes, PBS, mpiexec,
54 Windows HPC Server 2008). To start a cluster with 4 engines on your
54 Windows HPC Server 2008). To start a cluster with 4 engines on your
55 local host simply do 'ipcluster start n=4'. For more complex usage
55 local host simply do 'ipcluster start n=4'. For more complex usage
56 you will typically do 'ipcluster create profile=mycluster', then edit
56 you will typically do 'ipcluster create profile=mycluster', then edit
57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
58 """
58 """
59
59
60
60
61 # Exit codes for ipcluster
61 # Exit codes for ipcluster
62
62
63 # This will be the exit code if the ipcluster appears to be running because
63 # This will be the exit code if the ipcluster appears to be running because
64 # a .pid file exists
64 # a .pid file exists
65 ALREADY_STARTED = 10
65 ALREADY_STARTED = 10
66
66
67
67
68 # This will be the exit code if ipcluster stop is run, but there is not .pid
68 # This will be the exit code if ipcluster stop is run, but there is not .pid
69 # file to be found.
69 # file to be found.
70 ALREADY_STOPPED = 11
70 ALREADY_STOPPED = 11
71
71
72 # This will be the exit code if ipcluster engines is run, but there is not .pid
72 # This will be the exit code if ipcluster engines is run, but there is not .pid
73 # file to be found.
73 # file to be found.
74 NO_CLUSTER = 12
74 NO_CLUSTER = 12
75
75
76
76
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78 # Main application
78 # Main application
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80 start_help = """Start an IPython cluster for parallel computing
80 start_help = """Start an IPython cluster for parallel computing
81
81
82 Start an ipython cluster by its profile name or cluster
82 Start an ipython cluster by its profile name or cluster
83 directory. Cluster directories contain configuration, log and
83 directory. Cluster directories contain configuration, log and
84 security related files and are named using the convention
84 security related files and are named using the convention
85 'cluster_<profile>' and should be creating using the 'start'
85 'cluster_<profile>' and should be creating using the 'start'
86 subcommand of 'ipcluster'. If your cluster directory is in
86 subcommand of 'ipcluster'. If your cluster directory is in
87 the cwd or the ipython directory, you can simply refer to it
87 the cwd or the ipython directory, you can simply refer to it
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
89 otherwise use the 'profile_dir' option.
89 otherwise use the 'profile_dir' option.
90 """
90 """
91 stop_help = """Stop a running IPython cluster
91 stop_help = """Stop a running IPython cluster
92
92
93 Stop a running ipython cluster by its profile name or cluster
93 Stop a running ipython cluster by its profile name or cluster
94 directory. Cluster directories are named using the convention
94 directory. Cluster directories are named using the convention
95 'cluster_<profile>'. If your cluster directory is in
95 'cluster_<profile>'. If your cluster directory is in
96 the cwd or the ipython directory, you can simply refer to it
96 the cwd or the ipython directory, you can simply refer to it
97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
98 use the 'profile_dir' option.
98 use the 'profile_dir' option.
99 """
99 """
100 engines_help = """Start engines connected to an existing IPython cluster
100 engines_help = """Start engines connected to an existing IPython cluster
101
101
102 Start one or more engines to connect to an existing Cluster
102 Start one or more engines to connect to an existing Cluster
103 by profile name or cluster directory.
103 by profile name or cluster directory.
104 Cluster directories contain configuration, log and
104 Cluster directories contain configuration, log and
105 security related files and are named using the convention
105 security related files and are named using the convention
106 'cluster_<profile>' and should be creating using the 'start'
106 'cluster_<profile>' and should be creating using the 'start'
107 subcommand of 'ipcluster'. If your cluster directory is in
107 subcommand of 'ipcluster'. If your cluster directory is in
108 the cwd or the ipython directory, you can simply refer to it
108 the cwd or the ipython directory, you can simply refer to it
109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
110 otherwise use the 'profile_dir' option.
110 otherwise use the 'profile_dir' option.
111 """
111 """
112 create_help = """Create an ipcluster profile by name
112 create_help = """Create an ipcluster profile by name
113
113
114 Create an ipython cluster directory by its profile name or
114 Create an ipython cluster directory by its profile name or
115 cluster directory path. Cluster directories contain
115 cluster directory path. Cluster directories contain
116 configuration, log and security related files and are named
116 configuration, log and security related files and are named
117 using the convention 'cluster_<profile>'. By default they are
117 using the convention 'cluster_<profile>'. By default they are
118 located in your ipython directory. Once created, you will
118 located in your ipython directory. Once created, you will
119 probably need to edit the configuration files in the cluster
119 probably need to edit the configuration files in the cluster
120 directory to configure your cluster. Most users will create a
120 directory to configure your cluster. Most users will create a
121 cluster directory by profile name,
121 cluster directory by profile name,
122 `ipcluster create profile=mycluster`, which will put the directory
122 `ipcluster create profile=mycluster`, which will put the directory
123 in `<ipython_dir>/cluster_mycluster`.
123 in `<ipython_dir>/cluster_mycluster`.
124 """
124 """
125 list_help = """List available cluster profiles
125 list_help = """List available cluster profiles
126
126
127 List all available clusters, by cluster directory, that can
127 List all available clusters, by cluster directory, that can
128 be found in the current working directly or in the ipython
128 be found in the current working directly or in the ipython
129 directory. Cluster directories are named using the convention
129 directory. Cluster directories are named using the convention
130 'cluster_<profile>'.
130 'cluster_<profile>'.
131 """
131 """
132
132
133
133
134 class IPClusterList(BaseIPythonApplication):
134 class IPClusterList(BaseIPythonApplication):
135 name = u'ipcluster-list'
135 name = u'ipcluster-list'
136 description = list_help
136 description = list_help
137
137
138 # empty aliases
138 # empty aliases
139 aliases=Dict()
139 aliases=Dict()
140 flags = Dict(base_flags)
140 flags = Dict(base_flags)
141
141
142 def _log_level_default(self):
142 def _log_level_default(self):
143 return 20
143 return 20
144
144
145 def list_profile_dirs(self):
145 def list_profile_dirs(self):
146 # Find the search paths
146 # Find the search paths
147 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
147 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
148 if profile_dir_paths:
148 if profile_dir_paths:
149 profile_dir_paths = profile_dir_paths.split(':')
149 profile_dir_paths = profile_dir_paths.split(':')
150 else:
150 else:
151 profile_dir_paths = []
151 profile_dir_paths = []
152
152
153 ipython_dir = self.ipython_dir
153 ipython_dir = self.ipython_dir
154
154
155 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
155 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
156 paths = list(set(paths))
156 paths = list(set(paths))
157
157
158 self.log.info('Searching for cluster profiles in paths: %r' % paths)
158 self.log.info('Searching for cluster profiles in paths: %r' % paths)
159 for path in paths:
159 for path in paths:
160 files = os.listdir(path)
160 files = os.listdir(path)
161 for f in files:
161 for f in files:
162 full_path = os.path.join(path, f)
162 full_path = os.path.join(path, f)
163 if os.path.isdir(full_path) and f.startswith('profile_') and \
163 if os.path.isdir(full_path) and f.startswith('profile_') and \
164 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
164 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
165 profile = f.split('_')[-1]
165 profile = f.split('_')[-1]
166 start_cmd = 'ipcluster start profile=%s n=4' % profile
166 start_cmd = 'ipcluster start profile=%s n=4' % profile
167 print start_cmd + " ==> " + full_path
167 print start_cmd + " ==> " + full_path
168
168
169 def start(self):
169 def start(self):
170 self.list_profile_dirs()
170 self.list_profile_dirs()
171
171
172
172
173 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
173 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
174
174
175 create_flags = {}
175 create_flags = {}
176 create_flags.update(base_flags)
176 create_flags.update(base_flags)
177 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
177 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
178 "reset config files to defaults", "leave existing config files"))
178 "reset config files to defaults", "leave existing config files"))
179
179
180 class IPClusterCreate(BaseParallelApplication):
180 class IPClusterCreate(BaseParallelApplication):
181 name = u'ipcluster-create'
181 name = u'ipcluster-create'
182 description = create_help
182 description = create_help
183 auto_create = Bool(True)
183 auto_create = Bool(True)
184 config_file_name = Unicode(default_config_file_name)
184 config_file_name = Unicode(default_config_file_name)
185
185
186 flags = Dict(create_flags)
186 flags = Dict(create_flags)
187
187
188 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
188 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
189
189
190 classes = [ProfileDir]
190 classes = [ProfileDir]
191
191
192
192
193 stop_aliases = dict(
193 stop_aliases = dict(
194 signal='IPClusterStop.signal',
194 signal='IPClusterStop.signal',
195 profile='BaseIPythonApplication.profile',
195 profile='BaseIPythonApplication.profile',
196 profile_dir='ProfileDir.location',
196 profile_dir='ProfileDir.location',
197 )
197 )
198
198
199 class IPClusterStop(BaseParallelApplication):
199 class IPClusterStop(BaseParallelApplication):
200 name = u'ipcluster'
200 name = u'ipcluster'
201 description = stop_help
201 description = stop_help
202 config_file_name = Unicode(default_config_file_name)
202 config_file_name = Unicode(default_config_file_name)
203
203
204 signal = Int(signal.SIGINT, config=True,
204 signal = Int(signal.SIGINT, config=True,
205 help="signal to use for stopping processes.")
205 help="signal to use for stopping processes.")
206
206
207 aliases = Dict(stop_aliases)
207 aliases = Dict(stop_aliases)
208
208
209 def start(self):
209 def start(self):
210 """Start the app for the stop subcommand."""
210 """Start the app for the stop subcommand."""
211 try:
211 try:
212 pid = self.get_pid_from_file()
212 pid = self.get_pid_from_file()
213 except PIDFileError:
213 except PIDFileError:
214 self.log.critical(
214 self.log.critical(
215 'Could not read pid file, cluster is probably not running.'
215 'Could not read pid file, cluster is probably not running.'
216 )
216 )
217 # Here I exit with a unusual exit status that other processes
217 # Here I exit with a unusual exit status that other processes
218 # can watch for to learn how I existed.
218 # can watch for to learn how I existed.
219 self.remove_pid_file()
219 self.remove_pid_file()
220 self.exit(ALREADY_STOPPED)
220 self.exit(ALREADY_STOPPED)
221
221
222 if not self.check_pid(pid):
222 if not self.check_pid(pid):
223 self.log.critical(
223 self.log.critical(
224 'Cluster [pid=%r] is not running.' % pid
224 'Cluster [pid=%r] is not running.' % pid
225 )
225 )
226 self.remove_pid_file()
226 self.remove_pid_file()
227 # Here I exit with a unusual exit status that other processes
227 # Here I exit with a unusual exit status that other processes
228 # can watch for to learn how I existed.
228 # can watch for to learn how I existed.
229 self.exit(ALREADY_STOPPED)
229 self.exit(ALREADY_STOPPED)
230
230
231 elif os.name=='posix':
231 elif os.name=='posix':
232 sig = self.signal
232 sig = self.signal
233 self.log.info(
233 self.log.info(
234 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
234 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
235 )
235 )
236 try:
236 try:
237 os.kill(pid, sig)
237 os.kill(pid, sig)
238 except OSError:
238 except OSError:
239 self.log.error("Stopping cluster failed, assuming already dead.",
239 self.log.error("Stopping cluster failed, assuming already dead.",
240 exc_info=True)
240 exc_info=True)
241 self.remove_pid_file()
241 self.remove_pid_file()
242 elif os.name=='nt':
242 elif os.name=='nt':
243 try:
243 try:
244 # kill the whole tree
244 # kill the whole tree
245 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
245 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
246 except (CalledProcessError, OSError):
246 except (CalledProcessError, OSError):
247 self.log.error("Stopping cluster failed, assuming already dead.",
247 self.log.error("Stopping cluster failed, assuming already dead.",
248 exc_info=True)
248 exc_info=True)
249 self.remove_pid_file()
249 self.remove_pid_file()
250
250
251 engine_aliases = {}
251 engine_aliases = {}
252 engine_aliases.update(base_aliases)
252 engine_aliases.update(base_aliases)
253 engine_aliases.update(dict(
253 engine_aliases.update(dict(
254 n='IPClusterEngines.n',
254 n='IPClusterEngines.n',
255 elauncher = 'IPClusterEngines.engine_launcher_class',
255 elauncher = 'IPClusterEngines.engine_launcher_class',
256 ))
256 ))
257 class IPClusterEngines(BaseParallelApplication):
257 class IPClusterEngines(BaseParallelApplication):
258
258
259 name = u'ipcluster'
259 name = u'ipcluster'
260 description = engines_help
260 description = engines_help
261 usage = None
261 usage = None
262 config_file_name = Unicode(default_config_file_name)
262 config_file_name = Unicode(default_config_file_name)
263 default_log_level = logging.INFO
263 default_log_level = logging.INFO
264 classes = List()
264 classes = List()
265 def _classes_default(self):
265 def _classes_default(self):
266 from IPython.parallel.apps import launcher
266 from IPython.parallel.apps import launcher
267 launchers = launcher.all_launchers
267 launchers = launcher.all_launchers
268 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
268 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
269 return [ProfileDir]+eslaunchers
269 return [ProfileDir]+eslaunchers
270
270
271 n = Int(2, config=True,
271 n = Int(2, config=True,
272 help="The number of engines to start.")
272 help="The number of engines to start.")
273
273
274 engine_launcher_class = Unicode('LocalEngineSetLauncher',
274 engine_launcher_class = Unicode('LocalEngineSetLauncher',
275 config=True,
275 config=True,
276 help="The class for launching a set of Engines."
276 help="The class for launching a set of Engines."
277 )
277 )
278 daemonize = Bool(False, config=True,
278 daemonize = Bool(False, config=True,
279 help='Daemonize the ipcluster program. This implies --log-to-file')
279 help='Daemonize the ipcluster program. This implies --log-to-file')
280
280
281 def _daemonize_changed(self, name, old, new):
281 def _daemonize_changed(self, name, old, new):
282 if new:
282 if new:
283 self.log_to_file = True
283 self.log_to_file = True
284
284
285 aliases = Dict(engine_aliases)
285 aliases = Dict(engine_aliases)
286 # flags = Dict(flags)
286 # flags = Dict(flags)
287 _stopping = False
287 _stopping = False
288
288
289 def initialize(self, argv=None):
289 def initialize(self, argv=None):
290 super(IPClusterEngines, self).initialize(argv)
290 super(IPClusterEngines, self).initialize(argv)
291 self.init_signal()
291 self.init_signal()
292 self.init_launchers()
292 self.init_launchers()
293
293
294 def init_launchers(self):
294 def init_launchers(self):
295 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
295 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
296 self.engine_launcher.on_stop(lambda r: self.loop.stop())
296 self.engine_launcher.on_stop(lambda r: self.loop.stop())
297
297
298 def init_signal(self):
298 def init_signal(self):
299 # Setup signals
299 # Setup signals
300 signal.signal(signal.SIGINT, self.sigint_handler)
300 signal.signal(signal.SIGINT, self.sigint_handler)
301
301
302 def build_launcher(self, clsname):
302 def build_launcher(self, clsname):
303 """import and instantiate a Launcher based on importstring"""
303 """import and instantiate a Launcher based on importstring"""
304 if '.' not in clsname:
304 if '.' not in clsname:
305 # not a module, presume it's the raw name in apps.launcher
305 # not a module, presume it's the raw name in apps.launcher
306 clsname = 'IPython.parallel.apps.launcher.'+clsname
306 clsname = 'IPython.parallel.apps.launcher.'+clsname
307 # print repr(clsname)
307 # print repr(clsname)
308 klass = import_item(clsname)
308 klass = import_item(clsname)
309
309
310 launcher = klass(
310 launcher = klass(
311 work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
311 work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
312 )
312 )
313 return launcher
313 return launcher
314
314
315 def start_engines(self):
315 def start_engines(self):
316 self.log.info("Starting %i engines"%self.n)
316 self.log.info("Starting %i engines"%self.n)
317 self.engine_launcher.start(
317 self.engine_launcher.start(
318 self.n,
318 self.n,
319 profile_dir=self.profile_dir.location
319 profile_dir=self.profile_dir.location
320 )
320 )
321
321
322 def stop_engines(self):
322 def stop_engines(self):
323 self.log.info("Stopping Engines...")
323 self.log.info("Stopping Engines...")
324 if self.engine_launcher.running:
324 if self.engine_launcher.running:
325 d = self.engine_launcher.stop()
325 d = self.engine_launcher.stop()
326 return d
326 return d
327 else:
327 else:
328 return None
328 return None
329
329
330 def stop_launchers(self, r=None):
330 def stop_launchers(self, r=None):
331 if not self._stopping:
331 if not self._stopping:
332 self._stopping = True
332 self._stopping = True
333 self.log.error("IPython cluster: stopping")
333 self.log.error("IPython cluster: stopping")
334 self.stop_engines()
334 self.stop_engines()
335 # Wait a few seconds to let things shut down.
335 # Wait a few seconds to let things shut down.
336 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
336 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
337 dc.start()
337 dc.start()
338
338
339 def sigint_handler(self, signum, frame):
339 def sigint_handler(self, signum, frame):
340 self.log.debug("SIGINT received, stopping launchers...")
340 self.log.debug("SIGINT received, stopping launchers...")
341 self.stop_launchers()
341 self.stop_launchers()
342
342
343 def start_logging(self):
343 def start_logging(self):
344 # Remove old log files of the controller and engine
344 # Remove old log files of the controller and engine
345 if self.clean_logs:
345 if self.clean_logs:
346 log_dir = self.profile_dir.log_dir
346 log_dir = self.profile_dir.log_dir
347 for f in os.listdir(log_dir):
347 for f in os.listdir(log_dir):
348 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
348 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
349 os.remove(os.path.join(log_dir, f))
349 os.remove(os.path.join(log_dir, f))
350 # This will remove old log files for ipcluster itself
350 # This will remove old log files for ipcluster itself
351 # super(IPBaseParallelApplication, self).start_logging()
351 # super(IPBaseParallelApplication, self).start_logging()
352
352
353 def start(self):
353 def start(self):
354 """Start the app for the engines subcommand."""
354 """Start the app for the engines subcommand."""
355 self.log.info("IPython cluster: started")
355 self.log.info("IPython cluster: started")
356 # First see if the cluster is already running
356 # First see if the cluster is already running
357
357
358 # Now log and daemonize
358 # Now log and daemonize
359 self.log.info(
359 self.log.info(
360 'Starting engines with [daemon=%r]' % self.daemonize
360 'Starting engines with [daemon=%r]' % self.daemonize
361 )
361 )
362 # TODO: Get daemonize working on Windows or as a Windows Server.
362 # TODO: Get daemonize working on Windows or as a Windows Server.
363 if self.daemonize:
363 if self.daemonize:
364 if os.name=='posix':
364 if os.name=='posix':
365 from twisted.scripts._twistd_unix import daemonize
365 from twisted.scripts._twistd_unix import daemonize
366 daemonize()
366 daemonize()
367
367
368 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
368 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
369 dc.start()
369 dc.start()
370 # Now write the new pid file AFTER our new forked pid is active.
370 # Now write the new pid file AFTER our new forked pid is active.
371 # self.write_pid_file()
371 # self.write_pid_file()
372 try:
372 try:
373 self.loop.start()
373 self.loop.start()
374 except KeyboardInterrupt:
374 except KeyboardInterrupt:
375 pass
375 pass
376 except zmq.ZMQError as e:
376 except zmq.ZMQError as e:
377 if e.errno == errno.EINTR:
377 if e.errno == errno.EINTR:
378 pass
378 pass
379 else:
379 else:
380 raise
380 raise
381
381
382 start_aliases = {}
382 start_aliases = {}
383 start_aliases.update(engine_aliases)
383 start_aliases.update(engine_aliases)
384 start_aliases.update(dict(
384 start_aliases.update(dict(
385 delay='IPClusterStart.delay',
385 delay='IPClusterStart.delay',
386 clean_logs='IPClusterStart.clean_logs',
386 clean_logs='IPClusterStart.clean_logs',
387 ))
387 ))
388
388
389 class IPClusterStart(IPClusterEngines):
389 class IPClusterStart(IPClusterEngines):
390
390
391 name = u'ipcluster'
391 name = u'ipcluster'
392 description = start_help
392 description = start_help
393 default_log_level = logging.INFO
393 default_log_level = logging.INFO
394 auto_create = Bool(True, config=True,
394 auto_create = Bool(True, config=True,
395 help="whether to create the profile_dir if it doesn't exist")
395 help="whether to create the profile_dir if it doesn't exist")
396 classes = List()
396 classes = List()
397 def _classes_default(self,):
397 def _classes_default(self,):
398 from IPython.parallel.apps import launcher
398 from IPython.parallel.apps import launcher
399 return [ProfileDir]+launcher.all_launchers
399 return [ProfileDir]+launcher.all_launchers
400
400
401 clean_logs = Bool(True, config=True,
401 clean_logs = Bool(True, config=True,
402 help="whether to cleanup old logs before starting")
402 help="whether to cleanup old logs before starting")
403
403
404 delay = CFloat(1., config=True,
404 delay = CFloat(1., config=True,
405 help="delay (in s) between starting the controller and the engines")
405 help="delay (in s) between starting the controller and the engines")
406
406
407 controller_launcher_class = Unicode('LocalControllerLauncher',
407 controller_launcher_class = Unicode('LocalControllerLauncher',
408 config=True,
408 config=True,
409 help="The class for launching a Controller."
409 help="The class for launching a Controller."
410 )
410 )
411 reset = Bool(False, config=True,
411 reset = Bool(False, config=True,
412 help="Whether to reset config files as part of '--create'."
412 help="Whether to reset config files as part of '--create'."
413 )
413 )
414
414
415 # flags = Dict(flags)
415 # flags = Dict(flags)
416 aliases = Dict(start_aliases)
416 aliases = Dict(start_aliases)
417
417
418 def init_launchers(self):
418 def init_launchers(self):
419 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
419 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
420 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
420 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
421 self.controller_launcher.on_stop(self.stop_launchers)
421 self.controller_launcher.on_stop(self.stop_launchers)
422
422
423 def start_controller(self):
423 def start_controller(self):
424 self.controller_launcher.start(
424 self.controller_launcher.start(
425 profile_dir=self.profile_dir.location
425 profile_dir=self.profile_dir.location
426 )
426 )
427
427
428 def stop_controller(self):
428 def stop_controller(self):
429 # self.log.info("In stop_controller")
429 # self.log.info("In stop_controller")
430 if self.controller_launcher and self.controller_launcher.running:
430 if self.controller_launcher and self.controller_launcher.running:
431 return self.controller_launcher.stop()
431 return self.controller_launcher.stop()
432
432
433 def stop_launchers(self, r=None):
433 def stop_launchers(self, r=None):
434 if not self._stopping:
434 if not self._stopping:
435 self.stop_controller()
435 self.stop_controller()
436 super(IPClusterStart, self).stop_launchers()
436 super(IPClusterStart, self).stop_launchers()
437
437
438 def start(self):
438 def start(self):
439 """Start the app for the start subcommand."""
439 """Start the app for the start subcommand."""
440 # First see if the cluster is already running
440 # First see if the cluster is already running
441 try:
441 try:
442 pid = self.get_pid_from_file()
442 pid = self.get_pid_from_file()
443 except PIDFileError:
443 except PIDFileError:
444 pass
444 pass
445 else:
445 else:
446 if self.check_pid(pid):
446 if self.check_pid(pid):
447 self.log.critical(
447 self.log.critical(
448 'Cluster is already running with [pid=%s]. '
448 'Cluster is already running with [pid=%s]. '
449 'use "ipcluster stop" to stop the cluster.' % pid
449 'use "ipcluster stop" to stop the cluster.' % pid
450 )
450 )
451 # Here I exit with a unusual exit status that other processes
451 # Here I exit with a unusual exit status that other processes
452 # can watch for to learn how I existed.
452 # can watch for to learn how I existed.
453 self.exit(ALREADY_STARTED)
453 self.exit(ALREADY_STARTED)
454 else:
454 else:
455 self.remove_pid_file()
455 self.remove_pid_file()
456
456
457
457
458 # Now log and daemonize
458 # Now log and daemonize
459 self.log.info(
459 self.log.info(
460 'Starting ipcluster with [daemon=%r]' % self.daemonize
460 'Starting ipcluster with [daemon=%r]' % self.daemonize
461 )
461 )
462 # TODO: Get daemonize working on Windows or as a Windows Server.
462 # TODO: Get daemonize working on Windows or as a Windows Server.
463 if self.daemonize:
463 if self.daemonize:
464 if os.name=='posix':
464 if os.name=='posix':
465 from twisted.scripts._twistd_unix import daemonize
465 from twisted.scripts._twistd_unix import daemonize
466 daemonize()
466 daemonize()
467
467
468 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
468 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
469 dc.start()
469 dc.start()
470 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
470 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
471 dc.start()
471 dc.start()
472 # Now write the new pid file AFTER our new forked pid is active.
472 # Now write the new pid file AFTER our new forked pid is active.
473 self.write_pid_file()
473 self.write_pid_file()
474 try:
474 try:
475 self.loop.start()
475 self.loop.start()
476 except KeyboardInterrupt:
476 except KeyboardInterrupt:
477 pass
477 pass
478 except zmq.ZMQError as e:
478 except zmq.ZMQError as e:
479 if e.errno == errno.EINTR:
479 if e.errno == errno.EINTR:
480 pass
480 pass
481 else:
481 else:
482 raise
482 raise
483 finally:
483 finally:
484 self.remove_pid_file()
484 self.remove_pid_file()
485
485
486 base='IPython.parallel.apps.ipclusterapp.IPCluster'
486 base='IPython.parallel.apps.ipclusterapp.IPCluster'
487
487
488 class IPBaseParallelApplication(Application):
488 class IPBaseParallelApplication(Application):
489 name = u'ipcluster'
489 name = u'ipcluster'
490 description = _description
490 description = _description
491
491
492 subcommands = {'create' : (base+'Create', create_help),
492 subcommands = {'create' : (base+'Create', create_help),
493 'list' : (base+'List', list_help),
493 'list' : (base+'List', list_help),
494 'start' : (base+'Start', start_help),
494 'start' : (base+'Start', start_help),
495 'stop' : (base+'Stop', stop_help),
495 'stop' : (base+'Stop', stop_help),
496 'engines' : (base+'Engines', engines_help),
496 'engines' : (base+'Engines', engines_help),
497 }
497 }
498
498
499 # no aliases or flags for parent App
499 # no aliases or flags for parent App
500 aliases = Dict()
500 aliases = Dict()
501 flags = Dict()
501 flags = Dict()
502
502
503 def start(self):
503 def start(self):
504 if self.subapp is None:
504 if self.subapp is None:
505 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
505 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
506 print
506 print
507 self.print_subcommands()
507 self.print_subcommands()
508 self.exit(1)
508 self.exit(1)
509 else:
509 else:
510 return self.subapp.start()
510 return self.subapp.start()
511
511
512 def launch_new_instance():
512 def launch_new_instance():
513 """Create and run the IPython cluster."""
513 """Create and run the IPython cluster."""
514 app = IPBaseParallelApplication()
514 app = IPBaseParallelApplication()
515 app.initialize()
515 app.initialize()
516 app.start()
516 app.start()
517
517
518
518
519 if __name__ == '__main__':
519 if __name__ == '__main__':
520 launch_new_instance()
520 launch_new_instance()
521
521
@@ -1,399 +1,399 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import socket
21 import socket
22 import stat
22 import stat
23 import sys
23 import sys
24 import uuid
24 import uuid
25
25
26 from multiprocessing import Process
26 from multiprocessing import Process
27
27
28 import zmq
28 import zmq
29 from zmq.devices import ProcessMonitoredQueue
29 from zmq.devices import ProcessMonitoredQueue
30 from zmq.log.handlers import PUBHandler
30 from zmq.log.handlers import PUBHandler
31 from zmq.utils import jsonapi as json
31 from zmq.utils import jsonapi as json
32
32
33 from IPython.config.loader import Config
33 from IPython.config.loader import Config
34 from IPython.core.newapplication import ProfileDir
34 from IPython.core.newapplication import ProfileDir
35
35
36 from IPython.parallel.apps.clusterdir import (
36 from IPython.parallel.apps.baseapp import (
37 BaseParallelApplication,
37 BaseParallelApplication,
38 base_flags
38 base_flags
39 )
39 )
40 from IPython.utils.importstring import import_item
40 from IPython.utils.importstring import import_item
41 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
41 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
42
42
43 # from IPython.parallel.controller.controller import ControllerFactory
43 # from IPython.parallel.controller.controller import ControllerFactory
44 from IPython.parallel.streamsession import StreamSession
44 from IPython.parallel.streamsession import StreamSession
45 from IPython.parallel.controller.heartmonitor import HeartMonitor
45 from IPython.parallel.controller.heartmonitor import HeartMonitor
46 from IPython.parallel.controller.hub import HubFactory
46 from IPython.parallel.controller.hub import HubFactory
47 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
47 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
48 from IPython.parallel.controller.sqlitedb import SQLiteDB
48 from IPython.parallel.controller.sqlitedb import SQLiteDB
49
49
50 from IPython.parallel.util import signal_children, split_url
50 from IPython.parallel.util import signal_children, split_url
51
51
52 # conditional import of MongoDB backend class
52 # conditional import of MongoDB backend class
53
53
54 try:
54 try:
55 from IPython.parallel.controller.mongodb import MongoDB
55 from IPython.parallel.controller.mongodb import MongoDB
56 except ImportError:
56 except ImportError:
57 maybe_mongo = []
57 maybe_mongo = []
58 else:
58 else:
59 maybe_mongo = [MongoDB]
59 maybe_mongo = [MongoDB]
60
60
61
61
62 #-----------------------------------------------------------------------------
62 #-----------------------------------------------------------------------------
63 # Module level variables
63 # Module level variables
64 #-----------------------------------------------------------------------------
64 #-----------------------------------------------------------------------------
65
65
66
66
67 #: The default config file name for this application
67 #: The default config file name for this application
68 default_config_file_name = u'ipcontroller_config.py'
68 default_config_file_name = u'ipcontroller_config.py'
69
69
70
70
71 _description = """Start the IPython controller for parallel computing.
71 _description = """Start the IPython controller for parallel computing.
72
72
73 The IPython controller provides a gateway between the IPython engines and
73 The IPython controller provides a gateway between the IPython engines and
74 clients. The controller needs to be started before the engines and can be
74 clients. The controller needs to be started before the engines and can be
75 configured using command line options or using a cluster directory. Cluster
75 configured using command line options or using a cluster directory. Cluster
76 directories contain config, log and security files and are usually located in
76 directories contain config, log and security files and are usually located in
77 your ipython directory and named as "cluster_<profile>". See the `profile`
77 your ipython directory and named as "cluster_<profile>". See the `profile`
78 and `profile_dir` options for details.
78 and `profile_dir` options for details.
79 """
79 """
80
80
81
81
82
82
83
83
84 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
85 # The main application
85 # The main application
86 #-----------------------------------------------------------------------------
86 #-----------------------------------------------------------------------------
87 flags = {}
87 flags = {}
88 flags.update(base_flags)
88 flags.update(base_flags)
89 flags.update({
89 flags.update({
90 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
90 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
91 'Use threads instead of processes for the schedulers'),
91 'Use threads instead of processes for the schedulers'),
92 'sqlitedb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'})},
92 'sqlitedb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'})},
93 'use the SQLiteDB backend'),
93 'use the SQLiteDB backend'),
94 'mongodb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'})},
94 'mongodb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'})},
95 'use the MongoDB backend'),
95 'use the MongoDB backend'),
96 'dictdb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.dictdb.DictDB'})},
96 'dictdb' : ({'HubFactory' : Config({'db_class' : 'IPython.parallel.controller.dictdb.DictDB'})},
97 'use the in-memory DictDB backend'),
97 'use the in-memory DictDB backend'),
98 'reuse' : ({'IPControllerApp' : Config({'reuse_files' : True})},
98 'reuse' : ({'IPControllerApp' : Config({'reuse_files' : True})},
99 'reuse existing json connection files')
99 'reuse existing json connection files')
100 })
100 })
101
101
102 flags.update()
102 flags.update()
103
103
104 class IPControllerApp(BaseParallelApplication):
104 class IPControllerApp(BaseParallelApplication):
105
105
106 name = u'ipcontroller'
106 name = u'ipcontroller'
107 description = _description
107 description = _description
108 config_file_name = Unicode(default_config_file_name)
108 config_file_name = Unicode(default_config_file_name)
109 classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
109 classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
110
110
111 # change default to True
111 # change default to True
112 auto_create = Bool(True, config=True,
112 auto_create = Bool(True, config=True,
113 help="""Whether to create profile dir if it doesn't exist""")
113 help="""Whether to create profile dir if it doesn't exist""")
114
114
115 reuse_files = Bool(False, config=True,
115 reuse_files = Bool(False, config=True,
116 help='Whether to reuse existing json connection files [default: False]'
116 help='Whether to reuse existing json connection files [default: False]'
117 )
117 )
118 secure = Bool(True, config=True,
118 secure = Bool(True, config=True,
119 help='Whether to use exec_keys for extra authentication [default: True]'
119 help='Whether to use exec_keys for extra authentication [default: True]'
120 )
120 )
121 ssh_server = Unicode(u'', config=True,
121 ssh_server = Unicode(u'', config=True,
122 help="""ssh url for clients to use when connecting to the Controller
122 help="""ssh url for clients to use when connecting to the Controller
123 processes. It should be of the form: [user@]server[:port]. The
123 processes. It should be of the form: [user@]server[:port]. The
124 Controller\'s listening addresses must be accessible from the ssh server""",
124 Controller\'s listening addresses must be accessible from the ssh server""",
125 )
125 )
126 location = Unicode(u'', config=True,
126 location = Unicode(u'', config=True,
127 help="""The external IP or domain name of the Controller, used for disambiguating
127 help="""The external IP or domain name of the Controller, used for disambiguating
128 engine and client connections.""",
128 engine and client connections.""",
129 )
129 )
130 import_statements = List([], config=True,
130 import_statements = List([], config=True,
131 help="import statements to be run at startup. Necessary in some environments"
131 help="import statements to be run at startup. Necessary in some environments"
132 )
132 )
133
133
134 use_threads = Bool(False, config=True,
134 use_threads = Bool(False, config=True,
135 help='Use threads instead of processes for the schedulers',
135 help='Use threads instead of processes for the schedulers',
136 )
136 )
137
137
138 # internal
138 # internal
139 children = List()
139 children = List()
140 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
140 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
141
141
142 def _use_threads_changed(self, name, old, new):
142 def _use_threads_changed(self, name, old, new):
143 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
143 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
144
144
145 aliases = Dict(dict(
145 aliases = Dict(dict(
146 log_level = 'IPControllerApp.log_level',
146 log_level = 'IPControllerApp.log_level',
147 log_url = 'IPControllerApp.log_url',
147 log_url = 'IPControllerApp.log_url',
148 reuse_files = 'IPControllerApp.reuse_files',
148 reuse_files = 'IPControllerApp.reuse_files',
149 secure = 'IPControllerApp.secure',
149 secure = 'IPControllerApp.secure',
150 ssh = 'IPControllerApp.ssh_server',
150 ssh = 'IPControllerApp.ssh_server',
151 use_threads = 'IPControllerApp.use_threads',
151 use_threads = 'IPControllerApp.use_threads',
152 import_statements = 'IPControllerApp.import_statements',
152 import_statements = 'IPControllerApp.import_statements',
153 location = 'IPControllerApp.location',
153 location = 'IPControllerApp.location',
154
154
155 ident = 'StreamSession.session',
155 ident = 'StreamSession.session',
156 user = 'StreamSession.username',
156 user = 'StreamSession.username',
157 exec_key = 'StreamSession.keyfile',
157 exec_key = 'StreamSession.keyfile',
158
158
159 url = 'HubFactory.url',
159 url = 'HubFactory.url',
160 ip = 'HubFactory.ip',
160 ip = 'HubFactory.ip',
161 transport = 'HubFactory.transport',
161 transport = 'HubFactory.transport',
162 port = 'HubFactory.regport',
162 port = 'HubFactory.regport',
163
163
164 ping = 'HeartMonitor.period',
164 ping = 'HeartMonitor.period',
165
165
166 scheme = 'TaskScheduler.scheme_name',
166 scheme = 'TaskScheduler.scheme_name',
167 hwm = 'TaskScheduler.hwm',
167 hwm = 'TaskScheduler.hwm',
168
168
169
169
170 profile = "BaseIPythonApplication.profile",
170 profile = "BaseIPythonApplication.profile",
171 profile_dir = 'ProfileDir.location',
171 profile_dir = 'ProfileDir.location',
172
172
173 ))
173 ))
174 flags = Dict(flags)
174 flags = Dict(flags)
175
175
176
176
177 def save_connection_dict(self, fname, cdict):
177 def save_connection_dict(self, fname, cdict):
178 """save a connection dict to json file."""
178 """save a connection dict to json file."""
179 c = self.config
179 c = self.config
180 url = cdict['url']
180 url = cdict['url']
181 location = cdict['location']
181 location = cdict['location']
182 if not location:
182 if not location:
183 try:
183 try:
184 proto,ip,port = split_url(url)
184 proto,ip,port = split_url(url)
185 except AssertionError:
185 except AssertionError:
186 pass
186 pass
187 else:
187 else:
188 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
188 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
189 cdict['location'] = location
189 cdict['location'] = location
190 fname = os.path.join(self.profile_dir.security_dir, fname)
190 fname = os.path.join(self.profile_dir.security_dir, fname)
191 with open(fname, 'w') as f:
191 with open(fname, 'w') as f:
192 f.write(json.dumps(cdict, indent=2))
192 f.write(json.dumps(cdict, indent=2))
193 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
193 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
194
194
195 def load_config_from_json(self):
195 def load_config_from_json(self):
196 """load config from existing json connector files."""
196 """load config from existing json connector files."""
197 c = self.config
197 c = self.config
198 # load from engine config
198 # load from engine config
199 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
199 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
200 cfg = json.loads(f.read())
200 cfg = json.loads(f.read())
201 key = c.StreamSession.key = cfg['exec_key']
201 key = c.StreamSession.key = cfg['exec_key']
202 xport,addr = cfg['url'].split('://')
202 xport,addr = cfg['url'].split('://')
203 c.HubFactory.engine_transport = xport
203 c.HubFactory.engine_transport = xport
204 ip,ports = addr.split(':')
204 ip,ports = addr.split(':')
205 c.HubFactory.engine_ip = ip
205 c.HubFactory.engine_ip = ip
206 c.HubFactory.regport = int(ports)
206 c.HubFactory.regport = int(ports)
207 self.location = cfg['location']
207 self.location = cfg['location']
208
208
209 # load client config
209 # load client config
210 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
210 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
211 cfg = json.loads(f.read())
211 cfg = json.loads(f.read())
212 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
212 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
213 xport,addr = cfg['url'].split('://')
213 xport,addr = cfg['url'].split('://')
214 c.HubFactory.client_transport = xport
214 c.HubFactory.client_transport = xport
215 ip,ports = addr.split(':')
215 ip,ports = addr.split(':')
216 c.HubFactory.client_ip = ip
216 c.HubFactory.client_ip = ip
217 self.ssh_server = cfg['ssh']
217 self.ssh_server = cfg['ssh']
218 assert int(ports) == c.HubFactory.regport, "regport mismatch"
218 assert int(ports) == c.HubFactory.regport, "regport mismatch"
219
219
220 def init_hub(self):
220 def init_hub(self):
221 c = self.config
221 c = self.config
222
222
223 self.do_import_statements()
223 self.do_import_statements()
224 reusing = self.reuse_files
224 reusing = self.reuse_files
225 if reusing:
225 if reusing:
226 try:
226 try:
227 self.load_config_from_json()
227 self.load_config_from_json()
228 except (AssertionError,IOError):
228 except (AssertionError,IOError):
229 reusing=False
229 reusing=False
230 # check again, because reusing may have failed:
230 # check again, because reusing may have failed:
231 if reusing:
231 if reusing:
232 pass
232 pass
233 elif self.secure:
233 elif self.secure:
234 key = str(uuid.uuid4())
234 key = str(uuid.uuid4())
235 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
235 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
236 # with open(keyfile, 'w') as f:
236 # with open(keyfile, 'w') as f:
237 # f.write(key)
237 # f.write(key)
238 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
238 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
239 c.StreamSession.key = key
239 c.StreamSession.key = key
240 else:
240 else:
241 key = c.StreamSession.key = ''
241 key = c.StreamSession.key = ''
242
242
243 try:
243 try:
244 self.factory = HubFactory(config=c, log=self.log)
244 self.factory = HubFactory(config=c, log=self.log)
245 # self.start_logging()
245 # self.start_logging()
246 self.factory.init_hub()
246 self.factory.init_hub()
247 except:
247 except:
248 self.log.error("Couldn't construct the Controller", exc_info=True)
248 self.log.error("Couldn't construct the Controller", exc_info=True)
249 self.exit(1)
249 self.exit(1)
250
250
251 if not reusing:
251 if not reusing:
252 # save to new json config files
252 # save to new json config files
253 f = self.factory
253 f = self.factory
254 cdict = {'exec_key' : key,
254 cdict = {'exec_key' : key,
255 'ssh' : self.ssh_server,
255 'ssh' : self.ssh_server,
256 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
256 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
257 'location' : self.location
257 'location' : self.location
258 }
258 }
259 self.save_connection_dict('ipcontroller-client.json', cdict)
259 self.save_connection_dict('ipcontroller-client.json', cdict)
260 edict = cdict
260 edict = cdict
261 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
261 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
262 self.save_connection_dict('ipcontroller-engine.json', edict)
262 self.save_connection_dict('ipcontroller-engine.json', edict)
263
263
264 #
264 #
265 def init_schedulers(self):
265 def init_schedulers(self):
266 children = self.children
266 children = self.children
267 mq = import_item(str(self.mq_class))
267 mq = import_item(str(self.mq_class))
268
268
269 hub = self.factory
269 hub = self.factory
270 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
270 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
271 # IOPub relay (in a Process)
271 # IOPub relay (in a Process)
272 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
272 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
273 q.bind_in(hub.client_info['iopub'])
273 q.bind_in(hub.client_info['iopub'])
274 q.bind_out(hub.engine_info['iopub'])
274 q.bind_out(hub.engine_info['iopub'])
275 q.setsockopt_out(zmq.SUBSCRIBE, '')
275 q.setsockopt_out(zmq.SUBSCRIBE, '')
276 q.connect_mon(hub.monitor_url)
276 q.connect_mon(hub.monitor_url)
277 q.daemon=True
277 q.daemon=True
278 children.append(q)
278 children.append(q)
279
279
280 # Multiplexer Queue (in a Process)
280 # Multiplexer Queue (in a Process)
281 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
281 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
282 q.bind_in(hub.client_info['mux'])
282 q.bind_in(hub.client_info['mux'])
283 q.setsockopt_in(zmq.IDENTITY, 'mux')
283 q.setsockopt_in(zmq.IDENTITY, 'mux')
284 q.bind_out(hub.engine_info['mux'])
284 q.bind_out(hub.engine_info['mux'])
285 q.connect_mon(hub.monitor_url)
285 q.connect_mon(hub.monitor_url)
286 q.daemon=True
286 q.daemon=True
287 children.append(q)
287 children.append(q)
288
288
289 # Control Queue (in a Process)
289 # Control Queue (in a Process)
290 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
290 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
291 q.bind_in(hub.client_info['control'])
291 q.bind_in(hub.client_info['control'])
292 q.setsockopt_in(zmq.IDENTITY, 'control')
292 q.setsockopt_in(zmq.IDENTITY, 'control')
293 q.bind_out(hub.engine_info['control'])
293 q.bind_out(hub.engine_info['control'])
294 q.connect_mon(hub.monitor_url)
294 q.connect_mon(hub.monitor_url)
295 q.daemon=True
295 q.daemon=True
296 children.append(q)
296 children.append(q)
297 try:
297 try:
298 scheme = self.config.TaskScheduler.scheme_name
298 scheme = self.config.TaskScheduler.scheme_name
299 except AttributeError:
299 except AttributeError:
300 scheme = TaskScheduler.scheme_name.get_default_value()
300 scheme = TaskScheduler.scheme_name.get_default_value()
301 # Task Queue (in a Process)
301 # Task Queue (in a Process)
302 if scheme == 'pure':
302 if scheme == 'pure':
303 self.log.warn("task::using pure XREQ Task scheduler")
303 self.log.warn("task::using pure XREQ Task scheduler")
304 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
304 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
305 # q.setsockopt_out(zmq.HWM, hub.hwm)
305 # q.setsockopt_out(zmq.HWM, hub.hwm)
306 q.bind_in(hub.client_info['task'][1])
306 q.bind_in(hub.client_info['task'][1])
307 q.setsockopt_in(zmq.IDENTITY, 'task')
307 q.setsockopt_in(zmq.IDENTITY, 'task')
308 q.bind_out(hub.engine_info['task'])
308 q.bind_out(hub.engine_info['task'])
309 q.connect_mon(hub.monitor_url)
309 q.connect_mon(hub.monitor_url)
310 q.daemon=True
310 q.daemon=True
311 children.append(q)
311 children.append(q)
312 elif scheme == 'none':
312 elif scheme == 'none':
313 self.log.warn("task::using no Task scheduler")
313 self.log.warn("task::using no Task scheduler")
314
314
315 else:
315 else:
316 self.log.info("task::using Python %s Task scheduler"%scheme)
316 self.log.info("task::using Python %s Task scheduler"%scheme)
317 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
317 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
318 hub.monitor_url, hub.client_info['notification'])
318 hub.monitor_url, hub.client_info['notification'])
319 kwargs = dict(logname='scheduler', loglevel=self.log_level,
319 kwargs = dict(logname='scheduler', loglevel=self.log_level,
320 log_url = self.log_url, config=dict(self.config))
320 log_url = self.log_url, config=dict(self.config))
321 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
321 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
322 q.daemon=True
322 q.daemon=True
323 children.append(q)
323 children.append(q)
324
324
325
325
326 def save_urls(self):
326 def save_urls(self):
327 """save the registration urls to files."""
327 """save the registration urls to files."""
328 c = self.config
328 c = self.config
329
329
330 sec_dir = self.profile_dir.security_dir
330 sec_dir = self.profile_dir.security_dir
331 cf = self.factory
331 cf = self.factory
332
332
333 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
333 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
334 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
334 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
335
335
336 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
336 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
337 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
337 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
338
338
339
339
340 def do_import_statements(self):
340 def do_import_statements(self):
341 statements = self.import_statements
341 statements = self.import_statements
342 for s in statements:
342 for s in statements:
343 try:
343 try:
344 self.log.msg("Executing statement: '%s'" % s)
344 self.log.msg("Executing statement: '%s'" % s)
345 exec s in globals(), locals()
345 exec s in globals(), locals()
346 except:
346 except:
347 self.log.msg("Error running statement: %s" % s)
347 self.log.msg("Error running statement: %s" % s)
348
348
349 def forward_logging(self):
349 def forward_logging(self):
350 if self.log_url:
350 if self.log_url:
351 self.log.info("Forwarding logging to %s"%self.log_url)
351 self.log.info("Forwarding logging to %s"%self.log_url)
352 context = zmq.Context.instance()
352 context = zmq.Context.instance()
353 lsock = context.socket(zmq.PUB)
353 lsock = context.socket(zmq.PUB)
354 lsock.connect(self.log_url)
354 lsock.connect(self.log_url)
355 handler = PUBHandler(lsock)
355 handler = PUBHandler(lsock)
356 self.log.removeHandler(self._log_handler)
356 self.log.removeHandler(self._log_handler)
357 handler.root_topic = 'controller'
357 handler.root_topic = 'controller'
358 handler.setLevel(self.log_level)
358 handler.setLevel(self.log_level)
359 self.log.addHandler(handler)
359 self.log.addHandler(handler)
360 self._log_handler = handler
360 self._log_handler = handler
361 # #
361 # #
362
362
363 def initialize(self, argv=None):
363 def initialize(self, argv=None):
364 super(IPControllerApp, self).initialize(argv)
364 super(IPControllerApp, self).initialize(argv)
365 self.forward_logging()
365 self.forward_logging()
366 self.init_hub()
366 self.init_hub()
367 self.init_schedulers()
367 self.init_schedulers()
368
368
369 def start(self):
369 def start(self):
370 # Start the subprocesses:
370 # Start the subprocesses:
371 self.factory.start()
371 self.factory.start()
372 child_procs = []
372 child_procs = []
373 for child in self.children:
373 for child in self.children:
374 child.start()
374 child.start()
375 if isinstance(child, ProcessMonitoredQueue):
375 if isinstance(child, ProcessMonitoredQueue):
376 child_procs.append(child.launcher)
376 child_procs.append(child.launcher)
377 elif isinstance(child, Process):
377 elif isinstance(child, Process):
378 child_procs.append(child)
378 child_procs.append(child)
379 if child_procs:
379 if child_procs:
380 signal_children(child_procs)
380 signal_children(child_procs)
381
381
382 self.write_pid_file(overwrite=True)
382 self.write_pid_file(overwrite=True)
383
383
384 try:
384 try:
385 self.factory.loop.start()
385 self.factory.loop.start()
386 except KeyboardInterrupt:
386 except KeyboardInterrupt:
387 self.log.critical("Interrupted, Exiting...\n")
387 self.log.critical("Interrupted, Exiting...\n")
388
388
389
389
390
390
391 def launch_new_instance():
391 def launch_new_instance():
392 """Create and run the IPython controller"""
392 """Create and run the IPython controller"""
393 app = IPControllerApp()
393 app = IPControllerApp()
394 app.initialize()
394 app.initialize()
395 app.start()
395 app.start()
396
396
397
397
398 if __name__ == '__main__':
398 if __name__ == '__main__':
399 launch_new_instance()
399 launch_new_instance()
@@ -1,272 +1,270 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import json
18 import json
19 import os
19 import os
20 import sys
20 import sys
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from IPython.core.newapplication import ProfileDir
25 from IPython.core.newapplication import ProfileDir
26 from IPython.parallel.apps.clusterdir import (
26 from IPython.parallel.apps.baseapp import BaseParallelApplication
27 BaseParallelApplication,
28 )
29 from IPython.zmq.log import EnginePUBHandler
27 from IPython.zmq.log import EnginePUBHandler
30
28
31 from IPython.config.configurable import Configurable
29 from IPython.config.configurable import Configurable
32 from IPython.parallel.streamsession import StreamSession
30 from IPython.parallel.streamsession import StreamSession
33 from IPython.parallel.engine.engine import EngineFactory
31 from IPython.parallel.engine.engine import EngineFactory
34 from IPython.parallel.engine.streamkernel import Kernel
32 from IPython.parallel.engine.streamkernel import Kernel
35 from IPython.parallel.util import disambiguate_url
33 from IPython.parallel.util import disambiguate_url
36
34
37 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
38 from IPython.utils.traitlets import Bool, Unicode, Dict, List
36 from IPython.utils.traitlets import Bool, Unicode, Dict, List
39
37
40
38
41 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
42 # Module level variables
40 # Module level variables
43 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
44
42
45 #: The default config file name for this application
43 #: The default config file name for this application
46 default_config_file_name = u'ipengine_config.py'
44 default_config_file_name = u'ipengine_config.py'
47
45
48 _description = """Start an IPython engine for parallel computing.
46 _description = """Start an IPython engine for parallel computing.
49
47
50 IPython engines run in parallel and perform computations on behalf of a client
48 IPython engines run in parallel and perform computations on behalf of a client
51 and controller. A controller needs to be started before the engines. The
49 and controller. A controller needs to be started before the engines. The
52 engine can be configured using command line options or using a cluster
50 engine can be configured using command line options or using a cluster
53 directory. Cluster directories contain config, log and security files and are
51 directory. Cluster directories contain config, log and security files and are
54 usually located in your ipython directory and named as "cluster_<profile>".
52 usually located in your ipython directory and named as "cluster_<profile>".
55 See the `profile` and `profile_dir` options for details.
53 See the `profile` and `profile_dir` options for details.
56 """
54 """
57
55
58
56
59 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
60 # MPI configuration
58 # MPI configuration
61 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
62
60
63 mpi4py_init = """from mpi4py import MPI as mpi
61 mpi4py_init = """from mpi4py import MPI as mpi
64 mpi.size = mpi.COMM_WORLD.Get_size()
62 mpi.size = mpi.COMM_WORLD.Get_size()
65 mpi.rank = mpi.COMM_WORLD.Get_rank()
63 mpi.rank = mpi.COMM_WORLD.Get_rank()
66 """
64 """
67
65
68
66
69 pytrilinos_init = """from PyTrilinos import Epetra
67 pytrilinos_init = """from PyTrilinos import Epetra
70 class SimpleStruct:
68 class SimpleStruct:
71 pass
69 pass
72 mpi = SimpleStruct()
70 mpi = SimpleStruct()
73 mpi.rank = 0
71 mpi.rank = 0
74 mpi.size = 0
72 mpi.size = 0
75 """
73 """
76
74
77 class MPI(Configurable):
75 class MPI(Configurable):
78 """Configurable for MPI initialization"""
76 """Configurable for MPI initialization"""
79 use = Unicode('', config=True,
77 use = Unicode('', config=True,
80 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
78 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
81 )
79 )
82
80
83 def _on_use_changed(self, old, new):
81 def _on_use_changed(self, old, new):
84 # load default init script if it's not set
82 # load default init script if it's not set
85 if not self.init_script:
83 if not self.init_script:
86 self.init_script = self.default_inits.get(new, '')
84 self.init_script = self.default_inits.get(new, '')
87
85
88 init_script = Unicode('', config=True,
86 init_script = Unicode('', config=True,
89 help="Initialization code for MPI")
87 help="Initialization code for MPI")
90
88
91 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
89 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
92 config=True)
90 config=True)
93
91
94
92
95 #-----------------------------------------------------------------------------
93 #-----------------------------------------------------------------------------
96 # Main application
94 # Main application
97 #-----------------------------------------------------------------------------
95 #-----------------------------------------------------------------------------
98
96
99
97
100 class IPEngineApp(BaseParallelApplication):
98 class IPEngineApp(BaseParallelApplication):
101
99
102 app_name = Unicode(u'ipengine')
100 app_name = Unicode(u'ipengine')
103 description = Unicode(_description)
101 description = Unicode(_description)
104 config_file_name = Unicode(default_config_file_name)
102 config_file_name = Unicode(default_config_file_name)
105 classes = List([ProfileDir, StreamSession, EngineFactory, Kernel, MPI])
103 classes = List([ProfileDir, StreamSession, EngineFactory, Kernel, MPI])
106
104
107 startup_script = Unicode(u'', config=True,
105 startup_script = Unicode(u'', config=True,
108 help='specify a script to be run at startup')
106 help='specify a script to be run at startup')
109 startup_command = Unicode('', config=True,
107 startup_command = Unicode('', config=True,
110 help='specify a command to be run at startup')
108 help='specify a command to be run at startup')
111
109
112 url_file = Unicode(u'', config=True,
110 url_file = Unicode(u'', config=True,
113 help="""The full location of the file containing the connection information for
111 help="""The full location of the file containing the connection information for
114 the controller. If this is not given, the file must be in the
112 the controller. If this is not given, the file must be in the
115 security directory of the cluster directory. This location is
113 security directory of the cluster directory. This location is
116 resolved using the `profile` or `profile_dir` options.""",
114 resolved using the `profile` or `profile_dir` options.""",
117 )
115 )
118
116
119 url_file_name = Unicode(u'ipcontroller-engine.json')
117 url_file_name = Unicode(u'ipcontroller-engine.json')
120 log_url = Unicode('', config=True,
118 log_url = Unicode('', config=True,
121 help="""The URL for the iploggerapp instance, for forwarding
119 help="""The URL for the iploggerapp instance, for forwarding
122 logging to a central location.""")
120 logging to a central location.""")
123
121
124 aliases = Dict(dict(
122 aliases = Dict(dict(
125 file = 'IPEngineApp.url_file',
123 file = 'IPEngineApp.url_file',
126 c = 'IPEngineApp.startup_command',
124 c = 'IPEngineApp.startup_command',
127 s = 'IPEngineApp.startup_script',
125 s = 'IPEngineApp.startup_script',
128
126
129 ident = 'StreamSession.session',
127 ident = 'StreamSession.session',
130 user = 'StreamSession.username',
128 user = 'StreamSession.username',
131 exec_key = 'StreamSession.keyfile',
129 exec_key = 'StreamSession.keyfile',
132
130
133 url = 'EngineFactory.url',
131 url = 'EngineFactory.url',
134 ip = 'EngineFactory.ip',
132 ip = 'EngineFactory.ip',
135 transport = 'EngineFactory.transport',
133 transport = 'EngineFactory.transport',
136 port = 'EngineFactory.regport',
134 port = 'EngineFactory.regport',
137 location = 'EngineFactory.location',
135 location = 'EngineFactory.location',
138
136
139 timeout = 'EngineFactory.timeout',
137 timeout = 'EngineFactory.timeout',
140
138
141 profile = "IPEngineApp.profile",
139 profile = "IPEngineApp.profile",
142 profile_dir = 'ProfileDir.location',
140 profile_dir = 'ProfileDir.location',
143
141
144 mpi = 'MPI.use',
142 mpi = 'MPI.use',
145
143
146 log_level = 'IPEngineApp.log_level',
144 log_level = 'IPEngineApp.log_level',
147 log_url = 'IPEngineApp.log_url'
145 log_url = 'IPEngineApp.log_url'
148 ))
146 ))
149
147
150 # def find_key_file(self):
148 # def find_key_file(self):
151 # """Set the key file.
149 # """Set the key file.
152 #
150 #
153 # Here we don't try to actually see if it exists for is valid as that
151 # Here we don't try to actually see if it exists for is valid as that
154 # is hadled by the connection logic.
152 # is hadled by the connection logic.
155 # """
153 # """
156 # config = self.master_config
154 # config = self.master_config
157 # # Find the actual controller key file
155 # # Find the actual controller key file
158 # if not config.Global.key_file:
156 # if not config.Global.key_file:
159 # try_this = os.path.join(
157 # try_this = os.path.join(
160 # config.Global.profile_dir,
158 # config.Global.profile_dir,
161 # config.Global.security_dir,
159 # config.Global.security_dir,
162 # config.Global.key_file_name
160 # config.Global.key_file_name
163 # )
161 # )
164 # config.Global.key_file = try_this
162 # config.Global.key_file = try_this
165
163
166 def find_url_file(self):
164 def find_url_file(self):
167 """Set the key file.
165 """Set the key file.
168
166
169 Here we don't try to actually see if it exists for is valid as that
167 Here we don't try to actually see if it exists for is valid as that
170 is hadled by the connection logic.
168 is hadled by the connection logic.
171 """
169 """
172 config = self.config
170 config = self.config
173 # Find the actual controller key file
171 # Find the actual controller key file
174 if not self.url_file:
172 if not self.url_file:
175 self.url_file = os.path.join(
173 self.url_file = os.path.join(
176 self.profile_dir.security_dir,
174 self.profile_dir.security_dir,
177 self.url_file_name
175 self.url_file_name
178 )
176 )
179 def init_engine(self):
177 def init_engine(self):
180 # This is the working dir by now.
178 # This is the working dir by now.
181 sys.path.insert(0, '')
179 sys.path.insert(0, '')
182 config = self.config
180 config = self.config
183 # print config
181 # print config
184 self.find_url_file()
182 self.find_url_file()
185
183
186 # if os.path.exists(config.Global.key_file) and config.Global.secure:
184 # if os.path.exists(config.Global.key_file) and config.Global.secure:
187 # config.SessionFactory.exec_key = config.Global.key_file
185 # config.SessionFactory.exec_key = config.Global.key_file
188 if os.path.exists(self.url_file):
186 if os.path.exists(self.url_file):
189 with open(self.url_file) as f:
187 with open(self.url_file) as f:
190 d = json.loads(f.read())
188 d = json.loads(f.read())
191 for k,v in d.iteritems():
189 for k,v in d.iteritems():
192 if isinstance(v, unicode):
190 if isinstance(v, unicode):
193 d[k] = v.encode()
191 d[k] = v.encode()
194 if d['exec_key']:
192 if d['exec_key']:
195 config.StreamSession.key = d['exec_key']
193 config.StreamSession.key = d['exec_key']
196 d['url'] = disambiguate_url(d['url'], d['location'])
194 d['url'] = disambiguate_url(d['url'], d['location'])
197 config.EngineFactory.url = d['url']
195 config.EngineFactory.url = d['url']
198 config.EngineFactory.location = d['location']
196 config.EngineFactory.location = d['location']
199
197
200 try:
198 try:
201 exec_lines = config.Kernel.exec_lines
199 exec_lines = config.Kernel.exec_lines
202 except AttributeError:
200 except AttributeError:
203 config.Kernel.exec_lines = []
201 config.Kernel.exec_lines = []
204 exec_lines = config.Kernel.exec_lines
202 exec_lines = config.Kernel.exec_lines
205
203
206 if self.startup_script:
204 if self.startup_script:
207 enc = sys.getfilesystemencoding() or 'utf8'
205 enc = sys.getfilesystemencoding() or 'utf8'
208 cmd="execfile(%r)"%self.startup_script.encode(enc)
206 cmd="execfile(%r)"%self.startup_script.encode(enc)
209 exec_lines.append(cmd)
207 exec_lines.append(cmd)
210 if self.startup_command:
208 if self.startup_command:
211 exec_lines.append(self.startup_command)
209 exec_lines.append(self.startup_command)
212
210
213 # Create the underlying shell class and Engine
211 # Create the underlying shell class and Engine
214 # shell_class = import_item(self.master_config.Global.shell_class)
212 # shell_class = import_item(self.master_config.Global.shell_class)
215 # print self.config
213 # print self.config
216 try:
214 try:
217 self.engine = EngineFactory(config=config, log=self.log)
215 self.engine = EngineFactory(config=config, log=self.log)
218 except:
216 except:
219 self.log.error("Couldn't start the Engine", exc_info=True)
217 self.log.error("Couldn't start the Engine", exc_info=True)
220 self.exit(1)
218 self.exit(1)
221
219
222 def forward_logging(self):
220 def forward_logging(self):
223 if self.log_url:
221 if self.log_url:
224 self.log.info("Forwarding logging to %s"%self.log_url)
222 self.log.info("Forwarding logging to %s"%self.log_url)
225 context = self.engine.context
223 context = self.engine.context
226 lsock = context.socket(zmq.PUB)
224 lsock = context.socket(zmq.PUB)
227 lsock.connect(self.log_url)
225 lsock.connect(self.log_url)
228 self.log.removeHandler(self._log_handler)
226 self.log.removeHandler(self._log_handler)
229 handler = EnginePUBHandler(self.engine, lsock)
227 handler = EnginePUBHandler(self.engine, lsock)
230 handler.setLevel(self.log_level)
228 handler.setLevel(self.log_level)
231 self.log.addHandler(handler)
229 self.log.addHandler(handler)
232 self._log_handler = handler
230 self._log_handler = handler
233 #
231 #
234 def init_mpi(self):
232 def init_mpi(self):
235 global mpi
233 global mpi
236 self.mpi = MPI(config=self.config)
234 self.mpi = MPI(config=self.config)
237
235
238 mpi_import_statement = self.mpi.init_script
236 mpi_import_statement = self.mpi.init_script
239 if mpi_import_statement:
237 if mpi_import_statement:
240 try:
238 try:
241 self.log.info("Initializing MPI:")
239 self.log.info("Initializing MPI:")
242 self.log.info(mpi_import_statement)
240 self.log.info(mpi_import_statement)
243 exec mpi_import_statement in globals()
241 exec mpi_import_statement in globals()
244 except:
242 except:
245 mpi = None
243 mpi = None
246 else:
244 else:
247 mpi = None
245 mpi = None
248
246
249 def initialize(self, argv=None):
247 def initialize(self, argv=None):
250 super(IPEngineApp, self).initialize(argv)
248 super(IPEngineApp, self).initialize(argv)
251 self.init_mpi()
249 self.init_mpi()
252 self.init_engine()
250 self.init_engine()
253 self.forward_logging()
251 self.forward_logging()
254
252
255 def start(self):
253 def start(self):
256 self.engine.start()
254 self.engine.start()
257 try:
255 try:
258 self.engine.loop.start()
256 self.engine.loop.start()
259 except KeyboardInterrupt:
257 except KeyboardInterrupt:
260 self.log.critical("Engine Interrupted, shutting down...\n")
258 self.log.critical("Engine Interrupted, shutting down...\n")
261
259
262
260
263 def launch_new_instance():
261 def launch_new_instance():
264 """Create and run the IPython engine"""
262 """Create and run the IPython engine"""
265 app = IPEngineApp()
263 app = IPEngineApp()
266 app.initialize()
264 app.initialize()
267 app.start()
265 app.start()
268
266
269
267
270 if __name__ == '__main__':
268 if __name__ == '__main__':
271 launch_new_instance()
269 launch_new_instance()
272
270
@@ -1,96 +1,96 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 A simple IPython logger application
4 A simple IPython logger application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20
20
21 import zmq
21 import zmq
22
22
23 from IPython.core.newapplication import ProfileDir
23 from IPython.core.newapplication import ProfileDir
24 from IPython.utils.traitlets import Bool, Dict, Unicode
24 from IPython.utils.traitlets import Bool, Dict, Unicode
25
25
26 from IPython.parallel.apps.clusterdir import (
26 from IPython.parallel.apps.baseapp import (
27 BaseParallelApplication,
27 BaseParallelApplication,
28 base_aliases
28 base_aliases
29 )
29 )
30 from IPython.parallel.apps.logwatcher import LogWatcher
30 from IPython.parallel.apps.logwatcher import LogWatcher
31
31
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33 # Module level variables
33 # Module level variables
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35
35
36 #: The default config file name for this application
36 #: The default config file name for this application
37 default_config_file_name = u'iplogger_config.py'
37 default_config_file_name = u'iplogger_config.py'
38
38
39 _description = """Start an IPython logger for parallel computing.
39 _description = """Start an IPython logger for parallel computing.
40
40
41 IPython controllers and engines (and your own processes) can broadcast log messages
41 IPython controllers and engines (and your own processes) can broadcast log messages
42 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
42 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
43 logger can be configured using command line options or using a cluster
43 logger can be configured using command line options or using a cluster
44 directory. Cluster directories contain config, log and security files and are
44 directory. Cluster directories contain config, log and security files and are
45 usually located in your ipython directory and named as "cluster_<profile>".
45 usually located in your ipython directory and named as "cluster_<profile>".
46 See the `profile` and `profile_dir` options for details.
46 See the `profile` and `profile_dir` options for details.
47 """
47 """
48
48
49
49
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51 # Main application
51 # Main application
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 aliases = {}
53 aliases = {}
54 aliases.update(base_aliases)
54 aliases.update(base_aliases)
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
55 aliases.update(dict(url='LogWatcher.url', topics='LogWatcher.topics'))
56
56
57 class IPLoggerApp(BaseParallelApplication):
57 class IPLoggerApp(BaseParallelApplication):
58
58
59 name = u'iploggerz'
59 name = u'iploggerz'
60 description = _description
60 description = _description
61 config_file_name = Unicode(default_config_file_name)
61 config_file_name = Unicode(default_config_file_name)
62
62
63 classes = [LogWatcher, ProfileDir]
63 classes = [LogWatcher, ProfileDir]
64 aliases = Dict(aliases)
64 aliases = Dict(aliases)
65
65
66 def initialize(self, argv=None):
66 def initialize(self, argv=None):
67 super(IPLoggerApp, self).initialize(argv)
67 super(IPLoggerApp, self).initialize(argv)
68 self.init_watcher()
68 self.init_watcher()
69
69
70 def init_watcher(self):
70 def init_watcher(self):
71 try:
71 try:
72 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
72 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
73 except:
73 except:
74 self.log.error("Couldn't start the LogWatcher", exc_info=True)
74 self.log.error("Couldn't start the LogWatcher", exc_info=True)
75 self.exit(1)
75 self.exit(1)
76 self.log.info("Listening for log messages on %r"%self.watcher.url)
76 self.log.info("Listening for log messages on %r"%self.watcher.url)
77
77
78
78
79 def start(self):
79 def start(self):
80 self.watcher.start()
80 self.watcher.start()
81 try:
81 try:
82 self.watcher.loop.start()
82 self.watcher.loop.start()
83 except KeyboardInterrupt:
83 except KeyboardInterrupt:
84 self.log.critical("Logging Interrupted, shutting down...\n")
84 self.log.critical("Logging Interrupted, shutting down...\n")
85
85
86
86
87 def launch_new_instance():
87 def launch_new_instance():
88 """Create and run the IPython LogWatcher"""
88 """Create and run the IPython LogWatcher"""
89 app = IPLoggerApp()
89 app = IPLoggerApp()
90 app.initialize()
90 app.initialize()
91 app.start()
91 app.start()
92
92
93
93
94 if __name__ == '__main__':
94 if __name__ == '__main__':
95 launch_new_instance()
95 launch_new_instance()
96
96
General Comments 0
You need to be logged in to leave comments. Login now