##// END OF EJS Templates
reorganize Factory classes to follow relocation of Session object
MinRK -
Show More
@@ -308,7 +308,7 b' class IPClusterEngines(BaseParallelApplication):'
308 klass = import_item(clsname)
308 klass = import_item(clsname)
309
309
310 launcher = klass(
310 launcher = klass(
311 work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
311 work_dir=self.profile_dir.location, config=self.config, log=self.log
312 )
312 )
313 return launcher
313 return launcher
314
314
@@ -69,7 +69,7 b' class IPLoggerApp(BaseParallelApplication):'
69
69
70 def init_watcher(self):
70 def init_watcher(self):
71 try:
71 try:
72 self.watcher = LogWatcher(config=self.config, logname=self.log.name)
72 self.watcher = LogWatcher(config=self.config, log=self.log)
73 except:
73 except:
74 self.log.error("Couldn't start the LogWatcher", exc_info=True)
74 self.log.error("Couldn't start the LogWatcher", exc_info=True)
75 self.exit(1)
75 self.exit(1)
@@ -49,14 +49,12 b' except ImportError:'
49
49
50 from zmq.eventloop import ioloop
50 from zmq.eventloop import ioloop
51
51
52 # from IPython.config.configurable import Configurable
52 from IPython.config.configurable import Configurable
53 from IPython.utils.text import EvalFormatter
53 from IPython.utils.text import EvalFormatter
54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
55 from IPython.utils.path import get_ipython_module_path
55 from IPython.utils.path import get_ipython_module_path
56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
57
57
58 from IPython.parallel.factory import LoggingFactory
59
60 from .win32support import forward_read_events
58 from .win32support import forward_read_events
61
59
62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
60 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
@@ -97,7 +95,7 b' class UnknownStatus(LauncherError):'
97 pass
95 pass
98
96
99
97
100 class BaseLauncher(LoggingFactory):
98 class BaseLauncher(Configurable):
101 """An asbtraction for starting, stopping and signaling a process."""
99 """An asbtraction for starting, stopping and signaling a process."""
102
100
103 # In all of the launchers, the work_dir is where child processes will be
101 # In all of the launchers, the work_dir is where child processes will be
@@ -109,6 +107,7 b' class BaseLauncher(LoggingFactory):'
109 # the work_dir option.
107 # the work_dir option.
110 work_dir = Unicode(u'.')
108 work_dir = Unicode(u'.')
111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
109 loop = Instance('zmq.eventloop.ioloop.IOLoop')
110 log = Instance('logging.Logger', ('root',))
112
111
113 start_data = Any()
112 start_data = Any()
114 stop_data = Any()
113 stop_data = Any()
@@ -390,7 +389,7 b' class LocalEngineSetLauncher(BaseLauncher):'
390 self.profile_dir = unicode(profile_dir)
389 self.profile_dir = unicode(profile_dir)
391 dlist = []
390 dlist = []
392 for i in range(n):
391 for i in range(n):
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
394 # Copy the engine args over to each engine launcher.
393 # Copy the engine args over to each engine launcher.
395 el.engine_args = copy.deepcopy(self.engine_args)
394 el.engine_args = copy.deepcopy(self.engine_args)
396 el.on_stop(self._notice_engine_stopped)
395 el.on_stop(self._notice_engine_stopped)
@@ -612,7 +611,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
612 else:
611 else:
613 user=None
612 user=None
614 for i in range(n):
613 for i in range(n):
615 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
614 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
616
615
617 # Copy the engine args over to each engine launcher.
616 # Copy the engine args over to each engine launcher.
618 i
617 i
@@ -19,21 +19,23 b' import sys'
19 import zmq
19 import zmq
20 from zmq.eventloop import ioloop, zmqstream
20 from zmq.eventloop import ioloop, zmqstream
21
21
22 from IPython.config.configurable import Configurable
22 from IPython.utils.traitlets import Int, Unicode, Instance, List
23 from IPython.utils.traitlets import Int, Unicode, Instance, List
23
24
24 from IPython.parallel.factory import LoggingFactory
25
26 #-----------------------------------------------------------------------------
25 #-----------------------------------------------------------------------------
27 # Classes
26 # Classes
28 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
29
28
30
29
31 class LogWatcher(LoggingFactory):
30 class LogWatcher(Configurable):
32 """A simple class that receives messages on a SUB socket, as published
31 """A simple class that receives messages on a SUB socket, as published
33 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
32 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
34
33
35 This can subscribe to multiple topics, but defaults to all topics.
34 This can subscribe to multiple topics, but defaults to all topics.
36 """
35 """
36
37 log = Instance('logging.Logger', ('root',))
38
37 # configurables
39 # configurables
38 topics = List([''], config=True,
40 topics = List([''], config=True,
39 help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
41 help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
@@ -18,8 +18,8 b' import zmq'
18 from zmq.devices import ThreadDevice
18 from zmq.devices import ThreadDevice
19 from zmq.eventloop import ioloop, zmqstream
19 from zmq.eventloop import ioloop, zmqstream
20
20
21 from IPython.config.configurable import Configurable
21 from IPython.utils.traitlets import Set, Instance, CFloat
22 from IPython.utils.traitlets import Set, Instance, CFloat
22 from IPython.parallel.factory import LoggingFactory
23
23
24 class Heart(object):
24 class Heart(object):
25 """A basic heart object for responding to a HeartMonitor.
25 """A basic heart object for responding to a HeartMonitor.
@@ -47,7 +47,7 b' class Heart(object):'
47 def start(self):
47 def start(self):
48 return self.device.start()
48 return self.device.start()
49
49
50 class HeartMonitor(LoggingFactory):
50 class HeartMonitor(Configurable):
51 """A basic HeartMonitor class
51 """A basic HeartMonitor class
52 pingstream: a PUB stream
52 pingstream: a PUB stream
53 pongstream: an XREP stream
53 pongstream: an XREP stream
@@ -58,6 +58,7 b' class HeartMonitor(LoggingFactory):'
58 ' (in ms) [default: 100]',
58 ' (in ms) [default: 100]',
59 )
59 )
60
60
61 log = Instance('logging.Logger', ('root',))
61 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
62 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
62 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
63 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
63 loop = Instance('zmq.eventloop.ioloop.IOLoop')
64 loop = Instance('zmq.eventloop.ioloop.IOLoop')
@@ -31,7 +31,9 b' from IPython.utils.traitlets import ('
31 from IPython.utils.jsonutil import ISO8601, extract_dates
31 from IPython.utils.jsonutil import ISO8601, extract_dates
32
32
33 from IPython.parallel import error, util
33 from IPython.parallel import error, util
34 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
34 from IPython.parallel.factory import RegistrationFactory
35
36 from IPython.zmq.session import SessionFactory
35
37
36 from .heartmonitor import HeartMonitor
38 from .heartmonitor import HeartMonitor
37
39
@@ -231,8 +233,10 b' class HubFactory(RegistrationFactory):'
231 hpub.bind(engine_iface % self.hb[0])
233 hpub.bind(engine_iface % self.hb[0])
232 hrep = ctx.socket(zmq.XREP)
234 hrep = ctx.socket(zmq.XREP)
233 hrep.bind(engine_iface % self.hb[1])
235 hrep.bind(engine_iface % self.hb[1])
234 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
236 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
235 config=self.config)
237 pingstream=ZMQStream(hpub,loop),
238 pongstream=ZMQStream(hrep,loop)
239 )
236
240
237 ### Client connections ###
241 ### Client connections ###
238 # Notifier socket
242 # Notifier socket
@@ -287,10 +291,10 b' class HubFactory(RegistrationFactory):'
287 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
291 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
288 query=q, notifier=n, resubmit=r, db=self.db,
292 query=q, notifier=n, resubmit=r, db=self.db,
289 engine_info=self.engine_info, client_info=self.client_info,
293 engine_info=self.engine_info, client_info=self.client_info,
290 logname=self.log.name)
294 log=self.log)
291
295
292
296
293 class Hub(LoggingFactory):
297 class Hub(SessionFactory):
294 """The IPython Controller Hub with 0MQ connections
298 """The IPython Controller Hub with 0MQ connections
295
299
296 Parameters
300 Parameters
@@ -328,7 +332,6 b' class Hub(LoggingFactory):'
328 _idcounter=Int(0)
332 _idcounter=Int(0)
329
333
330 # objects from constructor:
334 # objects from constructor:
331 loop=Instance(ioloop.IOLoop)
332 query=Instance(ZMQStream)
335 query=Instance(ZMQStream)
333 monitor=Instance(ZMQStream)
336 monitor=Instance(ZMQStream)
334 notifier=Instance(ZMQStream)
337 notifier=Instance(ZMQStream)
@@ -612,7 +612,8 b' class TaskScheduler(SessionFactory):'
612 for msg_id in jobs:
612 for msg_id in jobs:
613 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
613 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
614
614
615 if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
615 if after.unreachable(self.all_completed, self.all_failed)\
616 or follow.unreachable(self.all_completed, self.all_failed):
616 self.fail_unreachable(msg_id)
617 self.fail_unreachable(msg_id)
617
618
618 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
619 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
@@ -643,9 +644,9 b' class TaskScheduler(SessionFactory):'
643
644
644
645
645
646
646 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
647 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
647 log_url=None, loglevel=logging.DEBUG,
648 logname='root', log_url=None, loglevel=logging.DEBUG,
648 identity=b'task'):
649 identity=b'task'):
649 from zmq.eventloop import ioloop
650 from zmq.eventloop import ioloop
650 from zmq.eventloop.zmqstream import ZMQStream
651 from zmq.eventloop.zmqstream import ZMQStream
651
652
@@ -671,13 +672,13 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='
671 # setup logging. Note that these will not work in-process, because they clobber
672 # setup logging. Note that these will not work in-process, because they clobber
672 # existing loggers.
673 # existing loggers.
673 if log_url:
674 if log_url:
674 connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
675 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
675 else:
676 else:
676 local_logger(logname, loglevel)
677 log = local_logger(logname, loglevel)
677
678
678 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
679 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
679 mon_stream=mons, notifier_stream=nots,
680 mon_stream=mons, notifier_stream=nots,
680 loop=loop, logname=logname,
681 loop=loop, log=log,
681 config=config)
682 config=config)
682 scheduler.start()
683 scheduler.start()
683 try:
684 try:
@@ -1,7 +1,7 b''
1 """Base config factories."""
1 """Base config factories."""
2
2
3 #-----------------------------------------------------------------------------
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2008-2009 The IPython Development Team
4 # Copyright (C) 2010-2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
@@ -22,39 +22,12 b' from IPython.config.configurable import Configurable'
22 from IPython.utils.traitlets import Int, Instance, Unicode
22 from IPython.utils.traitlets import Int, Instance, Unicode
23
23
24 from IPython.parallel.util import select_random_ports
24 from IPython.parallel.util import select_random_ports
25 from IPython.zmq.session import Session
25 from IPython.zmq.session import Session, SessionFactory
26
26
27 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
28 # Classes
28 # Classes
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30 class LoggingFactory(Configurable):
31 """A most basic class, that has a `log` (type:`Logger`) attribute, set via a `logname` Trait."""
32 log = Instance('logging.Logger', ('ZMQ', logging.WARN))
33 logname = Unicode('ZMQ')
34 def _logname_changed(self, name, old, new):
35 self.log = logging.getLogger(new)
36
37
30
38 class SessionFactory(LoggingFactory):
39 """The Base factory from which every factory in IPython.parallel inherits"""
40
41 # not configurable:
42 context = Instance('zmq.Context')
43 def _context_default(self):
44 return zmq.Context.instance()
45
46 session = Instance('IPython.zmq.session.Session')
47 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
48 def _loop_default(self):
49 return IOLoop.instance()
50
51
52 def __init__(self, **kwargs):
53 super(SessionFactory, self).__init__(**kwargs)
54
55 # construct the session
56 self.session = Session(**kwargs)
57
58
31
59 class RegistrationFactory(SessionFactory):
32 class RegistrationFactory(SessionFactory):
60 """The Base Configurable for objects that involve registration."""
33 """The Base Configurable for objects that involve registration."""
@@ -452,6 +452,7 b' def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):'
452 handler.setLevel(loglevel)
452 handler.setLevel(loglevel)
453 logger.addHandler(handler)
453 logger.addHandler(handler)
454 logger.setLevel(loglevel)
454 logger.setLevel(loglevel)
455 return logger
455
456
456 def local_logger(logname, loglevel=logging.DEBUG):
457 def local_logger(logname, loglevel=logging.DEBUG):
457 loglevel = integer_loglevel(loglevel)
458 loglevel = integer_loglevel(loglevel)
@@ -463,4 +464,5 b' def local_logger(logname, loglevel=logging.DEBUG):'
463 handler.setLevel(loglevel)
464 handler.setLevel(loglevel)
464 logger.addHandler(handler)
465 logger.addHandler(handler)
465 logger.setLevel(loglevel)
466 logger.setLevel(loglevel)
467 return logger
466
468
@@ -13,6 +13,7 b''
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 import hmac
15 import hmac
16 import logging
16 import os
17 import os
17 import pprint
18 import pprint
18 import uuid
19 import uuid
@@ -27,6 +28,7 b' except:'
27
28
28 import zmq
29 import zmq
29 from zmq.utils import jsonapi
30 from zmq.utils import jsonapi
31 from zmq.eventloop.ioloop import IOLoop
30 from zmq.eventloop.zmqstream import ZMQStream
32 from zmq.eventloop.zmqstream import ZMQStream
31
33
32 from IPython.config.configurable import Configurable
34 from IPython.config.configurable import Configurable
@@ -73,6 +75,36 b' DELIM="<IDS|MSG>"'
73 # Classes
75 # Classes
74 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
75
77
78 class SessionFactory(Configurable):
79 """The Base class for configurables that have a Session, Context, logger,
80 and IOLoop.
81 """
82
83 log = Instance('logging.Logger', ('', logging.WARN))
84
85 logname = Unicode('')
86 def _logname_changed(self, name, old, new):
87 self.log = logging.getLogger(new)
88
89 # not configurable:
90 context = Instance('zmq.Context')
91 def _context_default(self):
92 return zmq.Context.instance()
93
94 session = Instance('IPython.zmq.session.Session')
95
96 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
97 def _loop_default(self):
98 return IOLoop.instance()
99
100 def __init__(self, **kwargs):
101 super(SessionFactory, self).__init__(**kwargs)
102
103 if self.session is None:
104 # construct the session
105 self.session = Session(**kwargs)
106
107
76 class Message(object):
108 class Message(object):
77 """A simple message object that maps dict keys to attributes.
109 """A simple message object that maps dict keys to attributes.
78
110
@@ -143,7 +175,7 b' class Session(Configurable):'
143 else:
175 else:
144 self.pack = import_item(new)
176 self.pack = import_item(new)
145
177
146 unpacker = Unicode('json',config=True,
178 unpacker = Unicode('json', config=True,
147 help="""The name of the unpacker for unserializing messages.
179 help="""The name of the unpacker for unserializing messages.
148 Only used with custom functions for `packer`.""")
180 Only used with custom functions for `packer`.""")
149 def _unpacker_changed(self, name, old, new):
181 def _unpacker_changed(self, name, old, new):
@@ -156,7 +188,7 b' class Session(Configurable):'
156 else:
188 else:
157 self.unpack = import_item(new)
189 self.unpack = import_item(new)
158
190
159 session = CStr('',config=True,
191 session = CStr('', config=True,
160 help="""The UUID identifying this session.""")
192 help="""The UUID identifying this session.""")
161 def _session_default(self):
193 def _session_default(self):
162 return bytes(uuid.uuid4())
194 return bytes(uuid.uuid4())
General Comments 0
You need to be logged in to leave comments. Login now