##// END OF EJS Templates
propagate iopub to clients
MinRK -
Show More
@@ -1,21 +1,23 b''
1 import __builtin__
1 import __builtin__
2
2
3 from session import extract_header
3 from session import extract_header
4
4
5 class DisplayHook(object):
5 class DisplayHook(object):
6
6
7 topic=None
8
7 def __init__(self, session, pub_socket):
9 def __init__(self, session, pub_socket):
8 self.session = session
10 self.session = session
9 self.pub_socket = pub_socket
11 self.pub_socket = pub_socket
10 self.parent_header = {}
12 self.parent_header = {}
11
13
12 def __call__(self, obj):
14 def __call__(self, obj):
13 if obj is None:
15 if obj is None:
14 return
16 return
15
17
16 __builtin__._ = obj
18 __builtin__._ = obj
17 msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)},
19 msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)},
18 parent=self.parent_header)
20 parent=self.parent_header, ident=self.topic)
19
21
20 def set_parent(self, parent):
22 def set_parent(self, parent):
21 self.parent_header = extract_header(parent) No newline at end of file
23 self.parent_header = extract_header(parent)
@@ -1,96 +1,97 b''
1 import logging
1 import logging
2 import sys
2 import sys
3 import time
3 import time
4 from cStringIO import StringIO
4 from cStringIO import StringIO
5
5
6 from session import extract_header, Message
6 from session import extract_header, Message
7
7
8 from IPython.utils import io
8 from IPython.utils import io
9
9
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Globals
11 # Globals
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 # Module-level logger
14 # Module-level logger
15 logger = logging.getLogger(__name__)
15 logger = logging.getLogger(__name__)
16
16
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 # Stream classes
18 # Stream classes
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20
20
21 class OutStream(object):
21 class OutStream(object):
22 """A file like object that publishes the stream to a 0MQ PUB socket."""
22 """A file like object that publishes the stream to a 0MQ PUB socket."""
23
23
24 # The time interval between automatic flushes, in seconds.
24 # The time interval between automatic flushes, in seconds.
25 flush_interval = 0.05
25 flush_interval = 0.05
26
26 topic=None
27
27 def __init__(self, session, pub_socket, name):
28 def __init__(self, session, pub_socket, name):
28 self.session = session
29 self.session = session
29 self.pub_socket = pub_socket
30 self.pub_socket = pub_socket
30 self.name = name
31 self.name = name
31 self.parent_header = {}
32 self.parent_header = {}
32 self._new_buffer()
33 self._new_buffer()
33
34
34 def set_parent(self, parent):
35 def set_parent(self, parent):
35 self.parent_header = extract_header(parent)
36 self.parent_header = extract_header(parent)
36
37
37 def close(self):
38 def close(self):
38 self.pub_socket = None
39 self.pub_socket = None
39
40
40 def flush(self):
41 def flush(self):
41 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
42 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
42 if self.pub_socket is None:
43 if self.pub_socket is None:
43 raise ValueError(u'I/O operation on closed file')
44 raise ValueError(u'I/O operation on closed file')
44 else:
45 else:
45 data = self._buffer.getvalue()
46 data = self._buffer.getvalue()
46 if data:
47 if data:
47 # Make sure that we're handling unicode
48 # Make sure that we're handling unicode
48 if not isinstance(data, unicode):
49 if not isinstance(data, unicode):
49 enc = sys.stdin.encoding or sys.getdefaultencoding()
50 enc = sys.stdin.encoding or sys.getdefaultencoding()
50 data = data.decode(enc, 'replace')
51 data = data.decode(enc, 'replace')
51 content = {u'name':self.name, u'data':data}
52 content = {u'name':self.name, u'data':data}
52 msg = self.session.send(self.pub_socket, u'stream',
53 msg = self.session.send(self.pub_socket, u'stream', content=content,
53 content=content,
54 parent=self.parent_header, ident=self.topic)
54 parent=self.parent_header)
55 logger.debug(msg)
55 logger.debug(msg)
56
56 self._buffer.close()
57 self._buffer.close()
57 self._new_buffer()
58 self._new_buffer()
58
59
59 def isatty(self):
60 def isatty(self):
60 return False
61 return False
61
62
62 def next(self):
63 def next(self):
63 raise IOError('Read not supported on a write only stream.')
64 raise IOError('Read not supported on a write only stream.')
64
65
65 def read(self, size=-1):
66 def read(self, size=-1):
66 raise IOError('Read not supported on a write only stream.')
67 raise IOError('Read not supported on a write only stream.')
67
68
68 def readline(self, size=-1):
69 def readline(self, size=-1):
69 raise IOError('Read not supported on a write only stream.')
70 raise IOError('Read not supported on a write only stream.')
70
71
71 def write(self, string):
72 def write(self, string):
72 if self.pub_socket is None:
73 if self.pub_socket is None:
73 raise ValueError('I/O operation on closed file')
74 raise ValueError('I/O operation on closed file')
74 else:
75 else:
75 # We can only send raw bytes, not unicode objects, so we encode
76 # We can only send raw bytes, not unicode objects, so we encode
76 # into utf-8 for all frontends if we get unicode inputs.
77 # into utf-8 for all frontends if we get unicode inputs.
77 if type(string) == unicode:
78 if type(string) == unicode:
78 string = string.encode('utf-8')
79 string = string.encode('utf-8')
79
80
80 self._buffer.write(string)
81 self._buffer.write(string)
81 current_time = time.time()
82 current_time = time.time()
82 if self._start <= 0:
83 if self._start <= 0:
83 self._start = current_time
84 self._start = current_time
84 elif current_time - self._start > self.flush_interval:
85 elif current_time - self._start > self.flush_interval:
85 self.flush()
86 self.flush()
86
87
87 def writelines(self, sequence):
88 def writelines(self, sequence):
88 if self.pub_socket is None:
89 if self.pub_socket is None:
89 raise ValueError('I/O operation on closed file')
90 raise ValueError('I/O operation on closed file')
90 else:
91 else:
91 for string in sequence:
92 for string in sequence:
92 self.write(string)
93 self.write(string)
93
94
94 def _new_buffer(self):
95 def _new_buffer(self):
95 self._buffer = StringIO()
96 self._buffer = StringIO()
96 self._start = -1
97 self._start = -1
@@ -1,186 +1,188 b''
1 """AsyncResult objects for the client"""
1 """AsyncResult objects for the client"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.external.decorator import decorator
13 from IPython.external.decorator import decorator
14 import error
14 import error
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Classes
17 # Classes
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 @decorator
20 @decorator
21 def check_ready(f, self, *args, **kwargs):
21 def check_ready(f, self, *args, **kwargs):
22 """Call spin() to sync state prior to calling the method."""
22 """Call spin() to sync state prior to calling the method."""
23 self.wait(0)
23 self.wait(0)
24 if not self._ready:
24 if not self._ready:
25 raise error.TimeoutError("result not ready")
25 raise error.TimeoutError("result not ready")
26 return f(self, *args, **kwargs)
26 return f(self, *args, **kwargs)
27
27
28 class AsyncResult(object):
28 class AsyncResult(object):
29 """Class for representing results of non-blocking calls.
29 """Class for representing results of non-blocking calls.
30
30
31 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
31 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
32 """
32 """
33 def __init__(self, client, msg_ids, fname=''):
33 def __init__(self, client, msg_ids, fname=''):
34 self._client = client
34 self._client = client
35 self.msg_ids = msg_ids
35 self.msg_ids = msg_ids
36 self._fname=fname
36 self._fname=fname
37 self._ready = False
37 self._ready = False
38 self._success = None
38 self._success = None
39
39
40 def __repr__(self):
40 def __repr__(self):
41 if self._ready:
41 if self._ready:
42 return "<%s: finished>"%(self.__class__.__name__)
42 return "<%s: finished>"%(self.__class__.__name__)
43 else:
43 else:
44 return "<%s: %s>"%(self.__class__.__name__,self._fname)
44 return "<%s: %s>"%(self.__class__.__name__,self._fname)
45
45
46
46
47 def _reconstruct_result(self, res):
47 def _reconstruct_result(self, res):
48 """
48 """
49 Override me in subclasses for turning a list of results
49 Override me in subclasses for turning a list of results
50 into the expected form.
50 into the expected form.
51 """
51 """
52 if len(self.msg_ids) == 1:
52 if len(self.msg_ids) == 1:
53 return res[0]
53 return res[0]
54 else:
54 else:
55 return res
55 return res
56
56
57 def get(self, timeout=-1):
57 def get(self, timeout=-1):
58 """Return the result when it arrives.
58 """Return the result when it arrives.
59
59
60 If `timeout` is not ``None`` and the result does not arrive within
60 If `timeout` is not ``None`` and the result does not arrive within
61 `timeout` seconds then ``TimeoutError`` is raised. If the
61 `timeout` seconds then ``TimeoutError`` is raised. If the
62 remote call raised an exception then that exception will be reraised
62 remote call raised an exception then that exception will be reraised
63 by get().
63 by get().
64 """
64 """
65 if not self.ready():
65 if not self.ready():
66 self.wait(timeout)
66 self.wait(timeout)
67
67
68 if self._ready:
68 if self._ready:
69 if self._success:
69 if self._success:
70 return self._result
70 return self._result
71 else:
71 else:
72 raise self._exception
72 raise self._exception
73 else:
73 else:
74 raise error.TimeoutError("Result not ready.")
74 raise error.TimeoutError("Result not ready.")
75
75
76 def ready(self):
76 def ready(self):
77 """Return whether the call has completed."""
77 """Return whether the call has completed."""
78 if not self._ready:
78 if not self._ready:
79 self.wait(0)
79 self.wait(0)
80 return self._ready
80 return self._ready
81
81
82 def wait(self, timeout=-1):
82 def wait(self, timeout=-1):
83 """Wait until the result is available or until `timeout` seconds pass.
83 """Wait until the result is available or until `timeout` seconds pass.
84 """
84 """
85 if self._ready:
85 if self._ready:
86 return
86 return
87 self._ready = self._client.barrier(self.msg_ids, timeout)
87 self._ready = self._client.barrier(self.msg_ids, timeout)
88 if self._ready:
88 if self._ready:
89 try:
89 try:
90 results = map(self._client.results.get, self.msg_ids)
90 results = map(self._client.results.get, self.msg_ids)
91 self._result = results
91 self._result = results
92 results = error.collect_exceptions(results, self._fname)
92 results = error.collect_exceptions(results, self._fname)
93 self._result = self._reconstruct_result(results)
93 self._result = self._reconstruct_result(results)
94 except Exception, e:
94 except Exception, e:
95 self._exception = e
95 self._exception = e
96 self._success = False
96 self._success = False
97 else:
97 else:
98 self._success = True
98 self._success = True
99 finally:
99 finally:
100 self._metadata = map(self._client.metadata.get, self.msg_ids)
100 self._metadata = map(self._client.metadata.get, self.msg_ids)
101
101
102
102
103 def successful(self):
103 def successful(self):
104 """Return whether the call completed without raising an exception.
104 """Return whether the call completed without raising an exception.
105
105
106 Will raise ``AssertionError`` if the result is not ready.
106 Will raise ``AssertionError`` if the result is not ready.
107 """
107 """
108 assert self._ready
108 assert self._ready
109 return self._success
109 return self._success
110
110
111 #----------------------------------------------------------------
111 #----------------------------------------------------------------
112 # Extra methods not in mp.pool.AsyncResult
112 # Extra methods not in mp.pool.AsyncResult
113 #----------------------------------------------------------------
113 #----------------------------------------------------------------
114
114
115 def get_dict(self, timeout=-1):
115 def get_dict(self, timeout=-1):
116 """Get the results as a dict, keyed by engine_id."""
116 """Get the results as a dict, keyed by engine_id."""
117 results = self.get(timeout)
117 results = self.get(timeout)
118 engine_ids = [md['engine_id'] for md in self._metadata ]
118 engine_ids = [md['engine_id'] for md in self._metadata ]
119 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
119 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
120 maxcount = bycount.count(bycount[-1])
120 maxcount = bycount.count(bycount[-1])
121 if maxcount > 1:
121 if maxcount > 1:
122 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
122 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
123 maxcount, bycount[-1]))
123 maxcount, bycount[-1]))
124
124
125 return dict(zip(engine_ids,results))
125 return dict(zip(engine_ids,results))
126
126
127 @property
127 @property
128 @check_ready
128 @check_ready
129 def result(self):
129 def result(self):
130 """result property."""
130 """result property."""
131 return self._result
131 return self._result
132
132
133 @property
133 @property
134 @check_ready
134 @check_ready
135 def metadata(self):
135 def metadata(self):
136 """metadata property."""
136 """metadata property."""
137 return self._metadata
137 return self._metadata
138
138
139 @property
139 @property
140 @check_ready
141 def result_dict(self):
140 def result_dict(self):
142 """result property as a dict."""
141 """result property as a dict."""
143 return self.get_dict(0)
142 return self.get_dict(0)
144
143
144 def __dict__(self):
145 return self.get_dict(0)
146
145 #-------------------------------------
147 #-------------------------------------
146 # dict-access
148 # dict-access
147 #-------------------------------------
149 #-------------------------------------
148
150
149 @check_ready
151 @check_ready
150 def __getitem__(self, key):
152 def __getitem__(self, key):
151 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
153 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
152 """
154 """
153 if isinstance(key, int):
155 if isinstance(key, int):
154 return error.collect_exceptions([self._result[key]], self._fname)[0]
156 return error.collect_exceptions([self._result[key]], self._fname)[0]
155 elif isinstance(key, slice):
157 elif isinstance(key, slice):
156 return error.collect_exceptions(self._result[key], self._fname)
158 return error.collect_exceptions(self._result[key], self._fname)
157 elif isinstance(key, basestring):
159 elif isinstance(key, basestring):
158 return [ md[key] for md in self._metadata ]
160 return [ md[key] for md in self._metadata ]
159 else:
161 else:
160 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
162 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
161
163
162 @check_ready
164 @check_ready
163 def __getattr__(self, key):
165 def __getattr__(self, key):
164 """getattr maps to getitem for convenient access to metadata."""
166 """getattr maps to getitem for convenient access to metadata."""
165 if key not in self._metadata[0].keys():
167 if key not in self._metadata[0].keys():
166 raise AttributeError("%r object has no attribute %r"%(
168 raise AttributeError("%r object has no attribute %r"%(
167 self.__class__.__name__, key))
169 self.__class__.__name__, key))
168 return self.__getitem__(key)
170 return self.__getitem__(key)
169
171
170
172
171 class AsyncMapResult(AsyncResult):
173 class AsyncMapResult(AsyncResult):
172 """Class for representing results of non-blocking gathers.
174 """Class for representing results of non-blocking gathers.
173
175
174 This will properly reconstruct the gather.
176 This will properly reconstruct the gather.
175 """
177 """
176
178
177 def __init__(self, client, msg_ids, mapObject, fname=''):
179 def __init__(self, client, msg_ids, mapObject, fname=''):
178 self._mapObject = mapObject
180 self._mapObject = mapObject
179 AsyncResult.__init__(self, client, msg_ids, fname=fname)
181 AsyncResult.__init__(self, client, msg_ids, fname=fname)
180
182
181 def _reconstruct_result(self, res):
183 def _reconstruct_result(self, res):
182 """Perform the gather on the actual results."""
184 """Perform the gather on the actual results."""
183 return self._mapObject.joinPartitions(res)
185 return self._mapObject.joinPartitions(res)
184
186
185
187
186 __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file
188 __all__ = ['AsyncResult', 'AsyncMapResult']
@@ -1,1105 +1,1177 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import time
14 import time
15 from getpass import getpass
15 from getpass import getpass
16 from pprint import pprint
16 from pprint import pprint
17 from datetime import datetime
17 from datetime import datetime
18
18
19 import zmq
19 import zmq
20 from zmq.eventloop import ioloop, zmqstream
20 from zmq.eventloop import ioloop, zmqstream
21
21
22 from IPython.external.decorator import decorator
22 from IPython.external.decorator import decorator
23 from IPython.zmq import tunnel
23 from IPython.zmq import tunnel
24
24
25 import streamsession as ss
25 import streamsession as ss
26 # from remotenamespace import RemoteNamespace
26 # from remotenamespace import RemoteNamespace
27 from view import DirectView, LoadBalancedView
27 from view import DirectView, LoadBalancedView
28 from dependency import Dependency, depend, require
28 from dependency import Dependency, depend, require
29 import error
29 import error
30 import map as Map
30 import map as Map
31 from asyncresult import AsyncResult, AsyncMapResult
31 from asyncresult import AsyncResult, AsyncMapResult
32 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
32 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
33 from util import ReverseDict
33 from util import ReverseDict
34
34
35 #--------------------------------------------------------------------------
35 #--------------------------------------------------------------------------
36 # helpers for implementing old MEC API via client.apply
36 # helpers for implementing old MEC API via client.apply
37 #--------------------------------------------------------------------------
37 #--------------------------------------------------------------------------
38
38
39 def _push(ns):
39 def _push(ns):
40 """helper method for implementing `client.push` via `client.apply`"""
40 """helper method for implementing `client.push` via `client.apply`"""
41 globals().update(ns)
41 globals().update(ns)
42
42
43 def _pull(keys):
43 def _pull(keys):
44 """helper method for implementing `client.pull` via `client.apply`"""
44 """helper method for implementing `client.pull` via `client.apply`"""
45 g = globals()
45 g = globals()
46 if isinstance(keys, (list,tuple, set)):
46 if isinstance(keys, (list,tuple, set)):
47 for key in keys:
47 for key in keys:
48 if not g.has_key(key):
48 if not g.has_key(key):
49 raise NameError("name '%s' is not defined"%key)
49 raise NameError("name '%s' is not defined"%key)
50 return map(g.get, keys)
50 return map(g.get, keys)
51 else:
51 else:
52 if not g.has_key(keys):
52 if not g.has_key(keys):
53 raise NameError("name '%s' is not defined"%keys)
53 raise NameError("name '%s' is not defined"%keys)
54 return g.get(keys)
54 return g.get(keys)
55
55
56 def _clear():
56 def _clear():
57 """helper method for implementing `client.clear` via `client.apply`"""
57 """helper method for implementing `client.clear` via `client.apply`"""
58 globals().clear()
58 globals().clear()
59
59
60 def _execute(code):
60 def _execute(code):
61 """helper method for implementing `client.execute` via `client.apply`"""
61 """helper method for implementing `client.execute` via `client.apply`"""
62 exec code in globals()
62 exec code in globals()
63
63
64
64
65 #--------------------------------------------------------------------------
65 #--------------------------------------------------------------------------
66 # Decorators for Client methods
66 # Decorators for Client methods
67 #--------------------------------------------------------------------------
67 #--------------------------------------------------------------------------
68
68
69 @decorator
69 @decorator
70 def spinfirst(f, self, *args, **kwargs):
70 def spinfirst(f, self, *args, **kwargs):
71 """Call spin() to sync state prior to calling the method."""
71 """Call spin() to sync state prior to calling the method."""
72 self.spin()
72 self.spin()
73 return f(self, *args, **kwargs)
73 return f(self, *args, **kwargs)
74
74
75 @decorator
75 @decorator
76 def defaultblock(f, self, *args, **kwargs):
76 def defaultblock(f, self, *args, **kwargs):
77 """Default to self.block; preserve self.block."""
77 """Default to self.block; preserve self.block."""
78 block = kwargs.get('block',None)
78 block = kwargs.get('block',None)
79 block = self.block if block is None else block
79 block = self.block if block is None else block
80 saveblock = self.block
80 saveblock = self.block
81 self.block = block
81 self.block = block
82 try:
82 try:
83 ret = f(self, *args, **kwargs)
83 ret = f(self, *args, **kwargs)
84 finally:
84 finally:
85 self.block = saveblock
85 self.block = saveblock
86 return ret
86 return ret
87
87
88
88
89 #--------------------------------------------------------------------------
89 #--------------------------------------------------------------------------
90 # Classes
90 # Classes
91 #--------------------------------------------------------------------------
91 #--------------------------------------------------------------------------
92
92
93 class AbortedTask(object):
93 class AbortedTask(object):
94 """A basic wrapper object describing an aborted task."""
94 """A basic wrapper object describing an aborted task."""
95 def __init__(self, msg_id):
95 def __init__(self, msg_id):
96 self.msg_id = msg_id
96 self.msg_id = msg_id
97
97
98 class ResultDict(dict):
98 class ResultDict(dict):
99 """A subclass of dict that raises errors if it has them."""
99 """A subclass of dict that raises errors if it has them."""
100 def __getitem__(self, key):
100 def __getitem__(self, key):
101 res = dict.__getitem__(self, key)
101 res = dict.__getitem__(self, key)
102 if isinstance(res, error.KernelError):
102 if isinstance(res, error.KernelError):
103 raise res
103 raise res
104 return res
104 return res
105
105
106 class Metadata(dict):
107 """Subclass of dict for initializing metadata values."""
108 def __init__(self, *args, **kwargs):
109 dict.__init__(self)
110 md = {'msg_id' : None,
111 'submitted' : None,
112 'started' : None,
113 'completed' : None,
114 'received' : None,
115 'engine_uuid' : None,
116 'engine_id' : None,
117 'follow' : None,
118 'after' : None,
119 'status' : None,
120
121 'pyin' : None,
122 'pyout' : None,
123 'pyerr' : None,
124 'stdout' : '',
125 'stderr' : '',
126 }
127 self.update(md)
128 self.update(dict(*args, **kwargs))
129
130
131
106 class Client(object):
132 class Client(object):
107 """A semi-synchronous client to the IPython ZMQ controller
133 """A semi-synchronous client to the IPython ZMQ controller
108
134
109 Parameters
135 Parameters
110 ----------
136 ----------
111
137
112 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
138 addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
113 The address of the controller's registration socket.
139 The address of the controller's registration socket.
114 [Default: 'tcp://127.0.0.1:10101']
140 [Default: 'tcp://127.0.0.1:10101']
115 context : zmq.Context
141 context : zmq.Context
116 Pass an existing zmq.Context instance, otherwise the client will create its own
142 Pass an existing zmq.Context instance, otherwise the client will create its own
117 username : bytes
143 username : bytes
118 set username to be passed to the Session object
144 set username to be passed to the Session object
119 debug : bool
145 debug : bool
120 flag for lots of message printing for debug purposes
146 flag for lots of message printing for debug purposes
121
147
122 #-------------- ssh related args ----------------
148 #-------------- ssh related args ----------------
123 # These are args for configuring the ssh tunnel to be used
149 # These are args for configuring the ssh tunnel to be used
124 # credentials are used to forward connections over ssh to the Controller
150 # credentials are used to forward connections over ssh to the Controller
125 # Note that the ip given in `addr` needs to be relative to sshserver
151 # Note that the ip given in `addr` needs to be relative to sshserver
126 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
152 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
127 # and set sshserver as the same machine the Controller is on. However,
153 # and set sshserver as the same machine the Controller is on. However,
128 # the only requirement is that sshserver is able to see the Controller
154 # the only requirement is that sshserver is able to see the Controller
129 # (i.e. is within the same trusted network).
155 # (i.e. is within the same trusted network).
130
156
131 sshserver : str
157 sshserver : str
132 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
158 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
133 If keyfile or password is specified, and this is not, it will default to
159 If keyfile or password is specified, and this is not, it will default to
134 the ip given in addr.
160 the ip given in addr.
135 sshkey : str; path to public ssh key file
161 sshkey : str; path to public ssh key file
136 This specifies a key to be used in ssh login, default None.
162 This specifies a key to be used in ssh login, default None.
137 Regular default ssh keys will be used without specifying this argument.
163 Regular default ssh keys will be used without specifying this argument.
138 password : str;
164 password : str;
139 Your ssh password to sshserver. Note that if this is left None,
165 Your ssh password to sshserver. Note that if this is left None,
140 you will be prompted for it if passwordless key based login is unavailable.
166 you will be prompted for it if passwordless key based login is unavailable.
141
167
142 #------- exec authentication args -------
168 #------- exec authentication args -------
143 # If even localhost is untrusted, you can have some protection against
169 # If even localhost is untrusted, you can have some protection against
144 # unauthorized execution by using a key. Messages are still sent
170 # unauthorized execution by using a key. Messages are still sent
145 # as cleartext, so if someone can snoop your loopback traffic this will
171 # as cleartext, so if someone can snoop your loopback traffic this will
146 # not help anything.
172 # not help anything.
147
173
148 exec_key : str
174 exec_key : str
149 an authentication key or file containing a key
175 an authentication key or file containing a key
150 default: None
176 default: None
151
177
152
178
153 Attributes
179 Attributes
154 ----------
180 ----------
155 ids : set of int engine IDs
181 ids : set of int engine IDs
156 requesting the ids attribute always synchronizes
182 requesting the ids attribute always synchronizes
157 the registration state. To request ids without synchronization,
183 the registration state. To request ids without synchronization,
158 use semi-private _ids attributes.
184 use semi-private _ids attributes.
159
185
160 history : list of msg_ids
186 history : list of msg_ids
161 a list of msg_ids, keeping track of all the execution
187 a list of msg_ids, keeping track of all the execution
162 messages you have submitted in order.
188 messages you have submitted in order.
163
189
164 outstanding : set of msg_ids
190 outstanding : set of msg_ids
165 a set of msg_ids that have been submitted, but whose
191 a set of msg_ids that have been submitted, but whose
166 results have not yet been received.
192 results have not yet been received.
167
193
168 results : dict
194 results : dict
169 a dict of all our results, keyed by msg_id
195 a dict of all our results, keyed by msg_id
170
196
171 block : bool
197 block : bool
172 determines default behavior when block not specified
198 determines default behavior when block not specified
173 in execution methods
199 in execution methods
174
200
175 Methods
201 Methods
176 -------
202 -------
177 spin : flushes incoming results and registration state changes
203 spin : flushes incoming results and registration state changes
178 control methods spin, and requesting `ids` also ensures up to date
204 control methods spin, and requesting `ids` also ensures up to date
179
205
180 barrier : wait on one or more msg_ids
206 barrier : wait on one or more msg_ids
181
207
182 execution methods: apply/apply_bound/apply_to/apply_bound
208 execution methods: apply/apply_bound/apply_to/apply_bound
183 legacy: execute, run
209 legacy: execute, run
184
210
185 query methods: queue_status, get_result, purge
211 query methods: queue_status, get_result, purge
186
212
187 control methods: abort, kill
213 control methods: abort, kill
188
214
189 """
215 """
190
216
191
217
192 _connected=False
218 _connected=False
193 _ssh=False
219 _ssh=False
194 _engines=None
220 _engines=None
195 _addr='tcp://127.0.0.1:10101'
221 _addr='tcp://127.0.0.1:10101'
196 _registration_socket=None
222 _registration_socket=None
197 _query_socket=None
223 _query_socket=None
198 _control_socket=None
224 _control_socket=None
225 _iopub_socket=None
199 _notification_socket=None
226 _notification_socket=None
200 _mux_socket=None
227 _mux_socket=None
201 _task_socket=None
228 _task_socket=None
202 block = False
229 block = False
203 outstanding=None
230 outstanding=None
204 results = None
231 results = None
205 history = None
232 history = None
206 debug = False
233 debug = False
207 targets = None
234 targets = None
208
235
209 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
236 def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
210 sshserver=None, sshkey=None, password=None, paramiko=None,
237 sshserver=None, sshkey=None, password=None, paramiko=None,
211 exec_key=None,):
238 exec_key=None,):
212 if context is None:
239 if context is None:
213 context = zmq.Context()
240 context = zmq.Context()
214 self.context = context
241 self.context = context
215 self.targets = 'all'
242 self.targets = 'all'
216 self._addr = addr
243 self._addr = addr
217 self._ssh = bool(sshserver or sshkey or password)
244 self._ssh = bool(sshserver or sshkey or password)
218 if self._ssh and sshserver is None:
245 if self._ssh and sshserver is None:
219 # default to the same
246 # default to the same
220 sshserver = addr.split('://')[1].split(':')[0]
247 sshserver = addr.split('://')[1].split(':')[0]
221 if self._ssh and password is None:
248 if self._ssh and password is None:
222 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
249 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
223 password=False
250 password=False
224 else:
251 else:
225 password = getpass("SSH Password for %s: "%sshserver)
252 password = getpass("SSH Password for %s: "%sshserver)
226 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
253 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
227
254
228 if exec_key is not None and os.path.isfile(exec_key):
255 if exec_key is not None and os.path.isfile(exec_key):
229 arg = 'keyfile'
256 arg = 'keyfile'
230 else:
257 else:
231 arg = 'key'
258 arg = 'key'
232 key_arg = {arg:exec_key}
259 key_arg = {arg:exec_key}
233 if username is None:
260 if username is None:
234 self.session = ss.StreamSession(**key_arg)
261 self.session = ss.StreamSession(**key_arg)
235 else:
262 else:
236 self.session = ss.StreamSession(username, **key_arg)
263 self.session = ss.StreamSession(username, **key_arg)
237 self._registration_socket = self.context.socket(zmq.XREQ)
264 self._registration_socket = self.context.socket(zmq.XREQ)
238 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
265 self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
239 if self._ssh:
266 if self._ssh:
240 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
267 tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
241 else:
268 else:
242 self._registration_socket.connect(addr)
269 self._registration_socket.connect(addr)
243 self._engines = ReverseDict()
270 self._engines = ReverseDict()
244 self._ids = set()
271 self._ids = set()
245 self.outstanding=set()
272 self.outstanding=set()
246 self.results = {}
273 self.results = {}
247 self.metadata = {}
274 self.metadata = {}
248 self.history = []
275 self.history = []
249 self.debug = debug
276 self.debug = debug
250 self.session.debug = debug
277 self.session.debug = debug
251
278
252 self._notification_handlers = {'registration_notification' : self._register_engine,
279 self._notification_handlers = {'registration_notification' : self._register_engine,
253 'unregistration_notification' : self._unregister_engine,
280 'unregistration_notification' : self._unregister_engine,
254 }
281 }
255 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
282 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
256 'apply_reply' : self._handle_apply_reply}
283 'apply_reply' : self._handle_apply_reply}
257 self._connect(sshserver, ssh_kwargs)
284 self._connect(sshserver, ssh_kwargs)
258
285
259
286
260 @property
287 @property
261 def ids(self):
288 def ids(self):
262 """Always up to date ids property."""
289 """Always up to date ids property."""
263 self._flush_notifications()
290 self._flush_notifications()
264 return self._ids
291 return self._ids
265
292
266 def _update_engines(self, engines):
293 def _update_engines(self, engines):
267 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
294 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
268 for k,v in engines.iteritems():
295 for k,v in engines.iteritems():
269 eid = int(k)
296 eid = int(k)
270 self._engines[eid] = bytes(v) # force not unicode
297 self._engines[eid] = bytes(v) # force not unicode
271 self._ids.add(eid)
298 self._ids.add(eid)
272
299
273 def _build_targets(self, targets):
300 def _build_targets(self, targets):
274 """Turn valid target IDs or 'all' into two lists:
301 """Turn valid target IDs or 'all' into two lists:
275 (int_ids, uuids).
302 (int_ids, uuids).
276 """
303 """
277 if targets is None:
304 if targets is None:
278 targets = self._ids
305 targets = self._ids
279 elif isinstance(targets, str):
306 elif isinstance(targets, str):
280 if targets.lower() == 'all':
307 if targets.lower() == 'all':
281 targets = self._ids
308 targets = self._ids
282 else:
309 else:
283 raise TypeError("%r not valid str target, must be 'all'"%(targets))
310 raise TypeError("%r not valid str target, must be 'all'"%(targets))
284 elif isinstance(targets, int):
311 elif isinstance(targets, int):
285 targets = [targets]
312 targets = [targets]
286 return [self._engines[t] for t in targets], list(targets)
313 return [self._engines[t] for t in targets], list(targets)
287
314
288 def _connect(self, sshserver, ssh_kwargs):
315 def _connect(self, sshserver, ssh_kwargs):
289 """setup all our socket connections to the controller. This is called from
316 """setup all our socket connections to the controller. This is called from
290 __init__."""
317 __init__."""
291 if self._connected:
318 if self._connected:
292 return
319 return
293 self._connected=True
320 self._connected=True
294
321
295 def connect_socket(s, addr):
322 def connect_socket(s, addr):
296 if self._ssh:
323 if self._ssh:
297 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
324 return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
298 else:
325 else:
299 return s.connect(addr)
326 return s.connect(addr)
300
327
301 self.session.send(self._registration_socket, 'connection_request')
328 self.session.send(self._registration_socket, 'connection_request')
302 idents,msg = self.session.recv(self._registration_socket,mode=0)
329 idents,msg = self.session.recv(self._registration_socket,mode=0)
303 if self.debug:
330 if self.debug:
304 pprint(msg)
331 pprint(msg)
305 msg = ss.Message(msg)
332 msg = ss.Message(msg)
306 content = msg.content
333 content = msg.content
307 if content.status == 'ok':
334 if content.status == 'ok':
308 if content.queue:
335 if content.queue:
309 self._mux_socket = self.context.socket(zmq.PAIR)
336 self._mux_socket = self.context.socket(zmq.PAIR)
310 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
337 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
311 connect_socket(self._mux_socket, content.queue)
338 connect_socket(self._mux_socket, content.queue)
312 if content.task:
339 if content.task:
313 self._task_socket = self.context.socket(zmq.PAIR)
340 self._task_socket = self.context.socket(zmq.PAIR)
314 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
341 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
315 connect_socket(self._task_socket, content.task)
342 connect_socket(self._task_socket, content.task)
316 if content.notification:
343 if content.notification:
317 self._notification_socket = self.context.socket(zmq.SUB)
344 self._notification_socket = self.context.socket(zmq.SUB)
318 connect_socket(self._notification_socket, content.notification)
345 connect_socket(self._notification_socket, content.notification)
319 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
346 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
320 if content.query:
347 if content.query:
321 self._query_socket = self.context.socket(zmq.PAIR)
348 self._query_socket = self.context.socket(zmq.PAIR)
322 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
349 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
323 connect_socket(self._query_socket, content.query)
350 connect_socket(self._query_socket, content.query)
324 if content.control:
351 if content.control:
325 self._control_socket = self.context.socket(zmq.PAIR)
352 self._control_socket = self.context.socket(zmq.PAIR)
326 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
353 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
327 connect_socket(self._control_socket, content.control)
354 connect_socket(self._control_socket, content.control)
355 if content.iopub:
356 self._iopub_socket = self.context.socket(zmq.SUB)
357 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
358 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
359 connect_socket(self._iopub_socket, content.iopub)
328 self._update_engines(dict(content.engines))
360 self._update_engines(dict(content.engines))
329
361
330 else:
362 else:
331 self._connected = False
363 self._connected = False
332 raise Exception("Failed to connect!")
364 raise Exception("Failed to connect!")
333
365
334 #--------------------------------------------------------------------------
366 #--------------------------------------------------------------------------
335 # handlers and callbacks for incoming messages
367 # handlers and callbacks for incoming messages
336 #--------------------------------------------------------------------------
368 #--------------------------------------------------------------------------
337
369
338 def _register_engine(self, msg):
370 def _register_engine(self, msg):
339 """Register a new engine, and update our connection info."""
371 """Register a new engine, and update our connection info."""
340 content = msg['content']
372 content = msg['content']
341 eid = content['id']
373 eid = content['id']
342 d = {eid : content['queue']}
374 d = {eid : content['queue']}
343 self._update_engines(d)
375 self._update_engines(d)
344 self._ids.add(int(eid))
376 self._ids.add(int(eid))
345
377
346 def _unregister_engine(self, msg):
378 def _unregister_engine(self, msg):
347 """Unregister an engine that has died."""
379 """Unregister an engine that has died."""
348 content = msg['content']
380 content = msg['content']
349 eid = int(content['id'])
381 eid = int(content['id'])
350 if eid in self._ids:
382 if eid in self._ids:
351 self._ids.remove(eid)
383 self._ids.remove(eid)
352 self._engines.pop(eid)
384 self._engines.pop(eid)
353 #
385
354 def _build_metadata(self, header, parent, content):
386 def _extract_metadata(self, header, parent, content):
355 md = {'msg_id' : parent['msg_id'],
387 md = {'msg_id' : parent['msg_id'],
356 'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
388 'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
357 'started' : datetime.strptime(header['started'], ss.ISO8601),
389 'started' : datetime.strptime(header['started'], ss.ISO8601),
358 'completed' : datetime.strptime(header['date'], ss.ISO8601),
390 'completed' : datetime.strptime(header['date'], ss.ISO8601),
359 'received' : datetime.now(),
391 'received' : datetime.now(),
360 'engine_uuid' : header['engine'],
392 'engine_uuid' : header['engine'],
361 'engine_id' : self._engines.get(header['engine'], None),
393 'engine_id' : self._engines.get(header['engine'], None),
362 'follow' : parent['follow'],
394 'follow' : parent['follow'],
363 'after' : parent['after'],
395 'after' : parent['after'],
364 'status' : content['status']
396 'status' : content['status'],
365 }
397 }
366 return md
398 return md
367
399
368 def _handle_execute_reply(self, msg):
400 def _handle_execute_reply(self, msg):
369 """Save the reply to an execute_request into our results.
401 """Save the reply to an execute_request into our results.
370
402
371 execute messages are never actually used. apply is used instead.
403 execute messages are never actually used. apply is used instead.
372 """
404 """
373
405
374 parent = msg['parent_header']
406 parent = msg['parent_header']
375 msg_id = parent['msg_id']
407 msg_id = parent['msg_id']
376 if msg_id not in self.outstanding:
408 if msg_id not in self.outstanding:
377 print("got unknown result: %s"%msg_id)
409 print("got unknown result: %s"%msg_id)
378 else:
410 else:
379 self.outstanding.remove(msg_id)
411 self.outstanding.remove(msg_id)
380 self.results[msg_id] = ss.unwrap_exception(msg['content'])
412 self.results[msg_id] = ss.unwrap_exception(msg['content'])
381
413
382 def _handle_apply_reply(self, msg):
414 def _handle_apply_reply(self, msg):
383 """Save the reply to an apply_request into our results."""
415 """Save the reply to an apply_request into our results."""
384 parent = msg['parent_header']
416 parent = msg['parent_header']
385 msg_id = parent['msg_id']
417 msg_id = parent['msg_id']
386 if msg_id not in self.outstanding:
418 if msg_id not in self.outstanding:
387 print ("got unknown result: %s"%msg_id)
419 print ("got unknown result: %s"%msg_id)
388 else:
420 else:
389 self.outstanding.remove(msg_id)
421 self.outstanding.remove(msg_id)
390 content = msg['content']
422 content = msg['content']
391 header = msg['header']
423 header = msg['header']
392
424
393 self.metadata[msg_id] = self._build_metadata(header, parent, content)
425 # construct metadata:
426 md = self.metadata.setdefault(msg_id, Metadata())
427 md.update(self._extract_metadata(header, parent, content))
428 self.metadata[msg_id] = md
394
429
430 # construct result:
395 if content['status'] == 'ok':
431 if content['status'] == 'ok':
396 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
432 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
397 elif content['status'] == 'aborted':
433 elif content['status'] == 'aborted':
398 self.results[msg_id] = error.AbortedTask(msg_id)
434 self.results[msg_id] = error.AbortedTask(msg_id)
399 elif content['status'] == 'resubmitted':
435 elif content['status'] == 'resubmitted':
400 # TODO: handle resubmission
436 # TODO: handle resubmission
401 pass
437 pass
402 else:
438 else:
403 e = ss.unwrap_exception(content)
439 e = ss.unwrap_exception(content)
404 e_uuid = e.engine_info['engineid']
440 e_uuid = e.engine_info['engineid']
405 eid = self._engines[e_uuid]
441 eid = self._engines[e_uuid]
406 e.engine_info['engineid'] = eid
442 e.engine_info['engineid'] = eid
407 self.results[msg_id] = e
443 self.results[msg_id] = e
408
444
409 def _flush_notifications(self):
445 def _flush_notifications(self):
410 """Flush notifications of engine registrations waiting
446 """Flush notifications of engine registrations waiting
411 in ZMQ queue."""
447 in ZMQ queue."""
412 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
448 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
413 while msg is not None:
449 while msg is not None:
414 if self.debug:
450 if self.debug:
415 pprint(msg)
451 pprint(msg)
416 msg = msg[-1]
452 msg = msg[-1]
417 msg_type = msg['msg_type']
453 msg_type = msg['msg_type']
418 handler = self._notification_handlers.get(msg_type, None)
454 handler = self._notification_handlers.get(msg_type, None)
419 if handler is None:
455 if handler is None:
420 raise Exception("Unhandled message type: %s"%msg.msg_type)
456 raise Exception("Unhandled message type: %s"%msg.msg_type)
421 else:
457 else:
422 handler(msg)
458 handler(msg)
423 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
459 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
424
460
425 def _flush_results(self, sock):
461 def _flush_results(self, sock):
426 """Flush task or queue results waiting in ZMQ queue."""
462 """Flush task or queue results waiting in ZMQ queue."""
427 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
463 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
428 while msg is not None:
464 while msg is not None:
429 if self.debug:
465 if self.debug:
430 pprint(msg)
466 pprint(msg)
431 msg = msg[-1]
467 msg = msg[-1]
432 msg_type = msg['msg_type']
468 msg_type = msg['msg_type']
433 handler = self._queue_handlers.get(msg_type, None)
469 handler = self._queue_handlers.get(msg_type, None)
434 if handler is None:
470 if handler is None:
435 raise Exception("Unhandled message type: %s"%msg.msg_type)
471 raise Exception("Unhandled message type: %s"%msg.msg_type)
436 else:
472 else:
437 handler(msg)
473 handler(msg)
438 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
474 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
439
475
440 def _flush_control(self, sock):
476 def _flush_control(self, sock):
441 """Flush replies from the control channel waiting
477 """Flush replies from the control channel waiting
442 in the ZMQ queue.
478 in the ZMQ queue.
443
479
444 Currently: ignore them."""
480 Currently: ignore them."""
445 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
481 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
446 while msg is not None:
482 while msg is not None:
447 if self.debug:
483 if self.debug:
448 pprint(msg)
484 pprint(msg)
449 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
485 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
450
486
487 def _flush_iopub(self, sock):
488 """Flush replies from the iopub channel waiting
489 in the ZMQ queue.
490 """
491 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
492 while msg is not None:
493 if self.debug:
494 pprint(msg)
495 msg = msg[-1]
496 parent = msg['parent_header']
497 msg_id = parent['msg_id']
498 content = msg['content']
499 header = msg['header']
500 msg_type = msg['msg_type']
501
502 # init metadata:
503 md = self.metadata.setdefault(msg_id, Metadata())
504
505 if msg_type == 'stream':
506 name = content['name']
507 s = md[name] or ''
508 md[name] = s + content['data']
509 elif msg_type == 'pyerr':
510 md.update({'pyerr' : ss.unwrap_exception(content)})
511 else:
512 md.update({msg_type : content['data']})
513
514 self.metadata[msg_id] = md
515
516 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
517
451 #--------------------------------------------------------------------------
518 #--------------------------------------------------------------------------
452 # getitem
519 # getitem
453 #--------------------------------------------------------------------------
520 #--------------------------------------------------------------------------
454
521
455 def __getitem__(self, key):
522 def __getitem__(self, key):
456 """Dict access returns DirectView multiplexer objects or,
523 """Dict access returns DirectView multiplexer objects or,
457 if key is None, a LoadBalancedView."""
524 if key is None, a LoadBalancedView."""
458 if key is None:
525 if key is None:
459 return LoadBalancedView(self)
526 return LoadBalancedView(self)
460 if isinstance(key, int):
527 if isinstance(key, int):
461 if key not in self.ids:
528 if key not in self.ids:
462 raise IndexError("No such engine: %i"%key)
529 raise IndexError("No such engine: %i"%key)
463 return DirectView(self, key)
530 return DirectView(self, key)
464
531
465 if isinstance(key, slice):
532 if isinstance(key, slice):
466 indices = range(len(self.ids))[key]
533 indices = range(len(self.ids))[key]
467 ids = sorted(self._ids)
534 ids = sorted(self._ids)
468 key = [ ids[i] for i in indices ]
535 key = [ ids[i] for i in indices ]
469 # newkeys = sorted(self._ids)[thekeys[k]]
536 # newkeys = sorted(self._ids)[thekeys[k]]
470
537
471 if isinstance(key, (tuple, list, xrange)):
538 if isinstance(key, (tuple, list, xrange)):
472 _,targets = self._build_targets(list(key))
539 _,targets = self._build_targets(list(key))
473 return DirectView(self, targets)
540 return DirectView(self, targets)
474 else:
541 else:
475 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
542 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
476
543
477 #--------------------------------------------------------------------------
544 #--------------------------------------------------------------------------
478 # Begin public methods
545 # Begin public methods
479 #--------------------------------------------------------------------------
546 #--------------------------------------------------------------------------
480
547
481 @property
548 @property
482 def remote(self):
549 def remote(self):
483 """property for convenient RemoteFunction generation.
550 """property for convenient RemoteFunction generation.
484
551
485 >>> @client.remote
552 >>> @client.remote
486 ... def f():
553 ... def f():
487 import os
554 import os
488 print (os.getpid())
555 print (os.getpid())
489 """
556 """
490 return remote(self, block=self.block)
557 return remote(self, block=self.block)
491
558
492 def spin(self):
559 def spin(self):
493 """Flush any registration notifications and execution results
560 """Flush any registration notifications and execution results
494 waiting in the ZMQ queue.
561 waiting in the ZMQ queue.
495 """
562 """
496 if self._notification_socket:
563 if self._notification_socket:
497 self._flush_notifications()
564 self._flush_notifications()
498 if self._mux_socket:
565 if self._mux_socket:
499 self._flush_results(self._mux_socket)
566 self._flush_results(self._mux_socket)
500 if self._task_socket:
567 if self._task_socket:
501 self._flush_results(self._task_socket)
568 self._flush_results(self._task_socket)
502 if self._control_socket:
569 if self._control_socket:
503 self._flush_control(self._control_socket)
570 self._flush_control(self._control_socket)
571 if self._iopub_socket:
572 self._flush_iopub(self._iopub_socket)
504
573
505 def barrier(self, msg_ids=None, timeout=-1):
574 def barrier(self, msg_ids=None, timeout=-1):
506 """waits on one or more `msg_ids`, for up to `timeout` seconds.
575 """waits on one or more `msg_ids`, for up to `timeout` seconds.
507
576
508 Parameters
577 Parameters
509 ----------
578 ----------
510 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
579 msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
511 ints are indices to self.history
580 ints are indices to self.history
512 strs are msg_ids
581 strs are msg_ids
513 default: wait on all outstanding messages
582 default: wait on all outstanding messages
514 timeout : float
583 timeout : float
515 a time in seconds, after which to give up.
584 a time in seconds, after which to give up.
516 default is -1, which means no timeout
585 default is -1, which means no timeout
517
586
518 Returns
587 Returns
519 -------
588 -------
520 True : when all msg_ids are done
589 True : when all msg_ids are done
521 False : timeout reached, some msg_ids still outstanding
590 False : timeout reached, some msg_ids still outstanding
522 """
591 """
523 tic = time.time()
592 tic = time.time()
524 if msg_ids is None:
593 if msg_ids is None:
525 theids = self.outstanding
594 theids = self.outstanding
526 else:
595 else:
527 if isinstance(msg_ids, (int, str, AsyncResult)):
596 if isinstance(msg_ids, (int, str, AsyncResult)):
528 msg_ids = [msg_ids]
597 msg_ids = [msg_ids]
529 theids = set()
598 theids = set()
530 for msg_id in msg_ids:
599 for msg_id in msg_ids:
531 if isinstance(msg_id, int):
600 if isinstance(msg_id, int):
532 msg_id = self.history[msg_id]
601 msg_id = self.history[msg_id]
533 elif isinstance(msg_id, AsyncResult):
602 elif isinstance(msg_id, AsyncResult):
534 map(theids.add, msg_id.msg_ids)
603 map(theids.add, msg_id.msg_ids)
535 continue
604 continue
536 theids.add(msg_id)
605 theids.add(msg_id)
537 if not theids.intersection(self.outstanding):
606 if not theids.intersection(self.outstanding):
538 return True
607 return True
539 self.spin()
608 self.spin()
540 while theids.intersection(self.outstanding):
609 while theids.intersection(self.outstanding):
541 if timeout >= 0 and ( time.time()-tic ) > timeout:
610 if timeout >= 0 and ( time.time()-tic ) > timeout:
542 break
611 break
543 time.sleep(1e-3)
612 time.sleep(1e-3)
544 self.spin()
613 self.spin()
545 return len(theids.intersection(self.outstanding)) == 0
614 return len(theids.intersection(self.outstanding)) == 0
546
615
547 #--------------------------------------------------------------------------
616 #--------------------------------------------------------------------------
548 # Control methods
617 # Control methods
549 #--------------------------------------------------------------------------
618 #--------------------------------------------------------------------------
550
619
551 @spinfirst
620 @spinfirst
552 @defaultblock
621 @defaultblock
553 def clear(self, targets=None, block=None):
622 def clear(self, targets=None, block=None):
554 """Clear the namespace in target(s)."""
623 """Clear the namespace in target(s)."""
555 targets = self._build_targets(targets)[0]
624 targets = self._build_targets(targets)[0]
556 for t in targets:
625 for t in targets:
557 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
626 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
558 error = False
627 error = False
559 if self.block:
628 if self.block:
560 for i in range(len(targets)):
629 for i in range(len(targets)):
561 idents,msg = self.session.recv(self._control_socket,0)
630 idents,msg = self.session.recv(self._control_socket,0)
562 if self.debug:
631 if self.debug:
563 pprint(msg)
632 pprint(msg)
564 if msg['content']['status'] != 'ok':
633 if msg['content']['status'] != 'ok':
565 error = ss.unwrap_exception(msg['content'])
634 error = ss.unwrap_exception(msg['content'])
566 if error:
635 if error:
567 return error
636 return error
568
637
569
638
570 @spinfirst
639 @spinfirst
571 @defaultblock
640 @defaultblock
572 def abort(self, msg_ids = None, targets=None, block=None):
641 def abort(self, msg_ids = None, targets=None, block=None):
573 """Abort the execution queues of target(s)."""
642 """Abort the execution queues of target(s)."""
574 targets = self._build_targets(targets)[0]
643 targets = self._build_targets(targets)[0]
575 if isinstance(msg_ids, basestring):
644 if isinstance(msg_ids, basestring):
576 msg_ids = [msg_ids]
645 msg_ids = [msg_ids]
577 content = dict(msg_ids=msg_ids)
646 content = dict(msg_ids=msg_ids)
578 for t in targets:
647 for t in targets:
579 self.session.send(self._control_socket, 'abort_request',
648 self.session.send(self._control_socket, 'abort_request',
580 content=content, ident=t)
649 content=content, ident=t)
581 error = False
650 error = False
582 if self.block:
651 if self.block:
583 for i in range(len(targets)):
652 for i in range(len(targets)):
584 idents,msg = self.session.recv(self._control_socket,0)
653 idents,msg = self.session.recv(self._control_socket,0)
585 if self.debug:
654 if self.debug:
586 pprint(msg)
655 pprint(msg)
587 if msg['content']['status'] != 'ok':
656 if msg['content']['status'] != 'ok':
588 error = ss.unwrap_exception(msg['content'])
657 error = ss.unwrap_exception(msg['content'])
589 if error:
658 if error:
590 return error
659 return error
591
660
592 @spinfirst
661 @spinfirst
593 @defaultblock
662 @defaultblock
594 def shutdown(self, targets=None, restart=False, controller=False, block=None):
663 def shutdown(self, targets=None, restart=False, controller=False, block=None):
595 """Terminates one or more engine processes, optionally including the controller."""
664 """Terminates one or more engine processes, optionally including the controller."""
596 if controller:
665 if controller:
597 targets = 'all'
666 targets = 'all'
598 targets = self._build_targets(targets)[0]
667 targets = self._build_targets(targets)[0]
599 for t in targets:
668 for t in targets:
600 self.session.send(self._control_socket, 'shutdown_request',
669 self.session.send(self._control_socket, 'shutdown_request',
601 content={'restart':restart},ident=t)
670 content={'restart':restart},ident=t)
602 error = False
671 error = False
603 if block or controller:
672 if block or controller:
604 for i in range(len(targets)):
673 for i in range(len(targets)):
605 idents,msg = self.session.recv(self._control_socket,0)
674 idents,msg = self.session.recv(self._control_socket,0)
606 if self.debug:
675 if self.debug:
607 pprint(msg)
676 pprint(msg)
608 if msg['content']['status'] != 'ok':
677 if msg['content']['status'] != 'ok':
609 error = ss.unwrap_exception(msg['content'])
678 error = ss.unwrap_exception(msg['content'])
610
679
611 if controller:
680 if controller:
612 time.sleep(0.25)
681 time.sleep(0.25)
613 self.session.send(self._query_socket, 'shutdown_request')
682 self.session.send(self._query_socket, 'shutdown_request')
614 idents,msg = self.session.recv(self._query_socket, 0)
683 idents,msg = self.session.recv(self._query_socket, 0)
615 if self.debug:
684 if self.debug:
616 pprint(msg)
685 pprint(msg)
617 if msg['content']['status'] != 'ok':
686 if msg['content']['status'] != 'ok':
618 error = ss.unwrap_exception(msg['content'])
687 error = ss.unwrap_exception(msg['content'])
619
688
620 if error:
689 if error:
621 raise error
690 raise error
622
691
623 #--------------------------------------------------------------------------
692 #--------------------------------------------------------------------------
624 # Execution methods
693 # Execution methods
625 #--------------------------------------------------------------------------
694 #--------------------------------------------------------------------------
626
695
627 @defaultblock
696 @defaultblock
628 def execute(self, code, targets='all', block=None):
697 def execute(self, code, targets='all', block=None):
629 """Executes `code` on `targets` in blocking or nonblocking manner.
698 """Executes `code` on `targets` in blocking or nonblocking manner.
630
699
631 ``execute`` is always `bound` (affects engine namespace)
700 ``execute`` is always `bound` (affects engine namespace)
632
701
633 Parameters
702 Parameters
634 ----------
703 ----------
635 code : str
704 code : str
636 the code string to be executed
705 the code string to be executed
637 targets : int/str/list of ints/strs
706 targets : int/str/list of ints/strs
638 the engines on which to execute
707 the engines on which to execute
639 default : all
708 default : all
640 block : bool
709 block : bool
641 whether or not to wait until done to return
710 whether or not to wait until done to return
642 default: self.block
711 default: self.block
643 """
712 """
644 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
713 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
645 return result
714 return result
646
715
647 def run(self, code, block=None):
716 def run(self, code, block=None):
648 """Runs `code` on an engine.
717 """Runs `code` on an engine.
649
718
650 Calls to this are load-balanced.
719 Calls to this are load-balanced.
651
720
652 ``run`` is never `bound` (no effect on engine namespace)
721 ``run`` is never `bound` (no effect on engine namespace)
653
722
654 Parameters
723 Parameters
655 ----------
724 ----------
656 code : str
725 code : str
657 the code string to be executed
726 the code string to be executed
658 block : bool
727 block : bool
659 whether or not to wait until done
728 whether or not to wait until done
660
729
661 """
730 """
662 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
731 result = self.apply(_execute, (code,), targets=None, block=block, bound=False)
663 return result
732 return result
664
733
665 def _maybe_raise(self, result):
734 def _maybe_raise(self, result):
666 """wrapper for maybe raising an exception if apply failed."""
735 """wrapper for maybe raising an exception if apply failed."""
667 if isinstance(result, error.RemoteError):
736 if isinstance(result, error.RemoteError):
668 raise result
737 raise result
669
738
670 return result
739 return result
671
740
    def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
                        after=None, follow=None):
        """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.

        This is the central execution command for the client.

        Parameters
        ----------

        f : function
            The fuction to be called remotely
        args : tuple/list
            The positional arguments passed to `f`
        kwargs : dict
            The keyword arguments passed to `f`
        bound : bool (default: True)
            Whether to execute in the Engine(s) namespace, or in a clean
            namespace not affecting the engine.
        block : bool (default: self.block)
            Whether to wait for the result, or return immediately.
            False:
                returns msg_id(s)
                if multiple targets:
                    list of ids
            True:
                returns actual result(s) of f(*args, **kwargs)
                if multiple targets:
                    dict of results, by engine ID
        targets : int,list of ints, 'all', None
            Specify the destination of the job.
            if None:
                Submit via Task queue for load-balancing.
            if 'all':
                Run on all active engines
            if list:
                Run on each specified engine
            if int:
                Run on single engine

        after : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a time-based dependency.
            This job will only be run *after* the dependencies
            have been met.

        follow : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a location-based dependency.
            This job will only be run on an engine where this dependency
            is met.

        Returns
        -------
        if block is False:
            if single target:
                return msg_id
            else:
                return list of msg_ids
                ? (should this be dict like block=True) ?
        else:
            if single target:
                return result of f(*args, **kwargs)
            else:
                return dict of results, keyed by engine
        """

        # defaults:
        block = block if block is not None else self.block
        args = args if args is not None else []
        kwargs = kwargs if kwargs is not None else {}

        # enforce types of f,args,kwrags
        if not callable(f):
            raise TypeError("f must be callable, not %s"%type(f))
        if not isinstance(args, (tuple, list)):
            raise TypeError("args must be tuple or list, not %s"%type(args))
        if not isinstance(kwargs, dict):
            raise TypeError("kwargs must be dict, not %s"%type(kwargs))

        # normalize `after`/`follow` dependencies to plain dicts or msg_id lists
        if isinstance(after, Dependency):
            after = after.as_dict()
        elif isinstance(after, AsyncResult):
            after=after.msg_ids
        elif after is None:
            after = []
        if isinstance(follow, Dependency):
            follow = follow.as_dict()
        elif isinstance(follow, AsyncResult):
            follow=follow.msg_ids
        elif follow is None:
            follow = []
        options = dict(bound=bound, block=block, after=after, follow=follow)

        # dispatch: targets=None goes via the load-balanced task queue,
        # anything else goes directly to the named engine(s) via MUX
        if targets is None:
            return self._apply_balanced(f, args, kwargs, **options)
        else:
            return self._apply_direct(f, args, kwargs, targets=targets, **options)
769
838
    def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
                            after=None, follow=None):
        """The underlying method for applying functions in a load balanced
        manner, via the task queue.

        Serializes the call, submits a single ``apply_request`` on the task
        socket, and returns an AsyncResult (or its value when `block`).
        """
        subheader = dict(after=after, follow=follow)
        bufs = ss.pack_apply_message(f,args,kwargs)
        content = dict(bound=bound)
        # one message only: the controller's scheduler picks the engine
        msg = self.session.send(self._task_socket, "apply_request",
                content=content, buffers=bufs, subheader=subheader)
        msg_id = msg['msg_id']
        # track the request until its result arrives
        self.outstanding.add(msg_id)
        self.history.append(msg_id)
        ar = AsyncResult(self, [msg_id], fname=f.__name__)
        if block:
            return ar.get()
        else:
            return ar
789
858
    def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
                            after=None, follow=None):
        """The underlying method for applying functions to specific engines
        via the MUX queue.

        One ``apply_request`` message is sent per target engine; all of the
        resulting msg_ids are collected under a single AsyncResult.
        """
        queues,targets = self._build_targets(targets)

        subheader = dict(after=after, follow=follow)
        content = dict(bound=bound)
        bufs = ss.pack_apply_message(f,args,kwargs)

        msg_ids = []
        for queue in queues:
            # ident=queue routes this message to one specific engine queue
            msg = self.session.send(self._mux_socket, "apply_request",
                    content=content, buffers=bufs,ident=queue, subheader=subheader)
            msg_id = msg['msg_id']
            self.outstanding.add(msg_id)
            self.history.append(msg_id)
            msg_ids.append(msg_id)
        ar = AsyncResult(self, msg_ids, fname=f.__name__)
        if block:
            return ar.get()
        else:
            return ar
814
883
815 #--------------------------------------------------------------------------
884 #--------------------------------------------------------------------------
816 # Map and decorators
885 # Map and decorators
817 #--------------------------------------------------------------------------
886 #--------------------------------------------------------------------------
818
887
819 def map(self, f, *sequences):
888 def map(self, f, *sequences):
820 """Parallel version of builtin `map`, using all our engines."""
889 """Parallel version of builtin `map`, using all our engines."""
821 pf = ParallelFunction(self, f, block=self.block,
890 pf = ParallelFunction(self, f, block=self.block,
822 bound=True, targets='all')
891 bound=True, targets='all')
823 return pf.map(*sequences)
892 return pf.map(*sequences)
824
893
825 def parallel(self, bound=True, targets='all', block=True):
894 def parallel(self, bound=True, targets='all', block=True):
826 """Decorator for making a ParallelFunction."""
895 """Decorator for making a ParallelFunction."""
827 return parallel(self, bound=bound, targets=targets, block=block)
896 return parallel(self, bound=bound, targets=targets, block=block)
828
897
829 def remote(self, bound=True, targets='all', block=True):
898 def remote(self, bound=True, targets='all', block=True):
830 """Decorator for making a RemoteFunction."""
899 """Decorator for making a RemoteFunction."""
831 return remote(self, bound=bound, targets=targets, block=block)
900 return remote(self, bound=bound, targets=targets, block=block)
832
901
833 #--------------------------------------------------------------------------
902 #--------------------------------------------------------------------------
834 # Data movement
903 # Data movement
835 #--------------------------------------------------------------------------
904 #--------------------------------------------------------------------------
836
905
837 @defaultblock
906 @defaultblock
838 def push(self, ns, targets='all', block=None):
907 def push(self, ns, targets='all', block=None):
839 """Push the contents of `ns` into the namespace on `target`"""
908 """Push the contents of `ns` into the namespace on `target`"""
840 if not isinstance(ns, dict):
909 if not isinstance(ns, dict):
841 raise TypeError("Must be a dict, not %s"%type(ns))
910 raise TypeError("Must be a dict, not %s"%type(ns))
842 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
911 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
843 return result
912 return result
844
913
845 @defaultblock
914 @defaultblock
846 def pull(self, keys, targets='all', block=None):
915 def pull(self, keys, targets='all', block=None):
847 """Pull objects from `target`'s namespace by `keys`"""
916 """Pull objects from `target`'s namespace by `keys`"""
848 if isinstance(keys, str):
917 if isinstance(keys, str):
849 pass
918 pass
850 elif isinstance(keys, (list,tuple,set)):
919 elif isinstance(keys, (list,tuple,set)):
851 for key in keys:
920 for key in keys:
852 if not isinstance(key, str):
921 if not isinstance(key, str):
853 raise TypeError
922 raise TypeError
854 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
923 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
855 return result
924 return result
856
925
    def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
        """
        Partition a Python sequence and send the partitions to a set of engines.

        Each engine receives its slice of `seq` (as computed by the `dist`
        Map object) pushed into its namespace under the name `key`.
        """
        block = block if block is not None else self.block
        targets = self._build_targets(targets)[-1]
        mapObject = Map.dists[dist]()
        nparts = len(targets)
        msg_ids = []
        for index, engineid in enumerate(targets):
            partition = mapObject.getPartition(seq, index, nparts)
            if flatten and len(partition) == 1:
                # unwrap single-element partitions when flatten is requested
                r = self.push({key: partition[0]}, targets=engineid, block=False)
            else:
                r = self.push({key: partition}, targets=engineid, block=False)
            msg_ids.extend(r.msg_ids)
        # collect all per-engine pushes under one AsyncResult
        r = AsyncResult(self, msg_ids, fname='scatter')
        if block:
            return r.get()
        else:
            return r
