Read pickles in pure binary mode....
Fernando Perez
@@ -1,364 +1,364 @@
#!/usr/bin/env python

""" PickleShare - a small 'shelve' like datastore with concurrency support

Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
shelve, many processes can access the database simultaneously. Changing a
value in the database is immediately visible to other processes accessing the
same database.

Concurrency is possible because the values are stored in separate files. Hence
the "database" is a directory where *all* files are governed by PickleShare.

Example usage::

    from pickleshare import *
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/are/ok/key'] = [1,(5,46)]
    print db.keys()
    del db['aku ankka']

This module is certainly not ZODB, but can be used for low-load
(non-mission-critical) situations where tiny code size trumps the
advanced features of a "real" object database.

Installation guide: easy_install pickleshare

Author: Ville Vainio <vivainio@gmail.com>
License: MIT open source license.

"""

from IPython.external.path import path as Path
import os, stat, time
import collections
import cPickle as pickle
import glob

def gethashfile(key):
    return ("%02x" % abs(hash(key) % 256))[-2:]
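
# Illustrative only (actual bucket names depend on the platform's hash()):
# gethashfile('aku') might be, say, '1f', so hset('hash', 'aku', 12) stores
# the value in the dict pickled at <root>/hash/1f. Each hashroot therefore
# spreads its keys over at most 256 small bucket files.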

_sentinel = object()

class PickleShareDB(collections.MutableMapping):
    """ The main 'connection' object for PickleShare database """
    def __init__(self,root):
        """ Return a db object that will manage the specified directory"""
        self.root = Path(root).expanduser().abspath()
        if not self.root.isdir():
            self.root.makedirs()
        # cache has { 'key' : (obj, orig_mod_time) }
        self.cache = {}


    def __getitem__(self,key):
        """ db['key'] reading """
        fil = self.root / key
        try:
            mtime = (fil.stat()[stat.ST_MTIME])
        except OSError:
            raise KeyError(key)

        if fil in self.cache and mtime == self.cache[fil][1]:
            return self.cache[fil][0]
        try:
            # The cached item has expired, need to read
-            obj = pickle.loads(fil.open("rbU").read())
+            obj = pickle.loads(fil.open("rb").read())
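            # Binary mode matters: the old "rbU" (universal newlines) mode
            # can translate line endings on read, corrupting the binary
            # pickle stream.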
        except:
            raise KeyError(key)

        self.cache[fil] = (obj,mtime)
        return obj

    def __setitem__(self,key,value):
        """ db['key'] = 5 """
        fil = self.root / key
        parent = fil.parent
        if parent and not parent.isdir():
            parent.makedirs()
        # We specify protocol 2, so that we can mostly go between Python 2
        # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
        pickle.dump(value, fil.open('wb'), protocol=2)
        try:
            self.cache[fil] = (value,fil.mtime)
        except OSError,e:
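            # errno 2 is ENOENT: another process may have removed the file
            # between our dump() above and the mtime lookup; losing that
            # race is acceptable.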
            if e.errno != 2:
                raise

    def hset(self, hashroot, key, value):
        """ hashed set """
        hroot = self.root / hashroot
        if not hroot.isdir():
            hroot.makedirs()
        hfile = hroot / gethashfile(key)
        d = self.get(hfile, {})
        d.update({key : value})
        self[hfile] = d


    def hget(self, hashroot, key, default = _sentinel, fast_only = True):
        """ hashed get """
        hroot = self.root / hashroot
        hfile = hroot / gethashfile(key)

        d = self.get(hfile, _sentinel)
        #print "got dict",d,"from",hfile
        if d is _sentinel:
            if fast_only:
                if default is _sentinel:
                    raise KeyError(key)
                return default

            # slow mode ok, works even after hcompress()
            d = self.hdict(hashroot)

        return d.get(key, default)

    def hdict(self, hashroot):
        """ Get all data contained in hashed category 'hashroot' as dict """
        hfiles = self.keys(hashroot + "/*")
        hfiles.sort()
        last = len(hfiles) and hfiles[-1] or ''
        if last.endswith('xx'):
            # print "using xx"
            hfiles = [last] + hfiles[:-1]

        all = {}

        for f in hfiles:
            # print "using",f
            try:
                all.update(self[f])
            except KeyError:
                print "Corrupt",f,"deleted - hset is not threadsafe!"
                del self[f]

            self.uncache(f)

        return all

    def hcompress(self, hashroot):
        """ Compress category 'hashroot', so hset is fast again

        hget will fail if fast_only is True for compressed items (that were
        hset before hcompress).

        """
        hfiles = self.keys(hashroot + "/*")
        all = {}
        for f in hfiles:
            # print "using",f
            all.update(self[f])
            self.uncache(f)

        self[hashroot + '/xx'] = all
        for f in hfiles:
            p = self.root / f
            if p.basename() == 'xx':
                continue
            p.remove()
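
    # A sketch of the compress workflow (key names hypothetical):
    #
    #     db.hset('scores', 'alice', 1)    # written to scores/<bucket>
    #     db.hcompress('scores')           # buckets merged into scores/xx
    #     db.hget('scores', 'alice')       # KeyError: fast path only looks
    #                                      # in the (now removed) bucket
    #     db.hget('scores', 'alice', fast_only=False)   # 1, via hdict()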


    def __delitem__(self,key):
        """ del db["key"] """
        fil = self.root / key
        self.cache.pop(fil,None)
        try:
            fil.remove()
        except OSError:
            # notfound and permission denied are ok - we
            # lost, the other process wins the conflict
            pass

    def _normalized(self, p):
        """ Make a key suitable for user's eyes """
        return str(self.root.relpathto(p)).replace('\\','/')

    def keys(self, globpat = None):
        """ All keys in DB, or all keys matching a glob"""

        if globpat is None:
            files = self.root.walkfiles()
        else:
            files = [Path(p) for p in glob.glob(self.root/globpat)]
        return [self._normalized(p) for p in files if p.isfile()]

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self.keys())

    def uncache(self,*items):
        """ Removes all, or specified items from cache

        Use this after reading a large amount of large objects
        to free up memory, when you won't be needing the objects
        for a while.

        """
        if not items:
            self.cache = {}
        for it in items:
            self.cache.pop(it,None)

    def waitget(self,key, maxwaittime = 60 ):
        """ Wait (poll) for a key to get a value

        Will wait for `maxwaittime` seconds before raising a KeyError.
        The call exits normally if the `key` field in db gets a value
        within the timeout period.

        Use this for synchronizing different processes or for ensuring
        that an unfortunately timed "db['key'] = newvalue" operation
        in another process (which causes all 'get' operations to raise a
        KeyError for the duration of pickling) won't screw up your program
        logic.
        """

        wtimes = [0.2] * 3 + [0.5] * 2 + [1]
        tries = 0
        waited = 0
        while 1:
            try:
                val = self[key]
                return val
            except KeyError:
                pass

            if waited > maxwaittime:
                raise KeyError(key)

            time.sleep(wtimes[tries])
            waited += wtimes[tries]
            if tries < len(wtimes) - 1:
                tries += 1
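
    # Hedged usage sketch: synchronize two processes on one key
    # (names hypothetical):
    #
    #     # consumer process               # producer process
    #     val = db.waitget('result')       db['result'] = compute()
    #
    # The poll sleeps 0.2s (x3), then 0.5s (x2), then 1s per retry until
    # the key appears or `maxwaittime` is exceeded.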

    def getlink(self,folder):
        """ Get a convenient link for accessing items """
        return PickleShareLink(self, folder)

    def __repr__(self):
        return "PickleShareDB('%s')" % self.root



class PickleShareLink:
    """ A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5

    """
    def __init__(self, db, keydir ):
        self.__dict__.update(locals())
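        # update(locals()) writes 'db' and 'keydir' (and 'self') straight
        # into __dict__, deliberately bypassing the custom __setattr__
        # below so they are not stored into the database itself.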

    def __getattr__(self,key):
        return self.__dict__['db'][self.__dict__['keydir']+'/' + key]
    def __setattr__(self,key,val):
        self.db[self.keydir+'/' + key] = val
    def __repr__(self):
        db = self.__dict__['db']
        keys = db.keys(self.__dict__['keydir'] + "/*")
        return "<PickleShareLink '%s': %s>" % (
            self.__dict__['keydir'],
            ";".join([Path(k).basename() for k in keys]))


def test():
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/nest/ok/keyname'] = [1,(5,46)]
    db.hset('hash', 'aku', 12)
    db.hset('hash', 'ankka', 313)
    print "12 =",db.hget('hash','aku')
    print "313 =",db.hget('hash','ankka')
    print "all hashed",db.hdict('hash')
    print db.keys()
    print db.keys('paths/nest/ok/k*')
    print dict(db) # snapshot of whole db
    db.uncache() # frees memory, causes re-reads later

    # shorthand for accessing deeply nested files
    lnk = db.getlink('myobjects/test')
    lnk.foo = 2
    lnk.bar = lnk.foo + 5
    print lnk.bar # 7

def stress():
    db = PickleShareDB('~/fsdbtest')
    import time,sys
    for i in range(1000):
        for j in range(1000):
            if i % 15 == 0 and i < 200:
                if str(j) in db:
                    del db[str(j)]
                continue

            if j % 33 == 0:
                time.sleep(0.02)

            db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
            db.hset('hash',j, db.hget('hash',j,15) + 1 )

        print i,
        sys.stdout.flush()
        if i % 10 == 0:
            db.uncache()

def main():
    import textwrap
    usage = textwrap.dedent("""\
    pickleshare - manage PickleShare databases

    Usage:

        pickleshare dump /path/to/db > dump.txt
        pickleshare load /path/to/db < dump.txt
        pickleshare test /path/to/db
    """)
    DB = PickleShareDB
    import sys
    if len(sys.argv) < 2:
        print usage
        return

    cmd = sys.argv[1]
    args = sys.argv[2:]
    if cmd == 'dump':
        if not args: args = ['.']
        db = DB(args[0])
        import pprint
        pprint.pprint(db.items())
    elif cmd == 'load':
        cont = sys.stdin.read()
        db = DB(args[0])
        data = eval(cont)
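        # Note: the dump is eval()'d back into a list of (key, value)
        # pairs, so only load dumps from sources you trust.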
        db.clear()
        for k,v in data:
            db[k] = v
    elif cmd == 'testwait':
        db = DB(args[0])
        db.clear()
        print db.waitget('250')
    elif cmd == 'test':
        test()
        stress()

if __name__ == "__main__":
    main()