##// END OF EJS Templates
Work on engine/client reconnect logic.
Brian Granger -
Show More
@@ -17,22 +17,39 b''
17 17 from __future__ import with_statement
18 18 import os
19 19
20 from IPython.kernel.fcutil import Tub, find_furl
20 from IPython.kernel.fcutil import (
21 Tub,
22 find_furl,
23 is_valid_furl_or_file,
24 validate_furl_or_file,
25 FURLError
26 )
21 27 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
22 28 from IPython.kernel.launcher import IPClusterLauncher
23 from IPython.kernel.twistedutil import gatherBoth, make_deferred
24 from IPython.kernel.twistedutil import blockingCallFromThread
25
29 from IPython.kernel.twistedutil import (
30 gatherBoth,
31 make_deferred,
32 blockingCallFromThread,
33 sleep_deferred
34 )
26 35 from IPython.utils.importstring import import_item
27 36 from IPython.utils.genutils import get_ipython_dir
28 37
29 38 from twisted.internet import defer
30 from twisted.python import failure
39 from twisted.internet.defer import inlineCallbacks, returnValue
40 from twisted.python import failure, log
31 41
32 42 #-----------------------------------------------------------------------------
33 43 # The ClientConnector class
34 44 #-----------------------------------------------------------------------------
35 45
46 DELAY = 0.2
47 MAX_TRIES = 9
48
49
50 class ClientConnectorError(Exception):
51 pass
52
36 53
37 54 class AsyncClientConnector(object):
38 55 """A class for getting remote references and clients from furls.
@@ -51,24 +68,24 b' class AsyncClientConnector(object):'
51 68 ipythondir=None):
52 69 """Find a FURL file by profile+ipythondir or cluster dir.
53 70
54 This raises an exception if a FURL file can't be found.
71 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
72 if a FURL file can't be found.
55 73 """
56 74 # Try by furl_or_file
57 75 if furl_or_file is not None:
58 try:
59 furl = find_furl(furl_or_file)
60 except ValueError:
61 return furl
76 validate_furl_or_file(furl_or_file)
77 return furl_or_file
62 78
63 79 if furl_file_name is None:
64 raise ValueError('A furl_file_name must be provided')
80 raise FURLError('A furl_file_name must be provided')
65 81
66 82 # Try by cluster_dir
67 83 if cluster_dir is not None:
68 84 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
69 85 sdir = cluster_dir_obj.security_dir
70 86 furl_file = os.path.join(sdir, furl_file_name)
71 return find_furl(furl_file)
87 validate_furl_or_file(furl_file)
88 return furl_file
72 89
73 90 # Try by profile
74 91 if ipythondir is None:
@@ -78,9 +95,10 b' class AsyncClientConnector(object):'
78 95 ipythondir, profile)
79 96 sdir = cluster_dir_obj.security_dir
80 97 furl_file = os.path.join(sdir, furl_file_name)
81 return find_furl(furl_file)
98 validate_furl_or_file(furl_file)
99 return furl_file
82 100
83 raise ValueError('Could not find a valid FURL file.')
101 raise FURLError('Could not find a valid FURL file.')
84 102
85 103 def get_reference(self, furl_or_file):
86 104 """Get a remote reference using a furl or a file containing a furl.
@@ -92,13 +110,14 b' class AsyncClientConnector(object):'
92 110 Parameters
93 111 ----------
94 112 furl_or_file : str
95 A furl or a filename containing a furl
113 A furl or a filename containing a furl. This should already be
114 validated, but might not yet exist.
96 115
97 116 Returns
98 117 -------
99 118 A deferred to a remote reference
100 119 """
101 furl = find_furl(furl_or_file)
120 furl = furl_or_file
102 121 if furl in self._remote_refs:
103 122 d = defer.succeed(self._remote_refs[furl])
104 123 else:
@@ -112,7 +131,8 b' class AsyncClientConnector(object):'
112 131 return ref
113 132
114 133 def get_task_client(self, profile='default', cluster_dir=None,
115 furl_or_file=None, ipythondir=None):
134 furl_or_file=None, ipythondir=None,
135 delay=DELAY, max_tries=MAX_TRIES):
116 136 """Get the task controller client.
117 137
118 138 This method is a simple wrapper around `get_client` that passes in
@@ -143,11 +163,13 b' class AsyncClientConnector(object):'
143 163 """
144 164 return self.get_client(
145 165 profile, cluster_dir, furl_or_file,
146 'ipcontroller-tc.furl', ipythondir
166 'ipcontroller-tc.furl', ipythondir,
167 delay, max_tries
147 168 )
148 169
149 170 def get_multiengine_client(self, profile='default', cluster_dir=None,
150 furl_or_file=None, ipythondir=None):
171 furl_or_file=None, ipythondir=None,
172 delay=DELAY, max_tries=MAX_TRIES):
151 173 """Get the multiengine controller client.
152 174
153 175 This method is a simple wrapper around `get_client` that passes in
@@ -178,11 +200,13 b' class AsyncClientConnector(object):'
178 200 """
179 201 return self.get_client(
180 202 profile, cluster_dir, furl_or_file,
181 'ipcontroller-mec.furl', ipythondir
203 'ipcontroller-mec.furl', ipythondir,
204 delay, max_tries
182 205 )
183 206
184 207 def get_client(self, profile='default', cluster_dir=None,
185 furl_or_file=None, furl_file_name=None, ipythondir=None):
208 furl_or_file=None, furl_file_name=None, ipythondir=None,
209 delay=DELAY, max_tries=MAX_TRIES):
186 210 """Get a remote reference and wrap it in a client by furl.
187 211
188 212 This method is a simple wrapper around `get_client` that passes in
@@ -201,7 +225,7 b' class AsyncClientConnector(object):'
201 225 The full path to a cluster directory. This is useful if profiles
202 226 are not being used.
203 227 furl_or_file : str
204 A furl or a filename containing a FURLK. This is useful if you
228 A furl or a filename containing a FURL. This is useful if you
205 229 simply know the location of the FURL file.
206 230 furl_file_name : str
207 231 The filename (not the full path) of the FURL. This must be
@@ -212,18 +236,17 b' class AsyncClientConnector(object):'
212 236
213 237 Returns
214 238 -------
215 A deferred to the actual client class.
239 A deferred to the actual client class. Or a failure to a
240 :exc:`FURLError`.
216 241 """
217 242 try:
218 furl = self._find_furl(
243 furl_file = self._find_furl(
219 244 profile, cluster_dir, furl_or_file,
220 245 furl_file_name, ipythondir
221 246 )
222 except:
247 except FURLError:
223 248 return defer.fail(failure.Failure())
224 249
225 d = self.get_reference(furl)
226
227 250 def _wrap_remote_reference(rr):
228 251 d = rr.callRemote('get_client_name')
229 252 d.addCallback(lambda name: import_item(name))
@@ -235,9 +258,42 b' class AsyncClientConnector(object):'
235 258
236 259 return d
237 260
261 d = self._try_to_connect(furl_file, delay, max_tries, attempt=0)
238 262 d.addCallback(_wrap_remote_reference)
239 263 return d
240 264
265 @inlineCallbacks
266 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
267 """Try to connect to the controller with retry logic."""
268 if attempt < max_tries:
269 log.msg("Connecting to controller [%r]: %s" % \
270 (attempt, furl_or_file))
271 try:
272 self.furl = find_furl(furl_or_file)
273 # Uncomment this to see the FURL being tried.
274 # log.msg("FURL: %s" % self.furl)
275 rr = yield self.get_reference(self.furl)
276 except:
277 if attempt==max_tries-1:
278 # This will propagate the exception all the way to the top
279 # where it can be handled.
280 raise
281 else:
282 yield sleep_deferred(delay)
283 rr = yield self._try_to_connect(
284 furl_or_file, 1.5*delay, max_tries, attempt+1
285 )
286 returnValue(rr)
287 else:
288 returnValue(rr)
289 else:
290 raise ClientConnectorError(
291 'Could not connect to controller, max_tries (%r) exceeded. '
292 'This usually means that i) the controller was not started, '
293 'or ii) a firewall was blocking the client from connecting '
294 'to the controller.' % max_tries
295 )
296
241 297
242 298 class ClientConnector(object):
243 299 """A blocking version of a client connector.
@@ -252,7 +308,8 b' class ClientConnector(object):'
252 308 self.async_cc = AsyncClientConnector()
253 309
254 310 def get_task_client(self, profile='default', cluster_dir=None,
255 furl_or_file=None, ipythondir=None):
311 furl_or_file=None, ipythondir=None,
312 delay=DELAY, max_tries=MAX_TRIES):
256 313 """Get the task client.
257 314
258 315 Usually only the ``profile`` option will be needed. If a FURL file
@@ -282,12 +339,13 b' class ClientConnector(object):'
282 339 """
283 340 client = blockingCallFromThread(
284 341 self.async_cc.get_task_client, profile, cluster_dir,
285 furl_or_file, ipythondir
342 furl_or_file, ipythondir, delay, max_tries
286 343 )
287 344 return client.adapt_to_blocking_client()
288 345
289 346 def get_multiengine_client(self, profile='default', cluster_dir=None,
290 furl_or_file=None, ipythondir=None):
347 furl_or_file=None, ipythondir=None,
348 delay=DELAY, max_tries=MAX_TRIES):
291 349 """Get the multiengine client.
292 350
293 351 Usually only the ``profile`` option will be needed. If a FURL file
@@ -317,15 +375,17 b' class ClientConnector(object):'
317 375 """
318 376 client = blockingCallFromThread(
319 377 self.async_cc.get_multiengine_client, profile, cluster_dir,
320 furl_or_file, ipythondir
378 furl_or_file, ipythondir, delay, max_tries
321 379 )
322 380 return client.adapt_to_blocking_client()
323 381
324 382 def get_client(self, profile='default', cluster_dir=None,
325 furl_or_file=None, ipythondir=None):
383 furl_or_file=None, ipythondir=None,
384 delay=DELAY, max_tries=MAX_TRIES):
326 385 client = blockingCallFromThread(
327 386 self.async_cc.get_client, profile, cluster_dir,
328 furl_or_file, ipythondir
387 furl_or_file, ipythondir,
388 delay, max_tries
329 389 )
330 390 return client.adapt_to_blocking_client()
331 391
@@ -357,9 +417,6 b' class AsyncCluster(object):'
357 417 cluster_dir : str
358 418 The full path to a cluster directory. This is useful if profiles
359 419 are not being used.
360 furl_or_file : str
361 A furl or a filename containing a FURLK. This is useful if you
362 simply know the location of the FURL file.
363 420 ipythondir : str
364 421 The location of the ipythondir if different from the default.
365 422 This is used if the cluster directory is being found by profile.
@@ -451,7 +508,7 b' class AsyncCluster(object):'
451 508 else:
452 509 raise ClusterStateError("Cluster not running")
453 510
454 def get_multiengine_client(self):
511 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
455 512 """Get the multiengine client for the running cluster.
456 513
457 514 If this fails, it means that the cluster has not finished starting.
@@ -460,10 +517,11 b' class AsyncCluster(object):'
460 517 if self.client_connector is None:
461 518 self.client_connector = AsyncClientConnector()
462 519 return self.client_connector.get_multiengine_client(
463 cluster_dir=self.cluster_dir_obj.location
520 cluster_dir=self.cluster_dir_obj.location,
521 delay=delay, max_tries=max_tries
464 522 )
465 523
466 def get_task_client(self):
524 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
467 525 """Get the task client for the running cluster.
468 526
469 527 If this fails, it means that the cluster has not finished starting.
@@ -472,7 +530,8 b' class AsyncCluster(object):'
472 530 if self.client_connector is None:
473 531 self.client_connector = AsyncClientConnector()
474 532 return self.client_connector.get_task_client(
475 cluster_dir=self.cluster_dir_obj.location
533 cluster_dir=self.cluster_dir_obj.location,
534 delay=delay, max_tries=max_tries
476 535 )
477 536
478 537 def get_ipengine_logs(self):
@@ -529,9 +588,6 b' class Cluster(object):'
529 588 cluster_dir : str
530 589 The full path to a cluster directory. This is useful if profiles
531 590 are not being used.
532 furl_or_file : str
533 A furl or a filename containing a FURLK. This is useful if you
534 simply know the location of the FURL file.
535 591 ipythondir : str
536 592 The location of the ipythondir if different from the default.
537 593 This is used if the cluster directory is being found by profile.
@@ -581,7 +637,7 b' class Cluster(object):'
581 637 """Stop the IPython cluster if it is running."""
582 638 return blockingCallFromThread(self.async_cluster.stop)
583 639
584 def get_multiengine_client(self):
640 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
585 641 """Get the multiengine client for the running cluster.
586 642
587 643 If this fails, it means that the cluster has not finished starting.
@@ -590,10 +646,11 b' class Cluster(object):'
590 646 if self.client_connector is None:
591 647 self.client_connector = ClientConnector()
592 648 return self.client_connector.get_multiengine_client(
593 cluster_dir=self.cluster_dir_obj.location
649 cluster_dir=self.cluster_dir_obj.location,
650 delay=delay, max_tries=max_tries
594 651 )
595 652
596 def get_task_client(self):
653 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
597 654 """Get the task client for the running cluster.
598 655
599 656 If this fails, it means that the cluster has not finished starting.
@@ -602,7 +659,8 b' class Cluster(object):'
602 659 if self.client_connector is None:
603 660 self.client_connector = ClientConnector()
604 661 return self.client_connector.get_task_client(
605 cluster_dir=self.cluster_dir_obj.location
662 cluster_dir=self.cluster_dir_obj.location,
663 delay=delay, max_tries=max_tries
606 664 )
607 665
608 666 def __repr__(self):
@@ -21,9 +21,9 b' from twisted.python import log, failure'
21 21 from twisted.internet import defer
22 22 from twisted.internet.defer import inlineCallbacks, returnValue
23 23
24 from IPython.kernel.fcutil import find_furl
24 from IPython.kernel.fcutil import find_furl, validate_furl_or_file
25 25 from IPython.kernel.enginefc import IFCEngine
26 from IPython.kernel.twistedutil import sleep_deferred
26 from IPython.kernel.twistedutil import sleep_deferred, make_deferred
27 27
28 28 #-----------------------------------------------------------------------------
29 29 # The ClientConnector class
@@ -45,6 +45,7 b' class EngineConnector(object):'
45 45 def __init__(self, tub):
46 46 self.tub = tub
47 47
48 @make_deferred
48 49 def connect_to_controller(self, engine_service, furl_or_file,
49 50 delay=0.1, max_tries=10):
50 51 """
@@ -74,13 +75,20 b' class EngineConnector(object):'
74 75 attempts have increasing delays.
75 76 max_tries : int
76 77 The maximum number of connection attempts.
78
79 Returns
80 -------
81 A deferred to the registered client or a failure to an error
82 like :exc:`FURLError`.
77 83 """
78 84 if not self.tub.running:
79 85 self.tub.startService()
80 86 self.engine_service = engine_service
81 87 self.engine_reference = IFCEngine(self.engine_service)
82 88
89 validate_furl_or_file(furl_or_file)
83 90 d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
91 d.addCallback(self._register)
84 92 return d
85 93
86 94 @inlineCallbacks
@@ -101,12 +109,13 b' class EngineConnector(object):'
101 109 raise
102 110 else:
103 111 yield sleep_deferred(delay)
104 yield self._try_to_connect(
112 rr = yield self._try_to_connect(
105 113 furl_or_file, 1.5*delay, max_tries, attempt+1
106 114 )
115 # rr becomes an int when there is a connection!!!
116 returnValue(rr)
107 117 else:
108 result = yield self._register(rr)
109 returnValue(result)
118 returnValue(rr)
110 119 else:
111 120 raise EngineConnectorError(
112 121 'Could not connect to controller, max_tries (%r) exceeded. '
@@ -51,6 +51,10 b' else:'
51 51 have_crypto = True
52 52
53 53
54 class FURLError(Exception):
55 pass
56
57
54 58 def check_furl_file_security(furl_file, secure):
55 59 """Remove the old furl_file if changing security modes."""
56 60 if os.path.isfile(furl_file):
@@ -69,7 +73,7 b' def is_secure(furl):'
69 73 elif furl.startswith("pbu://"):
70 74 return False
71 75 else:
72 raise ValueError("invalid FURL: %s" % furl)
76 raise FURLError("invalid FURL: %s" % furl)
73 77
74 78
75 79 def is_valid(furl):
@@ -91,7 +95,30 b' def find_furl(furl_or_file):'
91 95 furl = f.read().strip()
92 96 if is_valid(furl):
93 97 return furl
94 raise ValueError("Not a FURL or a file containing a FURL: %s" % furl_or_file)
98 raise FURLError("Not a valid FURL or FURL file: %s" % furl_or_file)
99
100
101 def is_valid_furl_or_file(furl_or_file):
102 """Validate a FURL or a FURL file.
103
104 If ``furl_or_file`` looks like a file, we simply make sure its directory
105 exists and that it has a ``.furl`` file extension. We don't try to see
106 if the FURL file exists or to read its contents. This is useful for
107 cases where auto re-connection is being used.
108 """
109 if isinstance(furl_or_file, str):
110 if is_valid(furl_or_file):
111 return True
112 if isinstance(furl_or_file, (str, unicode)):
113 path, furl_filename = os.path.split(furl_or_file)
114 if os.path.isdir(path) and furl_filename.endswith('.furl'):
115 return True
116 return False
117
118
119 def validate_furl_or_file(furl_or_file):
120 if not is_valid_furl_or_file(furl_or_file):
121 raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file)
95 122
96 123
97 124 def get_temp_furlfile(filename):
@@ -40,6 +40,15 b' class ReactorInThread(threading.Thread):'
40 40 """
41 41
42 42 def run(self):
43 """Run the twisted reactor in a thread.
44
45 This runs the reactor with installSignalHandlers=0, which prevents
46 twisted from installing any of its own signal handlers. This needs to
47 be disabled because signal.signal can't be called in a thread. The
48 only problem with this is that SIGCHLD events won't be detected so
49 spawnProcess won't detect that its processes have been killed by
50 an external factor.
51 """
43 52 reactor.run(installSignalHandlers=0)
44 53 # self.join()
45 54
@@ -260,4 +269,6 b' def make_deferred(func):'
260 269 def _wrapper(*args, **kwargs):
261 270 return defer.maybeDeferred(func, *args, **kwargs)
262 271
263 return _wrapper No newline at end of file
272 return _wrapper
273
274
General Comments 0
You need to be logged in to leave comments. Login now