##// END OF EJS Templates
add ssh tunneling to Engine...
MinRK -
Show More
@@ -1,431 +1,441 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 import os
26 import os
27 import socket
27 import socket
28 import stat
28 import stat
29 import sys
29 import sys
30 import uuid
30 import uuid
31
31
32 from multiprocessing import Process
32 from multiprocessing import Process
33
33
34 import zmq
34 import zmq
35 from zmq.devices import ProcessMonitoredQueue
35 from zmq.devices import ProcessMonitoredQueue
36 from zmq.log.handlers import PUBHandler
36 from zmq.log.handlers import PUBHandler
37 from zmq.utils import jsonapi as json
37 from zmq.utils import jsonapi as json
38
38
39 from IPython.config.application import boolean_flag
39 from IPython.config.application import boolean_flag
40 from IPython.core.profiledir import ProfileDir
40 from IPython.core.profiledir import ProfileDir
41
41
42 from IPython.parallel.apps.baseapp import (
42 from IPython.parallel.apps.baseapp import (
43 BaseParallelApplication,
43 BaseParallelApplication,
44 base_aliases,
44 base_aliases,
45 base_flags,
45 base_flags,
46 )
46 )
47 from IPython.utils.importstring import import_item
47 from IPython.utils.importstring import import_item
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
49
49
50 # from IPython.parallel.controller.controller import ControllerFactory
50 # from IPython.parallel.controller.controller import ControllerFactory
51 from IPython.zmq.session import Session
51 from IPython.zmq.session import Session
52 from IPython.parallel.controller.heartmonitor import HeartMonitor
52 from IPython.parallel.controller.heartmonitor import HeartMonitor
53 from IPython.parallel.controller.hub import HubFactory
53 from IPython.parallel.controller.hub import HubFactory
54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 from IPython.parallel.controller.sqlitedb import SQLiteDB
55 from IPython.parallel.controller.sqlitedb import SQLiteDB
56
56
57 from IPython.parallel.util import signal_children, split_url, asbytes
57 from IPython.parallel.util import signal_children, split_url, asbytes
58
58
59 # conditional import of MongoDB backend class
59 # conditional import of MongoDB backend class
60
60
61 try:
61 try:
62 from IPython.parallel.controller.mongodb import MongoDB
62 from IPython.parallel.controller.mongodb import MongoDB
63 except ImportError:
63 except ImportError:
64 maybe_mongo = []
64 maybe_mongo = []
65 else:
65 else:
66 maybe_mongo = [MongoDB]
66 maybe_mongo = [MongoDB]
67
67
68
68
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70 # Module level variables
70 # Module level variables
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72
72
73
73
74 #: The default config file name for this application
74 #: The default config file name for this application
75 default_config_file_name = u'ipcontroller_config.py'
75 default_config_file_name = u'ipcontroller_config.py'
76
76
77
77
78 _description = """Start the IPython controller for parallel computing.
78 _description = """Start the IPython controller for parallel computing.
79
79
80 The IPython controller provides a gateway between the IPython engines and
80 The IPython controller provides a gateway between the IPython engines and
81 clients. The controller needs to be started before the engines and can be
81 clients. The controller needs to be started before the engines and can be
82 configured using command line options or using a cluster directory. Cluster
82 configured using command line options or using a cluster directory. Cluster
83 directories contain config, log and security files and are usually located in
83 directories contain config, log and security files and are usually located in
84 your ipython directory and named as "profile_name". See the `profile`
84 your ipython directory and named as "profile_name". See the `profile`
85 and `profile-dir` options for details.
85 and `profile-dir` options for details.
86 """
86 """
87
87
88 _examples = """
88 _examples = """
89 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
89 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
90 ipcontroller --scheme=pure # use the pure zeromq scheduler
90 ipcontroller --scheme=pure # use the pure zeromq scheduler
91 """
91 """
92
92
93
93
94 #-----------------------------------------------------------------------------
94 #-----------------------------------------------------------------------------
95 # The main application
95 # The main application
96 #-----------------------------------------------------------------------------
96 #-----------------------------------------------------------------------------
97 flags = {}
97 flags = {}
98 flags.update(base_flags)
98 flags.update(base_flags)
99 flags.update({
99 flags.update({
100 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
100 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
101 'Use threads instead of processes for the schedulers'),
101 'Use threads instead of processes for the schedulers'),
102 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
102 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
103 'use the SQLiteDB backend'),
103 'use the SQLiteDB backend'),
104 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
104 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
105 'use the MongoDB backend'),
105 'use the MongoDB backend'),
106 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
106 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
107 'use the in-memory DictDB backend'),
107 'use the in-memory DictDB backend'),
108 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
108 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
109 'reuse existing json connection files')
109 'reuse existing json connection files')
110 })
110 })
111
111
112 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
112 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
113 "Use HMAC digests for authentication of messages.",
113 "Use HMAC digests for authentication of messages.",
114 "Don't authenticate messages."
114 "Don't authenticate messages."
115 ))
115 ))
116 aliases = dict(
116 aliases = dict(
117 secure = 'IPControllerApp.secure',
117 secure = 'IPControllerApp.secure',
118 ssh = 'IPControllerApp.ssh_server',
118 ssh = 'IPControllerApp.ssh_server',
119 enginessh = 'IPControllerApp.engine_ssh_server',
119 location = 'IPControllerApp.location',
120 location = 'IPControllerApp.location',
120
121
121 ident = 'Session.session',
122 ident = 'Session.session',
122 user = 'Session.username',
123 user = 'Session.username',
123 keyfile = 'Session.keyfile',
124 keyfile = 'Session.keyfile',
124
125
125 url = 'HubFactory.url',
126 url = 'HubFactory.url',
126 ip = 'HubFactory.ip',
127 ip = 'HubFactory.ip',
127 transport = 'HubFactory.transport',
128 transport = 'HubFactory.transport',
128 port = 'HubFactory.regport',
129 port = 'HubFactory.regport',
129
130
130 ping = 'HeartMonitor.period',
131 ping = 'HeartMonitor.period',
131
132
132 scheme = 'TaskScheduler.scheme_name',
133 scheme = 'TaskScheduler.scheme_name',
133 hwm = 'TaskScheduler.hwm',
134 hwm = 'TaskScheduler.hwm',
134 )
135 )
135 aliases.update(base_aliases)
136 aliases.update(base_aliases)
136
137
137
138
138 class IPControllerApp(BaseParallelApplication):
139 class IPControllerApp(BaseParallelApplication):
139
140
140 name = u'ipcontroller'
141 name = u'ipcontroller'
141 description = _description
142 description = _description
142 examples = _examples
143 examples = _examples
143 config_file_name = Unicode(default_config_file_name)
144 config_file_name = Unicode(default_config_file_name)
144 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
145 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
145
146
146 # change default to True
147 # change default to True
147 auto_create = Bool(True, config=True,
148 auto_create = Bool(True, config=True,
148 help="""Whether to create profile dir if it doesn't exist.""")
149 help="""Whether to create profile dir if it doesn't exist.""")
149
150
150 reuse_files = Bool(False, config=True,
151 reuse_files = Bool(False, config=True,
151 help='Whether to reuse existing json connection files.'
152 help='Whether to reuse existing json connection files.'
152 )
153 )
153 secure = Bool(True, config=True,
154 secure = Bool(True, config=True,
154 help='Whether to use HMAC digests for extra message authentication.'
155 help='Whether to use HMAC digests for extra message authentication.'
155 )
156 )
156 ssh_server = Unicode(u'', config=True,
157 ssh_server = Unicode(u'', config=True,
157 help="""ssh url for clients to use when connecting to the Controller
158 help="""ssh url for clients to use when connecting to the Controller
158 processes. It should be of the form: [user@]server[:port]. The
159 processes. It should be of the form: [user@]server[:port]. The
159 Controller's listening addresses must be accessible from the ssh server""",
160 Controller's listening addresses must be accessible from the ssh server""",
160 )
161 )
162 engine_ssh_server = Unicode(u'', config=True,
163 help="""ssh url for engines to use when connecting to the Controller
164 processes. It should be of the form: [user@]server[:port]. The
165 Controller's listening addresses must be accessible from the ssh server""",
166 )
161 location = Unicode(u'', config=True,
167 location = Unicode(u'', config=True,
162 help="""The external IP or domain name of the Controller, used for disambiguating
168 help="""The external IP or domain name of the Controller, used for disambiguating
163 engine and client connections.""",
169 engine and client connections.""",
164 )
170 )
165 import_statements = List([], config=True,
171 import_statements = List([], config=True,
166 help="import statements to be run at startup. Necessary in some environments"
172 help="import statements to be run at startup. Necessary in some environments"
167 )
173 )
168
174
169 use_threads = Bool(False, config=True,
175 use_threads = Bool(False, config=True,
170 help='Use threads instead of processes for the schedulers',
176 help='Use threads instead of processes for the schedulers',
171 )
177 )
172
178
173 # internal
179 # internal
174 children = List()
180 children = List()
175 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
181 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
176
182
177 def _use_threads_changed(self, name, old, new):
183 def _use_threads_changed(self, name, old, new):
178 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
184 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
179
185
180 aliases = Dict(aliases)
186 aliases = Dict(aliases)
181 flags = Dict(flags)
187 flags = Dict(flags)
182
188
183
189
184 def save_connection_dict(self, fname, cdict):
190 def save_connection_dict(self, fname, cdict):
185 """save a connection dict to json file."""
191 """save a connection dict to json file."""
186 c = self.config
192 c = self.config
187 url = cdict['url']
193 url = cdict['url']
188 location = cdict['location']
194 location = cdict['location']
189 if not location:
195 if not location:
190 try:
196 try:
191 proto,ip,port = split_url(url)
197 proto,ip,port = split_url(url)
192 except AssertionError:
198 except AssertionError:
193 pass
199 pass
194 else:
200 else:
195 try:
201 try:
196 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
202 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
197 except (socket.gaierror, IndexError):
203 except (socket.gaierror, IndexError):
198 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
204 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
199 " You may need to specify '--location=<external_ip_address>' to help"
205 " You may need to specify '--location=<external_ip_address>' to help"
200 " IPython decide when to connect via loopback.")
206 " IPython decide when to connect via loopback.")
201 location = '127.0.0.1'
207 location = '127.0.0.1'
202 cdict['location'] = location
208 cdict['location'] = location
203 fname = os.path.join(self.profile_dir.security_dir, fname)
209 fname = os.path.join(self.profile_dir.security_dir, fname)
204 with open(fname, 'wb') as f:
210 with open(fname, 'wb') as f:
205 f.write(json.dumps(cdict, indent=2))
211 f.write(json.dumps(cdict, indent=2))
206 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
212 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
207
213
208 def load_config_from_json(self):
214 def load_config_from_json(self):
209 """load config from existing json connector files."""
215 """load config from existing json connector files."""
210 c = self.config
216 c = self.config
211 # load from engine config
217 # load from engine config
212 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
218 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
213 cfg = json.loads(f.read())
219 cfg = json.loads(f.read())
214 key = c.Session.key = asbytes(cfg['exec_key'])
220 key = c.Session.key = asbytes(cfg['exec_key'])
215 xport,addr = cfg['url'].split('://')
221 xport,addr = cfg['url'].split('://')
216 c.HubFactory.engine_transport = xport
222 c.HubFactory.engine_transport = xport
217 ip,ports = addr.split(':')
223 ip,ports = addr.split(':')
218 c.HubFactory.engine_ip = ip
224 c.HubFactory.engine_ip = ip
219 c.HubFactory.regport = int(ports)
225 c.HubFactory.regport = int(ports)
220 self.location = cfg['location']
226 self.location = cfg['location']
227 if not self.engine_ssh_server:
228 self.engine_ssh_server = cfg['ssh']
221 # load client config
229 # load client config
222 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
230 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
223 cfg = json.loads(f.read())
231 cfg = json.loads(f.read())
224 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
232 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
225 xport,addr = cfg['url'].split('://')
233 xport,addr = cfg['url'].split('://')
226 c.HubFactory.client_transport = xport
234 c.HubFactory.client_transport = xport
227 ip,ports = addr.split(':')
235 ip,ports = addr.split(':')
228 c.HubFactory.client_ip = ip
236 c.HubFactory.client_ip = ip
229 self.ssh_server = cfg['ssh']
237 if not self.ssh_server:
238 self.ssh_server = cfg['ssh']
230 assert int(ports) == c.HubFactory.regport, "regport mismatch"
239 assert int(ports) == c.HubFactory.regport, "regport mismatch"
231
240
232 def init_hub(self):
241 def init_hub(self):
233 c = self.config
242 c = self.config
234
243
235 self.do_import_statements()
244 self.do_import_statements()
236 reusing = self.reuse_files
245 reusing = self.reuse_files
237 if reusing:
246 if reusing:
238 try:
247 try:
239 self.load_config_from_json()
248 self.load_config_from_json()
240 except (AssertionError,IOError):
249 except (AssertionError,IOError):
241 reusing=False
250 reusing=False
242 # check again, because reusing may have failed:
251 # check again, because reusing may have failed:
243 if reusing:
252 if reusing:
244 pass
253 pass
245 elif self.secure:
254 elif self.secure:
246 key = str(uuid.uuid4())
255 key = str(uuid.uuid4())
247 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
256 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
248 # with open(keyfile, 'w') as f:
257 # with open(keyfile, 'w') as f:
249 # f.write(key)
258 # f.write(key)
250 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
259 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
251 c.Session.key = asbytes(key)
260 c.Session.key = asbytes(key)
252 else:
261 else:
253 key = c.Session.key = b''
262 key = c.Session.key = b''
254
263
255 try:
264 try:
256 self.factory = HubFactory(config=c, log=self.log)
265 self.factory = HubFactory(config=c, log=self.log)
257 # self.start_logging()
266 # self.start_logging()
258 self.factory.init_hub()
267 self.factory.init_hub()
259 except:
268 except:
260 self.log.error("Couldn't construct the Controller", exc_info=True)
269 self.log.error("Couldn't construct the Controller", exc_info=True)
261 self.exit(1)
270 self.exit(1)
262
271
263 if not reusing:
272 if not reusing:
264 # save to new json config files
273 # save to new json config files
265 f = self.factory
274 f = self.factory
266 cdict = {'exec_key' : key,
275 cdict = {'exec_key' : key,
267 'ssh' : self.ssh_server,
276 'ssh' : self.ssh_server,
268 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
277 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
269 'location' : self.location
278 'location' : self.location
270 }
279 }
271 self.save_connection_dict('ipcontroller-client.json', cdict)
280 self.save_connection_dict('ipcontroller-client.json', cdict)
272 edict = cdict
281 edict = cdict
273 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
282 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
283 edict['ssh'] = self.engine_ssh_server
274 self.save_connection_dict('ipcontroller-engine.json', edict)
284 self.save_connection_dict('ipcontroller-engine.json', edict)
275
285
276 #
286 #
277 def init_schedulers(self):
287 def init_schedulers(self):
278 children = self.children
288 children = self.children
279 mq = import_item(str(self.mq_class))
289 mq = import_item(str(self.mq_class))
280
290
281 hub = self.factory
291 hub = self.factory
282 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
292 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
283 # IOPub relay (in a Process)
293 # IOPub relay (in a Process)
284 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
294 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
285 q.bind_in(hub.client_info['iopub'])
295 q.bind_in(hub.client_info['iopub'])
286 q.bind_out(hub.engine_info['iopub'])
296 q.bind_out(hub.engine_info['iopub'])
287 q.setsockopt_out(zmq.SUBSCRIBE, b'')
297 q.setsockopt_out(zmq.SUBSCRIBE, b'')
288 q.connect_mon(hub.monitor_url)
298 q.connect_mon(hub.monitor_url)
289 q.daemon=True
299 q.daemon=True
290 children.append(q)
300 children.append(q)
291
301
292 # Multiplexer Queue (in a Process)
302 # Multiplexer Queue (in a Process)
293 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out')
303 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'in', b'out')
294 q.bind_in(hub.client_info['mux'])
304 q.bind_in(hub.client_info['mux'])
295 q.setsockopt_in(zmq.IDENTITY, b'mux')
305 q.setsockopt_in(zmq.IDENTITY, b'mux')
296 q.bind_out(hub.engine_info['mux'])
306 q.bind_out(hub.engine_info['mux'])
297 q.connect_mon(hub.monitor_url)
307 q.connect_mon(hub.monitor_url)
298 q.daemon=True
308 q.daemon=True
299 children.append(q)
309 children.append(q)
300
310
301 # Control Queue (in a Process)
311 # Control Queue (in a Process)
302 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol')
312 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, b'incontrol', b'outcontrol')
303 q.bind_in(hub.client_info['control'])
313 q.bind_in(hub.client_info['control'])
304 q.setsockopt_in(zmq.IDENTITY, b'control')
314 q.setsockopt_in(zmq.IDENTITY, b'control')
305 q.bind_out(hub.engine_info['control'])
315 q.bind_out(hub.engine_info['control'])
306 q.connect_mon(hub.monitor_url)
316 q.connect_mon(hub.monitor_url)
307 q.daemon=True
317 q.daemon=True
308 children.append(q)
318 children.append(q)
309 try:
319 try:
310 scheme = self.config.TaskScheduler.scheme_name
320 scheme = self.config.TaskScheduler.scheme_name
311 except AttributeError:
321 except AttributeError:
312 scheme = TaskScheduler.scheme_name.get_default_value()
322 scheme = TaskScheduler.scheme_name.get_default_value()
313 # Task Queue (in a Process)
323 # Task Queue (in a Process)
314 if scheme == 'pure':
324 if scheme == 'pure':
315 self.log.warn("task::using pure XREQ Task scheduler")
325 self.log.warn("task::using pure XREQ Task scheduler")
316 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask')
326 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, b'intask', b'outtask')
317 # q.setsockopt_out(zmq.HWM, hub.hwm)
327 # q.setsockopt_out(zmq.HWM, hub.hwm)
318 q.bind_in(hub.client_info['task'][1])
328 q.bind_in(hub.client_info['task'][1])
319 q.setsockopt_in(zmq.IDENTITY, b'task')
329 q.setsockopt_in(zmq.IDENTITY, b'task')
320 q.bind_out(hub.engine_info['task'])
330 q.bind_out(hub.engine_info['task'])
321 q.connect_mon(hub.monitor_url)
331 q.connect_mon(hub.monitor_url)
322 q.daemon=True
332 q.daemon=True
323 children.append(q)
333 children.append(q)
324 elif scheme == 'none':
334 elif scheme == 'none':
325 self.log.warn("task::using no Task scheduler")
335 self.log.warn("task::using no Task scheduler")
326
336
327 else:
337 else:
328 self.log.info("task::using Python %s Task scheduler"%scheme)
338 self.log.info("task::using Python %s Task scheduler"%scheme)
329 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
339 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
330 hub.monitor_url, hub.client_info['notification'])
340 hub.monitor_url, hub.client_info['notification'])
331 kwargs = dict(logname='scheduler', loglevel=self.log_level,
341 kwargs = dict(logname='scheduler', loglevel=self.log_level,
332 log_url = self.log_url, config=dict(self.config))
342 log_url = self.log_url, config=dict(self.config))
333 if 'Process' in self.mq_class:
343 if 'Process' in self.mq_class:
334 # run the Python scheduler in a Process
344 # run the Python scheduler in a Process
335 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
345 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
336 q.daemon=True
346 q.daemon=True
337 children.append(q)
347 children.append(q)
338 else:
348 else:
339 # single-threaded Controller
349 # single-threaded Controller
340 kwargs['in_thread'] = True
350 kwargs['in_thread'] = True
341 launch_scheduler(*sargs, **kwargs)
351 launch_scheduler(*sargs, **kwargs)
342
352
343
353
344 def save_urls(self):
354 def save_urls(self):
345 """save the registration urls to files."""
355 """save the registration urls to files."""
346 c = self.config
356 c = self.config
347
357
348 sec_dir = self.profile_dir.security_dir
358 sec_dir = self.profile_dir.security_dir
349 cf = self.factory
359 cf = self.factory
350
360
351 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
361 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
352 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
362 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
353
363
354 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
364 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
355 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
365 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
356
366
357
367
358 def do_import_statements(self):
368 def do_import_statements(self):
359 statements = self.import_statements
369 statements = self.import_statements
360 for s in statements:
370 for s in statements:
361 try:
371 try:
362 self.log.msg("Executing statement: '%s'" % s)
372 self.log.msg("Executing statement: '%s'" % s)
363 exec s in globals(), locals()
373 exec s in globals(), locals()
364 except:
374 except:
365 self.log.msg("Error running statement: %s" % s)
375 self.log.msg("Error running statement: %s" % s)
366
376
367 def forward_logging(self):
377 def forward_logging(self):
368 if self.log_url:
378 if self.log_url:
369 self.log.info("Forwarding logging to %s"%self.log_url)
379 self.log.info("Forwarding logging to %s"%self.log_url)
370 context = zmq.Context.instance()
380 context = zmq.Context.instance()
371 lsock = context.socket(zmq.PUB)
381 lsock = context.socket(zmq.PUB)
372 lsock.connect(self.log_url)
382 lsock.connect(self.log_url)
373 handler = PUBHandler(lsock)
383 handler = PUBHandler(lsock)
374 self.log.removeHandler(self._log_handler)
384 self.log.removeHandler(self._log_handler)
375 handler.root_topic = 'controller'
385 handler.root_topic = 'controller'
376 handler.setLevel(self.log_level)
386 handler.setLevel(self.log_level)
377 self.log.addHandler(handler)
387 self.log.addHandler(handler)
378 self._log_handler = handler
388 self._log_handler = handler
379 # #
389 # #
380
390
381 def initialize(self, argv=None):
391 def initialize(self, argv=None):
382 super(IPControllerApp, self).initialize(argv)
392 super(IPControllerApp, self).initialize(argv)
383 self.forward_logging()
393 self.forward_logging()
384 self.init_hub()
394 self.init_hub()
385 self.init_schedulers()
395 self.init_schedulers()
386
396
387 def start(self):
397 def start(self):
388 # Start the subprocesses:
398 # Start the subprocesses:
389 self.factory.start()
399 self.factory.start()
390 child_procs = []
400 child_procs = []
391 for child in self.children:
401 for child in self.children:
392 child.start()
402 child.start()
393 if isinstance(child, ProcessMonitoredQueue):
403 if isinstance(child, ProcessMonitoredQueue):
394 child_procs.append(child.launcher)
404 child_procs.append(child.launcher)
395 elif isinstance(child, Process):
405 elif isinstance(child, Process):
396 child_procs.append(child)
406 child_procs.append(child)
397 if child_procs:
407 if child_procs:
398 signal_children(child_procs)
408 signal_children(child_procs)
399
409
400 self.write_pid_file(overwrite=True)
410 self.write_pid_file(overwrite=True)
401
411
402 try:
412 try:
403 self.factory.loop.start()
413 self.factory.loop.start()
404 except KeyboardInterrupt:
414 except KeyboardInterrupt:
405 self.log.critical("Interrupted, Exiting...\n")
415 self.log.critical("Interrupted, Exiting...\n")
406
416
407
417
408
418
409 def launch_new_instance():
419 def launch_new_instance():
410 """Create and run the IPython controller"""
420 """Create and run the IPython controller"""
411 if sys.platform == 'win32':
421 if sys.platform == 'win32':
412 # make sure we don't get called from a multiprocessing subprocess
422 # make sure we don't get called from a multiprocessing subprocess
413 # this can result in infinite Controllers being started on Windows
423 # this can result in infinite Controllers being started on Windows
414 # which doesn't have a proper fork, so multiprocessing is wonky
424 # which doesn't have a proper fork, so multiprocessing is wonky
415
425
416 # this only comes up when IPython has been installed using vanilla
426 # this only comes up when IPython has been installed using vanilla
417 # setuptools, and *not* distribute.
427 # setuptools, and *not* distribute.
418 import multiprocessing
428 import multiprocessing
419 p = multiprocessing.current_process()
429 p = multiprocessing.current_process()
420 # the main process has name 'MainProcess'
430 # the main process has name 'MainProcess'
421 # subprocesses will have names like 'Process-1'
431 # subprocesses will have names like 'Process-1'
422 if p.name != 'MainProcess':
432 if p.name != 'MainProcess':
423 # we are a subprocess, don't start another Controller!
433 # we are a subprocess, don't start another Controller!
424 return
434 return
425 app = IPControllerApp.instance()
435 app = IPControllerApp.instance()
426 app.initialize()
436 app.initialize()
427 app.start()
437 app.start()
428
438
429
439
430 if __name__ == '__main__':
440 if __name__ == '__main__':
431 launch_new_instance()
441 launch_new_instance()
@@ -1,307 +1,336 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import json
24 import json
25 import os
25 import os
26 import sys
26 import sys
27 import time
27 import time
28
28
29 import zmq
29 import zmq
30 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
31
31
32 from IPython.core.profiledir import ProfileDir
32 from IPython.core.profiledir import ProfileDir
33 from IPython.parallel.apps.baseapp import (
33 from IPython.parallel.apps.baseapp import (
34 BaseParallelApplication,
34 BaseParallelApplication,
35 base_aliases,
35 base_aliases,
36 base_flags,
36 base_flags,
37 )
37 )
38 from IPython.zmq.log import EnginePUBHandler
38 from IPython.zmq.log import EnginePUBHandler
39
39
40 from IPython.config.configurable import Configurable
40 from IPython.config.configurable import Configurable
41 from IPython.zmq.session import Session
41 from IPython.zmq.session import Session
42 from IPython.parallel.engine.engine import EngineFactory
42 from IPython.parallel.engine.engine import EngineFactory
43 from IPython.parallel.engine.streamkernel import Kernel
43 from IPython.parallel.engine.streamkernel import Kernel
44 from IPython.parallel.util import disambiguate_url, asbytes
44 from IPython.parallel.util import disambiguate_url, asbytes
45
45
46 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
48
48
49
49
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51 # Module level variables
51 # Module level variables
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53
53
54 #: The default config file name for this application
54 #: The default config file name for this application
55 default_config_file_name = u'ipengine_config.py'
55 default_config_file_name = u'ipengine_config.py'
56
56
57 _description = """Start an IPython engine for parallel computing.
57 _description = """Start an IPython engine for parallel computing.
58
58
59 IPython engines run in parallel and perform computations on behalf of a client
59 IPython engines run in parallel and perform computations on behalf of a client
60 and controller. A controller needs to be started before the engines. The
60 and controller. A controller needs to be started before the engines. The
61 engine can be configured using command line options or using a cluster
61 engine can be configured using command line options or using a cluster
62 directory. Cluster directories contain config, log and security files and are
62 directory. Cluster directories contain config, log and security files and are
63 usually located in your ipython directory and named as "profile_name".
63 usually located in your ipython directory and named as "profile_name".
64 See the `profile` and `profile-dir` options for details.
64 See the `profile` and `profile-dir` options for details.
65 """
65 """
66
66
67 _examples = """
67 _examples = """
68 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
68 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
69 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
69 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
70 """
70 """
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # MPI configuration
73 # MPI configuration
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76 mpi4py_init = """from mpi4py import MPI as mpi
76 mpi4py_init = """from mpi4py import MPI as mpi
77 mpi.size = mpi.COMM_WORLD.Get_size()
77 mpi.size = mpi.COMM_WORLD.Get_size()
78 mpi.rank = mpi.COMM_WORLD.Get_rank()
78 mpi.rank = mpi.COMM_WORLD.Get_rank()
79 """
79 """
80
80
81
81
82 pytrilinos_init = """from PyTrilinos import Epetra
82 pytrilinos_init = """from PyTrilinos import Epetra
83 class SimpleStruct:
83 class SimpleStruct:
84 pass
84 pass
85 mpi = SimpleStruct()
85 mpi = SimpleStruct()
86 mpi.rank = 0
86 mpi.rank = 0
87 mpi.size = 0
87 mpi.size = 0
88 """
88 """
89
89
90 class MPI(Configurable):
90 class MPI(Configurable):
91 """Configurable for MPI initialization"""
91 """Configurable for MPI initialization"""
92 use = Unicode('', config=True,
92 use = Unicode('', config=True,
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
94 )
94 )
95
95
96 def _on_use_changed(self, old, new):
96 def _on_use_changed(self, old, new):
97 # load default init script if it's not set
97 # load default init script if it's not set
98 if not self.init_script:
98 if not self.init_script:
99 self.init_script = self.default_inits.get(new, '')
99 self.init_script = self.default_inits.get(new, '')
100
100
101 init_script = Unicode('', config=True,
101 init_script = Unicode('', config=True,
102 help="Initialization code for MPI")
102 help="Initialization code for MPI")
103
103
104 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
104 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
105 config=True)
105 config=True)
106
106
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Main application
109 # Main application
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111 aliases = dict(
111 aliases = dict(
112 file = 'IPEngineApp.url_file',
112 file = 'IPEngineApp.url_file',
113 c = 'IPEngineApp.startup_command',
113 c = 'IPEngineApp.startup_command',
114 s = 'IPEngineApp.startup_script',
114 s = 'IPEngineApp.startup_script',
115
115
116 ident = 'Session.session',
116 ident = 'Session.session',
117 user = 'Session.username',
117 user = 'Session.username',
118 keyfile = 'Session.keyfile',
118 keyfile = 'Session.keyfile',
119
119
120 url = 'EngineFactory.url',
120 url = 'EngineFactory.url',
121 ssh = 'EngineFactory.sshserver',
122 sshkey = 'EngineFactory.sshkey',
121 ip = 'EngineFactory.ip',
123 ip = 'EngineFactory.ip',
122 transport = 'EngineFactory.transport',
124 transport = 'EngineFactory.transport',
123 port = 'EngineFactory.regport',
125 port = 'EngineFactory.regport',
124 location = 'EngineFactory.location',
126 location = 'EngineFactory.location',
125
127
126 timeout = 'EngineFactory.timeout',
128 timeout = 'EngineFactory.timeout',
127
129
128 mpi = 'MPI.use',
130 mpi = 'MPI.use',
129
131
130 )
132 )
131 aliases.update(base_aliases)
133 aliases.update(base_aliases)
132
134
133
135
134 class IPEngineApp(BaseParallelApplication):
136 class IPEngineApp(BaseParallelApplication):
135
137
136 name = Unicode(u'ipengine')
138 name = Unicode(u'ipengine')
137 description = Unicode(_description)
139 description = Unicode(_description)
138 examples = _examples
140 examples = _examples
139 config_file_name = Unicode(default_config_file_name)
141 config_file_name = Unicode(default_config_file_name)
140 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
141
143
142 startup_script = Unicode(u'', config=True,
144 startup_script = Unicode(u'', config=True,
143 help='specify a script to be run at startup')
145 help='specify a script to be run at startup')
144 startup_command = Unicode('', config=True,
146 startup_command = Unicode('', config=True,
145 help='specify a command to be run at startup')
147 help='specify a command to be run at startup')
146
148
147 url_file = Unicode(u'', config=True,
149 url_file = Unicode(u'', config=True,
148 help="""The full location of the file containing the connection information for
150 help="""The full location of the file containing the connection information for
149 the controller. If this is not given, the file must be in the
151 the controller. If this is not given, the file must be in the
150 security directory of the cluster directory. This location is
152 security directory of the cluster directory. This location is
151 resolved using the `profile` or `profile_dir` options.""",
153 resolved using the `profile` or `profile_dir` options.""",
152 )
154 )
153 wait_for_url_file = Float(5, config=True,
155 wait_for_url_file = Float(5, config=True,
154 help="""The maximum number of seconds to wait for url_file to exist.
156 help="""The maximum number of seconds to wait for url_file to exist.
155 This is useful for batch-systems and shared-filesystems where the
157 This is useful for batch-systems and shared-filesystems where the
156 controller and engine are started at the same time and it
158 controller and engine are started at the same time and it
157 may take a moment for the controller to write the connector files.""")
159 may take a moment for the controller to write the connector files.""")
158
160
159 url_file_name = Unicode(u'ipcontroller-engine.json')
161 url_file_name = Unicode(u'ipcontroller-engine.json')
160 log_url = Unicode('', config=True,
162 log_url = Unicode('', config=True,
161 help="""The URL for the iploggerapp instance, for forwarding
163 help="""The URL for the iploggerapp instance, for forwarding
162 logging to a central location.""")
164 logging to a central location.""")
163
165
164 aliases = Dict(aliases)
166 aliases = Dict(aliases)
165
167
166 # def find_key_file(self):
168 # def find_key_file(self):
167 # """Set the key file.
169 # """Set the key file.
168 #
170 #
169 # Here we don't try to actually see if it exists for is valid as that
171 # Here we don't try to actually see if it exists for is valid as that
170 # is hadled by the connection logic.
172 # is hadled by the connection logic.
171 # """
173 # """
172 # config = self.master_config
174 # config = self.master_config
173 # # Find the actual controller key file
175 # # Find the actual controller key file
174 # if not config.Global.key_file:
176 # if not config.Global.key_file:
175 # try_this = os.path.join(
177 # try_this = os.path.join(
176 # config.Global.profile_dir,
178 # config.Global.profile_dir,
177 # config.Global.security_dir,
179 # config.Global.security_dir,
178 # config.Global.key_file_name
180 # config.Global.key_file_name
179 # )
181 # )
180 # config.Global.key_file = try_this
182 # config.Global.key_file = try_this
181
183
182 def find_url_file(self):
184 def find_url_file(self):
183 """Set the url file.
185 """Set the url file.
184
186
185 Here we don't try to actually see if it exists for is valid as that
187 Here we don't try to actually see if it exists for is valid as that
186 is hadled by the connection logic.
188 is hadled by the connection logic.
187 """
189 """
188 config = self.config
190 config = self.config
189 # Find the actual controller key file
191 # Find the actual controller key file
190 if not self.url_file:
192 if not self.url_file:
191 self.url_file = os.path.join(
193 self.url_file = os.path.join(
192 self.profile_dir.security_dir,
194 self.profile_dir.security_dir,
193 self.url_file_name
195 self.url_file_name
194 )
196 )
197
198 def load_connector_file(self):
199 """load config from a JSON connector file,
200 at a *lower* priority than command-line/config files.
201 """
202
203 self.log.info("Loading url_file %r"%self.url_file)
204 config = self.config
205
206 with open(self.url_file) as f:
207 d = json.loads(f.read())
208
209 try:
210 config.Session.key
211 except AttributeError:
212 if d['exec_key']:
213 config.Session.key = asbytes(d['exec_key'])
214
215 try:
216 config.EngineFactory.location
217 except AttributeError:
218 config.EngineFactory.location = d['location']
219
220 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
221 try:
222 config.EngineFactory.url
223 except AttributeError:
224 config.EngineFactory.url = d['url']
225
226 try:
227 config.EngineFactory.sshserver
228 except AttributeError:
229 config.EngineFactory.sshserver = d['ssh']
230
195 def init_engine(self):
231 def init_engine(self):
196 # This is the working dir by now.
232 # This is the working dir by now.
197 sys.path.insert(0, '')
233 sys.path.insert(0, '')
198 config = self.config
234 config = self.config
199 # print config
235 # print config
200 self.find_url_file()
236 self.find_url_file()
201
237
202 # was the url manually specified?
238 # was the url manually specified?
203 keys = set(self.config.EngineFactory.keys())
239 keys = set(self.config.EngineFactory.keys())
204 keys = keys.union(set(self.config.RegistrationFactory.keys()))
240 keys = keys.union(set(self.config.RegistrationFactory.keys()))
205
241
206 if keys.intersection(set(['ip', 'url', 'port'])):
242 if keys.intersection(set(['ip', 'url', 'port'])):
207 # Connection info was specified, don't wait for the file
243 # Connection info was specified, don't wait for the file
208 url_specified = True
244 url_specified = True
209 self.wait_for_url_file = 0
245 self.wait_for_url_file = 0
210 else:
246 else:
211 url_specified = False
247 url_specified = False
212
248
213 if self.wait_for_url_file and not os.path.exists(self.url_file):
249 if self.wait_for_url_file and not os.path.exists(self.url_file):
214 self.log.warn("url_file %r not found"%self.url_file)
250 self.log.warn("url_file %r not found"%self.url_file)
215 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
251 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
216 tic = time.time()
252 tic = time.time()
217 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
253 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
218 # wait for url_file to exist, for up to 10 seconds
254 # wait for url_file to exist, for up to 10 seconds
219 time.sleep(0.1)
255 time.sleep(0.1)
220
256
221 if os.path.exists(self.url_file):
257 if os.path.exists(self.url_file):
222 self.log.info("Loading url_file %r"%self.url_file)
258 self.load_connector_file()
223 with open(self.url_file) as f:
224 d = json.loads(f.read())
225 if d['exec_key']:
226 config.Session.key = asbytes(d['exec_key'])
227 d['url'] = disambiguate_url(d['url'], d['location'])
228 config.EngineFactory.url = d['url']
229 config.EngineFactory.location = d['location']
230 elif not url_specified:
259 elif not url_specified:
231 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
260 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
232 self.exit(1)
261 self.exit(1)
233
262
234
263
235 try:
264 try:
236 exec_lines = config.Kernel.exec_lines
265 exec_lines = config.Kernel.exec_lines
237 except AttributeError:
266 except AttributeError:
238 config.Kernel.exec_lines = []
267 config.Kernel.exec_lines = []
239 exec_lines = config.Kernel.exec_lines
268 exec_lines = config.Kernel.exec_lines
240
269
241 if self.startup_script:
270 if self.startup_script:
242 enc = sys.getfilesystemencoding() or 'utf8'
271 enc = sys.getfilesystemencoding() or 'utf8'
243 cmd="execfile(%r)"%self.startup_script.encode(enc)
272 cmd="execfile(%r)"%self.startup_script.encode(enc)
244 exec_lines.append(cmd)
273 exec_lines.append(cmd)
245 if self.startup_command:
274 if self.startup_command:
246 exec_lines.append(self.startup_command)
275 exec_lines.append(self.startup_command)
247
276
248 # Create the underlying shell class and Engine
277 # Create the underlying shell class and Engine
249 # shell_class = import_item(self.master_config.Global.shell_class)
278 # shell_class = import_item(self.master_config.Global.shell_class)
250 # print self.config
279 # print self.config
251 try:
280 try:
252 self.engine = EngineFactory(config=config, log=self.log)
281 self.engine = EngineFactory(config=config, log=self.log)
253 except:
282 except:
254 self.log.error("Couldn't start the Engine", exc_info=True)
283 self.log.error("Couldn't start the Engine", exc_info=True)
255 self.exit(1)
284 self.exit(1)
256
285
257 def forward_logging(self):
286 def forward_logging(self):
258 if self.log_url:
287 if self.log_url:
259 self.log.info("Forwarding logging to %s"%self.log_url)
288 self.log.info("Forwarding logging to %s"%self.log_url)
260 context = self.engine.context
289 context = self.engine.context
261 lsock = context.socket(zmq.PUB)
290 lsock = context.socket(zmq.PUB)
262 lsock.connect(self.log_url)
291 lsock.connect(self.log_url)
263 self.log.removeHandler(self._log_handler)
292 self.log.removeHandler(self._log_handler)
264 handler = EnginePUBHandler(self.engine, lsock)
293 handler = EnginePUBHandler(self.engine, lsock)
265 handler.setLevel(self.log_level)
294 handler.setLevel(self.log_level)
266 self.log.addHandler(handler)
295 self.log.addHandler(handler)
267 self._log_handler = handler
296 self._log_handler = handler
268 #
297
269 def init_mpi(self):
298 def init_mpi(self):
270 global mpi
299 global mpi
271 self.mpi = MPI(config=self.config)
300 self.mpi = MPI(config=self.config)
272
301
273 mpi_import_statement = self.mpi.init_script
302 mpi_import_statement = self.mpi.init_script
274 if mpi_import_statement:
303 if mpi_import_statement:
275 try:
304 try:
276 self.log.info("Initializing MPI:")
305 self.log.info("Initializing MPI:")
277 self.log.info(mpi_import_statement)
306 self.log.info(mpi_import_statement)
278 exec mpi_import_statement in globals()
307 exec mpi_import_statement in globals()
279 except:
308 except:
280 mpi = None
309 mpi = None
281 else:
310 else:
282 mpi = None
311 mpi = None
283
312
284 def initialize(self, argv=None):
313 def initialize(self, argv=None):
285 super(IPEngineApp, self).initialize(argv)
314 super(IPEngineApp, self).initialize(argv)
286 self.init_mpi()
315 self.init_mpi()
287 self.init_engine()
316 self.init_engine()
288 self.forward_logging()
317 self.forward_logging()
289
318
290 def start(self):
319 def start(self):
291 self.engine.start()
320 self.engine.start()
292 try:
321 try:
293 self.engine.loop.start()
322 self.engine.loop.start()
294 except KeyboardInterrupt:
323 except KeyboardInterrupt:
295 self.log.critical("Engine Interrupted, shutting down...\n")
324 self.log.critical("Engine Interrupted, shutting down...\n")
296
325
297
326
298 def launch_new_instance():
327 def launch_new_instance():
299 """Create and run the IPython engine"""
328 """Create and run the IPython engine"""
300 app = IPEngineApp.instance()
329 app = IPEngineApp.instance()
301 app.initialize()
330 app.initialize()
302 app.start()
331 app.start()
303
332
304
333
305 if __name__ == '__main__':
334 if __name__ == '__main__':
306 launch_new_instance()
335 launch_new_instance()
307
336
@@ -1,173 +1,226 b''
1 """A simple engine that talks to a controller over 0MQ.
1 """A simple engine that talks to a controller over 0MQ.
2 it handles registration, etc. and launches a kernel
2 it handles registration, etc. and launches a kernel
3 connected to the Controller's Schedulers.
3 connected to the Controller's Schedulers.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from getpass import getpass
20
21
21 import zmq
22 import zmq
22 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
23
24
25 from IPython.external.ssh import tunnel
24 # internal
26 # internal
25 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, CBytes
27 from IPython.utils.traitlets import (
28 Instance, Dict, Int, Type, CFloat, Unicode, CBytes, Bool
29 )
26 # from IPython.utils.localinterfaces import LOCALHOST
30 # from IPython.utils.localinterfaces import LOCALHOST
27
31
28 from IPython.parallel.controller.heartmonitor import Heart
32 from IPython.parallel.controller.heartmonitor import Heart
29 from IPython.parallel.factory import RegistrationFactory
33 from IPython.parallel.factory import RegistrationFactory
30 from IPython.parallel.util import disambiguate_url, asbytes
34 from IPython.parallel.util import disambiguate_url, asbytes
31
35
32 from IPython.zmq.session import Message
36 from IPython.zmq.session import Message
33
37
34 from .streamkernel import Kernel
38 from .streamkernel import Kernel
35
39
36 class EngineFactory(RegistrationFactory):
40 class EngineFactory(RegistrationFactory):
37 """IPython engine"""
41 """IPython engine"""
38
42
39 # configurables:
43 # configurables:
40 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
44 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
41 help="""The OutStream for handling stdout/err.
45 help="""The OutStream for handling stdout/err.
42 Typically 'IPython.zmq.iostream.OutStream'""")
46 Typically 'IPython.zmq.iostream.OutStream'""")
43 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
47 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
44 help="""The class for handling displayhook.
48 help="""The class for handling displayhook.
45 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
49 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
46 location=Unicode(config=True,
50 location=Unicode(config=True,
47 help="""The location (an IP address) of the controller. This is
51 help="""The location (an IP address) of the controller. This is
48 used for disambiguating URLs, to determine whether
52 used for disambiguating URLs, to determine whether
49 loopback should be used to connect or the public address.""")
53 loopback should be used to connect or the public address.""")
50 timeout=CFloat(2,config=True,
54 timeout=CFloat(2,config=True,
51 help="""The time (in seconds) to wait for the Controller to respond
55 help="""The time (in seconds) to wait for the Controller to respond
52 to registration requests before giving up.""")
56 to registration requests before giving up.""")
57 sshserver=Unicode(config=True,
58 help="""The SSH server to use for tunneling connections to the Controller.""")
59 sshkey=Unicode(config=True,
60 help="""The SSH keyfile to use when tunneling connections to the Controller.""")
61 paramiko=Bool(sys.platform == 'win32', config=True,
62 help="""Whether to use paramiko instead of openssh for tunnels.""")
53
63
54 # not configurable:
64 # not configurable:
55 user_ns=Dict()
65 user_ns=Dict()
56 id=Int(allow_none=True)
66 id=Int(allow_none=True)
57 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
67 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
58 kernel=Instance(Kernel)
68 kernel=Instance(Kernel)
59
69
60 bident = CBytes()
70 bident = CBytes()
61 ident = Unicode()
71 ident = Unicode()
62 def _ident_changed(self, name, old, new):
72 def _ident_changed(self, name, old, new):
63 self.bident = asbytes(new)
73 self.bident = asbytes(new)
74 using_ssh=Bool(False)
64
75
65
76
66 def __init__(self, **kwargs):
77 def __init__(self, **kwargs):
67 super(EngineFactory, self).__init__(**kwargs)
78 super(EngineFactory, self).__init__(**kwargs)
68 self.ident = self.session.session
79 self.ident = self.session.session
69 ctx = self.context
80
81 def init_connector(self):
82 """construct connection function, which handles tunnels."""
83 self.using_ssh = bool(self.sshkey or self.sshserver)
70
84
71 reg = ctx.socket(zmq.XREQ)
85 if self.sshkey and not self.sshserver:
72 reg.setsockopt(zmq.IDENTITY, self.bident)
86 # We are using ssh directly to the controller, tunneling localhost to localhost
73 reg.connect(self.url)
87 self.sshserver = self.url.split('://')[1].split(':')[0]
74 self.registrar = zmqstream.ZMQStream(reg, self.loop)
88
89 if self.using_ssh:
90 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
91 password=False
92 else:
93 password = getpass("SSH Password for %s: "%self.sshserver)
94 else:
95 password = False
96
97 def connect(s, url):
98 url = disambiguate_url(url, self.location)
99 if self.using_ssh:
100 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
101 return tunnel.tunnel_connection(s, url, self.sshserver,
102 keyfile=self.sshkey, paramiko=self.paramiko,
103 password=password,
104 )
105 else:
106 return s.connect(url)
107
108 def maybe_tunnel(url):
109 """like connect, but don't complete the connection (for use by heartbeat)"""
110 url = disambiguate_url(url, self.location)
111 if self.using_ssh:
112 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
113 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
114 keyfile=self.sshkey, paramiko=self.paramiko,
115 password=password,
116 )
117 return url
118 return connect, maybe_tunnel
75
119
76 def register(self):
120 def register(self):
77 """send the registration_request"""
121 """send the registration_request"""
78
122
79 self.log.info("Registering with controller at %s"%self.url)
123 self.log.info("Registering with controller at %s"%self.url)
124 ctx = self.context
125 connect,maybe_tunnel = self.init_connector()
126 reg = ctx.socket(zmq.XREQ)
127 reg.setsockopt(zmq.IDENTITY, self.bident)
128 connect(reg, self.url)
129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
130
131
80 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
81 self.registrar.on_recv(self.complete_registration)
133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
82 # print (self.session.key)
134 # print (self.session.key)
83 self.session.send(self.registrar, "registration_request",content=content)
135 self.session.send(self.registrar, "registration_request",content=content)
84
136
85 def complete_registration(self, msg):
137 def complete_registration(self, msg, connect, maybe_tunnel):
86 # print msg
138 # print msg
87 self._abort_dc.stop()
139 self._abort_dc.stop()
88 ctx = self.context
140 ctx = self.context
89 loop = self.loop
141 loop = self.loop
90 identity = self.bident
142 identity = self.bident
91 idents,msg = self.session.feed_identities(msg)
143 idents,msg = self.session.feed_identities(msg)
92 msg = Message(self.session.unserialize(msg))
144 msg = Message(self.session.unserialize(msg))
93
145
94 if msg.content.status == 'ok':
146 if msg.content.status == 'ok':
95 self.id = int(msg.content.id)
147 self.id = int(msg.content.id)
96
148
149 # launch heartbeat
150 hb_addrs = msg.content.heartbeat
151
152 # possibly forward hb ports with tunnels
153 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
154 heart = Heart(*map(str, hb_addrs), heart_id=identity)
155 heart.start()
156
97 # create Shell Streams (MUX, Task, etc.):
157 # create Shell Streams (MUX, Task, etc.):
98 queue_addr = msg.content.mux
158 queue_addr = msg.content.mux
99 shell_addrs = [ str(queue_addr) ]
159 shell_addrs = [ str(queue_addr) ]
100 task_addr = msg.content.task
160 task_addr = msg.content.task
101 if task_addr:
161 if task_addr:
102 shell_addrs.append(str(task_addr))
162 shell_addrs.append(str(task_addr))
103
163
104 # Uncomment this to go back to two-socket model
164 # Uncomment this to go back to two-socket model
105 # shell_streams = []
165 # shell_streams = []
106 # for addr in shell_addrs:
166 # for addr in shell_addrs:
107 # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
167 # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
108 # stream.setsockopt(zmq.IDENTITY, identity)
168 # stream.setsockopt(zmq.IDENTITY, identity)
109 # stream.connect(disambiguate_url(addr, self.location))
169 # stream.connect(disambiguate_url(addr, self.location))
110 # shell_streams.append(stream)
170 # shell_streams.append(stream)
111
171
112 # Now use only one shell stream for mux and tasks
172 # Now use only one shell stream for mux and tasks
113 stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
173 stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
114 stream.setsockopt(zmq.IDENTITY, identity)
174 stream.setsockopt(zmq.IDENTITY, identity)
115 shell_streams = [stream]
175 shell_streams = [stream]
116 for addr in shell_addrs:
176 for addr in shell_addrs:
117 stream.connect(disambiguate_url(addr, self.location))
177 connect(stream, addr)
118 # end single stream-socket
178 # end single stream-socket
119
179
120 # control stream:
180 # control stream:
121 control_addr = str(msg.content.control)
181 control_addr = str(msg.content.control)
122 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
182 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
123 control_stream.setsockopt(zmq.IDENTITY, identity)
183 control_stream.setsockopt(zmq.IDENTITY, identity)
124 control_stream.connect(disambiguate_url(control_addr, self.location))
184 connect(control_stream, control_addr)
125
185
126 # create iopub stream:
186 # create iopub stream:
127 iopub_addr = msg.content.iopub
187 iopub_addr = msg.content.iopub
128 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
188 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
129 iopub_stream.setsockopt(zmq.IDENTITY, identity)
189 iopub_stream.setsockopt(zmq.IDENTITY, identity)
130 iopub_stream.connect(disambiguate_url(iopub_addr, self.location))
190 connect(iopub_stream, iopub_addr)
131
132 # launch heartbeat
133 hb_addrs = msg.content.heartbeat
134 # print (hb_addrs)
135
191
136 # # Redirect input streams and set a display hook.
192 # # Redirect input streams and set a display hook.
137 if self.out_stream_factory:
193 if self.out_stream_factory:
138 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
194 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
139 sys.stdout.topic = 'engine.%i.stdout'%self.id
195 sys.stdout.topic = 'engine.%i.stdout'%self.id
140 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
196 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
141 sys.stderr.topic = 'engine.%i.stderr'%self.id
197 sys.stderr.topic = 'engine.%i.stderr'%self.id
142 if self.display_hook_factory:
198 if self.display_hook_factory:
143 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
199 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
144 sys.displayhook.topic = 'engine.%i.pyout'%self.id
200 sys.displayhook.topic = 'engine.%i.pyout'%self.id
145
201
146 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
202 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
147 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
203 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
148 loop=loop, user_ns = self.user_ns, log=self.log)
204 loop=loop, user_ns = self.user_ns, log=self.log)
149 self.kernel.start()
205 self.kernel.start()
150 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
151 heart = Heart(*map(str, hb_addrs), heart_id=identity)
152 heart.start()
153
206
154
207
155 else:
208 else:
156 self.log.fatal("Registration Failed: %s"%msg)
209 self.log.fatal("Registration Failed: %s"%msg)
157 raise Exception("Registration Failed: %s"%msg)
210 raise Exception("Registration Failed: %s"%msg)
158
211
159 self.log.info("Completed registration with id %i"%self.id)
212 self.log.info("Completed registration with id %i"%self.id)
160
213
161
214
162 def abort(self):
215 def abort(self):
163 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
216 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
164 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
217 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
165 time.sleep(1)
218 time.sleep(1)
166 sys.exit(255)
219 sys.exit(255)
167
220
168 def start(self):
221 def start(self):
169 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
222 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
170 dc.start()
223 dc.start()
171 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
224 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
172 self._abort_dc.start()
225 self._abort_dc.start()
173
226
General Comments 0
You need to be logged in to leave comments. Login now