Make PickleShareDB inherit from collections.MutableMapping abc
Thomas Kluyver
#!/usr/bin/env python

""" PickleShare - a small 'shelve' like datastore with concurrency support

Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
shelve, many processes can access the database simultaneously. Changing a
value in the database is immediately visible to other processes accessing the
same database.

Concurrency is possible because the values are stored in separate files. Hence
the "database" is a directory where *all* files are governed by PickleShare.

Example usage::

    from pickleshare import *
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/are/ok/key'] = [1,(5,46)]
    print db.keys()
    del db['aku ankka']

This module is certainly not ZODB, but can be used for low-load
(non-mission-critical) situations where tiny code size trumps the
advanced features of a "real" object database.

Installation guide: easy_install pickleshare

Author: Ville Vainio <vivainio@gmail.com>
License: MIT open source license.

"""

from IPython.external.path import path as Path
import os,stat,time
import collections
import cPickle as pickle
import glob

def gethashfile(key):
    return ("%02x" % abs(hash(key) % 256))[-2:]

_sentinel = object()

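# Note: collections.MutableMapping is an abstract base class: a subclass
# supplies __getitem__, __setitem__, __delitem__, __iter__ and __len__, and
# the ABC derives the rest of the dict interface (get, items, clear, update,
# __contains__, ...) from those five. A minimal sketch of the same pattern,
# with hypothetical names:
#
#   class MemoryDB(collections.MutableMapping):
#       def __init__(self):          self._d = {}
#       def __getitem__(self, k):    return self._d[k]
#       def __setitem__(self, k, v): self._d[k] = v
#       def __delitem__(self, k):    del self._d[k]
#       def __iter__(self):          return iter(self._d)
#       def __len__(self):           return len(self._d)
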
class PickleShareDB(collections.MutableMapping):
48 48 """ The main 'connection' object for PickleShare database """
49 49 def __init__(self,root):
50 50 """ Return a db object that will manage the specied directory"""
51 51 self.root = Path(root).expanduser().abspath()
52 52 if not self.root.isdir():
53 53 self.root.makedirs()
54 54 # cache has { 'key' : (obj, orig_mod_time) }
55 55 self.cache = {}
56 56
57 57
    def __getitem__(self,key):
        """ db['key'] reading """
        fil = self.root / key
        try:
            mtime = (fil.stat()[stat.ST_MTIME])
        except OSError:
            raise KeyError(key)

        if fil in self.cache and mtime == self.cache[fil][1]:
            return self.cache[fil][0]
        try:
            # The cached item has expired, need to read
            obj = pickle.load(fil.open())
        except:
            raise KeyError(key)

        self.cache[fil] = (obj,mtime)
        return obj

    def __setitem__(self,key,value):
        """ db['key'] = 5 """
        fil = self.root / key
        parent = fil.parent
        if parent and not parent.isdir():
            parent.makedirs()
        pickle.dump(value,fil.open('w'))
        try:
            self.cache[fil] = (value,fil.mtime)
        except OSError,e:
            # errno 2 (file not found) is ok - another process may have won
            if e.errno != 2:
                raise

    def hset(self, hashroot, key, value):
        """ hashed set """
        hroot = self.root / hashroot
        if not hroot.isdir():
            hroot.makedirs()
        hfile = hroot / gethashfile(key)
        d = self.get(hfile, {})
        d.update( {key : value})
        self[hfile] = d

    def hget(self, hashroot, key, default = _sentinel, fast_only = True):
        """ hashed get """
        hroot = self.root / hashroot
        hfile = hroot / gethashfile(key)

        d = self.get(hfile, _sentinel )
        #print "got dict",d,"from",hfile
        if d is _sentinel:
            if fast_only:
                if default is _sentinel:
                    raise KeyError(key)

                return default

            # slow mode ok, works even after hcompress()
            d = self.hdict(hashroot)

        return d.get(key, default)

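    # Example with hypothetical keys and values: hset/hget spread many small
    # values across a handful of files per category instead of one file per
    # key, keeping individual writes cheap:
    #
    #   db.hset('sessions', 'alice', 1)
    #   db.hset('sessions', 'bob', 2)
    #   db.hget('sessions', 'alice')        # -> 1
    #   db.hget('sessions', 'carol', 0)     # -> 0 (default)
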
    def hdict(self, hashroot):
        """ Get all data contained in hashed category 'hashroot' as dict """
        hfiles = self.keys(hashroot + "/*")
        hfiles.sort()
        last = len(hfiles) and hfiles[-1] or ''
        if last.endswith('xx'):
            # print "using xx"
            hfiles = [last] + hfiles[:-1]

        all = {}

        for f in hfiles:
            # print "using",f
            try:
                all.update(self[f])
            except KeyError:
                print "Corrupt",f,"deleted - hset is not threadsafe!"
                del self[f]

            self.uncache(f)

        return all

    def hcompress(self, hashroot):
        """ Compress category 'hashroot', so hset is fast again

        hget will fail if fast_only is True for compressed items (that were
        hset before hcompress).

        """
        hfiles = self.keys(hashroot + "/*")
        all = {}
        for f in hfiles:
            # print "using",f
            all.update(self[f])
            self.uncache(f)

        self[hashroot + '/xx'] = all
        for f in hfiles:
            p = self.root / f
            if p.basename() == 'xx':
                continue
            p.remove()

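    # Sketch of the compress/lookup interaction described above, with
    # hypothetical names: after hcompress the per-hash files are gone, so a
    # fast hget misses (KeyError or the default) unless fast_only=False:
    #
    #   db.hcompress('sessions')
    #   db.hget('sessions', 'alice', fast_only=False)  # reads 'sessions/xx'
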
    def __delitem__(self,key):
        """ del db["key"] """
        fil = self.root / key
        self.cache.pop(fil,None)
        try:
            fil.remove()
        except OSError:
            # notfound and permission denied are ok - we
            # lost, the other process wins the conflict
            pass

    def _normalized(self, p):
        """ Make a key suitable for user's eyes """
        return str(self.root.relpathto(p)).replace('\\','/')

    def keys(self, globpat = None):
        """ All keys in DB, or all keys matching a glob"""

        if globpat is None:
            files = self.root.walkfiles()
        else:
            files = [Path(p) for p in glob.glob(self.root/globpat)]
        return [self._normalized(p) for p in files if p.isfile()]

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self.keys())

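    # With __iter__ and __len__ defined, the MutableMapping mixins make the
    # whole dict protocol work on the directory contents, e.g. (assuming a
    # populated db):
    #
    #   for key in db: ...      # iterates over db.keys()
    #   len(db)                 # number of stored files
    #   dict(db)                # snapshot of the whole database
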
    def uncache(self,*items):
        """ Removes all, or specified items from cache

        Use this after reading a large amount of large objects
        to free up memory, when you won't be needing the objects
        for a while.

        """
        if not items:
            self.cache = {}
        for it in items:
            self.cache.pop(it,None)

    def waitget(self,key, maxwaittime = 60 ):
        """ Wait (poll) for a key to get a value

        Will wait for `maxwaittime` seconds before raising a KeyError.
        The call exits normally if the `key` field in db gets a value
        within the timeout period.

        Use this for synchronizing different processes or for ensuring
        that an unfortunately timed "db['key'] = newvalue" operation
        in another process (which causes all 'get' operations to raise a
        KeyError for the duration of pickling) won't screw up your program
        logic.
        """

        wtimes = [0.2] * 3 + [0.5] * 2 + [1]
        tries = 0
        waited = 0
        while 1:
            try:
                val = self[key]
                return val
            except KeyError:
                pass

            if waited > maxwaittime:
                raise KeyError(key)

            time.sleep(wtimes[tries])
            waited+=wtimes[tries]
            if tries < len(wtimes) -1:
                tries+=1

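    # Example with hypothetical names: one process blocks on a key that
    # another process will publish:
    #
    #   # writer process:
    #   db['result'] = compute()
    #   # reader process (polls up to 30s, then raises KeyError):
    #   value = db.waitget('result', 30)
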
    def getlink(self,folder):
        """ Get a convenient link for accessing items """
        return PickleShareLink(self, folder)

    def __repr__(self):
        return "PickleShareDB('%s')" % self.root


class PickleShareLink:
    """ A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5

    """
    def __init__(self, db, keydir ):
        self.__dict__.update(locals())

    def __getattr__(self,key):
        return self.__dict__['db'][self.__dict__['keydir']+'/' + key]
    def __setattr__(self,key,val):
        self.db[self.keydir+'/' + key] = val
    def __repr__(self):
        db = self.__dict__['db']
        keys = db.keys( self.__dict__['keydir'] +"/*")
        return "<PickleShareLink '%s': %s>" % (
            self.__dict__['keydir'],
            ";".join([Path(k).basename() for k in keys]))

def test():
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/nest/ok/keyname'] = [1,(5,46)]
    db.hset('hash', 'aku', 12)
    db.hset('hash', 'ankka', 313)
    print "12 =",db.hget('hash','aku')
    print "313 =",db.hget('hash','ankka')
    print "all hashed",db.hdict('hash')
    print db.keys()
    print db.keys('paths/nest/ok/k*')
    print dict(db) # snapshot of whole db
    db.uncache() # frees memory, causes re-reads later

    # shorthand for accessing deeply nested files
    lnk = db.getlink('myobjects/test')
    lnk.foo = 2
    lnk.bar = lnk.foo + 5
    print lnk.bar # 7

def stress():
    db = PickleShareDB('~/fsdbtest')
    import time,sys
    for i in range(1000):
        for j in range(1000):
            if i % 15 == 0 and i < 200:
                if str(j) in db:
                    del db[str(j)]
                continue

            if j%33 == 0:
                time.sleep(0.02)

            db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
            db.hset('hash',j, db.hget('hash',j,15) + 1 )

        print i,
        sys.stdout.flush()
        if i % 10 == 0:
            db.uncache()

def main():
    import textwrap
    usage = textwrap.dedent("""\
    pickleshare - manage PickleShare databases

    Usage:

        pickleshare dump /path/to/db > dump.txt
        pickleshare load /path/to/db < dump.txt
        pickleshare test /path/to/db
    """)
    DB = PickleShareDB
    import sys
    if len(sys.argv) < 2:
        print usage
        return

    cmd = sys.argv[1]
    args = sys.argv[2:]
    if cmd == 'dump':
        if not args: args= ['.']
        db = DB(args[0])
        import pprint
        pprint.pprint(db.items())
    elif cmd == 'load':
        cont = sys.stdin.read()
        db = DB(args[0])
        data = eval(cont)
        db.clear()
        for k,v in data.items():
            db[k] = v
    elif cmd == 'testwait':
        db = DB(args[0])
        db.clear()
        print db.waitget('250')
    elif cmd == 'test':
        test()
        stress()

if __name__== "__main__":
    main()