Alien-Libjio
view release on metacpan or search on metacpan
libjio/tests/behaviour/tf.py view on Meta::CPAN
"""
Our customized testing framework.
While not as sophisticated as the unittest module, it's targeted to our
particular kind of tests.
To that end, it has several simple but useful functions aimed at making tests
easier to read and write.
"""
import functools
import os
import random
import struct
import sys
import time

import libjio
# Useful constants describing the on-disk journal layout; they must be kept
# in sync with journal.h (units are not stated here, presumably bytes --
# confirm against that header).
DHS = 8 # disk header size
DOHS = 12 # disk operation header size
DTS = 8 # disk trailer size
def tmppath():
    """Builds a throwaway path name for a test file.

    The result has the form '<TMPDIR>/jiotest.<stamp>.<pid>': TMPDIR comes
    from the environment and defaults to '/tmp', <stamp> is derived from the
    current time (the last five digits of the seconds, a dot, and a
    sub-second part) and <pid> is the id of the calling process.

    Nothing is created on disk, and the name is neither guaranteed unique
    nor safe against races; that is accepted here because only a simple
    name is needed."""
    base = os.environ.get('TMPDIR', '/tmp')
    stamp = time.time()
    whole = int(stamp)
    # Fractional part scaled by 10000; only the text before the '.' is kept.
    scaled = str((stamp - whole) * 10000)
    tag = "%s.%s" % (str(whole)[-5:], scaled[:scaled.find('.')])
    return '%s/jiotest.%s.%s' % (base, tag, os.getpid())
def run_forked(f, *args, **kwargs):
    """Runs the function in a different process.

    The child process calls f(*args, **kwargs) and then exits with status 0.
    The parent waits for the child and returns None once it has exited; the
    child's exit status is not otherwise inspected.

    Raises RuntimeError(pid, status) in the parent if the child did not
    terminate through a normal exit (for instance, if a signal killed it)."""
    # Flush before forking so output already buffered in this process is not
    # written a second time by the child.
    sys.stdout.flush()
    pid = os.fork()
    if pid == 0:
        # child
        f(*args, **kwargs)
        sys.exit(0)

    # parent (the child never gets here: sys.exit() raises SystemExit)
    child_pid, status = os.waitpid(pid, 0)
    if not os.WIFEXITED(status):
        # Call form of raise: valid on both Python 2 and 3, and yields the
        # same exception args, (pid, status), as the old comma form.
        raise RuntimeError(child_pid, status)
def forked(f):
    """Decorator that makes the function run in a different process.

    Calling the decorated function hands f and the call's arguments to
    run_forked() and returns None; whatever f itself returns is not passed
    back to the caller.  The wrapper keeps f's name and docstring."""
    @functools.wraps(f)
    def newf(*args, **kwargs):
        run_forked(f, *args, **kwargs)
    return newf
def gencontent(size = 9377):
    """Generates random content.

    Returns a string of exactly `size` characters made by repeating the
    "%.20f" rendering of a single random.random() value; a size of zero or
    less yields an empty string."""
    chunk = "%.20f" % random.random()
    # One repetition more than size // len(chunk) always covers `size`.
    repeats = size // len(chunk) + 1
    return (chunk * repeats)[:size]
def content(path):
    """Returns the content of the given path.

    The file is opened with open()'s default mode, read in full, and closed
    before returning."""
    # The with block closes the file even if read() raises.
    with open(path) as f:
        return f.read()
def biopen(path, mode = 'w+', jflags = 0):
    """Returns (open(path), libjio.open(path)).

    The first element is a regular Python file opened on `path` with `mode`;
    the second is the result of libjio.open() on the same path, using the
    os.O_* flags derived from `mode` (see below) and `jflags` passed through
    unchanged.

    Flags by mode:
        'r'  -> O_RDONLY
        'r+' -> O_RDWR
        'w'  -> O_RDWR
        'w+' -> O_RDWR | O_CREAT | O_TRUNC
        'a'  -> O_RDWR | O_APPEND

    Raises RuntimeError, before anything is opened, if `mode` contains none
    of 'r', 'w' or 'a'."""
    if 'r' in mode:
        flags = os.O_RDONLY
        if '+' in mode:
            flags = os.O_RDWR
    elif 'w' in mode:
        flags = os.O_RDWR
        if '+' in mode:
            flags = flags | os.O_CREAT | os.O_TRUNC
    elif 'a' in mode:
        flags = os.O_RDWR | os.O_APPEND
    else:
        raise RuntimeError("unsupported mode: %r" % (mode,))

    # 0o400 is the same value the old-style literal 0400 denoted, written in
    # the form accepted by both Python 2.6+ and Python 3.
    return open(path, mode), libjio.open(path, flags, 0o400, jflags)
def bitmp(mode = 'w+', jflags = 0):
    """Like biopen(), but on a freshly generated tmppath() name.

    Returns whatever biopen() returns for that path, `mode` and `jflags`."""
    return biopen(tmppath(), mode, jflags)
def run_with_tmp(func, jflags = 0):
    """Runs func(file, jfile) in a separate process on a temporary file.

    The pair of handles is obtained from bitmp() with the given `jflags`,
    and func is run through run_forked(), so it executes in a new process
    that exits afterwards.  Returns the path of the temporary file."""
    pyfile, jfile = bitmp(jflags = jflags)
    run_forked(func, pyfile, jfile)
    return pyfile.name
def jiodir(path):
    """Returns '<dirname>/.<basename>.jio' for the given path.

    Presumably this is the journal directory libjio keeps for that file;
    confirm against the library."""
    # os.path.split() yields exactly the (dirname, basename) pair.
    return '%s/.%s.jio' % os.path.split(path)
( run in 0.422 second using v1.01-cache-2.11-cpan-f6376fbd888 )