Alien-Libjio

 view release on metacpan or  search on metacpan

libjio/tests/behaviour/tf.py  view on Meta::CPAN

import random
import struct
import libjio


# Useful constants, must match journal.h
# All three are sizes in bytes. They coincide with struct.calcsize() of the
# "!HHI" (header), "!IQ" (per-operation header) and "!II" (trailer) formats
# that TransFile uses to read and write transaction files.
DHS = 8		# disk header size
DOHS = 12	# disk op header size
DTS = 8		# disk trailer size


def tmppath():
	"""Builds a (hopefully) unique path to be used as a scratch file.

	The result looks like <tmpdir>/jiotest.<secs>.<frac>.<pid>, where
	<tmpdir> is $TMPDIR (or /tmp if unset), <secs> are the last five
	digits of the current epoch second, <frac> is the integer part of the
	sub-second fraction scaled by 10000, and <pid> is our process id.

	Nothing is created on disk and no attempt is made to avoid races: it
	is only good enough for tests."""
	base = os.environ.get('TMPDIR', '/tmp')
	stamp = time.time()
	whole = int(stamp)

	# last five digits of the seconds, and the scaled fraction with
	# everything from the decimal point onwards chopped off
	secs = str(whole)[-5:]
	frac = str((stamp - whole) * 10000)
	frac = frac[:frac.find('.')]

	stamp_str = "%s.%s" % (secs, frac)
	return base + '/jiotest.%s.%s' % (stamp_str, os.getpid())


def run_forked(f, *args, **kwargs):
	"""Runs the function in a different process.

	The child calls f(*args, **kwargs) and then exits with status 0; the
	parent blocks until the child is gone. The return value of f is
	discarded, and this function returns None.

	Raises RuntimeError(pid, status) in the parent if the child did not
	terminate through a normal exit (for instance, if it was killed by a
	signal). The exit code of the child is not checked."""
	# flush before forking; presumably so pending buffered output does
	# not end up being written by both processes
	sys.stdout.flush()
	pid = os.fork()
	if pid == 0:
		# child
		f(*args, **kwargs)
		sys.exit(0)

	# parent
	wpid, status = os.waitpid(pid, 0)
	if not os.WIFEXITED(status):
		# call form instead of "raise E, (a, b)": same exception
		# arguments, but valid syntax on both Python 2 and 3
		raise RuntimeError(wpid, status)

def forked(f):
	"""Decorator that makes the function run in a different process.

	The returned wrapper forwards its arguments to f through run_forked()
	and always returns None. The wrapper keeps the name and docstring of
	f, so code that inspects them (such as autorun(), which prints the
	docstring as the test description) still sees the original ones."""
	import functools

	@functools.wraps(f)
	def newf(*args, **kwargs):
		run_forked(f, *args, **kwargs)
	return newf


def gencontent(size = 9377):
	"""Returns a string of exactly 'size' characters (empty if size is
	not positive).

	Only one random number is drawn: it is formatted with 20 decimals,
	and that text is repeated as many times as needed and then cut down
	to the requested length."""
	chunk = "%.20f" % random.random()
	pieces = []
	filled = 0
	while filled < size:
		pieces.append(chunk)
		filled += len(chunk)
	return ''.join(pieces)[:size]

def content(path):
	"""Returns the whole content of the given path, read in the default
	(text) mode. The file is closed before returning."""
	with open(path) as f:
		return f.read()


def biopen(path, mode = 'w+', jflags = 0):
	"""Returns (open(path, mode), libjio.open(path, flags, 0o400, jflags)).

	The flags given to libjio are derived from the Python mode string;
	the first of 'r', 'w', 'a' found in it decides:
	  'r' -> O_RDONLY           with '+': O_RDWR
	  'w' -> O_RDWR             with '+': O_RDWR | O_CREAT | O_TRUNC
	  'a' -> O_RDWR | O_APPEND
	The regular Python file is opened first, then the libjio one.

	Raises RuntimeError if mode contains none of 'r', 'w' or 'a'."""
	if 'r' in mode:
		flags = os.O_RDONLY
		if '+' in mode:
			flags = os.O_RDWR
	elif 'w' in mode:
		flags = os.O_RDWR
		if '+' in mode:
			flags = flags | os.O_CREAT | os.O_TRUNC
	elif 'a' in mode:
		flags = os.O_RDWR | os.O_APPEND
	else:
		raise RuntimeError('unsupported mode: %r' % (mode,))

	# 0o400 is the same value as the old-style literal 0400, spelled in
	# the form accepted by both Python 2.6+ and Python 3
	return open(path, mode), libjio.open(path, flags, 0o400, jflags)

