Browse Source

fixed race condition

pull/102/head
orignal 10 years ago
parent
commit
1acce0cc80
  1. 16
      Streaming.cpp
  2. 3
      Streaming.h

16
Streaming.cpp

@ -603,6 +603,7 @@ namespace stream
Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote) Stream * StreamingDestination::CreateNewOutgoingStream (const i2p::data::LeaseSet& remote)
{ {
Stream * s = new Stream (m_Service, this, remote); Stream * s = new Stream (m_Service, this, remote);
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams[s->GetRecvStreamID ()] = s;
return s; return s;
} }
@ -610,6 +611,7 @@ namespace stream
Stream * StreamingDestination::CreateNewIncomingStream () Stream * StreamingDestination::CreateNewIncomingStream ()
{ {
Stream * s = new Stream (m_Service, this); Stream * s = new Stream (m_Service, this);
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams[s->GetRecvStreamID ()] = s; m_Streams[s->GetRecvStreamID ()] = s;
return s; return s;
} }
@ -618,6 +620,7 @@ namespace stream
{ {
if (stream) if (stream)
{ {
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams.erase (stream->GetRecvStreamID ()); m_Streams.erase (stream->GetRecvStreamID ());
delete stream; delete stream;
} }
@ -716,6 +719,7 @@ namespace stream
StreamingDestination * StreamingDestinations::LoadLocalDestination (const std::string& filename, bool isPublic) StreamingDestination * StreamingDestinations::LoadLocalDestination (const std::string& filename, bool isPublic)
{ {
auto localDestination = new StreamingDestination (m_Service, i2p::util::filesystem::GetFullPath (filename), isPublic); auto localDestination = new StreamingDestination (m_Service, i2p::util::filesystem::GetFullPath (filename), isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[localDestination->GetIdentHash ()] = localDestination; m_Destinations[localDestination->GetIdentHash ()] = localDestination;
return localDestination; return localDestination;
} }
@ -723,6 +727,7 @@ namespace stream
StreamingDestination * StreamingDestinations::CreateNewLocalDestination (bool isPublic) StreamingDestination * StreamingDestinations::CreateNewLocalDestination (bool isPublic)
{ {
auto localDestination = new StreamingDestination (m_Service, isPublic); auto localDestination = new StreamingDestination (m_Service, isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[localDestination->GetIdentHash ()] = localDestination; m_Destinations[localDestination->GetIdentHash ()] = localDestination;
return localDestination; return localDestination;
} }
@ -734,6 +739,7 @@ namespace stream
if (it != m_Destinations.end ()) if (it != m_Destinations.end ())
{ {
delete it->second; delete it->second;
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations.erase (it); m_Destinations.erase (it);
} }
} }
@ -747,6 +753,7 @@ namespace stream
return nullptr; return nullptr;
} }
auto localDestination = new StreamingDestination (m_Service, keys, isPublic); auto localDestination = new StreamingDestination (m_Service, keys, isPublic);
std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations[keys.GetPublic ().GetIdentHash ()] = localDestination; m_Destinations[keys.GetPublic ().GetIdentHash ()] = localDestination;
return localDestination; return localDestination;
} }
@ -760,14 +767,7 @@ namespace stream
void StreamingDestinations::DeleteStream (Stream * stream) void StreamingDestinations::DeleteStream (Stream * stream)
{ {
if (stream) if (stream)
{ stream->GetLocalDestination ()->DeleteStream (stream);
m_Service.post (
[=](void)
{
stream->GetLocalDestination ()->DeleteStream (stream);
}
);
}
} }
void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet) void StreamingDestinations::HandleNextPacket (i2p::data::IdentHash destination, Packet * packet)

3
Streaming.h

@ -8,6 +8,7 @@
#include <queue> #include <queue>
#include <thread> #include <thread>
#include <functional> #include <functional>
#include <mutex>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <cryptopp/dsa.h> #include <cryptopp/dsa.h>
@ -171,6 +172,7 @@ namespace stream
private: private:
boost::asio::io_service& m_Service; boost::asio::io_service& m_Service;
std::mutex m_StreamsMutex;
std::map<uint32_t, Stream *> m_Streams; std::map<uint32_t, Stream *> m_Streams;
i2p::data::PrivateKeys m_Keys; i2p::data::PrivateKeys m_Keys;
uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256]; uint8_t m_EncryptionPublicKey[256], m_EncryptionPrivateKey[256];
@ -217,6 +219,7 @@ namespace stream
boost::asio::io_service m_Service; boost::asio::io_service m_Service;
boost::asio::io_service::work m_Work; boost::asio::io_service::work m_Work;
std::mutex m_DestinationsMutex;
std::map<i2p::data::IdentHash, StreamingDestination *> m_Destinations; std::map<i2p::data::IdentHash, StreamingDestination *> m_Destinations;
StreamingDestination * m_SharedLocalDestination; StreamingDestination * m_SharedLocalDestination;

Loading…
Cancel
Save