Specify protocol 2 for pickleshare, so objects stored by Python 3 can be used in Python 2.
Thomas Kluyver
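The change itself is a single line in __setitem__: pass protocol=2 to pickle.dump. As a minimal sketch (editor's illustration, not part of the commit) of the compatibility property this buys: protocol 2 is the newest pickle format Python 2 can read, so a Python 3 writer must request it explicitly to keep the store usable from Python 2::

    import pickle

    value = {'hello': 15, 'aku ankka': [1, 2, 313]}

    # Protocol 2 is the highest protocol Python 2 understands. Python 3's
    # default (protocol 3) produces bytes that Python 2 rejects with
    # "ValueError: unsupported pickle protocol: 3".
    data = pickle.dumps(value, protocol=2)

    # Either interpreter can round-trip these bytes:
    assert pickle.loads(data) == value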
@@ -1,362 +1,364 @@
#!/usr/bin/env python

""" PickleShare - a small 'shelve' like datastore with concurrency support

Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
shelve, many processes can access the database simultaneously. Changing a
value in the database is immediately visible to other processes accessing the
same database.

Concurrency is possible because the values are stored in separate files. Hence
the "database" is a directory where *all* files are governed by PickleShare.

Example usage::

    from pickleshare import *
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/are/ok/key'] = [1,(5,46)]
    print db.keys()
    del db['aku ankka']

This module is certainly not ZODB, but can be used for low-load
(non-mission-critical) situations where tiny code size trumps the
advanced features of a "real" object database.

Installation guide: easy_install pickleshare

Author: Ville Vainio <vivainio@gmail.com>
License: MIT open source license.

"""

from IPython.external.path import path as Path
import os,stat,time
import collections
import cPickle as pickle
import glob

def gethashfile(key):
    return ("%02x" % abs(hash(key) % 256))[-2:]

_sentinel = object()

class PickleShareDB(collections.MutableMapping):
    """ The main 'connection' object for PickleShare database """
    def __init__(self,root):
        """ Return a db object that will manage the specified directory"""
        self.root = Path(root).expanduser().abspath()
        if not self.root.isdir():
            self.root.makedirs()
        # cache has { 'key' : (obj, orig_mod_time) }
        self.cache = {}


    def __getitem__(self,key):
        """ db['key'] reading """
        fil = self.root / key
        try:
            mtime = (fil.stat()[stat.ST_MTIME])
        except OSError:
            raise KeyError(key)

        if fil in self.cache and mtime == self.cache[fil][1]:
            return self.cache[fil][0]
        try:
            # The cached item has expired, need to read
            obj = pickle.loads(fil.open("rbU").read())
        except:
            raise KeyError(key)

        self.cache[fil] = (obj,mtime)
        return obj

    def __setitem__(self,key,value):
        """ db['key'] = 5 """
        fil = self.root / key
        parent = fil.parent
        if parent and not parent.isdir():
            parent.makedirs()
-       pickled = pickle.dump(value,fil.open('wb'))
+       # We specify protocol 2, so that we can mostly go between Python 2
+       # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
+       pickled = pickle.dump(value,fil.open('wb'), protocol=2)
        try:
            self.cache[fil] = (value,fil.mtime)
        except OSError,e:
            if e.errno != 2:
                raise

    def hset(self, hashroot, key, value):
        """ hashed set """
        hroot = self.root / hashroot
        if not hroot.isdir():
            hroot.makedirs()
        hfile = hroot / gethashfile(key)
        d = self.get(hfile, {})
        d.update( {key : value})
        self[hfile] = d



    def hget(self, hashroot, key, default = _sentinel, fast_only = True):
        """ hashed get """
        hroot = self.root / hashroot
        hfile = hroot / gethashfile(key)

        d = self.get(hfile, _sentinel )
        #print "got dict",d,"from",hfile
        if d is _sentinel:
            if fast_only:
                if default is _sentinel:
                    raise KeyError(key)

                return default

            # slow mode ok, works even after hcompress()
            d = self.hdict(hashroot)

        return d.get(key, default)

    def hdict(self, hashroot):
        """ Get all data contained in hashed category 'hashroot' as dict """
        hfiles = self.keys(hashroot + "/*")
        hfiles.sort()
        last = len(hfiles) and hfiles[-1] or ''
        if last.endswith('xx'):
            # print "using xx"
            hfiles = [last] + hfiles[:-1]

        all = {}

        for f in hfiles:
            # print "using",f
            try:
                all.update(self[f])
            except KeyError:
                print "Corrupt",f,"deleted - hset is not threadsafe!"
                del self[f]

            self.uncache(f)

        return all

    def hcompress(self, hashroot):
        """ Compress category 'hashroot', so hset is fast again

        hget will fail if fast_only is True for compressed items (that were
        hset before hcompress).

        """
        hfiles = self.keys(hashroot + "/*")
        all = {}
        for f in hfiles:
            # print "using",f
            all.update(self[f])
            self.uncache(f)

        self[hashroot + '/xx'] = all
        for f in hfiles:
            p = self.root / f
            if p.basename() == 'xx':
                continue
            p.remove()



    def __delitem__(self,key):
        """ del db["key"] """
        fil = self.root / key
        self.cache.pop(fil,None)
        try:
            fil.remove()
        except OSError:
            # not found and permission denied are ok - we
            # lost, the other process wins the conflict
            pass

    def _normalized(self, p):
        """ Make a key suitable for user's eyes """
        return str(self.root.relpathto(p)).replace('\\','/')

    def keys(self, globpat = None):
        """ All keys in DB, or all keys matching a glob"""

        if globpat is None:
            files = self.root.walkfiles()
        else:
            files = [Path(p) for p in glob.glob(self.root/globpat)]
        return [self._normalized(p) for p in files if p.isfile()]

    def __iter__(self):
        return iter(self.keys())

    def __len__(self):
        return len(self.keys())

    def uncache(self,*items):
        """ Removes all, or specified items from cache

        Use this after reading a large amount of large objects
        to free up memory, when you won't be needing the objects
        for a while.

        """
        if not items:
            self.cache = {}
        for it in items:
            self.cache.pop(it,None)

    def waitget(self,key, maxwaittime = 60 ):
        """ Wait (poll) for a key to get a value

        Will wait for `maxwaittime` seconds before raising a KeyError.
        The call exits normally if the `key` field in db gets a value
        within the timeout period.

        Use this for synchronizing different processes or for ensuring
        that an unfortunately timed "db['key'] = newvalue" operation
        in another process (which causes all 'get' operations to raise a
        KeyError for the duration of pickling) won't screw up your program
        logic.
        """

        wtimes = [0.2] * 3 + [0.5] * 2 + [1]
        tries = 0
        waited = 0
        while 1:
            try:
                val = self[key]
                return val
            except KeyError:
                pass

            if waited > maxwaittime:
                raise KeyError(key)

            time.sleep(wtimes[tries])
            waited+=wtimes[tries]
            if tries < len(wtimes) -1:
                tries+=1

    def getlink(self,folder):
        """ Get a convenient link for accessing items """
        return PickleShareLink(self, folder)

    def __repr__(self):
        return "PickleShareDB('%s')" % self.root



class PickleShareLink:
    """ A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5

    """
    def __init__(self, db, keydir ):
        self.__dict__.update(locals())

    def __getattr__(self,key):
        return self.__dict__['db'][self.__dict__['keydir']+'/' + key]
    def __setattr__(self,key,val):
        self.db[self.keydir+'/' + key] = val
    def __repr__(self):
        db = self.__dict__['db']
        keys = db.keys( self.__dict__['keydir'] +"/*")
        return "<PickleShareLink '%s': %s>" % (
            self.__dict__['keydir'],
            ";".join([Path(k).basename() for k in keys]))

def test():
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/nest/ok/keyname'] = [1,(5,46)]
    db.hset('hash', 'aku', 12)
    db.hset('hash', 'ankka', 313)
    print "12 =",db.hget('hash','aku')
    print "313 =",db.hget('hash','ankka')
    print "all hashed",db.hdict('hash')
    print db.keys()
    print db.keys('paths/nest/ok/k*')
    print dict(db) # snapshot of whole db
    db.uncache() # frees memory, causes re-reads later

    # shorthand for accessing deeply nested files
    lnk = db.getlink('myobjects/test')
    lnk.foo = 2
    lnk.bar = lnk.foo + 5
    print lnk.bar # 7

def stress():
    db = PickleShareDB('~/fsdbtest')
    import time,sys
    for i in range(1000):
        for j in range(1000):
            if i % 15 == 0 and i < 200:
                if str(j) in db:
                    del db[str(j)]
                continue

            if j%33 == 0:
                time.sleep(0.02)

            db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
            db.hset('hash',j, db.hget('hash',j,15) + 1 )

        print i,
        sys.stdout.flush()
        if i % 10 == 0:
            db.uncache()

def main():
    import textwrap
    usage = textwrap.dedent("""\
    pickleshare - manage PickleShare databases

    Usage:

        pickleshare dump /path/to/db > dump.txt
        pickleshare load /path/to/db < dump.txt
        pickleshare test /path/to/db
    """)
    DB = PickleShareDB
    import sys
    if len(sys.argv) < 2:
        print usage
        return

    cmd = sys.argv[1]
    args = sys.argv[2:]
    if cmd == 'dump':
        if not args: args= ['.']
        db = DB(args[0])
        import pprint
        pprint.pprint(db.items())
    elif cmd == 'load':
        cont = sys.stdin.read()
        db = DB(args[0])
        data = eval(cont)
        db.clear()
        for k,v in data.items():
            db[k] = v
    elif cmd == 'testwait':
        db = DB(args[0])
        db.clear()
        print db.waitget('250')
    elif cmd == 'test':
        test()
        stress()

if __name__== "__main__":
    main()

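A short sketch of the concurrency model the module docstring describes (editor's illustration, assuming this module is importable as pickleshare). Every key is its own pickle file, and reads compare the file's mtime against the in-memory cache, so a second handle on the same directory sees a write as soon as the timestamp changes::

    from pickleshare import PickleShareDB
    import time

    writer = PickleShareDB('~/testpickleshare')
    reader = PickleShareDB('~/testpickleshare')

    writer['counter'] = 1
    print(reader['counter'])    # 1, unpickled straight from the key's file

    # st_mtime is compared with one-second resolution here, so give the
    # second write a distinct timestamp before relying on invalidation.
    time.sleep(1)
    writer['counter'] = 2
    print(reader['counter'])    # 2, the cached mtime no longer matches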
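The hset/hget helpers trade the one-file-per-key layout for at most 256 bucket files per hashroot, named by two hex digits derived from hash(key) in gethashfile. A sketch of the lifecycle, including hcompress (same importability assumption; the printed bucket name is an example only, and under Python 3's hash randomization it also varies between runs)::

    from pickleshare import PickleShareDB, gethashfile

    db = PickleShareDB('~/testpickleshare')

    # Both keys land in small shared bucket files, not one file per key.
    db.hset('hash', 'aku', 12)
    db.hset('hash', 'ankka', 313)
    print(gethashfile('aku'))    # two hex digits, e.g. 'a4'
    print(db.keys('hash/*'))     # the bucket files that actually exist

    # hcompress merges every bucket into the single 'hash/xx' file;
    # items stored before compression are then only reachable with
    # fast_only=False, which falls back to a full scan via hdict().
    db.hcompress('hash')
    print(db.hget('hash', 'aku', fast_only=False))    # 12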
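Finally, waitget polls rather than watching the filesystem: it sleeps 0.2 s three times, 0.5 s twice, then 1 s per retry until maxwaittime is exceeded, which also papers over the brief window in which a concurrent __setitem__ leaves a key unreadable. A two-process sketch with a hypothetical key name::

    from pickleshare import PickleShareDB

    # Process A, at some point:
    #     db = PickleShareDB('~/testpickleshare')
    #     db['result'] = 42

    # Process B blocks until the key appears, raising KeyError after
    # roughly 60 seconds of polling:
    db = PickleShareDB('~/testpickleshare')
    print(db.waitget('result', maxwaittime=60))    # 42 once A has written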