##// END OF EJS Templates
reorganize Factory classes to follow relocation of Session object
MinRK -
Show More
@@ -308,7 +308,7 b' class IPClusterEngines(BaseParallelApplication):'
308 308 klass = import_item(clsname)
309 309
310 310 launcher = klass(
311 work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
311 work_dir=self.profile_dir.location, config=self.config, log=self.log
312 312 )
313 313 return launcher
314 314
@@ -69,7 +69,7 b' class IPLoggerApp(BaseParallelApplication):'
69 69
70 70 def init_watcher(self):
71 71 try:
72 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
72 self.watcher = LogWatcher(config=self.config, log=self.log)
73 73 except:
74 74 self.log.error("Couldn't start the LogWatcher", exc_info=True)
75 75 self.exit(1)
@@ -49,14 +49,12 b' except ImportError:'
49 49
50 50 from zmq.eventloop import ioloop
51 51
52 # from IPython.config.configurable import Configurable
52 from IPython.config.configurable import Configurable
53 53 from IPython.utils.text import EvalFormatter
54 54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
55 55 from IPython.utils.path import get_ipython_module_path
56 56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
57 57
58 from IPython.parallel.factory import LoggingFactory
59
60 58 from .win32support import forward_read_events
61 59
62 60 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
@@ -97,7 +95,7 b' class UnknownStatus(LauncherError):'
97 95 pass
98 96
99 97
100 class BaseLauncher(LoggingFactory):
98 class BaseLauncher(Configurable):
101 99 """An asbtraction for starting, stopping and signaling a process."""
102 100
103 101 # In all of the launchers, the work_dir is where child processes will be
@@ -109,6 +107,7 b' class BaseLauncher(LoggingFactory):'
109 107 # the work_dir option.
110 108 work_dir = Unicode(u'.')
111 109 loop = Instance('zmq.eventloop.ioloop.IOLoop')
110 log = Instance('logging.Logger', ('root',))
112 111
113 112 start_data = Any()
114 113 stop_data = Any()
@@ -390,7 +389,7 b' class LocalEngineSetLauncher(BaseLauncher):'
390 389 self.profile_dir = unicode(profile_dir)
391 390 dlist = []
392 391 for i in range(n):
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
394 393 # Copy the engine args over to each engine launcher.
395 394 el.engine_args = copy.deepcopy(self.engine_args)
396 395 el.on_stop(self._notice_engine_stopped)
@@ -612,7 +611,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
612 611 else:
613 612 user=None
614 613 for i in range(n):
615 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
614 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
616 615
617 616 # Copy the engine args over to each engine launcher.
618 617 i
@@ -19,21 +19,23 b' import sys'
19 19 import zmq
20 20 from zmq.eventloop import ioloop, zmqstream
21 21
22 from IPython.config.configurable import Configurable
22 23 from IPython.utils.traitlets import Int, Unicode, Instance, List
23 24
24 from IPython.parallel.factory import LoggingFactory
25
26 25 #-----------------------------------------------------------------------------
27 26 # Classes
28 27 #-----------------------------------------------------------------------------
29 28
30 29
31 class LogWatcher(LoggingFactory):
30 class LogWatcher(Configurable):
32 31 """A simple class that receives messages on a SUB socket, as published
33 32 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
34 33
35 34 This can subscribe to multiple topics, but defaults to all topics.
36 35 """
36
37 log = Instance('logging.Logger', ('root',))
38
37 39 # configurables
38 40 topics = List([''], config=True,
39 41 help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
@@ -18,8 +18,8 b' import zmq'
18 18 from zmq.devices import ThreadDevice
19 19 from zmq.eventloop import ioloop, zmqstream
20 20
21 from IPython.config.configurable import Configurable
21 22 from IPython.utils.traitlets import Set, Instance, CFloat
22 from IPython.parallel.factory import LoggingFactory
23 23
24 24 class Heart(object):
25 25 """A basic heart object for responding to a HeartMonitor.
@@ -47,7 +47,7 b' class Heart(object):'
47 47 def start(self):
48 48 return self.device.start()
49 49
50 class HeartMonitor(LoggingFactory):
50 class HeartMonitor(Configurable):
51 51 """A basic HeartMonitor class
52 52 pingstream: a PUB stream
53 53 pongstream: an XREP stream
@@ -58,6 +58,7 b' class HeartMonitor(LoggingFactory):'
58 58 ' (in ms) [default: 100]',
59 59 )
60 60
61 log = Instance('logging.Logger', ('root',))
61 62 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
62 63 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
63 64 loop = Instance('zmq.eventloop.ioloop.IOLoop')
@@ -31,7 +31,9 b' from IPython.utils.traitlets import ('
31 31 from IPython.utils.jsonutil import ISO8601, extract_dates
32 32
33 33 from IPython.parallel import error, util
34 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
34 from IPython.parallel.factory import RegistrationFactory
35
36 from IPython.zmq.session import SessionFactory
35 37
36 38 from .heartmonitor import HeartMonitor
37 39
@@ -231,8 +233,10 b' class HubFactory(RegistrationFactory):'
231 233 hpub.bind(engine_iface % self.hb[0])
232 234 hrep = ctx.socket(zmq.XREP)
233 235 hrep.bind(engine_iface % self.hb[1])
234 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
235 config=self.config)
236 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
237 pingstream=ZMQStream(hpub,loop),
238 pongstream=ZMQStream(hrep,loop)
239 )
236 240
237 241 ### Client connections ###
238 242 # Notifier socket
@@ -287,10 +291,10 b' class HubFactory(RegistrationFactory):'
287 291 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
288 292 query=q, notifier=n, resubmit=r, db=self.db,
289 293 engine_info=self.engine_info, client_info=self.client_info,
290 logname=self.log.name)
294 log=self.log)
291 295
292 296
293 class Hub(LoggingFactory):
297 class Hub(SessionFactory):
294 298 """The IPython Controller Hub with 0MQ connections
295 299
296 300 Parameters
@@ -328,7 +332,6 b' class Hub(LoggingFactory):'
328 332 _idcounter=Int(0)
329 333
330 334 # objects from constructor:
331 loop=Instance(ioloop.IOLoop)
332 335 query=Instance(ZMQStream)
333 336 monitor=Instance(ZMQStream)
334 337 notifier=Instance(ZMQStream)
@@ -612,7 +612,8 b' class TaskScheduler(SessionFactory):'
612 612 for msg_id in jobs:
613 613 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
614 614
615 if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
615 if after.unreachable(self.all_completed, self.all_failed)\
616 or follow.unreachable(self.all_completed, self.all_failed):
616 617 self.fail_unreachable(msg_id)
617 618
618 619 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
@@ -643,9 +644,9 b' class TaskScheduler(SessionFactory):'
643 644
644 645
645 646
646 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
647 log_url=None, loglevel=logging.DEBUG,
648 identity=b'task'):
647 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
648 logname='root', log_url=None, loglevel=logging.DEBUG,
649 identity=b'task'):
649 650 from zmq.eventloop import ioloop
650 651 from zmq.eventloop.zmqstream import ZMQStream
651 652
@@ -671,13 +672,13 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='
671 672 # setup logging. Note that these will not work in-process, because they clobber
672 673 # existing loggers.
673 674 if log_url:
674 connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
675 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
675 676 else:
676 local_logger(logname, loglevel)
677 log = local_logger(logname, loglevel)
677 678
678 679 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
679 680 mon_stream=mons, notifier_stream=nots,
680 loop=loop, logname=logname,
681 loop=loop, log=log,
681 682 config=config)
682 683 scheduler.start()
683 684 try:
@@ -1,7 +1,7 b''
1 1 """Base config factories."""
2 2
3 3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2008-2009 The IPython Development Team
4 # Copyright (C) 2010-2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
@@ -22,39 +22,12 b' from IPython.config.configurable import Configurable'
22 22 from IPython.utils.traitlets import Int, Instance, Unicode
23 23
24 24 from IPython.parallel.util import select_random_ports
25 from IPython.zmq.session import Session
25 from IPython.zmq.session import Session, SessionFactory
26 26
27 27 #-----------------------------------------------------------------------------
28 28 # Classes
29 29 #-----------------------------------------------------------------------------
30 class LoggingFactory(Configurable):
31 """A most basic class, that has a `log` (type:`Logger`) attribute, set via a `logname` Trait."""
32 log = Instance('logging.Logger', ('ZMQ', logging.WARN))
33 logname = Unicode('ZMQ')
34 def _logname_changed(self, name, old, new):
35 self.log = logging.getLogger(new)
36
37 30
38 class SessionFactory(LoggingFactory):
39 """The Base factory from which every factory in IPython.parallel inherits"""
40
41 # not configurable:
42 context = Instance('zmq.Context')
43 def _context_default(self):
44 return zmq.Context.instance()
45
46 session = Instance('IPython.zmq.session.Session')
47 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
48 def _loop_default(self):
49 return IOLoop.instance()
50
51
52 def __init__(self, **kwargs):
53 super(SessionFactory, self).__init__(**kwargs)
54
55 # construct the session
56 self.session = Session(**kwargs)
57
58 31
59 32 class RegistrationFactory(SessionFactory):
60 33 """The Base Configurable for objects that involve registration."""
@@ -452,6 +452,7 b' def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):'
452 452 handler.setLevel(loglevel)
453 453 logger.addHandler(handler)
454 454 logger.setLevel(loglevel)
455 return logger
455 456
456 457 def local_logger(logname, loglevel=logging.DEBUG):
457 458 loglevel = integer_loglevel(loglevel)
@@ -463,4 +464,5 b' def local_logger(logname, loglevel=logging.DEBUG):'
463 464 handler.setLevel(loglevel)
464 465 logger.addHandler(handler)
465 466 logger.setLevel(loglevel)
467 return logger
466 468
@@ -13,6 +13,7 b''
13 13 #-----------------------------------------------------------------------------
14 14
15 15 import hmac
16 import logging
16 17 import os
17 18 import pprint
18 19 import uuid
@@ -27,6 +28,7 b' except:'
27 28
28 29 import zmq
29 30 from zmq.utils import jsonapi
31 from zmq.eventloop.ioloop import IOLoop
30 32 from zmq.eventloop.zmqstream import ZMQStream
31 33
32 34 from IPython.config.configurable import Configurable
@@ -73,6 +75,36 b' DELIM="<IDS|MSG>"'
73 75 # Classes
74 76 #-----------------------------------------------------------------------------
75 77
78 class SessionFactory(Configurable):
79 """The Base class for configurables that have a Session, Context, logger,
80 and IOLoop.
81 """
82
83 log = Instance('logging.Logger', ('', logging.WARN))
84
85 logname = Unicode('')
86 def _logname_changed(self, name, old, new):
87 self.log = logging.getLogger(new)
88
89 # not configurable:
90 context = Instance('zmq.Context')
91 def _context_default(self):
92 return zmq.Context.instance()
93
94 session = Instance('IPython.zmq.session.Session')
95
96 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
97 def _loop_default(self):
98 return IOLoop.instance()
99
100 def __init__(self, **kwargs):
101 super(SessionFactory, self).__init__(**kwargs)
102
103 if self.session is None:
104 # construct the session
105 self.session = Session(**kwargs)
106
107
76 108 class Message(object):
77 109 """A simple message object that maps dict keys to attributes.
78 110
@@ -143,7 +175,7 b' class Session(Configurable):'
143 175 else:
144 176 self.pack = import_item(new)
145 177
146 unpacker = Unicode('json',config=True,
178 unpacker = Unicode('json', config=True,
147 179 help="""The name of the unpacker for unserializing messages.
148 180 Only used with custom functions for `packer`.""")
149 181 def _unpacker_changed(self, name, old, new):
@@ -156,7 +188,7 b' class Session(Configurable):'
156 188 else:
157 189 self.unpack = import_item(new)
158 190
159 session = CStr('',config=True,
191 session = CStr('', config=True,
160 192 help="""The UUID identifying this session.""")
161 193 def _session_default(self):
162 194 return bytes(uuid.uuid4())
General Comments 0
You need to be logged in to leave comments. Login now