##// END OF EJS Templates
minor cleanup in ipcontroller/ipengine...
MinRK -
Show More
@@ -1,445 +1,450
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 import json
26 import json
27 import os
27 import os
28 import socket
28 import socket
29 import stat
29 import stat
30 import sys
30 import sys
31
31
32 from multiprocessing import Process
32 from multiprocessing import Process
33
33
34 import zmq
34 import zmq
35 from zmq.devices import ProcessMonitoredQueue
35 from zmq.devices import ProcessMonitoredQueue
36 from zmq.log.handlers import PUBHandler
36 from zmq.log.handlers import PUBHandler
37
37
38 from IPython.core.profiledir import ProfileDir
38 from IPython.core.profiledir import ProfileDir
39
39
40 from IPython.parallel.apps.baseapp import (
40 from IPython.parallel.apps.baseapp import (
41 BaseParallelApplication,
41 BaseParallelApplication,
42 base_aliases,
42 base_aliases,
43 base_flags,
43 base_flags,
44 catch_config_error,
44 catch_config_error,
45 )
45 )
46 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
47 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
47 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
48
48
49 from IPython.zmq.session import (
49 from IPython.zmq.session import (
50 Session, session_aliases, session_flags, default_secure
50 Session, session_aliases, session_flags, default_secure
51 )
51 )
52
52
53 from IPython.parallel.controller.heartmonitor import HeartMonitor
53 from IPython.parallel.controller.heartmonitor import HeartMonitor
54 from IPython.parallel.controller.hub import HubFactory
54 from IPython.parallel.controller.hub import HubFactory
55 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 from IPython.parallel.controller.sqlitedb import SQLiteDB
56 from IPython.parallel.controller.sqlitedb import SQLiteDB
57
57
58 from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
58 from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
59
59
60 # conditional import of MongoDB backend class
60 # conditional import of MongoDB backend class
61
61
62 try:
62 try:
63 from IPython.parallel.controller.mongodb import MongoDB
63 from IPython.parallel.controller.mongodb import MongoDB
64 except ImportError:
64 except ImportError:
65 maybe_mongo = []
65 maybe_mongo = []
66 else:
66 else:
67 maybe_mongo = [MongoDB]
67 maybe_mongo = [MongoDB]
68
68
69
69
70 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
71 # Module level variables
71 # Module level variables
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73
73
74
74
75 #: The default config file name for this application
75 #: The default config file name for this application
76 default_config_file_name = u'ipcontroller_config.py'
76 default_config_file_name = u'ipcontroller_config.py'
77
77
78
78
79 _description = """Start the IPython controller for parallel computing.
79 _description = """Start the IPython controller for parallel computing.
80
80
81 The IPython controller provides a gateway between the IPython engines and
81 The IPython controller provides a gateway between the IPython engines and
82 clients. The controller needs to be started before the engines and can be
82 clients. The controller needs to be started before the engines and can be
83 configured using command line options or using a cluster directory. Cluster
83 configured using command line options or using a cluster directory. Cluster
84 directories contain config, log and security files and are usually located in
84 directories contain config, log and security files and are usually located in
85 your ipython directory and named as "profile_name". See the `profile`
85 your ipython directory and named as "profile_name". See the `profile`
86 and `profile-dir` options for details.
86 and `profile-dir` options for details.
87 """
87 """
88
88
89 _examples = """
89 _examples = """
90 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
90 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
91 ipcontroller --scheme=pure # use the pure zeromq scheduler
91 ipcontroller --scheme=pure # use the pure zeromq scheduler
92 """
92 """
93
93
94
94
95 #-----------------------------------------------------------------------------
95 #-----------------------------------------------------------------------------
96 # The main application
96 # The main application
97 #-----------------------------------------------------------------------------
97 #-----------------------------------------------------------------------------
98 flags = {}
98 flags = {}
99 flags.update(base_flags)
99 flags.update(base_flags)
100 flags.update({
100 flags.update({
101 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
101 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
102 'Use threads instead of processes for the schedulers'),
102 'Use threads instead of processes for the schedulers'),
103 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
103 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
104 'use the SQLiteDB backend'),
104 'use the SQLiteDB backend'),
105 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
105 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
106 'use the MongoDB backend'),
106 'use the MongoDB backend'),
107 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
107 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
108 'use the in-memory DictDB backend'),
108 'use the in-memory DictDB backend'),
109 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
109 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
110 'reuse existing json connection files')
110 'reuse existing json connection files')
111 })
111 })
112
112
113 flags.update(session_flags)
113 flags.update(session_flags)
114
114
115 aliases = dict(
115 aliases = dict(
116 ssh = 'IPControllerApp.ssh_server',
116 ssh = 'IPControllerApp.ssh_server',
117 enginessh = 'IPControllerApp.engine_ssh_server',
117 enginessh = 'IPControllerApp.engine_ssh_server',
118 location = 'IPControllerApp.location',
118 location = 'IPControllerApp.location',
119
119
120 url = 'HubFactory.url',
120 url = 'HubFactory.url',
121 ip = 'HubFactory.ip',
121 ip = 'HubFactory.ip',
122 transport = 'HubFactory.transport',
122 transport = 'HubFactory.transport',
123 port = 'HubFactory.regport',
123 port = 'HubFactory.regport',
124
124
125 ping = 'HeartMonitor.period',
125 ping = 'HeartMonitor.period',
126
126
127 scheme = 'TaskScheduler.scheme_name',
127 scheme = 'TaskScheduler.scheme_name',
128 hwm = 'TaskScheduler.hwm',
128 hwm = 'TaskScheduler.hwm',
129 )
129 )
130 aliases.update(base_aliases)
130 aliases.update(base_aliases)
131 aliases.update(session_aliases)
131 aliases.update(session_aliases)
132
132
133
133
134 class IPControllerApp(BaseParallelApplication):
134 class IPControllerApp(BaseParallelApplication):
135
135
136 name = u'ipcontroller'
136 name = u'ipcontroller'
137 description = _description
137 description = _description
138 examples = _examples
138 examples = _examples
139 config_file_name = Unicode(default_config_file_name)
139 config_file_name = Unicode(default_config_file_name)
140 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
140 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
141
141
142 # change default to True
142 # change default to True
143 auto_create = Bool(True, config=True,
143 auto_create = Bool(True, config=True,
144 help="""Whether to create profile dir if it doesn't exist.""")
144 help="""Whether to create profile dir if it doesn't exist.""")
145
145
146 reuse_files = Bool(False, config=True,
146 reuse_files = Bool(False, config=True,
147 help='Whether to reuse existing json connection files.'
147 help='Whether to reuse existing json connection files.'
148 )
148 )
149 ssh_server = Unicode(u'', config=True,
149 ssh_server = Unicode(u'', config=True,
150 help="""ssh url for clients to use when connecting to the Controller
150 help="""ssh url for clients to use when connecting to the Controller
151 processes. It should be of the form: [user@]server[:port]. The
151 processes. It should be of the form: [user@]server[:port]. The
152 Controller's listening addresses must be accessible from the ssh server""",
152 Controller's listening addresses must be accessible from the ssh server""",
153 )
153 )
154 engine_ssh_server = Unicode(u'', config=True,
154 engine_ssh_server = Unicode(u'', config=True,
155 help="""ssh url for engines to use when connecting to the Controller
155 help="""ssh url for engines to use when connecting to the Controller
156 processes. It should be of the form: [user@]server[:port]. The
156 processes. It should be of the form: [user@]server[:port]. The
157 Controller's listening addresses must be accessible from the ssh server""",
157 Controller's listening addresses must be accessible from the ssh server""",
158 )
158 )
159 location = Unicode(u'', config=True,
159 location = Unicode(u'', config=True,
160 help="""The external IP or domain name of the Controller, used for disambiguating
160 help="""The external IP or domain name of the Controller, used for disambiguating
161 engine and client connections.""",
161 engine and client connections.""",
162 )
162 )
163 import_statements = List([], config=True,
163 import_statements = List([], config=True,
164 help="import statements to be run at startup. Necessary in some environments"
164 help="import statements to be run at startup. Necessary in some environments"
165 )
165 )
166
166
167 use_threads = Bool(False, config=True,
167 use_threads = Bool(False, config=True,
168 help='Use threads instead of processes for the schedulers',
168 help='Use threads instead of processes for the schedulers',
169 )
169 )
170
170
171 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
171 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
172 help="JSON filename where engine connection info will be stored.")
172 help="JSON filename where engine connection info will be stored.")
173 client_json_file = Unicode('ipcontroller-client.json', config=True,
173 client_json_file = Unicode('ipcontroller-client.json', config=True,
174 help="JSON filename where client connection info will be stored.")
174 help="JSON filename where client connection info will be stored.")
175
175
176 def _cluster_id_changed(self, name, old, new):
176 def _cluster_id_changed(self, name, old, new):
177 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
177 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
178 self.engine_json_file = "%s-engine.json" % self.name
178 self.engine_json_file = "%s-engine.json" % self.name
179 self.client_json_file = "%s-client.json" % self.name
179 self.client_json_file = "%s-client.json" % self.name
180
180
181
181
182 # internal
182 # internal
183 children = List()
183 children = List()
184 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
184 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
185
185
186 def _use_threads_changed(self, name, old, new):
186 def _use_threads_changed(self, name, old, new):
187 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
187 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
188
188
189 aliases = Dict(aliases)
189 aliases = Dict(aliases)
190 flags = Dict(flags)
190 flags = Dict(flags)
191
191
192
192
193 def save_connection_dict(self, fname, cdict):
193 def save_connection_dict(self, fname, cdict):
194 """save a connection dict to json file."""
194 """save a connection dict to json file."""
195 c = self.config
195 c = self.config
196 url = cdict['url']
196 url = cdict['url']
197 location = cdict['location']
197 location = cdict['location']
198 if not location:
198 if not location:
199 try:
199 try:
200 proto,ip,port = split_url(url)
200 proto,ip,port = split_url(url)
201 except AssertionError:
201 except AssertionError:
202 pass
202 pass
203 else:
203 else:
204 try:
204 try:
205 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
205 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
206 except (socket.gaierror, IndexError):
206 except (socket.gaierror, IndexError):
207 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
207 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
208 " You may need to specify '--location=<external_ip_address>' to help"
208 " You may need to specify '--location=<external_ip_address>' to help"
209 " IPython decide when to connect via loopback.")
209 " IPython decide when to connect via loopback.")
210 location = '127.0.0.1'
210 location = '127.0.0.1'
211 cdict['location'] = location
211 cdict['location'] = location
212 fname = os.path.join(self.profile_dir.security_dir, fname)
212 fname = os.path.join(self.profile_dir.security_dir, fname)
213 self.log.info("writing connection info to %s", fname)
213 with open(fname, 'w') as f:
214 with open(fname, 'w') as f:
214 f.write(json.dumps(cdict, indent=2))
215 f.write(json.dumps(cdict, indent=2))
215 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
216 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
216
217
217 def load_config_from_json(self):
218 def load_config_from_json(self):
218 """load config from existing json connector files."""
219 """load config from existing json connector files."""
219 c = self.config
220 c = self.config
220 self.log.debug("loading config from JSON")
221 self.log.debug("loading config from JSON")
221 # load from engine config
222 # load from engine config
222 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
223 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
224 self.log.info("loading connection info from %s", fname)
225 with open(fname) as f:
223 cfg = json.loads(f.read())
226 cfg = json.loads(f.read())
224 key = c.Session.key = asbytes(cfg['exec_key'])
227 key = c.Session.key = asbytes(cfg['exec_key'])
225 xport,addr = cfg['url'].split('://')
228 xport,addr = cfg['url'].split('://')
226 c.HubFactory.engine_transport = xport
229 c.HubFactory.engine_transport = xport
227 ip,ports = addr.split(':')
230 ip,ports = addr.split(':')
228 c.HubFactory.engine_ip = ip
231 c.HubFactory.engine_ip = ip
229 c.HubFactory.regport = int(ports)
232 c.HubFactory.regport = int(ports)
230 self.location = cfg['location']
233 self.location = cfg['location']
231 if not self.engine_ssh_server:
234 if not self.engine_ssh_server:
232 self.engine_ssh_server = cfg['ssh']
235 self.engine_ssh_server = cfg['ssh']
233 # load client config
236 # load client config
234 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
237 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
238 self.log.info("loading connection info from %s", fname)
239 with open(fname) as f:
235 cfg = json.loads(f.read())
240 cfg = json.loads(f.read())
236 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
241 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
237 xport,addr = cfg['url'].split('://')
242 xport,addr = cfg['url'].split('://')
238 c.HubFactory.client_transport = xport
243 c.HubFactory.client_transport = xport
239 ip,ports = addr.split(':')
244 ip,ports = addr.split(':')
240 c.HubFactory.client_ip = ip
245 c.HubFactory.client_ip = ip
241 if not self.ssh_server:
246 if not self.ssh_server:
242 self.ssh_server = cfg['ssh']
247 self.ssh_server = cfg['ssh']
243 assert int(ports) == c.HubFactory.regport, "regport mismatch"
248 assert int(ports) == c.HubFactory.regport, "regport mismatch"
244
249
245 def load_secondary_config(self):
250 def load_secondary_config(self):
246 """secondary config, loading from JSON and setting defaults"""
251 """secondary config, loading from JSON and setting defaults"""
247 if self.reuse_files:
252 if self.reuse_files:
248 try:
253 try:
249 self.load_config_from_json()
254 self.load_config_from_json()
250 except (AssertionError,IOError) as e:
255 except (AssertionError,IOError) as e:
251 self.log.error("Could not load config from JSON: %s" % e)
256 self.log.error("Could not load config from JSON: %s" % e)
252 self.reuse_files=False
257 self.reuse_files=False
253 # switch Session.key default to secure
258 # switch Session.key default to secure
254 default_secure(self.config)
259 default_secure(self.config)
255 self.log.debug("Config changed")
260 self.log.debug("Config changed")
256 self.log.debug(repr(self.config))
261 self.log.debug(repr(self.config))
257
262
258 def init_hub(self):
263 def init_hub(self):
259 c = self.config
264 c = self.config
260
265
261 self.do_import_statements()
266 self.do_import_statements()
262
267
263 try:
268 try:
264 self.factory = HubFactory(config=c, log=self.log)
269 self.factory = HubFactory(config=c, log=self.log)
265 # self.start_logging()
270 # self.start_logging()
266 self.factory.init_hub()
271 self.factory.init_hub()
267 except TraitError:
272 except TraitError:
268 raise
273 raise
269 except Exception:
274 except Exception:
270 self.log.error("Couldn't construct the Controller", exc_info=True)
275 self.log.error("Couldn't construct the Controller", exc_info=True)
271 self.exit(1)
276 self.exit(1)
272
277
273 if not self.reuse_files:
278 if not self.reuse_files:
274 # save to new json config files
279 # save to new json config files
275 f = self.factory
280 f = self.factory
276 cdict = {'exec_key' : f.session.key.decode('ascii'),
281 cdict = {'exec_key' : f.session.key.decode('ascii'),
277 'ssh' : self.ssh_server,
282 'ssh' : self.ssh_server,
278 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
283 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
279 'location' : self.location
284 'location' : self.location
280 }
285 }
281 self.save_connection_dict(self.client_json_file, cdict)
286 self.save_connection_dict(self.client_json_file, cdict)
282 edict = cdict
287 edict = cdict
283 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
288 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
284 edict['ssh'] = self.engine_ssh_server
289 edict['ssh'] = self.engine_ssh_server
285 self.save_connection_dict(self.engine_json_file, edict)
290 self.save_connection_dict(self.engine_json_file, edict)
286
291
287 #
292 #
288 def init_schedulers(self):
293 def init_schedulers(self):
289 children = self.children
294 children = self.children
290 mq = import_item(str(self.mq_class))
295 mq = import_item(str(self.mq_class))
291
296
292 hub = self.factory
297 hub = self.factory
293 # disambiguate url, in case of *
298 # disambiguate url, in case of *
294 monitor_url = disambiguate_url(hub.monitor_url)
299 monitor_url = disambiguate_url(hub.monitor_url)
295 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
300 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
296 # IOPub relay (in a Process)
301 # IOPub relay (in a Process)
297 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
302 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
298 q.bind_in(hub.client_info['iopub'])
303 q.bind_in(hub.client_info['iopub'])
299 q.bind_out(hub.engine_info['iopub'])
304 q.bind_out(hub.engine_info['iopub'])
300 q.setsockopt_out(zmq.SUBSCRIBE, b'')
305 q.setsockopt_out(zmq.SUBSCRIBE, b'')
301 q.connect_mon(monitor_url)
306 q.connect_mon(monitor_url)
302 q.daemon=True
307 q.daemon=True
303 children.append(q)
308 children.append(q)
304
309
305 # Multiplexer Queue (in a Process)
310 # Multiplexer Queue (in a Process)
306 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
311 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
307 q.bind_in(hub.client_info['mux'])
312 q.bind_in(hub.client_info['mux'])
308 q.setsockopt_in(zmq.IDENTITY, b'mux')
313 q.setsockopt_in(zmq.IDENTITY, b'mux')
309 q.bind_out(hub.engine_info['mux'])
314 q.bind_out(hub.engine_info['mux'])
310 q.connect_mon(monitor_url)
315 q.connect_mon(monitor_url)
311 q.daemon=True
316 q.daemon=True
312 children.append(q)
317 children.append(q)
313
318
314 # Control Queue (in a Process)
319 # Control Queue (in a Process)
315 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
320 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
316 q.bind_in(hub.client_info['control'])
321 q.bind_in(hub.client_info['control'])
317 q.setsockopt_in(zmq.IDENTITY, b'control')
322 q.setsockopt_in(zmq.IDENTITY, b'control')
318 q.bind_out(hub.engine_info['control'])
323 q.bind_out(hub.engine_info['control'])
319 q.connect_mon(monitor_url)
324 q.connect_mon(monitor_url)
320 q.daemon=True
325 q.daemon=True
321 children.append(q)
326 children.append(q)
322 try:
327 try:
323 scheme = self.config.TaskScheduler.scheme_name
328 scheme = self.config.TaskScheduler.scheme_name
324 except AttributeError:
329 except AttributeError:
325 scheme = TaskScheduler.scheme_name.get_default_value()
330 scheme = TaskScheduler.scheme_name.get_default_value()
326 # Task Queue (in a Process)
331 # Task Queue (in a Process)
327 if scheme == 'pure':
332 if scheme == 'pure':
328 self.log.warn("task::using pure XREQ Task scheduler")
333 self.log.warn("task::using pure XREQ Task scheduler")
329 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
334 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
330 # q.setsockopt_out(zmq.HWM, hub.hwm)
335 # q.setsockopt_out(zmq.HWM, hub.hwm)
331 q.bind_in(hub.client_info['task'][1])
336 q.bind_in(hub.client_info['task'][1])
332 q.setsockopt_in(zmq.IDENTITY, b'task')
337 q.setsockopt_in(zmq.IDENTITY, b'task')
333 q.bind_out(hub.engine_info['task'])
338 q.bind_out(hub.engine_info['task'])
334 q.connect_mon(monitor_url)
339 q.connect_mon(monitor_url)
335 q.daemon=True
340 q.daemon=True
336 children.append(q)
341 children.append(q)
337 elif scheme == 'none':
342 elif scheme == 'none':
338 self.log.warn("task::using no Task scheduler")
343 self.log.warn("task::using no Task scheduler")
339
344
340 else:
345 else:
341 self.log.info("task::using Python %s Task scheduler"%scheme)
346 self.log.info("task::using Python %s Task scheduler"%scheme)
342 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
347 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
343 monitor_url, disambiguate_url(hub.client_info['notification']))
348 monitor_url, disambiguate_url(hub.client_info['notification']))
344 kwargs = dict(logname='scheduler', loglevel=self.log_level,
349 kwargs = dict(logname='scheduler', loglevel=self.log_level,
345 log_url = self.log_url, config=dict(self.config))
350 log_url = self.log_url, config=dict(self.config))
346 if 'Process' in self.mq_class:
351 if 'Process' in self.mq_class:
347 # run the Python scheduler in a Process
352 # run the Python scheduler in a Process
348 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
353 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
349 q.daemon=True
354 q.daemon=True
350 children.append(q)
355 children.append(q)
351 else:
356 else:
352 # single-threaded Controller
357 # single-threaded Controller
353 kwargs['in_thread'] = True
358 kwargs['in_thread'] = True
354 launch_scheduler(*sargs, **kwargs)
359 launch_scheduler(*sargs, **kwargs)
355
360
356
361
357 def save_urls(self):
362 def save_urls(self):
358 """save the registration urls to files."""
363 """save the registration urls to files."""
359 c = self.config
364 c = self.config
360
365
361 sec_dir = self.profile_dir.security_dir
366 sec_dir = self.profile_dir.security_dir
362 cf = self.factory
367 cf = self.factory
363
368
364 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
369 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
365 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
370 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
366
371
367 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
372 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
368 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
373 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
369
374
370
375
371 def do_import_statements(self):
376 def do_import_statements(self):
372 statements = self.import_statements
377 statements = self.import_statements
373 for s in statements:
378 for s in statements:
374 try:
379 try:
375 self.log.msg("Executing statement: '%s'" % s)
380 self.log.msg("Executing statement: '%s'" % s)
376 exec s in globals(), locals()
381 exec s in globals(), locals()
377 except:
382 except:
378 self.log.msg("Error running statement: %s" % s)
383 self.log.msg("Error running statement: %s" % s)
379
384
380 def forward_logging(self):
385 def forward_logging(self):
381 if self.log_url:
386 if self.log_url:
382 self.log.info("Forwarding logging to %s"%self.log_url)
387 self.log.info("Forwarding logging to %s"%self.log_url)
383 context = zmq.Context.instance()
388 context = zmq.Context.instance()
384 lsock = context.socket(zmq.PUB)
389 lsock = context.socket(zmq.PUB)
385 lsock.connect(self.log_url)
390 lsock.connect(self.log_url)
386 handler = PUBHandler(lsock)
391 handler = PUBHandler(lsock)
387 self.log.removeHandler(self._log_handler)
392 self.log.removeHandler(self._log_handler)
388 handler.root_topic = 'controller'
393 handler.root_topic = 'controller'
389 handler.setLevel(self.log_level)
394 handler.setLevel(self.log_level)
390 self.log.addHandler(handler)
395 self.log.addHandler(handler)
391 self._log_handler = handler
396 self._log_handler = handler
392
397
393 @catch_config_error
398 @catch_config_error
394 def initialize(self, argv=None):
399 def initialize(self, argv=None):
395 super(IPControllerApp, self).initialize(argv)
400 super(IPControllerApp, self).initialize(argv)
396 self.forward_logging()
401 self.forward_logging()
397 self.load_secondary_config()
402 self.load_secondary_config()
398 self.init_hub()
403 self.init_hub()
399 self.init_schedulers()
404 self.init_schedulers()
400
405
401 def start(self):
406 def start(self):
402 # Start the subprocesses:
407 # Start the subprocesses:
403 self.factory.start()
408 self.factory.start()
404 child_procs = []
409 child_procs = []
405 for child in self.children:
410 for child in self.children:
406 child.start()
411 child.start()
407 if isinstance(child, ProcessMonitoredQueue):
412 if isinstance(child, ProcessMonitoredQueue):
408 child_procs.append(child.launcher)
413 child_procs.append(child.launcher)
409 elif isinstance(child, Process):
414 elif isinstance(child, Process):
410 child_procs.append(child)
415 child_procs.append(child)
411 if child_procs:
416 if child_procs:
412 signal_children(child_procs)
417 signal_children(child_procs)
413
418
414 self.write_pid_file(overwrite=True)
419 self.write_pid_file(overwrite=True)
415
420
416 try:
421 try:
417 self.factory.loop.start()
422 self.factory.loop.start()
418 except KeyboardInterrupt:
423 except KeyboardInterrupt:
419 self.log.critical("Interrupted, Exiting...\n")
424 self.log.critical("Interrupted, Exiting...\n")
420
425
421
426
422
427
423 def launch_new_instance():
428 def launch_new_instance():
424 """Create and run the IPython controller"""
429 """Create and run the IPython controller"""
425 if sys.platform == 'win32':
430 if sys.platform == 'win32':
426 # make sure we don't get called from a multiprocessing subprocess
431 # make sure we don't get called from a multiprocessing subprocess
427 # this can result in infinite Controllers being started on Windows
432 # this can result in infinite Controllers being started on Windows
428 # which doesn't have a proper fork, so multiprocessing is wonky
433 # which doesn't have a proper fork, so multiprocessing is wonky
429
434
430 # this only comes up when IPython has been installed using vanilla
435 # this only comes up when IPython has been installed using vanilla
431 # setuptools, and *not* distribute.
436 # setuptools, and *not* distribute.
432 import multiprocessing
437 import multiprocessing
433 p = multiprocessing.current_process()
438 p = multiprocessing.current_process()
434 # the main process has name 'MainProcess'
439 # the main process has name 'MainProcess'
435 # subprocesses will have names like 'Process-1'
440 # subprocesses will have names like 'Process-1'
436 if p.name != 'MainProcess':
441 if p.name != 'MainProcess':
437 # we are a subprocess, don't start another Controller!
442 # we are a subprocess, don't start another Controller!
438 return
443 return
439 app = IPControllerApp.instance()
444 app = IPControllerApp.instance()
440 app.initialize()
445 app.initialize()
441 app.start()
446 app.start()
442
447
443
448
444 if __name__ == '__main__':
449 if __name__ == '__main__':
445 launch_new_instance()
450 launch_new_instance()
@@ -1,346 +1,330
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import json
24 import json
25 import os
25 import os
26 import sys
26 import sys
27 import time
27 import time
28
28
29 import zmq
29 import zmq
30 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
31
31
32 from IPython.core.profiledir import ProfileDir
32 from IPython.core.profiledir import ProfileDir
33 from IPython.parallel.apps.baseapp import (
33 from IPython.parallel.apps.baseapp import (
34 BaseParallelApplication,
34 BaseParallelApplication,
35 base_aliases,
35 base_aliases,
36 base_flags,
36 base_flags,
37 catch_config_error,
37 catch_config_error,
38 )
38 )
39 from IPython.zmq.log import EnginePUBHandler
39 from IPython.zmq.log import EnginePUBHandler
40 from IPython.zmq.session import (
40 from IPython.zmq.session import (
41 Session, session_aliases, session_flags
41 Session, session_aliases, session_flags
42 )
42 )
43
43
44 from IPython.config.configurable import Configurable
44 from IPython.config.configurable import Configurable
45
45
46 from IPython.parallel.engine.engine import EngineFactory
46 from IPython.parallel.engine.engine import EngineFactory
47 from IPython.parallel.engine.streamkernel import Kernel
47 from IPython.parallel.engine.streamkernel import Kernel
48 from IPython.parallel.util import disambiguate_url, asbytes
48 from IPython.parallel.util import disambiguate_url, asbytes
49
49
50 from IPython.utils.importstring import import_item
50 from IPython.utils.importstring import import_item
51 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
51 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
52
52
53
53
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55 # Module level variables
55 # Module level variables
56 #-----------------------------------------------------------------------------
56 #-----------------------------------------------------------------------------
57
57
58 #: The default config file name for this application
58 #: The default config file name for this application
59 default_config_file_name = u'ipengine_config.py'
59 default_config_file_name = u'ipengine_config.py'
60
60
61 _description = """Start an IPython engine for parallel computing.
61 _description = """Start an IPython engine for parallel computing.
62
62
63 IPython engines run in parallel and perform computations on behalf of a client
63 IPython engines run in parallel and perform computations on behalf of a client
64 and controller. A controller needs to be started before the engines. The
64 and controller. A controller needs to be started before the engines. The
65 engine can be configured using command line options or using a cluster
65 engine can be configured using command line options or using a cluster
66 directory. Cluster directories contain config, log and security files and are
66 directory. Cluster directories contain config, log and security files and are
67 usually located in your ipython directory and named as "profile_name".
67 usually located in your ipython directory and named as "profile_name".
68 See the `profile` and `profile-dir` options for details.
68 See the `profile` and `profile-dir` options for details.
69 """
69 """
70
70
71 _examples = """
71 _examples = """
72 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
72 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
73 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
73 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
74 """
74 """
75
75
76 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
77 # MPI configuration
77 # MPI configuration
78 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
79
79
80 mpi4py_init = """from mpi4py import MPI as mpi
80 mpi4py_init = """from mpi4py import MPI as mpi
81 mpi.size = mpi.COMM_WORLD.Get_size()
81 mpi.size = mpi.COMM_WORLD.Get_size()
82 mpi.rank = mpi.COMM_WORLD.Get_rank()
82 mpi.rank = mpi.COMM_WORLD.Get_rank()
83 """
83 """
84
84
85
85
86 pytrilinos_init = """from PyTrilinos import Epetra
86 pytrilinos_init = """from PyTrilinos import Epetra
87 class SimpleStruct:
87 class SimpleStruct:
88 pass
88 pass
89 mpi = SimpleStruct()
89 mpi = SimpleStruct()
90 mpi.rank = 0
90 mpi.rank = 0
91 mpi.size = 0
91 mpi.size = 0
92 """
92 """
93
93
94 class MPI(Configurable):
94 class MPI(Configurable):
95 """Configurable for MPI initialization"""
95 """Configurable for MPI initialization"""
96 use = Unicode('', config=True,
96 use = Unicode('', config=True,
97 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
97 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
98 )
98 )
99
99
100 def _use_changed(self, name, old, new):
100 def _use_changed(self, name, old, new):
101 # load default init script if it's not set
101 # load default init script if it's not set
102 if not self.init_script:
102 if not self.init_script:
103 self.init_script = self.default_inits.get(new, '')
103 self.init_script = self.default_inits.get(new, '')
104
104
105 init_script = Unicode('', config=True,
105 init_script = Unicode('', config=True,
106 help="Initialization code for MPI")
106 help="Initialization code for MPI")
107
107
108 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
108 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
109 config=True)
109 config=True)
110
110
111
111
112 #-----------------------------------------------------------------------------
112 #-----------------------------------------------------------------------------
113 # Main application
113 # Main application
114 #-----------------------------------------------------------------------------
114 #-----------------------------------------------------------------------------
115 aliases = dict(
115 aliases = dict(
116 file = 'IPEngineApp.url_file',
116 file = 'IPEngineApp.url_file',
117 c = 'IPEngineApp.startup_command',
117 c = 'IPEngineApp.startup_command',
118 s = 'IPEngineApp.startup_script',
118 s = 'IPEngineApp.startup_script',
119
119
120 url = 'EngineFactory.url',
120 url = 'EngineFactory.url',
121 ssh = 'EngineFactory.sshserver',
121 ssh = 'EngineFactory.sshserver',
122 sshkey = 'EngineFactory.sshkey',
122 sshkey = 'EngineFactory.sshkey',
123 ip = 'EngineFactory.ip',
123 ip = 'EngineFactory.ip',
124 transport = 'EngineFactory.transport',
124 transport = 'EngineFactory.transport',
125 port = 'EngineFactory.regport',
125 port = 'EngineFactory.regport',
126 location = 'EngineFactory.location',
126 location = 'EngineFactory.location',
127
127
128 timeout = 'EngineFactory.timeout',
128 timeout = 'EngineFactory.timeout',
129
129
130 mpi = 'MPI.use',
130 mpi = 'MPI.use',
131
131
132 )
132 )
133 aliases.update(base_aliases)
133 aliases.update(base_aliases)
134 aliases.update(session_aliases)
134 aliases.update(session_aliases)
135 flags = {}
135 flags = {}
136 flags.update(base_flags)
136 flags.update(base_flags)
137 flags.update(session_flags)
137 flags.update(session_flags)
138
138
139 class IPEngineApp(BaseParallelApplication):
139 class IPEngineApp(BaseParallelApplication):
140
140
141 name = 'ipengine'
141 name = 'ipengine'
142 description = _description
142 description = _description
143 examples = _examples
143 examples = _examples
144 config_file_name = Unicode(default_config_file_name)
144 config_file_name = Unicode(default_config_file_name)
145 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
145 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
146
146
147 startup_script = Unicode(u'', config=True,
147 startup_script = Unicode(u'', config=True,
148 help='specify a script to be run at startup')
148 help='specify a script to be run at startup')
149 startup_command = Unicode('', config=True,
149 startup_command = Unicode('', config=True,
150 help='specify a command to be run at startup')
150 help='specify a command to be run at startup')
151
151
152 url_file = Unicode(u'', config=True,
152 url_file = Unicode(u'', config=True,
153 help="""The full location of the file containing the connection information for
153 help="""The full location of the file containing the connection information for
154 the controller. If this is not given, the file must be in the
154 the controller. If this is not given, the file must be in the
155 security directory of the cluster directory. This location is
155 security directory of the cluster directory. This location is
156 resolved using the `profile` or `profile_dir` options.""",
156 resolved using the `profile` or `profile_dir` options.""",
157 )
157 )
158 wait_for_url_file = Float(5, config=True,
158 wait_for_url_file = Float(5, config=True,
159 help="""The maximum number of seconds to wait for url_file to exist.
159 help="""The maximum number of seconds to wait for url_file to exist.
160 This is useful for batch-systems and shared-filesystems where the
160 This is useful for batch-systems and shared-filesystems where the
161 controller and engine are started at the same time and it
161 controller and engine are started at the same time and it
162 may take a moment for the controller to write the connector files.""")
162 may take a moment for the controller to write the connector files.""")
163
163
164 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
164 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
165
165
166 def _cluster_id_changed(self, name, old, new):
166 def _cluster_id_changed(self, name, old, new):
167 if new:
167 if new:
168 base = 'ipcontroller-%s' % new
168 base = 'ipcontroller-%s' % new
169 else:
169 else:
170 base = 'ipcontroller'
170 base = 'ipcontroller'
171 self.url_file_name = "%s-engine.json" % base
171 self.url_file_name = "%s-engine.json" % base
172
172
173 log_url = Unicode('', config=True,
173 log_url = Unicode('', config=True,
174 help="""The URL for the iploggerapp instance, for forwarding
174 help="""The URL for the iploggerapp instance, for forwarding
175 logging to a central location.""")
175 logging to a central location.""")
176
176
177 aliases = Dict(aliases)
177 aliases = Dict(aliases)
178 flags = Dict(flags)
178 flags = Dict(flags)
179
179
180 # def find_key_file(self):
181 # """Set the key file.
182 #
183 # Here we don't try to actually see if it exists for is valid as that
184 # is hadled by the connection logic.
185 # """
186 # config = self.master_config
187 # # Find the actual controller key file
188 # if not config.Global.key_file:
189 # try_this = os.path.join(
190 # config.Global.profile_dir,
191 # config.Global.security_dir,
192 # config.Global.key_file_name
193 # )
194 # config.Global.key_file = try_this
195
196 def find_url_file(self):
180 def find_url_file(self):
197 """Set the url file.
181 """Set the url file.
198
182
199 Here we don't try to actually see if it exists for is valid as that
183 Here we don't try to actually see if it exists for is valid as that
200 is hadled by the connection logic.
184 is hadled by the connection logic.
201 """
185 """
202 config = self.config
186 config = self.config
203 # Find the actual controller key file
187 # Find the actual controller key file
204 if not self.url_file:
188 if not self.url_file:
205 self.url_file = os.path.join(
189 self.url_file = os.path.join(
206 self.profile_dir.security_dir,
190 self.profile_dir.security_dir,
207 self.url_file_name
191 self.url_file_name
208 )
192 )
209
193
210 def load_connector_file(self):
194 def load_connector_file(self):
211 """load config from a JSON connector file,
195 """load config from a JSON connector file,
212 at a *lower* priority than command-line/config files.
196 at a *lower* priority than command-line/config files.
213 """
197 """
214
198
215 self.log.info("Loading url_file %r"%self.url_file)
199 self.log.info("Loading url_file %r", self.url_file)
216 config = self.config
200 config = self.config
217
201
218 with open(self.url_file) as f:
202 with open(self.url_file) as f:
219 d = json.loads(f.read())
203 d = json.loads(f.read())
220
204
221 if 'exec_key' in d:
205 if 'exec_key' in d:
222 config.Session.key = asbytes(d['exec_key'])
206 config.Session.key = asbytes(d['exec_key'])
223
207
224 try:
208 try:
225 config.EngineFactory.location
209 config.EngineFactory.location
226 except AttributeError:
210 except AttributeError:
227 config.EngineFactory.location = d['location']
211 config.EngineFactory.location = d['location']
228
212
229 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
213 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
230 try:
214 try:
231 config.EngineFactory.url
215 config.EngineFactory.url
232 except AttributeError:
216 except AttributeError:
233 config.EngineFactory.url = d['url']
217 config.EngineFactory.url = d['url']
234
218
235 try:
219 try:
236 config.EngineFactory.sshserver
220 config.EngineFactory.sshserver
237 except AttributeError:
221 except AttributeError:
238 config.EngineFactory.sshserver = d['ssh']
222 config.EngineFactory.sshserver = d['ssh']
239
223
240 def init_engine(self):
224 def init_engine(self):
241 # This is the working dir by now.
225 # This is the working dir by now.
242 sys.path.insert(0, '')
226 sys.path.insert(0, '')
243 config = self.config
227 config = self.config
244 # print config
228 # print config
245 self.find_url_file()
229 self.find_url_file()
246
230
247 # was the url manually specified?
231 # was the url manually specified?
248 keys = set(self.config.EngineFactory.keys())
232 keys = set(self.config.EngineFactory.keys())
249 keys = keys.union(set(self.config.RegistrationFactory.keys()))
233 keys = keys.union(set(self.config.RegistrationFactory.keys()))
250
234
251 if keys.intersection(set(['ip', 'url', 'port'])):
235 if keys.intersection(set(['ip', 'url', 'port'])):
252 # Connection info was specified, don't wait for the file
236 # Connection info was specified, don't wait for the file
253 url_specified = True
237 url_specified = True
254 self.wait_for_url_file = 0
238 self.wait_for_url_file = 0
255 else:
239 else:
256 url_specified = False
240 url_specified = False
257
241
258 if self.wait_for_url_file and not os.path.exists(self.url_file):
242 if self.wait_for_url_file and not os.path.exists(self.url_file):
259 self.log.warn("url_file %r not found"%self.url_file)
243 self.log.warn("url_file %r not found", self.url_file)
260 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
244 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
261 tic = time.time()
245 tic = time.time()
262 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
246 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
263 # wait for url_file to exist, for up to 10 seconds
247 # wait for url_file to exist, or until time limit
264 time.sleep(0.1)
248 time.sleep(0.1)
265
249
266 if os.path.exists(self.url_file):
250 if os.path.exists(self.url_file):
267 self.load_connector_file()
251 self.load_connector_file()
268 elif not url_specified:
252 elif not url_specified:
269 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
253 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
270 self.exit(1)
254 self.exit(1)
271
255
272
256
273 try:
257 try:
274 exec_lines = config.Kernel.exec_lines
258 exec_lines = config.Kernel.exec_lines
275 except AttributeError:
259 except AttributeError:
276 config.Kernel.exec_lines = []
260 config.Kernel.exec_lines = []
277 exec_lines = config.Kernel.exec_lines
261 exec_lines = config.Kernel.exec_lines
278
262
279 if self.startup_script:
263 if self.startup_script:
280 enc = sys.getfilesystemencoding() or 'utf8'
264 enc = sys.getfilesystemencoding() or 'utf8'
281 cmd="execfile(%r)"%self.startup_script.encode(enc)
265 cmd="execfile(%r)" % self.startup_script.encode(enc)
282 exec_lines.append(cmd)
266 exec_lines.append(cmd)
283 if self.startup_command:
267 if self.startup_command:
284 exec_lines.append(self.startup_command)
268 exec_lines.append(self.startup_command)
285
269
286 # Create the underlying shell class and Engine
270 # Create the underlying shell class and Engine
287 # shell_class = import_item(self.master_config.Global.shell_class)
271 # shell_class = import_item(self.master_config.Global.shell_class)
288 # print self.config
272 # print self.config
289 try:
273 try:
290 self.engine = EngineFactory(config=config, log=self.log)
274 self.engine = EngineFactory(config=config, log=self.log)
291 except:
275 except:
292 self.log.error("Couldn't start the Engine", exc_info=True)
276 self.log.error("Couldn't start the Engine", exc_info=True)
293 self.exit(1)
277 self.exit(1)
294
278
295 def forward_logging(self):
279 def forward_logging(self):
296 if self.log_url:
280 if self.log_url:
297 self.log.info("Forwarding logging to %s"%self.log_url)
281 self.log.info("Forwarding logging to %s", self.log_url)
298 context = self.engine.context
282 context = self.engine.context
299 lsock = context.socket(zmq.PUB)
283 lsock = context.socket(zmq.PUB)
300 lsock.connect(self.log_url)
284 lsock.connect(self.log_url)
301 self.log.removeHandler(self._log_handler)
285 self.log.removeHandler(self._log_handler)
302 handler = EnginePUBHandler(self.engine, lsock)
286 handler = EnginePUBHandler(self.engine, lsock)
303 handler.setLevel(self.log_level)
287 handler.setLevel(self.log_level)
304 self.log.addHandler(handler)
288 self.log.addHandler(handler)
305 self._log_handler = handler
289 self._log_handler = handler
306
290
307 def init_mpi(self):
291 def init_mpi(self):
308 global mpi
292 global mpi
309 self.mpi = MPI(config=self.config)
293 self.mpi = MPI(config=self.config)
310
294
311 mpi_import_statement = self.mpi.init_script
295 mpi_import_statement = self.mpi.init_script
312 if mpi_import_statement:
296 if mpi_import_statement:
313 try:
297 try:
314 self.log.info("Initializing MPI:")
298 self.log.info("Initializing MPI:")
315 self.log.info(mpi_import_statement)
299 self.log.info(mpi_import_statement)
316 exec mpi_import_statement in globals()
300 exec mpi_import_statement in globals()
317 except:
301 except:
318 mpi = None
302 mpi = None
319 else:
303 else:
320 mpi = None
304 mpi = None
321
305
322 @catch_config_error
306 @catch_config_error
323 def initialize(self, argv=None):
307 def initialize(self, argv=None):
324 super(IPEngineApp, self).initialize(argv)
308 super(IPEngineApp, self).initialize(argv)
325 self.init_mpi()
309 self.init_mpi()
326 self.init_engine()
310 self.init_engine()
327 self.forward_logging()
311 self.forward_logging()
328
312
329 def start(self):
313 def start(self):
330 self.engine.start()
314 self.engine.start()
331 try:
315 try:
332 self.engine.loop.start()
316 self.engine.loop.start()
333 except KeyboardInterrupt:
317 except KeyboardInterrupt:
334 self.log.critical("Engine Interrupted, shutting down...\n")
318 self.log.critical("Engine Interrupted, shutting down...\n")
335
319
336
320
337 def launch_new_instance():
321 def launch_new_instance():
338 """Create and run the IPython engine"""
322 """Create and run the IPython engine"""
339 app = IPEngineApp.instance()
323 app = IPEngineApp.instance()
340 app.initialize()
324 app.initialize()
341 app.start()
325 app.start()
342
326
343
327
344 if __name__ == '__main__':
328 if __name__ == '__main__':
345 launch_new_instance()
329 launch_new_instance()
346
330
General Comments 0
You need to be logged in to leave comments. Login now