def bitmp(mode = 'w+', jflags = 0):
	"""Same as biopen(), but on a name freshly obtained from tmppath().
	Returns whatever biopen() returns."""
	return biopen(tmppath(), mode, jflags)


def run_with_tmp(func, jflags = 0):
	"""Opens a temporary file with bitmp() (passing jflags along), and
	calls func(file, jfile) through run_forked(), so func runs in a
	separate process that exits when it is done. Returns the path of the
	temporary file, taken from the name of the regular file object."""
	pyfile, jfile = bitmp(jflags = jflags)
	run_forked(func, pyfile, jfile)
	tmpname = pyfile.name
	return tmpname


def jiodir(path):
	"""Returns the path of the libjio directory that goes with 'path': a
	directory named '.<basename>.jio' placed next to the file.

	A path without a directory component is taken to live in the current
	directory ('.'), so that a bare file name does not end up pointing at
	the root of the filesystem."""
	dirname = os.path.dirname(path) or '.'
	return dirname + '/.' + os.path.basename(path) + '.jio'

def transpath(path, ntrans):
	"""Returns the path of the entry named str(ntrans) inside the libjio
	directory (see jiodir()) that goes with 'path'."""
	return '/'.join((jiodir(path), str(ntrans)))

def fsck(path, flags = 0):
	"""Thin wrapper over libjio.jfsck(): runs it on 'path' with the given
	flags and hands back whatever it returned."""
	return libjio.jfsck(path, flags = flags)

def fsck_verify(n, **kwargs):
	"""Runs fsck(n) with libjio.J_CLEANUP, and verifies that the result
	matches the given values.

	Each keyword argument is the expected value for the result entry of
	the same name. 'invalid', 'broken', 'reapplied', 'corrupt' and
	'in_progress' default to 0; 'total' is calculated automatically as
	the sum of all the expected values (so it should not be passed in, or
	it would be added to the sum as well).

	Raises an AssertionError if an expected entry is missing from the
	result, or if its value is not the expected one."""
	expected = {
		'invalid': 0,
		'broken': 0,
		'reapplied': 0,
		'corrupt': 0,
		'in_progress': 0,
	}
	expected.update(kwargs)
	expected['total'] = sum(expected.values())
	res = fsck(n, flags = libjio.J_CLEANUP)

	# exceptions are raised using the call form, which has the same
	# meaning as "raise E, msg" but is valid on both Python 2 and 3
	for k in expected:
		if k not in res:
			raise AssertionError(k + ' not in res')
		if res[k] != expected[k]:
			raise AssertionError(k + ' does not match: ' +
					str(res))

def cleanup(path):
	"""Removes the file at 'path' and, if there is one, the libjio
	directory that goes with it (see jiodir()).

	The directory is expected to contain a 'lock' file (asserted), which
	is removed before the directory itself; os.rmdir() will fail if
	anything else was left in there."""
	os.unlink(path)

	jdir = jiodir(path)
	if not os.path.isdir(jdir):
		return

	assert 'lock' in os.listdir(jdir)
	os.unlink(jdir + '/lock')
	os.rmdir(jdir)


class _MissingAttribute (AttributeError, KeyError):
	"""Raised by attrdict when an attribute has no matching key.

	It is an AttributeError so that hasattr(), getattr() with a default
	and similar attribute protocols behave properly, and it is also a
	KeyError so that code catching the exception that used to be raised
	keeps working."""

class attrdict (dict):
	"""Dictionary whose items can also be accessed as attributes: d.x is
	d['x'], "d.x = v" is "d['x'] = v", and "del d.x" is "del d['x']".

	Looking up or deleting an attribute that has no matching key raises
	an exception that is both an AttributeError and a KeyError."""

	def __getattr__(self, name):
		# only called when the normal attribute lookup fails
		try:
			return self[name]
		except KeyError:
			raise _MissingAttribute(name)

	def __setattr__(self, name, value):
		self[name] = value

	def __delattr__(self, name):
		try:
			del self[name]
		except KeyError:
			raise _MissingAttribute(name)

