Browse Source

Squashed 'src/leveldb/' changes from be1b0ff..936b461

936b461 Merge upstream LevelDB 1.13.
748539c LevelDB 1.13

git-subtree-dir: src/leveldb
git-subtree-split: 936b4613ea4551992e6096b1e05eeefc09a20e3b
0.15
Pieter Wuille 11 years ago
parent
commit
eed29f0f50
  1. 6
      Makefile
  2. 118
      db/autocompact_test.cc
  3. 49
      db/corruption_test.cc
  4. 41
      db/db_impl.cc
  5. 9
      db/db_impl.h
  6. 41
      db/db_iter.cc
  7. 8
      db/db_iter.h
  8. 3
      db/dbformat.h
  9. 88
      db/version_set.cc
  10. 15
      db/version_set.h
  11. 2
      include/leveldb/db.h
  12. 33
      util/env_posix.cc
  13. 7
      util/random.h

6
Makefile

@ -31,6 +31,7 @@ TESTHARNESS = ./util/testharness.o $(TESTUTIL)
TESTS = \ TESTS = \
arena_test \ arena_test \
autocompact_test \
bloom_test \ bloom_test \
c_test \ c_test \
cache_test \ cache_test \
@ -70,7 +71,7 @@ SHARED = $(SHARED1)
else else
# Update db.h if you change these. # Update db.h if you change these.
SHARED_MAJOR = 1 SHARED_MAJOR = 1
SHARED_MINOR = 12 SHARED_MINOR = 13
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT) SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1).$(SHARED_MAJOR) SHARED2 = $(SHARED1).$(SHARED_MAJOR)
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR) SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
@ -114,6 +115,9 @@ leveldbutil: db/leveldb_main.o $(LIBOBJECTS)
arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) $(CXX) $(LDFLAGS) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
autocompact_test: db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) $(CXX) $(LDFLAGS) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)

118
db/autocompact_test.cc

@ -0,0 +1,118 @@
// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/db.h"
#include "db/db_impl.h"
#include "leveldb/cache.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace leveldb {
class AutoCompactTest {
public:
std::string dbname_;
Cache* tiny_cache_;
Options options_;
DB* db_;
AutoCompactTest() {
dbname_ = test::TmpDir() + "/autocompact_test";
tiny_cache_ = NewLRUCache(100);
options_.block_cache = tiny_cache_;
DestroyDB(dbname_, options_);
options_.create_if_missing = true;
options_.compression = kNoCompression;
ASSERT_OK(DB::Open(options_, dbname_, &db_));
}
~AutoCompactTest() {
delete db_;
DestroyDB(dbname_, Options());
delete tiny_cache_;
}
std::string Key(int i) {
char buf[100];
snprintf(buf, sizeof(buf), "key%06d", i);
return std::string(buf);
}
uint64_t Size(const Slice& start, const Slice& limit) {
Range r(start, limit);
uint64_t size;
db_->GetApproximateSizes(&r, 1, &size);
return size;
}
void DoReads(int n);
};
static const int kValueSize = 200 * 1024;
static const int kTotalSize = 100 * 1024 * 1024;
static const int kCount = kTotalSize / kValueSize;
// Read through the first n keys repeatedly and check that they get
// compacted (verified by checking the size of the key space).
void AutoCompactTest::DoReads(int n) {
std::string value(kValueSize, 'x');
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
// Fill database
for (int i = 0; i < kCount; i++) {
ASSERT_OK(db_->Put(WriteOptions(), Key(i), value));
}
ASSERT_OK(dbi->TEST_CompactMemTable());
// Delete everything
for (int i = 0; i < kCount; i++) {
ASSERT_OK(db_->Delete(WriteOptions(), Key(i)));
}
ASSERT_OK(dbi->TEST_CompactMemTable());
// Get initial measurement of the space we will be reading.
const int64_t initial_size = Size(Key(0), Key(n));
const int64_t initial_other_size = Size(Key(n), Key(kCount));
// Read until size drops significantly.
std::string limit_key = Key(n);
for (int read = 0; true; read++) {
ASSERT_LT(read, 100) << "Taking too long to compact";
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst();
iter->Valid() && iter->key().ToString() < limit_key;
iter->Next()) {
// Drop data
}
delete iter;
// Wait a little bit to allow any triggered compactions to complete.
Env::Default()->SleepForMicroseconds(1000000);
uint64_t size = Size(Key(0), Key(n));
fprintf(stderr, "iter %3d => %7.3f MB [other %7.3f MB]\n",
read+1, size/1048576.0, Size(Key(n), Key(kCount))/1048576.0);
if (size <= initial_size/10) {
break;
}
}
// Verify that the size of the key space not touched by the reads
// is pretty much unchanged.
const int64_t final_other_size = Size(Key(n), Key(kCount));
ASSERT_LE(final_other_size, initial_other_size + 1048576);
ASSERT_GE(final_other_size, initial_other_size/5 - 1048576);
}
TEST(AutoCompactTest, ReadAll) {
DoReads(kCount);
}
TEST(AutoCompactTest, ReadHalf) {
DoReads(kCount/2);
}
} // namespace leveldb
int main(int argc, char** argv) {
return leveldb::test::RunAllTests();
}

