Browse Source

Apply adjacent changes within single transaction

PR #18635.
adaptive-webui-19844
Vladimir Golovnev 2 years ago committed by GitHub
parent
commit
ee6f699b48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 323
      src/base/bittorrent/dbresumedatastorage.cpp

323
src/base/bittorrent/dbresumedatastorage.cpp

@ -1,6 +1,6 @@
/* /*
* Bittorrent Client using Qt and libtorrent. * Bittorrent Client using Qt and libtorrent.
* Copyright (C) 2021-2022 Vladimir Golovnev <glassez@yandex.ru> * Copyright (C) 2021-2023 Vladimir Golovnev <glassez@yandex.ru>
* *
* This program is free software; you can redistribute it and/or * This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License * modify it under the terms of the GNU General Public License
@ -28,6 +28,8 @@
#include "dbresumedatastorage.h" #include "dbresumedatastorage.h"
#include <memory>
#include <queue>
#include <utility> #include <utility>
#include <libtorrent/bdecode.hpp> #include <libtorrent/bdecode.hpp>
@ -38,7 +40,9 @@
#include <libtorrent/write_resume_data.hpp> #include <libtorrent/write_resume_data.hpp>
#include <QByteArray> #include <QByteArray>
#include <QDebug>
#include <QFile> #include <QFile>
#include <QMutex>
#include <QSet> #include <QSet>
#include <QSqlDatabase> #include <QSqlDatabase>
#include <QSqlError> #include <QSqlError>
@ -46,6 +50,7 @@
#include <QSqlRecord> #include <QSqlRecord>
#include <QThread> #include <QThread>
#include <QVector> #include <QVector>
#include <QWaitCondition>
#include "base/exceptions.h" #include "base/exceptions.h"
#include "base/global.h" #include "base/global.h"
@ -68,6 +73,46 @@ namespace
const QString META_VERSION = u"version"_qs; const QString META_VERSION = u"version"_qs;
using namespace BitTorrent;
class Job
{
public:
virtual ~Job() = default;
virtual void perform(QSqlDatabase db) = 0;
};
class StoreJob final : public Job
{
public:
StoreJob(const TorrentID &torrentID, const LoadTorrentParams &resumeData);
void perform(QSqlDatabase db) override;
private:
const TorrentID m_torrentID;
const LoadTorrentParams m_resumeData;
};
class RemoveJob final : public Job
{
public:
explicit RemoveJob(const TorrentID &torrentID);
void perform(QSqlDatabase db) override;
private:
const TorrentID m_torrentID;
};
class StoreQueueJob final : public Job
{
public:
explicit StoreQueueJob(const QVector<TorrentID> &queue);
void perform(QSqlDatabase db) override;
private:
const QVector<TorrentID> m_queue;
};
struct Column struct Column
{ {
QString name; QString name;
@ -167,32 +212,7 @@ namespace
{ {
return u"%1 %2"_qs.arg(quoted(column.name), QString::fromLatin1(definition)); return u"%1 %2"_qs.arg(quoted(column.name), QString::fromLatin1(definition));
} }
}
namespace BitTorrent
{
class DBResumeDataStorage::Worker final : public QObject
{
Q_DISABLE_COPY_MOVE(Worker)
public:
Worker(const Path &dbPath, const QString &dbConnectionName, QReadWriteLock &dbLock);
void openDatabase() const;
void closeDatabase() const;
void store(const TorrentID &id, const LoadTorrentParams &resumeData) const;
void remove(const TorrentID &id) const;
void storeQueue(const QVector<TorrentID> &queue) const;
private:
const Path m_path;
const QString m_connectionName;
QReadWriteLock &m_dbLock;
};
namespace
{
LoadTorrentParams parseQueryResultRow(const QSqlQuery &query) LoadTorrentParams parseQueryResultRow(const QSqlQuery &query)
{ {
LoadTorrentParams resumeData; LoadTorrentParams resumeData;
@ -234,7 +254,8 @@ namespace BitTorrent
p = lt::read_resume_data(resumeDataRoot, ec); p = lt::read_resume_data(resumeDataRoot, ec);
if (const QByteArray bencodedMetadata = query.value(DB_COLUMN_METADATA.name).toByteArray(); !bencodedMetadata.isEmpty()) if (const QByteArray bencodedMetadata = query.value(DB_COLUMN_METADATA.name).toByteArray()
; !bencodedMetadata.isEmpty())
{ {
const lt::bdecode_node torentInfoRoot = lt::bdecode(bencodedMetadata, ec); const lt::bdecode_node torentInfoRoot = lt::bdecode(bencodedMetadata, ec);
p.ti = std::make_shared<lt::torrent_info>(torentInfoRoot, ec); p.ti = std::make_shared<lt::torrent_info>(torentInfoRoot, ec);
@ -252,6 +273,34 @@ namespace BitTorrent
return resumeData; return resumeData;
} }
} }
namespace BitTorrent
{
class DBResumeDataStorage::Worker final : public QThread
{
Q_DISABLE_COPY_MOVE(Worker)
public:
Worker(const Path &dbPath, QReadWriteLock &dbLock);
void run() override;
void requestInterruption();
void store(const TorrentID &id, const LoadTorrentParams &resumeData);
void remove(const TorrentID &id);
void storeQueue(const QVector<TorrentID> &queue);
private:
void addJob(std::unique_ptr<Job> job);
const QString m_connectionName = u"ResumeDataStorageWorker"_qs;
const Path m_path;
QReadWriteLock &m_dbLock;
std::queue<std::unique_ptr<Job>> m_jobs;
QMutex m_jobsMutex;
QWaitCondition m_waitCondition;
};
} }
BitTorrent::DBResumeDataStorage::DBResumeDataStorage(const Path &dbPath, QObject *parent) BitTorrent::DBResumeDataStorage::DBResumeDataStorage(const Path &dbPath, QObject *parent)
@ -276,31 +325,14 @@ BitTorrent::DBResumeDataStorage::DBResumeDataStorage(const Path &dbPath, QObject
updateDB(dbVersion); updateDB(dbVersion);
} }
m_asyncWorker = new Worker(dbPath, u"ResumeDataStorageWorker"_qs, m_dbLock); m_asyncWorker = new Worker(dbPath, m_dbLock);
m_asyncWorker->moveToThread(m_ioThread.get()); m_asyncWorker->start();
connect(m_ioThread.get(), &QThread::finished, m_asyncWorker, &QObject::deleteLater);
m_ioThread->start();
RuntimeError *errPtr = nullptr;
QMetaObject::invokeMethod(m_asyncWorker, [this, &errPtr]()
{
try
{
m_asyncWorker->openDatabase();
}
catch (const RuntimeError &err)
{
errPtr = new RuntimeError(err);
}
}, Qt::BlockingQueuedConnection);
if (errPtr)
throw *errPtr;
} }
BitTorrent::DBResumeDataStorage::~DBResumeDataStorage() BitTorrent::DBResumeDataStorage::~DBResumeDataStorage()
{ {
QMetaObject::invokeMethod(m_asyncWorker, &Worker::closeDatabase); m_asyncWorker->requestInterruption();
m_asyncWorker->wait();
QSqlDatabase::removeDatabase(DB_CONNECTION_NAME); QSqlDatabase::removeDatabase(DB_CONNECTION_NAME);
} }
@ -352,27 +384,18 @@ BitTorrent::LoadResumeDataResult BitTorrent::DBResumeDataStorage::load(const Tor
} }
void BitTorrent::DBResumeDataStorage::store(const TorrentID &id, const LoadTorrentParams &resumeData) const void BitTorrent::DBResumeDataStorage::store(const TorrentID &id, const LoadTorrentParams &resumeData) const
{
QMetaObject::invokeMethod(m_asyncWorker, [this, id, resumeData]()
{ {
m_asyncWorker->store(id, resumeData); m_asyncWorker->store(id, resumeData);
});
} }
void BitTorrent::DBResumeDataStorage::remove(const BitTorrent::TorrentID &id) const void BitTorrent::DBResumeDataStorage::remove(const BitTorrent::TorrentID &id) const
{
QMetaObject::invokeMethod(m_asyncWorker, [this, id]()
{ {
m_asyncWorker->remove(id); m_asyncWorker->remove(id);
});
} }
void BitTorrent::DBResumeDataStorage::storeQueue(const QVector<TorrentID> &queue) const void BitTorrent::DBResumeDataStorage::storeQueue(const QVector<TorrentID> &queue) const
{
QMetaObject::invokeMethod(m_asyncWorker, [this, queue]()
{ {
m_asyncWorker->storeQueue(queue); m_asyncWorker->storeQueue(queue);
});
} }
void BitTorrent::DBResumeDataStorage::doLoadAll() const void BitTorrent::DBResumeDataStorage::doLoadAll() const
@ -614,33 +637,117 @@ void BitTorrent::DBResumeDataStorage::enableWALMode() const
throw RuntimeError(tr("WAL mode is probably unsupported due to filesystem limitations.")); throw RuntimeError(tr("WAL mode is probably unsupported due to filesystem limitations."));
} }
BitTorrent::DBResumeDataStorage::Worker::Worker(const Path &dbPath, const QString &dbConnectionName, QReadWriteLock &dbLock) BitTorrent::DBResumeDataStorage::Worker::Worker(const Path &dbPath, QReadWriteLock &dbLock)
: m_path {dbPath} : m_path {dbPath}
, m_connectionName {dbConnectionName}
, m_dbLock {dbLock} , m_dbLock {dbLock}
{ {
} }
void BitTorrent::DBResumeDataStorage::Worker::openDatabase() const void BitTorrent::DBResumeDataStorage::Worker::run()
{
{ {
auto db = QSqlDatabase::addDatabase(u"QSQLITE"_qs, m_connectionName); auto db = QSqlDatabase::addDatabase(u"QSQLITE"_qs, m_connectionName);
db.setDatabaseName(m_path.data()); db.setDatabaseName(m_path.data());
if (!db.open()) if (!db.open())
throw RuntimeError(db.lastError().text()); throw RuntimeError(db.lastError().text());
int64_t transactedJobsCount = 0;
while (true)
{
m_jobsMutex.lock();
if (m_jobs.empty())
{
if (transactedJobsCount > 0)
{
db.commit();
m_dbLock.unlock();
qDebug() << "Resume data changes are commited. Transacted jobs:" << transactedJobsCount;
transactedJobsCount = 0;
}
if (isInterruptionRequested())
{
m_jobsMutex.unlock();
break;
}
m_waitCondition.wait(&m_jobsMutex);
if (isInterruptionRequested())
{
m_jobsMutex.unlock();
break;
} }
void BitTorrent::DBResumeDataStorage::Worker::closeDatabase() const m_dbLock.lockForWrite();
if (!db.transaction())
{ {
LogMsg(tr("Couldn't begin transaction. Error: %1").arg(db.lastError().text()), Log::WARNING);
m_dbLock.unlock();
break;
}
}
std::unique_ptr<Job> job = std::move(m_jobs.front());
m_jobs.pop();
m_jobsMutex.unlock();
job->perform(db);
++transactedJobsCount;
}
db.close();
}
QSqlDatabase::removeDatabase(m_connectionName); QSqlDatabase::removeDatabase(m_connectionName);
} }
void BitTorrent::DBResumeDataStorage::Worker::store(const TorrentID &id, const LoadTorrentParams &resumeData) const void DBResumeDataStorage::Worker::requestInterruption()
{
m_waitCondition.wakeAll();
QThread::requestInterruption();
}
void BitTorrent::DBResumeDataStorage::Worker::store(const TorrentID &id, const LoadTorrentParams &resumeData)
{
addJob(std::make_unique<StoreJob>(id, resumeData));
}
void BitTorrent::DBResumeDataStorage::Worker::remove(const TorrentID &id)
{
addJob(std::make_unique<RemoveJob>(id));
}
void BitTorrent::DBResumeDataStorage::Worker::storeQueue(const QVector<TorrentID> &queue)
{
addJob(std::make_unique<StoreQueueJob>(queue));
}
void BitTorrent::DBResumeDataStorage::Worker::addJob(std::unique_ptr<Job> job)
{
m_jobsMutex.lock();
m_jobs.push(std::move(job));
m_jobsMutex.unlock();
m_waitCondition.wakeAll();
}
namespace
{
using namespace BitTorrent;
StoreJob::StoreJob(const TorrentID &torrentID, const LoadTorrentParams &resumeData)
: m_torrentID {torrentID}
, m_resumeData {resumeData}
{
}
void StoreJob::perform(QSqlDatabase db)
{ {
// We need to adjust native libtorrent resume data // We need to adjust native libtorrent resume data
lt::add_torrent_params p = resumeData.ltAddTorrentParams; lt::add_torrent_params p = m_resumeData.ltAddTorrentParams;
p.save_path = Profile::instance()->toPortablePath(Path(p.save_path)) p.save_path = Profile::instance()->toPortablePath(Path(p.save_path))
.toString().toStdString(); .toString().toStdString();
if (resumeData.stopped) if (m_resumeData.stopped)
{ {
p.flags |= lt::torrent_flags::paused; p.flags |= lt::torrent_flags::paused;
p.flags &= ~lt::torrent_flags::auto_managed; p.flags &= ~lt::torrent_flags::auto_managed;
@ -649,7 +756,7 @@ void BitTorrent::DBResumeDataStorage::Worker::store(const TorrentID &id, const L
{ {
// Torrent can be actually "running" but temporarily "paused" to perform some // Torrent can be actually "running" but temporarily "paused" to perform some
// service jobs behind the scenes so we need to restore it as "running" // service jobs behind the scenes so we need to restore it as "running"
if (resumeData.operatingMode == BitTorrent::TorrentOperatingMode::AutoManaged) if (m_resumeData.operatingMode == BitTorrent::TorrentOperatingMode::AutoManaged)
{ {
p.flags |= lt::torrent_flags::auto_managed; p.flags |= lt::torrent_flags::auto_managed;
} }
@ -698,7 +805,7 @@ void BitTorrent::DBResumeDataStorage::Worker::store(const TorrentID &id, const L
} }
catch (const std::exception &err) catch (const std::exception &err)
{ {
LogMsg(tr("Couldn't save torrent metadata. Error: %1.") LogMsg(ResumeDataStorage::tr("Couldn't save torrent metadata. Error: %1.")
.arg(QString::fromLocal8Bit(err.what())), Log::CRITICAL); .arg(QString::fromLocal8Bit(err.what())), Log::CRITICAL);
return; return;
} }
@ -712,7 +819,6 @@ void BitTorrent::DBResumeDataStorage::Worker::store(const TorrentID &id, const L
const QString insertTorrentStatement = makeInsertStatement(DB_TABLE_TORRENTS, columns) const QString insertTorrentStatement = makeInsertStatement(DB_TABLE_TORRENTS, columns)
+ makeOnConflictUpdateStatement(DB_COLUMN_TORRENT_ID, columns); + makeOnConflictUpdateStatement(DB_COLUMN_TORRENT_ID, columns);
auto db = QSqlDatabase::database(m_connectionName);
QSqlQuery query {db}; QSqlQuery query {db};
try try
@ -720,110 +826,99 @@ void BitTorrent::DBResumeDataStorage::Worker::store(const TorrentID &id, const L
if (!query.prepare(insertTorrentStatement)) if (!query.prepare(insertTorrentStatement))
throw RuntimeError(query.lastError().text()); throw RuntimeError(query.lastError().text());
query.bindValue(DB_COLUMN_TORRENT_ID.placeholder, id.toString()); query.bindValue(DB_COLUMN_TORRENT_ID.placeholder, m_torrentID.toString());
query.bindValue(DB_COLUMN_NAME.placeholder, resumeData.name); query.bindValue(DB_COLUMN_NAME.placeholder, m_resumeData.name);
query.bindValue(DB_COLUMN_CATEGORY.placeholder, resumeData.category); query.bindValue(DB_COLUMN_CATEGORY.placeholder, m_resumeData.category);
query.bindValue(DB_COLUMN_TAGS.placeholder, (resumeData.tags.isEmpty() query.bindValue(DB_COLUMN_TAGS.placeholder, (m_resumeData.tags.isEmpty()
? QVariant(QVariant::String) : resumeData.tags.join(u","_qs))); ? QVariant(QVariant::String) : m_resumeData.tags.join(u","_qs)));
query.bindValue(DB_COLUMN_CONTENT_LAYOUT.placeholder, Utils::String::fromEnum(resumeData.contentLayout)); query.bindValue(DB_COLUMN_CONTENT_LAYOUT.placeholder, Utils::String::fromEnum(m_resumeData.contentLayout));
query.bindValue(DB_COLUMN_RATIO_LIMIT.placeholder, static_cast<int>(resumeData.ratioLimit * 1000)); query.bindValue(DB_COLUMN_RATIO_LIMIT.placeholder, static_cast<int>(m_resumeData.ratioLimit * 1000));
query.bindValue(DB_COLUMN_SEEDING_TIME_LIMIT.placeholder, resumeData.seedingTimeLimit); query.bindValue(DB_COLUMN_SEEDING_TIME_LIMIT.placeholder, m_resumeData.seedingTimeLimit);
query.bindValue(DB_COLUMN_HAS_OUTER_PIECES_PRIORITY.placeholder, resumeData.firstLastPiecePriority); query.bindValue(DB_COLUMN_HAS_OUTER_PIECES_PRIORITY.placeholder, m_resumeData.firstLastPiecePriority);
query.bindValue(DB_COLUMN_HAS_SEED_STATUS.placeholder, resumeData.hasFinishedStatus); query.bindValue(DB_COLUMN_HAS_SEED_STATUS.placeholder, m_resumeData.hasFinishedStatus);
query.bindValue(DB_COLUMN_OPERATING_MODE.placeholder, Utils::String::fromEnum(resumeData.operatingMode)); query.bindValue(DB_COLUMN_OPERATING_MODE.placeholder, Utils::String::fromEnum(m_resumeData.operatingMode));
query.bindValue(DB_COLUMN_STOPPED.placeholder, resumeData.stopped); query.bindValue(DB_COLUMN_STOPPED.placeholder, m_resumeData.stopped);
query.bindValue(DB_COLUMN_STOP_CONDITION.placeholder, Utils::String::fromEnum(resumeData.stopCondition)); query.bindValue(DB_COLUMN_STOP_CONDITION.placeholder, Utils::String::fromEnum(m_resumeData.stopCondition));
if (!resumeData.useAutoTMM) if (!m_resumeData.useAutoTMM)
{ {
query.bindValue(DB_COLUMN_TARGET_SAVE_PATH.placeholder, Profile::instance()->toPortablePath(resumeData.savePath).data()); query.bindValue(DB_COLUMN_TARGET_SAVE_PATH.placeholder, Profile::instance()->toPortablePath(m_resumeData.savePath).data());
query.bindValue(DB_COLUMN_DOWNLOAD_PATH.placeholder, Profile::instance()->toPortablePath(resumeData.downloadPath).data()); query.bindValue(DB_COLUMN_DOWNLOAD_PATH.placeholder, Profile::instance()->toPortablePath(m_resumeData.downloadPath).data());
} }
query.bindValue(DB_COLUMN_RESUMEDATA.placeholder, bencodedResumeData); query.bindValue(DB_COLUMN_RESUMEDATA.placeholder, bencodedResumeData);
if (!bencodedMetadata.isEmpty()) if (!bencodedMetadata.isEmpty())
query.bindValue(DB_COLUMN_METADATA.placeholder, bencodedMetadata); query.bindValue(DB_COLUMN_METADATA.placeholder, bencodedMetadata);
const QWriteLocker locker {&m_dbLock};
if (!query.exec()) if (!query.exec())
throw RuntimeError(query.lastError().text()); throw RuntimeError(query.lastError().text());
} }
catch (const RuntimeError &err) catch (const RuntimeError &err)
{ {
LogMsg(tr("Couldn't store resume data for torrent '%1'. Error: %2") LogMsg(ResumeDataStorage::tr("Couldn't store resume data for torrent '%1'. Error: %2")
.arg(id.toString(), err.message()), Log::CRITICAL); .arg(m_torrentID.toString(), err.message()), Log::CRITICAL);
}
} }
RemoveJob::RemoveJob(const TorrentID &torrentID)
: m_torrentID {torrentID}
{
} }
void BitTorrent::DBResumeDataStorage::Worker::remove(const TorrentID &id) const void RemoveJob::perform(QSqlDatabase db)
{ {
const auto deleteTorrentStatement = u"DELETE FROM %1 WHERE %2 = %3;"_qs const auto deleteTorrentStatement = u"DELETE FROM %1 WHERE %2 = %3;"_qs
.arg(quoted(DB_TABLE_TORRENTS), quoted(DB_COLUMN_TORRENT_ID.name), DB_COLUMN_TORRENT_ID.placeholder); .arg(quoted(DB_TABLE_TORRENTS), quoted(DB_COLUMN_TORRENT_ID.name), DB_COLUMN_TORRENT_ID.placeholder);
auto db = QSqlDatabase::database(m_connectionName);
QSqlQuery query {db}; QSqlQuery query {db};
try try
{ {
if (!query.prepare(deleteTorrentStatement)) if (!query.prepare(deleteTorrentStatement))
throw RuntimeError(query.lastError().text()); throw RuntimeError(query.lastError().text());
query.bindValue(DB_COLUMN_TORRENT_ID.placeholder, id.toString()); query.bindValue(DB_COLUMN_TORRENT_ID.placeholder, m_torrentID.toString());
const QWriteLocker locker {&m_dbLock};
if (!query.exec()) if (!query.exec())
throw RuntimeError(query.lastError().text()); throw RuntimeError(query.lastError().text());
} }
catch (const RuntimeError &err) catch (const RuntimeError &err)
{ {
LogMsg(tr("Couldn't delete resume data of torrent '%1'. Error: %2") LogMsg(ResumeDataStorage::tr("Couldn't delete resume data of torrent '%1'. Error: %2")
.arg(id.toString(), err.message()), Log::CRITICAL); .arg(m_torrentID.toString(), err.message()), Log::CRITICAL);
} }
} }
void BitTorrent::DBResumeDataStorage::Worker::storeQueue(const QVector<TorrentID> &queue) const StoreQueueJob::StoreQueueJob(const QVector<TorrentID> &queue)
: m_queue {queue}
{
}
void StoreQueueJob::perform(QSqlDatabase db)
{ {
const auto updateQueuePosStatement = u"UPDATE %1 SET %2 = %3 WHERE %4 = %5;"_qs const auto updateQueuePosStatement = u"UPDATE %1 SET %2 = %3 WHERE %4 = %5;"_qs
.arg(quoted(DB_TABLE_TORRENTS), quoted(DB_COLUMN_QUEUE_POSITION.name), DB_COLUMN_QUEUE_POSITION.placeholder .arg(quoted(DB_TABLE_TORRENTS), quoted(DB_COLUMN_QUEUE_POSITION.name), DB_COLUMN_QUEUE_POSITION.placeholder
, quoted(DB_COLUMN_TORRENT_ID.name), DB_COLUMN_TORRENT_ID.placeholder); , quoted(DB_COLUMN_TORRENT_ID.name), DB_COLUMN_TORRENT_ID.placeholder);
auto db = QSqlDatabase::database(m_connectionName);
try try
{ {
const QWriteLocker locker {&m_dbLock};
if (!db.transaction())
throw RuntimeError(db.lastError().text());
QSqlQuery query {db}; QSqlQuery query {db};
try
{
if (!query.prepare(updateQueuePosStatement)) if (!query.prepare(updateQueuePosStatement))
throw RuntimeError(query.lastError().text()); throw RuntimeError(query.lastError().text());
int pos = 0; int pos = 0;
for (const TorrentID &torrentID : queue) for (const TorrentID &torrentID : m_queue)
{ {
query.bindValue(DB_COLUMN_TORRENT_ID.placeholder, torrentID.toString()); query.bindValue(DB_COLUMN_TORRENT_ID.placeholder, torrentID.toString());
query.bindValue(DB_COLUMN_QUEUE_POSITION.placeholder, pos++); query.bindValue(DB_COLUMN_QUEUE_POSITION.placeholder, pos++);
if (!query.exec()) if (!query.exec())
throw RuntimeError(query.lastError().text()); throw RuntimeError(query.lastError().text());
} }
if (!db.commit())
throw RuntimeError(db.lastError().text());
}
catch (const RuntimeError &)
{
db.rollback();
throw;
}
} }
catch (const RuntimeError &err) catch (const RuntimeError &err)
{ {
LogMsg(tr("Couldn't store torrents queue positions. Error: %1") LogMsg(ResumeDataStorage::tr("Couldn't store torrents queue positions. Error: %1")
.arg(err.message()), Log::CRITICAL); .arg(err.message()), Log::CRITICAL);
} }
} }
}

Loading…
Cancel
Save