Browse Source

* store peer profiles in single file

pull/1931/head
hagen 2 years ago
parent
commit
20e3d7c8fa
  1. 16
      libi2pd/NetDb.cpp
  2. 301
      libi2pd/Profiling.cpp
  3. 60
      libi2pd/Profiling.h

16
libi2pd/NetDb.cpp

@ -50,7 +50,6 @@ namespace data @@ -50,7 +50,6 @@ namespace data
{
m_Storage.SetPlace(i2p::fs::GetDataDir());
m_Storage.Init(i2p::data::GetBase64SubstitutionTable(), 64);
InitProfilesStorage ();
m_Families.LoadCertificates ();
Load ();
@ -75,6 +74,9 @@ namespace data @@ -75,6 +74,9 @@ namespace data
m_Floodfills.Insert (i2p::context.GetSharedRouterInfo ());
i2p::config::GetOption("persist.profiles", m_PersistProfiles);
if (m_PersistProfiles) {
LoadProfilesDB ();
}
m_IsRunning = true;
m_Thread = new std::thread (std::bind (&NetDb::Run, this));
@ -84,9 +86,11 @@ namespace data @@ -84,9 +86,11 @@ namespace data
{
if (m_IsRunning)
{
if (m_PersistProfiles)
SaveProfiles ();
DeleteObsoleteProfiles ();
if (m_PersistProfiles) {
PruneExpiredProfiles ();
SaveProfilesDB ();
}
ClearProfilesDB ();
m_RouterInfos.clear ();
m_Floodfills.Clear ();
if (m_Thread)
@ -175,8 +179,8 @@ namespace data @@ -175,8 +179,8 @@ namespace data
if (ts - lastProfilesCleanup >= (uint64_t)(i2p::data::PEER_PROFILE_AUTOCLEAN_TIMEOUT + profilesCleanupVariance) ||
ts + i2p::data::PEER_PROFILE_AUTOCLEAN_TIMEOUT < lastProfilesCleanup)
{
if (m_PersistProfiles) PersistProfiles ();
DeleteObsoleteProfiles ();
PruneExpiredProfiles ();
if (m_PersistProfiles) SaveProfilesDB ();
lastProfilesCleanup = ts;
profilesCleanupVariance = (rand () % (2 * i2p::data::PEER_PROFILE_AUTOCLEAN_VARIANCE) - i2p::data::PEER_PROFILE_AUTOCLEAN_VARIANCE);
}

301
libi2pd/Profiling.cpp

@ -11,7 +11,7 @@ @@ -11,7 +11,7 @@
#include <list>
#include <thread>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/ini_parser.hpp>
#include <boost/property_tree/json_parser.hpp>
#include "Base.h"
#include "FS.h"
#include "Log.h"
@ -22,159 +22,119 @@ namespace i2p @@ -22,159 +22,119 @@ namespace i2p
{
namespace data
{
static i2p::fs::HashedStorage g_ProfilesStorage("peerProfiles", "p", "profile-", "txt");
static std::unordered_map<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > g_Profiles;
static std::mutex g_ProfilesMutex;
static boost::posix_time::ptime GetTime ()
static uint64_t GetTime ()
{
return boost::posix_time::second_clock::local_time();
return i2p::util::GetSecondsSinceEpoch ();
}
RouterProfile::RouterProfile ():
m_LastUpdateTime (GetTime ()), m_IsUpdated (false),
m_LastDeclineTime (0), m_LastUnreachableTime (0),
m_LastUpdateTime (0), m_LastDeclineTime (0), m_LastUnreachableTime (0),
m_NumTunnelsAgreed (0), m_NumTunnelsDeclined (0), m_NumTunnelsNonReplied (0),
m_NumTimesTaken (0), m_NumTimesRejected (0), m_HasConnected (false)
{
}
void RouterProfile::UpdateTime ()
std::string RouterProfile::Dump (const std::string& peerid)
{
m_LastUpdateTime = GetTime ();
m_IsUpdated = true;
}
void RouterProfile::Save (const IdentHash& identHash)
{
// fill sections
boost::property_tree::ptree participation;
participation.put (PEER_PROFILE_PARTICIPATION_AGREED, m_NumTunnelsAgreed);
participation.put (PEER_PROFILE_PARTICIPATION_DECLINED, m_NumTunnelsDeclined);
participation.put (PEER_PROFILE_PARTICIPATION_NON_REPLIED, m_NumTunnelsNonReplied);
boost::property_tree::ptree usage;
usage.put (PEER_PROFILE_USAGE_TAKEN, m_NumTimesTaken);
usage.put (PEER_PROFILE_USAGE_REJECTED, m_NumTimesRejected);
usage.put (PEER_PROFILE_USAGE_CONNECTED, m_HasConnected);
// fill property tree
boost::property_tree::ptree pt;
pt.put (PEER_PROFILE_LAST_UPDATE_TIME, boost::posix_time::to_simple_string (m_LastUpdateTime));
if (m_LastUnreachableTime)
pt.put (PEER_PROFILE_LAST_UNREACHABLE_TIME, m_LastUnreachableTime);
pt.put_child (PEER_PROFILE_SECTION_PARTICIPATION, participation);
pt.put_child (PEER_PROFILE_SECTION_USAGE, usage);
std::stringstream ss;
// save to file
std::string ident = identHash.ToBase64 ();
std::string path = g_ProfilesStorage.Path(ident);
pt.put(PEER_PROFILE_PEER_ID, peerid);
/* "times" hash */
pt.put(PEER_PROFILE_LAST_UPDATE_TIME, m_LastUpdateTime);
pt.put(PEER_PROFILE_LAST_DECLINE_TIME, m_LastDeclineTime);
pt.put(PEER_PROFILE_LAST_UNREACHABLE_TIME, m_LastUnreachableTime);
/* "tunnels" hash */
pt.put(PEER_PROFILE_PARTICIPATION_AGREED, m_NumTunnelsAgreed);
pt.put(PEER_PROFILE_PARTICIPATION_DECLINED, m_NumTunnelsDeclined);
pt.put(PEER_PROFILE_PARTICIPATION_NON_REPLIED, m_NumTunnelsNonReplied);
/* "usage" hash */
pt.put(PEER_PROFILE_USAGE_TAKEN, m_NumTimesTaken);
pt.put(PEER_PROFILE_USAGE_REJECTED, m_NumTimesRejected);
pt.put(PEER_PROFILE_USAGE_CONNECTED, m_HasConnected);
try {
boost::property_tree::write_ini (path, pt);
/* convert ptree to single line json string */
boost::property_tree::write_json (ss, pt, false);
} catch (std::exception& ex) {
/* boost exception verbose enough */
LogPrint (eLogError, "Profiling: ", ex.what ());
LogPrint (eLogError, "Profiling: can't serialize data to json -- ", ex.what ());
}
return ss.str();
}
void RouterProfile::Load (const IdentHash& identHash)
std::string RouterProfile::Load (const std::string& jsondata)
{
std::string ident = identHash.ToBase64 ();
std::string path = g_ProfilesStorage.Path(ident);
boost::property_tree::ptree pt;
std::stringstream ss(jsondata);
std::string peerid = "";
if (!i2p::fs::Exists(path))
{
LogPrint(eLogWarning, "Profiling: No profile yet for ", ident);
return;
}
try
{
boost::property_tree::read_ini (path, pt);
} catch (std::exception& ex)
{
try {
boost::property_tree::read_json (ss, pt);
} catch (std::exception& ex) {
/* boost exception verbose enough */
LogPrint (eLogError, "Profiling: ", ex.what ());
return;
LogPrint (eLogError, "Profiling: can't parse json data -- ", ex.what ());
return std::string("");
}
try
{
auto t = pt.get (PEER_PROFILE_LAST_UPDATE_TIME, "");
if (t.length () > 0)
m_LastUpdateTime = boost::posix_time::time_from_string (t);
if ((GetTime () - m_LastUpdateTime).hours () < PEER_PROFILE_EXPIRATION_TIMEOUT)
{
m_LastUnreachableTime = pt.get (PEER_PROFILE_LAST_UNREACHABLE_TIME, 0);
try
{
// read participations
auto participations = pt.get_child (PEER_PROFILE_SECTION_PARTICIPATION);
m_NumTunnelsAgreed = participations.get (PEER_PROFILE_PARTICIPATION_AGREED, 0);
m_NumTunnelsDeclined = participations.get (PEER_PROFILE_PARTICIPATION_DECLINED, 0);
m_NumTunnelsNonReplied = participations.get (PEER_PROFILE_PARTICIPATION_NON_REPLIED, 0);
}
catch (boost::property_tree::ptree_bad_path& ex)
{
LogPrint (eLogWarning, "Profiling: Missing section ", PEER_PROFILE_SECTION_PARTICIPATION, " in profile for ", ident);
}
try
{
// read usage
auto usage = pt.get_child (PEER_PROFILE_SECTION_USAGE);
m_NumTimesTaken = usage.get (PEER_PROFILE_USAGE_TAKEN, 0);
m_NumTimesRejected = usage.get (PEER_PROFILE_USAGE_REJECTED, 0);
m_HasConnected = usage.get (PEER_PROFILE_USAGE_CONNECTED, false);
}
catch (boost::property_tree::ptree_bad_path& ex)
{
LogPrint (eLogWarning, "Profiling: Missing section ", PEER_PROFILE_SECTION_USAGE, " in profile for ", ident);
}
}
else
*this = RouterProfile ();
try {
peerid = pt.get<std::string>(PEER_PROFILE_PEER_ID);
} catch (std::exception& ex) {
LogPrint (eLogError, "Profiling: Can't read profile data: missing peerid");
return std::string("");
}
catch (std::exception& ex)
{
LogPrint (eLogError, "Profiling: Can't read profile ", ident, " :", ex.what ());
try {
/* "lasttime" hash */
m_LastUpdateTime = pt.get<int>(PEER_PROFILE_LAST_UPDATE_TIME, 0);
m_LastDeclineTime = pt.get<int>(PEER_PROFILE_LAST_DECLINE_TIME, 0);
m_LastUnreachableTime = pt.get<int>(PEER_PROFILE_LAST_UNREACHABLE_TIME, 0);
/* "tunnels" hash */
m_NumTunnelsAgreed = pt.get<int>(PEER_PROFILE_PARTICIPATION_AGREED, 0);
m_NumTunnelsDeclined = pt.get<int>(PEER_PROFILE_PARTICIPATION_DECLINED, 0);
m_NumTunnelsNonReplied = pt.get<int>(PEER_PROFILE_PARTICIPATION_NON_REPLIED, 0);
/* "usage" hash */
m_NumTimesTaken = pt.get<int>(PEER_PROFILE_USAGE_TAKEN, 0);
m_NumTimesRejected = pt.get<int>(PEER_PROFILE_USAGE_REJECTED, 0);
m_HasConnected = pt.get<bool>(PEER_PROFILE_USAGE_CONNECTED, false);
} catch (boost::property_tree::ptree_bad_path& ex) {
LogPrint (eLogError, "Profiling: Can't read profile data: ", ex.what());
}
return peerid;
}
void RouterProfile::TunnelBuildResponse (uint8_t ret)
{
UpdateTime ();
if (ret > 0)
{
if (ret > 0) {
m_NumTunnelsDeclined++;
m_LastDeclineTime = i2p::util::GetSecondsSinceEpoch ();
}
else
{
m_NumTunnelsAgreed++;
m_LastDeclineTime = GetTime ();
} else {
m_NumTunnelsAgreed++;
m_LastDeclineTime = 0;
}
m_LastUpdateTime = GetTime ();
}
void RouterProfile::TunnelNonReplied ()
{
m_NumTunnelsNonReplied++;
UpdateTime ();
m_NumTunnelsNonReplied++;
if (m_NumTunnelsNonReplied > 2*m_NumTunnelsAgreed && m_NumTunnelsNonReplied > 3)
{
m_LastDeclineTime = i2p::util::GetSecondsSinceEpoch ();
}
m_LastDeclineTime = GetTime ();
m_LastUpdateTime = GetTime ();
}
void RouterProfile::Unreachable ()
{
m_LastUnreachableTime = i2p::util::GetSecondsSinceEpoch ();
UpdateTime ();
m_LastUnreachableTime = GetTime ();
m_LastUpdateTime = GetTime ();
}
void RouterProfile::Connected ()
{
m_HasConnected = true;
UpdateTime ();
m_LastUpdateTime = GetTime ();
}
bool RouterProfile::IsLowPartcipationRate () const
@ -241,84 +201,101 @@ namespace data @@ -241,84 +201,101 @@ namespace data
if (it != g_Profiles.end ())
return it->second;
}
LogPrint(eLogDebug, "Profiling: creating new profile for ", identHash.ToBase64());
auto profile = std::make_shared<RouterProfile> ();
profile->Load (identHash); // if possible
std::unique_lock<std::mutex> l(g_ProfilesMutex);
g_Profiles.emplace (identHash, profile);
return profile;
}
void InitProfilesStorage ()
{
g_ProfilesStorage.SetPlace(i2p::fs::GetDataDir());
g_ProfilesStorage.Init(i2p::data::GetBase64SubstitutionTable(), 64);
}
void LoadProfilesDB () {
unsigned int loaded = 0, linenum = 0;
static std::unordered_map<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > new_db;
IdentHash identHash;
std::string oldDBDir = i2p::fs::DataDirPath("peerProfiles");
auto DBPath = i2p::fs::DataDirPath(PEER_PROFILES_DB_FILENAME);
if (i2p::fs::Exists(oldDBDir)) {
std::string oldDBBak = oldDBDir + ".bak";
LogPrint(eLogInfo, "Profiling: old peerProfiles/ directory still exists, you may safely remove it");
std::rename(oldDBDir.c_str(), oldDBBak.c_str());
}
if (!i2p::fs::Exists(DBPath))
return; /* no database yet */
void PersistProfiles ()
{
auto ts = GetTime ();
std::list<std::pair<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > > tmp;
{
std::unique_lock<std::mutex> l(g_ProfilesMutex);
for (auto it = g_Profiles.begin (); it != g_Profiles.end ();)
{
if ((ts - it->second->GetLastUpdateTime ()).total_seconds () > PEER_PROFILE_PERSIST_INTERVAL)
{
if (it->second->IsUpdated ())
tmp.push_back (std::make_pair (it->first, it->second));
it = g_Profiles.erase (it);
}
else
it++;
std::ifstream in (DBPath);
if (!in.is_open()) {
LogPrint (eLogError, "Profiling: can't open profiles database ", DBPath);
return;
}
std::string line;
while (!(in.eof() || in.fail())) {
std::getline(in, line); linenum++;
if (line.empty()) continue;
if (line[0] != '{') {
LogPrint(eLogError, "Profiling: ignore profile data at line ", linenum);
continue;
}
auto profile = std::make_shared<RouterProfile> ();
std::string peerid = profile->Load(line);
if (peerid.empty())
continue; /* load failed, errors logged */
identHash.FromBase64(peerid);
new_db.emplace(identHash, profile);
loaded++;
}
for (auto& it: tmp)
if (it.second) it.second->Save (it.first);
}
LogPrint (eLogInfo, "Profiling: loaded ", loaded, " profiles");
void SaveProfiles ()
{
std::unordered_map<i2p::data::IdentHash, std::shared_ptr<RouterProfile> > tmp;
{
{ /* replace exiting database with just loaded */
std::unique_lock<std::mutex> l(g_ProfilesMutex);
tmp = g_Profiles;
g_Profiles.clear ();
g_Profiles = new_db;
}
auto ts = GetTime ();
for (auto& it: tmp)
if (it.second->IsUseful() && it.second->IsUpdated () && (ts - it.second->GetLastUpdateTime ()).total_seconds () < PEER_PROFILE_EXPIRATION_TIMEOUT*3600)
it.second->Save (it.first);
return;
}
void DeleteObsoleteProfiles ()
{
{
auto ts = GetTime ();
std::unique_lock<std::mutex> l(g_ProfilesMutex);
for (auto it = g_Profiles.begin (); it != g_Profiles.end ();)
{
if ((ts - it->second->GetLastUpdateTime ()).total_seconds () >= PEER_PROFILE_EXPIRATION_TIMEOUT*3600)
it = g_Profiles.erase (it);
else
it++;
void PruneExpiredProfiles () {
unsigned int pruned = 0;
auto ts = GetTime ();
std::unique_lock<std::mutex> l(g_ProfilesMutex);
for (auto it = g_Profiles.begin (); it != g_Profiles.end (); ) {
if ((ts - it->second->GetLastUpdateTime ()) >= PEER_PROFILE_EXPIRATION_TIMEOUT * 3600) {
it = g_Profiles.erase (it);
pruned++;
} else {
it++;
}
}
LogPrint(eLogInfo, "Profiling: pruned ", pruned, " expired peer profiles, ", g_Profiles.size(), " remains");
}
struct stat st;
std::time_t now = std::time(nullptr);
std::vector<std::string> files;
g_ProfilesStorage.Traverse(files);
for (const auto& path: files) {
if (stat(path.c_str(), &st) != 0) {
LogPrint(eLogWarning, "Profiling: Can't stat(): ", path);
continue;
}
if (now - st.st_mtime >= PEER_PROFILE_EXPIRATION_TIMEOUT*3600) {
LogPrint(eLogDebug, "Profiling: Removing expired peer profile: ", path);
i2p::fs::Remove(path);
}
void SaveProfilesDB () {
unsigned int saved = 0;
auto DBPath = i2p::fs::DataDirPath(PEER_PROFILES_DB_FILENAME);
auto DBPathNew = DBPath + ".new";
std::ofstream out (DBPathNew);
std::unique_lock<std::mutex> l(g_ProfilesMutex);
if (!out.is_open()) {
LogPrint(eLogError, "Profiling: can't open database file ", DBPathNew);
return;
}
auto ts = GetTime ();
/* save "old enough" profiles */
for (auto& it : g_Profiles) {
if (it.second->IsUseful() && (ts - it.second->GetLastUpdateTime ()) < PEER_PROFILE_PERSIST_INTERVAL)
continue; /* too new */
out << it.second->Dump(it.first.ToBase64());
saved++;
}
out.flush();
out.close();
LogPrint(eLogDebug, "Profiling: db path is", DBPath);
std::rename(DBPathNew.c_str(), DBPath.c_str());
LogPrint(eLogInfo, "Profiling: saved ", saved, " peer profiles");
}
void ClearProfilesDB () {
g_Profiles.clear();
}
}
}

60
libi2pd/Profiling.h

@ -16,18 +16,25 @@ namespace i2p @@ -16,18 +16,25 @@ namespace i2p
{
namespace data
{
// sections
const char PEER_PROFILE_SECTION_PARTICIPATION[] = "participation";
const char PEER_PROFILE_SECTION_USAGE[] = "usage";
const char PEER_PROFILES_DB_FILENAME[] = "peerProfiles.dat";
/** example json peer profile (pretty-printed):
{
"peerid": "<base64-ident>",
"lasttime": { "update": 123456789, "decline": 123456789, "unreachable": 123456789 },
"tunnels": { "agreed": 17. "declined": 4, "noreply: 2 },
"usage": { "taken": 10, "rejected": 3 }
} */
// params
const char PEER_PROFILE_LAST_UPDATE_TIME[] = "lastupdatetime";
const char PEER_PROFILE_LAST_UNREACHABLE_TIME[] = "lastunreachabletime";
const char PEER_PROFILE_PARTICIPATION_AGREED[] = "agreed";
const char PEER_PROFILE_PARTICIPATION_DECLINED[] = "declined";
const char PEER_PROFILE_PARTICIPATION_NON_REPLIED[] = "nonreplied";
const char PEER_PROFILE_USAGE_TAKEN[] = "taken";
const char PEER_PROFILE_USAGE_REJECTED[] = "rejected";
const char PEER_PROFILE_USAGE_CONNECTED[] = "connected";
const char PEER_PROFILE_PEER_ID[] = "peerid";
const char PEER_PROFILE_LAST_UPDATE_TIME[] = "lasttime.update";
const char PEER_PROFILE_LAST_DECLINE_TIME[] = "lasttime.decline";
const char PEER_PROFILE_LAST_UNREACHABLE_TIME[] = "lasttime.unreachable";
const char PEER_PROFILE_PARTICIPATION_AGREED[] = "tunnels.agreed";
const char PEER_PROFILE_PARTICIPATION_DECLINED[] = "tunnels.declined";
const char PEER_PROFILE_PARTICIPATION_NON_REPLIED[] = "tunnels.noreply";
const char PEER_PROFILE_USAGE_TAKEN[] = "usage.taken";
const char PEER_PROFILE_USAGE_REJECTED[] = "usage.rejected";
const char PEER_PROFILE_USAGE_CONNECTED[] = "usage.connected";
const int PEER_PROFILE_EXPIRATION_TIMEOUT = 36; // in hours (1.5 days)
const int PEER_PROFILE_AUTOCLEAN_TIMEOUT = 6 * 3600; // in seconds (6 hours)
@ -44,11 +51,12 @@ namespace data @@ -44,11 +51,12 @@ namespace data
RouterProfile ();
RouterProfile& operator= (const RouterProfile& ) = default;
void Save (const IdentHash& identHash);
void Load (const IdentHash& identHash);
std::string Dump (const std::string& peerid);
std::string Load (const std::string& jsondata);
bool IsBad ();
bool IsUnreachable ();
bool IsUseful() const;
bool IsReal () const { return m_HasConnected || m_NumTunnelsAgreed > 0 || m_NumTunnelsDeclined > 0; }
void TunnelBuildResponse (uint8_t ret);
@ -57,15 +65,10 @@ namespace data @@ -57,15 +65,10 @@ namespace data
void Unreachable ();
void Connected ();
boost::posix_time::ptime GetLastUpdateTime () const { return m_LastUpdateTime; };
bool IsUpdated () const { return m_IsUpdated; };
bool IsUseful() const;
uint64_t GetLastUpdateTime () const { return m_LastUpdateTime; };
private:
void UpdateTime ();
bool IsAlwaysDeclining () const { return !m_NumTunnelsAgreed && m_NumTunnelsDeclined >= 5; };
bool IsLowPartcipationRate () const;
bool IsLowReplyRate () const;
@ -73,9 +76,10 @@ namespace data @@ -73,9 +76,10 @@ namespace data
private:
boost::posix_time::ptime m_LastUpdateTime; // TODO: use std::chrono
bool m_IsUpdated;
uint64_t m_LastDeclineTime, m_LastUnreachableTime; // in seconds
// lasttime
uint64_t m_LastUpdateTime;
uint64_t m_LastDeclineTime;
uint64_t m_LastUnreachableTime;
// participation
uint32_t m_NumTunnelsAgreed;
uint32_t m_NumTunnelsDeclined;
@ -87,10 +91,14 @@ namespace data @@ -87,10 +91,14 @@ namespace data
};
std::shared_ptr<RouterProfile> GetRouterProfile (const IdentHash& identHash);
void InitProfilesStorage ();
void DeleteObsoleteProfiles ();
void SaveProfiles ();
void PersistProfiles ();
/** database file operations */
void LoadProfilesDB (); /*< read saved peer profiles from file to memory */
void SaveProfilesDB (); /*< serialize and write to file known peer profiles */
/** memory database operations */
void PruneExpiredProfiles (); /*< discard peer profiles inactive for long time */
void ClearProfilesDB (); /*< discard ALL known peer profiles */
}
}

Loading…
Cancel
Save