##// END OF EJS Templates
Merge pull request #1471 from minrk/connection...
Fernando Perez -
r7962:bff463b5 merge
parent child Browse files
Show More
@@ -116,7 +116,10 flags.update({
116 select one of the true db backends.
116 select one of the true db backends.
117 """),
117 """),
118 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
118 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
119 'reuse existing json connection files')
119 'reuse existing json connection files'),
120 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
121 'Attempt to restore engines from a JSON file. '
122 'For use when resuming a crashed controller'),
120 })
123 })
121
124
122 flags.update(session_flags)
125 flags.update(session_flags)
@@ -156,6 +159,10 class IPControllerApp(BaseParallelApplication):
156 If False, connection files will be removed on a clean exit.
159 If False, connection files will be removed on a clean exit.
157 """
160 """
158 )
161 )
162 restore_engines = Bool(False, config=True,
163 help="""Reload engine state from JSON file
164 """
165 )
159 ssh_server = Unicode(u'', config=True,
166 ssh_server = Unicode(u'', config=True,
160 help="""ssh url for clients to use when connecting to the Controller
167 help="""ssh url for clients to use when connecting to the Controller
161 processes. It should be of the form: [user@]server[:port]. The
168 processes. It should be of the form: [user@]server[:port]. The
@@ -209,15 +216,11 class IPControllerApp(BaseParallelApplication):
209 def save_connection_dict(self, fname, cdict):
216 def save_connection_dict(self, fname, cdict):
210 """save a connection dict to json file."""
217 """save a connection dict to json file."""
211 c = self.config
218 c = self.config
212 url = cdict['url']
219 url = cdict['registration']
213 location = cdict['location']
220 location = cdict['location']
221
214 if not location:
222 if not location:
215 try:
223 try:
216 proto,ip,port = split_url(url)
217 except AssertionError:
218 pass
219 else:
220 try:
221 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
224 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
222 except (socket.gaierror, IndexError):
225 except (socket.gaierror, IndexError):
223 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
226 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
@@ -235,35 +238,51 class IPControllerApp(BaseParallelApplication):
235 """load config from existing json connector files."""
238 """load config from existing json connector files."""
236 c = self.config
239 c = self.config
237 self.log.debug("loading config from JSON")
240 self.log.debug("loading config from JSON")
238 # load from engine config
241
242 # load engine config
243
239 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
244 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
240 self.log.info("loading connection info from %s", fname)
245 self.log.info("loading connection info from %s", fname)
241 with open(fname) as f:
246 with open(fname) as f:
242 cfg = json.loads(f.read())
247 ecfg = json.loads(f.read())
243 key = cfg['exec_key']
248
244 # json gives unicode, Session.key wants bytes
249 # json gives unicode, Session.key wants bytes
245 c.Session.key = key.encode('ascii')
250 c.Session.key = ecfg['exec_key'].encode('ascii')
246 xport,addr = cfg['url'].split('://')
251
247 c.HubFactory.engine_transport = xport
252 xport,ip = ecfg['interface'].split('://')
248 ip,ports = addr.split(':')
253
249 c.HubFactory.engine_ip = ip
254 c.HubFactory.engine_ip = ip
250 c.HubFactory.regport = int(ports)
255 c.HubFactory.engine_transport = xport
251 self.location = cfg['location']
256
257 self.location = ecfg['location']
252 if not self.engine_ssh_server:
258 if not self.engine_ssh_server:
253 self.engine_ssh_server = cfg['ssh']
259 self.engine_ssh_server = ecfg['ssh']
260
254 # load client config
261 # load client config
262
255 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
263 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
256 self.log.info("loading connection info from %s", fname)
264 self.log.info("loading connection info from %s", fname)
257 with open(fname) as f:
265 with open(fname) as f:
258 cfg = json.loads(f.read())
266 ccfg = json.loads(f.read())
259 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
267
260 xport,addr = cfg['url'].split('://')
268 for key in ('exec_key', 'registration', 'pack', 'unpack'):
269 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
270
271 xport,addr = ccfg['interface'].split('://')
272
261 c.HubFactory.client_transport = xport
273 c.HubFactory.client_transport = xport
262 ip,ports = addr.split(':')
263 c.HubFactory.client_ip = ip
274 c.HubFactory.client_ip = ip
264 if not self.ssh_server:
275 if not self.ssh_server:
265 self.ssh_server = cfg['ssh']
276 self.ssh_server = ccfg['ssh']
266 assert int(ports) == c.HubFactory.regport, "regport mismatch"
277
278 # load port config:
279 c.HubFactory.regport = ecfg['registration']
280 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
281 c.HubFactory.control = (ccfg['control'], ecfg['control'])
282 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
283 c.HubFactory.task = (ccfg['task'], ecfg['task'])
284 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
285 c.HubFactory.notifier_port = ccfg['notification']
267
286
268 def cleanup_connection_files(self):
287 def cleanup_connection_files(self):
269 if self.reuse_files:
288 if self.reuse_files:
@@ -314,29 +333,42 class IPControllerApp(BaseParallelApplication):
314 if self.write_connection_files:
333 if self.write_connection_files:
315 # save to new json config files
334 # save to new json config files
316 f = self.factory
335 f = self.factory
317 cdict = {'exec_key' : f.session.key.decode('ascii'),
336 base = {
318 'ssh' : self.ssh_server,
337 'exec_key' : f.session.key.decode('ascii'),
319 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
338 'location' : self.location,
320 'location' : self.location
339 'pack' : f.session.packer,
340 'unpack' : f.session.unpacker,
321 }
341 }
342
343 cdict = {'ssh' : self.ssh_server}
344 cdict.update(f.client_info)
345 cdict.update(base)
322 self.save_connection_dict(self.client_json_file, cdict)
346 self.save_connection_dict(self.client_json_file, cdict)
323 edict = cdict
347
324 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
348 edict = {'ssh' : self.engine_ssh_server}
325 edict['ssh'] = self.engine_ssh_server
349 edict.update(f.engine_info)
350 edict.update(base)
326 self.save_connection_dict(self.engine_json_file, edict)
351 self.save_connection_dict(self.engine_json_file, edict)
327
352
353 fname = "engines%s.json" % self.cluster_id
354 self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
355 if self.restore_engines:
356 self.factory.hub._load_engine_state()
357
328 def init_schedulers(self):
358 def init_schedulers(self):
329 children = self.children
359 children = self.children
330 mq = import_item(str(self.mq_class))
360 mq = import_item(str(self.mq_class))
331
361
332 hub = self.factory
362 f = self.factory
363 ident = f.session.bsession
333 # disambiguate url, in case of *
364 # disambiguate url, in case of *
334 monitor_url = disambiguate_url(hub.monitor_url)
365 monitor_url = disambiguate_url(f.monitor_url)
335 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
366 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
336 # IOPub relay (in a Process)
367 # IOPub relay (in a Process)
337 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
368 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
338 q.bind_in(hub.client_info['iopub'])
369 q.bind_in(f.client_url('iopub'))
339 q.bind_out(hub.engine_info['iopub'])
370 q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
371 q.bind_out(f.engine_url('iopub'))
340 q.setsockopt_out(zmq.SUBSCRIBE, b'')
372 q.setsockopt_out(zmq.SUBSCRIBE, b'')
341 q.connect_mon(monitor_url)
373 q.connect_mon(monitor_url)
342 q.daemon=True
374 q.daemon=True
@@ -344,18 +376,20 class IPControllerApp(BaseParallelApplication):
344
376
345 # Multiplexer Queue (in a Process)
377 # Multiplexer Queue (in a Process)
346 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
378 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
347 q.bind_in(hub.client_info['mux'])
379 q.bind_in(f.client_url('mux'))
348 q.setsockopt_in(zmq.IDENTITY, b'mux')
380 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
349 q.bind_out(hub.engine_info['mux'])
381 q.bind_out(f.engine_url('mux'))
382 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
350 q.connect_mon(monitor_url)
383 q.connect_mon(monitor_url)
351 q.daemon=True
384 q.daemon=True
352 children.append(q)
385 children.append(q)
353
386
354 # Control Queue (in a Process)
387 # Control Queue (in a Process)
355 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
388 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
356 q.bind_in(hub.client_info['control'])
389 q.bind_in(f.client_url('control'))
357 q.setsockopt_in(zmq.IDENTITY, b'control')
390 q.setsockopt_in(zmq.IDENTITY, b'control_in')
358 q.bind_out(hub.engine_info['control'])
391 q.bind_out(f.engine_url('control'))
392 q.setsockopt_out(zmq.IDENTITY, b'control_out')
359 q.connect_mon(monitor_url)
393 q.connect_mon(monitor_url)
360 q.daemon=True
394 q.daemon=True
361 children.append(q)
395 children.append(q)
@@ -368,9 +402,10 class IPControllerApp(BaseParallelApplication):
368 self.log.warn("task::using pure DEALER Task scheduler")
402 self.log.warn("task::using pure DEALER Task scheduler")
369 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
403 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
370 # q.setsockopt_out(zmq.HWM, hub.hwm)
404 # q.setsockopt_out(zmq.HWM, hub.hwm)
371 q.bind_in(hub.client_info['task'][1])
405 q.bind_in(f.client_url('task'))
372 q.setsockopt_in(zmq.IDENTITY, b'task')
406 q.setsockopt_in(zmq.IDENTITY, b'task_in')
373 q.bind_out(hub.engine_info['task'])
407 q.bind_out(f.engine_url('task'))
408 q.setsockopt_out(zmq.IDENTITY, b'task_out')
374 q.connect_mon(monitor_url)
409 q.connect_mon(monitor_url)
375 q.daemon=True
410 q.daemon=True
376 children.append(q)
411 children.append(q)
@@ -379,8 +414,10 class IPControllerApp(BaseParallelApplication):
379
414
380 else:
415 else:
381 self.log.info("task::using Python %s Task scheduler"%scheme)
416 self.log.info("task::using Python %s Task scheduler"%scheme)
382 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
417 sargs = (f.client_url('task'), f.engine_url('task'),
383 monitor_url, disambiguate_url(hub.client_info['notification']))
418 monitor_url, disambiguate_url(f.client_url('notification')),
419 disambiguate_url(f.client_url('registration')),
420 )
384 kwargs = dict(logname='scheduler', loglevel=self.log_level,
421 kwargs = dict(logname='scheduler', loglevel=self.log_level,
385 log_url = self.log_url, config=dict(self.config))
422 log_url = self.log_url, config=dict(self.config))
386 if 'Process' in self.mq_class:
423 if 'Process' in self.mq_class:
@@ -45,7 +45,7 from IPython.zmq.session import (
45 from IPython.config.configurable import Configurable
45 from IPython.config.configurable import Configurable
46
46
47 from IPython.parallel.engine.engine import EngineFactory
47 from IPython.parallel.engine.engine import EngineFactory
48 from IPython.parallel.util import disambiguate_url
48 from IPython.parallel.util import disambiguate_ip_address
49
49
50 from IPython.utils.importstring import import_item
50 from IPython.utils.importstring import import_item
51 from IPython.utils.py3compat import cast_bytes
51 from IPython.utils.py3compat import cast_bytes
@@ -211,24 +211,36 class IPEngineApp(BaseParallelApplication):
211 with open(self.url_file) as f:
211 with open(self.url_file) as f:
212 d = json.loads(f.read())
212 d = json.loads(f.read())
213
213
214 if 'exec_key' in d:
214 # allow hand-override of location for disambiguation
215 config.Session.key = cast_bytes(d['exec_key'])
215 # and ssh-server
216
217 try:
216 try:
218 config.EngineFactory.location
217 config.EngineFactory.location
219 except AttributeError:
218 except AttributeError:
220 config.EngineFactory.location = d['location']
219 config.EngineFactory.location = d['location']
221
220
222 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
223 try:
224 config.EngineFactory.url
225 except AttributeError:
226 config.EngineFactory.url = d['url']
227
228 try:
221 try:
229 config.EngineFactory.sshserver
222 config.EngineFactory.sshserver
230 except AttributeError:
223 except AttributeError:
231 config.EngineFactory.sshserver = d['ssh']
224 config.EngineFactory.sshserver = d.get('ssh')
225
226 location = config.EngineFactory.location
227
228 proto, ip = d['interface'].split('://')
229 ip = disambiguate_ip_address(ip)
230 d['interface'] = '%s://%s' % (proto, ip)
231
232 # DO NOT allow override of basic URLs, serialization, or exec_key
233 # JSON file takes top priority there
234 config.Session.key = cast_bytes(d['exec_key'])
235
236 config.EngineFactory.url = d['interface'] + ':%i' % d['registration']
237
238 config.Session.packer = d['pack']
239 config.Session.unpacker = d['unpack']
240
241 self.log.debug("Config changed:")
242 self.log.debug("%r", config)
243 self.connection_info = d
232
244
233 def bind_kernel(self, **kwargs):
245 def bind_kernel(self, **kwargs):
234 """Promote engine to listening kernel, accessible to frontends."""
246 """Promote engine to listening kernel, accessible to frontends."""
@@ -320,7 +332,9 class IPEngineApp(BaseParallelApplication):
320 # shell_class = import_item(self.master_config.Global.shell_class)
332 # shell_class = import_item(self.master_config.Global.shell_class)
321 # print self.config
333 # print self.config
322 try:
334 try:
323 self.engine = EngineFactory(config=config, log=self.log)
335 self.engine = EngineFactory(config=config, log=self.log,
336 connection_info=self.connection_info,
337 )
324 except:
338 except:
325 self.log.error("Couldn't start the Engine", exc_info=True)
339 self.log.error("Couldn't start the Engine", exc_info=True)
326 self.exit(1)
340 self.exit(1)
@@ -217,7 +217,9 class Client(HasTraits):
217 Parameters
217 Parameters
218 ----------
218 ----------
219
219
220 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
220 url_file : str/unicode; path to ipcontroller-client.json
221 This JSON file should contain all the information needed to connect to a cluster,
222 and is likely the only argument needed.
221 Connection information for the Hub's registration. If a json connector
223 Connection information for the Hub's registration. If a json connector
222 file is given, then likely no further configuration is necessary.
224 file is given, then likely no further configuration is necessary.
223 [Default: use profile]
225 [Default: use profile]
@@ -239,14 +241,6 class Client(HasTraits):
239 If specified, this will be relayed to the Session for configuration
241 If specified, this will be relayed to the Session for configuration
240 username : str
242 username : str
241 set username for the session object
243 set username for the session object
242 packer : str (import_string) or callable
243 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
244 function to serialize messages. Must support same input as
245 JSON, and output must be bytes.
246 You can pass a callable directly as `pack`
247 unpacker : str (import_string) or callable
248 The inverse of packer. Only necessary if packer is specified as *not* one
249 of 'json' or 'pickle'.
250
244
251 #-------------- ssh related args ----------------
245 #-------------- ssh related args ----------------
252 # These are args for configuring the ssh tunnel to be used
246 # These are args for configuring the ssh tunnel to be used
@@ -271,17 +265,6 class Client(HasTraits):
271 flag for whether to use paramiko instead of shell ssh for tunneling.
265 flag for whether to use paramiko instead of shell ssh for tunneling.
272 [default: True on win32, False else]
266 [default: True on win32, False else]
273
267
274 ------- exec authentication args -------
275 If even localhost is untrusted, you can have some protection against
276 unauthorized execution by signing messages with HMAC digests.
277 Messages are still sent as cleartext, so if someone can snoop your
278 loopback traffic this will not protect your privacy, but will prevent
279 unauthorized execution.
280
281 exec_key : str
282 an authentication key or file containing a key
283 default: None
284
285
268
286 Attributes
269 Attributes
287 ----------
270 ----------
@@ -378,8 +361,8 class Client(HasTraits):
378 # don't raise on positional args
361 # don't raise on positional args
379 return HasTraits.__new__(self, **kw)
362 return HasTraits.__new__(self, **kw)
380
363
381 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
364 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
382 context=None, debug=False, exec_key=None,
365 context=None, debug=False,
383 sshserver=None, sshkey=None, password=None, paramiko=None,
366 sshserver=None, sshkey=None, password=None, paramiko=None,
384 timeout=10, **extra_args
367 timeout=10, **extra_args
385 ):
368 ):
@@ -392,38 +375,45 class Client(HasTraits):
392 self._context = context
375 self._context = context
393 self._stop_spinning = Event()
376 self._stop_spinning = Event()
394
377
378 if 'url_or_file' in extra_args:
379 url_file = extra_args['url_or_file']
380 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
381
382 if url_file and util.is_url(url_file):
383 raise ValueError("single urls cannot be specified, url-files must be used.")
384
395 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
385 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
386
396 if self._cd is not None:
387 if self._cd is not None:
397 if url_or_file is None:
388 if url_file is None:
398 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
389 url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
399 if url_or_file is None:
390 if url_file is None:
400 raise ValueError(
391 raise ValueError(
401 "I can't find enough information to connect to a hub!"
392 "I can't find enough information to connect to a hub!"
402 " Please specify at least one of url_or_file or profile."
393 " Please specify at least one of url_file or profile."
403 )
394 )
404
395
405 if not util.is_url(url_or_file):
396 with open(url_file) as f:
406 # it's not a url, try for a file
397 cfg = json.load(f)
407 if not os.path.exists(url_or_file):
398
408 if self._cd:
399 self._task_scheme = cfg['task_scheme']
409 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
410 if not os.path.exists(url_or_file):
411 raise IOError("Connection file not found: %r" % url_or_file)
412 with open(url_or_file) as f:
413 cfg = json.loads(f.read())
414 else:
415 cfg = {'url':url_or_file}
416
400
417 # sync defaults from args, json:
401 # sync defaults from args, json:
418 if sshserver:
402 if sshserver:
419 cfg['ssh'] = sshserver
403 cfg['ssh'] = sshserver
420 if exec_key:
404
421 cfg['exec_key'] = exec_key
422 exec_key = cfg['exec_key']
423 location = cfg.setdefault('location', None)
405 location = cfg.setdefault('location', None)
424 cfg['url'] = util.disambiguate_url(cfg['url'], location)
406
425 url = cfg['url']
407 proto,addr = cfg['interface'].split('://')
426 proto,addr,port = util.split_url(url)
408 addr = util.disambiguate_ip_address(addr)
409 cfg['interface'] = "%s://%s" % (proto, addr)
410
411 # turn interface,port into full urls:
412 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
413 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
414
415 url = cfg['registration']
416
427 if location is not None and addr == '127.0.0.1':
417 if location is not None and addr == '127.0.0.1':
428 # location specified, and connection is expected to be local
418 # location specified, and connection is expected to be local
429 if location not in LOCAL_IPS and not sshserver:
419 if location not in LOCAL_IPS and not sshserver:
@@ -448,7 +438,7 class Client(HasTraits):
448 self._ssh = bool(sshserver or sshkey or password)
438 self._ssh = bool(sshserver or sshkey or password)
449 if self._ssh and sshserver is None:
439 if self._ssh and sshserver is None:
450 # default to ssh via localhost
440 # default to ssh via localhost
451 sshserver = url.split('://')[1].split(':')[0]
441 sshserver = addr
452 if self._ssh and password is None:
442 if self._ssh and password is None:
453 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
443 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
454 password=False
444 password=False
@@ -457,20 +447,18 class Client(HasTraits):
457 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
447 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
458
448
459 # configure and construct the session
449 # configure and construct the session
460 if exec_key is not None:
450 extra_args['packer'] = cfg['pack']
461 if os.path.isfile(exec_key):
451 extra_args['unpacker'] = cfg['unpack']
462 extra_args['keyfile'] = exec_key
452 extra_args['key'] = cast_bytes(cfg['exec_key'])
463 else:
453
464 exec_key = cast_bytes(exec_key)
465 extra_args['key'] = exec_key
466 self.session = Session(**extra_args)
454 self.session = Session(**extra_args)
467
455
468 self._query_socket = self._context.socket(zmq.DEALER)
456 self._query_socket = self._context.socket(zmq.DEALER)
469 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
457
470 if self._ssh:
458 if self._ssh:
471 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
459 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
472 else:
460 else:
473 self._query_socket.connect(url)
461 self._query_socket.connect(cfg['registration'])
474
462
475 self.session.debug = self.debug
463 self.session.debug = self.debug
476
464
@@ -520,8 +508,9 class Client(HasTraits):
520 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
508 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
521 for k,v in engines.iteritems():
509 for k,v in engines.iteritems():
522 eid = int(k)
510 eid = int(k)
523 self._engines[eid] = v
511 if eid not in self._engines:
524 self._ids.append(eid)
512 self._ids.append(eid)
513 self._engines[eid] = v
525 self._ids = sorted(self._ids)
514 self._ids = sorted(self._ids)
526 if sorted(self._engines.keys()) != range(len(self._engines)) and \
515 if sorted(self._engines.keys()) != range(len(self._engines)) and \
527 self._task_scheme == 'pure' and self._task_socket:
516 self._task_scheme == 'pure' and self._task_socket:
@@ -583,7 +572,7 class Client(HasTraits):
583 self._connected=True
572 self._connected=True
584
573
585 def connect_socket(s, url):
574 def connect_socket(s, url):
586 url = util.disambiguate_url(url, self._config['location'])
575 # url = util.disambiguate_url(url, self._config['location'])
587 if self._ssh:
576 if self._ssh:
588 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
577 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
589 else:
578 else:
@@ -600,38 +589,28 class Client(HasTraits):
600 idents,msg = self.session.recv(self._query_socket,mode=0)
589 idents,msg = self.session.recv(self._query_socket,mode=0)
601 if self.debug:
590 if self.debug:
602 pprint(msg)
591 pprint(msg)
603 msg = Message(msg)
592 content = msg['content']
604 content = msg.content
593 # self._config['registration'] = dict(content)
605 self._config['registration'] = dict(content)
594 cfg = self._config
606 if content.status == 'ok':
595 if content['status'] == 'ok':
607 ident = self.session.bsession
608 if content.mux:
609 self._mux_socket = self._context.socket(zmq.DEALER)
596 self._mux_socket = self._context.socket(zmq.DEALER)
610 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
597 connect_socket(self._mux_socket, cfg['mux'])
611 connect_socket(self._mux_socket, content.mux)
598
612 if content.task:
613 self._task_scheme, task_addr = content.task
614 self._task_socket = self._context.socket(zmq.DEALER)
599 self._task_socket = self._context.socket(zmq.DEALER)
615 self._task_socket.setsockopt(zmq.IDENTITY, ident)
600 connect_socket(self._task_socket, cfg['task'])
616 connect_socket(self._task_socket, task_addr)
601
617 if content.notification:
618 self._notification_socket = self._context.socket(zmq.SUB)
602 self._notification_socket = self._context.socket(zmq.SUB)
619 connect_socket(self._notification_socket, content.notification)
620 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
603 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
621 # if content.query:
604 connect_socket(self._notification_socket, cfg['notification'])
622 # self._query_socket = self._context.socket(zmq.DEALER)
605
623 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
624 # connect_socket(self._query_socket, content.query)
625 if content.control:
626 self._control_socket = self._context.socket(zmq.DEALER)
606 self._control_socket = self._context.socket(zmq.DEALER)
627 self._control_socket.setsockopt(zmq.IDENTITY, ident)
607 connect_socket(self._control_socket, cfg['control'])
628 connect_socket(self._control_socket, content.control)
608
629 if content.iopub:
630 self._iopub_socket = self._context.socket(zmq.SUB)
609 self._iopub_socket = self._context.socket(zmq.SUB)
631 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
610 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
632 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
611 connect_socket(self._iopub_socket, cfg['iopub'])
633 connect_socket(self._iopub_socket, content.iopub)
612
634 self._update_engines(dict(content.engines))
613 self._update_engines(dict(content['engines']))
635 else:
614 else:
636 self._connected = False
615 self._connected = False
637 raise Exception("Failed to connect!")
616 raise Exception("Failed to connect!")
@@ -674,7 +653,7 class Client(HasTraits):
674 """Register a new engine, and update our connection info."""
653 """Register a new engine, and update our connection info."""
675 content = msg['content']
654 content = msg['content']
676 eid = content['id']
655 eid = content['id']
677 d = {eid : content['queue']}
656 d = {eid : content['uuid']}
678 self._update_engines(d)
657 self._update_engines(d)
679
658
680 def _unregister_engine(self, msg):
659 def _unregister_engine(self, msg):
@@ -18,6 +18,8 Authors:
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import json
22 import os
21 import sys
23 import sys
22 import time
24 import time
23 from datetime import datetime
25 from datetime import datetime
@@ -107,17 +109,16 class EngineConnector(HasTraits):
107 """A simple object for accessing the various zmq connections of an object.
109 """A simple object for accessing the various zmq connections of an object.
108 Attributes are:
110 Attributes are:
109 id (int): engine ID
111 id (int): engine ID
110 uuid (str): uuid (unused?)
112 uuid (unicode): engine UUID
111 queue (str): identity of queue's DEALER socket
113 pending: set of msg_ids
112 registration (str): identity of registration DEALER socket
114 stallback: DelayedCallback for stalled registration
113 heartbeat (str): identity of heartbeat DEALER socket
114 """
115 """
116
115 id=Integer(0)
117 id = Integer(0)
116 queue=CBytes()
118 uuid = Unicode()
117 control=CBytes()
118 registration=CBytes()
119 heartbeat=CBytes()
120 pending=Set()
119 pending = Set()
120 stallback = Instance(ioloop.DelayedCallback)
121
121
122
122 _db_shortcuts = {
123 _db_shortcuts = {
123 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
124 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
@@ -131,29 +132,29 class HubFactory(RegistrationFactory):
131
132
132 # port-pairs for monitoredqueues:
133 # port-pairs for monitoredqueues:
133 hb = Tuple(Integer,Integer,config=True,
134 hb = Tuple(Integer,Integer,config=True,
134 help="""DEALER/SUB Port pair for Engine heartbeats""")
135 help="""PUB/ROUTER Port pair for Engine heartbeats""")
135 def _hb_default(self):
136 def _hb_default(self):
136 return tuple(util.select_random_ports(2))
137 return tuple(util.select_random_ports(2))
137
138
138 mux = Tuple(Integer,Integer,config=True,
139 mux = Tuple(Integer,Integer,config=True,
139 help="""Engine/Client Port pair for MUX queue""")
140 help="""Client/Engine Port pair for MUX queue""")
140
141
141 def _mux_default(self):
142 def _mux_default(self):
142 return tuple(util.select_random_ports(2))
143 return tuple(util.select_random_ports(2))
143
144
144 task = Tuple(Integer,Integer,config=True,
145 task = Tuple(Integer,Integer,config=True,
145 help="""Engine/Client Port pair for Task queue""")
146 help="""Client/Engine Port pair for Task queue""")
146 def _task_default(self):
147 def _task_default(self):
147 return tuple(util.select_random_ports(2))
148 return tuple(util.select_random_ports(2))
148
149
149 control = Tuple(Integer,Integer,config=True,
150 control = Tuple(Integer,Integer,config=True,
150 help="""Engine/Client Port pair for Control queue""")
151 help="""Client/Engine Port pair for Control queue""")
151
152
152 def _control_default(self):
153 def _control_default(self):
153 return tuple(util.select_random_ports(2))
154 return tuple(util.select_random_ports(2))
154
155
155 iopub = Tuple(Integer,Integer,config=True,
156 iopub = Tuple(Integer,Integer,config=True,
156 help="""Engine/Client Port pair for IOPub relay""")
157 help="""Client/Engine Port pair for IOPub relay""")
157
158
158 def _iopub_default(self):
159 def _iopub_default(self):
159 return tuple(util.select_random_ports(2))
160 return tuple(util.select_random_ports(2))
@@ -231,38 +232,77 class HubFactory(RegistrationFactory):
231 self.heartmonitor.start()
232 self.heartmonitor.start()
232 self.log.info("Heartmonitor started")
233 self.log.info("Heartmonitor started")
233
234
235 def client_url(self, channel):
236 """return full zmq url for a named client channel"""
237 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
238
239 def engine_url(self, channel):
240 """return full zmq url for a named engine channel"""
241 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
242
234 def init_hub(self):
243 def init_hub(self):
235 """construct"""
244 """construct Hub object"""
236 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
237 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
238
245
239 ctx = self.context
246 ctx = self.context
240 loop = self.loop
247 loop = self.loop
241
248
249 try:
250 scheme = self.config.TaskScheduler.scheme_name
251 except AttributeError:
252 from .scheduler import TaskScheduler
253 scheme = TaskScheduler.scheme_name.get_default_value()
254
255 # build connection dicts
256 engine = self.engine_info = {
257 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
258 'registration' : self.regport,
259 'control' : self.control[1],
260 'mux' : self.mux[1],
261 'hb_ping' : self.hb[0],
262 'hb_pong' : self.hb[1],
263 'task' : self.task[1],
264 'iopub' : self.iopub[1],
265 }
266
267 client = self.client_info = {
268 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
269 'registration' : self.regport,
270 'control' : self.control[0],
271 'mux' : self.mux[0],
272 'task' : self.task[0],
273 'task_scheme' : scheme,
274 'iopub' : self.iopub[0],
275 'notification' : self.notifier_port,
276 }
277
278 self.log.debug("Hub engine addrs: %s", self.engine_info)
279 self.log.debug("Hub client addrs: %s", self.client_info)
280
242 # Registrar socket
281 # Registrar socket
243 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
282 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
244 q.bind(client_iface % self.regport)
283 q.bind(self.client_url('registration'))
245 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
284 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
246 if self.client_ip != self.engine_ip:
285 if self.client_ip != self.engine_ip:
247 q.bind(engine_iface % self.regport)
286 q.bind(self.engine_url('registration'))
248 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
287 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
249
288
250 ### Engine connections ###
289 ### Engine connections ###
251
290
252 # heartbeat
291 # heartbeat
253 hpub = ctx.socket(zmq.PUB)
292 hpub = ctx.socket(zmq.PUB)
254 hpub.bind(engine_iface % self.hb[0])
293 hpub.bind(self.engine_url('hb_ping'))
255 hrep = ctx.socket(zmq.ROUTER)
294 hrep = ctx.socket(zmq.ROUTER)
256 hrep.bind(engine_iface % self.hb[1])
295 hrep.bind(self.engine_url('hb_pong'))
257 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
296 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
258 pingstream=ZMQStream(hpub,loop),
297 pingstream=ZMQStream(hpub,loop),
259 pongstream=ZMQStream(hrep,loop)
298 pongstream=ZMQStream(hrep,loop)
260 )
299 )
261
300
262 ### Client connections ###
301 ### Client connections ###
302
263 # Notifier socket
303 # Notifier socket
264 n = ZMQStream(ctx.socket(zmq.PUB), loop)
304 n = ZMQStream(ctx.socket(zmq.PUB), loop)
265 n.bind(client_iface%self.notifier_port)
305 n.bind(self.client_url('notification'))
266
306
267 ### build and launch the queues ###
307 ### build and launch the queues ###
268
308
@@ -279,35 +319,10 class HubFactory(RegistrationFactory):
279 self.db = import_item(str(db_class))(session=self.session.session,
319 self.db = import_item(str(db_class))(session=self.session.session,
280 config=self.config, log=self.log)
320 config=self.config, log=self.log)
281 time.sleep(.25)
321 time.sleep(.25)
282 try:
283 scheme = self.config.TaskScheduler.scheme_name
284 except AttributeError:
285 from .scheduler import TaskScheduler
286 scheme = TaskScheduler.scheme_name.get_default_value()
287 # build connection dicts
288 self.engine_info = {
289 'control' : engine_iface%self.control[1],
290 'mux': engine_iface%self.mux[1],
291 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
292 'task' : engine_iface%self.task[1],
293 'iopub' : engine_iface%self.iopub[1],
294 # 'monitor' : engine_iface%self.mon_port,
295 }
296
297 self.client_info = {
298 'control' : client_iface%self.control[0],
299 'mux': client_iface%self.mux[0],
300 'task' : (scheme, client_iface%self.task[0]),
301 'iopub' : client_iface%self.iopub[0],
302 'notification': client_iface%self.notifier_port
303 }
304 self.log.debug("Hub engine addrs: %s", self.engine_info)
305 self.log.debug("Hub client addrs: %s", self.client_info)
306
322
307 # resubmit stream
323 # resubmit stream
308 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
324 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
309 url = util.disambiguate_url(self.client_info['task'][-1])
325 url = util.disambiguate_url(self.client_url('task'))
310 r.setsockopt(zmq.IDENTITY, self.session.bsession)
311 r.connect(url)
326 r.connect(url)
312
327
313 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
328 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
@@ -335,6 +350,9 class Hub(SessionFactory):
335 client_info: dict of zmq connection information for engines to connect
350 client_info: dict of zmq connection information for engines to connect
336 to the queues.
351 to the queues.
337 """
352 """
353
354 engine_state_file = Unicode()
355
338 # internal data structures:
356 # internal data structures:
339 ids=Set() # engine IDs
357 ids=Set() # engine IDs
340 keytable=Dict()
358 keytable=Dict()
@@ -382,15 +400,6 class Hub(SessionFactory):
382 super(Hub, self).__init__(**kwargs)
400 super(Hub, self).__init__(**kwargs)
383 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
401 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
384
402
385 # validate connection dicts:
386 for k,v in self.client_info.iteritems():
387 if k == 'task':
388 util.validate_url_container(v[1])
389 else:
390 util.validate_url_container(v)
391 # util.validate_url_container(self.client_info)
392 util.validate_url_container(self.engine_info)
393
394 # register our callbacks
403 # register our callbacks
395 self.query.on_recv(self.dispatch_query)
404 self.query.on_recv(self.dispatch_query)
396 self.monitor.on_recv(self.dispatch_monitor_traffic)
405 self.monitor.on_recv(self.dispatch_monitor_traffic)
@@ -556,11 +565,11 class Hub(SessionFactory):
556 triggers unregistration"""
565 triggers unregistration"""
557 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
566 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
558 eid = self.hearts.get(heart, None)
567 eid = self.hearts.get(heart, None)
559 queue = self.engines[eid].queue
568 uuid = self.engines[eid].uuid
560 if eid is None or self.keytable[eid] in self.dead_engines:
569 if eid is None or self.keytable[eid] in self.dead_engines:
561 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
570 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
562 else:
571 else:
563 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
572 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
564
573
565 #----------------------- MUX Queue Traffic ------------------------------
574 #----------------------- MUX Queue Traffic ------------------------------
566
575
@@ -585,7 +594,7 class Hub(SessionFactory):
585 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
594 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
586 # Unicode in records
595 # Unicode in records
587 record['engine_uuid'] = queue_id.decode('ascii')
596 record['engine_uuid'] = queue_id.decode('ascii')
588 record['client_uuid'] = client_id.decode('ascii')
597 record['client_uuid'] = msg['header']['session']
589 record['queue'] = 'mux'
598 record['queue'] = 'mux'
590
599
591 try:
600 try:
@@ -677,7 +686,7 class Hub(SessionFactory):
677 return
686 return
678 record = init_record(msg)
687 record = init_record(msg)
679
688
680 record['client_uuid'] = client_id.decode('ascii')
689 record['client_uuid'] = msg['header']['session']
681 record['queue'] = 'task'
690 record['queue'] = 'task'
682 header = msg['header']
691 header = msg['header']
683 msg_id = header['msg_id']
692 msg_id = header['msg_id']
@@ -865,11 +874,10 class Hub(SessionFactory):
865 """Reply with connection addresses for clients."""
874 """Reply with connection addresses for clients."""
866 self.log.info("client::client %r connected", client_id)
875 self.log.info("client::client %r connected", client_id)
867 content = dict(status='ok')
876 content = dict(status='ok')
868 content.update(self.client_info)
869 jsonable = {}
877 jsonable = {}
870 for k,v in self.keytable.iteritems():
878 for k,v in self.keytable.iteritems():
871 if v not in self.dead_engines:
879 if v not in self.dead_engines:
872 jsonable[str(k)] = v.decode('ascii')
880 jsonable[str(k)] = v
873 content['engines'] = jsonable
881 content['engines'] = jsonable
874 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
882 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
875
883
@@ -877,48 +885,37 class Hub(SessionFactory):
877 """Register a new engine."""
885 """Register a new engine."""
878 content = msg['content']
886 content = msg['content']
879 try:
887 try:
880 queue = cast_bytes(content['queue'])
888 uuid = content['uuid']
881 except KeyError:
889 except KeyError:
882 self.log.error("registration::queue not specified", exc_info=True)
890 self.log.error("registration::queue not specified", exc_info=True)
883 return
891 return
884 heart = content.get('heartbeat', None)
892
885 if heart:
886 heart = cast_bytes(heart)
887 """register a new engine, and create the socket(s) necessary"""
888 eid = self._next_id
893 eid = self._next_id
889 # print (eid, queue, reg, heart)
890
894
891 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
895 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
892
896
893 content = dict(id=eid,status='ok')
897 content = dict(id=eid,status='ok')
894 content.update(self.engine_info)
895 # check if requesting available IDs:
898 # check if requesting available IDs:
896 if queue in self.by_ident:
899 if cast_bytes(uuid) in self.by_ident:
897 try:
898 raise KeyError("queue_id %r in use" % queue)
899 except:
900 content = error.wrap_exception()
901 self.log.error("queue_id %r in use", queue, exc_info=True)
902 elif heart in self.hearts: # need to check unique hearts?
903 try:
900 try:
904 raise KeyError("heart_id %r in use" % heart)
901 raise KeyError("uuid %r in use" % uuid)
905 except:
902 except:
906 self.log.error("heart_id %r in use", heart, exc_info=True)
907 content = error.wrap_exception()
903 content = error.wrap_exception()
904 self.log.error("uuid %r in use", uuid, exc_info=True)
908 else:
905 else:
909 for h, pack in self.incoming_registrations.iteritems():
906 for h, ec in self.incoming_registrations.iteritems():
910 if heart == h:
907 if uuid == h:
911 try:
908 try:
912 raise KeyError("heart_id %r in use" % heart)
909 raise KeyError("heart_id %r in use" % uuid)
913 except:
910 except:
914 self.log.error("heart_id %r in use", heart, exc_info=True)
911 self.log.error("heart_id %r in use", uuid, exc_info=True)
915 content = error.wrap_exception()
912 content = error.wrap_exception()
916 break
913 break
917 elif queue == pack[1]:
914 elif uuid == ec.uuid:
918 try:
915 try:
919 raise KeyError("queue_id %r in use" % queue)
916 raise KeyError("uuid %r in use" % uuid)
920 except:
917 except:
921 self.log.error("queue_id %r in use", queue, exc_info=True)
918 self.log.error("uuid %r in use", uuid, exc_info=True)
922 content = error.wrap_exception()
919 content = error.wrap_exception()
923 break
920 break
924
921
@@ -926,18 +923,21 class Hub(SessionFactory):
926 content=content,
923 content=content,
927 ident=reg)
924 ident=reg)
928
925
926 heart = cast_bytes(uuid)
927
929 if content['status'] == 'ok':
928 if content['status'] == 'ok':
930 if heart in self.heartmonitor.hearts:
929 if heart in self.heartmonitor.hearts:
931 # already beating
930 # already beating
932 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
931 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
933 self.finish_registration(heart)
932 self.finish_registration(heart)
934 else:
933 else:
935 purge = lambda : self._purge_stalled_registration(heart)
934 purge = lambda : self._purge_stalled_registration(heart)
936 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
935 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
937 dc.start()
936 dc.start()
938 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
937 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
939 else:
938 else:
940 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
939 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
940
941 return eid
941 return eid
942
942
943 def unregister_engine(self, ident, msg):
943 def unregister_engine(self, ident, msg):
@@ -950,7 +950,7 class Hub(SessionFactory):
950 self.log.info("registration::unregister_engine(%r)", eid)
950 self.log.info("registration::unregister_engine(%r)", eid)
951 # print (eid)
951 # print (eid)
952 uuid = self.keytable[eid]
952 uuid = self.keytable[eid]
953 content=dict(id=eid, queue=uuid.decode('ascii'))
953 content=dict(id=eid, uuid=uuid)
954 self.dead_engines.add(uuid)
954 self.dead_engines.add(uuid)
955 # self.ids.remove(eid)
955 # self.ids.remove(eid)
956 # uuid = self.keytable.pop(eid)
956 # uuid = self.keytable.pop(eid)
@@ -964,6 +964,8 class Hub(SessionFactory):
964 dc.start()
964 dc.start()
965 ############## TODO: HANDLE IT ################
965 ############## TODO: HANDLE IT ################
966
966
967 self._save_engine_state()
968
967 if self.notifier:
969 if self.notifier:
968 self.session.send(self.notifier, "unregistration_notification", content=content)
970 self.session.send(self.notifier, "unregistration_notification", content=content)
969
971
@@ -1001,36 +1003,97 class Hub(SessionFactory):
1001 """Second half of engine registration, called after our HeartMonitor
1003 """Second half of engine registration, called after our HeartMonitor
1002 has received a beat from the Engine's Heart."""
1004 has received a beat from the Engine's Heart."""
1003 try:
1005 try:
1004 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
1006 ec = self.incoming_registrations.pop(heart)
1005 except KeyError:
1007 except KeyError:
1006 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1008 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1007 return
1009 return
1008 self.log.info("registration::finished registering engine %i:%r", eid, queue)
1010 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1009 if purge is not None:
1011 if ec.stallback is not None:
1010 purge.stop()
1012 ec.stallback.stop()
1011 control = queue
1013 eid = ec.id
1012 self.ids.add(eid)
1014 self.ids.add(eid)
1013 self.keytable[eid] = queue
1015 self.keytable[eid] = ec.uuid
1014 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
1016 self.engines[eid] = ec
1015 control=control, heartbeat=heart)
1017 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1016 self.by_ident[queue] = eid
1017 self.queues[eid] = list()
1018 self.queues[eid] = list()
1018 self.tasks[eid] = list()
1019 self.tasks[eid] = list()
1019 self.completed[eid] = list()
1020 self.completed[eid] = list()
1020 self.hearts[heart] = eid
1021 self.hearts[heart] = eid
1021 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
1022 content = dict(id=eid, uuid=self.engines[eid].uuid)
1022 if self.notifier:
1023 if self.notifier:
1023 self.session.send(self.notifier, "registration_notification", content=content)
1024 self.session.send(self.notifier, "registration_notification", content=content)
1024 self.log.info("engine::Engine Connected: %i", eid)
1025 self.log.info("engine::Engine Connected: %i", eid)
1025
1026
1027 self._save_engine_state()
1028
1026 def _purge_stalled_registration(self, heart):
1029 def _purge_stalled_registration(self, heart):
1027 if heart in self.incoming_registrations:
1030 if heart in self.incoming_registrations:
1028 eid = self.incoming_registrations.pop(heart)[0]
1031 ec = self.incoming_registrations.pop(heart)
1029 self.log.info("registration::purging stalled registration: %i", eid)
1032 self.log.info("registration::purging stalled registration: %i", ec.id)
1030 else:
1033 else:
1031 pass
1034 pass
1032
1035
1033 #-------------------------------------------------------------------------
1036 #-------------------------------------------------------------------------
1037 # Engine State
1038 #-------------------------------------------------------------------------
1039
1040
1041 def _cleanup_engine_state_file(self):
1042 """cleanup engine state mapping"""
1043
1044 if os.path.exists(self.engine_state_file):
1045 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1046 try:
1047 os.remove(self.engine_state_file)
1048 except IOError:
1049 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1050
1051
1052 def _save_engine_state(self):
1053 """save engine mapping to JSON file"""
1054 if not self.engine_state_file:
1055 return
1056 self.log.debug("save engine state to %s" % self.engine_state_file)
1057 state = {}
1058 engines = {}
1059 for eid, ec in self.engines.iteritems():
1060 if ec.uuid not in self.dead_engines:
1061 engines[eid] = ec.uuid
1062
1063 state['engines'] = engines
1064
1065 state['next_id'] = self._idcounter
1066
1067 with open(self.engine_state_file, 'w') as f:
1068 json.dump(state, f)
1069
1070
1071 def _load_engine_state(self):
1072 """load engine mapping from JSON file"""
1073 if not os.path.exists(self.engine_state_file):
1074 return
1075
1076 self.log.info("loading engine state from %s" % self.engine_state_file)
1077
1078 with open(self.engine_state_file) as f:
1079 state = json.load(f)
1080
1081 save_notifier = self.notifier
1082 self.notifier = None
1083 for eid, uuid in state['engines'].iteritems():
1084 heart = uuid.encode('ascii')
1085 # start with this heart as current and beating:
1086 self.heartmonitor.responses.add(heart)
1087 self.heartmonitor.hearts.add(heart)
1088
1089 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1090 self.finish_registration(heart)
1091
1092 self.notifier = save_notifier
1093
1094 self._idcounter = state['next_id']
1095
1096 #-------------------------------------------------------------------------
1034 # Client Requests
1097 # Client Requests
1035 #-------------------------------------------------------------------------
1098 #-------------------------------------------------------------------------
1036
1099
@@ -1131,7 +1194,7 class Hub(SessionFactory):
1131 except:
1194 except:
1132 reply = error.wrap_exception()
1195 reply = error.wrap_exception()
1133 break
1196 break
1134 uid = self.engines[eid].queue
1197 uid = self.engines[eid].uuid
1135 try:
1198 try:
1136 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1199 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1137 except Exception:
1200 except Exception:
@@ -1205,6 +1268,7 class Hub(SessionFactory):
1205 self.db.add_record(msg_id, init_record(msg))
1268 self.db.add_record(msg_id, init_record(msg))
1206 except Exception:
1269 except Exception:
1207 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1270 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1271 return finish(error.wrap_exception())
1208
1272
1209 finish(dict(status='ok', resubmitted=resubmitted))
1273 finish(dict(status='ok', resubmitted=resubmitted))
1210
1274
@@ -189,6 +189,7 class TaskScheduler(SessionFactory):
189 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
189 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
190 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
190 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
191 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
191 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
192 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
192
193
193 # internals:
194 # internals:
194 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
195 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
@@ -216,6 +217,9 class TaskScheduler(SessionFactory):
216 return self.session.bsession
217 return self.session.bsession
217
218
218 def start(self):
219 def start(self):
220 self.query_stream.on_recv(self.dispatch_query_reply)
221 self.session.send(self.query_stream, "connection_request", {})
222
219 self.engine_stream.on_recv(self.dispatch_result, copy=False)
223 self.engine_stream.on_recv(self.dispatch_result, copy=False)
220 self.client_stream.on_recv(self.dispatch_submission, copy=False)
224 self.client_stream.on_recv(self.dispatch_submission, copy=False)
221
225
@@ -242,6 +246,24 class TaskScheduler(SessionFactory):
242 #-----------------------------------------------------------------------
246 #-----------------------------------------------------------------------
243
247
244
248
249 def dispatch_query_reply(self, msg):
250 """handle reply to our initial connection request"""
251 try:
252 idents,msg = self.session.feed_identities(msg)
253 except ValueError:
254 self.log.warn("task::Invalid Message: %r",msg)
255 return
256 try:
257 msg = self.session.unserialize(msg)
258 except ValueError:
259 self.log.warn("task::Unauthorized message from: %r"%idents)
260 return
261
262 content = msg['content']
263 for uuid in content.get('engines', {}).values():
264 self._register_engine(cast_bytes(uuid))
265
266
245 @util.log_errors
267 @util.log_errors
246 def dispatch_notification(self, msg):
268 def dispatch_notification(self, msg):
247 """dispatch register/unregister events."""
269 """dispatch register/unregister events."""
@@ -263,7 +285,7 class TaskScheduler(SessionFactory):
263 self.log.error("Unhandled message type: %r"%msg_type)
285 self.log.error("Unhandled message type: %r"%msg_type)
264 else:
286 else:
265 try:
287 try:
266 handler(cast_bytes(msg['content']['queue']))
288 handler(cast_bytes(msg['content']['uuid']))
267 except Exception:
289 except Exception:
268 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
290 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
269
291
@@ -714,7 +736,7 class TaskScheduler(SessionFactory):
714
736
715
737
716
738
717 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
739 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
718 logname='root', log_url=None, loglevel=logging.DEBUG,
740 logname='root', log_url=None, loglevel=logging.DEBUG,
719 identity=b'task', in_thread=False):
741 identity=b'task', in_thread=False):
720
742
@@ -734,11 +756,11 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
734 ctx = zmq.Context()
756 ctx = zmq.Context()
735 loop = ioloop.IOLoop()
757 loop = ioloop.IOLoop()
736 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
758 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
737 ins.setsockopt(zmq.IDENTITY, identity)
759 ins.setsockopt(zmq.IDENTITY, identity + b'_in')
738 ins.bind(in_addr)
760 ins.bind(in_addr)
739
761
740 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
762 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
741 outs.setsockopt(zmq.IDENTITY, identity)
763 outs.setsockopt(zmq.IDENTITY, identity + b'_out')
742 outs.bind(out_addr)
764 outs.bind(out_addr)
743 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
765 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
744 mons.connect(mon_addr)
766 mons.connect(mon_addr)
@@ -746,6 +768,9 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
746 nots.setsockopt(zmq.SUBSCRIBE, b'')
768 nots.setsockopt(zmq.SUBSCRIBE, b'')
747 nots.connect(not_addr)
769 nots.connect(not_addr)
748
770
771 querys = ZMQStream(ctx.socket(zmq.DEALER),loop)
772 querys.connect(reg_addr)
773
749 # setup logging.
774 # setup logging.
750 if in_thread:
775 if in_thread:
751 log = Application.instance().log
776 log = Application.instance().log
@@ -757,6 +782,7 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
757
782
758 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
783 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
759 mon_stream=mons, notifier_stream=nots,
784 mon_stream=mons, notifier_stream=nots,
785 query_stream=querys,
760 loop=loop, log=log,
786 loop=loop, log=log,
761 config=config)
787 config=config)
762 scheduler.start()
788 scheduler.start()
@@ -50,7 +50,7 class EngineFactory(RegistrationFactory):
50 help="""The location (an IP address) of the controller. This is
50 help="""The location (an IP address) of the controller. This is
51 used for disambiguating URLs, to determine whether
51 used for disambiguating URLs, to determine whether
52 loopback should be used to connect or the public address.""")
52 loopback should be used to connect or the public address.""")
53 timeout=CFloat(2,config=True,
53 timeout=CFloat(5, config=True,
54 help="""The time (in seconds) to wait for the Controller to respond
54 help="""The time (in seconds) to wait for the Controller to respond
55 to registration requests before giving up.""")
55 to registration requests before giving up.""")
56 sshserver=Unicode(config=True,
56 sshserver=Unicode(config=True,
@@ -61,6 +61,7 class EngineFactory(RegistrationFactory):
61 help="""Whether to use paramiko instead of openssh for tunnels.""")
61 help="""Whether to use paramiko instead of openssh for tunnels.""")
62
62
63 # not configurable:
63 # not configurable:
64 connection_info = Dict()
64 user_ns=Dict()
65 user_ns = Dict()
65 id=Integer(allow_none=True)
66 id = Integer(allow_none=True)
66 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
67 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
@@ -96,7 +97,7 class EngineFactory(RegistrationFactory):
96 def connect(s, url):
97 def connect(s, url):
97 url = disambiguate_url(url, self.location)
98 url = disambiguate_url(url, self.location)
98 if self.using_ssh:
99 if self.using_ssh:
99 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
100 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
100 return tunnel.tunnel_connection(s, url, self.sshserver,
101 return tunnel.tunnel_connection(s, url, self.sshserver,
101 keyfile=self.sshkey, paramiko=self.paramiko,
102 keyfile=self.sshkey, paramiko=self.paramiko,
102 password=password,
103 password=password,
@@ -108,12 +109,12 class EngineFactory(RegistrationFactory):
108 """like connect, but don't complete the connection (for use by heartbeat)"""
109 """like connect, but don't complete the connection (for use by heartbeat)"""
109 url = disambiguate_url(url, self.location)
110 url = disambiguate_url(url, self.location)
110 if self.using_ssh:
111 if self.using_ssh:
111 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
112 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
112 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
113 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
113 keyfile=self.sshkey, paramiko=self.paramiko,
114 keyfile=self.sshkey, paramiko=self.paramiko,
114 password=password,
115 password=password,
115 )
116 )
116 return url
117 return str(url)
117 return connect, maybe_tunnel
118 return connect, maybe_tunnel
118
119
119 def register(self):
120 def register(self):
@@ -128,7 +129,7 class EngineFactory(RegistrationFactory):
128 self.registrar = zmqstream.ZMQStream(reg, self.loop)
129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
129
130
130
131
131 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 content = dict(uuid=self.ident)
132 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
133 # print (self.session.key)
134 # print (self.session.key)
134 self.session.send(self.registrar, "registration_request",content=content)
135 self.session.send(self.registrar, "registration_request", content=content)
@@ -140,50 +141,43 class EngineFactory(RegistrationFactory):
140 loop = self.loop
141 loop = self.loop
141 identity = self.bident
142 identity = self.bident
142 idents,msg = self.session.feed_identities(msg)
143 idents,msg = self.session.feed_identities(msg)
143 msg = Message(self.session.unserialize(msg))
144 msg = self.session.unserialize(msg)
145 content = msg['content']
146 info = self.connection_info
144
147
145 if msg.content.status == 'ok':
148 def url(key):
146 self.id = int(msg.content.id)
149 """get zmq url for given channel"""
150 return str(info["interface"] + ":%i" % info[key])
147
151
148 # launch heartbeat
152 if content['status'] == 'ok':
149 hb_addrs = msg.content.heartbeat
153 self.id = int(content['id'])
150
154
155 # launch heartbeat
151 # possibly forward hb ports with tunnels
156 # possibly forward hb ports with tunnels
152 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
157 hb_ping = maybe_tunnel(url('hb_ping'))
153 heart = Heart(*map(str, hb_addrs), heart_id=identity)
158 hb_pong = maybe_tunnel(url('hb_pong'))
159
160 heart = Heart(hb_ping, hb_pong, heart_id=identity)
154 heart.start()
161 heart.start()
155
162
156 # create Shell Streams (MUX, Task, etc.):
163 # create Shell Connections (MUX, Task, etc.):
157 queue_addr = msg.content.mux
164 shell_addrs = url('mux'), url('task')
158 shell_addrs = [ str(queue_addr) ]
165
159 task_addr = msg.content.task
166 # Use only one shell stream for mux and tasks
160 if task_addr:
161 shell_addrs.append(str(task_addr))
162
163 # Uncomment this to go back to two-socket model
164 # shell_streams = []
165 # for addr in shell_addrs:
166 # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
167 # stream.setsockopt(zmq.IDENTITY, identity)
168 # stream.connect(disambiguate_url(addr, self.location))
169 # shell_streams.append(stream)
170
171 # Now use only one shell stream for mux and tasks
172 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
167 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
173 stream.setsockopt(zmq.IDENTITY, identity)
168 stream.setsockopt(zmq.IDENTITY, identity)
174 shell_streams = [stream]
169 shell_streams = [stream]
175 for addr in shell_addrs:
170 for addr in shell_addrs:
176 connect(stream, addr)
171 connect(stream, addr)
177 # end single stream-socket
178
172
179 # control stream:
173 # control stream:
180 control_addr = str(msg.content.control)
174 control_addr = url('control')
181 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
175 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
182 control_stream.setsockopt(zmq.IDENTITY, identity)
176 control_stream.setsockopt(zmq.IDENTITY, identity)
183 connect(control_stream, control_addr)
177 connect(control_stream, control_addr)
184
178
185 # create iopub stream:
179 # create iopub stream:
186 iopub_addr = msg.content.iopub
180 iopub_addr = url('iopub')
187 iopub_socket = ctx.socket(zmq.PUB)
181 iopub_socket = ctx.socket(zmq.PUB)
188 iopub_socket.setsockopt(zmq.IDENTITY, identity)
182 iopub_socket.setsockopt(zmq.IDENTITY, identity)
189 connect(iopub_socket, iopub_addr)
183 connect(iopub_socket, iopub_addr)
@@ -257,9 +257,11 class Session(Configurable):
257 if new.lower() == 'json':
257 if new.lower() == 'json':
258 self.pack = json_packer
258 self.pack = json_packer
259 self.unpack = json_unpacker
259 self.unpack = json_unpacker
260 self.unpacker = new
260 elif new.lower() == 'pickle':
261 elif new.lower() == 'pickle':
261 self.pack = pickle_packer
262 self.pack = pickle_packer
262 self.unpack = pickle_unpacker
263 self.unpack = pickle_unpacker
264 self.unpacker = new
263 else:
265 else:
264 self.pack = import_item(str(new))
266 self.pack = import_item(str(new))
265
267
@@ -270,9 +272,11 class Session(Configurable):
270 if new.lower() == 'json':
272 if new.lower() == 'json':
271 self.pack = json_packer
273 self.pack = json_packer
272 self.unpack = json_unpacker
274 self.unpack = json_unpacker
275 self.packer = new
273 elif new.lower() == 'pickle':
276 elif new.lower() == 'pickle':
274 self.pack = pickle_packer
277 self.pack = pickle_packer
275 self.unpack = pickle_unpacker
278 self.unpack = pickle_unpacker
279 self.packer = new
276 else:
280 else:
277 self.unpack = import_item(str(new))
281 self.unpack = import_item(str(new))
278
282
@@ -43,9 +43,7 monitor the survival of the Engine process.
43 Message type: ``registration_request``::
43 Message type: ``registration_request``::
44
44
45 content = {
45 content = {
46 'queue' : 'abcd-1234-...', # the MUX queue zmq.IDENTITY
46 'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets
47 'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY
48 'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY
49 }
47 }
50
48
51 .. note::
49 .. note::
@@ -63,10 +61,6 Message type: ``registration_reply``::
63 'status' : 'ok', # or 'error'
61 'status' : 'ok', # or 'error'
64 # if ok:
62 # if ok:
65 'id' : 0, # int, the engine id
63 'id' : 0, # int, the engine id
66 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
67 'control' : 'tcp://...', # addr for control queue
68 'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat
69 'task' : 'tcp://...', # addr for task queue, or None if no task queue running
70 }
64 }
71
65
72 Clients use the same socket as engines to start their connections. Connection requests
66 Clients use the same socket as engines to start their connections. Connection requests
@@ -84,11 +78,6 Message type: ``connection_reply``::
84
78
85 content = {
79 content = {
86 'status' : 'ok', # or 'error'
80 'status' : 'ok', # or 'error'
87 # if ok:
88 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
89 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple)
90 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc.
91 'control' : 'tcp...', # addr for control methods, like abort, etc.
92 }
81 }
93
82
94 Heartbeat
83 Heartbeat
@@ -110,13 +99,14 Message type: ``registration_notification``::
110
99
111 content = {
100 content = {
112 'id' : 0, # engine ID that has been registered
101 'id' : 0, # engine ID that has been registered
113 'queue' : 'engine_id' # the IDENT for the engine's queue
102 'uuid' : 'engine_id' # the IDENT for the engine's sockets
114 }
103 }
115
104
116 Message type : ``unregistration_notification``::
105 Message type : ``unregistration_notification``::
117
106
118 content = {
107 content = {
119 'id' : 0 # engine ID that has been unregistered
108 'id' : 0 # engine ID that has been unregistered
109 'uuid' : 'engine_id' # the IDENT for the engine's sockets
120 }
110 }
121
111
122
112
General Comments 0
You need to be logged in to leave comments. Login now