Show More
@@ -116,7 +116,10 flags.update({ | |||
|
116 | 116 | select one of the true db backends. |
|
117 | 117 | """), |
|
118 | 118 | 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}}, |
|
119 | 'reuse existing json connection files') | |
|
119 | 'reuse existing json connection files'), | |
|
120 | 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}}, | |
|
121 | 'Attempt to restore engines from a JSON file. ' | |
|
122 | 'For use when resuming a crashed controller'), | |
|
120 | 123 | }) |
|
121 | 124 | |
|
122 | 125 | flags.update(session_flags) |
@@ -156,6 +159,10 class IPControllerApp(BaseParallelApplication): | |||
|
156 | 159 | If False, connection files will be removed on a clean exit. |
|
157 | 160 | """ |
|
158 | 161 | ) |
|
162 | restore_engines = Bool(False, config=True, | |
|
163 | help="""Reload engine state from JSON file | |
|
164 | """ | |
|
165 | ) | |
|
159 | 166 | ssh_server = Unicode(u'', config=True, |
|
160 | 167 | help="""ssh url for clients to use when connecting to the Controller |
|
161 | 168 | processes. It should be of the form: [user@]server[:port]. The |
@@ -209,15 +216,11 class IPControllerApp(BaseParallelApplication): | |||
|
209 | 216 | def save_connection_dict(self, fname, cdict): |
|
210 | 217 | """save a connection dict to json file.""" |
|
211 | 218 | c = self.config |
|
212 |
url = cdict[' |
|
|
219 | url = cdict['registration'] | |
|
213 | 220 | location = cdict['location'] |
|
221 | ||
|
214 | 222 | if not location: |
|
215 | 223 | try: |
|
216 | proto,ip,port = split_url(url) | |
|
217 | except AssertionError: | |
|
218 | pass | |
|
219 | else: | |
|
220 | try: | |
|
221 | 224 |
|
|
222 | 225 |
|
|
223 | 226 |
|
@@ -235,35 +238,51 class IPControllerApp(BaseParallelApplication): | |||
|
235 | 238 | """load config from existing json connector files.""" |
|
236 | 239 | c = self.config |
|
237 | 240 | self.log.debug("loading config from JSON") |
|
238 | # load from engine config | |
|
241 | ||
|
242 | # load engine config | |
|
243 | ||
|
239 | 244 | fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file) |
|
240 | 245 | self.log.info("loading connection info from %s", fname) |
|
241 | 246 | with open(fname) as f: |
|
242 | cfg = json.loads(f.read()) | |
|
243 | key = cfg['exec_key'] | |
|
247 | ecfg = json.loads(f.read()) | |
|
248 | ||
|
244 | 249 | # json gives unicode, Session.key wants bytes |
|
245 | c.Session.key = key.encode('ascii') | |
|
246 | xport,addr = cfg['url'].split('://') | |
|
247 | c.HubFactory.engine_transport = xport | |
|
248 | ip,ports = addr.split(':') | |
|
250 | c.Session.key = ecfg['exec_key'].encode('ascii') | |
|
251 | ||
|
252 | xport,ip = ecfg['interface'].split('://') | |
|
253 | ||
|
249 | 254 | c.HubFactory.engine_ip = ip |
|
250 |
c.HubFactory. |
|
|
251 | self.location = cfg['location'] | |
|
255 | c.HubFactory.engine_transport = xport | |
|
256 | ||
|
257 | self.location = ecfg['location'] | |
|
252 | 258 | if not self.engine_ssh_server: |
|
253 | self.engine_ssh_server = cfg['ssh'] | |
|
259 | self.engine_ssh_server = ecfg['ssh'] | |
|
260 | ||
|
254 | 261 | # load client config |
|
262 | ||
|
255 | 263 | fname = os.path.join(self.profile_dir.security_dir, self.client_json_file) |
|
256 | 264 | self.log.info("loading connection info from %s", fname) |
|
257 | 265 | with open(fname) as f: |
|
258 | cfg = json.loads(f.read()) | |
|
259 | assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys" | |
|
260 | xport,addr = cfg['url'].split('://') | |
|
266 | ccfg = json.loads(f.read()) | |
|
267 | ||
|
268 | for key in ('exec_key', 'registration', 'pack', 'unpack'): | |
|
269 | assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key | |
|
270 | ||
|
271 | xport,addr = ccfg['interface'].split('://') | |
|
272 | ||
|
261 | 273 | c.HubFactory.client_transport = xport |
|
262 | ip,ports = addr.split(':') | |
|
263 | 274 | c.HubFactory.client_ip = ip |
|
264 | 275 | if not self.ssh_server: |
|
265 | self.ssh_server = cfg['ssh'] | |
|
266 | assert int(ports) == c.HubFactory.regport, "regport mismatch" | |
|
276 | self.ssh_server = ccfg['ssh'] | |
|
277 | ||
|
278 | # load port config: | |
|
279 | c.HubFactory.regport = ecfg['registration'] | |
|
280 | c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong']) | |
|
281 | c.HubFactory.control = (ccfg['control'], ecfg['control']) | |
|
282 | c.HubFactory.mux = (ccfg['mux'], ecfg['mux']) | |
|
283 | c.HubFactory.task = (ccfg['task'], ecfg['task']) | |
|
284 | c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub']) | |
|
285 | c.HubFactory.notifier_port = ccfg['notification'] | |
|
267 | 286 | |
|
268 | 287 | def cleanup_connection_files(self): |
|
269 | 288 | if self.reuse_files: |
@@ -314,29 +333,42 class IPControllerApp(BaseParallelApplication): | |||
|
314 | 333 | if self.write_connection_files: |
|
315 | 334 | # save to new json config files |
|
316 | 335 | f = self.factory |
|
317 | cdict = {'exec_key' : f.session.key.decode('ascii'), | |
|
318 | 'ssh' : self.ssh_server, | |
|
319 | 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport), | |
|
320 | 'location' : self.location | |
|
336 | base = { | |
|
337 | 'exec_key' : f.session.key.decode('ascii'), | |
|
338 | 'location' : self.location, | |
|
339 | 'pack' : f.session.packer, | |
|
340 | 'unpack' : f.session.unpacker, | |
|
321 | 341 |
|
|
342 | ||
|
343 | cdict = {'ssh' : self.ssh_server} | |
|
344 | cdict.update(f.client_info) | |
|
345 | cdict.update(base) | |
|
322 | 346 | self.save_connection_dict(self.client_json_file, cdict) |
|
323 |
|
|
|
324 | edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport)) | |
|
325 |
edict |
|
|
347 | ||
|
348 | edict = {'ssh' : self.engine_ssh_server} | |
|
349 | edict.update(f.engine_info) | |
|
350 | edict.update(base) | |
|
326 | 351 | self.save_connection_dict(self.engine_json_file, edict) |
|
327 | 352 | |
|
353 | fname = "engines%s.json" % self.cluster_id | |
|
354 | self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname) | |
|
355 | if self.restore_engines: | |
|
356 | self.factory.hub._load_engine_state() | |
|
357 | ||
|
328 | 358 | def init_schedulers(self): |
|
329 | 359 | children = self.children |
|
330 | 360 | mq = import_item(str(self.mq_class)) |
|
331 | 361 | |
|
332 |
|
|
|
362 | f = self.factory | |
|
363 | ident = f.session.bsession | |
|
333 | 364 | # disambiguate url, in case of * |
|
334 |
monitor_url = disambiguate_url( |
|
|
365 | monitor_url = disambiguate_url(f.monitor_url) | |
|
335 | 366 | # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url |
|
336 | 367 | # IOPub relay (in a Process) |
|
337 | 368 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') |
|
338 |
q.bind_in( |
|
|
339 | q.bind_out(hub.engine_info['iopub']) | |
|
369 | q.bind_in(f.client_url('iopub')) | |
|
370 | q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub") | |
|
371 | q.bind_out(f.engine_url('iopub')) | |
|
340 | 372 | q.setsockopt_out(zmq.SUBSCRIBE, b'') |
|
341 | 373 | q.connect_mon(monitor_url) |
|
342 | 374 | q.daemon=True |
@@ -344,18 +376,20 class IPControllerApp(BaseParallelApplication): | |||
|
344 | 376 | |
|
345 | 377 | # Multiplexer Queue (in a Process) |
|
346 | 378 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') |
|
347 |
q.bind_in( |
|
|
348 | q.setsockopt_in(zmq.IDENTITY, b'mux') | |
|
349 |
q.bind_out( |
|
|
379 | q.bind_in(f.client_url('mux')) | |
|
380 | q.setsockopt_in(zmq.IDENTITY, b'mux_in') | |
|
381 | q.bind_out(f.engine_url('mux')) | |
|
382 | q.setsockopt_out(zmq.IDENTITY, b'mux_out') | |
|
350 | 383 | q.connect_mon(monitor_url) |
|
351 | 384 | q.daemon=True |
|
352 | 385 | children.append(q) |
|
353 | 386 | |
|
354 | 387 | # Control Queue (in a Process) |
|
355 | 388 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol') |
|
356 |
q.bind_in( |
|
|
357 | q.setsockopt_in(zmq.IDENTITY, b'control') | |
|
358 |
q.bind_out( |
|
|
389 | q.bind_in(f.client_url('control')) | |
|
390 | q.setsockopt_in(zmq.IDENTITY, b'control_in') | |
|
391 | q.bind_out(f.engine_url('control')) | |
|
392 | q.setsockopt_out(zmq.IDENTITY, b'control_out') | |
|
359 | 393 | q.connect_mon(monitor_url) |
|
360 | 394 | q.daemon=True |
|
361 | 395 | children.append(q) |
@@ -368,9 +402,10 class IPControllerApp(BaseParallelApplication): | |||
|
368 | 402 | self.log.warn("task::using pure DEALER Task scheduler") |
|
369 | 403 | q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask') |
|
370 | 404 | # q.setsockopt_out(zmq.HWM, hub.hwm) |
|
371 |
q.bind_in( |
|
|
372 | q.setsockopt_in(zmq.IDENTITY, b'task') | |
|
373 |
q.bind_out( |
|
|
405 | q.bind_in(f.client_url('task')) | |
|
406 | q.setsockopt_in(zmq.IDENTITY, b'task_in') | |
|
407 | q.bind_out(f.engine_url('task')) | |
|
408 | q.setsockopt_out(zmq.IDENTITY, b'task_out') | |
|
374 | 409 | q.connect_mon(monitor_url) |
|
375 | 410 | q.daemon=True |
|
376 | 411 | children.append(q) |
@@ -379,8 +414,10 class IPControllerApp(BaseParallelApplication): | |||
|
379 | 414 | |
|
380 | 415 | else: |
|
381 | 416 | self.log.info("task::using Python %s Task scheduler"%scheme) |
|
382 |
sargs = ( |
|
|
383 |
|
|
|
417 | sargs = (f.client_url('task'), f.engine_url('task'), | |
|
418 | monitor_url, disambiguate_url(f.client_url('notification')), | |
|
419 | disambiguate_url(f.client_url('registration')), | |
|
420 | ) | |
|
384 | 421 | kwargs = dict(logname='scheduler', loglevel=self.log_level, |
|
385 | 422 | log_url = self.log_url, config=dict(self.config)) |
|
386 | 423 | if 'Process' in self.mq_class: |
@@ -45,7 +45,7 from IPython.zmq.session import ( | |||
|
45 | 45 | from IPython.config.configurable import Configurable |
|
46 | 46 | |
|
47 | 47 | from IPython.parallel.engine.engine import EngineFactory |
|
48 |
from IPython.parallel.util import disambiguate_ |
|
|
48 | from IPython.parallel.util import disambiguate_ip_address | |
|
49 | 49 | |
|
50 | 50 | from IPython.utils.importstring import import_item |
|
51 | 51 | from IPython.utils.py3compat import cast_bytes |
@@ -211,24 +211,36 class IPEngineApp(BaseParallelApplication): | |||
|
211 | 211 | with open(self.url_file) as f: |
|
212 | 212 | d = json.loads(f.read()) |
|
213 | 213 | |
|
214 | if 'exec_key' in d: | |
|
215 | config.Session.key = cast_bytes(d['exec_key']) | |
|
216 | ||
|
214 | # allow hand-override of location for disambiguation | |
|
215 | # and ssh-server | |
|
217 | 216 | try: |
|
218 | 217 | config.EngineFactory.location |
|
219 | 218 | except AttributeError: |
|
220 | 219 | config.EngineFactory.location = d['location'] |
|
221 | 220 | |
|
222 | d['url'] = disambiguate_url(d['url'], config.EngineFactory.location) | |
|
223 | try: | |
|
224 | config.EngineFactory.url | |
|
225 | except AttributeError: | |
|
226 | config.EngineFactory.url = d['url'] | |
|
227 | ||
|
228 | 221 | try: |
|
229 | 222 | config.EngineFactory.sshserver |
|
230 | 223 | except AttributeError: |
|
231 |
config.EngineFactory.sshserver = d |
|
|
224 | config.EngineFactory.sshserver = d.get('ssh') | |
|
225 | ||
|
226 | location = config.EngineFactory.location | |
|
227 | ||
|
228 | proto, ip = d['interface'].split('://') | |
|
229 | ip = disambiguate_ip_address(ip) | |
|
230 | d['interface'] = '%s://%s' % (proto, ip) | |
|
231 | ||
|
232 | # DO NOT allow override of basic URLs, serialization, or exec_key | |
|
233 | # JSON file takes top priority there | |
|
234 | config.Session.key = cast_bytes(d['exec_key']) | |
|
235 | ||
|
236 | config.EngineFactory.url = d['interface'] + ':%i' % d['registration'] | |
|
237 | ||
|
238 | config.Session.packer = d['pack'] | |
|
239 | config.Session.unpacker = d['unpack'] | |
|
240 | ||
|
241 | self.log.debug("Config changed:") | |
|
242 | self.log.debug("%r", config) | |
|
243 | self.connection_info = d | |
|
232 | 244 | |
|
233 | 245 | def bind_kernel(self, **kwargs): |
|
234 | 246 | """Promote engine to listening kernel, accessible to frontends.""" |
@@ -320,7 +332,9 class IPEngineApp(BaseParallelApplication): | |||
|
320 | 332 | # shell_class = import_item(self.master_config.Global.shell_class) |
|
321 | 333 | # print self.config |
|
322 | 334 | try: |
|
323 |
self.engine = EngineFactory(config=config, log=self.log |
|
|
335 | self.engine = EngineFactory(config=config, log=self.log, | |
|
336 | connection_info=self.connection_info, | |
|
337 | ) | |
|
324 | 338 | except: |
|
325 | 339 | self.log.error("Couldn't start the Engine", exc_info=True) |
|
326 | 340 | self.exit(1) |
@@ -217,7 +217,9 class Client(HasTraits): | |||
|
217 | 217 | Parameters |
|
218 | 218 | ---------- |
|
219 | 219 | |
|
220 |
url_ |
|
|
220 | url_file : str/unicode; path to ipcontroller-client.json | |
|
221 | This JSON file should contain all the information needed to connect to a cluster, | |
|
222 | and is likely the only argument needed. | |
|
221 | 223 | Connection information for the Hub's registration. If a json connector |
|
222 | 224 | file is given, then likely no further configuration is necessary. |
|
223 | 225 | [Default: use profile] |
@@ -239,14 +241,6 class Client(HasTraits): | |||
|
239 | 241 | If specified, this will be relayed to the Session for configuration |
|
240 | 242 | username : str |
|
241 | 243 | set username for the session object |
|
242 | packer : str (import_string) or callable | |
|
243 | Can be either the simple keyword 'json' or 'pickle', or an import_string to a | |
|
244 | function to serialize messages. Must support same input as | |
|
245 | JSON, and output must be bytes. | |
|
246 | You can pass a callable directly as `pack` | |
|
247 | unpacker : str (import_string) or callable | |
|
248 | The inverse of packer. Only necessary if packer is specified as *not* one | |
|
249 | of 'json' or 'pickle'. | |
|
250 | 244 | |
|
251 | 245 | #-------------- ssh related args ---------------- |
|
252 | 246 | # These are args for configuring the ssh tunnel to be used |
@@ -271,17 +265,6 class Client(HasTraits): | |||
|
271 | 265 | flag for whether to use paramiko instead of shell ssh for tunneling. |
|
272 | 266 | [default: True on win32, False else] |
|
273 | 267 | |
|
274 | ------- exec authentication args ------- | |
|
275 | If even localhost is untrusted, you can have some protection against | |
|
276 | unauthorized execution by signing messages with HMAC digests. | |
|
277 | Messages are still sent as cleartext, so if someone can snoop your | |
|
278 | loopback traffic this will not protect your privacy, but will prevent | |
|
279 | unauthorized execution. | |
|
280 | ||
|
281 | exec_key : str | |
|
282 | an authentication key or file containing a key | |
|
283 | default: None | |
|
284 | ||
|
285 | 268 | |
|
286 | 269 | Attributes |
|
287 | 270 | ---------- |
@@ -378,8 +361,8 class Client(HasTraits): | |||
|
378 | 361 | # don't raise on positional args |
|
379 | 362 | return HasTraits.__new__(self, **kw) |
|
380 | 363 | |
|
381 |
def __init__(self, url_ |
|
|
382 |
context=None, debug=False, |
|
|
364 | def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None, | |
|
365 | context=None, debug=False, | |
|
383 | 366 | sshserver=None, sshkey=None, password=None, paramiko=None, |
|
384 | 367 | timeout=10, **extra_args |
|
385 | 368 | ): |
@@ -392,38 +375,45 class Client(HasTraits): | |||
|
392 | 375 | self._context = context |
|
393 | 376 | self._stop_spinning = Event() |
|
394 | 377 | |
|
378 | if 'url_or_file' in extra_args: | |
|
379 | url_file = extra_args['url_or_file'] | |
|
380 | warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning) | |
|
381 | ||
|
382 | if url_file and util.is_url(url_file): | |
|
383 | raise ValueError("single urls cannot be specified, url-files must be used.") | |
|
384 | ||
|
395 | 385 | self._setup_profile_dir(self.profile, profile_dir, ipython_dir) |
|
386 | ||
|
396 | 387 | if self._cd is not None: |
|
397 |
if url_ |
|
|
398 |
url_ |
|
|
399 |
if url_ |
|
|
388 | if url_file is None: | |
|
389 | url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') | |
|
390 | if url_file is None: | |
|
400 | 391 | raise ValueError( |
|
401 | 392 | "I can't find enough information to connect to a hub!" |
|
402 |
" Please specify at least one of url_ |
|
|
393 | " Please specify at least one of url_file or profile." | |
|
403 | 394 | ) |
|
404 | 395 | |
|
405 |
|
|
|
406 | # it's not a url, try for a file | |
|
407 | if not os.path.exists(url_or_file): | |
|
408 | if self._cd: | |
|
409 | url_or_file = os.path.join(self._cd.security_dir, url_or_file) | |
|
410 | if not os.path.exists(url_or_file): | |
|
411 | raise IOError("Connection file not found: %r" % url_or_file) | |
|
412 | with open(url_or_file) as f: | |
|
413 | cfg = json.loads(f.read()) | |
|
414 | else: | |
|
415 | cfg = {'url':url_or_file} | |
|
396 | with open(url_file) as f: | |
|
397 | cfg = json.load(f) | |
|
398 | ||
|
399 | self._task_scheme = cfg['task_scheme'] | |
|
416 | 400 | |
|
417 | 401 | # sync defaults from args, json: |
|
418 | 402 | if sshserver: |
|
419 | 403 | cfg['ssh'] = sshserver |
|
420 | if exec_key: | |
|
421 | cfg['exec_key'] = exec_key | |
|
422 | exec_key = cfg['exec_key'] | |
|
404 | ||
|
423 | 405 | location = cfg.setdefault('location', None) |
|
424 | cfg['url'] = util.disambiguate_url(cfg['url'], location) | |
|
425 | url = cfg['url'] | |
|
426 | proto,addr,port = util.split_url(url) | |
|
406 | ||
|
407 | proto,addr = cfg['interface'].split('://') | |
|
408 | addr = util.disambiguate_ip_address(addr) | |
|
409 | cfg['interface'] = "%s://%s" % (proto, addr) | |
|
410 | ||
|
411 | # turn interface,port into full urls: | |
|
412 | for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'): | |
|
413 | cfg[key] = cfg['interface'] + ':%i' % cfg[key] | |
|
414 | ||
|
415 | url = cfg['registration'] | |
|
416 | ||
|
427 | 417 | if location is not None and addr == '127.0.0.1': |
|
428 | 418 | # location specified, and connection is expected to be local |
|
429 | 419 | if location not in LOCAL_IPS and not sshserver: |
@@ -448,7 +438,7 class Client(HasTraits): | |||
|
448 | 438 | self._ssh = bool(sshserver or sshkey or password) |
|
449 | 439 | if self._ssh and sshserver is None: |
|
450 | 440 | # default to ssh via localhost |
|
451 | sshserver = url.split('://')[1].split(':')[0] | |
|
441 | sshserver = addr | |
|
452 | 442 | if self._ssh and password is None: |
|
453 | 443 | if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko): |
|
454 | 444 | password=False |
@@ -457,20 +447,18 class Client(HasTraits): | |||
|
457 | 447 | ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko) |
|
458 | 448 | |
|
459 | 449 | # configure and construct the session |
|
460 | if exec_key is not None: | |
|
461 | if os.path.isfile(exec_key): | |
|
462 |
|
|
|
463 |
|
|
|
464 | exec_key = cast_bytes(exec_key) | |
|
465 | extra_args['key'] = exec_key | |
|
450 | extra_args['packer'] = cfg['pack'] | |
|
451 | extra_args['unpacker'] = cfg['unpack'] | |
|
452 | extra_args['key'] = cast_bytes(cfg['exec_key']) | |
|
453 | ||
|
466 | 454 | self.session = Session(**extra_args) |
|
467 | 455 | |
|
468 | 456 | self._query_socket = self._context.socket(zmq.DEALER) |
|
469 | self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) | |
|
457 | ||
|
470 | 458 | if self._ssh: |
|
471 |
tunnel.tunnel_connection(self._query_socket, |
|
|
459 | tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs) | |
|
472 | 460 | else: |
|
473 |
self._query_socket.connect( |
|
|
461 | self._query_socket.connect(cfg['registration']) | |
|
474 | 462 | |
|
475 | 463 | self.session.debug = self.debug |
|
476 | 464 | |
@@ -520,8 +508,9 class Client(HasTraits): | |||
|
520 | 508 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" |
|
521 | 509 | for k,v in engines.iteritems(): |
|
522 | 510 | eid = int(k) |
|
523 |
self._engines |
|
|
511 | if eid not in self._engines: | |
|
524 | 512 | self._ids.append(eid) |
|
513 | self._engines[eid] = v | |
|
525 | 514 | self._ids = sorted(self._ids) |
|
526 | 515 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ |
|
527 | 516 | self._task_scheme == 'pure' and self._task_socket: |
@@ -583,7 +572,7 class Client(HasTraits): | |||
|
583 | 572 | self._connected=True |
|
584 | 573 | |
|
585 | 574 | def connect_socket(s, url): |
|
586 | url = util.disambiguate_url(url, self._config['location']) | |
|
575 | # url = util.disambiguate_url(url, self._config['location']) | |
|
587 | 576 | if self._ssh: |
|
588 | 577 | return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs) |
|
589 | 578 | else: |
@@ -600,38 +589,28 class Client(HasTraits): | |||
|
600 | 589 | idents,msg = self.session.recv(self._query_socket,mode=0) |
|
601 | 590 | if self.debug: |
|
602 | 591 | pprint(msg) |
|
603 | msg = Message(msg) | |
|
604 | content = msg.content | |
|
605 | self._config['registration'] = dict(content) | |
|
606 |
if content |
|
|
607 | ident = self.session.bsession | |
|
608 | if content.mux: | |
|
592 | content = msg['content'] | |
|
593 | # self._config['registration'] = dict(content) | |
|
594 | cfg = self._config | |
|
595 | if content['status'] == 'ok': | |
|
609 | 596 |
|
|
610 | self._mux_socket.setsockopt(zmq.IDENTITY, ident) | |
|
611 | connect_socket(self._mux_socket, content.mux) | |
|
612 | if content.task: | |
|
613 | self._task_scheme, task_addr = content.task | |
|
597 | connect_socket(self._mux_socket, cfg['mux']) | |
|
598 | ||
|
614 | 599 |
|
|
615 | self._task_socket.setsockopt(zmq.IDENTITY, ident) | |
|
616 | connect_socket(self._task_socket, task_addr) | |
|
617 | if content.notification: | |
|
600 | connect_socket(self._task_socket, cfg['task']) | |
|
601 | ||
|
618 | 602 |
|
|
619 | connect_socket(self._notification_socket, content.notification) | |
|
620 | 603 |
|
|
621 | # if content.query: | |
|
622 | # self._query_socket = self._context.socket(zmq.DEALER) | |
|
623 | # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) | |
|
624 | # connect_socket(self._query_socket, content.query) | |
|
625 | if content.control: | |
|
604 | connect_socket(self._notification_socket, cfg['notification']) | |
|
605 | ||
|
626 | 606 |
|
|
627 | self._control_socket.setsockopt(zmq.IDENTITY, ident) | |
|
628 | connect_socket(self._control_socket, content.control) | |
|
629 | if content.iopub: | |
|
607 | connect_socket(self._control_socket, cfg['control']) | |
|
608 | ||
|
630 | 609 |
|
|
631 | 610 |
|
|
632 | self._iopub_socket.setsockopt(zmq.IDENTITY, ident) | |
|
633 | connect_socket(self._iopub_socket, content.iopub) | |
|
634 |
self._update_engines(dict(content |
|
|
611 | connect_socket(self._iopub_socket, cfg['iopub']) | |
|
612 | ||
|
613 | self._update_engines(dict(content['engines'])) | |
|
635 | 614 | else: |
|
636 | 615 | self._connected = False |
|
637 | 616 | raise Exception("Failed to connect!") |
@@ -674,7 +653,7 class Client(HasTraits): | |||
|
674 | 653 | """Register a new engine, and update our connection info.""" |
|
675 | 654 | content = msg['content'] |
|
676 | 655 | eid = content['id'] |
|
677 |
d = {eid : content[' |
|
|
656 | d = {eid : content['uuid']} | |
|
678 | 657 | self._update_engines(d) |
|
679 | 658 | |
|
680 | 659 | def _unregister_engine(self, msg): |
@@ -18,6 +18,8 Authors: | |||
|
18 | 18 | #----------------------------------------------------------------------------- |
|
19 | 19 | from __future__ import print_function |
|
20 | 20 | |
|
21 | import json | |
|
22 | import os | |
|
21 | 23 | import sys |
|
22 | 24 | import time |
|
23 | 25 | from datetime import datetime |
@@ -107,17 +109,16 class EngineConnector(HasTraits): | |||
|
107 | 109 | """A simple object for accessing the various zmq connections of an object. |
|
108 | 110 | Attributes are: |
|
109 | 111 | id (int): engine ID |
|
110 | uuid (str): uuid (unused?) | |
|
111 | queue (str): identity of queue's DEALER socket | |
|
112 | registration (str): identity of registration DEALER socket | |
|
113 | heartbeat (str): identity of heartbeat DEALER socket | |
|
112 | uuid (unicode): engine UUID | |
|
113 | pending: set of msg_ids | |
|
114 | stallback: DelayedCallback for stalled registration | |
|
114 | 115 | """ |
|
116 | ||
|
115 | 117 | id=Integer(0) |
|
116 | queue=CBytes() | |
|
117 | control=CBytes() | |
|
118 | registration=CBytes() | |
|
119 | heartbeat=CBytes() | |
|
118 | uuid = Unicode() | |
|
120 | 119 | pending=Set() |
|
120 | stallback = Instance(ioloop.DelayedCallback) | |
|
121 | ||
|
121 | 122 | |
|
122 | 123 | _db_shortcuts = { |
|
123 | 124 | 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB', |
@@ -131,29 +132,29 class HubFactory(RegistrationFactory): | |||
|
131 | 132 | |
|
132 | 133 | # port-pairs for monitoredqueues: |
|
133 | 134 | hb = Tuple(Integer,Integer,config=True, |
|
134 |
help=""" |
|
|
135 | help="""PUB/ROUTER Port pair for Engine heartbeats""") | |
|
135 | 136 | def _hb_default(self): |
|
136 | 137 | return tuple(util.select_random_ports(2)) |
|
137 | 138 | |
|
138 | 139 | mux = Tuple(Integer,Integer,config=True, |
|
139 |
help=""" |
|
|
140 | help="""Client/Engine Port pair for MUX queue""") | |
|
140 | 141 | |
|
141 | 142 | def _mux_default(self): |
|
142 | 143 | return tuple(util.select_random_ports(2)) |
|
143 | 144 | |
|
144 | 145 | task = Tuple(Integer,Integer,config=True, |
|
145 |
help=""" |
|
|
146 | help="""Client/Engine Port pair for Task queue""") | |
|
146 | 147 | def _task_default(self): |
|
147 | 148 | return tuple(util.select_random_ports(2)) |
|
148 | 149 | |
|
149 | 150 | control = Tuple(Integer,Integer,config=True, |
|
150 |
help=""" |
|
|
151 | help="""Client/Engine Port pair for Control queue""") | |
|
151 | 152 | |
|
152 | 153 | def _control_default(self): |
|
153 | 154 | return tuple(util.select_random_ports(2)) |
|
154 | 155 | |
|
155 | 156 | iopub = Tuple(Integer,Integer,config=True, |
|
156 |
help=""" |
|
|
157 | help="""Client/Engine Port pair for IOPub relay""") | |
|
157 | 158 | |
|
158 | 159 | def _iopub_default(self): |
|
159 | 160 | return tuple(util.select_random_ports(2)) |
@@ -231,38 +232,77 class HubFactory(RegistrationFactory): | |||
|
231 | 232 | self.heartmonitor.start() |
|
232 | 233 | self.log.info("Heartmonitor started") |
|
233 | 234 | |
|
235 | def client_url(self, channel): | |
|
236 | """return full zmq url for a named client channel""" | |
|
237 | return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel]) | |
|
238 | ||
|
239 | def engine_url(self, channel): | |
|
240 | """return full zmq url for a named engine channel""" | |
|
241 | return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel]) | |
|
242 | ||
|
234 | 243 | def init_hub(self): |
|
235 | """construct""" | |
|
236 | client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i" | |
|
237 | engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i" | |
|
244 | """construct Hub object""" | |
|
238 | 245 | |
|
239 | 246 | ctx = self.context |
|
240 | 247 | loop = self.loop |
|
241 | 248 | |
|
249 | try: | |
|
250 | scheme = self.config.TaskScheduler.scheme_name | |
|
251 | except AttributeError: | |
|
252 | from .scheduler import TaskScheduler | |
|
253 | scheme = TaskScheduler.scheme_name.get_default_value() | |
|
254 | ||
|
255 | # build connection dicts | |
|
256 | engine = self.engine_info = { | |
|
257 | 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip), | |
|
258 | 'registration' : self.regport, | |
|
259 | 'control' : self.control[1], | |
|
260 | 'mux' : self.mux[1], | |
|
261 | 'hb_ping' : self.hb[0], | |
|
262 | 'hb_pong' : self.hb[1], | |
|
263 | 'task' : self.task[1], | |
|
264 | 'iopub' : self.iopub[1], | |
|
265 | } | |
|
266 | ||
|
267 | client = self.client_info = { | |
|
268 | 'interface' : "%s://%s" % (self.client_transport, self.client_ip), | |
|
269 | 'registration' : self.regport, | |
|
270 | 'control' : self.control[0], | |
|
271 | 'mux' : self.mux[0], | |
|
272 | 'task' : self.task[0], | |
|
273 | 'task_scheme' : scheme, | |
|
274 | 'iopub' : self.iopub[0], | |
|
275 | 'notification' : self.notifier_port, | |
|
276 | } | |
|
277 | ||
|
278 | self.log.debug("Hub engine addrs: %s", self.engine_info) | |
|
279 | self.log.debug("Hub client addrs: %s", self.client_info) | |
|
280 | ||
|
242 | 281 | # Registrar socket |
|
243 | 282 | q = ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
244 |
q.bind( |
|
|
245 |
self.log.info("Hub listening on %s for registration.", |
|
|
283 | q.bind(self.client_url('registration')) | |
|
284 | self.log.info("Hub listening on %s for registration.", self.client_url('registration')) | |
|
246 | 285 | if self.client_ip != self.engine_ip: |
|
247 |
q.bind( |
|
|
248 |
self.log.info("Hub listening on %s for registration.", |
|
|
286 | q.bind(self.engine_url('registration')) | |
|
287 | self.log.info("Hub listening on %s for registration.", self.engine_url('registration')) | |
|
249 | 288 | |
|
250 | 289 | ### Engine connections ### |
|
251 | 290 | |
|
252 | 291 | # heartbeat |
|
253 | 292 | hpub = ctx.socket(zmq.PUB) |
|
254 |
hpub.bind( |
|
|
293 | hpub.bind(self.engine_url('hb_ping')) | |
|
255 | 294 | hrep = ctx.socket(zmq.ROUTER) |
|
256 |
hrep.bind( |
|
|
295 | hrep.bind(self.engine_url('hb_pong')) | |
|
257 | 296 | self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, |
|
258 | 297 | pingstream=ZMQStream(hpub,loop), |
|
259 | 298 | pongstream=ZMQStream(hrep,loop) |
|
260 | 299 | ) |
|
261 | 300 | |
|
262 | 301 | ### Client connections ### |
|
302 | ||
|
263 | 303 | # Notifier socket |
|
264 | 304 | n = ZMQStream(ctx.socket(zmq.PUB), loop) |
|
265 |
n.bind( |
|
|
305 | n.bind(self.client_url('notification')) | |
|
266 | 306 | |
|
267 | 307 | ### build and launch the queues ### |
|
268 | 308 | |
@@ -279,35 +319,10 class HubFactory(RegistrationFactory): | |||
|
279 | 319 | self.db = import_item(str(db_class))(session=self.session.session, |
|
280 | 320 | config=self.config, log=self.log) |
|
281 | 321 | time.sleep(.25) |
|
282 | try: | |
|
283 | scheme = self.config.TaskScheduler.scheme_name | |
|
284 | except AttributeError: | |
|
285 | from .scheduler import TaskScheduler | |
|
286 | scheme = TaskScheduler.scheme_name.get_default_value() | |
|
287 | # build connection dicts | |
|
288 | self.engine_info = { | |
|
289 | 'control' : engine_iface%self.control[1], | |
|
290 | 'mux': engine_iface%self.mux[1], | |
|
291 | 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]), | |
|
292 | 'task' : engine_iface%self.task[1], | |
|
293 | 'iopub' : engine_iface%self.iopub[1], | |
|
294 | # 'monitor' : engine_iface%self.mon_port, | |
|
295 | } | |
|
296 | ||
|
297 | self.client_info = { | |
|
298 | 'control' : client_iface%self.control[0], | |
|
299 | 'mux': client_iface%self.mux[0], | |
|
300 | 'task' : (scheme, client_iface%self.task[0]), | |
|
301 | 'iopub' : client_iface%self.iopub[0], | |
|
302 | 'notification': client_iface%self.notifier_port | |
|
303 | } | |
|
304 | self.log.debug("Hub engine addrs: %s", self.engine_info) | |
|
305 | self.log.debug("Hub client addrs: %s", self.client_info) | |
|
306 | 322 | |
|
307 | 323 | # resubmit stream |
|
308 | 324 | r = ZMQStream(ctx.socket(zmq.DEALER), loop) |
|
309 |
url = util.disambiguate_url(self.client_ |
|
|
310 | r.setsockopt(zmq.IDENTITY, self.session.bsession) | |
|
325 | url = util.disambiguate_url(self.client_url('task')) | |
|
311 | 326 | r.connect(url) |
|
312 | 327 | |
|
313 | 328 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
@@ -335,6 +350,9 class Hub(SessionFactory): | |||
|
335 | 350 | client_info: dict of zmq connection information for engines to connect |
|
336 | 351 | to the queues. |
|
337 | 352 | """ |
|
353 | ||
|
354 | engine_state_file = Unicode() | |
|
355 | ||
|
338 | 356 | # internal data structures: |
|
339 | 357 | ids=Set() # engine IDs |
|
340 | 358 | keytable=Dict() |
@@ -382,15 +400,6 class Hub(SessionFactory): | |||
|
382 | 400 | super(Hub, self).__init__(**kwargs) |
|
383 | 401 | self.registration_timeout = max(5000, 2*self.heartmonitor.period) |
|
384 | 402 | |
|
385 | # validate connection dicts: | |
|
386 | for k,v in self.client_info.iteritems(): | |
|
387 | if k == 'task': | |
|
388 | util.validate_url_container(v[1]) | |
|
389 | else: | |
|
390 | util.validate_url_container(v) | |
|
391 | # util.validate_url_container(self.client_info) | |
|
392 | util.validate_url_container(self.engine_info) | |
|
393 | ||
|
394 | 403 | # register our callbacks |
|
395 | 404 | self.query.on_recv(self.dispatch_query) |
|
396 | 405 | self.monitor.on_recv(self.dispatch_monitor_traffic) |
@@ -556,11 +565,11 class Hub(SessionFactory): | |||
|
556 | 565 | triggers unregistration""" |
|
557 | 566 | self.log.debug("heartbeat::handle_heart_failure(%r)", heart) |
|
558 | 567 | eid = self.hearts.get(heart, None) |
|
559 |
|
|
|
568 | uuid = self.engines[eid].uuid | |
|
560 | 569 | if eid is None or self.keytable[eid] in self.dead_engines: |
|
561 | 570 | self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart) |
|
562 | 571 | else: |
|
563 |
self.unregister_engine(heart, dict(content=dict(id=eid, queue= |
|
|
572 | self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid))) | |
|
564 | 573 | |
|
565 | 574 | #----------------------- MUX Queue Traffic ------------------------------ |
|
566 | 575 | |
@@ -585,7 +594,7 class Hub(SessionFactory): | |||
|
585 | 594 | self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid) |
|
586 | 595 | # Unicode in records |
|
587 | 596 | record['engine_uuid'] = queue_id.decode('ascii') |
|
588 |
record['client_uuid'] = |
|
|
597 | record['client_uuid'] = msg['header']['session'] | |
|
589 | 598 | record['queue'] = 'mux' |
|
590 | 599 | |
|
591 | 600 | try: |
@@ -677,7 +686,7 class Hub(SessionFactory): | |||
|
677 | 686 | return |
|
678 | 687 | record = init_record(msg) |
|
679 | 688 | |
|
680 |
record['client_uuid'] = |
|
|
689 | record['client_uuid'] = msg['header']['session'] | |
|
681 | 690 | record['queue'] = 'task' |
|
682 | 691 | header = msg['header'] |
|
683 | 692 | msg_id = header['msg_id'] |
@@ -865,11 +874,10 class Hub(SessionFactory): | |||
|
865 | 874 | """Reply with connection addresses for clients.""" |
|
866 | 875 | self.log.info("client::client %r connected", client_id) |
|
867 | 876 | content = dict(status='ok') |
|
868 | content.update(self.client_info) | |
|
869 | 877 | jsonable = {} |
|
870 | 878 | for k,v in self.keytable.iteritems(): |
|
871 | 879 | if v not in self.dead_engines: |
|
872 |
jsonable[str(k)] = v |
|
|
880 | jsonable[str(k)] = v | |
|
873 | 881 | content['engines'] = jsonable |
|
874 | 882 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) |
|
875 | 883 | |
@@ -877,48 +885,37 class Hub(SessionFactory): | |||
|
877 | 885 | """Register a new engine.""" |
|
878 | 886 | content = msg['content'] |
|
879 | 887 | try: |
|
880 |
|
|
|
888 | uuid = content['uuid'] | |
|
881 | 889 | except KeyError: |
|
882 | 890 | self.log.error("registration::queue not specified", exc_info=True) |
|
883 | 891 | return |
|
884 | heart = content.get('heartbeat', None) | |
|
885 | if heart: | |
|
886 | heart = cast_bytes(heart) | |
|
887 | """register a new engine, and create the socket(s) necessary""" | |
|
892 | ||
|
888 | 893 | eid = self._next_id |
|
889 | # print (eid, queue, reg, heart) | |
|
890 | 894 | |
|
891 |
self.log.debug("registration::register_engine(%i, %r |
|
|
895 | self.log.debug("registration::register_engine(%i, %r)", eid, uuid) | |
|
892 | 896 | |
|
893 | 897 | content = dict(id=eid,status='ok') |
|
894 | content.update(self.engine_info) | |
|
895 | 898 | # check if requesting available IDs: |
|
896 |
if |
|
|
897 | try: | |
|
898 | raise KeyError("queue_id %r in use" % queue) | |
|
899 | except: | |
|
900 | content = error.wrap_exception() | |
|
901 | self.log.error("queue_id %r in use", queue, exc_info=True) | |
|
902 | elif heart in self.hearts: # need to check unique hearts? | |
|
899 | if cast_bytes(uuid) in self.by_ident: | |
|
903 | 900 | try: |
|
904 |
raise KeyError(" |
|
|
901 | raise KeyError("uuid %r in use" % uuid) | |
|
905 | 902 | except: |
|
906 | self.log.error("heart_id %r in use", heart, exc_info=True) | |
|
907 | 903 | content = error.wrap_exception() |
|
904 | self.log.error("uuid %r in use", uuid, exc_info=True) | |
|
908 | 905 | else: |
|
909 |
for h, |
|
|
910 |
if |
|
|
906 | for h, ec in self.incoming_registrations.iteritems(): | |
|
907 | if uuid == h: | |
|
911 | 908 | try: |
|
912 |
raise KeyError("heart_id %r in use" % |
|
|
909 | raise KeyError("heart_id %r in use" % uuid) | |
|
913 | 910 | except: |
|
914 |
self.log.error("heart_id %r in use", |
|
|
911 | self.log.error("heart_id %r in use", uuid, exc_info=True) | |
|
915 | 912 | content = error.wrap_exception() |
|
916 | 913 | break |
|
917 |
elif |
|
|
914 | elif uuid == ec.uuid: | |
|
918 | 915 | try: |
|
919 |
raise KeyError(" |
|
|
916 | raise KeyError("uuid %r in use" % uuid) | |
|
920 | 917 | except: |
|
921 |
self.log.error(" |
|
|
918 | self.log.error("uuid %r in use", uuid, exc_info=True) | |
|
922 | 919 | content = error.wrap_exception() |
|
923 | 920 | break |
|
924 | 921 | |
@@ -926,18 +923,21 class Hub(SessionFactory): | |||
|
926 | 923 | content=content, |
|
927 | 924 | ident=reg) |
|
928 | 925 | |
|
926 | heart = cast_bytes(uuid) | |
|
927 | ||
|
929 | 928 | if content['status'] == 'ok': |
|
930 | 929 | if heart in self.heartmonitor.hearts: |
|
931 | 930 | # already beating |
|
932 |
self.incoming_registrations[heart] = (eid, |
|
|
931 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid) | |
|
933 | 932 | self.finish_registration(heart) |
|
934 | 933 | else: |
|
935 | 934 | purge = lambda : self._purge_stalled_registration(heart) |
|
936 | 935 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) |
|
937 | 936 | dc.start() |
|
938 |
self.incoming_registrations[heart] = (eid, |
|
|
937 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc) | |
|
939 | 938 | else: |
|
940 | 939 | self.log.error("registration::registration %i failed: %r", eid, content['evalue']) |
|
940 | ||
|
941 | 941 | return eid |
|
942 | 942 | |
|
943 | 943 | def unregister_engine(self, ident, msg): |
@@ -950,7 +950,7 class Hub(SessionFactory): | |||
|
950 | 950 | self.log.info("registration::unregister_engine(%r)", eid) |
|
951 | 951 | # print (eid) |
|
952 | 952 | uuid = self.keytable[eid] |
|
953 |
content=dict(id=eid, |
|
|
953 | content=dict(id=eid, uuid=uuid) | |
|
954 | 954 | self.dead_engines.add(uuid) |
|
955 | 955 | # self.ids.remove(eid) |
|
956 | 956 | # uuid = self.keytable.pop(eid) |
@@ -964,6 +964,8 class Hub(SessionFactory): | |||
|
964 | 964 | dc.start() |
|
965 | 965 | ############## TODO: HANDLE IT ################ |
|
966 | 966 | |
|
967 | self._save_engine_state() | |
|
968 | ||
|
967 | 969 | if self.notifier: |
|
968 | 970 | self.session.send(self.notifier, "unregistration_notification", content=content) |
|
969 | 971 | |
@@ -1001,36 +1003,97 class Hub(SessionFactory): | |||
|
1001 | 1003 | """Second half of engine registration, called after our HeartMonitor |
|
1002 | 1004 | has received a beat from the Engine's Heart.""" |
|
1003 | 1005 | try: |
|
1004 |
|
|
|
1006 | ec = self.incoming_registrations.pop(heart) | |
|
1005 | 1007 | except KeyError: |
|
1006 | 1008 | self.log.error("registration::tried to finish nonexistant registration", exc_info=True) |
|
1007 | 1009 | return |
|
1008 |
self.log.info("registration::finished registering engine %i:% |
|
|
1009 |
if |
|
|
1010 |
|
|
|
1011 | control = queue | |
|
1010 | self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid) | |
|
1011 | if ec.stallback is not None: | |
|
1012 | ec.stallback.stop() | |
|
1013 | eid = ec.id | |
|
1012 | 1014 | self.ids.add(eid) |
|
1013 |
self.keytable[eid] = |
|
|
1014 | self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg, | |
|
1015 | control=control, heartbeat=heart) | |
|
1016 | self.by_ident[queue] = eid | |
|
1015 | self.keytable[eid] = ec.uuid | |
|
1016 | self.engines[eid] = ec | |
|
1017 | self.by_ident[cast_bytes(ec.uuid)] = ec.id | |
|
1017 | 1018 | self.queues[eid] = list() |
|
1018 | 1019 | self.tasks[eid] = list() |
|
1019 | 1020 | self.completed[eid] = list() |
|
1020 | 1021 | self.hearts[heart] = eid |
|
1021 |
content = dict(id=eid, |
|
|
1022 | content = dict(id=eid, uuid=self.engines[eid].uuid) | |
|
1022 | 1023 | if self.notifier: |
|
1023 | 1024 | self.session.send(self.notifier, "registration_notification", content=content) |
|
1024 | 1025 | self.log.info("engine::Engine Connected: %i", eid) |
|
1025 | 1026 | |
|
1027 | self._save_engine_state() | |
|
1028 | ||
|
1026 | 1029 | def _purge_stalled_registration(self, heart): |
|
1027 | 1030 | if heart in self.incoming_registrations: |
|
1028 |
e |
|
|
1029 | self.log.info("registration::purging stalled registration: %i", eid) | |
|
1031 | ec = self.incoming_registrations.pop(heart) | |
|
1032 | self.log.info("registration::purging stalled registration: %i", ec.id) | |
|
1030 | 1033 | else: |
|
1031 | 1034 | pass |
|
1032 | 1035 | |
|
1033 | 1036 | #------------------------------------------------------------------------- |
|
1037 | # Engine State | |
|
1038 | #------------------------------------------------------------------------- | |
|
1039 | ||
|
1040 | ||
|
1041 | def _cleanup_engine_state_file(self): | |
|
1042 | """cleanup engine state mapping""" | |
|
1043 | ||
|
1044 | if os.path.exists(self.engine_state_file): | |
|
1045 | self.log.debug("cleaning up engine state: %s", self.engine_state_file) | |
|
1046 | try: | |
|
1047 | os.remove(self.engine_state_file) | |
|
1048 | except IOError: | |
|
1049 | self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True) | |
|
1050 | ||
|
1051 | ||
|
1052 | def _save_engine_state(self): | |
|
1053 | """save engine mapping to JSON file""" | |
|
1054 | if not self.engine_state_file: | |
|
1055 | return | |
|
1056 | self.log.debug("save engine state to %s" % self.engine_state_file) | |
|
1057 | state = {} | |
|
1058 | engines = {} | |
|
1059 | for eid, ec in self.engines.iteritems(): | |
|
1060 | if ec.uuid not in self.dead_engines: | |
|
1061 | engines[eid] = ec.uuid | |
|
1062 | ||
|
1063 | state['engines'] = engines | |
|
1064 | ||
|
1065 | state['next_id'] = self._idcounter | |
|
1066 | ||
|
1067 | with open(self.engine_state_file, 'w') as f: | |
|
1068 | json.dump(state, f) | |
|
1069 | ||
|
1070 | ||
|
1071 | def _load_engine_state(self): | |
|
1072 | """load engine mapping from JSON file""" | |
|
1073 | if not os.path.exists(self.engine_state_file): | |
|
1074 | return | |
|
1075 | ||
|
1076 | self.log.info("loading engine state from %s" % self.engine_state_file) | |
|
1077 | ||
|
1078 | with open(self.engine_state_file) as f: | |
|
1079 | state = json.load(f) | |
|
1080 | ||
|
1081 | save_notifier = self.notifier | |
|
1082 | self.notifier = None | |
|
1083 | for eid, uuid in state['engines'].iteritems(): | |
|
1084 | heart = uuid.encode('ascii') | |
|
1085 | # start with this heart as current and beating: | |
|
1086 | self.heartmonitor.responses.add(heart) | |
|
1087 | self.heartmonitor.hearts.add(heart) | |
|
1088 | ||
|
1089 | self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid) | |
|
1090 | self.finish_registration(heart) | |
|
1091 | ||
|
1092 | self.notifier = save_notifier | |
|
1093 | ||
|
1094 | self._idcounter = state['next_id'] | |
|
1095 | ||
|
1096 | #------------------------------------------------------------------------- | |
|
1034 | 1097 | # Client Requests |
|
1035 | 1098 | #------------------------------------------------------------------------- |
|
1036 | 1099 | |
@@ -1131,7 +1194,7 class Hub(SessionFactory): | |||
|
1131 | 1194 | except: |
|
1132 | 1195 | reply = error.wrap_exception() |
|
1133 | 1196 | break |
|
1134 |
uid = self.engines[eid]. |
|
|
1197 | uid = self.engines[eid].uuid | |
|
1135 | 1198 | try: |
|
1136 | 1199 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) |
|
1137 | 1200 | except Exception: |
@@ -1205,6 +1268,7 class Hub(SessionFactory): | |||
|
1205 | 1268 | self.db.add_record(msg_id, init_record(msg)) |
|
1206 | 1269 | except Exception: |
|
1207 | 1270 | self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) |
|
1271 | return finish(error.wrap_exception()) | |
|
1208 | 1272 | |
|
1209 | 1273 | finish(dict(status='ok', resubmitted=resubmitted)) |
|
1210 | 1274 |
@@ -189,6 +189,7 class TaskScheduler(SessionFactory): | |||
|
189 | 189 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream |
|
190 | 190 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream |
|
191 | 191 | mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream |
|
192 | query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream | |
|
192 | 193 | |
|
193 | 194 | # internals: |
|
194 | 195 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
@@ -216,6 +217,9 class TaskScheduler(SessionFactory): | |||
|
216 | 217 | return self.session.bsession |
|
217 | 218 | |
|
218 | 219 | def start(self): |
|
220 | self.query_stream.on_recv(self.dispatch_query_reply) | |
|
221 | self.session.send(self.query_stream, "connection_request", {}) | |
|
222 | ||
|
219 | 223 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
220 | 224 | self.client_stream.on_recv(self.dispatch_submission, copy=False) |
|
221 | 225 | |
@@ -242,6 +246,24 class TaskScheduler(SessionFactory): | |||
|
242 | 246 | #----------------------------------------------------------------------- |
|
243 | 247 | |
|
244 | 248 | |
|
249 | def dispatch_query_reply(self, msg): | |
|
250 | """handle reply to our initial connection request""" | |
|
251 | try: | |
|
252 | idents,msg = self.session.feed_identities(msg) | |
|
253 | except ValueError: | |
|
254 | self.log.warn("task::Invalid Message: %r",msg) | |
|
255 | return | |
|
256 | try: | |
|
257 | msg = self.session.unserialize(msg) | |
|
258 | except ValueError: | |
|
259 | self.log.warn("task::Unauthorized message from: %r"%idents) | |
|
260 | return | |
|
261 | ||
|
262 | content = msg['content'] | |
|
263 | for uuid in content.get('engines', {}).values(): | |
|
264 | self._register_engine(cast_bytes(uuid)) | |
|
265 | ||
|
266 | ||
|
245 | 267 | @util.log_errors |
|
246 | 268 | def dispatch_notification(self, msg): |
|
247 | 269 | """dispatch register/unregister events.""" |
@@ -263,7 +285,7 class TaskScheduler(SessionFactory): | |||
|
263 | 285 | self.log.error("Unhandled message type: %r"%msg_type) |
|
264 | 286 | else: |
|
265 | 287 | try: |
|
266 |
handler(cast_bytes(msg['content'][' |
|
|
288 | handler(cast_bytes(msg['content']['uuid'])) | |
|
267 | 289 | except Exception: |
|
268 | 290 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) |
|
269 | 291 | |
@@ -714,7 +736,7 class TaskScheduler(SessionFactory): | |||
|
714 | 736 | |
|
715 | 737 | |
|
716 | 738 | |
|
717 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, | |
|
739 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None, | |
|
718 | 740 | logname='root', log_url=None, loglevel=logging.DEBUG, |
|
719 | 741 | identity=b'task', in_thread=False): |
|
720 | 742 | |
@@ -734,11 +756,11 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, | |||
|
734 | 756 | ctx = zmq.Context() |
|
735 | 757 | loop = ioloop.IOLoop() |
|
736 | 758 | ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
737 | ins.setsockopt(zmq.IDENTITY, identity) | |
|
759 | ins.setsockopt(zmq.IDENTITY, identity + b'_in') | |
|
738 | 760 | ins.bind(in_addr) |
|
739 | 761 | |
|
740 | 762 | outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
741 | outs.setsockopt(zmq.IDENTITY, identity) | |
|
763 | outs.setsockopt(zmq.IDENTITY, identity + b'_out') | |
|
742 | 764 | outs.bind(out_addr) |
|
743 | 765 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) |
|
744 | 766 | mons.connect(mon_addr) |
@@ -746,6 +768,9 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, | |||
|
746 | 768 | nots.setsockopt(zmq.SUBSCRIBE, b'') |
|
747 | 769 | nots.connect(not_addr) |
|
748 | 770 | |
|
771 | querys = ZMQStream(ctx.socket(zmq.DEALER),loop) | |
|
772 | querys.connect(reg_addr) | |
|
773 | ||
|
749 | 774 | # setup logging. |
|
750 | 775 | if in_thread: |
|
751 | 776 | log = Application.instance().log |
@@ -757,6 +782,7 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, | |||
|
757 | 782 | |
|
758 | 783 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, |
|
759 | 784 | mon_stream=mons, notifier_stream=nots, |
|
785 | query_stream=querys, | |
|
760 | 786 | loop=loop, log=log, |
|
761 | 787 | config=config) |
|
762 | 788 | scheduler.start() |
@@ -50,7 +50,7 class EngineFactory(RegistrationFactory): | |||
|
50 | 50 | help="""The location (an IP address) of the controller. This is |
|
51 | 51 | used for disambiguating URLs, to determine whether |
|
52 | 52 | loopback should be used to connect or the public address.""") |
|
53 |
timeout=CFloat( |
|
|
53 | timeout=CFloat(5, config=True, | |
|
54 | 54 | help="""The time (in seconds) to wait for the Controller to respond |
|
55 | 55 | to registration requests before giving up.""") |
|
56 | 56 | sshserver=Unicode(config=True, |
@@ -61,6 +61,7 class EngineFactory(RegistrationFactory): | |||
|
61 | 61 | help="""Whether to use paramiko instead of openssh for tunnels.""") |
|
62 | 62 | |
|
63 | 63 | # not configurable: |
|
64 | connection_info = Dict() | |
|
64 | 65 | user_ns=Dict() |
|
65 | 66 | id=Integer(allow_none=True) |
|
66 | 67 | registrar=Instance('zmq.eventloop.zmqstream.ZMQStream') |
@@ -96,7 +97,7 class EngineFactory(RegistrationFactory): | |||
|
96 | 97 | def connect(s, url): |
|
97 | 98 | url = disambiguate_url(url, self.location) |
|
98 | 99 | if self.using_ssh: |
|
99 |
self.log.debug("Tunneling connection to %s via %s" |
|
|
100 | self.log.debug("Tunneling connection to %s via %s", url, self.sshserver) | |
|
100 | 101 | return tunnel.tunnel_connection(s, url, self.sshserver, |
|
101 | 102 | keyfile=self.sshkey, paramiko=self.paramiko, |
|
102 | 103 | password=password, |
@@ -108,12 +109,12 class EngineFactory(RegistrationFactory): | |||
|
108 | 109 | """like connect, but don't complete the connection (for use by heartbeat)""" |
|
109 | 110 | url = disambiguate_url(url, self.location) |
|
110 | 111 | if self.using_ssh: |
|
111 |
self.log.debug("Tunneling connection to %s via %s" |
|
|
112 | self.log.debug("Tunneling connection to %s via %s", url, self.sshserver) | |
|
112 | 113 | url,tunnelobj = tunnel.open_tunnel(url, self.sshserver, |
|
113 | 114 | keyfile=self.sshkey, paramiko=self.paramiko, |
|
114 | 115 | password=password, |
|
115 | 116 | ) |
|
116 | return url | |
|
117 | return str(url) | |
|
117 | 118 | return connect, maybe_tunnel |
|
118 | 119 | |
|
119 | 120 | def register(self): |
@@ -128,7 +129,7 class EngineFactory(RegistrationFactory): | |||
|
128 | 129 | self.registrar = zmqstream.ZMQStream(reg, self.loop) |
|
129 | 130 | |
|
130 | 131 | |
|
131 |
content = dict( |
|
|
132 | content = dict(uuid=self.ident) | |
|
132 | 133 | self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel)) |
|
133 | 134 | # print (self.session.key) |
|
134 | 135 | self.session.send(self.registrar, "registration_request",content=content) |
@@ -140,50 +141,43 class EngineFactory(RegistrationFactory): | |||
|
140 | 141 | loop = self.loop |
|
141 | 142 | identity = self.bident |
|
142 | 143 | idents,msg = self.session.feed_identities(msg) |
|
143 |
msg = |
|
|
144 | msg = self.session.unserialize(msg) | |
|
145 | content = msg['content'] | |
|
146 | info = self.connection_info | |
|
144 | 147 | |
|
145 | if msg.content.status == 'ok': | |
|
146 | self.id = int(msg.content.id) | |
|
148 | def url(key): | |
|
149 | """get zmq url for given channel""" | |
|
150 | return str(info["interface"] + ":%i" % info[key]) | |
|
147 | 151 | |
|
148 | # launch heartbeat | |
|
149 | hb_addrs = msg.content.heartbeat | |
|
152 | if content['status'] == 'ok': | |
|
153 | self.id = int(content['id']) | |
|
150 | 154 | |
|
155 | # launch heartbeat | |
|
151 | 156 | # possibly forward hb ports with tunnels |
|
152 |
hb_ |
|
|
153 | heart = Heart(*map(str, hb_addrs), heart_id=identity) | |
|
157 | hb_ping = maybe_tunnel(url('hb_ping')) | |
|
158 | hb_pong = maybe_tunnel(url('hb_pong')) | |
|
159 | ||
|
160 | heart = Heart(hb_ping, hb_pong, heart_id=identity) | |
|
154 | 161 | heart.start() |
|
155 | 162 | |
|
156 |
# create Shell |
|
|
157 | queue_addr = msg.content.mux | |
|
158 | shell_addrs = [ str(queue_addr) ] | |
|
159 | task_addr = msg.content.task | |
|
160 | if task_addr: | |
|
161 | shell_addrs.append(str(task_addr)) | |
|
162 | ||
|
163 | # Uncomment this to go back to two-socket model | |
|
164 | # shell_streams = [] | |
|
165 | # for addr in shell_addrs: | |
|
166 | # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) | |
|
167 | # stream.setsockopt(zmq.IDENTITY, identity) | |
|
168 | # stream.connect(disambiguate_url(addr, self.location)) | |
|
169 | # shell_streams.append(stream) | |
|
170 | ||
|
171 | # Now use only one shell stream for mux and tasks | |
|
163 | # create Shell Connections (MUX, Task, etc.): | |
|
164 | shell_addrs = url('mux'), url('task') | |
|
165 | ||
|
166 | # Use only one shell stream for mux and tasks | |
|
172 | 167 | stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
173 | 168 | stream.setsockopt(zmq.IDENTITY, identity) |
|
174 | 169 | shell_streams = [stream] |
|
175 | 170 | for addr in shell_addrs: |
|
176 | 171 | connect(stream, addr) |
|
177 | # end single stream-socket | |
|
178 | 172 | |
|
179 | 173 | # control stream: |
|
180 |
control_addr = |
|
|
174 | control_addr = url('control') | |
|
181 | 175 | control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
182 | 176 | control_stream.setsockopt(zmq.IDENTITY, identity) |
|
183 | 177 | connect(control_stream, control_addr) |
|
184 | 178 | |
|
185 | 179 | # create iopub stream: |
|
186 |
iopub_addr = |
|
|
180 | iopub_addr = url('iopub') | |
|
187 | 181 | iopub_socket = ctx.socket(zmq.PUB) |
|
188 | 182 | iopub_socket.setsockopt(zmq.IDENTITY, identity) |
|
189 | 183 | connect(iopub_socket, iopub_addr) |
@@ -257,9 +257,11 class Session(Configurable): | |||
|
257 | 257 | if new.lower() == 'json': |
|
258 | 258 | self.pack = json_packer |
|
259 | 259 | self.unpack = json_unpacker |
|
260 | self.unpacker = new | |
|
260 | 261 | elif new.lower() == 'pickle': |
|
261 | 262 | self.pack = pickle_packer |
|
262 | 263 | self.unpack = pickle_unpacker |
|
264 | self.unpacker = new | |
|
263 | 265 | else: |
|
264 | 266 | self.pack = import_item(str(new)) |
|
265 | 267 | |
@@ -270,9 +272,11 class Session(Configurable): | |||
|
270 | 272 | if new.lower() == 'json': |
|
271 | 273 | self.pack = json_packer |
|
272 | 274 | self.unpack = json_unpacker |
|
275 | self.packer = new | |
|
273 | 276 | elif new.lower() == 'pickle': |
|
274 | 277 | self.pack = pickle_packer |
|
275 | 278 | self.unpack = pickle_unpacker |
|
279 | self.packer = new | |
|
276 | 280 | else: |
|
277 | 281 | self.unpack = import_item(str(new)) |
|
278 | 282 |
@@ -43,9 +43,7 monitor the survival of the Engine process. | |||
|
43 | 43 | Message type: ``registration_request``:: |
|
44 | 44 | |
|
45 | 45 | content = { |
|
46 |
' |
|
|
47 | 'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY | |
|
48 | 'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY | |
|
46 | 'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets | |
|
49 | 47 | } |
|
50 | 48 | |
|
51 | 49 | .. note:: |
@@ -63,10 +61,6 Message type: ``registration_reply``:: | |||
|
63 | 61 | 'status' : 'ok', # or 'error' |
|
64 | 62 | # if ok: |
|
65 | 63 | 'id' : 0, # int, the engine id |
|
66 | 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue | |
|
67 | 'control' : 'tcp://...', # addr for control queue | |
|
68 | 'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat | |
|
69 | 'task' : 'tcp://...', # addr for task queue, or None if no task queue running | |
|
70 | 64 | } |
|
71 | 65 | |
|
72 | 66 | Clients use the same socket as engines to start their connections. Connection requests |
@@ -84,11 +78,6 Message type: ``connection_reply``:: | |||
|
84 | 78 | |
|
85 | 79 | content = { |
|
86 | 80 | 'status' : 'ok', # or 'error' |
|
87 | # if ok: | |
|
88 | 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue | |
|
89 | 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple) | |
|
90 | 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc. | |
|
91 | 'control' : 'tcp...', # addr for control methods, like abort, etc. | |
|
92 | 81 | } |
|
93 | 82 | |
|
94 | 83 | Heartbeat |
@@ -110,13 +99,14 Message type: ``registration_notification``:: | |||
|
110 | 99 | |
|
111 | 100 | content = { |
|
112 | 101 | 'id' : 0, # engine ID that has been registered |
|
113 |
' |
|
|
102 | 'uuid' : 'engine_id' # the IDENT for the engine's sockets | |
|
114 | 103 | } |
|
115 | 104 | |
|
116 | 105 | Message type : ``unregistration_notification``:: |
|
117 | 106 | |
|
118 | 107 | content = { |
|
119 | 108 | 'id' : 0 # engine ID that has been unregistered |
|
109 | 'uuid' : 'engine_id' # the IDENT for the engine's sockets | |
|
120 | 110 | } |
|
121 | 111 | |
|
122 | 112 |
General Comments 0
You need to be logged in to leave comments.
Login now