##// END OF EJS Templates
Make PickleShareDB inherit from collections.MutableMapping abc
Thomas Kluyver -
Show More
@@ -1,356 +1,362 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2
2
3 """ PickleShare - a small 'shelve' like datastore with concurrency support
3 """ PickleShare - a small 'shelve' like datastore with concurrency support
4
4
5 Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
5 Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
6 shelve, many processes can access the database simultaneously. Changing a
6 shelve, many processes can access the database simultaneously. Changing a
7 value in database is immediately visible to other processes accessing the
7 value in database is immediately visible to other processes accessing the
8 same database.
8 same database.
9
9
10 Concurrency is possible because the values are stored in separate files. Hence
10 Concurrency is possible because the values are stored in separate files. Hence
11 the "database" is a directory where *all* files are governed by PickleShare.
11 the "database" is a directory where *all* files are governed by PickleShare.
12
12
13 Example usage::
13 Example usage::
14
14
15 from pickleshare import *
15 from pickleshare import *
16 db = PickleShareDB('~/testpickleshare')
16 db = PickleShareDB('~/testpickleshare')
17 db.clear()
17 db.clear()
18 print "Should be empty:",db.items()
18 print "Should be empty:",db.items()
19 db['hello'] = 15
19 db['hello'] = 15
20 db['aku ankka'] = [1,2,313]
20 db['aku ankka'] = [1,2,313]
21 db['paths/are/ok/key'] = [1,(5,46)]
21 db['paths/are/ok/key'] = [1,(5,46)]
22 print db.keys()
22 print db.keys()
23 del db['aku ankka']
23 del db['aku ankka']
24
24
25 This module is certainly not ZODB, but can be used for low-load
25 This module is certainly not ZODB, but can be used for low-load
26 (non-mission-critical) situations where tiny code size trumps the
26 (non-mission-critical) situations where tiny code size trumps the
27 advanced features of a "real" object database.
27 advanced features of a "real" object database.
28
28
29 Installation guide: easy_install pickleshare
29 Installation guide: easy_install pickleshare
30
30
31 Author: Ville Vainio <vivainio@gmail.com>
31 Author: Ville Vainio <vivainio@gmail.com>
32 License: MIT open source license.
32 License: MIT open source license.
33
33
34 """
34 """
35
35
36 from IPython.external.path import path as Path
36 from IPython.external.path import path as Path
37 import os,stat,time
37 import os,stat,time
38 import collections
38 import cPickle as pickle
39 import cPickle as pickle
39 import UserDict
40 import glob
40 import glob
41
41
def gethashfile(key):
    """Return the two-character hex bucket name used for ``key``.

    The bucket is ``hash(key)`` reduced modulo 256 and rendered as two
    lowercase hexadecimal digits, so there are at most 256 bucket names
    ("00" through "ff").
    """
    bucket = abs(hash(key) % 256)
    return format(bucket, "02x")[-2:]
