Ensure files are closed after reading.
Bradley M. Froehle
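The patch replaces two reads/writes that left file handles to be closed by garbage collection with explicit `with` blocks, so the descriptor is released deterministically even when pickling raises or when running on an interpreter without CPython's reference counting. A minimal sketch of the pattern, using the plain built-in `open()` rather than the `path` wrapper the module imports (the helper names here are illustrative, not part of pickleshare):

    import pickle

    def load_value(filename):
        # The with block closes the handle as soon as the read finishes,
        # even if pickle.loads() raises. The old style,
        # pickle.loads(open(filename, "rb").read()), only closed promptly
        # under CPython's refcounting.
        with open(filename, "rb") as f:
            return pickle.loads(f.read())

    def store_value(filename, value):
        # Protocol 2 keeps the file readable from both Python 2 and 3,
        # matching the comment in __setitem__ below.
        with open(filename, "wb") as f:
            pickle.dump(value, f, protocol=2)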
@@ -1,364 +1,366 @@
 #!/usr/bin/env python

 """ PickleShare - a small 'shelve' like datastore with concurrency support

 Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
 shelve, many processes can access the database simultaneously. Changing a
 value in the database is immediately visible to other processes accessing the
 same database.

 Concurrency is possible because the values are stored in separate files. Hence
 the "database" is a directory where *all* files are governed by PickleShare.

 Example usage::

     from pickleshare import *
     db = PickleShareDB('~/testpickleshare')
     db.clear()
     print "Should be empty:",db.items()
     db['hello'] = 15
     db['aku ankka'] = [1,2,313]
     db['paths/are/ok/key'] = [1,(5,46)]
     print db.keys()
     del db['aku ankka']

 This module is certainly not ZODB, but can be used for low-load
 (non-mission-critical) situations where tiny code size trumps the
 advanced features of a "real" object database.

 Installation guide: easy_install pickleshare

 Author: Ville Vainio <vivainio@gmail.com>
 License: MIT open source license.

 """

 from IPython.external.path import path as Path
 import os,stat,time
 import collections
 import cPickle as pickle
 import glob

 def gethashfile(key):
     return ("%02x" % abs(hash(key) % 256))[-2:]

 _sentinel = object()

 class PickleShareDB(collections.MutableMapping):
     """ The main 'connection' object for PickleShare database """
     def __init__(self,root):
         """ Return a db object that will manage the specified directory"""
         self.root = Path(root).expanduser().abspath()
         if not self.root.isdir():
             self.root.makedirs()
         # cache has { 'key' : (obj, orig_mod_time) }
         self.cache = {}


     def __getitem__(self,key):
         """ db['key'] reading """
         fil = self.root / key
         try:
             mtime = (fil.stat()[stat.ST_MTIME])
         except OSError:
             raise KeyError(key)

         if fil in self.cache and mtime == self.cache[fil][1]:
             return self.cache[fil][0]
         try:
             # The cached item has expired, need to read
-            obj = pickle.loads(fil.open("rb").read())
+            with fil.open("rb") as f:
+                obj = pickle.loads(f.read())
         except:
             raise KeyError(key)

         self.cache[fil] = (obj,mtime)
         return obj

     def __setitem__(self,key,value):
         """ db['key'] = 5 """
         fil = self.root / key
         parent = fil.parent
         if parent and not parent.isdir():
             parent.makedirs()
         # We specify protocol 2, so that we can mostly go between Python 2
         # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
-        pickled = pickle.dump(value,fil.open('wb'), protocol=2)
+        with fil.open('wb') as f:
+            pickled = pickle.dump(value, f, protocol=2)
         try:
             self.cache[fil] = (value,fil.mtime)
         except OSError,e:
             if e.errno != 2:
                 raise

     def hset(self, hashroot, key, value):
         """ hashed set """
         hroot = self.root / hashroot
         if not hroot.isdir():
             hroot.makedirs()
         hfile = hroot / gethashfile(key)
         d = self.get(hfile, {})
         d.update( {key : value})
         self[hfile] = d



     def hget(self, hashroot, key, default = _sentinel, fast_only = True):
         """ hashed get """
         hroot = self.root / hashroot
         hfile = hroot / gethashfile(key)

         d = self.get(hfile, _sentinel )
         #print "got dict",d,"from",hfile
         if d is _sentinel:
             if fast_only:
                 if default is _sentinel:
                     raise KeyError(key)

                 return default

             # slow mode ok, works even after hcompress()
             d = self.hdict(hashroot)

         return d.get(key, default)

     def hdict(self, hashroot):
         """ Get all data contained in hashed category 'hashroot' as dict """
         hfiles = self.keys(hashroot + "/*")
         hfiles.sort()
         last = len(hfiles) and hfiles[-1] or ''
         if last.endswith('xx'):
             # print "using xx"
             hfiles = [last] + hfiles[:-1]

         all = {}

         for f in hfiles:
             # print "using",f
             try:
                 all.update(self[f])
             except KeyError:
                 print "Corrupt",f,"deleted - hset is not threadsafe!"
                 del self[f]

             self.uncache(f)

         return all

     def hcompress(self, hashroot):
         """ Compress category 'hashroot', so hset is fast again

         hget will fail if fast_only is True for compressed items (that were
         hset before hcompress).

         """
         hfiles = self.keys(hashroot + "/*")
         all = {}
         for f in hfiles:
             # print "using",f
             all.update(self[f])
             self.uncache(f)

         self[hashroot + '/xx'] = all
         for f in hfiles:
             p = self.root / f
             if p.basename() == 'xx':
                 continue
             p.remove()



     def __delitem__(self,key):
         """ del db["key"] """
         fil = self.root / key
         self.cache.pop(fil,None)
         try:
             fil.remove()
         except OSError:
             # notfound and permission denied are ok - we
             # lost, the other process wins the conflict
             pass

     def _normalized(self, p):
         """ Make a key suitable for user's eyes """
         return str(self.root.relpathto(p)).replace('\\','/')

     def keys(self, globpat = None):
         """ All keys in DB, or all keys matching a glob"""

         if globpat is None:
             files = self.root.walkfiles()
         else:
             files = [Path(p) for p in glob.glob(self.root/globpat)]
         return [self._normalized(p) for p in files if p.isfile()]

     def __iter__(self):
         return iter(self.keys())

     def __len__(self):
         return len(self.keys())

     def uncache(self,*items):
         """ Removes all, or specified items from cache

         Use this after reading a large amount of large objects
         to free up memory, when you won't be needing the objects
         for a while.

         """
         if not items:
             self.cache = {}
         for it in items:
             self.cache.pop(it,None)

     def waitget(self,key, maxwaittime = 60 ):
         """ Wait (poll) for a key to get a value

         Will wait for `maxwaittime` seconds before raising a KeyError.
         The call exits normally if the `key` field in db gets a value
         within the timeout period.

         Use this for synchronizing different processes or for ensuring
         that an unfortunately timed "db['key'] = newvalue" operation
         in another process (which causes all 'get' operations to cause a
         KeyError for the duration of pickling) won't screw up your program
         logic.
         """

         wtimes = [0.2] * 3 + [0.5] * 2 + [1]
         tries = 0
         waited = 0
         while 1:
             try:
                 val = self[key]
                 return val
             except KeyError:
                 pass

             if waited > maxwaittime:
                 raise KeyError(key)

             time.sleep(wtimes[tries])
             waited+=wtimes[tries]
             if tries < len(wtimes) -1:
                 tries+=1

     def getlink(self,folder):
         """ Get a convenient link for accessing items """
         return PickleShareLink(self, folder)

     def __repr__(self):
         return "PickleShareDB('%s')" % self.root



 class PickleShareLink:
     """ A shorthand for accessing nested PickleShare data conveniently.

     Created through PickleShareDB.getlink(), example::

         lnk = db.getlink('myobjects/test')
         lnk.foo = 2
         lnk.bar = lnk.foo + 5

     """
     def __init__(self, db, keydir ):
         self.__dict__.update(locals())

     def __getattr__(self,key):
         return self.__dict__['db'][self.__dict__['keydir']+'/' + key]
     def __setattr__(self,key,val):
         self.db[self.keydir+'/' + key] = val
     def __repr__(self):
         db = self.__dict__['db']
         keys = db.keys( self.__dict__['keydir'] +"/*")
         return "<PickleShareLink '%s': %s>" % (
             self.__dict__['keydir'],
             ";".join([Path(k).basename() for k in keys]))


 def test():
     db = PickleShareDB('~/testpickleshare')
     db.clear()
     print "Should be empty:",db.items()
     db['hello'] = 15
     db['aku ankka'] = [1,2,313]
     db['paths/nest/ok/keyname'] = [1,(5,46)]
     db.hset('hash', 'aku', 12)
     db.hset('hash', 'ankka', 313)
     print "12 =",db.hget('hash','aku')
     print "313 =",db.hget('hash','ankka')
     print "all hashed",db.hdict('hash')
     print db.keys()
     print db.keys('paths/nest/ok/k*')
     print dict(db) # snapshot of whole db
     db.uncache() # frees memory, causes re-reads later

     # shorthand for accessing deeply nested files
     lnk = db.getlink('myobjects/test')
     lnk.foo = 2
     lnk.bar = lnk.foo + 5
     print lnk.bar # 7

 def stress():
     db = PickleShareDB('~/fsdbtest')
     import time,sys
     for i in range(1000):
         for j in range(1000):
             if i % 15 == 0 and i < 200:
                 if str(j) in db:
                     del db[str(j)]
                 continue

             if j%33 == 0:
                 time.sleep(0.02)

             db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
             db.hset('hash',j, db.hget('hash',j,15) + 1 )

         print i,
         sys.stdout.flush()
         if i % 10 == 0:
             db.uncache()

 def main():
     import textwrap
     usage = textwrap.dedent("""\
     pickleshare - manage PickleShare databases

     Usage:

         pickleshare dump /path/to/db > dump.txt
         pickleshare load /path/to/db < dump.txt
         pickleshare test /path/to/db
     """)
     DB = PickleShareDB
     import sys
     if len(sys.argv) < 2:
         print usage
         return

     cmd = sys.argv[1]
     args = sys.argv[2:]
     if cmd == 'dump':
         if not args: args= ['.']
         db = DB(args[0])
         import pprint
         pprint.pprint(db.items())
     elif cmd == 'load':
         cont = sys.stdin.read()
         db = DB(args[0])
         data = eval(cont)
         db.clear()
         for k,v in data.items():
             db[k] = v
     elif cmd == 'testwait':
         db = DB(args[0])
         db.clear()
         print db.waitget('250')
     elif cmd == 'test':
         test()
         stress()

 if __name__== "__main__":
     main()
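As the `waitget` docstring above notes, a write in one process briefly makes the key unreadable in others, so consumers should poll rather than read once. A short sketch of that cross-process handshake, assuming both processes point at the same database directory (`compute()` is a hypothetical stand-in for real work):

    # process A (producer)
    from pickleshare import PickleShareDB
    db = PickleShareDB('~/testpickleshare')
    db['result'] = compute()   # hypothetical work function

    # process B (consumer)
    from pickleshare import PickleShareDB
    db = PickleShareDB('~/testpickleshare')
    value = db.waitget('result', maxwaittime=30)   # polls; KeyError on timeout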