Removed interactive loader, added proper thread loading.

This commit is contained in:
Juan Linietsky
2020-02-28 08:27:04 -03:00
parent 5beccaf86f
commit 475e4ea67b
57 changed files with 872 additions and 592 deletions

View File

@ -39,26 +39,16 @@
#include "core/translation.h"
#include "core/variant_parser.h"
#ifdef DEBUG_LOAD_THREADED
#define print_lt(m_text) print_line(m_text)
#else
#define print_lt(m_text)
#endif
Ref<ResourceFormatLoader> ResourceLoader::loader[ResourceLoader::MAX_LOADERS];
int ResourceLoader::loader_count = 0;
Error ResourceInteractiveLoader::wait() {
Error err = poll();
while (err == OK) {
err = poll();
}
return err;
}
ResourceInteractiveLoader::~ResourceInteractiveLoader() {
if (path_loading != String()) {
ResourceLoader::_remove_from_loading_map_and_thread(path_loading, path_loading_thread);
}
}
bool ResourceFormatLoader::recognize_path(const String &p_path, const String &p_for_type) const {
String extension = p_path.get_extension();
@ -111,45 +101,6 @@ void ResourceLoader::get_recognized_extensions_for_type(const String &p_type, Li
}
}
void ResourceInteractiveLoader::_bind_methods() {
ClassDB::bind_method(D_METHOD("get_resource"), &ResourceInteractiveLoader::get_resource);
ClassDB::bind_method(D_METHOD("poll"), &ResourceInteractiveLoader::poll);
ClassDB::bind_method(D_METHOD("wait"), &ResourceInteractiveLoader::wait);
ClassDB::bind_method(D_METHOD("get_stage"), &ResourceInteractiveLoader::get_stage);
ClassDB::bind_method(D_METHOD("get_stage_count"), &ResourceInteractiveLoader::get_stage_count);
}
class ResourceInteractiveLoaderDefault : public ResourceInteractiveLoader {
GDCLASS(ResourceInteractiveLoaderDefault, ResourceInteractiveLoader);
public:
Ref<Resource> resource;
virtual void set_local_path(const String &p_local_path) { /*scene->set_filename(p_local_path);*/
}
virtual Ref<Resource> get_resource() { return resource; }
virtual Error poll() { return ERR_FILE_EOF; }
virtual int get_stage() const { return 1; }
virtual int get_stage_count() const { return 1; }
virtual void set_translation_remapped(bool p_remapped) { resource->set_as_translation_remapped(p_remapped); }
ResourceInteractiveLoaderDefault() {}
};
Ref<ResourceInteractiveLoader> ResourceFormatLoader::load_interactive(const String &p_path, const String &p_original_path, Error *r_error) {
//either this
Ref<Resource> res = load(p_path, p_original_path, r_error);
if (res.is_null())
return Ref<ResourceInteractiveLoader>();
Ref<ResourceInteractiveLoaderDefault> ril = Ref<ResourceInteractiveLoaderDefault>(memnew(ResourceInteractiveLoaderDefault));
ril->resource = res;
return ril;
}
bool ResourceFormatLoader::exists(const String &p_path) const {
return FileAccess::exists(p_path); //by default just check file
}
@ -168,10 +119,10 @@ void ResourceFormatLoader::get_recognized_extensions(List<String> *p_extensions)
}
}
RES ResourceFormatLoader::load(const String &p_path, const String &p_original_path, Error *r_error) {
RES ResourceFormatLoader::load(const String &p_path, const String &p_original_path, Error *r_error, bool p_use_sub_threads, float *r_progress) {
if (get_script_instance() && get_script_instance()->has_method("load")) {
Variant res = get_script_instance()->call("load", p_path, p_original_path);
Variant res = get_script_instance()->call("load", p_path, p_original_path, p_use_sub_threads);
if (res.get_type() == Variant::INT) {
@ -184,29 +135,11 @@ RES ResourceFormatLoader::load(const String &p_path, const String &p_original_pa
*r_error = OK;
return res;
}
return res;
}
//or this must be implemented
Ref<ResourceInteractiveLoader> ril = load_interactive(p_path, p_original_path, r_error);
if (!ril.is_valid())
return RES();
ril->set_local_path(p_original_path);
while (true) {
Error err = ril->poll();
if (err == ERR_FILE_EOF) {
if (r_error)
*r_error = OK;
return ril->get_resource();
}
if (r_error)
*r_error = err;
ERR_FAIL_COND_V_MSG(err != OK, RES(), "Failed to load resource '" + p_path + "'.");
}
ERR_FAIL_V_MSG(RES(), "Failed to load resource '" + p_path + "', ResourceFormatLoader::load was not implemented for this resource type.");
}
void ResourceFormatLoader::get_dependencies(const String &p_path, List<String> *p_dependencies, bool p_add_types) {
@ -256,7 +189,7 @@ void ResourceFormatLoader::_bind_methods() {
///////////////////////////////////
RES ResourceLoader::_load(const String &p_path, const String &p_original_path, const String &p_type_hint, bool p_no_cache, Error *r_error) {
RES ResourceLoader::_load(const String &p_path, const String &p_original_path, const String &p_type_hint, bool p_no_cache, Error *r_error, bool p_use_sub_threads, float *r_progress) {
bool found = false;
@ -267,7 +200,7 @@ RES ResourceLoader::_load(const String &p_path, const String &p_original_path, c
continue;
}
found = true;
RES res = loader[i]->load(p_path, p_original_path != String() ? p_original_path : p_path, r_error);
RES res = loader[i]->load(p_path, p_original_path != String() ? p_original_path : p_path, r_error, p_use_sub_threads, r_progress);
if (res.is_null()) {
continue;
}
@ -285,43 +218,302 @@ RES ResourceLoader::_load(const String &p_path, const String &p_original_path, c
ERR_FAIL_V_MSG(RES(), "No loader found for resource: " + p_path + ".");
}
bool ResourceLoader::_add_to_loading_map(const String &p_path) {
void ResourceLoader::_thread_load_function(void *p_userdata) {
bool success;
MutexLock lock(loading_map_mutex);
ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;
load_task.loader_id = Thread::get_caller_id();
LoadingMapKey key;
key.path = p_path;
key.thread = Thread::get_caller_id();
if (load_task.semaphore) {
//this is an actual thread, so wait for Ok fom semaphore
thread_load_semaphore->wait(); //wait until its ok to start loading
}
load_task.resource = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, false, &load_task.error, load_task.use_sub_threads, &load_task.progress);
if (loading_map.has(key)) {
success = false;
load_task.progress = 1.0; //it was fully loaded at this point, so force progress to 1.0
thread_load_mutex->lock();
if (load_task.error != OK) {
load_task.status = THREAD_LOAD_FAILED;
} else {
loading_map[key] = true;
success = true;
load_task.status = THREAD_LOAD_LOADED;
}
if (load_task.semaphore) {
if (load_task.start_next && thread_waiting_count > 0) {
thread_waiting_count--;
//thread loading count remains constant, this ends but another one begins
thread_load_semaphore->post();
} else {
thread_loading_count--; //no threads waiting, just reduce loading count
}
print_lt("END: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count));
for (int i = 0; i < load_task.poll_requests; i++) {
load_task.semaphore->post();
}
memdelete(load_task.semaphore);
load_task.semaphore = nullptr;
}
return success;
if (load_task.resource.is_valid()) {
load_task.resource->set_path(load_task.local_path);
if (load_task.xl_remapped)
load_task.resource->set_as_translation_remapped(true);
#ifdef TOOLS_ENABLED
load_task.resource->set_edited(false);
if (timestamp_on_load) {
uint64_t mt = FileAccess::get_modified_time(load_task.remapped_path);
//printf("mt %s: %lli\n",remapped_path.utf8().get_data(),mt);
load_task.resource->set_last_modified_time(mt);
}
#endif
if (_loaded_callback) {
_loaded_callback(load_task.resource, load_task.local_path);
}
}
thread_load_mutex->unlock();
}
Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, const String &p_source_resource) {
String local_path;
if (p_path.is_rel_path())
local_path = "res://" + p_path;
else
local_path = ProjectSettings::get_singleton()->localize_path(p_path);
thread_load_mutex->lock();
if (p_source_resource != String()) {
//must be loading from this resource
if (!thread_load_tasks.has(p_source_resource)) {
thread_load_mutex->unlock();
ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "There is no thread loading source resource '" + p_source_resource + "'.");
}
//must be loading from this thread
if (thread_load_tasks[p_source_resource].loader_id != Thread::get_caller_id()) {
thread_load_mutex->unlock();
ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Threading loading resource'" + local_path + " failed: Source specified: '" + p_source_resource + "' but was not called by it.");
}
//must not be already added as s sub tasks
if (thread_load_tasks[p_source_resource].sub_tasks.has(local_path)) {
thread_load_mutex->unlock();
ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Thread loading source resource '" + p_source_resource + "' already is loading '" + local_path + "'.");
}
}
if (thread_load_tasks.has(local_path)) {
thread_load_tasks[local_path].requests++;
if (p_source_resource != String()) {
thread_load_tasks[p_source_resource].sub_tasks.insert(local_path);
}
thread_load_mutex->unlock();
return OK;
}
{
//create load task
ThreadLoadTask load_task;
load_task.requests = 1;
load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped);
load_task.local_path = local_path;
load_task.type_hint = p_type_hint;
load_task.use_sub_threads = p_use_sub_threads;
{ //must check if resource is already loaded before attempting to load it in a thread
if (load_task.loader_id == Thread::get_caller_id()) {
thread_load_mutex->unlock();
ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Attempted to load a resource already being loaded from this thread, cyclic reference?");
}
//lock first if possible
if (ResourceCache::lock) {
ResourceCache::lock->read_lock();
}
//get ptr
Resource **rptr = ResourceCache::resources.getptr(local_path);
if (rptr) {
RES res(*rptr);
//it is possible this resource was just freed in a thread. If so, this referencing will not work and resource is considered not cached
if (res.is_valid()) {
//referencing is fine
load_task.resource = res;
load_task.status = THREAD_LOAD_LOADED;
load_task.progress = 1.0;
}
}
if (ResourceCache::lock) {
ResourceCache::lock->read_unlock();
}
}
if (p_source_resource != String()) {
thread_load_tasks[p_source_resource].sub_tasks.insert(local_path);
}
thread_load_tasks[local_path] = load_task;
}
ThreadLoadTask &load_task = thread_load_tasks[local_path];
if (load_task.resource.is_null()) { //needs to be loaded in thread
load_task.semaphore = memnew(Semaphore);
if (thread_loading_count < thread_load_max) {
thread_loading_count++;
thread_load_semaphore->post(); //we have free threads, so allow one
} else {
thread_waiting_count++;
}
print_lt("REQUEST: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count));
load_task.thread = Thread::create(_thread_load_function, &thread_load_tasks[local_path]);
load_task.loader_id = load_task.thread->get_id();
}
thread_load_mutex->unlock();
return OK;
}
void ResourceLoader::_remove_from_loading_map(const String &p_path) {
MutexLock lock(loading_map_mutex);
float ResourceLoader::_dependency_get_progress(const String &p_path) {
LoadingMapKey key;
key.path = p_path;
key.thread = Thread::get_caller_id();
if (thread_load_tasks.has(p_path)) {
ThreadLoadTask &load_task = thread_load_tasks[p_path];
int dep_count = load_task.sub_tasks.size();
if (dep_count > 0) {
float dep_progress = 0;
for (Set<String>::Element *E = load_task.sub_tasks.front(); E; E = E->next()) {
dep_progress += _dependency_get_progress(E->get());
}
dep_progress /= float(dep_count);
dep_progress *= 0.5;
dep_progress += load_task.progress * 0.5;
return dep_progress;
} else {
return load_task.progress;
}
loading_map.erase(key);
} else {
return 1.0; //assume finished loading it so it no longer exists
}
}
void ResourceLoader::_remove_from_loading_map_and_thread(const String &p_path, Thread::ID p_thread) {
MutexLock lock(loading_map_mutex);
ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const String &p_path, float *r_progress) {
LoadingMapKey key;
key.path = p_path;
key.thread = p_thread;
String local_path;
if (p_path.is_rel_path())
local_path = "res://" + p_path;
else
local_path = ProjectSettings::get_singleton()->localize_path(p_path);
loading_map.erase(key);
thread_load_mutex->lock();
if (!thread_load_tasks.has(local_path)) {
thread_load_mutex->unlock();
return THREAD_LOAD_INVALID_RESOURCE;
}
ThreadLoadTask &load_task = thread_load_tasks[local_path];
ThreadLoadStatus status;
status = load_task.status;
if (r_progress) {
*r_progress = _dependency_get_progress(local_path);
}
thread_load_mutex->unlock();
return status;
}
RES ResourceLoader::load_threaded_get(const String &p_path, Error *r_error) {
String local_path;
if (p_path.is_rel_path())
local_path = "res://" + p_path;
else
local_path = ProjectSettings::get_singleton()->localize_path(p_path);
thread_load_mutex->lock();
if (!thread_load_tasks.has(local_path)) {
thread_load_mutex->unlock();
if (r_error) {
*r_error = ERR_INVALID_PARAMETER;
}
return RES();
}
ThreadLoadTask &load_task = thread_load_tasks[local_path];
//semaphore still exists, meaning its still loading, request poll
Semaphore *semaphore = load_task.semaphore;
if (semaphore) {
load_task.poll_requests++;
{
// As we got a semaphore, this means we are going to have to wait
// until the sub-resource is done loading
//
// As this thread will become 'blocked' we should "echange" its
// active status with a waiting one, to ensure load continues.
//
// This ensures loading is never blocked and that is also within
// the maximum number of active threads.
if (thread_waiting_count > 0) {
thread_waiting_count--;
thread_loading_count++;
thread_load_semaphore->post();
load_task.start_next = false; //do not start next since we are doing it here
}
thread_suspended_count++;
print_lt("GET: load count: " + itos(thread_loading_count) + " / wait count: " + itos(thread_waiting_count) + " / suspended count: " + itos(thread_suspended_count) + " / active: " + itos(thread_loading_count - thread_suspended_count));
}
thread_load_mutex->unlock();
semaphore->wait();
thread_load_mutex->lock();
thread_suspended_count--;
if (!thread_load_tasks.has(local_path)) { //may have been erased during unlock and this was always an invalid call
thread_load_mutex->unlock();
if (r_error) {
*r_error = ERR_INVALID_PARAMETER;
}
return RES();
}
}
RES resource = load_task.resource;
if (r_error) {
*r_error = load_task.error;
}
load_task.requests--;
if (load_task.requests == 0) {
if (load_task.thread) { //thread may not have been used
Thread::wait_to_finish(load_task.thread);
memdelete(load_task.thread);
}
thread_load_tasks.erase(local_path);
}
thread_load_mutex->unlock();
return resource;
}
RES ResourceLoader::load(const String &p_path, const String &p_type_hint, bool p_no_cache, Error *r_error) {
@ -337,83 +529,101 @@ RES ResourceLoader::load(const String &p_path, const String &p_type_hint, bool p
if (!p_no_cache) {
{
bool success = _add_to_loading_map(local_path);
ERR_FAIL_COND_V_MSG(!success, RES(), "Resource: '" + local_path + "' is already being loaded. Cyclic reference?");
thread_load_mutex->lock();
//Is it already being loaded? poll until done
if (thread_load_tasks.has(local_path)) {
Error err = load_threaded_request(p_path, p_type_hint);
if (err != OK) {
if (r_error) {
*r_error = err;
}
return RES();
}
thread_load_mutex->unlock();
return load_threaded_get(p_path, r_error);
}
//lock first if possible
//Is it cached?
if (ResourceCache::lock) {
ResourceCache::lock->read_lock();
}
//get ptr
Resource **rptr = ResourceCache::resources.getptr(local_path);
if (rptr) {
RES res(*rptr);
//it is possible this resource was just freed in a thread. If so, this referencing will not work and resource is considered not cached
if (res.is_valid()) {
//referencing is fine
if (r_error)
*r_error = OK;
if (ResourceCache::lock) {
ResourceCache::lock->read_unlock();
}
_remove_from_loading_map(local_path);
return res;
thread_load_mutex->unlock();
if (r_error) {
*r_error = OK;
}
return res; //use cached
}
}
if (ResourceCache::lock) {
ResourceCache::lock->read_unlock();
}
}
bool xl_remapped = false;
String path = _path_remap(local_path, &xl_remapped);
//load using task (but this thread)
ThreadLoadTask load_task;
if (path == "") {
if (!p_no_cache) {
_remove_from_loading_map(local_path);
load_task.requests = 1;
load_task.local_path = local_path;
load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped);
load_task.type_hint = p_type_hint;
load_task.loader_id = Thread::get_caller_id();
thread_load_tasks[local_path] = load_task;
thread_load_mutex->unlock();
_thread_load_function(&thread_load_tasks[local_path]);
return load_threaded_get(p_path, r_error);
} else {
bool xl_remapped = false;
String path = _path_remap(local_path, &xl_remapped);
if (path == "") {
ERR_FAIL_V_MSG(RES(), "Remapping '" + local_path + "' failed.");
}
ERR_FAIL_V_MSG(RES(), "Remapping '" + local_path + "' failed.");
}
print_verbose("Loading resource: " + path);
RES res = _load(path, local_path, p_type_hint, p_no_cache, r_error);
print_verbose("Loading resource: " + path);
float p;
RES res = _load(path, local_path, p_type_hint, p_no_cache, r_error, false, &p);
if (res.is_null()) {
if (!p_no_cache) {
_remove_from_loading_map(local_path);
if (res.is_null()) {
print_verbose("Failed loading resource: " + path);
return RES();
}
print_verbose("Failed loading resource: " + path);
return RES();
}
if (!p_no_cache)
res->set_path(local_path);
if (xl_remapped)
res->set_as_translation_remapped(true);
if (xl_remapped)
res->set_as_translation_remapped(true);
#ifdef TOOLS_ENABLED
res->set_edited(false);
if (timestamp_on_load) {
uint64_t mt = FileAccess::get_modified_time(path);
//printf("mt %s: %lli\n",remapped_path.utf8().get_data(),mt);
res->set_last_modified_time(mt);
}
res->set_edited(false);
if (timestamp_on_load) {
uint64_t mt = FileAccess::get_modified_time(path);
//printf("mt %s: %lli\n",remapped_path.utf8().get_data(),mt);
res->set_last_modified_time(mt);
}
#endif
if (!p_no_cache) {
_remove_from_loading_map(local_path);
return res;
}
if (_loaded_callback) {
_loaded_callback(res, p_path);
}
return res;
}
bool ResourceLoader::exists(const String &p_path, const String &p_type_hint) {
@ -446,76 +656,6 @@ bool ResourceLoader::exists(const String &p_path, const String &p_type_hint) {
return false;
}
Ref<ResourceInteractiveLoader> ResourceLoader::load_interactive(const String &p_path, const String &p_type_hint, bool p_no_cache, Error *r_error) {
if (r_error)
*r_error = ERR_CANT_OPEN;
String local_path;
if (p_path.is_rel_path())
local_path = "res://" + p_path;
else
local_path = ProjectSettings::get_singleton()->localize_path(p_path);
if (!p_no_cache) {
bool success = _add_to_loading_map(local_path);
ERR_FAIL_COND_V_MSG(!success, RES(), "Resource: '" + local_path + "' is already being loaded. Cyclic reference?");
if (ResourceCache::has(local_path)) {
print_verbose("Loading resource: " + local_path + " (cached)");
Ref<Resource> res_cached = ResourceCache::get(local_path);
Ref<ResourceInteractiveLoaderDefault> ril = Ref<ResourceInteractiveLoaderDefault>(memnew(ResourceInteractiveLoaderDefault));
ril->resource = res_cached;
ril->path_loading = local_path;
ril->path_loading_thread = Thread::get_caller_id();
return ril;
}
}
bool xl_remapped = false;
String path = _path_remap(local_path, &xl_remapped);
if (path == "") {
if (!p_no_cache) {
_remove_from_loading_map(local_path);
}
ERR_FAIL_V_MSG(RES(), "Remapping '" + local_path + "' failed.");
}
print_verbose("Loading resource: " + path);
bool found = false;
for (int i = 0; i < loader_count; i++) {
if (!loader[i]->recognize_path(path, p_type_hint))
continue;
found = true;
Ref<ResourceInteractiveLoader> ril = loader[i]->load_interactive(path, local_path, r_error);
if (ril.is_null())
continue;
if (!p_no_cache) {
ril->set_local_path(local_path);
ril->path_loading = local_path;
ril->path_loading_thread = Thread::get_caller_id();
}
if (xl_remapped)
ril->set_translation_remapped(true);
return ril;
}
if (!p_no_cache) {
_remove_from_loading_map(local_path);
}
ERR_FAIL_COND_V_MSG(found, Ref<ResourceInteractiveLoader>(), "Failed loading resource: " + path + ".");
ERR_FAIL_V_MSG(Ref<ResourceInteractiveLoader>(), "No loader found for resource: " + path + ".");
}
void ResourceLoader::add_resource_format_loader(Ref<ResourceFormatLoader> p_format_loader, bool p_at_front) {
ERR_FAIL_COND(p_format_loader.is_null());
@ -984,20 +1124,19 @@ void ResourceLoader::remove_custom_loaders() {
}
}
Mutex ResourceLoader::loading_map_mutex;
HashMap<ResourceLoader::LoadingMapKey, int, ResourceLoader::LoadingMapKeyHasher> ResourceLoader::loading_map;
void ResourceLoader::initialize() {
thread_load_mutex = memnew(Mutex);
thread_load_max = OS::get_singleton()->get_processor_count();
thread_loading_count = 0;
thread_waiting_count = 0;
thread_suspended_count = 0;
thread_load_semaphore = memnew(Semaphore);
}
void ResourceLoader::finalize() {
#ifndef NO_THREADS
const LoadingMapKey *K = NULL;
while ((K = loading_map.next(K))) {
ERR_PRINT("Exited while resource is being loaded: " + K->path);
}
loading_map.clear();
#endif
memdelete(thread_load_mutex);
memdelete(thread_load_semaphore);
}
ResourceLoadErrorNotify ResourceLoader::err_notify = NULL;
@ -1009,6 +1148,15 @@ void *ResourceLoader::dep_err_notify_ud = NULL;
bool ResourceLoader::abort_on_missing_resource = true;
bool ResourceLoader::timestamp_on_load = false;
Mutex *ResourceLoader::thread_load_mutex = nullptr;
HashMap<String, ResourceLoader::ThreadLoadTask> ResourceLoader::thread_load_tasks;
Semaphore *ResourceLoader::thread_load_semaphore = nullptr;
int ResourceLoader::thread_loading_count = 0;
int ResourceLoader::thread_waiting_count = 0;
int ResourceLoader::thread_suspended_count = 0;
int ResourceLoader::thread_load_max = 0;
SelfList<Resource>::List ResourceLoader::remapped_list;
HashMap<String, Vector<String> > ResourceLoader::translation_remaps;
HashMap<String, String> ResourceLoader::path_remaps;