use leveldb for read/write of pieces

This commit is contained in:
miguel 2013-08-18 14:44:49 -03:00
parent 0b69a1c5f3
commit bfc461a2dc

View File

@ -573,27 +573,20 @@ namespace libtorrent
int default_storage::writev(file::iovec_t const* bufs, int slot, int offset
, int num_bufs, int flags)
{
#ifdef TORRENT_DISK_STATS
disk_buffer_pool* pool = disk_pool();
if (pool)
{
pool->m_disk_access_log << log_time() << " write "
<< physical_offset(slot, offset) << std::endl;
}
#endif
fileop op = { &file::writev, &default_storage::write_unaligned
, m_settings ? settings().disk_io_write_mode : 0, file::read_write | flags };
#ifdef TORRENT_DISK_STATS
int ret = readwritev(bufs, slot, offset, num_bufs, op);
if (pool)
{
pool->m_disk_access_log << log_time() << " write_end "
<< (physical_offset(slot, offset) + ret) << std::endl;
}
return ret;
#else
return readwritev(bufs, slot, offset, num_bufs, op);
#endif
TORRENT_ASSERT(bufs != 0);
TORRENT_ASSERT(slot >= 0);
TORRENT_ASSERT(slot < m_files.num_pieces());
TORRENT_ASSERT(num_bufs == 1);
TORRENT_ASSERT(offset == 0);
std::string postStr(bufs[0].iov_base, bufs[0].iov_len);
if( Write(std::make_pair('p', slot), postStr) ) {
return postStr.size();
} else {
return -1;
}
}
size_type default_storage::physical_offset(int slot, int offset)
@ -614,296 +607,20 @@ namespace libtorrent
int default_storage::readv(file::iovec_t const* bufs, int slot, int offset
, int num_bufs, int flags)
{
#ifdef TORRENT_DISK_STATS
disk_buffer_pool* pool = disk_pool();
if (pool)
{
pool->m_disk_access_log << log_time() << " read "
<< physical_offset(slot, offset) << std::endl;
}
#endif
fileop op = { &file::readv, &default_storage::read_unaligned
, m_settings ? settings().disk_io_read_mode : 0, file::read_only | flags };
#ifdef TORRENT_SIMULATE_SLOW_READ
boost::thread::sleep(boost::get_system_time()
+ boost::posix_time::milliseconds(1000));
#endif
#ifdef TORRENT_DISK_STATS
int ret = readwritev(bufs, slot, offset, num_bufs, op);
if (pool)
{
pool->m_disk_access_log << log_time() << " read_end "
<< (physical_offset(slot, offset) + ret) << std::endl;
}
return ret;
#else
return readwritev(bufs, slot, offset, num_bufs, op);
#endif
}
TORRENT_ASSERT(bufs != 0);
TORRENT_ASSERT(slot >= 0);
TORRENT_ASSERT(slot < m_files.num_pieces());
TORRENT_ASSERT(num_bufs == 1);
TORRENT_ASSERT(offset == 0);
// much of what needs to be done when reading and writing
// is buffer management and piece to file mapping. Most
// of that is the same for reading and writing. This function
// is a template, and the fileop decides what to do with the
// file and the buffers.
int default_storage::readwritev(file::iovec_t const* bufs, int slot, int offset
, int num_bufs, fileop const& op)
{
TORRENT_ASSERT(bufs != 0);
TORRENT_ASSERT(slot >= 0);
TORRENT_ASSERT(slot < m_files.num_pieces());
TORRENT_ASSERT(offset >= 0);
TORRENT_ASSERT(offset < m_files.piece_size(slot));
TORRENT_ASSERT(num_bufs > 0);
int size = bufs_size(bufs, num_bufs);
TORRENT_ASSERT(size > 0);
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
std::vector<file_slice> slices
= files().map_block(slot, offset, size);
TORRENT_ASSERT(!slices.empty());
#endif
size_type start = slot * (size_type)m_files.piece_length() + offset;
TORRENT_ASSERT(start + size <= m_files.total_size());
// find the file iterator and file offset
file_storage::iterator file_iter = files().file_at_offset(start);
TORRENT_ASSERT(file_iter != files().end());
TORRENT_ASSERT(start >= files().file_offset(*file_iter));
TORRENT_ASSERT(start < files().file_offset(*file_iter) + files().file_size(*file_iter));
size_type file_offset = start - files().file_offset(*file_iter);
int buf_pos = 0;
error_code ec;
boost::intrusive_ptr<file> file_handle;
int bytes_left = size;
int slot_size = static_cast<int>(m_files.piece_size(slot));
if (offset + bytes_left > slot_size)
bytes_left = slot_size - offset;
TORRENT_ASSERT(bytes_left >= 0);
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
int counter = 0;
#endif
file::iovec_t* tmp_bufs = TORRENT_ALLOCA(file::iovec_t, num_bufs);
file::iovec_t* current_buf = TORRENT_ALLOCA(file::iovec_t, num_bufs);
copy_bufs(bufs, size, current_buf);
TORRENT_ASSERT(count_bufs(current_buf, size) == num_bufs);
int file_bytes_left;
for (;bytes_left > 0; ++file_iter, bytes_left -= file_bytes_left
, buf_pos += file_bytes_left)
{
TORRENT_ASSERT(file_iter != files().end());
TORRENT_ASSERT(buf_pos >= 0);
file_bytes_left = bytes_left;
if (file_offset + file_bytes_left > file_iter->size)
file_bytes_left = (std::max)(static_cast<int>(file_iter->size - file_offset), 0);
if (file_bytes_left == 0) continue;
#if defined TORRENT_DEBUG || TORRENT_RELEASE_ASSERTS
TORRENT_ASSERT(int(slices.size()) > counter);
size_type slice_size = slices[counter].size;
TORRENT_ASSERT(slice_size == file_bytes_left);
TORRENT_ASSERT((files().begin() + slices[counter].file_index)
== file_iter);
++counter;
#endif
if (file_iter->pad_file)
{
if ((op.mode & file::rw_mask) == file::read_only)
{
int num_tmp_bufs = copy_bufs(current_buf, file_bytes_left, tmp_bufs);
TORRENT_ASSERT(count_bufs(tmp_bufs, file_bytes_left) == num_tmp_bufs);
TORRENT_ASSERT(num_tmp_bufs <= num_bufs);
clear_bufs(tmp_bufs, num_tmp_bufs);
}
advance_bufs(current_buf, file_bytes_left);
TORRENT_ASSERT(count_bufs(current_buf, bytes_left - file_bytes_left) <= num_bufs);
file_offset = 0;
continue;
}
error_code ec;
file_handle = open_file(file_iter, op.mode, ec);
if (((op.mode & file::rw_mask) == file::read_write) && ec == boost::system::errc::no_such_file_or_directory)
{
// this means the directory the file is in doesn't exist.
// so create it
ec.clear();
std::string path = files().file_path(*file_iter, m_save_path);
create_directories(parent_path(path), ec);
// if the directory creation failed, don't try to open the file again
// but actually just fail
if (!ec) file_handle = open_file(file_iter, op.mode, ec);
}
if (!file_handle || ec)
{
std::string path = files().file_path(*file_iter, m_save_path);
TORRENT_ASSERT(ec);
set_error(path, ec);
return -1;
}
int num_tmp_bufs = copy_bufs(current_buf, file_bytes_left, tmp_bufs);
TORRENT_ASSERT(count_bufs(tmp_bufs, file_bytes_left) == num_tmp_bufs);
TORRENT_ASSERT(num_tmp_bufs <= num_bufs);
int bytes_transferred = 0;
// if the file is opened in no_buffer mode, and the
// read is unaligned, we need to fall back on a slow
// special read that reads aligned buffers and copies
// it into the one supplied
size_type adjusted_offset = files().file_base(*file_iter) + file_offset;
if ((file_handle->open_mode() & file::no_buffer)
&& ((adjusted_offset & (file_handle->pos_alignment()-1)) != 0
|| (uintptr_t(tmp_bufs->iov_base) & (file_handle->buf_alignment()-1)) != 0))
{
bytes_transferred = (int)(this->*op.unaligned_op)(file_handle, adjusted_offset
, tmp_bufs, num_tmp_bufs, ec);
if ((op.mode & file::rw_mask) == file::read_write
&& adjusted_offset + bytes_transferred >= file_iter->size
&& (file_handle->pos_alignment() > 0 || file_handle->size_alignment() > 0))
{
// we were writing, and we just wrote the last block of the file
// we likely wrote a bit too much, since we're restricted to
// a specific alignment for writes. Make sure to truncate the size
// TODO: 0 what if file_base is used to merge several virtual files
// into a single physical file? We should probably disable this
// if file_base is used. This is not a widely used feature though
file_handle->set_size(file_iter->size, ec);
}
}
else
{
bytes_transferred = (int)((*file_handle).*op.regular_op)(adjusted_offset
, tmp_bufs, num_tmp_bufs, ec);
TORRENT_ASSERT(bytes_transferred <= bufs_size(tmp_bufs, num_tmp_bufs));
}
file_offset = 0;
if (ec)
{
set_error(files().file_path(*file_iter, m_save_path), ec);
return -1;
}
if (file_bytes_left != bytes_transferred)
return bytes_transferred;
advance_bufs(current_buf, bytes_transferred);
TORRENT_ASSERT(count_bufs(current_buf, bytes_left - file_bytes_left) <= num_bufs);
}
return size;
}
// these functions are inefficient, but should be fairly uncommon. The read
// case happens if unaligned files are opened in no_buffer mode or if clients
// makes unaligned requests (and the disk cache is disabled or fully utilized
// for write cache).
// they read an unaligned buffer from a file that requires aligned access
size_type default_storage::read_unaligned(boost::intrusive_ptr<file> const& file_handle
, size_type file_offset, file::iovec_t const* bufs, int num_bufs, error_code& ec)
{
const int pos_align = file_handle->pos_alignment()-1;
const int size_align = file_handle->size_alignment()-1;
const int size = bufs_size(bufs, num_bufs);
const int start_adjust = file_offset & pos_align;
TORRENT_ASSERT(start_adjust == (file_offset % file_handle->pos_alignment()));
const size_type aligned_start = file_offset - start_adjust;
const int aligned_size = ((size+start_adjust) & size_align)
? ((size+start_adjust) & ~size_align) + size_align + 1 : size + start_adjust;
TORRENT_ASSERT((aligned_size & size_align) == 0);
// allocate a temporary, aligned, buffer
aligned_holder aligned_buf(aligned_size);
file::iovec_t b = {aligned_buf.get(), size_t(aligned_size) };
size_type ret = file_handle->readv(aligned_start, &b, 1, ec);
if (ret < 0)
{
TORRENT_ASSERT(ec);
return ret;
}
if (ret - start_adjust < size) return (std::max)(ret - start_adjust, size_type(0));
char* read_buf = aligned_buf.get() + start_adjust;
for (file::iovec_t const* i = bufs, *end(bufs + num_bufs); i != end; ++i)
{
memcpy(i->iov_base, read_buf, i->iov_len);
read_buf += i->iov_len;
}
return size;
}
// this is the really expensive one. To write unaligned, we need to read
// an aligned block, overlay the unaligned buffer, and then write it back
size_type default_storage::write_unaligned(boost::intrusive_ptr<file> const& file_handle
, size_type file_offset, file::iovec_t const* bufs, int num_bufs, error_code& ec)
{
const int pos_align = file_handle->pos_alignment()-1;
const int size_align = file_handle->size_alignment()-1;
const int size = bufs_size(bufs, num_bufs);
const int start_adjust = file_offset & pos_align;
TORRENT_ASSERT(start_adjust == (file_offset % file_handle->pos_alignment()));
const size_type aligned_start = file_offset - start_adjust;
const int aligned_size = ((size+start_adjust) & size_align)
? ((size+start_adjust) & ~size_align) + size_align + 1 : size + start_adjust;
TORRENT_ASSERT((aligned_size & size_align) == 0);
size_type actual_file_size = file_handle->get_size(ec);
if (ec && ec != make_error_code(boost::system::errc::no_such_file_or_directory)) return -1;
ec.clear();
// allocate a temporary, aligned, buffer
aligned_holder aligned_buf(aligned_size);
file::iovec_t b = {aligned_buf.get(), size_t(aligned_size) };
// we have something to read
if (aligned_start < actual_file_size && !ec)
{
size_type ret = file_handle->readv(aligned_start, &b, 1, ec);
if (ec
#ifdef TORRENT_WINDOWS
&& ec != error_code(ERROR_HANDLE_EOF, get_system_category())
#endif
)
return ret;
}
ec.clear();
// OK, we read the portion of the file. Now, overlay the buffer we're writing
char* write_buf = aligned_buf.get() + start_adjust;
for (file::iovec_t const* i = bufs, *end(bufs + num_bufs); i != end; ++i)
{
memcpy(write_buf, i->iov_base, i->iov_len);
write_buf += i->iov_len;
}
// write the buffer back to disk
size_type ret = file_handle->writev(aligned_start, &b, 1, ec);
if (ret < 0)
{
TORRENT_ASSERT(ec);
return ret;
}
if (ret - start_adjust < size) return (std::max)(ret - start_adjust, size_type(0));
return size;
std::string postStr;
if( Read(std::make_pair('p', slot), postStr) ) {
TORRENT_ASSERT(bufs[0].iov_len >= postStr.size());
memcpy(bufs[0].iov_base, postStr.data(), postStr.size());
return postStr.size();
} else {
return 0;
}
}
int default_storage::write(
@ -1323,7 +1040,7 @@ namespace libtorrent
// we may not have a slot for this piece yet.
// assume there is no re-mapping of slots
if (slot < 0) slot = piece_index;
return m_storage->physical_offset(slot, offset);
return m_storage->physical_offset(slot, offset);
}
int piece_manager::check_no_fastresume(error_code& error)