diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index a2085cdd39..548ce3c581 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -337,12 +337,42 @@ Ref ResourceLoader::_load(const String &p_path, const String &p_origin void ResourceLoader::_run_load_task(void *p_userdata) { ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata; + bool wait = false; { MutexLock thread_load_lock(thread_load_mutex); if (cleaning_tasks) { load_task.status = THREAD_LOAD_FAILED; return; } + + int thread_index = WorkerThreadPool::get_singleton()->get_thread_index(); + + if (load_task.started_load && load_task.thread_index != thread_index) { + wait = true; + } else { + load_task.started_load = true; + load_task.thread_index = thread_index; + } + } + + if (wait) { + // There are a couple of reasons why we got here: + // 1) We re-started the task in _load_complete_inner but we also + // got started via the original task in the WorkerThreadPool + // 2) There's a race between multiple threads in _load_complete_inner + // and more than one thread thought they had to restart + // + // This task was already running, wait for the other thread to complete. + ThreadLoadStatus status; + do { + OS::get_singleton()->delay_usec(1000); + thread_load_mutex.lock(); + status = load_task.status; + thread_load_mutex.unlock(); + } while (status == THREAD_LOAD_IN_PROGRESS); + + load_task.load_token->unreference(); + return; } ThreadLoadTask *curr_load_task_backup = curr_load_task; @@ -487,6 +517,8 @@ String ResourceLoader::_validate_local_path(const String &p_path) { Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, CacheMode p_cache_mode) { Ref token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode, true); + // We need to keep at least one reference to the token until it is done. + token->reference(); return token.is_valid() ? OK : FAILED; } @@ -843,16 +875,32 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro bool loader_is_wtp = load_task.task_id != 0; if (loader_is_wtp) { // Loading thread is in the worker pool. - p_thread_load_lock.temp_unlock(); int load_nesting_backup = load_nesting; load_nesting = 0; - Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); + int thread_index = WorkerThreadPool::get_singleton()->get_thread_index(); + int task_thread_index = load_task.thread_index; + + bool restart = false; + bool running = load_task.started_load; + + p_thread_load_lock.temp_unlock(); + + if (running && task_thread_index != thread_index) { + Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); + + DEV_ASSERT(!wait_err || wait_err == ERR_BUSY); + if (wait_err == ERR_BUSY) { + restart = true; + } + } else { + restart = true; + } + DEV_ASSERT(load_nesting == 0); load_nesting = load_nesting_backup; - DEV_ASSERT(!wait_err || wait_err == ERR_BUSY); - if (wait_err == ERR_BUSY) { + if (restart) { // The WorkerThreadPool has reported that the current task wants to await on an older one. // That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of // resource loading that means that the task to wait for can be restarted here to break the @@ -907,10 +955,13 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro if (resource.is_valid()) { if (load_task_ptr->parent_task) { - // A task awaiting another => Let the awaiter accumulate the resource changed connections. - DEV_ASSERT(load_task_ptr->parent_task != load_task_ptr); - for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) { - load_task_ptr->parent_task->resource_changed_connections.push_back(rcc); + if (!load_task_ptr->connections_propagated) { + // A task awaiting another => Let the awaiter accumulate the resource changed connections. + DEV_ASSERT(load_task_ptr->parent_task != load_task_ptr); + for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) { + load_task_ptr->parent_task->resource_changed_connections.push_back(rcc); + } + load_task_ptr->connections_propagated = true; } } else { p_thread_load_lock.temp_unlock(); diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index ec738adca9..8e94f0dee9 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -181,6 +181,7 @@ private: struct ThreadLoadTask { WorkerThreadPool::TaskID task_id = 0; // Used if run on a worker thread from the pool. Thread::ID thread_id = 0; // Used if running on an user thread (e.g., simple non-threaded load). + int thread_index = -1; ConditionVariable *cond_var = nullptr; // In not in the worker pool or already awaiting, this is used as a secondary awaiting mechanism. uint32_t awaiters_count = 0; LoadToken *load_token = nullptr; @@ -200,6 +201,8 @@ private: bool need_wait : 1; bool in_progress_check : 1; // Measure against recursion cycles in progress reporting. Cycles are not expected, but can happen due to how it's currently implemented. bool use_sub_threads : 1; + bool started_load : 1; + bool connections_propagated : 1; struct ResourceChangedConnection { Resource *source = nullptr; @@ -212,7 +215,9 @@ private: awaited(false), need_wait(true), in_progress_check(false), - use_sub_threads(false) {} + use_sub_threads(false), + started_load(false), + connections_propagated(false) {} }; static void _run_load_task(void *p_userdata);