json/jsonapi cleanup...
MinRK
IPython/parallel/apps/ipcontroller.py
@@ -1,449 +1,445 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 + import json
26 import os
27 import os
27 import socket
28 import socket
28 import stat
29 import stat
29 import sys
30 import sys
30
31
31 from multiprocessing import Process
32 from multiprocessing import Process
32
33
33 import zmq
34 import zmq
34 from zmq.devices import ProcessMonitoredQueue
35 from zmq.devices import ProcessMonitoredQueue
35 from zmq.log.handlers import PUBHandler
36 from zmq.log.handlers import PUBHandler
36
37
37 - # Note: use our own import to work around jsonlib api mismatch. When these
38 - # changes propagate to zmq, revert back to the following line instead:
39 - #from zmq.utils import jsonapi as json
40 - from IPython.zmq import jsonapi as json
41 -
42 from IPython.core.profiledir import ProfileDir
38 from IPython.core.profiledir import ProfileDir
43
39
44 from IPython.parallel.apps.baseapp import (
40 from IPython.parallel.apps.baseapp import (
45 BaseParallelApplication,
41 BaseParallelApplication,
46 base_aliases,
42 base_aliases,
47 base_flags,
43 base_flags,
48 catch_config_error,
44 catch_config_error,
49 )
45 )
50 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
51 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
47 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
52
48
53 from IPython.zmq.session import (
49 from IPython.zmq.session import (
54 Session, session_aliases, session_flags, default_secure
50 Session, session_aliases, session_flags, default_secure
55 )
51 )
56
52
57 from IPython.parallel.controller.heartmonitor import HeartMonitor
53 from IPython.parallel.controller.heartmonitor import HeartMonitor
58 from IPython.parallel.controller.hub import HubFactory
54 from IPython.parallel.controller.hub import HubFactory
59 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
60 from IPython.parallel.controller.sqlitedb import SQLiteDB
56 from IPython.parallel.controller.sqlitedb import SQLiteDB
61
57
62 from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
58 from IPython.parallel.util import signal_children, split_url, asbytes, disambiguate_url
63
59
64 # conditional import of MongoDB backend class
60 # conditional import of MongoDB backend class
65
61
66 try:
62 try:
67 from IPython.parallel.controller.mongodb import MongoDB
63 from IPython.parallel.controller.mongodb import MongoDB
68 except ImportError:
64 except ImportError:
69 maybe_mongo = []
65 maybe_mongo = []
70 else:
66 else:
71 maybe_mongo = [MongoDB]
67 maybe_mongo = [MongoDB]
72
68
73
69
74 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
75 # Module level variables
71 # Module level variables
76 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
77
73
78
74
79 #: The default config file name for this application
75 #: The default config file name for this application
80 default_config_file_name = u'ipcontroller_config.py'
76 default_config_file_name = u'ipcontroller_config.py'
81
77
82
78
83 _description = """Start the IPython controller for parallel computing.
79 _description = """Start the IPython controller for parallel computing.
84
80
85 The IPython controller provides a gateway between the IPython engines and
81 The IPython controller provides a gateway between the IPython engines and
86 clients. The controller needs to be started before the engines and can be
82 clients. The controller needs to be started before the engines and can be
87 configured using command line options or using a cluster directory. Cluster
83 configured using command line options or using a cluster directory. Cluster
88 directories contain config, log and security files and are usually located in
84 directories contain config, log and security files and are usually located in
89 your ipython directory and named as "profile_name". See the `profile`
85 your ipython directory and named as "profile_name". See the `profile`
90 and `profile-dir` options for details.
86 and `profile-dir` options for details.
91 """
87 """
92
88
93 _examples = """
89 _examples = """
94 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
90 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
95 ipcontroller --scheme=pure # use the pure zeromq scheduler
91 ipcontroller --scheme=pure # use the pure zeromq scheduler
96 """
92 """
97
93
98
94
99 #-----------------------------------------------------------------------------
95 #-----------------------------------------------------------------------------
100 # The main application
96 # The main application
101 #-----------------------------------------------------------------------------
97 #-----------------------------------------------------------------------------
102 flags = {}
98 flags = {}
103 flags.update(base_flags)
99 flags.update(base_flags)
104 flags.update({
100 flags.update({
105 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
101 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
106 'Use threads instead of processes for the schedulers'),
102 'Use threads instead of processes for the schedulers'),
107 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
103 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
108 'use the SQLiteDB backend'),
104 'use the SQLiteDB backend'),
109 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
105 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
110 'use the MongoDB backend'),
106 'use the MongoDB backend'),
111 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
107 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
112 'use the in-memory DictDB backend'),
108 'use the in-memory DictDB backend'),
113 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
109 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
114 'reuse existing json connection files')
110 'reuse existing json connection files')
115 })
111 })
116
112
117 flags.update(session_flags)
113 flags.update(session_flags)
118
114
119 aliases = dict(
115 aliases = dict(
120 ssh = 'IPControllerApp.ssh_server',
116 ssh = 'IPControllerApp.ssh_server',
121 enginessh = 'IPControllerApp.engine_ssh_server',
117 enginessh = 'IPControllerApp.engine_ssh_server',
122 location = 'IPControllerApp.location',
118 location = 'IPControllerApp.location',
123
119
124 url = 'HubFactory.url',
120 url = 'HubFactory.url',
125 ip = 'HubFactory.ip',
121 ip = 'HubFactory.ip',
126 transport = 'HubFactory.transport',
122 transport = 'HubFactory.transport',
127 port = 'HubFactory.regport',
123 port = 'HubFactory.regport',
128
124
129 ping = 'HeartMonitor.period',
125 ping = 'HeartMonitor.period',
130
126
131 scheme = 'TaskScheduler.scheme_name',
127 scheme = 'TaskScheduler.scheme_name',
132 hwm = 'TaskScheduler.hwm',
128 hwm = 'TaskScheduler.hwm',
133 )
129 )
134 aliases.update(base_aliases)
130 aliases.update(base_aliases)
135 aliases.update(session_aliases)
131 aliases.update(session_aliases)
136
132
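Note: the flags and aliases above simply map command-line switches onto traitlet config values. As a rough illustration (a hypothetical ipcontroller_config.py sketch, not part of this changeset; the values are made up), the same settings can be written in a profile's config file:

    c = get_config()
    c.IPControllerApp.reuse_files = True       # same effect as --reuse
    c.HubFactory.ip = '192.168.0.1'            # same effect as --ip=192.168.0.1
    c.HubFactory.regport = 10101               # same effect as --port=10101
    c.TaskScheduler.scheme_name = 'weighted'   # same effect as --scheme=weighted
    c.HubFactory.db_class = 'IPython.parallel.controller.sqlitedb.SQLiteDB'  # same effect as --sqlitedb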
137
133
138 class IPControllerApp(BaseParallelApplication):
134 class IPControllerApp(BaseParallelApplication):
139
135
140 name = u'ipcontroller'
136 name = u'ipcontroller'
141 description = _description
137 description = _description
142 examples = _examples
138 examples = _examples
143 config_file_name = Unicode(default_config_file_name)
139 config_file_name = Unicode(default_config_file_name)
144 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
140 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
145
141
146 # change default to True
142 # change default to True
147 auto_create = Bool(True, config=True,
143 auto_create = Bool(True, config=True,
148 help="""Whether to create profile dir if it doesn't exist.""")
144 help="""Whether to create profile dir if it doesn't exist.""")
149
145
150 reuse_files = Bool(False, config=True,
146 reuse_files = Bool(False, config=True,
151 help='Whether to reuse existing json connection files.'
147 help='Whether to reuse existing json connection files.'
152 )
148 )
153 ssh_server = Unicode(u'', config=True,
149 ssh_server = Unicode(u'', config=True,
154 help="""ssh url for clients to use when connecting to the Controller
150 help="""ssh url for clients to use when connecting to the Controller
155 processes. It should be of the form: [user@]server[:port]. The
151 processes. It should be of the form: [user@]server[:port]. The
156 Controller's listening addresses must be accessible from the ssh server""",
152 Controller's listening addresses must be accessible from the ssh server""",
157 )
153 )
158 engine_ssh_server = Unicode(u'', config=True,
154 engine_ssh_server = Unicode(u'', config=True,
159 help="""ssh url for engines to use when connecting to the Controller
155 help="""ssh url for engines to use when connecting to the Controller
160 processes. It should be of the form: [user@]server[:port]. The
156 processes. It should be of the form: [user@]server[:port]. The
161 Controller's listening addresses must be accessible from the ssh server""",
157 Controller's listening addresses must be accessible from the ssh server""",
162 )
158 )
163 location = Unicode(u'', config=True,
159 location = Unicode(u'', config=True,
164 help="""The external IP or domain name of the Controller, used for disambiguating
160 help="""The external IP or domain name of the Controller, used for disambiguating
165 engine and client connections.""",
161 engine and client connections.""",
166 )
162 )
167 import_statements = List([], config=True,
163 import_statements = List([], config=True,
168 help="import statements to be run at startup. Necessary in some environments"
164 help="import statements to be run at startup. Necessary in some environments"
169 )
165 )
170
166
171 use_threads = Bool(False, config=True,
167 use_threads = Bool(False, config=True,
172 help='Use threads instead of processes for the schedulers',
168 help='Use threads instead of processes for the schedulers',
173 )
169 )
174
170
175 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
171 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
176 help="JSON filename where engine connection info will be stored.")
172 help="JSON filename where engine connection info will be stored.")
177 client_json_file = Unicode('ipcontroller-client.json', config=True,
173 client_json_file = Unicode('ipcontroller-client.json', config=True,
178 help="JSON filename where client connection info will be stored.")
174 help="JSON filename where client connection info will be stored.")
179
175
180 def _cluster_id_changed(self, name, old, new):
176 def _cluster_id_changed(self, name, old, new):
181 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
177 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
182 self.engine_json_file = "%s-engine.json" % self.name
178 self.engine_json_file = "%s-engine.json" % self.name
183 self.client_json_file = "%s-client.json" % self.name
179 self.client_json_file = "%s-client.json" % self.name
184
180
185
181
186 # internal
182 # internal
187 children = List()
183 children = List()
188 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
184 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
189
185
190 def _use_threads_changed(self, name, old, new):
186 def _use_threads_changed(self, name, old, new):
191 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
187 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
192
188
193 aliases = Dict(aliases)
189 aliases = Dict(aliases)
194 flags = Dict(flags)
190 flags = Dict(flags)
195
191
196
192
197 def save_connection_dict(self, fname, cdict):
193 def save_connection_dict(self, fname, cdict):
198 """save a connection dict to json file."""
194 """save a connection dict to json file."""
199 c = self.config
195 c = self.config
200 url = cdict['url']
196 url = cdict['url']
201 location = cdict['location']
197 location = cdict['location']
202 if not location:
198 if not location:
203 try:
199 try:
204 proto,ip,port = split_url(url)
200 proto,ip,port = split_url(url)
205 except AssertionError:
201 except AssertionError:
206 pass
202 pass
207 else:
203 else:
208 try:
204 try:
209 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
205 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
210 except (socket.gaierror, IndexError):
206 except (socket.gaierror, IndexError):
211 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
207 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
212 " You may need to specify '--location=<external_ip_address>' to help"
208 " You may need to specify '--location=<external_ip_address>' to help"
213 " IPython decide when to connect via loopback.")
209 " IPython decide when to connect via loopback.")
214 location = '127.0.0.1'
210 location = '127.0.0.1'
215 cdict['location'] = location
211 cdict['location'] = location
216 fname = os.path.join(self.profile_dir.security_dir, fname)
212 fname = os.path.join(self.profile_dir.security_dir, fname)
217 - with open(fname, 'wb') as f:
213 + with open(fname, 'w') as f:
218 f.write(json.dumps(cdict, indent=2))
214 f.write(json.dumps(cdict, indent=2))
219 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
215 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
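For orientation, a minimal standalone sketch of the connection-file write done by save_connection_dict above (the path and dict values are made up; only the open/dumps/chmod pattern mirrors the code). With the stdlib json module, json.dumps returns a str, hence the switch to text mode ('w') in this commit:

    import json, os, stat

    cdict = {'exec_key': 'abc123', 'ssh': '',
             'url': 'tcp://127.0.0.1:10101', 'location': '10.0.0.5'}
    fname = '/tmp/ipcontroller-client.json'        # hypothetical path
    with open(fname, 'w') as f:                    # text mode, matching the change above
        f.write(json.dumps(cdict, indent=2))
    os.chmod(fname, stat.S_IRUSR | stat.S_IWUSR)   # keep the key file private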
220
216
221 def load_config_from_json(self):
217 def load_config_from_json(self):
222 """load config from existing json connector files."""
218 """load config from existing json connector files."""
223 c = self.config
219 c = self.config
224 self.log.debug("loading config from JSON")
220 self.log.debug("loading config from JSON")
225 # load from engine config
221 # load from engine config
226 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
222 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
227 cfg = json.loads(f.read())
223 cfg = json.loads(f.read())
228 key = c.Session.key = asbytes(cfg['exec_key'])
224 key = c.Session.key = asbytes(cfg['exec_key'])
229 xport,addr = cfg['url'].split('://')
225 xport,addr = cfg['url'].split('://')
230 c.HubFactory.engine_transport = xport
226 c.HubFactory.engine_transport = xport
231 ip,ports = addr.split(':')
227 ip,ports = addr.split(':')
232 c.HubFactory.engine_ip = ip
228 c.HubFactory.engine_ip = ip
233 c.HubFactory.regport = int(ports)
229 c.HubFactory.regport = int(ports)
234 self.location = cfg['location']
230 self.location = cfg['location']
235 if not self.engine_ssh_server:
231 if not self.engine_ssh_server:
236 self.engine_ssh_server = cfg['ssh']
232 self.engine_ssh_server = cfg['ssh']
237 # load client config
233 # load client config
238 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
234 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
239 cfg = json.loads(f.read())
235 cfg = json.loads(f.read())
240 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
236 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
241 xport,addr = cfg['url'].split('://')
237 xport,addr = cfg['url'].split('://')
242 c.HubFactory.client_transport = xport
238 c.HubFactory.client_transport = xport
243 ip,ports = addr.split(':')
239 ip,ports = addr.split(':')
244 c.HubFactory.client_ip = ip
240 c.HubFactory.client_ip = ip
245 if not self.ssh_server:
241 if not self.ssh_server:
246 self.ssh_server = cfg['ssh']
242 self.ssh_server = cfg['ssh']
247 assert int(ports) == c.HubFactory.regport, "regport mismatch"
243 assert int(ports) == c.HubFactory.regport, "regport mismatch"
248
244
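A quick illustration of the url parsing in load_config_from_json above, using a hypothetical connection-file value:

    cfg = {'url': 'tcp://10.0.0.5:10101'}    # hypothetical engine connection info
    xport, addr = cfg['url'].split('://')    # 'tcp', '10.0.0.5:10101'
    ip, ports = addr.split(':')              # '10.0.0.5', '10101'
    assert (xport, ip, int(ports)) == ('tcp', '10.0.0.5', 10101)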
249 def load_secondary_config(self):
245 def load_secondary_config(self):
250 """secondary config, loading from JSON and setting defaults"""
246 """secondary config, loading from JSON and setting defaults"""
251 if self.reuse_files:
247 if self.reuse_files:
252 try:
248 try:
253 self.load_config_from_json()
249 self.load_config_from_json()
254 except (AssertionError,IOError) as e:
250 except (AssertionError,IOError) as e:
255 self.log.error("Could not load config from JSON: %s" % e)
251 self.log.error("Could not load config from JSON: %s" % e)
256 self.reuse_files=False
252 self.reuse_files=False
257 # switch Session.key default to secure
253 # switch Session.key default to secure
258 default_secure(self.config)
254 default_secure(self.config)
259 self.log.debug("Config changed")
255 self.log.debug("Config changed")
260 self.log.debug(repr(self.config))
256 self.log.debug(repr(self.config))
261
257
262 def init_hub(self):
258 def init_hub(self):
263 c = self.config
259 c = self.config
264
260
265 self.do_import_statements()
261 self.do_import_statements()
266
262
267 try:
263 try:
268 self.factory = HubFactory(config=c, log=self.log)
264 self.factory = HubFactory(config=c, log=self.log)
269 # self.start_logging()
265 # self.start_logging()
270 self.factory.init_hub()
266 self.factory.init_hub()
271 except TraitError:
267 except TraitError:
272 raise
268 raise
273 except Exception:
269 except Exception:
274 self.log.error("Couldn't construct the Controller", exc_info=True)
270 self.log.error("Couldn't construct the Controller", exc_info=True)
275 self.exit(1)
271 self.exit(1)
276
272
277 if not self.reuse_files:
273 if not self.reuse_files:
278 # save to new json config files
274 # save to new json config files
279 f = self.factory
275 f = self.factory
280 cdict = {'exec_key' : f.session.key.decode('ascii'),
276 cdict = {'exec_key' : f.session.key.decode('ascii'),
281 'ssh' : self.ssh_server,
277 'ssh' : self.ssh_server,
282 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
278 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
283 'location' : self.location
279 'location' : self.location
284 }
280 }
285 self.save_connection_dict(self.client_json_file, cdict)
281 self.save_connection_dict(self.client_json_file, cdict)
286 edict = cdict
282 edict = cdict
287 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
283 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
288 edict['ssh'] = self.engine_ssh_server
284 edict['ssh'] = self.engine_ssh_server
289 self.save_connection_dict(self.engine_json_file, edict)
285 self.save_connection_dict(self.engine_json_file, edict)
290
286
291 #
287 #
292 def init_schedulers(self):
288 def init_schedulers(self):
293 children = self.children
289 children = self.children
294 mq = import_item(str(self.mq_class))
290 mq = import_item(str(self.mq_class))
295
291
296 hub = self.factory
292 hub = self.factory
297 # disambiguate url, in case of *
293 # disambiguate url, in case of *
298 monitor_url = disambiguate_url(hub.monitor_url)
294 monitor_url = disambiguate_url(hub.monitor_url)
299 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
295 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
300 # IOPub relay (in a Process)
296 # IOPub relay (in a Process)
301 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
297 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
302 q.bind_in(hub.client_info['iopub'])
298 q.bind_in(hub.client_info['iopub'])
303 q.bind_out(hub.engine_info['iopub'])
299 q.bind_out(hub.engine_info['iopub'])
304 q.setsockopt_out(zmq.SUBSCRIBE, b'')
300 q.setsockopt_out(zmq.SUBSCRIBE, b'')
305 q.connect_mon(monitor_url)
301 q.connect_mon(monitor_url)
306 q.daemon=True
302 q.daemon=True
307 children.append(q)
303 children.append(q)
308
304
309 # Multiplexer Queue (in a Process)
305 # Multiplexer Queue (in a Process)
310 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
306 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
311 q.bind_in(hub.client_info['mux'])
307 q.bind_in(hub.client_info['mux'])
312 q.setsockopt_in(zmq.IDENTITY, b'mux')
308 q.setsockopt_in(zmq.IDENTITY, b'mux')
313 q.bind_out(hub.engine_info['mux'])
309 q.bind_out(hub.engine_info['mux'])
314 q.connect_mon(monitor_url)
310 q.connect_mon(monitor_url)
315 q.daemon=True
311 q.daemon=True
316 children.append(q)
312 children.append(q)
317
313
318 # Control Queue (in a Process)
314 # Control Queue (in a Process)
319 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
315 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
320 q.bind_in(hub.client_info['control'])
316 q.bind_in(hub.client_info['control'])
321 q.setsockopt_in(zmq.IDENTITY, b'control')
317 q.setsockopt_in(zmq.IDENTITY, b'control')
322 q.bind_out(hub.engine_info['control'])
318 q.bind_out(hub.engine_info['control'])
323 q.connect_mon(monitor_url)
319 q.connect_mon(monitor_url)
324 q.daemon=True
320 q.daemon=True
325 children.append(q)
321 children.append(q)
326 try:
322 try:
327 scheme = self.config.TaskScheduler.scheme_name
323 scheme = self.config.TaskScheduler.scheme_name
328 except AttributeError:
324 except AttributeError:
329 scheme = TaskScheduler.scheme_name.get_default_value()
325 scheme = TaskScheduler.scheme_name.get_default_value()
330 # Task Queue (in a Process)
326 # Task Queue (in a Process)
331 if scheme == 'pure':
327 if scheme == 'pure':
332 self.log.warn("task::using pure XREQ Task scheduler")
328 self.log.warn("task::using pure XREQ Task scheduler")
333 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
329 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
334 # q.setsockopt_out(zmq.HWM, hub.hwm)
330 # q.setsockopt_out(zmq.HWM, hub.hwm)
335 q.bind_in(hub.client_info['task'][1])
331 q.bind_in(hub.client_info['task'][1])
336 q.setsockopt_in(zmq.IDENTITY, b'task')
332 q.setsockopt_in(zmq.IDENTITY, b'task')
337 q.bind_out(hub.engine_info['task'])
333 q.bind_out(hub.engine_info['task'])
338 q.connect_mon(monitor_url)
334 q.connect_mon(monitor_url)
339 q.daemon=True
335 q.daemon=True
340 children.append(q)
336 children.append(q)
341 elif scheme == 'none':
337 elif scheme == 'none':
342 self.log.warn("task::using no Task scheduler")
338 self.log.warn("task::using no Task scheduler")
343
339
344 else:
340 else:
345 self.log.info("task::using Python %s Task scheduler"%scheme)
341 self.log.info("task::using Python %s Task scheduler"%scheme)
346 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
342 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
347 monitor_url, disambiguate_url(hub.client_info['notification']))
343 monitor_url, disambiguate_url(hub.client_info['notification']))
348 kwargs = dict(logname='scheduler', loglevel=self.log_level,
344 kwargs = dict(logname='scheduler', loglevel=self.log_level,
349 log_url = self.log_url, config=dict(self.config))
345 log_url = self.log_url, config=dict(self.config))
350 if 'Process' in self.mq_class:
346 if 'Process' in self.mq_class:
351 # run the Python scheduler in a Process
347 # run the Python scheduler in a Process
352 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
348 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
353 q.daemon=True
349 q.daemon=True
354 children.append(q)
350 children.append(q)
355 else:
351 else:
356 # single-threaded Controller
352 # single-threaded Controller
357 kwargs['in_thread'] = True
353 kwargs['in_thread'] = True
358 launch_scheduler(*sargs, **kwargs)
354 launch_scheduler(*sargs, **kwargs)
359
355
360
356
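For orientation, a stripped-down sketch of how one of the MonitoredQueue devices above is wired. The addresses are hypothetical; the real code binds the hub's client_info/engine_info addresses and connects the monitor side to monitor_url:

    import zmq
    from zmq.devices import ProcessMonitoredQueue

    q = ProcessMonitoredQueue(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
    q.bind_in('tcp://127.0.0.1:5601')       # client-facing socket (hypothetical port)
    q.setsockopt_in(zmq.IDENTITY, b'mux')
    q.bind_out('tcp://127.0.0.1:5602')      # engine-facing socket (hypothetical port)
    q.connect_mon('tcp://127.0.0.1:5603')   # monitor traffic goes to the hub
    q.daemon = True
    q.start()                               # runs the queue in a subprocess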
361 def save_urls(self):
357 def save_urls(self):
362 """save the registration urls to files."""
358 """save the registration urls to files."""
363 c = self.config
359 c = self.config
364
360
365 sec_dir = self.profile_dir.security_dir
361 sec_dir = self.profile_dir.security_dir
366 cf = self.factory
362 cf = self.factory
367
363
368 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
364 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
369 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
365 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
370
366
371 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
367 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
372 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
368 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
373
369
374
370
375 def do_import_statements(self):
371 def do_import_statements(self):
376 statements = self.import_statements
372 statements = self.import_statements
377 for s in statements:
373 for s in statements:
378 try:
374 try:
379 self.log.msg("Executing statement: '%s'" % s)
375 self.log.msg("Executing statement: '%s'" % s)
380 exec s in globals(), locals()
376 exec s in globals(), locals()
381 except:
377 except:
382 self.log.msg("Error running statement: %s" % s)
378 self.log.msg("Error running statement: %s" % s)
383
379
384 def forward_logging(self):
380 def forward_logging(self):
385 if self.log_url:
381 if self.log_url:
386 self.log.info("Forwarding logging to %s"%self.log_url)
382 self.log.info("Forwarding logging to %s"%self.log_url)
387 context = zmq.Context.instance()
383 context = zmq.Context.instance()
388 lsock = context.socket(zmq.PUB)
384 lsock = context.socket(zmq.PUB)
389 lsock.connect(self.log_url)
385 lsock.connect(self.log_url)
390 handler = PUBHandler(lsock)
386 handler = PUBHandler(lsock)
391 self.log.removeHandler(self._log_handler)
387 self.log.removeHandler(self._log_handler)
392 handler.root_topic = 'controller'
388 handler.root_topic = 'controller'
393 handler.setLevel(self.log_level)
389 handler.setLevel(self.log_level)
394 self.log.addHandler(handler)
390 self.log.addHandler(handler)
395 self._log_handler = handler
391 self._log_handler = handler
396
392
397 @catch_config_error
393 @catch_config_error
398 def initialize(self, argv=None):
394 def initialize(self, argv=None):
399 super(IPControllerApp, self).initialize(argv)
395 super(IPControllerApp, self).initialize(argv)
400 self.forward_logging()
396 self.forward_logging()
401 self.load_secondary_config()
397 self.load_secondary_config()
402 self.init_hub()
398 self.init_hub()
403 self.init_schedulers()
399 self.init_schedulers()
404
400
405 def start(self):
401 def start(self):
406 # Start the subprocesses:
402 # Start the subprocesses:
407 self.factory.start()
403 self.factory.start()
408 child_procs = []
404 child_procs = []
409 for child in self.children:
405 for child in self.children:
410 child.start()
406 child.start()
411 if isinstance(child, ProcessMonitoredQueue):
407 if isinstance(child, ProcessMonitoredQueue):
412 child_procs.append(child.launcher)
408 child_procs.append(child.launcher)
413 elif isinstance(child, Process):
409 elif isinstance(child, Process):
414 child_procs.append(child)
410 child_procs.append(child)
415 if child_procs:
411 if child_procs:
416 signal_children(child_procs)
412 signal_children(child_procs)
417
413
418 self.write_pid_file(overwrite=True)
414 self.write_pid_file(overwrite=True)
419
415
420 try:
416 try:
421 self.factory.loop.start()
417 self.factory.loop.start()
422 except KeyboardInterrupt:
418 except KeyboardInterrupt:
423 self.log.critical("Interrupted, Exiting...\n")
419 self.log.critical("Interrupted, Exiting...\n")
424
420
425
421
426
422
427 def launch_new_instance():
423 def launch_new_instance():
428 """Create and run the IPython controller"""
424 """Create and run the IPython controller"""
429 if sys.platform == 'win32':
425 if sys.platform == 'win32':
430 # make sure we don't get called from a multiprocessing subprocess
426 # make sure we don't get called from a multiprocessing subprocess
431 # this can result in infinite Controllers being started on Windows
427 # this can result in infinite Controllers being started on Windows
432 # which doesn't have a proper fork, so multiprocessing is wonky
428 # which doesn't have a proper fork, so multiprocessing is wonky
433
429
434 # this only comes up when IPython has been installed using vanilla
430 # this only comes up when IPython has been installed using vanilla
435 # setuptools, and *not* distribute.
431 # setuptools, and *not* distribute.
436 import multiprocessing
432 import multiprocessing
437 p = multiprocessing.current_process()
433 p = multiprocessing.current_process()
438 # the main process has name 'MainProcess'
434 # the main process has name 'MainProcess'
439 # subprocesses will have names like 'Process-1'
435 # subprocesses will have names like 'Process-1'
440 if p.name != 'MainProcess':
436 if p.name != 'MainProcess':
441 # we are a subprocess, don't start another Controller!
437 # we are a subprocess, don't start another Controller!
442 return
438 return
443 app = IPControllerApp.instance()
439 app = IPControllerApp.instance()
444 app.initialize()
440 app.initialize()
445 app.start()
441 app.start()
446
442
447
443
448 if __name__ == '__main__':
444 if __name__ == '__main__':
449 launch_new_instance()
445 launch_new_instance()
IPython/parallel/controller/scheduler.py
@@ -1,714 +1,716 @@
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 from __future__ import print_function
22 from __future__ import print_function
23
23
24 import logging
24 import logging
25 import sys
25 import sys
26
26
27 from datetime import datetime, timedelta
27 from datetime import datetime, timedelta
28 from random import randint, random
28 from random import randint, random
29 from types import FunctionType
29 from types import FunctionType
30
30
31 try:
31 try:
32 import numpy
32 import numpy
33 except ImportError:
33 except ImportError:
34 numpy = None
34 numpy = None
35
35
36 import zmq
36 import zmq
37 from zmq.eventloop import ioloop, zmqstream
37 from zmq.eventloop import ioloop, zmqstream
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.application import Application
42 from IPython.config.loader import Config
42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44
44
45 from IPython.parallel import error
45 from IPython.parallel import error
46 from IPython.parallel.factory import SessionFactory
46 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
48
48
49 from .dependency import Dependency
49 from .dependency import Dependency
50
50
51 @decorator
51 @decorator
52 def logged(f,self,*args,**kwargs):
52 def logged(f,self,*args,**kwargs):
53 # print ("#--------------------")
53 # print ("#--------------------")
54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 # print ("#--")
55 # print ("#--")
56 return f(self,*args, **kwargs)
56 return f(self,*args, **kwargs)
57
57
58 #----------------------------------------------------------------------
58 #----------------------------------------------------------------------
59 # Chooser functions
59 # Chooser functions
60 #----------------------------------------------------------------------
60 #----------------------------------------------------------------------
61
61
62 def plainrandom(loads):
62 def plainrandom(loads):
63 """Plain random pick."""
63 """Plain random pick."""
64 n = len(loads)
64 n = len(loads)
65 return randint(0,n-1)
65 return randint(0,n-1)
66
66
67 def lru(loads):
67 def lru(loads):
68 """Always pick the front of the line.
68 """Always pick the front of the line.
69
69
70 The content of `loads` is ignored.
70 The content of `loads` is ignored.
71
71
72 Assumes LRU ordering of loads, with oldest first.
72 Assumes LRU ordering of loads, with oldest first.
73 """
73 """
74 return 0
74 return 0
75
75
76 def twobin(loads):
76 def twobin(loads):
77 """Pick two at random, use the LRU of the two.
77 """Pick two at random, use the LRU of the two.
78
78
79 The content of loads is ignored.
79 The content of loads is ignored.
80
80
81 Assumes LRU ordering of loads, with oldest first.
81 Assumes LRU ordering of loads, with oldest first.
82 """
82 """
83 n = len(loads)
83 n = len(loads)
84 a = randint(0,n-1)
84 a = randint(0,n-1)
85 b = randint(0,n-1)
85 b = randint(0,n-1)
86 return min(a,b)
86 return min(a,b)
87
87
88 def weighted(loads):
88 def weighted(loads):
89 """Pick two at random using inverse load as weight.
89 """Pick two at random using inverse load as weight.
90
90
91 Return the less loaded of the two.
91 Return the less loaded of the two.
92 """
92 """
93 # weight 0 a million times more than 1:
93 # weight 0 a million times more than 1:
94 weights = 1./(1e-6+numpy.array(loads))
94 weights = 1./(1e-6+numpy.array(loads))
95 sums = weights.cumsum()
95 sums = weights.cumsum()
96 t = sums[-1]
96 t = sums[-1]
97 x = random()*t
97 x = random()*t
98 y = random()*t
98 y = random()*t
99 idx = 0
99 idx = 0
100 idy = 0
100 idy = 0
101 while sums[idx] < x:
101 while sums[idx] < x:
102 idx += 1
102 idx += 1
103 while sums[idy] < y:
103 while sums[idy] < y:
104 idy += 1
104 idy += 1
105 if weights[idy] > weights[idx]:
105 if weights[idy] > weights[idx]:
106 return idy
106 return idy
107 else:
107 else:
108 return idx
108 return idx
109
109
110 def leastload(loads):
110 def leastload(loads):
111 """Always choose the lowest load.
111 """Always choose the lowest load.
112
112
113 If the lowest load occurs more than once, the first
113 If the lowest load occurs more than once, the first
114 occurrence will be used. If loads has LRU ordering, this means
114 occurrence will be used. If loads has LRU ordering, this means
115 the LRU of those with the lowest load is chosen.
115 the LRU of those with the lowest load is chosen.
116 """
116 """
117 return loads.index(min(loads))
117 return loads.index(min(loads))
118
118
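A quick hedged demonstration of the chooser functions above on a made-up load list (LRU-ordered, oldest engine first; the weighted chooser needs numpy):

    loads = [3, 0, 2, 0]        # hypothetical outstanding-task counts per engine

    print(lru(loads))           # 0: always the head of the line
    print(leastload(loads))     # 1: first occurrence of the minimum load
    print(plainrandom(loads))   # uniform random index into loads
    print(twobin(loads))        # min of two random indices (power-of-two choices)
    if numpy is not None:
        print(weighted(loads))  # two weighted picks, biased toward low loads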
119 #---------------------------------------------------------------------
119 #---------------------------------------------------------------------
120 # Classes
120 # Classes
121 #---------------------------------------------------------------------
121 #---------------------------------------------------------------------
122 # store empty default dependency:
122 # store empty default dependency:
123 MET = Dependency([])
123 MET = Dependency([])
124
124
125 class TaskScheduler(SessionFactory):
125 class TaskScheduler(SessionFactory):
126 """Python TaskScheduler object.
126 """Python TaskScheduler object.
127
127
128 This is the simplest object that supports msg_id based
128 This is the simplest object that supports msg_id based
129 DAG dependencies. *Only* task msg_ids are checked, not
129 DAG dependencies. *Only* task msg_ids are checked, not
130 msg_ids of jobs submitted via the MUX queue.
130 msg_ids of jobs submitted via the MUX queue.
131
131
132 """
132 """
133
133
134 hwm = Integer(0, config=True, shortname='hwm',
134 hwm = Integer(0, config=True, shortname='hwm',
135 help="""specify the High Water Mark (HWM) for the downstream
135 help="""specify the High Water Mark (HWM) for the downstream
136 socket in the Task scheduler. This is the maximum number
136 socket in the Task scheduler. This is the maximum number
137 of allowed outstanding tasks on each engine."""
137 of allowed outstanding tasks on each engine."""
138 )
138 )
139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
140 'leastload', config=True, shortname='scheme', allow_none=False,
140 'leastload', config=True, shortname='scheme', allow_none=False,
141 help="""select the task scheduler scheme [default: Python LRU]
141 help="""select the task scheduler scheme [default: Python LRU]
142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
143 )
143 )
144 def _scheme_name_changed(self, old, new):
144 def _scheme_name_changed(self, old, new):
145 self.log.debug("Using scheme %r"%new)
145 self.log.debug("Using scheme %r"%new)
146 self.scheme = globals()[new]
146 self.scheme = globals()[new]
147
147
148 # input arguments:
148 # input arguments:
149 scheme = Instance(FunctionType) # function for determining the destination
149 scheme = Instance(FunctionType) # function for determining the destination
150 def _scheme_default(self):
150 def _scheme_default(self):
151 return leastload
151 return leastload
152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
156
156
157 # internals:
157 # internals:
158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
162 pending = Dict() # dict by engine_uuid of submitted tasks
162 pending = Dict() # dict by engine_uuid of submitted tasks
163 completed = Dict() # dict by engine_uuid of completed tasks
163 completed = Dict() # dict by engine_uuid of completed tasks
164 failed = Dict() # dict by engine_uuid of failed tasks
164 failed = Dict() # dict by engine_uuid of failed tasks
165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
166 clients = Dict() # dict by msg_id for who submitted the task
166 clients = Dict() # dict by msg_id for who submitted the task
167 targets = List() # list of target IDENTs
167 targets = List() # list of target IDENTs
168 loads = List() # list of engine loads
168 loads = List() # list of engine loads
169 # full = Set() # set of IDENTs that have HWM outstanding tasks
169 # full = Set() # set of IDENTs that have HWM outstanding tasks
170 all_completed = Set() # set of all completed tasks
170 all_completed = Set() # set of all completed tasks
171 all_failed = Set() # set of all failed tasks
171 all_failed = Set() # set of all failed tasks
172 all_done = Set() # set of all finished tasks=union(completed,failed)
172 all_done = Set() # set of all finished tasks=union(completed,failed)
173 all_ids = Set() # set of all submitted task IDs
173 all_ids = Set() # set of all submitted task IDs
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
176
176
177 ident = CBytes() # ZMQ identity. This should just be self.session.session
177 ident = CBytes() # ZMQ identity. This should just be self.session.session
178 # but ensure Bytes
178 # but ensure Bytes
179 def _ident_default(self):
179 def _ident_default(self):
180 return self.session.bsession
180 return self.session.bsession
181
181
182 def start(self):
182 def start(self):
183 self.engine_stream.on_recv(self.dispatch_result, copy=False)
183 self.engine_stream.on_recv(self.dispatch_result, copy=False)
184 self._notification_handlers = dict(
184 self._notification_handlers = dict(
185 registration_notification = self._register_engine,
185 registration_notification = self._register_engine,
186 unregistration_notification = self._unregister_engine
186 unregistration_notification = self._unregister_engine
187 )
187 )
188 self.notifier_stream.on_recv(self.dispatch_notification)
188 self.notifier_stream.on_recv(self.dispatch_notification)
189 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2 seconds
189 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2 seconds
190 self.auditor.start()
190 self.auditor.start()
191 self.log.info("Scheduler started [%s]"%self.scheme_name)
191 self.log.info("Scheduler started [%s]"%self.scheme_name)
192
192
193 def resume_receiving(self):
193 def resume_receiving(self):
194 """Resume accepting jobs."""
194 """Resume accepting jobs."""
195 self.client_stream.on_recv(self.dispatch_submission, copy=False)
195 self.client_stream.on_recv(self.dispatch_submission, copy=False)
196
196
197 def stop_receiving(self):
197 def stop_receiving(self):
198 """Stop accepting jobs while there are no engines.
198 """Stop accepting jobs while there are no engines.
199 Leave them in the ZMQ queue."""
199 Leave them in the ZMQ queue."""
200 self.client_stream.on_recv(None)
200 self.client_stream.on_recv(None)
201
201
202 #-----------------------------------------------------------------------
202 #-----------------------------------------------------------------------
203 # [Un]Registration Handling
203 # [Un]Registration Handling
204 #-----------------------------------------------------------------------
204 #-----------------------------------------------------------------------
205
205
206 def dispatch_notification(self, msg):
206 def dispatch_notification(self, msg):
207 """dispatch register/unregister events."""
207 """dispatch register/unregister events."""
208 try:
208 try:
209 idents,msg = self.session.feed_identities(msg)
209 idents,msg = self.session.feed_identities(msg)
210 except ValueError:
210 except ValueError:
211 self.log.warn("task::Invalid Message: %r",msg)
211 self.log.warn("task::Invalid Message: %r",msg)
212 return
212 return
213 try:
213 try:
214 msg = self.session.unserialize(msg)
214 msg = self.session.unserialize(msg)
215 except ValueError:
215 except ValueError:
216 self.log.warn("task::Unauthorized message from: %r"%idents)
216 self.log.warn("task::Unauthorized message from: %r"%idents)
217 return
217 return
218
218
219 msg_type = msg['header']['msg_type']
219 msg_type = msg['header']['msg_type']
220
220
221 handler = self._notification_handlers.get(msg_type, None)
221 handler = self._notification_handlers.get(msg_type, None)
222 if handler is None:
222 if handler is None:
223 self.log.error("Unhandled message type: %r"%msg_type)
223 self.log.error("Unhandled message type: %r"%msg_type)
224 else:
224 else:
225 try:
225 try:
226 handler(asbytes(msg['content']['queue']))
226 handler(asbytes(msg['content']['queue']))
227 except Exception:
227 except Exception:
228 self.log.error("task::Invalid notification msg: %r",msg)
228 self.log.error("task::Invalid notification msg: %r",msg)
229
229
230 def _register_engine(self, uid):
230 def _register_engine(self, uid):
231 """New engine with ident `uid` became available."""
231 """New engine with ident `uid` became available."""
232 # head of the line:
232 # head of the line:
233 self.targets.insert(0,uid)
233 self.targets.insert(0,uid)
234 self.loads.insert(0,0)
234 self.loads.insert(0,0)
235
235
236 # initialize sets
236 # initialize sets
237 self.completed[uid] = set()
237 self.completed[uid] = set()
238 self.failed[uid] = set()
238 self.failed[uid] = set()
239 self.pending[uid] = {}
239 self.pending[uid] = {}
240 if len(self.targets) == 1:
240 if len(self.targets) == 1:
241 self.resume_receiving()
241 self.resume_receiving()
242 # rescan the graph:
242 # rescan the graph:
243 self.update_graph(None)
243 self.update_graph(None)
244
244
245 def _unregister_engine(self, uid):
245 def _unregister_engine(self, uid):
246 """Existing engine with ident `uid` became unavailable."""
246 """Existing engine with ident `uid` became unavailable."""
247 if len(self.targets) == 1:
247 if len(self.targets) == 1:
248 # this was our only engine
248 # this was our only engine
249 self.stop_receiving()
249 self.stop_receiving()
250
250
251 # handle any potentially finished tasks:
251 # handle any potentially finished tasks:
252 self.engine_stream.flush()
252 self.engine_stream.flush()
253
253
254 # don't pop destinations, because they might be used later
254 # don't pop destinations, because they might be used later
255 # map(self.destinations.pop, self.completed.pop(uid))
255 # map(self.destinations.pop, self.completed.pop(uid))
256 # map(self.destinations.pop, self.failed.pop(uid))
256 # map(self.destinations.pop, self.failed.pop(uid))
257
257
258 # prevent this engine from receiving work
258 # prevent this engine from receiving work
259 idx = self.targets.index(uid)
259 idx = self.targets.index(uid)
260 self.targets.pop(idx)
260 self.targets.pop(idx)
261 self.loads.pop(idx)
261 self.loads.pop(idx)
262
262
263 # wait 5 seconds before cleaning up pending jobs, since the results might
263 # wait 5 seconds before cleaning up pending jobs, since the results might
264 # still be incoming
264 # still be incoming
265 if self.pending[uid]:
265 if self.pending[uid]:
266 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
266 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
267 dc.start()
267 dc.start()
268 else:
268 else:
269 self.completed.pop(uid)
269 self.completed.pop(uid)
270 self.failed.pop(uid)
270 self.failed.pop(uid)
271
271
272
272
273 def handle_stranded_tasks(self, engine):
273 def handle_stranded_tasks(self, engine):
274 """Deal with jobs resident in an engine that died."""
274 """Deal with jobs resident in an engine that died."""
275 lost = self.pending[engine]
275 lost = self.pending[engine]
276 for msg_id in lost.keys():
276 for msg_id in lost.keys():
277 if msg_id not in self.pending[engine]:
277 if msg_id not in self.pending[engine]:
278 # prevent double-handling of messages
278 # prevent double-handling of messages
279 continue
279 continue
280
280
281 raw_msg = lost[msg_id][0]
281 raw_msg = lost[msg_id][0]
282 idents,msg = self.session.feed_identities(raw_msg, copy=False)
282 idents,msg = self.session.feed_identities(raw_msg, copy=False)
283 parent = self.session.unpack(msg[1].bytes)
283 parent = self.session.unpack(msg[1].bytes)
284 idents = [engine, idents[0]]
284 idents = [engine, idents[0]]
285
285
286 # build fake error reply
286 # build fake error reply
287 try:
287 try:
288 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
288 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
289 except:
289 except:
290 content = error.wrap_exception()
290 content = error.wrap_exception()
291 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
291 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
292 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
292 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
293 # and dispatch it
293 # and dispatch it
294 self.dispatch_result(raw_reply)
294 self.dispatch_result(raw_reply)
295
295
296 # finally scrub completed/failed lists
296 # finally scrub completed/failed lists
297 self.completed.pop(engine)
297 self.completed.pop(engine)
298 self.failed.pop(engine)
298 self.failed.pop(engine)
299
299
300
300
301 #-----------------------------------------------------------------------
301 #-----------------------------------------------------------------------
302 # Job Submission
302 # Job Submission
303 #-----------------------------------------------------------------------
303 #-----------------------------------------------------------------------
304 def dispatch_submission(self, raw_msg):
304 def dispatch_submission(self, raw_msg):
305 """Dispatch job submission to appropriate handlers."""
305 """Dispatch job submission to appropriate handlers."""
306 # ensure targets up to date:
306 # ensure targets up to date:
307 self.notifier_stream.flush()
307 self.notifier_stream.flush()
308 try:
308 try:
309 idents, msg = self.session.feed_identities(raw_msg, copy=False)
309 idents, msg = self.session.feed_identities(raw_msg, copy=False)
310 msg = self.session.unserialize(msg, content=False, copy=False)
310 msg = self.session.unserialize(msg, content=False, copy=False)
311 except Exception:
311 except Exception:
312 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
312 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
313 return
313 return
314
314
315
315
316 # send to monitor
316 # send to monitor
317 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
317 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
318
318
319 header = msg['header']
319 header = msg['header']
320 msg_id = header['msg_id']
320 msg_id = header['msg_id']
321 self.all_ids.add(msg_id)
321 self.all_ids.add(msg_id)
322
322
323 # get targets as a set of bytes objects
323 # get targets as a set of bytes objects
324 # from a list of unicode objects
324 # from a list of unicode objects
325 targets = header.get('targets', [])
325 targets = header.get('targets', [])
326 targets = map(asbytes, targets)
326 targets = map(asbytes, targets)
327 targets = set(targets)
327 targets = set(targets)
328
328
329 retries = header.get('retries', 0)
329 retries = header.get('retries', 0)
330 self.retries[msg_id] = retries
330 self.retries[msg_id] = retries
331
331
332 # time dependencies
332 # time dependencies
333 after = header.get('after', None)
333 after = header.get('after', None)
334 if after:
334 if after:
335 after = Dependency(after)
335 after = Dependency(after)
336 if after.all:
336 if after.all:
337 if after.success:
337 if after.success:
338 after = Dependency(after.difference(self.all_completed),
338 after = Dependency(after.difference(self.all_completed),
339 success=after.success,
339 success=after.success,
340 failure=after.failure,
340 failure=after.failure,
341 all=after.all,
341 all=after.all,
342 )
342 )
343 if after.failure:
343 if after.failure:
344 after = Dependency(after.difference(self.all_failed),
344 after = Dependency(after.difference(self.all_failed),
345 success=after.success,
345 success=after.success,
346 failure=after.failure,
346 failure=after.failure,
347 all=after.all,
347 all=after.all,
348 )
348 )
349 if after.check(self.all_completed, self.all_failed):
349 if after.check(self.all_completed, self.all_failed):
350 # recast as empty set, if `after` already met,
350 # recast as empty set, if `after` already met,
351 # to prevent unnecessary set comparisons
351 # to prevent unnecessary set comparisons
352 after = MET
352 after = MET
353 else:
353 else:
354 after = MET
354 after = MET
355
355
356 # location dependencies
356 # location dependencies
357 follow = Dependency(header.get('follow', []))
357 follow = Dependency(header.get('follow', []))
358
358
359 # turn timeouts into datetime objects:
359 # turn timeouts into datetime objects:
360 timeout = header.get('timeout', None)
360 timeout = header.get('timeout', None)
361 if timeout:
361 if timeout:
362 - timeout = datetime.now() + timedelta(0,timeout,0)
362 + # cast to float, because jsonlib returns floats as decimal.Decimal,
363 + # which timedelta does not accept
364 + timeout = datetime.now() + timedelta(0,float(timeout),0)
363
365
364 args = [raw_msg, targets, after, follow, timeout]
366 args = [raw_msg, targets, after, follow, timeout]
365
367
366 # validate and reduce dependencies:
368 # validate and reduce dependencies:
367 for dep in after,follow:
369 for dep in after,follow:
368 if not dep: # empty dependency
370 if not dep: # empty dependency
369 continue
371 continue
370 # check valid:
372 # check valid:
371 if msg_id in dep or dep.difference(self.all_ids):
373 if msg_id in dep or dep.difference(self.all_ids):
372 self.depending[msg_id] = args
374 self.depending[msg_id] = args
373 return self.fail_unreachable(msg_id, error.InvalidDependency)
375 return self.fail_unreachable(msg_id, error.InvalidDependency)
374 # check if unreachable:
376 # check if unreachable:
375 if dep.unreachable(self.all_completed, self.all_failed):
377 if dep.unreachable(self.all_completed, self.all_failed):
376 self.depending[msg_id] = args
378 self.depending[msg_id] = args
377 return self.fail_unreachable(msg_id)
379 return self.fail_unreachable(msg_id)
378
380
379 if after.check(self.all_completed, self.all_failed):
381 if after.check(self.all_completed, self.all_failed):
380 # time deps already met, try to run
382 # time deps already met, try to run
381 if not self.maybe_run(msg_id, *args):
383 if not self.maybe_run(msg_id, *args):
382 # can't run yet
384 # can't run yet
383 if msg_id not in self.all_failed:
385 if msg_id not in self.all_failed:
384 # could have failed as unreachable
386 # could have failed as unreachable
385 self.save_unmet(msg_id, *args)
387 self.save_unmet(msg_id, *args)
386 else:
388 else:
387 self.save_unmet(msg_id, *args)
389 self.save_unmet(msg_id, *args)
388
390
389 def audit_timeouts(self):
391 def audit_timeouts(self):
390 """Audit all waiting tasks for expired timeouts."""
392 """Audit all waiting tasks for expired timeouts."""
391 now = datetime.now()
393 now = datetime.now()
392 for msg_id in self.depending.keys():
394 for msg_id in self.depending.keys():
393 # must recheck, in case one failure cascaded to another:
395 # must recheck, in case one failure cascaded to another:
394 if msg_id in self.depending:
396 if msg_id in self.depending:
395 raw,targets,after,follow,timeout = self.depending[msg_id]
397 raw,targets,after,follow,timeout = self.depending[msg_id]
396 if timeout and timeout < now:
398 if timeout and timeout < now:
397 self.fail_unreachable(msg_id, error.TaskTimeout)
399 self.fail_unreachable(msg_id, error.TaskTimeout)
398
400
399 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
401 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
400 """a task has become unreachable, send a reply with an ImpossibleDependency
402 """a task has become unreachable, send a reply with an ImpossibleDependency
401 error."""
403 error."""
402 if msg_id not in self.depending:
404 if msg_id not in self.depending:
403 self.log.error("msg %r already failed!", msg_id)
405 self.log.error("msg %r already failed!", msg_id)
404 return
406 return
405 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
407 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
406 for mid in follow.union(after):
408 for mid in follow.union(after):
407 if mid in self.graph:
409 if mid in self.graph:
408 self.graph[mid].remove(msg_id)
410 self.graph[mid].remove(msg_id)
409
411
410 # FIXME: unpacking a message I've already unpacked, but didn't save:
412 # FIXME: unpacking a message I've already unpacked, but didn't save:
411 idents,msg = self.session.feed_identities(raw_msg, copy=False)
413 idents,msg = self.session.feed_identities(raw_msg, copy=False)
412 header = self.session.unpack(msg[1].bytes)
414 header = self.session.unpack(msg[1].bytes)
413
415
414 try:
416 try:
415 raise why()
417 raise why()
416 except:
418 except:
417 content = error.wrap_exception()
419 content = error.wrap_exception()
418
420
419 self.all_done.add(msg_id)
421 self.all_done.add(msg_id)
420 self.all_failed.add(msg_id)
422 self.all_failed.add(msg_id)
421
423
422 msg = self.session.send(self.client_stream, 'apply_reply', content,
424 msg = self.session.send(self.client_stream, 'apply_reply', content,
423 parent=header, ident=idents)
425 parent=header, ident=idents)
424 self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)
426 self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)
425
427
426 self.update_graph(msg_id, success=False)
428 self.update_graph(msg_id, success=False)
427
429
428 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
430 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
429 """check location dependencies, and run if they are met."""
431 """check location dependencies, and run if they are met."""
430 blacklist = self.blacklist.setdefault(msg_id, set())
432 blacklist = self.blacklist.setdefault(msg_id, set())
431 if follow or targets or blacklist or self.hwm:
433 if follow or targets or blacklist or self.hwm:
432 # we need a can_run filter
434 # we need a can_run filter
433 def can_run(idx):
435 def can_run(idx):
434 # check hwm
436 # check hwm
435 if self.hwm and self.loads[idx] == self.hwm:
437 if self.hwm and self.loads[idx] == self.hwm:
436 return False
438 return False
437 target = self.targets[idx]
439 target = self.targets[idx]
438 # check blacklist
440 # check blacklist
439 if target in blacklist:
441 if target in blacklist:
440 return False
442 return False
441 # check targets
443 # check targets
442 if targets and target not in targets:
444 if targets and target not in targets:
443 return False
445 return False
444 # check follow
446 # check follow
445 return follow.check(self.completed[target], self.failed[target])
447 return follow.check(self.completed[target], self.failed[target])
446
448
447 indices = filter(can_run, range(len(self.targets)))
449 indices = filter(can_run, range(len(self.targets)))
448
450
449 if not indices:
451 if not indices:
450 # couldn't run
452 # couldn't run
451 if follow.all:
453 if follow.all:
452 # check follow for impossibility
454 # check follow for impossibility
453 dests = set()
455 dests = set()
454 relevant = set()
456 relevant = set()
455 if follow.success:
457 if follow.success:
456 relevant = self.all_completed
458 relevant = self.all_completed
457 if follow.failure:
459 if follow.failure:
458 relevant = relevant.union(self.all_failed)
460 relevant = relevant.union(self.all_failed)
459 for m in follow.intersection(relevant):
461 for m in follow.intersection(relevant):
460 dests.add(self.destinations[m])
462 dests.add(self.destinations[m])
461 if len(dests) > 1:
463 if len(dests) > 1:
462 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
464 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
463 self.fail_unreachable(msg_id)
465 self.fail_unreachable(msg_id)
464 return False
466 return False
465 if targets:
467 if targets:
466 # check blacklist+targets for impossibility
468 # check blacklist+targets for impossibility
467 targets.difference_update(blacklist)
469 targets.difference_update(blacklist)
468 if not targets or not targets.intersection(self.targets):
470 if not targets or not targets.intersection(self.targets):
469 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
471 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
470 self.fail_unreachable(msg_id)
472 self.fail_unreachable(msg_id)
471 return False
473 return False
472 return False
474 return False
473 else:
475 else:
474 indices = None
476 indices = None
475
477
476 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
478 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
477 return True
479 return True
478
480
479 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
481 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
480 """Save a message for later submission when its dependencies are met."""
482 """Save a message for later submission when its dependencies are met."""
481 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
483 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
482 # track the ids in follow or after, but not those already finished
484 # track the ids in follow or after, but not those already finished
483 for dep_id in after.union(follow).difference(self.all_done):
485 for dep_id in after.union(follow).difference(self.all_done):
484 if dep_id not in self.graph:
486 if dep_id not in self.graph:
485 self.graph[dep_id] = set()
487 self.graph[dep_id] = set()
486 self.graph[dep_id].add(msg_id)
488 self.graph[dep_id].add(msg_id)
487
489
488 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
490 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
489 """Submit a task to any of a subset of our targets."""
491 """Submit a task to any of a subset of our targets."""
490 if indices:
492 if indices:
491 loads = [self.loads[i] for i in indices]
493 loads = [self.loads[i] for i in indices]
492 else:
494 else:
493 loads = self.loads
495 loads = self.loads
494 idx = self.scheme(loads)
496 idx = self.scheme(loads)
495 if indices:
497 if indices:
496 idx = indices[idx]
498 idx = indices[idx]
497 target = self.targets[idx]
499 target = self.targets[idx]
498 # print (target, map(str, msg[:3]))
500 # print (target, map(str, msg[:3]))
499 # send job to the engine
501 # send job to the engine
500 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
502 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
501 self.engine_stream.send_multipart(raw_msg, copy=False)
503 self.engine_stream.send_multipart(raw_msg, copy=False)
502 # update load
504 # update load
503 self.add_job(idx)
505 self.add_job(idx)
504 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
506 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
505 # notify Hub
507 # notify Hub
506 content = dict(msg_id=msg_id, engine_id=target.decode('ascii'))
508 content = dict(msg_id=msg_id, engine_id=target.decode('ascii'))
507 self.session.send(self.mon_stream, 'task_destination', content=content,
509 self.session.send(self.mon_stream, 'task_destination', content=content,
508 ident=[b'tracktask',self.ident])
510 ident=[b'tracktask',self.ident])
509
511
510
512
511 #-----------------------------------------------------------------------
513 #-----------------------------------------------------------------------
512 # Result Handling
514 # Result Handling
513 #-----------------------------------------------------------------------
515 #-----------------------------------------------------------------------
514 def dispatch_result(self, raw_msg):
516 def dispatch_result(self, raw_msg):
515 """dispatch method for result replies"""
517 """dispatch method for result replies"""
516 try:
518 try:
517 idents,msg = self.session.feed_identities(raw_msg, copy=False)
519 idents,msg = self.session.feed_identities(raw_msg, copy=False)
518 msg = self.session.unserialize(msg, content=False, copy=False)
520 msg = self.session.unserialize(msg, content=False, copy=False)
519 engine = idents[0]
521 engine = idents[0]
520 try:
522 try:
521 idx = self.targets.index(engine)
523 idx = self.targets.index(engine)
522 except ValueError:
524 except ValueError:
523 pass # skip load-update for dead engines
525 pass # skip load-update for dead engines
524 else:
526 else:
525 self.finish_job(idx)
527 self.finish_job(idx)
526 except Exception:
528 except Exception:
527 self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
529 self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
528 return
530 return
529
531
530 header = msg['header']
532 header = msg['header']
531 parent = msg['parent_header']
533 parent = msg['parent_header']
532 if header.get('dependencies_met', True):
534 if header.get('dependencies_met', True):
533 success = (header['status'] == 'ok')
535 success = (header['status'] == 'ok')
534 msg_id = parent['msg_id']
536 msg_id = parent['msg_id']
535 retries = self.retries[msg_id]
537 retries = self.retries[msg_id]
536 if not success and retries > 0:
538 if not success and retries > 0:
537 # failed
539 # failed
538 self.retries[msg_id] = retries - 1
540 self.retries[msg_id] = retries - 1
539 self.handle_unmet_dependency(idents, parent)
541 self.handle_unmet_dependency(idents, parent)
540 else:
542 else:
541 del self.retries[msg_id]
543 del self.retries[msg_id]
542 # relay to client and update graph
544 # relay to client and update graph
543 self.handle_result(idents, parent, raw_msg, success)
545 self.handle_result(idents, parent, raw_msg, success)
544 # send to Hub monitor
546 # send to Hub monitor
545 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
547 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
546 else:
548 else:
547 self.handle_unmet_dependency(idents, parent)
549 self.handle_unmet_dependency(idents, parent)
548
550
549 def handle_result(self, idents, parent, raw_msg, success=True):
551 def handle_result(self, idents, parent, raw_msg, success=True):
550 """handle a real task result, either success or failure"""
552 """handle a real task result, either success or failure"""
551 # first, relay result to client
553 # first, relay result to client
552 engine = idents[0]
554 engine = idents[0]
553 client = idents[1]
555 client = idents[1]
554 # swap_ids for XREP-XREP mirror
556 # swap_ids for XREP-XREP mirror
555 raw_msg[:2] = [client,engine]
557 raw_msg[:2] = [client,engine]
556 # print (map(str, raw_msg[:4]))
558 # print (map(str, raw_msg[:4]))
557 self.client_stream.send_multipart(raw_msg, copy=False)
559 self.client_stream.send_multipart(raw_msg, copy=False)
558 # now, update our data structures
560 # now, update our data structures
559 msg_id = parent['msg_id']
561 msg_id = parent['msg_id']
560 self.blacklist.pop(msg_id, None)
562 self.blacklist.pop(msg_id, None)
561 self.pending[engine].pop(msg_id)
563 self.pending[engine].pop(msg_id)
562 if success:
564 if success:
563 self.completed[engine].add(msg_id)
565 self.completed[engine].add(msg_id)
564 self.all_completed.add(msg_id)
566 self.all_completed.add(msg_id)
565 else:
567 else:
566 self.failed[engine].add(msg_id)
568 self.failed[engine].add(msg_id)
567 self.all_failed.add(msg_id)
569 self.all_failed.add(msg_id)
568 self.all_done.add(msg_id)
570 self.all_done.add(msg_id)
569 self.destinations[msg_id] = engine
571 self.destinations[msg_id] = engine
570
572
571 self.update_graph(msg_id, success)
573 self.update_graph(msg_id, success)
572
574
573 def handle_unmet_dependency(self, idents, parent):
575 def handle_unmet_dependency(self, idents, parent):
574 """handle an unmet dependency"""
576 """handle an unmet dependency"""
575 engine = idents[0]
577 engine = idents[0]
576 msg_id = parent['msg_id']
578 msg_id = parent['msg_id']
577
579
578 if msg_id not in self.blacklist:
580 if msg_id not in self.blacklist:
579 self.blacklist[msg_id] = set()
581 self.blacklist[msg_id] = set()
580 self.blacklist[msg_id].add(engine)
582 self.blacklist[msg_id].add(engine)
581
583
582 args = self.pending[engine].pop(msg_id)
584 args = self.pending[engine].pop(msg_id)
583 raw,targets,after,follow,timeout = args
585 raw,targets,after,follow,timeout = args
584
586
585 if self.blacklist[msg_id] == targets:
587 if self.blacklist[msg_id] == targets:
586 self.depending[msg_id] = args
588 self.depending[msg_id] = args
587 self.fail_unreachable(msg_id)
589 self.fail_unreachable(msg_id)
588 elif not self.maybe_run(msg_id, *args):
590 elif not self.maybe_run(msg_id, *args):
589 # resubmit failed
591 # resubmit failed
590 if msg_id not in self.all_failed:
592 if msg_id not in self.all_failed:
591 # put it back in our dependency tree
593 # put it back in our dependency tree
592 self.save_unmet(msg_id, *args)
594 self.save_unmet(msg_id, *args)
593
595
594 if self.hwm:
596 if self.hwm:
595 try:
597 try:
596 idx = self.targets.index(engine)
598 idx = self.targets.index(engine)
597 except ValueError:
599 except ValueError:
598 pass # skip load-update for dead engines
600 pass # skip load-update for dead engines
599 else:
601 else:
600 if self.loads[idx] == self.hwm-1:
602 if self.loads[idx] == self.hwm-1:
601 self.update_graph(None)
603 self.update_graph(None)
602
604
603
605
604
606
605 def update_graph(self, dep_id=None, success=True):
607 def update_graph(self, dep_id=None, success=True):
606 """dep_id just finished. Update our dependency
608 """dep_id just finished. Update our dependency
607 graph and submit any jobs that just became runnable.
609 graph and submit any jobs that just became runnable.
608
610
609 Called with dep_id=None to update the entire graph for hwm, but without finishing
611 Called with dep_id=None to update the entire graph for hwm, but without finishing
610 a task.
612 a task.
611 """
613 """
612 # print ("\n\n***********")
614 # print ("\n\n***********")
613 # pprint (dep_id)
615 # pprint (dep_id)
614 # pprint (self.graph)
616 # pprint (self.graph)
615 # pprint (self.depending)
617 # pprint (self.depending)
616 # pprint (self.all_completed)
618 # pprint (self.all_completed)
617 # pprint (self.all_failed)
619 # pprint (self.all_failed)
618 # print ("\n\n***********\n\n")
620 # print ("\n\n***********\n\n")
619 # update any jobs that depended on the dependency
621 # update any jobs that depended on the dependency
620 jobs = self.graph.pop(dep_id, [])
622 jobs = self.graph.pop(dep_id, [])
621
623
622 # recheck *all* jobs if
624 # recheck *all* jobs if
623 # a) we have HWM and an engine just became no longer full
625 # a) we have HWM and an engine just became no longer full
624 # or b) dep_id was given as None
626 # or b) dep_id was given as None
625 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
627 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
626 jobs = self.depending.keys()
628 jobs = self.depending.keys()
627
629
628 for msg_id in jobs:
630 for msg_id in jobs:
629 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
631 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
630
632
631 if after.unreachable(self.all_completed, self.all_failed)\
633 if after.unreachable(self.all_completed, self.all_failed)\
632 or follow.unreachable(self.all_completed, self.all_failed):
634 or follow.unreachable(self.all_completed, self.all_failed):
633 self.fail_unreachable(msg_id)
635 self.fail_unreachable(msg_id)
634
636
635 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
637 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
636 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
638 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
637
639
638 self.depending.pop(msg_id)
640 self.depending.pop(msg_id)
639 for mid in follow.union(after):
641 for mid in follow.union(after):
640 if mid in self.graph:
642 if mid in self.graph:
641 self.graph[mid].remove(msg_id)
643 self.graph[mid].remove(msg_id)
642
644
643 #----------------------------------------------------------------------
645 #----------------------------------------------------------------------
644 # methods to be overridden by subclasses
646 # methods to be overridden by subclasses
645 #----------------------------------------------------------------------
647 #----------------------------------------------------------------------
646
648
647 def add_job(self, idx):
649 def add_job(self, idx):
648 """Called after self.targets[idx] just got the job with header.
650 """Called after self.targets[idx] just got the job with header.
649 Override in subclasses. The default ordering is simple LRU.
651 Override in subclasses. The default ordering is simple LRU.
650 The default loads are the number of outstanding jobs."""
652 The default loads are the number of outstanding jobs."""
651 self.loads[idx] += 1
653 self.loads[idx] += 1
652 for lis in (self.targets, self.loads):
654 for lis in (self.targets, self.loads):
653 lis.append(lis.pop(idx))
655 lis.append(lis.pop(idx))
654
656
655
657
656 def finish_job(self, idx):
658 def finish_job(self, idx):
657 """Called after self.targets[idx] just finished a job.
659 """Called after self.targets[idx] just finished a job.
658 Override in subclasses."""
660 Override in subclasses."""
659 self.loads[idx] -= 1
661 self.loads[idx] -= 1
660
662
661
663
662
664
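
The default add_job/finish_job bookkeeping above can be exercised on its own. This standalone sketch (engine names made up) shows the behaviour the docstring describes: loads counts outstanding jobs per engine, and the engine that just received work is rotated to the back of targets and loads, giving a simple LRU ordering.

    # standalone sketch of the default LRU load bookkeeping; names are illustrative
    targets = ['engine-a', 'engine-b', 'engine-c']
    loads = [0, 0, 0]                      # outstanding jobs per engine

    def add_job(idx):
        loads[idx] += 1
        # rotate the engine that just got work to the back of both lists
        for lis in (targets, loads):
            lis.append(lis.pop(idx))

    def finish_job(idx):
        loads[idx] -= 1

    add_job(0)
    print(targets, loads)                  # ['engine-b', 'engine-c', 'engine-a'] [0, 0, 1]
    finish_job(targets.index('engine-a'))
    print(loads)                           # [0, 0, 0]
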
663 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
665 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
664 logname='root', log_url=None, loglevel=logging.DEBUG,
666 logname='root', log_url=None, loglevel=logging.DEBUG,
665 identity=b'task', in_thread=False):
667 identity=b'task', in_thread=False):
666
668
667 ZMQStream = zmqstream.ZMQStream
669 ZMQStream = zmqstream.ZMQStream
668
670
669 if config:
671 if config:
670 # unwrap dict back into Config
672 # unwrap dict back into Config
671 config = Config(config)
673 config = Config(config)
672
674
673 if in_thread:
675 if in_thread:
674 # use instance() to get the same Context/Loop as our parent
676 # use instance() to get the same Context/Loop as our parent
675 ctx = zmq.Context.instance()
677 ctx = zmq.Context.instance()
676 loop = ioloop.IOLoop.instance()
678 loop = ioloop.IOLoop.instance()
677 else:
679 else:
678 # in a process, don't use instance()
680 # in a process, don't use instance()
679 # for safety with multiprocessing
681 # for safety with multiprocessing
680 ctx = zmq.Context()
682 ctx = zmq.Context()
681 loop = ioloop.IOLoop()
683 loop = ioloop.IOLoop()
682 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
684 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
683 ins.setsockopt(zmq.IDENTITY, identity)
685 ins.setsockopt(zmq.IDENTITY, identity)
684 ins.bind(in_addr)
686 ins.bind(in_addr)
685
687
686 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
688 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
687 outs.setsockopt(zmq.IDENTITY, identity)
689 outs.setsockopt(zmq.IDENTITY, identity)
688 outs.bind(out_addr)
690 outs.bind(out_addr)
689 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
691 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
690 mons.connect(mon_addr)
692 mons.connect(mon_addr)
691 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
693 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
692 nots.setsockopt(zmq.SUBSCRIBE, b'')
694 nots.setsockopt(zmq.SUBSCRIBE, b'')
693 nots.connect(not_addr)
695 nots.connect(not_addr)
694
696
695 # setup logging.
697 # setup logging.
696 if in_thread:
698 if in_thread:
697 log = Application.instance().log
699 log = Application.instance().log
698 else:
700 else:
699 if log_url:
701 if log_url:
700 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
702 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
701 else:
703 else:
702 log = local_logger(logname, loglevel)
704 log = local_logger(logname, loglevel)
703
705
704 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
706 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
705 mon_stream=mons, notifier_stream=nots,
707 mon_stream=mons, notifier_stream=nots,
706 loop=loop, log=log,
708 loop=loop, log=log,
707 config=config)
709 config=config)
708 scheduler.start()
710 scheduler.start()
709 if not in_thread:
711 if not in_thread:
710 try:
712 try:
711 loop.start()
713 loop.start()
712 except KeyboardInterrupt:
714 except KeyboardInterrupt:
713 print ("interrupted, exiting...", file=sys.__stderr__)
715 print ("interrupted, exiting...", file=sys.__stderr__)
714
716
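
For orientation, a rough invocation sketch of launch_scheduler as defined above. The tcp addresses are placeholders, a real deployment has the Hub, client, and engines on the other ends of these sockets, and with in_thread=False the call blocks in loop.start() until interrupted.

    from IPython.parallel.controller.scheduler import launch_scheduler

    launch_scheduler(
        in_addr='tcp://127.0.0.1:55001',    # client-facing ROUTER (placeholder address)
        out_addr='tcp://127.0.0.1:55002',   # engine-facing ROUTER
        mon_addr='tcp://127.0.0.1:55003',   # PUB stream watched by the Hub
        not_addr='tcp://127.0.0.1:55004',   # SUB socket for engine (un)registration
        identity=b'task',
        in_thread=False,                    # own Context/IOLoop; blocks until interrupted
    )
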
@@ -1,212 +1,208 b''
1 """ Defines helper functions for creating kernel entry points and process
1 """ Defines helper functions for creating kernel entry points and process
2 launchers.
2 launchers.
3 """
3 """
4
4
5 # Standard library imports.
5 # Standard library imports.
6 import atexit
6 import atexit
7 import json
7 import os
8 import os
8 import socket
9 import socket
9 from subprocess import Popen, PIPE
10 from subprocess import Popen, PIPE
10 import sys
11 import sys
11 import tempfile
12 import tempfile
12
13
13 # System library imports
14 # System library imports
14
15
15 # Note: use our own import to work around jsonlib api mismatch. When these
16 # changes propagate to zmq, revert back to the following line instead:
17 #from zmq.utils import jsonapi as json
18 from IPython.zmq import jsonapi as json
19
20 # IPython imports
16 # IPython imports
21 from IPython.utils.localinterfaces import LOCALHOST
17 from IPython.utils.localinterfaces import LOCALHOST
22 from IPython.utils.py3compat import bytes_to_str
18 from IPython.utils.py3compat import bytes_to_str
23
19
24 # Local imports.
20 # Local imports.
25 from parentpoller import ParentPollerWindows
21 from parentpoller import ParentPollerWindows
26
22
27 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
23 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
28 ip=LOCALHOST, key=b''):
24 ip=LOCALHOST, key=b''):
29 """Generates a JSON config file, including the selection of random ports.
25 """Generates a JSON config file, including the selection of random ports.
30
26
31 Parameters
27 Parameters
32 ----------
28 ----------
33
29
34 fname : unicode
30 fname : unicode
35 The path to the file to write
31 The path to the file to write
36
32
37 shell_port : int, optional
33 shell_port : int, optional
38 The port to use for XREP channel.
34 The port to use for XREP channel.
39
35
40 iopub_port : int, optional
36 iopub_port : int, optional
41 The port to use for the SUB channel.
37 The port to use for the SUB channel.
42
38
43 stdin_port : int, optional
39 stdin_port : int, optional
44 The port to use for the REQ (raw input) channel.
40 The port to use for the REQ (raw input) channel.
45
41
46 hb_port : int, optional
42 hb_port : int, optional
47 The port to use for the heartbeat REP channel.
43 The port to use for the heartbeat REP channel.
48
44
49 ip : str, optional
45 ip : str, optional
50 The ip address the kernel will bind to.
46 The ip address the kernel will bind to.
51
47
52 key : str, optional
48 key : str, optional
53 The Session key used for HMAC authentication.
49 The Session key used for HMAC authentication.
54
50
55 """
51 """
56 # default to temporary connector file
52 # default to temporary connector file
57 if not fname:
53 if not fname:
58 fname = tempfile.mktemp('.json')
54 fname = tempfile.mktemp('.json')
59
55
60 # Find open ports as necessary.
56 # Find open ports as necessary.
61 ports = []
57 ports = []
62 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
58 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
63 int(stdin_port <= 0) + int(hb_port <= 0)
59 int(stdin_port <= 0) + int(hb_port <= 0)
64 for i in xrange(ports_needed):
60 for i in xrange(ports_needed):
65 sock = socket.socket()
61 sock = socket.socket()
66 sock.bind(('', 0))
62 sock.bind(('', 0))
67 ports.append(sock)
63 ports.append(sock)
68 for i, sock in enumerate(ports):
64 for i, sock in enumerate(ports):
69 port = sock.getsockname()[1]
65 port = sock.getsockname()[1]
70 sock.close()
66 sock.close()
71 ports[i] = port
67 ports[i] = port
72 if shell_port <= 0:
68 if shell_port <= 0:
73 shell_port = ports.pop(0)
69 shell_port = ports.pop(0)
74 if iopub_port <= 0:
70 if iopub_port <= 0:
75 iopub_port = ports.pop(0)
71 iopub_port = ports.pop(0)
76 if stdin_port <= 0:
72 if stdin_port <= 0:
77 stdin_port = ports.pop(0)
73 stdin_port = ports.pop(0)
78 if hb_port <= 0:
74 if hb_port <= 0:
79 hb_port = ports.pop(0)
75 hb_port = ports.pop(0)
80
76
81 cfg = dict( shell_port=shell_port,
77 cfg = dict( shell_port=shell_port,
82 iopub_port=iopub_port,
78 iopub_port=iopub_port,
83 stdin_port=stdin_port,
79 stdin_port=stdin_port,
84 hb_port=hb_port,
80 hb_port=hb_port,
85 )
81 )
86 cfg['ip'] = ip
82 cfg['ip'] = ip
87 cfg['key'] = bytes_to_str(key)
83 cfg['key'] = bytes_to_str(key)
88
84
89 with open(fname, 'wb') as f:
85 with open(fname, 'w') as f:
90 f.write(json.dumps(cfg, indent=2))
86 f.write(json.dumps(cfg, indent=2))
91
87
92 return fname, cfg
88 return fname, cfg
93
89
94
90
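
write_connection_file above combines two small tricks that work on their own with just the standard library: bind throwaway sockets to port 0 so the OS picks free ports, then dump the config dict as JSON. A minimal sketch of the same round-trip, independent of IPython and using a temporary file name:

    import json
    import socket
    import tempfile

    def pick_free_ports(n):
        # bind to port 0 so the OS assigns an unused port, then read it back
        socks = [socket.socket() for _ in range(n)]
        for s in socks:
            s.bind(('', 0))
        ports = [s.getsockname()[1] for s in socks]
        for s in socks:
            s.close()
        return ports

    shell, iopub, stdin, hb = pick_free_ports(4)
    cfg = dict(shell_port=shell, iopub_port=iopub, stdin_port=stdin,
               hb_port=hb, ip='127.0.0.1', key='')

    fname = tempfile.mktemp('.json')
    with open(fname, 'w') as f:
        f.write(json.dumps(cfg, indent=2))
    print(fname, cfg)
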
95 def base_launch_kernel(code, fname, stdin=None, stdout=None, stderr=None,
91 def base_launch_kernel(code, fname, stdin=None, stdout=None, stderr=None,
96 executable=None, independent=False, extra_arguments=[]):
92 executable=None, independent=False, extra_arguments=[]):
97 """ Launches a localhost kernel, binding to the specified ports.
93 """ Launches a localhost kernel, binding to the specified ports.
98
94
99 Parameters
95 Parameters
100 ----------
96 ----------
101 code : str,
97 code : str,
102 A string of Python code that imports and executes a kernel entry point.
98 A string of Python code that imports and executes a kernel entry point.
103
99
104 stdin, stdout, stderr : optional (default None)
100 stdin, stdout, stderr : optional (default None)
105 Standard streams, as defined in subprocess.Popen.
101 Standard streams, as defined in subprocess.Popen.
106
102
107 fname : unicode, optional
103 fname : unicode, optional
108 The JSON connector file, containing ip/port/hmac key information.
104 The JSON connector file, containing ip/port/hmac key information.
109
105
110 key : str, optional
106 key : str, optional
111 The Session key used for HMAC authentication.
107 The Session key used for HMAC authentication.
112
108
113 executable : str, optional (default sys.executable)
109 executable : str, optional (default sys.executable)
114 The Python executable to use for the kernel process.
110 The Python executable to use for the kernel process.
115
111
116 independent : bool, optional (default False)
112 independent : bool, optional (default False)
117 If set, the kernel process is guaranteed to survive if this process
113 If set, the kernel process is guaranteed to survive if this process
118 dies. If not set, an effort is made to ensure that the kernel is killed
114 dies. If not set, an effort is made to ensure that the kernel is killed
119 when this process dies. Note that in this case it is still good practice
115 when this process dies. Note that in this case it is still good practice
120 to kill kernels manually before exiting.
116 to kill kernels manually before exiting.
121
117
122 extra_arguments : list, optional
118 extra_arguments : list, optional
123 A list of extra arguments to pass when executing the launch code.
119 A list of extra arguments to pass when executing the launch code.
124
120
125 Returns
121 Returns
126 -------
122 -------
127 The kernel process, as a Popen object.
123 The kernel process, as a Popen object.
130 """
126 """
131
127
132 # Build the kernel launch command.
128 # Build the kernel launch command.
133 if executable is None:
129 if executable is None:
134 executable = sys.executable
130 executable = sys.executable
135 arguments = [ executable, '-c', code, '-f', fname ]
131 arguments = [ executable, '-c', code, '-f', fname ]
136 arguments.extend(extra_arguments)
132 arguments.extend(extra_arguments)
137
133
138 # Popen will fail (sometimes with a deadlock) if stdin, stdout, and stderr
134 # Popen will fail (sometimes with a deadlock) if stdin, stdout, and stderr
139 # are invalid. Unfortunately, there is in general no way to detect whether
135 # are invalid. Unfortunately, there is in general no way to detect whether
140 # they are valid. The following two blocks redirect them to (temporary)
136 # they are valid. The following two blocks redirect them to (temporary)
141 # pipes in certain important cases.
137 # pipes in certain important cases.
142
138
143 # If this process has been backgrounded, our stdin is invalid. Since there
139 # If this process has been backgrounded, our stdin is invalid. Since there
144 # is no compelling reason for the kernel to inherit our stdin anyway, we'll
140 # is no compelling reason for the kernel to inherit our stdin anyway, we'll
145 # play this one safe and always redirect.
141 # play this one safe and always redirect.
146 redirect_in = True
142 redirect_in = True
147 _stdin = PIPE if stdin is None else stdin
143 _stdin = PIPE if stdin is None else stdin
148
144
149 # If this process is running on pythonw, we know that stdin, stdout, and
145 # If this process is running on pythonw, we know that stdin, stdout, and
150 # stderr are all invalid.
146 # stderr are all invalid.
151 redirect_out = sys.executable.endswith('pythonw.exe')
147 redirect_out = sys.executable.endswith('pythonw.exe')
152 if redirect_out:
148 if redirect_out:
153 _stdout = PIPE if stdout is None else stdout
149 _stdout = PIPE if stdout is None else stdout
154 _stderr = PIPE if stderr is None else stderr
150 _stderr = PIPE if stderr is None else stderr
155 else:
151 else:
156 _stdout, _stderr = stdout, stderr
152 _stdout, _stderr = stdout, stderr
157
153
158 # Spawn a kernel.
154 # Spawn a kernel.
159 if sys.platform == 'win32':
155 if sys.platform == 'win32':
160 # Create a Win32 event for interrupting the kernel.
156 # Create a Win32 event for interrupting the kernel.
161 interrupt_event = ParentPollerWindows.create_interrupt_event()
157 interrupt_event = ParentPollerWindows.create_interrupt_event()
162 arguments += [ '--interrupt=%i'%interrupt_event ]
158 arguments += [ '--interrupt=%i'%interrupt_event ]
163
159
164 # If the kernel is running on pythonw and stdout/stderr have not been
160 # If the kernel is running on pythonw and stdout/stderr have not been
165 # redirected, it will crash when more than 4KB of data is written to
161 # redirected, it will crash when more than 4KB of data is written to
166 # stdout or stderr. This is a bug that has been with Python for a very
162 # stdout or stderr. This is a bug that has been with Python for a very
167 # long time; see http://bugs.python.org/issue706263.
163 # long time; see http://bugs.python.org/issue706263.
168 # A cleaner solution to this problem would be to pass os.devnull to
164 # A cleaner solution to this problem would be to pass os.devnull to
169 # Popen directly. Unfortunately, that does not work.
165 # Popen directly. Unfortunately, that does not work.
170 if executable.endswith('pythonw.exe'):
166 if executable.endswith('pythonw.exe'):
171 if stdout is None:
167 if stdout is None:
172 arguments.append('--no-stdout')
168 arguments.append('--no-stdout')
173 if stderr is None:
169 if stderr is None:
174 arguments.append('--no-stderr')
170 arguments.append('--no-stderr')
175
171
176 # Launch the kernel process.
172 # Launch the kernel process.
177 if independent:
173 if independent:
178 proc = Popen(arguments,
174 proc = Popen(arguments,
179 creationflags=512, # CREATE_NEW_PROCESS_GROUP
175 creationflags=512, # CREATE_NEW_PROCESS_GROUP
180 stdin=_stdin, stdout=_stdout, stderr=_stderr)
176 stdin=_stdin, stdout=_stdout, stderr=_stderr)
181 else:
177 else:
182 from _subprocess import DuplicateHandle, GetCurrentProcess, \
178 from _subprocess import DuplicateHandle, GetCurrentProcess, \
183 DUPLICATE_SAME_ACCESS
179 DUPLICATE_SAME_ACCESS
184 pid = GetCurrentProcess()
180 pid = GetCurrentProcess()
185 handle = DuplicateHandle(pid, pid, pid, 0,
181 handle = DuplicateHandle(pid, pid, pid, 0,
186 True, # Inheritable by new processes.
182 True, # Inheritable by new processes.
187 DUPLICATE_SAME_ACCESS)
183 DUPLICATE_SAME_ACCESS)
188 proc = Popen(arguments + ['--parent=%i'%int(handle)],
184 proc = Popen(arguments + ['--parent=%i'%int(handle)],
189 stdin=_stdin, stdout=_stdout, stderr=_stderr)
185 stdin=_stdin, stdout=_stdout, stderr=_stderr)
190
186
191 # Attach the interrupt event to the Popen object so it can be used later.
187 # Attach the interrupt event to the Popen object so it can be used later.
192 proc.win32_interrupt_event = interrupt_event
188 proc.win32_interrupt_event = interrupt_event
193
189
194 else:
190 else:
195 if independent:
191 if independent:
196 proc = Popen(arguments, preexec_fn=lambda: os.setsid(),
192 proc = Popen(arguments, preexec_fn=lambda: os.setsid(),
197 stdin=_stdin, stdout=_stdout, stderr=_stderr)
193 stdin=_stdin, stdout=_stdout, stderr=_stderr)
198 else:
194 else:
199 proc = Popen(arguments + ['--parent=1'],
195 proc = Popen(arguments + ['--parent=1'],
200 stdin=_stdin, stdout=_stdout, stderr=_stderr)
196 stdin=_stdin, stdout=_stdout, stderr=_stderr)
201
197
202 # Clean up pipes created to work around Popen bug.
198 # Clean up pipes created to work around Popen bug.
203 if redirect_in:
199 if redirect_in:
204 if stdin is None:
200 if stdin is None:
205 proc.stdin.close()
201 proc.stdin.close()
206 if redirect_out:
202 if redirect_out:
207 if stdout is None:
203 if stdout is None:
208 proc.stdout.close()
204 proc.stdout.close()
209 if stderr is None:
205 if stderr is None:
210 proc.stderr.close()
206 proc.stderr.close()
211
207
212 return proc
208 return proc
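
A hypothetical usage sketch of base_launch_kernel, not part of this changeset: the import path and the kernel entry-point one-liner are assumptions for illustration, and the call really does spawn a subprocess. The code string is expected to import and start a kernel, and fname points at a connection file such as the one write_connection_file produces.

    # import path and entry-point string are assumptions, not taken from the diff
    from IPython.zmq.entry_point import write_connection_file, base_launch_kernel

    fname, cfg = write_connection_file()   # pick free ports and write the JSON file
    proc = base_launch_kernel(
        "from IPython.zmq.ipkernel import main; main()",  # assumed kernel entry point
        fname,
        independent=False,                 # kernel is tied to this process's lifetime
    )
    print("kernel pid:", proc.pid, "shell port:", cfg['shell_port'])
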
1 NO CONTENT: file was removed
NO CONTENT: file was removed