##// END OF EJS Templates
Added a log retrieval interface to Cluster.
Brian Granger -
Show More
@@ -1,584 +1,655 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Facilities for handling client connections to the controller."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008-2009 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 from __future__ import with_statement
17 18 import os
18 19
19 20 from IPython.kernel.fcutil import Tub, find_furl
20 21 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
21 22 from IPython.kernel.launcher import IPClusterLauncher
22 23 from IPython.kernel.twistedutil import gatherBoth, make_deferred
23 24 from IPython.kernel.twistedutil import blockingCallFromThread
24 25
25 26 from IPython.utils.importstring import import_item
26 27 from IPython.utils.genutils import get_ipython_dir
27 28
28 29 from twisted.internet import defer
29 30 from twisted.python import failure
30 31
31 32 #-----------------------------------------------------------------------------
32 33 # The ClientConnector class
33 34 #-----------------------------------------------------------------------------
34 35
35 36
36 37 class AsyncClientConnector(object):
37 38 """A class for getting remote references and clients from furls.
38 39
39 40 This start a single :class:`Tub` for all remote reference and caches
40 41 references.
41 42 """
42 43
43 44 def __init__(self):
44 45 self._remote_refs = {}
45 46 self.tub = Tub()
46 47 self.tub.startService()
47 48
48 49 def _find_furl(self, profile='default', cluster_dir=None,
49 50 furl_or_file=None, furl_file_name=None,
50 51 ipythondir=None):
51 52 """Find a FURL file by profile+ipythondir or cluster dir.
52 53
53 54 This raises an exception if a FURL file can't be found.
54 55 """
55 56 # Try by furl_or_file
56 57 if furl_or_file is not None:
57 58 try:
58 59 furl = find_furl(furl_or_file)
59 60 except ValueError:
60 61 return furl
61 62
62 63 if furl_file_name is None:
63 64 raise ValueError('A furl_file_name must be provided')
64 65
65 66 # Try by cluster_dir
66 67 if cluster_dir is not None:
67 68 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
68 69 sdir = cluster_dir_obj.security_dir
69 70 furl_file = os.path.join(sdir, furl_file_name)
70 71 return find_furl(furl_file)
71 72
72 73 # Try by profile
73 74 if ipythondir is None:
74 75 ipythondir = get_ipython_dir()
75 76 if profile is not None:
76 77 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
77 78 ipythondir, profile)
78 79 sdir = cluster_dir_obj.security_dir
79 80 furl_file = os.path.join(sdir, furl_file_name)
80 81 return find_furl(furl_file)
81 82
82 83 raise ValueError('Could not find a valid FURL file.')
83 84
84 85 def get_reference(self, furl_or_file):
85 86 """Get a remote reference using a furl or a file containing a furl.
86 87
87 88 Remote references are cached locally so once a remote reference
88 89 has been retrieved for a given furl, the cached version is
89 90 returned.
90 91
91 92 Parameters
92 93 ----------
93 94 furl_or_file : str
94 95 A furl or a filename containing a furl
95 96
96 97 Returns
97 98 -------
98 99 A deferred to a remote reference
99 100 """
100 101 furl = find_furl(furl_or_file)
101 102 if furl in self._remote_refs:
102 103 d = defer.succeed(self._remote_refs[furl])
103 104 else:
104 105 d = self.tub.getReference(furl)
105 106 d.addCallback(self._save_ref, furl)
106 107 return d
107 108
108 109 def _save_ref(self, ref, furl):
109 110 """Cache a remote reference by its furl."""
110 111 self._remote_refs[furl] = ref
111 112 return ref
112 113
113 114 def get_task_client(self, profile='default', cluster_dir=None,
114 115 furl_or_file=None, ipythondir=None):
115 116 """Get the task controller client.
116 117
117 118 This method is a simple wrapper around `get_client` that passes in
118 119 the default name of the task client FURL file. Usually only
119 120 the ``profile`` option will be needed. If a FURL file can't be
120 121 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
121 122
122 123 Parameters
123 124 ----------
124 125 profile : str
125 126 The name of a cluster directory profile (default="default"). The
126 127 cluster directory "cluster_<profile>" will be searched for
127 128 in ``os.getcwd()``, the ipythondir and then in the directories
128 129 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
129 130 cluster_dir : str
130 131 The full path to a cluster directory. This is useful if profiles
131 132 are not being used.
132 133 furl_or_file : str
133 134 A furl or a filename containing a FURLK. This is useful if you
134 135 simply know the location of the FURL file.
135 136 ipythondir : str
136 137 The location of the ipythondir if different from the default.
137 138 This is used if the cluster directory is being found by profile.
138 139
139 140 Returns
140 141 -------
141 142 A deferred to the actual client class.
142 143 """
143 144 return self.get_client(
144 145 profile, cluster_dir, furl_or_file,
145 146 'ipcontroller-tc.furl', ipythondir
146 147 )
147 148
148 149 def get_multiengine_client(self, profile='default', cluster_dir=None,
149 150 furl_or_file=None, ipythondir=None):
150 151 """Get the multiengine controller client.
151 152
152 153 This method is a simple wrapper around `get_client` that passes in
153 154 the default name of the task client FURL file. Usually only
154 155 the ``profile`` option will be needed. If a FURL file can't be
155 156 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
156 157
157 158 Parameters
158 159 ----------
159 160 profile : str
160 161 The name of a cluster directory profile (default="default"). The
161 162 cluster directory "cluster_<profile>" will be searched for
162 163 in ``os.getcwd()``, the ipythondir and then in the directories
163 164 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
164 165 cluster_dir : str
165 166 The full path to a cluster directory. This is useful if profiles
166 167 are not being used.
167 168 furl_or_file : str
168 169 A furl or a filename containing a FURLK. This is useful if you
169 170 simply know the location of the FURL file.
170 171 ipythondir : str
171 172 The location of the ipythondir if different from the default.
172 173 This is used if the cluster directory is being found by profile.
173 174
174 175 Returns
175 176 -------
176 177 A deferred to the actual client class.
177 178 """
178 179 return self.get_client(
179 180 profile, cluster_dir, furl_or_file,
180 181 'ipcontroller-mec.furl', ipythondir
181 182 )
182 183
183 184 def get_client(self, profile='default', cluster_dir=None,
184 185 furl_or_file=None, furl_file_name=None, ipythondir=None):
185 186 """Get a remote reference and wrap it in a client by furl.
186 187
187 188 This method is a simple wrapper around `get_client` that passes in
188 189 the default name of the task client FURL file. Usually only
189 190 the ``profile`` option will be needed. If a FURL file can't be
190 191 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
191 192
192 193 Parameters
193 194 ----------
194 195 profile : str
195 196 The name of a cluster directory profile (default="default"). The
196 197 cluster directory "cluster_<profile>" will be searched for
197 198 in ``os.getcwd()``, the ipythondir and then in the directories
198 199 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
199 200 cluster_dir : str
200 201 The full path to a cluster directory. This is useful if profiles
201 202 are not being used.
202 203 furl_or_file : str
203 204 A furl or a filename containing a FURLK. This is useful if you
204 205 simply know the location of the FURL file.
205 206 furl_file_name : str
206 207 The filename (not the full path) of the FURL. This must be
207 208 provided if ``furl_or_file`` is not.
208 209 ipythondir : str
209 210 The location of the ipythondir if different from the default.
210 211 This is used if the cluster directory is being found by profile.
211 212
212 213 Returns
213 214 -------
214 215 A deferred to the actual client class.
215 216 """
216 217 try:
217 218 furl = self._find_furl(
218 219 profile, cluster_dir, furl_or_file,
219 220 furl_file_name, ipythondir
220 221 )
221 222 except:
222 223 return defer.fail(failure.Failure())
223 224
224 225 d = self.get_reference(furl)
225 226
226 227 def _wrap_remote_reference(rr):
227 228 d = rr.callRemote('get_client_name')
228 229 d.addCallback(lambda name: import_item(name))
229 230 def adapt(client_interface):
230 231 client = client_interface(rr)
231 232 client.tub = self.tub
232 233 return client
233 234 d.addCallback(adapt)
234 235
235 236 return d
236 237
237 238 d.addCallback(_wrap_remote_reference)
238 239 return d
239 240
240 241
241 242 class ClientConnector(object):
242 243 """A blocking version of a client connector.
243 244
244 245 This class creates a single :class:`Tub` instance and allows remote
245 246 references and client to be retrieved by their FURLs. Remote references
246 247 are cached locally and FURL files can be found using profiles and cluster
247 248 directories.
248 249 """
249 250
250 251 def __init__(self):
251 252 self.async_cc = AsyncClientConnector()
252 253
253 254 def get_task_client(self, profile='default', cluster_dir=None,
254 255 furl_or_file=None, ipythondir=None):
255 256 """Get the task client.
256 257
257 258 Usually only the ``profile`` option will be needed. If a FURL file
258 259 can't be found by its profile, use ``cluster_dir`` or
259 260 ``furl_or_file``.
260 261
261 262 Parameters
262 263 ----------
263 264 profile : str
264 265 The name of a cluster directory profile (default="default"). The
265 266 cluster directory "cluster_<profile>" will be searched for
266 267 in ``os.getcwd()``, the ipythondir and then in the directories
267 268 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
268 269 cluster_dir : str
269 270 The full path to a cluster directory. This is useful if profiles
270 271 are not being used.
271 272 furl_or_file : str
272 273 A furl or a filename containing a FURLK. This is useful if you
273 274 simply know the location of the FURL file.
274 275 ipythondir : str
275 276 The location of the ipythondir if different from the default.
276 277 This is used if the cluster directory is being found by profile.
277 278
278 279 Returns
279 280 -------
280 281 The task client instance.
281 282 """
282 283 client = blockingCallFromThread(
283 284 self.async_cc.get_task_client, profile, cluster_dir,
284 285 furl_or_file, ipythondir
285 286 )
286 287 return client.adapt_to_blocking_client()
287 288
288 289 def get_multiengine_client(self, profile='default', cluster_dir=None,
289 290 furl_or_file=None, ipythondir=None):
290 291 """Get the multiengine client.
291 292
292 293 Usually only the ``profile`` option will be needed. If a FURL file
293 294 can't be found by its profile, use ``cluster_dir`` or
294 295 ``furl_or_file``.
295 296
296 297 Parameters
297 298 ----------
298 299 profile : str
299 300 The name of a cluster directory profile (default="default"). The
300 301 cluster directory "cluster_<profile>" will be searched for
301 302 in ``os.getcwd()``, the ipythondir and then in the directories
302 303 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
303 304 cluster_dir : str
304 305 The full path to a cluster directory. This is useful if profiles
305 306 are not being used.
306 307 furl_or_file : str
307 308 A furl or a filename containing a FURLK. This is useful if you
308 309 simply know the location of the FURL file.
309 310 ipythondir : str
310 311 The location of the ipythondir if different from the default.
311 312 This is used if the cluster directory is being found by profile.
312 313
313 314 Returns
314 315 -------
315 316 The multiengine client instance.
316 317 """
317 318 client = blockingCallFromThread(
318 319 self.async_cc.get_multiengine_client, profile, cluster_dir,
319 320 furl_or_file, ipythondir
320 321 )
321 322 return client.adapt_to_blocking_client()
322 323
323 324 def get_client(self, profile='default', cluster_dir=None,
324 325 furl_or_file=None, ipythondir=None):
325 326 client = blockingCallFromThread(
326 327 self.async_cc.get_client, profile, cluster_dir,
327 328 furl_or_file, ipythondir
328 329 )
329 330 return client.adapt_to_blocking_client()
330 331
331 332
332 333 class ClusterStateError(Exception):
333 334 pass
334 335
335 336
336 337 class AsyncCluster(object):
337 338 """An class that wraps the :command:`ipcluster` script."""
338 339
339 340 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
340 341 auto_create=False, auto_stop=True):
341 342 """Create a class to manage an IPython cluster.
342 343
343 344 This class calls the :command:`ipcluster` command with the right
344 345 options to start an IPython cluster. Typically a cluster directory
345 346 must be created (:command:`ipcluster create`) and configured before
346 347 using this class. Configuration is done by editing the
347 348 configuration files in the top level of the cluster directory.
348 349
349 350 Parameters
350 351 ----------
351 352 profile : str
352 353 The name of a cluster directory profile (default="default"). The
353 354 cluster directory "cluster_<profile>" will be searched for
354 355 in ``os.getcwd()``, the ipythondir and then in the directories
355 356 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
356 357 cluster_dir : str
357 358 The full path to a cluster directory. This is useful if profiles
358 359 are not being used.
359 360 furl_or_file : str
360 361 A furl or a filename containing a FURLK. This is useful if you
361 362 simply know the location of the FURL file.
362 363 ipythondir : str
363 364 The location of the ipythondir if different from the default.
364 365 This is used if the cluster directory is being found by profile.
365 366 auto_create : bool
366 367 Automatically create the cluster directory it is dones't exist.
367 368 This will usually only make sense if using a local cluster
368 369 (default=False).
369 370 auto_stop : bool
370 371 Automatically stop the cluster when this instance is garbage
371 372 collected (default=True). This is useful if you want the cluster
372 373 to live beyond your current process. There is also an instance
373 374 attribute ``auto_stop`` to change this behavior.
374 375 """
375 376 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
376 377 self.state = 'before'
377 378 self.launcher = None
378 379 self.client_connector = None
379 380 self.auto_stop = auto_stop
380 381
381 382 def __del__(self):
382 383 if self.auto_stop and self.state=='running':
383 384 print "Auto stopping the cluster..."
384 385 self.stop()
385 386
386 387 @property
387 388 def location(self):
388 389 if hasattr(self, 'cluster_dir_obj'):
389 390 return self.cluster_dir_obj.location
390 391 else:
391 392 return ''
392 393
393 394 @property
394 395 def running(self):
395 396 if self.state=='running':
396 397 return True
397 398 else:
398 399 return False
399 400
400 401 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
401 402 if ipythondir is None:
402 403 ipythondir = get_ipython_dir()
403 404 if cluster_dir is not None:
404 405 try:
405 406 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
406 407 except ClusterDirError:
407 408 pass
408 409 if profile is not None:
409 410 try:
410 411 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
411 412 ipythondir, profile)
412 413 except ClusterDirError:
413 414 pass
414 415 if auto_create or profile=='default':
415 416 # This should call 'ipcluster create --profile default
416 417 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
417 418 ipythondir, profile)
418 419 else:
419 420 raise ClusterDirError('Cluster dir not found.')
420 421
421 422 @make_deferred
422 423 def start(self, n=2):
423 424 """Start the IPython cluster with n engines.
424 425
425 426 Parameters
426 427 ----------
427 428 n : int
428 429 The number of engine to start.
429 430 """
430 431 # We might want to add logic to test if the cluster has started
431 432 # by another process....
432 433 if not self.state=='running':
433 434 self.launcher = IPClusterLauncher(os.getcwd())
434 435 self.launcher.ipcluster_n = n
435 436 self.launcher.ipcluster_subcommand = 'start'
436 437 d = self.launcher.start()
437 438 d.addCallback(self._handle_start)
438 439 return d
439 440 else:
440 441 raise ClusterStateError('Cluster is already running')
441 442
442 443 @make_deferred
443 444 def stop(self):
444 445 """Stop the IPython cluster if it is running."""
445 446 if self.state=='running':
446 447 d1 = self.launcher.observe_stop()
447 448 d1.addCallback(self._handle_stop)
448 449 d2 = self.launcher.stop()
449 450 return gatherBoth([d1, d2], consumeErrors=True)
450 451 else:
451 452 raise ClusterStateError("Cluster not running")
452 453
453 454 def get_multiengine_client(self):
454 455 """Get the multiengine client for the running cluster.
455 456
456 457 If this fails, it means that the cluster has not finished starting.
457 458 Usually waiting a few seconds are re-trying will solve this.
458 459 """
459 460 if self.client_connector is None:
460 461 self.client_connector = AsyncClientConnector()
461 462 return self.client_connector.get_multiengine_client(
462 463 cluster_dir=self.cluster_dir_obj.location
463 464 )
464 465
465 466 def get_task_client(self):
466 467 """Get the task client for the running cluster.
467 468
468 469 If this fails, it means that the cluster has not finished starting.
469 470 Usually waiting a few seconds are re-trying will solve this.
470 471 """
471 472 if self.client_connector is None:
472 473 self.client_connector = AsyncClientConnector()
473 474 return self.client_connector.get_task_client(
474 475 cluster_dir=self.cluster_dir_obj.location
475 476 )
476 477
478 def get_ipengine_logs(self):
479 return self.get_logs_by_name('ipengine')
480
481 def get_ipcontroller_logs(self):
482 return self.get_logs_by_name('ipcontroller')
483
484 def get_ipcluster_logs(self):
485 return self.get_logs_by_name('ipcluster')
486
487 def get_logs_by_name(self, name='ipcluster'):
488 log_dir = self.cluster_dir_obj.log_dir
489 logs = {}
490 for log in os.listdir(log_dir):
491 if log.startswith(name + '-') and log.endswith('.log'):
492 with open(os.path.join(log_dir, log), 'r') as f:
493 logs[log] = f.read()
494 return logs
495
496 def get_logs(self):
497 d = self.get_ipcluster_logs()
498 d.update(self.get_ipengine_logs())
499 d.update(self.get_ipcontroller_logs())
500 return d
501
477 502 def _handle_start(self, r):
478 503 self.state = 'running'
479 504
480 505 def _handle_stop(self, r):
481 506 self.state = 'after'
482 507
483 508
484 509 class Cluster(object):
485 510
486 511
487 512 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
488 513 auto_create=False, auto_stop=True):
489 514 """Create a class to manage an IPython cluster.
490 515
491 516 This class calls the :command:`ipcluster` command with the right
492 517 options to start an IPython cluster. Typically a cluster directory
493 518 must be created (:command:`ipcluster create`) and configured before
494 519 using this class. Configuration is done by editing the
495 520 configuration files in the top level of the cluster directory.
496 521
497 522 Parameters
498 523 ----------
499 524 profile : str
500 525 The name of a cluster directory profile (default="default"). The
501 526 cluster directory "cluster_<profile>" will be searched for
502 527 in ``os.getcwd()``, the ipythondir and then in the directories
503 528 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
504 529 cluster_dir : str
505 530 The full path to a cluster directory. This is useful if profiles
506 531 are not being used.
507 532 furl_or_file : str
508 533 A furl or a filename containing a FURLK. This is useful if you
509 534 simply know the location of the FURL file.
510 535 ipythondir : str
511 536 The location of the ipythondir if different from the default.
512 537 This is used if the cluster directory is being found by profile.
513 538 auto_create : bool
514 539 Automatically create the cluster directory it is dones't exist.
515 540 This will usually only make sense if using a local cluster
516 541 (default=False).
517 542 auto_stop : bool
518 543 Automatically stop the cluster when this instance is garbage
519 544 collected (default=True). This is useful if you want the cluster
520 545 to live beyond your current process. There is also an instance
521 546 attribute ``auto_stop`` to change this behavior.
522 547 """
523 548 self.async_cluster = AsyncCluster(
524 549 profile, cluster_dir, ipythondir, auto_create, auto_stop
525 550 )
526 551 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
527 552 self.client_connector = None
528 553
529 554 def _set_auto_stop(self, value):
530 555 self.async_cluster.auto_stop = value
531 556
532 557 def _get_auto_stop(self):
533 558 return self.async_cluster.auto_stop
534 559
535 560 auto_stop = property(_get_auto_stop, _set_auto_stop)
536 561
537 562 @property
538 563 def location(self):
539 564 return self.async_cluster.location
540 565
541 566 @property
542 567 def running(self):
543 568 return self.async_cluster.running
544 569
545 570 def start(self, n=2):
546 571 """Start the IPython cluster with n engines.
547 572
548 573 Parameters
549 574 ----------
550 575 n : int
551 576 The number of engine to start.
552 577 """
553 578 return blockingCallFromThread(self.async_cluster.start, n)
554 579
555 580 def stop(self):
556 581 """Stop the IPython cluster if it is running."""
557 582 return blockingCallFromThread(self.async_cluster.stop)
558 583
559 584 def get_multiengine_client(self):
560 585 """Get the multiengine client for the running cluster.
561 586
562 587 If this fails, it means that the cluster has not finished starting.
563 588 Usually waiting a few seconds are re-trying will solve this.
564 589 """
565 590 if self.client_connector is None:
566 591 self.client_connector = ClientConnector()
567 592 return self.client_connector.get_multiengine_client(
568 593 cluster_dir=self.cluster_dir_obj.location
569 594 )
570 595
571 596 def get_task_client(self):
572 597 """Get the task client for the running cluster.
573 598
574 599 If this fails, it means that the cluster has not finished starting.
575 600 Usually waiting a few seconds are re-trying will solve this.
576 601 """
577 602 if self.client_connector is None:
578 603 self.client_connector = ClientConnector()
579 604 return self.client_connector.get_task_client(
580 605 cluster_dir=self.cluster_dir_obj.location
581 606 )
582 607
583
608 def __repr__(self):
609 s = "<Cluster(running=%r, location=%s)" % (self.running, self.location)
610 return s
611
612 def get_logs_by_name(self, name='ipcluter'):
613 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
614 return self.async_cluster.get_logs_by_name(name)
615
616 def get_ipengine_logs(self):
617 """Get a dict of logs for all engines in this cluster."""
618 return self.async_cluster.get_ipengine_logs()
619
620 def get_ipcontroller_logs(self):
621 """Get a dict of logs for the controller in this cluster."""
622 return self.async_cluster.get_ipcontroller_logs()
623
624 def get_ipcluster_logs(self):
625 """Get a dict of the ipcluster logs for this cluster."""
626 return self.async_cluster.get_ipcluster_logs()
627
628 def get_logs(self):
629 """Get a dict of all logs for this cluster."""
630 return self.async_cluster.get_logs()
631
632 def _print_logs(self, logs):
633 for k, v in logs.iteritems():
634 print "==================================="
635 print "Logfile: %s" % k
636 print "==================================="
637 print v
638 print
639
640 def print_ipengine_logs(self):
641 """Print the ipengine logs for this cluster to stdout."""
642 self._print_logs(self.get_ipengine_logs())
643
644 def print_ipcontroller_logs(self):
645 """Print the ipcontroller logs for this cluster to stdout."""
646 self._print_logs(self.get_ipcontroller_logs())
647
648 def print_ipcluster_logs(self):
649 """Print the ipcluster logs for this cluster to stdout."""
650 self._print_logs(self.get_ipcluster_logs())
651
652 def print_logs(self):
653 """Print all the logs for this cluster to stdout."""
654 self._print_logs(self.get_logs())
584 655
General Comments 0
You need to be logged in to leave comments. Login now