41 struct no_control : std::logic_error {
no_control() : std::logic_error{
"neolib::event_handle::no_control" } {} };
44 iControl{ &aControl }, iRef{ aControl.
get(), aId }, iPrimary{
true }
50 iControl{ aOther.iControl }, iRef{ aOther.iRef }, iPrimary{
false }
56 iControl{ aOther.iControl }, iRef{ aOther.iRef }, iPrimary{
false }
58 aOther.iPrimary =
false;
76 auto oldControl = iControl;
77 iControl = aRhs.iControl;
80 if (oldControl !=
nullptr)
81 oldControl->release();
88 return iControl !=
nullptr;
119 struct no_event : std::logic_error {
no_event() : std::logic_error{
"neolib::event_control::no_event" } {} };
122 iEvent{ &aEvent }, iRefCount{ 0u }
128 get().release_control();
137 if (--iRefCount == 0u)
142 return iEvent !=
nullptr;
156 std::atomic<i_event*> iEvent;
157 std::atomic<uint32_t> iRefCount;
160 template <
typename... Args>
163 template <
typename...>
167 typedef std::function<void(Args...)> function_type;
168 typedef std::shared_ptr<function_type> handler_ptr;
169 typedef std::tuple<Args...> argument_pack;
172 iEvent{ aEvent }, iHandler{ aHandler }, iArguments{ aArguments... }
182 std::apply(*iHandler, iArguments);
186 handler_ptr iHandler;
187 argument_pack iArguments;
195 template <
typename...>
203 typedef uint64_t transaction;
204 typedef std::optional<transaction> optional_transaction;
205 typedef std::unique_ptr<i_event_callback> callback_ptr;
206 typedef std::deque<std::pair<transaction, callback_ptr>> event_list_t;
216 transaction
enqueue(callback_ptr aCallback,
const optional_transaction& aTransaction = {})
218 return add(std::move(aCallback), aTransaction);
220 void unqueue(
const i_event& aEvent);
225 bool terminated()
const;
226 transaction add(callback_ptr aCallback,
const optional_transaction& aTransaction);
227 void remove(
const i_event& aEvent);
228 bool has(
const i_event& aEvent)
const;
229 bool publish_events();
232 std::recursive_mutex iMutex;
233 std::unique_ptr<callback_timer> iTimer;
234 event_list_t iEvents;
235 std::atomic<bool> iTerminated;
237 std::atomic<uint32_t> iPublishNestingLevel;
238 std::vector<std::unique_ptr<event_list_t>> iPublishCache;
239 transaction iNextTransaction;
251 template <
typename... Args>
258 typedef std::optional<std::scoped_lock<std::recursive_mutex>> optional_scoped_lock;
259 typedef typename event_callback<Args...>::function_type function_type;
260 typedef typename event_callback<Args...>::handler_ptr handler_ptr;
261 typedef async_event_queue::optional_transaction optional_async_transaction;
269 queueDestroyed{ queue }
272 typedef std::shared_ptr<queue_ref> queue_ref_ptr;
275 queue_ref_ptr queueRef;
276 uint32_t referenceCount;
277 const void* clientId;
278 handler_ptr callback;
279 bool handleInSameThreadAsEmitter;
280 uint64_t triggerId = 0ull;
284 const void* clientId,
285 handler_ptr callback,
286 bool handleInSameThreadAsEmitter =
false) :
287 queueRef{ std::make_shared<queue_ref>(
queue) },
288 referenceCount{ 0u },
289 clientId{ clientId },
290 callback{ callback },
291 handleInSameThreadAsEmitter{ handleInSameThreadAsEmitter }
298 std::atomic<bool> handlersChanged;
301 handlersChanged{
false }
304 context(
const context& aOther) :
305 accepted{ aOther.accepted },
306 handlersChanged{ aOther.handlersChanged.load() }
309 context&
operator=(
const context& aOther)
311 accepted = aOther.accepted;
312 handlersChanged = aOther.handlersChanged.load();
316 typedef std::vector<context> context_list_t;
319 bool ignoreErrors =
false;
321 handler_list_t handlers;
322 context_list_t contexts;
323 bool triggering =
false;
324 uint64_t triggerId = 0ull;
325 std::atomic<bool> handlersChanged =
false;
326 std::atomic<uint32_t> filterCount;
329 event() : iAlias{ *
this }, iMutex{ std::make_shared<std::recursive_mutex>() }, iControl{
nullptr }, iInstanceDataPtr{
nullptr }
337 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
356 if (iControl !=
nullptr)
358 iControl.load()->reset();
359 iControl.store(
nullptr);
364 instance().handlers[aHandleId].handleInSameThreadAsEmitter =
true;
369 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
370 instance().contexts.emplace_back();
374 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
375 instance().contexts.pop_back();
386 instance().ignoreErrors =
true;
390 return instance().triggerType;
394 instance().triggerType = aTriggerType;
400 switch (trigger_type())
406 return sync_trigger(aArguments...);
409 async_trigger(aArguments...);
420 optional_scoped_lock lock{ *mutex };
421 if (instance().handlers.empty() && !filtered())
430 if (instance().contexts.back().accepted)
436 if (instance().handlers.empty())
439 if (!instance().triggering)
441 instance().triggering =
true;
442 instance().triggerId = 0ull;
443 for (
auto& handler : instance().handlers)
444 handler.triggerId = 0ull;
446 auto triggerId = ++instance().triggerId;
447 optional_async_transaction transaction;
448 for (std::size_t handlerIndex = {}; handlerIndex < instance().handlers.size();)
450 auto& handler = instance().handlers.at_index(handlerIndex++);
451 if (handler.triggerId < triggerId)
452 handler.triggerId = triggerId;
453 else if (handler.triggerId == triggerId)
457 transaction = enqueue(lock, handler,
false, transaction, aArguments...);
468 if (instance().contexts.back().accepted)
473 if (instance().handlersChanged.exchange(
false))
486 optional_scoped_lock lock{ *mutex };
487 if (instance().handlers.empty())
491 if (!instance().triggering)
493 instance().triggering =
true;
494 instance().triggerId = 0ull;
495 for (
auto& handler : instance().handlers)
496 handler.triggerId = 0ull;
498 auto triggerId = ++instance().triggerId;
499 optional_async_transaction transaction;
500 for (std::size_t handlerIndex = {}; handlerIndex < instance().handlers.size();)
502 auto& handler = instance().handlers.at_index(handlerIndex++);
503 if (handler.triggerId < triggerId)
504 handler.triggerId = triggerId;
505 else if (handler.triggerId == triggerId)
507 transaction = enqueue(lock, handler,
true, transaction, aArguments...);
510 if (instance().handlersChanged.exchange(
false))
516 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
517 return !instance().contexts.empty() ? instance().contexts.back().accepted :
false;
521 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
522 instance().contexts.back().accepted =
true;
526 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
527 instance().contexts.back().accepted =
false;
532 return instance().filterCount > 0u;
536 ++instance().filterCount;
540 --instance().filterCount;
544 instance().filterCount = 0u;
549 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
550 invalidate_handler_list();
555 return subscribe(aHandlerCallback, aUniqueId);
557 template <
typename T>
560 return subscribe(aHandlerCallback, static_cast<const void*>(aClientId));
562 template <
typename T>
565 return subscribe(aHandlerCallback, static_cast<const void*>(aClientId));
567 template <
typename T>
570 return subscribe(aHandlerCallback, static_cast<const void*>(&aClientId));
572 template <
typename T>
575 return subscribe(aHandlerCallback, static_cast<const void*>(&aClientId));
579 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
580 invalidate_handler_list();
581 instance().handlers.remove(aHandle.
id());
585 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
586 invalidate_handler_list();
587 for (
auto h = instance().handlers.begin(); h != instance().handlers.end();)
588 if (h->clientId == aClientId)
589 h = instance().handlers.erase(h);
593 template <
typename T>
596 return unsubscribe(static_cast<const void*>(aClientId));
598 template <
typename T>
601 return unsubscribe(static_cast<const void*>(&aClientId));
604 void add_ref(
cookie aCookie)
override 606 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
607 ++instance().handlers[aCookie].referenceCount;
609 void release(
cookie aCookie)
override 611 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
612 if (--instance().handlers[aCookie].referenceCount == 0u)
613 instance().handlers.remove(aCookie);
615 long use_count(
cookie aCookie)
const override 617 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
618 return instance().handlers[aCookie].referenceCount;
621 void invalidate_handler_list()
const 623 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
624 instance().handlersChanged =
true;
625 for (
auto& context : instance().contexts)
626 context.handlersChanged =
true;
628 optional_async_transaction enqueue(optional_scoped_lock& lock, handler& aHandler,
bool aAsync,
const optional_async_transaction& aAsyncTransaction, Args... aArguments)
const 630 optional_async_transaction transaction;
632 if (!aAsync && !aHandler.queueRef->queueDestroyed && &aHandler.queueRef->queue == &emitterQueue)
635 auto callback = aHandler.callback;
637 (*callback)(aArguments...);
638 lock.emplace(*mutex);
642 auto ecb = std::make_unique<
event_callback<Args...>>(*
this, aHandler.callback, aArguments...);
643 if (aHandler.handleInSameThreadAsEmitter)
644 transaction = emitterQueue.enqueue(std::move(ecb), aAsyncTransaction);
647 if (!aHandler.queueRef->queueDestroyed)
648 transaction = aHandler.queueRef->queue.enqueue(std::move(ecb), aAsyncTransaction);
649 else if (!instance().ignoreErrors)
657 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
658 std::unordered_set<async_event_queue*> queues;
659 for (
auto const& h : instance().handlers)
660 if (!h.queueRef->queueDestroyed)
661 queues.insert(&h.queueRef->queue);
662 for (
auto const& q : queues)
667 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
668 for (
auto& h : instance().handlers)
669 if (!h.queueRef->queueDestroyed)
670 h.queueRef->queue.remove(*
this);
671 iInstanceDataPtr =
nullptr;
672 iInstanceData = std::nullopt;
674 bool is_controlled()
const 676 return iControl !=
nullptr;
680 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
681 if (iControl ==
nullptr)
688 bool has_instance()
const 690 return iInstanceDataPtr !=
nullptr;
692 instance_data& instance()
const 694 if (iInstanceDataPtr !=
nullptr)
695 return *iInstanceDataPtr;
696 std::scoped_lock<std::recursive_mutex> lock{ *iMutex };
697 iInstanceData.emplace();
698 iInstanceDataPtr = &*iInstanceData;
699 return *iInstanceDataPtr;
703 mutable std::shared_ptr<std::recursive_mutex> iMutex;
704 mutable std::atomic<i_event_control*> iControl;
705 mutable std::optional<instance_data> iInstanceData;
706 mutable std::atomic<instance_data*> iInstanceDataPtr;
717 iHandles.push_back(aHandle);
721 iHandles.push_back(std::move(aHandle));
724 iHandles{ aSink.iHandles }
736 iHandles = aSink.iHandles;
742 return *
this =
sink{ aHandle };
746 return *
this =
sink{ std::move(aHandle) };
751 iHandles.insert(iHandles.end(), s.iHandles.begin(), s.iHandles.end());
756 sink s{ std::move(aHandle) };
757 iHandles.insert(iHandles.end(), s.iHandles.begin(), s.iHandles.end());
766 mutable std::vector<event_handle> iHandles;
static async_event_queue & instance()
friend class async_event_queue
event_handle operator()(const function_type &aHandlerCallback, const void *aUniqueId=nullptr) const
void release_control() override
std::recursive_mutex & mutex() const
bool valid() const override
void filter_added() const override
sink & operator+=(event_handle &&aHandle)
void unsubscribe(event_handle aHandle) const
event_handle subscribe(const function_type &aHandlerCallback, const T *aClientId) const
const i_event & event() const override
event_callback(const i_event &aEvent, handler_ptr aHandler, Args... aArguments)
void accept() const override
i_event_control & control() const
event_handle subscribe(const function_type &aHandlerCallback, const void *aUniqueId=nullptr) const
void push_context() const override
event_handle subscribe(const function_type &aHandlerCallback, const T &aClientId) const
sink(event_handle &&aHandle)
virtual i_event & get() const =0
transaction enqueue(callback_ptr aCallback, const optional_transaction &aTransaction={})
void filters_removed() const override
event_control(i_event &aEvent)
virtual void handle_in_same_thread_as_emitter(cookie aHandleId)=0
virtual void uninstall_event_filter(i_event_filter &aFilter, const i_event &aEvent)=0
event_handle(i_event_control &aControl, cookie aId)
bool accepted() const override
async_event_queue_needs_a_task()
virtual void pre_filter_event(const i_event &aEvent) const =0
bool have_control() const
void async_trigger(Args... aArguments) const
sink & operator=(const sink &aSink)
void unsubscribe(const void *aClientId) const
std::vector< operation > queue
event_handle & operator=(const event_handle &aRhs)
i_event_filter_registry & filter_registry()
event_handle operator()(const function_type &aHandlerCallback, const T &aClientId) const
sink(const event_handle &aHandle)
event_handle(const event_handle &aOther)
void call() const override
bool sync_trigger(Args... aArguments) const
void unsubscribe(const T &aClientId) const
event_trigger_type trigger_type() const
event_handle operator()(const function_type &aHandlerCallback, const T *aClientId) const
sink & operator=(event_handle &&aHandle)
event_handle & operator~()
event_handle(event_handle &&aOther)
void set_trigger_type(event_trigger_type aTriggerType)
sink & operator+=(const event_handle &aHandle)
async_event_queue_already_instantiated()
bool filtered() const override
async_event_queue_terminated()
virtual void filter_event(const i_event &aEvent) const =0
void ignore() const override
void pre_trigger() const override
auto destroyed(Object &aObject, const Handler aHandler)
sink & operator=(const event_handle &aHandle)
void pop_context() const override
bool trigger(Args... aArguments) const
void filter_removed() const override
void unsubscribe(const T *aClientId) const
void handle_in_same_thread_as_emitter(cookie aHandleId) override