##// END OF EJS Templates
expedite IPython.parallel tests...
expedite IPython.parallel tests Fewer engines are added during the test suite, by counting the absolute number of engines, rather than repeatedly adding a minimum number.

File last commit:

r5413:7869eba7
r6162:91932a06
Show More
pickleshare.py
364 lines | 10.1 KiB | text/x-python | PythonLexer
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 #!/usr/bin/env python
""" PickleShare - a small 'shelve' like datastore with concurrency support
Bernardo B. Marques
remove all trailling spaces
r4872 Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
shelve, many processes can access the database simultaneously. Changing a
value in database is immediately visible to other processes accessing the
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 same database.
Concurrency is possible because the values are stored in separate files. Hence
the "database" is a directory where *all* files are governed by PickleShare.
Example usage::
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 from pickleshare import *
db = PickleShareDB('~/testpickleshare')
db.clear()
print "Should be empty:",db.items()
db['hello'] = 15
db['aku ankka'] = [1,2,313]
db['paths/are/ok/key'] = [1,(5,46)]
print db.keys()
del db['aku ankka']
Bernardo B. Marques
remove all trailling spaces
r4872 This module is certainly not ZODB, but can be used for low-load
(non-mission-critical) situations where tiny code size trumps the
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 advanced features of a "real" object database.
Installation guide: easy_install pickleshare
Author: Ville Vainio <vivainio@gmail.com>
License: MIT open source license.
"""
vivainio
move path to external
r964 from IPython.external.path import path as Path
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 import os,stat,time
Thomas Kluyver
Make PickleShareDB inherit from collections.MutableMapping abc
r4754 import collections
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 import cPickle as pickle
import glob
def gethashfile(key):
    """ Map *key* to one of 256 bucket file names, '00' through 'ff'.

    The bucket is the low byte of the key's hash, rendered as two
    lowercase hex digits.
    """
    # Python's % on a positive modulus always yields a value in 0..255,
    # and "%02x" of such a value is exactly two characters wide.
    return "%02x" % (hash(key) % 256)
