Alien-Libjio

 view release on metacpan or  search on metacpan

lib/Alien/Libjio.pm  view on Meta::CPAN

committing them to disk. That way, if a transaction were to fail due to a
system crash or other unexpected event, the write ahead log could be used to
finish writing the data.

While this functionality is often available with networked databases, it can
be a rather memory- and processor-intensive solution, even where reliable
writes are important. In other cases, the filesystem does not provide native
journalling support, so other tricks may be used to ensure data integrity,
such as writing to a separate temporary file and then overwriting the file
instead of modifying it in-place. Unfortunately, this method cannot handle
threaded operations appropriately.

Thankfully, Alberto Bertogli published a userspace C library called libjio
that can provide these features in a small (less than 1500 lines of code)
library with no external dependencies.

This package is designed to install it, and provide a way to get the flags
necessary to compile programs using it. It is particularly useful for Perl XS
programs that use it, such as B<IO::Journal>.

=head1 METHODS

libjio/bindings/preload/libjio_preload.c  view on Meta::CPAN

#define _FILE_OFFSET_BITS 64
#define off_t off64_t
#include <libjio.h>
#undef off_t
#undef _FILE_OFFSET_BITS


/* maximum number of simultaneous open file descriptors we support */
#define MAXFD (4096 * 2)

/* recursion counter, per-thread */
static int __thread called = 0;


/* C library functions, filled via the dynamic loader */
static void *libc;

static int (*c_open)(const char *pathname, int flags, mode_t mode);
static int (*c_open64)(const char *pathname, int flags, mode_t mode);
static int (*c_close)(int fd);
static int (*c_unlink)(const char *pathname);
static ssize_t (*c_read)(int fd, void *buf, size_t count);

libjio/bindings/preload/libjio_preload.c  view on Meta::CPAN

static int (*c_fsync)(int fd);
static int (*c_dup)(int oldfd);
static int (*c_dup2)(int oldfd, int newfd);


/* file descriptor table, to translate fds to jfs */
struct fd_entry {
	/* descriptor number; init() sets every slot to -1 (presumably the
	 * "slot unused" marker -- confirm against the open/close wrappers) */
	int fd;
	/* NOTE(review): a pointer rather than a plain counter -- presumably
	 * shared between dup()ed descriptors; confirm against the dup
	 * wrappers */
	unsigned int *refcount;
	/* libjio handle associated with this descriptor; init() sets it to
	 * NULL */
	jfs_t *fs;
	/* per-slot mutex, initialized in init() and taken/released through
	 * fd_lock()/fd_unlock() */
	pthread_mutex_t lock;
};
static struct fd_entry fd_table[MAXFD];

/* useful macros, mostly for debugging purposes */
#if 1
	#define rec_inc() do { called++; } while(0)
	#define rec_dec() do { called--; } while(0)
	#define printd(...) do { } while(0)

#else

libjio/bindings/preload/libjio_preload.c  view on Meta::CPAN

 * catch out of bounds accesses */
static inline int fd_lock(int fd)
{
	/* Returns 1 when the slot's mutex was acquired, 0 when fd falls
	 * outside the table or pthread_mutex_lock() reported an error. */
	if (fd >= 0 && fd < MAXFD)
		return pthread_mutex_lock(&fd_table[fd].lock) == 0;

	printd("locking out of bounds fd %d\n", fd);
	return 0;
}

/* Release the mutex of fd's table slot. Returns 1 on success, 0 when fd is
 * outside the table or pthread_mutex_unlock() reported an error. */
static inline int fd_unlock(int fd)
{
	if (fd >= 0 && fd < MAXFD)
		return pthread_mutex_unlock(&fd_table[fd].lock) == 0;

	printd("unlocking out of bounds fd %d\n", fd);
	return 0;
}


/*
 * library initialization
 */

static int __attribute__((constructor)) init(void)
{
	int i;
	pthread_mutexattr_t attr;

	printd("starting\n");

	/* initialize fd_table */
	pthread_mutexattr_init(&attr);
	pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
	for (i = 0; i < MAXFD; i++) {
		fd_table[i].fd = -1;
		fd_table[i].fs = NULL;
		pthread_mutex_init(&(fd_table[i].lock), &attr);
	}
	pthread_mutexattr_destroy(&attr);

	/* dynamically load the C library */
	libc = dlopen("libc.so.6", RTLD_NOW);
	if (libc == NULL) {
		printd("Error loading libc: %s\n", dlerror());
		return 0;
	}

	/* load symbols from the C library */
	#define libc_load(F) c_##F = dlsym(libc, #F)

libjio/bindings/python/libjio.c  view on Meta::CPAN

	if (rv != 0)
		return PyErr_SetFromErrno(PyExc_IOError);

	return PyLong_FromLong(rv);
}

/* jfs_autosync_start() */
PyDoc_STRVAR(jf_autosync_start__doc,
"autosync_start(max_sec, max_bytes)\n\
\n\
Starts the automatic sync thread (only useful when using lingering\n\
transactions).\n");

static PyObject *jf_autosync_start(jfile_object *fp, PyObject *args)
{
	int rv;
	unsigned int max_sec, max_bytes;

	if (!PyArg_ParseTuple(args, "II:autosync_start", &max_sec,
				&max_bytes))
		return NULL;

libjio/bindings/python/libjio.c  view on Meta::CPAN

	if (rv != 0)
		return PyErr_SetFromErrno(PyExc_IOError);

	return PyLong_FromLong(rv);
}

/* jfs_autosync_stop() */
PyDoc_STRVAR(jf_autosync_stop__doc,
"autosync_stop()\n\
\n\
Stops the automatic sync thread started by autosync_start()\n");

