Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make MessageQueue growable #62303

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 39 additions & 25 deletions core/object/message_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,18 @@ Error MessageQueue::push_set(ObjectID p_id, const StringName &p_prop, const Vari

uint8_t room_needed = sizeof(Message) + sizeof(Variant);

if ((buffer_end + room_needed) >= buffer_size) {
String type;
if (ObjectDB::get_instance(p_id)) {
type = ObjectDB::get_instance(p_id)->get_class();
if ((buffer_end + room_needed) >= buffer.size()) {
if ((buffer_end + room_needed) >= max_allowed_buffer_size) {
String type;
if (ObjectDB::get_instance(p_id)) {
type = ObjectDB::get_instance(p_id)->get_class();
}
print_line("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id));
statistics();
ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings.");
} else {
buffer.resize(buffer_end + room_needed);
}
print_line("Failed set: " + type + ":" + p_prop + " target ID: " + itos(p_id));
statistics();
ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_kb' in project settings.");
}

Message *msg = memnew_placement(&buffer[buffer_end], Message);
Expand All @@ -81,10 +85,18 @@ Error MessageQueue::push_notification(ObjectID p_id, int p_notification) {

uint8_t room_needed = sizeof(Message);

if ((buffer_end + room_needed) >= buffer_size) {
print_line("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id));
statistics();
ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_kb' in project settings.");
if ((buffer_end + room_needed) >= buffer.size()) {
if ((buffer_end + room_needed) >= max_allowed_buffer_size) {
String type;
if (ObjectDB::get_instance(p_id)) {
type = ObjectDB::get_instance(p_id)->get_class();
}
print_line("Failed notification: " + itos(p_notification) + " target ID: " + itos(p_id));
statistics();
ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings.");
} else {
buffer.resize(buffer_end + room_needed);
}
}

Message *msg = memnew_placement(&buffer[buffer_end], Message);
Expand Down Expand Up @@ -116,10 +128,18 @@ Error MessageQueue::push_callablep(const Callable &p_callable, const Variant **p

int room_needed = sizeof(Message) + sizeof(Variant) * p_argcount;

if ((buffer_end + room_needed) >= buffer_size) {
print_line("Failed method: " + p_callable);
statistics();
ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_kb' in project settings.");
if ((buffer_end + room_needed) >= buffer.size()) {
if ((buffer_end + room_needed) >= max_allowed_buffer_size) {
String type;
if (ObjectDB::get_instance(p_callable.get_object_id())) {
type = ObjectDB::get_instance(p_callable.get_object_id())->get_class();
}
print_line("Failed method: " + p_callable);
statistics();
ERR_FAIL_V_MSG(ERR_OUT_OF_MEMORY, "Message queue out of memory. Try increasing 'memory/limits/message_queue/max_size_mb' in project settings.");
} else {
buffer.resize(buffer_end + room_needed);
}
}

Message *msg = memnew_placement(&buffer[buffer_end], Message);
Expand Down Expand Up @@ -212,7 +232,7 @@ void MessageQueue::statistics() {
}

int MessageQueue::get_max_buffer_usage() const {
return buffer_max_used;
return buffer.size();
}

void MessageQueue::_call_function(const Callable &p_callable, const Variant *p_args, int p_argcount, bool p_show_error) {
Expand All @@ -233,10 +253,6 @@ void MessageQueue::_call_function(const Callable &p_callable, const Variant *p_a
}

void MessageQueue::flush() {
if (buffer_end > buffer_max_used) {
buffer_max_used = buffer_end;
}

uint32_t read_pos = 0;

//using reverse locking strategy
Expand Down Expand Up @@ -314,10 +330,9 @@ MessageQueue::MessageQueue() {
ERR_FAIL_COND_MSG(singleton != nullptr, "A MessageQueue singleton already exists.");
singleton = this;

buffer_size = GLOBAL_DEF_RST("memory/limits/message_queue/max_size_kb", DEFAULT_QUEUE_SIZE_KB);
ProjectSettings::get_singleton()->set_custom_property_info("memory/limits/message_queue/max_size_kb", PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_kb", PROPERTY_HINT_RANGE, "1024,4096,1,or_greater"));
buffer_size *= 1024;
buffer = memnew_arr(uint8_t, buffer_size);
max_allowed_buffer_size = GLOBAL_DEF_RST("memory/limits/message_queue/max_size_mb", 32);
max_allowed_buffer_size *= 1024 * 1024;
ProjectSettings::get_singleton()->set_custom_property_info("memory/limits/message_queue/max_size_mb", PropertyInfo(Variant::INT, "memory/limits/message_queue/max_size_mb", PROPERTY_HINT_RANGE, "4,512,1,or_greater"));
}

MessageQueue::~MessageQueue() {
Expand All @@ -341,5 +356,4 @@ MessageQueue::~MessageQueue() {
}

singleton = nullptr;
memdelete_arr(buffer);
}
12 changes: 4 additions & 8 deletions core/object/message_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,14 @@

#include "core/object/object_id.h"
#include "core/os/thread_safe.h"
#include "core/templates/local_vector.h"
#include "core/variant/variant.h"

akien-mga marked this conversation as resolved.
Show resolved Hide resolved
class Object;

class MessageQueue {
_THREAD_SAFE_CLASS_

enum {
DEFAULT_QUEUE_SIZE_KB = 4096
};

enum {
TYPE_CALL,
TYPE_NOTIFICATION,
Expand All @@ -62,10 +59,9 @@ class MessageQueue {
};
};

uint8_t *buffer = nullptr;
uint32_t buffer_end = 0;
uint32_t buffer_max_used = 0;
uint32_t buffer_size = 0;
LocalVector<uint8_t> buffer;
uint64_t buffer_end = 0;
uint64_t max_allowed_buffer_size = 0;

void _call_function(const Callable &p_callable, const Variant *p_args, int p_argcount, bool p_show_error);

Expand Down
3 changes: 1 addition & 2 deletions doc/classes/ProjectSettings.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1367,8 +1367,7 @@
<member name="layer_names/3d_render/layer_9" type="String" setter="" getter="" default="&quot;&quot;">
Optional name for the 3D render layer 9. If left empty, the layer will display as "Layer 9".
</member>
<member name="memory/limits/message_queue/max_size_kb" type="int" setter="" getter="" default="4096">
Godot uses a message queue to defer some function calls. If you run out of space on it (you will see an error), you can increase the size here.
<member name="memory/limits/message_queue/max_size_mb" type="int" setter="" getter="" default="32">
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to re-add the description.

</member>
<member name="memory/limits/multithreaded_server/rid_pool_prealloc" type="int" setter="" getter="" default="60">
This is used by servers when used in multi-threading mode (servers and visual). RIDs are preallocated to avoid stalling the server requesting them on threads. If servers get stalled too often when loading resources in a thread, increase this number.
Expand Down