Cache-FastMmap

 view release on metacpan or  search on metacpan

mmap_cache.c  view on Meta::CPAN

      MU64 p_offset = (MU64)i * cache->c_page_size;
      mmc_lock_page(cache, p_offset);
      _mmc_init_page(cache, i);
      mmc_unlock_page(cache, p_offset);
    }

    /* Unmap and re-map to stop gtop telling us our memory usage is up */
    if ( mmc_unmap_memory(cache) == -1) return -1;
    if ( mmc_map_memory(cache) == -1) return -1;
  }

  /* Test pages in file if asked */
  if (cache->test_file) {
    for (i = 0; i < cache->c_num_pages; i++) {
      int bad_page = 0;
      MU64 p_offset = (MU64)i * cache->c_page_size;

      /* Need to lock page, which tests header structure */
      if (mmc_lock(cache, i)) {
        /* If that failed, assume bad header, so manually lock */
        mmc_lock_page(cache, p_offset);
        bad_page = 1;

      /* If lock succeeded, test page structure */
      } else {
        if (!_mmc_test_page(cache))
          bad_page = 1;
      }

      /* A bad page, initialise it */
      if (bad_page) {
        _mmc_init_page(cache, i);
        /* Rerun test on this page, potential infinite
           loop if init_page is broken, but then things
           are really broken anyway */
        i--;
      }

      mmc_unlock_page(cache, p_offset);
      cache->p_cur = NOPAGE;
    }
  }

  return 0;
}

/*
 * int mmc_close(mmap_cache * cache)
 *
 * Close the given cache, unmmap'ing any memory and closing file
 * descriptors. 
 * 
*/
int mmc_close(mmap_cache *cache) {
  int res;

  /* Shouldn't call if not init'ed */
  ASSERT(cache->fh);
  ASSERT(cache->mm_var);

  /* Shouldn't call if page still locked */
  ASSERT(cache->p_cur == NOPAGE);

  /* Unlock any locked page (recovery path for non-debug builds,
     so the fcntl lock isn't left held on the shared file) */
  if (cache->p_cur != NOPAGE) {
    mmc_unlock(cache);
  }

  /* Close file */
  if (cache->fh) {
    mmc_close_fh(cache);
  }

  /* Unmap memory */
  if (cache->mm_var) {
    res = mmc_unmap_memory(cache);
    if (res == -1) {
      /* It was the unmap that failed, not the map. NOTE(review):
         cache is deliberately not freed here so the caller can still
         read the error via mmc_error(); the struct leaks on this
         path — confirm callers treat this as fatal */
      return _mmc_set_error(cache, errno, "Unmap of shared file %s failed", cache->share_file);
    }
  }

  free(cache);

  return 0;
}

/* Return the most recent error message, or a generic fallback if
   no error has been recorded on this cache handle */
char * mmc_error(mmap_cache * cache) {
  return cache->last_error ? cache->last_error : "Unknown error";
}