44
44
45 _sentinel = object()
45 _sentinel = object()
46
46
class PickleShareDB(collections.MutableMapping):
    """ The main 'connection' object for PickleShare database

    Every key corresponds to one file below ``root``; the value is stored
    in that file as a pickle.  Keys may contain '/' to address nested
    directories.  Only the core mapping protocol is implemented here; the
    remaining dictionary methods (``get``, ``items``, ``clear``, ...) are
    supplied by ``collections.MutableMapping``.
    """

    def __init__(self, root):
        """ Return a db object that will manage the specified directory

        ``root`` is user-expanded and made absolute; the directory is
        created if it does not exist yet.
        """
        self.root = Path(root).expanduser().abspath()
        if not self.root.isdir():
            self.root.makedirs()
        # cache has { path of the key's file : (obj, orig_mod_time) }
        self.cache = {}

    def __getitem__(self, key):
        """ db['key'] reading

        Raises KeyError if the file for ``key`` does not exist or cannot
        be unpickled.
        """
        fil = self.root / key
        try:
            mtime = fil.stat()[stat.ST_MTIME]
        except OSError:
            raise KeyError(key)

        cached = self.cache.get(fil)
        if cached is not None and cached[1] == mtime:
            return cached[0]

        # No cache entry, or the cached item has expired: need to read.
        # NOTE(review): the file is opened in the default (text) mode, as
        # before; binary mode would be safer but may not read databases
        # written by earlier versions on every platform - confirm first.
        try:
            with fil.open() as handle:
                obj = pickle.load(handle)
        except Exception:
            # Unreadable (e.g. half-written) file is reported as missing.
            raise KeyError(key)

        self.cache[fil] = (obj, mtime)
        return obj

    def __setitem__(self, key, value):
        """ db['key'] = 5 """
        fil = self.root / key
        parent = fil.parent
        if parent and not parent.isdir():
            parent.makedirs()
        # Close the file before its mtime is read for the cache entry.
        with fil.open('w') as handle:
            pickle.dump(value, handle)
        try:
            self.cache[fil] = (value, fil.mtime)
        except OSError as e:
            # errno 2: the file vanished again before its mtime could be
            # read; skip caching.  Anything else is a real error.
            if e.errno != 2:
                raise

    def hset(self, hashroot, key, value):
        """ hashed set

        Stores ``key``/``value`` in the bucket dict that
        ``gethashfile(key)`` selects below ``hashroot``.
        """
        hroot = self.root / hashroot
        if not hroot.isdir():
            hroot.makedirs()
        hfile = hroot / gethashfile(key)
        d = self.get(hfile, {})
        d[key] = value
        self[hfile] = d

    def hget(self, hashroot, key, default=_sentinel, fast_only=True):
        """ hashed get

        Looks ``key`` up in its bucket file.  When the bucket file is
        missing and ``fast_only`` is true, ``default`` is returned, or
        KeyError is raised if no default was given.  With ``fast_only``
        false the whole category is read via ``hdict`` instead.
        """
        hroot = self.root / hashroot
        hfile = hroot / gethashfile(key)

        d = self.get(hfile, _sentinel)
        if d is _sentinel:
            if fast_only:
                if default is _sentinel:
                    raise KeyError(key)

                return default

            # slow mode ok, works even after hcompress()
            d = self.hdict(hashroot)

        return d.get(key, default)

    def hdict(self, hashroot):
        """ Get all data contained in hashed category 'hashroot' as dict """
        hfiles = self.keys(hashroot + "/*")
        hfiles.sort()
        last = hfiles[-1] if hfiles else ''
        if last.endswith('xx'):
            # Read the compressed file first so that the per-bucket files
            # merged after it take precedence.
            hfiles = [last] + hfiles[:-1]

        merged = {}

        for f in hfiles:
            try:
                merged.update(self[f])
            except KeyError:
                print("Corrupt %s deleted - hset is not threadsafe!" % f)
                del self[f]

            self.uncache(f)

        return merged

    def hcompress(self, hashroot):
        """ Compress category 'hashroot', so hset is fast again

        hget will fail if fast_only is True for compressed items (that were
        hset before hcompress).

        """
        hfiles = self.keys(hashroot + "/*")
        # Same precedence as hdict(): merge an existing 'xx' file first so
        # newer bucket files override it (the sort is stable).
        hfiles.sort(key=lambda name: not name.endswith('xx'))
        merged = {}
        for f in hfiles:
            merged.update(self[f])
            self.uncache(f)

        self[hashroot + '/xx'] = merged
        for f in hfiles:
            p = self.root / f
            if p.basename() == 'xx':
                continue
            p.remove()

    def __delitem__(self, key):
        """ del db["key"] """
        fil = self.root / key
        self.cache.pop(fil, None)
        try:
            fil.remove()
        except OSError:
            # notfound and permission denied are ok - we
            # lost, the other process wins the conflict
            pass

    def _normalized(self, p):
        """ Make a key suitable for user's eyes """
        return str(self.root.relpathto(p)).replace('\\', '/')

    def keys(self, globpat=None):
        """ All keys in DB, or all keys matching a glob"""

        if globpat is None:
            files = self.root.walkfiles()
        else:
            files = [Path(p) for p in glob.glob(self.root / globpat)]
        return [self._normalized(p) for p in files if p.isfile()]

    def __iter__(self):
        """ Iterate over all keys currently in the DB """
        return iter(self.keys())

    def __len__(self):
        """ Number of keys currently in the DB """
        return len(self.keys())

    def uncache(self, *items):
        """ Removes all, or specified items from cache

        Use this after reading a large amount of large objects
        to free up memory, when you won't be needing the objects
        for a while.

        """
        if not items:
            self.cache = {}
        for it in items:
            # The cache is keyed by the path below root (see __getitem__),
            # so translate the key the same way before dropping it.
            self.cache.pop(self.root / it, None)

    def waitget(self, key, maxwaittime=60):
        """ Wait (poll) for a key to get a value

        Will wait for `maxwaittime` seconds before raising a KeyError.
        The call exits normally if the `key` field in db gets a value
        within the timeout period.

        Use this for synchronizing different processes or for ensuring
        that an unfortunately timed "db['key'] = newvalue" operation
        in another process (which causes all 'get' operation to cause a
        KeyError for the duration of pickling) won't screw up your program
        logic.
        """

        # Poll quickly at first, then back off to one-second intervals.
        wtimes = [0.2] * 3 + [0.5] * 2 + [1]
        tries = 0
        waited = 0
        while True:
            try:
                return self[key]
            except KeyError:
                pass

            if waited > maxwaittime:
                raise KeyError(key)

            time.sleep(wtimes[tries])
            waited += wtimes[tries]
            if tries < len(wtimes) - 1:
                tries += 1

    def getlink(self, folder):
        """ Get a convenient link for accessing items """
        return PickleShareLink(self, folder)

    def __repr__(self):
        return "PickleShareDB('%s')" % self.root
