Browse Source

Merge pull request #17996 from glassez/non-blocking

Perform some unavoidable blocking calls asynchronously
adaptive-webui-19844
Vladimir Golovnev 2 years ago committed by GitHub
parent
commit
f54b66eb75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      src/base/bittorrent/nativesessionextension.cpp
  2. 16
      src/base/bittorrent/nativesessionextension.h
  3. 82
      src/base/bittorrent/portforwarderimpl.cpp
  4. 20
      src/base/bittorrent/portforwarderimpl.h
  5. 142
      src/base/bittorrent/sessionimpl.cpp
  6. 18
      src/base/bittorrent/sessionimpl.h

20
src/base/bittorrent/nativesessionextension.cpp

@ -62,6 +62,17 @@ namespace
} }
} }
bool NativeSessionExtension::isSessionListening() const
{
const QReadLocker locker {&m_lock};
return m_isSesssionListening;
}
void NativeSessionExtension::added(const lt::session_handle &nativeSession)
{
m_nativeSession = nativeSession;
}
lt::feature_flags_t NativeSessionExtension::implemented_features() lt::feature_flags_t NativeSessionExtension::implemented_features()
{ {
return alert_feature; return alert_feature;
@ -76,6 +87,9 @@ void NativeSessionExtension::on_alert(const lt::alert *alert)
{ {
switch (alert->type()) switch (alert->type())
{ {
case lt::session_stats_alert::alert_type:
handleSessionStatsAlert(static_cast<const lt::session_stats_alert *>(alert));
break;
case lt::add_torrent_alert::alert_type: case lt::add_torrent_alert::alert_type:
handleAddTorrentAlert(static_cast<const lt::add_torrent_alert *>(alert)); handleAddTorrentAlert(static_cast<const lt::add_torrent_alert *>(alert));
break; break;
@ -86,3 +100,9 @@ void NativeSessionExtension::on_alert(const lt::alert *alert)
break; break;
} }
} }
void NativeSessionExtension::handleSessionStatsAlert([[maybe_unused]] const lt::session_stats_alert *alert)
{
const QWriteLocker locker {&m_lock};
m_isSesssionListening = m_nativeSession.is_listening();
}

16
src/base/bittorrent/nativesessionextension.h

@ -29,12 +29,28 @@
#pragma once #pragma once
#include <libtorrent/extensions.hpp> #include <libtorrent/extensions.hpp>
#include <libtorrent/fwd.hpp>
#include <libtorrent/session_handle.hpp>
#include <QReadWriteLock>
#include "extensiondata.h" #include "extensiondata.h"
class NativeSessionExtension final : public lt::plugin class NativeSessionExtension final : public lt::plugin
{ {
public:
bool isSessionListening() const;
private:
void added(const lt::session_handle &nativeSession) override;
lt::feature_flags_t implemented_features() override; lt::feature_flags_t implemented_features() override;
std::shared_ptr<lt::torrent_plugin> new_torrent(const lt::torrent_handle &torrentHandle, LTClientData clientData) override; std::shared_ptr<lt::torrent_plugin> new_torrent(const lt::torrent_handle &torrentHandle, LTClientData clientData) override;
void on_alert(const lt::alert *alert) override; void on_alert(const lt::alert *alert) override;
void handleSessionStatsAlert(const lt::session_stats_alert *alert);
lt::session_handle m_nativeSession;
mutable QReadWriteLock m_lock;
bool m_isSesssionListening = false;
}; };

82
src/base/bittorrent/portforwarderimpl.cpp

@ -1,6 +1,6 @@
/* /*
* Bittorrent Client using Qt and libtorrent. * Bittorrent Client using Qt and libtorrent.
* Copyright (C) 2019 Vladimir Golovnev <glassez@yandex.ru> * Copyright (C) 2019-2022 Vladimir Golovnev <glassez@yandex.ru>
* *
* This program is free software; you can redistribute it and/or * This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License * modify it under the terms of the GNU General Public License
@ -28,13 +28,12 @@
#include "portforwarderimpl.h" #include "portforwarderimpl.h"
#include <libtorrent/session.hpp> #include <utility>
#include "base/algorithm.h" #include "base/bittorrent/sessionimpl.h"
#include "base/logger.h"
PortForwarderImpl::PortForwarderImpl(lt::session *provider, QObject *parent) PortForwarderImpl::PortForwarderImpl(BitTorrent::SessionImpl *provider, QObject *parent)
: Net::PortForwarder {parent} : Net::PortForwarder(parent)
, m_storeActive {u"Network/PortForwardingEnabled"_qs, true} , m_storeActive {u"Network/PortForwardingEnabled"_qs, true}
, m_provider {provider} , m_provider {provider}
{ {
@ -66,38 +65,13 @@ void PortForwarderImpl::setEnabled(const bool enabled)
void PortForwarderImpl::setPorts(const QString &profile, QSet<quint16> ports) void PortForwarderImpl::setPorts(const QString &profile, QSet<quint16> ports)
{ {
PortMapping &portMapping = m_portProfiles[profile]; const QSet<quint16> oldForwardedPorts = std::accumulate(m_portProfiles.cbegin(), m_portProfiles.cend(), QSet<quint16>());
Algorithm::removeIf(portMapping, [this, &ports](const quint16 port, const std::vector<lt::port_mapping_t> &handles)
{
// keep existing forwardings
const bool isAlreadyMapped = ports.remove(port);
if (isAlreadyMapped)
return false;
// remove outdated forwardings
for (const lt::port_mapping_t &handle : handles)
m_provider->delete_port_mapping(handle);
m_forwardedPorts.remove(port);
return true;
});
// add new forwardings
for (const quint16 port : ports)
{
// port already forwarded/taken by other profile, don't do anything
if (m_forwardedPorts.contains(port))
continue;
if (isEnabled()) m_portProfiles[profile] = ports;
portMapping.insert(port, m_provider->add_port_mapping(lt::session::tcp, port, port)); const QSet<quint16> newForwardedPorts = std::accumulate(m_portProfiles.cbegin(), m_portProfiles.cend(), QSet<quint16>());
else
portMapping.insert(port, {});
m_forwardedPorts.insert(port);
}
if (portMapping.isEmpty()) m_provider->removeMappedPorts(oldForwardedPorts - newForwardedPorts);
m_portProfiles.remove(profile); m_provider->addMappedPorts(newForwardedPorts - oldForwardedPorts);
} }
void PortForwarderImpl::removePorts(const QString &profile) void PortForwarderImpl::removePorts(const QString &profile)
@ -107,40 +81,12 @@ void PortForwarderImpl::removePorts(const QString &profile)
void PortForwarderImpl::start() void PortForwarderImpl::start()
{ {
lt::settings_pack settingsPack; m_provider->enablePortMapping();
settingsPack.set_bool(lt::settings_pack::enable_upnp, true); for (const QSet<quint16> &ports : asConst(m_portProfiles))
settingsPack.set_bool(lt::settings_pack::enable_natpmp, true); m_provider->addMappedPorts(ports);
m_provider->apply_settings(std::move(settingsPack));
for (auto profileIter = m_portProfiles.begin(); profileIter != m_portProfiles.end(); ++profileIter)
{
PortMapping &portMapping = profileIter.value();
for (auto iter = portMapping.begin(); iter != portMapping.end(); ++iter)
{
Q_ASSERT(iter.value().empty());
const quint16 port = iter.key();
iter.value() = m_provider->add_port_mapping(lt::session::tcp, port, port);
}
}
LogMsg(tr("UPnP/NAT-PMP support: ON"), Log::INFO);
} }
void PortForwarderImpl::stop() void PortForwarderImpl::stop()
{ {
lt::settings_pack settingsPack; m_provider->disablePortMapping();
settingsPack.set_bool(lt::settings_pack::enable_upnp, false);
settingsPack.set_bool(lt::settings_pack::enable_natpmp, false);
m_provider->apply_settings(std::move(settingsPack));
// don't clear m_portProfiles so a later `start()` call can restore the port forwardings
for (auto profileIter = m_portProfiles.begin(); profileIter != m_portProfiles.end(); ++profileIter)
{
PortMapping &portMapping = profileIter.value();
for (auto iter = portMapping.begin(); iter != portMapping.end(); ++iter)
iter.value().clear();
}
LogMsg(tr("UPnP/NAT-PMP support: OFF"), Log::INFO);
} }

20
src/base/bittorrent/portforwarderimpl.h

@ -1,6 +1,6 @@
/* /*
* Bittorrent Client using Qt and libtorrent. * Bittorrent Client using Qt and libtorrent.
* Copyright (C) 2019 Vladimir Golovnev <glassez@yandex.ru> * Copyright (C) 2019-2022 Vladimir Golovnev <glassez@yandex.ru>
* *
* This program is free software; you can redistribute it and/or * This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License * modify it under the terms of the GNU General Public License
@ -28,24 +28,24 @@
#pragma once #pragma once
#include <vector>
#include <libtorrent/fwd.hpp>
#include <libtorrent/portmap.hpp>
#include <QHash> #include <QHash>
#include <QSet> #include <QSet>
#include "base/net/portforwarder.h" #include "base/net/portforwarder.h"
#include "base/settingvalue.h" #include "base/settingvalue.h"
namespace BitTorrent
{
class SessionImpl;
}
class PortForwarderImpl final : public Net::PortForwarder class PortForwarderImpl final : public Net::PortForwarder
{ {
Q_OBJECT Q_OBJECT
Q_DISABLE_COPY_MOVE(PortForwarderImpl) Q_DISABLE_COPY_MOVE(PortForwarderImpl)
public: public:
explicit PortForwarderImpl(lt::session *provider, QObject *parent = nullptr); explicit PortForwarderImpl(BitTorrent::SessionImpl *provider, QObject *parent = nullptr);
~PortForwarderImpl() override; ~PortForwarderImpl() override;
bool isEnabled() const override; bool isEnabled() const override;
@ -59,9 +59,7 @@ private:
void stop(); void stop();
CachedSettingValue<bool> m_storeActive; CachedSettingValue<bool> m_storeActive;
lt::session *const m_provider = nullptr;
using PortMapping = QHash<quint16, std::vector<lt::port_mapping_t>>; // <port, handles> BitTorrent::SessionImpl *const m_provider = nullptr;
QHash<QString, PortMapping> m_portProfiles; QHash<QString, QSet<quint16>> m_portProfiles;
QSet<quint16> m_forwardedPorts;
}; };

142
src/base/bittorrent/sessionimpl.cpp

@ -72,6 +72,7 @@
#include <QRegularExpression> #include <QRegularExpression>
#include <QString> #include <QString>
#include <QThread> #include <QThread>
#include <QThreadPool>
#include <QTimer> #include <QTimer>
#include <QUuid> #include <QUuid>
@ -509,14 +510,18 @@ SessionImpl::SessionImpl(QObject *parent)
} }
) )
, m_resumeDataStorageType(BITTORRENT_SESSION_KEY(u"ResumeDataStorageType"_qs), ResumeDataStorageType::Legacy) , m_resumeDataStorageType(BITTORRENT_SESSION_KEY(u"ResumeDataStorageType"_qs), ResumeDataStorageType::Legacy)
, m_seedingLimitTimer {new QTimer {this}} , m_seedingLimitTimer {new QTimer(this)}
, m_resumeDataTimer {new QTimer {this}} , m_resumeDataTimer {new QTimer(this)}
, m_ioThread {new QThread {this}} , m_ioThread {new QThread(this)}
, m_recentErroredTorrentsTimer {new QTimer {this}} , m_asyncWorker {new QThreadPool(this)}
, m_recentErroredTorrentsTimer {new QTimer(this)}
#if (QT_VERSION < QT_VERSION_CHECK(6, 0, 0)) #if (QT_VERSION < QT_VERSION_CHECK(6, 0, 0))
, m_networkManager {new QNetworkConfigurationManager {this}} , m_networkManager {new QNetworkConfigurationManager(this)}
#endif #endif
{ {
// It is required to perform async access to libtorrent sequentially
m_asyncWorker->setMaxThreadCount(1);
if (port() < 0) if (port() < 0)
m_port = Utils::Random::rand(1024, 65535); m_port = Utils::Random::rand(1024, 65535);
@ -572,7 +577,7 @@ SessionImpl::SessionImpl(QObject *parent)
loadStatistics(); loadStatistics();
// initialize PortForwarder instance // initialize PortForwarder instance
new PortForwarderImpl(m_nativeSession); new PortForwarderImpl(this);
// start embedded tracker // start embedded tracker
enableTracker(isTrackerEnabled()); enableTracker(isTrackerEnabled());
@ -597,6 +602,9 @@ SessionImpl::~SessionImpl()
saveStatistics(); saveStatistics();
m_asyncWorker->clear();
m_asyncWorker->waitForDone();
// We must delete FilterParserThread // We must delete FilterParserThread
// before we delete lt::session // before we delete lt::session
delete m_filterParser; delete m_filterParser;
@ -1498,7 +1506,9 @@ void SessionImpl::initializeNativeSession()
if (isPeXEnabled()) if (isPeXEnabled())
m_nativeSession->add_extension(&lt::create_ut_pex_plugin); m_nativeSession->add_extension(&lt::create_ut_pex_plugin);
m_nativeSession->add_extension(std::make_shared<NativeSessionExtension>()); auto nativeSessionExtension = std::make_shared<NativeSessionExtension>();
m_nativeSession->add_extension(nativeSessionExtension);
m_nativeSessionExtension = nativeSessionExtension.get();
} }
void SessionImpl::processBannedIPs(lt::ip_filter &filter) void SessionImpl::processBannedIPs(lt::ip_filter &filter)
@ -2228,22 +2238,27 @@ bool SessionImpl::hasRunningSeed() const
void SessionImpl::banIP(const QString &ip) void SessionImpl::banIP(const QString &ip)
{ {
QStringList bannedIPs = m_bannedIPs; if (m_bannedIPs.get().contains(ip))
if (!bannedIPs.contains(ip)) return;
{
lt::ip_filter filter = m_nativeSession->get_ip_filter();
lt::error_code ec; lt::error_code ec;
const lt::address addr = lt::make_address(ip.toLatin1().constData(), ec); const lt::address addr = lt::make_address(ip.toLatin1().constData(), ec);
Q_ASSERT(!ec); Q_ASSERT(!ec);
if (ec) return; if (ec)
return;
invokeAsync([session = m_nativeSession, addr]
{
lt::ip_filter filter = session->get_ip_filter();
filter.add_rule(addr, addr, lt::ip_filter::blocked); filter.add_rule(addr, addr, lt::ip_filter::blocked);
m_nativeSession->set_ip_filter(filter); session->set_ip_filter(std::move(filter));
});
bannedIPs << ip; QStringList bannedIPs = m_bannedIPs;
bannedIPs.append(ip);
bannedIPs.sort(); bannedIPs.sort();
m_bannedIPs = bannedIPs; m_bannedIPs = bannedIPs;
} }
}
// Delete a torrent from the session, given its hash // Delete a torrent from the session, given its hash
// and from the disk, if the corresponding deleteOption is chosen // and from the disk, if the corresponding deleteOption is chosen
@ -2807,6 +2822,83 @@ void SessionImpl::findIncompleteFiles(const TorrentInfo &torrentInfo, const Path
}); });
} }
void SessionImpl::enablePortMapping()
{
invokeAsync([this]
{
if (m_isPortMappingEnabled)
return;
lt::settings_pack settingsPack;
settingsPack.set_bool(lt::settings_pack::enable_upnp, true);
settingsPack.set_bool(lt::settings_pack::enable_natpmp, true);
m_nativeSession->apply_settings(std::move(settingsPack));
m_isPortMappingEnabled = true;
LogMsg(tr("UPnP/NAT-PMP support: ON"), Log::INFO);
});
}
void SessionImpl::disablePortMapping()
{
invokeAsync([this]
{
if (!m_isPortMappingEnabled)
return;
lt::settings_pack settingsPack;
settingsPack.set_bool(lt::settings_pack::enable_upnp, false);
settingsPack.set_bool(lt::settings_pack::enable_natpmp, false);
m_nativeSession->apply_settings(std::move(settingsPack));
m_mappedPorts.clear();
m_isPortMappingEnabled = false;
LogMsg(tr("UPnP/NAT-PMP support: OFF"), Log::INFO);
});
}
void SessionImpl::addMappedPorts(const QSet<quint16> &ports)
{
invokeAsync([this, ports]
{
if (!m_isPortMappingEnabled)
return;
for (const quint16 port : ports)
{
if (!m_mappedPorts.contains(port))
m_mappedPorts.insert(port, m_nativeSession->add_port_mapping(lt::session::tcp, port, port));
}
});
}
void SessionImpl::removeMappedPorts(const QSet<quint16> &ports)
{
invokeAsync([this, ports]
{
if (!m_isPortMappingEnabled)
return;
Algorithm::removeIf(m_mappedPorts, [this, ports](const quint16 port, const std::vector<lt::port_mapping_t> &handles)
{
if (!ports.contains(port))
return false;
for (const lt::port_mapping_t &handle : handles)
m_nativeSession->delete_port_mapping(handle);
return true;
});
});
}
void SessionImpl::invokeAsync(std::function<void ()> func)
{
m_asyncWorker->start(std::move(func));
}
// Add a torrent to libtorrent session in hidden mode // Add a torrent to libtorrent session in hidden mode
// and force it to download its metadata // and force it to download its metadata
bool SessionImpl::downloadMetadata(const MagnetUri &magnetUri) bool SessionImpl::downloadMetadata(const MagnetUri &magnetUri)
@ -4494,7 +4586,7 @@ void SessionImpl::setTrackerFilteringEnabled(const bool enabled)
bool SessionImpl::isListening() const bool SessionImpl::isListening() const
{ {
return m_nativeSession->is_listening(); return m_nativeSessionExtension->isSessionListening();
} }
MaxRatioAction SessionImpl::maxRatioAction() const MaxRatioAction SessionImpl::maxRatioAction() const
@ -5739,14 +5831,28 @@ void SessionImpl::handleTorrentConflictAlert(const lt::torrent_conflict_alert *a
else else
cancelDownloadMetadata(torrentIDv1); cancelDownloadMetadata(torrentIDv1);
torrent2->nativeHandle().set_metadata(a->metadata->info_section()); invokeAsync([torrentHandle = torrent2->nativeHandle(), metadata = a->metadata]
{
try
{
torrentHandle.set_metadata(metadata->info_section());
}
catch (const std::exception &) {}
});
} }
else if (torrent1) else if (torrent1)
{ {
if (!torrent2) if (!torrent2)
cancelDownloadMetadata(torrentIDv2); cancelDownloadMetadata(torrentIDv2);
torrent1->nativeHandle().set_metadata(a->metadata->info_section()); invokeAsync([torrentHandle = torrent1->nativeHandle(), metadata = a->metadata]
{
try
{
torrentHandle.set_metadata(metadata->info_section());
}
catch (const std::exception &) {}
});
} }
else else
{ {

18
src/base/bittorrent/sessionimpl.h

@ -33,6 +33,7 @@
#include <vector> #include <vector>
#include <libtorrent/fwd.hpp> #include <libtorrent/fwd.hpp>
#include <libtorrent/portmap.hpp>
#include <libtorrent/torrent_handle.hpp> #include <libtorrent/torrent_handle.hpp>
#include <QElapsedTimer> #include <QElapsedTimer>
@ -59,12 +60,14 @@ class QNetworkConfigurationManager;
#endif #endif
class QString; class QString;
class QThread; class QThread;
class QThreadPool;
class QTimer; class QTimer;
class QUrl; class QUrl;
class BandwidthScheduler; class BandwidthScheduler;
class FileSearcher; class FileSearcher;
class FilterParserThread; class FilterParserThread;
class NativeSessionExtension;
namespace Net namespace Net
{ {
@ -430,6 +433,13 @@ namespace BitTorrent
void findIncompleteFiles(const TorrentInfo &torrentInfo, const Path &savePath void findIncompleteFiles(const TorrentInfo &torrentInfo, const Path &savePath
, const Path &downloadPath, const PathList &filePaths = {}) const; , const Path &downloadPath, const PathList &filePaths = {}) const;
void enablePortMapping();
void disablePortMapping();
void addMappedPorts(const QSet<quint16> &ports);
void removeMappedPorts(const QSet<quint16> &ports);
void invokeAsync(std::function<void ()> func);
private slots: private slots:
void configureDeferred(); void configureDeferred();
void readAlerts(); void readAlerts();
@ -549,6 +559,7 @@ namespace BitTorrent
// BitTorrent // BitTorrent
lt::session *m_nativeSession = nullptr; lt::session *m_nativeSession = nullptr;
NativeSessionExtension *m_nativeSessionExtension = nullptr;
bool m_deferredConfigureScheduled = false; bool m_deferredConfigureScheduled = false;
bool m_IPFilteringConfigured = false; bool m_IPFilteringConfigured = false;
@ -692,6 +703,7 @@ namespace BitTorrent
QPointer<Tracker> m_tracker; QPointer<Tracker> m_tracker;
QThread *m_ioThread = nullptr; QThread *m_ioThread = nullptr;
QThreadPool *m_asyncWorker = nullptr;
ResumeDataStorage *m_resumeDataStorage = nullptr; ResumeDataStorage *m_resumeDataStorage = nullptr;
FileSearcher *m_fileSearcher = nullptr; FileSearcher *m_fileSearcher = nullptr;
@ -728,6 +740,12 @@ namespace BitTorrent
bool m_needUpgradeDownloadPath = false; bool m_needUpgradeDownloadPath = false;
// All port mapping related routines are invoked from working thread
// so there are no synchronization used. If multithreaded access is
// ever required, synchronization should also be provided.
bool m_isPortMappingEnabled = false;
QHash<quint16, std::vector<lt::port_mapping_t>> m_mappedPorts;
friend void Session::initInstance(); friend void Session::initInstance();
friend void Session::freeInstance(); friend void Session::freeInstance();
friend Session *Session::instance(); friend Session *Session::instance();

Loading…
Cancel
Save