add cluster_id to parallel apps...
MinRK
@@ -1,244 +1,257 @@
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 The Base Application class for IPython.parallel apps
3 The Base Application class for IPython.parallel apps
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * Min RK
8 * Min RK
9
9
10 """
10 """
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 from __future__ import with_statement
23 from __future__ import with_statement
24
24
25 import os
25 import os
26 import logging
26 import logging
27 import re
27 import re
28 import sys
28 import sys
29
29
30 from subprocess import Popen, PIPE
30 from subprocess import Popen, PIPE
31
31
32 from IPython.core import release
32 from IPython.core import release
33 from IPython.core.crashhandler import CrashHandler
33 from IPython.core.crashhandler import CrashHandler
34 from IPython.core.application import (
34 from IPython.core.application import (
35 BaseIPythonApplication,
35 BaseIPythonApplication,
36 base_aliases as base_ip_aliases,
36 base_aliases as base_ip_aliases,
37 base_flags as base_ip_flags
37 base_flags as base_ip_flags
38 )
38 )
39 from IPython.utils.path import expand_path
39 from IPython.utils.path import expand_path
40
40
41 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
41 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Module errors
44 # Module errors
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 class PIDFileError(Exception):
47 class PIDFileError(Exception):
48 pass
48 pass
49
49
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Crash handler for this application
52 # Crash handler for this application
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
55 class ParallelCrashHandler(CrashHandler):
55 class ParallelCrashHandler(CrashHandler):
56 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
56 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
57
57
58 def __init__(self, app):
58 def __init__(self, app):
59 contact_name = release.authors['Min'][0]
59 contact_name = release.authors['Min'][0]
60 contact_email = release.authors['Min'][1]
60 contact_email = release.authors['Min'][1]
61 bug_tracker = 'http://github.com/ipython/ipython/issues'
61 bug_tracker = 'http://github.com/ipython/ipython/issues'
62 super(ParallelCrashHandler,self).__init__(
62 super(ParallelCrashHandler,self).__init__(
63 app, contact_name, contact_email, bug_tracker
63 app, contact_name, contact_email, bug_tracker
64 )
64 )
65
65
66
66
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68 # Main application
68 # Main application
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70 base_aliases = {}
70 base_aliases = {}
71 base_aliases.update(base_ip_aliases)
71 base_aliases.update(base_ip_aliases)
72 base_aliases.update({
72 base_aliases.update({
73 'profile-dir' : 'ProfileDir.location',
73 'profile-dir' : 'ProfileDir.location',
74 'work-dir' : 'BaseParallelApplication.work_dir',
74 'work-dir' : 'BaseParallelApplication.work_dir',
75 'log-to-file' : 'BaseParallelApplication.log_to_file',
75 'log-to-file' : 'BaseParallelApplication.log_to_file',
76 'clean-logs' : 'BaseParallelApplication.clean_logs',
76 'clean-logs' : 'BaseParallelApplication.clean_logs',
77 'log-url' : 'BaseParallelApplication.log_url',
77 'log-url' : 'BaseParallelApplication.log_url',
78 'cluster-id' : 'BaseParallelApplication.cluster_id',
78 })
79 })
79
80
80 base_flags = {
81 base_flags = {
81 'log-to-file' : (
82 'log-to-file' : (
82 {'BaseParallelApplication' : {'log_to_file' : True}},
83 {'BaseParallelApplication' : {'log_to_file' : True}},
83 "send log output to a file"
84 "send log output to a file"
84 )
85 )
85 }
86 }
86 base_flags.update(base_ip_flags)
87 base_flags.update(base_ip_flags)
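With the aliases and flags above, the new cluster_id option is set on the command line like any other alias; a possible invocation (cluster name illustrative) is:

    ipcontroller --cluster-id=mycluster --log-to-file
    ipengine     --cluster-id=mycluster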
87
88
88 class BaseParallelApplication(BaseIPythonApplication):
89 class BaseParallelApplication(BaseIPythonApplication):
89 """The base Application for IPython.parallel apps
90 """The base Application for IPython.parallel apps
90
91
91 Principal extensions to BaseIPythonApplication:
92 Principal extensions to BaseIPythonApplication:
92
93
93 * work_dir
94 * work_dir
94 * remote logging via pyzmq
95 * remote logging via pyzmq
95 * IOLoop instance
96 * IOLoop instance
96 """
97 """
97
98
98 crash_handler_class = ParallelCrashHandler
99 crash_handler_class = ParallelCrashHandler
99
100
100 def _log_level_default(self):
101 def _log_level_default(self):
101 # temporarily override default_log_level to INFO
102 # temporarily override default_log_level to INFO
102 return logging.INFO
103 return logging.INFO
103
104
104 work_dir = Unicode(os.getcwdu(), config=True,
105 work_dir = Unicode(os.getcwdu(), config=True,
105 help='Set the working dir for the process.'
106 help='Set the working dir for the process.'
106 )
107 )
107 def _work_dir_changed(self, name, old, new):
108 def _work_dir_changed(self, name, old, new):
108 self.work_dir = unicode(expand_path(new))
109 self.work_dir = unicode(expand_path(new))
109
110
110 log_to_file = Bool(config=True,
111 log_to_file = Bool(config=True,
111 help="whether to log to a file")
112 help="whether to log to a file")
112
113
113 clean_logs = Bool(False, config=True,
114 clean_logs = Bool(False, config=True,
114 help="whether to cleanup old logfiles before starting")
115 help="whether to cleanup old logfiles before starting")
115
116
116 log_url = Unicode('', config=True,
117 log_url = Unicode('', config=True,
117 help="The ZMQ URL of the iplogger to aggregate logging.")
118 help="The ZMQ URL of the iplogger to aggregate logging.")
118
119
120 cluster_id = Unicode('', config=True,
121 help="""String id to add to runtime files, to prevent name collisions when
122 using multiple clusters with a single profile.
123
124 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
125 """
126 )
127 def _cluster_id_changed(self, name, old, new):
128 self.name = self.__class__.name
129 if new:
130 self.name += '-%s'%new
131
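For illustration, a minimal standalone sketch of the naming rule implemented by _cluster_id_changed (ToyApp is a hypothetical stand-in, not IPython code):

    class ToyApp(object):
        name = 'ipcontroller'            # class-level default, as on the app subclasses
        def set_cluster_id(self, new):
            # reset to the class default, then append the cluster id if one is set
            self.name = self.__class__.name
            if new:
                self.name += '-%s' % new

    app = ToyApp()
    app.set_cluster_id('mycluster')
    print app.name                       # -> 'ipcontroller-mycluster'
    app.set_cluster_id('')
    print app.name                       # -> back to 'ipcontroller'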
119 def _config_files_default(self):
132 def _config_files_default(self):
120 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
133 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
121
134
122 loop = Instance('zmq.eventloop.ioloop.IOLoop')
135 loop = Instance('zmq.eventloop.ioloop.IOLoop')
123 def _loop_default(self):
136 def _loop_default(self):
124 from zmq.eventloop.ioloop import IOLoop
137 from zmq.eventloop.ioloop import IOLoop
125 return IOLoop.instance()
138 return IOLoop.instance()
126
139
127 aliases = Dict(base_aliases)
140 aliases = Dict(base_aliases)
128 flags = Dict(base_flags)
141 flags = Dict(base_flags)
129
142
130 def initialize(self, argv=None):
143 def initialize(self, argv=None):
131 """initialize the app"""
144 """initialize the app"""
132 super(BaseParallelApplication, self).initialize(argv)
145 super(BaseParallelApplication, self).initialize(argv)
133 self.to_work_dir()
146 self.to_work_dir()
134 self.reinit_logging()
147 self.reinit_logging()
135
148
136 def to_work_dir(self):
149 def to_work_dir(self):
137 wd = self.work_dir
150 wd = self.work_dir
138 if unicode(wd) != os.getcwdu():
151 if unicode(wd) != os.getcwdu():
139 os.chdir(wd)
152 os.chdir(wd)
140 self.log.info("Changing to working dir: %s" % wd)
153 self.log.info("Changing to working dir: %s" % wd)
141 # This is the working dir by now.
154 # This is the working dir by now.
142 sys.path.insert(0, '')
155 sys.path.insert(0, '')
143
156
144 def reinit_logging(self):
157 def reinit_logging(self):
145 # Remove old log files
158 # Remove old log files
146 log_dir = self.profile_dir.log_dir
159 log_dir = self.profile_dir.log_dir
147 if self.clean_logs:
160 if self.clean_logs:
148 for f in os.listdir(log_dir):
161 for f in os.listdir(log_dir):
149 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
162 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
150 os.remove(os.path.join(log_dir, f))
163 os.remove(os.path.join(log_dir, f))
151 if self.log_to_file:
164 if self.log_to_file:
152 # Start logging to the new log file
165 # Start logging to the new log file
153 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
166 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
154 logfile = os.path.join(log_dir, log_filename)
167 logfile = os.path.join(log_dir, log_filename)
155 open_log_file = open(logfile, 'w')
168 open_log_file = open(logfile, 'w')
156 else:
169 else:
157 open_log_file = None
170 open_log_file = None
158 if open_log_file is not None:
171 if open_log_file is not None:
159 self.log.removeHandler(self._log_handler)
172 self.log.removeHandler(self._log_handler)
160 self._log_handler = logging.StreamHandler(open_log_file)
173 self._log_handler = logging.StreamHandler(open_log_file)
161 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
174 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
162 self._log_handler.setFormatter(self._log_formatter)
175 self._log_handler.setFormatter(self._log_formatter)
163 self.log.addHandler(self._log_handler)
176 self.log.addHandler(self._log_handler)
164 # do not propagate log messages to root logger
177 # do not propagate log messages to root logger
165 # ipcluster app will sometimes print duplicate messages during shutdown
178 # ipcluster app will sometimes print duplicate messages during shutdown
166 # if this is 1 (default):
179 # if this is 1 (default):
167 self.log.propagate = False
180 self.log.propagate = False
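Since log files are named from self.name, a cluster_id set above flows through here as well; assuming cluster_id 'mycluster' and pid 1234, the controller would log to something like (path illustrative):

    <profile_dir>/log/ipcontroller-mycluster-1234.log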
168
181
169 def write_pid_file(self, overwrite=False):
182 def write_pid_file(self, overwrite=False):
170 """Create a .pid file in the pid_dir with my pid.
183 """Create a .pid file in the pid_dir with my pid.
171
184
172 This must be called after pre_construct, which sets `self.pid_dir`.
185 This must be called after pre_construct, which sets `self.pid_dir`.
173 This raises :exc:`PIDFileError` if the pid file exists already.
186 This raises :exc:`PIDFileError` if the pid file exists already.
174 """
187 """
175 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
188 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
176 if os.path.isfile(pid_file):
189 if os.path.isfile(pid_file):
177 pid = self.get_pid_from_file()
190 pid = self.get_pid_from_file()
178 if not overwrite:
191 if not overwrite:
179 raise PIDFileError(
192 raise PIDFileError(
180 'The pid file [%s] already exists. \nThis could mean that this '
193 'The pid file [%s] already exists. \nThis could mean that this '
181 'server is already running with [pid=%s].' % (pid_file, pid)
194 'server is already running with [pid=%s].' % (pid_file, pid)
182 )
195 )
183 with open(pid_file, 'w') as f:
196 with open(pid_file, 'w') as f:
184 self.log.info("Creating pid file: %s" % pid_file)
197 self.log.info("Creating pid file: %s" % pid_file)
185 f.write(repr(os.getpid())+'\n')
198 f.write(repr(os.getpid())+'\n')
186
199
187 def remove_pid_file(self):
200 def remove_pid_file(self):
188 """Remove the pid file.
201 """Remove the pid file.
189
202
190 This should be called at shutdown by registering a callback with
203 This should be called at shutdown by registering a callback with
191 :func:`reactor.addSystemEventTrigger`. This needs to return
204 :func:`reactor.addSystemEventTrigger`. This needs to return
192 ``None``.
205 ``None``.
193 """
206 """
194 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
207 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
195 if os.path.isfile(pid_file):
208 if os.path.isfile(pid_file):
196 try:
209 try:
197 self.log.info("Removing pid file: %s" % pid_file)
210 self.log.info("Removing pid file: %s" % pid_file)
198 os.remove(pid_file)
211 os.remove(pid_file)
199 except:
212 except:
200 self.log.warn("Error removing the pid file: %s" % pid_file)
213 self.log.warn("Error removing the pid file: %s" % pid_file)
201
214
202 def get_pid_from_file(self):
215 def get_pid_from_file(self):
203 """Get the pid from the pid file.
216 """Get the pid from the pid file.
204
217
205 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
218 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
206 """
219 """
207 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
220 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
208 if os.path.isfile(pid_file):
221 if os.path.isfile(pid_file):
209 with open(pid_file, 'r') as f:
222 with open(pid_file, 'r') as f:
210 s = f.read().strip()
223 s = f.read().strip()
211 try:
224 try:
212 pid = int(s)
225 pid = int(s)
213 except:
226 except:
214 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
227 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
215 return pid
228 return pid
216 else:
229 else:
217 raise PIDFileError('pid file not found: %s' % pid_file)
230 raise PIDFileError('pid file not found: %s' % pid_file)
218
231
219 def check_pid(self, pid):
232 def check_pid(self, pid):
220 if os.name == 'nt':
233 if os.name == 'nt':
221 try:
234 try:
222 import ctypes
235 import ctypes
223 # returns 0 if no such process (of ours) exists
236 # returns 0 if no such process (of ours) exists
224 # positive int otherwise
237 # positive int otherwise
225 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
238 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
226 except Exception:
239 except Exception:
227 self.log.warn(
240 self.log.warn(
228 "Could not determine whether pid %i is running via `OpenProcess`. "
241 "Could not determine whether pid %i is running via `OpenProcess`. "
229 " Making the likely assumption that it is."%pid
242 " Making the likely assumption that it is."%pid
230 )
243 )
231 return True
244 return True
232 return bool(p)
245 return bool(p)
233 else:
246 else:
234 try:
247 try:
235 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
248 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
236 output,_ = p.communicate()
249 output,_ = p.communicate()
237 except OSError:
250 except OSError:
238 self.log.warn(
251 self.log.warn(
239 "Could not determine whether pid %i is running via `ps x`. "
252 "Could not determine whether pid %i is running via `ps x`. "
240 " Making the likely assumption that it is."%pid
253 " Making the likely assumption that it is."%pid
241 )
254 )
242 return True
255 return True
243 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
256 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
244 return pid in pids
257 return pid in pids
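The POSIX branch above shells out to `ps x` and parses its output; an alternative liveness probe (not what this code does) is to send signal 0 with os.kill, sketched here:

    import errno, os

    def pid_exists(pid):
        # probe the pid without sending a signal (POSIX only)
        try:
            os.kill(pid, 0)
        except OSError as e:
            # ESRCH: no such process; EPERM: it exists but belongs to another user
            return e.errno != errno.ESRCH
        return True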
@@ -1,441 +1,452 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 import os
26 import os
27 import socket
27 import socket
28 import stat
28 import stat
29 import sys
29 import sys
30 import uuid
30 import uuid
31
31
32 from multiprocessing import Process
32 from multiprocessing import Process
33
33
34 import zmq
34 import zmq
35 from zmq.devices import ProcessMonitoredQueue
35 from zmq.devices import ProcessMonitoredQueue
36 from zmq.log.handlers import PUBHandler
36 from zmq.log.handlers import PUBHandler
37 from zmq.utils import jsonapi as json
37 from zmq.utils import jsonapi as json
38
38
39 from IPython.config.application import boolean_flag
39 from IPython.config.application import boolean_flag
40 from IPython.core.profiledir import ProfileDir
40 from IPython.core.profiledir import ProfileDir
41
41
42 from IPython.parallel.apps.baseapp import (
42 from IPython.parallel.apps.baseapp import (
43 BaseParallelApplication,
43 BaseParallelApplication,
44 base_aliases,
44 base_aliases,
45 base_flags,
45 base_flags,
46 )
46 )
47 from IPython.utils.importstring import import_item
47 from IPython.utils.importstring import import_item
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
49
49
50 # from IPython.parallel.controller.controller import ControllerFactory
50 # from IPython.parallel.controller.controller import ControllerFactory
51 from IPython.zmq.session import Session
51 from IPython.zmq.session import Session
52 from IPython.parallel.controller.heartmonitor import HeartMonitor
52 from IPython.parallel.controller.heartmonitor import HeartMonitor
53 from IPython.parallel.controller.hub import HubFactory
53 from IPython.parallel.controller.hub import HubFactory
54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 from IPython.parallel.controller.sqlitedb import SQLiteDB
55 from IPython.parallel.controller.sqlitedb import SQLiteDB
56
56
57 from IPython.parallel.util import signal_children, split_url, asbytes
57 from IPython.parallel.util import signal_children, split_url, asbytes
58
58
59 # conditional import of MongoDB backend class
59 # conditional import of MongoDB backend class
60
60
61 try:
61 try:
62 from IPython.parallel.controller.mongodb import MongoDB
62 from IPython.parallel.controller.mongodb import MongoDB
63 except ImportError:
63 except ImportError:
64 maybe_mongo = []
64 maybe_mongo = []
65 else:
65 else:
66 maybe_mongo = [MongoDB]
66 maybe_mongo = [MongoDB]
67
67
68
68
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70 # Module level variables
70 # Module level variables
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72
72
73
73
74 #: The default config file name for this application
74 #: The default config file name for this application
75 default_config_file_name = u'ipcontroller_config.py'
75 default_config_file_name = u'ipcontroller_config.py'
76
76
77
77
78 _description = """Start the IPython controller for parallel computing.
78 _description = """Start the IPython controller for parallel computing.
79
79
80 The IPython controller provides a gateway between the IPython engines and
80 The IPython controller provides a gateway between the IPython engines and
81 clients. The controller needs to be started before the engines and can be
81 clients. The controller needs to be started before the engines and can be
82 configured using command line options or using a cluster directory. Cluster
82 configured using command line options or using a cluster directory. Cluster
83 directories contain config, log and security files and are usually located in
83 directories contain config, log and security files and are usually located in
84 your ipython directory and named as "profile_name". See the `profile`
84 your ipython directory and named as "profile_name". See the `profile`
85 and `profile-dir` options for details.
85 and `profile-dir` options for details.
86 """
86 """
87
87
88 _examples = """
88 _examples = """
89 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
89 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
90 ipcontroller --scheme=pure # use the pure zeromq scheduler
90 ipcontroller --scheme=pure # use the pure zeromq scheduler
91 """
91 """
92
92
93
93
94 #-----------------------------------------------------------------------------
94 #-----------------------------------------------------------------------------
95 # The main application
95 # The main application
96 #-----------------------------------------------------------------------------
96 #-----------------------------------------------------------------------------
97 flags = {}
97 flags = {}
98 flags.update(base_flags)
98 flags.update(base_flags)
99 flags.update({
99 flags.update({
100 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
100 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
101 'Use threads instead of processes for the schedulers'),
101 'Use threads instead of processes for the schedulers'),
102 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
102 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
103 'use the SQLiteDB backend'),
103 'use the SQLiteDB backend'),
104 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
104 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
105 'use the MongoDB backend'),
105 'use the MongoDB backend'),
106 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
106 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
107 'use the in-memory DictDB backend'),
107 'use the in-memory DictDB backend'),
108 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
108 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
109 'reuse existing json connection files')
109 'reuse existing json connection files')
110 })
110 })
111
111
112 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
112 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
113 "Use HMAC digests for authentication of messages.",
113 "Use HMAC digests for authentication of messages.",
114 "Don't authenticate messages."
114 "Don't authenticate messages."
115 ))
115 ))
116 aliases = dict(
116 aliases = dict(
117 secure = 'IPControllerApp.secure',
117 secure = 'IPControllerApp.secure',
118 ssh = 'IPControllerApp.ssh_server',
118 ssh = 'IPControllerApp.ssh_server',
119 enginessh = 'IPControllerApp.engine_ssh_server',
119 enginessh = 'IPControllerApp.engine_ssh_server',
120 location = 'IPControllerApp.location',
120 location = 'IPControllerApp.location',
121
121
122 ident = 'Session.session',
122 ident = 'Session.session',
123 user = 'Session.username',
123 user = 'Session.username',
124 keyfile = 'Session.keyfile',
124 keyfile = 'Session.keyfile',
125
125
126 url = 'HubFactory.url',
126 url = 'HubFactory.url',
127 ip = 'HubFactory.ip',
127 ip = 'HubFactory.ip',
128 transport = 'HubFactory.transport',
128 transport = 'HubFactory.transport',
129 port = 'HubFactory.regport',
129 port = 'HubFactory.regport',
130
130
131 ping = 'HeartMonitor.period',
131 ping = 'HeartMonitor.period',
132
132
133 scheme = 'TaskScheduler.scheme_name',
133 scheme = 'TaskScheduler.scheme_name',
134 hwm = 'TaskScheduler.hwm',
134 hwm = 'TaskScheduler.hwm',
135 )
135 )
136 aliases.update(base_aliases)
136 aliases.update(base_aliases)
137
137
138
138
139 class IPControllerApp(BaseParallelApplication):
139 class IPControllerApp(BaseParallelApplication):
140
140
141 name = u'ipcontroller'
141 name = u'ipcontroller'
142 description = _description
142 description = _description
143 examples = _examples
143 examples = _examples
144 config_file_name = Unicode(default_config_file_name)
144 config_file_name = Unicode(default_config_file_name)
145 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
145 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
146
146
147 # change default to True
147 # change default to True
148 auto_create = Bool(True, config=True,
148 auto_create = Bool(True, config=True,
149 help="""Whether to create profile dir if it doesn't exist.""")
149 help="""Whether to create profile dir if it doesn't exist.""")
150
150
151 reuse_files = Bool(False, config=True,
151 reuse_files = Bool(False, config=True,
152 help='Whether to reuse existing json connection files.'
152 help='Whether to reuse existing json connection files.'
153 )
153 )
154 secure = Bool(True, config=True,
154 secure = Bool(True, config=True,
155 help='Whether to use HMAC digests for extra message authentication.'
155 help='Whether to use HMAC digests for extra message authentication.'
156 )
156 )
157 ssh_server = Unicode(u'', config=True,
157 ssh_server = Unicode(u'', config=True,
158 help="""ssh url for clients to use when connecting to the Controller
158 help="""ssh url for clients to use when connecting to the Controller
159 processes. It should be of the form: [user@]server[:port]. The
159 processes. It should be of the form: [user@]server[:port]. The
160 Controller's listening addresses must be accessible from the ssh server""",
160 Controller's listening addresses must be accessible from the ssh server""",
161 )
161 )
162 engine_ssh_server = Unicode(u'', config=True,
162 engine_ssh_server = Unicode(u'', config=True,
163 help="""ssh url for engines to use when connecting to the Controller
163 help="""ssh url for engines to use when connecting to the Controller
164 processes. It should be of the form: [user@]server[:port]. The
164 processes. It should be of the form: [user@]server[:port]. The
165 Controller's listening addresses must be accessible from the ssh server""",
165 Controller's listening addresses must be accessible from the ssh server""",
166 )
166 )
167 location = Unicode(u'', config=True,
167 location = Unicode(u'', config=True,
168 help="""The external IP or domain name of the Controller, used for disambiguating
168 help="""The external IP or domain name of the Controller, used for disambiguating
169 engine and client connections.""",
169 engine and client connections.""",
170 )
170 )
171 import_statements = List([], config=True,
171 import_statements = List([], config=True,
172 help="import statements to be run at startup. Necessary in some environments"
172 help="import statements to be run at startup. Necessary in some environments"
173 )
173 )
174
174
175 use_threads = Bool(False, config=True,
175 use_threads = Bool(False, config=True,
176 help='Use threads instead of processes for the schedulers',
176 help='Use threads instead of processes for the schedulers',
177 )
177 )
178
179 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
180 help="JSON filename where engine connection info will be stored.")
181 client_json_file = Unicode('ipcontroller-client.json', config=True,
182 help="JSON filename where client connection info will be stored.")
183
184 def _cluster_id_changed(self, name, old, new):
185 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
186 self.engine_json_file = "%s-engine.json"%self.name
187 self.client_json_file = "%s-client.json"%self.name
188
178
189
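For illustration, how the connection-file names above are derived once a cluster_id is set (values illustrative):

    name = 'ipcontroller'
    cluster_id = 'mycluster'                      # e.g. --cluster-id=mycluster
    if cluster_id:
        name += '-%s' % cluster_id
    engine_json_file = "%s-engine.json" % name    # 'ipcontroller-mycluster-engine.json'
    client_json_file = "%s-client.json" % name    # 'ipcontroller-mycluster-client.json'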
179 # internal
190 # internal
180 children = List()
191 children = List()
181 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
192 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
182
193
183 def _use_threads_changed(self, name, old, new):
194 def _use_threads_changed(self, name, old, new):
184 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
195 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
185
196
186 aliases = Dict(aliases)
197 aliases = Dict(aliases)
187 flags = Dict(flags)
198 flags = Dict(flags)
188
199
189
200
190 def save_connection_dict(self, fname, cdict):
201 def save_connection_dict(self, fname, cdict):
191 """save a connection dict to json file."""
202 """save a connection dict to json file."""
192 c = self.config
203 c = self.config
193 url = cdict['url']
204 url = cdict['url']
194 location = cdict['location']
205 location = cdict['location']
195 if not location:
206 if not location:
196 try:
207 try:
197 proto,ip,port = split_url(url)
208 proto,ip,port = split_url(url)
198 except AssertionError:
209 except AssertionError:
199 pass
210 pass
200 else:
211 else:
201 try:
212 try:
202 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
213 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
203 except (socket.gaierror, IndexError):
214 except (socket.gaierror, IndexError):
204 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
215 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
205 " You may need to specify '--location=<external_ip_address>' to help"
216 " You may need to specify '--location=<external_ip_address>' to help"
206 " IPython decide when to connect via loopback.")
217 " IPython decide when to connect via loopback.")
207 location = '127.0.0.1'
218 location = '127.0.0.1'
208 cdict['location'] = location
219 cdict['location'] = location
209 fname = os.path.join(self.profile_dir.security_dir, fname)
220 fname = os.path.join(self.profile_dir.security_dir, fname)
210 with open(fname, 'wb') as f:
221 with open(fname, 'wb') as f:
211 f.write(json.dumps(cdict, indent=2))
222 f.write(json.dumps(cdict, indent=2))
212 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
223 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
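The JSON written here carries only the keys assembled in init_hub below (exec_key, ssh, url, location), so an ipcontroller-client.json is roughly of this shape (all values illustrative):

    {
      "exec_key": "4f7f2b9a-...",
      "ssh": "",
      "url": "tcp://127.0.0.1:55055",
      "location": "10.0.0.5"
    }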
213
224
214 def load_config_from_json(self):
225 def load_config_from_json(self):
215 """load config from existing json connector files."""
226 """load config from existing json connector files."""
216 c = self.config
227 c = self.config
217 # load from engine config
228 # load from engine config
218 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
229 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
219 cfg = json.loads(f.read())
230 cfg = json.loads(f.read())
220 key = c.Session.key = asbytes(cfg['exec_key'])
231 key = c.Session.key = asbytes(cfg['exec_key'])
221 xport,addr = cfg['url'].split('://')
232 xport,addr = cfg['url'].split('://')
222 c.HubFactory.engine_transport = xport
233 c.HubFactory.engine_transport = xport
223 ip,ports = addr.split(':')
234 ip,ports = addr.split(':')
224 c.HubFactory.engine_ip = ip
235 c.HubFactory.engine_ip = ip
225 c.HubFactory.regport = int(ports)
236 c.HubFactory.regport = int(ports)
226 self.location = cfg['location']
237 self.location = cfg['location']
227 if not self.engine_ssh_server:
238 if not self.engine_ssh_server:
228 self.engine_ssh_server = cfg['ssh']
239 self.engine_ssh_server = cfg['ssh']
229 # load client config
240 # load client config
230 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
241 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
231 cfg = json.loads(f.read())
242 cfg = json.loads(f.read())
232 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
243 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
233 xport,addr = cfg['url'].split('://')
244 xport,addr = cfg['url'].split('://')
234 c.HubFactory.client_transport = xport
245 c.HubFactory.client_transport = xport
235 ip,ports = addr.split(':')
246 ip,ports = addr.split(':')
236 c.HubFactory.client_ip = ip
247 c.HubFactory.client_ip = ip
237 if not self.ssh_server:
248 if not self.ssh_server:
238 self.ssh_server = cfg['ssh']
249 self.ssh_server = cfg['ssh']
239 assert int(ports) == c.HubFactory.regport, "regport mismatch"
250 assert int(ports) == c.HubFactory.regport, "regport mismatch"
240
251
241 def init_hub(self):
252 def init_hub(self):
242 c = self.config
253 c = self.config
243
254
244 self.do_import_statements()
255 self.do_import_statements()
245 reusing = self.reuse_files
256 reusing = self.reuse_files
246 if reusing:
257 if reusing:
247 try:
258 try:
248 self.load_config_from_json()
259 self.load_config_from_json()
249 except (AssertionError,IOError):
260 except (AssertionError,IOError):
250 reusing=False
261 reusing=False
251 # check again, because reusing may have failed:
262 # check again, because reusing may have failed:
252 if reusing:
263 if reusing:
253 pass
264 pass
254 elif self.secure:
265 elif self.secure:
255 key = str(uuid.uuid4())
266 key = str(uuid.uuid4())
256 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
267 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
257 # with open(keyfile, 'w') as f:
268 # with open(keyfile, 'w') as f:
258 # f.write(key)
269 # f.write(key)
259 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
270 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
260 c.Session.key = asbytes(key)
271 c.Session.key = asbytes(key)
261 else:
272 else:
262 key = c.Session.key = b''
273 key = c.Session.key = b''
263
274
264 try:
275 try:
265 self.factory = HubFactory(config=c, log=self.log)
276 self.factory = HubFactory(config=c, log=self.log)
266 # self.start_logging()
277 # self.start_logging()
267 self.factory.init_hub()
278 self.factory.init_hub()
268 except:
279 except:
269 self.log.error("Couldn't construct the Controller", exc_info=True)
280 self.log.error("Couldn't construct the Controller", exc_info=True)
270 self.exit(1)
281 self.exit(1)
271
282
272 if not reusing:
283 if not reusing:
273 # save to new json config files
284 # save to new json config files
274 f = self.factory
285 f = self.factory
275 cdict = {'exec_key' : key,
286 cdict = {'exec_key' : key,
276 'ssh' : self.ssh_server,
287 'ssh' : self.ssh_server,
277 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
288 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
278 'location' : self.location
289 'location' : self.location
279 }
290 }
280 self.save_connection_dict('ipcontroller-client.json', cdict)
291 self.save_connection_dict(self.client_json_file, cdict)
281 edict = cdict
292 edict = cdict
282 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
293 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
283 edict['ssh'] = self.engine_ssh_server
294 edict['ssh'] = self.engine_ssh_server
284 self.save_connection_dict('ipcontroller-engine.json', edict)
295 self.save_connection_dict(self.engine_json_file, edict)
285
296
286 #
297 #
287 def init_schedulers(self):
298 def init_schedulers(self):
288 children = self.children
299 children = self.children
289 mq = import_item(str(self.mq_class))
300 mq = import_item(str(self.mq_class))
290
301
291 hub = self.factory
302 hub = self.factory
292 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
303 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
293 # IOPub relay (in a Process)
304 # IOPub relay (in a Process)
294 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
305 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
295 q.bind_in(hub.client_info['iopub'])
306 q.bind_in(hub.client_info['iopub'])
296 q.bind_out(hub.engine_info['iopub'])
307 q.bind_out(hub.engine_info['iopub'])
297 q.setsockopt_out(zmq.SUBSCRIBE, b'')
308 q.setsockopt_out(zmq.SUBSCRIBE, b'')
298 q.connect_mon(hub.monitor_url)
309 q.connect_mon(hub.monitor_url)
299 q.daemon=True
310 q.daemon=True
300 children.append(q)
311 children.append(q)
301
312
302 # Multiplexer Queue (in a Process)
313 # Multiplexer Queue (in a Process)
303 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
314 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
304 q.bind_in(hub.client_info['mux'])
315 q.bind_in(hub.client_info['mux'])
305 q.setsockopt_in(zmq.IDENTITY, b'mux')
316 q.setsockopt_in(zmq.IDENTITY, b'mux')
306 q.bind_out(hub.engine_info['mux'])
317 q.bind_out(hub.engine_info['mux'])
307 q.connect_mon(hub.monitor_url)
318 q.connect_mon(hub.monitor_url)
308 q.daemon=True
319 q.daemon=True
309 children.append(q)
320 children.append(q)
310
321
311 # Control Queue (in a Process)
322 # Control Queue (in a Process)
312 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
323 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
313 q.bind_in(hub.client_info['control'])
324 q.bind_in(hub.client_info['control'])
314 q.setsockopt_in(zmq.IDENTITY, b'control')
325 q.setsockopt_in(zmq.IDENTITY, b'control')
315 q.bind_out(hub.engine_info['control'])
326 q.bind_out(hub.engine_info['control'])
316 q.connect_mon(hub.monitor_url)
327 q.connect_mon(hub.monitor_url)
317 q.daemon=True
328 q.daemon=True
318 children.append(q)
329 children.append(q)
319 try:
330 try:
320 scheme = self.config.TaskScheduler.scheme_name
331 scheme = self.config.TaskScheduler.scheme_name
321 except AttributeError:
332 except AttributeError:
322 scheme = TaskScheduler.scheme_name.get_default_value()
333 scheme = TaskScheduler.scheme_name.get_default_value()
323 # Task Queue (in a Process)
334 # Task Queue (in a Process)
324 if scheme == 'pure':
335 if scheme == 'pure':
325 self.log.warn("task::using pure XREQ Task scheduler")
336 self.log.warn("task::using pure XREQ Task scheduler")
326 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
337 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
327 # q.setsockopt_out(zmq.HWM, hub.hwm)
338 # q.setsockopt_out(zmq.HWM, hub.hwm)
328 q.bind_in(hub.client_info['task'][1])
339 q.bind_in(hub.client_info['task'][1])
329 q.setsockopt_in(zmq.IDENTITY, b'task')
340 q.setsockopt_in(zmq.IDENTITY, b'task')
330 q.bind_out(hub.engine_info['task'])
341 q.bind_out(hub.engine_info['task'])
331 q.connect_mon(hub.monitor_url)
342 q.connect_mon(hub.monitor_url)
332 q.daemon=True
343 q.daemon=True
333 children.append(q)
344 children.append(q)
334 elif scheme == 'none':
345 elif scheme == 'none':
335 self.log.warn("task::using no Task scheduler")
346 self.log.warn("task::using no Task scheduler")
336
347
337 else:
348 else:
338 self.log.info("task::using Python %s Task scheduler"%scheme)
349 self.log.info("task::using Python %s Task scheduler"%scheme)
339 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
350 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
340 hub.monitor_url, hub.client_info['notification'])
351 hub.monitor_url, hub.client_info['notification'])
341 kwargs = dict(logname='scheduler', loglevel=self.log_level,
352 kwargs = dict(logname='scheduler', loglevel=self.log_level,
342 log_url = self.log_url, config=dict(self.config))
353 log_url = self.log_url, config=dict(self.config))
343 if 'Process' in self.mq_class:
354 if 'Process' in self.mq_class:
344 # run the Python scheduler in a Process
355 # run the Python scheduler in a Process
345 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
356 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
346 q.daemon=True
357 q.daemon=True
347 children.append(q)
358 children.append(q)
348 else:
359 else:
349 # single-threaded Controller
360 # single-threaded Controller
350 kwargs['in_thread'] = True
361 kwargs['in_thread'] = True
351 launch_scheduler(*sargs, **kwargs)
362 launch_scheduler(*sargs, **kwargs)
352
363
353
364
354 def save_urls(self):
365 def save_urls(self):
355 """save the registration urls to files."""
366 """save the registration urls to files."""
356 c = self.config
367 c = self.config
357
368
358 sec_dir = self.profile_dir.security_dir
369 sec_dir = self.profile_dir.security_dir
359 cf = self.factory
370 cf = self.factory
360
371
361 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
372 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
362 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
373 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
363
374
364 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
375 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
365 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
376 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
366
377
367
378
368 def do_import_statements(self):
379 def do_import_statements(self):
369 statements = self.import_statements
380 statements = self.import_statements
370 for s in statements:
381 for s in statements:
371 try:
382 try:
372 self.log.info("Executing statement: '%s'" % s)
383 self.log.info("Executing statement: '%s'" % s)
373 exec s in globals(), locals()
384 exec s in globals(), locals()
374 except:
385 except:
375 self.log.error("Error running statement: %s" % s)
386 self.log.error("Error running statement: %s" % s)
376
387
377 def forward_logging(self):
388 def forward_logging(self):
378 if self.log_url:
389 if self.log_url:
379 self.log.info("Forwarding logging to %s"%self.log_url)
390 self.log.info("Forwarding logging to %s"%self.log_url)
380 context = zmq.Context.instance()
391 context = zmq.Context.instance()
381 lsock = context.socket(zmq.PUB)
392 lsock = context.socket(zmq.PUB)
382 lsock.connect(self.log_url)
393 lsock.connect(self.log_url)
383 handler = PUBHandler(lsock)
394 handler = PUBHandler(lsock)
384 self.log.removeHandler(self._log_handler)
395 self.log.removeHandler(self._log_handler)
385 handler.root_topic = 'controller'
396 handler.root_topic = 'controller'
386 handler.setLevel(self.log_level)
397 handler.setLevel(self.log_level)
387 self.log.addHandler(handler)
398 self.log.addHandler(handler)
388 self._log_handler = handler
399 self._log_handler = handler
389 # #
400 # #
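With forwarding enabled, the controller publishes its log records over a PUB socket to the configured log_url; a possible invocation (address illustrative) is:

    ipcontroller --log-url=tcp://127.0.0.1:20202   # forward logs to an iplogger at this URL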
390
401
391 def initialize(self, argv=None):
402 def initialize(self, argv=None):
392 super(IPControllerApp, self).initialize(argv)
403 super(IPControllerApp, self).initialize(argv)
393 self.forward_logging()
404 self.forward_logging()
394 self.init_hub()
405 self.init_hub()
395 self.init_schedulers()
406 self.init_schedulers()
396
407
397 def start(self):
408 def start(self):
398 # Start the subprocesses:
409 # Start the subprocesses:
399 self.factory.start()
410 self.factory.start()
400 child_procs = []
411 child_procs = []
401 for child in self.children:
412 for child in self.children:
402 child.start()
413 child.start()
403 if isinstance(child, ProcessMonitoredQueue):
414 if isinstance(child, ProcessMonitoredQueue):
404 child_procs.append(child.launcher)
415 child_procs.append(child.launcher)
405 elif isinstance(child, Process):
416 elif isinstance(child, Process):
406 child_procs.append(child)
417 child_procs.append(child)
407 if child_procs:
418 if child_procs:
408 signal_children(child_procs)
419 signal_children(child_procs)
409
420
410 self.write_pid_file(overwrite=True)
421 self.write_pid_file(overwrite=True)
411
422
412 try:
423 try:
413 self.factory.loop.start()
424 self.factory.loop.start()
414 except KeyboardInterrupt:
425 except KeyboardInterrupt:
415 self.log.critical("Interrupted, Exiting...\n")
426 self.log.critical("Interrupted, Exiting...\n")
416
427
417
428
418
429
419 def launch_new_instance():
430 def launch_new_instance():
420 """Create and run the IPython controller"""
431 """Create and run the IPython controller"""
421 if sys.platform == 'win32':
432 if sys.platform == 'win32':
422 # make sure we don't get called from a multiprocessing subprocess
433 # make sure we don't get called from a multiprocessing subprocess
423 # this can result in infinite Controllers being started on Windows
434 # this can result in infinite Controllers being started on Windows
424 # which doesn't have a proper fork, so multiprocessing is wonky
435 # which doesn't have a proper fork, so multiprocessing is wonky
425
436
426 # this only comes up when IPython has been installed using vanilla
437 # this only comes up when IPython has been installed using vanilla
427 # setuptools, and *not* distribute.
438 # setuptools, and *not* distribute.
428 import multiprocessing
439 import multiprocessing
429 p = multiprocessing.current_process()
440 p = multiprocessing.current_process()
430 # the main process has name 'MainProcess'
441 # the main process has name 'MainProcess'
431 # subprocesses will have names like 'Process-1'
442 # subprocesses will have names like 'Process-1'
432 if p.name != 'MainProcess':
443 if p.name != 'MainProcess':
433 # we are a subprocess, don't start another Controller!
444 # we are a subprocess, don't start another Controller!
434 return
445 return
435 app = IPControllerApp.instance()
446 app = IPControllerApp.instance()
436 app.initialize()
447 app.initialize()
437 app.start()
448 app.start()
438
449
439
450
440 if __name__ == '__main__':
451 if __name__ == '__main__':
441 launch_new_instance()
452 launch_new_instance()
@@ -1,336 +1,344 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import json
24 import json
25 import os
25 import os
26 import sys
26 import sys
27 import time
27 import time
28
28
29 import zmq
29 import zmq
30 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
31
31
32 from IPython.core.profiledir import ProfileDir
32 from IPython.core.profiledir import ProfileDir
33 from IPython.parallel.apps.baseapp import (
33 from IPython.parallel.apps.baseapp import (
34 BaseParallelApplication,
34 BaseParallelApplication,
35 base_aliases,
35 base_aliases,
36 base_flags,
36 base_flags,
37 )
37 )
38 from IPython.zmq.log import EnginePUBHandler
38 from IPython.zmq.log import EnginePUBHandler
39
39
40 from IPython.config.configurable import Configurable
40 from IPython.config.configurable import Configurable
41 from IPython.zmq.session import Session
41 from IPython.zmq.session import Session
42 from IPython.parallel.engine.engine import EngineFactory
42 from IPython.parallel.engine.engine import EngineFactory
43 from IPython.parallel.engine.streamkernel import Kernel
43 from IPython.parallel.engine.streamkernel import Kernel
44 from IPython.parallel.util import disambiguate_url, asbytes
44 from IPython.parallel.util import disambiguate_url, asbytes
45
45
46 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
48
48
49
49
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51 # Module level variables
51 # Module level variables
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53
53
54 #: The default config file name for this application
54 #: The default config file name for this application
55 default_config_file_name = u'ipengine_config.py'
55 default_config_file_name = u'ipengine_config.py'
56
56
57 _description = """Start an IPython engine for parallel computing.
57 _description = """Start an IPython engine for parallel computing.
58
58
59 IPython engines run in parallel and perform computations on behalf of a client
59 IPython engines run in parallel and perform computations on behalf of a client
60 and controller. A controller needs to be started before the engines. The
60 and controller. A controller needs to be started before the engines. The
61 engine can be configured using command line options or using a cluster
61 engine can be configured using command line options or using a cluster
62 directory. Cluster directories contain config, log and security files and are
62 directory. Cluster directories contain config, log and security files and are
63 usually located in your ipython directory and named as "profile_name".
63 usually located in your ipython directory and named as "profile_name".
64 See the `profile` and `profile-dir` options for details.
64 See the `profile` and `profile-dir` options for details.
65 """
65 """
66
66
67 _examples = """
67 _examples = """
68 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
68 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
69 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
69 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
70 """
70 """
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # MPI configuration
73 # MPI configuration
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76 mpi4py_init = """from mpi4py import MPI as mpi
76 mpi4py_init = """from mpi4py import MPI as mpi
77 mpi.size = mpi.COMM_WORLD.Get_size()
77 mpi.size = mpi.COMM_WORLD.Get_size()
78 mpi.rank = mpi.COMM_WORLD.Get_rank()
78 mpi.rank = mpi.COMM_WORLD.Get_rank()
79 """
79 """
80
80
81
81
82 pytrilinos_init = """from PyTrilinos import Epetra
82 pytrilinos_init = """from PyTrilinos import Epetra
83 class SimpleStruct:
83 class SimpleStruct:
84 pass
84 pass
85 mpi = SimpleStruct()
85 mpi = SimpleStruct()
86 mpi.rank = 0
86 mpi.rank = 0
87 mpi.size = 0
87 mpi.size = 0
88 """
88 """
89
89
90 class MPI(Configurable):
90 class MPI(Configurable):
91 """Configurable for MPI initialization"""
91 """Configurable for MPI initialization"""
92 use = Unicode('', config=True,
92 use = Unicode('', config=True,
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
94 )
94 )
95
95
96 def _on_use_changed(self, old, new):
96 def _on_use_changed(self, old, new):
97 # load default init script if it's not set
97 # load default init script if it's not set
98 if not self.init_script:
98 if not self.init_script:
99 self.init_script = self.default_inits.get(new, '')
99 self.init_script = self.default_inits.get(new, '')
100
100
101 init_script = Unicode('', config=True,
101 init_script = Unicode('', config=True,
102 help="Initialization code for MPI")
102 help="Initialization code for MPI")
103
103
104 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
104 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
105 config=True)
105 config=True)
106
106
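For reference, the MPI configurable above is normally driven through the `mpi` alias defined further down; possible invocations are:

    ipengine --mpi=mpi4py          # run the mpi4py_init snippet above at engine startup
    ipengine --mpi=pytrilinos      # run the pytrilinos_init snippet instead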
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Main application
109 # Main application
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111 aliases = dict(
111 aliases = dict(
112 file = 'IPEngineApp.url_file',
112 file = 'IPEngineApp.url_file',
113 c = 'IPEngineApp.startup_command',
113 c = 'IPEngineApp.startup_command',
114 s = 'IPEngineApp.startup_script',
114 s = 'IPEngineApp.startup_script',
115
115
116 ident = 'Session.session',
116 ident = 'Session.session',
117 user = 'Session.username',
117 user = 'Session.username',
118 keyfile = 'Session.keyfile',
118 keyfile = 'Session.keyfile',
119
119
120 url = 'EngineFactory.url',
120 url = 'EngineFactory.url',
121 ssh = 'EngineFactory.sshserver',
121 ssh = 'EngineFactory.sshserver',
122 sshkey = 'EngineFactory.sshkey',
122 sshkey = 'EngineFactory.sshkey',
123 ip = 'EngineFactory.ip',
123 ip = 'EngineFactory.ip',
124 transport = 'EngineFactory.transport',
124 transport = 'EngineFactory.transport',
125 port = 'EngineFactory.regport',
125 port = 'EngineFactory.regport',
126 location = 'EngineFactory.location',
126 location = 'EngineFactory.location',
127
127
128 timeout = 'EngineFactory.timeout',
128 timeout = 'EngineFactory.timeout',
129
129
130 mpi = 'MPI.use',
130 mpi = 'MPI.use',
131
131
132 )
132 )
133 aliases.update(base_aliases)
133 aliases.update(base_aliases)
134
134
135
135
136 class IPEngineApp(BaseParallelApplication):
136 class IPEngineApp(BaseParallelApplication):
137
137
138 name = Unicode(u'ipengine')
138 name = 'ipengine'
139 description = Unicode(_description)
139 description = _description
140 examples = _examples
140 examples = _examples
141 config_file_name = Unicode(default_config_file_name)
141 config_file_name = Unicode(default_config_file_name)
142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
143
143
144 startup_script = Unicode(u'', config=True,
144 startup_script = Unicode(u'', config=True,
145 help='specify a script to be run at startup')
145 help='specify a script to be run at startup')
146 startup_command = Unicode('', config=True,
146 startup_command = Unicode('', config=True,
147 help='specify a command to be run at startup')
147 help='specify a command to be run at startup')
148
148
149 url_file = Unicode(u'', config=True,
149 url_file = Unicode(u'', config=True,
150 help="""The full location of the file containing the connection information for
150 help="""The full location of the file containing the connection information for
151 the controller. If this is not given, the file must be in the
151 the controller. If this is not given, the file must be in the
152 security directory of the profile directory. This location is
152 security directory of the profile directory. This location is
153 resolved using the `profile` or `profile_dir` options.""",
153 resolved using the `profile` or `profile_dir` options.""",
154 )
154 )
155 wait_for_url_file = Float(5, config=True,
155 wait_for_url_file = Float(5, config=True,
156 help="""The maximum number of seconds to wait for url_file to exist.
156 help="""The maximum number of seconds to wait for url_file to exist.
157 This is useful for batch systems and shared filesystems where the
157 This is useful for batch systems and shared filesystems where the
158 controller and engine are started at the same time, and it
158 controller and engine are started at the same time, and it
159 may take a moment for the controller to write the connector files.""")
159 may take a moment for the controller to write the connector files.""")
160
160
161 url_file_name = Unicode(u'ipcontroller-engine.json')
161 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
162
163 def _cluster_id_changed(self, name, old, new):
164 if new:
165 base = 'ipcontroller-%s'%new
166 else:
167 base = 'ipcontroller'
168 self.url_file_name = "%s-engine.json"%base
169
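The `_cluster_id_changed` handler is the new piece in this changeset: when a cluster_id is set, the engine looks for a connection file carrying the same suffix, so multiple clusters can share one profile without overwriting each other's JSON files. Reproducing just the string formatting:

# How the url_file_name is derived from cluster_id (formatting only).
for cluster_id in ('', 'mycluster'):
    base = 'ipcontroller-%s' % cluster_id if cluster_id else 'ipcontroller'
    print('%s-engine.json' % base)
# -> ipcontroller-engine.json
# -> ipcontroller-mycluster-engine.json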
162 log_url = Unicode('', config=True,
170 log_url = Unicode('', config=True,
163 help="""The URL for the iploggerapp instance, for forwarding
171 help="""The URL for the iploggerapp instance, for forwarding
164 logging to a central location.""")
172 logging to a central location.""")
165
173
166 aliases = Dict(aliases)
174 aliases = Dict(aliases)
167
175
168 # def find_key_file(self):
176 # def find_key_file(self):
169 # """Set the key file.
177 # """Set the key file.
170 #
178 #
171 # Here we don't try to actually see if it exists or is valid, as that
179 # Here we don't try to actually see if it exists or is valid, as that
172 # is handled by the connection logic.
180 # is handled by the connection logic.
173 # """
181 # """
174 # config = self.master_config
182 # config = self.master_config
175 # # Find the actual controller key file
183 # # Find the actual controller key file
176 # if not config.Global.key_file:
184 # if not config.Global.key_file:
177 # try_this = os.path.join(
185 # try_this = os.path.join(
178 # config.Global.profile_dir,
186 # config.Global.profile_dir,
179 # config.Global.security_dir,
187 # config.Global.security_dir,
180 # config.Global.key_file_name
188 # config.Global.key_file_name
181 # )
189 # )
182 # config.Global.key_file = try_this
190 # config.Global.key_file = try_this
183
191
184 def find_url_file(self):
192 def find_url_file(self):
185 """Set the url file.
193 """Set the url file.
186
194
187 Here we don't try to actually see if it exists or is valid, as that
195 Here we don't try to actually see if it exists or is valid, as that
188 is handled by the connection logic.
196 is handled by the connection logic.
189 """
197 """
190 config = self.config
198 config = self.config
191 # Find the actual controller key file
199 # Find the actual controller key file
192 if not self.url_file:
200 if not self.url_file:
193 self.url_file = os.path.join(
201 self.url_file = os.path.join(
194 self.profile_dir.security_dir,
202 self.profile_dir.security_dir,
195 self.url_file_name
203 self.url_file_name
196 )
204 )
197
205
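When `url_file` is not given explicitly, `find_url_file` resolves it inside the profile's security directory; with a default setup the result is typically something like the path below (the prefix depends on IPYTHONDIR and the active profile, so treat it as an example):

import os
# Illustrative resolution mirroring find_url_file(); the prefix is an assumption.
security_dir = os.path.expanduser('~/.ipython/profile_default/security')
url_file_name = 'ipcontroller-engine.json'    # or 'ipcontroller-<cluster_id>-engine.json'
url_file = os.path.join(security_dir, url_file_name)
print(url_file)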
198 def load_connector_file(self):
206 def load_connector_file(self):
199 """load config from a JSON connector file,
207 """load config from a JSON connector file,
200 at a *lower* priority than command-line/config files.
208 at a *lower* priority than command-line/config files.
201 """
209 """
202
210
203 self.log.info("Loading url_file %r"%self.url_file)
211 self.log.info("Loading url_file %r"%self.url_file)
204 config = self.config
212 config = self.config
205
213
206 with open(self.url_file) as f:
214 with open(self.url_file) as f:
207 d = json.loads(f.read())
215 d = json.loads(f.read())
208
216
209 try:
217 try:
210 config.Session.key
218 config.Session.key
211 except AttributeError:
219 except AttributeError:
212 if d['exec_key']:
220 if d['exec_key']:
213 config.Session.key = asbytes(d['exec_key'])
221 config.Session.key = asbytes(d['exec_key'])
214
222
215 try:
223 try:
216 config.EngineFactory.location
224 config.EngineFactory.location
217 except AttributeError:
225 except AttributeError:
218 config.EngineFactory.location = d['location']
226 config.EngineFactory.location = d['location']
219
227
220 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
228 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
221 try:
229 try:
222 config.EngineFactory.url
230 config.EngineFactory.url
223 except AttributeError:
231 except AttributeError:
224 config.EngineFactory.url = d['url']
232 config.EngineFactory.url = d['url']
225
233
226 try:
234 try:
227 config.EngineFactory.sshserver
235 config.EngineFactory.sshserver
228 except AttributeError:
236 except AttributeError:
229 config.EngineFactory.sshserver = d['ssh']
237 config.EngineFactory.sshserver = d['ssh']
230
238
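`load_connector_file` deliberately fills in only the settings that were not already given on the command line or in a config file, which is why every assignment is wrapped in try/except AttributeError. The JSON file it reads is written by ipcontroller; an illustrative (not verbatim) example showing just the keys consumed here:

# Illustrative ipcontroller-engine.json contents -- all values are made up.
{
    "url": "tcp://127.0.0.1:10101",    # registration URL -> EngineFactory.url
    "location": "10.0.0.5",            # controller host  -> EngineFactory.location
    "exec_key": "a1b2c3d4",            # signing key      -> Session.key
    "ssh": ""                          # ssh server, if tunneling is in use
}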
231 def init_engine(self):
239 def init_engine(self):
232 # This is the working dir by now.
240 # This is the working dir by now.
233 sys.path.insert(0, '')
241 sys.path.insert(0, '')
234 config = self.config
242 config = self.config
235 # print config
243 # print config
236 self.find_url_file()
244 self.find_url_file()
237
245
238 # was the url manually specified?
246 # was the url manually specified?
239 keys = set(self.config.EngineFactory.keys())
247 keys = set(self.config.EngineFactory.keys())
240 keys = keys.union(set(self.config.RegistrationFactory.keys()))
248 keys = keys.union(set(self.config.RegistrationFactory.keys()))
241
249
242 if keys.intersection(set(['ip', 'url', 'port'])):
250 if keys.intersection(set(['ip', 'url', 'port'])):
243 # Connection info was specified, don't wait for the file
251 # Connection info was specified, don't wait for the file
244 url_specified = True
252 url_specified = True
245 self.wait_for_url_file = 0
253 self.wait_for_url_file = 0
246 else:
254 else:
247 url_specified = False
255 url_specified = False
248
256
249 if self.wait_for_url_file and not os.path.exists(self.url_file):
257 if self.wait_for_url_file and not os.path.exists(self.url_file):
250 self.log.warn("url_file %r not found"%self.url_file)
258 self.log.warn("url_file %r not found"%self.url_file)
251 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
259 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
252 tic = time.time()
260 tic = time.time()
253 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
261 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
254 # wait for url_file to exist, for up to wait_for_url_file seconds
262 # wait for url_file to exist, for up to wait_for_url_file seconds
255 time.sleep(0.1)
263 time.sleep(0.1)
256
264
257 if os.path.exists(self.url_file):
265 if os.path.exists(self.url_file):
258 self.load_connector_file()
266 self.load_connector_file()
259 elif not url_specified:
267 elif not url_specified:
260 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
268 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
261 self.exit(1)
269 self.exit(1)
262
270
263
271
264 try:
272 try:
265 exec_lines = config.Kernel.exec_lines
273 exec_lines = config.Kernel.exec_lines
266 except AttributeError:
274 except AttributeError:
267 config.Kernel.exec_lines = []
275 config.Kernel.exec_lines = []
268 exec_lines = config.Kernel.exec_lines
276 exec_lines = config.Kernel.exec_lines
269
277
270 if self.startup_script:
278 if self.startup_script:
271 enc = sys.getfilesystemencoding() or 'utf8'
279 enc = sys.getfilesystemencoding() or 'utf8'
272 cmd="execfile(%r)"%self.startup_script.encode(enc)
280 cmd="execfile(%r)"%self.startup_script.encode(enc)
273 exec_lines.append(cmd)
281 exec_lines.append(cmd)
274 if self.startup_command:
282 if self.startup_command:
275 exec_lines.append(self.startup_command)
283 exec_lines.append(self.startup_command)
276
284
277 # Create the underlying shell class and Engine
285 # Create the underlying shell class and Engine
278 # shell_class = import_item(self.master_config.Global.shell_class)
286 # shell_class = import_item(self.master_config.Global.shell_class)
279 # print self.config
287 # print self.config
280 try:
288 try:
281 self.engine = EngineFactory(config=config, log=self.log)
289 self.engine = EngineFactory(config=config, log=self.log)
282 except:
290 except:
283 self.log.error("Couldn't start the Engine", exc_info=True)
291 self.log.error("Couldn't start the Engine", exc_info=True)
284 self.exit(1)
292 self.exit(1)
285
293
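The tail of `init_engine` just appends to `Kernel.exec_lines`, so startup code runs in the engine's kernel once it is registered. If a startup script and command are supplied (via the `s`/`c` aliases above), the appended lines look roughly like this:

# Roughly what ends up in Kernel.exec_lines for a startup script plus command.
# The path is a placeholder; the quoting comes from the %r formatting above.
exec_lines = []
exec_lines.append("execfile(%r)" % '/path/to/startup.py')
exec_lines.append("a=5")
print(exec_lines)   # -> ["execfile('/path/to/startup.py')", 'a=5']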
286 def forward_logging(self):
294 def forward_logging(self):
287 if self.log_url:
295 if self.log_url:
288 self.log.info("Forwarding logging to %s"%self.log_url)
296 self.log.info("Forwarding logging to %s"%self.log_url)
289 context = self.engine.context
297 context = self.engine.context
290 lsock = context.socket(zmq.PUB)
298 lsock = context.socket(zmq.PUB)
291 lsock.connect(self.log_url)
299 lsock.connect(self.log_url)
292 self.log.removeHandler(self._log_handler)
300 self.log.removeHandler(self._log_handler)
293 handler = EnginePUBHandler(self.engine, lsock)
301 handler = EnginePUBHandler(self.engine, lsock)
294 handler.setLevel(self.log_level)
302 handler.setLevel(self.log_level)
295 self.log.addHandler(handler)
303 self.log.addHandler(handler)
296 self._log_handler = handler
304 self._log_handler = handler
297
305
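With `log_url` set, every log record the engine emits is republished on a PUB socket, so a central process can collect logs from many engines (this is what iplogger provides). A minimal receiver sketch, assuming the records arrive as multipart (topic, text) frames and that tcp://127.0.0.1:20202 is the chosen log_url:

import zmq

# Minimal central log collector; the URL is an assumption, not a default.
ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b'')    # subscribe to every topic
sub.bind('tcp://127.0.0.1:20202')     # engines connect() their PUB sockets here

while True:
    parts = sub.recv_multipart()      # typically [topic, formatted record]
    print(' '.join(p.decode('utf8', 'replace') for p in parts))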
298 def init_mpi(self):
306 def init_mpi(self):
299 global mpi
307 global mpi
300 self.mpi = MPI(config=self.config)
308 self.mpi = MPI(config=self.config)
301
309
302 mpi_import_statement = self.mpi.init_script
310 mpi_import_statement = self.mpi.init_script
303 if mpi_import_statement:
311 if mpi_import_statement:
304 try:
312 try:
305 self.log.info("Initializing MPI:")
313 self.log.info("Initializing MPI:")
306 self.log.info(mpi_import_statement)
314 self.log.info(mpi_import_statement)
307 exec mpi_import_statement in globals()
315 exec mpi_import_statement in globals()
308 except:
316 except:
309 mpi = None
317 mpi = None
310 else:
318 else:
311 mpi = None
319 mpi = None
312
320
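`init_mpi` exec's the chosen init script in this module's globals, so a module-level `mpi` name is available to the rest of the engine afterwards. For mpi4py the built-in script is roughly the following (see `mpi4py_init` earlier in this module for the authoritative version):

# Approximate contents of the built-in mpi4py init script: import MPI and
# attach the communicator's size and rank to the imported module object.
from mpi4py import MPI as mpi
mpi.size = mpi.COMM_WORLD.Get_size()
mpi.rank = mpi.COMM_WORLD.Get_rank()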
313 def initialize(self, argv=None):
321 def initialize(self, argv=None):
314 super(IPEngineApp, self).initialize(argv)
322 super(IPEngineApp, self).initialize(argv)
315 self.init_mpi()
323 self.init_mpi()
316 self.init_engine()
324 self.init_engine()
317 self.forward_logging()
325 self.forward_logging()
318
326
319 def start(self):
327 def start(self):
320 self.engine.start()
328 self.engine.start()
321 try:
329 try:
322 self.engine.loop.start()
330 self.engine.loop.start()
323 except KeyboardInterrupt:
331 except KeyboardInterrupt:
324 self.log.critical("Engine Interrupted, shutting down...\n")
332 self.log.critical("Engine Interrupted, shutting down...\n")
325
333
326
334
327 def launch_new_instance():
335 def launch_new_instance():
328 """Create and run the IPython engine"""
336 """Create and run the IPython engine"""
329 app = IPEngineApp.instance()
337 app = IPEngineApp.instance()
330 app.initialize()
338 app.initialize()
331 app.start()
339 app.start()
332
340
333
341
334 if __name__ == '__main__':
342 if __name__ == '__main__':
335 launch_new_instance()
343 launch_new_instance()
336
344