# Unique sentinel used by hget() to mean "no default was supplied", so that
# any real stored value -- including None -- can be distinguished from it.
_sentinel = object()
Thomas Kluyver
Make PickleShareDB inherit from collections.MutableMapping abc
r4754 class PickleShareDB(collections.MutableMapping):
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 """ The main 'connection' object for PickleShare database """
def __init__(self,root):
""" Return a db object that will manage the specied directory"""
self.root = Path(root).expanduser().abspath()
if not self.root.isdir():
self.root.makedirs()
# cache has { 'key' : (obj, orig_mod_time) }
self.cache = {}
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165
def __getitem__(self,key):
""" db['key'] reading """
fil = self.root / key
try:
mtime = (fil.stat()[stat.ST_MTIME])
except OSError:
raise KeyError(key)
if fil in self.cache and mtime == self.cache[fil][1]:
return self.cache[fil][0]
try:
# The cached item has expired, need to read
Fernando Perez
Read pickles in pure binary mode....
r5413 obj = pickle.loads(fil.open("rb").read())
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 except:
raise KeyError(key)
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 self.cache[fil] = (obj,mtime)
return obj
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 def __setitem__(self,key,value):
""" db['key'] = 5 """
fil = self.root / key
parent = fil.parent
if parent and not parent.isdir():
parent.makedirs()
Thomas Kluyver
Specify protocol 2 for pickleshare, so objects stored by Python 3 can be used in Python 2.
r5388 # We specify protocol 2, so that we can mostly go between Python 2
# and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
pickled = pickle.dump(value,fil.open('wb'), protocol=2)
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 try:
self.cache[fil] = (value,fil.mtime)
except OSError,e:
if e.errno != 2:
raise
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare: hget, hset, hdict (for write efficient hash bucket file storage)
r720 def hset(self, hashroot, key, value):
vivainio
pickleshare compression
r731 """ hashed set """
vivainio
pickleshare: hget, hset, hdict (for write efficient hash bucket file storage)
r720 hroot = self.root / hashroot
if not hroot.isdir():
hroot.makedirs()
hfile = hroot / gethashfile(key)
d = self.get(hfile, {})
d.update( {key : value})
Bernardo B. Marques
remove all trailling spaces
r4872 self[hfile] = d
vivainio
pickleshare: hget, hset, hdict (for write efficient hash bucket file storage)
r720
vivainio
pickleshare compression
r731 def hget(self, hashroot, key, default = _sentinel, fast_only = True):
""" hashed get """
vivainio
pickleshare: hget, hset, hdict (for write efficient hash bucket file storage)
r720 hroot = self.root / hashroot
hfile = hroot / gethashfile(key)
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare compression
r731 d = self.get(hfile, _sentinel )
vivainio
pickleshare: hget, hset, hdict (for write efficient hash bucket file storage)
r720 #print "got dict",d,"from",hfile
vivainio
pickleshare compression
r731 if d is _sentinel:
if fast_only:
if default is _sentinel:
raise KeyError(key)
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare compression
r731 return default
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare compression
r731 # slow mode ok, works even after hcompress()
d = self.hdict(hashroot)
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare: hget, hset, hdict (for write efficient hash bucket file storage)
r720 return d.get(key, default)
def hdict(self, hashroot):
vivainio
pickleshare compression
r731 """ Get all data contained in hashed category 'hashroot' as dict """
hfiles = self.keys(hashroot + "/*")
vivainio
rm extra print
r732 hfiles.sort()
vivainio
pickleshare compression
r731 last = len(hfiles) and hfiles[-1] or ''
if last.endswith('xx'):
vivainio
rm extra print
r732 # print "using xx"
vivainio
pickleshare compression
r731 hfiles = [last] + hfiles[:-1]
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare: hget, hset, hdict (for write efficient hash bucket file storage)
r720 all = {}
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare: hget, hset, hdict (for write efficient hash bucket file storage)
r720 for f in hfiles:
# print "using",f
vivainio
pickleshare: survive corrupt pickles
r813 try:
all.update(self[f])
except KeyError:
print "Corrupt",f,"deleted - hset is not threadsafe!"
del self[f]
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare: hget, hset, hdict (for write efficient hash bucket file storage)
r720 self.uncache(f)
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare: hget, hset, hdict (for write efficient hash bucket file storage)
r720 return all
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare compression
r731 def hcompress(self, hashroot):
""" Compress category 'hashroot', so hset is fast again
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare compression
r731 hget will fail if fast_only is True for compressed items (that were
hset before hcompress).
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare compression
r731 """
hfiles = self.keys(hashroot + "/*")
all = {}
for f in hfiles:
# print "using",f
all.update(self[f])
self.uncache(f)
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
pickleshare compression
r731 self[hashroot + '/xx'] = all
for f in hfiles:
p = self.root / f
if p.basename() == 'xx':
continue
p.remove()
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 def __delitem__(self,key):
""" del db["key"] """
fil = self.root / key
self.cache.pop(fil,None)
try:
fil.remove()
except OSError:
# notfound and permission denied are ok - we
# lost, the other process wins the conflict
pass
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 def _normalized(self, p):
""" Make a key suitable for user's eyes """
return str(self.root.relpathto(p)).replace('\\','/')
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 def keys(self, globpat = None):
""" All keys in DB, or all keys matching a glob"""
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 if globpat is None:
files = self.root.walkfiles()
else:
files = [Path(p) for p in glob.glob(self.root/globpat)]
return [self._normalized(p) for p in files if p.isfile()]
Bernardo B. Marques
remove all trailling spaces
r4872
Thomas Kluyver
Make PickleShareDB inherit from collections.MutableMapping abc
r4754 def __iter__(self):
return iter(keys)
Bernardo B. Marques
remove all trailling spaces
r4872
Thomas Kluyver
Make PickleShareDB inherit from collections.MutableMapping abc
r4754 def __len__(self):
return len(keys)
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165
def uncache(self,*items):
""" Removes all, or specified items from cache
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 Use this after reading a large amount of large objects
to free up memory, when you won't be needing the objects
for a while.
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 """
if not items:
self.cache = {}
for it in items:
self.cache.pop(it,None)
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 def waitget(self,key, maxwaittime = 60 ):
""" Wait (poll) for a key to get a value
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 Will wait for `maxwaittime` seconds before raising a KeyError.
The call exits normally if the `key` field in db gets a value
within the timeout period.
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 Use this for synchronizing different processes or for ensuring
Bernardo B. Marques
remove all trailling spaces
r4872 that an unfortunately timed "db['key'] = newvalue" operation
in another process (which causes all 'get' operation to cause a
KeyError for the duration of pickling) won't screw up your program
logic.
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 """
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 wtimes = [0.2] * 3 + [0.5] * 2 + [1]
tries = 0
waited = 0
while 1:
try:
val = self[key]
return val
except KeyError:
pass
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 if waited > maxwaittime:
raise KeyError(key)
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 time.sleep(wtimes[tries])
waited+=wtimes[tries]
if tries < len(wtimes) -1:
tries+=1
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 def getlink(self,folder):
""" Get a convenient link for accessing items """
return PickleShareLink(self, folder)
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 def __repr__(self):
return "PickleShareDB('%s')" % self.root
Bernardo B. Marques
remove all trailling spaces
r4872
class PickleShareLink:
    """ A shorthand for accessing nested PickleShare data conveniently.

    Created through PickleShareDB.getlink(), example::

        lnk = db.getlink('myobjects/test')
        lnk.foo = 2
        lnk.bar = lnk.foo + 5

    Attribute reads and writes are forwarded to db['<keydir>/<attr>'].
    """
    def __init__(self, db, keydir ):
        # Write straight into __dict__ to bypass our own __setattr__,
        # which would otherwise try to persist these bookkeeping
        # attributes into the db itself.
        # BUG FIX: the old ``self.__dict__.update(locals())`` also stored
        # a spurious 'self' entry, creating a reference cycle and an
        # accidental ``lnk.self`` attribute.
        self.__dict__['db'] = db
        self.__dict__['keydir'] = keydir

    def __getattr__(self,key):
        # Only called for names NOT in __dict__, i.e. the stored items.
        return self.__dict__['db'][self.__dict__['keydir']+'/' + key]

    def __setattr__(self,key,val):
        self.db[self.keydir+'/' + key] = val

    def __repr__(self):
        db = self.__dict__['db']
        keys = db.keys( self.__dict__['keydir'] +"/*")
        return "<PickleShareLink '%s': %s>" % (
            self.__dict__['keydir'],
            ";".join([Path(k).basename() for k in keys]))