static PyObject *jf_autosync_stop(jfile_object *fp, PyObject *args)
{
	int rv;

	if (!PyArg_ParseTuple(args, ":autosync_stop"))
		return NULL;

	Py_BEGIN_ALLOW_THREADS
	rv = jfs_autosync_stop(fp->fs);

libjio/doc/guide.rst  view on Meta::CPAN

UNIX-alike API
--------------

There is a set of functions that emulate the UNIX API (*read()*, *write()*,
and so on) which make each operation a transaction. This can be useful if you
don't need to have the full power of the transactions but only to provide
guarantees between the different functions. They are a lot like the normal
UNIX functions, but instead of getting a file descriptor as their first
parameter they get a file structure. You can check out the manual page to see
the details, but they work just like their UNIX version, only that they
preserve atomicity and thread-safety within each call.

In particular, the group of functions related to reading (which was described
above in `Basic operation`_) are extremely useful because they take care of
the locking needed for the library's proper behaviour. You should use them
instead of the regular calls.

The full function list is available on the man page and I won't reproduce it
here; however the naming is quite simple: just prepend a 'j' to all the names:
*jread()*, *jwrite()*, etc.


Processes, threads and locking
------------------------------

The library is completely safe to use in multi-process and/or multi-thread
applications, as long as you abide by the following rules:

 - Within a process, a file must not be held open at the same time more than
   once, due to *fcntl()* locking limitations. Opening, closing and then
   opening again is safe.
 - *jclose()* must only be called when there are no other I/O operations in
   progress.
 - *jfsck()* must only be called when the file is known **not** to be open by
   any process.
 - *jmove_journal()* must only be called when the file is known **not** to be

libjio/doc/guide.rst  view on Meta::CPAN

considerations.


Lingering transactions
----------------------

If you need to increase performance, you can use lingering transactions. In
this mode, transactions take up more disk space but allow you to do the
synchronous write only once, making commits much faster. To use them, just add
*J_LINGER* to the *jflags* parameter in *jopen()*. You should call *jsync()*
frequently to avoid using up too much space, or start an asynchronous thread
that calls *jsync()* automatically using *jfs_autosync_start()*. Note that
files opened with this mode must not be opened by more than one process at the
same time.


Disk layout
-----------

The library creates a single directory for each file opened, named after it.
So if we open a file *output*, a directory named *.output.jio* will be

libjio/doc/libjio.rst  view on Meta::CPAN

the journal, so there is really nothing left to be done. So if the transaction
is complete, we only need to rollback.


UNIX-alike API
--------------

The UNIX-alike API is the set of functions provided by the library that
emulate the good old UNIX file manipulation calls. Most of them are just
wrappers around commits, and implement proper locking in order to allow
simultaneous operations (either across threads or processes). They are
described in detail in the manual pages; we'll only list them here for
completeness:

 - jopen()
 - jread(), jpread(), jreadv()
 - jwrite(), jpwrite(), jwritev()
 - jtruncate()
 - jclose()


libjio/doc/tids.rst  view on Meta::CPAN

Transaction ID assignment procedure
===================================

This brief document describes how libjio assigns a unique number to each
transaction that identifies it univocally during its lifetime.

It is a very delicate issue, because the rest of the library depends on the
uniqueness of the ID. An ID has to be coherent across threads and processes,
and choosing one can't take long: it serializes transaction creation (and
it's the only contention point for independent non-overlapping transactions).


Description
-----------

We have two functions: *get_tid()* and *free_tid()*, which respectively return
a new transaction ID, and mark a given transaction ID as no longer in use.

libjio/doc/tids.rst  view on Meta::CPAN


Things to notice
----------------

The following is a list of small things to notice about the mechanism. They're
useful because races tend to be subtle, and I *will* forget about them. The
descriptions are not really detailed, just enough to give a general idea.

 - It is possible that we get in *free_tid()* and the transaction we want to
   free is greater than the max tid. In that case, we do nothing: it's a valid
   situation. How to get there: two threads about to free two tids. The first
   one calls *unlink()* and just after its return (before it gets a chance to
   call *free_tid()*), another thread, the holder of the current max, steps in
   and performs both the *unlink()* and *free_tid()*, which would force a
   lookup to find a new tid, and as in the first thread we have removed the
   file, the max tid could be lower (in particular, it could be 0). This is
   why we only test for equality.
 - Unlink after *free_tid()* is not desirable: in that case, it'd be normal
   for the tid to increment even if we have only one thread writing. It
   overflows quite easily.
 - The fact that new tids are always bigger than the current max is not only
   because the code is cleaner and faster: that way when recovering we know
   the order to apply transactions. A nice catch: this doesn't matter if we're
   working with non-overlapping transactions, but if they overlap, we know
   that it's impossible that transaction A and B (B gets committed after A)
   get applied in the wrong order, because B will only begin to commit *after*
   A has been worked on.

libjio/libjio/Makefile  view on Meta::CPAN

MANDATORY_LDFLAGS := $(shell getconf LFS_LIBS 2>/dev/null)

ALL_CFLAGS += $(CFLAGS) $(MANDATORY_CFLAGS) -fPIC
ALL_LDFLAGS += $(LDFLAGS) $(MANDATORY_LDFLAGS) -fPIC


# some platforms do not have librt, we only use it if available
NEED_LIBRT := $(shell ld -o rtcheck.so -shared -lrt 2>/dev/null && echo -lrt; \
	rm -f rtcheck.so)

LIBS = -lpthread $(NEED_LIBRT)


# shorthands for common build variants

ifdef DEBUG
ALL_CFLAGS += -g
ALL_LDFLAGS += -g
endif

ifdef PROFILE

libjio/libjio/ansi.c  view on Meta::CPAN

 * ANSI C API wrappers
 */

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>

#include "libjio.h"
#include "common.h"
#include "trans.h"


/*
 * To avoid completely useless code duplication, these functions rely on the
 * UNIX wrappers from unix.c.
 *

libjio/libjio/ansi.c  view on Meta::CPAN

int jfeof(struct jfs *stream)
{
	/* ANSI streams keep an internal end-of-file flag that is set by the
	 * I/O calls and queried by feof(). No such flag is kept here;
	 * instead the current file offset is compared against the offset of
	 * the end of the file. Returns 1 at (or past) the end, 0 otherwise. */
	off_t here, last;
	int at_end;

	pthread_mutex_lock(&(stream->lock));

	/* remember where we are, find the end, then go back */
	here = lseek(jfileno(stream), 0, SEEK_CUR);
	last = lseek(jfileno(stream), 0, SEEK_END);
	lseek(jfileno(stream), here, SEEK_SET);

	pthread_mutex_unlock(&(stream->lock));

	at_end = (here >= last) ? 1 : 0;
	return at_end;
}

