Merge pull request #118824 from hpvb/fix-118085

Fix a race in `ResourceLoader::load_threaded_request()`
This commit is contained in:
Thaddeus Crews
2026-05-06 14:56:33 -05:00
2 changed files with 65 additions and 9 deletions
+59 -8
View File
@@ -337,12 +337,42 @@ Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_origin
void ResourceLoader::_run_load_task(void *p_userdata) {
ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;
bool wait = false;
{
MutexLock thread_load_lock(thread_load_mutex);
if (cleaning_tasks) {
load_task.status = THREAD_LOAD_FAILED;
return;
}
int thread_index = WorkerThreadPool::get_singleton()->get_thread_index();
if (load_task.started_load && load_task.thread_index != thread_index) {
wait = true;
} else {
load_task.started_load = true;
load_task.thread_index = thread_index;
}
}
if (wait) {
// There are a couple of reasons why we got here:
// 1) We re-started the task in _load_complete_inner but we also
// got started via the original task in the WorkerThreadPool
// 2) There's a race between multiple threads in _load_complete_inner
// and more than one thread thought they had to restart
//
// This task was already running, wait for the other thread to complete.
ThreadLoadStatus status;
do {
OS::get_singleton()->delay_usec(1000);
thread_load_mutex.lock();
status = load_task.status;
thread_load_mutex.unlock();
} while (status == THREAD_LOAD_IN_PROGRESS);
load_task.load_token->unreference();
return;
}
ThreadLoadTask *curr_load_task_backup = curr_load_task;
@@ -487,6 +517,8 @@ String ResourceLoader::_validate_local_path(const String &p_path) {
// Requests a threaded load of `p_path` by calling `_load_start()`.
//
// `p_use_sub_threads` selects the thread mode passed to `_load_start()`:
// `LOAD_THREAD_DISTRIBUTE` when true, `LOAD_THREAD_SPAWN_SINGLE` otherwise.
// `p_type_hint` and `p_cache_mode` are forwarded unchanged.
//
// Returns `OK` when `_load_start()` yields a valid load token and `FAILED`
// when it does not.
Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, CacheMode p_cache_mode) {
	Ref<ResourceLoader::LoadToken> token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode, true);
	if (!token.is_valid()) {
		// No token was produced, so there is nothing to keep alive. Bail out
		// before touching it: dereferencing an empty Ref would crash instead
		// of reporting the failure to the caller.
		return FAILED;
	}
	// We need to keep at least one reference to the token until it is done.
	// NOTE(review): this manual reference is presumably balanced by an
	// `unreference()` once the load task finishes — confirm against the task code.
	token->reference();
	return OK;
}
@@ -843,16 +875,32 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
bool loader_is_wtp = load_task.task_id != 0;
if (loader_is_wtp) {
// Loading thread is in the worker pool.
p_thread_load_lock.temp_unlock();
int load_nesting_backup = load_nesting;
load_nesting = 0;
Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
int thread_index = WorkerThreadPool::get_singleton()->get_thread_index();
int task_thread_index = load_task.thread_index;
bool restart = false;
bool running = load_task.started_load;
p_thread_load_lock.temp_unlock();
if (running && task_thread_index != thread_index) {
Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
DEV_ASSERT(!wait_err || wait_err == ERR_BUSY);
if (wait_err == ERR_BUSY) {
restart = true;
}
} else {
restart = true;
}
DEV_ASSERT(load_nesting == 0);
load_nesting = load_nesting_backup;
DEV_ASSERT(!wait_err || wait_err == ERR_BUSY);
if (wait_err == ERR_BUSY) {
if (restart) {
// The WorkerThreadPool has reported that the current task wants to await on an older one.
// That's not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of
// resource loading that means that the task to wait for can be restarted here to break the
@@ -907,10 +955,13 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
if (resource.is_valid()) {
if (load_task_ptr->parent_task) {
// A task awaiting another => Let the awaiter accumulate the resource changed connections.
DEV_ASSERT(load_task_ptr->parent_task != load_task_ptr);
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
load_task_ptr->parent_task->resource_changed_connections.push_back(rcc);
if (!load_task_ptr->connections_propagated) {
// A task awaiting another => Let the awaiter accumulate the resource changed connections.
DEV_ASSERT(load_task_ptr->parent_task != load_task_ptr);
for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
load_task_ptr->parent_task->resource_changed_connections.push_back(rcc);
}
load_task_ptr->connections_propagated = true;
}
} else {
p_thread_load_lock.temp_unlock();
+6 -1
View File
@@ -181,6 +181,7 @@ private:
struct ThreadLoadTask {
WorkerThreadPool::TaskID task_id = 0; // Used if run on a worker thread from the pool.
Thread::ID thread_id = 0; // Used if running on an user thread (e.g., simple non-threaded load).
int thread_index = -1;
ConditionVariable *cond_var = nullptr; // In not in the worker pool or already awaiting, this is used as a secondary awaiting mechanism.
uint32_t awaiters_count = 0;
LoadToken *load_token = nullptr;
@@ -200,6 +201,8 @@ private:
bool need_wait : 1;
bool in_progress_check : 1; // Measure against recursion cycles in progress reporting. Cycles are not expected, but can happen due to how it's currently implemented.
bool use_sub_threads : 1;
bool started_load : 1;
bool connections_propagated : 1;
struct ResourceChangedConnection {
Resource *source = nullptr;
@@ -212,7 +215,9 @@ private:
awaited(false),
need_wait(true),
in_progress_check(false),
use_sub_threads(false) {}
use_sub_threads(false),
started_load(false),
connections_propagated(false) {}
};
static void _run_load_task(void *p_userdata);