49
db/corruption_test.cc

@ -35,6 +35,7 @@ class CorruptionTest {
CorruptionTest() { CorruptionTest() {
tiny_cache_ = NewLRUCache(100); tiny_cache_ = NewLRUCache(100);
options_.env = &env_; options_.env = &env_;
options_.block_cache = tiny_cache_;
dbname_ = test::TmpDir() + "/db_test"; dbname_ = test::TmpDir() + "/db_test";
DestroyDB(dbname_, options_); DestroyDB(dbname_, options_);
@ -50,17 +51,14 @@ class CorruptionTest {
delete tiny_cache_; delete tiny_cache_;
} }
Status TryReopen(Options* options = NULL) { Status TryReopen() {
delete db_; delete db_;
db_ = NULL; db_ = NULL;
Options opt = (options ? *options : options_); return DB::Open(options_, dbname_, &db_);
opt.env = &env_;
opt.block_cache = tiny_cache_;
return DB::Open(opt, dbname_, &db_);
} }
void Reopen(Options* options = NULL) { void Reopen() {
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen());
} }
void RepairDB() { void RepairDB() {
@ -92,6 +90,10 @@ class CorruptionTest {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
uint64_t key; uint64_t key;
Slice in(iter->key()); Slice in(iter->key());
if (in == "" || in == "~") {
// Ignore boundary keys.
continue;
}
if (!ConsumeDecimalNumber(&in, &key) || if (!ConsumeDecimalNumber(&in, &key) ||
!in.empty() || !in.empty() ||
key < next_expected) { key < next_expected) {
@ -233,7 +235,7 @@ TEST(CorruptionTest, TableFile) {
dbi->TEST_CompactRange(1, NULL, NULL); dbi->TEST_CompactRange(1, NULL, NULL);
Corrupt(kTableFile, 100, 1); Corrupt(kTableFile, 100, 1);
Check(99, 99); Check(90, 99);
} }
TEST(CorruptionTest, TableFileIndexData) { TEST(CorruptionTest, TableFileIndexData) {
@ -299,7 +301,7 @@ TEST(CorruptionTest, CompactionInputError) {
ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last))); ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
Corrupt(kTableFile, 100, 1); Corrupt(kTableFile, 100, 1);
Check(9, 9); Check(5, 9);
// Force compactions by writing lots of values // Force compactions by writing lots of values
Build(10000); Build(10000);
@ -307,32 +309,23 @@ TEST(CorruptionTest, CompactionInputError) {
} }
TEST(CorruptionTest, CompactionInputErrorParanoid) { TEST(CorruptionTest, CompactionInputErrorParanoid) {
Options options; options_.paranoid_checks = true;
options.paranoid_checks = true; options_.write_buffer_size = 512 << 10;
options.write_buffer_size = 1048576; Reopen();
Reopen(&options);
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
// Fill levels >= 1 so memtable compaction outputs to level 1 // Make multiple inputs so we need to compact.
for (int level = 1; level < config::kNumLevels; level++) { for (int i = 0; i < 2; i++) {
dbi->Put(WriteOptions(), "", "begin");
dbi->Put(WriteOptions(), "~", "end");
dbi->TEST_CompactMemTable();
}
Build(10); Build(10);
dbi->TEST_CompactMemTable(); dbi->TEST_CompactMemTable();
ASSERT_EQ(1, Property("leveldb.num-files-at-level0"));
Corrupt(kTableFile, 100, 1); Corrupt(kTableFile, 100, 1);
Check(9, 9); env_.SleepForMicroseconds(100000);
}
dbi->CompactRange(NULL, NULL);
// Write must eventually fail because of corrupted table // Write must fail because of corrupted table
Status s;
std::string tmp1, tmp2; std::string tmp1, tmp2;
for (int i = 0; i < 10000 && s.ok(); i++) { Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2));
s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
}
ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db"; ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
} }

41
db/db_impl.cc

@ -113,14 +113,14 @@ Options SanitizeOptions(const std::string& dbname,
return result; return result;
} }
DBImpl::DBImpl(const Options& options, const std::string& dbname) DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
: env_(options.env), : env_(raw_options.env),
internal_comparator_(options.comparator), internal_comparator_(raw_options.comparator),
internal_filter_policy_(options.filter_policy), internal_filter_policy_(raw_options.filter_policy),
options_(SanitizeOptions( options_(SanitizeOptions(dbname, &internal_comparator_,
dbname, &internal_comparator_, &internal_filter_policy_, options)), &internal_filter_policy_, raw_options)),
owns_info_log_(options_.info_log != options.info_log), owns_info_log_(options_.info_log != raw_options.info_log),
owns_cache_(options_.block_cache != options.block_cache), owns_cache_(options_.block_cache != raw_options.block_cache),
dbname_(dbname), dbname_(dbname),
db_lock_(NULL), db_lock_(NULL),
shutting_down_(NULL), shutting_down_(NULL),
@ -130,6 +130,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
logfile_(NULL), logfile_(NULL),
logfile_number_(0), logfile_number_(0),
log_(NULL), log_(NULL),
seed_(0),
tmp_batch_(new WriteBatch), tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false), bg_compaction_scheduled_(false),
manual_compaction_(NULL), manual_compaction_(NULL),
@ -138,7 +139,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
has_imm_.Release_Store(NULL); has_imm_.Release_Store(NULL);
// Reserve ten files or so for other uses and give the rest to TableCache. // Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options.max_open_files - kNumNonTableCacheFiles; const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
table_cache_ = new TableCache(dbname_, &options_, table_cache_size); table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
versions_ = new VersionSet(dbname_, &options_, table_cache_, versions_ = new VersionSet(dbname_, &options_, table_cache_,
@ -1027,7 +1028,8 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
} // namespace } // namespace
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot) { SequenceNumber* latest_snapshot,
uint32_t* seed) {
IterState* cleanup = new IterState; IterState* cleanup = new IterState;
mutex_.Lock(); mutex_.Lock();
*latest_snapshot = versions_->LastSequence(); *latest_snapshot = versions_->LastSequence();
@ -1051,13 +1053,15 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
cleanup->version = versions_->current(); cleanup->version = versions_->current();
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
*seed = ++seed_;
mutex_.Unlock(); mutex_.Unlock();
return internal_iter; return internal_iter;
} }
Iterator* DBImpl::TEST_NewInternalIterator() { Iterator* DBImpl::TEST_NewInternalIterator() {
SequenceNumber ignored; SequenceNumber ignored;
return NewInternalIterator(ReadOptions(), &ignored); uint32_t ignored_seed;
return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
} }
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
@ -1114,12 +1118,21 @@ Status DBImpl::Get(const ReadOptions& options,
Iterator* DBImpl::NewIterator(const ReadOptions& options) { Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot; SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); uint32_t seed;
Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
return NewDBIterator( return NewDBIterator(
&dbname_, env_, user_comparator(), internal_iter, this, user_comparator(), iter,
(options.snapshot != NULL (options.snapshot != NULL
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_ ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot)); : latest_snapshot),
seed);
}
void DBImpl::RecordReadSample(Slice key) {
MutexLock l(&mutex_);
if (versions_->current()->RecordReadSample(key)) {
MaybeScheduleCompaction();
}
} }
const Snapshot* DBImpl::GetSnapshot() { const Snapshot* DBImpl::GetSnapshot() {

9
db/db_impl.h

@ -59,13 +59,19 @@ class DBImpl : public DB {
// file at a level >= 1. // file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes(); int64_t TEST_MaxNextLevelOverlappingBytes();
// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes.
void RecordReadSample(Slice key);
private: private:
friend class DB; friend class DB;
struct CompactionState; struct CompactionState;
struct Writer; struct Writer;
Iterator* NewInternalIterator(const ReadOptions&, Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot); SequenceNumber* latest_snapshot,
uint32_t* seed);
Status NewDB(); Status NewDB();
@ -135,6 +141,7 @@ class DBImpl : public DB {
WritableFile* logfile_; WritableFile* logfile_;
uint64_t logfile_number_; uint64_t logfile_number_;
log::Writer* log_; log::Writer* log_;
uint32_t seed_; // For sampling.
// Queue of writers. // Queue of writers.
std::deque<Writer*> writers_; std::deque<Writer*> writers_;

41
db/db_iter.cc

@ -5,12 +5,14 @@
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/filename.h" #include "db/filename.h"
#include "db/db_impl.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
#include "port/port.h" #include "port/port.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/random.h"
namespace leveldb { namespace leveldb {
@ -46,15 +48,16 @@ class DBIter: public Iterator {
kReverse kReverse
}; };
DBIter(const std::string* dbname, Env* env, DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s,
const Comparator* cmp, Iterator* iter, SequenceNumber s) uint32_t seed)
: dbname_(dbname), : db_(db),
env_(env),
user_comparator_(cmp), user_comparator_(cmp),
iter_(iter), iter_(iter),
sequence_(s), sequence_(s),
direction_(kForward), direction_(kForward),
valid_(false) { valid_(false),
rnd_(seed),
bytes_counter_(RandomPeriod()) {
} }
virtual ~DBIter() { virtual ~DBIter() {
delete iter_; delete iter_;
@ -100,8 +103,12 @@ class DBIter: public Iterator {
} }
} }
const std::string* const dbname_; // Pick next gap with average value of config::kReadBytesPeriod.
Env* const env_; ssize_t RandomPeriod() {
return rnd_.Uniform(2*config::kReadBytesPeriod);
}
DBImpl* db_;
const Comparator* const user_comparator_; const Comparator* const user_comparator_;
Iterator* const iter_; Iterator* const iter_;
SequenceNumber const sequence_; SequenceNumber const sequence_;
@ -112,13 +119,23 @@ class DBIter: public Iterator {
Direction direction_; Direction direction_;
bool valid_; bool valid_;
Random rnd_;
ssize_t bytes_counter_;
// No copying allowed // No copying allowed
DBIter(const DBIter&); DBIter(const DBIter&);
void operator=(const DBIter&); void operator=(const DBIter&);
}; };
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
if (!ParseInternalKey(iter_->key(), ikey)) { Slice k = iter_->key();
ssize_t n = k.size() + iter_->value().size();
bytes_counter_ -= n;
while (bytes_counter_ < 0) {
bytes_counter_ += RandomPeriod();
db_->RecordReadSample(k);
}
if (!ParseInternalKey(k, ikey)) {
status_ = Status::Corruption("corrupted internal key in DBIter"); status_ = Status::Corruption("corrupted internal key in DBIter");
return false; return false;
} else { } else {
@ -288,12 +305,12 @@ void DBIter::SeekToLast() {
} // anonymous namespace } // anonymous namespace
Iterator* NewDBIterator( Iterator* NewDBIterator(
const std::string* dbname, DBImpl* db,
Env* env,
const Comparator* user_key_comparator, const Comparator* user_key_comparator,
Iterator* internal_iter, Iterator* internal_iter,
const SequenceNumber& sequence) { SequenceNumber sequence,
return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence); uint32_t seed) {
return new DBIter(db, user_key_comparator, internal_iter, sequence, seed);
} }
} // namespace leveldb } // namespace leveldb

8
db/db_iter.h

@ -11,15 +11,17 @@
namespace leveldb { namespace leveldb {
class DBImpl;
// Return a new iterator that converts internal keys (yielded by // Return a new iterator that converts internal keys (yielded by
// "*internal_iter") that were live at the specified "sequence" number // "*internal_iter") that were live at the specified "sequence" number
// into appropriate user keys. // into appropriate user keys.
extern Iterator* NewDBIterator( extern Iterator* NewDBIterator(
const std::string* dbname, DBImpl* db,
Env* env,
const Comparator* user_key_comparator, const Comparator* user_key_comparator,
Iterator* internal_iter, Iterator* internal_iter,
const SequenceNumber& sequence); SequenceNumber sequence,
uint32_t seed);
} // namespace leveldb } // namespace leveldb

3
db/dbformat.h

@ -38,6 +38,9 @@ static const int kL0_StopWritesTrigger = 12;
// space if the same key space is being repeatedly overwritten. // space if the same key space is being repeatedly overwritten.
static const int kMaxMemCompactLevel = 2; static const int kMaxMemCompactLevel = 2;
// Approximate gap in bytes between samples of data read during iteration.
static const int kReadBytesPeriod = 1048576;
} // namespace config } // namespace config
class InternalKey; class InternalKey;

88
db/version_set.cc

@ -289,6 +289,51 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
return a->number > b->number; return a->number > b->number;
} }
void Version::ForEachOverlapping(Slice user_key, Slice internal_key,
void* arg,
bool (*func)(void*, int, FileMetaData*)) {
// TODO(sanjay): Change Version::Get() to use this function.
const Comparator* ucmp = vset_->icmp_.user_comparator();
// Search level-0 in order from newest to oldest.
std::vector<FileMetaData*> tmp;
tmp.reserve(files_[0].size());
for (uint32_t i = 0; i < files_[0].size(); i++) {
FileMetaData* f = files_[0][i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (!tmp.empty()) {
std::sort(tmp.begin(), tmp.end(), NewestFirst);
for (uint32_t i = 0; i < tmp.size(); i++) {
if (!(*func)(arg, 0, tmp[i])) {
return;
}
}
}
// Search other levels.
for (int level = 1; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;
// Binary search to find earliest index whose largest key >= internal_key.
uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
if (index < num_files) {
FileMetaData* f = files_[level][index];
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
// All of "f" is past any data for user_key
} else {
if (!(*func)(arg, level, f)) {
return;
}
}
}
}
}
Status Version::Get(const ReadOptions& options, Status Version::Get(const ReadOptions& options,
const LookupKey& k, const LookupKey& k,
std::string* value, std::string* value,
@ -401,6 +446,44 @@ bool Version::UpdateStats(const GetStats& stats) {
return false; return false;
} }
bool Version::RecordReadSample(Slice internal_key) {
ParsedInternalKey ikey;
if (!ParseInternalKey(internal_key, &ikey)) {
return false;
}
struct State {
GetStats stats; // Holds first matching file
int matches;
static bool Match(void* arg, int level, FileMetaData* f) {
State* state = reinterpret_cast<State*>(arg);
state->matches++;
if (state->matches == 1) {
// Remember first match.
state->stats.seek_file = f;
state->stats.seek_file_level = level;
}
// We can stop iterating once we have a second match.
return state->matches < 2;
}
};
State state;
state.matches = 0;
ForEachOverlapping(ikey.user_key, internal_key, &state, &State::Match);
// Must have at least two matches since we want to merge across
// files. But what if we have a single file that contains many
// overwrites and deletions? Should we have another mechanism for
// finding such files?
if (state.matches >= 2) {
// 1MB cost is about 1 seek (see comment in Builder::Apply).
return UpdateStats(state.stats);
}
return false;
}
void Version::Ref() { void Version::Ref() {
++refs_; ++refs_;
} }
@ -435,11 +518,14 @@ int Version::PickLevelForMemTableOutput(
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) { if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
break; break;
} }
if (level + 2 < config::kNumLevels) {
// Check that file does not overlap too many grandparent bytes.
GetOverlappingInputs(level + 2, &start, &limit, &overlaps); GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const int64_t sum = TotalFileSize(overlaps); const int64_t sum = TotalFileSize(overlaps);
if (sum > kMaxGrandParentOverlapBytes) { if (sum > kMaxGrandParentOverlapBytes) {
break; break;
} }
}
level++; level++;
} }
} }
@ -452,6 +538,8 @@ void Version::GetOverlappingInputs(
const InternalKey* begin, const InternalKey* begin,
const InternalKey* end, const InternalKey* end,
std::vector<FileMetaData*>* inputs) { std::vector<FileMetaData*>* inputs) {
assert(level >= 0);
assert(level < config::kNumLevels);
inputs->clear(); inputs->clear();
Slice user_begin, user_end; Slice user_begin, user_end;
if (begin != NULL) { if (begin != NULL) {

15
db/version_set.h

@ -78,6 +78,12 @@ class Version {
// REQUIRES: lock is held // REQUIRES: lock is held
bool UpdateStats(const GetStats& stats); bool UpdateStats(const GetStats& stats);
// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes. Returns true if a new compaction may need to be triggered.
// REQUIRES: lock is held
bool RecordReadSample(Slice key);
// Reference count management (so Versions do not disappear out from // Reference count management (so Versions do not disappear out from
// under live iterators) // under live iterators)
void Ref(); void Ref();
@ -114,6 +120,15 @@ class Version {
class LevelFileNumIterator; class LevelFileNumIterator;
Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const; Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;
// Call func(arg, level, f) for every file that overlaps user_key in
// order from newest to oldest. If an invocation of func returns
// false, makes no more calls.
//
// REQUIRES: user portion of internal_key == user_key.
void ForEachOverlapping(Slice user_key, Slice internal_key,
void* arg,
bool (*func)(void*, int, FileMetaData*));
VersionSet* vset_; // VersionSet to which this Version belongs VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list Version* prev_; // Previous version in linked list

2
include/leveldb/db.h

@ -14,7 +14,7 @@ namespace leveldb {
// Update Makefile if you change these // Update Makefile if you change these
static const int kMajorVersion = 1; static const int kMajorVersion = 1;
static const int kMinorVersion = 12; static const int kMinorVersion = 13;
struct Options; struct Options;
struct ReadOptions; struct ReadOptions;

33
util/env_posix.cc

@ -320,8 +320,39 @@ class PosixMmapFile : public WritableFile {
return Status::OK(); return Status::OK();
} }
virtual Status Sync() { Status SyncDirIfManifest() {
const char* f = filename_.c_str();
const char* sep = strrchr(f, '/');
Slice basename;
std::string dir;
if (sep == NULL) {
dir = ".";
basename = f;
} else {
dir = std::string(f, sep - f);
basename = sep + 1;
}
Status s; Status s;
if (basename.starts_with("MANIFEST")) {
int fd = open(dir.c_str(), O_RDONLY);
if (fd < 0) {
s = IOError(dir, errno);
} else {
if (fsync(fd) < 0) {
s = IOError(dir, errno);
}
close(fd);
}
}
return s;
}
virtual Status Sync() {
// Ensure new files referred to by the manifest are in the filesystem.
Status s = SyncDirIfManifest();
if (!s.ok()) {
return s;
}
if (pending_sync_) { if (pending_sync_) {
// Some unmapped data was not synced // Some unmapped data was not synced

7
util/random.h

@ -16,7 +16,12 @@ class Random {
private: private:
uint32_t seed_; uint32_t seed_;
public: public:
explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) { } explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) {
// Avoid bad seeds.
if (seed_ == 0 || seed_ == 2147483647L) {
seed_ = 1;
}
}
uint32_t Next() { uint32_t Next() {
static const uint32_t M = 2147483647L; // 2^31-1 static const uint32_t M = 2147483647L; // 2^31-1
static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0 static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0

Loading…
Cancel
Save