Show More
@@ -116,7 +116,10 b' flags.update({' | |||||
116 | select one of the true db backends. |
|
116 | select one of the true db backends. | |
117 | """), |
|
117 | """), | |
118 | 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}}, |
|
118 | 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}}, | |
119 | 'reuse existing json connection files') |
|
119 | 'reuse existing json connection files'), | |
|
120 | 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}}, | |||
|
121 | 'Attempt to restore engines from a JSON file. ' | |||
|
122 | 'For use when resuming a crashed controller'), | |||
120 | }) |
|
123 | }) | |
121 |
|
124 | |||
122 | flags.update(session_flags) |
|
125 | flags.update(session_flags) | |
@@ -156,6 +159,10 b' class IPControllerApp(BaseParallelApplication):' | |||||
156 | If False, connection files will be removed on a clean exit. |
|
159 | If False, connection files will be removed on a clean exit. | |
157 | """ |
|
160 | """ | |
158 | ) |
|
161 | ) | |
|
162 | restore_engines = Bool(False, config=True, | |||
|
163 | help="""Reload engine state from JSON file | |||
|
164 | """ | |||
|
165 | ) | |||
159 | ssh_server = Unicode(u'', config=True, |
|
166 | ssh_server = Unicode(u'', config=True, | |
160 | help="""ssh url for clients to use when connecting to the Controller |
|
167 | help="""ssh url for clients to use when connecting to the Controller | |
161 | processes. It should be of the form: [user@]server[:port]. The |
|
168 | processes. It should be of the form: [user@]server[:port]. The | |
@@ -209,21 +216,17 b' class IPControllerApp(BaseParallelApplication):' | |||||
209 | def save_connection_dict(self, fname, cdict): |
|
216 | def save_connection_dict(self, fname, cdict): | |
210 | """save a connection dict to json file.""" |
|
217 | """save a connection dict to json file.""" | |
211 | c = self.config |
|
218 | c = self.config | |
212 |
url = cdict[' |
|
219 | url = cdict['registration'] | |
213 | location = cdict['location'] |
|
220 | location = cdict['location'] | |
|
221 | ||||
214 | if not location: |
|
222 | if not location: | |
215 | try: |
|
223 | try: | |
216 | proto,ip,port = split_url(url) |
|
224 | location = socket.gethostbyname_ex(socket.gethostname())[2][-1] | |
217 |
except |
|
225 | except (socket.gaierror, IndexError): | |
218 | pass |
|
226 | self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1." | |
219 | else: |
|
227 | " You may need to specify '--location=<external_ip_address>' to help" | |
220 | try: |
|
228 | " IPython decide when to connect via loopback.") | |
221 | location = socket.gethostbyname_ex(socket.gethostname())[2][-1] |
|
229 | location = '127.0.0.1' | |
222 | except (socket.gaierror, IndexError): |
|
|||
223 | self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1." |
|
|||
224 | " You may need to specify '--location=<external_ip_address>' to help" |
|
|||
225 | " IPython decide when to connect via loopback.") |
|
|||
226 | location = '127.0.0.1' |
|
|||
227 | cdict['location'] = location |
|
230 | cdict['location'] = location | |
228 | fname = os.path.join(self.profile_dir.security_dir, fname) |
|
231 | fname = os.path.join(self.profile_dir.security_dir, fname) | |
229 | self.log.info("writing connection info to %s", fname) |
|
232 | self.log.info("writing connection info to %s", fname) | |
@@ -235,35 +238,51 b' class IPControllerApp(BaseParallelApplication):' | |||||
235 | """load config from existing json connector files.""" |
|
238 | """load config from existing json connector files.""" | |
236 | c = self.config |
|
239 | c = self.config | |
237 | self.log.debug("loading config from JSON") |
|
240 | self.log.debug("loading config from JSON") | |
238 | # load from engine config |
|
241 | ||
|
242 | # load engine config | |||
|
243 | ||||
239 | fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file) |
|
244 | fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file) | |
240 | self.log.info("loading connection info from %s", fname) |
|
245 | self.log.info("loading connection info from %s", fname) | |
241 | with open(fname) as f: |
|
246 | with open(fname) as f: | |
242 | cfg = json.loads(f.read()) |
|
247 | ecfg = json.loads(f.read()) | |
243 | key = cfg['exec_key'] |
|
248 | ||
244 | # json gives unicode, Session.key wants bytes |
|
249 | # json gives unicode, Session.key wants bytes | |
245 | c.Session.key = key.encode('ascii') |
|
250 | c.Session.key = ecfg['exec_key'].encode('ascii') | |
246 | xport,addr = cfg['url'].split('://') |
|
251 | ||
247 | c.HubFactory.engine_transport = xport |
|
252 | xport,ip = ecfg['interface'].split('://') | |
248 | ip,ports = addr.split(':') |
|
253 | ||
249 | c.HubFactory.engine_ip = ip |
|
254 | c.HubFactory.engine_ip = ip | |
250 |
c.HubFactory. |
|
255 | c.HubFactory.engine_transport = xport | |
251 | self.location = cfg['location'] |
|
256 | ||
|
257 | self.location = ecfg['location'] | |||
252 | if not self.engine_ssh_server: |
|
258 | if not self.engine_ssh_server: | |
253 | self.engine_ssh_server = cfg['ssh'] |
|
259 | self.engine_ssh_server = ecfg['ssh'] | |
|
260 | ||||
254 | # load client config |
|
261 | # load client config | |
|
262 | ||||
255 | fname = os.path.join(self.profile_dir.security_dir, self.client_json_file) |
|
263 | fname = os.path.join(self.profile_dir.security_dir, self.client_json_file) | |
256 | self.log.info("loading connection info from %s", fname) |
|
264 | self.log.info("loading connection info from %s", fname) | |
257 | with open(fname) as f: |
|
265 | with open(fname) as f: | |
258 | cfg = json.loads(f.read()) |
|
266 | ccfg = json.loads(f.read()) | |
259 | assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys" |
|
267 | ||
260 | xport,addr = cfg['url'].split('://') |
|
268 | for key in ('exec_key', 'registration', 'pack', 'unpack'): | |
|
269 | assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key | |||
|
270 | ||||
|
271 | xport,addr = ccfg['interface'].split('://') | |||
|
272 | ||||
261 | c.HubFactory.client_transport = xport |
|
273 | c.HubFactory.client_transport = xport | |
262 | ip,ports = addr.split(':') |
|
|||
263 | c.HubFactory.client_ip = ip |
|
274 | c.HubFactory.client_ip = ip | |
264 | if not self.ssh_server: |
|
275 | if not self.ssh_server: | |
265 | self.ssh_server = cfg['ssh'] |
|
276 | self.ssh_server = ccfg['ssh'] | |
266 | assert int(ports) == c.HubFactory.regport, "regport mismatch" |
|
277 | ||
|
278 | # load port config: | |||
|
279 | c.HubFactory.regport = ecfg['registration'] | |||
|
280 | c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong']) | |||
|
281 | c.HubFactory.control = (ccfg['control'], ecfg['control']) | |||
|
282 | c.HubFactory.mux = (ccfg['mux'], ecfg['mux']) | |||
|
283 | c.HubFactory.task = (ccfg['task'], ecfg['task']) | |||
|
284 | c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub']) | |||
|
285 | c.HubFactory.notifier_port = ccfg['notification'] | |||
267 |
|
286 | |||
268 | def cleanup_connection_files(self): |
|
287 | def cleanup_connection_files(self): | |
269 | if self.reuse_files: |
|
288 | if self.reuse_files: | |
@@ -314,29 +333,42 b' class IPControllerApp(BaseParallelApplication):' | |||||
314 | if self.write_connection_files: |
|
333 | if self.write_connection_files: | |
315 | # save to new json config files |
|
334 | # save to new json config files | |
316 | f = self.factory |
|
335 | f = self.factory | |
317 | cdict = {'exec_key' : f.session.key.decode('ascii'), |
|
336 | base = { | |
318 | 'ssh' : self.ssh_server, |
|
337 | 'exec_key' : f.session.key.decode('ascii'), | |
319 | 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport), |
|
338 | 'location' : self.location, | |
320 | 'location' : self.location |
|
339 | 'pack' : f.session.packer, | |
321 | } |
|
340 | 'unpack' : f.session.unpacker, | |
|
341 | } | |||
|
342 | ||||
|
343 | cdict = {'ssh' : self.ssh_server} | |||
|
344 | cdict.update(f.client_info) | |||
|
345 | cdict.update(base) | |||
322 | self.save_connection_dict(self.client_json_file, cdict) |
|
346 | self.save_connection_dict(self.client_json_file, cdict) | |
323 |
|
|
347 | ||
324 | edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport)) |
|
348 | edict = {'ssh' : self.engine_ssh_server} | |
325 |
edict |
|
349 | edict.update(f.engine_info) | |
|
350 | edict.update(base) | |||
326 | self.save_connection_dict(self.engine_json_file, edict) |
|
351 | self.save_connection_dict(self.engine_json_file, edict) | |
327 |
|
352 | |||
|
353 | fname = "engines%s.json" % self.cluster_id | |||
|
354 | self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname) | |||
|
355 | if self.restore_engines: | |||
|
356 | self.factory.hub._load_engine_state() | |||
|
357 | ||||
328 | def init_schedulers(self): |
|
358 | def init_schedulers(self): | |
329 | children = self.children |
|
359 | children = self.children | |
330 | mq = import_item(str(self.mq_class)) |
|
360 | mq = import_item(str(self.mq_class)) | |
331 |
|
361 | |||
332 |
|
|
362 | f = self.factory | |
|
363 | ident = f.session.bsession | |||
333 | # disambiguate url, in case of * |
|
364 | # disambiguate url, in case of * | |
334 |
monitor_url = disambiguate_url( |
|
365 | monitor_url = disambiguate_url(f.monitor_url) | |
335 | # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url |
|
366 | # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url | |
336 | # IOPub relay (in a Process) |
|
367 | # IOPub relay (in a Process) | |
337 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') |
|
368 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') | |
338 |
q.bind_in( |
|
369 | q.bind_in(f.client_url('iopub')) | |
339 | q.bind_out(hub.engine_info['iopub']) |
|
370 | q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub") | |
|
371 | q.bind_out(f.engine_url('iopub')) | |||
340 | q.setsockopt_out(zmq.SUBSCRIBE, b'') |
|
372 | q.setsockopt_out(zmq.SUBSCRIBE, b'') | |
341 | q.connect_mon(monitor_url) |
|
373 | q.connect_mon(monitor_url) | |
342 | q.daemon=True |
|
374 | q.daemon=True | |
@@ -344,18 +376,20 b' class IPControllerApp(BaseParallelApplication):' | |||||
344 |
|
376 | |||
345 | # Multiplexer Queue (in a Process) |
|
377 | # Multiplexer Queue (in a Process) | |
346 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') |
|
378 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') | |
347 |
q.bind_in( |
|
379 | q.bind_in(f.client_url('mux')) | |
348 | q.setsockopt_in(zmq.IDENTITY, b'mux') |
|
380 | q.setsockopt_in(zmq.IDENTITY, b'mux_in') | |
349 |
q.bind_out( |
|
381 | q.bind_out(f.engine_url('mux')) | |
|
382 | q.setsockopt_out(zmq.IDENTITY, b'mux_out') | |||
350 | q.connect_mon(monitor_url) |
|
383 | q.connect_mon(monitor_url) | |
351 | q.daemon=True |
|
384 | q.daemon=True | |
352 | children.append(q) |
|
385 | children.append(q) | |
353 |
|
386 | |||
354 | # Control Queue (in a Process) |
|
387 | # Control Queue (in a Process) | |
355 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol') |
|
388 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol') | |
356 |
q.bind_in( |
|
389 | q.bind_in(f.client_url('control')) | |
357 | q.setsockopt_in(zmq.IDENTITY, b'control') |
|
390 | q.setsockopt_in(zmq.IDENTITY, b'control_in') | |
358 |
q.bind_out( |
|
391 | q.bind_out(f.engine_url('control')) | |
|
392 | q.setsockopt_out(zmq.IDENTITY, b'control_out') | |||
359 | q.connect_mon(monitor_url) |
|
393 | q.connect_mon(monitor_url) | |
360 | q.daemon=True |
|
394 | q.daemon=True | |
361 | children.append(q) |
|
395 | children.append(q) | |
@@ -368,9 +402,10 b' class IPControllerApp(BaseParallelApplication):' | |||||
368 | self.log.warn("task::using pure DEALER Task scheduler") |
|
402 | self.log.warn("task::using pure DEALER Task scheduler") | |
369 | q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask') |
|
403 | q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask') | |
370 | # q.setsockopt_out(zmq.HWM, hub.hwm) |
|
404 | # q.setsockopt_out(zmq.HWM, hub.hwm) | |
371 |
q.bind_in( |
|
405 | q.bind_in(f.client_url('task')) | |
372 | q.setsockopt_in(zmq.IDENTITY, b'task') |
|
406 | q.setsockopt_in(zmq.IDENTITY, b'task_in') | |
373 |
q.bind_out( |
|
407 | q.bind_out(f.engine_url('task')) | |
|
408 | q.setsockopt_out(zmq.IDENTITY, b'task_out') | |||
374 | q.connect_mon(monitor_url) |
|
409 | q.connect_mon(monitor_url) | |
375 | q.daemon=True |
|
410 | q.daemon=True | |
376 | children.append(q) |
|
411 | children.append(q) | |
@@ -379,8 +414,10 b' class IPControllerApp(BaseParallelApplication):' | |||||
379 |
|
414 | |||
380 | else: |
|
415 | else: | |
381 | self.log.info("task::using Python %s Task scheduler"%scheme) |
|
416 | self.log.info("task::using Python %s Task scheduler"%scheme) | |
382 |
sargs = ( |
|
417 | sargs = (f.client_url('task'), f.engine_url('task'), | |
383 |
|
|
418 | monitor_url, disambiguate_url(f.client_url('notification')), | |
|
419 | disambiguate_url(f.client_url('registration')), | |||
|
420 | ) | |||
384 | kwargs = dict(logname='scheduler', loglevel=self.log_level, |
|
421 | kwargs = dict(logname='scheduler', loglevel=self.log_level, | |
385 | log_url = self.log_url, config=dict(self.config)) |
|
422 | log_url = self.log_url, config=dict(self.config)) | |
386 | if 'Process' in self.mq_class: |
|
423 | if 'Process' in self.mq_class: |
@@ -45,7 +45,7 b' from IPython.zmq.session import (' | |||||
45 | from IPython.config.configurable import Configurable |
|
45 | from IPython.config.configurable import Configurable | |
46 |
|
46 | |||
47 | from IPython.parallel.engine.engine import EngineFactory |
|
47 | from IPython.parallel.engine.engine import EngineFactory | |
48 |
from IPython.parallel.util import disambiguate_ |
|
48 | from IPython.parallel.util import disambiguate_ip_address | |
49 |
|
49 | |||
50 | from IPython.utils.importstring import import_item |
|
50 | from IPython.utils.importstring import import_item | |
51 | from IPython.utils.py3compat import cast_bytes |
|
51 | from IPython.utils.py3compat import cast_bytes | |
@@ -211,24 +211,36 b' class IPEngineApp(BaseParallelApplication):' | |||||
211 | with open(self.url_file) as f: |
|
211 | with open(self.url_file) as f: | |
212 | d = json.loads(f.read()) |
|
212 | d = json.loads(f.read()) | |
213 |
|
213 | |||
214 | if 'exec_key' in d: |
|
214 | # allow hand-override of location for disambiguation | |
215 | config.Session.key = cast_bytes(d['exec_key']) |
|
215 | # and ssh-server | |
216 |
|
||||
217 | try: |
|
216 | try: | |
218 | config.EngineFactory.location |
|
217 | config.EngineFactory.location | |
219 | except AttributeError: |
|
218 | except AttributeError: | |
220 | config.EngineFactory.location = d['location'] |
|
219 | config.EngineFactory.location = d['location'] | |
221 |
|
220 | |||
222 | d['url'] = disambiguate_url(d['url'], config.EngineFactory.location) |
|
|||
223 | try: |
|
|||
224 | config.EngineFactory.url |
|
|||
225 | except AttributeError: |
|
|||
226 | config.EngineFactory.url = d['url'] |
|
|||
227 |
|
||||
228 | try: |
|
221 | try: | |
229 | config.EngineFactory.sshserver |
|
222 | config.EngineFactory.sshserver | |
230 | except AttributeError: |
|
223 | except AttributeError: | |
231 |
config.EngineFactory.sshserver = d |
|
224 | config.EngineFactory.sshserver = d.get('ssh') | |
|
225 | ||||
|
226 | location = config.EngineFactory.location | |||
|
227 | ||||
|
228 | proto, ip = d['interface'].split('://') | |||
|
229 | ip = disambiguate_ip_address(ip) | |||
|
230 | d['interface'] = '%s://%s' % (proto, ip) | |||
|
231 | ||||
|
232 | # DO NOT allow override of basic URLs, serialization, or exec_key | |||
|
233 | # JSON file takes top priority there | |||
|
234 | config.Session.key = cast_bytes(d['exec_key']) | |||
|
235 | ||||
|
236 | config.EngineFactory.url = d['interface'] + ':%i' % d['registration'] | |||
|
237 | ||||
|
238 | config.Session.packer = d['pack'] | |||
|
239 | config.Session.unpacker = d['unpack'] | |||
|
240 | ||||
|
241 | self.log.debug("Config changed:") | |||
|
242 | self.log.debug("%r", config) | |||
|
243 | self.connection_info = d | |||
232 |
|
244 | |||
233 | def bind_kernel(self, **kwargs): |
|
245 | def bind_kernel(self, **kwargs): | |
234 | """Promote engine to listening kernel, accessible to frontends.""" |
|
246 | """Promote engine to listening kernel, accessible to frontends.""" | |
@@ -320,7 +332,9 b' class IPEngineApp(BaseParallelApplication):' | |||||
320 | # shell_class = import_item(self.master_config.Global.shell_class) |
|
332 | # shell_class = import_item(self.master_config.Global.shell_class) | |
321 | # print self.config |
|
333 | # print self.config | |
322 | try: |
|
334 | try: | |
323 |
self.engine = EngineFactory(config=config, log=self.log |
|
335 | self.engine = EngineFactory(config=config, log=self.log, | |
|
336 | connection_info=self.connection_info, | |||
|
337 | ) | |||
324 | except: |
|
338 | except: | |
325 | self.log.error("Couldn't start the Engine", exc_info=True) |
|
339 | self.log.error("Couldn't start the Engine", exc_info=True) | |
326 | self.exit(1) |
|
340 | self.exit(1) |
@@ -217,7 +217,9 b' class Client(HasTraits):' | |||||
217 | Parameters |
|
217 | Parameters | |
218 | ---------- |
|
218 | ---------- | |
219 |
|
219 | |||
220 |
url_ |
|
220 | url_file : str/unicode; path to ipcontroller-client.json | |
|
221 | This JSON file should contain all the information needed to connect to a cluster, | |||
|
222 | and is likely the only argument needed. | |||
221 | Connection information for the Hub's registration. If a json connector |
|
223 | Connection information for the Hub's registration. If a json connector | |
222 | file is given, then likely no further configuration is necessary. |
|
224 | file is given, then likely no further configuration is necessary. | |
223 | [Default: use profile] |
|
225 | [Default: use profile] | |
@@ -239,14 +241,6 b' class Client(HasTraits):' | |||||
239 | If specified, this will be relayed to the Session for configuration |
|
241 | If specified, this will be relayed to the Session for configuration | |
240 | username : str |
|
242 | username : str | |
241 | set username for the session object |
|
243 | set username for the session object | |
242 | packer : str (import_string) or callable |
|
|||
243 | Can be either the simple keyword 'json' or 'pickle', or an import_string to a |
|
|||
244 | function to serialize messages. Must support same input as |
|
|||
245 | JSON, and output must be bytes. |
|
|||
246 | You can pass a callable directly as `pack` |
|
|||
247 | unpacker : str (import_string) or callable |
|
|||
248 | The inverse of packer. Only necessary if packer is specified as *not* one |
|
|||
249 | of 'json' or 'pickle'. |
|
|||
250 |
|
244 | |||
251 | #-------------- ssh related args ---------------- |
|
245 | #-------------- ssh related args ---------------- | |
252 | # These are args for configuring the ssh tunnel to be used |
|
246 | # These are args for configuring the ssh tunnel to be used | |
@@ -271,17 +265,6 b' class Client(HasTraits):' | |||||
271 | flag for whether to use paramiko instead of shell ssh for tunneling. |
|
265 | flag for whether to use paramiko instead of shell ssh for tunneling. | |
272 | [default: True on win32, False else] |
|
266 | [default: True on win32, False else] | |
273 |
|
267 | |||
274 | ------- exec authentication args ------- |
|
|||
275 | If even localhost is untrusted, you can have some protection against |
|
|||
276 | unauthorized execution by signing messages with HMAC digests. |
|
|||
277 | Messages are still sent as cleartext, so if someone can snoop your |
|
|||
278 | loopback traffic this will not protect your privacy, but will prevent |
|
|||
279 | unauthorized execution. |
|
|||
280 |
|
||||
281 | exec_key : str |
|
|||
282 | an authentication key or file containing a key |
|
|||
283 | default: None |
|
|||
284 |
|
||||
285 |
|
268 | |||
286 | Attributes |
|
269 | Attributes | |
287 | ---------- |
|
270 | ---------- | |
@@ -378,8 +361,8 b' class Client(HasTraits):' | |||||
378 | # don't raise on positional args |
|
361 | # don't raise on positional args | |
379 | return HasTraits.__new__(self, **kw) |
|
362 | return HasTraits.__new__(self, **kw) | |
380 |
|
363 | |||
381 |
def __init__(self, url_ |
|
364 | def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None, | |
382 |
context=None, debug=False, |
|
365 | context=None, debug=False, | |
383 | sshserver=None, sshkey=None, password=None, paramiko=None, |
|
366 | sshserver=None, sshkey=None, password=None, paramiko=None, | |
384 | timeout=10, **extra_args |
|
367 | timeout=10, **extra_args | |
385 | ): |
|
368 | ): | |
@@ -391,39 +374,46 b' class Client(HasTraits):' | |||||
391 | context = zmq.Context.instance() |
|
374 | context = zmq.Context.instance() | |
392 | self._context = context |
|
375 | self._context = context | |
393 | self._stop_spinning = Event() |
|
376 | self._stop_spinning = Event() | |
|
377 | ||||
|
378 | if 'url_or_file' in extra_args: | |||
|
379 | url_file = extra_args['url_or_file'] | |||
|
380 | warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning) | |||
|
381 | ||||
|
382 | if url_file and util.is_url(url_file): | |||
|
383 | raise ValueError("single urls cannot be specified, url-files must be used.") | |||
394 |
|
384 | |||
395 | self._setup_profile_dir(self.profile, profile_dir, ipython_dir) |
|
385 | self._setup_profile_dir(self.profile, profile_dir, ipython_dir) | |
|
386 | ||||
396 | if self._cd is not None: |
|
387 | if self._cd is not None: | |
397 |
if url_ |
|
388 | if url_file is None: | |
398 |
url_ |
|
389 | url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') | |
399 |
if url_ |
|
390 | if url_file is None: | |
400 | raise ValueError( |
|
391 | raise ValueError( | |
401 | "I can't find enough information to connect to a hub!" |
|
392 | "I can't find enough information to connect to a hub!" | |
402 |
" Please specify at least one of url_ |
|
393 | " Please specify at least one of url_file or profile." | |
403 | ) |
|
394 | ) | |
404 |
|
395 | |||
405 |
|
|
396 | with open(url_file) as f: | |
406 | # it's not a url, try for a file |
|
397 | cfg = json.load(f) | |
407 | if not os.path.exists(url_or_file): |
|
398 | ||
408 | if self._cd: |
|
399 | self._task_scheme = cfg['task_scheme'] | |
409 | url_or_file = os.path.join(self._cd.security_dir, url_or_file) |
|
|||
410 | if not os.path.exists(url_or_file): |
|
|||
411 | raise IOError("Connection file not found: %r" % url_or_file) |
|
|||
412 | with open(url_or_file) as f: |
|
|||
413 | cfg = json.loads(f.read()) |
|
|||
414 | else: |
|
|||
415 | cfg = {'url':url_or_file} |
|
|||
416 |
|
400 | |||
417 | # sync defaults from args, json: |
|
401 | # sync defaults from args, json: | |
418 | if sshserver: |
|
402 | if sshserver: | |
419 | cfg['ssh'] = sshserver |
|
403 | cfg['ssh'] = sshserver | |
420 | if exec_key: |
|
404 | ||
421 | cfg['exec_key'] = exec_key |
|
|||
422 | exec_key = cfg['exec_key'] |
|
|||
423 | location = cfg.setdefault('location', None) |
|
405 | location = cfg.setdefault('location', None) | |
424 | cfg['url'] = util.disambiguate_url(cfg['url'], location) |
|
406 | ||
425 | url = cfg['url'] |
|
407 | proto,addr = cfg['interface'].split('://') | |
426 | proto,addr,port = util.split_url(url) |
|
408 | addr = util.disambiguate_ip_address(addr) | |
|
409 | cfg['interface'] = "%s://%s" % (proto, addr) | |||
|
410 | ||||
|
411 | # turn interface,port into full urls: | |||
|
412 | for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'): | |||
|
413 | cfg[key] = cfg['interface'] + ':%i' % cfg[key] | |||
|
414 | ||||
|
415 | url = cfg['registration'] | |||
|
416 | ||||
427 | if location is not None and addr == '127.0.0.1': |
|
417 | if location is not None and addr == '127.0.0.1': | |
428 | # location specified, and connection is expected to be local |
|
418 | # location specified, and connection is expected to be local | |
429 | if location not in LOCAL_IPS and not sshserver: |
|
419 | if location not in LOCAL_IPS and not sshserver: | |
@@ -448,7 +438,7 b' class Client(HasTraits):' | |||||
448 | self._ssh = bool(sshserver or sshkey or password) |
|
438 | self._ssh = bool(sshserver or sshkey or password) | |
449 | if self._ssh and sshserver is None: |
|
439 | if self._ssh and sshserver is None: | |
450 | # default to ssh via localhost |
|
440 | # default to ssh via localhost | |
451 | sshserver = url.split('://')[1].split(':')[0] |
|
441 | sshserver = addr | |
452 | if self._ssh and password is None: |
|
442 | if self._ssh and password is None: | |
453 | if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko): |
|
443 | if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko): | |
454 | password=False |
|
444 | password=False | |
@@ -457,20 +447,18 b' class Client(HasTraits):' | |||||
457 | ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko) |
|
447 | ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko) | |
458 |
|
448 | |||
459 | # configure and construct the session |
|
449 | # configure and construct the session | |
460 | if exec_key is not None: |
|
450 | extra_args['packer'] = cfg['pack'] | |
461 | if os.path.isfile(exec_key): |
|
451 | extra_args['unpacker'] = cfg['unpack'] | |
462 |
|
|
452 | extra_args['key'] = cast_bytes(cfg['exec_key']) | |
463 |
|
|
453 | ||
464 | exec_key = cast_bytes(exec_key) |
|
|||
465 | extra_args['key'] = exec_key |
|
|||
466 | self.session = Session(**extra_args) |
|
454 | self.session = Session(**extra_args) | |
467 |
|
455 | |||
468 | self._query_socket = self._context.socket(zmq.DEALER) |
|
456 | self._query_socket = self._context.socket(zmq.DEALER) | |
469 | self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) |
|
457 | ||
470 | if self._ssh: |
|
458 | if self._ssh: | |
471 |
tunnel.tunnel_connection(self._query_socket, |
|
459 | tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs) | |
472 | else: |
|
460 | else: | |
473 |
self._query_socket.connect( |
|
461 | self._query_socket.connect(cfg['registration']) | |
474 |
|
462 | |||
475 | self.session.debug = self.debug |
|
463 | self.session.debug = self.debug | |
476 |
|
464 | |||
@@ -520,8 +508,9 b' class Client(HasTraits):' | |||||
520 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" |
|
508 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" | |
521 | for k,v in engines.iteritems(): |
|
509 | for k,v in engines.iteritems(): | |
522 | eid = int(k) |
|
510 | eid = int(k) | |
|
511 | if eid not in self._engines: | |||
|
512 | self._ids.append(eid) | |||
523 | self._engines[eid] = v |
|
513 | self._engines[eid] = v | |
524 | self._ids.append(eid) |
|
|||
525 | self._ids = sorted(self._ids) |
|
514 | self._ids = sorted(self._ids) | |
526 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ |
|
515 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ | |
527 | self._task_scheme == 'pure' and self._task_socket: |
|
516 | self._task_scheme == 'pure' and self._task_socket: | |
@@ -583,7 +572,7 b' class Client(HasTraits):' | |||||
583 | self._connected=True |
|
572 | self._connected=True | |
584 |
|
573 | |||
585 | def connect_socket(s, url): |
|
574 | def connect_socket(s, url): | |
586 | url = util.disambiguate_url(url, self._config['location']) |
|
575 | # url = util.disambiguate_url(url, self._config['location']) | |
587 | if self._ssh: |
|
576 | if self._ssh: | |
588 | return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs) |
|
577 | return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs) | |
589 | else: |
|
578 | else: | |
@@ -600,38 +589,28 b' class Client(HasTraits):' | |||||
600 | idents,msg = self.session.recv(self._query_socket,mode=0) |
|
589 | idents,msg = self.session.recv(self._query_socket,mode=0) | |
601 | if self.debug: |
|
590 | if self.debug: | |
602 | pprint(msg) |
|
591 | pprint(msg) | |
603 | msg = Message(msg) |
|
592 | content = msg['content'] | |
604 | content = msg.content |
|
593 | # self._config['registration'] = dict(content) | |
605 | self._config['registration'] = dict(content) |
|
594 | cfg = self._config | |
606 |
if content |
|
595 | if content['status'] == 'ok': | |
607 | ident = self.session.bsession |
|
596 | self._mux_socket = self._context.socket(zmq.DEALER) | |
608 | if content.mux: |
|
597 | connect_socket(self._mux_socket, cfg['mux']) | |
609 | self._mux_socket = self._context.socket(zmq.DEALER) |
|
598 | ||
610 |
|
|
599 | self._task_socket = self._context.socket(zmq.DEALER) | |
611 |
|
|
600 | connect_socket(self._task_socket, cfg['task']) | |
612 | if content.task: |
|
601 | ||
613 | self._task_scheme, task_addr = content.task |
|
602 | self._notification_socket = self._context.socket(zmq.SUB) | |
614 |
|
|
603 | self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') | |
615 | self._task_socket.setsockopt(zmq.IDENTITY, ident) |
|
604 | connect_socket(self._notification_socket, cfg['notification']) | |
616 | connect_socket(self._task_socket, task_addr) |
|
605 | ||
617 | if content.notification: |
|
606 | self._control_socket = self._context.socket(zmq.DEALER) | |
618 | self._notification_socket = self._context.socket(zmq.SUB) |
|
607 | connect_socket(self._control_socket, cfg['control']) | |
619 | connect_socket(self._notification_socket, content.notification) |
|
608 | ||
620 |
|
|
609 | self._iopub_socket = self._context.socket(zmq.SUB) | |
621 | # if content.query: |
|
610 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') | |
622 | # self._query_socket = self._context.socket(zmq.DEALER) |
|
611 | connect_socket(self._iopub_socket, cfg['iopub']) | |
623 | # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) |
|
612 | ||
624 | # connect_socket(self._query_socket, content.query) |
|
613 | self._update_engines(dict(content['engines'])) | |
625 | if content.control: |
|
|||
626 | self._control_socket = self._context.socket(zmq.DEALER) |
|
|||
627 | self._control_socket.setsockopt(zmq.IDENTITY, ident) |
|
|||
628 | connect_socket(self._control_socket, content.control) |
|
|||
629 | if content.iopub: |
|
|||
630 | self._iopub_socket = self._context.socket(zmq.SUB) |
|
|||
631 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') |
|
|||
632 | self._iopub_socket.setsockopt(zmq.IDENTITY, ident) |
|
|||
633 | connect_socket(self._iopub_socket, content.iopub) |
|
|||
634 | self._update_engines(dict(content.engines)) |
|
|||
635 | else: |
|
614 | else: | |
636 | self._connected = False |
|
615 | self._connected = False | |
637 | raise Exception("Failed to connect!") |
|
616 | raise Exception("Failed to connect!") | |
@@ -674,7 +653,7 b' class Client(HasTraits):' | |||||
674 | """Register a new engine, and update our connection info.""" |
|
653 | """Register a new engine, and update our connection info.""" | |
675 | content = msg['content'] |
|
654 | content = msg['content'] | |
676 | eid = content['id'] |
|
655 | eid = content['id'] | |
677 |
d = {eid : content[' |
|
656 | d = {eid : content['uuid']} | |
678 | self._update_engines(d) |
|
657 | self._update_engines(d) | |
679 |
|
658 | |||
680 | def _unregister_engine(self, msg): |
|
659 | def _unregister_engine(self, msg): |
@@ -18,6 +18,8 b' Authors:' | |||||
18 | #----------------------------------------------------------------------------- |
|
18 | #----------------------------------------------------------------------------- | |
19 | from __future__ import print_function |
|
19 | from __future__ import print_function | |
20 |
|
20 | |||
|
21 | import json | |||
|
22 | import os | |||
21 | import sys |
|
23 | import sys | |
22 | import time |
|
24 | import time | |
23 | from datetime import datetime |
|
25 | from datetime import datetime | |
@@ -107,17 +109,16 b' class EngineConnector(HasTraits):' | |||||
107 | """A simple object for accessing the various zmq connections of an object. |
|
109 | """A simple object for accessing the various zmq connections of an object. | |
108 | Attributes are: |
|
110 | Attributes are: | |
109 | id (int): engine ID |
|
111 | id (int): engine ID | |
110 | uuid (str): uuid (unused?) |
|
112 | uuid (unicode): engine UUID | |
111 | queue (str): identity of queue's DEALER socket |
|
113 | pending: set of msg_ids | |
112 | registration (str): identity of registration DEALER socket |
|
114 | stallback: DelayedCallback for stalled registration | |
113 | heartbeat (str): identity of heartbeat DEALER socket |
|
|||
114 | """ |
|
115 | """ | |
115 | id=Integer(0) |
|
116 | ||
116 | queue=CBytes() |
|
117 | id = Integer(0) | |
117 | control=CBytes() |
|
118 | uuid = Unicode() | |
118 | registration=CBytes() |
|
119 | pending = Set() | |
119 | heartbeat=CBytes() |
|
120 | stallback = Instance(ioloop.DelayedCallback) | |
120 | pending=Set() |
|
121 | ||
121 |
|
122 | |||
122 | _db_shortcuts = { |
|
123 | _db_shortcuts = { | |
123 | 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB', |
|
124 | 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB', | |
@@ -131,29 +132,29 b' class HubFactory(RegistrationFactory):' | |||||
131 |
|
132 | |||
132 | # port-pairs for monitoredqueues: |
|
133 | # port-pairs for monitoredqueues: | |
133 | hb = Tuple(Integer,Integer,config=True, |
|
134 | hb = Tuple(Integer,Integer,config=True, | |
134 |
help=""" |
|
135 | help="""PUB/ROUTER Port pair for Engine heartbeats""") | |
135 | def _hb_default(self): |
|
136 | def _hb_default(self): | |
136 | return tuple(util.select_random_ports(2)) |
|
137 | return tuple(util.select_random_ports(2)) | |
137 |
|
138 | |||
138 | mux = Tuple(Integer,Integer,config=True, |
|
139 | mux = Tuple(Integer,Integer,config=True, | |
139 |
help=""" |
|
140 | help="""Client/Engine Port pair for MUX queue""") | |
140 |
|
141 | |||
141 | def _mux_default(self): |
|
142 | def _mux_default(self): | |
142 | return tuple(util.select_random_ports(2)) |
|
143 | return tuple(util.select_random_ports(2)) | |
143 |
|
144 | |||
144 | task = Tuple(Integer,Integer,config=True, |
|
145 | task = Tuple(Integer,Integer,config=True, | |
145 |
help=""" |
|
146 | help="""Client/Engine Port pair for Task queue""") | |
146 | def _task_default(self): |
|
147 | def _task_default(self): | |
147 | return tuple(util.select_random_ports(2)) |
|
148 | return tuple(util.select_random_ports(2)) | |
148 |
|
149 | |||
149 | control = Tuple(Integer,Integer,config=True, |
|
150 | control = Tuple(Integer,Integer,config=True, | |
150 |
help=""" |
|
151 | help="""Client/Engine Port pair for Control queue""") | |
151 |
|
152 | |||
152 | def _control_default(self): |
|
153 | def _control_default(self): | |
153 | return tuple(util.select_random_ports(2)) |
|
154 | return tuple(util.select_random_ports(2)) | |
154 |
|
155 | |||
155 | iopub = Tuple(Integer,Integer,config=True, |
|
156 | iopub = Tuple(Integer,Integer,config=True, | |
156 |
help=""" |
|
157 | help="""Client/Engine Port pair for IOPub relay""") | |
157 |
|
158 | |||
158 | def _iopub_default(self): |
|
159 | def _iopub_default(self): | |
159 | return tuple(util.select_random_ports(2)) |
|
160 | return tuple(util.select_random_ports(2)) | |
@@ -231,38 +232,77 b' class HubFactory(RegistrationFactory):' | |||||
231 | self.heartmonitor.start() |
|
232 | self.heartmonitor.start() | |
232 | self.log.info("Heartmonitor started") |
|
233 | self.log.info("Heartmonitor started") | |
233 |
|
234 | |||
|
235 | def client_url(self, channel): | |||
|
236 | """return full zmq url for a named client channel""" | |||
|
237 | return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel]) | |||
|
238 | ||||
|
239 | def engine_url(self, channel): | |||
|
240 | """return full zmq url for a named engine channel""" | |||
|
241 | return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel]) | |||
|
242 | ||||
234 | def init_hub(self): |
|
243 | def init_hub(self): | |
235 | """construct""" |
|
244 | """construct Hub object""" | |
236 | client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i" |
|
|||
237 | engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i" |
|
|||
238 |
|
245 | |||
239 | ctx = self.context |
|
246 | ctx = self.context | |
240 | loop = self.loop |
|
247 | loop = self.loop | |
241 |
|
248 | |||
|
249 | try: | |||
|
250 | scheme = self.config.TaskScheduler.scheme_name | |||
|
251 | except AttributeError: | |||
|
252 | from .scheduler import TaskScheduler | |||
|
253 | scheme = TaskScheduler.scheme_name.get_default_value() | |||
|
254 | ||||
|
255 | # build connection dicts | |||
|
256 | engine = self.engine_info = { | |||
|
257 | 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip), | |||
|
258 | 'registration' : self.regport, | |||
|
259 | 'control' : self.control[1], | |||
|
260 | 'mux' : self.mux[1], | |||
|
261 | 'hb_ping' : self.hb[0], | |||
|
262 | 'hb_pong' : self.hb[1], | |||
|
263 | 'task' : self.task[1], | |||
|
264 | 'iopub' : self.iopub[1], | |||
|
265 | } | |||
|
266 | ||||
|
267 | client = self.client_info = { | |||
|
268 | 'interface' : "%s://%s" % (self.client_transport, self.client_ip), | |||
|
269 | 'registration' : self.regport, | |||
|
270 | 'control' : self.control[0], | |||
|
271 | 'mux' : self.mux[0], | |||
|
272 | 'task' : self.task[0], | |||
|
273 | 'task_scheme' : scheme, | |||
|
274 | 'iopub' : self.iopub[0], | |||
|
275 | 'notification' : self.notifier_port, | |||
|
276 | } | |||
|
277 | ||||
|
278 | self.log.debug("Hub engine addrs: %s", self.engine_info) | |||
|
279 | self.log.debug("Hub client addrs: %s", self.client_info) | |||
|
280 | ||||
242 | # Registrar socket |
|
281 | # Registrar socket | |
243 | q = ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
282 | q = ZMQStream(ctx.socket(zmq.ROUTER), loop) | |
244 |
q.bind( |
|
283 | q.bind(self.client_url('registration')) | |
245 |
self.log.info("Hub listening on %s for registration.", |
|
284 | self.log.info("Hub listening on %s for registration.", self.client_url('registration')) | |
246 | if self.client_ip != self.engine_ip: |
|
285 | if self.client_ip != self.engine_ip: | |
247 |
q.bind( |
|
286 | q.bind(self.engine_url('registration')) | |
248 |
self.log.info("Hub listening on %s for registration.", |
|
287 | self.log.info("Hub listening on %s for registration.", self.engine_url('registration')) | |
249 |
|
288 | |||
250 | ### Engine connections ### |
|
289 | ### Engine connections ### | |
251 |
|
290 | |||
252 | # heartbeat |
|
291 | # heartbeat | |
253 | hpub = ctx.socket(zmq.PUB) |
|
292 | hpub = ctx.socket(zmq.PUB) | |
254 |
hpub.bind( |
|
293 | hpub.bind(self.engine_url('hb_ping')) | |
255 | hrep = ctx.socket(zmq.ROUTER) |
|
294 | hrep = ctx.socket(zmq.ROUTER) | |
256 |
hrep.bind( |
|
295 | hrep.bind(self.engine_url('hb_pong')) | |
257 | self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, |
|
296 | self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, | |
258 | pingstream=ZMQStream(hpub,loop), |
|
297 | pingstream=ZMQStream(hpub,loop), | |
259 | pongstream=ZMQStream(hrep,loop) |
|
298 | pongstream=ZMQStream(hrep,loop) | |
260 | ) |
|
299 | ) | |
261 |
|
300 | |||
262 | ### Client connections ### |
|
301 | ### Client connections ### | |
|
302 | ||||
263 | # Notifier socket |
|
303 | # Notifier socket | |
264 | n = ZMQStream(ctx.socket(zmq.PUB), loop) |
|
304 | n = ZMQStream(ctx.socket(zmq.PUB), loop) | |
265 |
n.bind( |
|
305 | n.bind(self.client_url('notification')) | |
266 |
|
306 | |||
267 | ### build and launch the queues ### |
|
307 | ### build and launch the queues ### | |
268 |
|
308 | |||
@@ -279,35 +319,10 b' class HubFactory(RegistrationFactory):' | |||||
279 | self.db = import_item(str(db_class))(session=self.session.session, |
|
319 | self.db = import_item(str(db_class))(session=self.session.session, | |
280 | config=self.config, log=self.log) |
|
320 | config=self.config, log=self.log) | |
281 | time.sleep(.25) |
|
321 | time.sleep(.25) | |
282 | try: |
|
|||
283 | scheme = self.config.TaskScheduler.scheme_name |
|
|||
284 | except AttributeError: |
|
|||
285 | from .scheduler import TaskScheduler |
|
|||
286 | scheme = TaskScheduler.scheme_name.get_default_value() |
|
|||
287 | # build connection dicts |
|
|||
288 | self.engine_info = { |
|
|||
289 | 'control' : engine_iface%self.control[1], |
|
|||
290 | 'mux': engine_iface%self.mux[1], |
|
|||
291 | 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]), |
|
|||
292 | 'task' : engine_iface%self.task[1], |
|
|||
293 | 'iopub' : engine_iface%self.iopub[1], |
|
|||
294 | # 'monitor' : engine_iface%self.mon_port, |
|
|||
295 | } |
|
|||
296 |
|
||||
297 | self.client_info = { |
|
|||
298 | 'control' : client_iface%self.control[0], |
|
|||
299 | 'mux': client_iface%self.mux[0], |
|
|||
300 | 'task' : (scheme, client_iface%self.task[0]), |
|
|||
301 | 'iopub' : client_iface%self.iopub[0], |
|
|||
302 | 'notification': client_iface%self.notifier_port |
|
|||
303 | } |
|
|||
304 | self.log.debug("Hub engine addrs: %s", self.engine_info) |
|
|||
305 | self.log.debug("Hub client addrs: %s", self.client_info) |
|
|||
306 |
|
322 | |||
307 | # resubmit stream |
|
323 | # resubmit stream | |
308 | r = ZMQStream(ctx.socket(zmq.DEALER), loop) |
|
324 | r = ZMQStream(ctx.socket(zmq.DEALER), loop) | |
309 |
url = util.disambiguate_url(self.client_ |
|
325 | url = util.disambiguate_url(self.client_url('task')) | |
310 | r.setsockopt(zmq.IDENTITY, self.session.bsession) |
|
|||
311 | r.connect(url) |
|
326 | r.connect(url) | |
312 |
|
327 | |||
313 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
|
328 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, | |
@@ -335,6 +350,9 b' class Hub(SessionFactory):' | |||||
335 | client_info: dict of zmq connection information for engines to connect |
|
350 | client_info: dict of zmq connection information for engines to connect | |
336 | to the queues. |
|
351 | to the queues. | |
337 | """ |
|
352 | """ | |
|
353 | ||||
|
354 | engine_state_file = Unicode() | |||
|
355 | ||||
338 | # internal data structures: |
|
356 | # internal data structures: | |
339 | ids=Set() # engine IDs |
|
357 | ids=Set() # engine IDs | |
340 | keytable=Dict() |
|
358 | keytable=Dict() | |
@@ -382,15 +400,6 b' class Hub(SessionFactory):' | |||||
382 | super(Hub, self).__init__(**kwargs) |
|
400 | super(Hub, self).__init__(**kwargs) | |
383 | self.registration_timeout = max(5000, 2*self.heartmonitor.period) |
|
401 | self.registration_timeout = max(5000, 2*self.heartmonitor.period) | |
384 |
|
402 | |||
385 | # validate connection dicts: |
|
|||
386 | for k,v in self.client_info.iteritems(): |
|
|||
387 | if k == 'task': |
|
|||
388 | util.validate_url_container(v[1]) |
|
|||
389 | else: |
|
|||
390 | util.validate_url_container(v) |
|
|||
391 | # util.validate_url_container(self.client_info) |
|
|||
392 | util.validate_url_container(self.engine_info) |
|
|||
393 |
|
||||
394 | # register our callbacks |
|
403 | # register our callbacks | |
395 | self.query.on_recv(self.dispatch_query) |
|
404 | self.query.on_recv(self.dispatch_query) | |
396 | self.monitor.on_recv(self.dispatch_monitor_traffic) |
|
405 | self.monitor.on_recv(self.dispatch_monitor_traffic) | |
@@ -425,7 +434,7 b' class Hub(SessionFactory):' | |||||
425 | self.resubmit.on_recv(lambda msg: None, copy=False) |
|
434 | self.resubmit.on_recv(lambda msg: None, copy=False) | |
426 |
|
435 | |||
427 | self.log.info("hub::created hub") |
|
436 | self.log.info("hub::created hub") | |
428 |
|
437 | |||
429 | @property |
|
438 | @property | |
430 | def _next_id(self): |
|
439 | def _next_id(self): | |
431 | """gemerate a new ID. |
|
440 | """gemerate a new ID. | |
@@ -440,7 +449,7 b' class Hub(SessionFactory):' | |||||
440 | # while newid in self.ids or newid in incoming: |
|
449 | # while newid in self.ids or newid in incoming: | |
441 | # newid += 1 |
|
450 | # newid += 1 | |
442 | # return newid |
|
451 | # return newid | |
443 |
|
452 | |||
444 | #----------------------------------------------------------------------------- |
|
453 | #----------------------------------------------------------------------------- | |
445 | # message validation |
|
454 | # message validation | |
446 | #----------------------------------------------------------------------------- |
|
455 | #----------------------------------------------------------------------------- | |
@@ -556,11 +565,11 b' class Hub(SessionFactory):' | |||||
556 | triggers unregistration""" |
|
565 | triggers unregistration""" | |
557 | self.log.debug("heartbeat::handle_heart_failure(%r)", heart) |
|
566 | self.log.debug("heartbeat::handle_heart_failure(%r)", heart) | |
558 | eid = self.hearts.get(heart, None) |
|
567 | eid = self.hearts.get(heart, None) | |
559 |
|
|
568 | uuid = self.engines[eid].uuid | |
560 | if eid is None or self.keytable[eid] in self.dead_engines: |
|
569 | if eid is None or self.keytable[eid] in self.dead_engines: | |
561 | self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart) |
|
570 | self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart) | |
562 | else: |
|
571 | else: | |
563 |
self.unregister_engine(heart, dict(content=dict(id=eid, queue= |
|
572 | self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid))) | |
564 |
|
573 | |||
565 | #----------------------- MUX Queue Traffic ------------------------------ |
|
574 | #----------------------- MUX Queue Traffic ------------------------------ | |
566 |
|
575 | |||
@@ -585,7 +594,7 b' class Hub(SessionFactory):' | |||||
585 | self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid) |
|
594 | self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid) | |
586 | # Unicode in records |
|
595 | # Unicode in records | |
587 | record['engine_uuid'] = queue_id.decode('ascii') |
|
596 | record['engine_uuid'] = queue_id.decode('ascii') | |
588 |
record['client_uuid'] = |
|
597 | record['client_uuid'] = msg['header']['session'] | |
589 | record['queue'] = 'mux' |
|
598 | record['queue'] = 'mux' | |
590 |
|
599 | |||
591 | try: |
|
600 | try: | |
@@ -677,7 +686,7 b' class Hub(SessionFactory):' | |||||
677 | return |
|
686 | return | |
678 | record = init_record(msg) |
|
687 | record = init_record(msg) | |
679 |
|
688 | |||
680 |
record['client_uuid'] = |
|
689 | record['client_uuid'] = msg['header']['session'] | |
681 | record['queue'] = 'task' |
|
690 | record['queue'] = 'task' | |
682 | header = msg['header'] |
|
691 | header = msg['header'] | |
683 | msg_id = header['msg_id'] |
|
692 | msg_id = header['msg_id'] | |
@@ -865,11 +874,10 b' class Hub(SessionFactory):' | |||||
865 | """Reply with connection addresses for clients.""" |
|
874 | """Reply with connection addresses for clients.""" | |
866 | self.log.info("client::client %r connected", client_id) |
|
875 | self.log.info("client::client %r connected", client_id) | |
867 | content = dict(status='ok') |
|
876 | content = dict(status='ok') | |
868 | content.update(self.client_info) |
|
|||
869 | jsonable = {} |
|
877 | jsonable = {} | |
870 | for k,v in self.keytable.iteritems(): |
|
878 | for k,v in self.keytable.iteritems(): | |
871 | if v not in self.dead_engines: |
|
879 | if v not in self.dead_engines: | |
872 |
jsonable[str(k)] = v |
|
880 | jsonable[str(k)] = v | |
873 | content['engines'] = jsonable |
|
881 | content['engines'] = jsonable | |
874 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) |
|
882 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) | |
875 |
|
883 | |||
@@ -877,48 +885,37 b' class Hub(SessionFactory):' | |||||
877 | """Register a new engine.""" |
|
885 | """Register a new engine.""" | |
878 | content = msg['content'] |
|
886 | content = msg['content'] | |
879 | try: |
|
887 | try: | |
880 |
|
|
888 | uuid = content['uuid'] | |
881 | except KeyError: |
|
889 | except KeyError: | |
882 | self.log.error("registration::queue not specified", exc_info=True) |
|
890 | self.log.error("registration::queue not specified", exc_info=True) | |
883 | return |
|
891 | return | |
884 | heart = content.get('heartbeat', None) |
|
892 | ||
885 | if heart: |
|
|||
886 | heart = cast_bytes(heart) |
|
|||
887 | """register a new engine, and create the socket(s) necessary""" |
|
|||
888 | eid = self._next_id |
|
893 | eid = self._next_id | |
889 | # print (eid, queue, reg, heart) |
|
|||
890 |
|
894 | |||
891 |
self.log.debug("registration::register_engine(%i, %r |
|
895 | self.log.debug("registration::register_engine(%i, %r)", eid, uuid) | |
892 |
|
896 | |||
893 | content = dict(id=eid,status='ok') |
|
897 | content = dict(id=eid,status='ok') | |
894 | content.update(self.engine_info) |
|
|||
895 | # check if requesting available IDs: |
|
898 | # check if requesting available IDs: | |
896 |
if |
|
899 | if cast_bytes(uuid) in self.by_ident: | |
897 | try: |
|
|||
898 | raise KeyError("queue_id %r in use" % queue) |
|
|||
899 | except: |
|
|||
900 | content = error.wrap_exception() |
|
|||
901 | self.log.error("queue_id %r in use", queue, exc_info=True) |
|
|||
902 | elif heart in self.hearts: # need to check unique hearts? |
|
|||
903 | try: |
|
900 | try: | |
904 |
raise KeyError(" |
|
901 | raise KeyError("uuid %r in use" % uuid) | |
905 | except: |
|
902 | except: | |
906 | self.log.error("heart_id %r in use", heart, exc_info=True) |
|
|||
907 | content = error.wrap_exception() |
|
903 | content = error.wrap_exception() | |
|
904 | self.log.error("uuid %r in use", uuid, exc_info=True) | |||
908 | else: |
|
905 | else: | |
909 |
for h, |
|
906 | for h, ec in self.incoming_registrations.iteritems(): | |
910 |
if |
|
907 | if uuid == h: | |
911 | try: |
|
908 | try: | |
912 |
raise KeyError("heart_id %r in use" % |
|
909 | raise KeyError("heart_id %r in use" % uuid) | |
913 | except: |
|
910 | except: | |
914 |
self.log.error("heart_id %r in use", |
|
911 | self.log.error("heart_id %r in use", uuid, exc_info=True) | |
915 | content = error.wrap_exception() |
|
912 | content = error.wrap_exception() | |
916 | break |
|
913 | break | |
917 |
elif |
|
914 | elif uuid == ec.uuid: | |
918 | try: |
|
915 | try: | |
919 |
raise KeyError(" |
|
916 | raise KeyError("uuid %r in use" % uuid) | |
920 | except: |
|
917 | except: | |
921 |
self.log.error(" |
|
918 | self.log.error("uuid %r in use", uuid, exc_info=True) | |
922 | content = error.wrap_exception() |
|
919 | content = error.wrap_exception() | |
923 | break |
|
920 | break | |
924 |
|
921 | |||
@@ -926,18 +923,21 b' class Hub(SessionFactory):' | |||||
926 | content=content, |
|
923 | content=content, | |
927 | ident=reg) |
|
924 | ident=reg) | |
928 |
|
925 | |||
|
926 | heart = cast_bytes(uuid) | |||
|
927 | ||||
929 | if content['status'] == 'ok': |
|
928 | if content['status'] == 'ok': | |
930 | if heart in self.heartmonitor.hearts: |
|
929 | if heart in self.heartmonitor.hearts: | |
931 | # already beating |
|
930 | # already beating | |
932 |
self.incoming_registrations[heart] = (eid, |
|
931 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid) | |
933 | self.finish_registration(heart) |
|
932 | self.finish_registration(heart) | |
934 | else: |
|
933 | else: | |
935 | purge = lambda : self._purge_stalled_registration(heart) |
|
934 | purge = lambda : self._purge_stalled_registration(heart) | |
936 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) |
|
935 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) | |
937 | dc.start() |
|
936 | dc.start() | |
938 |
self.incoming_registrations[heart] = (eid, |
|
937 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc) | |
939 | else: |
|
938 | else: | |
940 | self.log.error("registration::registration %i failed: %r", eid, content['evalue']) |
|
939 | self.log.error("registration::registration %i failed: %r", eid, content['evalue']) | |
|
940 | ||||
941 | return eid |
|
941 | return eid | |
942 |
|
942 | |||
943 | def unregister_engine(self, ident, msg): |
|
943 | def unregister_engine(self, ident, msg): | |
@@ -950,7 +950,7 b' class Hub(SessionFactory):' | |||||
950 | self.log.info("registration::unregister_engine(%r)", eid) |
|
950 | self.log.info("registration::unregister_engine(%r)", eid) | |
951 | # print (eid) |
|
951 | # print (eid) | |
952 | uuid = self.keytable[eid] |
|
952 | uuid = self.keytable[eid] | |
953 |
content=dict(id=eid, |
|
953 | content=dict(id=eid, uuid=uuid) | |
954 | self.dead_engines.add(uuid) |
|
954 | self.dead_engines.add(uuid) | |
955 | # self.ids.remove(eid) |
|
955 | # self.ids.remove(eid) | |
956 | # uuid = self.keytable.pop(eid) |
|
956 | # uuid = self.keytable.pop(eid) | |
@@ -963,6 +963,8 b' class Hub(SessionFactory):' | |||||
963 | dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) |
|
963 | dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) | |
964 | dc.start() |
|
964 | dc.start() | |
965 | ############## TODO: HANDLE IT ################ |
|
965 | ############## TODO: HANDLE IT ################ | |
|
966 | ||||
|
967 | self._save_engine_state() | |||
966 |
|
968 | |||
967 | if self.notifier: |
|
969 | if self.notifier: | |
968 | self.session.send(self.notifier, "unregistration_notification", content=content) |
|
970 | self.session.send(self.notifier, "unregistration_notification", content=content) | |
@@ -1001,36 +1003,97 b' class Hub(SessionFactory):' | |||||
1001 | """Second half of engine registration, called after our HeartMonitor |
|
1003 | """Second half of engine registration, called after our HeartMonitor | |
1002 | has received a beat from the Engine's Heart.""" |
|
1004 | has received a beat from the Engine's Heart.""" | |
1003 | try: |
|
1005 | try: | |
1004 |
|
|
1006 | ec = self.incoming_registrations.pop(heart) | |
1005 | except KeyError: |
|
1007 | except KeyError: | |
1006 | self.log.error("registration::tried to finish nonexistant registration", exc_info=True) |
|
1008 | self.log.error("registration::tried to finish nonexistant registration", exc_info=True) | |
1007 | return |
|
1009 | return | |
1008 |
self.log.info("registration::finished registering engine %i:% |
|
1010 | self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid) | |
1009 |
if |
|
1011 | if ec.stallback is not None: | |
1010 |
|
|
1012 | ec.stallback.stop() | |
1011 | control = queue |
|
1013 | eid = ec.id | |
1012 | self.ids.add(eid) |
|
1014 | self.ids.add(eid) | |
1013 |
self.keytable[eid] = |
|
1015 | self.keytable[eid] = ec.uuid | |
1014 | self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg, |
|
1016 | self.engines[eid] = ec | |
1015 | control=control, heartbeat=heart) |
|
1017 | self.by_ident[cast_bytes(ec.uuid)] = ec.id | |
1016 | self.by_ident[queue] = eid |
|
|||
1017 | self.queues[eid] = list() |
|
1018 | self.queues[eid] = list() | |
1018 | self.tasks[eid] = list() |
|
1019 | self.tasks[eid] = list() | |
1019 | self.completed[eid] = list() |
|
1020 | self.completed[eid] = list() | |
1020 | self.hearts[heart] = eid |
|
1021 | self.hearts[heart] = eid | |
1021 |
content = dict(id=eid, |
|
1022 | content = dict(id=eid, uuid=self.engines[eid].uuid) | |
1022 | if self.notifier: |
|
1023 | if self.notifier: | |
1023 | self.session.send(self.notifier, "registration_notification", content=content) |
|
1024 | self.session.send(self.notifier, "registration_notification", content=content) | |
1024 | self.log.info("engine::Engine Connected: %i", eid) |
|
1025 | self.log.info("engine::Engine Connected: %i", eid) | |
|
1026 | ||||
|
1027 | self._save_engine_state() | |||
1025 |
|
1028 | |||
1026 | def _purge_stalled_registration(self, heart): |
|
1029 | def _purge_stalled_registration(self, heart): | |
1027 | if heart in self.incoming_registrations: |
|
1030 | if heart in self.incoming_registrations: | |
1028 |
e |
|
1031 | ec = self.incoming_registrations.pop(heart) | |
1029 | self.log.info("registration::purging stalled registration: %i", eid) |
|
1032 | self.log.info("registration::purging stalled registration: %i", ec.id) | |
1030 | else: |
|
1033 | else: | |
1031 | pass |
|
1034 | pass | |
1032 |
|
1035 | |||
1033 | #------------------------------------------------------------------------- |
|
1036 | #------------------------------------------------------------------------- | |
|
1037 | # Engine State | |||
|
1038 | #------------------------------------------------------------------------- | |||
|
1039 | ||||
|
1040 | ||||
|
1041 | def _cleanup_engine_state_file(self): | |||
|
1042 | """cleanup engine state mapping""" | |||
|
1043 | ||||
|
1044 | if os.path.exists(self.engine_state_file): | |||
|
1045 | self.log.debug("cleaning up engine state: %s", self.engine_state_file) | |||
|
1046 | try: | |||
|
1047 | os.remove(self.engine_state_file) | |||
|
1048 | except IOError: | |||
|
1049 | self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True) | |||
|
1050 | ||||
|
1051 | ||||
|
1052 | def _save_engine_state(self): | |||
|
1053 | """save engine mapping to JSON file""" | |||
|
1054 | if not self.engine_state_file: | |||
|
1055 | return | |||
|
1056 | self.log.debug("save engine state to %s" % self.engine_state_file) | |||
|
1057 | state = {} | |||
|
1058 | engines = {} | |||
|
1059 | for eid, ec in self.engines.iteritems(): | |||
|
1060 | if ec.uuid not in self.dead_engines: | |||
|
1061 | engines[eid] = ec.uuid | |||
|
1062 | ||||
|
1063 | state['engines'] = engines | |||
|
1064 | ||||
|
1065 | state['next_id'] = self._idcounter | |||
|
1066 | ||||
|
1067 | with open(self.engine_state_file, 'w') as f: | |||
|
1068 | json.dump(state, f) | |||
|
1069 | ||||
|
1070 | ||||
|
1071 | def _load_engine_state(self): | |||
|
1072 | """load engine mapping from JSON file""" | |||
|
1073 | if not os.path.exists(self.engine_state_file): | |||
|
1074 | return | |||
|
1075 | ||||
|
1076 | self.log.info("loading engine state from %s" % self.engine_state_file) | |||
|
1077 | ||||
|
1078 | with open(self.engine_state_file) as f: | |||
|
1079 | state = json.load(f) | |||
|
1080 | ||||
|
1081 | save_notifier = self.notifier | |||
|
1082 | self.notifier = None | |||
|
1083 | for eid, uuid in state['engines'].iteritems(): | |||
|
1084 | heart = uuid.encode('ascii') | |||
|
1085 | # start with this heart as current and beating: | |||
|
1086 | self.heartmonitor.responses.add(heart) | |||
|
1087 | self.heartmonitor.hearts.add(heart) | |||
|
1088 | ||||
|
1089 | self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid) | |||
|
1090 | self.finish_registration(heart) | |||
|
1091 | ||||
|
1092 | self.notifier = save_notifier | |||
|
1093 | ||||
|
1094 | self._idcounter = state['next_id'] | |||
|
1095 | ||||
|
1096 | #------------------------------------------------------------------------- | |||
1034 | # Client Requests |
|
1097 | # Client Requests | |
1035 | #------------------------------------------------------------------------- |
|
1098 | #------------------------------------------------------------------------- | |
1036 |
|
1099 | |||
@@ -1131,7 +1194,7 b' class Hub(SessionFactory):' | |||||
1131 | except: |
|
1194 | except: | |
1132 | reply = error.wrap_exception() |
|
1195 | reply = error.wrap_exception() | |
1133 | break |
|
1196 | break | |
1134 |
uid = self.engines[eid]. |
|
1197 | uid = self.engines[eid].uuid | |
1135 | try: |
|
1198 | try: | |
1136 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) |
|
1199 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) | |
1137 | except Exception: |
|
1200 | except Exception: | |
@@ -1205,6 +1268,7 b' class Hub(SessionFactory):' | |||||
1205 | self.db.add_record(msg_id, init_record(msg)) |
|
1268 | self.db.add_record(msg_id, init_record(msg)) | |
1206 | except Exception: |
|
1269 | except Exception: | |
1207 | self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) |
|
1270 | self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) | |
|
1271 | return finish(error.wrap_exception()) | |||
1208 |
|
1272 | |||
1209 | finish(dict(status='ok', resubmitted=resubmitted)) |
|
1273 | finish(dict(status='ok', resubmitted=resubmitted)) | |
1210 |
|
1274 |
@@ -189,6 +189,7 b' class TaskScheduler(SessionFactory):' | |||||
189 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream |
|
189 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream | |
190 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream |
|
190 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream | |
191 | mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream |
|
191 | mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream | |
|
192 | query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream | |||
192 |
|
193 | |||
193 | # internals: |
|
194 | # internals: | |
194 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
|
195 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] | |
@@ -216,6 +217,9 b' class TaskScheduler(SessionFactory):' | |||||
216 | return self.session.bsession |
|
217 | return self.session.bsession | |
217 |
|
218 | |||
218 | def start(self): |
|
219 | def start(self): | |
|
220 | self.query_stream.on_recv(self.dispatch_query_reply) | |||
|
221 | self.session.send(self.query_stream, "connection_request", {}) | |||
|
222 | ||||
219 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
223 | self.engine_stream.on_recv(self.dispatch_result, copy=False) | |
220 | self.client_stream.on_recv(self.dispatch_submission, copy=False) |
|
224 | self.client_stream.on_recv(self.dispatch_submission, copy=False) | |
221 |
|
225 | |||
@@ -240,6 +244,24 b' class TaskScheduler(SessionFactory):' | |||||
240 | #----------------------------------------------------------------------- |
|
244 | #----------------------------------------------------------------------- | |
241 | # [Un]Registration Handling |
|
245 | # [Un]Registration Handling | |
242 | #----------------------------------------------------------------------- |
|
246 | #----------------------------------------------------------------------- | |
|
247 | ||||
|
248 | ||||
|
249 | def dispatch_query_reply(self, msg): | |||
|
250 | """handle reply to our initial connection request""" | |||
|
251 | try: | |||
|
252 | idents,msg = self.session.feed_identities(msg) | |||
|
253 | except ValueError: | |||
|
254 | self.log.warn("task::Invalid Message: %r",msg) | |||
|
255 | return | |||
|
256 | try: | |||
|
257 | msg = self.session.unserialize(msg) | |||
|
258 | except ValueError: | |||
|
259 | self.log.warn("task::Unauthorized message from: %r"%idents) | |||
|
260 | return | |||
|
261 | ||||
|
262 | content = msg['content'] | |||
|
263 | for uuid in content.get('engines', {}).values(): | |||
|
264 | self._register_engine(cast_bytes(uuid)) | |||
243 |
|
265 | |||
244 |
|
266 | |||
245 | @util.log_errors |
|
267 | @util.log_errors | |
@@ -263,7 +285,7 b' class TaskScheduler(SessionFactory):' | |||||
263 | self.log.error("Unhandled message type: %r"%msg_type) |
|
285 | self.log.error("Unhandled message type: %r"%msg_type) | |
264 | else: |
|
286 | else: | |
265 | try: |
|
287 | try: | |
266 |
handler(cast_bytes(msg['content'][' |
|
288 | handler(cast_bytes(msg['content']['uuid'])) | |
267 | except Exception: |
|
289 | except Exception: | |
268 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) |
|
290 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) | |
269 |
|
291 | |||
@@ -714,7 +736,7 b' class TaskScheduler(SessionFactory):' | |||||
714 |
|
736 | |||
715 |
|
737 | |||
716 |
|
738 | |||
717 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, |
|
739 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None, | |
718 | logname='root', log_url=None, loglevel=logging.DEBUG, |
|
740 | logname='root', log_url=None, loglevel=logging.DEBUG, | |
719 | identity=b'task', in_thread=False): |
|
741 | identity=b'task', in_thread=False): | |
720 |
|
742 | |||
@@ -734,18 +756,21 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,' | |||||
734 | ctx = zmq.Context() |
|
756 | ctx = zmq.Context() | |
735 | loop = ioloop.IOLoop() |
|
757 | loop = ioloop.IOLoop() | |
736 | ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
758 | ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) | |
737 | ins.setsockopt(zmq.IDENTITY, identity) |
|
759 | ins.setsockopt(zmq.IDENTITY, identity + b'_in') | |
738 | ins.bind(in_addr) |
|
760 | ins.bind(in_addr) | |
739 |
|
761 | |||
740 | outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
762 | outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) | |
741 | outs.setsockopt(zmq.IDENTITY, identity) |
|
763 | outs.setsockopt(zmq.IDENTITY, identity + b'_out') | |
742 | outs.bind(out_addr) |
|
764 | outs.bind(out_addr) | |
743 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) |
|
765 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) | |
744 | mons.connect(mon_addr) |
|
766 | mons.connect(mon_addr) | |
745 | nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) |
|
767 | nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) | |
746 | nots.setsockopt(zmq.SUBSCRIBE, b'') |
|
768 | nots.setsockopt(zmq.SUBSCRIBE, b'') | |
747 | nots.connect(not_addr) |
|
769 | nots.connect(not_addr) | |
748 |
|
770 | |||
|
771 | querys = ZMQStream(ctx.socket(zmq.DEALER),loop) | |||
|
772 | querys.connect(reg_addr) | |||
|
773 | ||||
749 | # setup logging. |
|
774 | # setup logging. | |
750 | if in_thread: |
|
775 | if in_thread: | |
751 | log = Application.instance().log |
|
776 | log = Application.instance().log | |
@@ -757,6 +782,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,' | |||||
757 |
|
782 | |||
758 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, |
|
783 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, | |
759 | mon_stream=mons, notifier_stream=nots, |
|
784 | mon_stream=mons, notifier_stream=nots, | |
|
785 | query_stream=querys, | |||
760 | loop=loop, log=log, |
|
786 | loop=loop, log=log, | |
761 | config=config) |
|
787 | config=config) | |
762 | scheduler.start() |
|
788 | scheduler.start() |
@@ -50,7 +50,7 b' class EngineFactory(RegistrationFactory):' | |||||
50 | help="""The location (an IP address) of the controller. This is |
|
50 | help="""The location (an IP address) of the controller. This is | |
51 | used for disambiguating URLs, to determine whether |
|
51 | used for disambiguating URLs, to determine whether | |
52 | loopback should be used to connect or the public address.""") |
|
52 | loopback should be used to connect or the public address.""") | |
53 |
timeout=CFloat( |
|
53 | timeout=CFloat(5, config=True, | |
54 | help="""The time (in seconds) to wait for the Controller to respond |
|
54 | help="""The time (in seconds) to wait for the Controller to respond | |
55 | to registration requests before giving up.""") |
|
55 | to registration requests before giving up.""") | |
56 | sshserver=Unicode(config=True, |
|
56 | sshserver=Unicode(config=True, | |
@@ -61,10 +61,11 b' class EngineFactory(RegistrationFactory):' | |||||
61 | help="""Whether to use paramiko instead of openssh for tunnels.""") |
|
61 | help="""Whether to use paramiko instead of openssh for tunnels.""") | |
62 |
|
62 | |||
63 | # not configurable: |
|
63 | # not configurable: | |
64 |
|
|
64 | connection_info = Dict() | |
65 | id=Integer(allow_none=True) |
|
65 | user_ns = Dict() | |
66 | registrar=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
66 | id = Integer(allow_none=True) | |
67 | kernel=Instance(Kernel) |
|
67 | registrar = Instance('zmq.eventloop.zmqstream.ZMQStream') | |
|
68 | kernel = Instance(Kernel) | |||
68 |
|
69 | |||
69 | bident = CBytes() |
|
70 | bident = CBytes() | |
70 | ident = Unicode() |
|
71 | ident = Unicode() | |
@@ -96,7 +97,7 b' class EngineFactory(RegistrationFactory):' | |||||
96 | def connect(s, url): |
|
97 | def connect(s, url): | |
97 | url = disambiguate_url(url, self.location) |
|
98 | url = disambiguate_url(url, self.location) | |
98 | if self.using_ssh: |
|
99 | if self.using_ssh: | |
99 |
self.log.debug("Tunneling connection to %s via %s" |
|
100 | self.log.debug("Tunneling connection to %s via %s", url, self.sshserver) | |
100 | return tunnel.tunnel_connection(s, url, self.sshserver, |
|
101 | return tunnel.tunnel_connection(s, url, self.sshserver, | |
101 | keyfile=self.sshkey, paramiko=self.paramiko, |
|
102 | keyfile=self.sshkey, paramiko=self.paramiko, | |
102 | password=password, |
|
103 | password=password, | |
@@ -108,12 +109,12 b' class EngineFactory(RegistrationFactory):' | |||||
108 | """like connect, but don't complete the connection (for use by heartbeat)""" |
|
109 | """like connect, but don't complete the connection (for use by heartbeat)""" | |
109 | url = disambiguate_url(url, self.location) |
|
110 | url = disambiguate_url(url, self.location) | |
110 | if self.using_ssh: |
|
111 | if self.using_ssh: | |
111 |
self.log.debug("Tunneling connection to %s via %s" |
|
112 | self.log.debug("Tunneling connection to %s via %s", url, self.sshserver) | |
112 | url,tunnelobj = tunnel.open_tunnel(url, self.sshserver, |
|
113 | url,tunnelobj = tunnel.open_tunnel(url, self.sshserver, | |
113 | keyfile=self.sshkey, paramiko=self.paramiko, |
|
114 | keyfile=self.sshkey, paramiko=self.paramiko, | |
114 | password=password, |
|
115 | password=password, | |
115 | ) |
|
116 | ) | |
116 | return url |
|
117 | return str(url) | |
117 | return connect, maybe_tunnel |
|
118 | return connect, maybe_tunnel | |
118 |
|
119 | |||
119 | def register(self): |
|
120 | def register(self): | |
@@ -128,10 +129,10 b' class EngineFactory(RegistrationFactory):' | |||||
128 | self.registrar = zmqstream.ZMQStream(reg, self.loop) |
|
129 | self.registrar = zmqstream.ZMQStream(reg, self.loop) | |
129 |
|
130 | |||
130 |
|
131 | |||
131 |
content = dict( |
|
132 | content = dict(uuid=self.ident) | |
132 | self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel)) |
|
133 | self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel)) | |
133 | # print (self.session.key) |
|
134 | # print (self.session.key) | |
134 | self.session.send(self.registrar, "registration_request",content=content) |
|
135 | self.session.send(self.registrar, "registration_request", content=content) | |
135 |
|
136 | |||
136 | def complete_registration(self, msg, connect, maybe_tunnel): |
|
137 | def complete_registration(self, msg, connect, maybe_tunnel): | |
137 | # print msg |
|
138 | # print msg | |
@@ -140,50 +141,43 b' class EngineFactory(RegistrationFactory):' | |||||
140 | loop = self.loop |
|
141 | loop = self.loop | |
141 | identity = self.bident |
|
142 | identity = self.bident | |
142 | idents,msg = self.session.feed_identities(msg) |
|
143 | idents,msg = self.session.feed_identities(msg) | |
143 |
msg = |
|
144 | msg = self.session.unserialize(msg) | |
144 |
|
145 | content = msg['content'] | ||
145 | if msg.content.status == 'ok': |
|
146 | info = self.connection_info | |
146 | self.id = int(msg.content.id) |
|
147 | ||
|
148 | def url(key): | |||
|
149 | """get zmq url for given channel""" | |||
|
150 | return str(info["interface"] + ":%i" % info[key]) | |||
|
151 | ||||
|
152 | if content['status'] == 'ok': | |||
|
153 | self.id = int(content['id']) | |||
147 |
|
154 | |||
148 | # launch heartbeat |
|
155 | # launch heartbeat | |
149 | hb_addrs = msg.content.heartbeat |
|
|||
150 |
|
||||
151 | # possibly forward hb ports with tunnels |
|
156 | # possibly forward hb ports with tunnels | |
152 |
hb_ |
|
157 | hb_ping = maybe_tunnel(url('hb_ping')) | |
153 | heart = Heart(*map(str, hb_addrs), heart_id=identity) |
|
158 | hb_pong = maybe_tunnel(url('hb_pong')) | |
|
159 | ||||
|
160 | heart = Heart(hb_ping, hb_pong, heart_id=identity) | |||
154 | heart.start() |
|
161 | heart.start() | |
155 |
|
162 | |||
156 |
# create Shell |
|
163 | # create Shell Connections (MUX, Task, etc.): | |
157 | queue_addr = msg.content.mux |
|
164 | shell_addrs = url('mux'), url('task') | |
158 | shell_addrs = [ str(queue_addr) ] |
|
165 | ||
159 | task_addr = msg.content.task |
|
166 | # Use only one shell stream for mux and tasks | |
160 | if task_addr: |
|
|||
161 | shell_addrs.append(str(task_addr)) |
|
|||
162 |
|
||||
163 | # Uncomment this to go back to two-socket model |
|
|||
164 | # shell_streams = [] |
|
|||
165 | # for addr in shell_addrs: |
|
|||
166 | # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
|||
167 | # stream.setsockopt(zmq.IDENTITY, identity) |
|
|||
168 | # stream.connect(disambiguate_url(addr, self.location)) |
|
|||
169 | # shell_streams.append(stream) |
|
|||
170 |
|
||||
171 | # Now use only one shell stream for mux and tasks |
|
|||
172 | stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
167 | stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) | |
173 | stream.setsockopt(zmq.IDENTITY, identity) |
|
168 | stream.setsockopt(zmq.IDENTITY, identity) | |
174 | shell_streams = [stream] |
|
169 | shell_streams = [stream] | |
175 | for addr in shell_addrs: |
|
170 | for addr in shell_addrs: | |
176 | connect(stream, addr) |
|
171 | connect(stream, addr) | |
177 | # end single stream-socket |
|
|||
178 |
|
172 | |||
179 | # control stream: |
|
173 | # control stream: | |
180 |
control_addr = |
|
174 | control_addr = url('control') | |
181 | control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
175 | control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) | |
182 | control_stream.setsockopt(zmq.IDENTITY, identity) |
|
176 | control_stream.setsockopt(zmq.IDENTITY, identity) | |
183 | connect(control_stream, control_addr) |
|
177 | connect(control_stream, control_addr) | |
184 |
|
178 | |||
185 | # create iopub stream: |
|
179 | # create iopub stream: | |
186 |
iopub_addr = |
|
180 | iopub_addr = url('iopub') | |
187 | iopub_socket = ctx.socket(zmq.PUB) |
|
181 | iopub_socket = ctx.socket(zmq.PUB) | |
188 | iopub_socket.setsockopt(zmq.IDENTITY, identity) |
|
182 | iopub_socket.setsockopt(zmq.IDENTITY, identity) | |
189 | connect(iopub_socket, iopub_addr) |
|
183 | connect(iopub_socket, iopub_addr) |
@@ -257,9 +257,11 b' class Session(Configurable):' | |||||
257 | if new.lower() == 'json': |
|
257 | if new.lower() == 'json': | |
258 | self.pack = json_packer |
|
258 | self.pack = json_packer | |
259 | self.unpack = json_unpacker |
|
259 | self.unpack = json_unpacker | |
|
260 | self.unpacker = new | |||
260 | elif new.lower() == 'pickle': |
|
261 | elif new.lower() == 'pickle': | |
261 | self.pack = pickle_packer |
|
262 | self.pack = pickle_packer | |
262 | self.unpack = pickle_unpacker |
|
263 | self.unpack = pickle_unpacker | |
|
264 | self.unpacker = new | |||
263 | else: |
|
265 | else: | |
264 | self.pack = import_item(str(new)) |
|
266 | self.pack = import_item(str(new)) | |
265 |
|
267 | |||
@@ -270,9 +272,11 b' class Session(Configurable):' | |||||
270 | if new.lower() == 'json': |
|
272 | if new.lower() == 'json': | |
271 | self.pack = json_packer |
|
273 | self.pack = json_packer | |
272 | self.unpack = json_unpacker |
|
274 | self.unpack = json_unpacker | |
|
275 | self.packer = new | |||
273 | elif new.lower() == 'pickle': |
|
276 | elif new.lower() == 'pickle': | |
274 | self.pack = pickle_packer |
|
277 | self.pack = pickle_packer | |
275 | self.unpack = pickle_unpacker |
|
278 | self.unpack = pickle_unpacker | |
|
279 | self.packer = new | |||
276 | else: |
|
280 | else: | |
277 | self.unpack = import_item(str(new)) |
|
281 | self.unpack = import_item(str(new)) | |
278 |
|
282 |
@@ -43,9 +43,7 b' monitor the survival of the Engine process.' | |||||
43 | Message type: ``registration_request``:: |
|
43 | Message type: ``registration_request``:: | |
44 |
|
44 | |||
45 | content = { |
|
45 | content = { | |
46 |
' |
|
46 | 'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets | |
47 | 'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY |
|
|||
48 | 'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY |
|
|||
49 | } |
|
47 | } | |
50 |
|
48 | |||
51 | .. note:: |
|
49 | .. note:: | |
@@ -63,10 +61,6 b' Message type: ``registration_reply``::' | |||||
63 | 'status' : 'ok', # or 'error' |
|
61 | 'status' : 'ok', # or 'error' | |
64 | # if ok: |
|
62 | # if ok: | |
65 | 'id' : 0, # int, the engine id |
|
63 | 'id' : 0, # int, the engine id | |
66 | 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue |
|
|||
67 | 'control' : 'tcp://...', # addr for control queue |
|
|||
68 | 'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat |
|
|||
69 | 'task' : 'tcp://...', # addr for task queue, or None if no task queue running |
|
|||
70 | } |
|
64 | } | |
71 |
|
65 | |||
72 | Clients use the same socket as engines to start their connections. Connection requests |
|
66 | Clients use the same socket as engines to start their connections. Connection requests | |
@@ -84,11 +78,6 b' Message type: ``connection_reply``::' | |||||
84 |
|
78 | |||
85 | content = { |
|
79 | content = { | |
86 | 'status' : 'ok', # or 'error' |
|
80 | 'status' : 'ok', # or 'error' | |
87 | # if ok: |
|
|||
88 | 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue |
|
|||
89 | 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple) |
|
|||
90 | 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc. |
|
|||
91 | 'control' : 'tcp...', # addr for control methods, like abort, etc. |
|
|||
92 | } |
|
81 | } | |
93 |
|
82 | |||
94 | Heartbeat |
|
83 | Heartbeat | |
@@ -110,13 +99,14 b' Message type: ``registration_notification``::' | |||||
110 |
|
99 | |||
111 | content = { |
|
100 | content = { | |
112 | 'id' : 0, # engine ID that has been registered |
|
101 | 'id' : 0, # engine ID that has been registered | |
113 |
' |
|
102 | 'uuid' : 'engine_id' # the IDENT for the engine's sockets | |
114 | } |
|
103 | } | |
115 |
|
104 | |||
116 | Message type : ``unregistration_notification``:: |
|
105 | Message type : ``unregistration_notification``:: | |
117 |
|
106 | |||
118 | content = { |
|
107 | content = { | |
119 | 'id' : 0 # engine ID that has been unregistered |
|
108 | 'id' : 0 # engine ID that has been unregistered | |
|
109 | 'uuid' : 'engine_id' # the IDENT for the engine's sockets | |||
120 | } |
|
110 | } | |
121 |
|
111 | |||
122 |
|
112 |
General Comments 0
You need to be logged in to leave comments.
Login now