def test():
    """ Smoke-test the PickleShareDB API against ~/testpickleshare.

    Exercises plain keys, nested path keys, hashed buckets and links,
    printing results for eyeball verification (no assertions).
    """
    db = PickleShareDB('~/testpickleshare')
    db.clear()
    print "Should be empty:",db.items()
    db['hello'] = 15
    db['aku ankka'] = [1,2,313]
    db['paths/nest/ok/keyname'] = [1,(5,46)]
    db.hset('hash', 'aku', 12)
    db.hset('hash', 'ankka', 313)
    print "12 =",db.hget('hash','aku')
    print "313 =",db.hget('hash','ankka')
    print "all hashed",db.hdict('hash')
    print db.keys()
    print db.keys('paths/nest/ok/k*')
    print dict(db) # snapshot of whole db
    db.uncache() # frees memory, causes re-reads later

    # shorthand for accessing deeply nested files
    lnk = db.getlink('myobjects/test')
    lnk.foo = 2
    lnk.bar = lnk.foo + 5
    print lnk.bar # 7
def stress():
    """ Stress test: hammer ~/fsdbtest with interleaved reads/writes/deletes.

    Records are tagged with os.getpid(), so this appears designed to be
    run in several processes concurrently to provoke file-access races
    -- TODO confirm against the callers/docs.
    """
    db = PickleShareDB('~/fsdbtest')
    import time,sys
    for i in range(1000):
        for j in range(1000):
            if i % 15 == 0 and i < 200:
                # Early outer iterations: periodically delete keys to
                # create conflicts with concurrent writers.
                if str(j) in db:
                    del db[str(j)]
                continue

            if j%33 == 0:
                # Brief sleep so other processes' accesses interleave.
                time.sleep(0.02)

            # Append an (i, j, pid) record and bump a hashed counter
            # (counter starts from the default 15 when absent).
            db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())]
            db.hset('hash',j, db.hget('hash',j,15) + 1 )

        print i,
        sys.stdout.flush()
        if i % 10 == 0:
            # Drop the in-memory cache periodically to force re-reads.
            db.uncache()
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 def main():
import textwrap
usage = textwrap.dedent("""\
Bernardo B. Marques
remove all trailling spaces
r4872 pickleshare - manage PickleShare databases
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 Usage:
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 pickleshare dump /path/to/db > dump.txt
pickleshare load /path/to/db < dump.txt
pickleshare test /path/to/db
""")
DB = PickleShareDB
import sys
if len(sys.argv) < 2:
print usage
return
Bernardo B. Marques
remove all trailling spaces
r4872
vivainio
Grand Persistence Overhaul, featuring PickleShare. startup...
r165 cmd = sys.argv[1]
args = sys.argv[2:]
if cmd == 'dump':
if not args: args= ['.']
db = DB(args[0])
import pprint
pprint.pprint(db.items())
elif cmd == 'load':
cont = sys.stdin.read()
db = DB(args[0])
data = eval(cont)
db.clear()
for k,v in db.items():
db[k] = v
elif cmd == 'testwait':
db = DB(args[0])
db.clear()
print db.waitget('250')
elif cmd == 'test':
test()
stress()
Bernardo B. Marques
remove all trailling spaces
r4872
# Script entry point: run the command line interface when executed directly.
if __name__== "__main__":
    main()