class TransFile (object):
	"""In-memory view of a transaction file, which can be loaded from and
	saved to disk.

	File layout, as read and written here (all integers in network byte
	order, without padding):
	  header:   ver (16 bits), flags (16 bits), id (32 bits)
	  ops:      any number of records, each one made of tlen (32 bits)
	            and offset (64 bits) followed by tlen bytes of payload;
	            a record with tlen == offset == 0 marks the end
	  trailer:  numops (32 bits), checksum (32 bits)

	Attributes: path, ver, id, flags, numops, checksum, and ops, a list
	of objects with 'tlen', 'offset' and 'payload' attributes."""

	# struct formats for each fixed-size part of the file
	_HDRFMT = "!HHI"	# ver, flags, id
	_OPFMT = "!IQ"		# tlen, offset
	_TRAILERFMT = "!II"	# numops, checksum

	def __init__(self, path = ''):
		"""Creates an empty transaction; if a path is given, its
		content is loaded from that file right away."""
		self.ver = 1
		self.id = -1
		self.flags = 0
		self.numops = -1
		self.checksum = -1
		self.ops = []
		self.path = path
		if path:
			self.load()

	def load(self):
		"""Replaces the current fields with the ones read from
		self.path. Raises struct.error if the file ends in the middle
		of a fixed-size part, and AssertionError if a payload is
		shorter than its header says."""
		# binary mode: the file holds packed structures, not text
		with open(self.path, 'rb') as fd:
			# header
			self.ver, self.flags, self.id = struct.unpack(
					self._HDRFMT,
					fd.read(struct.calcsize(self._HDRFMT)))

			# operations, up to the all-zeros terminator
			oplen = struct.calcsize(self._OPFMT)
			self.ops = []
			while True:
				tlen, offset = struct.unpack(self._OPFMT,
						fd.read(oplen))
				if tlen == offset == 0:
					break
				payload = fd.read(tlen)
				assert len(payload) == tlen
				self.ops.append(attrdict(tlen = tlen,
					offset = offset, payload = payload))

			# trailer
			self.numops, self.checksum = struct.unpack(
					self._TRAILERFMT,
					fd.read(struct.calcsize(self._TRAILERFMT)))

	def save(self):
		"""Writes the transaction to self.path, replacing the file."""
		# the lack of integrity checking in this function is
		# intentional, so we can write broken transactions and see how
		# jfsck() copes with them
		with open(self.path, 'wb') as fd:
			fd.write(struct.pack(self._HDRFMT, self.ver,
				self.flags, self.id))
			for o in self.ops:
				fd.write(struct.pack(self._OPFMT, o.tlen,
					o.offset))
				fd.write(o.payload)
			# end-of-operations marker
			fd.write(struct.pack(self._OPFMT, 0, 0))
			fd.write(struct.pack(self._TRAILERFMT, self.numops,
				self.checksum))

	def __repr__(self):
		return '<TransFile %s: id:%d f:%s n:%d ops:%s>' % \
			(self.path, self.id, hex(self.flags), self.numops,
					self.ops)


def gen_ret_seq(seq):
	"""Returns a function that each time it is called returns a value of
	the given sequence, in order. When the sequence is exhausted, returns
	the last value (or 0 if the sequence was empty). The returned
	function accepts, and ignores, any arguments."""
	it = iter(seq)
	# one-element list, so the inner function can update the value
	last = [0]
	def newf(*args, **kwargs):
		try:
			# next(it) instead of it.next(): works on Python 2.6+
			# and on Python 3
			last[0] = next(it)
		except StopIteration:
			pass
		return last[0]
	return newf

def autorun(module, specific_test = None):
	"""Runs all the functions in the given module that begin with 'test',
	in sorted name order.

	Before each one is called (with no arguments), a line is printed with
	its short name (the name without the 'test' prefix and without one
	leading underscore, if any) and its docstring, cut to 60 characters.

	If specific_test is given, only the test whose short name is equal to
	it is run."""
	for name in sorted(dir(module)):
		if not name.startswith('test'):
			continue

		obj = getattr(module, name)
		# same check as always on purpose: unlike callable(), it does
		# not pick up plain classes
		if '__call__' not in dir(obj):
			continue

		name = name[len('test'):]
		if name.startswith('_'):
			name = name[1:]

		if specific_test and name != specific_test:
			continue

		desc = obj.__doc__ or ''
		# single parenthesized argument: prints the same on Python 2
		# and on Python 3
		print("%-10s %-60.60s" % (name, desc))
		obj()




( run in 0.500 second using v1.01-cache-2.11-cpan-acebb50784d )