Browse Source

fixed race condition

pull/102/head
orignal 10 years ago
parent
commit
9750fb73b5
  1. 2
      NetDb.cpp
  2. 51
      Streaming.cpp

2
NetDb.cpp

@ -102,7 +102,7 @@ namespace data
{ {
try try
{ {
I2NPMessage * msg = m_Queue.GetNextWithTimeout (10000); // 10 sec I2NPMessage * msg = m_Queue.GetNextWithTimeout (30000); // 30 sec
if (msg) if (msg)
{ {
while (msg) while (msg)

51
Streaming.cpp

@ -38,19 +38,24 @@ namespace stream
Stream::~Stream () Stream::~Stream ()
{ {
Close ();
m_ReceiveTimer.cancel ();
m_ResendTimer.cancel ();
while (!m_ReceiveQueue.empty ()) while (!m_ReceiveQueue.empty ())
{ {
auto packet = m_ReceiveQueue.front (); auto packet = m_ReceiveQueue.front ();
m_ReceiveQueue.pop (); m_ReceiveQueue.pop ();
delete packet; delete packet;
} }
for (auto it: m_SavedPackets) m_ReceiveTimer.cancel ();
delete it;
for (auto it: m_SentPackets) for (auto it: m_SentPackets)
delete it; delete it;
m_SentPackets.clear ();
m_ResendTimer.cancel ();
for (auto it: m_SavedPackets)
delete it;
m_SavedPackets.clear ();
Close ();
} }
void Stream::HandleNextPacket (Packet * packet) void Stream::HandleNextPacket (Packet * packet)
@ -387,10 +392,13 @@ namespace stream
if (packet) if (packet)
{ {
SendPackets (std::vector<Packet *> { packet }); SendPackets (std::vector<Packet *> { packet });
bool isEmpty = m_SentPackets.empty (); if (m_IsOpen)
m_SentPackets.insert (packet); {
if (isEmpty) bool isEmpty = m_SentPackets.empty ();
ScheduleResend (); m_SentPackets.insert (packet);
if (isEmpty)
ScheduleResend ();
}
return true; return true;
} }
else else
@ -565,8 +573,12 @@ namespace stream
StreamingDestination::~StreamingDestination () StreamingDestination::~StreamingDestination ()
{ {
for (auto it: m_Streams) {
delete it.second; std::unique_lock<std::mutex> l(m_StreamsMutex);
for (auto it: m_Streams)
delete it.second;
m_Streams.clear ();
}
if (m_Pool) if (m_Pool)
i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool); i2p::tunnel::tunnels.DeleteTunnelPool (m_Pool);
delete m_LeaseSet; delete m_LeaseSet;
@ -621,8 +633,12 @@ namespace stream
if (stream) if (stream)
{ {
std::unique_lock<std::mutex> l(m_StreamsMutex); std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams.erase (stream->GetRecvStreamID ()); auto it = m_Streams.find (stream->GetRecvStreamID ());
delete stream; if (it != m_Streams.end ())
{
m_Streams.erase (it);
delete stream;
}
} }
} }
@ -738,9 +754,12 @@ namespace stream
auto it = m_Destinations.find (destination->GetIdentHash ()); auto it = m_Destinations.find (destination->GetIdentHash ());
if (it != m_Destinations.end ()) if (it != m_Destinations.end ())
{ {
delete it->second; auto d = it->second;
std::unique_lock<std::mutex> l(m_DestinationsMutex); {
m_Destinations.erase (it); std::unique_lock<std::mutex> l(m_DestinationsMutex);
m_Destinations.erase (it);
}
delete d;
} }
} }

Loading…
Cancel
Save