878
947
    def gather(self, key, dist='b', targets='all', block=None):
        """
        Gather a partitioned sequence on a set of engines as a single local seq.

        Pulls `key` from each target engine; the `dist` Map object is used by
        the AsyncMapResult to reassemble the partitions in order.
        """
        block = block if block is not None else self.block

        targets = self._build_targets(targets)[-1]
        mapObject = Map.dists[dist]()
        msg_ids = []
        # one non-blocking pull per engine
        for index, engineid in enumerate(targets):
            msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)

        r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
        if block:
            return r.get()
        else:
            return r
896
965
897 #--------------------------------------------------------------------------
966 #--------------------------------------------------------------------------
898 # Query methods
967 # Query methods
899 #--------------------------------------------------------------------------
968 #--------------------------------------------------------------------------
900
969
    @spinfirst
    def get_results(self, msg_ids, status_only=False):
        """Returns the result of the execute or task request with `msg_ids`.

        Parameters
        ----------
        msg_ids : list of ints or msg_ids
            if int:
                Passed as index to self.history for convenience.
        status_only : bool (default: False)
            if False:
                return the actual results

        Returns
        -------

        results : dict
            There will always be the keys 'pending' and 'completed', which will
            be lists of msg_ids.
        """
        # normalize to a flat list of msg_id strings
        if not isinstance(msg_ids, (list,tuple)):
            msg_ids = [msg_ids]
        theids = []
        for msg_id in msg_ids:
            if isinstance(msg_id, int):
                # ints index into the local submission history
                msg_id = self.history[msg_id]
            if not isinstance(msg_id, str):
                raise TypeError("msg_ids must be str, not %r"%msg_id)
            theids.append(msg_id)

        completed = []
        local_results = {}
        # temporarily disable local shortcut
        # for msg_id in list(theids):
        #     if msg_id in self.results:
        #         completed.append(msg_id)
        #         local_results[msg_id] = self.results[msg_id]
        #         theids.remove(msg_id)

        if theids: # some not locally cached
            # ask the controller for anything we don't have, blocking on the
            # query socket until a reply is ready
            content = dict(msg_ids=theids, status_only=status_only)
            msg = self.session.send(self._query_socket, "result_request", content=content)
            zmq.select([self._query_socket], [], [])
            idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
            if self.debug:
                pprint(msg)
            content = msg['content']
            if content['status'] != 'ok':
                raise ss.unwrap_exception(content)
            buffers = msg['buffers']
        else:
            content = dict(completed=[],pending=[])

        content['completed'].extend(completed)

        if status_only:
            return content

        failures = []
        # load cached results into result:
        content.update(local_results)
        # update cache with results:
        for msg_id in sorted(theids):
            if msg_id in content['completed']:
                rec = content[msg_id]
                parent = rec['header']
                header = rec['result_header']
                rcontent = rec['result_content']
                # captured stdout/stderr etc. forwarded by the controller
                iodict = rec['io']
                if isinstance(rcontent, str):
                    rcontent = self.session.unpack(rcontent)

                # merge request/result headers and io into per-msg metadata
                md = self.metadata.setdefault(msg_id, Metadata())
                md.update(self._extract_metadata(header, parent, rcontent))
                md.update(iodict)

                if rcontent['status'] == 'ok':
                    res,buffers = ss.unserialize_object(buffers)
                else:
                    res = ss.unwrap_exception(rcontent)
                    failures.append(res)

                self.results[msg_id] = res
                content[msg_id] = res

        # raise a CompositeError if any remote calls failed
        error.collect_exceptions(failures, "get_results")
        return content