242
248
243
249
244
250
class PickleShareLink:
    """ A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5

    Attribute reads and writes are forwarded to ``db[keydir + '/' + name]``.
    """
    def __init__(self, db, keydir):
        # Write straight into __dict__: ordinary attribute assignment is
        # redirected to the database by __setattr__.  Only the two real
        # fields are stored (no 'self' entry, hence no reference cycle).
        self.__dict__.update(db=db, keydir=keydir)

    def __getattr__(self, key):
        # Only reached for names not found in __dict__; a missing database
        # entry surfaces as the KeyError raised by the db lookup.
        return self.__dict__['db'][self.__dict__['keydir'] + '/' + key]

    def __setattr__(self, key, val):
        self.db[self.keydir + '/' + key] = val

    def __repr__(self):
        db = self.__dict__['db']
        keys = db.keys(self.__dict__['keydir'] + "/*")
        return "<PickleShareLink '%s': %s>" % (
            self.__dict__['keydir'],
            ";".join([Path(k).basename() for k in keys]))
268
274
269
275
270 def test():
276 def test():
271 db = PickleShareDB('~/testpickleshare')
277 db = PickleShareDB('~/testpickleshare')
272 db.clear()
278 db.clear()
273 print "Should be empty:",db.items()
279 print "Should be empty:",db.items()
274 db['hello'] = 15
280 db['hello'] = 15
275 db['aku ankka'] = [1,2,313]
281 db['aku ankka'] = [1,2,313]
276 db['paths/nest/ok/keyname'] = [1,(5,46)]
282 db['paths/nest/ok/keyname'] = [1,(5,46)]
277 db.hset('hash', 'aku', 12)
283 db.hset('hash', 'aku', 12)
278 db.hset('hash', 'ankka', 313)
284 db.hset('hash', 'ankka', 313)
279 print "12 =",db.hget('hash','aku')
285 print "12 =",db.hget('hash','aku')
280 print "313 =",db.hget('hash','ankka')
286 print "313 =",db.hget('hash','ankka')
281 print "all hashed",db.hdict('hash')
287 print "all hashed",db.hdict('hash')
282 print db.keys()
288 print db.keys()
283 print db.keys('paths/nest/ok/k*')
289 print db.keys('paths/nest/ok/k*')
284 print dict(db) # snapsot of whole db
290 print dict(db) # snapsot of whole db
285 db.uncache() # frees memory, causes re-reads later
291 db.uncache() # frees memory, causes re-reads later
286
292
287 # shorthand for accessing deeply nested files
293 # shorthand for accessing deeply nested files
288 lnk = db.getlink('myobjects/test')
294 lnk = db.getlink('myobjects/test')
289 lnk.foo = 2
295 lnk.foo = 2
290 lnk.bar = lnk.foo + 5
296 lnk.bar = lnk.foo + 5
291 print lnk.bar # 7
297 print lnk.bar # 7
292
298
293 def stress():
299 def stress():
294 db = PickleShareDB('~/fsdbtest')
300 db = PickleShareDB('~/fsdbtest')
295 import time,sys
301 import time,sys
296 for i in range(1000):
302 for i in range(1000):
297 for j in range(1000):
303 for j in range(1000):
298 if i % 15 == 0 and i < 200:
304 if i % 15 == 0 and i < 200:
299 if str(j) in db:
305 if str(j) in db:
300 del db[str(j)]
306 del db[str(j)]
301 continue
307 continue
302
308
303 if j%33 == 0:
309 if j%33 == 0:
304 time.sleep(0.02)
310 time.sleep(0.02)
305
311
306 db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
312 db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
307 db.hset('hash',j, db.hget('hash',j,15) + 1 )
313 db.hset('hash',j, db.hget('hash',j,15) + 1 )
308
314
309 print i,
315 print i,
310 sys.stdout.flush()
316 sys.stdout.flush()
311 if i % 10 == 0:
317 if i % 10 == 0:
312 db.uncache()
318 db.uncache()
313
319
def main():
    """ Command line entry point: dump, load, testwait or test a database

    The command is read from sys.argv[1], the database path (where one is
    used) from sys.argv[2].  Without a command the usage text is printed.
    """
    import textwrap
    usage = textwrap.dedent("""\
    pickleshare - manage PickleShare databases

    Usage:

        pickleshare dump /path/to/db > dump.txt
        pickleshare load /path/to/db < dump.txt
        pickleshare test /path/to/db
    """)
    DB = PickleShareDB
    import sys
    if len(sys.argv) < 2:
        print(usage)
        return

    cmd = sys.argv[1]
    args = sys.argv[2:]
    if cmd == 'dump':
        if not args: args= ['.']
        db = DB(args[0])
        import pprint
        pprint.pprint(db.items())
    elif cmd == 'load':
        if not args:
            # No implicit default here: loading clears the target database.
            print(usage)
            return
        cont = sys.stdin.read()
        db = DB(args[0])
        # NOTE(review): eval() executes whatever arrives on stdin - only
        # feed it dumps that you produced yourself.
        data = eval(cont)
        db.clear()
        # 'dump' prints db.items(), i.e. (key, value) pairs; a dict is
        # accepted as well.  Iterate the loaded data, not the (now empty) db.
        pairs = data.items() if hasattr(data, 'items') else data
        for k, v in pairs:
            db[k] = v
    elif cmd == 'testwait':
        if not args:
            print(usage)
            return
        db = DB(args[0])
        db.clear()
        print(db.waitget('250'))
    elif cmd == 'test':
        test()
        stress()
352
358
353 if __name__== "__main__":
359 if __name__== "__main__":
354 main()
360 main()
355
361
356
362
General Comments 0
You need to be logged in to leave comments. Login now