/* clearerr() wrapper */
void jclearerr(struct jfs *stream)
{

libjio/libjio/ansi.c  view on Meta::CPAN

	/* The same as the above; however not returning this might have some
	 * side effects on very subtle programs relying on this behaviour */
	return 0;
}

/* fseek() wrapper.
 *
 * Repositions the file offset of stream->fd with lseek() while holding
 * stream->lock. Returns 0 on success and -1 on error, like fseek(). */
int jfseek(struct jfs *stream, long offset, int whence)
{
	off_t pos;

	pthread_mutex_lock(&(stream->lock));
	pos = lseek(stream->fd, offset, whence);
	pthread_mutex_unlock(&(stream->lock));

	/* fseek returns 0 on success, -1 on error */
	if (pos == (off_t) -1)
		return -1;

	return 0;
}

/* ftell() wrapper */
long jftell(struct jfs *stream)

libjio/libjio/autosync.c  view on Meta::CPAN


/*
 * Autosync API
 */

#include <pthread.h>	/* pthread_* */
#include <errno.h>	/* ETIMEDOUT */
#include <signal.h>	/* sig_atomic_t */
#include <stdlib.h>	/* malloc() and friends */
#include <time.h>	/* clock_gettime() */

#include "common.h"
#include "libjio.h"
#include "compat.h"


/** Configuration of an autosync thread */
struct autosync_cfg {
	/** File structure to jsync() */
	struct jfs *fs;

	/** Thread id, filled in by pthread_create() in
	 * jfs_autosync_start() */
	pthread_t tid;

	/** Max number of seconds between each jsync(); the thread uses it
	 * as the timeout of its pthread_cond_timedwait() */
	time_t max_sec;

	/** Max number of bytes written between each jsync(); compared
	 * against fs->ltrans_len */
	size_t max_bytes;

	/** When the thread must die, we set this to 1 (done in
	 * jfs_autosync_stop()) */
	sig_atomic_t must_die;

	/** Condition variable to wake up the thread; signalled from
	 * autosync_check() and jfs_autosync_stop() */
	pthread_cond_t cond;

	/** Mutex to use for the condition variable; the thread keeps it
	 * locked except while it is waiting on cond */
	pthread_mutex_t mutex;
};

/** Thread that performs the automatic syncing.
 *
 * arg is the struct autosync_cfg of the file. The thread sleeps on cfg->cond
 * for at most cfg->max_sec seconds at a time and calls jsync() when the wait
 * times out or when it is woken up with at least cfg->max_bytes of lingering
 * data pending. It finishes when cfg->must_die is set or when the wait fails
 * with an unexpected error.
 *
 * The thread's exit value is used as a boolean: non-NULL if any jsync() call
 * failed. */
static void *autosync_thread(void *arg)
{
	int rv;
	void *had_errors;
	struct timespec ts;
	struct autosync_cfg *cfg;

	cfg = (struct autosync_cfg *) arg;

	/* had_errors is a void * just to avoid weird casts, since we want to
	 * return it, but it's used as a boolean */
	had_errors = (void *) 0;

	pthread_mutex_lock(&cfg->mutex);

	/* must_die is also checked before each wait: a stop request that
	 * arrives while we are not waiting has nobody to wake up, and without
	 * this check it would only be noticed after a full max_sec timeout */
	while (!cfg->must_die) {
		clock_gettime(CLOCK_REALTIME, &ts);
		ts.tv_sec += cfg->max_sec;

		rv = pthread_cond_timedwait(&cfg->cond, &cfg->mutex, &ts);
		if (rv != 0 && rv != ETIMEDOUT)
			break;

		if (cfg->must_die)
			break;

		/* cover from spurious wakeups */
		if (rv != ETIMEDOUT && cfg->fs->ltrans_len < cfg->max_bytes)
			continue;

		rv = jsync(cfg->fs);
		if (rv != 0)
			had_errors = (void *) 1;
	}
	pthread_mutex_unlock(&cfg->mutex);

	/* returning from the start routine is equivalent to calling
	 * pthread_exit() with the same value */
	return had_errors;
}

/* Starts the autosync thread, which will perform a jsync() every max_sec
 * seconds, or every max_bytes written using lingering transactions.
 *
 * Returns 0 on success and -1 on error (a thread is already configured for
 * fs, out of memory, or the thread/synchronization objects could not be
 * created; in the last case errno holds the pthread error code). On error
 * nothing is left allocated and fs->as_cfg stays NULL, so the call can be
 * retried. */
int jfs_autosync_start(struct jfs *fs, time_t max_sec, size_t max_bytes)
{
	int rv;
	struct autosync_cfg *cfg;

	if (fs->as_cfg != NULL)
		return -1;

	cfg = malloc(sizeof(*cfg));
	if (cfg == NULL)
		return -1;

	cfg->fs = fs;
	cfg->max_sec = max_sec;
	cfg->max_bytes = max_bytes;
	cfg->must_die = 0;

	rv = pthread_cond_init(&cfg->cond, NULL);
	if (rv != 0)
		goto error_free;

	rv = pthread_mutex_init(&cfg->mutex, NULL);
	if (rv != 0)
		goto error_cond;

	fs->as_cfg = cfg;

	rv = pthread_create(&cfg->tid, NULL, &autosync_thread, cfg);
	if (rv != 0)
		goto error_mutex;

	return 0;

error_mutex:
	/* no thread is running: do not leave a configuration behind that
	 * jfs_autosync_stop() would later try to join */
	fs->as_cfg = NULL;
	pthread_mutex_destroy(&cfg->mutex);
error_cond:
	pthread_cond_destroy(&cfg->cond);
error_free:
	free(cfg);

	/* pthread functions return the error code instead of setting errno */
	errno = rv;
	return -1;
}