985
1057
    @spinfirst
    def queue_status(self, targets=None, verbose=False):
        """Fetch the status of engine queues.

        Parameters
        ----------
        targets : int/str/list of ints/strs
            the engines on which to execute
            default : all
        verbose : bool
            Whether to return lengths only, or lists of ids for each element
        """
        targets = self._build_targets(targets)[1]
        content = dict(targets=targets, verbose=verbose)
        # synchronous round-trip on the query socket
        self.session.send(self._query_socket, "queue_request", content=content)
        idents,msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        status = content.pop('status')
        if status != 'ok':
            raise ss.unwrap_exception(content)
        # rekey converts engine uuids back into friendly int ids
        return ss.rekey(content)
1009
1081
1010 @spinfirst
1082 @spinfirst
1011 def purge_results(self, msg_ids=[], targets=[]):
1083 def purge_results(self, msg_ids=[], targets=[]):
1012 """Tell the controller to forget results.
1084 """Tell the controller to forget results.
1013
1085
1014 Individual results can be purged by msg_id, or the entire
1086 Individual results can be purged by msg_id, or the entire
1015 history of specific targets can be purged.
1087 history of specific targets can be purged.
1016
1088
1017 Parameters
1089 Parameters
1018 ----------
1090 ----------
1019 msg_ids : str or list of strs
1091 msg_ids : str or list of strs
1020 the msg_ids whose results should be forgotten.
1092 the msg_ids whose results should be forgotten.
1021 targets : int/str/list of ints/strs
1093 targets : int/str/list of ints/strs
1022 The targets, by uuid or int_id, whose entire history is to be purged.
1094 The targets, by uuid or int_id, whose entire history is to be purged.
1023 Use `targets='all'` to scrub everything from the controller's memory.
1095 Use `targets='all'` to scrub everything from the controller's memory.
1024
1096
1025 default : None
1097 default : None
1026 """
1098 """
1027 if not targets and not msg_ids:
1099 if not targets and not msg_ids:
1028 raise ValueError
1100 raise ValueError
1029 if targets:
1101 if targets:
1030 targets = self._build_targets(targets)[1]
1102 targets = self._build_targets(targets)[1]
1031 content = dict(targets=targets, msg_ids=msg_ids)
1103 content = dict(targets=targets, msg_ids=msg_ids)
1032 self.session.send(self._query_socket, "purge_request", content=content)
1104 self.session.send(self._query_socket, "purge_request", content=content)
1033 idents, msg = self.session.recv(self._query_socket, 0)
1105 idents, msg = self.session.recv(self._query_socket, 0)
1034 if self.debug:
1106 if self.debug:
1035 pprint(msg)
1107 pprint(msg)
1036 content = msg['content']
1108 content = msg['content']
1037 if content['status'] != 'ok':
1109 if content['status'] != 'ok':
1038 raise ss.unwrap_exception(content)
1110 raise ss.unwrap_exception(content)
1039
1111
1040 #----------------------------------------
1112 #----------------------------------------
1041 # activate for %px,%autopx magics
1113 # activate for %px,%autopx magics
1042 #----------------------------------------
1114 #----------------------------------------
    def activate(self):
        """Make this `View` active for parallel magic commands.

        IPython has a magic command syntax to work with `MultiEngineClient` objects.
        In a given IPython session there is a single active one. While
        there can be many `Views` created and used by the user,
        there is only one active one. The active `View` is used whenever
        the magic commands %px and %autopx are used.

        The activate() method is called on a given `View` to make it
        active. Once this has been done, the magic commands can be used.
        """

        try:
            # This is injected into __builtins__.
            ip = get_ipython()
        except NameError:
            # not running inside IPython at all
            print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
        else:
            # register ourselves with the parallelmagic plugin, if loaded
            pmagic = ip.plugin_manager.get_plugin('parallelmagic')
            if pmagic is not None:
                pmagic.active_multiengine_client = self
            else:
                print "You must first load the parallelmagic extension " \
                    "by doing '%load_ext parallelmagic'"
1068
1140
class AsynClient(Client):
    """An Asynchronous client, using the Tornado Event Loop.
    !!!unfinished!!!"""
    # stream attributes; populated in __init__
    io_loop = None
    _queue_stream = None
    _notifier_stream = None
    _task_stream = None
    _control_stream = None

    def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
        """Wrap the Client's sockets in ZMQStreams on `io_loop`
        (default: the global tornado IOLoop instance)."""
        Client.__init__(self, addr, context, username, debug)
        if io_loop is None:
            io_loop = ioloop.IOLoop.instance()
        self.io_loop = io_loop

        self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
        self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
        self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
        self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)

    def spin(self):
        """Flush any pending messages on all streams.

        Fixed to use the attributes actually set in __init__; the old code
        referenced self.queue_stream / self.notifier_stream / self.task_stream /
        self.control_stream, which do not exist and raised AttributeError.
        """
        for stream in (self._queue_stream, self._notification_stream,
                       self._task_stream, self._control_stream):
            stream.flush()
