Alien-Libjio
view release on metacpan or search on metacpan
libjio/tests/behaviour/tf.py view on Meta::CPAN
def gencontent(size = 9377):
    """Generates random content.

    One random float is drawn and formatted with 20 decimal places; that
    text is repeated as often as needed and the result is cut to exactly
    `size` characters. A `size` of zero or less yields an empty string.
    """
    chunk = "%.20f" % random.random()
    # Enough whole copies of the chunk to cover `size`; the slice trims
    # whatever is left over.
    copies = size // len(chunk) + 1
    return (chunk * copies)[:size]
def content(path):
    """Returns the content of the given path.

    The file is opened with open()'s default mode and is always closed
    before returning, including when reading raises.
    """
    # The context manager closes the handle; the previous version left it
    # open until the garbage collector got to it.
    with open(path) as f:
        return f.read()
def biopen(path, mode = 'w+', jflags = 0):
    """Returns (open(path), libjio.open(path)).

    `mode` is a regular open() mode string; it is translated to the os.O_*
    flags handed to libjio.open():

      - 'r'  -> O_RDONLY, or O_RDWR when '+' is present
      - 'w'  -> O_RDWR, plus O_CREAT | O_TRUNC when '+' is present
      - 'a'  -> O_RDWR | O_APPEND

    `jflags` is passed through to libjio.open() unchanged.

    Raises RuntimeError if `mode` contains none of 'r', 'w' or 'a'.
    """
    if 'r' in mode:
        flags = os.O_RDONLY
        if '+' in mode:
            flags = os.O_RDWR
    elif 'w' in mode:
        flags = os.O_RDWR
        if '+' in mode:
            flags = flags | os.O_CREAT | os.O_TRUNC
    elif 'a' in mode:
        flags = os.O_RDWR | os.O_APPEND
    else:
        raise RuntimeError("unsupported mode: %r" % (mode,))

    # The regular file is opened first, then the libjio one. The 0o400
    # spelling of the permission bits is accepted by Python 2.6+ and
    # Python 3; the old 0400 form is a syntax error on Python 3.
    return open(path, mode), libjio.open(path, flags, 0o400, jflags)
def bitmp(mode = 'w+', jflags = 0):
    """Opens a temporary file with biopen().

    The path comes from tmppath(); `mode` and `jflags` are forwarded to
    biopen() and its return value is handed back as is.
    """
    return biopen(tmppath(), mode, jflags)
def run_with_tmp(func, jflags = 0):
    """Runs `func` against a freshly opened temporary file.

    `func` takes a file and a jfile as parameters; both are obtained from
    bitmp(jflags = jflags) and handed to run_forked() together with `func`,
    so the function runs in a new process that exits afterwards.

    Returns the path of the temporary file.
    """
    plain_file, journaled_file = bitmp(jflags = jflags)
    run_forked(func, plain_file, journaled_file)
    return plain_file.name
def jiodir(path):
    """Returns the path of the '.<name>.jio' directory that sits next to
    the given file path.

    For example '/tmp/foo' gives '/tmp/.foo.jio'.
    """
    # os.path.dirname() returns '' for a path without a directory part,
    # which used to produce '/.<name>.jio', i.e. a path under the
    # filesystem root. Fall back to '.' so the result stays next to the file.
    # NOTE(review): confirm './.<name>.jio' matches the directory libjio
    # itself uses for relative paths.
    dirname = os.path.dirname(path) or '.'
    return dirname + '/.' + os.path.basename(path) + '.jio'
def transpath(path, ntrans):
    """Returns jiodir(path) joined with str(ntrans) by a '/'."""
    return '/'.join((jiodir(path), str(ntrans)))
def fsck(path, flags = 0):
    """Calls libjio's jfsck() on `path` with the given `flags` and returns
    whatever it returns."""
    return libjio.jfsck(path, flags = flags)
def fsck_verify(n, **kwargs):
    """Runs fsck(n), and verifies that the fsck result matches the given
    values.

    The keyword arguments override the expected value of individual result
    fields ('invalid', 'broken', 'reapplied', 'corrupt', 'in_progress'),
    which all default to 0. 'total' is calculated automatically as the sum
    of the expected values. fsck() is called with libjio.J_CLEANUP.

    Raises an AssertionError if an expected key is missing from the result
    or if its value differs from the expected one.
    """
    expected = {
        'invalid': 0,
        'broken': 0,
        'reapplied': 0,
        'corrupt': 0,
        'in_progress': 0,
    }
    expected.update(kwargs)
    expected['total'] = sum(expected.values())

    res = fsck(n, flags = libjio.J_CLEANUP)

    for k in expected:
        # The call form of raise works on both Python 2 and Python 3; the
        # old "raise Class, message" spelling is Python 2 only.
        if k not in res:
            raise AssertionError(k + ' not in res')
        if res[k] != expected[k]:
            raise AssertionError(k + ' does not match: ' + str(res))
def cleanup(path):
    """Unlinks the path and its temporary libjio directory.

    If the libjio directory (see jiodir()) exists, it is asserted to
    contain a 'lock' file; that file is unlinked and the directory is then
    removed with os.rmdir(), so it must only have the 'lock' file in it.
    """
    os.unlink(path)

    journal_dir = jiodir(path)
    if not os.path.isdir(journal_dir):
        # No libjio directory, nothing else to remove.
        return

    assert 'lock' in os.listdir(journal_dir)
    os.unlink(journal_dir + '/lock')
    os.rmdir(journal_dir)
class attrdict(dict):
    """A dict whose items can also be read, set and deleted as attributes.

    `d.name` is equivalent to `d['name']`. Looking up or deleting a name
    that is not a key raises AttributeError, as the attribute protocol
    expects, so hasattr() and getattr(obj, name, default) work as usual.
    """

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails, so real dict
        # methods are never shadowed by keys.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self[name] = value

    def __delattr__(self, name):
        try:
            del self[name]
        except KeyError:
            raise AttributeError(name)
class TransFile (object):
def __init__(self, path = ''):
self.ver = 1
self.id = -1
self.flags = 0
self.numops = -1
self.checksum = -1
self.ops = []
self.path = path
if path:
self.load()
def load(self):
fd = open(self.path)
# header
hdrfmt = "!HHI"
( run in 0.812 second using v1.01-cache-2.11-cpan-f6376fbd888 )