Alien-Libjio

 view release on metacpan or  search on metacpan

libjio/tests/stress/jiostress  view on Meta::CPAN

#!/usr/bin/env python3

"""
This application is a stress tester for libjio. It's not a traditional stress
test like fsx (which can be used to test libjio using the preloading library),
but uses fault injection to check how the library behaves under random
failures.
"""

import sys
import os
import time
import select
import random
import fcntl
import traceback
from optparse import OptionParser
import libjio

try:
	import fiu
except ImportError:
	print()
	print("Error: unable to load fiu module. This test needs libfiu")
	print("support. Please install libfiu and recompile libjio with FI=1.")
	print()
	raise

#
# Auxiliary stuff
#

gbcount = 0
def getbytes(n):
	global gbcount
	gbcount = (gbcount + 1) % 10
	return bytes(str(gbcount) * n, 'ascii')

def randfrange(maxend, maxsize):
	start = random.randint(0, maxend - 1)
	size = random.randint(0, (maxend - 1) - start) % maxsize
	return start, start + size

def randfloat(min, max):
	return min + random.random() % (max - min)

class ConsistencyError (Exception):
	pass

def jfsck(fname, cleanup = False):
	flags = 0
	if cleanup:
		flags = libjio.J_CLEANUP

	try:
		r = libjio.jfsck(fname, flags = flags)
		return r
	except IOError as e:
		if e.args[0] == libjio.J_ENOJOURNAL:
			return { 'total': 0 }
		else:
			raise

def comp_cont(bytes):
	"'aaaabbcc' -> [ ('a', 4), ('b', 2), ('c', 2) ]"
	l = []
	prev = bytes[0]
	c = 1
	for b in bytes[1:]:
		if (b == prev):
			c += 1
			continue

		l.append((prev, c))
		prev = b
		c = 1
	l.append((b, c))
	return l

def pread(fd, start, end):
	ppos = fd.tell()
	fd.seek(start, 0)
	r = bytes()
	c = 0
	total = end - start
	while c < total:
		n = fd.read(total - c)
		if (n == ''):
			break
		c += len(n)
		r += n
	fd.seek(ppos, 0)
	assert c == end - start
	return r

#
# Output handler, used to get a nice output when using multiple processes
#

class OutputHandler:
	def __init__(self, every):
		# fds to read from
		self.rs = []

		# we will report every this number of seconds
		self.every = every

		# how many transactions has each child processed; we use the
		# read end of the pipe to identify them
		self.ntrans = {}

		# like self.ntrans but counts only the failed ones
		self.nfailures = {}

		# fd to write to, only relevant in the child
		self.w = None

libjio/tests/stress/jiostress  view on Meta::CPAN

			print("-" * 50)
			raise

class T_readwrite (T_writeonly):
	def __init__(self, f, jf, fsize, lockmgr, do_verify):
		T_writeonly.__init__(self, f, jf, fsize, lockmgr, do_verify)
		self.read_ranges = []

	def prepare(self):
		for r in self.ranges:
			if random.choice((True, False)):
				r.prepare_w(self.f)
			else:
				r.prepare_r()

	def apply(self):
		t = self.jf.new_trans()
		for r in self.ranges:
			if r.type == 'r':
				t.add_r(r.read_buf, r.start)
			else:
				t.add_w(r.new_data, r.start)
		t.commit()

	def verify(self, write_only = False):
		if not self.do_verify:
			return

		try:
			for r in self.ranges:
				if write_only and r.type == 'r':
					continue
				r.verify(self.f)
		except ConsistencyError:
			# show context on errors
			print("-" * 50)
			for r in self.ranges:
				r.show(self.f)
			print("-" * 50)
			raise

t_list = [T_jwrite, T_writeonly, T_readwrite]


#
# The test itself
#

class Stresser:
	def __init__(self, fname, fsize, nops, use_fi, use_as, output,
			lockmgr, do_verify):
		self.fname = fname
		self.fsize = fsize
		self.nops = nops
		self.use_fi = use_fi
		self.use_as = use_as
		self.output = output
		self.lockmgr = lockmgr
		self.do_verify = do_verify

		jflags = 0
		if use_as:
			jflags = libjio.J_LINGER

		self.jf = libjio.open(fname, libjio.O_RDWR | libjio.O_CREAT,
				0o600, jflags)
		self.f = open(fname, mode = 'rb')

		self.jf.truncate(fsize)

		if use_as:
			self.jf.autosync_start(5, 10 * 1024 * 1024)

	def apply(self, trans):
		trans.prepare()
		trans.apply()
		trans.verify()
		return True

	def apply_fork(self, trans):
		# do the prep before the fork so we can verify() afterwards
		trans.prepare()

		sys.stdout.flush()
		pid = os.fork()
		if pid == 0:
			# child
			try:
				self.fiu_enable()
				trans.apply()
				self.fiu_disable()
			except (IOError, MemoryError):
				try:
					self.reopen(trans)
				except (IOError, MemoryError):
					pass
				except:
					self.fiu_disable()
					traceback.print_exc()
				self.fiu_disable()
				sys.exit(1)
			except MemoryError:
				self.fiu_disable()
				sys.exit(1)
			except:
				self.fiu_disable()
				traceback.print_exc()
				sys.exit(1)
			trans.verify()
			sys.exit(0)
		else:
			# parent
			id, status = os.waitpid(pid, 0)
			if not os.WIFEXITED(status):
				i = (status,
					os.WIFSIGNALED(status),
					os.WTERMSIG(status))
				raise RuntimeError(i)

			if os.WEXITSTATUS(status) != 0:
				return False
			return True

	def reopen(self, trans):
		self.jf = None
		r = jfsck(self.fname)



( run in 4.170 seconds using v1.01-cache-2.11-cpan-524268b4103 )