1093
1165
1094 __all__ = [ 'Client',
1166 __all__ = [ 'Client',
1095 'depend',
1167 'depend',
1096 'require',
1168 'require',
1097 'remote',
1169 'remote',
1098 'parallel',
1170 'parallel',
1099 'RemoteFunction',
1171 'RemoteFunction',
1100 'ParallelFunction',
1172 'ParallelFunction',
1101 'DirectView',
1173 'DirectView',
1102 'LoadBalancedView',
1174 'LoadBalancedView',
1103 'AsyncResult',
1175 'AsyncResult',
1104 'AsyncMapResult'
1176 'AsyncMapResult'
1105 ]
1177 ]
@@ -1,227 +1,254 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import os
18 import os
19 import time
19 import time
20 from multiprocessing import Process
20 from multiprocessing import Process
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25 from zmq.devices import ProcessMonitoredQueue
25 from zmq.devices import ProcessMonitoredQueue
26
26
27 # internal:
27 # internal:
28 from IPython.zmq.entry_point import bind_port
28 from IPython.zmq.entry_point import bind_port
29
29
30 from hub import Hub
30 from hub import Hub
31 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
31 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
32 connect_logger, parse_url, signal_children, generate_exec_key)
32 connect_logger, parse_url, signal_children, generate_exec_key)
33
33
34
34
35 import streamsession as session
35 import streamsession as session
36 import heartmonitor
36 import heartmonitor
37 from scheduler import launch_scheduler
37 from scheduler import launch_scheduler
38
38
39 from dictdb import DictDB
39 from dictdb import DictDB
40 try:
40 try:
41 import pymongo
41 import pymongo
42 except ImportError:
42 except ImportError:
43 MongoDB=None
43 MongoDB=None
44 else:
44 else:
45 from mongodb import MongoDB
45 from mongodb import MongoDB
46
46
47 #-------------------------------------------------------------------------
47 #-------------------------------------------------------------------------
48 # Entry Point
48 # Entry Point
49 #-------------------------------------------------------------------------
49 #-------------------------------------------------------------------------
50
50
51 def make_argument_parser():
51 def make_argument_parser():
52 """Make an argument parser"""
52 """Make an argument parser"""
53 parser = make_base_argument_parser()
53 parser = make_base_argument_parser()
54
54
55 parser.add_argument('--client', type=int, metavar='PORT', default=0,
55 parser.add_argument('--client', type=int, metavar='PORT', default=0,
56 help='set the XREP port for clients [default: random]')
56 help='set the XREP port for clients [default: random]')
57 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
57 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
58 help='set the PUB socket for registration notification [default: random]')
58 help='set the PUB socket for registration notification [default: random]')
59 parser.add_argument('--hb', type=str, metavar='PORTS',
59 parser.add_argument('--hb', type=str, metavar='PORTS',
60 help='set the 2 ports for heartbeats [default: random]')
60 help='set the 2 ports for heartbeats [default: random]')
61 parser.add_argument('--ping', type=int, default=3000,
61 parser.add_argument('--ping', type=int, default=100,
62 help='set the heartbeat period in ms [default: 3000]')
62 help='set the heartbeat period in ms [default: 100]')
63 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
63 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
64 help='set the SUB port for queue monitoring [default: random]')
64 help='set the SUB port for queue monitoring [default: random]')
65 parser.add_argument('--mux', type=str, metavar='PORTS',
65 parser.add_argument('--mux', type=str, metavar='PORTS',
66 help='set the XREP ports for the MUX queue [default: random]')
66 help='set the XREP ports for the MUX queue [default: random]')
67 parser.add_argument('--task', type=str, metavar='PORTS',
67 parser.add_argument('--task', type=str, metavar='PORTS',
68 help='set the XREP/XREQ ports for the task queue [default: random]')
68 help='set the XREP/XREQ ports for the task queue [default: random]')
69 parser.add_argument('--control', type=str, metavar='PORTS',
69 parser.add_argument('--control', type=str, metavar='PORTS',
70 help='set the XREP ports for the control queue [default: random]')
70 help='set the XREP ports for the control queue [default: random]')
71 parser.add_argument('--scheduler', type=str, default='pure',
71 parser.add_argument('--iopub', type=str, metavar='PORTS',
72 help='set the PUB/SUB ports for the iopub relay [default: random]')
73 parser.add_argument('--scheduler', type=str, default='lru',
72 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
74 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
73 help='select the task scheduler [default: pure ZMQ]')
75 help='select the task scheduler [default: Python LRU]')
74 parser.add_argument('--mongodb', action='store_true',
76 parser.add_argument('--mongodb', action='store_true',
75 help='Use MongoDB task storage [default: in-memory]')
77 help='Use MongoDB task storage [default: in-memory]')
78 parser.add_argument('--session', type=str, default=None,
79 help='Manually specify the session id.')
76
80
77 return parser
81 return parser
78
82
79 def main(argv=None):
83 def main(argv=None):
80
84
81 parser = make_argument_parser()
85 parser = make_argument_parser()
82
86
83 args = parser.parse_args(argv)
87 args = parser.parse_args(argv)
84 parse_url(args)
88 parse_url(args)
85
89
86 iface="%s://%s"%(args.transport,args.ip)+':%i'
90 iface="%s://%s"%(args.transport,args.ip)+':%i'
87
91
88 random_ports = 0
92 random_ports = 0
89 if args.hb:
93 if args.hb:
90 hb = split_ports(args.hb, 2)
94 hb = split_ports(args.hb, 2)
91 else:
95 else:
92 hb = select_random_ports(2)
96 hb = select_random_ports(2)
93 if args.mux:
97 if args.mux:
94 mux = split_ports(args.mux, 2)
98 mux = split_ports(args.mux, 2)
95 else:
99 else:
96 mux = None
100 mux = None
97 random_ports += 2
101 random_ports += 2
102 if args.iopub:
103 iopub = split_ports(args.iopub, 2)
104 else:
105 iopub = None
106 random_ports += 2
98 if args.task:
107 if args.task:
99 task = split_ports(args.task, 2)
108 task = split_ports(args.task, 2)
100 else:
109 else:
101 task = None
110 task = None
102 random_ports += 2
111 random_ports += 2
103 if args.control:
112 if args.control:
104 control = split_ports(args.control, 2)
113 control = split_ports(args.control, 2)
105 else:
114 else:
106 control = None
115 control = None
107 random_ports += 2
116 random_ports += 2
108
117
109 ctx = zmq.Context()
118 ctx = zmq.Context()
110 loop = ioloop.IOLoop.instance()
119 loop = ioloop.IOLoop.instance()
111
120
112 # setup logging
121 # setup logging
113 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
122 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
114
123
115 # Registrar socket
124 # Registrar socket
116 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
125 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
117 regport = bind_port(reg, args.ip, args.regport)
126 regport = bind_port(reg, args.ip, args.regport)
118
127
119 ### Engine connections ###
128 ### Engine connections ###
120
129
121 # heartbeat
130 # heartbeat
122 hpub = ctx.socket(zmq.PUB)
131 hpub = ctx.socket(zmq.PUB)
123 bind_port(hpub, args.ip, hb[0])
132 bind_port(hpub, args.ip, hb[0])
124 hrep = ctx.socket(zmq.XREP)
133 hrep = ctx.socket(zmq.XREP)
125 bind_port(hrep, args.ip, hb[1])
134 bind_port(hrep, args.ip, hb[1])
126
135
127 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
136 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
128 hmon.start()
137 hmon.start()
129
138
130 ### Client connections ###
139 ### Client connections ###
131 # Clientele socket
140 # Clientele socket
132 c = ZMQStream(ctx.socket(zmq.XREP), loop)
141 c = ZMQStream(ctx.socket(zmq.XREP), loop)
133 cport = bind_port(c, args.ip, args.client)
142 cport = bind_port(c, args.ip, args.client)
134 # Notifier socket
143 # Notifier socket
135 n = ZMQStream(ctx.socket(zmq.PUB), loop)
144 n = ZMQStream(ctx.socket(zmq.PUB), loop)
136 nport = bind_port(n, args.ip, args.notice)
145 nport = bind_port(n, args.ip, args.notice)
137
146
138 ### Key File ###
147 ### Key File ###
139 if args.execkey and not os.path.isfile(args.execkey):
148 if args.execkey and not os.path.isfile(args.execkey):
140 generate_exec_key(args.execkey)
149 generate_exec_key(args.execkey)
141
150
142 thesession = session.StreamSession(username=args.ident or "controller", keyfile=args.execkey)
151 thesession = session.StreamSession(username=args.ident or "controller",
152 keyfile=args.execkey, session=args.session)
143
153
144 ### build and launch the queues ###
154 ### build and launch the queues ###
145
155
146 # monitor socket
156 # monitor socket
147 sub = ctx.socket(zmq.SUB)
157 sub = ctx.socket(zmq.SUB)
148 sub.setsockopt(zmq.SUBSCRIBE, "")
158 sub.setsockopt(zmq.SUBSCRIBE, "")
149 monport = bind_port(sub, args.ip, args.monitor)
159 monport = bind_port(sub, args.ip, args.monitor)
150 sub = ZMQStream(sub, loop)
160 sub = ZMQStream(sub, loop)
151
161
152 ports = select_random_ports(random_ports)
162 ports = select_random_ports(random_ports)
153 children = []
163 children = []
164
165 # IOPub relay (in a Process)
166 if not iopub:
167 iopub = (ports.pop(),ports.pop())
168 q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A')
169 q.bind_in(iface%iopub[1])
170 q.bind_out(iface%iopub[0])
171 q.setsockopt_in(zmq.SUBSCRIBE, '')
172 q.connect_mon(iface%monport)
173 q.daemon=True
174 q.start()
175 children.append(q.launcher)
176
154 # Multiplexer Queue (in a Process)
177 # Multiplexer Queue (in a Process)
155 if not mux:
178 if not mux:
156 mux = (ports.pop(),ports.pop())
179 mux = (ports.pop(),ports.pop())
157 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
180 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
158 q.bind_in(iface%mux[0])
181 q.bind_in(iface%mux[0])
159 q.bind_out(iface%mux[1])
182 q.bind_out(iface%mux[1])
160 q.connect_mon(iface%monport)
183 q.connect_mon(iface%monport)
161 q.daemon=True
184 q.daemon=True
162 q.start()
185 q.start()
163 children.append(q.launcher)
186 children.append(q.launcher)
164
187
165 # Control Queue (in a Process)
188 # Control Queue (in a Process)
166 if not control:
189 if not control:
167 control = (ports.pop(),ports.pop())
190 control = (ports.pop(),ports.pop())
168 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
191 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
169 q.bind_in(iface%control[0])
192 q.bind_in(iface%control[0])
170 q.bind_out(iface%control[1])
193 q.bind_out(iface%control[1])
171 q.connect_mon(iface%monport)
194 q.connect_mon(iface%monport)
172 q.daemon=True
195 q.daemon=True
173 q.start()
196 q.start()
174 children.append(q.launcher)
197 children.append(q.launcher)
175 # Task Queue (in a Process)
198 # Task Queue (in a Process)
176 if not task:
199 if not task:
177 task = (ports.pop(),ports.pop())
200 task = (ports.pop(),ports.pop())
178 if args.scheduler == 'pure':
201 if args.scheduler == 'pure':
179 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
202 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
180 q.bind_in(iface%task[0])
203 q.bind_in(iface%task[0])
181 q.bind_out(iface%task[1])
204 q.bind_out(iface%task[1])
182 q.connect_mon(iface%monport)
205 q.connect_mon(iface%monport)
183 q.daemon=True
206 q.daemon=True
184 q.start()
207 q.start()
185 children.append(q.launcher)
208 children.append(q.launcher)
186 else:
209 else:
187 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
210 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
188 print (sargs)
211 print (sargs)
189 q = Process(target=launch_scheduler, args=sargs)
212 q = Process(target=launch_scheduler, args=sargs)
190 q.daemon=True
213 q.daemon=True
191 q.start()
214 q.start()
192 children.append(q)
215 children.append(q)
193
216
194 if args.mongodb:
217 if args.mongodb:
195 from mongodb import MongoDB
218 from mongodb import MongoDB
196 db = MongoDB(thesession.session)
219 db = MongoDB(thesession.session)
197 else:
220 else:
198 db = DictDB()
221 db = DictDB()
199 time.sleep(.25)
222 time.sleep(.25)
200
223
201 # build connection dicts
224 # build connection dicts
202 engine_addrs = {
225 engine_addrs = {
203 'control' : iface%control[1],
226 'control' : iface%control[1],
204 'queue': iface%mux[1],
227 'queue': iface%mux[1],
205 'heartbeat': (iface%hb[0], iface%hb[1]),
228 'heartbeat': (iface%hb[0], iface%hb[1]),
206 'task' : iface%task[1],
229 'task' : iface%task[1],
230 'iopub' : iface%iopub[1],
207 'monitor' : iface%monport,
231 'monitor' : iface%monport,
208 }
232 }
209
233
210 client_addrs = {
234 client_addrs = {
211 'control' : iface%control[0],
235 'control' : iface%control[0],
212 'query': iface%cport,
236 'query': iface%cport,
213 'queue': iface%mux[0],
237 'queue': iface%mux[0],
214 'task' : iface%task[0],
238 'task' : iface%task[0],
239 'iopub' : iface%iopub[0],
215 'notification': iface%nport
240 'notification': iface%nport
216 }
241 }
242
243 # register relay of signals to the children
217 signal_children(children)
244 signal_children(children)
218 hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
245 hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
219 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
246 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
220 dc.start()
247 dc.start()
221 loop.start()
248 loop.start()
222
249
223
250
224
251
225
252
226 if __name__ == '__main__':
253 if __name__ == '__main__':
227 main()
254 main()
@@ -1,146 +1,139 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple engine that talks to a controller over 0MQ.
2 """A simple engine that talks to a controller over 0MQ.
3 it handles registration, etc. and launches a kernel
3 it handles registration, etc. and launches a kernel
4 connected to the Controller's queue(s).
4 connected to the Controller's queue(s).
5 """
5 """
6 from __future__ import print_function
6 from __future__ import print_function
7 import sys
7 import sys
8 import time
8 import time
9 import traceback
9 import traceback
10 import uuid
10 import uuid
11 from pprint import pprint
11 from pprint import pprint
12
12
13 import zmq
13 import zmq
14 from zmq.eventloop import ioloop, zmqstream
14 from zmq.eventloop import ioloop, zmqstream
15
15
16 from IPython.utils.traitlets import HasTraits
16 from IPython.utils.traitlets import HasTraits
17 from IPython.utils.localinterfaces import LOCALHOST
17 from IPython.utils.localinterfaces import LOCALHOST
18
18
19 from streamsession import Message, StreamSession
19 from streamsession import Message, StreamSession
20 from client import Client
20 from client import Client
21 from streamkernel import Kernel, make_kernel
21 from streamkernel import Kernel, make_kernel
22 import heartmonitor
22 import heartmonitor
23 from entry_point import make_base_argument_parser, connect_logger, parse_url
23 from entry_point import make_base_argument_parser, connect_logger, parse_url
24 # import taskthread
24 # import taskthread
25 # from log import logger
25 # from log import logger
26
26
27
27
28 def printer(*msg):
28 def printer(*msg):
29 pprint(msg)
29 pprint(msg, stream=sys.__stdout__)
30
30
31 class Engine(object):
31 class Engine(object):
32 """IPython engine"""
32 """IPython engine"""
33
33
34 id=None
34 id=None
35 context=None
35 context=None
36 loop=None
36 loop=None
37 session=None
37 session=None
38 ident=None
38 ident=None
39 registrar=None
39 registrar=None
40 heart=None
40 heart=None
41 kernel=None
41 kernel=None
42 user_ns=None
42 user_ns=None
43
43
44 def __init__(self, context, loop, session, registrar, client=None, ident=None,
44 def __init__(self, context, loop, session, registrar, client=None, ident=None,
45 heart_id=None, user_ns=None):
45 heart_id=None, user_ns=None):
46 self.context = context
46 self.context = context
47 self.loop = loop
47 self.loop = loop
48 self.session = session
48 self.session = session
49 self.registrar = registrar
49 self.registrar = registrar
50 self.client = client
50 self.client = client
51 self.ident = ident if ident else str(uuid.uuid4())
51 self.ident = ident if ident else str(uuid.uuid4())
52 self.registrar.on_send(printer)
52 self.registrar.on_send(printer)
53 self.user_ns = user_ns
53 self.user_ns = user_ns
54
54
55 def register(self):
55 def register(self):
56
56
57 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
57 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
58 self.registrar.on_recv(self.complete_registration)
58 self.registrar.on_recv(self.complete_registration)
59 # print (self.session.key)
59 # print (self.session.key)
60 self.session.send(self.registrar, "registration_request",content=content)
60 self.session.send(self.registrar, "registration_request",content=content)
61
61
62 def complete_registration(self, msg):
62 def complete_registration(self, msg):
63 # print msg
63 # print msg
64 idents,msg = self.session.feed_identities(msg)
64 idents,msg = self.session.feed_identities(msg)
65 msg = Message(self.session.unpack_message(msg))
65 msg = Message(self.session.unpack_message(msg))
66 if msg.content.status == 'ok':
66 if msg.content.status == 'ok':
67 self.session.username = str(msg.content.id)
67 self.session.username = str(msg.content.id)
68 queue_addr = msg.content.queue
68 queue_addr = msg.content.queue
69 shell_addrs = [str(queue_addr)]
69 shell_addrs = [str(queue_addr)]
70 control_addr = str(msg.content.control)
70 control_addr = str(msg.content.control)
71 task_addr = msg.content.task
71 task_addr = msg.content.task
72 iopub_addr = msg.content.iopub
72 if task_addr:
73 if task_addr:
73 shell_addrs.append(str(task_addr))
74 shell_addrs.append(str(task_addr))
74
75
75 hb_addrs = msg.content.heartbeat
76 hb_addrs = msg.content.heartbeat
76 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
77 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
77
78 # placeholder for no, since pub isn't hooked up:
79 sub = self.context.socket(zmq.SUB)
80 sub = zmqstream.ZMQStream(sub, self.loop)
81 sub.on_recv(lambda *a: None)
82 port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
83 iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
84
85 k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
78 k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
86 hb_addrs, client_addr=None, loop=self.loop,
79 hb_addrs, client_addr=None, loop=self.loop,
87 context=self.context, key=self.session.key)[-1]
80 context=self.context, key=self.session.key)[-1]
88 self.kernel = k
81 self.kernel = k
89 if self.user_ns is not None:
82 if self.user_ns is not None:
90 self.user_ns.update(self.kernel.user_ns)
83 self.user_ns.update(self.kernel.user_ns)
91 self.kernel.user_ns = self.user_ns
84 self.kernel.user_ns = self.user_ns
92
85
93 else:
86 else:
94 # logger.error("Registration Failed: %s"%msg)
87 # logger.error("Registration Failed: %s"%msg)
95 raise Exception("Registration Failed: %s"%msg)
88 raise Exception("Registration Failed: %s"%msg)
96
89
97 # logger.info("engine::completed registration with id %s"%self.session.username)
90 # logger.info("engine::completed registration with id %s"%self.session.username)
98
91
99 print (msg)
92 print (msg,file=sys.__stdout__)
100
93
101 def unregister(self):
94 def unregister(self):
102 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
95 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
103 time.sleep(1)
96 time.sleep(1)
104 sys.exit(0)
97 sys.exit(0)
105
98
106 def start(self):
99 def start(self):
107 print ("registering")
100 print ("registering",file=sys.__stdout__)
108 self.register()
101 self.register()
109
102
110
103
111
104
112 def main(argv=None, user_ns=None):
105 def main(argv=None, user_ns=None):
113
106
114 parser = make_base_argument_parser()
107 parser = make_base_argument_parser()
115
108
116 args = parser.parse_args(argv)
109 args = parser.parse_args(argv)
117
110
118 parse_url(args)
111 parse_url(args)
119
112
120 iface="%s://%s"%(args.transport,args.ip)+':%i'
113 iface="%s://%s"%(args.transport,args.ip)+':%i'
121
114
122 loop = ioloop.IOLoop.instance()
115 loop = ioloop.IOLoop.instance()
123 session = StreamSession(keyfile=args.execkey)
116 session = StreamSession(keyfile=args.execkey)
124 # print (session.key)
117 # print (session.key)
125 ctx = zmq.Context()
118 ctx = zmq.Context()
126
119
127 # setup logging
120 # setup logging
128 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
121 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
129
122
130 reg_conn = iface % args.regport
123 reg_conn = iface % args.regport
131 print (reg_conn)
124 print (reg_conn, file=sys.__stdout__)
132 print ("Starting the engine...", file=sys.__stderr__)
125 print ("Starting the engine...", file=sys.__stderr__)
133
126
134 reg = ctx.socket(zmq.PAIR)
127 reg = ctx.socket(zmq.PAIR)
135 reg.connect(reg_conn)
128 reg.connect(reg_conn)
136 reg = zmqstream.ZMQStream(reg, loop)
129 reg = zmqstream.ZMQStream(reg, loop)
137 client = None
130 client = None
138
131
139 e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns)
132 e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns)
140 dc = ioloop.DelayedCallback(e.start, 100, loop)
133 dc = ioloop.DelayedCallback(e.start, 100, loop)
141 dc.start()
134 dc.start()
142 loop.start()
135 loop.start()
143
136
144 # Execution as a script
137 # Execution as a script
145 if __name__ == '__main__':
138 if __name__ == '__main__':
146 main()
139 main()
@@ -1,116 +1,121 b''
1 """ Defines helper functions for creating kernel entry points and process
1 """ Defines helper functions for creating kernel entry points and process
2 launchers.
2 launchers.
3 """
3 """
4
4
5 # Standard library imports.
5 # Standard library imports.
6 import logging
6 import logging
7 import atexit
7 import atexit
8 import sys
8 import sys
9 import os
9 import os
10 import stat
10 import stat
11 import socket
11 import socket
12 from subprocess import Popen, PIPE
12 from subprocess import Popen, PIPE
13 from signal import signal, SIGINT, SIGABRT, SIGTERM
13 from signal import signal, SIGINT, SIGABRT, SIGTERM
14 try:
14 try:
15 from signal import SIGKILL
15 from signal import SIGKILL
16 except ImportError:
16 except ImportError:
17 SIGKILL=None
17 SIGKILL=None
18
18
19 # System library imports.
19 # System library imports.
20 import zmq
20 import zmq
21 from zmq.log import handlers
21 from zmq.log import handlers
22 # Local imports.
22 # Local imports.
23 from IPython.core.ultratb import FormattedTB
23 from IPython.core.ultratb import FormattedTB
24 from IPython.external.argparse import ArgumentParser
24 from IPython.external.argparse import ArgumentParser
25 from IPython.zmq.log import logger
25 from IPython.zmq.log import logger
26
26
27 def split_ports(s, n):
27 def split_ports(s, n):
28 """Parser helper for multiport strings"""
28 """Parser helper for multiport strings"""
29 if not s:
29 if not s:
30 return tuple([0]*n)
30 return tuple([0]*n)
31 ports = map(int, s.split(','))
31 ports = map(int, s.split(','))
32 if len(ports) != n:
32 if len(ports) != n:
33 raise ValueError
33 raise ValueError
34 return ports
34 return ports
35
35
36 def select_random_ports(n):
36 def select_random_ports(n):
37 """Selects and return n random ports that are available."""
37 """Selects and return n random ports that are available."""
38 ports = []
38 ports = []
39 for i in xrange(n):
39 for i in xrange(n):
40 sock = socket.socket()
40 sock = socket.socket()
41 sock.bind(('', 0))
41 sock.bind(('', 0))
42 ports.append(sock)
42 ports.append(sock)
43 for i, sock in enumerate(ports):
43 for i, sock in enumerate(ports):
44 port = sock.getsockname()[1]
44 port = sock.getsockname()[1]
45 sock.close()
45 sock.close()
46 ports[i] = port
46 ports[i] = port
47 return ports
47 return ports
48
48
49 def parse_url(args):
49 def parse_url(args):
50 """Ensure args.url contains full transport://interface:port"""
50 """Ensure args.url contains full transport://interface:port"""
51 if args.url:
51 if args.url:
52 iface = args.url.split('://',1)
52 iface = args.url.split('://',1)
53 if len(args) == 2:
53 if len(args) == 2:
54 args.transport,iface = iface
54 args.transport,iface = iface
55 iface = iface.split(':')
55 iface = iface.split(':')
56 args.ip = iface[0]
56 args.ip = iface[0]
57 if iface[1]:
57 if iface[1]:
58 args.regport = iface[1]
58 args.regport = iface[1]
59 args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport)
59 args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport)
60
60
61 def signal_children(children):
61 def signal_children(children):
62 """Relay interupt/term signals to children, for more solid process cleanup."""
62 """Relay interupt/term signals to children, for more solid process cleanup."""
63 def terminate_children(sig, frame):
63 def terminate_children(sig, frame):
64 for child in children:
64 for child in children:
65 child.terminate()
65 child.terminate()
66 # sys.exit(sig)
66 # sys.exit(sig)
67 for sig in (SIGINT, SIGABRT, SIGTERM):
67 for sig in (SIGINT, SIGABRT, SIGTERM):
68 signal(sig, terminate_children)
68 signal(sig, terminate_children)
69
69
70 def generate_exec_key(keyfile):
70 def generate_exec_key(keyfile):
71 import uuid
71 import uuid
72 newkey = str(uuid.uuid4())
72 newkey = str(uuid.uuid4())
73 with open(keyfile, 'w') as f:
73 with open(keyfile, 'w') as f:
74 # f.write('ipython-key ')
74 # f.write('ipython-key ')
75 f.write(newkey)
75 f.write(newkey)
76 # set user-only RW permissions (0600)
76 # set user-only RW permissions (0600)
77 # this will have no effect on Windows
77 # this will have no effect on Windows
78 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
78 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
79
79
80
80
81 def make_base_argument_parser():
81 def make_base_argument_parser():
82 """ Creates an ArgumentParser for the generic arguments supported by all
82 """ Creates an ArgumentParser for the generic arguments supported by all
83 ipcluster entry points.
83 ipcluster entry points.
84 """
84 """
85 parser = ArgumentParser()
85 parser = ArgumentParser()
86 parser.add_argument('--ip', type=str, default='127.0.0.1',
86 parser.add_argument('--ip', type=str, default='127.0.0.1',
87 help='set the controller\'s IP address [default: local]')
87 help='set the controller\'s IP address [default: local]')
88 parser.add_argument('--transport', type=str, default='tcp',
88 parser.add_argument('--transport', type=str, default='tcp',
89 help='set the transport to use [default: tcp]')
89 help='set the transport to use [default: tcp]')
90 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
90 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
91 help='set the XREP port for registration [default: 10101]')
91 help='set the XREP port for registration [default: 10101]')
92 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
92 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
93 help='set the PUB port for logging [default: 10201]')
93 help='set the PUB port for logging [default: 10201]')
94 parser.add_argument('--loglevel', type=int, metavar='LEVEL', default=logging.DEBUG,
94 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG,
95 help='set the log level [default: DEBUG]')
95 help='set the log level [default: DEBUG]')
96 parser.add_argument('--ident', type=str,
96 parser.add_argument('--ident', type=str,
97 help='set the ZMQ identity [default: random]')
97 help='set the ZMQ identity [default: random]')
98 parser.add_argument('--packer', type=str, default='json',
98 parser.add_argument('--packer', type=str, default='json',
99 choices=['json','pickle'],
99 choices=['json','pickle'],
100 help='set the message format method [default: json]')
100 help='set the message format method [default: json]')
101 parser.add_argument('--url', type=str,
101 parser.add_argument('--url', type=str,
102 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
102 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
103 parser.add_argument('--execkey', type=str,
103 parser.add_argument('--execkey', type=str,
104 help="File containing key for authenticating requests.")
104 help="File containing key for authenticating requests.")
105
105
106 return parser
106 return parser
107
107
108
108
109 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
109 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
110 try:
111 loglevel = int(loglevel)
112 except ValueError:
113 if isinstance(loglevel, str):
114 loglevel = getattr(logging, loglevel)
110 lsock = context.socket(zmq.PUB)
115 lsock = context.socket(zmq.PUB)
111 lsock.connect(iface)
116 lsock.connect(iface)
112 handler = handlers.PUBHandler(lsock)
117 handler = handlers.PUBHandler(lsock)
113 handler.setLevel(loglevel)
118 handler.setLevel(loglevel)
114 handler.root_topic = root
119 handler.root_topic = root
115 logger.addHandler(handler)
120 logger.addHandler(handler)
116 No newline at end of file
121
@@ -1,837 +1,889 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 from datetime import datetime
19 from datetime import datetime
20 import time
20 import time
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24
24
25 # internal:
25 # internal:
26 from IPython.zmq.log import logger # a Logger object
26 from IPython.zmq.log import logger # a Logger object
27
27
28 from streamsession import Message, wrap_exception, ISO8601
28 from streamsession import Message, wrap_exception, ISO8601
29
29
30 try:
30 try:
31 from pymongo.binary import Binary
31 from pymongo.binary import Binary
32 except ImportError:
32 except ImportError:
33 MongoDB=None
33 MongoDB=None
34 else:
34 else:
35 from mongodb import MongoDB
35 from mongodb import MongoDB
36
36
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Code
38 # Code
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40
40
41 def _passer(*args, **kwargs):
41 def _passer(*args, **kwargs):
42 return
42 return
43
43
44 def _printer(*args, **kwargs):
45 print (args)
46 print (kwargs)
47
44 def init_record(msg):
48 def init_record(msg):
45 """return an empty TaskRecord dict, with all keys initialized with None."""
49 """return an empty TaskRecord dict, with all keys initialized with None."""
46 header = msg['header']
50 header = msg['header']
47 return {
51 return {
48 'msg_id' : header['msg_id'],
52 'msg_id' : header['msg_id'],
49 'header' : header,
53 'header' : header,
50 'content': msg['content'],
54 'content': msg['content'],
51 'buffers': msg['buffers'],
55 'buffers': msg['buffers'],
52 'submitted': datetime.strptime(header['date'], ISO8601),
56 'submitted': datetime.strptime(header['date'], ISO8601),
53 'client_uuid' : None,
57 'client_uuid' : None,
54 'engine_uuid' : None,
58 'engine_uuid' : None,
55 'started': None,
59 'started': None,
56 'completed': None,
60 'completed': None,
57 'resubmitted': None,
61 'resubmitted': None,
58 'result_header' : None,
62 'result_header' : None,
59 'result_content' : None,
63 'result_content' : None,
60 'result_buffers' : None,
64 'result_buffers' : None,
61 'queue' : None
65 'queue' : None,
66 'pyin' : None,
67 'pyout': None,
68 'pyerr': None,
69 'stdout': '',
70 'stderr': '',
62 }
71 }
63
72
64
73
65 class EngineConnector(object):
74 class EngineConnector(object):
66 """A simple object for accessing the various zmq connections of an object.
75 """A simple object for accessing the various zmq connections of an object.
67 Attributes are:
76 Attributes are:
68 id (int): engine ID
77 id (int): engine ID
69 uuid (str): uuid (unused?)
78 uuid (str): uuid (unused?)
70 queue (str): identity of queue's XREQ socket
79 queue (str): identity of queue's XREQ socket
71 registration (str): identity of registration XREQ socket
80 registration (str): identity of registration XREQ socket
72 heartbeat (str): identity of heartbeat XREQ socket
81 heartbeat (str): identity of heartbeat XREQ socket
73 """
82 """
74 id=0
83 id=0
75 queue=None
84 queue=None
76 control=None
85 control=None
77 registration=None
86 registration=None
78 heartbeat=None
87 heartbeat=None
79 pending=None
88 pending=None
80
89
81 def __init__(self, id, queue, registration, control, heartbeat=None):
90 def __init__(self, id, queue, registration, control, heartbeat=None):
82 logger.info("engine::Engine Connected: %i"%id)
91 logger.info("engine::Engine Connected: %i"%id)
83 self.id = id
92 self.id = id
84 self.queue = queue
93 self.queue = queue
85 self.registration = registration
94 self.registration = registration
86 self.control = control
95 self.control = control
87 self.heartbeat = heartbeat
96 self.heartbeat = heartbeat
88
97
89 class Hub(object):
98 class Hub(object):
90 """The IPython Controller Hub with 0MQ connections
99 """The IPython Controller Hub with 0MQ connections
91
100
92 Parameters
101 Parameters
93 ==========
102 ==========
94 loop: zmq IOLoop instance
103 loop: zmq IOLoop instance
95 session: StreamSession object
104 session: StreamSession object
96 <removed> context: zmq context for creating new connections (?)
105 <removed> context: zmq context for creating new connections (?)
97 queue: ZMQStream for monitoring the command queue (SUB)
106 queue: ZMQStream for monitoring the command queue (SUB)
98 registrar: ZMQStream for engine registration requests (XREP)
107 registrar: ZMQStream for engine registration requests (XREP)
99 heartbeat: HeartMonitor object checking the pulse of the engines
108 heartbeat: HeartMonitor object checking the pulse of the engines
100 clientele: ZMQStream for client connections (XREP)
109 clientele: ZMQStream for client connections (XREP)
101 not used for jobs, only query/control commands
110 not used for jobs, only query/control commands
102 notifier: ZMQStream for broadcasting engine registration changes (PUB)
111 notifier: ZMQStream for broadcasting engine registration changes (PUB)
103 db: connection to db for out of memory logging of commands
112 db: connection to db for out of memory logging of commands
104 NotImplemented
113 NotImplemented
105 engine_addrs: dict of zmq connection information for engines to connect
114 engine_addrs: dict of zmq connection information for engines to connect
106 to the queues.
115 to the queues.
107 client_addrs: dict of zmq connection information for engines to connect
116 client_addrs: dict of zmq connection information for engines to connect
108 to the queues.
117 to the queues.
109 """
118 """
110 # internal data structures:
119 # internal data structures:
111 ids=None # engine IDs
120 ids=None # engine IDs
112 keytable=None
121 keytable=None
113 engines=None
122 engines=None
114 clients=None
123 clients=None
115 hearts=None
124 hearts=None
116 pending=None
125 pending=None
117 results=None
126 results=None
118 tasks=None
127 tasks=None
119 completed=None
128 completed=None
120 mia=None
129 mia=None
121 incoming_registrations=None
130 incoming_registrations=None
122 registration_timeout=None
131 registration_timeout=None
123
132
124 #objects from constructor:
133 #objects from constructor:
125 loop=None
134 loop=None
126 registrar=None
135 registrar=None
127 clientelle=None
136 clientelle=None
128 queue=None
137 queue=None
129 heartbeat=None
138 heartbeat=None
130 notifier=None
139 notifier=None
131 db=None
140 db=None
132 client_addr=None
141 client_addr=None
133 engine_addrs=None
142 engine_addrs=None
134
143
135
144
136 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
145 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
137 """
146 """
138 # universal:
147 # universal:
139 loop: IOLoop for creating future connections
148 loop: IOLoop for creating future connections
140 session: streamsession for sending serialized data
149 session: streamsession for sending serialized data
141 # engine:
150 # engine:
142 queue: ZMQStream for monitoring queue messages
151 queue: ZMQStream for monitoring queue messages
143 registrar: ZMQStream for engine registration
152 registrar: ZMQStream for engine registration
144 heartbeat: HeartMonitor object for tracking engines
153 heartbeat: HeartMonitor object for tracking engines
145 # client:
154 # client:
146 clientele: ZMQStream for client connections
155 clientele: ZMQStream for client connections
147 # extra:
156 # extra:
148 db: ZMQStream for db connection (NotImplemented)
157 db: ZMQStream for db connection (NotImplemented)
149 engine_addrs: zmq address/protocol dict for engine connections
158 engine_addrs: zmq address/protocol dict for engine connections
150 client_addrs: zmq address/protocol dict for client connections
159 client_addrs: zmq address/protocol dict for client connections
151 """
160 """
152 self.ids = set()
161 self.ids = set()
153 self.keytable={}
162 self.keytable={}
154 self.incoming_registrations={}
163 self.incoming_registrations={}
155 self.engines = {}
164 self.engines = {}
156 self.by_ident = {}
165 self.by_ident = {}
157 self.clients = {}
166 self.clients = {}
158 self.hearts = {}
167 self.hearts = {}
159 # self.mia = set()
168 # self.mia = set()
160
169
161 # self.sockets = {}
170 # self.sockets = {}
162 self.loop = loop
171 self.loop = loop
163 self.session = session
172 self.session = session
164 self.registrar = registrar
173 self.registrar = registrar
165 self.clientele = clientele
174 self.clientele = clientele
166 self.queue = queue
175 self.queue = queue
167 self.heartbeat = heartbeat
176 self.heartbeat = heartbeat
168 self.notifier = notifier
177 self.notifier = notifier
169 self.db = db
178 self.db = db
170
179
171 # validate connection dicts:
180 # validate connection dicts:
172 self.client_addrs = client_addrs
181 self.client_addrs = client_addrs
173 assert isinstance(client_addrs['queue'], str)
182 assert isinstance(client_addrs['queue'], str)
174 assert isinstance(client_addrs['control'], str)
183 assert isinstance(client_addrs['control'], str)
175 # self.hb_addrs = hb_addrs
184 # self.hb_addrs = hb_addrs
176 self.engine_addrs = engine_addrs
185 self.engine_addrs = engine_addrs
177 assert isinstance(engine_addrs['queue'], str)
186 assert isinstance(engine_addrs['queue'], str)
178 assert isinstance(client_addrs['control'], str)
187 assert isinstance(client_addrs['control'], str)
179 assert len(engine_addrs['heartbeat']) == 2
188 assert len(engine_addrs['heartbeat']) == 2
180
189
181 # register our callbacks
190 # register our callbacks
182 self.registrar.on_recv(self.dispatch_register_request)
191 self.registrar.on_recv(self.dispatch_register_request)
183 self.clientele.on_recv(self.dispatch_client_msg)
192 self.clientele.on_recv(self.dispatch_client_msg)
184 self.queue.on_recv(self.dispatch_queue_traffic)
193 self.queue.on_recv(self.dispatch_monitor_traffic)
185
194
186 if heartbeat is not None:
195 if heartbeat is not None:
187 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
196 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
188 heartbeat.add_new_heart_handler(self.handle_new_heart)
197 heartbeat.add_new_heart_handler(self.handle_new_heart)
189
198
190 self.queue_handlers = { 'in' : self.save_queue_request,
199 self.monitor_handlers = { 'in' : self.save_queue_request,
191 'out': self.save_queue_result,
200 'out': self.save_queue_result,
192 'intask': self.save_task_request,
201 'intask': self.save_task_request,
193 'outtask': self.save_task_result,
202 'outtask': self.save_task_result,
194 'tracktask': self.save_task_destination,
203 'tracktask': self.save_task_destination,
195 'incontrol': _passer,
204 'incontrol': _passer,
196 'outcontrol': _passer,
205 'outcontrol': _passer,
206 'iopub': self.save_iopub_message,
197 }
207 }
198
208
199 self.client_handlers = {'queue_request': self.queue_status,
209 self.client_handlers = {'queue_request': self.queue_status,
200 'result_request': self.get_results,
210 'result_request': self.get_results,
201 'purge_request': self.purge_results,
211 'purge_request': self.purge_results,
202 'load_request': self.check_load,
212 'load_request': self.check_load,
203 'resubmit_request': self.resubmit_task,
213 'resubmit_request': self.resubmit_task,
204 'shutdown_request': self.shutdown_request,
214 'shutdown_request': self.shutdown_request,
205 }
215 }
206
216
207 self.registrar_handlers = {'registration_request' : self.register_engine,
217 self.registrar_handlers = {'registration_request' : self.register_engine,
208 'unregistration_request' : self.unregister_engine,
218 'unregistration_request' : self.unregister_engine,
209 'connection_request': self.connection_request,
219 'connection_request': self.connection_request,
210 }
220 }
211 self.registration_timeout = max(5000, 2*self.heartbeat.period)
221 self.registration_timeout = max(5000, 2*self.heartbeat.period)
212 # this is the stuff that will move to DB:
222 # this is the stuff that will move to DB:
213 # self.results = {} # completed results
223 # self.results = {} # completed results
214 self.pending = set() # pending messages, keyed by msg_id
224 self.pending = set() # pending messages, keyed by msg_id
215 self.queues = {} # pending msg_ids keyed by engine_id
225 self.queues = {} # pending msg_ids keyed by engine_id
216 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
226 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
217 self.completed = {} # completed msg_ids keyed by engine_id
227 self.completed = {} # completed msg_ids keyed by engine_id
218 self.all_completed = set()
228 self.all_completed = set()
219
229
220 logger.info("controller::created controller")
230 logger.info("controller::created controller")
221
231
222 def _new_id(self):
232 def _new_id(self):
223 """gemerate a new ID"""
233 """gemerate a new ID"""
224 newid = 0
234 newid = 0
225 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
235 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
226 # print newid, self.ids, self.incoming_registrations
236 # print newid, self.ids, self.incoming_registrations
227 while newid in self.ids or newid in incoming:
237 while newid in self.ids or newid in incoming:
228 newid += 1
238 newid += 1
229 return newid
239 return newid
230
240
231 #-----------------------------------------------------------------------------
241 #-----------------------------------------------------------------------------
232 # message validation
242 # message validation
233 #-----------------------------------------------------------------------------
243 #-----------------------------------------------------------------------------
234
244
235 def _validate_targets(self, targets):
245 def _validate_targets(self, targets):
236 """turn any valid targets argument into a list of integer ids"""
246 """turn any valid targets argument into a list of integer ids"""
237 if targets is None:
247 if targets is None:
238 # default to all
248 # default to all
239 targets = self.ids
249 targets = self.ids
240
250
241 if isinstance(targets, (int,str,unicode)):
251 if isinstance(targets, (int,str,unicode)):
242 # only one target specified
252 # only one target specified
243 targets = [targets]
253 targets = [targets]
244 _targets = []
254 _targets = []
245 for t in targets:
255 for t in targets:
246 # map raw identities to ids
256 # map raw identities to ids
247 if isinstance(t, (str,unicode)):
257 if isinstance(t, (str,unicode)):
248 t = self.by_ident.get(t, t)
258 t = self.by_ident.get(t, t)
249 _targets.append(t)
259 _targets.append(t)
250 targets = _targets
260 targets = _targets
251 bad_targets = [ t for t in targets if t not in self.ids ]
261 bad_targets = [ t for t in targets if t not in self.ids ]
252 if bad_targets:
262 if bad_targets:
253 raise IndexError("No Such Engine: %r"%bad_targets)
263 raise IndexError("No Such Engine: %r"%bad_targets)
254 if not targets:
264 if not targets:
255 raise IndexError("No Engines Registered")
265 raise IndexError("No Engines Registered")
256 return targets
266 return targets
257
267
258 def _validate_client_msg(self, msg):
268 def _validate_client_msg(self, msg):
259 """validates and unpacks headers of a message. Returns False if invalid,
269 """validates and unpacks headers of a message. Returns False if invalid,
260 (ident, header, parent, content)"""
270 (ident, header, parent, content)"""
261 client_id = msg[0]
271 client_id = msg[0]
262 try:
272 try:
263 msg = self.session.unpack_message(msg[1:], content=True)
273 msg = self.session.unpack_message(msg[1:], content=True)
264 except:
274 except:
265 logger.error("client::Invalid Message %s"%msg)
275 logger.error("client::Invalid Message %s"%msg, exc_info=True)
266 return False
276 return False
267
277
268 msg_type = msg.get('msg_type', None)
278 msg_type = msg.get('msg_type', None)
269 if msg_type is None:
279 if msg_type is None:
270 return False
280 return False
271 header = msg.get('header')
281 header = msg.get('header')
272 # session doesn't handle split content for now:
282 # session doesn't handle split content for now:
273 return client_id, msg
283 return client_id, msg
274
284
275
285
276 #-----------------------------------------------------------------------------
286 #-----------------------------------------------------------------------------
277 # dispatch methods (1 per stream)
287 # dispatch methods (1 per stream)
278 #-----------------------------------------------------------------------------
288 #-----------------------------------------------------------------------------
279
289
280 def dispatch_register_request(self, msg):
290 def dispatch_register_request(self, msg):
281 """"""
291 """"""
282 logger.debug("registration::dispatch_register_request(%s)"%msg)
292 logger.debug("registration::dispatch_register_request(%s)"%msg)
283 idents,msg = self.session.feed_identities(msg)
293 idents,msg = self.session.feed_identities(msg)
284 if not idents:
294 if not idents:
285 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
295 logger.error("Bad Queue Message: %s"%msg, exc_info=True)
286 return
296 return
287 try:
297 try:
288 msg = self.session.unpack_message(msg,content=True)
298 msg = self.session.unpack_message(msg,content=True)
289 except:
299 except:
290 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
300 logger.error("registration::got bad registration message: %s"%msg, exc_info=True)
291 return
301 return
292
302
293 msg_type = msg['msg_type']
303 msg_type = msg['msg_type']
294 content = msg['content']
304 content = msg['content']
295
305
296 handler = self.registrar_handlers.get(msg_type, None)
306 handler = self.registrar_handlers.get(msg_type, None)
297 if handler is None:
307 if handler is None:
298 logger.error("registration::got bad registration message: %s"%msg)
308 logger.error("registration::got bad registration message: %s"%msg)
299 else:
309 else:
300 handler(idents, msg)
310 handler(idents, msg)
301
311
302 def dispatch_queue_traffic(self, msg):
312 def dispatch_monitor_traffic(self, msg):
303 """all ME and Task queue messages come through here"""
313 """all ME and Task queue messages come through here, as well as
304 logger.debug("queue traffic: %s"%msg[:2])
314 IOPub traffic."""
315 logger.debug("monitor traffic: %s"%msg[:2])
305 switch = msg[0]
316 switch = msg[0]
306 idents, msg = self.session.feed_identities(msg[1:])
317 idents, msg = self.session.feed_identities(msg[1:])
307 if not idents:
318 if not idents:
308 logger.error("Bad Queue Message: %s"%msg)
319 logger.error("Bad Monitor Message: %s"%msg)
309 return
320 return
310 handler = self.queue_handlers.get(switch, None)
321 handler = self.monitor_handlers.get(switch, None)
311 if handler is not None:
322 if handler is not None:
312 handler(idents, msg)
323 handler(idents, msg)
313 else:
324 else:
314 logger.error("Invalid message topic: %s"%switch)
325 logger.error("Invalid monitor topic: %s"%switch)
315
326
316
327
317 def dispatch_client_msg(self, msg):
328 def dispatch_client_msg(self, msg):
318 """Route messages from clients"""
329 """Route messages from clients"""
319 idents, msg = self.session.feed_identities(msg)
330 idents, msg = self.session.feed_identities(msg)
320 if not idents:
331 if not idents:
321 logger.error("Bad Client Message: %s"%msg)
332 logger.error("Bad Client Message: %s"%msg)
322 return
333 return
323 client_id = idents[0]
334 client_id = idents[0]
324 try:
335 try:
325 msg = self.session.unpack_message(msg, content=True)
336 msg = self.session.unpack_message(msg, content=True)
326 except:
337 except:
327 content = wrap_exception()
338 content = wrap_exception()
328 logger.error("Bad Client Message: %s"%msg, exc_info=True)
339 logger.error("Bad Client Message: %s"%msg, exc_info=True)
329 self.session.send(self.clientele, "controller_error", ident=client_id,
340 self.session.send(self.clientele, "controller_error", ident=client_id,
330 content=content)
341 content=content)
331 return
342 return
332
343
333 # print client_id, header, parent, content
344 # print client_id, header, parent, content
334 #switch on message type:
345 #switch on message type:
335 msg_type = msg['msg_type']
346 msg_type = msg['msg_type']
336 logger.info("client:: client %s requested %s"%(client_id, msg_type))
347 logger.info("client:: client %s requested %s"%(client_id, msg_type))
337 handler = self.client_handlers.get(msg_type, None)
348 handler = self.client_handlers.get(msg_type, None)
338 try:
349 try:
339 assert handler is not None, "Bad Message Type: %s"%msg_type
350 assert handler is not None, "Bad Message Type: %s"%msg_type
340 except:
351 except:
341 content = wrap_exception()
352 content = wrap_exception()
342 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
353 logger.error("Bad Message Type: %s"%msg_type, exc_info=True)
343 self.session.send(self.clientele, "controller_error", ident=client_id,
354 self.session.send(self.clientele, "controller_error", ident=client_id,
344 content=content)
355 content=content)
345 return
356 return
346 else:
357 else:
347 handler(client_id, msg)
358 handler(client_id, msg)
348
359
349 def dispatch_db(self, msg):
360 def dispatch_db(self, msg):
350 """"""
361 """"""
351 raise NotImplementedError
362 raise NotImplementedError
352
363
353 #---------------------------------------------------------------------------
364 #---------------------------------------------------------------------------
354 # handler methods (1 per event)
365 # handler methods (1 per event)
355 #---------------------------------------------------------------------------
366 #---------------------------------------------------------------------------
356
367
357 #----------------------- Heartbeat --------------------------------------
368 #----------------------- Heartbeat --------------------------------------
358
369
359 def handle_new_heart(self, heart):
370 def handle_new_heart(self, heart):
360 """handler to attach to heartbeater.
371 """handler to attach to heartbeater.
361 Called when a new heart starts to beat.
372 Called when a new heart starts to beat.
362 Triggers completion of registration."""
373 Triggers completion of registration."""
363 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
374 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
364 if heart not in self.incoming_registrations:
375 if heart not in self.incoming_registrations:
365 logger.info("heartbeat::ignoring new heart: %r"%heart)
376 logger.info("heartbeat::ignoring new heart: %r"%heart)
366 else:
377 else:
367 self.finish_registration(heart)
378 self.finish_registration(heart)
368
379
369
380
370 def handle_heart_failure(self, heart):
381 def handle_heart_failure(self, heart):
371 """handler to attach to heartbeater.
382 """handler to attach to heartbeater.
372 called when a previously registered heart fails to respond to beat request.
383 called when a previously registered heart fails to respond to beat request.
373 triggers unregistration"""
384 triggers unregistration"""
374 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
385 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
375 eid = self.hearts.get(heart, None)
386 eid = self.hearts.get(heart, None)
376 queue = self.engines[eid].queue
387 queue = self.engines[eid].queue
377 if eid is None:
388 if eid is None:
378 logger.info("heartbeat::ignoring heart failure %r"%heart)
389 logger.info("heartbeat::ignoring heart failure %r"%heart)
379 else:
390 else:
380 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
391 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
381
392
382 #----------------------- MUX Queue Traffic ------------------------------
393 #----------------------- MUX Queue Traffic ------------------------------
383
394
384 def save_queue_request(self, idents, msg):
395 def save_queue_request(self, idents, msg):
385 if len(idents) < 2:
396 if len(idents) < 2:
386 logger.error("invalid identity prefix: %s"%idents)
397 logger.error("invalid identity prefix: %s"%idents)
387 return
398 return
388 queue_id, client_id = idents[:2]
399 queue_id, client_id = idents[:2]
389 try:
400 try:
390 msg = self.session.unpack_message(msg, content=False)
401 msg = self.session.unpack_message(msg, content=False)
391 except:
402 except:
392 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
403 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
393 return
404 return
394
405
395 eid = self.by_ident.get(queue_id, None)
406 eid = self.by_ident.get(queue_id, None)
396 if eid is None:
407 if eid is None:
397 logger.error("queue::target %r not registered"%queue_id)
408 logger.error("queue::target %r not registered"%queue_id)
398 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
409 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
399 return
410 return
400
411
401 header = msg['header']
412 header = msg['header']
402 msg_id = header['msg_id']
413 msg_id = header['msg_id']
403 record = init_record(msg)
414 record = init_record(msg)
404 record['engine_uuid'] = queue_id
415 record['engine_uuid'] = queue_id
405 record['client_uuid'] = client_id
416 record['client_uuid'] = client_id
406 record['queue'] = 'mux'
417 record['queue'] = 'mux'
407 if MongoDB is not None and isinstance(self.db, MongoDB):
418 if MongoDB is not None and isinstance(self.db, MongoDB):
408 record['buffers'] = map(Binary, record['buffers'])
419 record['buffers'] = map(Binary, record['buffers'])
409 self.pending.add(msg_id)
420 self.pending.add(msg_id)
410 self.queues[eid].append(msg_id)
421 self.queues[eid].append(msg_id)
411 self.db.add_record(msg_id, record)
422 self.db.add_record(msg_id, record)
412
423
413 def save_queue_result(self, idents, msg):
424 def save_queue_result(self, idents, msg):
414 if len(idents) < 2:
425 if len(idents) < 2:
415 logger.error("invalid identity prefix: %s"%idents)
426 logger.error("invalid identity prefix: %s"%idents)
416 return
427 return
417
428
418 client_id, queue_id = idents[:2]
429 client_id, queue_id = idents[:2]
419 try:
430 try:
420 msg = self.session.unpack_message(msg, content=False)
431 msg = self.session.unpack_message(msg, content=False)
421 except:
432 except:
422 logger.error("queue::engine %r sent invalid message to %r: %s"%(
433 logger.error("queue::engine %r sent invalid message to %r: %s"%(
423 queue_id,client_id, msg), exc_info=True)
434 queue_id,client_id, msg), exc_info=True)
424 return
435 return
425
436
426 eid = self.by_ident.get(queue_id, None)
437 eid = self.by_ident.get(queue_id, None)
427 if eid is None:
438 if eid is None:
428 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
439 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
429 logger.debug("queue:: %s"%msg[2:])
440 logger.debug("queue:: %s"%msg[2:])
430 return
441 return
431
442
432 parent = msg['parent_header']
443 parent = msg['parent_header']
433 if not parent:
444 if not parent:
434 return
445 return
435 msg_id = parent['msg_id']
446 msg_id = parent['msg_id']
436 if msg_id in self.pending:
447 if msg_id in self.pending:
437 self.pending.remove(msg_id)
448 self.pending.remove(msg_id)
438 self.all_completed.add(msg_id)
449 self.all_completed.add(msg_id)
439 self.queues[eid].remove(msg_id)
450 self.queues[eid].remove(msg_id)
440 self.completed[eid].append(msg_id)
451 self.completed[eid].append(msg_id)
441 rheader = msg['header']
452 rheader = msg['header']
442 completed = datetime.strptime(rheader['date'], ISO8601)
453 completed = datetime.strptime(rheader['date'], ISO8601)
443 started = rheader.get('started', None)
454 started = rheader.get('started', None)
444 if started is not None:
455 if started is not None:
445 started = datetime.strptime(started, ISO8601)
456 started = datetime.strptime(started, ISO8601)
446 result = {
457 result = {
447 'result_header' : rheader,
458 'result_header' : rheader,
448 'result_content': msg['content'],
459 'result_content': msg['content'],
449 'started' : started,
460 'started' : started,
450 'completed' : completed
461 'completed' : completed
451 }
462 }
452 if MongoDB is not None and isinstance(self.db, MongoDB):
463 if MongoDB is not None and isinstance(self.db, MongoDB):
453 result['result_buffers'] = map(Binary, msg['buffers'])
464 result['result_buffers'] = map(Binary, msg['buffers'])
454 else:
465 else:
455 result['result_buffers'] = msg['buffers']
466 result['result_buffers'] = msg['buffers']
456 self.db.update_record(msg_id, result)
467 self.db.update_record(msg_id, result)
457 else:
468 else:
458 logger.debug("queue:: unknown msg finished %s"%msg_id)
469 logger.debug("queue:: unknown msg finished %s"%msg_id)
459
470
460 #--------------------- Task Queue Traffic ------------------------------
471 #--------------------- Task Queue Traffic ------------------------------
461
472
462 def save_task_request(self, idents, msg):
473 def save_task_request(self, idents, msg):
463 """Save the submission of a task."""
474 """Save the submission of a task."""
464 client_id = idents[0]
475 client_id = idents[0]
465
476
466 try:
477 try:
467 msg = self.session.unpack_message(msg, content=False)
478 msg = self.session.unpack_message(msg, content=False)
468 except:
479 except:
469 logger.error("task::client %r sent invalid task message: %s"%(
480 logger.error("task::client %r sent invalid task message: %s"%(
470 client_id, msg), exc_info=True)
481 client_id, msg), exc_info=True)
471 return
482 return
472 record = init_record(msg)
483 record = init_record(msg)
473 if MongoDB is not None and isinstance(self.db, MongoDB):
484 if MongoDB is not None and isinstance(self.db, MongoDB):
474 record['buffers'] = map(Binary, record['buffers'])
485 record['buffers'] = map(Binary, record['buffers'])
475 record['client_uuid'] = client_id
486 record['client_uuid'] = client_id
476 record['queue'] = 'task'
487 record['queue'] = 'task'
477 header = msg['header']
488 header = msg['header']
478 msg_id = header['msg_id']
489 msg_id = header['msg_id']
479 self.pending.add(msg_id)
490 self.pending.add(msg_id)
480 self.db.add_record(msg_id, record)
491 self.db.add_record(msg_id, record)
481
492
482 def save_task_result(self, idents, msg):
493 def save_task_result(self, idents, msg):
483 """save the result of a completed task."""
494 """save the result of a completed task."""
484 client_id = idents[0]
495 client_id = idents[0]
485 try:
496 try:
486 msg = self.session.unpack_message(msg, content=False)
497 msg = self.session.unpack_message(msg, content=False)
487 except:
498 except:
488 logger.error("task::invalid task result message send to %r: %s"%(
499 logger.error("task::invalid task result message send to %r: %s"%(
489 client_id, msg))
500 client_id, msg), exc_info=True)
490 raise
501 raise
491 return
502 return
492
503
493 parent = msg['parent_header']
504 parent = msg['parent_header']
494 if not parent:
505 if not parent:
495 # print msg
506 # print msg
496 logger.warn("Task %r had no parent!"%msg)
507 logger.warn("Task %r had no parent!"%msg)
497 return
508 return
498 msg_id = parent['msg_id']
509 msg_id = parent['msg_id']
499
510
500 header = msg['header']
511 header = msg['header']
501 engine_uuid = header.get('engine', None)
512 engine_uuid = header.get('engine', None)
502 eid = self.by_ident.get(engine_uuid, None)
513 eid = self.by_ident.get(engine_uuid, None)
503
514
504 if msg_id in self.pending:
515 if msg_id in self.pending:
505 self.pending.remove(msg_id)
516 self.pending.remove(msg_id)
506 self.all_completed.add(msg_id)
517 self.all_completed.add(msg_id)
507 if eid is not None:
518 if eid is not None:
508 self.completed[eid].append(msg_id)
519 self.completed[eid].append(msg_id)
509 if msg_id in self.tasks[eid]:
520 if msg_id in self.tasks[eid]:
510 self.tasks[eid].remove(msg_id)
521 self.tasks[eid].remove(msg_id)
511 completed = datetime.strptime(header['date'], ISO8601)
522 completed = datetime.strptime(header['date'], ISO8601)
512 started = header.get('started', None)
523 started = header.get('started', None)
513 if started is not None:
524 if started is not None:
514 started = datetime.strptime(started, ISO8601)
525 started = datetime.strptime(started, ISO8601)
515 result = {
526 result = {
516 'result_header' : header,
527 'result_header' : header,
517 'result_content': msg['content'],
528 'result_content': msg['content'],
518 'started' : started,
529 'started' : started,
519 'completed' : completed,
530 'completed' : completed,
520 'engine_uuid': engine_uuid
531 'engine_uuid': engine_uuid
521 }
532 }
522 if MongoDB is not None and isinstance(self.db, MongoDB):
533 if MongoDB is not None and isinstance(self.db, MongoDB):
523 result['result_buffers'] = map(Binary, msg['buffers'])
534 result['result_buffers'] = map(Binary, msg['buffers'])
524 else:
535 else:
525 result['result_buffers'] = msg['buffers']
536 result['result_buffers'] = msg['buffers']
526 self.db.update_record(msg_id, result)
537 self.db.update_record(msg_id, result)
527
538
528 else:
539 else:
529 logger.debug("task::unknown task %s finished"%msg_id)
540 logger.debug("task::unknown task %s finished"%msg_id)
530
541
531 def save_task_destination(self, idents, msg):
542 def save_task_destination(self, idents, msg):
532 try:
543 try:
533 msg = self.session.unpack_message(msg, content=True)
544 msg = self.session.unpack_message(msg, content=True)
534 except:
545 except:
535 logger.error("task::invalid task tracking message")
546 logger.error("task::invalid task tracking message", exc_info=True)
536 return
547 return
537 content = msg['content']
548 content = msg['content']
538 print (content)
549 print (content)
539 msg_id = content['msg_id']
550 msg_id = content['msg_id']
540 engine_uuid = content['engine_id']
551 engine_uuid = content['engine_id']
541 eid = self.by_ident[engine_uuid]
552 eid = self.by_ident[engine_uuid]
542
553
543 logger.info("task::task %s arrived on %s"%(msg_id, eid))
554 logger.info("task::task %s arrived on %s"%(msg_id, eid))
544 # if msg_id in self.mia:
555 # if msg_id in self.mia:
545 # self.mia.remove(msg_id)
556 # self.mia.remove(msg_id)
546 # else:
557 # else:
547 # logger.debug("task::task %s not listed as MIA?!"%(msg_id))
558 # logger.debug("task::task %s not listed as MIA?!"%(msg_id))
548
559
549 self.tasks[eid].append(msg_id)
560 self.tasks[eid].append(msg_id)
550 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
561 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
551 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
562 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
552
563
    def mia_task_request(self, idents, msg):
        # MIA (missing-in-action) task recovery is not implemented yet;
        # everything below the raise is dead code kept as a sketch of the
        # intended reply.
        raise NotImplementedError
        client_id = idents[0]
        # content = dict(mia=self.mia,status='ok')
        # self.session.send('mia_reply', content=content, idents=client_id)
558
569
559
570
571 #--------------------- IOPub Traffic ------------------------------
572
573 def save_iopub_message(self, topics, msg):
574 """save an iopub message into the db"""
575 print (topics)
576 try:
577 msg = self.session.unpack_message(msg, content=True)
578 except:
579 logger.error("iopub::invalid IOPub message", exc_info=True)
580 return
581
582 parent = msg['parent_header']
583 msg_id = parent['msg_id']
584 msg_type = msg['msg_type']
585 content = msg['content']
586
587 # ensure msg_id is in db
588 try:
589 rec = self.db.get_record(msg_id)
590 except:
591 logger.error("iopub::IOPub message has invalid parent", exc_info=True)
592 return
593 # stream
594 d = {}
595 if msg_type == 'stream':
596 name = content['name']
597 s = rec[name] or ''
598 d[name] = s + content['data']
599
600 elif msg_type == 'pyerr':
601 d['pyerr'] = content
602 else:
603 d[msg_type] = content['data']
604
605 self.db.update_record(msg_id, d)
606
607
560
608
561 #-------------------------------------------------------------------------
609 #-------------------------------------------------------------------------
562 # Registration requests
610 # Registration requests
563 #-------------------------------------------------------------------------
611 #-------------------------------------------------------------------------
564
612
565 def connection_request(self, client_id, msg):
613 def connection_request(self, client_id, msg):
566 """Reply with connection addresses for clients."""
614 """Reply with connection addresses for clients."""
567 logger.info("client::client %s connected"%client_id)
615 logger.info("client::client %s connected"%client_id)
568 content = dict(status='ok')
616 content = dict(status='ok')
569 content.update(self.client_addrs)
617 content.update(self.client_addrs)
570 jsonable = {}
618 jsonable = {}
571 for k,v in self.keytable.iteritems():
619 for k,v in self.keytable.iteritems():
572 jsonable[str(k)] = v
620 jsonable[str(k)] = v
573 content['engines'] = jsonable
621 content['engines'] = jsonable
574 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
622 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
575
623
    def register_engine(self, reg, msg):
        """Register a new engine.

        Replies on the registrar socket with either a new engine id plus
        the connection addresses, or a wrapped exception if the requested
        queue/heartbeat identity is already in use.  Returns the eid.
        """
        content = msg['content']
        try:
            queue = content['queue']
        except KeyError:
            logger.error("registration::queue not specified", exc_info=True)
            return
        heart = content.get('heartbeat', None)
        """register a new engine, and create the socket(s) necessary"""
        eid = self._new_id()
        # print (eid, queue, reg, heart)

        logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))

        content = dict(id=eid,status='ok')
        content.update(self.engine_addrs)
        # check if requesting available IDs:
        if queue in self.by_ident:
            # raise-and-catch so wrap_exception() can capture a real traceback
            try:
                raise KeyError("queue_id %r in use"%queue)
            except:
                content = wrap_exception()
        elif heart in self.hearts: # need to check unique hearts?
            try:
                raise KeyError("heart_id %r in use"%heart)
            except:
                content = wrap_exception()
        else:
            # also guard against identities held by half-finished registrations
            for h, pack in self.incoming_registrations.iteritems():
                if heart == h:
                    try:
                        raise KeyError("heart_id %r in use"%heart)
                    except:
                        content = wrap_exception()
                    break
                elif queue == pack[1]:
                    try:
                        raise KeyError("queue_id %r in use"%queue)
                    except:
                        content = wrap_exception()
                    break

        msg = self.session.send(self.registrar, "registration_reply",
                content=content,
                ident=reg)

        if content['status'] == 'ok':
            if heart in self.heartbeat.hearts:
                # already beating: registration can complete immediately
                self.incoming_registrations[heart] = (eid,queue,reg,None)
                self.finish_registration(heart)
            else:
                # wait for the first heartbeat, but purge if it never comes
                purge = lambda : self._purge_stalled_registration(heart)
                dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
                dc.start()
                self.incoming_registrations[heart] = (eid,queue,reg,dc)
        else:
            logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
        return eid