/*
 * mmc_lock(
 *   mmap_cache * cache, MU32 p_cur
 * )
 *
 * Lock the given page number using fcntl locking. Setup
 * cache->p_* fields with correct values for the given page
 *
*/
int mmc_lock(mmap_cache * cache, MU32 p_cur) {
  MU64 p_offset;
  void * p_ptr;
  int res = 0;

  /* Argument sanity check: valid page numbers are 0 .. c_num_pages-1,
     so p_cur == c_num_pages is out of range too (was '>', an
     off-by-one that let a read run one page past the mapping) */
  if (p_cur == NOPAGE || p_cur >= cache->c_num_pages)
    return _mmc_set_error(cache, 0, "page %u is NOPAGE or larger than number of pages", p_cur);

  /* Check not already locked - only one page may be held at a time */
  if (cache->p_cur != NOPAGE)
    return _mmc_set_error(cache, 0, "page %u is already locked, can't lock multiple pages", cache->p_cur);

  /* Setup page details */
  p_offset = (MU64)p_cur * cache->c_page_size;
  p_ptr = PTR_ADD(cache->mm_var, p_offset);

  res = mmc_lock_page(cache, p_offset);
  if (res) return res;

  /* Page must begin with the magic marker, otherwise it's corrupt */
  if (!(P_Magic(p_ptr) == 0x92f7e3b1)) {
    mmc_unlock_page(cache, p_offset);
    return _mmc_set_error(cache, 0, "magic page start marker not found. p_cur is %u, offset is %llu", p_cur, p_offset);
  }

  /* Copy header fields to cache structure */
  cache->p_num_slots = P_NumSlots(p_ptr);
  cache->p_free_slots = P_FreeSlots(p_ptr);
  cache->p_old_slots = P_OldSlots(p_ptr);
  cache->p_free_data = P_FreeData(p_ptr);
  cache->p_free_bytes = P_FreeBytes(p_ptr);
  cache->p_n_reads = P_NReads(p_ptr);
  cache->p_n_read_hits = P_NReadHits(p_ptr);

  /* Reality check: reject corrupt headers in non-debug builds too.
     Bounds mirror the ASSERTs below (num_slots must be strictly
     less than the page size; the unsigned < 0 test was vacuous) */
  if (cache->p_num_slots < 89 || cache->p_num_slots >= cache->c_page_size)
    res = _mmc_set_error(cache, 0, "cache num_slots mismatch");
  else if (cache->p_free_slots > cache->p_num_slots)
    res = _mmc_set_error(cache, 0, "cache free slots mismatch");
  else if (cache->p_old_slots > cache->p_free_slots)
    res = _mmc_set_error(cache, 0, "cache old slots mismatch");
  else if (cache->p_free_data + cache->p_free_bytes != cache->c_page_size)
    res = _mmc_set_error(cache, 0, "cache free data mismatch");
  if (res) {
    mmc_unlock_page(cache, p_offset);
    return res;
  }

  /* Check page header */
  ASSERT(P_Magic(p_ptr) == 0x92f7e3b1);
  ASSERT(P_NumSlots(p_ptr) >= 89 && P_NumSlots(p_ptr) < cache->c_page_size);
  ASSERT(P_FreeSlots(p_ptr) <= P_NumSlots(p_ptr));
  ASSERT(P_OldSlots(p_ptr) <= P_FreeSlots(p_ptr));
  ASSERT(P_FreeData(p_ptr) + P_FreeBytes(p_ptr) == cache->c_page_size);

  /* Setup page pointers */
  cache->p_cur = p_cur;
  cache->p_offset = p_offset;
  cache->p_base = p_ptr;
  cache->p_base_slots = PTR_ADD(p_ptr, P_HEADERSIZE);

  ASSERT(_mmc_test_page(cache));

  return 0;
}

/*
 * mmc_unlock(
 *   mmap_cache * cache
 * )
 *
 * Unlock any currently locked page
 *
*/
int mmc_unlock(mmap_cache * cache) {
  void * page = cache->p_base;

  ASSERT(cache->p_cur != NOPAGE);

  /* Flush any modified header fields back into the mmap'ed page
     before we drop the lock, then clear the dirty flag */
  if (cache->p_changed) {
    P_NumSlots(page) = cache->p_num_slots;
    P_FreeSlots(page) = cache->p_free_slots;
    P_OldSlots(page) = cache->p_old_slots;
    P_FreeData(page) = cache->p_free_data;
    P_FreeBytes(page) = cache->p_free_bytes;
    P_NReads(page) = cache->p_n_reads;
    P_NReadHits(page) = cache->p_n_read_hits;

    cache->p_changed = 0;
  }

  /* Sanity-check page structure before releasing the lock */
  ASSERT(_mmc_test_page(cache));

  mmc_unlock_page(cache, cache->p_offset);

  /* No page is held any more */
  cache->p_cur = NOPAGE;

  return 0;
}

/*
 * mmc_is_locked(
 *   mmap_cache * cache
 * )
 *
 * Return true if page is locked
 *
*/
int mmc_is_locked(mmap_cache * cache) {
  /* A page is held exactly when p_cur carries a real page number */
  if (cache->p_cur == NOPAGE)
    return 0;
  return 1;
}

/*
 * int mmc_hash(
 *   cache_mmap * cache,
 *   void *key_ptr, int key_len,
 *   MU32 *hash_page, MU32 *hash_slot
 * )
 *
 * Hashes the given key, and returns hash value, hash page and hash
 * slot part
 *
*/
int mmc_hash(
  mmap_cache *cache,
  void *key_ptr, int key_len,
  MU32 *hash_page, MU32 *hash_slot
) {
  /* Rotate-and-add hash over the key bytes, seeded with the same
     magic constant used as the page marker */
  MU32 h = 0x92f7e3b1;
  unsigned char * key = (unsigned char *)key_ptr;
  int i;

  for (i = 0; i < key_len; i++) {
    h = (h << 4) + (h >> 28) + key[i];
  }

  /* Split the hash into a page selector and a within-page slot part */
  *hash_page = h % cache->c_num_pages;
  *hash_slot = h / cache->c_num_pages;

  return 0;
}

