Browse Source

send message right a way if in same thread

pull/2094/head
orignal 2 months ago
parent
commit
96ea630274
  1. 37
      libi2pd_client/I2CP.cpp
  2. 7
      libi2pd_client/I2CP.h

37
libi2pd_client/I2CP.cpp

@ -24,10 +24,11 @@ namespace client
{ {
I2CPDestination::I2CPDestination (boost::asio::io_service& service, std::shared_ptr<I2CPSession> owner, I2CPDestination::I2CPDestination (boost::asio::io_service& service, std::shared_ptr<I2CPSession> owner,
std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, const std::map<std::string, std::string>& params): std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, bool isSameThread,
const std::map<std::string, std::string>& params):
LeaseSetDestination (service, isPublic, &params), LeaseSetDestination (service, isPublic, &params),
m_Owner (owner), m_Identity (identity), m_EncryptionKeyType (m_Identity->GetCryptoKeyType ()), m_Owner (owner), m_Identity (identity), m_EncryptionKeyType (m_Identity->GetCryptoKeyType ()),
m_IsCreatingLeaseSet (false), m_LeaseSetCreationTimer (service) m_IsCreatingLeaseSet (false), m_IsSameThread (isSameThread), m_LeaseSetCreationTimer (service)
{ {
} }
@ -152,20 +153,32 @@ namespace client
memcpy (buf + 4, payload, len); memcpy (buf + 4, payload, len);
msg->len += len + 4; msg->len += len + 4;
msg->FillI2NPMessageHeader (eI2NPData); msg->FillI2NPMessageHeader (eI2NPData);
auto s = GetSharedFromThis ();
auto remote = FindLeaseSet (ident); auto remote = FindLeaseSet (ident);
if (remote) if (remote)
{ {
GetService ().post ( if (m_IsSameThread)
[s, msg, remote, nonce]() {
{ // send right a way
bool sent = s->SendMsg (msg, remote); bool sent = SendMsg (msg, remote);
if (s->m_Owner) if (m_Owner)
s->m_Owner->SendMessageStatusMessage (nonce, sent ? eI2CPMessageStatusGuaranteedSuccess : eI2CPMessageStatusGuaranteedFailure); m_Owner->SendMessageStatusMessage (nonce, sent ? eI2CPMessageStatusGuaranteedSuccess : eI2CPMessageStatusGuaranteedFailure);
}); }
else
{
// send in destination's thread
auto s = GetSharedFromThis ();
GetService ().post (
[s, msg, remote, nonce]()
{
bool sent = s->SendMsg (msg, remote);
if (s->m_Owner)
s->m_Owner->SendMessageStatusMessage (nonce, sent ? eI2CPMessageStatusGuaranteedSuccess : eI2CPMessageStatusGuaranteedFailure);
});
}
} }
else else
{ {
auto s = GetSharedFromThis ();
RequestDestination (ident, RequestDestination (ident,
[s, msg, nonce](std::shared_ptr<i2p::data::LeaseSet> ls) [s, msg, nonce](std::shared_ptr<i2p::data::LeaseSet> ls)
{ {
@ -246,7 +259,7 @@ namespace client
RunnableI2CPDestination::RunnableI2CPDestination (std::shared_ptr<I2CPSession> owner, RunnableI2CPDestination::RunnableI2CPDestination (std::shared_ptr<I2CPSession> owner,
std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, const std::map<std::string, std::string>& params): std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, const std::map<std::string, std::string>& params):
RunnableService ("I2CP"), RunnableService ("I2CP"),
I2CPDestination (GetIOService (), owner, identity, isPublic, params) I2CPDestination (GetIOService (), owner, identity, isPublic, false, params)
{ {
} }
@ -583,7 +596,7 @@ namespace client
if (!m_Destination) if (!m_Destination)
{ {
m_Destination = m_Owner.IsSingleThread () ? m_Destination = m_Owner.IsSingleThread () ?
std::make_shared<I2CPDestination>(m_Owner.GetService (), shared_from_this (), identity, true, params): std::make_shared<I2CPDestination>(m_Owner.GetService (), shared_from_this (), identity, true, true, params):
std::make_shared<RunnableI2CPDestination>(shared_from_this (), identity, true, params); std::make_shared<RunnableI2CPDestination>(shared_from_this (), identity, true, params);
if (m_Owner.InsertSession (shared_from_this ())) if (m_Owner.InsertSession (shared_from_this ()))
{ {

7
libi2pd_client/I2CP.h

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2013-2023, The PurpleI2P Project * Copyright (c) 2013-2024, The PurpleI2P Project
* *
* This file is part of Purple i2pd project and licensed under BSD3 * This file is part of Purple i2pd project and licensed under BSD3
* *
@ -79,7 +79,8 @@ namespace client
public: public:
I2CPDestination (boost::asio::io_service& service, std::shared_ptr<I2CPSession> owner, I2CPDestination (boost::asio::io_service& service, std::shared_ptr<I2CPSession> owner,
std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, const std::map<std::string, std::string>& params); std::shared_ptr<const i2p::data::IdentityEx> identity, bool isPublic, bool isSameThread,
const std::map<std::string, std::string>& params);
~I2CPDestination () {}; ~I2CPDestination () {};
void Stop (); void Stop ();
@ -120,7 +121,7 @@ namespace client
std::shared_ptr<i2p::crypto::ECIESX25519AEADRatchetDecryptor> m_ECIESx25519Decryptor; std::shared_ptr<i2p::crypto::ECIESX25519AEADRatchetDecryptor> m_ECIESx25519Decryptor;
uint8_t m_ECIESx25519PrivateKey[32]; uint8_t m_ECIESx25519PrivateKey[32];
uint64_t m_LeaseSetExpirationTime; uint64_t m_LeaseSetExpirationTime;
bool m_IsCreatingLeaseSet; bool m_IsCreatingLeaseSet, m_IsSameThread;
boost::asio::deadline_timer m_LeaseSetCreationTimer; boost::asio::deadline_timer m_LeaseSetCreationTimer;
i2p::util::MemoryPoolMt<I2NPMessageBuffer<I2NP_MAX_MESSAGE_SIZE> > m_I2NPMsgsPool; i2p::util::MemoryPoolMt<I2NPMessageBuffer<I2NP_MAX_MESSAGE_SIZE> > m_I2NPMsgsPool;
}; };

Loading…
Cancel
Save