636
684
    def unregister_engine(self, ident, msg):
        """Unregister an engine that explicitly requested to leave."""
        try:
            eid = msg['content']['id']
        except:
            logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
            return
        logger.info("registration::unregister_engine(%s)"%eid)
        content=dict(id=eid, queue=self.engines[eid].queue)
        # drop every mapping that referenced this engine
        self.ids.remove(eid)
        self.keytable.pop(eid)
        ec = self.engines.pop(eid)
        self.hearts.pop(ec.heartbeat)
        self.by_ident.pop(ec.queue)
        self.completed.pop(eid)
        for msg_id in self.queues.pop(eid):
            # NOTE(review): set.remove() returns None, so `msg` is always
            # None here -- the engine's orphaned jobs are not actually
            # handled yet (see TODO below)
            msg = self.pending.remove(msg_id)
            ############## TODO: HANDLE IT ################

        if self.notifier:
            self.session.send(self.notifier, "unregistration_notification", content=content)
658
706
659 def finish_registration(self, heart):
707 def finish_registration(self, heart):
660 """Second half of engine registration, called after our HeartMonitor
708 """Second half of engine registration, called after our HeartMonitor
661 has received a beat from the Engine's Heart."""
709 has received a beat from the Engine's Heart."""
662 try:
710 try:
663 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
711 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
664 except KeyError:
712 except KeyError:
665 logger.error("registration::tried to finish nonexistant registration")
713 logger.error("registration::tried to finish nonexistant registration", exc_info=True)
666 return
714 return
667 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
715 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
668 if purge is not None:
716 if purge is not None:
669 purge.stop()
717 purge.stop()
670 control = queue
718 control = queue
671 self.ids.add(eid)
719 self.ids.add(eid)
672 self.keytable[eid] = queue
720 self.keytable[eid] = queue
673 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
721 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
674 self.by_ident[queue] = eid
722 self.by_ident[queue] = eid
675 self.queues[eid] = list()
723 self.queues[eid] = list()
676 self.tasks[eid] = list()
724 self.tasks[eid] = list()
677 self.completed[eid] = list()
725 self.completed[eid] = list()
678 self.hearts[heart] = eid
726 self.hearts[heart] = eid
679 content = dict(id=eid, queue=self.engines[eid].queue)
727 content = dict(id=eid, queue=self.engines[eid].queue)
680 if self.notifier:
728 if self.notifier:
681 self.session.send(self.notifier, "registration_notification", content=content)
729 self.session.send(self.notifier, "registration_notification", content=content)
682
730
683 def _purge_stalled_registration(self, heart):
731 def _purge_stalled_registration(self, heart):
684 if heart in self.incoming_registrations:
732 if heart in self.incoming_registrations:
685 eid = self.incoming_registrations.pop(heart)[0]
733 eid = self.incoming_registrations.pop(heart)[0]
686 logger.info("registration::purging stalled registration: %i"%eid)
734 logger.info("registration::purging stalled registration: %i"%eid)
687 else:
735 else:
688 pass
736 pass
689
737
690 #-------------------------------------------------------------------------
738 #-------------------------------------------------------------------------
691 # Client Requests
739 # Client Requests
692 #-------------------------------------------------------------------------
740 #-------------------------------------------------------------------------
693
741
694 def shutdown_request(self, client_id, msg):
742 def shutdown_request(self, client_id, msg):
695 """handle shutdown request."""
743 """handle shutdown request."""
696 # s = self.context.socket(zmq.XREQ)
744 # s = self.context.socket(zmq.XREQ)
697 # s.connect(self.client_connections['mux'])
745 # s.connect(self.client_connections['mux'])
698 # time.sleep(0.1)
746 # time.sleep(0.1)
699 # for eid,ec in self.engines.iteritems():
747 # for eid,ec in self.engines.iteritems():
700 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
748 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
701 # time.sleep(1)
749 # time.sleep(1)
702 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
750 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
703 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
751 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
704 dc.start()
752 dc.start()
705
753
706 def _shutdown(self):
754 def _shutdown(self):
707 logger.info("controller::controller shutting down.")
755 logger.info("controller::controller shutting down.")
708 time.sleep(0.1)
756 time.sleep(0.1)
709 sys.exit(0)
757 sys.exit(0)
710
758
711
759
712 def check_load(self, client_id, msg):
760 def check_load(self, client_id, msg):
713 content = msg['content']
761 content = msg['content']
714 try:
762 try:
715 targets = content['targets']
763 targets = content['targets']
716 targets = self._validate_targets(targets)
764 targets = self._validate_targets(targets)
717 except:
765 except:
718 content = wrap_exception()
766 content = wrap_exception()
719 self.session.send(self.clientele, "controller_error",
767 self.session.send(self.clientele, "controller_error",
720 content=content, ident=client_id)
768 content=content, ident=client_id)
721 return
769 return
722
770
723 content = dict(status='ok')
771 content = dict(status='ok')
724 # loads = {}
772 # loads = {}
725 for t in targets:
773 for t in targets:
726 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
774 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
727 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
775 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
728
776
729
777
730 def queue_status(self, client_id, msg):
778 def queue_status(self, client_id, msg):
731 """Return the Queue status of one or more targets.
779 """Return the Queue status of one or more targets.
732 if verbose: return the msg_ids
780 if verbose: return the msg_ids
733 else: return len of each type.
781 else: return len of each type.
734 keys: queue (pending MUX jobs)
782 keys: queue (pending MUX jobs)
735 tasks (pending Task jobs)
783 tasks (pending Task jobs)
736 completed (finished jobs from both queues)"""
784 completed (finished jobs from both queues)"""
737 content = msg['content']
785 content = msg['content']
738 targets = content['targets']
786 targets = content['targets']
739 try:
787 try:
740 targets = self._validate_targets(targets)
788 targets = self._validate_targets(targets)
741 except:
789 except:
742 content = wrap_exception()
790 content = wrap_exception()
743 self.session.send(self.clientele, "controller_error",
791 self.session.send(self.clientele, "controller_error",
744 content=content, ident=client_id)
792 content=content, ident=client_id)
745 return
793 return
746 verbose = content.get('verbose', False)
794 verbose = content.get('verbose', False)
747 content = dict(status='ok')
795 content = dict(status='ok')
748 for t in targets:
796 for t in targets:
749 queue = self.queues[t]
797 queue = self.queues[t]
750 completed = self.completed[t]
798 completed = self.completed[t]
751 tasks = self.tasks[t]
799 tasks = self.tasks[t]
752 if not verbose:
800 if not verbose:
753 queue = len(queue)
801 queue = len(queue)
754 completed = len(completed)
802 completed = len(completed)
755 tasks = len(tasks)
803 tasks = len(tasks)
756 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
804 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
757 # pending
805 # pending
758 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
806 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
759
807
    def purge_results(self, client_id, msg):
        """Purge results from memory. This method is more valuable before we move
        to a DB based message storage mechanism."""
        content = msg['content']
        msg_ids = content.get('msg_ids', [])
        reply = dict(status='ok')
        if msg_ids == 'all':
            # drop every record that has a completion timestamp
            self.db.drop_matching_records(dict(completed={'$ne':None}))
        else:
            for msg_id in msg_ids:
                if msg_id in self.all_completed:
                    self.db.drop_record(msg_id)
                else:
                    # raise-and-catch so wrap_exception() captures a traceback;
                    # note only the unknown-msg case aborts the loop -- a
                    # pending msg records the error but keeps going
                    if msg_id in self.pending:
                        try:
                            raise IndexError("msg pending: %r"%msg_id)
                        except:
                            reply = wrap_exception()
                    else:
                        try:
                            raise IndexError("No such msg: %r"%msg_id)
                        except:
                            reply = wrap_exception()
                        break
        eids = content.get('engine_ids', [])
        for eid in eids:
            if eid not in self.engines:
                try:
                    raise IndexError("No such engine: %i"%eid)
                except:
                    reply = wrap_exception()
                break
            # forget this engine's completed msg_ids and drop their records
            msg_ids = self.completed.pop(eid)
            uid = self.engines[eid].queue
            self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))

        # reply holds the last error encountered (if any), else status ok
        self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
