##// END OF EJS Templates
fix residual import issues with IPython.parallel reorganization
MinRK -
Show More
@@ -1,592 +1,593 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import errno
18 import errno
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import signal
22 import signal
23
23
24 import zmq
24 import zmq
25 from zmq.eventloop import ioloop
25 from zmq.eventloop import ioloop
26
26
27 from IPython.external.argparse import ArgumentParser, SUPPRESS
27 from IPython.external.argparse import ArgumentParser, SUPPRESS
28 from IPython.utils.importstring import import_item
28 from IPython.utils.importstring import import_item
29 from .clusterdir import (
29
30 from IPython.parallel.apps.clusterdir import (
30 ApplicationWithClusterDir, ClusterDirConfigLoader,
31 ApplicationWithClusterDir, ClusterDirConfigLoader,
31 ClusterDirError, PIDFileError
32 ClusterDirError, PIDFileError
32 )
33 )
33
34
34
35
35 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
36 # Module level variables
37 # Module level variables
37 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
38
39
39
40
40 default_config_file_name = u'ipcluster_config.py'
41 default_config_file_name = u'ipcluster_config.py'
41
42
42
43
43 _description = """\
44 _description = """\
44 Start an IPython cluster for parallel computing.\n\n
45 Start an IPython cluster for parallel computing.\n\n
45
46
46 An IPython cluster consists of 1 controller and 1 or more engines.
47 An IPython cluster consists of 1 controller and 1 or more engines.
47 This command automates the startup of these processes using a wide
48 This command automates the startup of these processes using a wide
48 range of startup methods (SSH, local processes, PBS, mpiexec,
49 range of startup methods (SSH, local processes, PBS, mpiexec,
49 Windows HPC Server 2008). To start a cluster with 4 engines on your
50 Windows HPC Server 2008). To start a cluster with 4 engines on your
50 local host simply do 'ipcluster start -n 4'. For more complex usage
51 local host simply do 'ipcluster start -n 4'. For more complex usage
51 you will typically do 'ipcluster create -p mycluster', then edit
52 you will typically do 'ipcluster create -p mycluster', then edit
52 configuration files, followed by 'ipcluster start -p mycluster -n 4'.
53 configuration files, followed by 'ipcluster start -p mycluster -n 4'.
53 """
54 """
54
55
55
56
56 # Exit codes for ipcluster
57 # Exit codes for ipcluster
57
58
58 # This will be the exit code if the ipcluster appears to be running because
59 # This will be the exit code if the ipcluster appears to be running because
59 # a .pid file exists
60 # a .pid file exists
60 ALREADY_STARTED = 10
61 ALREADY_STARTED = 10
61
62
62
63
63 # This will be the exit code if ipcluster stop is run, but there is not .pid
64 # This will be the exit code if ipcluster stop is run, but there is not .pid
64 # file to be found.
65 # file to be found.
65 ALREADY_STOPPED = 11
66 ALREADY_STOPPED = 11
66
67
67 # This will be the exit code if ipcluster engines is run, but there is not .pid
68 # This will be the exit code if ipcluster engines is run, but there is not .pid
68 # file to be found.
69 # file to be found.
69 NO_CLUSTER = 12
70 NO_CLUSTER = 12
70
71
71
72
72 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
73 # Command line options
74 # Command line options
74 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
75
76
76
77
77 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
78 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
78
79
79 def _add_arguments(self):
80 def _add_arguments(self):
80 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
81 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
81 # its defaults on self.parser. Instead, we will put those on
82 # its defaults on self.parser. Instead, we will put those on
82 # default options on our subparsers.
83 # default options on our subparsers.
83
84
84 # This has all the common options that all subcommands use
85 # This has all the common options that all subcommands use
85 parent_parser1 = ArgumentParser(
86 parent_parser1 = ArgumentParser(
86 add_help=False,
87 add_help=False,
87 argument_default=SUPPRESS
88 argument_default=SUPPRESS
88 )
89 )
89 self._add_ipython_dir(parent_parser1)
90 self._add_ipython_dir(parent_parser1)
90 self._add_log_level(parent_parser1)
91 self._add_log_level(parent_parser1)
91
92
92 # This has all the common options that other subcommands use
93 # This has all the common options that other subcommands use
93 parent_parser2 = ArgumentParser(
94 parent_parser2 = ArgumentParser(
94 add_help=False,
95 add_help=False,
95 argument_default=SUPPRESS
96 argument_default=SUPPRESS
96 )
97 )
97 self._add_cluster_profile(parent_parser2)
98 self._add_cluster_profile(parent_parser2)
98 self._add_cluster_dir(parent_parser2)
99 self._add_cluster_dir(parent_parser2)
99 self._add_work_dir(parent_parser2)
100 self._add_work_dir(parent_parser2)
100 paa = parent_parser2.add_argument
101 paa = parent_parser2.add_argument
101 paa('--log-to-file',
102 paa('--log-to-file',
102 action='store_true', dest='Global.log_to_file',
103 action='store_true', dest='Global.log_to_file',
103 help='Log to a file in the log directory (default is stdout)')
104 help='Log to a file in the log directory (default is stdout)')
104
105
105 # Create the object used to create the subparsers.
106 # Create the object used to create the subparsers.
106 subparsers = self.parser.add_subparsers(
107 subparsers = self.parser.add_subparsers(
107 dest='Global.subcommand',
108 dest='Global.subcommand',
108 title='ipcluster subcommands',
109 title='ipcluster subcommands',
109 description=
110 description=
110 """ipcluster has a variety of subcommands. The general way of
111 """ipcluster has a variety of subcommands. The general way of
111 running ipcluster is 'ipcluster <cmd> [options]'. To get help
112 running ipcluster is 'ipcluster <cmd> [options]'. To get help
112 on a particular subcommand do 'ipcluster <cmd> -h'."""
113 on a particular subcommand do 'ipcluster <cmd> -h'."""
113 # help="For more help, type 'ipcluster <cmd> -h'",
114 # help="For more help, type 'ipcluster <cmd> -h'",
114 )
115 )
115
116
116 # The "list" subcommand parser
117 # The "list" subcommand parser
117 parser_list = subparsers.add_parser(
118 parser_list = subparsers.add_parser(
118 'list',
119 'list',
119 parents=[parent_parser1],
120 parents=[parent_parser1],
120 argument_default=SUPPRESS,
121 argument_default=SUPPRESS,
121 help="List all clusters in cwd and ipython_dir.",
122 help="List all clusters in cwd and ipython_dir.",
122 description=
123 description=
123 """List all available clusters, by cluster directory, that can
124 """List all available clusters, by cluster directory, that can
124 be found in the current working directly or in the ipython
125 be found in the current working directly or in the ipython
125 directory. Cluster directories are named using the convention
126 directory. Cluster directories are named using the convention
126 'cluster_<profile>'."""
127 'cluster_<profile>'."""
127 )
128 )
128
129
129 # The "create" subcommand parser
130 # The "create" subcommand parser
130 parser_create = subparsers.add_parser(
131 parser_create = subparsers.add_parser(
131 'create',
132 'create',
132 parents=[parent_parser1, parent_parser2],
133 parents=[parent_parser1, parent_parser2],
133 argument_default=SUPPRESS,
134 argument_default=SUPPRESS,
134 help="Create a new cluster directory.",
135 help="Create a new cluster directory.",
135 description=
136 description=
136 """Create an ipython cluster directory by its profile name or
137 """Create an ipython cluster directory by its profile name or
137 cluster directory path. Cluster directories contain
138 cluster directory path. Cluster directories contain
138 configuration, log and security related files and are named
139 configuration, log and security related files and are named
139 using the convention 'cluster_<profile>'. By default they are
140 using the convention 'cluster_<profile>'. By default they are
140 located in your ipython directory. Once created, you will
141 located in your ipython directory. Once created, you will
141 probably need to edit the configuration files in the cluster
142 probably need to edit the configuration files in the cluster
142 directory to configure your cluster. Most users will create a
143 directory to configure your cluster. Most users will create a
143 cluster directory by profile name,
144 cluster directory by profile name,
144 'ipcluster create -p mycluster', which will put the directory
145 'ipcluster create -p mycluster', which will put the directory
145 in '<ipython_dir>/cluster_mycluster'.
146 in '<ipython_dir>/cluster_mycluster'.
146 """
147 """
147 )
148 )
148 paa = parser_create.add_argument
149 paa = parser_create.add_argument
149 paa('--reset-config',
150 paa('--reset-config',
150 dest='Global.reset_config', action='store_true',
151 dest='Global.reset_config', action='store_true',
151 help=
152 help=
152 """Recopy the default config files to the cluster directory.
153 """Recopy the default config files to the cluster directory.
153 You will loose any modifications you have made to these files.""")
154 You will loose any modifications you have made to these files.""")
154
155
155 # The "start" subcommand parser
156 # The "start" subcommand parser
156 parser_start = subparsers.add_parser(
157 parser_start = subparsers.add_parser(
157 'start',
158 'start',
158 parents=[parent_parser1, parent_parser2],
159 parents=[parent_parser1, parent_parser2],
159 argument_default=SUPPRESS,
160 argument_default=SUPPRESS,
160 help="Start a cluster.",
161 help="Start a cluster.",
161 description=
162 description=
162 """Start an ipython cluster by its profile name or cluster
163 """Start an ipython cluster by its profile name or cluster
163 directory. Cluster directories contain configuration, log and
164 directory. Cluster directories contain configuration, log and
164 security related files and are named using the convention
165 security related files and are named using the convention
165 'cluster_<profile>' and should be creating using the 'start'
166 'cluster_<profile>' and should be creating using the 'start'
166 subcommand of 'ipcluster'. If your cluster directory is in
167 subcommand of 'ipcluster'. If your cluster directory is in
167 the cwd or the ipython directory, you can simply refer to it
168 the cwd or the ipython directory, you can simply refer to it
168 using its profile name, 'ipcluster start -n 4 -p <profile>`,
169 using its profile name, 'ipcluster start -n 4 -p <profile>`,
169 otherwise use the '--cluster-dir' option.
170 otherwise use the '--cluster-dir' option.
170 """
171 """
171 )
172 )
172
173
173 paa = parser_start.add_argument
174 paa = parser_start.add_argument
174 paa('-n', '--number',
175 paa('-n', '--number',
175 type=int, dest='Global.n',
176 type=int, dest='Global.n',
176 help='The number of engines to start.',
177 help='The number of engines to start.',
177 metavar='Global.n')
178 metavar='Global.n')
178 paa('--clean-logs',
179 paa('--clean-logs',
179 dest='Global.clean_logs', action='store_true',
180 dest='Global.clean_logs', action='store_true',
180 help='Delete old log flies before starting.')
181 help='Delete old log flies before starting.')
181 paa('--no-clean-logs',
182 paa('--no-clean-logs',
182 dest='Global.clean_logs', action='store_false',
183 dest='Global.clean_logs', action='store_false',
183 help="Don't delete old log flies before starting.")
184 help="Don't delete old log flies before starting.")
184 paa('--daemon',
185 paa('--daemon',
185 dest='Global.daemonize', action='store_true',
186 dest='Global.daemonize', action='store_true',
186 help='Daemonize the ipcluster program. This implies --log-to-file')
187 help='Daemonize the ipcluster program. This implies --log-to-file')
187 paa('--no-daemon',
188 paa('--no-daemon',
188 dest='Global.daemonize', action='store_false',
189 dest='Global.daemonize', action='store_false',
189 help="Dont't daemonize the ipcluster program.")
190 help="Dont't daemonize the ipcluster program.")
190 paa('--delay',
191 paa('--delay',
191 type=float, dest='Global.delay',
192 type=float, dest='Global.delay',
192 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
193 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
193
194
194 # The "stop" subcommand parser
195 # The "stop" subcommand parser
195 parser_stop = subparsers.add_parser(
196 parser_stop = subparsers.add_parser(
196 'stop',
197 'stop',
197 parents=[parent_parser1, parent_parser2],
198 parents=[parent_parser1, parent_parser2],
198 argument_default=SUPPRESS,
199 argument_default=SUPPRESS,
199 help="Stop a running cluster.",
200 help="Stop a running cluster.",
200 description=
201 description=
201 """Stop a running ipython cluster by its profile name or cluster
202 """Stop a running ipython cluster by its profile name or cluster
202 directory. Cluster directories are named using the convention
203 directory. Cluster directories are named using the convention
203 'cluster_<profile>'. If your cluster directory is in
204 'cluster_<profile>'. If your cluster directory is in
204 the cwd or the ipython directory, you can simply refer to it
205 the cwd or the ipython directory, you can simply refer to it
205 using its profile name, 'ipcluster stop -p <profile>`, otherwise
206 using its profile name, 'ipcluster stop -p <profile>`, otherwise
206 use the '--cluster-dir' option.
207 use the '--cluster-dir' option.
207 """
208 """
208 )
209 )
209 paa = parser_stop.add_argument
210 paa = parser_stop.add_argument
210 paa('--signal',
211 paa('--signal',
211 dest='Global.signal', type=int,
212 dest='Global.signal', type=int,
212 help="The signal number to use in stopping the cluster (default=2).",
213 help="The signal number to use in stopping the cluster (default=2).",
213 metavar="Global.signal")
214 metavar="Global.signal")
214
215
215 # the "engines" subcommand parser
216 # the "engines" subcommand parser
216 parser_engines = subparsers.add_parser(
217 parser_engines = subparsers.add_parser(
217 'engines',
218 'engines',
218 parents=[parent_parser1, parent_parser2],
219 parents=[parent_parser1, parent_parser2],
219 argument_default=SUPPRESS,
220 argument_default=SUPPRESS,
220 help="Attach some engines to an existing controller or cluster.",
221 help="Attach some engines to an existing controller or cluster.",
221 description=
222 description=
222 """Start one or more engines to connect to an existing Cluster
223 """Start one or more engines to connect to an existing Cluster
223 by profile name or cluster directory.
224 by profile name or cluster directory.
224 Cluster directories contain configuration, log and
225 Cluster directories contain configuration, log and
225 security related files and are named using the convention
226 security related files and are named using the convention
226 'cluster_<profile>' and should be creating using the 'start'
227 'cluster_<profile>' and should be creating using the 'start'
227 subcommand of 'ipcluster'. If your cluster directory is in
228 subcommand of 'ipcluster'. If your cluster directory is in
228 the cwd or the ipython directory, you can simply refer to it
229 the cwd or the ipython directory, you can simply refer to it
229 using its profile name, 'ipcluster engines -n 4 -p <profile>`,
230 using its profile name, 'ipcluster engines -n 4 -p <profile>`,
230 otherwise use the '--cluster-dir' option.
231 otherwise use the '--cluster-dir' option.
231 """
232 """
232 )
233 )
233 paa = parser_engines.add_argument
234 paa = parser_engines.add_argument
234 paa('-n', '--number',
235 paa('-n', '--number',
235 type=int, dest='Global.n',
236 type=int, dest='Global.n',
236 help='The number of engines to start.',
237 help='The number of engines to start.',
237 metavar='Global.n')
238 metavar='Global.n')
238 paa('--daemon',
239 paa('--daemon',
239 dest='Global.daemonize', action='store_true',
240 dest='Global.daemonize', action='store_true',
240 help='Daemonize the ipcluster program. This implies --log-to-file')
241 help='Daemonize the ipcluster program. This implies --log-to-file')
241 paa('--no-daemon',
242 paa('--no-daemon',
242 dest='Global.daemonize', action='store_false',
243 dest='Global.daemonize', action='store_false',
243 help="Dont't daemonize the ipcluster program.")
244 help="Dont't daemonize the ipcluster program.")
244
245
245 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
246 # Main application
247 # Main application
247 #-----------------------------------------------------------------------------
248 #-----------------------------------------------------------------------------
248
249
249
250
250 class IPClusterApp(ApplicationWithClusterDir):
251 class IPClusterApp(ApplicationWithClusterDir):
251
252
252 name = u'ipcluster'
253 name = u'ipcluster'
253 description = _description
254 description = _description
254 usage = None
255 usage = None
255 command_line_loader = IPClusterAppConfigLoader
256 command_line_loader = IPClusterAppConfigLoader
256 default_config_file_name = default_config_file_name
257 default_config_file_name = default_config_file_name
257 default_log_level = logging.INFO
258 default_log_level = logging.INFO
258 auto_create_cluster_dir = False
259 auto_create_cluster_dir = False
259
260
260 def create_default_config(self):
261 def create_default_config(self):
261 super(IPClusterApp, self).create_default_config()
262 super(IPClusterApp, self).create_default_config()
262 self.default_config.Global.controller_launcher = \
263 self.default_config.Global.controller_launcher = \
263 'IPython.parallel.launcher.LocalControllerLauncher'
264 'IPython.parallel.apps.launcher.LocalControllerLauncher'
264 self.default_config.Global.engine_launcher = \
265 self.default_config.Global.engine_launcher = \
265 'IPython.parallel.launcher.LocalEngineSetLauncher'
266 'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
266 self.default_config.Global.n = 2
267 self.default_config.Global.n = 2
267 self.default_config.Global.delay = 2
268 self.default_config.Global.delay = 2
268 self.default_config.Global.reset_config = False
269 self.default_config.Global.reset_config = False
269 self.default_config.Global.clean_logs = True
270 self.default_config.Global.clean_logs = True
270 self.default_config.Global.signal = signal.SIGINT
271 self.default_config.Global.signal = signal.SIGINT
271 self.default_config.Global.daemonize = False
272 self.default_config.Global.daemonize = False
272
273
273 def find_resources(self):
274 def find_resources(self):
274 subcommand = self.command_line_config.Global.subcommand
275 subcommand = self.command_line_config.Global.subcommand
275 if subcommand=='list':
276 if subcommand=='list':
276 self.list_cluster_dirs()
277 self.list_cluster_dirs()
277 # Exit immediately because there is nothing left to do.
278 # Exit immediately because there is nothing left to do.
278 self.exit()
279 self.exit()
279 elif subcommand=='create':
280 elif subcommand=='create':
280 self.auto_create_cluster_dir = True
281 self.auto_create_cluster_dir = True
281 super(IPClusterApp, self).find_resources()
282 super(IPClusterApp, self).find_resources()
282 elif subcommand=='start' or subcommand=='stop':
283 elif subcommand=='start' or subcommand=='stop':
283 self.auto_create_cluster_dir = True
284 self.auto_create_cluster_dir = True
284 try:
285 try:
285 super(IPClusterApp, self).find_resources()
286 super(IPClusterApp, self).find_resources()
286 except ClusterDirError:
287 except ClusterDirError:
287 raise ClusterDirError(
288 raise ClusterDirError(
288 "Could not find a cluster directory. A cluster dir must "
289 "Could not find a cluster directory. A cluster dir must "
289 "be created before running 'ipcluster start'. Do "
290 "be created before running 'ipcluster start'. Do "
290 "'ipcluster create -h' or 'ipcluster list -h' for more "
291 "'ipcluster create -h' or 'ipcluster list -h' for more "
291 "information about creating and listing cluster dirs."
292 "information about creating and listing cluster dirs."
292 )
293 )
293 elif subcommand=='engines':
294 elif subcommand=='engines':
294 self.auto_create_cluster_dir = False
295 self.auto_create_cluster_dir = False
295 try:
296 try:
296 super(IPClusterApp, self).find_resources()
297 super(IPClusterApp, self).find_resources()
297 except ClusterDirError:
298 except ClusterDirError:
298 raise ClusterDirError(
299 raise ClusterDirError(
299 "Could not find a cluster directory. A cluster dir must "
300 "Could not find a cluster directory. A cluster dir must "
300 "be created before running 'ipcluster start'. Do "
301 "be created before running 'ipcluster start'. Do "
301 "'ipcluster create -h' or 'ipcluster list -h' for more "
302 "'ipcluster create -h' or 'ipcluster list -h' for more "
302 "information about creating and listing cluster dirs."
303 "information about creating and listing cluster dirs."
303 )
304 )
304
305
305 def list_cluster_dirs(self):
306 def list_cluster_dirs(self):
306 # Find the search paths
307 # Find the search paths
307 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
308 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
308 if cluster_dir_paths:
309 if cluster_dir_paths:
309 cluster_dir_paths = cluster_dir_paths.split(':')
310 cluster_dir_paths = cluster_dir_paths.split(':')
310 else:
311 else:
311 cluster_dir_paths = []
312 cluster_dir_paths = []
312 try:
313 try:
313 ipython_dir = self.command_line_config.Global.ipython_dir
314 ipython_dir = self.command_line_config.Global.ipython_dir
314 except AttributeError:
315 except AttributeError:
315 ipython_dir = self.default_config.Global.ipython_dir
316 ipython_dir = self.default_config.Global.ipython_dir
316 paths = [os.getcwd(), ipython_dir] + \
317 paths = [os.getcwd(), ipython_dir] + \
317 cluster_dir_paths
318 cluster_dir_paths
318 paths = list(set(paths))
319 paths = list(set(paths))
319
320
320 self.log.info('Searching for cluster dirs in paths: %r' % paths)
321 self.log.info('Searching for cluster dirs in paths: %r' % paths)
321 for path in paths:
322 for path in paths:
322 files = os.listdir(path)
323 files = os.listdir(path)
323 for f in files:
324 for f in files:
324 full_path = os.path.join(path, f)
325 full_path = os.path.join(path, f)
325 if os.path.isdir(full_path) and f.startswith('cluster_'):
326 if os.path.isdir(full_path) and f.startswith('cluster_'):
326 profile = full_path.split('_')[-1]
327 profile = full_path.split('_')[-1]
327 start_cmd = 'ipcluster start -p %s -n 4' % profile
328 start_cmd = 'ipcluster start -p %s -n 4' % profile
328 print start_cmd + " ==> " + full_path
329 print start_cmd + " ==> " + full_path
329
330
330 def pre_construct(self):
331 def pre_construct(self):
331 # IPClusterApp.pre_construct() is where we cd to the working directory.
332 # IPClusterApp.pre_construct() is where we cd to the working directory.
332 super(IPClusterApp, self).pre_construct()
333 super(IPClusterApp, self).pre_construct()
333 config = self.master_config
334 config = self.master_config
334 try:
335 try:
335 daemon = config.Global.daemonize
336 daemon = config.Global.daemonize
336 if daemon:
337 if daemon:
337 config.Global.log_to_file = True
338 config.Global.log_to_file = True
338 except AttributeError:
339 except AttributeError:
339 pass
340 pass
340
341
341 def construct(self):
342 def construct(self):
342 config = self.master_config
343 config = self.master_config
343 subcmd = config.Global.subcommand
344 subcmd = config.Global.subcommand
344 reset = config.Global.reset_config
345 reset = config.Global.reset_config
345 if subcmd == 'list':
346 if subcmd == 'list':
346 return
347 return
347 if subcmd == 'create':
348 if subcmd == 'create':
348 self.log.info('Copying default config files to cluster directory '
349 self.log.info('Copying default config files to cluster directory '
349 '[overwrite=%r]' % (reset,))
350 '[overwrite=%r]' % (reset,))
350 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
351 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
351 if subcmd =='start':
352 if subcmd =='start':
352 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
353 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
353 self.start_logging()
354 self.start_logging()
354 self.loop = ioloop.IOLoop.instance()
355 self.loop = ioloop.IOLoop.instance()
355 # reactor.callWhenRunning(self.start_launchers)
356 # reactor.callWhenRunning(self.start_launchers)
356 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
357 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
357 dc.start()
358 dc.start()
358 if subcmd == 'engines':
359 if subcmd == 'engines':
359 self.start_logging()
360 self.start_logging()
360 self.loop = ioloop.IOLoop.instance()
361 self.loop = ioloop.IOLoop.instance()
361 # reactor.callWhenRunning(self.start_launchers)
362 # reactor.callWhenRunning(self.start_launchers)
362 engine_only = lambda : self.start_launchers(controller=False)
363 engine_only = lambda : self.start_launchers(controller=False)
363 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
364 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
364 dc.start()
365 dc.start()
365
366
366 def start_launchers(self, controller=True):
367 def start_launchers(self, controller=True):
367 config = self.master_config
368 config = self.master_config
368
369
369 # Create the launchers. In both bases, we set the work_dir of
370 # Create the launchers. In both bases, we set the work_dir of
370 # the launcher to the cluster_dir. This is where the launcher's
371 # the launcher to the cluster_dir. This is where the launcher's
371 # subprocesses will be launched. It is not where the controller
372 # subprocesses will be launched. It is not where the controller
372 # and engine will be launched.
373 # and engine will be launched.
373 if controller:
374 if controller:
374 cl_class = import_item(config.Global.controller_launcher)
375 cl_class = import_item(config.Global.controller_launcher)
375 self.controller_launcher = cl_class(
376 self.controller_launcher = cl_class(
376 work_dir=self.cluster_dir, config=config,
377 work_dir=self.cluster_dir, config=config,
377 logname=self.log.name
378 logname=self.log.name
378 )
379 )
379 # Setup the observing of stopping. If the controller dies, shut
380 # Setup the observing of stopping. If the controller dies, shut
380 # everything down as that will be completely fatal for the engines.
381 # everything down as that will be completely fatal for the engines.
381 self.controller_launcher.on_stop(self.stop_launchers)
382 self.controller_launcher.on_stop(self.stop_launchers)
382 # But, we don't monitor the stopping of engines. An engine dying
383 # But, we don't monitor the stopping of engines. An engine dying
383 # is just fine and in principle a user could start a new engine.
384 # is just fine and in principle a user could start a new engine.
384 # Also, if we did monitor engine stopping, it is difficult to
385 # Also, if we did monitor engine stopping, it is difficult to
385 # know what to do when only some engines die. Currently, the
386 # know what to do when only some engines die. Currently, the
386 # observing of engine stopping is inconsistent. Some launchers
387 # observing of engine stopping is inconsistent. Some launchers
387 # might trigger on a single engine stopping, other wait until
388 # might trigger on a single engine stopping, other wait until
388 # all stop. TODO: think more about how to handle this.
389 # all stop. TODO: think more about how to handle this.
389 else:
390 else:
390 self.controller_launcher = None
391 self.controller_launcher = None
391
392
392 el_class = import_item(config.Global.engine_launcher)
393 el_class = import_item(config.Global.engine_launcher)
393 self.engine_launcher = el_class(
394 self.engine_launcher = el_class(
394 work_dir=self.cluster_dir, config=config, logname=self.log.name
395 work_dir=self.cluster_dir, config=config, logname=self.log.name
395 )
396 )
396
397
397 # Setup signals
398 # Setup signals
398 signal.signal(signal.SIGINT, self.sigint_handler)
399 signal.signal(signal.SIGINT, self.sigint_handler)
399
400
400 # Start the controller and engines
401 # Start the controller and engines
401 self._stopping = False # Make sure stop_launchers is not called 2x.
402 self._stopping = False # Make sure stop_launchers is not called 2x.
402 if controller:
403 if controller:
403 self.start_controller()
404 self.start_controller()
404 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
405 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
405 dc.start()
406 dc.start()
406 self.startup_message()
407 self.startup_message()
407
408
408 def startup_message(self, r=None):
409 def startup_message(self, r=None):
409 self.log.info("IPython cluster: started")
410 self.log.info("IPython cluster: started")
410 return r
411 return r
411
412
412 def start_controller(self, r=None):
413 def start_controller(self, r=None):
413 # self.log.info("In start_controller")
414 # self.log.info("In start_controller")
414 config = self.master_config
415 config = self.master_config
415 d = self.controller_launcher.start(
416 d = self.controller_launcher.start(
416 cluster_dir=config.Global.cluster_dir
417 cluster_dir=config.Global.cluster_dir
417 )
418 )
418 return d
419 return d
419
420
420 def start_engines(self, r=None):
421 def start_engines(self, r=None):
421 # self.log.info("In start_engines")
422 # self.log.info("In start_engines")
422 config = self.master_config
423 config = self.master_config
423
424
424 d = self.engine_launcher.start(
425 d = self.engine_launcher.start(
425 config.Global.n,
426 config.Global.n,
426 cluster_dir=config.Global.cluster_dir
427 cluster_dir=config.Global.cluster_dir
427 )
428 )
428 return d
429 return d
429
430
430 def stop_controller(self, r=None):
431 def stop_controller(self, r=None):
431 # self.log.info("In stop_controller")
432 # self.log.info("In stop_controller")
432 if self.controller_launcher and self.controller_launcher.running:
433 if self.controller_launcher and self.controller_launcher.running:
433 return self.controller_launcher.stop()
434 return self.controller_launcher.stop()
434
435
435 def stop_engines(self, r=None):
436 def stop_engines(self, r=None):
436 # self.log.info("In stop_engines")
437 # self.log.info("In stop_engines")
437 if self.engine_launcher.running:
438 if self.engine_launcher.running:
438 d = self.engine_launcher.stop()
439 d = self.engine_launcher.stop()
439 # d.addErrback(self.log_err)
440 # d.addErrback(self.log_err)
440 return d
441 return d
441 else:
442 else:
442 return None
443 return None
443
444
444 def log_err(self, f):
445 def log_err(self, f):
445 self.log.error(f.getTraceback())
446 self.log.error(f.getTraceback())
446 return None
447 return None
447
448
448 def stop_launchers(self, r=None):
449 def stop_launchers(self, r=None):
449 if not self._stopping:
450 if not self._stopping:
450 self._stopping = True
451 self._stopping = True
451 # if isinstance(r, failure.Failure):
452 # if isinstance(r, failure.Failure):
452 # self.log.error('Unexpected error in ipcluster:')
453 # self.log.error('Unexpected error in ipcluster:')
453 # self.log.info(r.getTraceback())
454 # self.log.info(r.getTraceback())
454 self.log.error("IPython cluster: stopping")
455 self.log.error("IPython cluster: stopping")
455 # These return deferreds. We are not doing anything with them
456 # These return deferreds. We are not doing anything with them
456 # but we are holding refs to them as a reminder that they
457 # but we are holding refs to them as a reminder that they
457 # do return deferreds.
458 # do return deferreds.
458 d1 = self.stop_engines()
459 d1 = self.stop_engines()
459 d2 = self.stop_controller()
460 d2 = self.stop_controller()
460 # Wait a few seconds to let things shut down.
461 # Wait a few seconds to let things shut down.
461 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
462 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
462 dc.start()
463 dc.start()
463 # reactor.callLater(4.0, reactor.stop)
464 # reactor.callLater(4.0, reactor.stop)
464
465
465 def sigint_handler(self, signum, frame):
466 def sigint_handler(self, signum, frame):
466 self.stop_launchers()
467 self.stop_launchers()
467
468
468 def start_logging(self):
469 def start_logging(self):
469 # Remove old log files of the controller and engine
470 # Remove old log files of the controller and engine
470 if self.master_config.Global.clean_logs:
471 if self.master_config.Global.clean_logs:
471 log_dir = self.master_config.Global.log_dir
472 log_dir = self.master_config.Global.log_dir
472 for f in os.listdir(log_dir):
473 for f in os.listdir(log_dir):
473 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
474 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
474 os.remove(os.path.join(log_dir, f))
475 os.remove(os.path.join(log_dir, f))
475 # This will remove old log files for ipcluster itself
476 # This will remove old log files for ipcluster itself
476 super(IPClusterApp, self).start_logging()
477 super(IPClusterApp, self).start_logging()
477
478
478 def start_app(self):
479 def start_app(self):
479 """Start the application, depending on what subcommand is used."""
480 """Start the application, depending on what subcommand is used."""
480 subcmd = self.master_config.Global.subcommand
481 subcmd = self.master_config.Global.subcommand
481 if subcmd=='create' or subcmd=='list':
482 if subcmd=='create' or subcmd=='list':
482 return
483 return
483 elif subcmd=='start':
484 elif subcmd=='start':
484 self.start_app_start()
485 self.start_app_start()
485 elif subcmd=='stop':
486 elif subcmd=='stop':
486 self.start_app_stop()
487 self.start_app_stop()
487 elif subcmd=='engines':
488 elif subcmd=='engines':
488 self.start_app_engines()
489 self.start_app_engines()
489
490
490 def start_app_start(self):
491 def start_app_start(self):
491 """Start the app for the start subcommand."""
492 """Start the app for the start subcommand."""
492 config = self.master_config
493 config = self.master_config
493 # First see if the cluster is already running
494 # First see if the cluster is already running
494 try:
495 try:
495 pid = self.get_pid_from_file()
496 pid = self.get_pid_from_file()
496 except PIDFileError:
497 except PIDFileError:
497 pass
498 pass
498 else:
499 else:
499 self.log.critical(
500 self.log.critical(
500 'Cluster is already running with [pid=%s]. '
501 'Cluster is already running with [pid=%s]. '
501 'use "ipcluster stop" to stop the cluster.' % pid
502 'use "ipcluster stop" to stop the cluster.' % pid
502 )
503 )
503 # Here I exit with a unusual exit status that other processes
504 # Here I exit with a unusual exit status that other processes
504 # can watch for to learn how I existed.
505 # can watch for to learn how I existed.
505 self.exit(ALREADY_STARTED)
506 self.exit(ALREADY_STARTED)
506
507
507 # Now log and daemonize
508 # Now log and daemonize
508 self.log.info(
509 self.log.info(
509 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
510 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
510 )
511 )
511 # TODO: Get daemonize working on Windows or as a Windows Server.
512 # TODO: Get daemonize working on Windows or as a Windows Server.
512 if config.Global.daemonize:
513 if config.Global.daemonize:
513 if os.name=='posix':
514 if os.name=='posix':
514 from twisted.scripts._twistd_unix import daemonize
515 from twisted.scripts._twistd_unix import daemonize
515 daemonize()
516 daemonize()
516
517
517 # Now write the new pid file AFTER our new forked pid is active.
518 # Now write the new pid file AFTER our new forked pid is active.
518 self.write_pid_file()
519 self.write_pid_file()
519 try:
520 try:
520 self.loop.start()
521 self.loop.start()
521 except KeyboardInterrupt:
522 except KeyboardInterrupt:
522 pass
523 pass
523 except zmq.ZMQError as e:
524 except zmq.ZMQError as e:
524 if e.errno == errno.EINTR:
525 if e.errno == errno.EINTR:
525 pass
526 pass
526 else:
527 else:
527 raise
528 raise
528 self.remove_pid_file()
529 self.remove_pid_file()
529
530
530 def start_app_engines(self):
531 def start_app_engines(self):
531 """Start the app for the start subcommand."""
532 """Start the app for the start subcommand."""
532 config = self.master_config
533 config = self.master_config
533 # First see if the cluster is already running
534 # First see if the cluster is already running
534
535
535 # Now log and daemonize
536 # Now log and daemonize
536 self.log.info(
537 self.log.info(
537 'Starting engines with [daemon=%r]' % config.Global.daemonize
538 'Starting engines with [daemon=%r]' % config.Global.daemonize
538 )
539 )
539 # TODO: Get daemonize working on Windows or as a Windows Server.
540 # TODO: Get daemonize working on Windows or as a Windows Server.
540 if config.Global.daemonize:
541 if config.Global.daemonize:
541 if os.name=='posix':
542 if os.name=='posix':
542 from twisted.scripts._twistd_unix import daemonize
543 from twisted.scripts._twistd_unix import daemonize
543 daemonize()
544 daemonize()
544
545
545 # Now write the new pid file AFTER our new forked pid is active.
546 # Now write the new pid file AFTER our new forked pid is active.
546 # self.write_pid_file()
547 # self.write_pid_file()
547 try:
548 try:
548 self.loop.start()
549 self.loop.start()
549 except KeyboardInterrupt:
550 except KeyboardInterrupt:
550 pass
551 pass
551 except zmq.ZMQError as e:
552 except zmq.ZMQError as e:
552 if e.errno == errno.EINTR:
553 if e.errno == errno.EINTR:
553 pass
554 pass
554 else:
555 else:
555 raise
556 raise
556 # self.remove_pid_file()
557 # self.remove_pid_file()
557
558
558 def start_app_stop(self):
559 def start_app_stop(self):
559 """Start the app for the stop subcommand."""
560 """Start the app for the stop subcommand."""
560 config = self.master_config
561 config = self.master_config
561 try:
562 try:
562 pid = self.get_pid_from_file()
563 pid = self.get_pid_from_file()
563 except PIDFileError:
564 except PIDFileError:
564 self.log.critical(
565 self.log.critical(
565 'Problem reading pid file, cluster is probably not running.'
566 'Problem reading pid file, cluster is probably not running.'
566 )
567 )
567 # Here I exit with a unusual exit status that other processes
568 # Here I exit with a unusual exit status that other processes
568 # can watch for to learn how I existed.
569 # can watch for to learn how I existed.
569 self.exit(ALREADY_STOPPED)
570 self.exit(ALREADY_STOPPED)
570 else:
571 else:
571 if os.name=='posix':
572 if os.name=='posix':
572 sig = config.Global.signal
573 sig = config.Global.signal
573 self.log.info(
574 self.log.info(
574 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
575 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
575 )
576 )
576 os.kill(pid, sig)
577 os.kill(pid, sig)
577 elif os.name=='nt':
578 elif os.name=='nt':
578 # As of right now, we don't support daemonize on Windows, so
579 # As of right now, we don't support daemonize on Windows, so
579 # stop will not do anything. Minimally, it should clean up the
580 # stop will not do anything. Minimally, it should clean up the
580 # old .pid files.
581 # old .pid files.
581 self.remove_pid_file()
582 self.remove_pid_file()
582
583
583
584
584 def launch_new_instance():
585 def launch_new_instance():
585 """Create and run the IPython cluster."""
586 """Create and run the IPython cluster."""
586 app = IPClusterApp()
587 app = IPClusterApp()
587 app.start()
588 app.start()
588
589
589
590
590 if __name__ == '__main__':
591 if __name__ == '__main__':
591 launch_new_instance()
592 launch_new_instance()
592
593
@@ -1,432 +1,433 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import copy
20 import copy
21 import os
21 import os
22 import logging
22 import logging
23 import socket
23 import socket
24 import stat
24 import stat
25 import sys
25 import sys
26 import uuid
26 import uuid
27
27
28 import zmq
28 import zmq
29 from zmq.log.handlers import PUBHandler
29 from zmq.log.handlers import PUBHandler
30 from zmq.utils import jsonapi as json
30 from zmq.utils import jsonapi as json
31
31
32 from IPython.config.loader import Config
32 from IPython.config.loader import Config
33
33
34 from IPython.parallel import factory
34 from IPython.parallel import factory
35 from .clusterdir import (
35
36 from IPython.parallel.apps.clusterdir import (
36 ApplicationWithClusterDir,
37 ApplicationWithClusterDir,
37 ClusterDirConfigLoader
38 ClusterDirConfigLoader
38 )
39 )
39 from IPython.parallel.util import disambiguate_ip_address, split_url
40 from IPython.parallel.util import disambiguate_ip_address, split_url
40 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
41 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
41 from IPython.utils.traitlets import Instance, Unicode
42 from IPython.utils.traitlets import Instance, Unicode
42
43
43 from IPython.parallel.controller.controller import ControllerFactory
44 from IPython.parallel.controller.controller import ControllerFactory
44
45
45
46
46 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
47 # Module level variables
48 # Module level variables
48 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
49
50
50
51
51 #: The default config file name for this application
52 #: The default config file name for this application
52 default_config_file_name = u'ipcontroller_config.py'
53 default_config_file_name = u'ipcontroller_config.py'
53
54
54
55
55 _description = """Start the IPython controller for parallel computing.
56 _description = """Start the IPython controller for parallel computing.
56
57
57 The IPython controller provides a gateway between the IPython engines and
58 The IPython controller provides a gateway between the IPython engines and
58 clients. The controller needs to be started before the engines and can be
59 clients. The controller needs to be started before the engines and can be
59 configured using command line options or using a cluster directory. Cluster
60 configured using command line options or using a cluster directory. Cluster
60 directories contain config, log and security files and are usually located in
61 directories contain config, log and security files and are usually located in
61 your ipython directory and named as "cluster_<profile>". See the --profile
62 your ipython directory and named as "cluster_<profile>". See the --profile
62 and --cluster-dir options for details.
63 and --cluster-dir options for details.
63 """
64 """
64
65
65 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
66 # Default interfaces
67 # Default interfaces
67 #-----------------------------------------------------------------------------
68 #-----------------------------------------------------------------------------
68
69
69 # The default client interfaces for FCClientServiceFactory.interfaces
70 # The default client interfaces for FCClientServiceFactory.interfaces
70 default_client_interfaces = Config()
71 default_client_interfaces = Config()
71 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
72 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
72
73
73 # Make this a dict we can pass to Config.__init__ for the default
74 # Make this a dict we can pass to Config.__init__ for the default
74 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
75 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
75
76
76
77
77
78
78 # The default engine interfaces for FCEngineServiceFactory.interfaces
79 # The default engine interfaces for FCEngineServiceFactory.interfaces
79 default_engine_interfaces = Config()
80 default_engine_interfaces = Config()
80 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
81 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
81
82
82 # Make this a dict we can pass to Config.__init__ for the default
83 # Make this a dict we can pass to Config.__init__ for the default
83 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
84 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
84
85
85
86
86 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
87 # Service factories
88 # Service factories
88 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
89
90
90 #
91 #
91 # class FCClientServiceFactory(FCServiceFactory):
92 # class FCClientServiceFactory(FCServiceFactory):
92 # """A Foolscap implementation of the client services."""
93 # """A Foolscap implementation of the client services."""
93 #
94 #
94 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
95 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
95 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
96 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
96 # allow_none=False, config=True)
97 # allow_none=False, config=True)
97 #
98 #
98 #
99 #
99 # class FCEngineServiceFactory(FCServiceFactory):
100 # class FCEngineServiceFactory(FCServiceFactory):
100 # """A Foolscap implementation of the engine services."""
101 # """A Foolscap implementation of the engine services."""
101 #
102 #
102 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
103 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
103 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
104 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
104 # allow_none=False, config=True)
105 # allow_none=False, config=True)
105 #
106 #
106
107
107 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
108 # Command line options
109 # Command line options
109 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
110
111
111
112
112 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
113 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
113
114
114 def _add_arguments(self):
115 def _add_arguments(self):
115 super(IPControllerAppConfigLoader, self)._add_arguments()
116 super(IPControllerAppConfigLoader, self)._add_arguments()
116 paa = self.parser.add_argument
117 paa = self.parser.add_argument
117
118
118 ## Hub Config:
119 ## Hub Config:
119 paa('--mongodb',
120 paa('--mongodb',
120 dest='HubFactory.db_class', action='store_const',
121 dest='HubFactory.db_class', action='store_const',
121 const='IPython.parallel.controller.mongodb.MongoDB',
122 const='IPython.parallel.controller.mongodb.MongoDB',
122 help='Use MongoDB for task storage [default: in-memory]')
123 help='Use MongoDB for task storage [default: in-memory]')
123 paa('--sqlite',
124 paa('--sqlite',
124 dest='HubFactory.db_class', action='store_const',
125 dest='HubFactory.db_class', action='store_const',
125 const='IPython.parallel.controller.sqlitedb.SQLiteDB',
126 const='IPython.parallel.controller.sqlitedb.SQLiteDB',
126 help='Use SQLite3 for DB task storage [default: in-memory]')
127 help='Use SQLite3 for DB task storage [default: in-memory]')
127 paa('--hb',
128 paa('--hb',
128 type=int, dest='HubFactory.hb', nargs=2,
129 type=int, dest='HubFactory.hb', nargs=2,
129 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
130 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
130 'connections [default: random]',
131 'connections [default: random]',
131 metavar='Hub.hb_ports')
132 metavar='Hub.hb_ports')
132 paa('--ping',
133 paa('--ping',
133 type=int, dest='HubFactory.ping',
134 type=int, dest='HubFactory.ping',
134 help='The frequency at which the Hub pings the engines for heartbeats '
135 help='The frequency at which the Hub pings the engines for heartbeats '
135 ' (in ms) [default: 100]',
136 ' (in ms) [default: 100]',
136 metavar='Hub.ping')
137 metavar='Hub.ping')
137
138
138 # Client config
139 # Client config
139 paa('--client-ip',
140 paa('--client-ip',
140 type=str, dest='HubFactory.client_ip',
141 type=str, dest='HubFactory.client_ip',
141 help='The IP address or hostname the Hub will listen on for '
142 help='The IP address or hostname the Hub will listen on for '
142 'client connections. Both engine-ip and client-ip can be set simultaneously '
143 'client connections. Both engine-ip and client-ip can be set simultaneously '
143 'via --ip [default: loopback]',
144 'via --ip [default: loopback]',
144 metavar='Hub.client_ip')
145 metavar='Hub.client_ip')
145 paa('--client-transport',
146 paa('--client-transport',
146 type=str, dest='HubFactory.client_transport',
147 type=str, dest='HubFactory.client_transport',
147 help='The ZeroMQ transport the Hub will use for '
148 help='The ZeroMQ transport the Hub will use for '
148 'client connections. Both engine-transport and client-transport can be set simultaneously '
149 'client connections. Both engine-transport and client-transport can be set simultaneously '
149 'via --transport [default: tcp]',
150 'via --transport [default: tcp]',
150 metavar='Hub.client_transport')
151 metavar='Hub.client_transport')
151 paa('--query',
152 paa('--query',
152 type=int, dest='HubFactory.query_port',
153 type=int, dest='HubFactory.query_port',
153 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
154 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
154 metavar='Hub.query_port')
155 metavar='Hub.query_port')
155 paa('--notifier',
156 paa('--notifier',
156 type=int, dest='HubFactory.notifier_port',
157 type=int, dest='HubFactory.notifier_port',
157 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
158 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
158 metavar='Hub.notifier_port')
159 metavar='Hub.notifier_port')
159
160
160 # Engine config
161 # Engine config
161 paa('--engine-ip',
162 paa('--engine-ip',
162 type=str, dest='HubFactory.engine_ip',
163 type=str, dest='HubFactory.engine_ip',
163 help='The IP address or hostname the Hub will listen on for '
164 help='The IP address or hostname the Hub will listen on for '
164 'engine connections. This applies to the Hub and its schedulers'
165 'engine connections. This applies to the Hub and its schedulers'
165 'engine-ip and client-ip can be set simultaneously '
166 'engine-ip and client-ip can be set simultaneously '
166 'via --ip [default: loopback]',
167 'via --ip [default: loopback]',
167 metavar='Hub.engine_ip')
168 metavar='Hub.engine_ip')
168 paa('--engine-transport',
169 paa('--engine-transport',
169 type=str, dest='HubFactory.engine_transport',
170 type=str, dest='HubFactory.engine_transport',
170 help='The ZeroMQ transport the Hub will use for '
171 help='The ZeroMQ transport the Hub will use for '
171 'client connections. Both engine-transport and client-transport can be set simultaneously '
172 'client connections. Both engine-transport and client-transport can be set simultaneously '
172 'via --transport [default: tcp]',
173 'via --transport [default: tcp]',
173 metavar='Hub.engine_transport')
174 metavar='Hub.engine_transport')
174
175
175 # Scheduler config
176 # Scheduler config
176 paa('--mux',
177 paa('--mux',
177 type=int, dest='ControllerFactory.mux', nargs=2,
178 type=int, dest='ControllerFactory.mux', nargs=2,
178 help='The (2) ports the MUX scheduler will listen on for client,engine '
179 help='The (2) ports the MUX scheduler will listen on for client,engine '
179 'connections, respectively [default: random]',
180 'connections, respectively [default: random]',
180 metavar='Scheduler.mux_ports')
181 metavar='Scheduler.mux_ports')
181 paa('--task',
182 paa('--task',
182 type=int, dest='ControllerFactory.task', nargs=2,
183 type=int, dest='ControllerFactory.task', nargs=2,
183 help='The (2) ports the Task scheduler will listen on for client,engine '
184 help='The (2) ports the Task scheduler will listen on for client,engine '
184 'connections, respectively [default: random]',
185 'connections, respectively [default: random]',
185 metavar='Scheduler.task_ports')
186 metavar='Scheduler.task_ports')
186 paa('--control',
187 paa('--control',
187 type=int, dest='ControllerFactory.control', nargs=2,
188 type=int, dest='ControllerFactory.control', nargs=2,
188 help='The (2) ports the Control scheduler will listen on for client,engine '
189 help='The (2) ports the Control scheduler will listen on for client,engine '
189 'connections, respectively [default: random]',
190 'connections, respectively [default: random]',
190 metavar='Scheduler.control_ports')
191 metavar='Scheduler.control_ports')
191 paa('--iopub',
192 paa('--iopub',
192 type=int, dest='ControllerFactory.iopub', nargs=2,
193 type=int, dest='ControllerFactory.iopub', nargs=2,
193 help='The (2) ports the IOPub scheduler will listen on for client,engine '
194 help='The (2) ports the IOPub scheduler will listen on for client,engine '
194 'connections, respectively [default: random]',
195 'connections, respectively [default: random]',
195 metavar='Scheduler.iopub_ports')
196 metavar='Scheduler.iopub_ports')
196
197
197 paa('--scheme',
198 paa('--scheme',
198 type=str, dest='HubFactory.scheme',
199 type=str, dest='HubFactory.scheme',
199 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
200 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
200 help='select the task scheduler scheme [default: Python LRU]',
201 help='select the task scheduler scheme [default: Python LRU]',
201 metavar='Scheduler.scheme')
202 metavar='Scheduler.scheme')
202 paa('--usethreads',
203 paa('--usethreads',
203 dest='ControllerFactory.usethreads', action="store_true",
204 dest='ControllerFactory.usethreads', action="store_true",
204 help='Use threads instead of processes for the schedulers',
205 help='Use threads instead of processes for the schedulers',
205 )
206 )
206 paa('--hwm',
207 paa('--hwm',
207 dest='ControllerFactory.hwm', type=int,
208 dest='ControllerFactory.hwm', type=int,
208 help='specify the High Water Mark (HWM) for the downstream '
209 help='specify the High Water Mark (HWM) for the downstream '
209 'socket in the pure ZMQ scheduler. This is the maximum number '
210 'socket in the pure ZMQ scheduler. This is the maximum number '
210 'of allowed outstanding tasks on each engine.',
211 'of allowed outstanding tasks on each engine.',
211 )
212 )
212
213
213 ## Global config
214 ## Global config
214 paa('--log-to-file',
215 paa('--log-to-file',
215 action='store_true', dest='Global.log_to_file',
216 action='store_true', dest='Global.log_to_file',
216 help='Log to a file in the log directory (default is stdout)')
217 help='Log to a file in the log directory (default is stdout)')
217 paa('--log-url',
218 paa('--log-url',
218 type=str, dest='Global.log_url',
219 type=str, dest='Global.log_url',
219 help='Broadcast logs to an iploggerz process [default: disabled]')
220 help='Broadcast logs to an iploggerz process [default: disabled]')
220 paa('-r','--reuse-files',
221 paa('-r','--reuse-files',
221 action='store_true', dest='Global.reuse_files',
222 action='store_true', dest='Global.reuse_files',
222 help='Try to reuse existing json connection files.')
223 help='Try to reuse existing json connection files.')
223 paa('--no-secure',
224 paa('--no-secure',
224 action='store_false', dest='Global.secure',
225 action='store_false', dest='Global.secure',
225 help='Turn off execution keys (default).')
226 help='Turn off execution keys (default).')
226 paa('--secure',
227 paa('--secure',
227 action='store_true', dest='Global.secure',
228 action='store_true', dest='Global.secure',
228 help='Turn on execution keys.')
229 help='Turn on execution keys.')
229 paa('--execkey',
230 paa('--execkey',
230 type=str, dest='Global.exec_key',
231 type=str, dest='Global.exec_key',
231 help='path to a file containing an execution key.',
232 help='path to a file containing an execution key.',
232 metavar='keyfile')
233 metavar='keyfile')
233 paa('--ssh',
234 paa('--ssh',
234 type=str, dest='Global.sshserver',
235 type=str, dest='Global.sshserver',
235 help='ssh url for clients to use when connecting to the Controller '
236 help='ssh url for clients to use when connecting to the Controller '
236 'processes. It should be of the form: [user@]server[:port]. The '
237 'processes. It should be of the form: [user@]server[:port]. The '
237 'Controller\'s listening addresses must be accessible from the ssh server',
238 'Controller\'s listening addresses must be accessible from the ssh server',
238 metavar='Global.sshserver')
239 metavar='Global.sshserver')
239 paa('--location',
240 paa('--location',
240 type=str, dest='Global.location',
241 type=str, dest='Global.location',
241 help="The external IP or domain name of this machine, used for disambiguating "
242 help="The external IP or domain name of this machine, used for disambiguating "
242 "engine and client connections.",
243 "engine and client connections.",
243 metavar='Global.location')
244 metavar='Global.location')
244 factory.add_session_arguments(self.parser)
245 factory.add_session_arguments(self.parser)
245 factory.add_registration_arguments(self.parser)
246 factory.add_registration_arguments(self.parser)
246
247
247
248
248 #-----------------------------------------------------------------------------
249 #-----------------------------------------------------------------------------
249 # The main application
250 # The main application
250 #-----------------------------------------------------------------------------
251 #-----------------------------------------------------------------------------
251
252
252
253
253 class IPControllerApp(ApplicationWithClusterDir):
254 class IPControllerApp(ApplicationWithClusterDir):
254
255
255 name = u'ipcontroller'
256 name = u'ipcontroller'
256 description = _description
257 description = _description
257 command_line_loader = IPControllerAppConfigLoader
258 command_line_loader = IPControllerAppConfigLoader
258 default_config_file_name = default_config_file_name
259 default_config_file_name = default_config_file_name
259 auto_create_cluster_dir = True
260 auto_create_cluster_dir = True
260
261
261
262
262 def create_default_config(self):
263 def create_default_config(self):
263 super(IPControllerApp, self).create_default_config()
264 super(IPControllerApp, self).create_default_config()
264 # Don't set defaults for Global.secure or Global.reuse_furls
265 # Don't set defaults for Global.secure or Global.reuse_furls
265 # as those are set in a component.
266 # as those are set in a component.
266 self.default_config.Global.import_statements = []
267 self.default_config.Global.import_statements = []
267 self.default_config.Global.clean_logs = True
268 self.default_config.Global.clean_logs = True
268 self.default_config.Global.secure = True
269 self.default_config.Global.secure = True
269 self.default_config.Global.reuse_files = False
270 self.default_config.Global.reuse_files = False
270 self.default_config.Global.exec_key = "exec_key.key"
271 self.default_config.Global.exec_key = "exec_key.key"
271 self.default_config.Global.sshserver = None
272 self.default_config.Global.sshserver = None
272 self.default_config.Global.location = None
273 self.default_config.Global.location = None
273
274
274 def pre_construct(self):
275 def pre_construct(self):
275 super(IPControllerApp, self).pre_construct()
276 super(IPControllerApp, self).pre_construct()
276 c = self.master_config
277 c = self.master_config
277 # The defaults for these are set in FCClientServiceFactory and
278 # The defaults for these are set in FCClientServiceFactory and
278 # FCEngineServiceFactory, so we only set them here if the global
279 # FCEngineServiceFactory, so we only set them here if the global
279 # options have be set to override the class level defaults.
280 # options have be set to override the class level defaults.
280
281
281 # if hasattr(c.Global, 'reuse_furls'):
282 # if hasattr(c.Global, 'reuse_furls'):
282 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
283 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
283 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
284 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
284 # del c.Global.reuse_furls
285 # del c.Global.reuse_furls
285 # if hasattr(c.Global, 'secure'):
286 # if hasattr(c.Global, 'secure'):
286 # c.FCClientServiceFactory.secure = c.Global.secure
287 # c.FCClientServiceFactory.secure = c.Global.secure
287 # c.FCEngineServiceFactory.secure = c.Global.secure
288 # c.FCEngineServiceFactory.secure = c.Global.secure
288 # del c.Global.secure
289 # del c.Global.secure
289
290
290 def save_connection_dict(self, fname, cdict):
291 def save_connection_dict(self, fname, cdict):
291 """save a connection dict to json file."""
292 """save a connection dict to json file."""
292 c = self.master_config
293 c = self.master_config
293 url = cdict['url']
294 url = cdict['url']
294 location = cdict['location']
295 location = cdict['location']
295 if not location:
296 if not location:
296 try:
297 try:
297 proto,ip,port = split_url(url)
298 proto,ip,port = split_url(url)
298 except AssertionError:
299 except AssertionError:
299 pass
300 pass
300 else:
301 else:
301 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
302 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
302 cdict['location'] = location
303 cdict['location'] = location
303 fname = os.path.join(c.Global.security_dir, fname)
304 fname = os.path.join(c.Global.security_dir, fname)
304 with open(fname, 'w') as f:
305 with open(fname, 'w') as f:
305 f.write(json.dumps(cdict, indent=2))
306 f.write(json.dumps(cdict, indent=2))
306 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
307 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
307
308
308 def load_config_from_json(self):
309 def load_config_from_json(self):
309 """load config from existing json connector files."""
310 """load config from existing json connector files."""
310 c = self.master_config
311 c = self.master_config
311 # load from engine config
312 # load from engine config
312 with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
313 with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
313 cfg = json.loads(f.read())
314 cfg = json.loads(f.read())
314 key = c.SessionFactory.exec_key = cfg['exec_key']
315 key = c.SessionFactory.exec_key = cfg['exec_key']
315 xport,addr = cfg['url'].split('://')
316 xport,addr = cfg['url'].split('://')
316 c.HubFactory.engine_transport = xport
317 c.HubFactory.engine_transport = xport
317 ip,ports = addr.split(':')
318 ip,ports = addr.split(':')
318 c.HubFactory.engine_ip = ip
319 c.HubFactory.engine_ip = ip
319 c.HubFactory.regport = int(ports)
320 c.HubFactory.regport = int(ports)
320 c.Global.location = cfg['location']
321 c.Global.location = cfg['location']
321
322
322 # load client config
323 # load client config
323 with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
324 with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
324 cfg = json.loads(f.read())
325 cfg = json.loads(f.read())
325 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
326 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
326 xport,addr = cfg['url'].split('://')
327 xport,addr = cfg['url'].split('://')
327 c.HubFactory.client_transport = xport
328 c.HubFactory.client_transport = xport
328 ip,ports = addr.split(':')
329 ip,ports = addr.split(':')
329 c.HubFactory.client_ip = ip
330 c.HubFactory.client_ip = ip
330 c.Global.sshserver = cfg['ssh']
331 c.Global.sshserver = cfg['ssh']
331 assert int(ports) == c.HubFactory.regport, "regport mismatch"
332 assert int(ports) == c.HubFactory.regport, "regport mismatch"
332
333
333 def construct(self):
334 def construct(self):
334 # This is the working dir by now.
335 # This is the working dir by now.
335 sys.path.insert(0, '')
336 sys.path.insert(0, '')
336 c = self.master_config
337 c = self.master_config
337
338
338 self.import_statements()
339 self.import_statements()
339 reusing = c.Global.reuse_files
340 reusing = c.Global.reuse_files
340 if reusing:
341 if reusing:
341 try:
342 try:
342 self.load_config_from_json()
343 self.load_config_from_json()
343 except (AssertionError,IOError):
344 except (AssertionError,IOError):
344 reusing=False
345 reusing=False
345 # check again, because reusing may have failed:
346 # check again, because reusing may have failed:
346 if reusing:
347 if reusing:
347 pass
348 pass
348 elif c.Global.secure:
349 elif c.Global.secure:
349 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
350 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
350 key = str(uuid.uuid4())
351 key = str(uuid.uuid4())
351 with open(keyfile, 'w') as f:
352 with open(keyfile, 'w') as f:
352 f.write(key)
353 f.write(key)
353 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
354 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
354 c.SessionFactory.exec_key = key
355 c.SessionFactory.exec_key = key
355 else:
356 else:
356 c.SessionFactory.exec_key = ''
357 c.SessionFactory.exec_key = ''
357 key = None
358 key = None
358
359
359 try:
360 try:
360 self.factory = ControllerFactory(config=c, logname=self.log.name)
361 self.factory = ControllerFactory(config=c, logname=self.log.name)
361 self.start_logging()
362 self.start_logging()
362 self.factory.construct()
363 self.factory.construct()
363 except:
364 except:
364 self.log.error("Couldn't construct the Controller", exc_info=True)
365 self.log.error("Couldn't construct the Controller", exc_info=True)
365 self.exit(1)
366 self.exit(1)
366
367
367 if not reusing:
368 if not reusing:
368 # save to new json config files
369 # save to new json config files
369 f = self.factory
370 f = self.factory
370 cdict = {'exec_key' : key,
371 cdict = {'exec_key' : key,
371 'ssh' : c.Global.sshserver,
372 'ssh' : c.Global.sshserver,
372 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
373 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
373 'location' : c.Global.location
374 'location' : c.Global.location
374 }
375 }
375 self.save_connection_dict('ipcontroller-client.json', cdict)
376 self.save_connection_dict('ipcontroller-client.json', cdict)
376 edict = cdict
377 edict = cdict
377 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
378 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
378 self.save_connection_dict('ipcontroller-engine.json', edict)
379 self.save_connection_dict('ipcontroller-engine.json', edict)
379
380
380
381
381 def save_urls(self):
382 def save_urls(self):
382 """save the registration urls to files."""
383 """save the registration urls to files."""
383 c = self.master_config
384 c = self.master_config
384
385
385 sec_dir = c.Global.security_dir
386 sec_dir = c.Global.security_dir
386 cf = self.factory
387 cf = self.factory
387
388
388 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
389 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
389 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
390 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
390
391
391 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
392 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
392 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
393 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
393
394
394
395
395 def import_statements(self):
396 def import_statements(self):
396 statements = self.master_config.Global.import_statements
397 statements = self.master_config.Global.import_statements
397 for s in statements:
398 for s in statements:
398 try:
399 try:
399 self.log.msg("Executing statement: '%s'" % s)
400 self.log.msg("Executing statement: '%s'" % s)
400 exec s in globals(), locals()
401 exec s in globals(), locals()
401 except:
402 except:
402 self.log.msg("Error running statement: %s" % s)
403 self.log.msg("Error running statement: %s" % s)
403
404
404 def start_logging(self):
405 def start_logging(self):
405 super(IPControllerApp, self).start_logging()
406 super(IPControllerApp, self).start_logging()
406 if self.master_config.Global.log_url:
407 if self.master_config.Global.log_url:
407 context = self.factory.context
408 context = self.factory.context
408 lsock = context.socket(zmq.PUB)
409 lsock = context.socket(zmq.PUB)
409 lsock.connect(self.master_config.Global.log_url)
410 lsock.connect(self.master_config.Global.log_url)
410 handler = PUBHandler(lsock)
411 handler = PUBHandler(lsock)
411 handler.root_topic = 'controller'
412 handler.root_topic = 'controller'
412 handler.setLevel(self.log_level)
413 handler.setLevel(self.log_level)
413 self.log.addHandler(handler)
414 self.log.addHandler(handler)
414 #
415 #
415 def start_app(self):
416 def start_app(self):
416 # Start the subprocesses:
417 # Start the subprocesses:
417 self.factory.start()
418 self.factory.start()
418 self.write_pid_file(overwrite=True)
419 self.write_pid_file(overwrite=True)
419 try:
420 try:
420 self.factory.loop.start()
421 self.factory.loop.start()
421 except KeyboardInterrupt:
422 except KeyboardInterrupt:
422 self.log.critical("Interrupted, Exiting...\n")
423 self.log.critical("Interrupted, Exiting...\n")
423
424
424
425
425 def launch_new_instance():
426 def launch_new_instance():
426 """Create and run the IPython controller"""
427 """Create and run the IPython controller"""
427 app = IPControllerApp()
428 app = IPControllerApp()
428 app.start()
429 app.start()
429
430
430
431
431 if __name__ == '__main__':
432 if __name__ == '__main__':
432 launch_new_instance()
433 launch_new_instance()
@@ -1,303 +1,303 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import json
18 import json
19 import os
19 import os
20 import sys
20 import sys
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 from .clusterdir import (
25 from IPython.parallel.apps.clusterdir import (
26 ApplicationWithClusterDir,
26 ApplicationWithClusterDir,
27 ClusterDirConfigLoader
27 ClusterDirConfigLoader
28 )
28 )
29 from IPython.zmq.log import EnginePUBHandler
29 from IPython.zmq.log import EnginePUBHandler
30
30
31 from IPython.parallel import factory
31 from IPython.parallel import factory
32 from IPython.parallel.engine.engine import EngineFactory
32 from IPython.parallel.engine.engine import EngineFactory
33 from IPython.parallel.engine.streamkernel import Kernel
33 from IPython.parallel.engine.streamkernel import Kernel
34 from IPython.parallel.util import disambiguate_url
34 from IPython.parallel.util import disambiguate_url
35
35
36 from IPython.utils.importstring import import_item
36 from IPython.utils.importstring import import_item
37
37
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # Module level variables
40 # Module level variables
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 #: The default config file name for this application
43 #: The default config file name for this application
44 default_config_file_name = u'ipengine_config.py'
44 default_config_file_name = u'ipengine_config.py'
45
45
46
46
47 mpi4py_init = """from mpi4py import MPI as mpi
47 mpi4py_init = """from mpi4py import MPI as mpi
48 mpi.size = mpi.COMM_WORLD.Get_size()
48 mpi.size = mpi.COMM_WORLD.Get_size()
49 mpi.rank = mpi.COMM_WORLD.Get_rank()
49 mpi.rank = mpi.COMM_WORLD.Get_rank()
50 """
50 """
51
51
52
52
53 pytrilinos_init = """from PyTrilinos import Epetra
53 pytrilinos_init = """from PyTrilinos import Epetra
54 class SimpleStruct:
54 class SimpleStruct:
55 pass
55 pass
56 mpi = SimpleStruct()
56 mpi = SimpleStruct()
57 mpi.rank = 0
57 mpi.rank = 0
58 mpi.size = 0
58 mpi.size = 0
59 """
59 """
60
60
61
61
62 _description = """Start an IPython engine for parallel computing.\n\n
62 _description = """Start an IPython engine for parallel computing.\n\n
63
63
64 IPython engines run in parallel and perform computations on behalf of a client
64 IPython engines run in parallel and perform computations on behalf of a client
65 and controller. A controller needs to be started before the engines. The
65 and controller. A controller needs to be started before the engines. The
66 engine can be configured using command line options or using a cluster
66 engine can be configured using command line options or using a cluster
67 directory. Cluster directories contain config, log and security files and are
67 directory. Cluster directories contain config, log and security files and are
68 usually located in your ipython directory and named as "cluster_<profile>".
68 usually located in your ipython directory and named as "cluster_<profile>".
69 See the --profile and --cluster-dir options for details.
69 See the --profile and --cluster-dir options for details.
70 """
70 """
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # Command line options
73 # Command line options
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
77 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
77 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
78
78
79 def _add_arguments(self):
79 def _add_arguments(self):
80 super(IPEngineAppConfigLoader, self)._add_arguments()
80 super(IPEngineAppConfigLoader, self)._add_arguments()
81 paa = self.parser.add_argument
81 paa = self.parser.add_argument
82 # Controller config
82 # Controller config
83 paa('--file', '-f',
83 paa('--file', '-f',
84 type=unicode, dest='Global.url_file',
84 type=unicode, dest='Global.url_file',
85 help='The full location of the file containing the connection information fo '
85 help='The full location of the file containing the connection information fo '
86 'controller. If this is not given, the file must be in the '
86 'controller. If this is not given, the file must be in the '
87 'security directory of the cluster directory. This location is '
87 'security directory of the cluster directory. This location is '
88 'resolved using the --profile and --app-dir options.',
88 'resolved using the --profile and --app-dir options.',
89 metavar='Global.url_file')
89 metavar='Global.url_file')
90 # MPI
90 # MPI
91 paa('--mpi',
91 paa('--mpi',
92 type=str, dest='MPI.use',
92 type=str, dest='MPI.use',
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
94 metavar='MPI.use')
94 metavar='MPI.use')
95 # Global config
95 # Global config
96 paa('--log-to-file',
96 paa('--log-to-file',
97 action='store_true', dest='Global.log_to_file',
97 action='store_true', dest='Global.log_to_file',
98 help='Log to a file in the log directory (default is stdout)')
98 help='Log to a file in the log directory (default is stdout)')
99 paa('--log-url',
99 paa('--log-url',
100 dest='Global.log_url',
100 dest='Global.log_url',
101 help="url of ZMQ logger, as started with iploggerz")
101 help="url of ZMQ logger, as started with iploggerz")
102 # paa('--execkey',
102 # paa('--execkey',
103 # type=str, dest='Global.exec_key',
103 # type=str, dest='Global.exec_key',
104 # help='path to a file containing an execution key.',
104 # help='path to a file containing an execution key.',
105 # metavar='keyfile')
105 # metavar='keyfile')
106 # paa('--no-secure',
106 # paa('--no-secure',
107 # action='store_false', dest='Global.secure',
107 # action='store_false', dest='Global.secure',
108 # help='Turn off execution keys.')
108 # help='Turn off execution keys.')
109 # paa('--secure',
109 # paa('--secure',
110 # action='store_true', dest='Global.secure',
110 # action='store_true', dest='Global.secure',
111 # help='Turn on execution keys (default).')
111 # help='Turn on execution keys (default).')
112 # init command
112 # init command
113 paa('-c',
113 paa('-c',
114 type=str, dest='Global.extra_exec_lines',
114 type=str, dest='Global.extra_exec_lines',
115 help='specify a command to be run at startup')
115 help='specify a command to be run at startup')
116 paa('-s',
116 paa('-s',
117 type=unicode, dest='Global.extra_exec_file',
117 type=unicode, dest='Global.extra_exec_file',
118 help='specify a script to be run at startup')
118 help='specify a script to be run at startup')
119
119
120 factory.add_session_arguments(self.parser)
120 factory.add_session_arguments(self.parser)
121 factory.add_registration_arguments(self.parser)
121 factory.add_registration_arguments(self.parser)
122
122
123
123
124 #-----------------------------------------------------------------------------
124 #-----------------------------------------------------------------------------
125 # Main application
125 # Main application
126 #-----------------------------------------------------------------------------
126 #-----------------------------------------------------------------------------
127
127
128
128
129 class IPEngineApp(ApplicationWithClusterDir):
129 class IPEngineApp(ApplicationWithClusterDir):
130
130
131 name = u'ipengine'
131 name = u'ipengine'
132 description = _description
132 description = _description
133 command_line_loader = IPEngineAppConfigLoader
133 command_line_loader = IPEngineAppConfigLoader
134 default_config_file_name = default_config_file_name
134 default_config_file_name = default_config_file_name
135 auto_create_cluster_dir = True
135 auto_create_cluster_dir = True
136
136
137 def create_default_config(self):
137 def create_default_config(self):
138 super(IPEngineApp, self).create_default_config()
138 super(IPEngineApp, self).create_default_config()
139
139
140 # The engine should not clean logs as we don't want to remove the
140 # The engine should not clean logs as we don't want to remove the
141 # active log files of other running engines.
141 # active log files of other running engines.
142 self.default_config.Global.clean_logs = False
142 self.default_config.Global.clean_logs = False
143 self.default_config.Global.secure = True
143 self.default_config.Global.secure = True
144
144
145 # Global config attributes
145 # Global config attributes
146 self.default_config.Global.exec_lines = []
146 self.default_config.Global.exec_lines = []
147 self.default_config.Global.extra_exec_lines = ''
147 self.default_config.Global.extra_exec_lines = ''
148 self.default_config.Global.extra_exec_file = u''
148 self.default_config.Global.extra_exec_file = u''
149
149
150 # Configuration related to the controller
150 # Configuration related to the controller
151 # This must match the filename (path not included) that the controller
151 # This must match the filename (path not included) that the controller
152 # used for the FURL file.
152 # used for the FURL file.
153 self.default_config.Global.url_file = u''
153 self.default_config.Global.url_file = u''
154 self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
154 self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
155 # If given, this is the actual location of the controller's FURL file.
155 # If given, this is the actual location of the controller's FURL file.
156 # If not, this is computed using the profile, app_dir and furl_file_name
156 # If not, this is computed using the profile, app_dir and furl_file_name
157 # self.default_config.Global.key_file_name = u'exec_key.key'
157 # self.default_config.Global.key_file_name = u'exec_key.key'
158 # self.default_config.Global.key_file = u''
158 # self.default_config.Global.key_file = u''
159
159
160 # MPI related config attributes
160 # MPI related config attributes
161 self.default_config.MPI.use = ''
161 self.default_config.MPI.use = ''
162 self.default_config.MPI.mpi4py = mpi4py_init
162 self.default_config.MPI.mpi4py = mpi4py_init
163 self.default_config.MPI.pytrilinos = pytrilinos_init
163 self.default_config.MPI.pytrilinos = pytrilinos_init
164
164
165 def post_load_command_line_config(self):
165 def post_load_command_line_config(self):
166 pass
166 pass
167
167
168 def pre_construct(self):
168 def pre_construct(self):
169 super(IPEngineApp, self).pre_construct()
169 super(IPEngineApp, self).pre_construct()
170 # self.find_cont_url_file()
170 # self.find_cont_url_file()
171 self.find_url_file()
171 self.find_url_file()
172 if self.master_config.Global.extra_exec_lines:
172 if self.master_config.Global.extra_exec_lines:
173 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
173 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
174 if self.master_config.Global.extra_exec_file:
174 if self.master_config.Global.extra_exec_file:
175 enc = sys.getfilesystemencoding() or 'utf8'
175 enc = sys.getfilesystemencoding() or 'utf8'
176 cmd="execfile(%r)"%self.master_config.Global.extra_exec_file.encode(enc)
176 cmd="execfile(%r)"%self.master_config.Global.extra_exec_file.encode(enc)
177 self.master_config.Global.exec_lines.append(cmd)
177 self.master_config.Global.exec_lines.append(cmd)
178
178
179 # def find_key_file(self):
179 # def find_key_file(self):
180 # """Set the key file.
180 # """Set the key file.
181 #
181 #
182 # Here we don't try to actually see if it exists for is valid as that
182 # Here we don't try to actually see if it exists for is valid as that
183 # is hadled by the connection logic.
183 # is hadled by the connection logic.
184 # """
184 # """
185 # config = self.master_config
185 # config = self.master_config
186 # # Find the actual controller key file
186 # # Find the actual controller key file
187 # if not config.Global.key_file:
187 # if not config.Global.key_file:
188 # try_this = os.path.join(
188 # try_this = os.path.join(
189 # config.Global.cluster_dir,
189 # config.Global.cluster_dir,
190 # config.Global.security_dir,
190 # config.Global.security_dir,
191 # config.Global.key_file_name
191 # config.Global.key_file_name
192 # )
192 # )
193 # config.Global.key_file = try_this
193 # config.Global.key_file = try_this
194
194
195 def find_url_file(self):
195 def find_url_file(self):
196 """Set the key file.
196 """Set the key file.
197
197
198 Here we don't try to actually see if it exists for is valid as that
198 Here we don't try to actually see if it exists for is valid as that
199 is hadled by the connection logic.
199 is hadled by the connection logic.
200 """
200 """
201 config = self.master_config
201 config = self.master_config
202 # Find the actual controller key file
202 # Find the actual controller key file
203 if not config.Global.url_file:
203 if not config.Global.url_file:
204 try_this = os.path.join(
204 try_this = os.path.join(
205 config.Global.cluster_dir,
205 config.Global.cluster_dir,
206 config.Global.security_dir,
206 config.Global.security_dir,
207 config.Global.url_file_name
207 config.Global.url_file_name
208 )
208 )
209 config.Global.url_file = try_this
209 config.Global.url_file = try_this
210
210
211 def construct(self):
211 def construct(self):
212 # This is the working dir by now.
212 # This is the working dir by now.
213 sys.path.insert(0, '')
213 sys.path.insert(0, '')
214 config = self.master_config
214 config = self.master_config
215 # if os.path.exists(config.Global.key_file) and config.Global.secure:
215 # if os.path.exists(config.Global.key_file) and config.Global.secure:
216 # config.SessionFactory.exec_key = config.Global.key_file
216 # config.SessionFactory.exec_key = config.Global.key_file
217 if os.path.exists(config.Global.url_file):
217 if os.path.exists(config.Global.url_file):
218 with open(config.Global.url_file) as f:
218 with open(config.Global.url_file) as f:
219 d = json.loads(f.read())
219 d = json.loads(f.read())
220 for k,v in d.iteritems():
220 for k,v in d.iteritems():
221 if isinstance(v, unicode):
221 if isinstance(v, unicode):
222 d[k] = v.encode()
222 d[k] = v.encode()
223 if d['exec_key']:
223 if d['exec_key']:
224 config.SessionFactory.exec_key = d['exec_key']
224 config.SessionFactory.exec_key = d['exec_key']
225 d['url'] = disambiguate_url(d['url'], d['location'])
225 d['url'] = disambiguate_url(d['url'], d['location'])
226 config.RegistrationFactory.url=d['url']
226 config.RegistrationFactory.url=d['url']
227 config.EngineFactory.location = d['location']
227 config.EngineFactory.location = d['location']
228
228
229
229
230
230
231 config.Kernel.exec_lines = config.Global.exec_lines
231 config.Kernel.exec_lines = config.Global.exec_lines
232
232
233 self.start_mpi()
233 self.start_mpi()
234
234
235 # Create the underlying shell class and EngineService
235 # Create the underlying shell class and EngineService
236 # shell_class = import_item(self.master_config.Global.shell_class)
236 # shell_class = import_item(self.master_config.Global.shell_class)
237 try:
237 try:
238 self.engine = EngineFactory(config=config, logname=self.log.name)
238 self.engine = EngineFactory(config=config, logname=self.log.name)
239 except:
239 except:
240 self.log.error("Couldn't start the Engine", exc_info=True)
240 self.log.error("Couldn't start the Engine", exc_info=True)
241 self.exit(1)
241 self.exit(1)
242
242
243 self.start_logging()
243 self.start_logging()
244
244
245 # Create the service hierarchy
245 # Create the service hierarchy
246 # self.main_service = service.MultiService()
246 # self.main_service = service.MultiService()
247 # self.engine_service.setServiceParent(self.main_service)
247 # self.engine_service.setServiceParent(self.main_service)
248 # self.tub_service = Tub()
248 # self.tub_service = Tub()
249 # self.tub_service.setServiceParent(self.main_service)
249 # self.tub_service.setServiceParent(self.main_service)
250 # # This needs to be called before the connection is initiated
250 # # This needs to be called before the connection is initiated
251 # self.main_service.startService()
251 # self.main_service.startService()
252
252
253 # This initiates the connection to the controller and calls
253 # This initiates the connection to the controller and calls
254 # register_engine to tell the controller we are ready to do work
254 # register_engine to tell the controller we are ready to do work
255 # self.engine_connector = EngineConnector(self.tub_service)
255 # self.engine_connector = EngineConnector(self.tub_service)
256
256
257 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
257 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
258
258
259 # reactor.callWhenRunning(self.call_connect)
259 # reactor.callWhenRunning(self.call_connect)
260
260
261
261
262 def start_logging(self):
262 def start_logging(self):
263 super(IPEngineApp, self).start_logging()
263 super(IPEngineApp, self).start_logging()
264 if self.master_config.Global.log_url:
264 if self.master_config.Global.log_url:
265 context = self.engine.context
265 context = self.engine.context
266 lsock = context.socket(zmq.PUB)
266 lsock = context.socket(zmq.PUB)
267 lsock.connect(self.master_config.Global.log_url)
267 lsock.connect(self.master_config.Global.log_url)
268 handler = EnginePUBHandler(self.engine, lsock)
268 handler = EnginePUBHandler(self.engine, lsock)
269 handler.setLevel(self.log_level)
269 handler.setLevel(self.log_level)
270 self.log.addHandler(handler)
270 self.log.addHandler(handler)
271
271
272 def start_mpi(self):
272 def start_mpi(self):
273 global mpi
273 global mpi
274 mpikey = self.master_config.MPI.use
274 mpikey = self.master_config.MPI.use
275 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
275 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
276 if mpi_import_statement is not None:
276 if mpi_import_statement is not None:
277 try:
277 try:
278 self.log.info("Initializing MPI:")
278 self.log.info("Initializing MPI:")
279 self.log.info(mpi_import_statement)
279 self.log.info(mpi_import_statement)
280 exec mpi_import_statement in globals()
280 exec mpi_import_statement in globals()
281 except:
281 except:
282 mpi = None
282 mpi = None
283 else:
283 else:
284 mpi = None
284 mpi = None
285
285
286
286
287 def start_app(self):
287 def start_app(self):
288 self.engine.start()
288 self.engine.start()
289 try:
289 try:
290 self.engine.loop.start()
290 self.engine.loop.start()
291 except KeyboardInterrupt:
291 except KeyboardInterrupt:
292 self.log.critical("Engine Interrupted, shutting down...\n")
292 self.log.critical("Engine Interrupted, shutting down...\n")
293
293
294
294
295 def launch_new_instance():
295 def launch_new_instance():
296 """Create and run the IPython controller"""
296 """Create and run the IPython controller"""
297 app = IPEngineApp()
297 app = IPEngineApp()
298 app.start()
298 app.start()
299
299
300
300
301 if __name__ == '__main__':
301 if __name__ == '__main__':
302 launch_new_instance()
302 launch_new_instance()
303
303
@@ -1,132 +1,132 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 A simple IPython logger application
4 A simple IPython logger application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20
20
21 import zmq
21 import zmq
22
22
23 from .clusterdir import (
23 from IPython.parallel.apps.clusterdir import (
24 ApplicationWithClusterDir,
24 ApplicationWithClusterDir,
25 ClusterDirConfigLoader
25 ClusterDirConfigLoader
26 )
26 )
27 from .logwatcher import LogWatcher
27 from IPython.parallel.apps.logwatcher import LogWatcher
28
28
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30 # Module level variables
30 # Module level variables
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32
32
33 #: The default config file name for this application
33 #: The default config file name for this application
34 default_config_file_name = u'iplogger_config.py'
34 default_config_file_name = u'iplogger_config.py'
35
35
36 _description = """Start an IPython logger for parallel computing.\n\n
36 _description = """Start an IPython logger for parallel computing.\n\n
37
37
38 IPython controllers and engines (and your own processes) can broadcast log messages
38 IPython controllers and engines (and your own processes) can broadcast log messages
39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 logger can be configured using command line options or using a cluster
40 logger can be configured using command line options or using a cluster
41 directory. Cluster directories contain config, log and security files and are
41 directory. Cluster directories contain config, log and security files and are
42 usually located in your ipython directory and named as "cluster_<profile>".
42 usually located in your ipython directory and named as "cluster_<profile>".
43 See the --profile and --cluster-dir options for details.
43 See the --profile and --cluster-dir options for details.
44 """
44 """
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Command line options
47 # Command line options
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50
50
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52
52
53 def _add_arguments(self):
53 def _add_arguments(self):
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 paa = self.parser.add_argument
55 paa = self.parser.add_argument
56 # Controller config
56 # Controller config
57 paa('--url',
57 paa('--url',
58 type=str, dest='LogWatcher.url',
58 type=str, dest='LogWatcher.url',
59 help='The url the LogWatcher will listen on',
59 help='The url the LogWatcher will listen on',
60 )
60 )
61 # MPI
61 # MPI
62 paa('--topics',
62 paa('--topics',
63 type=str, dest='LogWatcher.topics', nargs='+',
63 type=str, dest='LogWatcher.topics', nargs='+',
64 help='What topics to subscribe to',
64 help='What topics to subscribe to',
65 metavar='topics')
65 metavar='topics')
66 # Global config
66 # Global config
67 paa('--log-to-file',
67 paa('--log-to-file',
68 action='store_true', dest='Global.log_to_file',
68 action='store_true', dest='Global.log_to_file',
69 help='Log to a file in the log directory (default is stdout)')
69 help='Log to a file in the log directory (default is stdout)')
70
70
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # Main application
73 # Main application
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
77 class IPLoggerApp(ApplicationWithClusterDir):
77 class IPLoggerApp(ApplicationWithClusterDir):
78
78
79 name = u'iploggerz'
79 name = u'iploggerz'
80 description = _description
80 description = _description
81 command_line_loader = IPLoggerAppConfigLoader
81 command_line_loader = IPLoggerAppConfigLoader
82 default_config_file_name = default_config_file_name
82 default_config_file_name = default_config_file_name
83 auto_create_cluster_dir = True
83 auto_create_cluster_dir = True
84
84
85 def create_default_config(self):
85 def create_default_config(self):
86 super(IPLoggerApp, self).create_default_config()
86 super(IPLoggerApp, self).create_default_config()
87
87
88 # The engine should not clean logs as we don't want to remove the
88 # The engine should not clean logs as we don't want to remove the
89 # active log files of other running engines.
89 # active log files of other running engines.
90 self.default_config.Global.clean_logs = False
90 self.default_config.Global.clean_logs = False
91
91
92 # If given, this is the actual location of the logger's URL file.
92 # If given, this is the actual location of the logger's URL file.
93 # If not, this is computed using the profile, app_dir and furl_file_name
93 # If not, this is computed using the profile, app_dir and furl_file_name
94 self.default_config.Global.url_file_name = u'iplogger.url'
94 self.default_config.Global.url_file_name = u'iplogger.url'
95 self.default_config.Global.url_file = u''
95 self.default_config.Global.url_file = u''
96
96
97 def post_load_command_line_config(self):
97 def post_load_command_line_config(self):
98 pass
98 pass
99
99
100 def pre_construct(self):
100 def pre_construct(self):
101 super(IPLoggerApp, self).pre_construct()
101 super(IPLoggerApp, self).pre_construct()
102
102
103 def construct(self):
103 def construct(self):
104 # This is the working dir by now.
104 # This is the working dir by now.
105 sys.path.insert(0, '')
105 sys.path.insert(0, '')
106
106
107 self.start_logging()
107 self.start_logging()
108
108
109 try:
109 try:
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
111 except:
111 except:
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 self.exit(1)
113 self.exit(1)
114
114
115
115
116 def start_app(self):
116 def start_app(self):
117 try:
117 try:
118 self.watcher.start()
118 self.watcher.start()
119 self.watcher.loop.start()
119 self.watcher.loop.start()
120 except KeyboardInterrupt:
120 except KeyboardInterrupt:
121 self.log.critical("Logging Interrupted, shutting down...\n")
121 self.log.critical("Logging Interrupted, shutting down...\n")
122
122
123
123
124 def launch_new_instance():
124 def launch_new_instance():
125 """Create and run the IPython LogWatcher"""
125 """Create and run the IPython LogWatcher"""
126 app = IPLoggerApp()
126 app = IPLoggerApp()
127 app.start()
127 app.start()
128
128
129
129
130 if __name__ == '__main__':
130 if __name__ == '__main__':
131 launch_new_instance()
131 launch_new_instance()
132
132
General Comments 0
You need to be logged in to leave comments. Login now