/* Stops the autosync thread started by jfs_autosync_start(). It's
 * automatically called in jclose(). */
int jfs_autosync_stop(struct jfs *fs)
{
	int rv = 0;
	void *had_errors;

	if (fs->as_cfg == NULL)
		return 0;

	fs->as_cfg->must_die = 1;
	pthread_cond_signal(&fs->as_cfg->cond);
	pthread_join(fs->as_cfg->tid, &had_errors);

	if (had_errors)
		rv = -1;

	pthread_cond_destroy(&fs->as_cfg->cond);
	pthread_mutex_destroy(&fs->as_cfg->mutex);
	free(fs->as_cfg);
	fs->as_cfg = NULL;

	return rv;
}

/** Notify the autosync thread that it should check the number of bytes
 * written. Must be called with fs' ltlock held. */
void autosync_check(struct jfs *fs)
{
	if (fs->as_cfg == NULL)
		return;

	if (fs->ltrans_len > fs->as_cfg->max_bytes)
		pthread_cond_signal(&fs->as_cfg->cond);
}

libjio/libjio/check.c  view on Meta::CPAN

		if (map != NULL)
			munmap(map, filelen);

		while (curts->op != NULL) {
			tmpop = curts->op->next;
			if (curts->op->pdata)
				free(curts->op->pdata);
			free(curts->op);
			curts->op = tmpop;
		}
		pthread_mutex_destroy(&(curts->lock));
		free(curts);

		res->total++;
	}

	if (flags & J_CLEANUP) {
		if (jfsck_cleanup(name, fs.jdir) < 0) {
			ret = J_ECLEANUP;
		}
	}

libjio/libjio/common.h  view on Meta::CPAN

/*
 * Header for internal functions and definitions
 */

#ifndef _COMMON_H
#define _COMMON_H

#include <sys/types.h>	/* for ssize_t and off_t */
#include <stdint.h>	/* for uint*_t */
#include <sys/uio.h>	/* for struct iovec */
#include <pthread.h>	/* pthread_mutex_t */

#include "fiu-local.h"	/* for fault injection functions */

#define _F_READ		0x00001
#define _F_WRITE	0x00010
#define _F_LOCK		0x00100
#define _F_TLOCK	0x01000
#define _F_ULOCK	0x10000

#define F_LOCKR		(_F_LOCK | _F_READ)

libjio/libjio/common.h  view on Meta::CPAN

	/** Flags passed to the real open() */
	uint32_t open_flags;

	/** Lingering transactions (linked list) */
	struct jlinger *ltrans;

	/** Length of all the lingered transactions */
	size_t ltrans_len;

	/** Lingering transactions' lock */
	pthread_mutex_t ltlock;

	/** A soft lock used in some operations */
	pthread_mutex_t lock;

	/** Autosync config */
	struct autosync_cfg *as_cfg;
};


off_t plockf(int fd, int cmd, off_t offset, off_t len);
ssize_t spread(int fd, void *buf, size_t count, off_t offset);
ssize_t spwrite(int fd, const void *buf, size_t count, off_t offset);
ssize_t swritev(int fd, struct iovec *iov, int iovcnt);

libjio/libjio/libjio.3  view on Meta::CPAN

.I jfs_t
instead of a file descriptor; take a look at their manpages if you have any
doubts about how to use them.

.B jmove_journal()
can be used to move the journal directory to a new location. It can be called
only when nobody else is using the file. It is usually not used, except for
very special cases.

.B jfs_autosync_start()
can be used to start a thread which will automatically perform a
.B jsync()
after the given number of seconds or the given number of bytes written using
lingering transactions (whichever comes first). It's very useful when using
lingering transactions.
.B jfs_autosync_stop()
stops the thread started by
.BR jfs_autosync_start() .
The thread is also stopped automatically when
.B jclose()
is called.

.B jfsck()
takes as the first two parameters the path to the file to check and the path
to the journal directory (usually NULL for the default, unless you've changed
it manually using
.BR jmove_journal() ),
and optionally a flags parameter, which can be 0 for the default behaviour, or
J_CLEANUP to indicate that the journal should be cleaned up after successful

libjio/libjio/libjio.h  view on Meta::CPAN

 * Alberto Bertogli (albertito@blitiri.com.ar)
 */

#ifndef _LIBJIO_H
#define _LIBJIO_H

#include <stdint.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <pthread.h>
#include <unistd.h>

/* Check if we're using Large File Support - otherwise refuse to build.
 * Otherwise, we would allow applications not using LFS to link with the
 * library (which uses LFS) and that's just begging for problems. There should
 * be a portable way for the C library to do some of this for us, but until I
 * find one, this is the best we can do */
#ifndef _LARGEFILE_SOURCE
#error "You must compile your application with Large File Support"
#endif

libjio/libjio/libjio.h  view on Meta::CPAN

 * @see jclose(), open()
 * @ingroup basic
 */
jfs_t *jopen(const char *name, int flags, int mode, unsigned int jflags);

/** Close a file opened with jopen().
 *
 * After a call to this function, the memory allocated for the open file will
 * be freed.
 *
 * If there was an autosync thread started for this file, it will be stopped.
 *
 * @param fs open file
 * @returns 0 on success, -1 on error
 * @see jopen(), jfs_autosync_start()
 * @ingroup basic
 */
int jclose(jfs_t *fs);

/** Sync a file. Makes sense only when using lingering transactions.
 *

libjio/libjio/libjio.h  view on Meta::CPAN

/** Free a transaction structure.
 *
 * @param ts transaction to free
 * @see jtrans_new()
 * @ingroup basic
 */