797
845
    def resubmit_task(self, client_id, msg, buffers):
        """Resubmit a task."""
        # not implemented yet; callers will see a NotImplementedError
        raise NotImplementedError
801
849
    def get_results(self, client_id, msg):
        """Get the result of 1 or more messages.

        Replies with pending/completed msg_id lists; unless
        `status_only`, also includes each completed message's result
        content, headers, captured io, and raw result buffers.
        """
        content = msg['content']
        msg_ids = sorted(set(content['msg_ids']))
        statusonly = content.get('status_only', False)
        pending = []
        completed = []
        content = dict(status='ok')
        content['pending'] = pending
        content['completed'] = completed
        buffers = []
        if not statusonly:
            content['results'] = {}
            records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
        for msg_id in msg_ids:
            if msg_id in self.pending:
                pending.append(msg_id)
            elif msg_id in self.all_completed:
                completed.append(msg_id)
                if not statusonly:
                    rec = records[msg_id]
                    # collect captured io for the reply
                    # NOTE(review): assumes every record carries all five io
                    # keys -- confirm against the record initializer
                    io_dict = {}
                    for key in 'pyin pyout pyerr stdout stderr'.split():
                        io_dict[key] = rec[key]
                    content[msg_id] = { 'result_content': rec['result_content'],
                                        'header': rec['header'],
                                        'result_header' : rec['result_header'],
                                        'io' : io_dict,
                                      }
                    buffers.extend(map(str, rec['result_buffers']))
            else:
                # unknown msg_id: report the error and abort the scan
                try:
                    raise KeyError('No such message: '+msg_id)
                except:
                    content = wrap_exception()
                break
        self.session.send(self.clientele, "result_reply", content=content,
                                            parent=msg, ident=client_id,
                                            buffers=buffers)
837
889
@@ -1,424 +1,424 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6 """
6 """
7
7
8 #----------------------------------------------------------------------
8 #----------------------------------------------------------------------
9 # Imports
9 # Imports
10 #----------------------------------------------------------------------
10 #----------------------------------------------------------------------
11
11
12 from __future__ import print_function
12 from __future__ import print_function
13 from random import randint,random
13 from random import randint,random
14
14
15 try:
15 try:
16 import numpy
16 import numpy
17 except ImportError:
17 except ImportError:
18 numpy = None
18 numpy = None
19
19
20 import zmq
20 import zmq
21 from zmq.eventloop import ioloop, zmqstream
21 from zmq.eventloop import ioloop, zmqstream
22
22
23 # local imports
23 # local imports
24 from IPython.zmq.log import logger # a Logger object
24 from IPython.zmq.log import logger # a Logger object
25 from client import Client
25 from client import Client
26 from dependency import Dependency
26 from dependency import Dependency
27 import streamsession as ss
27 import streamsession as ss
28
28
29 from IPython.external.decorator import decorator
29 from IPython.external.decorator import decorator
30
30
@decorator
def logged(f,self,*args,**kwargs):
    # Debug-tracing decorator for methods; the prints are currently
    # disabled -- re-enable them to dump each decorated call.
    # print ("#--------------------")
    # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
    # print ("#--")
    return f(self,*args, **kwargs)
37
37
38 #----------------------------------------------------------------------
38 #----------------------------------------------------------------------
39 # Chooser functions
39 # Chooser functions
40 #----------------------------------------------------------------------
40 #----------------------------------------------------------------------
41
41
def plainrandom(loads):
    """Pick an index uniformly at random from ``loads``."""
    return randint(0, len(loads)-1)
46
46
def lru(loads):
    """Pick index 0 -- the least-recently-used engine.

    ``loads`` is assumed to be in LRU order (oldest first); its
    contents are otherwise ignored.
    """
    return 0
55
55
def twobin(loads):
    """Draw two indices at random and keep the one nearer the front
    of the line (the LRU of the two).

    The load values themselves are ignored; ``loads`` is assumed to
    be in LRU order, oldest first.
    """
    last = len(loads) - 1
    first_pick = randint(0, last)
    second_pick = randint(0, last)
    return min(first_pick, second_pick)
67
67
def weighted(loads):
    """Draw two indices with probability proportional to inverse load,
    and return the less-loaded of the two.
    """
    # a load of 0 is weighted ~a million times more than a load of 1
    weights = 1./(1e-6+numpy.array(loads))
    sums = weights.cumsum()
    total = sums[-1]
    # inverse-CDF sampling: two uniform draws mapped onto the weights
    x = random()*total
    y = random()*total
    idx = 0
    while sums[idx] < x:
        idx += 1
    idy = 0
    while sums[idy] < y:
        idy += 1
    # the larger weight corresponds to the smaller load
    return idy if weights[idy] > weights[idx] else idx
89
89
def leastload(loads):
    """Return the index of the minimum load.

    Ties go to the earliest occurrence, so with LRU-ordered loads the
    LRU engine among the least-loaded wins.
    """
    return min(range(len(loads)), key=loads.__getitem__)
98
98
99 #---------------------------------------------------------------------
99 #---------------------------------------------------------------------
100 # Classes
100 # Classes
101 #---------------------------------------------------------------------
101 #---------------------------------------------------------------------
102 class TaskScheduler(object):
102 class TaskScheduler(object):
103 """Python TaskScheduler object.
103 """Python TaskScheduler object.
104
104
105 This is the simplest object that supports msg_id based
105 This is the simplest object that supports msg_id based
106 DAG dependencies. *Only* task msg_ids are checked, not
106 DAG dependencies. *Only* task msg_ids are checked, not
107 msg_ids of jobs submitted via the MUX queue.
107 msg_ids of jobs submitted via the MUX queue.
108
108
109 """
109 """
110
110
111 scheme = leastload # function for determining the destination
111 scheme = leastload # function for determining the destination
112 client_stream = None # client-facing stream
112 client_stream = None # client-facing stream
113 engine_stream = None # engine-facing stream
113 engine_stream = None # engine-facing stream
114 mon_stream = None # controller-facing stream
114 mon_stream = None # controller-facing stream
115 dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
115 dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
116 depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
116 depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
117 pending = None # dict by engine_uuid of submitted tasks
117 pending = None # dict by engine_uuid of submitted tasks
118 completed = None # dict by engine_uuid of completed tasks
118 completed = None # dict by engine_uuid of completed tasks
119 clients = None # dict by msg_id for who submitted the task
119 clients = None # dict by msg_id for who submitted the task
120 targets = None # list of target IDENTs
120 targets = None # list of target IDENTs
121 loads = None # list of engine loads
121 loads = None # list of engine loads
122 all_done = None # set of all completed tasks
122 all_done = None # set of all completed tasks
123 blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
123 blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
124
124
125
125
126 def __init__(self, client_stream, engine_stream, mon_stream,
126 def __init__(self, client_stream, engine_stream, mon_stream,
127 notifier_stream, scheme=None, io_loop=None):
127 notifier_stream, scheme=None, io_loop=None):
128 if io_loop is None:
128 if io_loop is None:
129 io_loop = ioloop.IOLoop.instance()
129 io_loop = ioloop.IOLoop.instance()
130 self.io_loop = io_loop
130 self.io_loop = io_loop
131 self.client_stream = client_stream
131 self.client_stream = client_stream
132 self.engine_stream = engine_stream
132 self.engine_stream = engine_stream
133 self.mon_stream = mon_stream
133 self.mon_stream = mon_stream
134 self.notifier_stream = notifier_stream
134 self.notifier_stream = notifier_stream
135
135
136 if scheme is not None:
136 if scheme is not None:
137 self.scheme = scheme
137 self.scheme = scheme
138 else:
138 else:
139 self.scheme = TaskScheduler.scheme
139 self.scheme = TaskScheduler.scheme
140
140
141 self.session = ss.StreamSession(username="TaskScheduler")
141 self.session = ss.StreamSession(username="TaskScheduler")
142
142
143 self.dependencies = {}
143 self.dependencies = {}
144 self.depending = {}
144 self.depending = {}
145 self.completed = {}
145 self.completed = {}
146 self.pending = {}
146 self.pending = {}
147 self.all_done = set()
147 self.all_done = set()
148 self.blacklist = {}
148 self.blacklist = {}
149
149
150 self.targets = []
150 self.targets = []
151 self.loads = []
151 self.loads = []
152
152
153 engine_stream.on_recv(self.dispatch_result, copy=False)
153 engine_stream.on_recv(self.dispatch_result, copy=False)
154 self._notification_handlers = dict(
154 self._notification_handlers = dict(
155 registration_notification = self._register_engine,
155 registration_notification = self._register_engine,
156 unregistration_notification = self._unregister_engine
156 unregistration_notification = self._unregister_engine
157 )
157 )
158 self.notifier_stream.on_recv(self.dispatch_notification)
158 self.notifier_stream.on_recv(self.dispatch_notification)
159
159
160 def resume_receiving(self):
160 def resume_receiving(self):
161 """Resume accepting jobs."""
161 """Resume accepting jobs."""
162 self.client_stream.on_recv(self.dispatch_submission, copy=False)
162 self.client_stream.on_recv(self.dispatch_submission, copy=False)
163
163
164 def stop_receiving(self):
164 def stop_receiving(self):
165 """Stop accepting jobs while there are no engines.
165 """Stop accepting jobs while there are no engines.
166 Leave them in the ZMQ queue."""
166 Leave them in the ZMQ queue."""
167 self.client_stream.on_recv(None)
167 self.client_stream.on_recv(None)
168
168
169 #-----------------------------------------------------------------------
169 #-----------------------------------------------------------------------
170 # [Un]Registration Handling
170 # [Un]Registration Handling
171 #-----------------------------------------------------------------------
171 #-----------------------------------------------------------------------
172
172
173 def dispatch_notification(self, msg):
173 def dispatch_notification(self, msg):
174 """dispatch register/unregister events."""
174 """dispatch register/unregister events."""
175 idents,msg = self.session.feed_identities(msg)
175 idents,msg = self.session.feed_identities(msg)
176 msg = self.session.unpack_message(msg)
176 msg = self.session.unpack_message(msg)
177 msg_type = msg['msg_type']
177 msg_type = msg['msg_type']
178 handler = self._notification_handlers.get(msg_type, None)
178 handler = self._notification_handlers.get(msg_type, None)
179 if handler is None:
179 if handler is None:
180 raise Exception("Unhandled message type: %s"%msg_type)
180 raise Exception("Unhandled message type: %s"%msg_type)
181 else:
181 else:
182 try:
182 try:
183 handler(str(msg['content']['queue']))
183 handler(str(msg['content']['queue']))
184 except KeyError:
184 except KeyError:
185 logger.error("task::Invalid notification msg: %s"%msg)
185 logger.error("task::Invalid notification msg: %s"%msg)
186 @logged
186 @logged
187 def _register_engine(self, uid):
187 def _register_engine(self, uid):
188 """New engine with ident `uid` became available."""
188 """New engine with ident `uid` became available."""
189 # head of the line:
189 # head of the line:
190 self.targets.insert(0,uid)
190 self.targets.insert(0,uid)
191 self.loads.insert(0,0)
191 self.loads.insert(0,0)
192 # initialize sets
192 # initialize sets
193 self.completed[uid] = set()
193 self.completed[uid] = set()
194 self.pending[uid] = {}
194 self.pending[uid] = {}
195 if len(self.targets) == 1:
195 if len(self.targets) == 1:
196 self.resume_receiving()
196 self.resume_receiving()
197
197
198 def _unregister_engine(self, uid):
198 def _unregister_engine(self, uid):
199 """Existing engine with ident `uid` became unavailable."""
199 """Existing engine with ident `uid` became unavailable."""
200 if len(self.targets) == 1:
200 if len(self.targets) == 1:
201 # this was our only engine
201 # this was our only engine
202 self.stop_receiving()
202 self.stop_receiving()
203
203
204 # handle any potentially finished tasks:
204 # handle any potentially finished tasks:
205 self.engine_stream.flush()
205 self.engine_stream.flush()
206
206
207 self.completed.pop(uid)
207 self.completed.pop(uid)
208 lost = self.pending.pop(uid)
208 lost = self.pending.pop(uid)
209
209
210 idx = self.targets.index(uid)
210 idx = self.targets.index(uid)
211 self.targets.pop(idx)
211 self.targets.pop(idx)
212 self.loads.pop(idx)
212 self.loads.pop(idx)
213
213
214 self.handle_stranded_tasks(lost)
214 self.handle_stranded_tasks(lost)
215
215
216 def handle_stranded_tasks(self, lost):
216 def handle_stranded_tasks(self, lost):
217 """Deal with jobs resident in an engine that died."""
217 """Deal with jobs resident in an engine that died."""
218 # TODO: resubmit the tasks?
218 # TODO: resubmit the tasks?
219 for msg_id in lost:
219 for msg_id in lost:
220 pass
220 pass
221
221
222
222
223 #-----------------------------------------------------------------------
223 #-----------------------------------------------------------------------
224 # Job Submission
224 # Job Submission
225 #-----------------------------------------------------------------------
225 #-----------------------------------------------------------------------
226 @logged
226 @logged
227 def dispatch_submission(self, raw_msg):
227 def dispatch_submission(self, raw_msg):
228 """Dispatch job submission to appropriate handlers."""
228 """Dispatch job submission to appropriate handlers."""
229 # ensure targets up to date:
229 # ensure targets up to date:
230 self.notifier_stream.flush()
230 self.notifier_stream.flush()
231 try:
231 try:
232 idents, msg = self.session.feed_identities(raw_msg, copy=False)
232 idents, msg = self.session.feed_identities(raw_msg, copy=False)
233 except Exception as e:
233 except Exception as e:
234 logger.error("task::Invaid msg: %s"%msg)
234 logger.error("task::Invaid msg: %s"%msg)
235 return
235 return
236
236
237 # send to monitor
237 # send to monitor
238 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
238 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
239
239
240 msg = self.session.unpack_message(msg, content=False, copy=False)
240 msg = self.session.unpack_message(msg, content=False, copy=False)
241 header = msg['header']
241 header = msg['header']
242 msg_id = header['msg_id']
242 msg_id = header['msg_id']
243
243
244 # time dependencies
244 # time dependencies
245 after = Dependency(header.get('after', []))
245 after = Dependency(header.get('after', []))
246 if after.mode == 'all':
246 if after.mode == 'all':
247 after.difference_update(self.all_done)
247 after.difference_update(self.all_done)
248 if after.check(self.all_done):
248 if after.check(self.all_done):
249 # recast as empty set, if `after` already met,
249 # recast as empty set, if `after` already met,
250 # to prevent unnecessary set comparisons
250 # to prevent unnecessary set comparisons
251 after = Dependency([])
251 after = Dependency([])
252
252
253 # location dependencies
253 # location dependencies
254 follow = Dependency(header.get('follow', []))
254 follow = Dependency(header.get('follow', []))
255 if len(after) == 0:
255 if len(after) == 0:
256 # time deps already met, try to run
256 # time deps already met, try to run
257 if not self.maybe_run(msg_id, raw_msg, follow):
257 if not self.maybe_run(msg_id, raw_msg, follow):
258 # can't run yet
258 # can't run yet
259 self.save_unmet(msg_id, raw_msg, after, follow)
259 self.save_unmet(msg_id, raw_msg, after, follow)
260 else:
260 else:
261 self.save_unmet(msg_id, raw_msg, after, follow)
261 self.save_unmet(msg_id, raw_msg, after, follow)
262
262
263 @logged
263 @logged
264 def maybe_run(self, msg_id, raw_msg, follow=None):
264 def maybe_run(self, msg_id, raw_msg, follow=None):
265 """check location dependencies, and run if they are met."""
265 """check location dependencies, and run if they are met."""
266
266
267 if follow:
267 if follow:
268 def can_run(idx):
268 def can_run(idx):
269 target = self.targets[idx]
269 target = self.targets[idx]
270 return target not in self.blacklist.get(msg_id, []) and\
270 return target not in self.blacklist.get(msg_id, []) and\
271 follow.check(self.completed[target])
271 follow.check(self.completed[target])
272
272
273 indices = filter(can_run, range(len(self.targets)))
273 indices = filter(can_run, range(len(self.targets)))
274 if not indices:
274 if not indices:
275 return False
275 return False
276 else:
276 else:
277 indices = None
277 indices = None
278
278
279 self.submit_task(msg_id, raw_msg, indices)
279 self.submit_task(msg_id, raw_msg, indices)
280 return True
280 return True
281
281
282 @logged
282 @logged
283 def save_unmet(self, msg_id, msg, after, follow):
283 def save_unmet(self, msg_id, msg, after, follow):
284 """Save a message for later submission when its dependencies are met."""
284 """Save a message for later submission when its dependencies are met."""
285 self.depending[msg_id] = (msg_id,msg,after,follow)
285 self.depending[msg_id] = (msg_id,msg,after,follow)
286 # track the ids in both follow/after, but not those already completed
286 # track the ids in both follow/after, but not those already completed
287 for dep_id in after.union(follow).difference(self.all_done):
287 for dep_id in after.union(follow).difference(self.all_done):
288 if dep_id not in self.dependencies:
288 if dep_id not in self.dependencies:
289 self.dependencies[dep_id] = set()
289 self.dependencies[dep_id] = set()
290 self.dependencies[dep_id].add(msg_id)
290 self.dependencies[dep_id].add(msg_id)
291
291
292 @logged
292 @logged
293 def submit_task(self, msg_id, msg, follow=None, indices=None):
293 def submit_task(self, msg_id, msg, follow=None, indices=None):
294 """Submit a task to any of a subset of our targets."""
294 """Submit a task to any of a subset of our targets."""
295 if indices:
295 if indices:
296 loads = [self.loads[i] for i in indices]
296 loads = [self.loads[i] for i in indices]
297 else:
297 else:
298 loads = self.loads
298 loads = self.loads
299 idx = self.scheme(loads)
299 idx = self.scheme(loads)
300 if indices:
300 if indices:
301 idx = indices[idx]
301 idx = indices[idx]
302 target = self.targets[idx]
302 target = self.targets[idx]
303 # print (target, map(str, msg[:3]))
303 # print (target, map(str, msg[:3]))
304 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
304 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
305 self.engine_stream.send_multipart(msg, copy=False)
305 self.engine_stream.send_multipart(msg, copy=False)
306 self.add_job(idx)
306 self.add_job(idx)
307 self.pending[target][msg_id] = (msg, follow)
307 self.pending[target][msg_id] = (msg, follow)
308 content = dict(msg_id=msg_id, engine_id=target)
308 content = dict(msg_id=msg_id, engine_id=target)
309 self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask')
309 self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask')
310
310
311 #-----------------------------------------------------------------------
311 #-----------------------------------------------------------------------
312 # Result Handling
312 # Result Handling
313 #-----------------------------------------------------------------------
313 #-----------------------------------------------------------------------
314 @logged
314 @logged
315 def dispatch_result(self, raw_msg):
315 def dispatch_result(self, raw_msg):
316 try:
316 try:
317 idents,msg = self.session.feed_identities(raw_msg, copy=False)
317 idents,msg = self.session.feed_identities(raw_msg, copy=False)
318 except Exception as e:
318 except Exception as e:
319 logger.error("task::Invaid result: %s"%msg)
319 logger.error("task::Invaid result: %s"%msg)
320 return
320 return
321 msg = self.session.unpack_message(msg, content=False, copy=False)
321 msg = self.session.unpack_message(msg, content=False, copy=False)
322 header = msg['header']
322 header = msg['header']
323 if header.get('dependencies_met', True):
323 if header.get('dependencies_met', True):
324 self.handle_result_success(idents, msg['parent_header'], raw_msg)
324 self.handle_result_success(idents, msg['parent_header'], raw_msg)
325 # send to monitor
325 # send to monitor
326 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
326 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
327 else:
327 else:
328 self.handle_unmet_dependency(idents, msg['parent_header'])
328 self.handle_unmet_dependency(idents, msg['parent_header'])
329
329
330 @logged
330 @logged
331 def handle_result_success(self, idents, parent, raw_msg):
331 def handle_result_success(self, idents, parent, raw_msg):
332 # first, relay result to client
332 # first, relay result to client
333 engine = idents[0]
333 engine = idents[0]
334 client = idents[1]
334 client = idents[1]
335 # swap_ids for XREP-XREP mirror
335 # swap_ids for XREP-XREP mirror
336 raw_msg[:2] = [client,engine]
336 raw_msg[:2] = [client,engine]
337 # print (map(str, raw_msg[:4]))
337 # print (map(str, raw_msg[:4]))
338 self.client_stream.send_multipart(raw_msg, copy=False)
338 self.client_stream.send_multipart(raw_msg, copy=False)
339 # now, update our data structures
339 # now, update our data structures
340 msg_id = parent['msg_id']
340 msg_id = parent['msg_id']
341 self.pending[engine].pop(msg_id)
341 self.pending[engine].pop(msg_id)
342 self.completed[engine].add(msg_id)
342 self.completed[engine].add(msg_id)
343 self.all_done.add(msg_id)
343 self.all_done.add(msg_id)
344
344
345 self.update_dependencies(msg_id)
345 self.update_dependencies(msg_id)
346
346
347 @logged
347 @logged
348 def handle_unmet_dependency(self, idents, parent):
348 def handle_unmet_dependency(self, idents, parent):
349 engine = idents[0]
349 engine = idents[0]
350 msg_id = parent['msg_id']
350 msg_id = parent['msg_id']
351 if msg_id not in self.blacklist:
351 if msg_id not in self.blacklist:
352 self.blacklist[msg_id] = set()
352 self.blacklist[msg_id] = set()
353 self.blacklist[msg_id].add(engine)
353 self.blacklist[msg_id].add(engine)
354 raw_msg,follow = self.pending[engine].pop(msg_id)
354 raw_msg,follow = self.pending[engine].pop(msg_id)
355 if not self.maybe_run(msg_id, raw_msg, follow):
355 if not self.maybe_run(msg_id, raw_msg, follow):
356 # resubmit failed, put it back in our dependency tree
356 # resubmit failed, put it back in our dependency tree
357 self.save_unmet(msg_id, raw_msg, Dependency(), follow)
357 self.save_unmet(msg_id, raw_msg, Dependency(), follow)
358 pass
358 pass
359 @logged
359 @logged
360 def update_dependencies(self, dep_id):
360 def update_dependencies(self, dep_id):
361 """dep_id just finished. Update our dependency
361 """dep_id just finished. Update our dependency
362 table and submit any jobs that just became runable."""
362 table and submit any jobs that just became runable."""
363
363
364 if dep_id not in self.dependencies:
364 if dep_id not in self.dependencies:
365 return
365 return
366 jobs = self.dependencies.pop(dep_id)
366 jobs = self.dependencies.pop(dep_id)
367 for job in jobs:
367 for job in jobs:
368 msg_id, raw_msg, after, follow = self.depending[job]
368 msg_id, raw_msg, after, follow = self.depending[job]
369 if dep_id in after:
369 if dep_id in after:
370 after.remove(dep_id)
370 after.remove(dep_id)
371 if not after: # time deps met, maybe run
371 if not after: # time deps met, maybe run
372 if self.maybe_run(msg_id, raw_msg, follow):
372 if self.maybe_run(msg_id, raw_msg, follow):
373 self.depending.pop(job)
373 self.depending.pop(job)
374 for mid in follow:
374 for mid in follow:
375 if mid in self.dependencies:
375 if mid in self.dependencies:
376 self.dependencies[mid].remove(msg_id)
376 self.dependencies[mid].remove(msg_id)
377
377
378 #----------------------------------------------------------------------
378 #----------------------------------------------------------------------
379 # methods to be overridden by subclasses
379 # methods to be overridden by subclasses
380 #----------------------------------------------------------------------
380 #----------------------------------------------------------------------
381
381
382 def add_job(self, idx):
382 def add_job(self, idx):
383 """Called after self.targets[idx] just got the job with header.
383 """Called after self.targets[idx] just got the job with header.
384 Override with subclasses. The default ordering is simple LRU.
384 Override with subclasses. The default ordering is simple LRU.
385 The default loads are the number of outstanding jobs."""
385 The default loads are the number of outstanding jobs."""
386 self.loads[idx] += 1
386 self.loads[idx] += 1
387 for lis in (self.targets, self.loads):
387 for lis in (self.targets, self.loads):
388 lis.append(lis.pop(idx))
388 lis.append(lis.pop(idx))
389
389
390
390
391 def finish_job(self, idx):
391 def finish_job(self, idx):
392 """Called after self.targets[idx] just finished a job.
392 """Called after self.targets[idx] just finished a job.
393 Override with subclasses."""
393 Override with subclasses."""
394 self.loads[idx] -= 1
394 self.loads[idx] -= 1
395
395
396
396
397
397
398 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):
398 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):
399 from zmq.eventloop import ioloop
399 from zmq.eventloop import ioloop
400 from zmq.eventloop.zmqstream import ZMQStream
400 from zmq.eventloop.zmqstream import ZMQStream
401
401
402 ctx = zmq.Context()
402 ctx = zmq.Context()
403 loop = ioloop.IOLoop()
403 loop = ioloop.IOLoop()
404
404
405 scheme = globals().get(scheme)
405 scheme = globals().get(scheme)
406
406
407 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
407 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
408 ins.bind(in_addr)
408 ins.bind(in_addr)
409 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
409 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
410 outs.bind(out_addr)
410 outs.bind(out_addr)
411 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
411 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
412 mons.connect(mon_addr)
412 mons.connect(mon_addr)
413 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
413 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
414 nots.setsockopt(zmq.SUBSCRIBE, '')
414 nots.setsockopt(zmq.SUBSCRIBE, '')
415 nots.connect(not_addr)
415 nots.connect(not_addr)
416
416
417 scheduler = TaskScheduler(ins,outs,mons,nots,scheme,loop)
417 scheduler = TaskScheduler(ins,outs,mons,nots,scheme,loop)
418
418
419 loop.start()
419 loop.start()
420
420
421
421
422 if __name__ == '__main__':
422 if __name__ == '__main__':
423 iface = 'tcp://127.0.0.1:%i'
423 iface = 'tcp://127.0.0.1:%i'
424 launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)
424 launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)
@@ -1,435 +1,459 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
4 """
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Imports
7 # Imports
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9
9
10 # Standard library imports.
10 # Standard library imports.
11 from __future__ import print_function
11 from __future__ import print_function
12 import __builtin__
12 import __builtin__
13 from code import CommandCompiler
13 from code import CommandCompiler
14 import os
14 import os
15 import sys
15 import sys
16 import time
16 import time
17 import traceback
17 import traceback
18 from datetime import datetime
18 from datetime import datetime
19 from signal import SIGTERM, SIGKILL
19 from signal import SIGTERM, SIGKILL
20 from pprint import pprint
20 from pprint import pprint
21
21
22 # System library imports.
22 # System library imports.
23 import zmq
23 import zmq
24 from zmq.eventloop import ioloop, zmqstream
24 from zmq.eventloop import ioloop, zmqstream
25
25
26 # Local imports.
26 # Local imports.
27 from IPython.core import ultratb
27 from IPython.core import ultratb
28 from IPython.utils.traitlets import HasTraits, Instance, List
28 from IPython.utils.traitlets import HasTraits, Instance, List
29 from IPython.zmq.completer import KernelCompleter
29 from IPython.zmq.completer import KernelCompleter
30 from IPython.zmq.log import logger # a Logger object
30 from IPython.zmq.log import logger # a Logger object
31 from IPython.zmq.iostream import OutStream
32 from IPython.zmq.displayhook import DisplayHook
33
31
34
32 from streamsession import StreamSession, Message, extract_header, serialize_object,\
35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
33 unpack_apply_message, ISO8601, wrap_exception
36 unpack_apply_message, ISO8601, wrap_exception
34 from dependency import UnmetDependency
37 from dependency import UnmetDependency
35 import heartmonitor
38 import heartmonitor
36 from client import Client
39 from client import Client
37
40
38 def printer(*args):
41 def printer(*args):
39 pprint(args)
42 pprint(args, stream=sys.__stdout__)
40
43
41 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
42 # Main kernel class
45 # Main kernel class
43 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
44
47
45 class Kernel(HasTraits):
48 class Kernel(HasTraits):
46
49
47 #---------------------------------------------------------------------------
50 #---------------------------------------------------------------------------
48 # Kernel interface
51 # Kernel interface
49 #---------------------------------------------------------------------------
52 #---------------------------------------------------------------------------
50
53
51 session = Instance(StreamSession)
54 session = Instance(StreamSession)
52 shell_streams = Instance(list)
55 shell_streams = Instance(list)
53 control_stream = Instance(zmqstream.ZMQStream)
56 control_stream = Instance(zmqstream.ZMQStream)
54 task_stream = Instance(zmqstream.ZMQStream)
57 task_stream = Instance(zmqstream.ZMQStream)
55 iopub_stream = Instance(zmqstream.ZMQStream)
58 iopub_stream = Instance(zmqstream.ZMQStream)
56 client = Instance(Client)
59 client = Instance(Client)
57 loop = Instance(ioloop.IOLoop)
60 loop = Instance(ioloop.IOLoop)
58
61
59 def __init__(self, **kwargs):
62 def __init__(self, **kwargs):
60 super(Kernel, self).__init__(**kwargs)
63 super(Kernel, self).__init__(**kwargs)
61 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
64 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
65 self.prefix = 'engine.%s'%self.identity
62 self.user_ns = {}
66 self.user_ns = {}
63 self.history = []
67 self.history = []
64 self.compiler = CommandCompiler()
68 self.compiler = CommandCompiler()
65 self.completer = KernelCompleter(self.user_ns)
69 self.completer = KernelCompleter(self.user_ns)
66 self.aborted = set()
70 self.aborted = set()
67
71
68 # Build dict of handlers for message types
72 # Build dict of handlers for message types
69 self.shell_handlers = {}
73 self.shell_handlers = {}
70 self.control_handlers = {}
74 self.control_handlers = {}
71 for msg_type in ['execute_request', 'complete_request', 'apply_request',
75 for msg_type in ['execute_request', 'complete_request', 'apply_request',
72 'clear_request']:
76 'clear_request']:
73 self.shell_handlers[msg_type] = getattr(self, msg_type)
77 self.shell_handlers[msg_type] = getattr(self, msg_type)
74
78
75 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
79 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
76 self.control_handlers[msg_type] = getattr(self, msg_type)
80 self.control_handlers[msg_type] = getattr(self, msg_type)
77
81
78
82
79 def _wrap_exception(self, method=None):
83 def _wrap_exception(self, method=None):
80 e_info = dict(engineid=self.identity, method=method)
84 e_info = dict(engineid=self.identity, method=method)
81 content=wrap_exception(e_info)
85 content=wrap_exception(e_info)
82 return content
86 return content
83
87
84 #-------------------- control handlers -----------------------------
88 #-------------------- control handlers -----------------------------
85 def abort_queues(self):
89 def abort_queues(self):
86 for stream in self.shell_streams:
90 for stream in self.shell_streams:
87 if stream:
91 if stream:
88 self.abort_queue(stream)
92 self.abort_queue(stream)
89
93
90 def abort_queue(self, stream):
94 def abort_queue(self, stream):
91 while True:
95 while True:
92 try:
96 try:
93 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
97 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
94 except zmq.ZMQError as e:
98 except zmq.ZMQError as e:
95 if e.errno == zmq.EAGAIN:
99 if e.errno == zmq.EAGAIN:
96 break
100 break
97 else:
101 else:
98 return
102 return
99 else:
103 else:
100 if msg is None:
104 if msg is None:
101 return
105 return
102 else:
106 else:
103 idents,msg = msg
107 idents,msg = msg
104
108
105 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
109 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
106 # msg = self.reply_socket.recv_json()
110 # msg = self.reply_socket.recv_json()
107 print ("Aborting:", file=sys.__stdout__)
111 print ("Aborting:", file=sys.__stdout__)
108 print (Message(msg), file=sys.__stdout__)
112 print (Message(msg), file=sys.__stdout__)
109 msg_type = msg['msg_type']
113 msg_type = msg['msg_type']
110 reply_type = msg_type.split('_')[0] + '_reply'
114 reply_type = msg_type.split('_')[0] + '_reply'
111 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
115 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
112 # self.reply_socket.send(ident,zmq.SNDMORE)
116 # self.reply_socket.send(ident,zmq.SNDMORE)
113 # self.reply_socket.send_json(reply_msg)
117 # self.reply_socket.send_json(reply_msg)
114 reply_msg = self.session.send(stream, reply_type,
118 reply_msg = self.session.send(stream, reply_type,
115 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
119 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
116 print(Message(reply_msg), file=sys.__stdout__)
120 print(Message(reply_msg), file=sys.__stdout__)
117 # We need to wait a bit for requests to come in. This can probably
121 # We need to wait a bit for requests to come in. This can probably
118 # be set shorter for true asynchronous clients.
122 # be set shorter for true asynchronous clients.
119 time.sleep(0.05)
123 time.sleep(0.05)
120
124
121 def abort_request(self, stream, ident, parent):
125 def abort_request(self, stream, ident, parent):
122 """abort a specifig msg by id"""
126 """abort a specifig msg by id"""
123 msg_ids = parent['content'].get('msg_ids', None)
127 msg_ids = parent['content'].get('msg_ids', None)
124 if isinstance(msg_ids, basestring):
128 if isinstance(msg_ids, basestring):
125 msg_ids = [msg_ids]
129 msg_ids = [msg_ids]
126 if not msg_ids:
130 if not msg_ids:
127 self.abort_queues()
131 self.abort_queues()
128 for mid in msg_ids:
132 for mid in msg_ids:
129 self.aborted.add(str(mid))
133 self.aborted.add(str(mid))
130
134
131 content = dict(status='ok')
135 content = dict(status='ok')
132 reply_msg = self.session.send(stream, 'abort_reply', content=content,
136 reply_msg = self.session.send(stream, 'abort_reply', content=content,
133 parent=parent, ident=ident)[0]
137 parent=parent, ident=ident)[0]
134 print(Message(reply_msg), file=sys.__stdout__)
138 print(Message(reply_msg), file=sys.__stdout__)
135
139
136 def shutdown_request(self, stream, ident, parent):
140 def shutdown_request(self, stream, ident, parent):
137 """kill ourself. This should really be handled in an external process"""
141 """kill ourself. This should really be handled in an external process"""
138 try:
142 try:
139 self.abort_queues()
143 self.abort_queues()
140 except:
144 except:
141 content = self._wrap_exception('shutdown')
145 content = self._wrap_exception('shutdown')
142 else:
146 else:
143 content = dict(parent['content'])
147 content = dict(parent['content'])
144 content['status'] = 'ok'
148 content['status'] = 'ok'
145 msg = self.session.send(stream, 'shutdown_reply',
149 msg = self.session.send(stream, 'shutdown_reply',
146 content=content, parent=parent, ident=ident)
150 content=content, parent=parent, ident=ident)
147 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
151 # msg = self.session.send(self.pub_socket, 'shutdown_reply',
148 # content, parent, ident)
152 # content, parent, ident)
149 # print >> sys.__stdout__, msg
153 # print >> sys.__stdout__, msg
150 # time.sleep(0.2)
154 # time.sleep(0.2)
151 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
155 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
152 dc.start()
156 dc.start()
153
157
154 def dispatch_control(self, msg):
158 def dispatch_control(self, msg):
155 idents,msg = self.session.feed_identities(msg, copy=False)
159 idents,msg = self.session.feed_identities(msg, copy=False)
156 try:
160 try:
157 msg = self.session.unpack_message(msg, content=True, copy=False)
161 msg = self.session.unpack_message(msg, content=True, copy=False)
158 except:
162 except:
159 logger.error("Invalid Message", exc_info=True)
163 logger.error("Invalid Message", exc_info=True)
160 return
164 return
161
165
162 header = msg['header']
166 header = msg['header']
163 msg_id = header['msg_id']
167 msg_id = header['msg_id']
164
168
165 handler = self.control_handlers.get(msg['msg_type'], None)
169 handler = self.control_handlers.get(msg['msg_type'], None)
166 if handler is None:
170 if handler is None:
167 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
171 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
168 else:
172 else:
169 handler(self.control_stream, idents, msg)
173 handler(self.control_stream, idents, msg)
170
174
171
175
172 #-------------------- queue helpers ------------------------------
176 #-------------------- queue helpers ------------------------------
173
177
174 def check_dependencies(self, dependencies):
178 def check_dependencies(self, dependencies):
175 if not dependencies:
179 if not dependencies:
176 return True
180 return True
177 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
181 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
178 anyorall = dependencies[0]
182 anyorall = dependencies[0]
179 dependencies = dependencies[1]
183 dependencies = dependencies[1]
180 else:
184 else:
181 anyorall = 'all'
185 anyorall = 'all'
182 results = self.client.get_results(dependencies,status_only=True)
186 results = self.client.get_results(dependencies,status_only=True)
183 if results['status'] != 'ok':
187 if results['status'] != 'ok':
184 return False
188 return False
185
189
186 if anyorall == 'any':
190 if anyorall == 'any':
187 if not results['completed']:
191 if not results['completed']:
188 return False
192 return False
189 else:
193 else:
190 if results['pending']:
194 if results['pending']:
191 return False
195 return False
192
196
193 return True
197 return True
194
198
195 def check_aborted(self, msg_id):
199 def check_aborted(self, msg_id):
196 return msg_id in self.aborted
200 return msg_id in self.aborted
197
201
198 #-------------------- queue handlers -----------------------------
202 #-------------------- queue handlers -----------------------------
199
203
200 def clear_request(self, stream, idents, parent):
204 def clear_request(self, stream, idents, parent):
201 """Clear our namespace."""
205 """Clear our namespace."""
202 self.user_ns = {}
206 self.user_ns = {}
203 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
207 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
204 content = dict(status='ok'))
208 content = dict(status='ok'))
205
209
206 def execute_request(self, stream, ident, parent):
210 def execute_request(self, stream, ident, parent):
207 try:
211 try:
208 code = parent[u'content'][u'code']
212 code = parent[u'content'][u'code']
209 except:
213 except:
210 print("Got bad msg: ", file=sys.__stderr__)
214 print("Got bad msg: ", file=sys.__stderr__)
211 print(Message(parent), file=sys.__stderr__)
215 print(Message(parent), file=sys.__stderr__)
212 return
216 return
213 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
217 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
214 # self.iopub_stream.send(pyin_msg)
218 # self.iopub_stream.send(pyin_msg)
215 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
219 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
220 ident=self.identity+'.pyin')
216 started = datetime.now().strftime(ISO8601)
221 started = datetime.now().strftime(ISO8601)
217 try:
222 try:
218 comp_code = self.compiler(code, '<zmq-kernel>')
223 comp_code = self.compiler(code, '<zmq-kernel>')
219 # allow for not overriding displayhook
224 # allow for not overriding displayhook
220 if hasattr(sys.displayhook, 'set_parent'):
225 if hasattr(sys.displayhook, 'set_parent'):
221 sys.displayhook.set_parent(parent)
226 sys.displayhook.set_parent(parent)
227 sys.stdout.set_parent(parent)
228 sys.stderr.set_parent(parent)
222 exec comp_code in self.user_ns, self.user_ns
229 exec comp_code in self.user_ns, self.user_ns
223 except:
230 except:
224 exc_content = self._wrap_exception('execute')
231 exc_content = self._wrap_exception('execute')
225 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
232 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
226 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
233 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
234 ident=self.identity+'.pyerr')
227 reply_content = exc_content
235 reply_content = exc_content
228 else:
236 else:
229 reply_content = {'status' : 'ok'}
237 reply_content = {'status' : 'ok'}
230 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
238 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
231 # self.reply_socket.send(ident, zmq.SNDMORE)
239 # self.reply_socket.send(ident, zmq.SNDMORE)
232 # self.reply_socket.send_json(reply_msg)
240 # self.reply_socket.send_json(reply_msg)
233 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
241 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
234 ident=ident, subheader = dict(started=started))
242 ident=ident, subheader = dict(started=started))
235 print(Message(reply_msg), file=sys.__stdout__)
243 print(Message(reply_msg), file=sys.__stdout__)
236 if reply_msg['content']['status'] == u'error':
244 if reply_msg['content']['status'] == u'error':
237 self.abort_queues()
245 self.abort_queues()
238
246
239 def complete_request(self, stream, ident, parent):
247 def complete_request(self, stream, ident, parent):
240 matches = {'matches' : self.complete(parent),
248 matches = {'matches' : self.complete(parent),
241 'status' : 'ok'}
249 'status' : 'ok'}
242 completion_msg = self.session.send(stream, 'complete_reply',
250 completion_msg = self.session.send(stream, 'complete_reply',
243 matches, parent, ident)
251 matches, parent, ident)
244 # print >> sys.__stdout__, completion_msg
252 # print >> sys.__stdout__, completion_msg
245
253
246 def complete(self, msg):
254 def complete(self, msg):
247 return self.completer.complete(msg.content.line, msg.content.text)
255 return self.completer.complete(msg.content.line, msg.content.text)
248
256
249 def apply_request(self, stream, ident, parent):
257 def apply_request(self, stream, ident, parent):
250 print (parent)
258 # print (parent)
251 try:
259 try:
252 content = parent[u'content']
260 content = parent[u'content']
253 bufs = parent[u'buffers']
261 bufs = parent[u'buffers']
254 msg_id = parent['header']['msg_id']
262 msg_id = parent['header']['msg_id']
255 bound = content.get('bound', False)
263 bound = content.get('bound', False)
256 except:
264 except:
257 print("Got bad msg: ", file=sys.__stderr__)
265 print("Got bad msg: ", file=sys.__stderr__)
258 print(Message(parent), file=sys.__stderr__)
266 print(Message(parent), file=sys.__stderr__)
259 return
267 return
260 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
268 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
261 # self.iopub_stream.send(pyin_msg)
269 # self.iopub_stream.send(pyin_msg)
262 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
270 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
263 sub = {'dependencies_met' : True, 'engine' : self.identity,
271 sub = {'dependencies_met' : True, 'engine' : self.identity,
264 'started': datetime.now().strftime(ISO8601)}
272 'started': datetime.now().strftime(ISO8601)}
265 try:
273 try:
266 # allow for not overriding displayhook
274 # allow for not overriding displayhook
267 if hasattr(sys.displayhook, 'set_parent'):
275 if hasattr(sys.displayhook, 'set_parent'):
268 sys.displayhook.set_parent(parent)
276 sys.displayhook.set_parent(parent)
277 sys.stdout.set_parent(parent)
278 sys.stderr.set_parent(parent)
269 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
279 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
270 if bound:
280 if bound:
271 working = self.user_ns
281 working = self.user_ns
272 suffix = str(msg_id).replace("-","")
282 suffix = str(msg_id).replace("-","")
273 prefix = "_"
283 prefix = "_"
274
284
275 else:
285 else:
276 working = dict()
286 working = dict()
277 suffix = prefix = "_" # prevent keyword collisions with lambda
287 suffix = prefix = "_" # prevent keyword collisions with lambda
278 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
288 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
279 # if f.fun
289 # if f.fun
280 if hasattr(f, 'func_name'):
290 if hasattr(f, 'func_name'):
281 fname = f.func_name
291 fname = f.func_name
282 else:
292 else:
283 fname = f.__name__
293 fname = f.__name__
284
294
285 fname = prefix+fname.strip('<>')+suffix
295 fname = prefix+fname.strip('<>')+suffix
286 argname = prefix+"args"+suffix
296 argname = prefix+"args"+suffix
287 kwargname = prefix+"kwargs"+suffix
297 kwargname = prefix+"kwargs"+suffix
288 resultname = prefix+"result"+suffix
298 resultname = prefix+"result"+suffix
289
299
290 ns = { fname : f, argname : args, kwargname : kwargs }
300 ns = { fname : f, argname : args, kwargname : kwargs }
291 # print ns
301 # print ns
292 working.update(ns)
302 working.update(ns)
293 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
303 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
294 exec code in working, working
304 exec code in working, working
295 result = working.get(resultname)
305 result = working.get(resultname)
296 # clear the namespace
306 # clear the namespace
297 if bound:
307 if bound:
298 for key in ns.iterkeys():
308 for key in ns.iterkeys():
299 self.user_ns.pop(key)
309 self.user_ns.pop(key)
300 else:
310 else:
301 del working
311 del working
302
312
303 packed_result,buf = serialize_object(result)
313 packed_result,buf = serialize_object(result)
304 result_buf = [packed_result]+buf
314 result_buf = [packed_result]+buf
305 except:
315 except:
306 exc_content = self._wrap_exception('apply')
316 exc_content = self._wrap_exception('apply')
307 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
317 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
308 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
318 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
319 ident=self.identity+'.pyerr')
309 reply_content = exc_content
320 reply_content = exc_content
310 result_buf = []
321 result_buf = []
311
322
312 if exc_content['ename'] == UnmetDependency.__name__:
323 if exc_content['ename'] == UnmetDependency.__name__:
313 sub['dependencies_met'] = False
324 sub['dependencies_met'] = False
314 else:
325 else:
315 reply_content = {'status' : 'ok'}
326 reply_content = {'status' : 'ok'}
316 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
327 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
317 # self.reply_socket.send(ident, zmq.SNDMORE)
328 # self.reply_socket.send(ident, zmq.SNDMORE)
318 # self.reply_socket.send_json(reply_msg)
329 # self.reply_socket.send_json(reply_msg)
319 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
330 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
320 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
331 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
321 print(Message(reply_msg), file=sys.__stdout__)
332 # print(Message(reply_msg), file=sys.__stdout__)
322 # if reply_msg['content']['status'] == u'error':
333 # if reply_msg['content']['status'] == u'error':
323 # self.abort_queues()
334 # self.abort_queues()
324
335
325 def dispatch_queue(self, stream, msg):
336 def dispatch_queue(self, stream, msg):
326 self.control_stream.flush()
337 self.control_stream.flush()
327 idents,msg = self.session.feed_identities(msg, copy=False)
338 idents,msg = self.session.feed_identities(msg, copy=False)
328 try:
339 try:
329 msg = self.session.unpack_message(msg, content=True, copy=False)
340 msg = self.session.unpack_message(msg, content=True, copy=False)
330 except:
341 except:
331 logger.error("Invalid Message", exc_info=True)
342 logger.error("Invalid Message", exc_info=True)
332 return
343 return
333
344
334
345
335 header = msg['header']
346 header = msg['header']
336 msg_id = header['msg_id']
347 msg_id = header['msg_id']
337 if self.check_aborted(msg_id):
348 if self.check_aborted(msg_id):
338 self.aborted.remove(msg_id)
349 self.aborted.remove(msg_id)
339 # is it safe to assume a msg_id will not be resubmitted?
350 # is it safe to assume a msg_id will not be resubmitted?
340 reply_type = msg['msg_type'].split('_')[0] + '_reply'
351 reply_type = msg['msg_type'].split('_')[0] + '_reply'
341 reply_msg = self.session.send(stream, reply_type,
352 reply_msg = self.session.send(stream, reply_type,
342 content={'status' : 'aborted'}, parent=msg, ident=idents)
353 content={'status' : 'aborted'}, parent=msg, ident=idents)
343 return
354 return
344 handler = self.shell_handlers.get(msg['msg_type'], None)
355 handler = self.shell_handlers.get(msg['msg_type'], None)
345 if handler is None:
356 if handler is None:
346 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
357 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
347 else:
358 else:
348 handler(stream, idents, msg)
359 handler(stream, idents, msg)
349
360
350 def start(self):
361 def start(self):
351 #### stream mode:
362 #### stream mode:
352 if self.control_stream:
363 if self.control_stream:
353 self.control_stream.on_recv(self.dispatch_control, copy=False)
364 self.control_stream.on_recv(self.dispatch_control, copy=False)
354 self.control_stream.on_err(printer)
365 self.control_stream.on_err(printer)
355
366
356 def make_dispatcher(stream):
367 def make_dispatcher(stream):
357 def dispatcher(msg):
368 def dispatcher(msg):
358 return self.dispatch_queue(stream, msg)
369 return self.dispatch_queue(stream, msg)
359 return dispatcher
370 return dispatcher
360
371
361 for s in self.shell_streams:
372 for s in self.shell_streams:
362 s.on_recv(make_dispatcher(s), copy=False)
373 s.on_recv(make_dispatcher(s), copy=False)
363 s.on_err(printer)
374 s.on_err(printer)
364
375
365 if self.iopub_stream:
376 if self.iopub_stream:
366 self.iopub_stream.on_err(printer)
377 self.iopub_stream.on_err(printer)
367 self.iopub_stream.on_send(printer)
378 # self.iopub_stream.on_send(printer)
368
379
369 #### while True mode:
380 #### while True mode:
370 # while True:
381 # while True:
371 # idle = True
382 # idle = True
372 # try:
383 # try:
373 # msg = self.shell_stream.socket.recv_multipart(
384 # msg = self.shell_stream.socket.recv_multipart(
374 # zmq.NOBLOCK, copy=False)
385 # zmq.NOBLOCK, copy=False)
375 # except zmq.ZMQError, e:
386 # except zmq.ZMQError, e:
376 # if e.errno != zmq.EAGAIN:
387 # if e.errno != zmq.EAGAIN:
377 # raise e
388 # raise e
378 # else:
389 # else:
379 # idle=False
390 # idle=False
380 # self.dispatch_queue(self.shell_stream, msg)
391 # self.dispatch_queue(self.shell_stream, msg)
381 #
392 #
382 # if not self.task_stream.empty():
393 # if not self.task_stream.empty():
383 # idle=False
394 # idle=False
384 # msg = self.task_stream.recv_multipart()
395 # msg = self.task_stream.recv_multipart()
385 # self.dispatch_queue(self.task_stream, msg)
396 # self.dispatch_queue(self.task_stream, msg)
386 # if idle:
397 # if idle:
387 # # don't busywait
398 # # don't busywait
388 # time.sleep(1e-3)
399 # time.sleep(1e-3)
389
400
390 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
401 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
391 client_addr=None, loop=None, context=None, key=None):
402 client_addr=None, loop=None, context=None, key=None,
403 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
404
392 # create loop, context, and session:
405 # create loop, context, and session:
393 if loop is None:
406 if loop is None:
394 loop = ioloop.IOLoop.instance()
407 loop = ioloop.IOLoop.instance()
395 if context is None:
408 if context is None:
396 context = zmq.Context()
409 context = zmq.Context()
397 c = context
410 c = context
398 session = StreamSession(key=key)
411 session = StreamSession(key=key)
399 # print (session.key)
412 # print (session.key)
400 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
413 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
401
414
402 # create Control Stream
415 # create Control Stream
403 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
416 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
404 control_stream.setsockopt(zmq.IDENTITY, identity)
417 control_stream.setsockopt(zmq.IDENTITY, identity)
405 control_stream.connect(control_addr)
418 control_stream.connect(control_addr)
406
419
407 # create Shell Streams (MUX, Task, etc.):
420 # create Shell Streams (MUX, Task, etc.):
408 shell_streams = []
421 shell_streams = []
409 for addr in shell_addrs:
422 for addr in shell_addrs:
410 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
423 stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
411 stream.setsockopt(zmq.IDENTITY, identity)
424 stream.setsockopt(zmq.IDENTITY, identity)
412 stream.connect(addr)
425 stream.connect(addr)
413 shell_streams.append(stream)
426 shell_streams.append(stream)
414
427
415 # create iopub stream:
428 # create iopub stream:
416 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
429 iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
417 iopub_stream.setsockopt(zmq.IDENTITY, identity)
430 iopub_stream.setsockopt(zmq.IDENTITY, identity)
418 iopub_stream.connect(iopub_addr)
431 iopub_stream.connect(iopub_addr)
419
432
433 # Redirect input streams and set a display hook.
434 if out_stream_factory:
435 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
436 sys.stdout.topic = identity+'.stdout'
437 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
438 sys.stderr.topic = identity+'.stderr'
439 if display_hook_factory:
440 sys.displayhook = display_hook_factory(session, iopub_stream)
441 sys.displayhook.topic = identity+'.pyout'
442
443
420 # launch heartbeat
444 # launch heartbeat
421 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
445 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
422 heart.start()
446 heart.start()
423
447
424 # create (optional) Client
448 # create (optional) Client
425 if client_addr:
449 if client_addr:
426 client = Client(client_addr, username=identity)
450 client = Client(client_addr, username=identity)
427 else:
451 else:
428 client = None
452 client = None
429
453
430 kernel = Kernel(session=session, control_stream=control_stream,
454 kernel = Kernel(session=session, control_stream=control_stream,
431 shell_streams=shell_streams, iopub_stream=iopub_stream,
455 shell_streams=shell_streams, iopub_stream=iopub_stream,
432 client=client, loop=loop)
456 client=client, loop=loop)
433 kernel.start()
457 kernel.start()
434 return loop, c, kernel
458 return loop, c, kernel
435
459
@@ -1,544 +1,543 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """edited session.py to work with streams, and move msg_type to the header
2 """edited session.py to work with streams, and move msg_type to the header
3 """
3 """
4
4
5
5
6 import os
6 import os
7 import sys
7 import sys
8 import traceback
8 import traceback
9 import pprint
9 import pprint
10 import uuid
10 import uuid
11 from datetime import datetime
11 from datetime import datetime
12
12
13 import zmq
13 import zmq
14 from zmq.utils import jsonapi
14 from zmq.utils import jsonapi
15 from zmq.eventloop.zmqstream import ZMQStream
15 from zmq.eventloop.zmqstream import ZMQStream
16
16
17 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
17 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
18 from IPython.utils.newserialized import serialize, unserialize
18 from IPython.utils.newserialized import serialize, unserialize
19
19
20 from IPython.zmq.parallel.error import RemoteError
20 from IPython.zmq.parallel.error import RemoteError
21
21
22 try:
22 try:
23 import cPickle
23 import cPickle
24 pickle = cPickle
24 pickle = cPickle
25 except:
25 except:
26 cPickle = None
26 cPickle = None
27 import pickle
27 import pickle
28
28
29 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
29 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
30 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
30 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
31 if json_name in ('jsonlib', 'jsonlib2'):
31 if json_name in ('jsonlib', 'jsonlib2'):
32 use_json = True
32 use_json = True
33 elif json_name:
33 elif json_name:
34 if cPickle is None:
34 if cPickle is None:
35 use_json = True
35 use_json = True
36 else:
36 else:
37 use_json = False
37 use_json = False
38 else:
38 else:
39 use_json = False
39 use_json = False
40
40
41 def squash_unicode(obj):
41 def squash_unicode(obj):
42 if isinstance(obj,dict):
42 if isinstance(obj,dict):
43 for key in obj.keys():
43 for key in obj.keys():
44 obj[key] = squash_unicode(obj[key])
44 obj[key] = squash_unicode(obj[key])
45 if isinstance(key, unicode):
45 if isinstance(key, unicode):
46 obj[squash_unicode(key)] = obj.pop(key)
46 obj[squash_unicode(key)] = obj.pop(key)
47 elif isinstance(obj, list):
47 elif isinstance(obj, list):
48 for i,v in enumerate(obj):
48 for i,v in enumerate(obj):
49 obj[i] = squash_unicode(v)
49 obj[i] = squash_unicode(v)
50 elif isinstance(obj, unicode):
50 elif isinstance(obj, unicode):
51 obj = obj.encode('utf8')
51 obj = obj.encode('utf8')
52 return obj
52 return obj
53
53
54 if use_json:
54 if use_json:
55 default_packer = jsonapi.dumps
55 default_packer = jsonapi.dumps
56 default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
56 default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
57 else:
57 else:
58 default_packer = lambda o: pickle.dumps(o,-1)
58 default_packer = lambda o: pickle.dumps(o,-1)
59 default_unpacker = pickle.loads
59 default_unpacker = pickle.loads
60
60
61
61
62 DELIM="<IDS|MSG>"
62 DELIM="<IDS|MSG>"
63 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
63 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
64
64
65 def wrap_exception(engine_info={}):
65 def wrap_exception(engine_info={}):
66 etype, evalue, tb = sys.exc_info()
66 etype, evalue, tb = sys.exc_info()
67 stb = traceback.format_exception(etype, evalue, tb)
67 stb = traceback.format_exception(etype, evalue, tb)
68 exc_content = {
68 exc_content = {
69 'status' : 'error',
69 'status' : 'error',
70 'traceback' : stb,
70 'traceback' : stb,
71 'ename' : unicode(etype.__name__),
71 'ename' : unicode(etype.__name__),
72 'evalue' : unicode(evalue),
72 'evalue' : unicode(evalue),
73 'engine_info' : engine_info
73 'engine_info' : engine_info
74 }
74 }
75 return exc_content
75 return exc_content
76
76
77 def unwrap_exception(content):
77 def unwrap_exception(content):
78 err = RemoteError(content['ename'], content['evalue'],
78 err = RemoteError(content['ename'], content['evalue'],
79 ''.join(content['traceback']),
79 ''.join(content['traceback']),
80 content.get('engine_info', {}))
80 content.get('engine_info', {}))
81 return err
81 return err
82
82
83
83
84 class Message(object):
84 class Message(object):
85 """A simple message object that maps dict keys to attributes.
85 """A simple message object that maps dict keys to attributes.
86
86
87 A Message can be created from a dict and a dict from a Message instance
87 A Message can be created from a dict and a dict from a Message instance
88 simply by calling dict(msg_obj)."""
88 simply by calling dict(msg_obj)."""
89
89
90 def __init__(self, msg_dict):
90 def __init__(self, msg_dict):
91 dct = self.__dict__
91 dct = self.__dict__
92 for k, v in dict(msg_dict).iteritems():
92 for k, v in dict(msg_dict).iteritems():
93 if isinstance(v, dict):
93 if isinstance(v, dict):
94 v = Message(v)
94 v = Message(v)
95 dct[k] = v
95 dct[k] = v
96
96
97 # Having this iterator lets dict(msg_obj) work out of the box.
97 # Having this iterator lets dict(msg_obj) work out of the box.
98 def __iter__(self):
98 def __iter__(self):
99 return iter(self.__dict__.iteritems())
99 return iter(self.__dict__.iteritems())
100
100
101 def __repr__(self):
101 def __repr__(self):
102 return repr(self.__dict__)
102 return repr(self.__dict__)
103
103
104 def __str__(self):
104 def __str__(self):
105 return pprint.pformat(self.__dict__)
105 return pprint.pformat(self.__dict__)
106
106
107 def __contains__(self, k):
107 def __contains__(self, k):
108 return k in self.__dict__
108 return k in self.__dict__
109
109
110 def __getitem__(self, k):
110 def __getitem__(self, k):
111 return self.__dict__[k]
111 return self.__dict__[k]
112
112
113
113
114 def msg_header(msg_id, msg_type, username, session):
114 def msg_header(msg_id, msg_type, username, session):
115 date=datetime.now().strftime(ISO8601)
115 date=datetime.now().strftime(ISO8601)
116 return locals()
116 return locals()
117
117
118 def extract_header(msg_or_header):
118 def extract_header(msg_or_header):
119 """Given a message or header, return the header."""
119 """Given a message or header, return the header."""
120 if not msg_or_header:
120 if not msg_or_header:
121 return {}
121 return {}
122 try:
122 try:
123 # See if msg_or_header is the entire message.
123 # See if msg_or_header is the entire message.
124 h = msg_or_header['header']
124 h = msg_or_header['header']
125 except KeyError:
125 except KeyError:
126 try:
126 try:
127 # See if msg_or_header is just the header
127 # See if msg_or_header is just the header
128 h = msg_or_header['msg_id']
128 h = msg_or_header['msg_id']
129 except KeyError:
129 except KeyError:
130 raise
130 raise
131 else:
131 else:
132 h = msg_or_header
132 h = msg_or_header
133 if not isinstance(h, dict):
133 if not isinstance(h, dict):
134 h = dict(h)
134 h = dict(h)
135 return h
135 return h
136
136
137 def rekey(dikt):
137 def rekey(dikt):
138 """Rekey a dict that has been forced to use str keys where there should be
138 """Rekey a dict that has been forced to use str keys where there should be
139 ints by json. This belongs in the jsonutil added by fperez."""
139 ints by json. This belongs in the jsonutil added by fperez."""
140 for k in dikt.iterkeys():
140 for k in dikt.iterkeys():
141 if isinstance(k, str):
141 if isinstance(k, str):
142 ik=fk=None
142 ik=fk=None
143 try:
143 try:
144 ik = int(k)
144 ik = int(k)
145 except ValueError:
145 except ValueError:
146 try:
146 try:
147 fk = float(k)
147 fk = float(k)
148 except ValueError:
148 except ValueError:
149 continue
149 continue
150 if ik is not None:
150 if ik is not None:
151 nk = ik
151 nk = ik
152 else:
152 else:
153 nk = fk
153 nk = fk
154 if nk in dikt:
154 if nk in dikt:
155 raise KeyError("already have key %r"%nk)
155 raise KeyError("already have key %r"%nk)
156 dikt[nk] = dikt.pop(k)
156 dikt[nk] = dikt.pop(k)
157 return dikt
157 return dikt
158
158
159 def serialize_object(obj, threshold=64e-6):
159 def serialize_object(obj, threshold=64e-6):
160 """Serialize an object into a list of sendable buffers.
160 """Serialize an object into a list of sendable buffers.
161
161
162 Parameters
162 Parameters
163 ----------
163 ----------
164
164
165 obj : object
165 obj : object
166 The object to be serialized
166 The object to be serialized
167 threshold : float
167 threshold : float
168 The threshold for not double-pickling the content.
168 The threshold for not double-pickling the content.
169
169
170
170
171 Returns
171 Returns
172 -------
172 -------
173 ('pmd', [bufs]) :
173 ('pmd', [bufs]) :
174 where pmd is the pickled metadata wrapper,
174 where pmd is the pickled metadata wrapper,
175 bufs is a list of data buffers
175 bufs is a list of data buffers
176 """
176 """
177 databuffers = []
177 databuffers = []
178 if isinstance(obj, (list, tuple)):
178 if isinstance(obj, (list, tuple)):
179 clist = canSequence(obj)
179 clist = canSequence(obj)
180 slist = map(serialize, clist)
180 slist = map(serialize, clist)
181 for s in slist:
181 for s in slist:
182 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
182 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
183 databuffers.append(s.getData())
183 databuffers.append(s.getData())
184 s.data = None
184 s.data = None
185 return pickle.dumps(slist,-1), databuffers
185 return pickle.dumps(slist,-1), databuffers
186 elif isinstance(obj, dict):
186 elif isinstance(obj, dict):
187 sobj = {}
187 sobj = {}
188 for k in sorted(obj.iterkeys()):
188 for k in sorted(obj.iterkeys()):
189 s = serialize(can(obj[k]))
189 s = serialize(can(obj[k]))
190 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
190 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
191 databuffers.append(s.getData())
191 databuffers.append(s.getData())
192 s.data = None
192 s.data = None
193 sobj[k] = s
193 sobj[k] = s
194 return pickle.dumps(sobj,-1),databuffers
194 return pickle.dumps(sobj,-1),databuffers
195 else:
195 else:
196 s = serialize(can(obj))
196 s = serialize(can(obj))
197 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
197 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
198 databuffers.append(s.getData())
198 databuffers.append(s.getData())
199 s.data = None
199 s.data = None
200 return pickle.dumps(s,-1),databuffers
200 return pickle.dumps(s,-1),databuffers
201
201
202
202
203 def unserialize_object(bufs):
203 def unserialize_object(bufs):
204 """reconstruct an object serialized by serialize_object from data buffers."""
204 """reconstruct an object serialized by serialize_object from data buffers."""
205 bufs = list(bufs)
205 bufs = list(bufs)
206 sobj = pickle.loads(bufs.pop(0))
206 sobj = pickle.loads(bufs.pop(0))
207 if isinstance(sobj, (list, tuple)):
207 if isinstance(sobj, (list, tuple)):
208 for s in sobj:
208 for s in sobj:
209 if s.data is None:
209 if s.data is None:
210 s.data = bufs.pop(0)
210 s.data = bufs.pop(0)
211 return uncanSequence(map(unserialize, sobj)), bufs
211 return uncanSequence(map(unserialize, sobj)), bufs
212 elif isinstance(sobj, dict):
212 elif isinstance(sobj, dict):
213 newobj = {}
213 newobj = {}
214 for k in sorted(sobj.iterkeys()):
214 for k in sorted(sobj.iterkeys()):
215 s = sobj[k]
215 s = sobj[k]
216 if s.data is None:
216 if s.data is None:
217 s.data = bufs.pop(0)
217 s.data = bufs.pop(0)
218 newobj[k] = uncan(unserialize(s))
218 newobj[k] = uncan(unserialize(s))
219 return newobj, bufs
219 return newobj, bufs
220 else:
220 else:
221 if sobj.data is None:
221 if sobj.data is None:
222 sobj.data = bufs.pop(0)
222 sobj.data = bufs.pop(0)
223 return uncan(unserialize(sobj)), bufs
223 return uncan(unserialize(sobj)), bufs
224
224
225 def pack_apply_message(f, args, kwargs, threshold=64e-6):
225 def pack_apply_message(f, args, kwargs, threshold=64e-6):
226 """pack up a function, args, and kwargs to be sent over the wire
226 """pack up a function, args, and kwargs to be sent over the wire
227 as a series of buffers. Any object whose data is larger than `threshold`
227 as a series of buffers. Any object whose data is larger than `threshold`
228 will not have their data copied (currently only numpy arrays support zero-copy)"""
228 will not have their data copied (currently only numpy arrays support zero-copy)"""
229 msg = [pickle.dumps(can(f),-1)]
229 msg = [pickle.dumps(can(f),-1)]
230 databuffers = [] # for large objects
230 databuffers = [] # for large objects
231 sargs, bufs = serialize_object(args,threshold)
231 sargs, bufs = serialize_object(args,threshold)
232 msg.append(sargs)
232 msg.append(sargs)
233 databuffers.extend(bufs)
233 databuffers.extend(bufs)
234 skwargs, bufs = serialize_object(kwargs,threshold)
234 skwargs, bufs = serialize_object(kwargs,threshold)
235 msg.append(skwargs)
235 msg.append(skwargs)
236 databuffers.extend(bufs)
236 databuffers.extend(bufs)
237 msg.extend(databuffers)
237 msg.extend(databuffers)
238 return msg
238 return msg
239
239
def unpack_apply_message(bufs, g=None, copy=True):
    """Reconstruct f, args and kwargs from the buffer list produced by
    pack_apply_message().

    Returns: the original (f, args, kwargs) triple.
    """
    # Work on a private copy so we can consume buffers from the front.
    remaining = list(bufs)
    assert len(remaining) >= 3, "not enough buffers!"
    if not copy:
        # non-copying recv gives zmq Message frames; the three pickled
        # sections must be raw bytes before unpickling
        for i in range(3):
            remaining[i] = remaining[i].bytes
    canned_f = pickle.loads(remaining.pop(0))
    sargs = list(pickle.loads(remaining.pop(0)))
    skwargs = dict(pickle.loads(remaining.pop(0)))
    f = uncan(canned_f, g)
    # Re-attach out-of-band data to each positional arg that was shipped
    # without its payload (data is None => next buffer belongs to it).
    for serialized in sargs:
        if serialized.data is not None:
            continue
        blob = remaining.pop(0)
        if serialized.getTypeDescriptor() in ('buffer', 'ndarray'):
            # zero-copy capable types: wrap instead of copying when asked
            serialized.data = buffer(blob) if copy else blob.buffer
        else:
            serialized.data = blob if copy else blob.bytes

    args = uncanSequence(map(unserialize, sargs), g)
    kwargs = {}
    # sorted() matches the deterministic order used when packing
    for key in sorted(skwargs.iterkeys()):
        serialized = skwargs[key]
        if serialized.data is None:
            serialized.data = remaining.pop(0)
        kwargs[key] = uncan(unserialize(serialized), g)

    return f, args, kwargs
276
276
277 class StreamSession(object):
277 class StreamSession(object):
278 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
278 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
279 debug=False
279 debug=False
280 key=None
280 key=None
281
281
282 def __init__(self, username=None, session=None, packer=None, unpacker=None, key=None, keyfile=None):
282 def __init__(self, username=None, session=None, packer=None, unpacker=None, key=None, keyfile=None):
283 if username is None:
283 if username is None:
284 username = os.environ.get('USER','username')
284 username = os.environ.get('USER','username')
285 self.username = username
285 self.username = username
286 if session is None:
286 if session is None:
287 self.session = str(uuid.uuid4())
287 self.session = str(uuid.uuid4())
288 else:
288 else:
289 self.session = session
289 self.session = session
290 self.msg_id = str(uuid.uuid4())
290 self.msg_id = str(uuid.uuid4())
291 if packer is None:
291 if packer is None:
292 self.pack = default_packer
292 self.pack = default_packer
293 else:
293 else:
294 if not callable(packer):
294 if not callable(packer):
295 raise TypeError("packer must be callable, not %s"%type(packer))
295 raise TypeError("packer must be callable, not %s"%type(packer))
296 self.pack = packer
296 self.pack = packer
297
297
298 if unpacker is None:
298 if unpacker is None:
299 self.unpack = default_unpacker
299 self.unpack = default_unpacker
300 else:
300 else:
301 if not callable(unpacker):
301 if not callable(unpacker):
302 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
302 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
303 self.unpack = unpacker
303 self.unpack = unpacker
304
304
305 if key is not None and keyfile is not None:
305 if key is not None and keyfile is not None:
306 raise TypeError("Must specify key OR keyfile, not both")
306 raise TypeError("Must specify key OR keyfile, not both")
307 if keyfile is not None:
307 if keyfile is not None:
308 with open(keyfile) as f:
308 with open(keyfile) as f:
309 self.key = f.read().strip()
309 self.key = f.read().strip()
310 else:
310 else:
311 self.key = key
311 self.key = key
312 # print key, keyfile, self.key
312 # print key, keyfile, self.key
313 self.none = self.pack({})
313 self.none = self.pack({})
314
314
315 def msg_header(self, msg_type):
315 def msg_header(self, msg_type):
316 h = msg_header(self.msg_id, msg_type, self.username, self.session)
316 h = msg_header(self.msg_id, msg_type, self.username, self.session)
317 self.msg_id = str(uuid.uuid4())
317 self.msg_id = str(uuid.uuid4())
318 return h
318 return h
319
319
320 def msg(self, msg_type, content=None, parent=None, subheader=None):
320 def msg(self, msg_type, content=None, parent=None, subheader=None):
321 msg = {}
321 msg = {}
322 msg['header'] = self.msg_header(msg_type)
322 msg['header'] = self.msg_header(msg_type)
323 msg['msg_id'] = msg['header']['msg_id']
323 msg['msg_id'] = msg['header']['msg_id']
324 msg['parent_header'] = {} if parent is None else extract_header(parent)
324 msg['parent_header'] = {} if parent is None else extract_header(parent)
325 msg['msg_type'] = msg_type
325 msg['msg_type'] = msg_type
326 msg['content'] = {} if content is None else content
326 msg['content'] = {} if content is None else content
327 sub = {} if subheader is None else subheader
327 sub = {} if subheader is None else subheader
328 msg['header'].update(sub)
328 msg['header'].update(sub)
329 return msg
329 return msg
330
330
331 def check_key(self, msg_or_header):
331 def check_key(self, msg_or_header):
332 """Check that a message's header has the right key"""
332 """Check that a message's header has the right key"""
333 if self.key is None:
333 if self.key is None:
334 return True
334 return True
335 header = extract_header(msg_or_header)
335 header = extract_header(msg_or_header)
336 return header.get('key', None) == self.key
336 return header.get('key', None) == self.key
337
337
338
338
339 def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
339 def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
340 """Build and send a message via stream or socket.
340 """Build and send a message via stream or socket.
341
341
342 Parameters
342 Parameters
343 ----------
343 ----------
344
344
345 stream : zmq.Socket or ZMQStream
345 stream : zmq.Socket or ZMQStream
346 the socket-like object used to send the data
346 the socket-like object used to send the data
347 msg_type : str or Message/dict
347 msg_or_type : str or Message/dict
348 Normally, msg_type will be
348 Normally, msg_or_type will be a msg_type unless a message is being sent more
349
349 than once.
350
351
350
352 Returns
351 Returns
353 -------
352 -------
354 (msg,sent) : tuple
353 (msg,sent) : tuple
355 msg : Message
354 msg : Message
356 the nice wrapped dict-like object containing the headers
355 the nice wrapped dict-like object containing the headers
357
356
358 """
357 """
359 if isinstance(msg_type, (Message, dict)):
358 if isinstance(msg_or_type, (Message, dict)):
360 # we got a Message, not a msg_type
359 # we got a Message, not a msg_type
361 # don't build a new Message
360 # don't build a new Message
362 msg = msg_type
361 msg = msg_or_type
363 content = msg['content']
362 content = msg['content']
364 else:
363 else:
365 msg = self.msg(msg_type, content, parent, subheader)
364 msg = self.msg(msg_or_type, content, parent, subheader)
366 buffers = [] if buffers is None else buffers
365 buffers = [] if buffers is None else buffers
367 to_send = []
366 to_send = []
368 if isinstance(ident, list):
367 if isinstance(ident, list):
369 # accept list of idents
368 # accept list of idents
370 to_send.extend(ident)
369 to_send.extend(ident)
371 elif ident is not None:
370 elif ident is not None:
372 to_send.append(ident)
371 to_send.append(ident)
373 to_send.append(DELIM)
372 to_send.append(DELIM)
374 if self.key is not None:
373 if self.key is not None:
375 to_send.append(self.key)
374 to_send.append(self.key)
376 to_send.append(self.pack(msg['header']))
375 to_send.append(self.pack(msg['header']))
377 to_send.append(self.pack(msg['parent_header']))
376 to_send.append(self.pack(msg['parent_header']))
378
377
379 if content is None:
378 if content is None:
380 content = self.none
379 content = self.none
381 elif isinstance(content, dict):
380 elif isinstance(content, dict):
382 content = self.pack(content)
381 content = self.pack(content)
383 elif isinstance(content, str):
382 elif isinstance(content, str):
384 # content is already packed, as in a relayed message
383 # content is already packed, as in a relayed message
385 pass
384 pass
386 else:
385 else:
387 raise TypeError("Content incorrect type: %s"%type(content))
386 raise TypeError("Content incorrect type: %s"%type(content))
388 to_send.append(content)
387 to_send.append(content)
389 flag = 0
388 flag = 0
390 if buffers:
389 if buffers:
391 flag = zmq.SNDMORE
390 flag = zmq.SNDMORE
392 stream.send_multipart(to_send, flag, copy=False)
391 stream.send_multipart(to_send, flag, copy=False)
393 for b in buffers[:-1]:
392 for b in buffers[:-1]:
394 stream.send(b, flag, copy=False)
393 stream.send(b, flag, copy=False)
395 if buffers:
394 if buffers:
396 stream.send(buffers[-1], copy=False)
395 stream.send(buffers[-1], copy=False)
397 omsg = Message(msg)
396 omsg = Message(msg)
398 if self.debug:
397 if self.debug:
399 pprint.pprint(omsg)
398 pprint.pprint(omsg)
400 pprint.pprint(to_send)
399 pprint.pprint(to_send)
401 pprint.pprint(buffers)
400 pprint.pprint(buffers)
402 return omsg
401 return omsg
403
402
404 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
403 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
405 """Send a raw message via ident path.
404 """Send a raw message via ident path.
406
405
407 Parameters
406 Parameters
408 ----------
407 ----------
409 msg : list of sendable buffers"""
408 msg : list of sendable buffers"""
410 to_send = []
409 to_send = []
411 if isinstance(ident, str):
410 if isinstance(ident, str):
412 ident = [ident]
411 ident = [ident]
413 if ident is not None:
412 if ident is not None:
414 to_send.extend(ident)
413 to_send.extend(ident)
415 to_send.append(DELIM)
414 to_send.append(DELIM)
416 if self.key is not None:
415 if self.key is not None:
417 to_send.append(self.key)
416 to_send.append(self.key)
418 to_send.extend(msg)
417 to_send.extend(msg)
419 stream.send_multipart(msg, flags, copy=copy)
418 stream.send_multipart(msg, flags, copy=copy)
420
419
421 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
420 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
422 """receives and unpacks a message
421 """receives and unpacks a message
423 returns [idents], msg"""
422 returns [idents], msg"""
424 if isinstance(socket, ZMQStream):
423 if isinstance(socket, ZMQStream):
425 socket = socket.socket
424 socket = socket.socket
426 try:
425 try:
427 msg = socket.recv_multipart(mode)
426 msg = socket.recv_multipart(mode)
428 except zmq.ZMQError as e:
427 except zmq.ZMQError as e:
429 if e.errno == zmq.EAGAIN:
428 if e.errno == zmq.EAGAIN:
430 # We can convert EAGAIN to None as we know in this case
429 # We can convert EAGAIN to None as we know in this case
431 # recv_json won't return None.
430 # recv_json won't return None.
432 return None
431 return None
433 else:
432 else:
434 raise
433 raise
435 # return an actual Message object
434 # return an actual Message object
436 # determine the number of idents by trying to unpack them.
435 # determine the number of idents by trying to unpack them.
437 # this is terrible:
436 # this is terrible:
438 idents, msg = self.feed_identities(msg, copy)
437 idents, msg = self.feed_identities(msg, copy)
439 try:
438 try:
440 return idents, self.unpack_message(msg, content=content, copy=copy)
439 return idents, self.unpack_message(msg, content=content, copy=copy)
441 except Exception as e:
440 except Exception as e:
442 print (idents, msg)
441 print (idents, msg)
443 # TODO: handle it
442 # TODO: handle it
444 raise e
443 raise e
445
444
446 def feed_identities(self, msg, copy=True):
445 def feed_identities(self, msg, copy=True):
447 """feed until DELIM is reached, then return the prefix as idents and remainder as
446 """feed until DELIM is reached, then return the prefix as idents and remainder as
448 msg. This is easily broken by setting an IDENT to DELIM, but that would be silly.
447 msg. This is easily broken by setting an IDENT to DELIM, but that would be silly.
449
448
450 Parameters
449 Parameters
451 ----------
450 ----------
452 msg : a list of Message or bytes objects
451 msg : a list of Message or bytes objects
453 the message to be split
452 the message to be split
454 copy : bool
453 copy : bool
455 flag determining whether the arguments are bytes or Messages
454 flag determining whether the arguments are bytes or Messages
456
455
457 Returns
456 Returns
458 -------
457 -------
459 (idents,msg) : two lists
458 (idents,msg) : two lists
460 idents will always be a list of bytes - the indentity prefix
459 idents will always be a list of bytes - the indentity prefix
461 msg will be a list of bytes or Messages, unchanged from input
460 msg will be a list of bytes or Messages, unchanged from input
462 msg should be unpackable via self.unpack_message at this point.
461 msg should be unpackable via self.unpack_message at this point.
463 """
462 """
464 msg = list(msg)
463 msg = list(msg)
465 idents = []
464 idents = []
466 while len(msg) > 3:
465 while len(msg) > 3:
467 if copy:
466 if copy:
468 s = msg[0]
467 s = msg[0]
469 else:
468 else:
470 s = msg[0].bytes
469 s = msg[0].bytes
471 if s == DELIM:
470 if s == DELIM:
472 msg.pop(0)
471 msg.pop(0)
473 break
472 break
474 else:
473 else:
475 idents.append(s)
474 idents.append(s)
476 msg.pop(0)
475 msg.pop(0)
477
476
478 return idents, msg
477 return idents, msg
479
478
480 def unpack_message(self, msg, content=True, copy=True):
479 def unpack_message(self, msg, content=True, copy=True):
481 """Return a message object from the format
480 """Return a message object from the format
482 sent by self.send.
481 sent by self.send.
483
482
484 Parameters:
483 Parameters:
485 -----------
484 -----------
486
485
487 content : bool (True)
486 content : bool (True)
488 whether to unpack the content dict (True),
487 whether to unpack the content dict (True),
489 or leave it serialized (False)
488 or leave it serialized (False)
490
489
491 copy : bool (True)
490 copy : bool (True)
492 whether to return the bytes (True),
491 whether to return the bytes (True),
493 or the non-copying Message object in each place (False)
492 or the non-copying Message object in each place (False)
494
493
495 """
494 """
496 ikey = int(self.key is not None)
495 ikey = int(self.key is not None)
497 minlen = 3 + ikey
496 minlen = 3 + ikey
498 if not len(msg) >= minlen:
497 if not len(msg) >= minlen:
499 raise TypeError("malformed message, must have at least %i elements"%minlen)
498 raise TypeError("malformed message, must have at least %i elements"%minlen)
500 message = {}
499 message = {}
501 if not copy:
500 if not copy:
502 for i in range(minlen):
501 for i in range(minlen):
503 msg[i] = msg[i].bytes
502 msg[i] = msg[i].bytes
504 if ikey:
503 if ikey:
505 if not self.key == msg[0]:
504 if not self.key == msg[0]:
506 raise KeyError("Invalid Session Key: %s"%msg[0])
505 raise KeyError("Invalid Session Key: %s"%msg[0])
507 message['header'] = self.unpack(msg[ikey+0])
506 message['header'] = self.unpack(msg[ikey+0])
508 message['msg_type'] = message['header']['msg_type']
507 message['msg_type'] = message['header']['msg_type']
509 message['parent_header'] = self.unpack(msg[ikey+1])
508 message['parent_header'] = self.unpack(msg[ikey+1])
510 if content:
509 if content:
511 message['content'] = self.unpack(msg[ikey+2])
510 message['content'] = self.unpack(msg[ikey+2])
512 else:
511 else:
513 message['content'] = msg[ikey+2]
512 message['content'] = msg[ikey+2]
514
513
515 # message['buffers'] = msg[3:]
514 # message['buffers'] = msg[3:]
516 # else:
515 # else:
517 # message['header'] = self.unpack(msg[0].bytes)
516 # message['header'] = self.unpack(msg[0].bytes)
518 # message['msg_type'] = message['header']['msg_type']
517 # message['msg_type'] = message['header']['msg_type']
519 # message['parent_header'] = self.unpack(msg[1].bytes)
518 # message['parent_header'] = self.unpack(msg[1].bytes)
520 # if content:
519 # if content:
521 # message['content'] = self.unpack(msg[2].bytes)
520 # message['content'] = self.unpack(msg[2].bytes)
522 # else:
521 # else:
523 # message['content'] = msg[2].bytes
522 # message['content'] = msg[2].bytes
524
523
525 message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ]
524 message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ]
526 return message
525 return message
527
526
528
527
529
528
def test_msg2obj():
    """Round-trip checks: dict -> Message attribute access -> dict."""
    raw = {'x': 1}
    wrapped = Message(raw)
    assert wrapped.x == raw['x']

    raw['y'] = {'z': 1}
    wrapped = Message(raw)
    assert wrapped.y.z == raw['y']['z']

    outer, inner = 'y', 'z'
    assert wrapped[outer][inner] == raw[outer][inner]

    # converting back to a plain dict preserves the values
    back = dict(wrapped)
    assert raw['x'] == back['x']
    assert raw['y']['z'] == back['y']['z']
General Comments 0
You need to be logged in to leave comments. Login now