/*
 * int mmc_read(
 *   cache_mmap * cache, MU32 hash_slot,
 *   void *key_ptr, int key_len,
 *   void **val_ptr, int *val_len,
 *   MU32 *expire_on, MU32 *flags
 * )
 *
 * Read key from current page
 *
*/
int mmc_read(
  mmap_cache *cache, MU32 hash_slot,
  void *key_ptr, int key_len,
  void **val_ptr, int *val_len,
  MU32 *expire_on_p, MU32 *flags_p
) {
  MU32 * slot_ptr;

  /* Increase read count for page */
  if (cache->enable_stats) {
    cache->p_changed = 1;
    cache->p_n_reads++;
  }

  /* Search slots for key */

mmap_cache.c  view on Meta::CPAN

#ifdef DEBUG
    /* Check hash actually matches stored value */
    {
      MU32 hash_page_dummy, hash_slot;
      mmc_hash(cache, S_KeyPtr(old_base_det), S_KeyLen(old_base_det), &hash_page_dummy, &hash_slot);

      ASSERT(hash_slot == S_SlotHash(old_base_det));
    }
#endif

    /* Find free slot */
    new_slot_ptr = new_slot_data + slot;
    while (*new_slot_ptr) {
      if (++slot >= new_num_slots) { slot = 0; }
      new_slot_ptr = new_slot_data + slot;
    }

    /* Copy slot and KV data */
    kvlen = S_SlotLen(old_base_det);
    memcpy(PTR_ADD(new_kv_data, new_offset), old_base_det, kvlen);

    /* Store slot data and mark as used */
    *new_slot_ptr = new_offset + new_num_slots * 4 + P_HEADERSIZE;

    ROUNDLEN(kvlen);
    new_offset += kvlen;
  }

  ASSERT(new_offset <= page_data_size);

/*  printf("page=%d\n", cache->p_cur);
  printf("old_slots=%d, new_slots=%d\n", old_num_slots, new_num_slots);
  printf("old_used_slots=%d, new_used_slots=%d\n", old_used_slots, new_used_slots);*/

  /* Store back into mmap'ed file space */
  memcpy(base_slots, new_slot_data, slot_data_size);
  memcpy(base_slots + new_num_slots, new_kv_data, new_offset);

  cache->p_num_slots = new_num_slots;
  cache->p_free_slots = new_num_slots - new_used_slots;
  cache->p_old_slots = 0;
  cache->p_free_data = new_offset + new_num_slots * 4 + P_HEADERSIZE;
  cache->p_free_bytes = page_data_size - new_offset;

  /* Make sure changes are saved back to mmap'ed file */
  cache->p_changed = 1;

  /* Free allocated memory */
  free(new_kv_data);
  free(new_slot_data);
  free(to_expunge);

  ASSERT(_mmc_test_page(cache));

  return 1;
}

/*
 * void mmc_get_page_details(mmap_cache * cache, MU32 * n_reads, MU32 * n_read_hits)
 *
 * Return details about the current locked page. Currently just
 * number of reads and number of reads that hit
 *
*/
void mmc_get_page_details(mmap_cache * cache, MU32 * n_reads, MU32 * n_read_hits) {
  /* Copy out the read statistics of the currently locked page */
  *n_reads = cache->p_n_reads;
  *n_read_hits = cache->p_n_read_hits;
}

/*
 * void mmc_reset_page_details(mmap_cache * cache)
 *
 * Reset any page details (currently just read hits)
 *
*/
void mmc_reset_page_details(mmap_cache * cache) {
  /* Zero the per-page read counters and mark the header dirty so
     mmc_unlock() writes the cleared values back to the mmap'ed page */
  cache->p_n_reads = 0;
  cache->p_n_read_hits = 0;
  cache->p_changed = 1;
}

/*
 * mmap_cache_it * mmc_iterate_new(mmap_cache * cache)
 *
 * Setup a new iterator to iterate over stored items
 * in the cache
 *
*/
mmap_cache_it * mmc_iterate_new(mmap_cache * cache) {
  /* calloc zeroes slot_ptr/slot_ptr_end so the first
     mmc_iterate_next() call starts at page 0 */
  mmap_cache_it * it = calloc(1, sizeof *it);

  /* Guard against allocation failure instead of dereferencing NULL */
  if (!it)
    return NULL;

  it->cache = cache;
  it->p_cur = NOPAGE;

  return it;
}