void jtrans_free(jtrans_t *ts);

/** Change the location of the journal directory.
 *
 * The file MUST NOT be in use by any other thread or process. The older
 * journal directory will be removed.
 *
 * @param fs open file
 * @param newpath path to the new journal directory, which will be created if
 * 	it doesn't exist
 * @returns 0 on success, -1 on error
 * @ingroup basic
 */
int jmove_journal(jfs_t *fs, const char *newpath);


/*
 * Autosync
 */

/** Start an autosync thread.
 *
 * The thread will call jsync(fs) every max_sec seconds, or every time
 * max_bytes have been written. Only one autosync thread per open file is
 * allowed.
 *
 * @param fs open file
 * @param max_sec maximum number of seconds that should pass between each
 * 	call to jsync()
 * @param max_bytes maximum number of bytes that should be written between
 *	each call to jsync()
 * @returns 0 on success, -1 on error
 * @ingroup basic
 */
int jfs_autosync_start(jfs_t *fs, time_t max_sec, size_t max_bytes);

/** Stop an autosync thread that was started using jfs_autosync_start(fs).
 * 
 * @param fs open file
 * @returns 0 on success, -1 on error
 * @ingroup basic
 */
int jfs_autosync_stop(jfs_t *fs);


/*
 * Journal checker
 */

/** Check and repair the given path.
 *
 * The file MUST NOT be in use by any other thread or process. This
 * requirement will be lifted in future releases.
 *
 * @param name path to the file to check
 * @param jdir journal directory of the given file, use NULL for the default
 * @param res structure where to store the result
 * @param flags flags that change the checking behaviour, currently only
 *	J_CLEANUP is supported, which removes the journal directory after a
 *	successful recovery
 * @see struct jfsck_result
 * @returns 0 on success, < 0 on error, with the following possible negative

libjio/libjio/trans.c  view on Meta::CPAN

#include "trans.h"


/*
 * Transaction functions
 */

/* Allocate and initialize an empty transaction structure bound to fs.
 * The transaction flags are the file's flags combined with the ones given.
 * Returns NULL if the allocation fails. */
struct jtrans *jtrans_new(struct jfs *fs, unsigned int flags)
{
	pthread_mutexattr_t mattr;
	struct jtrans *trans = malloc(sizeof(*trans));

	if (!trans)
		return NULL;

	/* no operations yet */
	trans->op = NULL;
	trans->numops_r = 0;
	trans->numops_w = 0;
	trans->len_w = 0;

	trans->id = 0;
	trans->fs = fs;
	trans->flags = fs->flags | flags;

	/* plain (non-recursive, non error-checking) mutex */
	pthread_mutexattr_init(&mattr);
	pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_NORMAL);
	pthread_mutex_init(&trans->lock, &mattr);
	pthread_mutexattr_destroy(&mattr);

	return trans;
}

/* Free the contents of a transaction structure */
void jtrans_free(struct jtrans *ts)
{
	struct operation *tmpop;

	ts->fs = NULL;

libjio/libjio/trans.c  view on Meta::CPAN

		tmpop = ts->op->next;

		if (ts->op->buf && ts->op->direction == D_WRITE)
			free(ts->op->buf);
		if (ts->op->pdata)
			free(ts->op->pdata);
		free(ts->op);

		ts->op = tmpop;
	}
	pthread_mutex_destroy(&(ts->lock));

	free(ts);
}

/** Lock/unlock the ranges of the file covered by the transaction. mode must
 * be either F_LOCKW or F_UNLOCK. Returns 0 on success, -1 on error. */
static int lock_file_ranges(struct jtrans *ts, int mode)
{
	unsigned int nops;
	off_t lr, min_offset;

libjio/libjio/trans.c  view on Meta::CPAN

}

/** Common function to add an operation to a transaction */
static int jtrans_add_common(struct jtrans *ts, const void *buf, size_t count,
		off_t offset, enum op_direction direction)
{
	struct operation *op, *tmpop;

	op = tmpop = NULL;

	pthread_mutex_lock(&(ts->lock));

	/* Writes are not allowed in read-only mode, they fail early */
	if ((ts->flags & J_RDONLY) && direction == D_WRITE)
		goto error;

	if (count == 0)
		goto error;

	if ((long long) ts->len_w + count > MAX_TSIZE)
		goto error;

libjio/libjio/trans.c  view on Meta::CPAN

	if (ts->op == NULL) {
		ts->op = op;
		op->prev = NULL;
	} else {
		for (tmpop = ts->op; tmpop->next != NULL; tmpop = tmpop->next)
			;
		tmpop->next = op;
		op->prev = tmpop;
	}

	pthread_mutex_unlock(&(ts->lock));

	op->len = count;
	op->offset = offset;
	op->plen = 0;
	op->pdata = NULL;
	op->locked = 0;
	op->direction = direction;

	if (direction == D_WRITE) {
		memcpy(op->buf, buf, count);

libjio/libjio/trans.c  view on Meta::CPAN

		/* if there are no overlapping writes, jtrans_commit() will
		 * want to read the data from the disk; and if there are we
		 * will already have submitted a request and one more won't
		 * hurt */
		posix_fadvise(ts->fs->fd, offset, count, POSIX_FADV_WILLNEED);
	}

	return 0;

error:
	pthread_mutex_unlock(&(ts->lock));

	if (op && direction == D_WRITE)
		free(op->buf);
	free(op);

	return -1;
}

