You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
671 lines
27 KiB
671 lines
27 KiB
|
|
// Copyright Oliver Kowalke 2016. |
|
// Distributed under the Boost Software License, Version 1.0. |
|
// (See accompanying file LICENSE_1_0.txt or copy at |
|
// http://www.boost.org/LICENSE_1_0.txt) |
|
|
|
#ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H |
|
#define BOOST_FIBERS_UNBUFFERED_CHANNEL_H |
|
|
|
#include <atomic> |
|
#include <chrono> |
|
#include <cstddef> |
|
#include <cstdint> |
|
#include <memory> |
|
#include <vector> |
|
|
|
#include <boost/config.hpp> |
|
|
|
#include <boost/fiber/channel_op_status.hpp> |
|
#include <boost/fiber/context.hpp> |
|
#include <boost/fiber/detail/config.hpp> |
|
#include <boost/fiber/detail/convert.hpp> |
|
#if defined(BOOST_NO_CXX14_STD_EXCHANGE) |
|
#include <boost/fiber/detail/exchange.hpp> |
|
#endif |
|
#include <boost/fiber/detail/spinlock.hpp> |
|
#include <boost/fiber/exceptions.hpp> |
|
|
|
#ifdef BOOST_HAS_ABI_HEADERS |
|
# include BOOST_ABI_PREFIX |
|
#endif |
|
|
|
namespace boost { |
|
namespace fibers { |
|
|
|
template< typename T > |
|
class unbuffered_channel { |
|
public: |
|
typedef typename std::remove_reference< T >::type value_type; |
|
|
|
private: |
|
typedef context::wait_queue_t wait_queue_type; |
|
|
|
struct slot { |
|
value_type value; |
|
context * ctx; |
|
|
|
slot( value_type const& value_, context * ctx_) : |
|
value{ value_ }, |
|
ctx{ ctx_ } { |
|
} |
|
|
|
slot( value_type && value_, context * ctx_) : |
|
value{ std::move( value_) }, |
|
ctx{ ctx_ } { |
|
} |
|
}; |
|
|
|
// shared cacheline |
|
std::atomic< slot * > slot_{ nullptr }; |
|
// shared cacheline |
|
std::atomic_bool closed_{ false }; |
|
mutable detail::spinlock splk_producers_{}; |
|
wait_queue_type waiting_producers_{}; |
|
mutable detail::spinlock splk_consumers_{}; |
|
wait_queue_type waiting_consumers_{}; |
|
char pad_[cacheline_length]; |
|
|
|
bool is_empty_() { |
|
return nullptr == slot_.load( std::memory_order_acquire); |
|
} |
|
|
|
// Tries to install own_slot as the channel's current slot.
// Returns true once the slot pointer was switched from nullptr to own_slot,
// false as soon as a different slot is observed to be installed.
bool try_push_( slot * own_slot) {
    for (;;) {
        slot * observed = slot_.load( std::memory_order_acquire);
        if ( nullptr != observed) {
            // some slot is already installed
            return false;
        }
        if ( slot_.compare_exchange_strong( observed, own_slot, std::memory_order_acq_rel) ) {
            return true;
        }
        // exchange failed: re-read the slot pointer and decide again
    }
}
|
|
|
// Tries to detach the currently installed slot.
// Returns the detached slot, or nullptr if no slot was installed.
slot * try_pop_() {
    slot * const none = nullptr;
    for (;;) {
        slot * taken = slot_.load( std::memory_order_acquire);
        // either nothing is installed, or the installed slot was swapped out
        // for nullptr by this call; otherwise re-read and try again
        if ( nullptr == taken ||
             slot_.compare_exchange_strong( taken, none, std::memory_order_acq_rel) ) {
            return taken;
        }
    }
}
|
|
|
public: |
|
unbuffered_channel() { |
|
} |
|
|
|
~unbuffered_channel() { |
|
close(); |
|
} |
|
|
|
unbuffered_channel( unbuffered_channel const&) = delete; |
|
unbuffered_channel & operator=( unbuffered_channel const&) = delete; |
|
|
|
bool is_closed() const noexcept { |
|
return closed_.load( std::memory_order_acquire); |
|
} |
|
|
|
void close() noexcept { |
|
context * active_ctx = context::active(); |
|
// set flag |
|
if ( ! closed_.exchange( true, std::memory_order_acquire) ) { |
|
// notify current waiting |
|
slot * s = slot_.load( std::memory_order_acquire); |
|
if ( nullptr != s) { |
|
// notify context |
|
active_ctx->schedule( s->ctx); |
|
} |
|
// notify all waiting producers |
|
detail::spinlock_lock lk1{ splk_producers_ }; |
|
while ( ! waiting_producers_.empty() ) { |
|
context * producer_ctx = & waiting_producers_.front(); |
|
waiting_producers_.pop_front(); |
|
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); |
|
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { |
|
// notify context |
|
active_ctx->schedule( producer_ctx); |
|
} else if ( static_cast< std::intptr_t >( 0) == expected) { |
|
// no timed-wait op. |
|
// notify context |
|
active_ctx->schedule( producer_ctx); |
|
} |
|
} |
|
// notify all waiting consumers |
|
detail::spinlock_lock lk2{ splk_consumers_ }; |
|
while ( ! waiting_consumers_.empty() ) { |
|
context * consumer_ctx = & waiting_consumers_.front(); |
|
waiting_consumers_.pop_front(); |
|
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); |
|
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { |
|
// notify context |
|
active_ctx->schedule( consumer_ctx); |
|
} else if ( static_cast< std::intptr_t >( 0) == expected) { |
|
// no timed-wait op. |
|
// notify context |
|
active_ctx->schedule( consumer_ctx); |
|
} |
|
} |
|
} |
|
} |
|
|
|
// Offers a copy of `value` to a consumer and blocks the calling fiber until
// the hand-over is decided.
// Returns channel_op_status::success once a consumer has taken the value,
// channel_op_status::closed if the channel is (or becomes) closed first.
channel_op_status push( value_type const& value) {
    context * const self = context::active();
    slot own{ value, self };
    for (;;) {
        if ( BOOST_UNLIKELY( is_closed() ) ) {
            return channel_op_status::closed;
        }
        if ( ! try_push_( & own) ) {
            // another slot is installed: wait in the producers' queue
            detail::spinlock_lock lk{ splk_producers_ };
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( ! is_empty_() ) {
                self->wait_link( waiting_producers_);
                // 0 marks a waiter without timed-wait op.
                self->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this producer
                self->suspend( lk);
            }
            // slot may be free now, start over
            continue;
        }
        // own slot is installed
        detail::spinlock_lock lk{ splk_consumers_ };
        // notify one waiting consumer
        while ( ! waiting_consumers_.empty() ) {
            context * const consumer = & waiting_consumers_.front();
            waiting_consumers_.pop_front();
            std::intptr_t tw = reinterpret_cast< std::intptr_t >( this);
            bool const claimed = consumer->twstatus.compare_exchange_strong(
                    tw, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel);
            // claimed: twstatus was switched from this channel to -1
            // tw == 0: no timed-wait op.
            if ( claimed || static_cast< std::intptr_t >( 0) == tw) {
                self->schedule( consumer);
                break;
            }
        }
        // suspend till value has been consumed
        self->suspend( lk);
        // resumed: a consumer resets own.ctx to nullptr when it takes the value;
        // otherwise the channel was closed before the value was consumed
        return nullptr == own.ctx
            ? channel_op_status::success
            : channel_op_status::closed;
    }
}
|
|
|
// Offers `value` (moved into the hand-over slot) to a consumer and blocks the
// calling fiber until the hand-over is decided.
// Returns channel_op_status::success once a consumer has taken the value,
// channel_op_status::closed if the channel is (or becomes) closed first.
channel_op_status push( value_type && value) {
    context * const self = context::active();
    slot own{ std::move( value), self };
    for (;;) {
        if ( BOOST_UNLIKELY( is_closed() ) ) {
            return channel_op_status::closed;
        }
        if ( ! try_push_( & own) ) {
            // another slot is installed: wait in the producers' queue
            detail::spinlock_lock lk{ splk_producers_ };
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( ! is_empty_() ) {
                self->wait_link( waiting_producers_);
                // 0 marks a waiter without timed-wait op.
                self->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this producer
                self->suspend( lk);
            }
            // slot may be free now, start over
            continue;
        }
        // own slot is installed
        detail::spinlock_lock lk{ splk_consumers_ };
        // notify one waiting consumer
        while ( ! waiting_consumers_.empty() ) {
            context * const consumer = & waiting_consumers_.front();
            waiting_consumers_.pop_front();
            std::intptr_t tw = reinterpret_cast< std::intptr_t >( this);
            bool const claimed = consumer->twstatus.compare_exchange_strong(
                    tw, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel);
            // claimed: twstatus was switched from this channel to -1
            // tw == 0: no timed-wait op.
            if ( claimed || static_cast< std::intptr_t >( 0) == tw) {
                self->schedule( consumer);
                break;
            }
        }
        // suspend till value has been consumed
        self->suspend( lk);
        // resumed: a consumer resets own.ctx to nullptr when it takes the value;
        // otherwise the channel was closed before the value was consumed
        return nullptr == own.ctx
            ? channel_op_status::success
            : channel_op_status::closed;
    }
}
|
|
|
template< typename Rep, typename Period > |
|
channel_op_status push_wait_for( value_type const& value, |
|
std::chrono::duration< Rep, Period > const& timeout_duration) { |
|
return push_wait_until( value, |
|
std::chrono::steady_clock::now() + timeout_duration); |
|
} |
|
|
|
template< typename Rep, typename Period > |
|
channel_op_status push_wait_for( value_type && value, |
|
std::chrono::duration< Rep, Period > const& timeout_duration) { |
|
return push_wait_until( std::forward< value_type >( value), |
|
std::chrono::steady_clock::now() + timeout_duration); |
|
} |
|
|
|
template< typename Clock, typename Duration > |
|
channel_op_status push_wait_until( value_type const& value, |
|
std::chrono::time_point< Clock, Duration > const& timeout_time_) { |
|
context * active_ctx = context::active(); |
|
slot s{ value, active_ctx }; |
|
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); |
|
for (;;) { |
|
if ( BOOST_UNLIKELY( is_closed() ) ) { |
|
return channel_op_status::closed; |
|
} |
|
if ( try_push_( & s) ) { |
|
detail::spinlock_lock lk{ splk_consumers_ }; |
|
// notify one waiting consumer |
|
while ( ! waiting_consumers_.empty() ) { |
|
context * consumer_ctx = & waiting_consumers_.front(); |
|
waiting_consumers_.pop_front(); |
|
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); |
|
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { |
|
// notify context |
|
active_ctx->schedule( consumer_ctx); |
|
break; |
|
} else if ( static_cast< std::intptr_t >( 0) == expected) { |
|
// no timed-wait op. |
|
// notify context |
|
active_ctx->schedule( consumer_ctx); |
|
break; |
|
} |
|
} |
|
// suspend this producer |
|
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); |
|
if ( ! active_ctx->wait_until( timeout_time, lk) ) { |
|
// clear slot |
|
slot * nil_slot = nullptr, * own_slot = & s; |
|
slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel); |
|
// resumed, value has not been consumed |
|
return channel_op_status::timeout; |
|
} |
|
// resumed |
|
if ( nullptr == s.ctx) { |
|
// value has been consumed |
|
return channel_op_status::success; |
|
} else { |
|
// channel was closed before value was consumed |
|
return channel_op_status::closed; |
|
} |
|
} else { |
|
detail::spinlock_lock lk{ splk_producers_ }; |
|
if ( BOOST_UNLIKELY( is_closed() ) ) { |
|
return channel_op_status::closed; |
|
} |
|
if ( is_empty_() ) { |
|
continue; |
|
} |
|
active_ctx->wait_link( waiting_producers_); |
|
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); |
|
// suspend this producer |
|
if ( ! active_ctx->wait_until( timeout_time, lk) ) { |
|
// relock local lk |
|
lk.lock(); |
|
// remove from waiting-queue |
|
waiting_producers_.remove( * active_ctx); |
|
return channel_op_status::timeout; |
|
} |
|
// resumed, slot maybe free |
|
} |
|
} |
|
} |
|
|
|
template< typename Clock, typename Duration > |
|
channel_op_status push_wait_until( value_type && value, |
|
std::chrono::time_point< Clock, Duration > const& timeout_time_) { |
|
context * active_ctx = context::active(); |
|
slot s{ std::move( value), active_ctx }; |
|
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); |
|
for (;;) { |
|
if ( BOOST_UNLIKELY( is_closed() ) ) { |
|
return channel_op_status::closed; |
|
} |
|
if ( try_push_( & s) ) { |
|
detail::spinlock_lock lk{ splk_consumers_ }; |
|
// notify one waiting consumer |
|
while ( ! waiting_consumers_.empty() ) { |
|
context * consumer_ctx = & waiting_consumers_.front(); |
|
waiting_consumers_.pop_front(); |
|
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); |
|
if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { |
|
// notify context |
|
active_ctx->schedule( consumer_ctx); |
|
break; |
|
} else if ( static_cast< std::intptr_t >( 0) == expected) { |
|
// no timed-wait op. |
|
// notify context |
|
active_ctx->schedule( consumer_ctx); |
|
break; |
|
} |
|
} |
|
// suspend this producer |
|
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); |
|
if ( ! active_ctx->wait_until( timeout_time, lk) ) { |
|
// clear slot |
|
slot * nil_slot = nullptr, * own_slot = & s; |
|
slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel); |
|
// resumed, value has not been consumed |
|
return channel_op_status::timeout; |
|
} |
|
// resumed |
|
if ( nullptr == s.ctx) { |
|
// value has been consumed |
|
return channel_op_status::success; |
|
} else { |
|
// channel was closed before value was consumed |
|
return channel_op_status::closed; |
|
} |
|
} else { |
|
detail::spinlock_lock lk{ splk_producers_ }; |
|
if ( BOOST_UNLIKELY( is_closed() ) ) { |
|
return channel_op_status::closed; |
|
} |
|
if ( is_empty_() ) { |
|
continue; |
|
} |
|
active_ctx->wait_link( waiting_producers_); |
|
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); |
|
// suspend this producer |
|
if ( ! active_ctx->wait_until( timeout_time, lk) ) { |
|
// relock local lk |
|
lk.lock(); |
|
// remove from waiting-queue |
|
waiting_producers_.remove( * active_ctx); |
|
return channel_op_status::timeout; |
|
} |
|
// resumed, slot maybe free |
|
} |
|
} |
|
} |
|
|
|
// Takes the value offered by a producer and move-assigns it to `value`,
// blocking the calling fiber while no value is on offer.
// Returns channel_op_status::success after a value was taken,
// channel_op_status::closed if the channel is closed and nothing is on offer.
channel_op_status pop( value_type & value) {
    context * const self = context::active();
    for (;;) {
        slot * const taken = try_pop_();
        if ( nullptr == taken) {
            // nothing on offer: wait in the consumers' queue
            detail::spinlock_lock lk{ splk_consumers_ };
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                return channel_op_status::closed;
            }
            if ( is_empty_() ) {
                self->wait_link( waiting_consumers_);
                // 0 marks a waiter without timed-wait op.
                self->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this consumer
                self->suspend( lk);
            }
            // slot may be set now, start over
            continue;
        }
        {
            detail::spinlock_lock lk{ splk_producers_ };
            // notify one waiting producer
            while ( ! waiting_producers_.empty() ) {
                context * const producer = & waiting_producers_.front();
                waiting_producers_.pop_front();
                std::intptr_t tw = reinterpret_cast< std::intptr_t >( this);
                bool const claimed = producer->twstatus.compare_exchange_strong(
                        tw, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel);
                // claimed: twstatus was switched from this channel to -1
                // tw == 0: no timed-wait op.
                if ( claimed || static_cast< std::intptr_t >( 0) == tw) {
                    // the lock is released before the producer is notified
                    lk.unlock();
                    self->schedule( producer);
                    break;
                }
            }
        }
        // consume value
        value = std::move( taken->value);
        // notify the offering context; its slot's ctx is reset to nullptr,
        // which the producer reads as "value has been consumed"
#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
        self->schedule( detail::exchange( taken->ctx, nullptr) );
#else
        self->schedule( std::exchange( taken->ctx, nullptr) );
#endif
        return channel_op_status::success;
    }
}
|
|
|
// Takes the value offered by a producer and returns it, blocking the calling
// fiber while no value is on offer.
// Throws fiber_error (std::errc::operation_not_permitted) if the channel is
// closed and nothing is on offer.
value_type value_pop() {
    context * active_ctx = context::active();
    slot * s = nullptr;
    for (;;) {
        if ( nullptr != ( s = try_pop_() ) ) {
            {
                detail::spinlock_lock lk{ splk_producers_ };
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    std::intptr_t expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        // twstatus was switched from this channel to -1;
                        // the lock is released before the producer is notified
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    } else if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                }
            }
            // consume value
            value_type value = std::move( s->value);
            // notify the offering context; its slot's ctx is reset to nullptr,
            // which the producer reads as "value has been consumed"
#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
            active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
#else
            active_ctx->schedule( std::exchange( s->ctx, nullptr) );
#endif
            // plain return of the local: it is moved implicitly and remains
            // eligible for copy elision (an explicit std::move would prevent that)
            return value;
        } else {
            detail::spinlock_lock lk{ splk_consumers_ };
            if ( BOOST_UNLIKELY( is_closed() ) ) {
                throw fiber_error{
                        std::make_error_code( std::errc::operation_not_permitted),
                        "boost fiber: channel is closed" };
            }
            if ( ! is_empty_() ) {
                continue;
            }
            active_ctx->wait_link( waiting_consumers_);
            // 0 marks a waiter without timed-wait op.
            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
            // suspend this consumer
            active_ctx->suspend( lk);
            // resumed, slot may be set
        }
    }
}
|
|
|
template< typename Rep, typename Period > |
|
channel_op_status pop_wait_for( value_type & value, |
|
std::chrono::duration< Rep, Period > const& timeout_duration) { |
|
return pop_wait_until( value, |
|
std::chrono::steady_clock::now() + timeout_duration); |
|
} |
|
|
|
template< typename Clock, typename Duration > |
|
channel_op_status pop_wait_until( value_type & value, |
|
std::chrono::time_point< Clock, Duration > const& timeout_time_) { |
|
context * active_ctx = context::active(); |
|
slot * s = nullptr; |
|
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); |
|
for (;;) { |
|
if ( nullptr != ( s = try_pop_() ) ) { |
|
{ |
|
detail::spinlock_lock lk{ splk_producers_ }; |
|
// notify one waiting producer |
|
while ( ! waiting_producers_.empty() ) { |
|
context * producer_ctx = & waiting_producers_.front(); |
|
waiting_producers_.pop_front(); |
|
std::intptr_t expected = reinterpret_cast< std::intptr_t >( this); |
|
if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) { |
|
lk.unlock(); |
|
// notify context |
|
active_ctx->schedule( producer_ctx); |
|
break; |
|
} else if ( static_cast< std::intptr_t >( 0) == expected) { |
|
lk.unlock(); |
|
// no timed-wait op. |
|
// notify context |
|
active_ctx->schedule( producer_ctx); |
|
break; |
|
} |
|
} |
|
} |
|
// consume value |
|
value = std::move( s->value); |
|
// notify context |
|
#if defined(BOOST_NO_CXX14_STD_EXCHANGE) |
|
active_ctx->schedule( detail::exchange( s->ctx, nullptr) ); |
|
#else |
|
active_ctx->schedule( std::exchange( s->ctx, nullptr) ); |
|
#endif |
|
return channel_op_status::success; |
|
} else { |
|
detail::spinlock_lock lk{ splk_consumers_ }; |
|
if ( BOOST_UNLIKELY( is_closed() ) ) { |
|
return channel_op_status::closed; |
|
} |
|
if ( ! is_empty_() ) { |
|
continue; |
|
} |
|
active_ctx->wait_link( waiting_consumers_); |
|
active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release); |
|
// suspend this consumer |
|
if ( ! active_ctx->wait_until( timeout_time, lk) ) { |
|
// relock local lk |
|
lk.lock(); |
|
// remove from waiting-queue |
|
waiting_consumers_.remove( * active_ctx); |
|
return channel_op_status::timeout; |
|
} |
|
} |
|
} |
|
} |
|
|
|
class iterator { |
|
private: |
|
typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type; |
|
|
|
unbuffered_channel * chan_{ nullptr }; |
|
storage_type storage_; |
|
|
|
void increment_() { |
|
BOOST_ASSERT( nullptr != chan_); |
|
try { |
|
::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() }; |
|
} catch ( fiber_error const&) { |
|
chan_ = nullptr; |
|
} |
|
} |
|
|
|
public: |
|
typedef std::input_iterator_tag iterator_category; |
|
typedef std::ptrdiff_t difference_type; |
|
typedef value_type * pointer; |
|
typedef value_type & reference; |
|
|
|
typedef pointer pointer_t; |
|
typedef reference reference_t; |
|
|
|
iterator() noexcept = default; |
|
|
|
explicit iterator( unbuffered_channel< T > * chan) noexcept : |
|
chan_{ chan } { |
|
increment_(); |
|
} |
|
|
|
iterator( iterator const& other) noexcept : |
|
chan_{ other.chan_ } { |
|
} |
|
|
|
iterator & operator=( iterator const& other) noexcept { |
|
if ( this == & other) return * this; |
|
chan_ = other.chan_; |
|
return * this; |
|
} |
|
|
|
bool operator==( iterator const& other) const noexcept { |
|
return other.chan_ == chan_; |
|
} |
|
|
|
bool operator!=( iterator const& other) const noexcept { |
|
return other.chan_ != chan_; |
|
} |
|
|
|
iterator & operator++() { |
|
reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type(); |
|
increment_(); |
|
return * this; |
|
} |
|
|
|
iterator operator++( int) = delete; |
|
|
|
reference_t operator*() noexcept { |
|
return * reinterpret_cast< value_type * >( std::addressof( storage_) ); |
|
} |
|
|
|
pointer_t operator->() noexcept { |
|
return reinterpret_cast< value_type * >( std::addressof( storage_) ); |
|
} |
|
}; |
|
|
|
friend class iterator; |
|
}; |
|
|
|
template< typename T > |
|
typename unbuffered_channel< T >::iterator |
|
begin( unbuffered_channel< T > & chan) { |
|
return typename unbuffered_channel< T >::iterator( & chan); |
|
} |
|
|
|
template< typename T > |
|
typename unbuffered_channel< T >::iterator |
|
end( unbuffered_channel< T > &) { |
|
return typename unbuffered_channel< T >::iterator(); |
|
} |
|
|
|
}} |
|
|
|
#ifdef BOOST_HAS_ABI_HEADERS |
|
# include BOOST_ABI_SUFFIX |
|
#endif |
|
|
|
#endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H
|
|
|