initial commit, 4.5 stable
Some checks failed
🔗 GHA / 📊 Static checks (push) Has been cancelled
🔗 GHA / 🤖 Android (push) Has been cancelled
🔗 GHA / 🍏 iOS (push) Has been cancelled
🔗 GHA / 🐧 Linux (push) Has been cancelled
🔗 GHA / 🍎 macOS (push) Has been cancelled
🔗 GHA / 🏁 Windows (push) Has been cancelled
🔗 GHA / 🌐 Web (push) Has been cancelled
Some checks failed
🔗 GHA / 📊 Static checks (push) Has been cancelled
🔗 GHA / 🤖 Android (push) Has been cancelled
🔗 GHA / 🍏 iOS (push) Has been cancelled
🔗 GHA / 🐧 Linux (push) Has been cancelled
🔗 GHA / 🍎 macOS (push) Has been cancelled
🔗 GHA / 🏁 Windows (push) Has been cancelled
🔗 GHA / 🌐 Web (push) Has been cancelled
This commit is contained in:
257
core/templates/command_queue_mt.h
Normal file
257
core/templates/command_queue_mt.h
Normal file
@@ -0,0 +1,257 @@
|
||||
/**************************************************************************/
|
||||
/* command_queue_mt.h */
|
||||
/**************************************************************************/
|
||||
/* This file is part of: */
|
||||
/* GODOT ENGINE */
|
||||
/* https://godotengine.org */
|
||||
/**************************************************************************/
|
||||
/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
|
||||
/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */
|
||||
/* */
|
||||
/* Permission is hereby granted, free of charge, to any person obtaining */
|
||||
/* a copy of this software and associated documentation files (the */
|
||||
/* "Software"), to deal in the Software without restriction, including */
|
||||
/* without limitation the rights to use, copy, modify, merge, publish, */
|
||||
/* distribute, sublicense, and/or sell copies of the Software, and to */
|
||||
/* permit persons to whom the Software is furnished to do so, subject to */
|
||||
/* the following conditions: */
|
||||
/* */
|
||||
/* The above copyright notice and this permission notice shall be */
|
||||
/* included in all copies or substantial portions of the Software. */
|
||||
/* */
|
||||
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
|
||||
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
|
||||
/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
|
||||
/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
|
||||
/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
|
||||
/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
|
||||
/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
|
||||
/**************************************************************************/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "core/object/worker_thread_pool.h"
|
||||
#include "core/os/condition_variable.h"
|
||||
#include "core/os/mutex.h"
|
||||
#include "core/templates/local_vector.h"
|
||||
#include "core/templates/simple_type.h"
|
||||
#include "core/templates/tuple.h"
|
||||
#include "core/typedefs.h"
|
||||
|
||||
class CommandQueueMT {
|
||||
struct CommandBase {
|
||||
bool sync = false;
|
||||
virtual void call() = 0;
|
||||
virtual ~CommandBase() = default;
|
||||
|
||||
CommandBase(bool p_sync) :
|
||||
sync(p_sync) {}
|
||||
};
|
||||
|
||||
template <typename T, typename M, bool NeedsSync, typename... Args>
|
||||
struct Command : public CommandBase {
|
||||
T *instance;
|
||||
M method;
|
||||
Tuple<GetSimpleTypeT<Args>...> args;
|
||||
|
||||
template <typename... FwdArgs>
|
||||
_FORCE_INLINE_ Command(T *p_instance, M p_method, FwdArgs &&...p_args) :
|
||||
CommandBase(NeedsSync), instance(p_instance), method(p_method), args(std::forward<FwdArgs>(p_args)...) {}
|
||||
|
||||
void call() {
|
||||
call_impl(BuildIndexSequence<sizeof...(Args)>{});
|
||||
}
|
||||
|
||||
private:
|
||||
template <size_t... I>
|
||||
_FORCE_INLINE_ void call_impl(IndexSequence<I...>) {
|
||||
// Move out of the Tuple, this will be destroyed as soon as the call is complete.
|
||||
(instance->*method)(std::move(get<I>())...);
|
||||
}
|
||||
|
||||
// This method exists so we can call it in the parameter pack expansion in call_impl.
|
||||
template <size_t I>
|
||||
_FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
|
||||
};
|
||||
|
||||
// Separate class from Command so we can save the space of the ret pointer for commands that don't return.
|
||||
template <typename T, typename M, typename R, typename... Args>
|
||||
struct CommandRet : public CommandBase {
|
||||
T *instance;
|
||||
M method;
|
||||
R *ret;
|
||||
Tuple<GetSimpleTypeT<Args>...> args;
|
||||
|
||||
_FORCE_INLINE_ CommandRet(T *p_instance, M p_method, R *p_ret, GetSimpleTypeT<Args>... p_args) :
|
||||
CommandBase(true), instance(p_instance), method(p_method), ret(p_ret), args{ p_args... } {}
|
||||
|
||||
void call() override {
|
||||
*ret = call_impl(BuildIndexSequence<sizeof...(Args)>{});
|
||||
}
|
||||
|
||||
private:
|
||||
template <size_t... I>
|
||||
_FORCE_INLINE_ R call_impl(IndexSequence<I...>) {
|
||||
// Move out of the Tuple, this will be destroyed as soon as the call is complete.
|
||||
return (instance->*method)(std::move(get<I>())...);
|
||||
}
|
||||
|
||||
// This method exists so we can call it in the parameter pack expansion in call_impl.
|
||||
template <size_t I>
|
||||
_FORCE_INLINE_ auto &get() { return ::tuple_get<I>(args); }
|
||||
};
|
||||
|
||||
/***** BASE *******/
|
||||
|
||||
static const uint32_t DEFAULT_COMMAND_MEM_SIZE_KB = 64;
|
||||
|
||||
BinaryMutex mutex;
|
||||
LocalVector<uint8_t> command_mem;
|
||||
ConditionVariable sync_cond_var;
|
||||
uint32_t sync_head = 0;
|
||||
uint32_t sync_tail = 0;
|
||||
uint32_t sync_awaiters = 0;
|
||||
WorkerThreadPool::TaskID pump_task_id = WorkerThreadPool::INVALID_TASK_ID;
|
||||
uint64_t flush_read_ptr = 0;
|
||||
std::atomic<bool> pending{ false };
|
||||
|
||||
template <typename T, typename... Args>
|
||||
_FORCE_INLINE_ void create_command(Args &&...p_args) {
|
||||
// alloc size is size+T+safeguard
|
||||
constexpr uint64_t alloc_size = ((sizeof(T) + 8U - 1U) & ~(8U - 1U));
|
||||
static_assert(alloc_size < UINT32_MAX, "Type too large to fit in the command queue.");
|
||||
|
||||
uint64_t size = command_mem.size();
|
||||
command_mem.resize(size + alloc_size + sizeof(uint64_t));
|
||||
*(uint64_t *)&command_mem[size] = alloc_size;
|
||||
void *cmd = &command_mem[size + sizeof(uint64_t)];
|
||||
new (cmd) T(std::forward<Args>(p_args)...);
|
||||
pending.store(true);
|
||||
}
|
||||
|
||||
template <typename T, bool NeedsSync, typename... Args>
|
||||
_FORCE_INLINE_ void _push_internal(Args &&...args) {
|
||||
MutexLock mlock(mutex);
|
||||
create_command<T>(std::forward<Args>(args)...);
|
||||
|
||||
if (pump_task_id != WorkerThreadPool::INVALID_TASK_ID) {
|
||||
WorkerThreadPool::get_singleton()->notify_yield_over(pump_task_id);
|
||||
}
|
||||
|
||||
if constexpr (NeedsSync) {
|
||||
sync_tail++;
|
||||
_wait_for_sync(mlock);
|
||||
}
|
||||
}
|
||||
|
||||
_FORCE_INLINE_ void _prevent_sync_wraparound() {
|
||||
bool safe_to_reset = !sync_awaiters;
|
||||
bool already_sync_to_latest = sync_head == sync_tail;
|
||||
if (safe_to_reset && already_sync_to_latest) {
|
||||
sync_head = 0;
|
||||
sync_tail = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void _flush() {
|
||||
if (unlikely(flush_read_ptr)) {
|
||||
// Re-entrant call.
|
||||
return;
|
||||
}
|
||||
|
||||
MutexLock lock(mutex);
|
||||
|
||||
while (flush_read_ptr < command_mem.size()) {
|
||||
uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
|
||||
flush_read_ptr += 8;
|
||||
CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
|
||||
uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock);
|
||||
cmd->call();
|
||||
WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);
|
||||
|
||||
// Handle potential realloc due to the command and unlock allowance.
|
||||
cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
|
||||
|
||||
if (unlikely(cmd->sync)) {
|
||||
sync_head++;
|
||||
lock.~MutexLock(); // Give an opportunity to awaiters right away.
|
||||
sync_cond_var.notify_all();
|
||||
new (&lock) MutexLock(mutex);
|
||||
// Handle potential realloc happened during unlock.
|
||||
cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]);
|
||||
}
|
||||
|
||||
cmd->~CommandBase();
|
||||
|
||||
flush_read_ptr += size;
|
||||
}
|
||||
|
||||
command_mem.clear();
|
||||
pending.store(false);
|
||||
flush_read_ptr = 0;
|
||||
|
||||
_prevent_sync_wraparound();
|
||||
}
|
||||
|
||||
_FORCE_INLINE_ void _wait_for_sync(MutexLock<BinaryMutex> &p_lock) {
|
||||
sync_awaiters++;
|
||||
uint32_t sync_head_goal = sync_tail;
|
||||
do {
|
||||
sync_cond_var.wait(p_lock);
|
||||
} while (sync_head < sync_head_goal);
|
||||
sync_awaiters--;
|
||||
_prevent_sync_wraparound();
|
||||
}
|
||||
|
||||
void _no_op() {}
|
||||
|
||||
public:
|
||||
template <typename T, typename M, typename... Args>
|
||||
void push(T *p_instance, M p_method, Args &&...p_args) {
|
||||
// Standard command, no sync.
|
||||
using CommandType = Command<T, M, false, Args...>;
|
||||
_push_internal<CommandType, false>(p_instance, p_method, std::forward<Args>(p_args)...);
|
||||
}
|
||||
|
||||
template <typename T, typename M, typename... Args>
|
||||
void push_and_sync(T *p_instance, M p_method, Args... p_args) {
|
||||
// Standard command, sync.
|
||||
using CommandType = Command<T, M, true, Args...>;
|
||||
_push_internal<CommandType, true>(p_instance, p_method, std::forward<Args>(p_args)...);
|
||||
}
|
||||
|
||||
template <typename T, typename M, typename R, typename... Args>
|
||||
void push_and_ret(T *p_instance, M p_method, R *r_ret, Args... p_args) {
|
||||
// Command with return value, sync.
|
||||
using CommandType = CommandRet<T, M, R, Args...>;
|
||||
_push_internal<CommandType, true>(p_instance, p_method, r_ret, std::forward<Args>(p_args)...);
|
||||
}
|
||||
|
||||
_FORCE_INLINE_ void flush_if_pending() {
|
||||
if (unlikely(pending.load())) {
|
||||
_flush();
|
||||
}
|
||||
}
|
||||
|
||||
void flush_all() {
|
||||
_flush();
|
||||
}
|
||||
|
||||
void sync() {
|
||||
push_and_sync(this, &CommandQueueMT::_no_op);
|
||||
}
|
||||
|
||||
void wait_and_flush() {
|
||||
ERR_FAIL_COND(pump_task_id == WorkerThreadPool::INVALID_TASK_ID);
|
||||
WorkerThreadPool::get_singleton()->wait_for_task_completion(pump_task_id);
|
||||
_flush();
|
||||
}
|
||||
|
||||
void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) {
|
||||
MutexLock lock(mutex);
|
||||
pump_task_id = p_task_id;
|
||||
}
|
||||
|
||||
CommandQueueMT();
|
||||
~CommandQueueMT();
|
||||
};
|
Reference in New Issue
Block a user