Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ build
.vs
.cache
.vscode
.idea
compile_commands.json
stagedir

Expand Down
197 changes: 108 additions & 89 deletions include/beman/execution/detail/associate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,136 +14,154 @@
#include <beman/execution/detail/make_sender.hpp>
#include <beman/execution/detail/default_impls.hpp>
#include <beman/execution/detail/impls_for.hpp>
#include <type_traits>
#include <optional>
#include <beman/execution/detail/sender_adaptor.hpp>
#include <memory>
#include <utility>

// ----------------------------------------------------------------------------

namespace beman::execution::detail {
template <::beman::execution::scope_token Token,
::beman::execution::sender Sender> //-dk:TODO detail export
template <::beman::execution::scope_token Token, ::beman::execution::sender Sender>
struct associate_data {
using wrap_sender = ::std::remove_cvref_t<decltype(::std::declval<Token&>().wrap(::std::declval<Sender>()))>;
using assoc_t = decltype(::std::declval<Token&>().try_associate());
using sender_ref = ::std::unique_ptr<wrap_sender, decltype(std::ranges::destroy_at)>;

explicit associate_data(Token t, Sender&& s) : sender(t.wrap(::std::forward<Sender>(s))) {
sender_ref guard{::std::addressof(this->sender)};
this->assoc = t.try_associate();
if (this->assoc) {
static_cast<void>(guard.release());
}
}

explicit associate_data(Token t, Sender&& s) : token(t), sender(this->token.wrap(::std::forward<Sender>(s))) {
if (!token.try_associate()) {
this->sender.reset();
explicit associate_data(::std::pair<assoc_t, sender_ref> parts) : assoc(::std::move(parts.first)) {
if (this->assoc) {
::std::construct_at(::std::addressof(this->sender), ::std::move(*parts.second));
}
}

associate_data(const associate_data& other) noexcept(::std::is_nothrow_copy_constructible_v<wrap_sender> &&
noexcept(token.try_associate()))
: token(other.token), sender() {
if (other.sender && this->token.try_associate()) {
try {
this->sender.emplace(*other.sender);
} catch (...) {
this->token.disassociate();
}
noexcept(other.assoc.try_associate()))
requires ::std::copy_constructible<wrap_sender>
: assoc(other.assoc.try_associate()) {
if (this->assoc) {
::std::construct_at(::std::addressof(this->sender), other.sender);
}
}

associate_data(associate_data&& other) noexcept(::std::is_nothrow_move_constructible_v<wrap_sender>)
: token(other.token), sender(::std::move(other.sender)) {
other.sender.reset();
}
: associate_data(::std::move(other).release()) {}

auto operator=(const associate_data&) -> associate_data& = delete;
auto operator=(associate_data&&) -> associate_data& = delete;

auto operator=(associate_data&&) -> associate_data& = delete;

~associate_data() {
if (this->sender) {
this->sender.reset();
this->token.disassociate();
if (this->assoc) {
::std::destroy_at(::std::addressof(this->sender));
}
}

auto release() -> ::std::optional<::std::pair<Token, wrap_sender>> {
return this->sender ? (std::unique_ptr<std::optional<wrap_sender>, decltype([](auto* opt) { opt->reset(); })>(
&this->sender),
::std::optional{::std::pair{::std::move(this->token), ::std::move(*this->sender)}})
: ::std::optional<::std::pair<Token, wrap_sender>>{};
auto release() && noexcept -> ::std::pair<assoc_t, sender_ref> {
return {::std::move(assoc), sender_ref{assoc ? ::std::addressof(this->sender) : nullptr}};
}

Token token;
::std::optional<wrap_sender> sender;
assoc_t assoc;
union {
wrap_sender sender;
};
};
template <::beman::execution::scope_token Token, ::beman::execution::sender Sender>
associate_data(Token, Sender&&) -> associate_data<Token, Sender>;

struct associate_t {
template <::beman::execution::sender Sender, ::beman::execution::scope_token Token>
auto operator()(Sender&& sender, Token&& token) const {
auto operator()(Sender&& sender, Token token) const {
auto domain(::beman::execution::detail::get_domain_early(sender));
return ::beman::execution::transform_sender(
domain,
::beman::execution::detail::make_sender(
*this,
::beman::execution::detail::associate_data(::std::forward<Token>(token),
::std::forward<Sender>(sender))));
::beman::execution::detail::associate_data(::std::move(token), ::std::forward<Sender>(sender))));
}

template <::beman::execution::scope_token Token>
auto operator()(Token token) const {
return ::beman::execution::detail::sender_adaptor{*this, ::std::move(token)};
}
};

template <>
struct impls_for<associate_t> : ::beman::execution::detail::default_impls {
template <typename, typename>
struct get_noexcept : ::std::false_type {};
template <typename Tag, typename Data, typename Receiver>
struct get_noexcept<::beman::execution::detail::basic_sender<Tag, Data>, Receiver>
: ::std::bool_constant<
::std::is_nothrow_move_constructible_v<typename ::std::remove_cvref_t<Data>::wrap_sender>&& ::beman::
execution::detail::nothrow_callable<::beman::execution::connect_t,
typename ::std::remove_cvref_t<Data>::wrap_sender,
Receiver>> {};
template <typename>
struct get_wrap_sender;

template <typename Tag, typename Data>
struct get_wrap_sender<::beman::execution::detail::basic_sender<Tag, Data>> {
using type = typename ::std::remove_cvref_t<Data>::wrap_sender;
};

template <typename AssociateData, typename Receiver>
struct op_state {
using assoc_t = typename AssociateData::assoc_t;
using sender_ref_t = typename AssociateData::sender_ref;
using op_t = ::beman::execution::connect_result_t<typename sender_ref_t::element_type, Receiver>;

assoc_t assoc;
union {
Receiver* rcvr;
op_t op;
};

explicit op_state(::std::pair<assoc_t, sender_ref_t> parts, Receiver& r) : assoc(::std::move(parts.first)) {
if (assoc) {
::std::construct_at(::std::addressof(op),
::beman::execution::connect(::std::move(*parts.second), ::std::move(r)));
} else {
rcvr = ::std::addressof(r);
}
}

explicit op_state(AssociateData&& ad, Receiver& r) : op_state(::std::move(ad).release(), r) {}

explicit op_state(const AssociateData& ad, Receiver& r)
requires ::std::copy_constructible<AssociateData>
: op_state(AssociateData(ad).release(), r) {}

op_state(const op_state&) = delete;

op_state(op_state&&) = delete;

~op_state() {
if (this->assoc) {
op.~op_t();
}
}

auto operator=(const op_state&) -> op_state& = delete;

auto operator=(op_state&&) -> op_state& = delete;

auto run() noexcept -> void {
if (this->assoc) {
::beman::execution::start(this->op);
} else {
::beman::execution::set_stopped(::std::move(*this->rcvr));
}
}
};

struct get_state_impl {
template <typename Sender, typename Receiver>
auto operator()(Sender&& sender, Receiver& receiver) const
noexcept(::std::is_nothrow_constructible_v<::std::remove_cvref_t<Sender>, Sender> &&
get_noexcept<::std::remove_cvref_t<Sender>, Receiver>::value) {
noexcept((::std::same_as<Sender, ::std::remove_cvref_t<Sender>> ||
::std::is_nothrow_constructible_v<::std::remove_cvref_t<Sender>, Sender>) &&
execution::detail::nothrow_callable<::beman::execution::connect_t,
typename get_wrap_sender<::std::remove_cvref_t<Sender>>::type,
Receiver>) {
auto [_, data] = ::std::forward<Sender>(sender);
auto dataParts{data.release()};

using scope_token = decltype(dataParts->first);
using wrap_sender = decltype(dataParts->second);
using op_t = decltype(::beman::execution::connect(::std::move(dataParts->second),
::std::forward<Receiver>(receiver)));

struct op_state {
using sop_t = op_t;
using sscope_token = scope_token;
struct assoc_t {
sscope_token tok;
sop_t op;
};

bool associated{false};
union {
Receiver* rcvr;
assoc_t assoc;
};
explicit op_state(Receiver& r) noexcept : rcvr(::std::addressof(r)) {}
explicit op_state(sscope_token tk, wrap_sender&& sndr, Receiver& r) try
: associated(true), assoc(tk, ::beman::execution::connect(::std::move(sndr), ::std::move(r))) {
} catch (...) {
tk.disassociate();
throw;
}
op_state(op_state&&) = delete;
~op_state() {
if (this->associated) {
this->assoc.op.~sop_t();
this->assoc.tok.disassociate();
this->assoc.tok.~sscope_token();
}
}
auto run() noexcept -> void {
if (this->associated) {
::beman::execution::start(this->assoc.op);
} else {
::beman::execution::set_stopped(::std::move(*this->rcvr));
}
}
};
return dataParts ? op_state(::std::move(dataParts->first), ::std::move(dataParts->second), receiver)
: op_state(receiver);
return op_state{::beman::execution::detail::forward_like<Sender>(data), receiver};
}
};
static constexpr auto get_state{get_state_impl{}};
Expand All @@ -157,7 +175,8 @@ template <typename Data, typename Env>
struct completion_signatures_for_impl<
::beman::execution::detail::basic_sender<::beman::execution::detail::associate_t, Data>,
Env> {
using type = ::beman::execution::completion_signatures<::beman::execution::set_value_t()>;
using child_type_t = typename ::std::remove_cvref_t<Data>::wrap_sender;
using type = ::beman::execution::detail::completion_signatures_for<child_type_t, Env>;
};
} // namespace beman::execution::detail

Expand Down
5 changes: 2 additions & 3 deletions include/beman/execution/detail/counting_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class beman::execution::counting_scope : public ::beman::execution::detail::coun

// ----------------------------------------------------------------------------

class beman::execution::counting_scope::token : public ::beman::execution::detail::counting_scope_base::token {
class beman::execution::counting_scope::token : public beman::execution::counting_scope::token_base {
public:
template <::beman::execution::sender Sender>
auto wrap(Sender&& sender) const noexcept -> ::beman::execution::sender auto {
Expand All @@ -49,8 +49,7 @@ class beman::execution::counting_scope::token : public ::beman::execution::detai

private:
friend class beman::execution::counting_scope;
explicit token(::beman::execution::counting_scope* s)
: ::beman::execution::detail::counting_scope_base::token(s) {}
explicit token(::beman::execution::counting_scope* s) : token_base(s) {}
};
static_assert(::beman::execution::scope_token<::beman::execution::counting_scope::token>);

Expand Down
61 changes: 47 additions & 14 deletions include/beman/execution/detail/counting_scope_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,47 @@ class beman::execution::detail::counting_scope_base : ::beman::execution::detail
auto start_node(node*) -> void;

protected:
class token {
class assoc_t {
public:
auto try_associate() const noexcept -> bool { return this->scope->try_associate(); }
auto disassociate() const noexcept -> void { this->scope->disassociate(); }
assoc_t() = default;

explicit assoc_t(counting_scope_base& scope) noexcept : scope(&scope) {}

assoc_t(const assoc_t&) = delete;

assoc_t(assoc_t&& other) noexcept : scope(::std::exchange(other.scope, nullptr)) {}

~assoc_t() {
if (this->scope) {
this->scope->disassociate();
}
}

auto operator=(assoc_t other) noexcept -> assoc_t& {
std::swap(scope, other.scope);
return *this;
}

explicit operator bool() const noexcept { return this->scope != nullptr; }

auto try_associate() const noexcept -> assoc_t {
if (this->scope) {
return this->scope->try_associate();
}
return assoc_t{};
}

private:
counting_scope_base* scope = nullptr;
};

class token_base {
public:
auto try_associate() const noexcept -> assoc_t { return this->scope->try_associate(); }

protected:
explicit token(::beman::execution::detail::counting_scope_base* s) : scope(s) {}
::beman::execution::detail::counting_scope_base* scope;
explicit token_base(counting_scope_base* s) : scope(s) {}
counting_scope_base* scope;
};

private:
Expand All @@ -58,10 +91,10 @@ class beman::execution::detail::counting_scope_base : ::beman::execution::detail
joined
};

auto try_associate() noexcept -> bool;
auto try_associate() noexcept -> assoc_t;
auto disassociate() noexcept -> void;
auto complete() noexcept -> void;
auto add_node(node* n, ::std::lock_guard<::std::mutex>&) noexcept -> void;
auto add_node(node* n) noexcept -> void;

::std::mutex mutex;
//-dk:TODO fuse state and count and use atomic accesses
Expand Down Expand Up @@ -100,23 +133,22 @@ inline auto beman::execution::detail::counting_scope_base::close() noexcept -> v
}
}

inline auto beman::execution::detail::counting_scope_base::add_node(node* n, ::std::lock_guard<::std::mutex>&) noexcept
-> void {
inline auto beman::execution::detail::counting_scope_base::add_node(node* n) noexcept -> void {
n->next = std::exchange(this->head, n);
}

inline auto beman::execution::detail::counting_scope_base::try_associate() noexcept -> bool {
inline auto beman::execution::detail::counting_scope_base::try_associate() noexcept -> assoc_t {
::std::lock_guard lock(this->mutex);
switch (this->state) {
default:
return false;
return assoc_t{};
case state_t::unused:
this->state = state_t::open; // fall-through!
[[fallthrough]];
case state_t::open:
case state_t::open_and_joining:
++this->count;
return true;
return assoc_t{*this};
}
}

Expand All @@ -141,12 +173,13 @@ inline auto beman::execution::detail::counting_scope_base::complete() noexcept -
}

inline auto beman::execution::detail::counting_scope_base::start_node(node* n) -> void {
::std::lock_guard kerberos(this->mutex);
::std::unique_lock guard(this->mutex);
switch (this->state) {
case ::beman::execution::detail::counting_scope_base::state_t::unused:
case ::beman::execution::detail::counting_scope_base::state_t::unused_and_closed:
case ::beman::execution::detail::counting_scope_base::state_t::joined:
this->state = ::beman::execution::detail::counting_scope_base::state_t::joined;
guard.unlock();
n->complete_inline();
return;
case ::beman::execution::detail::counting_scope_base::state_t::open:
Expand All @@ -160,7 +193,7 @@ inline auto beman::execution::detail::counting_scope_base::start_node(node* n) -
case ::beman::execution::detail::counting_scope_base::state_t::closed_and_joining:
break;
}
this->add_node(n, kerberos);
this->add_node(n);
}

// ----------------------------------------------------------------------------
Expand Down
Loading