/*
 * MU32 * mmc_iterate_next(mmap_cache_it * it)
 *
 * Move iterator to next item in the cache and return
 * pointer to details (0 if there is no next).
 *
 * You can retrieve details with mmc_get_details(...)
 *
*/
MU32 * mmc_iterate_next(mmap_cache_it * it) {
  mmap_cache * cache = it->cache;
  MU32 * slot_ptr = it->slot_ptr;
  MU32 * base_det;
  MU32 expire_on;
  /* Snapshot "now" once so expiry tests are consistent across the
     scan; time_override lets tests fake the clock */
  MU32 now = time_override ? time_override : (MU32)time(0);

  /* Go until we find a slot or exit */
  while (1) {

    /* End of page ... (also true on the very first call, where a
       calloc'ed iterator has slot_ptr == slot_ptr_end == 0) */
    if (slot_ptr == it->slot_ptr_end) {

      if (it->p_cur == NOPAGE) {
        it->p_cur = 0;

      /* Unlock current page if any */
      } else {
        mmc_unlock(it->cache);

        /* Move to the next page, return 0 if no more pages */
        if (++it->p_cur == cache->c_num_pages) {
          it->p_cur = NOPAGE;
          it->slot_ptr = 0;
          return 0;
        }
      }

      /* Lock the new page number.
         NOTE(review): mmc_lock's return value is ignored; if it fails
         (e.g. corrupt page header) p_base/p_num_slots below are stale
         from the previous page — confirm whether that's acceptable */
      mmc_lock(it->cache, it->p_cur);

      /* Setup new pointers from the freshly locked page */
      slot_ptr = cache->p_base_slots;
      it->slot_ptr_end = slot_ptr + cache->p_num_slots;

      /* Check again */
      continue;
    }

    /* Slot not used (0 = empty, 1 = deleted) */
    if (*slot_ptr <= 1) {
      slot_ptr++;
      continue;
    }

    /* Get pointer to details for this entry */
    base_det = S_Ptr(cache->p_base, *slot_ptr);

    /* Slot expired (expire_on == 0 means never expires) */
    expire_on = S_ExpireOn(base_det);
    if (expire_on && now >= expire_on) {
      slot_ptr++;
      continue;
    }

    break;
  }

  /* Move to the next slot for next iteration */
  it->slot_ptr = ++slot_ptr;

  /* Return that we found the next item; the page containing it is
     left locked so the pointer stays valid until the next call */
  return base_det;
}

/*
 * void mmc_iterate_close(mmap_cache_it * it)
 *
 * Finish and dispose of iterator memory
 *
*/
void mmc_iterate_close(mmap_cache_it * it) {
  /* If iteration stopped part-way through, a page is still locked;
     release it before disposing of the iterator */
  if (it->p_cur != NOPAGE)
    mmc_unlock(it->cache);

  free(it);
}

/*
 * void mmc_get_details(
 *   mmap_cache * cache,
 *   MU32 * base_det,
 *   void ** key_ptr, int * key_len,
 *   void ** val_ptr, int * val_len,
 *   MU32 * last_access, MU32 * expire_on, MU32 * flags
 * )
 *
 * Given a base_det pointer to entries details
 * (as returned by mmc_iterate_next(...) and
 * mmc_calc_expunge(...)) return details of that
 * entry in the cache
 *
*/
void mmc_get_details(
  mmap_cache * cache,
  MU32 * base_det,
  void ** key_ptr, int * key_len,
  void ** val_ptr, int * val_len,
  MU32 * last_access, MU32 * expire_on, MU32 * flags
) {
  /* cache is unused; kept in the signature for interface symmetry */
  (void)cache;

  /* Key and value pointers reference memory inside the mmap'ed page;
     presumably only valid while that page stays locked — see
     mmc_iterate_next */
  *key_ptr = S_KeyPtr(base_det);
  *key_len = S_KeyLen(base_det);

  *val_ptr = S_ValPtr(base_det);
  *val_len = S_ValLen(base_det);

  *last_access = S_LastAccess(base_det);
  *expire_on = S_ExpireOn(base_det);
  *flags = S_Flags(base_det);
}


