##// END OF EJS Templates
improved logging + Hub,Engine,Scheduler are Configurable
MinRK -
Show More
@@ -1,27 +1,23 b''
1 import logging
1 import logging
2 from logging import INFO, DEBUG, WARN, ERROR, FATAL
2 from logging import INFO, DEBUG, WARN, ERROR, FATAL
3
3
4 import zmq
4 import zmq
5 from zmq.log.handlers import PUBHandler
5 from zmq.log.handlers import PUBHandler
6
6
7 class EnginePUBHandler(PUBHandler):
7 class EnginePUBHandler(PUBHandler):
8 """A simple PUBHandler subclass that sets root_topic"""
8 """A simple PUBHandler subclass that sets root_topic"""
9 engine=None
9 engine=None
10
10
11 def __init__(self, engine, *args, **kwargs):
11 def __init__(self, engine, *args, **kwargs):
12 PUBHandler.__init__(self,*args, **kwargs)
12 PUBHandler.__init__(self,*args, **kwargs)
13 self.engine = engine
13 self.engine = engine
14
14
15 @property
15 @property
16 def root_topic(self):
16 def root_topic(self):
17 """this is a property, in case the handler is created
17 """this is a property, in case the handler is created
18 before the engine gets registered with an id"""
18 before the engine gets registered with an id"""
19 if isinstance(getattr(self.engine, 'id', None), int):
19 if isinstance(getattr(self.engine, 'id', None), int):
20 return "engine.%i"%self.engine.id
20 return "engine.%i"%self.engine.id
21 else:
21 else:
22 return "engine"
22 return "engine"
23
23
24
25 logger = logging.getLogger('ipzmq')
26 logger.setLevel(logging.DEBUG)
27
@@ -1,1177 +1,1172 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import time
14 import time
15 from getpass import getpass
15 from getpass import getpass
16 from pprint import pprint
16 from pprint import pprint
17 from datetime import datetime
17 from datetime import datetime
18
18
19 import zmq
19 import zmq
20 from zmq.eventloop import ioloop, zmqstream
20 from zmq.eventloop import ioloop, zmqstream
21
21
22 from IPython.external.decorator import decorator
22 from IPython.external.decorator import decorator
23 from IPython.zmq import tunnel
23 from IPython.zmq import tunnel
24
24
25 import streamsession as ss
25 import streamsession as ss
26 # from remotenamespace import RemoteNamespace
26 # from remotenamespace import RemoteNamespace
27 from view import DirectView, LoadBalancedView
27 from view import DirectView, LoadBalancedView
28 from dependency import Dependency, depend, require
28 from dependency import Dependency, depend, require
29 import error
29 import error
30 import map as Map
30 import map as Map
31 from asyncresult import AsyncResult, AsyncMapResult
31 from asyncresult import AsyncResult, AsyncMapResult
32 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
32 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
33 from util import ReverseDict
33 from util import ReverseDict
34
34
35 #--------------------------------------------------------------------------
35 #--------------------------------------------------------------------------
36 # helpers for implementing old MEC API via client.apply
36 # helpers for implementing old MEC API via client.apply
37 #--------------------------------------------------------------------------
37 #--------------------------------------------------------------------------
38
38
39 def _push(ns):
39 def _push(ns):
40 """helper method for implementing `client.push` via `client.apply`"""
40 """helper method for implementing `client.push` via `client.apply`"""
41 globals().update(ns)
41 globals().update(ns)
42
42
43 def _pull(keys):
43 def _pull(keys):
44 """helper method for implementing `client.pull` via `client.apply`"""
44 """helper method for implementing `client.pull` via `client.apply`"""
45 g = globals()
45 g = globals()
46 if isinstance(keys, (list,tuple, set)):
46 if isinstance(keys, (list,tuple, set)):
47 for key in keys:
47 for key in keys:
48 if not g.has_key(key):
48 if not g.has_key(key):
49 raise NameError("name '%s' is not defined"%key)
49 raise NameError("name '%s' is not defined"%key)
50 return map(g.get, keys)
50 return map(g.get, keys)
51 else:
51 else:
52 if not g.has_key(keys):
52 if not g.has_key(keys):
53 raise NameError("name '%s' is not defined"%keys)
53 raise NameError("name '%s' is not defined"%keys)
54 return g.get(keys)
54 return g.get(keys)
55
55
56 def _clear():
56 def _clear():
57 """helper method for implementing `client.clear` via `client.apply`"""
57 """helper method for implementing `client.clear` via `client.apply`"""
58 globals().clear()
58 globals().clear()
59
59
60 def _execute(code):
60 def _execute(code):
61 """helper method for implementing `client.execute` via `client.apply`"""
61 """helper method for implementing `client.execute` via `client.apply`"""
62 exec code in globals()
62 exec code in globals()
63
63
64
64
65 #--------------------------------------------------------------------------
65 #--------------------------------------------------------------------------
66 # Decorators for Client methods
66 # Decorators for Client methods
67 #--------------------------------------------------------------------------
67 #--------------------------------------------------------------------------
68
68
69 @decorator
69 @decorator
70 def spinfirst(f, self, *args, **kwargs):
70 def spinfirst(f, self, *args, **kwargs):
71 """Call spin() to sync state prior to calling the method."""
71 """Call spin() to sync state prior to calling the method."""
72 self.spin()
72 self.spin()
73 return f(self, *args, **kwargs)
73 return f(self, *args, **kwargs)
74
74
75 @decorator
75 @decorator
76 def defaultblock(f, self, *args, **kwargs):
76 def defaultblock(f, self, *args, **kwargs):
77 """Default to self.block; preserve self.block."""
77 """Default to self.block; preserve self.block."""
78 block = kwargs.get('block',None)
78 block = kwargs.get('block',None)
79 block = self.block if block is None else block
79 block = self.block if block is None else block
80 saveblock = self.block
80 saveblock = self.block
81 self.block = block
81 self.block = block
82 try:
82 try:
83 ret = f(self, *args, **kwargs)
83 ret = f(self, *args, **kwargs)
84 finally:
84 finally:
85 self.block = saveblock
85 self.block = saveblock
86 return ret
86 return ret
87
87
88
88
89 #--------------------------------------------------------------------------
89 #--------------------------------------------------------------------------
90 # Classes
90 # Classes
91 #--------------------------------------------------------------------------
91 #--------------------------------------------------------------------------
92
92
93 class AbortedTask(object):
94 """A basic wrapper object describing an aborted task."""
95 def __init__(self, msg_id):
96 self.msg_id = msg_id
97
98 class ResultDict(dict):
93 class ResultDict(dict):
99 """A subclass of dict that raises errors if it has them."""
94 """A subclass of dict that raises errors if it has them."""
100 def __getitem__(self, key):
95 def __getitem__(self, key):
101 res = dict.__getitem__(self, key)
96 res = dict.__getitem__(self, key)
102 if isinstance(res, error.KernelError):
97 if isinstance(res, error.KernelError):
103 raise res
98 raise res
104 return res
99 return res
105
100
106 class Metadata(dict):
101 class Metadata(dict):
107 """Subclass of dict for initializing metadata values."""
102 """Subclass of dict for initializing metadata values."""
108 def __init__(self, *args, **kwargs):
103 def __init__(self, *args, **kwargs):
109 dict.__init__(self)
104 dict.__init__(self)
110 md = {'msg_id' : None,
105 md = {'msg_id' : None,
111 'submitted' : None,
106 'submitted' : None,
112 'started' : None,
107 'started' : None,
113 'completed' : None,
108 'completed' : None,
114 'received' : None,
109 'received' : None,
115 'engine_uuid' : None,
110 'engine_uuid' : None,
116 'engine_id' : None,
111 'engine_id' : None,
117 'follow' : None,
112 'follow' : None,
118 'after' : None,
113 'after' : None,
119 'status' : None,
114 'status' : None,
120
115
121 'pyin' : None,
116 'pyin' : None,
122 'pyout' : None,
117 'pyout' : None,
123 'pyerr' : None,
118 'pyerr' : None,
124 'stdout' : '',
119 'stdout' : '',
125 'stderr' : '',
120 'stderr' : '',
126 }
121 }
127 self.update(md)
122 self.update(md)
128 self.update(dict(*args, **kwargs))
123 self.update(dict(*args, **kwargs))
129
124
130
125
131
126
132 class Client(object):
127 class Client(object):
133 """A semi-synchronous client to the IPython ZMQ controller
128 """A semi-synchronous client to the IPython ZMQ controller
134
129
135 Parameters
130 Parameters
136 ----------
131 ----------
137
132
138 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
133 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
139 The address of the controller's registration socket.
134 The address of the controller's registration socket.
140 [Default: 'tcp://127.0.0.1:10101']
135 [Default: 'tcp://127.0.0.1:10101']
141 context : zmq.Context
136 context : zmq.Context
142 Pass an existing zmq.Context instance, otherwise the client will create its own
137 Pass an existing zmq.Context instance, otherwise the client will create its own
143 username : bytes
138 username : bytes
144 set username to be passed to the Session object
139 set username to be passed to the Session object
145 debug : bool
140 debug : bool
146 flag for lots of message printing for debug purposes
141 flag for lots of message printing for debug purposes
147
142
148 #-------------- ssh related args ----------------
143 #-------------- ssh related args ----------------
149 # These are args for configuring the ssh tunnel to be used
144 # These are args for configuring the ssh tunnel to be used
150 # credentials are used to forward connections over ssh to the Controller
145 # credentials are used to forward connections over ssh to the Controller
151 # Note that the ip given in `addr` needs to be relative to sshserver
146 # Note that the ip given in `addr` needs to be relative to sshserver
152 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
147 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
153 # and set sshserver as the same machine the Controller is on. However,
148 # and set sshserver as the same machine the Controller is on. However,
154 # the only requirement is that sshserver is able to see the Controller
149 # the only requirement is that sshserver is able to see the Controller
155 # (i.e. is within the same trusted network).
150 # (i.e. is within the same trusted network).
156
151
157 sshserver : str
152 sshserver : str
158 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
153 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
159 If keyfile or password is specified, and this is not, it will default to
154 If keyfile or password is specified, and this is not, it will default to
160 the ip given in addr.
155 the ip given in addr.
161 sshkey : str; path to public ssh key file
156 sshkey : str; path to public ssh key file
162 This specifies a key to be used in ssh login, default None.
157 This specifies a key to be used in ssh login, default None.
163 Regular default ssh keys will be used without specifying this argument.
158 Regular default ssh keys will be used without specifying this argument.
164 password : str;
159 password : str;
165 Your ssh password to sshserver. Note that if this is left None,
160 Your ssh password to sshserver. Note that if this is left None,
166 you will be prompted for it if passwordless key based login is unavailable.
161 you will be prompted for it if passwordless key based login is unavailable.
167
162
168 #------- exec authentication args -------
163 #------- exec authentication args -------
169 # If even localhost is untrusted, you can have some protection against
164 # If even localhost is untrusted, you can have some protection against
170 # unauthorized execution by using a key. Messages are still sent
165 # unauthorized execution by using a key. Messages are still sent
171 # as cleartext, so if someone can snoop your loopback traffic this will
166 # as cleartext, so if someone can snoop your loopback traffic this will
172 # not help anything.
167 # not help anything.
173
168
174 exec_key : str
169 exec_key : str
175 an authentication key or file containing a key
170 an authentication key or file containing a key
176 default: None
171 default: None
177
172
178
173
179 Attributes
174 Attributes
180 ----------
175 ----------
181 ids : set of int engine IDs
176 ids : set of int engine IDs
182 requesting the ids attribute always synchronizes
177 requesting the ids attribute always synchronizes
183 the registration state. To request ids without synchronization,
178 the registration state. To request ids without synchronization,
184 use semi-private _ids attributes.
179 use semi-private _ids attributes.
185
180
186 history : list of msg_ids
181 history : list of msg_ids
187 a list of msg_ids, keeping track of all the execution
182 a list of msg_ids, keeping track of all the execution
188 messages you have submitted in order.
183 messages you have submitted in order.
189
184
190 outstanding : set of msg_ids
185 outstanding : set of msg_ids
191 a set of msg_ids that have been submitted, but whose
186 a set of msg_ids that have been submitted, but whose
192 results have not yet been received.
187 results have not yet been received.
193
188
194 results : dict
189 results : dict
195 a dict of all our results, keyed by msg_id
190 a dict of all our results, keyed by msg_id
196
191
197 block : bool
192 block : bool
198 determines default behavior when block not specified
193 determines default behavior when block not specified
199 in execution methods
194 in execution methods
200
195
201 Methods
196 Methods
202 -------
197 -------
203 spin : flushes incoming results and registration state changes
198 spin : flushes incoming results and registration state changes
204 control methods spin, and requesting `ids` also ensures up to date
199 control methods spin, and requesting `ids` also ensures up to date
205
200
206 barrier : wait on one or more msg_ids
201 barrier : wait on one or more msg_ids
207
202
208 execution methods: apply/apply_bound/apply_to/apply_bound
203 execution methods: apply/apply_bound/apply_to/apply_bound
209 legacy: execute, run
204 legacy: execute, run
210
205
211 query methods: queue_status, get_result, purge
206 query methods: queue_status, get_result, purge
212
207
213 control methods: abort, kill
208 control methods: abort, kill
214
209
215 """
210 """
216
211
217
212
218 _connected=False
213 _connected=False
219 _ssh=False
214 _ssh=False
220 _engines=None
215 _engines=None
221 _addr='tcp://127.0.0.1:10101'
216 _addr='tcp://127.0.0.1:10101'
222 _registration_socket=None
217 _registration_socket=None
223 _query_socket=None
218 _query_socket=None
224 _control_socket=None
219 _control_socket=None
225 _iopub_socket=None
220 _iopub_socket=None
226 _notification_socket=None
221 _notification_socket=None
227 _mux_socket=None
222 _mux_socket=None
228 _task_socket=None
223 _task_socket=None
229 block = False
224 block = False
230 outstanding=None
225 outstanding=None
231 results = None
226 results = None
232 history = None
227 history = None
233 debug = False
228 debug = False
234 targets = None
229 targets = None
235
230
236 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
231 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
237 sshserver=None, sshkey=None, password=None, paramiko=None,
232 sshserver=None, sshkey=None, password=None, paramiko=None,
238 exec_key=None,):
233 exec_key=None,):
239 if context is None:
234 if context is None:
240 context = zmq.Context()
235 context = zmq.Context()
241 self.context = context
236 self.context = context
242 self.targets = 'all'
237 self.targets = 'all'
243 self._addr = addr
238 self._addr = addr
244 self._ssh = bool(sshserver or sshkey or password)
239 self._ssh = bool(sshserver or sshkey or password)
245 if self._ssh and sshserver is None:
240 if self._ssh and sshserver is None:
246 # default to the same
241 # default to the same
247 sshserver = addr.split('://')[1].split(':')[0]
242 sshserver = addr.split('://')[1].split(':')[0]
248 if self._ssh and password is None:
243 if self._ssh and password is None:
249 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
244 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
250 password=False
245 password=False
251 else:
246 else:
252 password = getpass("SSH Password for %s: "%sshserver)
247 password = getpass("SSH Password for %s: "%sshserver)
253 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
248 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
254
249
255 if exec_key is not None and os.path.isfile(exec_key):
250 if exec_key is not None and os.path.isfile(exec_key):
256 arg = 'keyfile'
251 arg = 'keyfile'
257 else:
252 else:
258 arg = 'key'
253 arg = 'key'
259 key_arg = {arg:exec_key}
254 key_arg = {arg:exec_key}
260 if username is None:
255 if username is None:
261 self.session = ss.StreamSession(**key_arg)
256 self.session = ss.StreamSession(**key_arg)
262 else:
257 else:
263 self.session = ss.StreamSession(username, **key_arg)
258 self.session = ss.StreamSession(username, **key_arg)
264 self._registration_socket = self.context.socket(zmq.XREQ)
259 self._registration_socket = self.context.socket(zmq.XREQ)
265 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
260 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
266 if self._ssh:
261 if self._ssh:
267 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
262 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
268 else:
263 else:
269 self._registration_socket.connect(addr)
264 self._registration_socket.connect(addr)
270 self._engines = ReverseDict()
265 self._engines = ReverseDict()
271 self._ids = set()
266 self._ids = set()
272 self.outstanding=set()
267 self.outstanding=set()
273 self.results = {}
268 self.results = {}
274 self.metadata = {}
269 self.metadata = {}
275 self.history = []
270 self.history = []
276 self.debug = debug
271 self.debug = debug
277 self.session.debug = debug
272 self.session.debug = debug
278
273
279 self._notification_handlers = {'registration_notification' : self._register_engine,
274 self._notification_handlers = {'registration_notification' : self._register_engine,
280 'unregistration_notification' : self._unregister_engine,
275 'unregistration_notification' : self._unregister_engine,
281 }
276 }
282 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
277 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
283 'apply_reply' : self._handle_apply_reply}
278 'apply_reply' : self._handle_apply_reply}
284 self._connect(sshserver, ssh_kwargs)
279 self._connect(sshserver, ssh_kwargs)
285
280
286
281
287 @property
282 @property
288 def ids(self):
283 def ids(self):
289 """Always up to date ids property."""
284 """Always up to date ids property."""
290 self._flush_notifications()
285 self._flush_notifications()
291 return self._ids
286 return self._ids
292
287
293 def _update_engines(self, engines):
288 def _update_engines(self, engines):
294 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
289 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
295 for k,v in engines.iteritems():
290 for k,v in engines.iteritems():
296 eid = int(k)
291 eid = int(k)
297 self._engines[eid] = bytes(v) # force not unicode
292 self._engines[eid] = bytes(v) # force not unicode
298 self._ids.add(eid)
293 self._ids.add(eid)
299
294
300 def _build_targets(self, targets):
295 def _build_targets(self, targets):
301 """Turn valid target IDs or 'all' into two lists:
296 """Turn valid target IDs or 'all' into two lists:
302 (int_ids, uuids).
297 (int_ids, uuids).
303 """
298 """
304 if targets is None:
299 if targets is None:
305 targets = self._ids
300 targets = self._ids
306 elif isinstance(targets, str):
301 elif isinstance(targets, str):
307 if targets.lower() == 'all':
302 if targets.lower() == 'all':
308 targets = self._ids
303 targets = self._ids
309 else:
304 else:
310 raise TypeError("%r not valid str target, must be 'all'"%(targets))
305 raise TypeError("%r not valid str target, must be 'all'"%(targets))
311 elif isinstance(targets, int):
306 elif isinstance(targets, int):
312 targets = [targets]
307 targets = [targets]
313 return [self._engines[t] for t in targets], list(targets)
308 return [self._engines[t] for t in targets], list(targets)
314
309
315 def _connect(self, sshserver, ssh_kwargs):
310 def _connect(self, sshserver, ssh_kwargs):
316 """setup all our socket connections to the controller. This is called from
311 """setup all our socket connections to the controller. This is called from
317 __init__."""
312 __init__."""
318 if self._connected:
313 if self._connected:
319 return
314 return
320 self._connected=True
315 self._connected=True
321
316
322 def connect_socket(s, addr):
317 def connect_socket(s, addr):
323 if self._ssh:
318 if self._ssh:
324 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
319 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
325 else:
320 else:
326 return s.connect(addr)
321 return s.connect(addr)
327
322
328 self.session.send(self._registration_socket, 'connection_request')
323 self.session.send(self._registration_socket, 'connection_request')
329 idents,msg = self.session.recv(self._registration_socket,mode=0)
324 idents,msg = self.session.recv(self._registration_socket,mode=0)
330 if self.debug:
325 if self.debug:
331 pprint(msg)
326 pprint(msg)
332 msg = ss.Message(msg)
327 msg = ss.Message(msg)
333 content = msg.content
328 content = msg.content
334 if content.status == 'ok':
329 if content.status == 'ok':
335 if content.queue:
330 if content.mux:
336 self._mux_socket = self.context.socket(zmq.PAIR)
331 self._mux_socket = self.context.socket(zmq.PAIR)
337 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
332 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
338 connect_socket(self._mux_socket, content.queue)
333 connect_socket(self._mux_socket, content.mux)
339 if content.task:
334 if content.task:
340 self._task_socket = self.context.socket(zmq.PAIR)
335 self._task_socket = self.context.socket(zmq.PAIR)
341 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
336 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
342 connect_socket(self._task_socket, content.task)
337 connect_socket(self._task_socket, content.task)
343 if content.notification:
338 if content.notification:
344 self._notification_socket = self.context.socket(zmq.SUB)
339 self._notification_socket = self.context.socket(zmq.SUB)
345 connect_socket(self._notification_socket, content.notification)
340 connect_socket(self._notification_socket, content.notification)
346 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
341 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
347 if content.query:
342 if content.query:
348 self._query_socket = self.context.socket(zmq.PAIR)
343 self._query_socket = self.context.socket(zmq.PAIR)
349 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
344 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
350 connect_socket(self._query_socket, content.query)
345 connect_socket(self._query_socket, content.query)
351 if content.control:
346 if content.control:
352 self._control_socket = self.context.socket(zmq.PAIR)
347 self._control_socket = self.context.socket(zmq.PAIR)
353 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
348 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
354 connect_socket(self._control_socket, content.control)
349 connect_socket(self._control_socket, content.control)
355 if content.iopub:
350 if content.iopub:
356 self._iopub_socket = self.context.socket(zmq.SUB)
351 self._iopub_socket = self.context.socket(zmq.SUB)
357 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
352 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
358 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
353 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
359 connect_socket(self._iopub_socket, content.iopub)
354 connect_socket(self._iopub_socket, content.iopub)
360 self._update_engines(dict(content.engines))
355 self._update_engines(dict(content.engines))
361
356
362 else:
357 else:
363 self._connected = False
358 self._connected = False
364 raise Exception("Failed to connect!")
359 raise Exception("Failed to connect!")
365
360
366 #--------------------------------------------------------------------------
361 #--------------------------------------------------------------------------
367 # handlers and callbacks for incoming messages
362 # handlers and callbacks for incoming messages
368 #--------------------------------------------------------------------------
363 #--------------------------------------------------------------------------
369
364
370 def _register_engine(self, msg):
365 def _register_engine(self, msg):
371 """Register a new engine, and update our connection info."""
366 """Register a new engine, and update our connection info."""
372 content = msg['content']
367 content = msg['content']
373 eid = content['id']
368 eid = content['id']
374 d = {eid : content['queue']}
369 d = {eid : content['queue']}
375 self._update_engines(d)
370 self._update_engines(d)
376 self._ids.add(int(eid))
371 self._ids.add(int(eid))
377
372
378 def _unregister_engine(self, msg):
373 def _unregister_engine(self, msg):
379 """Unregister an engine that has died."""
374 """Unregister an engine that has died."""
380 content = msg['content']
375 content = msg['content']
381 eid = int(content['id'])
376 eid = int(content['id'])
382 if eid in self._ids:
377 if eid in self._ids:
383 self._ids.remove(eid)
378 self._ids.remove(eid)
384 self._engines.pop(eid)
379 self._engines.pop(eid)
385
380
386 def _extract_metadata(self, header, parent, content):
381 def _extract_metadata(self, header, parent, content):
387 md = {'msg_id' : parent['msg_id'],
382 md = {'msg_id' : parent['msg_id'],
388 'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
383 'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
389 'started' : datetime.strptime(header['started'], ss.ISO8601),
384 'started' : datetime.strptime(header['started'], ss.ISO8601),
390 'completed' : datetime.strptime(header['date'], ss.ISO8601),
385 'completed' : datetime.strptime(header['date'], ss.ISO8601),
391 'received' : datetime.now(),
386 'received' : datetime.now(),
392 'engine_uuid' : header['engine'],
387 'engine_uuid' : header['engine'],
393 'engine_id' : self._engines.get(header['engine'], None),
388 'engine_id' : self._engines.get(header['engine'], None),
394 'follow' : parent['follow'],
389 'follow' : parent['follow'],
395 'after' : parent['after'],
390 'after' : parent['after'],
396 'status' : content['status'],
391 'status' : content['status'],
397 }
392 }
398 return md
393 return md
399
394
400 def _handle_execute_reply(self, msg):
395 def _handle_execute_reply(self, msg):
401 """Save the reply to an execute_request into our results.
396 """Save the reply to an execute_request into our results.
402
397
403 execute messages are never actually used. apply is used instead.
398 execute messages are never actually used. apply is used instead.
404 """
399 """
405
400
406 parent = msg['parent_header']
401 parent = msg['parent_header']
407 msg_id = parent['msg_id']
402 msg_id = parent['msg_id']
408 if msg_id not in self.outstanding:
403 if msg_id not in self.outstanding:
409 print("got unknown result: %s"%msg_id)
404 print("got unknown result: %s"%msg_id)
410 else:
405 else:
411 self.outstanding.remove(msg_id)
406 self.outstanding.remove(msg_id)
412 self.results[msg_id] = ss.unwrap_exception(msg['content'])
407 self.results[msg_id] = ss.unwrap_exception(msg['content'])
413
408
414 def _handle_apply_reply(self, msg):
409 def _handle_apply_reply(self, msg):
415 """Save the reply to an apply_request into our results."""
410 """Save the reply to an apply_request into our results."""
416 parent = msg['parent_header']
411 parent = msg['parent_header']
417 msg_id = parent['msg_id']
412 msg_id = parent['msg_id']
418 if msg_id not in self.outstanding:
413 if msg_id not in self.outstanding:
419 print ("got unknown result: %s"%msg_id)
414 print ("got unknown result: %s"%msg_id)
420 else:
415 else:
421 self.outstanding.remove(msg_id)
416 self.outstanding.remove(msg_id)
422 content = msg['content']
417 content = msg['content']
423 header = msg['header']
418 header = msg['header']
424
419
425 # construct metadata:
420 # construct metadata:
426 md = self.metadata.setdefault(msg_id, Metadata())
421 md = self.metadata.setdefault(msg_id, Metadata())
427 md.update(self._extract_metadata(header, parent, content))
422 md.update(self._extract_metadata(header, parent, content))
428 self.metadata[msg_id] = md
423 self.metadata[msg_id] = md
429
424
430 # construct result:
425 # construct result:
431 if content['status'] == 'ok':
426 if content['status'] == 'ok':
432 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
427 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
433 elif content['status'] == 'aborted':
428 elif content['status'] == 'aborted':
434 self.results[msg_id] = error.AbortedTask(msg_id)
429 self.results[msg_id] = error.AbortedTask(msg_id)
435 elif content['status'] == 'resubmitted':
430 elif content['status'] == 'resubmitted':
436 # TODO: handle resubmission
431 # TODO: handle resubmission
437 pass
432 pass
438 else:
433 else:
439 e = ss.unwrap_exception(content)
434 e = ss.unwrap_exception(content)
440 e_uuid = e.engine_info['engineid']
435 e_uuid = e.engine_info['engineid']
441 eid = self._engines[e_uuid]
436 eid = self._engines[e_uuid]
442 e.engine_info['engineid'] = eid
437 e.engine_info['engineid'] = eid
443 self.results[msg_id] = e
438 self.results[msg_id] = e
444
439
445 def _flush_notifications(self):
440 def _flush_notifications(self):
446 """Flush notifications of engine registrations waiting
441 """Flush notifications of engine registrations waiting
447 in ZMQ queue."""
442 in ZMQ queue."""
448 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
443 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
449 while msg is not None:
444 while msg is not None:
450 if self.debug:
445 if self.debug:
451 pprint(msg)
446 pprint(msg)
452 msg = msg[-1]
447 msg = msg[-1]
453 msg_type = msg['msg_type']
448 msg_type = msg['msg_type']
454 handler = self._notification_handlers.get(msg_type, None)
449 handler = self._notification_handlers.get(msg_type, None)
455 if handler is None:
450 if handler is None:
456 raise Exception("Unhandled message type: %s"%msg.msg_type)
451 raise Exception("Unhandled message type: %s"%msg.msg_type)
457 else:
452 else:
458 handler(msg)
453 handler(msg)
459 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
454 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
460
455
461 def _flush_results(self, sock):
456 def _flush_results(self, sock):
462 """Flush task or queue results waiting in ZMQ queue."""
457 """Flush task or queue results waiting in ZMQ queue."""
463 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
458 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
464 while msg is not None:
459 while msg is not None:
465 if self.debug:
460 if self.debug:
466 pprint(msg)
461 pprint(msg)
467 msg = msg[-1]
462 msg = msg[-1]
468 msg_type = msg['msg_type']
463 msg_type = msg['msg_type']
469 handler = self._queue_handlers.get(msg_type, None)
464 handler = self._queue_handlers.get(msg_type, None)
470 if handler is None:
465 if handler is None:
471 raise Exception("Unhandled message type: %s"%msg.msg_type)
466 raise Exception("Unhandled message type: %s"%msg.msg_type)
472 else:
467 else:
473 handler(msg)
468 handler(msg)
474 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
469 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
475
470
476 def _flush_control(self, sock):
471 def _flush_control(self, sock):
477 """Flush replies from the control channel waiting
472 """Flush replies from the control channel waiting
478 in the ZMQ queue.
473 in the ZMQ queue.
479
474
480 Currently: ignore them."""
475 Currently: ignore them."""
481 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
476 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
482 while msg is not None:
477 while msg is not None:
483 if self.debug:
478 if self.debug:
484 pprint(msg)
479 pprint(msg)
485 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
480 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
486
481
487 def _flush_iopub(self, sock):
482 def _flush_iopub(self, sock):
488 """Flush replies from the iopub channel waiting
483 """Flush replies from the iopub channel waiting
489 in the ZMQ queue.
484 in the ZMQ queue.
490 """
485 """
491 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
486 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
492 while msg is not None:
487 while msg is not None:
493 if self.debug:
488 if self.debug:
494 pprint(msg)
489 pprint(msg)
495 msg = msg[-1]
490 msg = msg[-1]
496 parent = msg['parent_header']
491 parent = msg['parent_header']
497 msg_id = parent['msg_id']
492 msg_id = parent['msg_id']
498 content = msg['content']
493 content = msg['content']
499 header = msg['header']
494 header = msg['header']
500 msg_type = msg['msg_type']
495 msg_type = msg['msg_type']
501
496
502 # init metadata:
497 # init metadata:
503 md = self.metadata.setdefault(msg_id, Metadata())
498 md = self.metadata.setdefault(msg_id, Metadata())
504
499
505 if msg_type == 'stream':
500 if msg_type == 'stream':
506 name = content['name']
501 name = content['name']
507 s = md[name] or ''
502 s = md[name] or ''
508 md[name] = s + content['data']
503 md[name] = s + content['data']
509 elif msg_type == 'pyerr':
504 elif msg_type == 'pyerr':
510 md.update({'pyerr' : ss.unwrap_exception(content)})
505 md.update({'pyerr' : ss.unwrap_exception(content)})
511 else:
506 else:
512 md.update({msg_type : content['data']})
507 md.update({msg_type : content['data']})
513
508
514 self.metadata[msg_id] = md
509 self.metadata[msg_id] = md
515
510
516 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
511 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
517
512
518 #--------------------------------------------------------------------------
513 #--------------------------------------------------------------------------
519 # getitem
514 # getitem
520 #--------------------------------------------------------------------------
515 #--------------------------------------------------------------------------
521
516
522 def __getitem__(self, key):
517 def __getitem__(self, key):
523 """Dict access returns DirectView multiplexer objects or,
518 """Dict access returns DirectView multiplexer objects or,
524 if key is None, a LoadBalancedView."""
519 if key is None, a LoadBalancedView."""
525 if key is None:
520 if key is None:
526 return LoadBalancedView(self)
521 return LoadBalancedView(self)
527 if isinstance(key, int):
522 if isinstance(key, int):
528 if key not in self.ids:
523 if key not in self.ids:
529 raise IndexError("No such engine: %i"%key)
524 raise IndexError("No such engine: %i"%key)
530 return DirectView(self, key)
525 return DirectView(self, key)
531
526
532 if isinstance(key, slice):
527 if isinstance(key, slice):
533 indices = range(len(self.ids))[key]
528 indices = range(len(self.ids))[key]
534 ids = sorted(self._ids)
529 ids = sorted(self._ids)
535 key = [ ids[i] for i in indices ]
530 key = [ ids[i] for i in indices ]
536 # newkeys = sorted(self._ids)[thekeys[k]]
531 # newkeys = sorted(self._ids)[thekeys[k]]
537
532
538 if isinstance(key, (tuple, list, xrange)):
533 if isinstance(key, (tuple, list, xrange)):
539 _,targets = self._build_targets(list(key))
534 _,targets = self._build_targets(list(key))
540 return DirectView(self, targets)
535 return DirectView(self, targets)
541 else:
536 else:
542 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
537 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
543
538
544 #--------------------------------------------------------------------------
539 #--------------------------------------------------------------------------
545 # Begin public methods
540 # Begin public methods
546 #--------------------------------------------------------------------------
541 #--------------------------------------------------------------------------
547
542
548 @property
543 @property
549 def remote(self):
544 def remote(self):
550 """property for convenient RemoteFunction generation.
545 """property for convenient RemoteFunction generation.
551
546
552 >>> @client.remote
547 >>> @client.remote
553 ... def f():
548 ... def f():
554 import os
549 import os
555 print (os.getpid())
550 print (os.getpid())
556 """
551 """
557 return remote(self, block=self.block)
552 return remote(self, block=self.block)
558
553
    def spin(self):
        """Flush any registration notifications and execution results
        waiting in the ZMQ queue.
        """
        # each socket may be None if that channel was never connected;
        # only drain the ones that exist
        if self._notification_socket:
            self._flush_notifications()
        if self._mux_socket:
            self._flush_results(self._mux_socket)
        if self._task_socket:
            self._flush_results(self._task_socket)
        if self._control_socket:
            self._flush_control(self._control_socket)
        if self._iopub_socket:
            self._flush_iopub(self._iopub_socket)
