Alien-Libjio

 view release on metacpan or  search on metacpan

libjio/tests/behaviour/tf.py  view on Meta::CPAN


"""
Our customized testing framework.

While not as sophisticated as the unittest module, it's targeted to our
particular kind of tests.

To that end, it has several simple but useful functions aimed at making tests
easier to read and write.
"""

import sys
import os
import time
import random
import struct
import libjio


# Useful constants describing the on-disk journal layout. The values are
# hard-coded here and must be kept in sync with journal.h by hand.
# NOTE(review): sizes are presumably in bytes -- confirm against journal.h.
DHS = 8		# disk header size
DOHS = 12	# disk op (operation) header size
DTS = 8		# disk trailer size


def tmppath():
	"""Builds and returns a throwaway path name for a test file.

	The result looks like $TMPDIR/jiotest.<secs>.<frac>.<pid>, where
	<secs> is the last five digits of the current time in seconds, <frac>
	is the integer part of the sub-second fraction scaled by 10000, and
	<pid> is our process id. $TMPDIR defaults to /tmp.

	Nothing is created on disk and uniqueness is not guaranteed; that is
	a known and accepted limitation for these tests."""
	stamp = time.time()
	whole = int(stamp)
	secs = str(whole)[-5:]
	# Keep only what comes before the decimal point of the scaled fraction.
	frac = str((stamp - whole) * 10000)
	frac = frac[:frac.find('.')]
	base = os.environ.get('TMPDIR', '/tmp')
	return base + '/jiotest.%s.%s.%s' % (secs, frac, os.getpid())


def run_forked(f, *args, **kwargs):
	"""Runs the function in a different process.

	Forks; the child calls f(*args, **kwargs) and then exits with status
	0, while the parent blocks until the child has finished.

	Returns None. Raises RuntimeError(pid, status) in the parent when the
	child did not terminate through a normal exit, i.e. os.WIFEXITED() is
	false for its wait status. The child's exit code is not inspected."""
	# Flush before forking; otherwise anything still buffered in stdout
	# would be duplicated into the child.
	sys.stdout.flush()
	pid = os.fork()
	if pid == 0:
		# child
		f(*args, **kwargs)
		sys.exit(0)
	else:
		# parent
		child_pid, status = os.waitpid(pid, 0)
		if not os.WIFEXITED(status):
			# Call form of raise: valid on both Python 2 and 3, and
			# yields the same args as the old "raise E, (a, b)".
			raise RuntimeError(child_pid, status)

def forked(f):
	"""Decorator that makes the function run in a different process.

	The returned wrapper passes its arguments to run_forked() and always
	returns None; the return value of f is not propagated back. The
	wrapper keeps the name and docstring of f."""
	# Imported locally so this function does not depend on the module's
	# import list.
	import functools

	@functools.wraps(f)
	def newf(*args, **kwargs):
		run_forked(f, *args, **kwargs)
	return newf


def gencontent(size = 9377):
	"""Returns a string of exactly 'size' characters of random content.

	A single random number is formatted with 20 decimals and that text is
	repeated, then cut, to reach the requested length. A size of zero or
	less gives the empty string."""
	chunk = "%.20f" % random.random()
	# One repetition more than the floor division guarantees we have at
	# least 'size' characters before cutting (and none for size <= 0).
	repeats = size // len(chunk) + 1
	return (chunk * repeats)[:size]

def content(path):
	"""Returns the content of the given path.

	The file is opened with open()'s default mode, read in full, and
	closed before returning. Errors from open() or read() propagate to
	the caller."""
	# The with block closes the file even if read() raises, instead of
	# leaving that to the garbage collector.
	with open(path) as f:
		return f.read()


def biopen(path, mode = 'w+', jflags = 0):
	"""Returns (open(path), libjio.open(path)).

	'mode' is an open()-style mode string; it is passed unchanged to
	open() and translated to os.O_* flags for libjio.open():

	  'r'  -> O_RDONLY                     'r+' -> O_RDWR
	  'w'  -> O_RDWR                       'w+' -> O_RDWR|O_CREAT|O_TRUNC
	  'a'  -> O_RDWR|O_APPEND

	'jflags' is handed to libjio.open() as its last argument.

	Raises RuntimeError if 'mode' contains none of 'r', 'w' or 'a'."""
	if 'r' in mode:
		flags = os.O_RDONLY
		if '+' in mode:
			flags = os.O_RDWR
	elif 'w' in mode:
		flags = os.O_RDWR
		if '+' in mode:
			flags = flags | os.O_CREAT | os.O_TRUNC
	elif 'a' in mode:
		flags = os.O_RDWR | os.O_APPEND
	else:
		raise RuntimeError("unsupported mode: %r" % (mode,))

	# 0o400 is the same value as the old-style 0400 literal, spelled so
	# that it also parses on Python 3. The tuple is evaluated left to
	# right, so open() has already run when libjio.open() is called; the
	# flag sets without O_CREAT presumably rely on that.
	return open(path, mode), libjio.open(path, flags, 0o400, jflags)

def bitmp(mode = 'w+', jflags = 0):
	"""Same as biopen(), but on a path freshly generated by tmppath().

	Returns whatever biopen() returns for that path, mode and jflags."""
	return biopen(tmppath(), mode, jflags)


def run_with_tmp(func, jflags = 0):
	"""Opens a temporary file with bitmp() and calls func(file, jfile) on
	it in a separate process via run_forked(), which exits once func is
	done. Returns the path of the temporary file."""
	pyfile, jfile = bitmp(jflags = jflags)
	run_forked(func, pyfile, jfile)
	return pyfile.name


def jiodir(path):
	"""Returns the path of the '.<name>.jio' entry that sits next to the
	given path (presumably libjio's journal directory for that file).

	For example '/tmp/foo' gives '/tmp/.foo.jio'. A bare file name such as
	'foo' gives './.foo.jio', so the result stays relative to the current
	directory instead of pointing at the filesystem root."""
	dirname, basename = os.path.split(path)
	# An empty dirname means "current directory"; without the fallback the
	# result would start with '/' and refer to the root directory.
	return os.path.join(dirname or '.', '.' + basename + '.jio')



( run in 0.422 second using v1.01-cache-2.11-cpan-f6376fbd888 )