/*
 * _mmc_delete_slot(
 *   mmap_cache * cache, MU32 * slot_ptr
 * )
 *
 * Delete details from the given slot
 *
*/
void _mmc_delete_slot(
  mmap_cache * cache, MU32 * slot_ptr
) {
  ASSERT(*slot_ptr > 1);
  ASSERT(cache->p_cur != NOPAGE);

  /* Set offset to 1 */
  *slot_ptr = 1;

mmap_cache.c  view on Meta::CPAN

  slots_left = cache->p_num_slots;
  slots_end = cache->p_base_slots + slots_left;

  ASSERT(cache->p_cur != NOPAGE);

  /* Loop with integer probing till we find or don't */
  while (slots_left--) {
    MU32 data_offset = *slot_ptr;
    ASSERT(data_offset == 0 || data_offset == 1 ||
        ((data_offset >= P_HEADERSIZE + cache->p_num_slots*4) &&
         (data_offset < cache->c_page_size) &&
         ((data_offset & 3) == 0)));

    /* data_offset == 0 means empty slot, and no more beyond */
    /* data_offset == 1 means deleted slot, we can reuse if writing */
    if (data_offset == 0) {
      /* Return pointer to last checked slot */
      break;
    }
    if (data_offset == 1 && mode == 1 && 0 == first_deleted) {
      /* Save pointer to first usable slot; if we don't find the key later,
         we'll fall back to returning this.
      */
      first_deleted = slot_ptr;
    }
    /* deleted slot, keep looking */
    if (data_offset == 1) {

    } else {
      /* Offset is from start of data area */
      MU32 * base_det = S_Ptr(cache->p_base, data_offset);

      /* Two longs are key len and data len */
      MU32 fkey_len = S_KeyLen(base_det);

      /* Key matches? */
      if (fkey_len == (MU32)key_len && !memcmp(key_ptr, S_KeyPtr(base_det), key_len)) {

        /* Yep, found it! */
        return slot_ptr;
      }
    }

    /* Linear probe and wrap at end of slot data... */
    if (++slot_ptr == slots_end) { slot_ptr = cache->p_base_slots; }
    ASSERT(slot_ptr >= cache->p_base_slots && slot_ptr < slots_end);
  }
  /* No slot found */
  if (++slots_left == 0) slot_ptr = 0;

  if (1 == mode && 0 != first_deleted)
    return first_deleted;
  else
    return slot_ptr;
}

/*
 * void _mmc_init_page(mmap_cache * cache, MU32 p_cur)
 *
 * Initialise the given page as empty. It's expected
 *  that you've already locked the page before doing
 *  this
 *
*/
void _mmc_init_page(mmap_cache * cache, MU32 p_cur) {
  /* Locate the page within the mapping (caller holds the page lock) */
  MU64 p_offset = (MU64)p_cur * cache->c_page_size;
  void * page = PTR_ADD(cache->mm_var, p_offset);
  MU32 slot_bytes = cache->start_slots * 4;

  /* Wipe the whole page, then rebuild a valid empty header */
  memset(page, 0, cache->c_page_size);

  P_Magic(page) = 0x92f7e3b1;
  P_NumSlots(page) = cache->start_slots;
  P_FreeSlots(page) = cache->start_slots;
  P_OldSlots(page) = 0;
  /* Data area begins right after the header and the slot table */
  P_FreeData(page) = P_HEADERSIZE + slot_bytes;
  P_FreeBytes(page) = cache->c_page_size - (P_HEADERSIZE + slot_bytes);
  P_NReads(page) = 0;
  P_NReadHits(page) = 0;
}

/*
 * int _mmc_test_page(mmap_cache * cache)
 *
 * Test integrity of current page
 *
*/
int  _mmc_test_page(mmap_cache * cache) {
  MU32 * slot_ptr = cache->p_base_slots;
  MU32 count_free = 0, count_old = 0, max_data_offset = 0;
  MU32 data_size = cache->c_page_size;

  ASSERT(cache->p_cur != NOPAGE);
  if (cache->p_cur == NOPAGE) return 0;

  for (; slot_ptr < cache->p_base_slots + cache->p_num_slots; slot_ptr++) {
    MU32 data_offset = *slot_ptr;

    ASSERT(data_offset == 0 || data_offset == 1 ||
        (data_offset >= P_HEADERSIZE + cache->p_num_slots * 4 &&
         data_offset < cache->c_page_size));
    if (!(data_offset == 0 || data_offset == 1 ||
        (data_offset >= P_HEADERSIZE + cache->p_num_slots * 4 &&
         data_offset < cache->c_page_size))) return 0;

    if (data_offset == 1) {
      count_old++;
    }
    if (data_offset <= 1) {
      count_free++;
      continue;
    }

    if (data_offset > 1) {
      MU32 * base_det = S_Ptr(cache->p_base, data_offset);

      MU32 last_access = S_LastAccess(base_det);
      MU32 expire_on = S_ExpireOn(base_det);



( run in 1.793 second using v1.01-cache-2.11-cpan-39bf76dae61 )