int jtrans_add_r(struct jtrans *ts, void *buf, size_t count, off_t offset)
{

libjio/libjio/trans.c  view on Meta::CPAN


/* Commit a transaction */
ssize_t jtrans_commit(struct jtrans *ts)
{
	ssize_t r, retval = -1;
	struct operation *op;
	struct jlinger *linger;
	jop_t *jop = NULL;
	size_t written = 0;

	pthread_mutex_lock(&(ts->lock));

	/* clear the flags */
	ts->flags = ts->flags & ~J_COMMITTED;
	ts->flags = ts->flags & ~J_ROLLBACKED;

	if (ts->numops_r + ts->numops_w == 0)
		goto exit;

	/* fail for read-only accesses if we have write operations */
	if (ts->numops_w && (ts->flags & J_RDONLY))

libjio/libjio/trans.c  view on Meta::CPAN

	if (jop && (ts->flags & J_LINGER)) {
		struct jlinger *lp;

		linger = malloc(sizeof(struct jlinger));
		if (linger == NULL)
			goto rollback_exit;

		linger->jop = jop;
		linger->next = NULL;

		pthread_mutex_lock(&(ts->fs->ltlock));

		/* add it to the end of the list so they're in order */
		if (ts->fs->ltrans == NULL) {
			ts->fs->ltrans = linger;
		} else {
			lp = ts->fs->ltrans;
			while (lp->next != NULL)
				lp = lp->next;
			lp->next = linger;
		}

		ts->fs->ltrans_len += written;
		autosync_check(ts->fs);

		pthread_mutex_unlock(&(ts->fs->ltlock));

		/* Leave the journal_free() up to jsync() */
		jop = NULL;
	} else if (jop) {
		if (have_sync_range) {
			for (op = ts->op; op != NULL; op = op->next) {
				if (op->direction == D_READ)
					continue;

				r = sync_range_wait(ts->fs->fd, op->len,

libjio/libjio/trans.c  view on Meta::CPAN

		jop = NULL;
	}

unlock_exit:
	/* always unlock everything at the end; otherwise we could have
	 * half-overlapping transactions applying simultaneously, and if
	 * anything goes wrong it would be possible to break consistency */
	lock_file_ranges(ts, F_UNLOCK);

exit:
	pthread_mutex_unlock(&(ts->lock));

	return retval;
}

/* Rollback a transaction */
ssize_t jtrans_rollback(struct jtrans *ts)
{
	ssize_t rv;
	struct jtrans *newts;
	struct operation *op, *curop, *lop;

libjio/libjio/trans.c  view on Meta::CPAN

 * Basic operations
 */

/* Open a file */
struct jfs *jopen(const char *name, int flags, int mode, unsigned int jflags)
{
	int jfd, rv;
	unsigned int t;
	char jdir[PATH_MAX], jlockfile[PATH_MAX];
	struct stat sinfo;
	pthread_mutexattr_t attr;
	struct jfs *fs;

	fs = malloc(sizeof(struct jfs));
	if (fs == NULL)
		return NULL;

	fs->fd = -1;
	fs->jfd = -1;
	fs->jdir = NULL;
	fs->jdirfd = -1;

libjio/libjio/trans.c  view on Meta::CPAN

	fs->open_flags = flags;
	fs->ltrans = NULL;
	fs->ltrans_len = 0;

	/* Note on fs->lock usage: this lock is used only to protect the file
	 * pointer. This means that it must only be held while performing
	 * operations that depend on or alter the file pointer (jread, jreadv,
	 * jwrite, jwritev), but the others (jpread, jpwrite) are left
	 * unprotected because they can be performed in parallel as long as
	 * they don't affect the same portion of the file (this is protected
	 * by lockf). The lock doesn't slow things down tho: any threaded app
	 * MUST implement this kind of locking anyways if it wants to prevent
	 * data corruption, we only make it easier for them by taking care of
	 * it here. If performance is essential, the jpread/jpwrite functions
	 * should be used, just as in real life.
	 * About fs->ltlock, it's used to protect the lingering transactions
	 * list, fs->ltrans. */
	pthread_mutexattr_init(&attr);
	pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
	pthread_mutex_init( &(fs->lock), &attr);
	pthread_mutex_init( &(fs->ltlock), &attr);
	pthread_mutexattr_destroy(&attr);

	fs->fd = open(name, flags, mode);
	if (fs->fd < 0)
		goto error_exit;

	/* nothing else to do for read-only access */
	if (jflags & J_RDONLY) {
		return fs;
	}

libjio/libjio/trans.c  view on Meta::CPAN

	if (fs->fd < 0)
		return -1;

	rv = fdatasync(fs->fd);
	if (rv != 0)
		return rv;

	/* note the jops will be in order, so if we crash or fail in the
	 * middle of this, there will be no problem applying the remaining
	 * transactions */
	pthread_mutex_lock(&(fs->ltlock));
	while (fs->ltrans != NULL) {
		fiu_exit_on("jio/jsync/pre_unlink");
		if (journal_free(fs->ltrans->jop, 1) != 0) {
			pthread_mutex_unlock(&(fs->ltlock));
			return -1;
		}

		ltmp = fs->ltrans->next;
		free(fs->ltrans);
		fs->ltrans = ltmp;
	}

	fs->ltrans_len = 0;
	pthread_mutex_unlock(&(fs->ltlock));
	return 0;
}

/* Change the location of the journal directory */
int jmove_journal(struct jfs *fs, const char *newpath)
{
	int ret;
	char *oldpath, jlockfile[PATH_MAX], oldjlockfile[PATH_MAX];

	/* we try to be sure that all lingering transactions have been

libjio/libjio/trans.c  view on Meta::CPAN

	}

	if (fs->fd < 0 || close(fs->fd))
		ret = -1;
	if (fs->name)
		/* allocated by strdup() in jopen() */
		free(fs->name);
	if (fs->jdir)
		free(fs->jdir);

	pthread_mutex_destroy(&(fs->lock));

	free(fs);

	return ret;
}

libjio/libjio/trans.h  view on Meta::CPAN

	/** Number of read operations in the list */
	unsigned int numops_r;

	/** Number of write operations in the list */
	unsigned int numops_w;

	/** Sum of the lengths of the write operations */
	size_t len_w;

	/** Lock that protects the list of operations */
	pthread_mutex_t lock;

	/** List of operations */
	struct operation *op;
};

/** Possible operation directions */
enum op_direction {
	D_READ = 1,
	D_WRITE = 2,
};

libjio/libjio/unix.c  view on Meta::CPAN

/*
 * read() family wrappers
 */

/* read() wrapper.
 *
 * Reads up to count bytes at the current file offset into buf, holding
 * fs->lock for the whole operation and a read lock (plockf) on the range
 * while reading. The file offset is advanced by the number of bytes read.
 * Returns what spread() returns, or -1 if the current offset cannot be
 * obtained. */
ssize_t jread(struct jfs *fs, void *buf, size_t count)
{
	ssize_t rv;
	off_t pos;

	pthread_mutex_lock(&(fs->lock));

	pos = lseek(fs->fd, 0, SEEK_CUR);
	if (pos < 0) {
		/* don't lock or read at a bogus offset */
		rv = -1;
		goto exit;
	}

	plockf(fs->fd, F_LOCKR, pos, count);
	rv = spread(fs->fd, buf, count, pos);
	plockf(fs->fd, F_UNLOCK, pos, count);

	/* spread() is given an explicit offset, so the file offset is
	 * moved here */
	if (rv > 0)
		lseek(fs->fd, rv, SEEK_CUR);

exit:
	pthread_mutex_unlock(&(fs->lock));

	return rv;
}

/* pread() wrapper */
ssize_t jpread(struct jfs *fs, void *buf, size_t count, off_t offset)
{
	ssize_t rv;

	plockf(fs->fd, F_LOCKR, offset, count);

libjio/libjio/unix.c  view on Meta::CPAN


	return rv;
}

/* readv() wrapper.
 *
 * Reads into the count buffers described by vector, starting at the current
 * file offset. fs->lock is held for the whole operation and the byte range
 * that can be read (the sum of all the iov_len) is read-locked with plockf()
 * while reading. Returns what readv() returns, or -1 if the current offset
 * cannot be obtained. */
ssize_t jreadv(struct jfs *fs, const struct iovec *vector, int count)
{
	int i;
	size_t sum;
	ssize_t rv;
	off_t pos;

	/* the range to lock is measured in bytes, not in number of
	 * buffers */
	sum = 0;
	for (i = 0; i < count; i++)
		sum += vector[i].iov_len;

	pthread_mutex_lock(&(fs->lock));
	pos = lseek(fs->fd, 0, SEEK_CUR);
	if (pos < 0) {
		rv = -1;
		goto exit;
	}

	plockf(fs->fd, F_LOCKR, pos, sum);
	rv = readv(fs->fd, vector, count);
	plockf(fs->fd, F_UNLOCK, pos, sum);

exit:
	/* every path must release the lock taken above */
	pthread_mutex_unlock(&(fs->lock));

	return rv;
}


/*
 * write() family wrappers
 */

/* write() wrapper.
 *
 * Writes count bytes from buf as a single transaction, at the end of the
 * file when it was opened with O_APPEND and at the current file offset
 * otherwise. fs->lock is held while the position is read, the transaction is
 * committed and the file offset is advanced. Returns count on success, or
 * the negative result of the failing step. */
ssize_t jwrite(struct jfs *fs, const void *buf, size_t count)
{
	struct jtrans *trans;
	ssize_t status;
	off_t start;
	int whence;

	trans = jtrans_new(fs, 0);
	if (trans == NULL)
		return -1;

	pthread_mutex_lock(&(fs->lock));

	whence = (fs->open_flags & O_APPEND) ? SEEK_END : SEEK_CUR;
	start = lseek(fs->fd, 0, whence);

	status = jtrans_add_w(trans, buf, count, start);
	if (status >= 0) {
		status = jtrans_commit(trans);

		/* move the file offset past the data just written */
		if (status >= 0)
			lseek(fs->fd, count, SEEK_CUR);
	}

	pthread_mutex_unlock(&(fs->lock));

	jtrans_free(trans);

	if (status < 0)
		return status;

	return count;
}

/* pwrite() wrapper */
ssize_t jpwrite(struct jfs *fs, const void *buf, size_t count, off_t offset)
{
	ssize_t rv;

libjio/libjio/unix.c  view on Meta::CPAN

	int i;
	size_t sum;
	ssize_t rv;
	off_t ipos, t;
	struct jtrans *ts;

	ts = jtrans_new(fs, 0);
	if (ts == NULL)
		return -1;

	pthread_mutex_lock(&(fs->lock));

	if (fs->open_flags & O_APPEND)
		ipos = lseek(fs->fd, 0, SEEK_END);
	else
		ipos = lseek(fs->fd, 0, SEEK_CUR);

	t = ipos;

	sum = 0;
	for (i = 0; i < count; i++) {

libjio/libjio/unix.c  view on Meta::CPAN

		sum += vector[i].iov_len;
		t += vector[i].iov_len;
	}

	rv = jtrans_commit(ts);

	if (rv >= 0)
		lseek(fs->fd, sum, SEEK_CUR);

exit:
	pthread_mutex_unlock(&(fs->lock));

	jtrans_free(ts);

	return (rv >= 0) ? sum : rv;
}

/* Truncate a file. Be careful with this */
int jtruncate(struct jfs *fs, off_t length)
{
	int rv;

libjio/libjio/unix.c  view on Meta::CPAN

	plockf(fs->fd, F_UNLOCK, length, 0);

	return rv;
}

/*
 * lseek() wrapper.
 *
 * Repositions the descriptor's offset while holding fs->lock, and hands back
 * whatever lseek() returned.
 */
off_t jlseek(struct jfs *fs, off_t offset, int whence)
{
	off_t newpos;
	pthread_mutex_t *guard = &(fs->lock);

	pthread_mutex_lock(guard);
	newpos = lseek(fs->fd, offset, whence);
	pthread_mutex_unlock(guard);

	return newpos;
}

libjio/tests/performance/performance.c  view on Meta::CPAN


/*
 * performance.c - A program to test speed of parallel writes using libjio.
 * Alberto Bertogli (albertito@blitiri.com.ar)
 *
 * It creates a big file, extends it using truncate, and forks N threads which
 * write the file in chunks (ie. if we have three threads, the first one
 * writes the first 1/3rd of the file, and so on).
 */

#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include <string.h>
#include <libjio.h>

#define FILENAME "test_file"

/* These are shared among threads, to make the code simpler */
/* File under test; main() opens it with jopen() and closes it with jclose() */
static jfs_t *fs;
/* First command-line argument: megabytes to write per thread */
static unsigned long mb;
/* Both in bytes: blocksize is the second argument times 1024, towrite is
 * mb * 1024 * 1024 */
static ssize_t blocksize, towrite;


/* Print the command-line usage summary on standard output */
static void help(void)
{
	static const char *const usage[] = {
		"Use: performance towrite blocksize nthreads\n",
		"\n",
		" - towrite: how many MB to write per thread\n",
		" - blocksize: size of blocks written, in KB\n",
		" - nthreads: number of threads to use\n",
	};
	size_t line;

	for (line = 0; line < sizeof(usage) / sizeof(usage[0]); line++)
		fputs(usage[line], stdout);
}

static void *worker(void *tno)
{
	void *buf;
	unsigned long tid;
	ssize_t work_done, rv;
	off_t localoffset;
	long secs, usecs;
	double seconds, mb_per_sec;

libjio/tests/performance/performance.c  view on Meta::CPAN


	printf("%lu %zd %zd %f %f\n", tid, mb, blocksize, seconds, mb_per_sec);

	free(buf);

	return NULL;
}

int main(int argc, char **argv)
{
	int nthreads;
	unsigned long i;
	pthread_t *threads;
	struct jfsck_result ckres;

	if (argc != 4) {
		help();
		return 1;
	}

	mb = atoi(argv[1]);
	blocksize = atoi(argv[2]) * 1024;
	nthreads = atoi(argv[3]);
	towrite = mb * 1024 * 1024;

	threads = malloc(sizeof(pthread_t) * nthreads);
	if (threads == NULL) {
		perror("malloc()");
		return 1;
	}

	fs = jopen(FILENAME, O_RDWR | O_CREAT | O_TRUNC, 0600, 0);
	if (fs == NULL) {
		perror("jopen()");
		return 1;
	}

	jtruncate(fs, towrite * nthreads);

	for (i = 0; i < nthreads; i++) {
		pthread_create(threads + i, NULL, &worker, (void *) i);
	}

	for (i = 0; i < nthreads; i++) {
		pthread_join(*(threads + i), NULL);
	}

	jclose(fs);
	jfsck(FILENAME, NULL, &ckres, 0);
	if (ckres.total != 0) {
		fprintf(stderr, "There were %d errors during the test\n",
				ckres.total);
		fprintf(stderr, "jfsck() was used to fix them, but that ");
		fprintf(stderr, "shouldn't happen.\n");
		return 1;

libjio/tests/performance/random.c  view on Meta::CPAN


/*
 * random.c - A program to test speed of random writes using libjio.
 * Alberto Bertogli (albertito@blitiri.com.ar)
 *
 * It creates a big file, extends it using truncate, and forks N threads which
 * write the file in chunks (ie. if we have three threads, the first one
 * writes the first 1/3rd of the file, and so on).
 */

#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include <string.h>
#include <libjio.h>

#define FILENAME "test_file"

/* These are shared among threads, to make the code simpler */
/* File under test; main() opens it with jopen() and closes it with jclose() */
static jfs_t *fs;
/* First command-line argument: megabytes to write per thread */
static unsigned long mb;
/* Both in bytes: blocksize is the second argument times 1024, towrite is
 * mb * 1024 * 1024 */
static ssize_t blocksize, towrite;


/* Print the command-line usage summary on standard output */
static void help(void)
{
	static const char *const usage[] = {
		"Use: performance towrite blocksize nthreads\n",
		"\n",
		" - towrite: how many MB to write per thread\n",
		" - blocksize: size of blocks written, in KB\n",
		" - nthreads: number of threads to use\n",
	};
	size_t line;

	for (line = 0; line < sizeof(usage) / sizeof(usage[0]); line++)
		fputs(usage[line], stdout);
}

static void *worker(void *tno)
{
	void *buf;
	unsigned long tid;
	ssize_t work_done, rv;
	off_t localoffset, offset;
	long secs, usecs;
	double seconds, mb_per_sec;

libjio/tests/performance/random.c  view on Meta::CPAN


	printf("%lu %zd %zd %f %f\n", tid, mb, blocksize, seconds, mb_per_sec);

	free(buf);

	return NULL;
}

int main(int argc, char **argv)
{
	int nthreads;
	unsigned long i;
	pthread_t *threads;
	struct jfsck_result ckres;

	if (argc != 4) {
		help();
		return 1;
	}

	mb = atoi(argv[1]);
	blocksize = atoi(argv[2]) * 1024;
	nthreads = atoi(argv[3]);
	towrite = mb * 1024 * 1024;

	threads = malloc(sizeof(pthread_t) * nthreads);
	if (threads == NULL) {
		perror("malloc()");
		return 1;
	}

	fs = jopen(FILENAME, O_RDWR | O_CREAT | O_TRUNC, 0600, 0);
	if (fs == NULL) {
		perror("jopen()");
		return 1;
	}

	jtruncate(fs, towrite * nthreads);

	for (i = 0; i < nthreads; i++) {
		pthread_create(threads + i, NULL, &worker, (void *) i);
	}

	for (i = 0; i < nthreads; i++) {
		pthread_join(*(threads + i), NULL);
	}

	jclose(fs);
	jfsck(FILENAME, NULL, &ckres, 0);
	if (ckres.total != 0) {
		fprintf(stderr, "There were %d errors during the test\n",
				ckres.total);
		fprintf(stderr, "jfsck() was used to fix them, but that ");
		fprintf(stderr, "shouldn't happen.\n");
		return 1;



( run in 0.474 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )