use universal-newline in pickleshare...
Min Ragan-Kelley
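The commit switches `PickleShareDB.__getitem__` from `pickle.load` on a plain binary handle to `pickle.loads` on a universal-newline read (`"rbU"`), so that protocol-0 (ASCII) pickles whose line endings were translated somewhere along the way still load. A minimal sketch of the failure mode this addresses, assuming Python 2 and the default protocol-0 pickles this module writes (the snippet is illustrative, not part of the commit):

    import cPickle as pickle

    data = pickle.dumps({'hello': 15}, 0)     # protocol 0 is line-oriented ASCII
    mangled = data.replace('\n', '\r\n')      # what a text-mode writer on Windows can produce
    try:
        pickle.loads(mangled)                 # fails on the '\r'-terminated string opcode
    except ValueError, e:
        print "verbatim load failed:", e
    print pickle.loads(mangled.replace('\r\n', '\n'))   # {'hello': 15} after normalization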
@@ -1,362 +1,362 @@
1 #!/usr/bin/env python
2 
3 """ PickleShare - a small 'shelve' like datastore with concurrency support
4 
5 Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
6 shelve, many processes can access the database simultaneously. Changing a
7 value in the database is immediately visible to other processes accessing the
8 same database.
9 
10 Concurrency is possible because the values are stored in separate files. Hence
11 the "database" is a directory where *all* files are governed by PickleShare.
12 
13 Example usage::
14 
15     from pickleshare import *
16     db = PickleShareDB('~/testpickleshare')
17     db.clear()
18     print "Should be empty:",db.items()
19     db['hello'] = 15
20     db['aku ankka'] = [1,2,313]
21     db['paths/are/ok/key'] = [1,(5,46)]
22     print db.keys()
23     del db['aku ankka']
24 
25 This module is certainly not ZODB, but can be used for low-load
26 (non-mission-critical) situations where tiny code size trumps the
27 advanced features of a "real" object database.
28 
29 Installation guide: easy_install pickleshare
30 
31 Author: Ville Vainio <vivainio@gmail.com>
32 License: MIT open source license.
33 
34 """
35 
36 from IPython.external.path import path as Path
37 import os,stat,time
38 import collections
39 import cPickle as pickle
40 import glob
41 
42 def gethashfile(key):
43     return ("%02x" % abs(hash(key) % 256))[-2:]
44 
45 _sentinel = object()
46 
47 class PickleShareDB(collections.MutableMapping):
48     """ The main 'connection' object for PickleShare database """
49     def __init__(self,root):
50 """ Return a db object that will manage the specied directory"""
50 """ Return a db object that will manage the specied directory"""
51         self.root = Path(root).expanduser().abspath()
52         if not self.root.isdir():
53             self.root.makedirs()
54         # cache has { 'key' : (obj, orig_mod_time) }
55         self.cache = {}
56 
57 
58     def __getitem__(self,key):
59         """ db['key'] reading """
60         fil = self.root / key
61         try:
62             mtime = (fil.stat()[stat.ST_MTIME])
63         except OSError:
64             raise KeyError(key)
65 
66         if fil in self.cache and mtime == self.cache[fil][1]:
67             return self.cache[fil][0]
68         try:
69             # The cached item has expired, need to read
70 -            obj = pickle.load(fil.open("rb"))
70 +            obj = pickle.loads(fil.open("rbU").read())
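(Editor's note: the "U" flag enables Python 2's universal-newline translation, normalizing '\r\n' and '\r' to '\n' before pickle.loads parses the data. That is safe here because __setitem__ below uses pickle.dump with the default protocol 0, which is ASCII and line-oriented; binary pickle protocols must not be newline-translated.)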
71         except:
72             raise KeyError(key)
73 
74         self.cache[fil] = (obj,mtime)
75         return obj
76 
77     def __setitem__(self,key,value):
78         """ db['key'] = 5 """
79         fil = self.root / key
80         parent = fil.parent
81         if parent and not parent.isdir():
82             parent.makedirs()
83         pickle.dump(value,fil.open('wb'))
84         try:
85             self.cache[fil] = (value,fil.mtime)
86         except OSError,e:
87             if e.errno != 2:
88                 raise
89 
90     def hset(self, hashroot, key, value):
91         """ hashed set """
92         hroot = self.root / hashroot
93         if not hroot.isdir():
94             hroot.makedirs()
95         hfile = hroot / gethashfile(key)
96         d = self.get(hfile, {})
97         d.update( {key : value})
98         self[hfile] = d
99 
100 
101 
102     def hget(self, hashroot, key, default = _sentinel, fast_only = True):
103         """ hashed get """
104         hroot = self.root / hashroot
105         hfile = hroot / gethashfile(key)
106 
107         d = self.get(hfile, _sentinel )
108         #print "got dict",d,"from",hfile
109         if d is _sentinel:
110             if fast_only:
111                 if default is _sentinel:
112                     raise KeyError(key)
113 
114                 return default
115 
116             # slow mode ok, works even after hcompress()
117             d = self.hdict(hashroot)
118 
119         return d.get(key, default)
120 
121     def hdict(self, hashroot):
122         """ Get all data contained in hashed category 'hashroot' as dict """
123         hfiles = self.keys(hashroot + "/*")
124         hfiles.sort()
125         last = len(hfiles) and hfiles[-1] or ''
126         if last.endswith('xx'):
127             # print "using xx"
128             hfiles = [last] + hfiles[:-1]
129 
130         all = {}
131 
132         for f in hfiles:
133             # print "using",f
134             try:
135                 all.update(self[f])
136             except KeyError:
137                 print "Corrupt",f,"deleted - hset is not threadsafe!"
138                 del self[f]
139 
140             self.uncache(f)
141 
142         return all
143 
144     def hcompress(self, hashroot):
145         """ Compress category 'hashroot', so hset is fast again
146 
147         hget will fail if fast_only is True for compressed items (that were
148         hset before hcompress).
149 
150         """
151         hfiles = self.keys(hashroot + "/*")
152         all = {}
153         for f in hfiles:
154             # print "using",f
155             all.update(self[f])
156             self.uncache(f)
157 
158         self[hashroot + '/xx'] = all
159         for f in hfiles:
160             p = self.root / f
161             if p.basename() == 'xx':
162                 continue
163             p.remove()
164 
165 
166 
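# Editor's sketch (not part of this changeset): how the hashed-bucket API
# above is meant to be used. gethashfile() spreads keys over at most 256
# bucket files, so each hset() rewrites one small dict file rather than one
# file per key; hcompress() later folds every bucket into a single 'xx' file.
#
#     db = PickleShareDB('~/testpickleshare')
#     db.hset('counters', 'spam', 1)     # stored in counters/<two-hex-digit bucket>
#     print db.hget('counters', 'spam')  # 1, via the fast single-bucket path
#     db.hcompress('counters')           # folds all buckets into counters/xx
#     print db.hget('counters', 'spam', fast_only=False)  # 1, via the slow hdict() path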
167     def __delitem__(self,key):
168         """ del db["key"] """
169         fil = self.root / key
170         self.cache.pop(fil,None)
171         try:
172             fil.remove()
173         except OSError:
174             # notfound and permission denied are ok - we
175             # lost, the other process wins the conflict
176             pass
177 
178     def _normalized(self, p):
179         """ Make a key suitable for user's eyes """
180         return str(self.root.relpathto(p)).replace('\\','/')
181 
182     def keys(self, globpat = None):
183         """ All keys in DB, or all keys matching a glob"""
184 
185         if globpat is None:
186             files = self.root.walkfiles()
187         else:
188             files = [Path(p) for p in glob.glob(self.root/globpat)]
189         return [self._normalized(p) for p in files if p.isfile()]
190 
191     def __iter__(self):
192         return iter(self.keys())
193 
194     def __len__(self):
195         return len(self.keys())
196 
197     def uncache(self,*items):
198         """ Removes all, or specified items from cache
199 
200         Use this after reading a large amount of large objects
201         to free up memory, when you won't be needing the objects
202         for a while.
203 
204         """
205         if not items:
206             self.cache = {}
207         for it in items:
208             self.cache.pop(it,None)
209 
210     def waitget(self,key, maxwaittime = 60 ):
211         """ Wait (poll) for a key to get a value
212 
213         Will wait for `maxwaittime` seconds before raising a KeyError.
214         The call exits normally if the `key` field in db gets a value
215         within the timeout period.
216 
217         Use this for synchronizing different processes or for ensuring
218         that an unfortunately timed "db['key'] = newvalue" operation
219         in another process (which makes all 'get' operations raise a
220         KeyError for the duration of pickling) won't screw up your program
221         logic.
222 """
222 """
223
223
224 wtimes = [0.2] * 3 + [0.5] * 2 + [1]
224 wtimes = [0.2] * 3 + [0.5] * 2 + [1]
225 tries = 0
225 tries = 0
226 waited = 0
226 waited = 0
227 while 1:
227 while 1:
228 try:
228 try:
229 val = self[key]
229 val = self[key]
230 return val
230 return val
231 except KeyError:
231 except KeyError:
232 pass
232 pass
233
233
234 if waited > maxwaittime:
234 if waited > maxwaittime:
235 raise KeyError(key)
235 raise KeyError(key)
236
236
237 time.sleep(wtimes[tries])
237 time.sleep(wtimes[tries])
238 waited+=wtimes[tries]
238 waited+=wtimes[tries]
239 if tries < len(wtimes) -1:
239 if tries < len(wtimes) -1:
240 tries+=1
240 tries+=1
241
241
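# Editor's sketch (not part of this changeset): waitget() as a cross-process
# handoff, polling with the backoff schedule above (3 x 0.2s, 2 x 0.5s, then 1s).
# compute_answer() is a hypothetical stand-in for the producer's work:
#
#     producer process:  db['results/job1'] = compute_answer()
#     consumer process:  val = db.waitget('results/job1', maxwaittime=30)
#
# The retry loop also absorbs the transient KeyError that __getitem__ raises
# when another process is halfway through rewriting the same key's file.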
242     def getlink(self,folder):
243         """ Get a convenient link for accessing items """
244         return PickleShareLink(self, folder)
245 
246     def __repr__(self):
247         return "PickleShareDB('%s')" % self.root
248 
249 
250 
251 class PickleShareLink:
252 """ A shortdand for accessing nested PickleShare data conveniently.
252 """ A shortdand for accessing nested PickleShare data conveniently.
253 
254     Created through PickleShareDB.getlink(), example::
255 
256         lnk = db.getlink('myobjects/test')
257         lnk.foo = 2
258         lnk.bar = lnk.foo + 5
259 
260     """
261     def __init__(self, db, keydir ):
262         self.__dict__.update(locals())
263 
264     def __getattr__(self,key):
265         return self.__dict__['db'][self.__dict__['keydir']+'/' + key]
266     def __setattr__(self,key,val):
267         self.db[self.keydir+'/' + key] = val
268     def __repr__(self):
269         db = self.__dict__['db']
270         keys = db.keys( self.__dict__['keydir'] +"/*")
271         return "<PickleShareLink '%s': %s>" % (
272             self.__dict__['keydir'],
273             ";".join([Path(k).basename() for k in keys]))
274 
275 
276 def test():
277     db = PickleShareDB('~/testpickleshare')
278     db.clear()
279     print "Should be empty:",db.items()
280     db['hello'] = 15
281     db['aku ankka'] = [1,2,313]
282     db['paths/nest/ok/keyname'] = [1,(5,46)]
283     db.hset('hash', 'aku', 12)
284     db.hset('hash', 'ankka', 313)
285     print "12 =",db.hget('hash','aku')
286     print "313 =",db.hget('hash','ankka')
287     print "all hashed",db.hdict('hash')
288     print db.keys()
289     print db.keys('paths/nest/ok/k*')
290     print dict(db) # snapshot of whole db
291     db.uncache() # frees memory, causes re-reads later
292 
293     # shorthand for accessing deeply nested files
294     lnk = db.getlink('myobjects/test')
295     lnk.foo = 2
296     lnk.bar = lnk.foo + 5
297     print lnk.bar # 7
298 
299 def stress():
300     db = PickleShareDB('~/fsdbtest')
301     import time,sys
302     for i in range(1000):
303         for j in range(1000):
304             if i % 15 == 0 and i < 200:
305                 if str(j) in db:
306                     del db[str(j)]
307                 continue
308 
309             if j%33 == 0:
310                 time.sleep(0.02)
311 
312             db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
313             db.hset('hash',j, db.hget('hash',j,15) + 1 )
314 
315         print i,
316         sys.stdout.flush()
317         if i % 10 == 0:
318             db.uncache()
319 
320 def main():
321     import textwrap
322     usage = textwrap.dedent("""\
323     pickleshare - manage PickleShare databases
324 
325     Usage:
326 
327         pickleshare dump /path/to/db > dump.txt
328         pickleshare load /path/to/db < dump.txt
329         pickleshare test /path/to/db
330     """)
331     DB = PickleShareDB
332     import sys
333     if len(sys.argv) < 2:
334         print usage
335         return
336 
337     cmd = sys.argv[1]
338     args = sys.argv[2:]
339     if cmd == 'dump':
340         if not args: args= ['.']
341         db = DB(args[0])
342         import pprint
343         pprint.pprint(db.items())
344     elif cmd == 'load':
345         cont = sys.stdin.read()
346         db = DB(args[0])
347         data = eval(cont)
348         db.clear()
349         for k,v in data.items():
350             db[k] = v
351     elif cmd == 'testwait':
352         db = DB(args[0])
353         db.clear()
354         print db.waitget('250')
355     elif cmd == 'test':
356         test()
357         stress()
358 
359 if __name__== "__main__":
360     main()
361 
362 