|
@@
-1,510
+1,510
b''
|
|
1
|
#!/usr/bin/env python
|
|
1
|
#!/usr/bin/env python
|
|
2
|
"""edited session.py to work with streams, and move msg_type to the header
|
|
2
|
"""edited session.py to work with streams, and move msg_type to the header
|
|
3
|
"""
|
|
3
|
"""
|
|
4
|
|
|
4
|
|
|
5
|
|
|
5
|
|
|
6
|
import os
|
|
6
|
import os
|
|
7
|
import sys
|
|
7
|
import sys
|
|
8
|
import traceback
|
|
8
|
import traceback
|
|
9
|
import pprint
|
|
9
|
import pprint
|
|
10
|
import uuid
|
|
10
|
import uuid
|
|
11
|
from datetime import datetime
|
|
11
|
from datetime import datetime
|
|
12
|
|
|
12
|
|
|
13
|
import zmq
|
|
13
|
import zmq
|
|
14
|
from zmq.utils import jsonapi
|
|
14
|
from zmq.utils import jsonapi
|
|
15
|
from zmq.eventloop.zmqstream import ZMQStream
|
|
15
|
from zmq.eventloop.zmqstream import ZMQStream
|
|
16
|
|
|
16
|
|
|
17
|
from IPython.zmq.pickleutil import can, uncan, canSequence, uncanSequence
|
|
17
|
from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
|
|
18
|
from IPython.zmq.newserialized import serialize, unserialize
|
|
18
|
from IPython.utils.newserialized import serialize, unserialize
|
|
19
|
|
|
19
|
|
|
20
|
try:
|
|
20
|
try:
|
|
21
|
import cPickle
|
|
21
|
import cPickle
|
|
22
|
pickle = cPickle
|
|
22
|
pickle = cPickle
|
|
23
|
except:
|
|
23
|
except:
|
|
24
|
cPickle = None
|
|
24
|
cPickle = None
|
|
25
|
import pickle
|
|
25
|
import pickle
|
|
26
|
|
|
26
|
|
|
27
|
# packer priority: jsonlib[2], cPickle, simplejson/json, pickle
|
|
27
|
# packer priority: jsonlib[2], cPickle, simplejson/json, pickle
|
|
28
|
json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
|
|
28
|
json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
|
|
29
|
if json_name in ('jsonlib', 'jsonlib2'):
|
|
29
|
if json_name in ('jsonlib', 'jsonlib2'):
|
|
30
|
use_json = True
|
|
30
|
use_json = True
|
|
31
|
elif json_name:
|
|
31
|
elif json_name:
|
|
32
|
if cPickle is None:
|
|
32
|
if cPickle is None:
|
|
33
|
use_json = True
|
|
33
|
use_json = True
|
|
34
|
else:
|
|
34
|
else:
|
|
35
|
use_json = False
|
|
35
|
use_json = False
|
|
36
|
else:
|
|
36
|
else:
|
|
37
|
use_json = False
|
|
37
|
use_json = False
|
|
38
|
|
|
38
|
|
|
39
|
def squash_unicode(obj):
|
|
39
|
def squash_unicode(obj):
|
|
40
|
if isinstance(obj,dict):
|
|
40
|
if isinstance(obj,dict):
|
|
41
|
for key in obj.keys():
|
|
41
|
for key in obj.keys():
|
|
42
|
obj[key] = squash_unicode(obj[key])
|
|
42
|
obj[key] = squash_unicode(obj[key])
|
|
43
|
if isinstance(key, unicode):
|
|
43
|
if isinstance(key, unicode):
|
|
44
|
obj[squash_unicode(key)] = obj.pop(key)
|
|
44
|
obj[squash_unicode(key)] = obj.pop(key)
|
|
45
|
elif isinstance(obj, list):
|
|
45
|
elif isinstance(obj, list):
|
|
46
|
for i,v in enumerate(obj):
|
|
46
|
for i,v in enumerate(obj):
|
|
47
|
obj[i] = squash_unicode(v)
|
|
47
|
obj[i] = squash_unicode(v)
|
|
48
|
elif isinstance(obj, unicode):
|
|
48
|
elif isinstance(obj, unicode):
|
|
49
|
obj = obj.encode('utf8')
|
|
49
|
obj = obj.encode('utf8')
|
|
50
|
return obj
|
|
50
|
return obj
|
|
51
|
|
|
51
|
|
|
52
|
if use_json:
|
|
52
|
if use_json:
|
|
53
|
default_packer = jsonapi.dumps
|
|
53
|
default_packer = jsonapi.dumps
|
|
54
|
default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
|
|
54
|
default_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
|
|
55
|
else:
|
|
55
|
else:
|
|
56
|
default_packer = lambda o: pickle.dumps(o,-1)
|
|
56
|
default_packer = lambda o: pickle.dumps(o,-1)
|
|
57
|
default_unpacker = pickle.loads
|
|
57
|
default_unpacker = pickle.loads
|
|
58
|
|
|
58
|
|
|
59
|
|
|
59
|
|
|
60
|
DELIM="<IDS|MSG>"
|
|
60
|
DELIM="<IDS|MSG>"
|
|
61
|
|
|
61
|
|
|
62
|
def wrap_exception():
|
|
62
|
def wrap_exception():
|
|
63
|
etype, evalue, tb = sys.exc_info()
|
|
63
|
etype, evalue, tb = sys.exc_info()
|
|
64
|
tb = traceback.format_exception(etype, evalue, tb)
|
|
64
|
tb = traceback.format_exception(etype, evalue, tb)
|
|
65
|
exc_content = {
|
|
65
|
exc_content = {
|
|
66
|
'status' : 'error',
|
|
66
|
'status' : 'error',
|
|
67
|
'traceback' : str(tb),
|
|
67
|
'traceback' : str(tb),
|
|
68
|
'etype' : str(etype),
|
|
68
|
'etype' : str(etype),
|
|
69
|
'evalue' : str(evalue)
|
|
69
|
'evalue' : str(evalue)
|
|
70
|
}
|
|
70
|
}
|
|
71
|
return exc_content
|
|
71
|
return exc_content
|
|
72
|
|
|
72
|
|
|
73
|
class KernelError(Exception):
|
|
73
|
class KernelError(Exception):
|
|
74
|
pass
|
|
74
|
pass
|
|
75
|
|
|
75
|
|
|
76
|
def unwrap_exception(content):
|
|
76
|
def unwrap_exception(content):
|
|
77
|
err = KernelError(content['etype'], content['evalue'])
|
|
77
|
err = KernelError(content['etype'], content['evalue'])
|
|
78
|
err.evalue = content['evalue']
|
|
78
|
err.evalue = content['evalue']
|
|
79
|
err.etype = content['etype']
|
|
79
|
err.etype = content['etype']
|
|
80
|
err.traceback = ''.join(content['traceback'])
|
|
80
|
err.traceback = ''.join(content['traceback'])
|
|
81
|
return err
|
|
81
|
return err
|
|
82
|
|
|
82
|
|
|
83
|
|
|
83
|
|
|
84
|
class Message(object):
|
|
84
|
class Message(object):
|
|
85
|
"""A simple message object that maps dict keys to attributes.
|
|
85
|
"""A simple message object that maps dict keys to attributes.
|
|
86
|
|
|
86
|
|
|
87
|
A Message can be created from a dict and a dict from a Message instance
|
|
87
|
A Message can be created from a dict and a dict from a Message instance
|
|
88
|
simply by calling dict(msg_obj)."""
|
|
88
|
simply by calling dict(msg_obj)."""
|
|
89
|
|
|
89
|
|
|
90
|
def __init__(self, msg_dict):
|
|
90
|
def __init__(self, msg_dict):
|
|
91
|
dct = self.__dict__
|
|
91
|
dct = self.__dict__
|
|
92
|
for k, v in dict(msg_dict).iteritems():
|
|
92
|
for k, v in dict(msg_dict).iteritems():
|
|
93
|
if isinstance(v, dict):
|
|
93
|
if isinstance(v, dict):
|
|
94
|
v = Message(v)
|
|
94
|
v = Message(v)
|
|
95
|
dct[k] = v
|
|
95
|
dct[k] = v
|
|
96
|
|
|
96
|
|
|
97
|
# Having this iterator lets dict(msg_obj) work out of the box.
|
|
97
|
# Having this iterator lets dict(msg_obj) work out of the box.
|
|
98
|
def __iter__(self):
|
|
98
|
def __iter__(self):
|
|
99
|
return iter(self.__dict__.iteritems())
|
|
99
|
return iter(self.__dict__.iteritems())
|
|
100
|
|
|
100
|
|
|
101
|
def __repr__(self):
|
|
101
|
def __repr__(self):
|
|
102
|
return repr(self.__dict__)
|
|
102
|
return repr(self.__dict__)
|
|
103
|
|
|
103
|
|
|
104
|
def __str__(self):
|
|
104
|
def __str__(self):
|
|
105
|
return pprint.pformat(self.__dict__)
|
|
105
|
return pprint.pformat(self.__dict__)
|
|
106
|
|
|
106
|
|
|
107
|
def __contains__(self, k):
|
|
107
|
def __contains__(self, k):
|
|
108
|
return k in self.__dict__
|
|
108
|
return k in self.__dict__
|
|
109
|
|
|
109
|
|
|
110
|
def __getitem__(self, k):
|
|
110
|
def __getitem__(self, k):
|
|
111
|
return self.__dict__[k]
|
|
111
|
return self.__dict__[k]
|
|
112
|
|
|
112
|
|
|
113
|
|
|
113
|
|
|
114
|
def msg_header(msg_id, msg_type, username, session):
|
|
114
|
def msg_header(msg_id, msg_type, username, session):
|
|
115
|
date=datetime.now().isoformat()
|
|
115
|
date=datetime.now().isoformat()
|
|
116
|
return locals()
|
|
116
|
return locals()
|
|
117
|
# return {
|
|
117
|
# return {
|
|
118
|
# 'msg_id' : msg_id,
|
|
118
|
# 'msg_id' : msg_id,
|
|
119
|
# 'msg_type': msg_type,
|
|
119
|
# 'msg_type': msg_type,
|
|
120
|
# 'username' : username,
|
|
120
|
# 'username' : username,
|
|
121
|
# 'session' : session
|
|
121
|
# 'session' : session
|
|
122
|
# }
|
|
122
|
# }
|
|
123
|
|
|
123
|
|
|
124
|
|
|
124
|
|
|
125
|
def extract_header(msg_or_header):
|
|
125
|
def extract_header(msg_or_header):
|
|
126
|
"""Given a message or header, return the header."""
|
|
126
|
"""Given a message or header, return the header."""
|
|
127
|
if not msg_or_header:
|
|
127
|
if not msg_or_header:
|
|
128
|
return {}
|
|
128
|
return {}
|
|
129
|
try:
|
|
129
|
try:
|
|
130
|
# See if msg_or_header is the entire message.
|
|
130
|
# See if msg_or_header is the entire message.
|
|
131
|
h = msg_or_header['header']
|
|
131
|
h = msg_or_header['header']
|
|
132
|
except KeyError:
|
|
132
|
except KeyError:
|
|
133
|
try:
|
|
133
|
try:
|
|
134
|
# See if msg_or_header is just the header
|
|
134
|
# See if msg_or_header is just the header
|
|
135
|
h = msg_or_header['msg_id']
|
|
135
|
h = msg_or_header['msg_id']
|
|
136
|
except KeyError:
|
|
136
|
except KeyError:
|
|
137
|
raise
|
|
137
|
raise
|
|
138
|
else:
|
|
138
|
else:
|
|
139
|
h = msg_or_header
|
|
139
|
h = msg_or_header
|
|
140
|
if not isinstance(h, dict):
|
|
140
|
if not isinstance(h, dict):
|
|
141
|
h = dict(h)
|
|
141
|
h = dict(h)
|
|
142
|
return h
|
|
142
|
return h
|
|
143
|
|
|
143
|
|
|
144
|
def rekey(dikt):
|
|
144
|
def rekey(dikt):
|
|
145
|
"""Rekey a dict that has been forced to use str keys where there should be
|
|
145
|
"""Rekey a dict that has been forced to use str keys where there should be
|
|
146
|
ints by json. This belongs in the jsonutil added by fperez."""
|
|
146
|
ints by json. This belongs in the jsonutil added by fperez."""
|
|
147
|
for k in dikt.iterkeys():
|
|
147
|
for k in dikt.iterkeys():
|
|
148
|
if isinstance(k, str):
|
|
148
|
if isinstance(k, str):
|
|
149
|
ik=fk=None
|
|
149
|
ik=fk=None
|
|
150
|
try:
|
|
150
|
try:
|
|
151
|
ik = int(k)
|
|
151
|
ik = int(k)
|
|
152
|
except ValueError:
|
|
152
|
except ValueError:
|
|
153
|
try:
|
|
153
|
try:
|
|
154
|
fk = float(k)
|
|
154
|
fk = float(k)
|
|
155
|
except ValueError:
|
|
155
|
except ValueError:
|
|
156
|
continue
|
|
156
|
continue
|
|
157
|
if ik is not None:
|
|
157
|
if ik is not None:
|
|
158
|
nk = ik
|
|
158
|
nk = ik
|
|
159
|
else:
|
|
159
|
else:
|
|
160
|
nk = fk
|
|
160
|
nk = fk
|
|
161
|
if nk in dikt:
|
|
161
|
if nk in dikt:
|
|
162
|
raise KeyError("already have key %r"%nk)
|
|
162
|
raise KeyError("already have key %r"%nk)
|
|
163
|
dikt[nk] = dikt.pop(k)
|
|
163
|
dikt[nk] = dikt.pop(k)
|
|
164
|
return dikt
|
|
164
|
return dikt
|
|
165
|
|
|
165
|
|
|
166
|
def serialize_object(obj, threshold=64e-6):
|
|
166
|
def serialize_object(obj, threshold=64e-6):
|
|
167
|
"""Serialize an object into a list of sendable buffers.
|
|
167
|
"""Serialize an object into a list of sendable buffers.
|
|
168
|
|
|
168
|
|
|
169
|
Parameters
|
|
169
|
Parameters
|
|
170
|
----------
|
|
170
|
----------
|
|
171
|
|
|
171
|
|
|
172
|
obj : object
|
|
172
|
obj : object
|
|
173
|
The object to be serialized
|
|
173
|
The object to be serialized
|
|
174
|
threshold : float
|
|
174
|
threshold : float
|
|
175
|
The threshold for not double-pickling the content.
|
|
175
|
The threshold for not double-pickling the content.
|
|
176
|
|
|
176
|
|
|
177
|
|
|
177
|
|
|
178
|
Returns
|
|
178
|
Returns
|
|
179
|
-------
|
|
179
|
-------
|
|
180
|
('pmd', [bufs]) :
|
|
180
|
('pmd', [bufs]) :
|
|
181
|
where pmd is the pickled metadata wrapper,
|
|
181
|
where pmd is the pickled metadata wrapper,
|
|
182
|
bufs is a list of data buffers"""
|
|
182
|
bufs is a list of data buffers"""
|
|
183
|
# threshold is 100 B
|
|
183
|
# threshold is 100 B
|
|
184
|
databuffers = []
|
|
184
|
databuffers = []
|
|
185
|
if isinstance(obj, (list, tuple)):
|
|
185
|
if isinstance(obj, (list, tuple)):
|
|
186
|
clist = canSequence(obj)
|
|
186
|
clist = canSequence(obj)
|
|
187
|
slist = map(serialize, clist)
|
|
187
|
slist = map(serialize, clist)
|
|
188
|
for s in slist:
|
|
188
|
for s in slist:
|
|
189
|
if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
|
|
189
|
if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
|
|
190
|
databuffers.append(s.getData())
|
|
190
|
databuffers.append(s.getData())
|
|
191
|
s.data = None
|
|
191
|
s.data = None
|
|
192
|
return pickle.dumps(slist,-1), databuffers
|
|
192
|
return pickle.dumps(slist,-1), databuffers
|
|
193
|
elif isinstance(obj, dict):
|
|
193
|
elif isinstance(obj, dict):
|
|
194
|
sobj = {}
|
|
194
|
sobj = {}
|
|
195
|
for k in sorted(obj.iterkeys()):
|
|
195
|
for k in sorted(obj.iterkeys()):
|
|
196
|
s = serialize(can(obj[k]))
|
|
196
|
s = serialize(can(obj[k]))
|
|
197
|
if s.getDataSize() > threshold:
|
|
197
|
if s.getDataSize() > threshold:
|
|
198
|
databuffers.append(s.getData())
|
|
198
|
databuffers.append(s.getData())
|
|
199
|
s.data = None
|
|
199
|
s.data = None
|
|
200
|
sobj[k] = s
|
|
200
|
sobj[k] = s
|
|
201
|
return pickle.dumps(sobj,-1),databuffers
|
|
201
|
return pickle.dumps(sobj,-1),databuffers
|
|
202
|
else:
|
|
202
|
else:
|
|
203
|
s = serialize(can(obj))
|
|
203
|
s = serialize(can(obj))
|
|
204
|
if s.getDataSize() > threshold:
|
|
204
|
if s.getDataSize() > threshold:
|
|
205
|
databuffers.append(s.getData())
|
|
205
|
databuffers.append(s.getData())
|
|
206
|
s.data = None
|
|
206
|
s.data = None
|
|
207
|
return pickle.dumps(s,-1),databuffers
|
|
207
|
return pickle.dumps(s,-1),databuffers
|
|
208
|
|
|
208
|
|
|
209
|
|
|
209
|
|
|
210
|
def unserialize_object(bufs):
|
|
210
|
def unserialize_object(bufs):
|
|
211
|
"""reconstruct an object serialized by serialize_object from data buffers"""
|
|
211
|
"""reconstruct an object serialized by serialize_object from data buffers"""
|
|
212
|
bufs = list(bufs)
|
|
212
|
bufs = list(bufs)
|
|
213
|
sobj = pickle.loads(bufs.pop(0))
|
|
213
|
sobj = pickle.loads(bufs.pop(0))
|
|
214
|
if isinstance(sobj, (list, tuple)):
|
|
214
|
if isinstance(sobj, (list, tuple)):
|
|
215
|
for s in sobj:
|
|
215
|
for s in sobj:
|
|
216
|
if s.data is None:
|
|
216
|
if s.data is None:
|
|
217
|
s.data = bufs.pop(0)
|
|
217
|
s.data = bufs.pop(0)
|
|
218
|
return uncanSequence(map(unserialize, sobj))
|
|
218
|
return uncanSequence(map(unserialize, sobj))
|
|
219
|
elif isinstance(sobj, dict):
|
|
219
|
elif isinstance(sobj, dict):
|
|
220
|
newobj = {}
|
|
220
|
newobj = {}
|
|
221
|
for k in sorted(sobj.iterkeys()):
|
|
221
|
for k in sorted(sobj.iterkeys()):
|
|
222
|
s = sobj[k]
|
|
222
|
s = sobj[k]
|
|
223
|
if s.data is None:
|
|
223
|
if s.data is None:
|
|
224
|
s.data = bufs.pop(0)
|
|
224
|
s.data = bufs.pop(0)
|
|
225
|
newobj[k] = uncan(unserialize(s))
|
|
225
|
newobj[k] = uncan(unserialize(s))
|
|
226
|
return newobj
|
|
226
|
return newobj
|
|
227
|
else:
|
|
227
|
else:
|
|
228
|
if sobj.data is None:
|
|
228
|
if sobj.data is None:
|
|
229
|
sobj.data = bufs.pop(0)
|
|
229
|
sobj.data = bufs.pop(0)
|
|
230
|
return uncan(unserialize(sobj))
|
|
230
|
return uncan(unserialize(sobj))
|
|
231
|
|
|
231
|
|
|
232
|
def pack_apply_message(f, args, kwargs, threshold=64e-6):
|
|
232
|
def pack_apply_message(f, args, kwargs, threshold=64e-6):
|
|
233
|
"""pack up a function, args, and kwargs to be sent over the wire
|
|
233
|
"""pack up a function, args, and kwargs to be sent over the wire
|
|
234
|
as a series of buffers. Any object whose data is larger than `threshold`
|
|
234
|
as a series of buffers. Any object whose data is larger than `threshold`
|
|
235
|
will not have their data copied (currently only numpy arrays support zero-copy)"""
|
|
235
|
will not have their data copied (currently only numpy arrays support zero-copy)"""
|
|
236
|
msg = [pickle.dumps(can(f),-1)]
|
|
236
|
msg = [pickle.dumps(can(f),-1)]
|
|
237
|
databuffers = [] # for large objects
|
|
237
|
databuffers = [] # for large objects
|
|
238
|
sargs, bufs = serialize_object(args,threshold)
|
|
238
|
sargs, bufs = serialize_object(args,threshold)
|
|
239
|
msg.append(sargs)
|
|
239
|
msg.append(sargs)
|
|
240
|
databuffers.extend(bufs)
|
|
240
|
databuffers.extend(bufs)
|
|
241
|
skwargs, bufs = serialize_object(kwargs,threshold)
|
|
241
|
skwargs, bufs = serialize_object(kwargs,threshold)
|
|
242
|
msg.append(skwargs)
|
|
242
|
msg.append(skwargs)
|
|
243
|
databuffers.extend(bufs)
|
|
243
|
databuffers.extend(bufs)
|
|
244
|
msg.extend(databuffers)
|
|
244
|
msg.extend(databuffers)
|
|
245
|
return msg
|
|
245
|
return msg
|
|
246
|
|
|
246
|
|
|
247
|
def unpack_apply_message(bufs, g=None, copy=True):
|
|
247
|
def unpack_apply_message(bufs, g=None, copy=True):
|
|
248
|
"""unpack f,args,kwargs from buffers packed by pack_apply_message()
|
|
248
|
"""unpack f,args,kwargs from buffers packed by pack_apply_message()
|
|
249
|
Returns: original f,args,kwargs"""
|
|
249
|
Returns: original f,args,kwargs"""
|
|
250
|
bufs = list(bufs) # allow us to pop
|
|
250
|
bufs = list(bufs) # allow us to pop
|
|
251
|
assert len(bufs) >= 3, "not enough buffers!"
|
|
251
|
assert len(bufs) >= 3, "not enough buffers!"
|
|
252
|
if not copy:
|
|
252
|
if not copy:
|
|
253
|
for i in range(3):
|
|
253
|
for i in range(3):
|
|
254
|
bufs[i] = bufs[i].bytes
|
|
254
|
bufs[i] = bufs[i].bytes
|
|
255
|
cf = pickle.loads(bufs.pop(0))
|
|
255
|
cf = pickle.loads(bufs.pop(0))
|
|
256
|
sargs = list(pickle.loads(bufs.pop(0)))
|
|
256
|
sargs = list(pickle.loads(bufs.pop(0)))
|
|
257
|
skwargs = dict(pickle.loads(bufs.pop(0)))
|
|
257
|
skwargs = dict(pickle.loads(bufs.pop(0)))
|
|
258
|
# print sargs, skwargs
|
|
258
|
# print sargs, skwargs
|
|
259
|
f = uncan(cf, g)
|
|
259
|
f = uncan(cf, g)
|
|
260
|
for sa in sargs:
|
|
260
|
for sa in sargs:
|
|
261
|
if sa.data is None:
|
|
261
|
if sa.data is None:
|
|
262
|
m = bufs.pop(0)
|
|
262
|
m = bufs.pop(0)
|
|
263
|
if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
|
|
263
|
if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
|
|
264
|
if copy:
|
|
264
|
if copy:
|
|
265
|
sa.data = buffer(m)
|
|
265
|
sa.data = buffer(m)
|
|
266
|
else:
|
|
266
|
else:
|
|
267
|
sa.data = m.buffer
|
|
267
|
sa.data = m.buffer
|
|
268
|
else:
|
|
268
|
else:
|
|
269
|
if copy:
|
|
269
|
if copy:
|
|
270
|
sa.data = m
|
|
270
|
sa.data = m
|
|
271
|
else:
|
|
271
|
else:
|
|
272
|
sa.data = m.bytes
|
|
272
|
sa.data = m.bytes
|
|
273
|
|
|
273
|
|
|
274
|
args = uncanSequence(map(unserialize, sargs), g)
|
|
274
|
args = uncanSequence(map(unserialize, sargs), g)
|
|
275
|
kwargs = {}
|
|
275
|
kwargs = {}
|
|
276
|
for k in sorted(skwargs.iterkeys()):
|
|
276
|
for k in sorted(skwargs.iterkeys()):
|
|
277
|
sa = skwargs[k]
|
|
277
|
sa = skwargs[k]
|
|
278
|
if sa.data is None:
|
|
278
|
if sa.data is None:
|
|
279
|
sa.data = bufs.pop(0)
|
|
279
|
sa.data = bufs.pop(0)
|
|
280
|
kwargs[k] = uncan(unserialize(sa), g)
|
|
280
|
kwargs[k] = uncan(unserialize(sa), g)
|
|
281
|
|
|
281
|
|
|
282
|
return f,args,kwargs
|
|
282
|
return f,args,kwargs
|
|
283
|
|
|
283
|
|
|
284
|
class StreamSession(object):
|
|
284
|
class StreamSession(object):
|
|
285
|
"""tweaked version of IPython.zmq.session.Session, for development in Parallel"""
|
|
285
|
"""tweaked version of IPython.zmq.session.Session, for development in Parallel"""
|
|
286
|
debug=False
|
|
286
|
debug=False
|
|
287
|
def __init__(self, username=None, session=None, packer=None, unpacker=None):
|
|
287
|
def __init__(self, username=None, session=None, packer=None, unpacker=None):
|
|
288
|
if username is None:
|
|
288
|
if username is None:
|
|
289
|
username = os.environ.get('USER','username')
|
|
289
|
username = os.environ.get('USER','username')
|
|
290
|
self.username = username
|
|
290
|
self.username = username
|
|
291
|
if session is None:
|
|
291
|
if session is None:
|
|
292
|
self.session = str(uuid.uuid4())
|
|
292
|
self.session = str(uuid.uuid4())
|
|
293
|
else:
|
|
293
|
else:
|
|
294
|
self.session = session
|
|
294
|
self.session = session
|
|
295
|
self.msg_id = str(uuid.uuid4())
|
|
295
|
self.msg_id = str(uuid.uuid4())
|
|
296
|
if packer is None:
|
|
296
|
if packer is None:
|
|
297
|
self.pack = default_packer
|
|
297
|
self.pack = default_packer
|
|
298
|
else:
|
|
298
|
else:
|
|
299
|
if not callable(packer):
|
|
299
|
if not callable(packer):
|
|
300
|
raise TypeError("packer must be callable, not %s"%type(packer))
|
|
300
|
raise TypeError("packer must be callable, not %s"%type(packer))
|
|
301
|
self.pack = packer
|
|
301
|
self.pack = packer
|
|
302
|
|
|
302
|
|
|
303
|
if unpacker is None:
|
|
303
|
if unpacker is None:
|
|
304
|
self.unpack = default_unpacker
|
|
304
|
self.unpack = default_unpacker
|
|
305
|
else:
|
|
305
|
else:
|
|
306
|
if not callable(unpacker):
|
|
306
|
if not callable(unpacker):
|
|
307
|
raise TypeError("unpacker must be callable, not %s"%type(unpacker))
|
|
307
|
raise TypeError("unpacker must be callable, not %s"%type(unpacker))
|
|
308
|
self.unpack = unpacker
|
|
308
|
self.unpack = unpacker
|
|
309
|
|
|
309
|
|
|
310
|
self.none = self.pack({})
|
|
310
|
self.none = self.pack({})
|
|
311
|
|
|
311
|
|
|
312
|
def msg_header(self, msg_type):
|
|
312
|
def msg_header(self, msg_type):
|
|
313
|
h = msg_header(self.msg_id, msg_type, self.username, self.session)
|
|
313
|
h = msg_header(self.msg_id, msg_type, self.username, self.session)
|
|
314
|
self.msg_id = str(uuid.uuid4())
|
|
314
|
self.msg_id = str(uuid.uuid4())
|
|
315
|
return h
|
|
315
|
return h
|
|
316
|
|
|
316
|
|
|
317
|
def msg(self, msg_type, content=None, parent=None, subheader=None):
|
|
317
|
def msg(self, msg_type, content=None, parent=None, subheader=None):
|
|
318
|
msg = {}
|
|
318
|
msg = {}
|
|
319
|
msg['header'] = self.msg_header(msg_type)
|
|
319
|
msg['header'] = self.msg_header(msg_type)
|
|
320
|
msg['msg_id'] = msg['header']['msg_id']
|
|
320
|
msg['msg_id'] = msg['header']['msg_id']
|
|
321
|
msg['parent_header'] = {} if parent is None else extract_header(parent)
|
|
321
|
msg['parent_header'] = {} if parent is None else extract_header(parent)
|
|
322
|
msg['msg_type'] = msg_type
|
|
322
|
msg['msg_type'] = msg_type
|
|
323
|
msg['content'] = {} if content is None else content
|
|
323
|
msg['content'] = {} if content is None else content
|
|
324
|
sub = {} if subheader is None else subheader
|
|
324
|
sub = {} if subheader is None else subheader
|
|
325
|
msg['header'].update(sub)
|
|
325
|
msg['header'].update(sub)
|
|
326
|
return msg
|
|
326
|
return msg
|
|
327
|
|
|
327
|
|
|
328
|
def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
|
|
328
|
def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
|
|
329
|
"""Build and send a message via stream or socket.
|
|
329
|
"""Build and send a message via stream or socket.
|
|
330
|
|
|
330
|
|
|
331
|
Parameters
|
|
331
|
Parameters
|
|
332
|
----------
|
|
332
|
----------
|
|
333
|
|
|
333
|
|
|
334
|
stream : zmq.Socket or ZMQStream
|
|
334
|
stream : zmq.Socket or ZMQStream
|
|
335
|
the socket-like object used to send the data
|
|
335
|
the socket-like object used to send the data
|
|
336
|
msg_type : str or Message/dict
|
|
336
|
msg_type : str or Message/dict
|
|
337
|
Normally, msg_type will be
|
|
337
|
Normally, msg_type will be
|
|
338
|
|
|
338
|
|
|
339
|
|
|
339
|
|
|
340
|
|
|
340
|
|
|
341
|
Returns
|
|
341
|
Returns
|
|
342
|
-------
|
|
342
|
-------
|
|
343
|
(msg,sent) : tuple
|
|
343
|
(msg,sent) : tuple
|
|
344
|
msg : Message
|
|
344
|
msg : Message
|
|
345
|
the nice wrapped dict-like object containing the headers
|
|
345
|
the nice wrapped dict-like object containing the headers
|
|
346
|
|
|
346
|
|
|
347
|
"""
|
|
347
|
"""
|
|
348
|
if isinstance(msg_type, (Message, dict)):
|
|
348
|
if isinstance(msg_type, (Message, dict)):
|
|
349
|
# we got a Message, not a msg_type
|
|
349
|
# we got a Message, not a msg_type
|
|
350
|
# don't build a new Message
|
|
350
|
# don't build a new Message
|
|
351
|
msg = msg_type
|
|
351
|
msg = msg_type
|
|
352
|
content = msg['content']
|
|
352
|
content = msg['content']
|
|
353
|
else:
|
|
353
|
else:
|
|
354
|
msg = self.msg(msg_type, content, parent, subheader)
|
|
354
|
msg = self.msg(msg_type, content, parent, subheader)
|
|
355
|
buffers = [] if buffers is None else buffers
|
|
355
|
buffers = [] if buffers is None else buffers
|
|
356
|
to_send = []
|
|
356
|
to_send = []
|
|
357
|
if isinstance(ident, list):
|
|
357
|
if isinstance(ident, list):
|
|
358
|
# accept list of idents
|
|
358
|
# accept list of idents
|
|
359
|
to_send.extend(ident)
|
|
359
|
to_send.extend(ident)
|
|
360
|
elif ident is not None:
|
|
360
|
elif ident is not None:
|
|
361
|
to_send.append(ident)
|
|
361
|
to_send.append(ident)
|
|
362
|
to_send.append(DELIM)
|
|
362
|
to_send.append(DELIM)
|
|
363
|
to_send.append(self.pack(msg['header']))
|
|
363
|
to_send.append(self.pack(msg['header']))
|
|
364
|
to_send.append(self.pack(msg['parent_header']))
|
|
364
|
to_send.append(self.pack(msg['parent_header']))
|
|
365
|
|
|
365
|
|
|
366
|
if content is None:
|
|
366
|
if content is None:
|
|
367
|
content = self.none
|
|
367
|
content = self.none
|
|
368
|
elif isinstance(content, dict):
|
|
368
|
elif isinstance(content, dict):
|
|
369
|
content = self.pack(content)
|
|
369
|
content = self.pack(content)
|
|
370
|
elif isinstance(content, str):
|
|
370
|
elif isinstance(content, str):
|
|
371
|
# content is already packed, as in a relayed message
|
|
371
|
# content is already packed, as in a relayed message
|
|
372
|
pass
|
|
372
|
pass
|
|
373
|
else:
|
|
373
|
else:
|
|
374
|
raise TypeError("Content incorrect type: %s"%type(content))
|
|
374
|
raise TypeError("Content incorrect type: %s"%type(content))
|
|
375
|
to_send.append(content)
|
|
375
|
to_send.append(content)
|
|
376
|
flag = 0
|
|
376
|
flag = 0
|
|
377
|
if buffers:
|
|
377
|
if buffers:
|
|
378
|
flag = zmq.SNDMORE
|
|
378
|
flag = zmq.SNDMORE
|
|
379
|
stream.send_multipart(to_send, flag, copy=False)
|
|
379
|
stream.send_multipart(to_send, flag, copy=False)
|
|
380
|
for b in buffers[:-1]:
|
|
380
|
for b in buffers[:-1]:
|
|
381
|
stream.send(b, flag, copy=False)
|
|
381
|
stream.send(b, flag, copy=False)
|
|
382
|
if buffers:
|
|
382
|
if buffers:
|
|
383
|
stream.send(buffers[-1], copy=False)
|
|
383
|
stream.send(buffers[-1], copy=False)
|
|
384
|
omsg = Message(msg)
|
|
384
|
omsg = Message(msg)
|
|
385
|
if self.debug:
|
|
385
|
if self.debug:
|
|
386
|
pprint.pprint(omsg)
|
|
386
|
pprint.pprint(omsg)
|
|
387
|
pprint.pprint(to_send)
|
|
387
|
pprint.pprint(to_send)
|
|
388
|
pprint.pprint(buffers)
|
|
388
|
pprint.pprint(buffers)
|
|
389
|
return omsg
|
|
389
|
return omsg
|
|
390
|
|
|
390
|
|
|
391
|
def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
|
|
391
|
def send_raw(self, stream, msg, flags=0, copy=True, idents=None):
|
|
392
|
"""Send a raw message via idents.
|
|
392
|
"""Send a raw message via idents.
|
|
393
|
|
|
393
|
|
|
394
|
Parameters
|
|
394
|
Parameters
|
|
395
|
----------
|
|
395
|
----------
|
|
396
|
msg : list of sendable buffers"""
|
|
396
|
msg : list of sendable buffers"""
|
|
397
|
to_send = []
|
|
397
|
to_send = []
|
|
398
|
if isinstance(ident, str):
|
|
398
|
if isinstance(ident, str):
|
|
399
|
ident = [ident]
|
|
399
|
ident = [ident]
|
|
400
|
if ident is not None:
|
|
400
|
if ident is not None:
|
|
401
|
to_send.extend(ident)
|
|
401
|
to_send.extend(ident)
|
|
402
|
to_send.append(DELIM)
|
|
402
|
to_send.append(DELIM)
|
|
403
|
to_send.extend(msg)
|
|
403
|
to_send.extend(msg)
|
|
404
|
stream.send_multipart(msg, flags, copy=copy)
|
|
404
|
stream.send_multipart(msg, flags, copy=copy)
|
|
405
|
|
|
405
|
|
|
406
|
def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
|
|
406
|
def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
|
|
407
|
"""receives and unpacks a message
|
|
407
|
"""receives and unpacks a message
|
|
408
|
returns [idents], msg"""
|
|
408
|
returns [idents], msg"""
|
|
409
|
if isinstance(socket, ZMQStream):
|
|
409
|
if isinstance(socket, ZMQStream):
|
|
410
|
socket = socket.socket
|
|
410
|
socket = socket.socket
|
|
411
|
try:
|
|
411
|
try:
|
|
412
|
msg = socket.recv_multipart(mode)
|
|
412
|
msg = socket.recv_multipart(mode)
|
|
413
|
except zmq.ZMQError as e:
|
|
413
|
except zmq.ZMQError as e:
|
|
414
|
if e.errno == zmq.EAGAIN:
|
|
414
|
if e.errno == zmq.EAGAIN:
|
|
415
|
# We can convert EAGAIN to None as we know in this case
|
|
415
|
# We can convert EAGAIN to None as we know in this case
|
|
416
|
# recv_json won't return None.
|
|
416
|
# recv_json won't return None.
|
|
417
|
return None
|
|
417
|
return None
|
|
418
|
else:
|
|
418
|
else:
|
|
419
|
raise
|
|
419
|
raise
|
|
420
|
# return an actual Message object
|
|
420
|
# return an actual Message object
|
|
421
|
# determine the number of idents by trying to unpack them.
|
|
421
|
# determine the number of idents by trying to unpack them.
|
|
422
|
# this is terrible:
|
|
422
|
# this is terrible:
|
|
423
|
idents, msg = self.feed_identities(msg, copy)
|
|
423
|
idents, msg = self.feed_identities(msg, copy)
|
|
424
|
try:
|
|
424
|
try:
|
|
425
|
return idents, self.unpack_message(msg, content=content, copy=copy)
|
|
425
|
return idents, self.unpack_message(msg, content=content, copy=copy)
|
|
426
|
except Exception as e:
|
|
426
|
except Exception as e:
|
|
427
|
print (idents, msg)
|
|
427
|
print (idents, msg)
|
|
428
|
# TODO: handle it
|
|
428
|
# TODO: handle it
|
|
429
|
raise e
|
|
429
|
raise e
|
|
430
|
|
|
430
|
|
|
431
|
def feed_identities(self, msg, copy=True):
|
|
431
|
def feed_identities(self, msg, copy=True):
|
|
432
|
"""This is a completely horrible thing, but it strips the zmq
|
|
432
|
"""This is a completely horrible thing, but it strips the zmq
|
|
433
|
ident prefixes off of a message. It will break if any identities
|
|
433
|
ident prefixes off of a message. It will break if any identities
|
|
434
|
are unpackable by self.unpack."""
|
|
434
|
are unpackable by self.unpack."""
|
|
435
|
msg = list(msg)
|
|
435
|
msg = list(msg)
|
|
436
|
idents = []
|
|
436
|
idents = []
|
|
437
|
while len(msg) > 3:
|
|
437
|
while len(msg) > 3:
|
|
438
|
if copy:
|
|
438
|
if copy:
|
|
439
|
s = msg[0]
|
|
439
|
s = msg[0]
|
|
440
|
else:
|
|
440
|
else:
|
|
441
|
s = msg[0].bytes
|
|
441
|
s = msg[0].bytes
|
|
442
|
if s == DELIM:
|
|
442
|
if s == DELIM:
|
|
443
|
msg.pop(0)
|
|
443
|
msg.pop(0)
|
|
444
|
break
|
|
444
|
break
|
|
445
|
else:
|
|
445
|
else:
|
|
446
|
idents.append(s)
|
|
446
|
idents.append(s)
|
|
447
|
msg.pop(0)
|
|
447
|
msg.pop(0)
|
|
448
|
|
|
448
|
|
|
449
|
return idents, msg
|
|
449
|
return idents, msg
|
|
450
|
|
|
450
|
|
|
451
|
def unpack_message(self, msg, content=True, copy=True):
|
|
451
|
def unpack_message(self, msg, content=True, copy=True):
|
|
452
|
"""Return a message object from the format
|
|
452
|
"""Return a message object from the format
|
|
453
|
sent by self.send.
|
|
453
|
sent by self.send.
|
|
454
|
|
|
454
|
|
|
455
|
Parameters:
|
|
455
|
Parameters:
|
|
456
|
-----------
|
|
456
|
-----------
|
|
457
|
|
|
457
|
|
|
458
|
content : bool (True)
|
|
458
|
content : bool (True)
|
|
459
|
whether to unpack the content dict (True),
|
|
459
|
whether to unpack the content dict (True),
|
|
460
|
or leave it serialized (False)
|
|
460
|
or leave it serialized (False)
|
|
461
|
|
|
461
|
|
|
462
|
copy : bool (True)
|
|
462
|
copy : bool (True)
|
|
463
|
whether to return the bytes (True),
|
|
463
|
whether to return the bytes (True),
|
|
464
|
or the non-copying Message object in each place (False)
|
|
464
|
or the non-copying Message object in each place (False)
|
|
465
|
|
|
465
|
|
|
466
|
"""
|
|
466
|
"""
|
|
467
|
if not len(msg) >= 3:
|
|
467
|
if not len(msg) >= 3:
|
|
468
|
raise TypeError("malformed message, must have at least 3 elements")
|
|
468
|
raise TypeError("malformed message, must have at least 3 elements")
|
|
469
|
message = {}
|
|
469
|
message = {}
|
|
470
|
if not copy:
|
|
470
|
if not copy:
|
|
471
|
for i in range(3):
|
|
471
|
for i in range(3):
|
|
472
|
msg[i] = msg[i].bytes
|
|
472
|
msg[i] = msg[i].bytes
|
|
473
|
message['header'] = self.unpack(msg[0])
|
|
473
|
message['header'] = self.unpack(msg[0])
|
|
474
|
message['msg_type'] = message['header']['msg_type']
|
|
474
|
message['msg_type'] = message['header']['msg_type']
|
|
475
|
message['parent_header'] = self.unpack(msg[1])
|
|
475
|
message['parent_header'] = self.unpack(msg[1])
|
|
476
|
if content:
|
|
476
|
if content:
|
|
477
|
message['content'] = self.unpack(msg[2])
|
|
477
|
message['content'] = self.unpack(msg[2])
|
|
478
|
else:
|
|
478
|
else:
|
|
479
|
message['content'] = msg[2]
|
|
479
|
message['content'] = msg[2]
|
|
480
|
|
|
480
|
|
|
481
|
# message['buffers'] = msg[3:]
|
|
481
|
# message['buffers'] = msg[3:]
|
|
482
|
# else:
|
|
482
|
# else:
|
|
483
|
# message['header'] = self.unpack(msg[0].bytes)
|
|
483
|
# message['header'] = self.unpack(msg[0].bytes)
|
|
484
|
# message['msg_type'] = message['header']['msg_type']
|
|
484
|
# message['msg_type'] = message['header']['msg_type']
|
|
485
|
# message['parent_header'] = self.unpack(msg[1].bytes)
|
|
485
|
# message['parent_header'] = self.unpack(msg[1].bytes)
|
|
486
|
# if content:
|
|
486
|
# if content:
|
|
487
|
# message['content'] = self.unpack(msg[2].bytes)
|
|
487
|
# message['content'] = self.unpack(msg[2].bytes)
|
|
488
|
# else:
|
|
488
|
# else:
|
|
489
|
# message['content'] = msg[2].bytes
|
|
489
|
# message['content'] = msg[2].bytes
|
|
490
|
|
|
490
|
|
|
491
|
message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ]
|
|
491
|
message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ]
|
|
492
|
return message
|
|
492
|
return message
|
|
493
|
|
|
493
|
|
|
494
|
|
|
494
|
|
|
495
|
|
|
495
|
|
|
496
|
def test_msg2obj():
|
|
496
|
def test_msg2obj():
|
|
497
|
am = dict(x=1)
|
|
497
|
am = dict(x=1)
|
|
498
|
ao = Message(am)
|
|
498
|
ao = Message(am)
|
|
499
|
assert ao.x == am['x']
|
|
499
|
assert ao.x == am['x']
|
|
500
|
|
|
500
|
|
|
501
|
am['y'] = dict(z=1)
|
|
501
|
am['y'] = dict(z=1)
|
|
502
|
ao = Message(am)
|
|
502
|
ao = Message(am)
|
|
503
|
assert ao.y.z == am['y']['z']
|
|
503
|
assert ao.y.z == am['y']['z']
|
|
504
|
|
|
504
|
|
|
505
|
k1, k2 = 'y', 'z'
|
|
505
|
k1, k2 = 'y', 'z'
|
|
506
|
assert ao[k1][k2] == am[k1][k2]
|
|
506
|
assert ao[k1][k2] == am[k1][k2]
|
|
507
|
|
|
507
|
|
|
508
|
am2 = dict(ao)
|
|
508
|
am2 = dict(ao)
|
|
509
|
assert am['x'] == am2['x']
|
|
509
|
assert am['x'] == am2['x']
|
|
510
|
assert am['y']['z'] == am2['y']['z']
|
|
510
|
assert am['y']['z'] == am2['y']['z']
|