##// END OF EJS Templates
serialize elements of args/kwargs in pack_apply message...
MinRK -
Show More
@@ -1,180 +1,204 b''
1 """serialization utilities for apply messages
1 """serialization utilities for apply messages
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports
18 # Standard library imports
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import socket
22 import socket
23 import sys
23 import sys
24
24
25 try:
25 try:
26 import cPickle
26 import cPickle
27 pickle = cPickle
27 pickle = cPickle
28 except:
28 except:
29 cPickle = None
29 cPickle = None
30 import pickle
30 import pickle
31
31
32
32
33 # IPython imports
33 # IPython imports
34 from IPython.utils import py3compat
34 from IPython.utils import py3compat
35 from IPython.utils.data import flatten
35 from IPython.utils.pickleutil import (
36 from IPython.utils.pickleutil import (
36 can, uncan, can_sequence, uncan_sequence, CannedObject
37 can, uncan, can_sequence, uncan_sequence, CannedObject
37 )
38 )
38
39
39 if py3compat.PY3:
40 if py3compat.PY3:
40 buffer = memoryview
41 buffer = memoryview
41
42
42 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
43 # Serialization Functions
44 # Serialization Functions
44 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
45
46
46 # default values for the thresholds:
47 # default values for the thresholds:
47 MAX_ITEMS = 64
48 MAX_ITEMS = 64
48 MAX_BYTES = 1024
49 MAX_BYTES = 1024
49
50
50 def _extract_buffers(obj, threshold=MAX_BYTES):
51 def _extract_buffers(obj, threshold=MAX_BYTES):
51 """extract buffers larger than a certain threshold"""
52 """extract buffers larger than a certain threshold"""
52 buffers = []
53 buffers = []
53 if isinstance(obj, CannedObject) and obj.buffers:
54 if isinstance(obj, CannedObject) and obj.buffers:
54 for i,buf in enumerate(obj.buffers):
55 for i,buf in enumerate(obj.buffers):
55 if len(buf) > threshold:
56 if len(buf) > threshold:
56 # buffer larger than threshold, prevent pickling
57 # buffer larger than threshold, prevent pickling
57 obj.buffers[i] = None
58 obj.buffers[i] = None
58 buffers.append(buf)
59 buffers.append(buf)
59 elif isinstance(buf, buffer):
60 elif isinstance(buf, buffer):
60 # buffer too small for separate send, coerce to bytes
61 # buffer too small for separate send, coerce to bytes
61 # because pickling buffer objects just results in broken pointers
62 # because pickling buffer objects just results in broken pointers
62 obj.buffers[i] = bytes(buf)
63 obj.buffers[i] = bytes(buf)
63 return buffers
64 return buffers
64
65
65 def _restore_buffers(obj, buffers):
66 def _restore_buffers(obj, buffers):
66 """restore buffers extracted by """
67 """restore buffers extracted by """
67 if isinstance(obj, CannedObject) and obj.buffers:
68 if isinstance(obj, CannedObject) and obj.buffers:
68 for i,buf in enumerate(obj.buffers):
69 for i,buf in enumerate(obj.buffers):
69 if buf is None:
70 if buf is None:
70 obj.buffers[i] = buffers.pop(0)
71 obj.buffers[i] = buffers.pop(0)
71
72
72 def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
73 def serialize_object(obj, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
73 """Serialize an object into a list of sendable buffers.
74 """Serialize an object into a list of sendable buffers.
74
75
75 Parameters
76 Parameters
76 ----------
77 ----------
77
78
78 obj : object
79 obj : object
79 The object to be serialized
80 The object to be serialized
80 buffer_threshold : int
81 buffer_threshold : int
81 The threshold (in bytes) for pulling out data buffers
82 The threshold (in bytes) for pulling out data buffers
82 to avoid pickling them.
83 to avoid pickling them.
83 item_threshold : int
84 item_threshold : int
84 The maximum number of items over which canning will iterate.
85 The maximum number of items over which canning will iterate.
85 Containers (lists, dicts) larger than this will be pickled without
86 Containers (lists, dicts) larger than this will be pickled without
86 introspection.
87 introspection.
87
88
88 Returns
89 Returns
89 -------
90 -------
90 [bufs] : list of buffers representing the serialized object.
91 [bufs] : list of buffers representing the serialized object.
91 """
92 """
92 buffers = []
93 buffers = []
93 if isinstance(obj, (list, tuple)) and len(obj) < item_threshold:
94 if isinstance(obj, (list, tuple)) and len(obj) < item_threshold:
94 cobj = can_sequence(obj)
95 cobj = can_sequence(obj)
95 for c in cobj:
96 for c in cobj:
96 buffers.extend(_extract_buffers(c, buffer_threshold))
97 buffers.extend(_extract_buffers(c, buffer_threshold))
97 elif isinstance(obj, dict) and len(obj) < item_threshold:
98 elif isinstance(obj, dict) and len(obj) < item_threshold:
98 cobj = {}
99 cobj = {}
99 for k in sorted(obj.iterkeys()):
100 for k in sorted(obj.iterkeys()):
100 c = can(obj[k])
101 c = can(obj[k])
101 buffers.extend(_extract_buffers(c, buffer_threshold))
102 buffers.extend(_extract_buffers(c, buffer_threshold))
102 cobj[k] = c
103 cobj[k] = c
103 else:
104 else:
104 cobj = can(obj)
105 cobj = can(obj)
105 buffers.extend(_extract_buffers(cobj, buffer_threshold))
106 buffers.extend(_extract_buffers(cobj, buffer_threshold))
106
107
107 buffers.insert(0, pickle.dumps(cobj,-1))
108 buffers.insert(0, pickle.dumps(cobj,-1))
108 return buffers
109 return buffers
109
110
110 def unserialize_object(buffers, g=None):
111 def unserialize_object(buffers, g=None):
111 """reconstruct an object serialized by serialize_object from data buffers.
112 """reconstruct an object serialized by serialize_object from data buffers.
112
113
113 Parameters
114 Parameters
114 ----------
115 ----------
115
116
116 bufs : list of buffers/bytes
117 bufs : list of buffers/bytes
117
118
118 g : globals to be used when uncanning
119 g : globals to be used when uncanning
119
120
120 Returns
121 Returns
121 -------
122 -------
122
123
123 (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
124 (newobj, bufs) : unpacked object, and the list of remaining unused buffers.
124 """
125 """
125 bufs = list(buffers)
126 bufs = list(buffers)
126 canned = pickle.loads(bufs.pop(0))
127 pobj = bufs.pop(0)
128 if not isinstance(pobj, bytes):
129 # a zmq message
130 pobj = bytes(pobj)
131 canned = pickle.loads(pobj)
127 if isinstance(canned, (list, tuple)) and len(canned) < MAX_ITEMS:
132 if isinstance(canned, (list, tuple)) and len(canned) < MAX_ITEMS:
128 for c in canned:
133 for c in canned:
129 _restore_buffers(c, bufs)
134 _restore_buffers(c, bufs)
130 newobj = uncan_sequence(canned, g)
135 newobj = uncan_sequence(canned, g)
131 elif isinstance(canned, dict) and len(canned) < MAX_ITEMS:
136 elif isinstance(canned, dict) and len(canned) < MAX_ITEMS:
132 newobj = {}
137 newobj = {}
133 for k in sorted(canned.iterkeys()):
138 for k in sorted(canned.iterkeys()):
134 c = canned[k]
139 c = canned[k]
135 _restore_buffers(c, bufs)
140 _restore_buffers(c, bufs)
136 newobj[k] = uncan(c, g)
141 newobj[k] = uncan(c, g)
137 else:
142 else:
138 _restore_buffers(canned, bufs)
143 _restore_buffers(canned, bufs)
139 newobj = uncan(canned, g)
144 newobj = uncan(canned, g)
140
145
141 return newobj, bufs
146 return newobj, bufs
142
147
143 def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
148 def pack_apply_message(f, args, kwargs, buffer_threshold=MAX_BYTES, item_threshold=MAX_ITEMS):
144 """pack up a function, args, and kwargs to be sent over the wire
149 """pack up a function, args, and kwargs to be sent over the wire
145
150
146 as a series of buffers. Any object whose data is larger than `threshold`
151 Each element of args/kwargs will be canned for special treatment,
147 will not have their data copied (currently only numpy arrays support zero-copy)
152 but inspection will not go any deeper than that.
153
154 Any object whose data is larger than `threshold` will not have their data copied
155 (only numpy arrays and bytes/buffers support zero-copy)
156
157 Message will be a list of bytes/buffers of the format:
158
159 [ cf, pinfo, <arg_bufs>, <kwarg_bufs> ]
160
161 With length at least two + len(args) + len(kwargs)
148 """
162 """
163
164 arg_bufs = flatten(serialize_object(arg, buffer_threshold, item_threshold) for arg in args)
165
166 kw_keys = sorted(kwargs.keys())
167 kwarg_bufs = flatten(serialize_object(kwargs[key], buffer_threshold, item_threshold) for key in kw_keys)
168
169 info = dict(nargs=len(args), narg_bufs=len(arg_bufs), kw_keys=kw_keys)
170
149 msg = [pickle.dumps(can(f),-1)]
171 msg = [pickle.dumps(can(f),-1)]
150 databuffers = [] # for large objects
172 msg.append(pickle.dumps(info, -1))
151 sargs = serialize_object(args, buffer_threshold, item_threshold)
173 msg.extend(arg_bufs)
152 msg.append(sargs[0])
174 msg.extend(kwarg_bufs)
153 databuffers.extend(sargs[1:])
175
154 skwargs = serialize_object(kwargs, buffer_threshold, item_threshold)
155 msg.append(skwargs[0])
156 databuffers.extend(skwargs[1:])
157 msg.extend(databuffers)
158 return msg
176 return msg
159
177
160 def unpack_apply_message(bufs, g=None, copy=True):
178 def unpack_apply_message(bufs, g=None, copy=True):
161 """unpack f,args,kwargs from buffers packed by pack_apply_message()
179 """unpack f,args,kwargs from buffers packed by pack_apply_message()
162 Returns: original f,args,kwargs"""
180 Returns: original f,args,kwargs"""
163 bufs = list(bufs) # allow us to pop
181 bufs = list(bufs) # allow us to pop
164 assert len(bufs) >= 3, "not enough buffers!"
182 assert len(bufs) >= 2, "not enough buffers!"
165 if not copy:
183 if not copy:
166 for i in range(3):
184 for i in range(2):
167 bufs[i] = bufs[i].bytes
185 bufs[i] = bufs[i].bytes
168 f = uncan(pickle.loads(bufs.pop(0)), g)
186 f = uncan(pickle.loads(bufs.pop(0)), g)
169 # sargs = bufs.pop(0)
187 info = pickle.loads(bufs.pop(0))
170 # pop kwargs out, so first n-elements are args, serialized
188 arg_bufs, kwarg_bufs = bufs[:info['narg_bufs']], bufs[info['narg_bufs']:]
171 skwargs = bufs.pop(1)
189
172 args, bufs = unserialize_object(bufs, g)
190 args = []
173 # put skwargs back in as the first element
191 for i in range(info['nargs']):
174 bufs.insert(0, skwargs)
192 arg, arg_bufs = unserialize_object(arg_bufs, g)
175 kwargs, bufs = unserialize_object(bufs, g)
193 args.append(arg)
176
194 args = tuple(args)
177 assert not bufs, "Shouldn't be any data left over"
195 assert not arg_bufs, "Shouldn't be any arg bufs left over"
196
197 kwargs = {}
198 for key in info['kw_keys']:
199 kwarg, kwarg_bufs = unserialize_object(kwarg_bufs, g)
200 kwargs[key] = kwarg
201 assert not kwarg_bufs, "Shouldn't be any kwarg bufs left over"
178
202
179 return f,args,kwargs
203 return f,args,kwargs
180
204
General Comments 0
You need to be logged in to leave comments. Login now