Read pickles in pure binary mode.
Fernando Perez
@@ -1,364 +1,364 @@
#!/usr/bin/env python

""" PickleShare - a small 'shelve' like datastore with concurrency support

Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
shelve, many processes can access the database simultaneously. Changing a
value in the database is immediately visible to other processes accessing the
same database.

Concurrency is possible because the values are stored in separate files. Hence
the "database" is a directory where *all* files are governed by PickleShare.

Example usage::

    from pickleshare import *
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/are/ok/key'] = [1,(5,46)]
    print db.keys()
    del db['aku ankka']

This module is certainly not ZODB, but can be used for low-load
(non-mission-critical) situations where tiny code size trumps the
advanced features of a "real" object database.

Installation guide: easy_install pickleshare

Author: Ville Vainio <vivainio@gmail.com>
License: MIT open source license.

"""

from IPython.external.path import path as Path
import os,stat,time
import collections
import cPickle as pickle
import glob

def gethashfile(key):
    """ Map a key to one of 256 two-hex-digit bucket file names """
    return ("%02x" % abs(hash(key) % 256))[-2:]

_sentinel = object()

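# Illustrative sketch, not part of the original module: gethashfile() maps
# any key onto one of 256 two-hex-digit bucket names, so hset/hget can group
# many keys into a handful of files. The exact bucket a key lands in depends
# on Python's hash() for that key.
def _gethashfile_demo():
    for k in ('aku', 'ankka', 'paths/are/ok/key'):
        bucket = gethashfile(k)
        assert len(bucket) == 2       # always two hex digits, '00'..'ff'
        print k, '->', bucket
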
class PickleShareDB(collections.MutableMapping):
    """ The main 'connection' object for PickleShare database """
    def __init__(self,root):
        """ Return a db object that will manage the specified directory"""
        self.root = Path(root).expanduser().abspath()
        if not self.root.isdir():
            self.root.makedirs()
        # cache has { 'key' : (obj, orig_mod_time) }
        self.cache = {}

    def __getitem__(self,key):
        """ db['key'] reading """
        fil = self.root / key
        try:
            mtime = (fil.stat()[stat.ST_MTIME])
        except OSError:
            raise KeyError(key)

        if fil in self.cache and mtime == self.cache[fil][1]:
            return self.cache[fil][0]
        try:
            # The cached item has expired, need to read
-            obj = pickle.loads(fil.open("rbU").read())
+            obj = pickle.loads(fil.open("rb").read())
        except:
            raise KeyError(key)

        self.cache[fil] = (obj,mtime)
        return obj

    def __setitem__(self,key,value):
        """ db['key'] = 5 """
        fil = self.root / key
        parent = fil.parent
        if parent and not parent.isdir():
            parent.makedirs()
        # We specify protocol 2, so that we can mostly go between Python 2
        # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
        pickle.dump(value,fil.open('wb'), protocol=2)
        try:
            self.cache[fil] = (value,fil.mtime)
        except OSError,e:
            if e.errno != 2:
                # errno 2 (ENOENT): the file vanished under a concurrent
                # delete, which is acceptable; anything else is a real error
                raise

    def hset(self, hashroot, key, value):
        """ hashed set: store key/value in a shared bucket file under hashroot """
        hroot = self.root / hashroot
        if not hroot.isdir():
            hroot.makedirs()
        hfile = hroot / gethashfile(key)
        d = self.get(hfile, {})
        d.update( {key : value})
        self[hfile] = d

    def hget(self, hashroot, key, default = _sentinel, fast_only = True):
        """ hashed get: look up key in its bucket file under hashroot """
        hroot = self.root / hashroot
        hfile = hroot / gethashfile(key)

        d = self.get(hfile, _sentinel )
        #print "got dict",d,"from",hfile
        if d is _sentinel:
            if fast_only:
                if default is _sentinel:
                    raise KeyError(key)

                return default

            # slow mode ok, works even after hcompress()
            d = self.hdict(hashroot)

        return d.get(key, default)

    def hdict(self, hashroot):
        """ Get all data contained in hashed category 'hashroot' as dict """
        hfiles = self.keys(hashroot + "/*")
        hfiles.sort()
        last = len(hfiles) and hfiles[-1] or ''
        if last.endswith('xx'):
            # print "using xx"
            hfiles = [last] + hfiles[:-1]

        all = {}

        for f in hfiles:
            # print "using",f
            try:
                all.update(self[f])
            except KeyError:
                print "Corrupt",f,"deleted - hset is not threadsafe!"
                del self[f]

            self.uncache(f)

        return all

    def hcompress(self, hashroot):
        """ Compress category 'hashroot', so hset is fast again

        hget will fail if fast_only is True for compressed items (that were
        hset before hcompress).

        """
        hfiles = self.keys(hashroot + "/*")
        all = {}
        for f in hfiles:
            # print "using",f
            all.update(self[f])
            self.uncache(f)

        self[hashroot + '/xx'] = all
        for f in hfiles:
            p = self.root / f
            if p.basename() == 'xx':
                continue
            p.remove()

    def __delitem__(self,key):
        """ del db["key"] """
        fil = self.root / key
        self.cache.pop(fil,None)
        try:
            fil.remove()
        except OSError:
            # notfound and permission denied are ok - we
            # lost, the other process wins the conflict
            pass

    def _normalized(self, p):
        """ Make a key suitable for user's eyes """
        return str(self.root.relpathto(p)).replace('\\','/')

    def keys(self, globpat = None):
        """ All keys in DB, or all keys matching a glob"""

        if globpat is None:
            files = self.root.walkfiles()
        else:
            files = [Path(p) for p in glob.glob(self.root/globpat)]
        return [self._normalized(p) for p in files if p.isfile()]

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self.keys())

    def uncache(self,*items):
        """ Remove all items, or only the specified ones, from the cache

        Use this after reading a large amount of large objects
        to free up memory, when you won't be needing the objects
        for a while.

        """
        if not items:
            self.cache = {}
        for it in items:
            self.cache.pop(it,None)

    def waitget(self,key, maxwaittime = 60 ):
        """ Wait (poll) for a key to get a value

        Will wait for `maxwaittime` seconds before raising a KeyError.
        The call exits normally if the `key` field in db gets a value
        within the timeout period.

        Use this for synchronizing different processes or for ensuring
        that an unfortunately timed "db['key'] = newvalue" operation
        in another process (which causes all 'get' operations to raise
        a KeyError for the duration of pickling) won't screw up your
        program logic.
        """

        wtimes = [0.2] * 3 + [0.5] * 2 + [1]
        tries = 0
        waited = 0
        while 1:
            try:
                val = self[key]
                return val
            except KeyError:
                pass

            if waited > maxwaittime:
                raise KeyError(key)

            time.sleep(wtimes[tries])
            waited+=wtimes[tries]
            if tries < len(wtimes) -1:
                tries+=1

    def getlink(self,folder):
        """ Get a convenient link for accessing items """
        return PickleShareLink(self, folder)

    def __repr__(self):
        return "PickleShareDB('%s')" % self.root

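# A minimal sketch of the concurrency model from the module docstring:
# two independent PickleShareDB handles on the same directory observe each
# other's writes, because each value is a separate file that is re-read
# when its mtime changes. Not part of the original module; the directory
# name is illustrative only.
def _concurrency_demo():
    a = PickleShareDB('~/demo_pickleshare')
    b = PickleShareDB('~/demo_pickleshare')
    a.clear()
    a['counter'] = 1          # handle a writes straight to disk
    print b['counter']        # handle b reads the fresh file: 1
    print b.keys('c*')        # keys() also accepts a glob pattern
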
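# An illustrative sketch (not in the original module) of the hashed API:
# hset()/hget() bucket many small keys into a few files, and hcompress()
# folds every bucket into a single 'xx' file so subsequent hset calls stay
# fast; compressed items then need hget(..., fast_only=False).
def _hashed_demo():
    db = PickleShareDB('~/demo_pickleshare_hash')
    db.clear()
    db.hset('scores', 'aku', 12)
    db.hset('scores', 'ankka', 313)
    print db.hget('scores', 'aku')                    # 12
    print db.hdict('scores')                          # both keys as one dict
    db.hcompress('scores')                            # all data now in scores/xx
    print db.hget('scores', 'ankka', fast_only=False) # 313, via the slow path
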
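# A hypothetical two-process sketch of waitget(): the child polls until the
# parent publishes a value. os.fork() makes this POSIX-only; key name and
# timings are illustrative, not from the original module.
def _waitget_demo():
    db = PickleShareDB('~/demo_pickleshare_wait')
    db.clear()
    pid = os.fork()
    if pid == 0:
        # child: poll (up to 10 s) until 'ready' appears, then exit
        print "child got:", db.waitget('ready', maxwaittime=10)
        os._exit(0)
    time.sleep(1)             # parent: publish after a delay
    db['ready'] = 42
    os.waitpid(pid, 0)
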
class PickleShareLink:
    """ A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5

    """
    def __init__(self, db, keydir ):
        self.__dict__.update(locals())

    def __getattr__(self,key):
        return self.__dict__['db'][self.__dict__['keydir']+'/' + key]
    def __setattr__(self,key,val):
        self.db[self.keydir+'/' + key] = val
    def __repr__(self):
        db = self.__dict__['db']
        keys = db.keys( self.__dict__['keydir'] +"/*")
        return "<PickleShareLink '%s': %s>" % (
            self.__dict__['keydir'],
            ";".join([Path(k).basename() for k in keys]))

def test():
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/nest/ok/keyname'] = [1,(5,46)]
    db.hset('hash', 'aku', 12)
    db.hset('hash', 'ankka', 313)
    print "12 =",db.hget('hash','aku')
    print "313 =",db.hget('hash','ankka')
    print "all hashed",db.hdict('hash')
    print db.keys()
    print db.keys('paths/nest/ok/k*')
    print dict(db) # snapshot of whole db
    db.uncache() # frees memory, causes re-reads later

    # shorthand for accessing deeply nested files
    lnk = db.getlink('myobjects/test')
    lnk.foo = 2
    lnk.bar = lnk.foo + 5
    print lnk.bar # 7

def stress():
    db = PickleShareDB('~/fsdbtest')
    import time,sys
    for i in range(1000):
        for j in range(1000):
            if i % 15 == 0 and i < 200:
                if str(j) in db:
                    del db[str(j)]
                continue

            if j%33 == 0:
                time.sleep(0.02)

            db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
            db.hset('hash',j, db.hget('hash',j,15) + 1 )

        print i,
        sys.stdout.flush()
        if i % 10 == 0:
            db.uncache()

def main():
    import textwrap
    usage = textwrap.dedent("""\
    pickleshare - manage PickleShare databases

    Usage:

        pickleshare dump /path/to/db > dump.txt
        pickleshare load /path/to/db < dump.txt
        pickleshare test /path/to/db
    """)
    DB = PickleShareDB
    import sys
    if len(sys.argv) < 2:
        print usage
        return

    cmd = sys.argv[1]
    args = sys.argv[2:]
    if cmd == 'dump':
        if not args: args= ['.']
        db = DB(args[0])
        import pprint
        pprint.pprint(db.items())
    elif cmd == 'load':
        cont = sys.stdin.read()
        db = DB(args[0])
        data = eval(cont)
        db.clear()
        for k,v in data.items():
            db[k] = v
    elif cmd == 'testwait':
        db = DB(args[0])
        db.clear()
        print db.waitget('250')
    elif cmd == 'test':
        test()
        stress()

if __name__== "__main__":
    main()