UnQLite
view release on metacpan or search on metacpan
unqlite/unqlite.c view on Meta::CPAN
/* Acquire a writer lock */
rc = pPage->pHash->pIo->xWrite(pPage->pRaw);
if( rc != UNQLITE_OK ){
return rc;
}
/* Offset of the first cell */
SyBigEndianPack16(zRaw,0);
zRaw += 2;
/* Offset of the first free block */
pHeader->iFree = L_HASH_PAGE_HDR_SZ;
SyBigEndianPack16(zRaw,L_HASH_PAGE_HDR_SZ);
zRaw += 2;
/* Slave page number */
SyBigEndianPack64(zRaw,0);
zRaw += 8;
/* Fill the free block */
SyBigEndianPack16(zRaw,0); /* Offset of the next free block */
zRaw += 2;
nByte = (sxu16)L_HASH_MX_FREE_SPACE(pPage->pHash->iPageSize);
SyBigEndianPack16(zRaw,nByte);
pPage->nFree = nByte;
/* Do not add this page to the hot dirty list */
pPage->pHash->pIo->xDontMkHot(pPage->pRaw);
return UNQLITE_OK;
}
/* Forward declaration */
static int lhSlaveStore(
lhpage *pPage,
const void *pKey,sxu32 nKeyLen,
const void *pData,unqlite_int64 nDataLen,
sxu32 nHash
);
/*
* Store a cell and its payload in a given page.
*/
static int lhStoreCell(
lhpage *pPage, /* Target page */
const void *pKey,sxu32 nKeyLen, /* Payload: Key */
const void *pData,unqlite_int64 nDataLen, /* Payload: Data */
sxu32 nHash, /* Hash of the key */
int auto_append /* Auto append a slave page if full */
)
{
lhash_kv_engine *pEngine = pPage->pHash;
int iNeedOvfl = 0; /* Need overflow page for this cell and its payload*/
lhcell *pCell;
sxu16 nOfft;
int rc;
/* Acquire a writer lock on this page first */
rc = pEngine->pIo->xWrite(pPage->pRaw);
if( rc != UNQLITE_OK ){
return rc;
}
/* Check for a free block */
rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ+nKeyLen+nDataLen,&nOfft);
if( rc != UNQLITE_OK ){
/* Check for a free block to hold a single cell only (without payload) */
rc = lhAllocateSpace(pPage,L_HASH_CELL_SZ,&nOfft);
if( rc != UNQLITE_OK ){
if( !auto_append ){
/* A split must be done */
return UNQLITE_FULL;
}else{
/* Store this record in a slave page */
rc = lhSlaveStore(pPage,pKey,nKeyLen,pData,nDataLen,nHash);
return rc;
}
}
iNeedOvfl = 1;
}
/* Allocate a new cell instance */
pCell = lhNewCell(pEngine,pPage);
if( pCell == 0 ){
pEngine->pIo->xErr(pEngine->pIo->pHandle,"KV store is running out of memory");
return UNQLITE_NOMEM;
}
/* Fill-in the structure */
pCell->iStart = nOfft;
pCell->nKey = nKeyLen;
pCell->nData = (sxu64)nDataLen;
pCell->nHash = nHash;
if( nKeyLen < 262144 /* 256 KB */ ){
/* Keep the key in-memory for fast lookup */
SyBlobAppend(&pCell->sKey,pKey,nKeyLen);
}
/* Link the cell */
rc = lhInstallCell(pCell);
if( rc != UNQLITE_OK ){
return rc;
}
/* Write the payload */
if( iNeedOvfl ){
rc = lhCellWriteOvflPayload(pCell,pKey,nKeyLen,pData,nDataLen,(const void *)0);
if( rc != UNQLITE_OK ){
lhCellDiscard(pCell);
return rc;
}
}else{
lhCellWriteLocalPayload(pCell,pKey,nKeyLen,pData,nDataLen);
}
/* Finally, Write the cell header */
lhCellWriteHeader(pCell);
/* All done */
return UNQLITE_OK;
}
/*
* Find a slave page capable of hosting the given amount.
*/
static int lhFindSlavePage(lhpage *pPage,sxu64 nAmount,sxu16 *pOfft,lhpage **ppSlave)
{
lhash_kv_engine *pEngine = pPage->pHash;
lhpage *pMaster = pPage->pMaster;
lhpage *pSlave = pMaster->pSlave;
unqlite_page *pRaw;
lhpage *pNew;
sxu16 iOfft;
sxi32 i;
int rc;
/* Look for an already attached slave page */
for( i = 0 ; i < pMaster->iSlave ; ++i ){
/* Find a free chunk big enough */
( run in 2.918 seconds using v1.01-cache-2.11-cpan-71847e10f99 )