573
568
    def barrier(self, msg_ids=None, timeout=-1):
        """waits on one or more `msg_ids`, for up to `timeout` seconds.

        Parameters
        ----------
        msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
            ints are indices to self.history
            strs are msg_ids
            default: wait on all outstanding messages
        timeout : float
            a time in seconds, after which to give up.
            default is -1, which means no timeout

        Returns
        -------
        True : when all msg_ids are done
        False : timeout reached, some msg_ids still outstanding
        """
        tic = time.time()
        if msg_ids is None:
            # no explicit ids: wait on everything currently outstanding
            theids = self.outstanding
        else:
            if isinstance(msg_ids, (int, str, AsyncResult)):
                msg_ids = [msg_ids]
            theids = set()
            for msg_id in msg_ids:
                if isinstance(msg_id, int):
                    # an int indexes the submission history
                    msg_id = self.history[msg_id]
                elif isinstance(msg_id, AsyncResult):
                    # an AsyncResult may span several msg_ids; add them all
                    map(theids.add, msg_id.msg_ids)
                    continue
                theids.add(msg_id)
        if not theids.intersection(self.outstanding):
            # nothing we were asked to wait for is still pending
            return True
        self.spin()
        # poll-and-sleep loop; spin() drains the sockets each iteration
        while theids.intersection(self.outstanding):
            if timeout >= 0 and ( time.time()-tic ) > timeout:
                break
            time.sleep(1e-3)
            self.spin()
        return len(theids.intersection(self.outstanding)) == 0
615
610
616 #--------------------------------------------------------------------------
611 #--------------------------------------------------------------------------
617 # Control methods
612 # Control methods
618 #--------------------------------------------------------------------------
613 #--------------------------------------------------------------------------
619
614
620 @spinfirst
615 @spinfirst
621 @defaultblock
616 @defaultblock
622 def clear(self, targets=None, block=None):
617 def clear(self, targets=None, block=None):
623 """Clear the namespace in target(s)."""
618 """Clear the namespace in target(s)."""
624 targets = self._build_targets(targets)[0]
619 targets = self._build_targets(targets)[0]
625 for t in targets:
620 for t in targets:
626 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
621 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
627 error = False
622 error = False
628 if self.block:
623 if self.block:
629 for i in range(len(targets)):
624 for i in range(len(targets)):
630 idents,msg = self.session.recv(self._control_socket,0)
625 idents,msg = self.session.recv(self._control_socket,0)
631 if self.debug:
626 if self.debug:
632 pprint(msg)
627 pprint(msg)
633 if msg['content']['status'] != 'ok':
628 if msg['content']['status'] != 'ok':
634 error = ss.unwrap_exception(msg['content'])
629 error = ss.unwrap_exception(msg['content'])
635 if error:
630 if error:
636 return error
631 return error
637
632
638
633
639 @spinfirst
634 @spinfirst
640 @defaultblock
635 @defaultblock
641 def abort(self, msg_ids = None, targets=None, block=None):
636 def abort(self, msg_ids = None, targets=None, block=None):
642 """Abort the execution queues of target(s)."""
637 """Abort the execution queues of target(s)."""
643 targets = self._build_targets(targets)[0]
638 targets = self._build_targets(targets)[0]
644 if isinstance(msg_ids, basestring):
639 if isinstance(msg_ids, basestring):
645 msg_ids = [msg_ids]
640 msg_ids = [msg_ids]
646 content = dict(msg_ids=msg_ids)
641 content = dict(msg_ids=msg_ids)
647 for t in targets:
642 for t in targets:
648 self.session.send(self._control_socket, 'abort_request',
643 self.session.send(self._control_socket, 'abort_request',
649 content=content, ident=t)
644 content=content, ident=t)
650 error = False
645 error = False
651 if self.block:
646 if self.block:
652 for i in range(len(targets)):
647 for i in range(len(targets)):
653 idents,msg = self.session.recv(self._control_socket,0)
648 idents,msg = self.session.recv(self._control_socket,0)
654 if self.debug:
649 if self.debug:
655 pprint(msg)
650 pprint(msg)
656 if msg['content']['status'] != 'ok':
651 if msg['content']['status'] != 'ok':
657 error = ss.unwrap_exception(msg['content'])
652 error = ss.unwrap_exception(msg['content'])
658 if error:
653 if error:
659 return error
654 return error
660
655
661 @spinfirst
656 @spinfirst
662 @defaultblock
657 @defaultblock
663 def shutdown(self, targets=None, restart=False, controller=False, block=None):
658 def shutdown(self, targets=None, restart=False, controller=False, block=None):
664 """Terminates one or more engine processes, optionally including the controller."""
659 """Terminates one or more engine processes, optionally including the controller."""
665 if controller:
660 if controller:
666 targets = 'all'
661 targets = 'all'
667 targets = self._build_targets(targets)[0]
662 targets = self._build_targets(targets)[0]
668 for t in targets:
663 for t in targets:
669 self.session.send(self._control_socket, 'shutdown_request',
664 self.session.send(self._control_socket, 'shutdown_request',
670 content={'restart':restart},ident=t)
665 content={'restart':restart},ident=t)
671 error = False
666 error = False
672 if block or controller:
667 if block or controller:
673 for i in range(len(targets)):
668 for i in range(len(targets)):
674 idents,msg = self.session.recv(self._control_socket,0)
669 idents,msg = self.session.recv(self._control_socket,0)
675 if self.debug:
670 if self.debug:
676 pprint(msg)
671 pprint(msg)
677 if msg['content']['status'] != 'ok':
672 if msg['content']['status'] != 'ok':
678 error = ss.unwrap_exception(msg['content'])
673 error = ss.unwrap_exception(msg['content'])
679
674
680 if controller:
675 if controller:
681 time.sleep(0.25)
676 time.sleep(0.25)
682 self.session.send(self._query_socket, 'shutdown_request')
677 self.session.send(self._query_socket, 'shutdown_request')
683 idents,msg = self.session.recv(self._query_socket, 0)
678 idents,msg = self.session.recv(self._query_socket, 0)
684 if self.debug:
679 if self.debug:
685 pprint(msg)
680 pprint(msg)
686 if msg['content']['status'] != 'ok':
681 if msg['content']['status'] != 'ok':
687 error = ss.unwrap_exception(msg['content'])
682 error = ss.unwrap_exception(msg['content'])
688
683
689 if error:
684 if error:
690 raise error
685 raise error
691
686
692 #--------------------------------------------------------------------------
687 #--------------------------------------------------------------------------
693 # Execution methods
688 # Execution methods
694 #--------------------------------------------------------------------------
689 #--------------------------------------------------------------------------
695
690
696 @defaultblock
691 @defaultblock
697 def execute(self, code, targets='all', block=None):
692 def execute(self, code, targets='all', block=None):
698 """Executes `code` on `targets` in blocking or nonblocking manner.
693 """Executes `code` on `targets` in blocking or nonblocking manner.
699
694
700 ``execute`` is always `bound` (affects engine namespace)
695 ``execute`` is always `bound` (affects engine namespace)
701
696
702 Parameters
697 Parameters
703 ----------
698 ----------
704 code : str
699 code : str
705 the code string to be executed
700 the code string to be executed
706 targets : int/str/list of ints/strs
701 targets : int/str/list of ints/strs
707 the engines on which to execute
702 the engines on which to execute
708 default : all
703 default : all
709 block : bool
704 block : bool
710 whether or not to wait until done to return
705 whether or not to wait until done to return
711 default: self.block
706 default: self.block
712 """
707 """
713 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
708 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
714 return result
709 return result
715
710
716 def run(self, code, block=None):
711 def run(self, code, block=None):
717 """Runs `code` on an engine.
712 """Runs `code` on an engine.
718
713
719 Calls to this are load-balanced.
714 Calls to this are load-balanced.
720
715
721 ``run`` is never `bound` (no effect on engine namespace)
716 ``run`` is never `bound` (no effect on engine namespace)
722
717
723 Parameters
718 Parameters
724 ----------
719 ----------
725 code : str
720 code : str
726 the code string to be executed
721 the code string to be executed
727 block : bool
722 block : bool
728 whether or not to wait until done
723 whether or not to wait until done
729
724
730 """
725 """
731 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
726 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
732 return result
727 return result
733
728
734 def _maybe_raise(self, result):
729 def _maybe_raise(self, result):
735 """wrapper for maybe raising an exception if apply failed."""
730 """wrapper for maybe raising an exception if apply failed."""
736 if isinstance(result, error.RemoteError):
731 if isinstance(result, error.RemoteError):
737 raise result
732 raise result
738
733
739 return result
734 return result
740
735
    def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
                        after=None, follow=None):
        """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.

        This is the central execution command for the client.

        Parameters
        ----------

        f : function
            The fuction to be called remotely
        args : tuple/list
            The positional arguments passed to `f`
        kwargs : dict
            The keyword arguments passed to `f`
        bound : bool (default: True)
            Whether to execute in the Engine(s) namespace, or in a clean
            namespace not affecting the engine.
        block : bool (default: self.block)
            Whether to wait for the result, or return immediately.
            False:
                returns msg_id(s)
                if multiple targets:
                    list of ids
            True:
                returns actual result(s) of f(*args, **kwargs)
                if multiple targets:
                    dict of results, by engine ID
        targets : int,list of ints, 'all', None
            Specify the destination of the job.
            if None:
                Submit via Task queue for load-balancing.
            if 'all':
                Run on all active engines
            if list:
                Run on each specified engine
            if int:
                Run on single engine

        after : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a time-based dependency.
            This job will only be run *after* the dependencies
            have been met.

        follow : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a location-based dependency.
            This job will only be run on an engine where this dependency
            is met.

        Returns
        -------
        if block is False:
            if single target:
                return msg_id
            else:
                return list of msg_ids
                ? (should this be dict like block=True) ?
        else:
            if single target:
                return result of f(*args, **kwargs)
            else:
                return dict of results, keyed by engine
        """

        # defaults:
        block = block if block is not None else self.block
        args = args if args is not None else []
        kwargs = kwargs if kwargs is not None else {}

        # enforce types of f,args,kwrags
        if not callable(f):
            raise TypeError("f must be callable, not %s"%type(f))
        if not isinstance(args, (tuple, list)):
            raise TypeError("args must be tuple or list, not %s"%type(args))
        if not isinstance(kwargs, dict):
            raise TypeError("kwargs must be dict, not %s"%type(kwargs))

        # normalize `after`/`follow` dependencies: a Dependency becomes its
        # dict form, an AsyncResult becomes its msg_ids, None becomes [];
        # anything else (e.g. a plain list of msg_ids) passes through as-is
        if isinstance(after, Dependency):
            after = after.as_dict()
        elif isinstance(after, AsyncResult):
            after=after.msg_ids
        elif after is None:
            after = []
        if isinstance(follow, Dependency):
            follow = follow.as_dict()
        elif isinstance(follow, AsyncResult):
            follow=follow.msg_ids
        elif follow is None:
            follow = []
        options = dict(bound=bound, block=block, after=after, follow=follow)

        # dispatch: targets=None means load-balanced via the task queue,
        # otherwise direct to the specified engine(s) via the MUX queue
        if targets is None:
            return self._apply_balanced(f, args, kwargs, **options)
        else:
            return self._apply_direct(f, args, kwargs, targets=targets, **options)
838
833
839 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
834 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
840 after=None, follow=None):
835 after=None, follow=None):
841 """The underlying method for applying functions in a load balanced
836 """The underlying method for applying functions in a load balanced
842 manner, via the task queue."""
837 manner, via the task queue."""
843
838
844 subheader = dict(after=after, follow=follow)
839 subheader = dict(after=after, follow=follow)
845 bufs = ss.pack_apply_message(f,args,kwargs)
840 bufs = ss.pack_apply_message(f,args,kwargs)
846 content = dict(bound=bound)
841 content = dict(bound=bound)
847
842
848 msg = self.session.send(self._task_socket, "apply_request",
843 msg = self.session.send(self._task_socket, "apply_request",
849 content=content, buffers=bufs, subheader=subheader)
844 content=content, buffers=bufs, subheader=subheader)
850 msg_id = msg['msg_id']
845 msg_id = msg['msg_id']
851 self.outstanding.add(msg_id)
846 self.outstanding.add(msg_id)
852 self.history.append(msg_id)
847 self.history.append(msg_id)
853 ar = AsyncResult(self, [msg_id], fname=f.__name__)
848 ar = AsyncResult(self, [msg_id], fname=f.__name__)
854 if block:
849 if block:
855 return ar.get()
850 return ar.get()
856 else:
851 else:
857 return ar
852 return ar
858
853
859 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
854 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
860 after=None, follow=None):
855 after=None, follow=None):
861 """Then underlying method for applying functions to specific engines
856 """Then underlying method for applying functions to specific engines
862 via the MUX queue."""
857 via the MUX queue."""
863
858
864 queues,targets = self._build_targets(targets)
859 queues,targets = self._build_targets(targets)
865
860
866 subheader = dict(after=after, follow=follow)
861 subheader = dict(after=after, follow=follow)
867 content = dict(bound=bound)
862 content = dict(bound=bound)
868 bufs = ss.pack_apply_message(f,args,kwargs)
863 bufs = ss.pack_apply_message(f,args,kwargs)
869
864
870 msg_ids = []
865 msg_ids = []
871 for queue in queues:
866 for queue in queues:
872 msg = self.session.send(self._mux_socket, "apply_request",
867 msg = self.session.send(self._mux_socket, "apply_request",
873 content=content, buffers=bufs,ident=queue, subheader=subheader)
868 content=content, buffers=bufs,ident=queue, subheader=subheader)
874 msg_id = msg['msg_id']
869 msg_id = msg['msg_id']
875 self.outstanding.add(msg_id)
870 self.outstanding.add(msg_id)
876 self.history.append(msg_id)
871 self.history.append(msg_id)
877 msg_ids.append(msg_id)
872 msg_ids.append(msg_id)
878 ar = AsyncResult(self, msg_ids, fname=f.__name__)
873 ar = AsyncResult(self, msg_ids, fname=f.__name__)
879 if block:
874 if block:
880 return ar.get()
875 return ar.get()
881 else:
876 else:
882 return ar
877 return ar
883
878
884 #--------------------------------------------------------------------------
879 #--------------------------------------------------------------------------
885 # Map and decorators
880 # Map and decorators
886 #--------------------------------------------------------------------------
881 #--------------------------------------------------------------------------
887
882
888 def map(self, f, *sequences):
883 def map(self, f, *sequences):
889 """Parallel version of builtin `map`, using all our engines."""
884 """Parallel version of builtin `map`, using all our engines."""
890 pf = ParallelFunction(self, f, block=self.block,
885 pf = ParallelFunction(self, f, block=self.block,
891 bound=True, targets='all')
886 bound=True, targets='all')
892 return pf.map(*sequences)
887 return pf.map(*sequences)
893
888
894 def parallel(self, bound=True, targets='all', block=True):
889 def parallel(self, bound=True, targets='all', block=True):
895 """Decorator for making a ParallelFunction."""
890 """Decorator for making a ParallelFunction."""
896 return parallel(self, bound=bound, targets=targets, block=block)
891 return parallel(self, bound=bound, targets=targets, block=block)
897
892
898 def remote(self, bound=True, targets='all', block=True):
893 def remote(self, bound=True, targets='all', block=True):
899 """Decorator for making a RemoteFunction."""
894 """Decorator for making a RemoteFunction."""
900 return remote(self, bound=bound, targets=targets, block=block)
895 return remote(self, bound=bound, targets=targets, block=block)
901
896
902 #--------------------------------------------------------------------------
897 #--------------------------------------------------------------------------
903 # Data movement
898 # Data movement
904 #--------------------------------------------------------------------------
899 #--------------------------------------------------------------------------
905
900
906 @defaultblock
901 @defaultblock
907 def push(self, ns, targets='all', block=None):
902 def push(self, ns, targets='all', block=None):
908 """Push the contents of `ns` into the namespace on `target`"""
903 """Push the contents of `ns` into the namespace on `target`"""
909 if not isinstance(ns, dict):
904 if not isinstance(ns, dict):
910 raise TypeError("Must be a dict, not %s"%type(ns))
905 raise TypeError("Must be a dict, not %s"%type(ns))
911 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
906 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
912 return result
907 return result
913
908
914 @defaultblock
909 @defaultblock
915 def pull(self, keys, targets='all', block=None):
910 def pull(self, keys, targets='all', block=None):
916 """Pull objects from `target`'s namespace by `keys`"""
911 """Pull objects from `target`'s namespace by `keys`"""
917 if isinstance(keys, str):
912 if isinstance(keys, str):
918 pass
913 pass
919 elif isinstance(keys, (list,tuple,set)):
914 elif isinstance(keys, (list,tuple,set)):
920 for key in keys:
915 for key in keys:
921 if not isinstance(key, str):
916 if not isinstance(key, str):
922 raise TypeError
917 raise TypeError
923 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
918 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
924 return result
919 return result
925
920
926 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
921 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
927 """
922 """
928 Partition a Python sequence and send the partitions to a set of engines.
923 Partition a Python sequence and send the partitions to a set of engines.
929 """
924 """
930 block = block if block is not None else self.block
925 block = block if block is not None else self.block
931 targets = self._build_targets(targets)[-1]
926 targets = self._build_targets(targets)[-1]
932 mapObject = Map.dists[dist]()
927 mapObject = Map.dists[dist]()
933 nparts = len(targets)
928 nparts = len(targets)
934 msg_ids = []
929 msg_ids = []
935 for index, engineid in enumerate(targets):
930 for index, engineid in enumerate(targets):
936 partition = mapObject.getPartition(seq, index, nparts)
931 partition = mapObject.getPartition(seq, index, nparts)
937 if flatten and len(partition) == 1:
932 if flatten and len(partition) == 1:
938 r = self.push({key: partition[0]}, targets=engineid, block=False)
933 r = self.push({key: partition[0]}, targets=engineid, block=False)
939 else:
934 else:
940 r = self.push({key: partition}, targets=engineid, block=False)
935 r = self.push({key: partition}, targets=engineid, block=False)
941 msg_ids.extend(r.msg_ids)
936 msg_ids.extend(r.msg_ids)
942 r = AsyncResult(self, msg_ids, fname='scatter')
937 r = AsyncResult(self, msg_ids, fname='scatter')
943 if block:
938 if block:
944 return r.get()
939 return r.get()
945 else:
940 else:
946 return r
941 return r
947
942
948 def gather(self, key, dist='b', targets='all', block=None):
943 def gather(self, key, dist='b', targets='all', block=None):
949 """
944 """
950 Gather a partitioned sequence on a set of engines as a single local seq.
945 Gather a partitioned sequence on a set of engines as a single local seq.
951 """
946 """
952 block = block if block is not None else self.block
947 block = block if block is not None else self.block
953
948
954 targets = self._build_targets(targets)[-1]
949 targets = self._build_targets(targets)[-1]
955 mapObject = Map.dists[dist]()
950 mapObject = Map.dists[dist]()
956 msg_ids = []
951 msg_ids = []
957 for index, engineid in enumerate(targets):
952 for index, engineid in enumerate(targets):
958 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
953 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
959
954
960 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
955 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
961 if block:
956 if block:
962 return r.get()
957 return r.get()
963 else:
958 else:
964 return r
959 return r
965
960
966 #--------------------------------------------------------------------------
961 #--------------------------------------------------------------------------
967 # Query methods
962 # Query methods
968 #--------------------------------------------------------------------------
963 #--------------------------------------------------------------------------
969
964
970 @spinfirst
965 @spinfirst
971 def get_results(self, msg_ids, status_only=False):
966 def get_results(self, msg_ids, status_only=False):
972 """Returns the result of the execute or task request with `msg_ids`.
967 """Returns the result of the execute or task request with `msg_ids`.
973
968
974 Parameters
969 Parameters
975 ----------
970 ----------
976 msg_ids : list of ints or msg_ids
971 msg_ids : list of ints or msg_ids
977 if int:
972 if int:
978 Passed as index to self.history for convenience.
973 Passed as index to self.history for convenience.
979 status_only : bool (default: False)
974 status_only : bool (default: False)
980 if False:
975 if False:
981 return the actual results
976 return the actual results
982
977
983 Returns
978 Returns
984 -------
979 -------
985
980
986 results : dict
981 results : dict
987 There will always be the keys 'pending' and 'completed', which will
982 There will always be the keys 'pending' and 'completed', which will
988 be lists of msg_ids.
983 be lists of msg_ids.
989 """
984 """
990 if not isinstance(msg_ids, (list,tuple)):
985 if not isinstance(msg_ids, (list,tuple)):
991 msg_ids = [msg_ids]
986 msg_ids = [msg_ids]
992 theids = []
987 theids = []
993 for msg_id in msg_ids:
988 for msg_id in msg_ids:
994 if isinstance(msg_id, int):
989 if isinstance(msg_id, int):
995 msg_id = self.history[msg_id]
990 msg_id = self.history[msg_id]
996 if not isinstance(msg_id, str):
991 if not isinstance(msg_id, str):
997 raise TypeError("msg_ids must be str, not %r"%msg_id)
992 raise TypeError("msg_ids must be str, not %r"%msg_id)
998 theids.append(msg_id)
993 theids.append(msg_id)
999
994
1000 completed = []
995 completed = []
1001 local_results = {}
996 local_results = {}
1002 # temporarily disable local shortcut
997 # temporarily disable local shortcut
1003 # for msg_id in list(theids):
998 # for msg_id in list(theids):
1004 # if msg_id in self.results:
999 # if msg_id in self.results:
1005 # completed.append(msg_id)
1000 # completed.append(msg_id)
1006 # local_results[msg_id] = self.results[msg_id]
1001 # local_results[msg_id] = self.results[msg_id]
1007 # theids.remove(msg_id)
1002 # theids.remove(msg_id)
1008
1003
1009 if theids: # some not locally cached
1004 if theids: # some not locally cached
1010 content = dict(msg_ids=theids, status_only=status_only)
1005 content = dict(msg_ids=theids, status_only=status_only)
1011 msg = self.session.send(self._query_socket, "result_request", content=content)
1006 msg = self.session.send(self._query_socket, "result_request", content=content)
1012 zmq.select([self._query_socket], [], [])
1007 zmq.select([self._query_socket], [], [])
1013 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1008 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1014 if self.debug:
1009 if self.debug:
1015 pprint(msg)
1010 pprint(msg)
1016 content = msg['content']
1011 content = msg['content']
1017 if content['status'] != 'ok':
1012 if content['status'] != 'ok':
1018 raise ss.unwrap_exception(content)
1013 raise ss.unwrap_exception(content)
1019 buffers = msg['buffers']
1014 buffers = msg['buffers']
1020 else:
1015 else:
1021 content = dict(completed=[],pending=[])
1016 content = dict(completed=[],pending=[])
1022
1017
1023 content['completed'].extend(completed)
1018 content['completed'].extend(completed)
1024
1019
1025 if status_only:
1020 if status_only:
1026 return content
1021 return content
1027
1022
1028 failures = []
1023 failures = []
1029 # load cached results into result:
1024 # load cached results into result:
1030 content.update(local_results)
1025 content.update(local_results)
1031 # update cache with results:
1026 # update cache with results:
1032 for msg_id in sorted(theids):
1027 for msg_id in sorted(theids):
1033 if msg_id in content['completed']:
1028 if msg_id in content['completed']:
1034 rec = content[msg_id]
1029 rec = content[msg_id]
1035 parent = rec['header']
1030 parent = rec['header']
1036 header = rec['result_header']
1031 header = rec['result_header']
1037 rcontent = rec['result_content']
1032 rcontent = rec['result_content']
1038 iodict = rec['io']
1033 iodict = rec['io']
1039 if isinstance(rcontent, str):
1034 if isinstance(rcontent, str):
1040 rcontent = self.session.unpack(rcontent)
1035 rcontent = self.session.unpack(rcontent)
1041
1036
1042 md = self.metadata.setdefault(msg_id, Metadata())
1037 md = self.metadata.setdefault(msg_id, Metadata())
1043 md.update(self._extract_metadata(header, parent, rcontent))
1038 md.update(self._extract_metadata(header, parent, rcontent))
1044 md.update(iodict)
1039 md.update(iodict)
1045
1040
1046 if rcontent['status'] == 'ok':
1041 if rcontent['status'] == 'ok':
1047 res,buffers = ss.unserialize_object(buffers)
1042 res,buffers = ss.unserialize_object(buffers)
1048 else:
1043 else:
1049 res = ss.unwrap_exception(rcontent)
1044 res = ss.unwrap_exception(rcontent)
1050 failures.append(res)
1045 failures.append(res)
1051
1046
1052 self.results[msg_id] = res
1047 self.results[msg_id] = res
1053 content[msg_id] = res
1048 content[msg_id] = res
1054
1049
1055 error.collect_exceptions(failures, "get_results")
1050 error.collect_exceptions(failures, "get_results")
1056 return content
1051 return content
1057
1052
1058 @spinfirst
1053 @spinfirst
1059 def queue_status(self, targets=None, verbose=False):
1054 def queue_status(self, targets=None, verbose=False):
1060 """Fetch the status of engine queues.
1055 """Fetch the status of engine queues.
1061
1056
1062 Parameters
1057 Parameters
1063 ----------
1058 ----------
1064 targets : int/str/list of ints/strs
1059 targets : int/str/list of ints/strs
1065 the engines on which to execute
1060 the engines on which to execute
1066 default : all
1061 default : all
1067 verbose : bool
1062 verbose : bool
1068 Whether to return lengths only, or lists of ids for each element
1063 Whether to return lengths only, or lists of ids for each element
1069 """
1064 """
1070 targets = self._build_targets(targets)[1]
1065 targets = self._build_targets(targets)[1]
1071 content = dict(targets=targets, verbose=verbose)
1066 content = dict(targets=targets, verbose=verbose)
1072 self.session.send(self._query_socket, "queue_request", content=content)
1067 self.session.send(self._query_socket, "queue_request", content=content)
1073 idents,msg = self.session.recv(self._query_socket, 0)
1068 idents,msg = self.session.recv(self._query_socket, 0)
1074 if self.debug:
1069 if self.debug:
1075 pprint(msg)
1070 pprint(msg)
1076 content = msg['content']
1071 content = msg['content']
1077 status = content.pop('status')
1072 status = content.pop('status')
1078 if status != 'ok':
1073 if status != 'ok':
1079 raise ss.unwrap_exception(content)
1074 raise ss.unwrap_exception(content)
1080 return ss.rekey(content)
1075 return ss.rekey(content)
1081
1076
1082 @spinfirst
1077 @spinfirst
1083 def purge_results(self, msg_ids=[], targets=[]):
1078 def purge_results(self, msg_ids=[], targets=[]):
1084 """Tell the controller to forget results.
1079 """Tell the controller to forget results.
1085
1080
1086 Individual results can be purged by msg_id, or the entire
1081 Individual results can be purged by msg_id, or the entire
1087 history of specific targets can be purged.
1082 history of specific targets can be purged.
1088
1083
1089 Parameters
1084 Parameters
1090 ----------
1085 ----------
1091 msg_ids : str or list of strs
1086 msg_ids : str or list of strs
1092 the msg_ids whose results should be forgotten.
1087 the msg_ids whose results should be forgotten.
1093 targets : int/str/list of ints/strs
1088 targets : int/str/list of ints/strs
1094 The targets, by uuid or int_id, whose entire history is to be purged.
1089 The targets, by uuid or int_id, whose entire history is to be purged.
1095 Use `targets='all'` to scrub everything from the controller's memory.
1090 Use `targets='all'` to scrub everything from the controller's memory.
1096
1091
1097 default : None
1092 default : None
1098 """
1093 """
1099 if not targets and not msg_ids:
1094 if not targets and not msg_ids:
1100 raise ValueError
1095 raise ValueError
1101 if targets:
1096 if targets:
1102 targets = self._build_targets(targets)[1]
1097 targets = self._build_targets(targets)[1]
1103 content = dict(targets=targets, msg_ids=msg_ids)
1098 content = dict(targets=targets, msg_ids=msg_ids)
1104 self.session.send(self._query_socket, "purge_request", content=content)
1099 self.session.send(self._query_socket, "purge_request", content=content)
1105 idents, msg = self.session.recv(self._query_socket, 0)
1100 idents, msg = self.session.recv(self._query_socket, 0)
1106 if self.debug:
1101 if self.debug:
1107 pprint(msg)
1102 pprint(msg)
1108 content = msg['content']
1103 content = msg['content']
1109 if content['status'] != 'ok':
1104 if content['status'] != 'ok':
1110 raise ss.unwrap_exception(content)
1105 raise ss.unwrap_exception(content)
1111
1106
1112 #----------------------------------------
1107 #----------------------------------------
1113 # activate for %px,%autopx magics
1108 # activate for %px,%autopx magics
1114 #----------------------------------------
1109 #----------------------------------------
1115 def activate(self):
1110 def activate(self):
1116 """Make this `View` active for parallel magic commands.
1111 """Make this `View` active for parallel magic commands.
1117
1112
1118 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1113 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1119 In a given IPython session there is a single active one. While
1114 In a given IPython session there is a single active one. While
1120 there can be many `Views` created and used by the user,
1115 there can be many `Views` created and used by the user,
1121 there is only one active one. The active `View` is used whenever
1116 there is only one active one. The active `View` is used whenever
1122 the magic commands %px and %autopx are used.
1117 the magic commands %px and %autopx are used.
1123
1118
1124 The activate() method is called on a given `View` to make it
1119 The activate() method is called on a given `View` to make it
1125 active. Once this has been done, the magic commands can be used.
1120 active. Once this has been done, the magic commands can be used.
1126 """
1121 """
1127
1122
1128 try:
1123 try:
1129 # This is injected into __builtins__.
1124 # This is injected into __builtins__.
1130 ip = get_ipython()
1125 ip = get_ipython()
1131 except NameError:
1126 except NameError:
1132 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1127 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1133 else:
1128 else:
1134 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1129 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1135 if pmagic is not None:
1130 if pmagic is not None:
1136 pmagic.active_multiengine_client = self
1131 pmagic.active_multiengine_client = self
1137 else:
1132 else:
1138 print "You must first load the parallelmagic extension " \
1133 print "You must first load the parallelmagic extension " \
1139 "by doing '%load_ext parallelmagic'"
1134 "by doing '%load_ext parallelmagic'"
1140
1135
1141 class AsynClient(Client):
1136 class AsynClient(Client):
1142 """An Asynchronous client, using the Tornado Event Loop.
1137 """An Asynchronous client, using the Tornado Event Loop.
1143 !!!unfinished!!!"""
1138 !!!unfinished!!!"""
1144 io_loop = None
1139 io_loop = None
1145 _queue_stream = None
1140 _queue_stream = None
1146 _notifier_stream = None
1141 _notifier_stream = None
1147 _task_stream = None
1142 _task_stream = None
1148 _control_stream = None
1143 _control_stream = None
1149
1144
1150 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1145 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1151 Client.__init__(self, addr, context, username, debug)
1146 Client.__init__(self, addr, context, username, debug)
1152 if io_loop is None:
1147 if io_loop is None:
1153 io_loop = ioloop.IOLoop.instance()
1148 io_loop = ioloop.IOLoop.instance()
1154 self.io_loop = io_loop
1149 self.io_loop = io_loop
1155
1150
1156 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1151 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1157 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1152 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1158 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1153 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1159 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1154 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1160
1155
1161 def spin(self):
1156 def spin(self):
1162 for stream in (self.queue_stream, self.notifier_stream,
1157 for stream in (self.queue_stream, self.notifier_stream,
1163 self.task_stream, self.control_stream):
1158 self.task_stream, self.control_stream):
1164 stream.flush()
1159 stream.flush()
1165
1160
1166 __all__ = [ 'Client',
1161 __all__ = [ 'Client',
1167 'depend',
1162 'depend',
1168 'require',
1163 'require',
1169 'remote',
1164 'remote',
1170 'parallel',
1165 'parallel',
1171 'RemoteFunction',
1166 'RemoteFunction',
1172 'ParallelFunction',
1167 'ParallelFunction',
1173 'DirectView',
1168 'DirectView',
1174 'LoadBalancedView',
1169 'LoadBalancedView',
1175 'AsyncResult',
1170 'AsyncResult',
1176 'AsyncMapResult'
1171 'AsyncMapResult'
1177 ]
1172 ]
@@ -1,254 +1,265 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import os
18 import os
19 import time
19 import time
20 import logging
20 from multiprocessing import Process
21 from multiprocessing import Process
21
22
22 import zmq
23 import zmq
23 from zmq.eventloop import ioloop
24 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
25 from zmq.eventloop.zmqstream import ZMQStream
25 from zmq.devices import ProcessMonitoredQueue
26 from zmq.devices import ProcessMonitoredQueue
26
27
27 # internal:
28 # internal:
28 from IPython.zmq.entry_point import bind_port
29 from IPython.zmq.entry_point import bind_port
29
30
30 from hub import Hub
31 from hub import Hub
31 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
32 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
32 connect_logger, parse_url, signal_children, generate_exec_key)
33 connect_logger, parse_url, signal_children, generate_exec_key,
34 local_logger)
33
35
34
36
35 import streamsession as session
37 import streamsession as session
36 import heartmonitor
38 import heartmonitor
37 from scheduler import launch_scheduler
39 from scheduler import launch_scheduler
38
40
39 from dictdb import DictDB
41 from dictdb import DictDB
40 try:
42 try:
41 import pymongo
43 import pymongo
42 except ImportError:
44 except ImportError:
43 MongoDB=None
45 MongoDB=None
44 else:
46 else:
45 from mongodb import MongoDB
47 from mongodb import MongoDB
46
48
47 #-------------------------------------------------------------------------
49 #-------------------------------------------------------------------------
48 # Entry Point
50 # Entry Point
49 #-------------------------------------------------------------------------
51 #-------------------------------------------------------------------------
50
52
51 def make_argument_parser():
53 def make_argument_parser():
52 """Make an argument parser"""
54 """Make an argument parser"""
53 parser = make_base_argument_parser()
55 parser = make_base_argument_parser()
54
56
55 parser.add_argument('--client', type=int, metavar='PORT', default=0,
57 parser.add_argument('--client', type=int, metavar='PORT', default=0,
56 help='set the XREP port for clients [default: random]')
58 help='set the XREP port for clients [default: random]')
57 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
59 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
58 help='set the PUB socket for registration notification [default: random]')
60 help='set the PUB socket for registration notification [default: random]')
59 parser.add_argument('--hb', type=str, metavar='PORTS',
61 parser.add_argument('--hb', type=str, metavar='PORTS',
60 help='set the 2 ports for heartbeats [default: random]')
62 help='set the 2 ports for heartbeats [default: random]')
61 parser.add_argument('--ping', type=int, default=100,
63 parser.add_argument('--ping', type=int, default=100,
62 help='set the heartbeat period in ms [default: 100]')
64 help='set the heartbeat period in ms [default: 100]')
63 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
65 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
64 help='set the SUB port for queue monitoring [default: random]')
66 help='set the SUB port for queue monitoring [default: random]')
65 parser.add_argument('--mux', type=str, metavar='PORTS',
67 parser.add_argument('--mux', type=str, metavar='PORTS',
66 help='set the XREP ports for the MUX queue [default: random]')
68 help='set the XREP ports for the MUX queue [default: random]')
67 parser.add_argument('--task', type=str, metavar='PORTS',
69 parser.add_argument('--task', type=str, metavar='PORTS',
68 help='set the XREP/XREQ ports for the task queue [default: random]')
70 help='set the XREP/XREQ ports for the task queue [default: random]')
69 parser.add_argument('--control', type=str, metavar='PORTS',
71 parser.add_argument('--control', type=str, metavar='PORTS',
70 help='set the XREP ports for the control queue [default: random]')
72 help='set the XREP ports for the control queue [default: random]')
71 parser.add_argument('--iopub', type=str, metavar='PORTS',
73 parser.add_argument('--iopub', type=str, metavar='PORTS',
72 help='set the PUB/SUB ports for the iopub relay [default: random]')
74 help='set the PUB/SUB ports for the iopub relay [default: random]')
73 parser.add_argument('--scheduler', type=str, default='lru',
75 parser.add_argument('--scheduler', type=str, default='lru',
74 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
76 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
75 help='select the task scheduler [default: Python LRU]')
77 help='select the task scheduler [default: Python LRU]')
76 parser.add_argument('--mongodb', action='store_true',
78 parser.add_argument('--mongodb', action='store_true',
77 help='Use MongoDB task storage [default: in-memory]')
79 help='Use MongoDB task storage [default: in-memory]')
78 parser.add_argument('--session', type=str, default=None,
80 parser.add_argument('--session', type=str, default=None,
79 help='Manually specify the session id.')
81 help='Manually specify the session id.')
80
82
81 return parser
83 return parser
82
84
83 def main(argv=None):
85 def main(argv=None):
84
86
85 parser = make_argument_parser()
87 parser = make_argument_parser()
86
88
87 args = parser.parse_args(argv)
89 args = parser.parse_args(argv)
88 parse_url(args)
90 parse_url(args)
89
91
90 iface="%s://%s"%(args.transport,args.ip)+':%i'
92 iface="%s://%s"%(args.transport,args.ip)+':%i'
91
93
92 random_ports = 0
94 random_ports = 0
93 if args.hb:
95 if args.hb:
94 hb = split_ports(args.hb, 2)
96 hb = split_ports(args.hb, 2)
95 else:
97 else:
96 hb = select_random_ports(2)
98 hb = select_random_ports(2)
97 if args.mux:
99 if args.mux:
98 mux = split_ports(args.mux, 2)
100 mux = split_ports(args.mux, 2)
99 else:
101 else:
100 mux = None
102 mux = None
101 random_ports += 2
103 random_ports += 2
102 if args.iopub:
104 if args.iopub:
103 iopub = split_ports(args.iopub, 2)
105 iopub = split_ports(args.iopub, 2)
104 else:
106 else:
105 iopub = None
107 iopub = None
106 random_ports += 2
108 random_ports += 2
107 if args.task:
109 if args.task:
108 task = split_ports(args.task, 2)
110 task = split_ports(args.task, 2)
109 else:
111 else:
110 task = None
112 task = None
111 random_ports += 2
113 random_ports += 2
112 if args.control:
114 if args.control:
113 control = split_ports(args.control, 2)
115 control = split_ports(args.control, 2)
114 else:
116 else:
115 control = None
117 control = None
116 random_ports += 2
118 random_ports += 2
117
119
118 ctx = zmq.Context()
120 ctx = zmq.Context()
119 loop = ioloop.IOLoop.instance()
121 loop = ioloop.IOLoop.instance()
120
122
121 # setup logging
122 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
123
123
124 # Registrar socket
124 # Registrar socket
125 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
125 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
126 regport = bind_port(reg, args.ip, args.regport)
126 regport = bind_port(reg, args.ip, args.regport)
127
127
128 ### Engine connections ###
128 ### Engine connections ###
129
129
130 # heartbeat
130 # heartbeat
131 hpub = ctx.socket(zmq.PUB)
131 hpub = ctx.socket(zmq.PUB)
132 bind_port(hpub, args.ip, hb[0])
132 bind_port(hpub, args.ip, hb[0])
133 hrep = ctx.socket(zmq.XREP)
133 hrep = ctx.socket(zmq.XREP)
134 bind_port(hrep, args.ip, hb[1])
134 bind_port(hrep, args.ip, hb[1])
135
135
136 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
136 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
137 hmon.start()
137 hmon.start()
138
138
139 ### Client connections ###
139 ### Client connections ###
140 # Clientele socket
140 # Clientele socket
141 c = ZMQStream(ctx.socket(zmq.XREP), loop)
141 c = ZMQStream(ctx.socket(zmq.XREP), loop)
142 cport = bind_port(c, args.ip, args.client)
142 cport = bind_port(c, args.ip, args.client)
143 # Notifier socket
143 # Notifier socket
144 n = ZMQStream(ctx.socket(zmq.PUB), loop)
144 n = ZMQStream(ctx.socket(zmq.PUB), loop)
145 nport = bind_port(n, args.ip, args.notice)
145 nport = bind_port(n, args.ip, args.notice)
146
146
147 ### Key File ###
147 ### Key File ###
148 if args.execkey and not os.path.isfile(args.execkey):
148 if args.execkey and not os.path.isfile(args.execkey):
149 generate_exec_key(args.execkey)
149 generate_exec_key(args.execkey)
150
150
151 thesession = session.StreamSession(username=args.ident or "controller",
151 thesession = session.StreamSession(username=args.ident or "controller",
152 keyfile=args.execkey, session=args.session)
152 keyfile=args.execkey, session=args.session)
153
153
154 ### build and launch the queues ###
154 ### build and launch the queues ###
155
155
156 # monitor socket
156 # monitor socket
157 sub = ctx.socket(zmq.SUB)
157 sub = ctx.socket(zmq.SUB)
158 sub.setsockopt(zmq.SUBSCRIBE, "")
158 sub.setsockopt(zmq.SUBSCRIBE, "")
159 monport = bind_port(sub, args.ip, args.monitor)
159 monport = bind_port(sub, args.ip, args.monitor)
160 sub = ZMQStream(sub, loop)
160 sub = ZMQStream(sub, loop)
161
161
162 ports = select_random_ports(random_ports)
162 ports = select_random_ports(random_ports)
163 children = []
163 children = []
164
164
165 # IOPub relay (in a Process)
165 # IOPub relay (in a Process)
166 if not iopub:
166 if not iopub:
167 iopub = (ports.pop(),ports.pop())
167 iopub = (ports.pop(),ports.pop())
168 q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A')
168 q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A')
169 q.bind_in(iface%iopub[1])
169 q.bind_in(iface%iopub[1])
170 q.bind_out(iface%iopub[0])
170 q.bind_out(iface%iopub[0])
171 q.setsockopt_in(zmq.SUBSCRIBE, '')
171 q.setsockopt_in(zmq.SUBSCRIBE, '')
172 q.connect_mon(iface%monport)
172 q.connect_mon(iface%monport)
173 q.daemon=True
173 q.daemon=True
174 q.start()
174 q.start()
175 children.append(q.launcher)
175 children.append(q.launcher)
176
176
177 # Multiplexer Queue (in a Process)
177 # Multiplexer Queue (in a Process)
178 if not mux:
178 if not mux:
179 mux = (ports.pop(),ports.pop())
179 mux = (ports.pop(),ports.pop())
180 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
180 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
181 q.bind_in(iface%mux[0])
181 q.bind_in(iface%mux[0])
182 q.bind_out(iface%mux[1])
182 q.bind_out(iface%mux[1])
183 q.connect_mon(iface%monport)
183 q.connect_mon(iface%monport)
184 q.daemon=True
184 q.daemon=True
185 q.start()
185 q.start()
186 children.append(q.launcher)
186 children.append(q.launcher)
187
187
188 # Control Queue (in a Process)
188 # Control Queue (in a Process)
189 if not control:
189 if not control:
190 control = (ports.pop(),ports.pop())
190 control = (ports.pop(),ports.pop())
191 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
191 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
192 q.bind_in(iface%control[0])
192 q.bind_in(iface%control[0])
193 q.bind_out(iface%control[1])
193 q.bind_out(iface%control[1])
194 q.connect_mon(iface%monport)
194 q.connect_mon(iface%monport)
195 q.daemon=True
195 q.daemon=True
196 q.start()
196 q.start()
197 children.append(q.launcher)
197 children.append(q.launcher)
198 # Task Queue (in a Process)
198 # Task Queue (in a Process)
199 if not task:
199 if not task:
200 task = (ports.pop(),ports.pop())
200 task = (ports.pop(),ports.pop())
201 if args.scheduler == 'pure':
201 if args.scheduler == 'pure':
202 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
202 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
203 q.bind_in(iface%task[0])
203 q.bind_in(iface%task[0])
204 q.bind_out(iface%task[1])
204 q.bind_out(iface%task[1])
205 q.connect_mon(iface%monport)
205 q.connect_mon(iface%monport)
206 q.daemon=True
206 q.daemon=True
207 q.start()
207 q.start()
208 children.append(q.launcher)
208 children.append(q.launcher)
209 else:
209 else:
210 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
210 log_addr = iface%args.logport if args.logport else None
211 sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport,
212 log_addr, args.loglevel, args.scheduler)
211 print (sargs)
213 print (sargs)
212 q = Process(target=launch_scheduler, args=sargs)
214 q = Process(target=launch_scheduler, args=sargs)
213 q.daemon=True
215 q.daemon=True
214 q.start()
216 q.start()
215 children.append(q)
217 children.append(q)
216
218
217 if args.mongodb:
219 if args.mongodb:
218 from mongodb import MongoDB
220 from mongodb import MongoDB
219 db = MongoDB(thesession.session)
221 db = MongoDB(thesession.session)
220 else:
222 else:
221 db = DictDB()
223 db = DictDB()
222 time.sleep(.25)
224 time.sleep(.25)
223
225
224 # build connection dicts
226 # build connection dicts
225 engine_addrs = {
227 engine_addrs = {
226 'control' : iface%control[1],
228 'control' : iface%control[1],
227 'queue': iface%mux[1],
229 'mux': iface%mux[1],
228 'heartbeat': (iface%hb[0], iface%hb[1]),
230 'heartbeat': (iface%hb[0], iface%hb[1]),
229 'task' : iface%task[1],
231 'task' : iface%task[1],
230 'iopub' : iface%iopub[1],
232 'iopub' : iface%iopub[1],
231 'monitor' : iface%monport,
233 'monitor' : iface%monport,
232 }
234 }
233
235
234 client_addrs = {
236 client_addrs = {
235 'control' : iface%control[0],
237 'control' : iface%control[0],
236 'query': iface%cport,
238 'query': iface%cport,
237 'queue': iface%mux[0],
239 'mux': iface%mux[0],
238 'task' : iface%task[0],
240 'task' : iface%task[0],
239 'iopub' : iface%iopub[0],
241 'iopub' : iface%iopub[0],
240 'notification': iface%nport
242 'notification': iface%nport
241 }
243 }
242
244
245 # setup logging
246 if args.logport:
247 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
248 else:
249 local_logger(args.loglevel)
250
243 # register relay of signals to the children
251 # register relay of signals to the children
244 signal_children(children)
252 signal_children(children)
245 hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
253 hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon,
254 registrar=reg, clientele=c, notifier=n, db=db,
255 engine_addrs=engine_addrs, client_addrs=client_addrs)
256
246 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
257 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
247 dc.start()
258 dc.start()
248 loop.start()
259 loop.start()
249
260
250
261
251
262
252
263
253 if __name__ == '__main__':
264 if __name__ == '__main__':
254 main()
265 main()
@@ -1,139 +1,144 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple engine that talks to a controller over 0MQ.
2 """A simple engine that talks to a controller over 0MQ.
3 it handles registration, etc. and launches a kernel
3 it handles registration, etc. and launches a kernel
4 connected to the Controller's queue(s).
4 connected to the Controller's queue(s).
5 """
5 """
6 from __future__ import print_function
6 from __future__ import print_function
7 import sys
7 import sys
8 import time
8 import time
9 import traceback
9 import traceback
10 import uuid
10 import uuid
11 import logging
11 from pprint import pprint
12 from pprint import pprint
12
13
13 import zmq
14 import zmq
14 from zmq.eventloop import ioloop, zmqstream
15 from zmq.eventloop import ioloop, zmqstream
15
16
16 from IPython.utils.traitlets import HasTraits
17 # internal
17 from IPython.utils.localinterfaces import LOCALHOST
18 from IPython.config.configurable import Configurable
19 from IPython.utils.traitlets import Instance, Str, Dict
20 # from IPython.utils.localinterfaces import LOCALHOST
18
21
19 from streamsession import Message, StreamSession
22 from streamsession import Message, StreamSession
20 from client import Client
21 from streamkernel import Kernel, make_kernel
23 from streamkernel import Kernel, make_kernel
22 import heartmonitor
24 import heartmonitor
23 from entry_point import make_base_argument_parser, connect_logger, parse_url
25 from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url,
26 local_logger)
24 # import taskthread
27 # import taskthread
# from log import logger
logger = logging.getLogger()


def printer(*msg):
    """Log the positional arguments (as a tuple) at INFO level."""
    # print (logger.handlers, file=sys.__stdout__)
    text = str(msg)
    logger.info(text)
33
class Engine(Configurable):
    """IPython engine: registers with the controller, then launches a
    kernel connected to the controller's queue(s)."""

    # runtime state (not configurable)
    kernel = None   # set after successful registration
    id = None       # integer engine id, assigned by the Hub on registration

    # configurables:
    context = Instance(zmq.Context)
    loop = Instance(ioloop.IOLoop)
    session = Instance(StreamSession)
    ident = Str()
    registrar = Instance(zmqstream.ZMQStream)
    user_ns = Dict()

    def __init__(self, **kwargs):
        super(Engine, self).__init__(**kwargs)
        if not self.ident:
            # fall back to a random ZMQ identity
            self.ident = str(uuid.uuid4())
        self.registrar.on_send(printer)

    def register(self):
        """Send a registration request to the controller's registrar socket."""
        content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
        self.registrar.on_recv(self.complete_registration)
        # print (self.session.key)
        self.session.send(self.registrar, "registration_request", content=content)

    def complete_registration(self, msg):
        """Handle the registration reply: build a kernel connected to the
        addresses the controller handed back, or raise on failure."""
        idents, msg = self.session.feed_identities(msg)
        msg = Message(self.session.unpack_message(msg))
        if msg.content.status == 'ok':
            self.id = int(msg.content.id)
            self.session.username = 'engine-%i' % self.id
            queue_addr = msg.content.mux
            shell_addrs = [str(queue_addr)]
            control_addr = str(msg.content.control)
            task_addr = msg.content.task
            iopub_addr = msg.content.iopub
            if task_addr:
                # the task scheduler address is optional
                shell_addrs.append(str(task_addr))

            hb_addrs = msg.content.heartbeat
            # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
            k = make_kernel(self.id, self.ident, control_addr, shell_addrs, iopub_addr,
                            hb_addrs, client_addr=None, loop=self.loop,
                            context=self.context, key=self.session.key)[-1]
            self.kernel = k
            if self.user_ns is not None:
                # seed the supplied namespace with the kernel's, then share it
                self.user_ns.update(self.kernel.user_ns)
                self.kernel.user_ns = self.user_ns
        else:
            logger.error("Registration Failed: %s" % msg)
            raise Exception("Registration Failed: %s" % msg)

        logger.info("completed registration with id %i" % self.id)

    def unregister(self):
        """Request unregistration from the controller, then exit the process."""
        # bug fix: was int(self.session.username), but username is set to
        # 'engine-%i' above, which int() cannot parse -- use the id directly.
        self.session.send(self.registrar, "unregistration_request",
                          content=dict(id=self.id))
        time.sleep(1)
        sys.exit(0)

    def start(self):
        """Begin the registration handshake."""
        logger.info("registering")
        self.register()
102
102
103
103
104
104
def main(argv=None, user_ns=None):
    """Parse command-line arguments, connect to the controller, and run
    an Engine on the zmq ioloop.

    argv: optional list of CLI arguments (defaults to sys.argv)
    user_ns: optional dict to use as the kernel's user namespace
    """
    parser = make_base_argument_parser()

    args = parser.parse_args(argv)

    # fill in transport/ip/regport from --url if one was given
    parse_url(args)

    # URL template: fill in a port later with iface % port
    iface="%s://%s"%(args.transport,args.ip)+':%i'

    loop = ioloop.IOLoop.instance()
    session = StreamSession(keyfile=args.execkey)
    # print (session.key)
    ctx = zmq.Context()

    reg_conn = iface % args.regport
    print (reg_conn, file=sys.__stdout__)
    print ("Starting the engine...", file=sys.__stderr__)

    # PAIR socket for the registration handshake with the controller
    reg = ctx.socket(zmq.PAIR)
    reg.connect(reg_conn)
    reg = zmqstream.ZMQStream(reg, loop)

    e = Engine(context=ctx, loop=loop, session=session, registrar=reg,
            ident=args.ident or '', user_ns=user_ns)
    if args.logport:
        # publish log records to the controller's log port
        print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__)
        connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel)
    else:
        # no remote log port: log to stdout instead
        local_logger(args.loglevel)

    # defer e.start until the ioloop is actually running
    dc = ioloop.DelayedCallback(e.start, 0, loop)
    dc.start()
    loop.start()
136
141
137 # Execution as a script
142 # Execution as a script
138 if __name__ == '__main__':
143 if __name__ == '__main__':
139 main()
144 main()
@@ -1,121 +1,147 b''
1 """ Defines helper functions for creating kernel entry points and process
1 """ Defines helper functions for creating kernel entry points and process
2 launchers.
2 launchers.
3 """
3 """
4
4
5 # Standard library imports.
5 # Standard library imports.
6 import logging
6 import logging
7 import atexit
7 import atexit
8 import sys
8 import sys
9 import os
9 import os
10 import stat
10 import stat
11 import socket
11 import socket
12 from subprocess import Popen, PIPE
12 from subprocess import Popen, PIPE
13 from signal import signal, SIGINT, SIGABRT, SIGTERM
13 from signal import signal, SIGINT, SIGABRT, SIGTERM
14 try:
14 try:
15 from signal import SIGKILL
15 from signal import SIGKILL
16 except ImportError:
16 except ImportError:
17 SIGKILL=None
17 SIGKILL=None
18
18
19 # System library imports.
19 # System library imports.
20 import zmq
20 import zmq
21 from zmq.log import handlers
21 from zmq.log import handlers
22 # Local imports.
22 # Local imports.
23 from IPython.core.ultratb import FormattedTB
23 from IPython.core.ultratb import FormattedTB
24 from IPython.external.argparse import ArgumentParser
24 from IPython.external.argparse import ArgumentParser
25 from IPython.zmq.log import logger
25 from IPython.zmq.log import EnginePUBHandler
26
26
def split_ports(s, n):
    """Parse a comma-separated port string into a tuple of n ints.

    An empty or None string yields a tuple of n zeros (meaning: pick
    random ports later). Raises ValueError if the string does not
    contain exactly n ports.
    """
    if not s:
        return tuple([0]*n)
    # was: map(int, ...) -- lazy under Python 3, so len() would raise;
    # also normalize to a tuple for a consistent return type.
    ports = tuple(int(p) for p in s.split(','))
    if len(ports) != n:
        raise ValueError("expected %i ports, got %i" % (n, len(ports)))
    return ports
35
35
def select_random_ports(n):
    """Select and return n distinct ports that are currently available.

    All n sockets are bound (to port 0, letting the OS choose) before any
    is closed, so the returned ports are guaranteed distinct -- though
    nothing prevents another process from grabbing them afterward.
    """
    ports = []
    # was: xrange -- Python-2-only; range behaves the same here on both.
    for i in range(n):
        sock = socket.socket()
        sock.bind(('', 0))
        ports.append(sock)
    for i, sock in enumerate(ports):
        port = sock.getsockname()[1]
        sock.close()
        ports[i] = port
    return ports
48
48
def parse_url(args):
    """Ensure args.url contains a full transport://interface:port.

    If args.url is set, transport/ip/regport are parsed out of it;
    args.url is then rebuilt canonically from those three attributes.
    """
    if args.url:
        parts = args.url.split('://', 1)
        # bug fix: was `len(args) == 2` -- a Namespace has no len(),
        # so this line always raised TypeError.
        if len(parts) == 2:
            args.transport = parts[0]
            addr = parts[1]
        else:
            # no scheme given: the whole string is host[:port]
            addr = parts[0]
        addr = addr.split(':')
        args.ip = addr[0]
        if len(addr) > 1 and addr[1]:
            # bug fix: cast to int -- the %i format below rejects strings
            args.regport = int(addr[1])
    args.url = "%s://%s:%i" % (args.transport, args.ip, args.regport)
60
60
def signal_children(children):
    """Relay interrupt/term signals to children, for more solid process cleanup."""
    def relay(sig, frame):
        # forward the fatal signal to every child process
        for proc in children:
            proc.terminate()
        # sys.exit(sig)
    for signum in (SIGINT, SIGABRT, SIGTERM):
        signal(signum, relay)
69
69
def generate_exec_key(keyfile):
    """Generate a new random execution key, write it to keyfile, and
    return it.

    The file is chmodded to user-only read/write (0600); that has no
    effect on Windows.
    """
    import uuid
    newkey = str(uuid.uuid4())
    with open(keyfile, 'w') as f:
        # f.write('ipython-key ')
        f.write(newkey)
    # set user-only RW permissions (0600)
    # this will have no effect on Windows
    os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
    # returning the key is new; callers that ignore the return value
    # are unaffected.
    return newkey
79
79
80
80
def make_base_argument_parser():
    """Build the ArgumentParser with the generic options shared by all
    ipcluster entry points."""
    parser = ArgumentParser()
    add = parser.add_argument
    add('--ip', type=str, default='127.0.0.1',
        help='set the controller\'s IP address [default: local]')
    add('--transport', type=str, default='tcp',
        help='set the transport to use [default: tcp]')
    add('--regport', type=int, metavar='PORT', default=10101,
        help='set the XREP port for registration [default: 10101]')
    add('--logport', type=int, metavar='PORT', default=0,
        help='set the PUB port for remote logging [default: log to stdout]')
    add('--loglevel', type=str, metavar='LEVEL', default=logging.INFO,
        help='set the log level [default: INFO]')
    add('--ident', type=str,
        help='set the ZMQ identity [default: random]')
    add('--packer', type=str, default='json',
        choices=['json','pickle'],
        help='set the message format method [default: json]')
    add('--url', type=str,
        help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
    add('--execkey', type=str,
        help="File containing key for authenticating requests.")

    return parser
107
108
108
def integer_loglevel(loglevel):
    """Coerce a log level given as an int, a numeric string, or a level
    name (e.g. 'DEBUG' or 'debug') to its integer value."""
    try:
        return int(loglevel)
    except (TypeError, ValueError):
        # not a number; fall through to name lookup
        pass
    if isinstance(loglevel, str):
        # accept lowercase names too (original raised AttributeError)
        return getattr(logging, loglevel.upper())
    return loglevel
116
def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
    """Hook the root logger up to a zmq PUB socket connected to iface,
    publishing records under the topic `root`."""
    level = integer_loglevel(loglevel)
    sock = context.socket(zmq.PUB)
    sock.connect(iface)
    handler = handlers.PUBHandler(sock)
    handler.root_topic = root
    handler.setLevel(level)
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
127
def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
    """Hook the root logger up to an EnginePUBHandler publishing on iface,
    so records are topic-tagged with the engine's id."""
    root_logger = logging.getLogger()
    level = integer_loglevel(loglevel)
    sock = context.socket(zmq.PUB)
    sock.connect(iface)
    handler = EnginePUBHandler(engine, sock)
    handler.setLevel(level)
    root_logger.addHandler(handler)
    root_logger.setLevel(level)
137
def local_logger(loglevel=logging.DEBUG):
    """Attach a StreamHandler to the root logger, unless something is
    already hooked up."""
    level = integer_loglevel(loglevel)
    root = logging.getLogger()
    if root.handlers:
        # if there are any handlers, skip the hookup
        return
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(level)
    root.addHandler(stream_handler)
    root.setLevel(level)
@@ -1,171 +1,159 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 and hearts are tracked based on their XREQ identities.
4 and hearts are tracked based on their XREQ identities.
5 """
5 """
6
6
7 from __future__ import print_function
7 from __future__ import print_function
8 import time
8 import time
9 import uuid
9 import uuid
10 import logging
10
11
11 import zmq
12 import zmq
12 from zmq.devices import ProcessDevice,ThreadDevice
13 from zmq.devices import ProcessDevice,ThreadDevice
13 from zmq.eventloop import ioloop, zmqstream
14 from zmq.eventloop import ioloop, zmqstream
14
15
15 #internal
16 logger = logging.getLogger()
16 from IPython.zmq.log import logger
17
17
class Heart(object):
    """A basic heart object for responding to a HeartMonitor.
    This is a simple wrapper with defaults for the most common
    Device model for responding to heartbeats.

    It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
    SUB/XREQ for in/out.

    You can specify the XREQ's IDENTITY via the optional heart_id argument."""
    # the ThreadDevice doing the forwarding
    device=None
    # this heart's identity string (the XREQ IDENTITY)
    id=None
    def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
        # a FORWARDER device relays every message from in to out unchanged
        self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
        # daemon thread: don't block interpreter exit on the device
        self.device.daemon=True
        self.device.connect_in(in_addr)
        self.device.connect_out(out_addr)
        if in_type == zmq.SUB:
            # subscribe to everything the monitor publishes
            self.device.setsockopt_in(zmq.SUBSCRIBE, "")
        if heart_id is None:
            # no identity given: use a random UUID
            heart_id = str(uuid.uuid4())
        # the IDENTITY is how the monitor tells hearts apart
        self.device.setsockopt_out(zmq.IDENTITY, heart_id)
        self.id = heart_id

    def start(self):
        # start the device's background thread; returns immediately
        return self.device.start()
43
43
class HeartMonitor(object):
    """A basic HeartMonitor class
    pingstream: a PUB stream
    pongstream: an XREP stream
    period: the period of the heartbeat in milliseconds"""
    loop=None
    pingstream=None
    pongstream=None
    period=None
    hearts=None
    on_probation=None
    last_ping=None

    def __init__(self, loop, pingstream, pongstream, period=1000):
        self.loop = loop
        self.period = period

        self.pingstream = pingstream
        self.pongstream = pongstream
        self.pongstream.on_recv(self.handle_pong)

        # sets of heart identities and this-beat responders
        self.hearts = set()
        self.responses = set()
        self.on_probation = set()
        self.lifetime = 0
        self.tic = time.time()

        self._new_handlers = set()
        self._failure_handlers = set()

    def start(self):
        """Begin sending periodic pings on the ioloop."""
        self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        logger.debug("heartbeat::new_heart_handler: %s"%handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        logger.debug("heartbeat::new heart failure handler: %s"%handler)
        self._failure_handlers.add(handler)

    def beat(self):
        """One heartbeat tick: classify last period's responses, then ping."""
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc-self.tic
        self.tic = toc
        # logger.debug("heartbeat::%s"%self.lifetime)
        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        # a heart fails only after missing two beats in a row
        heartfailures = self.on_probation.intersection(missed_beats)
        newhearts = self.responses.difference(goodhearts)
        # was: map(self.handle_new_heart, newhearts) etc. -- map() is lazy
        # under Python 3, so the handlers would never run; use real loops.
        for heart in newhearts:
            self.handle_new_heart(heart)
        for heart in heartfailures:
            self.handle_heart_failure(heart)
        self.on_probation = missed_beats.intersection(self.hearts)
        self.responses = set()
        # print self.on_probation, self.hearts
        # logger.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
        self.pingstream.send(str(self.lifetime))

    def handle_new_heart(self, heart):
        """Register a heart that responded for the first time."""
        if self._new_handlers:
            for handler in self._new_handlers:
                handler(heart)
        else:
            logger.info("heartbeat::yay, got new heart %s!"%heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Drop a heart that missed two consecutive beats."""
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception as e:
                    # a bad handler must not take down the monitor
                    print (e)
                    logger.error("heartbeat::Bad Handler! %s"%handler)
                    pass
        else:
            logger.info("heartbeat::Heart %s failed :("%heart)
        self.hearts.remove(heart)

    def handle_pong(self, msg):
        "a heart just beat"
        if msg[1] == str(self.lifetime):
            # on-time response to the current ping
            delta = time.time()-self.tic
            # logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
            self.responses.add(msg[0])
        elif msg[1] == str(self.last_ping):
            # late response to the previous ping: still counts, but warn
            delta = time.time()-self.tic + (self.lifetime-self.last_ping)
            logger.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
            self.responses.add(msg[0])
        else:
            logger.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
                (msg[1],self.lifetime))
156
144
157
145
if __name__ == '__main__':
    # ad-hoc demo: bind a ping PUB and a pong XREP locally and run a
    # HeartMonitor over them.
    loop = ioloop.IOLoop.instance()
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    pub.bind('tcp://127.0.0.1:5555')
    xrep = context.socket(zmq.XREP)
    xrep.bind('tcp://127.0.0.1:5556')

    outstream = zmqstream.ZMQStream(pub, loop)
    instream = zmqstream.ZMQStream(xrep, loop)

    hb = HeartMonitor(loop, outstream, instream)
    # NOTE(review): hb.start() is never called, so the periodic beat
    # never begins -- only the pong handler is registered; confirm intent.

    loop.start()
@@ -1,889 +1,906 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 from datetime import datetime
19 from datetime import datetime
20 import time
20 import time
21 import logging
21
22
22 import zmq
23 import zmq
23 from zmq.eventloop import ioloop
24 from zmq.eventloop import ioloop, zmqstream
24
25
25 # internal:
26 # internal:
26 from IPython.zmq.log import logger # a Logger object
27 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict
29 # from IPython.zmq.log import logger # a Logger object
27
30
28 from streamsession import Message, wrap_exception, ISO8601
31 from streamsession import Message, wrap_exception, ISO8601
32 from heartmonitor import HeartMonitor
33 from util import validate_url_container
29
34
30 try:
35 try:
31 from pymongo.binary import Binary
36 from pymongo.binary import Binary
32 except ImportError:
37 except ImportError:
33 MongoDB=None
38 MongoDB=None
34 else:
39 else:
35 from mongodb import MongoDB
40 from mongodb import MongoDB
36
41
37 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
38 # Code
43 # Code
39 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
40
45
46 logger = logging.getLogger()
47
41 def _passer(*args, **kwargs):
48 def _passer(*args, **kwargs):
42 return
49 return
43
50
44 def _printer(*args, **kwargs):
51 def _printer(*args, **kwargs):
45 print (args)
52 print (args)
46 print (kwargs)
53 print (kwargs)
47
54
48 def init_record(msg):
55 def init_record(msg):
49 """return an empty TaskRecord dict, with all keys initialized with None."""
56 """Initialize a TaskRecord based on a request."""
50 header = msg['header']
57 header = msg['header']
51 return {
58 return {
52 'msg_id' : header['msg_id'],
59 'msg_id' : header['msg_id'],
53 'header' : header,
60 'header' : header,
54 'content': msg['content'],
61 'content': msg['content'],
55 'buffers': msg['buffers'],
62 'buffers': msg['buffers'],
56 'submitted': datetime.strptime(header['date'], ISO8601),
63 'submitted': datetime.strptime(header['date'], ISO8601),
57 'client_uuid' : None,
64 'client_uuid' : None,
58 'engine_uuid' : None,
65 'engine_uuid' : None,
59 'started': None,
66 'started': None,
60 'completed': None,
67 'completed': None,
61 'resubmitted': None,
68 'resubmitted': None,
62 'result_header' : None,
69 'result_header' : None,
63 'result_content' : None,
70 'result_content' : None,
64 'result_buffers' : None,
71 'result_buffers' : None,
65 'queue' : None,
72 'queue' : None,
66 'pyin' : None,
73 'pyin' : None,
67 'pyout': None,
74 'pyout': None,
68 'pyerr': None,
75 'pyerr': None,
69 'stdout': '',
76 'stdout': '',
70 'stderr': '',
77 'stderr': '',
71 }
78 }
72
79
73
80
74 class EngineConnector(object):
81 class EngineConnector(HasTraits):
75 """A simple object for accessing the various zmq connections of an object.
82 """A simple object for accessing the various zmq connections of an object.
76 Attributes are:
83 Attributes are:
77 id (int): engine ID
84 id (int): engine ID
78 uuid (str): uuid (unused?)
85 uuid (str): uuid (unused?)
79 queue (str): identity of queue's XREQ socket
86 queue (str): identity of queue's XREQ socket
80 registration (str): identity of registration XREQ socket
87 registration (str): identity of registration XREQ socket
81 heartbeat (str): identity of heartbeat XREQ socket
88 heartbeat (str): identity of heartbeat XREQ socket
82 """
89 """
83 id=0
90 id=Int(0)
84 queue=None
91 queue=Str()
85 control=None
92 control=Str()
86 registration=None
93 registration=Str()
87 heartbeat=None
94 heartbeat=Str()
88 pending=None
95 pending=Instance(set)
89
96
90 def __init__(self, id, queue, registration, control, heartbeat=None):
97 def __init__(self, **kwargs):
91 logger.info("engine::Engine Connected: %i"%id)
98 super(EngineConnector, self).__init__(**kwargs)
92 self.id = id
99 logger.info("engine::Engine Connected: %i"%self.id)
93 self.queue = queue
100
94 self.registration = registration
101 class Hub(Configurable):
95 self.control = control
96 self.heartbeat = heartbeat
97
98 class Hub(object):
99 """The IPython Controller Hub with 0MQ connections
102 """The IPython Controller Hub with 0MQ connections
100
103
101 Parameters
104 Parameters
102 ==========
105 ==========
103 loop: zmq IOLoop instance
106 loop: zmq IOLoop instance
104 session: StreamSession object
107 session: StreamSession object
105 <removed> context: zmq context for creating new connections (?)
108 <removed> context: zmq context for creating new connections (?)
106 queue: ZMQStream for monitoring the command queue (SUB)
109 queue: ZMQStream for monitoring the command queue (SUB)
107 registrar: ZMQStream for engine registration requests (XREP)
110 registrar: ZMQStream for engine registration requests (XREP)
108 heartbeat: HeartMonitor object checking the pulse of the engines
111 heartbeat: HeartMonitor object checking the pulse of the engines
109 clientele: ZMQStream for client connections (XREP)
112 clientele: ZMQStream for client connections (XREP)
110 not used for jobs, only query/control commands
113 not used for jobs, only query/control commands
111 notifier: ZMQStream for broadcasting engine registration changes (PUB)
114 notifier: ZMQStream for broadcasting engine registration changes (PUB)
112 db: connection to db for out of memory logging of commands
115 db: connection to db for out of memory logging of commands
113 NotImplemented
116 NotImplemented
114 engine_addrs: dict of zmq connection information for engines to connect
117 engine_addrs: dict of zmq connection information for engines to connect
115 to the queues.
118 to the queues.
116 client_addrs: dict of zmq connection information for engines to connect
119 client_addrs: dict of zmq connection information for engines to connect
117 to the queues.
120 to the queues.
118 """
121 """
119 # internal data structures:
122 # internal data structures:
120 ids=None # engine IDs
123 ids=None # engine IDs
121 keytable=None
124 keytable=None
122 engines=None
125 engines=None
123 clients=None
126 clients=None
124 hearts=None
127 hearts=None
125 pending=None
128 pending=None
126 results=None
127 tasks=None
129 tasks=None
128 completed=None
130 completed=None
129 mia=None
131 # mia=None
130 incoming_registrations=None
132 incoming_registrations=None
131 registration_timeout=None
133 registration_timeout=None
132
134
133 #objects from constructor:
135 # objects from constructor:
134 loop=None
136 loop=Instance(ioloop.IOLoop)
135 registrar=None
137 registrar=Instance(zmqstream.ZMQStream)
136 clientelle=None
138 clientele=Instance(zmqstream.ZMQStream)
137 queue=None
139 monitor=Instance(zmqstream.ZMQStream)
138 heartbeat=None
140 heartmonitor=Instance(HeartMonitor)
139 notifier=None
141 notifier=Instance(zmqstream.ZMQStream)
140 db=None
142 db=Instance(object)
141 client_addr=None
143 client_addrs=Dict()
142 engine_addrs=None
144 engine_addrs=Dict()
143
145
144
146
145 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
147 def __init__(self, **kwargs):
146 """
148 """
147 # universal:
149 # universal:
148 loop: IOLoop for creating future connections
150 loop: IOLoop for creating future connections
149 session: streamsession for sending serialized data
151 session: streamsession for sending serialized data
150 # engine:
152 # engine:
151 queue: ZMQStream for monitoring queue messages
153 queue: ZMQStream for monitoring queue messages
152 registrar: ZMQStream for engine registration
154 registrar: ZMQStream for engine registration
153 heartbeat: HeartMonitor object for tracking engines
155 heartbeat: HeartMonitor object for tracking engines
154 # client:
156 # client:
155 clientele: ZMQStream for client connections
157 clientele: ZMQStream for client connections
156 # extra:
158 # extra:
157 db: ZMQStream for db connection (NotImplemented)
159 db: ZMQStream for db connection (NotImplemented)
158 engine_addrs: zmq address/protocol dict for engine connections
160 engine_addrs: zmq address/protocol dict for engine connections
159 client_addrs: zmq address/protocol dict for client connections
161 client_addrs: zmq address/protocol dict for client connections
160 """
162 """
163
164 super(Hub, self).__init__(**kwargs)
161 self.ids = set()
165 self.ids = set()
162 self.keytable={}
166 self.keytable={}
163 self.incoming_registrations={}
167 self.incoming_registrations={}
164 self.engines = {}
168 self.engines = {}
165 self.by_ident = {}
169 self.by_ident = {}
166 self.clients = {}
170 self.clients = {}
167 self.hearts = {}
171 self.hearts = {}
168 # self.mia = set()
172 # self.mia = set()
169
173 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
174 # this is the stuff that will move to DB:
175 self.pending = set() # pending messages, keyed by msg_id
176 self.queues = {} # pending msg_ids keyed by engine_id
177 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
178 self.completed = {} # completed msg_ids keyed by engine_id
179 self.all_completed = set()
180 self._idcounter = 0
170 # self.sockets = {}
181 # self.sockets = {}
171 self.loop = loop
182 # self.loop = loop
172 self.session = session
183 # self.session = session
173 self.registrar = registrar
184 # self.registrar = registrar
174 self.clientele = clientele
185 # self.clientele = clientele
175 self.queue = queue
186 # self.queue = queue
176 self.heartbeat = heartbeat
187 # self.heartmonitor = heartbeat
177 self.notifier = notifier
188 # self.notifier = notifier
178 self.db = db
189 # self.db = db
179
190
180 # validate connection dicts:
191 # validate connection dicts:
181 self.client_addrs = client_addrs
192 # self.client_addrs = client_addrs
182 assert isinstance(client_addrs['queue'], str)
193 validate_url_container(self.client_addrs)
183 assert isinstance(client_addrs['control'], str)
194
195 # assert isinstance(self.client_addrs['queue'], str)
196 # assert isinstance(self.client_addrs['control'], str)
184 # self.hb_addrs = hb_addrs
197 # self.hb_addrs = hb_addrs
185 self.engine_addrs = engine_addrs
198 validate_url_container(self.engine_addrs)
186 assert isinstance(engine_addrs['queue'], str)
199 # self.engine_addrs = engine_addrs
187 assert isinstance(client_addrs['control'], str)
200 # assert isinstance(self.engine_addrs['queue'], str)
188 assert len(engine_addrs['heartbeat']) == 2
201 # assert isinstance(self.engine_addrs['control'], str)
202 # assert len(engine_addrs['heartbeat']) == 2
189
203
190 # register our callbacks
204 # register our callbacks
191 self.registrar.on_recv(self.dispatch_register_request)
205 self.registrar.on_recv(self.dispatch_register_request)
192 self.clientele.on_recv(self.dispatch_client_msg)
206 self.clientele.on_recv(self.dispatch_client_msg)
193 self.queue.on_recv(self.dispatch_monitor_traffic)
207 self.monitor.on_recv(self.dispatch_monitor_traffic)
194
208
195 if heartbeat is not None:
209 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
196 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
210 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
197 heartbeat.add_new_heart_handler(self.handle_new_heart)
198
211
199 self.monitor_handlers = { 'in' : self.save_queue_request,
212 self.monitor_handlers = { 'in' : self.save_queue_request,
200 'out': self.save_queue_result,
213 'out': self.save_queue_result,
201 'intask': self.save_task_request,
214 'intask': self.save_task_request,
202 'outtask': self.save_task_result,
215 'outtask': self.save_task_result,
203 'tracktask': self.save_task_destination,
216 'tracktask': self.save_task_destination,
204 'incontrol': _passer,
217 'incontrol': _passer,
205 'outcontrol': _passer,
218 'outcontrol': _passer,
206 'iopub': self.save_iopub_message,
219 'iopub': self.save_iopub_message,
207 }
220 }
208
221
209 self.client_handlers = {'queue_request': self.queue_status,
222 self.client_handlers = {'queue_request': self.queue_status,
210 'result_request': self.get_results,
223 'result_request': self.get_results,
211 'purge_request': self.purge_results,
224 'purge_request': self.purge_results,
212 'load_request': self.check_load,
225 'load_request': self.check_load,
213 'resubmit_request': self.resubmit_task,
226 'resubmit_request': self.resubmit_task,
214 'shutdown_request': self.shutdown_request,
227 'shutdown_request': self.shutdown_request,
215 }
228 }
216
229
217 self.registrar_handlers = {'registration_request' : self.register_engine,
230 self.registrar_handlers = {'registration_request' : self.register_engine,
218 'unregistration_request' : self.unregister_engine,
231 'unregistration_request' : self.unregister_engine,
219 'connection_request': self.connection_request,
232 'connection_request': self.connection_request,
220 }
233 }
221 self.registration_timeout = max(5000, 2*self.heartbeat.period)
222 # this is the stuff that will move to DB:
223 # self.results = {} # completed results
224 self.pending = set() # pending messages, keyed by msg_id
225 self.queues = {} # pending msg_ids keyed by engine_id
226 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
227 self.completed = {} # completed msg_ids keyed by engine_id
228 self.all_completed = set()
229
234
230 logger.info("controller::created controller")
235 logger.info("controller::created controller")
231
236
232 def _new_id(self):
237 @property
238 def _next_id(self):
233 """gemerate a new ID"""
239 """gemerate a new ID"""
234 newid = 0
240 newid = self._idcounter
235 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
241 self._idcounter += 1
236 # print newid, self.ids, self.incoming_registrations
237 while newid in self.ids or newid in incoming:
238 newid += 1
239 return newid
242 return newid
243 # newid = 0
244 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
245 # # print newid, self.ids, self.incoming_registrations
246 # while newid in self.ids or newid in incoming:
247 # newid += 1
248 # return newid
240
249
241 #-----------------------------------------------------------------------------
250 #-----------------------------------------------------------------------------
242 # message validation
251 # message validation
243 #-----------------------------------------------------------------------------
252 #-----------------------------------------------------------------------------
244
253
245 def _validate_targets(self, targets):
254 def _validate_targets(self, targets):
246 """turn any valid targets argument into a list of integer ids"""
255 """turn any valid targets argument into a list of integer ids"""
247 if targets is None:
256 if targets is None:
248 # default to all
257 # default to all
249 targets = self.ids
258 targets = self.ids
250
259
251 if isinstance(targets, (int,str,unicode)):
260 if isinstance(targets, (int,str,unicode)):
252 # only one target specified
261 # only one target specified
253 targets = [targets]
262 targets = [targets]
254 _targets = []
263 _targets = []
255 for t in targets:
264 for t in targets:
256 # map raw identities to ids
265 # map raw identities to ids
257 if isinstance(t, (str,unicode)):
266 if isinstance(t, (str,unicode)):
258 t = self.by_ident.get(t, t)
267 t = self.by_ident.get(t, t)
259 _targets.append(t)
268 _targets.append(t)
260 targets = _targets
269 targets = _targets
261 bad_targets = [ t for t in targets if t not in self.ids ]
270 bad_targets = [ t for t in targets if t not in self.ids ]
262 if bad_targets:
271 if bad_targets:
263 raise IndexError("No Such Engine: %r"%bad_targets)
272 raise IndexError("No Such Engine: %r"%bad_targets)
264 if not targets:
273 if not targets:
265 raise IndexError("No Engines Registered")
274 raise IndexError("No Engines Registered")
266 return targets
275 return targets
267
276
268 def _validate_client_msg(self, msg):
277 def _validate_client_msg(self, msg):
269 """validates and unpacks headers of a message. Returns False if invalid,
278 """validates and unpacks headers of a message. Returns False if invalid,
270 (ident, header, parent, content)"""
279 (ident, header, parent, content)"""
271 client_id = msg[0]
280 client_id = msg[0]
272 try:
281 try:
273 msg = self.session.unpack_message(msg[1:], content=True)
282 msg = self.session.unpack_message(msg[1:], content=True)
274 except:
283 except:
275 logger.error("client::Invalid Message %s"%msg, exc_info=True)
284 logger.error("client::Invalid Message %s"%msg, exc_info=True)
276 return False
285 return False
277
286
278 msg_type = msg.get('msg_type', None)
287 msg_type = msg.get('msg_type', None)
279 if msg_type is None:
288 if msg_type is None:
280 return False
289 return False
281 header = msg.get('header')
290 header = msg.get('header')
282 # session doesn't handle split content for now:
291 # session doesn't handle split content for now:
283 return client_id, msg
292 return client_id, msg
284
293
285
294
286 #-----------------------------------------------------------------------------
295 #-----------------------------------------------------------------------------
287 # dispatch methods (1 per stream)
296 # dispatch methods (1 per stream)
288 #-----------------------------------------------------------------------------
297 #-----------------------------------------------------------------------------
289
298
290 def dispatch_register_request(self, msg):
299 def dispatch_register_request(self, msg):
291 """"""
300 """"""
292 logger.debug("registration::dispatch_register_request(%s)"%msg)
301 logger.debug("registration::dispatch_register_request(%s)"%msg)
293 idents,msg = self.session.feed_identities(msg)
302 idents,msg = self.session.feed_identities(msg)
294 if not idents:
303 if not idents:
295 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
304 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
296 return
305 return
297 try:
306 try:
298 msg = self.session.unpack_message(msg,content=True)
307 msg = self.session.unpack_message(msg,content=True)
299 except:
308 except:
300 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
309 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
301 return
310 return
302
311
303 msg_type = msg['msg_type']
312 msg_type = msg['msg_type']
304 content = msg['content']
313 content = msg['content']
305
314
306 handler = self.registrar_handlers.get(msg_type, None)
315 handler = self.registrar_handlers.get(msg_type, None)
307 if handler is None:
316 if handler is None:
308 logger.error("registration::got bad registration message: %s"%msg)
317 logger.error("registration::got bad registration message: %s"%msg)
309 else:
318 else:
310 handler(idents, msg)
319 handler(idents, msg)
311
320
312 def dispatch_monitor_traffic(self, msg):
321 def dispatch_monitor_traffic(self, msg):
313 """all ME and Task queue messages come through here, as well as
322 """all ME and Task queue messages come through here, as well as
314 IOPub traffic."""
323 IOPub traffic."""
315 logger.debug("monitor traffic: %s"%msg[:2])
324 logger.debug("monitor traffic: %s"%msg[:2])
316 switch = msg[0]
325 switch = msg[0]
317 idents, msg = self.session.feed_identities(msg[1:])
326 idents, msg = self.session.feed_identities(msg[1:])
318 if not idents:
327 if not idents:
319 logger.error("Bad Monitor Message: %s"%msg)
328 logger.error("Bad Monitor Message: %s"%msg)
320 return
329 return
321 handler = self.monitor_handlers.get(switch, None)
330 handler = self.monitor_handlers.get(switch, None)
322 if handler is not None:
331 if handler is not None:
323 handler(idents, msg)
332 handler(idents, msg)
324 else:
333 else:
325 logger.error("Invalid monitor topic: %s"%switch)
334 logger.error("Invalid monitor topic: %s"%switch)
326
335
327
336
328 def dispatch_client_msg(self, msg):
337 def dispatch_client_msg(self, msg):
329 """Route messages from clients"""
338 """Route messages from clients"""
330 idents, msg = self.session.feed_identities(msg)
339 idents, msg = self.session.feed_identities(msg)
331 if not idents:
340 if not idents:
332 logger.error("Bad Client Message: %s"%msg)
341 logger.error("Bad Client Message: %s"%msg)
333 return
342 return
334 client_id = idents[0]
343 client_id = idents[0]
335 try:
344 try:
336 msg = self.session.unpack_message(msg, content=True)
345 msg = self.session.unpack_message(msg, content=True)
337 except:
346 except:
338 content = wrap_exception()
347 content = wrap_exception()
339 logger.error("Bad Client Message: %s"%msg, exc_info=True)
348 logger.error("Bad Client Message: %s"%msg, exc_info=True)
340 self.session.send(self.clientele, "controller_error", ident=client_id,
349 self.session.send(self.clientele, "controller_error", ident=client_id,
341 content=content)
350 content=content)
342 return
351 return
343
352
344 # print client_id, header, parent, content
353 # print client_id, header, parent, content
345 #switch on message type:
354 #switch on message type:
346 msg_type = msg['msg_type']
355 msg_type = msg['msg_type']
347 logger.info("client:: client %s requested %s"%(client_id, msg_type))
356 logger.info("client:: client %s requested %s"%(client_id, msg_type))
348 handler = self.client_handlers.get(msg_type, None)
357 handler = self.client_handlers.get(msg_type, None)
349 try:
358 try:
350 assert handler is not None, "Bad Message Type: %s"%msg_type
359 assert handler is not None, "Bad Message Type: %s"%msg_type
351 except:
360 except:
352 content = wrap_exception()
361 content = wrap_exception()
353 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
362 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
354 self.session.send(self.clientele, "controller_error", ident=client_id,
363 self.session.send(self.clientele, "controller_error", ident=client_id,
355 content=content)
364 content=content)
356 return
365 return
357 else:
366 else:
358 handler(client_id, msg)
367 handler(client_id, msg)
359
368
360 def dispatch_db(self, msg):
369 def dispatch_db(self, msg):
361 """"""
370 """"""
362 raise NotImplementedError
371 raise NotImplementedError
363
372
364 #---------------------------------------------------------------------------
373 #---------------------------------------------------------------------------
365 # handler methods (1 per event)
374 # handler methods (1 per event)
366 #---------------------------------------------------------------------------
375 #---------------------------------------------------------------------------
367
376
368 #----------------------- Heartbeat --------------------------------------
377 #----------------------- Heartbeat --------------------------------------
369
378
370 def handle_new_heart(self, heart):
379 def handle_new_heart(self, heart):
371 """handler to attach to heartbeater.
380 """handler to attach to heartbeater.
372 Called when a new heart starts to beat.
381 Called when a new heart starts to beat.
373 Triggers completion of registration."""
382 Triggers completion of registration."""
374 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
383 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
375 if heart not in self.incoming_registrations:
384 if heart not in self.incoming_registrations:
376 logger.info("heartbeat::ignoring new heart: %r"%heart)
385 logger.info("heartbeat::ignoring new heart: %r"%heart)
377 else:
386 else:
378 self.finish_registration(heart)
387 self.finish_registration(heart)
379
388
380
389
381 def handle_heart_failure(self, heart):
390 def handle_heart_failure(self, heart):
382 """handler to attach to heartbeater.
391 """handler to attach to heartbeater.
383 called when a previously registered heart fails to respond to beat request.
392 called when a previously registered heart fails to respond to beat request.
384 triggers unregistration"""
393 triggers unregistration"""
385 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
394 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
386 eid = self.hearts.get(heart, None)
395 eid = self.hearts.get(heart, None)
387 queue = self.engines[eid].queue
396 queue = self.engines[eid].queue
388 if eid is None:
397 if eid is None:
389 logger.info("heartbeat::ignoring heart failure %r"%heart)
398 logger.info("heartbeat::ignoring heart failure %r"%heart)
390 else:
399 else:
391 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
400 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
392
401
393 #----------------------- MUX Queue Traffic ------------------------------
402 #----------------------- MUX Queue Traffic ------------------------------
394
403
395 def save_queue_request(self, idents, msg):
404 def save_queue_request(self, idents, msg):
396 if len(idents) < 2:
405 if len(idents) < 2:
397 logger.error("invalid identity prefix: %s"%idents)
406 logger.error("invalid identity prefix: %s"%idents)
398 return
407 return
399 queue_id, client_id = idents[:2]
408 queue_id, client_id = idents[:2]
400 try:
409 try:
401 msg = self.session.unpack_message(msg, content=False)
410 msg = self.session.unpack_message(msg, content=False)
402 except:
411 except:
403 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
412 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
404 return
413 return
405
414
406 eid = self.by_ident.get(queue_id, None)
415 eid = self.by_ident.get(queue_id, None)
407 if eid is None:
416 if eid is None:
408 logger.error("queue::target %r not registered"%queue_id)
417 logger.error("queue::target %r not registered"%queue_id)
409 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
418 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
410 return
419 return
411
420
412 header = msg['header']
421 header = msg['header']
413 msg_id = header['msg_id']
422 msg_id = header['msg_id']
414 record = init_record(msg)
423 record = init_record(msg)
415 record['engine_uuid'] = queue_id
424 record['engine_uuid'] = queue_id
416 record['client_uuid'] = client_id
425 record['client_uuid'] = client_id
417 record['queue'] = 'mux'
426 record['queue'] = 'mux'
418 if MongoDB is not None and isinstance(self.db, MongoDB):
427 if MongoDB is not None and isinstance(self.db, MongoDB):
419 record['buffers'] = map(Binary, record['buffers'])
428 record['buffers'] = map(Binary, record['buffers'])
420 self.pending.add(msg_id)
429 self.pending.add(msg_id)
421 self.queues[eid].append(msg_id)
430 self.queues[eid].append(msg_id)
422 self.db.add_record(msg_id, record)
431 self.db.add_record(msg_id, record)
423
432
424 def save_queue_result(self, idents, msg):
433 def save_queue_result(self, idents, msg):
425 if len(idents) < 2:
434 if len(idents) < 2:
426 logger.error("invalid identity prefix: %s"%idents)
435 logger.error("invalid identity prefix: %s"%idents)
427 return
436 return
428
437
429 client_id, queue_id = idents[:2]
438 client_id, queue_id = idents[:2]
430 try:
439 try:
431 msg = self.session.unpack_message(msg, content=False)
440 msg = self.session.unpack_message(msg, content=False)
432 except:
441 except:
433 logger.error("queue::engine %r sent invalid message to %r: %s"%(
442 logger.error("queue::engine %r sent invalid message to %r: %s"%(
434 queue_id,client_id, msg), exc_info=True)
443 queue_id,client_id, msg), exc_info=True)
435 return
444 return
436
445
437 eid = self.by_ident.get(queue_id, None)
446 eid = self.by_ident.get(queue_id, None)
438 if eid is None:
447 if eid is None:
439 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
448 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
440 logger.debug("queue:: %s"%msg[2:])
449 logger.debug("queue:: %s"%msg[2:])
441 return
450 return
442
451
443 parent = msg['parent_header']
452 parent = msg['parent_header']
444 if not parent:
453 if not parent:
445 return
454 return
446 msg_id = parent['msg_id']
455 msg_id = parent['msg_id']
447 if msg_id in self.pending:
456 if msg_id in self.pending:
448 self.pending.remove(msg_id)
457 self.pending.remove(msg_id)
449 self.all_completed.add(msg_id)
458 self.all_completed.add(msg_id)
450 self.queues[eid].remove(msg_id)
459 self.queues[eid].remove(msg_id)
451 self.completed[eid].append(msg_id)
460 self.completed[eid].append(msg_id)
452 rheader = msg['header']
461 rheader = msg['header']
453 completed = datetime.strptime(rheader['date'], ISO8601)
462 completed = datetime.strptime(rheader['date'], ISO8601)
454 started = rheader.get('started', None)
463 started = rheader.get('started', None)
455 if started is not None:
464 if started is not None:
456 started = datetime.strptime(started, ISO8601)
465 started = datetime.strptime(started, ISO8601)
457 result = {
466 result = {
458 'result_header' : rheader,
467 'result_header' : rheader,
459 'result_content': msg['content'],
468 'result_content': msg['content'],
460 'started' : started,
469 'started' : started,
461 'completed' : completed
470 'completed' : completed
462 }
471 }
463 if MongoDB is not None and isinstance(self.db, MongoDB):
472 if MongoDB is not None and isinstance(self.db, MongoDB):
464 result['result_buffers'] = map(Binary, msg['buffers'])
473 result['result_buffers'] = map(Binary, msg['buffers'])
465 else:
474 else:
466 result['result_buffers'] = msg['buffers']
475 result['result_buffers'] = msg['buffers']
467 self.db.update_record(msg_id, result)
476 self.db.update_record(msg_id, result)
468 else:
477 else:
469 logger.debug("queue:: unknown msg finished %s"%msg_id)
478 logger.debug("queue:: unknown msg finished %s"%msg_id)
470
479
471 #--------------------- Task Queue Traffic ------------------------------
480 #--------------------- Task Queue Traffic ------------------------------
472
481
    def save_task_request(self, idents, msg):
        """Save the submission of a task.

        Called when a client submits a job via the task (load-balanced)
        queue; records it in the db backend and marks it pending.
        """
        client_id = idents[0]
        
        try:
            msg = self.session.unpack_message(msg, content=False)
        except:
            logger.error("task::client %r sent invalid task message: %s"%(
                    client_id, msg), exc_info=True)
            return
        record = init_record(msg)
        # MongoDB cannot store raw message buffers directly; wrap in Binary.
        if MongoDB is not None and isinstance(self.db, MongoDB):
            record['buffers'] = map(Binary, record['buffers'])
        record['client_uuid'] = client_id
        record['queue'] = 'task'
        header = msg['header']
        msg_id = header['msg_id']
        self.pending.add(msg_id)
        self.db.add_record(msg_id, record)
492
501
493 def save_task_result(self, idents, msg):
502 def save_task_result(self, idents, msg):
494 """save the result of a completed task."""
503 """save the result of a completed task."""
495 client_id = idents[0]
504 client_id = idents[0]
496 try:
505 try:
497 msg = self.session.unpack_message(msg, content=False)
506 msg = self.session.unpack_message(msg, content=False)
498 except:
507 except:
499 logger.error("task::invalid task result message send to %r: %s"%(
508 logger.error("task::invalid task result message send to %r: %s"%(
500 client_id, msg), exc_info=True)
509 client_id, msg), exc_info=True)
501 raise
510 raise
502 return
511 return
503
512
504 parent = msg['parent_header']
513 parent = msg['parent_header']
505 if not parent:
514 if not parent:
506 # print msg
515 # print msg
507 logger.warn("Task %r had no parent!"%msg)
516 logger.warn("Task %r had no parent!"%msg)
508 return
517 return
509 msg_id = parent['msg_id']
518 msg_id = parent['msg_id']
510
519
511 header = msg['header']
520 header = msg['header']
512 engine_uuid = header.get('engine', None)
521 engine_uuid = header.get('engine', None)
513 eid = self.by_ident.get(engine_uuid, None)
522 eid = self.by_ident.get(engine_uuid, None)
514
523
515 if msg_id in self.pending:
524 if msg_id in self.pending:
516 self.pending.remove(msg_id)
525 self.pending.remove(msg_id)
517 self.all_completed.add(msg_id)
526 self.all_completed.add(msg_id)
518 if eid is not None:
527 if eid is not None:
519 self.completed[eid].append(msg_id)
528 self.completed[eid].append(msg_id)
520 if msg_id in self.tasks[eid]:
529 if msg_id in self.tasks[eid]:
521 self.tasks[eid].remove(msg_id)
530 self.tasks[eid].remove(msg_id)
522 completed = datetime.strptime(header['date'], ISO8601)
531 completed = datetime.strptime(header['date'], ISO8601)
523 started = header.get('started', None)
532 started = header.get('started', None)
524 if started is not None:
533 if started is not None:
525 started = datetime.strptime(started, ISO8601)
534 started = datetime.strptime(started, ISO8601)
526 result = {
535 result = {
527 'result_header' : header,
536 'result_header' : header,
528 'result_content': msg['content'],
537 'result_content': msg['content'],
529 'started' : started,
538 'started' : started,
530 'completed' : completed,
539 'completed' : completed,
531 'engine_uuid': engine_uuid
540 'engine_uuid': engine_uuid
532 }
541 }
533 if MongoDB is not None and isinstance(self.db, MongoDB):
542 if MongoDB is not None and isinstance(self.db, MongoDB):
534 result['result_buffers'] = map(Binary, msg['buffers'])
543 result['result_buffers'] = map(Binary, msg['buffers'])
535 else:
544 else:
536 result['result_buffers'] = msg['buffers']
545 result['result_buffers'] = msg['buffers']
537 self.db.update_record(msg_id, result)
546 self.db.update_record(msg_id, result)
538
547
539 else:
548 else:
540 logger.debug("task::unknown task %s finished"%msg_id)
549 logger.debug("task::unknown task %s finished"%msg_id)
541
550
542 def save_task_destination(self, idents, msg):
551 def save_task_destination(self, idents, msg):
543 try:
552 try:
544 msg = self.session.unpack_message(msg, content=True)
553 msg = self.session.unpack_message(msg, content=True)
545 except:
554 except:
546 logger.error("task::invalid task tracking message", exc_info=True)
555 logger.error("task::invalid task tracking message", exc_info=True)
547 return
556 return
548 content = msg['content']
557 content = msg['content']
549 print (content)
558 print (content)
550 msg_id = content['msg_id']
559 msg_id = content['msg_id']
551 engine_uuid = content['engine_id']
560 engine_uuid = content['engine_id']
552 eid = self.by_ident[engine_uuid]
561 eid = self.by_ident[engine_uuid]
553
562
554 logger.info("task::task %s arrived on %s"%(msg_id, eid))
563 logger.info("task::task %s arrived on %s"%(msg_id, eid))
555 # if msg_id in self.mia:
564 # if msg_id in self.mia:
556 # self.mia.remove(msg_id)
565 # self.mia.remove(msg_id)
557 # else:
566 # else:
558 # logger.debug("task::task %s not listed as MIA?!"%(msg_id))
567 # logger.debug("task::task %s not listed as MIA?!"%(msg_id))
559
568
560 self.tasks[eid].append(msg_id)
569 self.tasks[eid].append(msg_id)
561 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
570 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
562 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
571 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
563
572
564 def mia_task_request(self, idents, msg):
573 def mia_task_request(self, idents, msg):
565 raise NotImplementedError
574 raise NotImplementedError
566 client_id = idents[0]
575 client_id = idents[0]
567 # content = dict(mia=self.mia,status='ok')
576 # content = dict(mia=self.mia,status='ok')
568 # self.session.send('mia_reply', content=content, idents=client_id)
577 # self.session.send('mia_reply', content=content, idents=client_id)
569
578
570
579
571 #--------------------- IOPub Traffic ------------------------------
580 #--------------------- IOPub Traffic ------------------------------
572
581
573 def save_iopub_message(self, topics, msg):
582 def save_iopub_message(self, topics, msg):
574 """save an iopub message into the db"""
583 """save an iopub message into the db"""
575 print (topics)
584 print (topics)
576 try:
585 try:
577 msg = self.session.unpack_message(msg, content=True)
586 msg = self.session.unpack_message(msg, content=True)
578 except:
587 except:
579 logger.error("iopub::invalid IOPub message", exc_info=True)
588 logger.error("iopub::invalid IOPub message", exc_info=True)
580 return
589 return
581
590
582 parent = msg['parent_header']
591 parent = msg['parent_header']
592 if not parent:
593 logger.error("iopub::invalid IOPub message: %s"%msg)
594 return
583 msg_id = parent['msg_id']
595 msg_id = parent['msg_id']
584 msg_type = msg['msg_type']
596 msg_type = msg['msg_type']
585 content = msg['content']
597 content = msg['content']
586
598
587 # ensure msg_id is in db
599 # ensure msg_id is in db
588 try:
600 try:
589 rec = self.db.get_record(msg_id)
601 rec = self.db.get_record(msg_id)
590 except:
602 except:
591 logger.error("iopub::IOPub message has invalid parent", exc_info=True)
603 logger.error("iopub::IOPub message has invalid parent", exc_info=True)
592 return
604 return
593 # stream
605 # stream
594 d = {}
606 d = {}
595 if msg_type == 'stream':
607 if msg_type == 'stream':
596 name = content['name']
608 name = content['name']
597 s = rec[name] or ''
609 s = rec[name] or ''
598 d[name] = s + content['data']
610 d[name] = s + content['data']
599
611
600 elif msg_type == 'pyerr':
612 elif msg_type == 'pyerr':
601 d['pyerr'] = content
613 d['pyerr'] = content
602 else:
614 else:
603 d[msg_type] = content['data']
615 d[msg_type] = content['data']
604
616
605 self.db.update_record(msg_id, d)
617 self.db.update_record(msg_id, d)
606
618
607
619
608
620
609 #-------------------------------------------------------------------------
621 #-------------------------------------------------------------------------
610 # Registration requests
622 # Registration requests
611 #-------------------------------------------------------------------------
623 #-------------------------------------------------------------------------
612
624
613 def connection_request(self, client_id, msg):
625 def connection_request(self, client_id, msg):
614 """Reply with connection addresses for clients."""
626 """Reply with connection addresses for clients."""
615 logger.info("client::client %s connected"%client_id)
627 logger.info("client::client %s connected"%client_id)
616 content = dict(status='ok')
628 content = dict(status='ok')
617 content.update(self.client_addrs)
629 content.update(self.client_addrs)
618 jsonable = {}
630 jsonable = {}
619 for k,v in self.keytable.iteritems():
631 for k,v in self.keytable.iteritems():
620 jsonable[str(k)] = v
632 jsonable[str(k)] = v
621 content['engines'] = jsonable
633 content['engines'] = jsonable
622 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
634 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
623
635
624 def register_engine(self, reg, msg):
636 def register_engine(self, reg, msg):
625 """Register a new engine."""
637 """Register a new engine."""
626 content = msg['content']
638 content = msg['content']
627 try:
639 try:
628 queue = content['queue']
640 queue = content['queue']
629 except KeyError:
641 except KeyError:
630 logger.error("registration::queue not specified", exc_info=True)
642 logger.error("registration::queue not specified", exc_info=True)
631 return
643 return
632 heart = content.get('heartbeat', None)
644 heart = content.get('heartbeat', None)
633 """register a new engine, and create the socket(s) necessary"""
645 """register a new engine, and create the socket(s) necessary"""
634 eid = self._new_id()
646 eid = self._next_id
635 # print (eid, queue, reg, heart)
647 # print (eid, queue, reg, heart)
636
648
637 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
649 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
638
650
639 content = dict(id=eid,status='ok')
651 content = dict(id=eid,status='ok')
640 content.update(self.engine_addrs)
652 content.update(self.engine_addrs)
641 # check if requesting available IDs:
653 # check if requesting available IDs:
642 if queue in self.by_ident:
654 if queue in self.by_ident:
643 try:
655 try:
644 raise KeyError("queue_id %r in use"%queue)
656 raise KeyError("queue_id %r in use"%queue)
645 except:
657 except:
646 content = wrap_exception()
658 content = wrap_exception()
659 logger.error("queue_id %r in use"%queue, exc_info=True)
647 elif heart in self.hearts: # need to check unique hearts?
660 elif heart in self.hearts: # need to check unique hearts?
648 try:
661 try:
649 raise KeyError("heart_id %r in use"%heart)
662 raise KeyError("heart_id %r in use"%heart)
650 except:
663 except:
664 logger.error("heart_id %r in use"%heart, exc_info=True)
651 content = wrap_exception()
665 content = wrap_exception()
652 else:
666 else:
653 for h, pack in self.incoming_registrations.iteritems():
667 for h, pack in self.incoming_registrations.iteritems():
654 if heart == h:
668 if heart == h:
655 try:
669 try:
656 raise KeyError("heart_id %r in use"%heart)
670 raise KeyError("heart_id %r in use"%heart)
657 except:
671 except:
672 logger.error("heart_id %r in use"%heart, exc_info=True)
658 content = wrap_exception()
673 content = wrap_exception()
659 break
674 break
660 elif queue == pack[1]:
675 elif queue == pack[1]:
661 try:
676 try:
662 raise KeyError("queue_id %r in use"%queue)
677 raise KeyError("queue_id %r in use"%queue)
663 except:
678 except:
679 logger.error("queue_id %r in use"%queue, exc_info=True)
664 content = wrap_exception()
680 content = wrap_exception()
665 break
681 break
666
682
667 msg = self.session.send(self.registrar, "registration_reply",
683 msg = self.session.send(self.registrar, "registration_reply",
668 content=content,
684 content=content,
669 ident=reg)
685 ident=reg)
670
686
671 if content['status'] == 'ok':
687 if content['status'] == 'ok':
672 if heart in self.heartbeat.hearts:
688 if heart in self.heartmonitor.hearts:
673 # already beating
689 # already beating
674 self.incoming_registrations[heart] = (eid,queue,reg,None)
690 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
675 self.finish_registration(heart)
691 self.finish_registration(heart)
676 else:
692 else:
677 purge = lambda : self._purge_stalled_registration(heart)
693 purge = lambda : self._purge_stalled_registration(heart)
678 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
694 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
679 dc.start()
695 dc.start()
680 self.incoming_registrations[heart] = (eid,queue,reg,dc)
696 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
681 else:
697 else:
682 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
698 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
683 return eid
699 return eid
684
700
    def unregister_engine(self, ident, msg):
        """Unregister an engine that explicitly requested to leave.

        Tears down all per-engine bookkeeping and notifies subscribers.
        """
        try:
            eid = msg['content']['id']
        except:
            logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
            return
        logger.info("registration::unregister_engine(%s)"%eid)
        content=dict(id=eid, queue=self.engines[eid].queue)
        # remove every piece of per-engine state, keyed variously by
        # engine id, queue identity, and heartbeat identity
        self.ids.remove(eid)
        self.keytable.pop(eid)
        ec = self.engines.pop(eid)
        self.hearts.pop(ec.heartbeat)
        self.by_ident.pop(ec.queue)
        self.completed.pop(eid)
        # jobs still queued on this engine are dropped from pending;
        # NOTE(review): set.remove returns None, so `msg` is never useful --
        # resubmission of orphaned jobs is still unhandled (see TODO).
        for msg_id in self.queues.pop(eid):
            msg = self.pending.remove(msg_id)
            ############## TODO: HANDLE IT ################
        
        if self.notifier:
            self.session.send(self.notifier, "unregistration_notification", content=content)
706
722
    def finish_registration(self, heart):
        """Second half of engine registration, called after our HeartMonitor
        has received a beat from the Engine's Heart."""
        try:
            (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
        except KeyError:
            logger.error("registration::tried to finish nonexistant registration", exc_info=True)
            return
        logger.info("registration::finished registering engine %i:%r"%(eid,queue))
        # cancel the stalled-registration purge timer, if one was started
        if purge is not None:
            purge.stop()
        # control identity is the same as the queue identity
        control = queue
        self.ids.add(eid)
        self.keytable[eid] = queue
        self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
                                control=control, heartbeat=heart)
        self.by_ident[queue] = eid
        # fresh per-engine job bookkeeping
        self.queues[eid] = list()
        self.tasks[eid] = list()
        self.completed[eid] = list()
        self.hearts[heart] = eid
        content = dict(id=eid, queue=self.engines[eid].queue)
        if self.notifier:
            self.session.send(self.notifier, "registration_notification", content=content)
730
747
731 def _purge_stalled_registration(self, heart):
748 def _purge_stalled_registration(self, heart):
732 if heart in self.incoming_registrations:
749 if heart in self.incoming_registrations:
733 eid = self.incoming_registrations.pop(heart)[0]
750 eid = self.incoming_registrations.pop(heart)[0]
734 logger.info("registration::purging stalled registration: %i"%eid)
751 logger.info("registration::purging stalled registration: %i"%eid)
735 else:
752 else:
736 pass
753 pass
737
754
738 #-------------------------------------------------------------------------
755 #-------------------------------------------------------------------------
739 # Client Requests
756 # Client Requests
740 #-------------------------------------------------------------------------
757 #-------------------------------------------------------------------------
741
758
    def shutdown_request(self, client_id, msg):
        """handle shutdown request."""
        # Fan-out of shutdown_request to the engines is currently disabled:
        # s = self.context.socket(zmq.XREQ)
        # s.connect(self.client_connections['mux'])
        # time.sleep(0.1)
        # for eid,ec in self.engines.iteritems():
        #     self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
        # time.sleep(1)
        # acknowledge first, then exit 1s later so the reply can get out
        self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
        dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
        dc.start()
753
770
    def _shutdown(self):
        """Terminate the controller process (scheduled by shutdown_request)."""
        logger.info("controller::controller shutting down.")
        # brief pause so the log message can flush before exiting
        time.sleep(0.1)
        sys.exit(0)
758
775
759
776
760 def check_load(self, client_id, msg):
777 def check_load(self, client_id, msg):
761 content = msg['content']
778 content = msg['content']
762 try:
779 try:
763 targets = content['targets']
780 targets = content['targets']
764 targets = self._validate_targets(targets)
781 targets = self._validate_targets(targets)
765 except:
782 except:
766 content = wrap_exception()
783 content = wrap_exception()
767 self.session.send(self.clientele, "controller_error",
784 self.session.send(self.clientele, "controller_error",
768 content=content, ident=client_id)
785 content=content, ident=client_id)
769 return
786 return
770
787
771 content = dict(status='ok')
788 content = dict(status='ok')
772 # loads = {}
789 # loads = {}
773 for t in targets:
790 for t in targets:
774 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
791 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
775 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
792 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
776
793
777
794
778 def queue_status(self, client_id, msg):
795 def queue_status(self, client_id, msg):
779 """Return the Queue status of one or more targets.
796 """Return the Queue status of one or more targets.
780 if verbose: return the msg_ids
797 if verbose: return the msg_ids
781 else: return len of each type.
798 else: return len of each type.
782 keys: queue (pending MUX jobs)
799 keys: queue (pending MUX jobs)
783 tasks (pending Task jobs)
800 tasks (pending Task jobs)
784 completed (finished jobs from both queues)"""
801 completed (finished jobs from both queues)"""
785 content = msg['content']
802 content = msg['content']
786 targets = content['targets']
803 targets = content['targets']
787 try:
804 try:
788 targets = self._validate_targets(targets)
805 targets = self._validate_targets(targets)
789 except:
806 except:
790 content = wrap_exception()
807 content = wrap_exception()
791 self.session.send(self.clientele, "controller_error",
808 self.session.send(self.clientele, "controller_error",
792 content=content, ident=client_id)
809 content=content, ident=client_id)
793 return
810 return
794 verbose = content.get('verbose', False)
811 verbose = content.get('verbose', False)
795 content = dict(status='ok')
812 content = dict(status='ok')
796 for t in targets:
813 for t in targets:
797 queue = self.queues[t]
814 queue = self.queues[t]
798 completed = self.completed[t]
815 completed = self.completed[t]
799 tasks = self.tasks[t]
816 tasks = self.tasks[t]
800 if not verbose:
817 if not verbose:
801 queue = len(queue)
818 queue = len(queue)
802 completed = len(completed)
819 completed = len(completed)
803 tasks = len(tasks)
820 tasks = len(tasks)
804 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
821 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
805 # pending
822 # pending
806 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
823 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
807
824
808 def purge_results(self, client_id, msg):
825 def purge_results(self, client_id, msg):
809 """Purge results from memory. This method is more valuable before we move
826 """Purge results from memory. This method is more valuable before we move
810 to a DB based message storage mechanism."""
827 to a DB based message storage mechanism."""
811 content = msg['content']
828 content = msg['content']
812 msg_ids = content.get('msg_ids', [])
829 msg_ids = content.get('msg_ids', [])
813 reply = dict(status='ok')
830 reply = dict(status='ok')
814 if msg_ids == 'all':
831 if msg_ids == 'all':
815 self.db.drop_matching_records(dict(completed={'$ne':None}))
832 self.db.drop_matching_records(dict(completed={'$ne':None}))
816 else:
833 else:
817 for msg_id in msg_ids:
834 for msg_id in msg_ids:
818 if msg_id in self.all_completed:
835 if msg_id in self.all_completed:
819 self.db.drop_record(msg_id)
836 self.db.drop_record(msg_id)
820 else:
837 else:
821 if msg_id in self.pending:
838 if msg_id in self.pending:
822 try:
839 try:
823 raise IndexError("msg pending: %r"%msg_id)
840 raise IndexError("msg pending: %r"%msg_id)
824 except:
841 except:
825 reply = wrap_exception()
842 reply = wrap_exception()
826 else:
843 else:
827 try:
844 try:
828 raise IndexError("No such msg: %r"%msg_id)
845 raise IndexError("No such msg: %r"%msg_id)
829 except:
846 except:
830 reply = wrap_exception()
847 reply = wrap_exception()
831 break
848 break
832 eids = content.get('engine_ids', [])
849 eids = content.get('engine_ids', [])
833 for eid in eids:
850 for eid in eids:
834 if eid not in self.engines:
851 if eid not in self.engines:
835 try:
852 try:
836 raise IndexError("No such engine: %i"%eid)
853 raise IndexError("No such engine: %i"%eid)
837 except:
854 except:
838 reply = wrap_exception()
855 reply = wrap_exception()
839 break
856 break
840 msg_ids = self.completed.pop(eid)
857 msg_ids = self.completed.pop(eid)
841 uid = self.engines[eid].queue
858 uid = self.engines[eid].queue
842 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
859 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
843
860
844 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
861 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
845
862
    def resubmit_task(self, client_id, msg, buffers):
        """Resubmit a task."""
        # TODO: not implemented yet
        raise NotImplementedError
849
866
    def get_results(self, client_id, msg):
        """Get the result of 1 or more messages.

        Replies with lists of pending and completed msg_ids; unless
        status_only was requested, completed results are included too.
        """
        content = msg['content']
        msg_ids = sorted(set(content['msg_ids']))
        statusonly = content.get('status_only', False)
        pending = []
        completed = []
        content = dict(status='ok')
        content['pending'] = pending
        content['completed'] = completed
        buffers = []
        if not statusonly:
            content['results'] = {}
            records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
        for msg_id in msg_ids:
            if msg_id in self.pending:
                pending.append(msg_id)
            elif msg_id in self.all_completed:
                completed.append(msg_id)
                if not statusonly:
                    rec = records[msg_id]
                    io_dict = {}
                    for key in 'pyin pyout pyerr stdout stderr'.split():
                        io_dict[key] = rec[key]
                    # NOTE(review): results are stored at content[msg_id],
                    # while content['results'] initialized above stays
                    # empty -- confirm which location clients read.
                    content[msg_id] = { 'result_content': rec['result_content'],
                                        'header': rec['header'],
                                        'result_header' : rec['result_header'],
                                        'io' : io_dict,
                                      }
                    buffers.extend(map(str, rec['result_buffers']))
            else:
                try:
                    raise KeyError('No such message: '+msg_id)
                except:
                    content = wrap_exception()
                break
        self.session.send(self.clientele, "result_reply", content=content,
                                            parent=msg, ident=client_id,
                                            buffers=buffers)
889
906
@@ -1,424 +1,434 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6 """
6 """
7
7
8 #----------------------------------------------------------------------
8 #----------------------------------------------------------------------
9 # Imports
9 # Imports
10 #----------------------------------------------------------------------
10 #----------------------------------------------------------------------
11
11
12 from __future__ import print_function
12 from __future__ import print_function
13 from random import randint,random
13 from random import randint,random
14 import logging
15 from types import FunctionType
14
16
15 try:
17 try:
16 import numpy
18 import numpy
17 except ImportError:
19 except ImportError:
18 numpy = None
20 numpy = None
19
21
20 import zmq
22 import zmq
21 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
22
24
23 # local imports
25 # local imports
24 from IPython.zmq.log import logger # a Logger object
26 from IPython.external.decorator import decorator
27 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import Instance
29
25 from client import Client
30 from client import Client
26 from dependency import Dependency
31 from dependency import Dependency
27 import streamsession as ss
32 import streamsession as ss
33 from entry_point import connect_logger, local_logger
28
34
29 from IPython.external.decorator import decorator
35
36 logger = logging.getLogger()
30
37
31 @decorator
38 @decorator
32 def logged(f,self,*args,**kwargs):
39 def logged(f,self,*args,**kwargs):
33 # print ("#--------------------")
40 # print ("#--------------------")
34 # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
41 logger.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
35 # print ("#--")
42 # print ("#--")
36 return f(self,*args, **kwargs)
43 return f(self,*args, **kwargs)
37
44
38 #----------------------------------------------------------------------
45 #----------------------------------------------------------------------
39 # Chooser functions
46 # Chooser functions
40 #----------------------------------------------------------------------
47 #----------------------------------------------------------------------
41
48
42 def plainrandom(loads):
49 def plainrandom(loads):
43 """Plain random pick."""
50 """Plain random pick."""
44 n = len(loads)
51 n = len(loads)
45 return randint(0,n-1)
52 return randint(0,n-1)
46
53
47 def lru(loads):
54 def lru(loads):
48 """Always pick the front of the line.
55 """Always pick the front of the line.
49
56
50 The content of `loads` is ignored.
57 The content of `loads` is ignored.
51
58
52 Assumes LRU ordering of loads, with oldest first.
59 Assumes LRU ordering of loads, with oldest first.
53 """
60 """
54 return 0
61 return 0
55
62
56 def twobin(loads):
63 def twobin(loads):
57 """Pick two at random, use the LRU of the two.
64 """Pick two at random, use the LRU of the two.
58
65
59 The content of loads is ignored.
66 The content of loads is ignored.
60
67
61 Assumes LRU ordering of loads, with oldest first.
68 Assumes LRU ordering of loads, with oldest first.
62 """
69 """
63 n = len(loads)
70 n = len(loads)
64 a = randint(0,n-1)
71 a = randint(0,n-1)
65 b = randint(0,n-1)
72 b = randint(0,n-1)
66 return min(a,b)
73 return min(a,b)
67
74
68 def weighted(loads):
75 def weighted(loads):
69 """Pick two at random using inverse load as weight.
76 """Pick two at random using inverse load as weight.
70
77
71 Return the less loaded of the two.
78 Return the less loaded of the two.
72 """
79 """
73 # weight 0 a million times more than 1:
80 # weight 0 a million times more than 1:
74 weights = 1./(1e-6+numpy.array(loads))
81 weights = 1./(1e-6+numpy.array(loads))
75 sums = weights.cumsum()
82 sums = weights.cumsum()
76 t = sums[-1]
83 t = sums[-1]
77 x = random()*t
84 x = random()*t
78 y = random()*t
85 y = random()*t
79 idx = 0
86 idx = 0
80 idy = 0
87 idy = 0
81 while sums[idx] < x:
88 while sums[idx] < x:
82 idx += 1
89 idx += 1
83 while sums[idy] < y:
90 while sums[idy] < y:
84 idy += 1
91 idy += 1
85 if weights[idy] > weights[idx]:
92 if weights[idy] > weights[idx]:
86 return idy
93 return idy
87 else:
94 else:
88 return idx
95 return idx
89
96
90 def leastload(loads):
97 def leastload(loads):
91 """Always choose the lowest load.
98 """Always choose the lowest load.
92
99
93 If the lowest load occurs more than once, the first
100 If the lowest load occurs more than once, the first
94 occurance will be used. If loads has LRU ordering, this means
101 occurance will be used. If loads has LRU ordering, this means
95 the LRU of those with the lowest load is chosen.
102 the LRU of those with the lowest load is chosen.
96 """
103 """
97 return loads.index(min(loads))
104 return loads.index(min(loads))
98
105
99 #---------------------------------------------------------------------
106 #---------------------------------------------------------------------
100 # Classes
107 # Classes
101 #---------------------------------------------------------------------
108 #---------------------------------------------------------------------
102 class TaskScheduler(object):
109 class TaskScheduler(Configurable):
103 """Python TaskScheduler object.
110 """Python TaskScheduler object.
104
111
105 This is the simplest object that supports msg_id based
112 This is the simplest object that supports msg_id based
106 DAG dependencies. *Only* task msg_ids are checked, not
113 DAG dependencies. *Only* task msg_ids are checked, not
107 msg_ids of jobs submitted via the MUX queue.
114 msg_ids of jobs submitted via the MUX queue.
108
115
109 """
116 """
110
117
111 scheme = leastload # function for determining the destination
118 # configurables:
112 client_stream = None # client-facing stream
119 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
113 engine_stream = None # engine-facing stream
120 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
114 mon_stream = None # controller-facing stream
121 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
122 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
123 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
124 io_loop = Instance(ioloop.IOLoop)
125
126 # internals:
115 dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
127 dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
116 depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
128 depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
117 pending = None # dict by engine_uuid of submitted tasks
129 pending = None # dict by engine_uuid of submitted tasks
118 completed = None # dict by engine_uuid of completed tasks
130 completed = None # dict by engine_uuid of completed tasks
119 clients = None # dict by msg_id for who submitted the task
131 clients = None # dict by msg_id for who submitted the task
120 targets = None # list of target IDENTs
132 targets = None # list of target IDENTs
121 loads = None # list of engine loads
133 loads = None # list of engine loads
122 all_done = None # set of all completed tasks
134 all_done = None # set of all completed tasks
123 blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
135 blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
124
136
125
137
126 def __init__(self, client_stream, engine_stream, mon_stream,
138 def __init__(self, **kwargs):
127 notifier_stream, scheme=None, io_loop=None):
139 super(TaskScheduler, self).__init__(**kwargs)
128 if io_loop is None:
129 io_loop = ioloop.IOLoop.instance()
130 self.io_loop = io_loop
131 self.client_stream = client_stream
132 self.engine_stream = engine_stream
133 self.mon_stream = mon_stream
134 self.notifier_stream = notifier_stream
135
136 if scheme is not None:
137 self.scheme = scheme
138 else:
139 self.scheme = TaskScheduler.scheme
140
140
141 self.session = ss.StreamSession(username="TaskScheduler")
141 self.session = ss.StreamSession(username="TaskScheduler")
142
143 self.dependencies = {}
142 self.dependencies = {}
144 self.depending = {}
143 self.depending = {}
145 self.completed = {}
144 self.completed = {}
146 self.pending = {}
145 self.pending = {}
147 self.all_done = set()
146 self.all_done = set()
148 self.blacklist = {}
147 self.blacklist = {}
149
148
150 self.targets = []
149 self.targets = []
151 self.loads = []
150 self.loads = []
152
151
153 engine_stream.on_recv(self.dispatch_result, copy=False)
152 self.engine_stream.on_recv(self.dispatch_result, copy=False)
154 self._notification_handlers = dict(
153 self._notification_handlers = dict(
155 registration_notification = self._register_engine,
154 registration_notification = self._register_engine,
156 unregistration_notification = self._unregister_engine
155 unregistration_notification = self._unregister_engine
157 )
156 )
158 self.notifier_stream.on_recv(self.dispatch_notification)
157 self.notifier_stream.on_recv(self.dispatch_notification)
158 logger.info("Scheduler started...%r"%self)
159
159
160 def resume_receiving(self):
160 def resume_receiving(self):
161 """Resume accepting jobs."""
161 """Resume accepting jobs."""
162 self.client_stream.on_recv(self.dispatch_submission, copy=False)
162 self.client_stream.on_recv(self.dispatch_submission, copy=False)
163
163
164 def stop_receiving(self):
164 def stop_receiving(self):
165 """Stop accepting jobs while there are no engines.
165 """Stop accepting jobs while there are no engines.
166 Leave them in the ZMQ queue."""
166 Leave them in the ZMQ queue."""
167 self.client_stream.on_recv(None)
167 self.client_stream.on_recv(None)
168
168
169 #-----------------------------------------------------------------------
169 #-----------------------------------------------------------------------
170 # [Un]Registration Handling
170 # [Un]Registration Handling
171 #-----------------------------------------------------------------------
171 #-----------------------------------------------------------------------
172
172
173 def dispatch_notification(self, msg):
173 def dispatch_notification(self, msg):
174 """dispatch register/unregister events."""
174 """dispatch register/unregister events."""
175 idents,msg = self.session.feed_identities(msg)
175 idents,msg = self.session.feed_identities(msg)
176 msg = self.session.unpack_message(msg)
176 msg = self.session.unpack_message(msg)
177 msg_type = msg['msg_type']
177 msg_type = msg['msg_type']
178 handler = self._notification_handlers.get(msg_type, None)
178 handler = self._notification_handlers.get(msg_type, None)
179 if handler is None:
179 if handler is None:
180 raise Exception("Unhandled message type: %s"%msg_type)
180 raise Exception("Unhandled message type: %s"%msg_type)
181 else:
181 else:
182 try:
182 try:
183 handler(str(msg['content']['queue']))
183 handler(str(msg['content']['queue']))
184 except KeyError:
184 except KeyError:
185 logger.error("task::Invalid notification msg: %s"%msg)
185 logger.error("task::Invalid notification msg: %s"%msg)
186
186 @logged
187 @logged
187 def _register_engine(self, uid):
188 def _register_engine(self, uid):
188 """New engine with ident `uid` became available."""
189 """New engine with ident `uid` became available."""
189 # head of the line:
190 # head of the line:
190 self.targets.insert(0,uid)
191 self.targets.insert(0,uid)
191 self.loads.insert(0,0)
192 self.loads.insert(0,0)
192 # initialize sets
193 # initialize sets
193 self.completed[uid] = set()
194 self.completed[uid] = set()
194 self.pending[uid] = {}
195 self.pending[uid] = {}
195 if len(self.targets) == 1:
196 if len(self.targets) == 1:
196 self.resume_receiving()
197 self.resume_receiving()
197
198
198 def _unregister_engine(self, uid):
199 def _unregister_engine(self, uid):
199 """Existing engine with ident `uid` became unavailable."""
200 """Existing engine with ident `uid` became unavailable."""
200 if len(self.targets) == 1:
201 if len(self.targets) == 1:
201 # this was our only engine
202 # this was our only engine
202 self.stop_receiving()
203 self.stop_receiving()
203
204
204 # handle any potentially finished tasks:
205 # handle any potentially finished tasks:
205 self.engine_stream.flush()
206 self.engine_stream.flush()
206
207
207 self.completed.pop(uid)
208 self.completed.pop(uid)
208 lost = self.pending.pop(uid)
209 lost = self.pending.pop(uid)
209
210
210 idx = self.targets.index(uid)
211 idx = self.targets.index(uid)
211 self.targets.pop(idx)
212 self.targets.pop(idx)
212 self.loads.pop(idx)
213 self.loads.pop(idx)
213
214
214 self.handle_stranded_tasks(lost)
215 self.handle_stranded_tasks(lost)
215
216
216 def handle_stranded_tasks(self, lost):
217 def handle_stranded_tasks(self, lost):
217 """Deal with jobs resident in an engine that died."""
218 """Deal with jobs resident in an engine that died."""
218 # TODO: resubmit the tasks?
219 # TODO: resubmit the tasks?
219 for msg_id in lost:
220 for msg_id in lost:
220 pass
221 pass
221
222
222
223
223 #-----------------------------------------------------------------------
224 #-----------------------------------------------------------------------
224 # Job Submission
225 # Job Submission
225 #-----------------------------------------------------------------------
226 #-----------------------------------------------------------------------
226 @logged
227 @logged
227 def dispatch_submission(self, raw_msg):
228 def dispatch_submission(self, raw_msg):
228 """Dispatch job submission to appropriate handlers."""
229 """Dispatch job submission to appropriate handlers."""
229 # ensure targets up to date:
230 # ensure targets up to date:
230 self.notifier_stream.flush()
231 self.notifier_stream.flush()
231 try:
232 try:
232 idents, msg = self.session.feed_identities(raw_msg, copy=False)
233 idents, msg = self.session.feed_identities(raw_msg, copy=False)
233 except Exception as e:
234 except Exception as e:
234 logger.error("task::Invaid msg: %s"%msg)
235 logger.error("task::Invaid msg: %s"%msg)
235 return
236 return
236
237
237 # send to monitor
238 # send to monitor
238 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
239 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
239
240
240 msg = self.session.unpack_message(msg, content=False, copy=False)
241 msg = self.session.unpack_message(msg, content=False, copy=False)
241 header = msg['header']
242 header = msg['header']
242 msg_id = header['msg_id']
243 msg_id = header['msg_id']
243
244
244 # time dependencies
245 # time dependencies
245 after = Dependency(header.get('after', []))
246 after = Dependency(header.get('after', []))
246 if after.mode == 'all':
247 if after.mode == 'all':
247 after.difference_update(self.all_done)
248 after.difference_update(self.all_done)
248 if after.check(self.all_done):
249 if after.check(self.all_done):
249 # recast as empty set, if `after` already met,
250 # recast as empty set, if `after` already met,
250 # to prevent unnecessary set comparisons
251 # to prevent unnecessary set comparisons
251 after = Dependency([])
252 after = Dependency([])
252
253
253 # location dependencies
254 # location dependencies
254 follow = Dependency(header.get('follow', []))
255 follow = Dependency(header.get('follow', []))
255 if len(after) == 0:
256 if len(after) == 0:
256 # time deps already met, try to run
257 # time deps already met, try to run
257 if not self.maybe_run(msg_id, raw_msg, follow):
258 if not self.maybe_run(msg_id, raw_msg, follow):
258 # can't run yet
259 # can't run yet
259 self.save_unmet(msg_id, raw_msg, after, follow)
260 self.save_unmet(msg_id, raw_msg, after, follow)
260 else:
261 else:
261 self.save_unmet(msg_id, raw_msg, after, follow)
262 self.save_unmet(msg_id, raw_msg, after, follow)
262
263
263 @logged
264 @logged
264 def maybe_run(self, msg_id, raw_msg, follow=None):
265 def maybe_run(self, msg_id, raw_msg, follow=None):
265 """check location dependencies, and run if they are met."""
266 """check location dependencies, and run if they are met."""
266
267
267 if follow:
268 if follow:
268 def can_run(idx):
269 def can_run(idx):
269 target = self.targets[idx]
270 target = self.targets[idx]
270 return target not in self.blacklist.get(msg_id, []) and\
271 return target not in self.blacklist.get(msg_id, []) and\
271 follow.check(self.completed[target])
272 follow.check(self.completed[target])
272
273
273 indices = filter(can_run, range(len(self.targets)))
274 indices = filter(can_run, range(len(self.targets)))
274 if not indices:
275 if not indices:
275 return False
276 return False
276 else:
277 else:
277 indices = None
278 indices = None
278
279
279 self.submit_task(msg_id, raw_msg, indices)
280 self.submit_task(msg_id, raw_msg, indices)
280 return True
281 return True
281
282
282 @logged
283 @logged
283 def save_unmet(self, msg_id, msg, after, follow):
284 def save_unmet(self, msg_id, msg, after, follow):
284 """Save a message for later submission when its dependencies are met."""
285 """Save a message for later submission when its dependencies are met."""
285 self.depending[msg_id] = (msg_id,msg,after,follow)
286 self.depending[msg_id] = (msg_id,msg,after,follow)
286 # track the ids in both follow/after, but not those already completed
287 # track the ids in both follow/after, but not those already completed
287 for dep_id in after.union(follow).difference(self.all_done):
288 for dep_id in after.union(follow).difference(self.all_done):
288 if dep_id not in self.dependencies:
289 if dep_id not in self.dependencies:
289 self.dependencies[dep_id] = set()
290 self.dependencies[dep_id] = set()
290 self.dependencies[dep_id].add(msg_id)
291 self.dependencies[dep_id].add(msg_id)
291
292
292 @logged
293 @logged
293 def submit_task(self, msg_id, msg, follow=None, indices=None):
294 def submit_task(self, msg_id, msg, follow=None, indices=None):
294 """Submit a task to any of a subset of our targets."""
295 """Submit a task to any of a subset of our targets."""
295 if indices:
296 if indices:
296 loads = [self.loads[i] for i in indices]
297 loads = [self.loads[i] for i in indices]
297 else:
298 else:
298 loads = self.loads
299 loads = self.loads
299 idx = self.scheme(loads)
300 idx = self.scheme(loads)
300 if indices:
301 if indices:
301 idx = indices[idx]
302 idx = indices[idx]
302 target = self.targets[idx]
303 target = self.targets[idx]
303 # print (target, map(str, msg[:3]))
304 # print (target, map(str, msg[:3]))
304 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
305 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
305 self.engine_stream.send_multipart(msg, copy=False)
306 self.engine_stream.send_multipart(msg, copy=False)
306 self.add_job(idx)
307 self.add_job(idx)
307 self.pending[target][msg_id] = (msg, follow)
308 self.pending[target][msg_id] = (msg, follow)
308 content = dict(msg_id=msg_id, engine_id=target)
309 content = dict(msg_id=msg_id, engine_id=target)
309 self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask')
310 self.session.send(self.mon_stream, 'task_destination', content=content,
311 ident=['tracktask',self.session.session])
310
312
311 #-----------------------------------------------------------------------
313 #-----------------------------------------------------------------------
312 # Result Handling
314 # Result Handling
313 #-----------------------------------------------------------------------
315 #-----------------------------------------------------------------------
314 @logged
316 @logged
315 def dispatch_result(self, raw_msg):
317 def dispatch_result(self, raw_msg):
316 try:
318 try:
317 idents,msg = self.session.feed_identities(raw_msg, copy=False)
319 idents,msg = self.session.feed_identities(raw_msg, copy=False)
318 except Exception as e:
320 except Exception as e:
319 logger.error("task::Invaid result: %s"%msg)
321 logger.error("task::Invaid result: %s"%msg)
320 return
322 return
321 msg = self.session.unpack_message(msg, content=False, copy=False)
323 msg = self.session.unpack_message(msg, content=False, copy=False)
322 header = msg['header']
324 header = msg['header']
323 if header.get('dependencies_met', True):
325 if header.get('dependencies_met', True):
324 self.handle_result_success(idents, msg['parent_header'], raw_msg)
326 self.handle_result_success(idents, msg['parent_header'], raw_msg)
325 # send to monitor
327 # send to monitor
326 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
328 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
327 else:
329 else:
328 self.handle_unmet_dependency(idents, msg['parent_header'])
330 self.handle_unmet_dependency(idents, msg['parent_header'])
329
331
330 @logged
332 @logged
331 def handle_result_success(self, idents, parent, raw_msg):
333 def handle_result_success(self, idents, parent, raw_msg):
332 # first, relay result to client
334 # first, relay result to client
333 engine = idents[0]
335 engine = idents[0]
334 client = idents[1]
336 client = idents[1]
335 # swap_ids for XREP-XREP mirror
337 # swap_ids for XREP-XREP mirror
336 raw_msg[:2] = [client,engine]
338 raw_msg[:2] = [client,engine]
337 # print (map(str, raw_msg[:4]))
339 # print (map(str, raw_msg[:4]))
338 self.client_stream.send_multipart(raw_msg, copy=False)
340 self.client_stream.send_multipart(raw_msg, copy=False)
339 # now, update our data structures
341 # now, update our data structures
340 msg_id = parent['msg_id']
342 msg_id = parent['msg_id']
341 self.pending[engine].pop(msg_id)
343 self.pending[engine].pop(msg_id)
342 self.completed[engine].add(msg_id)
344 self.completed[engine].add(msg_id)
343 self.all_done.add(msg_id)
345 self.all_done.add(msg_id)
344
346
345 self.update_dependencies(msg_id)
347 self.update_dependencies(msg_id)
346
348
347 @logged
349 @logged
348 def handle_unmet_dependency(self, idents, parent):
350 def handle_unmet_dependency(self, idents, parent):
349 engine = idents[0]
351 engine = idents[0]
350 msg_id = parent['msg_id']
352 msg_id = parent['msg_id']
351 if msg_id not in self.blacklist:
353 if msg_id not in self.blacklist:
352 self.blacklist[msg_id] = set()
354 self.blacklist[msg_id] = set()
353 self.blacklist[msg_id].add(engine)
355 self.blacklist[msg_id].add(engine)
354 raw_msg,follow = self.pending[engine].pop(msg_id)
356 raw_msg,follow = self.pending[engine].pop(msg_id)
355 if not self.maybe_run(msg_id, raw_msg, follow):
357 if not self.maybe_run(msg_id, raw_msg, follow):
356 # resubmit failed, put it back in our dependency tree
358 # resubmit failed, put it back in our dependency tree
357 self.save_unmet(msg_id, raw_msg, Dependency(), follow)
359 self.save_unmet(msg_id, raw_msg, Dependency(), follow)
358 pass
360 pass
359 @logged
361 @logged
360 def update_dependencies(self, dep_id):
362 def update_dependencies(self, dep_id):
361 """dep_id just finished. Update our dependency
363 """dep_id just finished. Update our dependency
362 table and submit any jobs that just became runable."""
364 table and submit any jobs that just became runable."""
363
365
364 if dep_id not in self.dependencies:
366 if dep_id not in self.dependencies:
365 return
367 return
366 jobs = self.dependencies.pop(dep_id)
368 jobs = self.dependencies.pop(dep_id)
367 for job in jobs:
369 for job in jobs:
368 msg_id, raw_msg, after, follow = self.depending[job]
370 msg_id, raw_msg, after, follow = self.depending[job]
369 if dep_id in after:
371 if dep_id in after:
370 after.remove(dep_id)
372 after.remove(dep_id)
371 if not after: # time deps met, maybe run
373 if not after: # time deps met, maybe run
372 if self.maybe_run(msg_id, raw_msg, follow):
374 if self.maybe_run(msg_id, raw_msg, follow):
373 self.depending.pop(job)
375 self.depending.pop(job)
374 for mid in follow:
376 for mid in follow:
375 if mid in self.dependencies:
377 if mid in self.dependencies:
376 self.dependencies[mid].remove(msg_id)
378 self.dependencies[mid].remove(msg_id)
377
379
378 #----------------------------------------------------------------------
380 #----------------------------------------------------------------------
379 # methods to be overridden by subclasses
381 # methods to be overridden by subclasses
380 #----------------------------------------------------------------------
382 #----------------------------------------------------------------------
381
383
382 def add_job(self, idx):
384 def add_job(self, idx):
383 """Called after self.targets[idx] just got the job with header.
385 """Called after self.targets[idx] just got the job with header.
384 Override with subclasses. The default ordering is simple LRU.
386 Override with subclasses. The default ordering is simple LRU.
385 The default loads are the number of outstanding jobs."""
387 The default loads are the number of outstanding jobs."""
386 self.loads[idx] += 1
388 self.loads[idx] += 1
387 for lis in (self.targets, self.loads):
389 for lis in (self.targets, self.loads):
388 lis.append(lis.pop(idx))
390 lis.append(lis.pop(idx))
389
391
390
392
391 def finish_job(self, idx):
393 def finish_job(self, idx):
392 """Called after self.targets[idx] just finished a job.
394 """Called after self.targets[idx] just finished a job.
393 Override with subclasses."""
395 Override with subclasses."""
394 self.loads[idx] -= 1
396 self.loads[idx] -= 1
395
397
396
398
397
399
398 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):
400 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
399 from zmq.eventloop import ioloop
401 from zmq.eventloop import ioloop
400 from zmq.eventloop.zmqstream import ZMQStream
402 from zmq.eventloop.zmqstream import ZMQStream
401
403
402 ctx = zmq.Context()
404 ctx = zmq.Context()
403 loop = ioloop.IOLoop()
405 loop = ioloop.IOLoop()
404
406
405 scheme = globals().get(scheme)
407 scheme = globals().get(scheme)
406
408
407 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
409 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
408 ins.bind(in_addr)
410 ins.bind(in_addr)
409 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
411 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
410 outs.bind(out_addr)
412 outs.bind(out_addr)
411 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
413 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
412 mons.connect(mon_addr)
414 mons.connect(mon_addr)
413 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
415 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
414 nots.setsockopt(zmq.SUBSCRIBE, '')
416 nots.setsockopt(zmq.SUBSCRIBE, '')
415 nots.connect(not_addr)
417 nots.connect(not_addr)
416
418
417 scheduler = TaskScheduler(ins,outs,mons,nots,scheme,loop)
419 # setup logging
420 if log_addr:
421 connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
422 else:
423 local_logger(loglevel)
424
425 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
426 mon_stream=mons,notifier_stream=nots,
427 scheme=scheme,io_loop=loop)
418
428
419 loop.start()
429 loop.start()
420
430
421
431
422 if __name__ == '__main__':
432 if __name__ == '__main__':
423 iface = 'tcp://127.0.0.1:%i'
433 iface = 'tcp://127.0.0.1:%i'
424 launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)
434 launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)
@@ -1,459 +1,461 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
4 """
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Imports
7 # Imports
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9
9
10 # Standard library imports.
10 # Standard library imports.
11 from __future__ import print_function
11 from __future__ import print_function
12 import __builtin__
12 import __builtin__
13 from code import CommandCompiler
13 from code import CommandCompiler
14 import os
14 import os
15 import sys
15 import sys
16 import time
16 import time
17 import traceback
17 import traceback
18 import logging
18 from datetime import datetime
19 from datetime import datetime
19 from signal import SIGTERM, SIGKILL
20 from signal import SIGTERM, SIGKILL
20 from pprint import pprint
21 from pprint import pprint
21
22
22 # System library imports.
23 # System library imports.
23 import zmq
24 import zmq
24 from zmq.eventloop import ioloop, zmqstream
25 from zmq.eventloop import ioloop, zmqstream
25
26
26 # Local imports.
27 # Local imports.
27 from IPython.core import ultratb
28 from IPython.core import ultratb
28 from IPython.utils.traitlets import HasTraits, Instance, List
29 from IPython.utils.traitlets import HasTraits, Instance, List, Int
29 from IPython.zmq.completer import KernelCompleter
30 from IPython.zmq.completer import KernelCompleter
30 from IPython.zmq.log import logger # a Logger object
31 from IPython.zmq.iostream import OutStream
31 from IPython.zmq.iostream import OutStream
32 from IPython.zmq.displayhook import DisplayHook
32 from IPython.zmq.displayhook import DisplayHook
33
33
34
34
35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
36 unpack_apply_message, ISO8601, wrap_exception
36 unpack_apply_message, ISO8601, wrap_exception
37 from dependency import UnmetDependency
37 from dependency import UnmetDependency
38 import heartmonitor
38 import heartmonitor
39 from client import Client
39 from client import Client
40
40
41 logger = logging.getLogger()
42
def printer(*args):
    """Debug helper: pretty-print the positional args to the *real* stdout
    (``sys.__stdout__``), bypassing any redirected ``sys.stdout``."""
    pprint(args, stream=sys.__stdout__)
43
45
44 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
45 # Main kernel class
47 # Main kernel class
46 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
47
49
48 class Kernel(HasTraits):
50 class Kernel(HasTraits):
49
51
50 #---------------------------------------------------------------------------
52 #---------------------------------------------------------------------------
51 # Kernel interface
53 # Kernel interface
52 #---------------------------------------------------------------------------
54 #---------------------------------------------------------------------------
53
55
56 id = Int(-1)
54 session = Instance(StreamSession)
57 session = Instance(StreamSession)
55 shell_streams = Instance(list)
58 shell_streams = List()
56 control_stream = Instance(zmqstream.ZMQStream)
59 control_stream = Instance(zmqstream.ZMQStream)
57 task_stream = Instance(zmqstream.ZMQStream)
60 task_stream = Instance(zmqstream.ZMQStream)
58 iopub_stream = Instance(zmqstream.ZMQStream)
61 iopub_stream = Instance(zmqstream.ZMQStream)
59 client = Instance(Client)
62 client = Instance(Client)
60 loop = Instance(ioloop.IOLoop)
63 loop = Instance(ioloop.IOLoop)
61
64
62 def __init__(self, **kwargs):
65 def __init__(self, **kwargs):
63 super(Kernel, self).__init__(**kwargs)
66 super(Kernel, self).__init__(**kwargs)
64 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
67 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
65 self.prefix = 'engine.%s'%self.identity
68 self.prefix = 'engine.%s'%self.id
69 logger.root_topic = self.prefix
66 self.user_ns = {}
70 self.user_ns = {}
67 self.history = []
71 self.history = []
68 self.compiler = CommandCompiler()
72 self.compiler = CommandCompiler()
69 self.completer = KernelCompleter(self.user_ns)
73 self.completer = KernelCompleter(self.user_ns)
70 self.aborted = set()
74 self.aborted = set()
71
75
72 # Build dict of handlers for message types
76 # Build dict of handlers for message types
73 self.shell_handlers = {}
77 self.shell_handlers = {}
74 self.control_handlers = {}
78 self.control_handlers = {}
75 for msg_type in ['execute_request', 'complete_request', 'apply_request',
79 for msg_type in ['execute_request', 'complete_request', 'apply_request',
76 'clear_request']:
80 'clear_request']:
77 self.shell_handlers[msg_type] = getattr(self, msg_type)
81 self.shell_handlers[msg_type] = getattr(self, msg_type)
78
82
79 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
83 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
80 self.control_handlers[msg_type] = getattr(self, msg_type)
84 self.control_handlers[msg_type] = getattr(self, msg_type)
81
85
82
86
83 def _wrap_exception(self, method=None):
87 def _wrap_exception(self, method=None):
84 e_info = dict(engineid=self.identity, method=method)
88 e_info = dict(engineid=self.identity, method=method)
85 content=wrap_exception(e_info)
89 content=wrap_exception(e_info)
86 return content
90 return content
87
91
88 #-------------------- control handlers -----------------------------
92 #-------------------- control handlers -----------------------------
89 def abort_queues(self):
93 def abort_queues(self):
90 for stream in self.shell_streams:
94 for stream in self.shell_streams:
91 if stream:
95 if stream:
92 self.abort_queue(stream)
96 self.abort_queue(stream)
93
97
94 def abort_queue(self, stream):
98 def abort_queue(self, stream):
95 while True:
99 while True:
96 try:
100 try:
97 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
101 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
98 except zmq.ZMQError as e:
102 except zmq.ZMQError as e:
99 if e.errno == zmq.EAGAIN:
103 if e.errno == zmq.EAGAIN:
100 break
104 break
101 else:
105 else:
102 return
106 return
103 else:
107 else:
104 if msg is None:
108 if msg is None:
105 return
109 return
106 else:
110 else:
107 idents,msg = msg
111 idents,msg = msg
108
112
109 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
113 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
110 # msg = self.reply_socket.recv_json()
114 # msg = self.reply_socket.recv_json()
111 print ("Aborting:", file=sys.__stdout__)
115 logger.info("Aborting:")
112 print (Message(msg), file=sys.__stdout__)
116 logger.info(str(msg))
113 msg_type = msg['msg_type']
117 msg_type = msg['msg_type']
114 reply_type = msg_type.split('_')[0] + '_reply'
118 reply_type = msg_type.split('_')[0] + '_reply'
115 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
119 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
116 # self.reply_socket.send(ident,zmq.SNDMORE)
120 # self.reply_socket.send(ident,zmq.SNDMORE)
117 # self.reply_socket.send_json(reply_msg)
121 # self.reply_socket.send_json(reply_msg)
118 reply_msg = self.session.send(stream, reply_type,
122 reply_msg = self.session.send(stream, reply_type,
119 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
123 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
120 print(Message(reply_msg), file=sys.__stdout__)
124 logger.debug(str(reply_msg))
121 # We need to wait a bit for requests to come in. This can probably
125 # We need to wait a bit for requests to come in. This can probably
122 # be set shorter for true asynchronous clients.
126 # be set shorter for true asynchronous clients.
123 time.sleep(0.05)
127 time.sleep(0.05)
124
128
125 def abort_request(self, stream, ident, parent):
129 def abort_request(self, stream, ident, parent):
126 """abort a specifig msg by id"""
130 """abort a specifig msg by id"""
127 msg_ids = parent['content'].get('msg_ids', None)
131 msg_ids = parent['content'].get('msg_ids', None)
128 if isinstance(msg_ids, basestring):
132 if isinstance(msg_ids, basestring):
129 msg_ids = [msg_ids]
133 msg_ids = [msg_ids]
130 if not msg_ids:
134 if not msg_ids:
131 self.abort_queues()
135 self.abort_queues()
132 for mid in msg_ids:
136 for mid in msg_ids:
133 self.aborted.add(str(mid))
137 self.aborted.add(str(mid))
134
138
135 content = dict(status='ok')
139 content = dict(status='ok')
136 reply_msg = self.session.send(stream, 'abort_reply', content=content,
140 reply_msg = self.session.send(stream, 'abort_reply', content=content,
137 parent=parent, ident=ident)[0]
141 parent=parent, ident=ident)[0]
138 print(Message(reply_msg), file=sys.__stdout__)
142 logger(Message(reply_msg), file=sys.__stdout__)
139
143
140 def shutdown_request(self, stream, ident, parent):
144 def shutdown_request(self, stream, ident, parent):
141 """kill ourself. This should really be handled in an external process"""
145 """kill ourself. This should really be handled in an external process"""
142 try:
146 try:
143 self.abort_queues()
147 self.abort_queues()
144 except:
148 except:
145 content = self._wrap_exception('shutdown')
149 content = self._wrap_exception('shutdown')
146 else:
150 else:
147 content = dict(parent['content'])
151 content = dict(parent['content'])
148 content['status'] = 'ok'
152 content['status'] = 'ok'
149 msg = self.session.send(stream, 'shutdown_reply',
153 msg = self.session.send(stream, 'shutdown_reply',
150 content=content, parent=parent, ident=ident)
154 content=content, parent=parent, ident=ident)
151 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
155 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
152 # content, parent, ident)
156 # content, parent, ident)
153 # print >> sys.__stdout__, msg
157 # print >> sys.__stdout__, msg
154 # time.sleep(0.2)
158 # time.sleep(0.2)
155 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
159 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
156 dc.start()
160 dc.start()
157
161
158 def dispatch_control(self, msg):
162 def dispatch_control(self, msg):
159 idents,msg = self.session.feed_identities(msg, copy=False)
163 idents,msg = self.session.feed_identities(msg, copy=False)
160 try:
164 try:
161 msg = self.session.unpack_message(msg, content=True, copy=False)
165 msg = self.session.unpack_message(msg, content=True, copy=False)
162 except:
166 except:
163 logger.error("Invalid Message", exc_info=True)
167 logger.error("Invalid Message", exc_info=True)
164 return
168 return
165
169
166 header = msg['header']
170 header = msg['header']
167 msg_id = header['msg_id']
171 msg_id = header['msg_id']
168
172
169 handler = self.control_handlers.get(msg['msg_type'], None)
173 handler = self.control_handlers.get(msg['msg_type'], None)
170 if handler is None:
174 if handler is None:
171 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
175 logger.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
172 else:
176 else:
173 handler(self.control_stream, idents, msg)
177 handler(self.control_stream, idents, msg)
174
178
175
179
176 #-------------------- queue helpers ------------------------------
180 #-------------------- queue helpers ------------------------------
177
181
178 def check_dependencies(self, dependencies):
182 def check_dependencies(self, dependencies):
179 if not dependencies:
183 if not dependencies:
180 return True
184 return True
181 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
185 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
182 anyorall = dependencies[0]
186 anyorall = dependencies[0]
183 dependencies = dependencies[1]
187 dependencies = dependencies[1]
184 else:
188 else:
185 anyorall = 'all'
189 anyorall = 'all'
186 results = self.client.get_results(dependencies,status_only=True)
190 results = self.client.get_results(dependencies,status_only=True)
187 if results['status'] != 'ok':
191 if results['status'] != 'ok':
188 return False
192 return False
189
193
190 if anyorall == 'any':
194 if anyorall == 'any':
191 if not results['completed']:
195 if not results['completed']:
192 return False
196 return False
193 else:
197 else:
194 if results['pending']:
198 if results['pending']:
195 return False
199 return False
196
200
197 return True
201 return True
198
202
199 def check_aborted(self, msg_id):
203 def check_aborted(self, msg_id):
200 return msg_id in self.aborted
204 return msg_id in self.aborted
201
205
202 #-------------------- queue handlers -----------------------------
206 #-------------------- queue handlers -----------------------------
203
207
204 def clear_request(self, stream, idents, parent):
208 def clear_request(self, stream, idents, parent):
205 """Clear our namespace."""
209 """Clear our namespace."""
206 self.user_ns = {}
210 self.user_ns = {}
207 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
211 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
208 content = dict(status='ok'))
212 content = dict(status='ok'))
209
213
210 def execute_request(self, stream, ident, parent):
214 def execute_request(self, stream, ident, parent):
211 try:
215 try:
212 code = parent[u'content'][u'code']
216 code = parent[u'content'][u'code']
213 except:
217 except:
214 print("Got bad msg: ", file=sys.__stderr__)
218 logger.error("Got bad msg: %s"%parent, exc_info=True)
215 print(Message(parent), file=sys.__stderr__)
216 return
219 return
217 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
220 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
218 # self.iopub_stream.send(pyin_msg)
221 # self.iopub_stream.send(pyin_msg)
219 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
222 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
220 ident=self.identity+'.pyin')
223 ident='%s.pyin'%self.prefix)
221 started = datetime.now().strftime(ISO8601)
224 started = datetime.now().strftime(ISO8601)
222 try:
225 try:
223 comp_code = self.compiler(code, '<zmq-kernel>')
226 comp_code = self.compiler(code, '<zmq-kernel>')
224 # allow for not overriding displayhook
227 # allow for not overriding displayhook
225 if hasattr(sys.displayhook, 'set_parent'):
228 if hasattr(sys.displayhook, 'set_parent'):
226 sys.displayhook.set_parent(parent)
229 sys.displayhook.set_parent(parent)
227 sys.stdout.set_parent(parent)
230 sys.stdout.set_parent(parent)
228 sys.stderr.set_parent(parent)
231 sys.stderr.set_parent(parent)
229 exec comp_code in self.user_ns, self.user_ns
232 exec comp_code in self.user_ns, self.user_ns
230 except:
233 except:
231 exc_content = self._wrap_exception('execute')
234 exc_content = self._wrap_exception('execute')
232 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
235 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
233 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
236 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
234 ident=self.identity+'.pyerr')
237 ident='%s.pyerr'%self.prefix)
235 reply_content = exc_content
238 reply_content = exc_content
236 else:
239 else:
237 reply_content = {'status' : 'ok'}
240 reply_content = {'status' : 'ok'}
238 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
241 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
239 # self.reply_socket.send(ident, zmq.SNDMORE)
242 # self.reply_socket.send(ident, zmq.SNDMORE)
240 # self.reply_socket.send_json(reply_msg)
243 # self.reply_socket.send_json(reply_msg)
241 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
244 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
242 ident=ident, subheader = dict(started=started))
245 ident=ident, subheader = dict(started=started))
243 print(Message(reply_msg), file=sys.__stdout__)
246 logger.debug(str(reply_msg))
244 if reply_msg['content']['status'] == u'error':
247 if reply_msg['content']['status'] == u'error':
245 self.abort_queues()
248 self.abort_queues()
246
249
247 def complete_request(self, stream, ident, parent):
250 def complete_request(self, stream, ident, parent):
248 matches = {'matches' : self.complete(parent),
251 matches = {'matches' : self.complete(parent),
249 'status' : 'ok'}
252 'status' : 'ok'}
250 completion_msg = self.session.send(stream, 'complete_reply',
253 completion_msg = self.session.send(stream, 'complete_reply',
251 matches, parent, ident)
254 matches, parent, ident)
252 # print >> sys.__stdout__, completion_msg
255 # print >> sys.__stdout__, completion_msg
253
256
254 def complete(self, msg):
257 def complete(self, msg):
255 return self.completer.complete(msg.content.line, msg.content.text)
258 return self.completer.complete(msg.content.line, msg.content.text)
256
259
257 def apply_request(self, stream, ident, parent):
260 def apply_request(self, stream, ident, parent):
258 # print (parent)
261 # print (parent)
259 try:
262 try:
260 content = parent[u'content']
263 content = parent[u'content']
261 bufs = parent[u'buffers']
264 bufs = parent[u'buffers']
262 msg_id = parent['header']['msg_id']
265 msg_id = parent['header']['msg_id']
263 bound = content.get('bound', False)
266 bound = content.get('bound', False)
264 except:
267 except:
265 print("Got bad msg: ", file=sys.__stderr__)
268 logger.error("Got bad msg: %s"%parent, exc_info=True)
266 print(Message(parent), file=sys.__stderr__)
267 return
269 return
268 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
270 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
269 # self.iopub_stream.send(pyin_msg)
271 # self.iopub_stream.send(pyin_msg)
270 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
272 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
271 sub = {'dependencies_met' : True, 'engine' : self.identity,
273 sub = {'dependencies_met' : True, 'engine' : self.identity,
272 'started': datetime.now().strftime(ISO8601)}
274 'started': datetime.now().strftime(ISO8601)}
273 try:
275 try:
274 # allow for not overriding displayhook
276 # allow for not overriding displayhook
275 if hasattr(sys.displayhook, 'set_parent'):
277 if hasattr(sys.displayhook, 'set_parent'):
276 sys.displayhook.set_parent(parent)
278 sys.displayhook.set_parent(parent)
277 sys.stdout.set_parent(parent)
279 sys.stdout.set_parent(parent)
278 sys.stderr.set_parent(parent)
280 sys.stderr.set_parent(parent)
279 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
281 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
280 if bound:
282 if bound:
281 working = self.user_ns
283 working = self.user_ns
282 suffix = str(msg_id).replace("-","")
284 suffix = str(msg_id).replace("-","")
283 prefix = "_"
285 prefix = "_"
284
286
285 else:
287 else:
286 working = dict()
288 working = dict()
287 suffix = prefix = "_" # prevent keyword collisions with lambda
289 suffix = prefix = "_" # prevent keyword collisions with lambda
288 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
290 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
289 # if f.fun
291 # if f.fun
290 if hasattr(f, 'func_name'):
292 if hasattr(f, 'func_name'):
291 fname = f.func_name
293 fname = f.func_name
292 else:
294 else:
293 fname = f.__name__
295 fname = f.__name__
294
296
295 fname = prefix+fname.strip('<>')+suffix
297 fname = prefix+fname.strip('<>')+suffix
296 argname = prefix+"args"+suffix
298 argname = prefix+"args"+suffix
297 kwargname = prefix+"kwargs"+suffix
299 kwargname = prefix+"kwargs"+suffix
298 resultname = prefix+"result"+suffix
300 resultname = prefix+"result"+suffix
299
301
300 ns = { fname : f, argname : args, kwargname : kwargs }
302 ns = { fname : f, argname : args, kwargname : kwargs }
301 # print ns
303 # print ns
302 working.update(ns)
304 working.update(ns)
303 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
305 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
304 exec code in working, working
306 exec code in working, working
305 result = working.get(resultname)
307 result = working.get(resultname)
306 # clear the namespace
308 # clear the namespace
307 if bound:
309 if bound:
308 for key in ns.iterkeys():
310 for key in ns.iterkeys():
309 self.user_ns.pop(key)
311 self.user_ns.pop(key)
310 else:
312 else:
311 del working
313 del working
312
314
313 packed_result,buf = serialize_object(result)
315 packed_result,buf = serialize_object(result)
314 result_buf = [packed_result]+buf
316 result_buf = [packed_result]+buf
315 except:
317 except:
316 exc_content = self._wrap_exception('apply')
318 exc_content = self._wrap_exception('apply')
317 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
319 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
318 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
320 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
319 ident=self.identity+'.pyerr')
321 ident='%s.pyerr'%self.prefix)
320 reply_content = exc_content
322 reply_content = exc_content
321 result_buf = []
323 result_buf = []
322
324
323 if exc_content['ename'] == UnmetDependency.__name__:
325 if exc_content['ename'] == UnmetDependency.__name__:
324 sub['dependencies_met'] = False
326 sub['dependencies_met'] = False
325 else:
327 else:
326 reply_content = {'status' : 'ok'}
328 reply_content = {'status' : 'ok'}
327 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
329 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
328 # self.reply_socket.send(ident, zmq.SNDMORE)
330 # self.reply_socket.send(ident, zmq.SNDMORE)
329 # self.reply_socket.send_json(reply_msg)
331 # self.reply_socket.send_json(reply_msg)
330 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
332 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
331 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
333 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
332 # print(Message(reply_msg), file=sys.__stdout__)
334 # print(Message(reply_msg), file=sys.__stdout__)
333 # if reply_msg['content']['status'] == u'error':
335 # if reply_msg['content']['status'] == u'error':
334 # self.abort_queues()
336 # self.abort_queues()
335
337
336 def dispatch_queue(self, stream, msg):
338 def dispatch_queue(self, stream, msg):
337 self.control_stream.flush()
339 self.control_stream.flush()
338 idents,msg = self.session.feed_identities(msg, copy=False)
340 idents,msg = self.session.feed_identities(msg, copy=False)
339 try:
341 try:
340 msg = self.session.unpack_message(msg, content=True, copy=False)
342 msg = self.session.unpack_message(msg, content=True, copy=False)
341 except:
343 except:
342 logger.error("Invalid Message", exc_info=True)
344 logger.error("Invalid Message", exc_info=True)
343 return
345 return
344
346
345
347
346 header = msg['header']
348 header = msg['header']
347 msg_id = header['msg_id']
349 msg_id = header['msg_id']
348 if self.check_aborted(msg_id):
350 if self.check_aborted(msg_id):
349 self.aborted.remove(msg_id)
351 self.aborted.remove(msg_id)
350 # is it safe to assume a msg_id will not be resubmitted?
352 # is it safe to assume a msg_id will not be resubmitted?
351 reply_type = msg['msg_type'].split('_')[0] + '_reply'
353 reply_type = msg['msg_type'].split('_')[0] + '_reply'
352 reply_msg = self.session.send(stream, reply_type,
354 reply_msg = self.session.send(stream, reply_type,
353 content={'status' : 'aborted'}, parent=msg, ident=idents)
355 content={'status' : 'aborted'}, parent=msg, ident=idents)
354 return
356 return
355 handler = self.shell_handlers.get(msg['msg_type'], None)
357 handler = self.shell_handlers.get(msg['msg_type'], None)
356 if handler is None:
358 if handler is None:
357 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
359 logger.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
358 else:
360 else:
359 handler(stream, idents, msg)
361 handler(stream, idents, msg)
360
362
361 def start(self):
363 def start(self):
362 #### stream mode:
364 #### stream mode:
363 if self.control_stream:
365 if self.control_stream:
364 self.control_stream.on_recv(self.dispatch_control, copy=False)
366 self.control_stream.on_recv(self.dispatch_control, copy=False)
365 self.control_stream.on_err(printer)
367 self.control_stream.on_err(printer)
366
368
367 def make_dispatcher(stream):
369 def make_dispatcher(stream):
368 def dispatcher(msg):
370 def dispatcher(msg):
369 return self.dispatch_queue(stream, msg)
371 return self.dispatch_queue(stream, msg)
370 return dispatcher
372 return dispatcher
371
373
372 for s in self.shell_streams:
374 for s in self.shell_streams:
373 s.on_recv(make_dispatcher(s), copy=False)
375 s.on_recv(make_dispatcher(s), copy=False)
374 s.on_err(printer)
376 s.on_err(printer)
375
377
376 if self.iopub_stream:
378 if self.iopub_stream:
377 self.iopub_stream.on_err(printer)
379 self.iopub_stream.on_err(printer)
378 # self.iopub_stream.on_send(printer)
380 # self.iopub_stream.on_send(printer)
379
381
380 #### while True mode:
382 #### while True mode:
381 # while True:
383 # while True:
382 # idle = True
384 # idle = True
383 # try:
385 # try:
384 # msg = self.shell_stream.socket.recv_multipart(
386 # msg = self.shell_stream.socket.recv_multipart(
385 # zmq.NOBLOCK, copy=False)
387 # zmq.NOBLOCK, copy=False)
386 # except zmq.ZMQError, e:
388 # except zmq.ZMQError, e:
387 # if e.errno != zmq.EAGAIN:
389 # if e.errno != zmq.EAGAIN:
388 # raise e
390 # raise e
389 # else:
391 # else:
390 # idle=False
392 # idle=False
391 # self.dispatch_queue(self.shell_stream, msg)
393 # self.dispatch_queue(self.shell_stream, msg)
392 #
394 #
393 # if not self.task_stream.empty():
395 # if not self.task_stream.empty():
394 # idle=False
396 # idle=False
395 # msg = self.task_stream.recv_multipart()
397 # msg = self.task_stream.recv_multipart()
396 # self.dispatch_queue(self.task_stream, msg)
398 # self.dispatch_queue(self.task_stream, msg)
397 # if idle:
399 # if idle:
398 # # don't busywait
400 # # don't busywait
399 # time.sleep(1e-3)
401 # time.sleep(1e-3)
400
402
def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
        client_addr=None, loop=None, context=None, key=None,
        out_stream_factory=OutStream, display_hook_factory=DisplayHook):
    """Assemble and start a Kernel: streams, io redirection, heartbeat,
    and an optional Client connection.

    Returns (loop, context, kernel); the caller is responsible for
    running the IOLoop.
    """
    # default loop/context if the caller did not supply them
    if loop is None:
        loop = ioloop.IOLoop.instance()
    if context is None:
        context = zmq.Context()
    c = context
    session = StreamSession(key=key)

    # control channel (PAIR, identified by our engine identity)
    control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
    control_stream.setsockopt(zmq.IDENTITY, identity)
    control_stream.connect(control_addr)

    # shell channels (MUX, Task, etc.) -- one PAIR stream per address
    shell_streams = []
    for addr in shell_addrs:
        stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
        stream.setsockopt(zmq.IDENTITY, identity)
        stream.connect(addr)
        shell_streams.append(stream)

    # iopub channel (PUB)
    iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
    iopub_stream.setsockopt(zmq.IDENTITY, identity)
    iopub_stream.connect(iopub_addr)

    # Redirect stdout/stderr over iopub and install the display hook,
    # publishing under this engine's integer-id topics.
    if out_stream_factory:
        sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
        sys.stdout.topic = 'engine.%i.stdout'%int_id
        sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
        sys.stderr.topic = 'engine.%i.stderr'%int_id
    if display_hook_factory:
        sys.displayhook = display_hook_factory(session, iopub_stream)
        sys.displayhook.topic = 'engine.%i.pyout'%int_id

    # launch heartbeat
    heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
    heart.start()

    # create (optional) Client
    if client_addr:
        client = Client(client_addr, username=identity)
    else:
        client = None

    kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
            shell_streams=shell_streams, iopub_stream=iopub_stream,
            client=client, loop=loop)
    kernel.start()
    return loop, c, kernel
461
@@ -1,35 +1,79 b''
1 """some generic utilities"""
1 """some generic utilities"""
2 import re
2
3
3 class ReverseDict(dict):
4 class ReverseDict(dict):
4 """simple double-keyed subset of dict methods."""
5 """simple double-keyed subset of dict methods."""
5
6
6 def __init__(self, *args, **kwargs):
7 def __init__(self, *args, **kwargs):
7 dict.__init__(self, *args, **kwargs)
8 dict.__init__(self, *args, **kwargs)
8 self._reverse = dict()
9 self._reverse = dict()
9 for key, value in self.iteritems():
10 for key, value in self.iteritems():
10 self._reverse[value] = key
11 self._reverse[value] = key
11
12
12 def __getitem__(self, key):
13 def __getitem__(self, key):
13 try:
14 try:
14 return dict.__getitem__(self, key)
15 return dict.__getitem__(self, key)
15 except KeyError:
16 except KeyError:
16 return self._reverse[key]
17 return self._reverse[key]
17
18
18 def __setitem__(self, key, value):
19 def __setitem__(self, key, value):
19 if key in self._reverse:
20 if key in self._reverse:
20 raise KeyError("Can't have key %r on both sides!"%key)
21 raise KeyError("Can't have key %r on both sides!"%key)
21 dict.__setitem__(self, key, value)
22 dict.__setitem__(self, key, value)
22 self._reverse[value] = key
23 self._reverse[value] = key
23
24
24 def pop(self, key):
25 def pop(self, key):
25 value = dict.pop(self, key)
26 value = dict.pop(self, key)
26 self.d1.pop(value)
27 self.d1.pop(value)
27 return value
28 return value
28
29
29 def get(self, key, default=None):
30 def get(self, key, default=None):
30 try:
31 try:
31 return self[key]
32 return self[key]
32 except KeyError:
33 except KeyError:
33 return default
34 return default
34
35
35
36
37 def validate_url(url):
38 """validate a url for zeromq"""
39 if not isinstance(url, basestring):
40 raise TypeError("url must be a string, not %r"%type(url))
41 url = url.lower()
42
43 proto_addr = url.split('://')
44 assert len(proto_addr) == 2, 'Invalid url: %r'%url
45 proto, addr = proto_addr
46 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
47
48 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
49 # author: Remi Sabourin
50 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
51
52 if proto == 'tcp':
53 lis = addr.split(':')
54 assert len(lis) == 2, 'Invalid url: %r'%url
55 addr,s_port = lis
56 try:
57 port = int(s_port)
58 except ValueError:
59 raise AssertionError("Invalid port %r in url: %r"%(port, url))
60
61 assert pat.match(addr) is not None, 'Invalid url: %r'%url
62
63 else:
64 # only validate tcp urls currently
65 pass
66
67 return True
68
69
70 def validate_url_container(container):
71 """validate a potentially nested collection of urls."""
72 if isinstance(container, basestring):
73 url = container
74 return validate_url(url)
75 elif isinstance(container, dict):
76 container = container.itervalues()
77
78 for element in container:
79 validate_url_container(element) No newline at end of file
@@ -1,62 +1,70 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple log process that prints messages incoming from"""
2 """A simple log process that prints messages incoming from"""
3
3
4 #
4 #
5 # Copyright (c) 2010 Min Ragan-Kelley
5 # Copyright (c) 2010 Min Ragan-Kelley
6 #
6 #
7 # This file is part of pyzmq.
7 # This file is part of pyzmq.
8 #
8 #
9 # pyzmq is free software; you can redistribute it and/or modify it under
9 # pyzmq is free software; you can redistribute it and/or modify it under
10 # the terms of the Lesser GNU General Public License as published by
10 # the terms of the Lesser GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
12 # (at your option) any later version.
13 #
13 #
14 # pyzmq is distributed in the hope that it will be useful,
14 # pyzmq is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # Lesser GNU General Public License for more details.
17 # Lesser GNU General Public License for more details.
18 #
18 #
19 # You should have received a copy of the Lesser GNU General Public License
19 # You should have received a copy of the Lesser GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21
21
22 import sys
22 import zmq
23 import zmq
23 logport = 20202
24 logport = 20202
24 def main(topics, addrs):
25 def main(topics, addrs):
25
26
26 context = zmq.Context()
27 context = zmq.Context()
27 socket = context.socket(zmq.SUB)
28 socket = context.socket(zmq.SUB)
28 for topic in topics:
29 for topic in topics:
30 print "Subscribing to: %r"%topic
29 socket.setsockopt(zmq.SUBSCRIBE, topic)
31 socket.setsockopt(zmq.SUBSCRIBE, topic)
30 if addrs:
32 if addrs:
31 for addr in addrs:
33 for addr in addrs:
32 print "Connecting to: ", addr
34 print "Connecting to: ", addr
33 socket.connect(addr)
35 socket.connect(addr)
34 else:
36 else:
35 socket.bind('tcp://127.0.0.1:%i'%logport)
37 socket.bind('tcp://*:%i'%logport)
36
38
37 while True:
39 while True:
38 # topic = socket.recv()
40 # topic = socket.recv()
39 # print topic
41 # print topic
40 topic, msg = socket.recv_multipart()
42 # print 'tic'
41 # msg = socket.recv_pyobj()
43 raw = socket.recv_multipart()
42 print "%s | %s " % (topic, msg),
44 if len(raw) != 2:
45 print "!!! invalid log message: %s"%raw
46 else:
47 topic, msg = raw
48 # don't newline, since log messages always newline:
49 print "%s | %s" % (topic, msg),
50 sys.stdout.flush()
43
51
44 if __name__ == '__main__':
52 if __name__ == '__main__':
45 import sys
53 import sys
46 topics = []
54 topics = []
47 addrs = []
55 addrs = []
48 for arg in sys.argv[1:]:
56 for arg in sys.argv[1:]:
49 if '://' in arg:
57 if '://' in arg:
50 addrs.append(arg)
58 addrs.append(arg)
51 else:
59 else:
52 topics.append(arg)
60 topics.append(arg)
53 if not topics:
61 if not topics:
54 # default to everything
62 # default to everything
55 topics = ['']
63 topics = ['']
56 if len(addrs) < 1:
64 if len(addrs) < 1:
57 print "binding instead of connecting"
65 print "binding instead of connecting"
58 # addrs = ['tcp://127.0.0.1:%i'%p for p in range(logport,logport+10)]
66 # addrs = ['tcp://127.0.0.1:%i'%p for p in range(logport,logport+10)]
59 # print "usage: display.py <address> [ <topic> <address>...]"
67 # print "usage: display.py <address> [ <topic> <address>...]"
60 # raise SystemExit
68 # raise SystemExit
61
69
62 main(topics, addrs)
70 main(topics, addrs)
General Comments 0
You need to be logged in to leave comments. Login now