Skip to content

Commit

Permalink
Timestamped input buffering - prevent stalling and improve timing
Browse files Browse the repository at this point in the history
Input was resulting in stalling and ANRs on Android and possibly other platforms. This PR separates buffered input into two parts to prevent stalls. It also timestamps input events and applies them on the relevant logical physics tick, rather than attempting to apply them "realtime".
  • Loading branch information
lawnjelly committed May 26, 2023
1 parent 2eec9a6 commit 045601e
Show file tree
Hide file tree
Showing 9 changed files with 378 additions and 70 deletions.
293 changes: 247 additions & 46 deletions core/input/input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,190 @@ void (*Input::warp_mouse_func)(const Vector2 &p_position) = nullptr;
Input::CursorShape (*Input::get_current_cursor_shape_func)() = nullptr;
void (*Input::set_custom_mouse_cursor_func)(const Ref<Resource> &, Input::CursorShape, const Vector2 &) = nullptr;

// Accumulating immediately rather than deferred at flush
// is slightly more efficient (because of less allocations / transfer to the main buffer etc)
// but is less accurate timing wise, so should only be used in frame buffering mode.
void InputEventBuffer::accumulate_or_push_event(const Ref<InputEvent> &p_event, uint64_t p_timestamp) {
// Events can come in any time, including when we are preparing to read the incoming queue,
// so we must lock to prevent race condition.
MutexLock lock(data.incoming_mutex);

LocalVector<Event> &incoming = data.incoming[data.incoming_write];

// First, attempt to accumulate.
if (incoming.size()) {
Event &prev = incoming[incoming.size() - 1];
if (prev.event->accumulate(p_event)) {
return;
}
}

// Accumulate failed, fall back to push.
incoming.resize(incoming.size() + 1);
Event &e = incoming[incoming.size() - 1];
e.event = p_event;
e.timestamp = p_timestamp;
}

void InputEventBuffer::push_event(const Ref<InputEvent> &p_event, uint64_t p_timestamp) {
// Events can come in any time, including when we are preparing to read the incoming queue,
// so we must lock to prevent race condition.
MutexLock lock(data.incoming_mutex);

LocalVector<Event> &incoming = data.incoming[data.incoming_write];
incoming.resize(incoming.size() + 1);
Event &e = incoming[incoming.size() - 1];
e.event = p_event;
e.timestamp = p_timestamp;
}

void InputEventBuffer::_try_accumulate(uint64_t p_timestamp) {
// Try and accumulate events after the current front
// until we fail or pass the current timestamp.
List<Event>::Element *front = data.buffer.front();
Event &front_event = front->get();

while (List<Event>::Element *next = data.buffer.front()->next()) {
const Event &next_event = next->get();
if (next_event.timestamp > p_timestamp) {
// Don't want to accumulate events that are on the next tick..
// want to keep some resolution to the events.
break;
}
if (front_event.event->accumulate(next_event.event)) {
// Remove the accumulated event from the buffer.
data.buffer.swap(front, next);
data.buffer.pop_front();
// Check this does not invalidate front and front_event.
DEV_ASSERT(front == data.buffer.front());
DEV_ASSERT(&front_event == &front->get());
} else {
break;
}
}
}

void InputEventBuffer::flush_events(uint64_t p_current_timestamp, Input &r_input_handler, bool p_accumulate) {
// Flushing function is not re-entrant.
// This is unlikely to be called multithread, but this check should be cheap.
MutexLock lock(data.buffer_mutex);

// Only allow one flush at a time.
// This *does* occur notably on starting android debugging
// from the editor, where Main::iteration is called recursively.
if (data.flushing) {
return;
}
data.flushing = true;

data.incoming_mutex.lock();
SWAP(data.incoming_write, data.incoming_read);
data.incoming_mutex.unlock();

LocalVector<Event> &incoming = data.incoming[data.incoming_read];

for (uint32_t n = 0; n < incoming.size(); n++) {
// Copy to main buffer.
data.buffer.push_back(incoming[n]);
}

// Prepare for more input next time, prevent leak.
incoming.clear();

// Now we can read through the input buffer, up to the current time, and process.
while (data.buffer.front()) {
const Event &e = data.buffer.front()->get();

// Timestamp within range?
if (e.timestamp > p_current_timestamp) {
// We are up to date, process no more input on this tick / frame.
break;
}

if (p_accumulate) {
_try_accumulate(p_current_timestamp);
}

r_input_handler._parse_input_event_impl(e.event, false, false);

// Event processed, remove from buffer.
data.buffer.pop_front();
}

data.flushing = false;
}

Input *Input::get_singleton() {
return singleton;
}

void Input::flush_buffered_events_post_frame() {
if (data.use_legacy_flushing) {
// Matches old logic - if buffering, but not agile.
if (data.buffering_mode == BUFFERING_MODE_FRAME) {
_flush_buffered_events_ex(UINT64_MAX);
}
}
}

void Input::flush_buffered_events() {
_flush_buffered_events_ex(UINT64_MAX);
}

void Input::set_use_accumulated_input(bool p_enable) {
data.use_accumulated_input = p_enable;
_update_buffering_mode();
}

void Input::set_use_input_buffering(bool p_enable) {
data.use_buffering = p_enable;
_update_buffering_mode();
}

bool Input::is_using_input_buffering() const {
return data.use_buffering;
}

void Input::set_use_agile_flushing(bool p_enable) {
data.use_agile = p_enable;
_update_buffering_mode();
}

bool Input::is_agile_flushing() const {
return data.buffering_mode == BUFFERING_MODE_AGILE;
}

void Input::set_use_legacy_flushing(bool p_enable) {
data.use_legacy_flushing = p_enable;
}

void Input::set_has_input_thread(bool p_has_thread) {
data.has_input_thread = p_has_thread;
_update_buffering_mode();
}

void Input::_update_buffering_mode() {
// Logic here may appear confusing but it is historical,
// and to prevent compat breaking.
// Accumulated input was added in such a way as to override
// the use_buffering setting.
// i.e. Buffering is used if use_buffering is false, but accumulated is true,
// as accumulating needs the previous event in order to work.

// Additionally for agile input, it currently only makes sense to activate when
// the platform has a separate input thread, otherwise the extra processing
// on flush on each tick just wastes CPU.
if (data.use_accumulated_input || data.use_buffering) {
if (data.use_agile && data.has_input_thread) {
data.buffering_mode = BUFFERING_MODE_AGILE;
} else {
data.buffering_mode = BUFFERING_MODE_FRAME;
}
} else {
data.buffering_mode = BUFFERING_MODE_NONE;
}
}

void Input::set_mouse_mode(MouseMode p_mode) {
ERR_FAIL_INDEX((int)p_mode, 5);
set_mouse_mode_func(p_mode);
Expand Down Expand Up @@ -140,6 +320,7 @@ void Input::_bind_methods() {
ClassDB::bind_method(D_METHOD("parse_input_event", "event"), &Input::parse_input_event);
ClassDB::bind_method(D_METHOD("set_use_accumulated_input", "enable"), &Input::set_use_accumulated_input);
ClassDB::bind_method(D_METHOD("is_using_accumulated_input"), &Input::is_using_accumulated_input);

ClassDB::bind_method(D_METHOD("flush_buffered_events"), &Input::flush_buffered_events);

ADD_PROPERTY(PropertyInfo(Variant::INT, "mouse_mode"), "set_mouse_mode", "get_mouse_mode");
Expand Down Expand Up @@ -489,7 +670,7 @@ Vector3 Input::get_gyroscope() const {
return gyroscope;
}

void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_emulated) {
void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_emulated, bool p_unlock) {
// This function does the final delivery of the input event to user land.
// Regardless where the event came from originally, this has to happen on the main thread.
DEV_ASSERT(Thread::get_caller_id() == Thread::get_main_id());
Expand Down Expand Up @@ -546,9 +727,13 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em
touch_event->set_position(mb->get_position());
touch_event->set_double_tap(mb->is_double_click());
touch_event->set_device(InputEvent::DEVICE_ID_EMULATION);
_THREAD_SAFE_UNLOCK_
event_dispatch_function(touch_event);
_THREAD_SAFE_LOCK_
if (p_unlock) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(touch_event);
_THREAD_SAFE_LOCK_
} else {
event_dispatch_function(touch_event);
}
}
}

Expand All @@ -574,9 +759,13 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em
drag_event->set_velocity(get_last_mouse_velocity());
drag_event->set_device(InputEvent::DEVICE_ID_EMULATION);

_THREAD_SAFE_UNLOCK_
event_dispatch_function(drag_event);
_THREAD_SAFE_LOCK_
if (p_unlock) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(drag_event);
_THREAD_SAFE_LOCK_
} else {
event_dispatch_function(drag_event);
}
}
}

Expand Down Expand Up @@ -626,7 +815,7 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em
}
button_event->set_button_mask(ev_bm);

_parse_input_event_impl(button_event, true);
_parse_input_event_impl(button_event, true, p_unlock);
}
}
}
Expand All @@ -652,7 +841,7 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em
motion_event->set_velocity(sd->get_velocity());
motion_event->set_button_mask(mouse_button_mask);

_parse_input_event_impl(motion_event, true);
_parse_input_event_impl(motion_event, true, p_unlock);
}
}

Expand All @@ -678,9 +867,13 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em

if (ge.is_valid()) {
if (event_dispatch_function) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(ge);
_THREAD_SAFE_LOCK_
if (p_unlock) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(ge);
_THREAD_SAFE_LOCK_
} else {
event_dispatch_function(ge);
}
}
}

Expand All @@ -703,9 +896,13 @@ void Input::_parse_input_event_impl(const Ref<InputEvent> &p_event, bool p_is_em
}

if (event_dispatch_function) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(p_event);
_THREAD_SAFE_LOCK_
if (p_unlock) {
_THREAD_SAFE_UNLOCK_
event_dispatch_function(p_event);
_THREAD_SAFE_LOCK_
} else {
event_dispatch_function(p_event);
}
}
}

Expand Down Expand Up @@ -865,7 +1062,7 @@ void Input::ensure_touch_mouse_raised() {
ev_bm.clear_flag(MouseButtonMask::LEFT);
button_event->set_button_mask(ev_bm);

_parse_input_event_impl(button_event, true);
_parse_input_event_impl(button_event, true, true);
}
}

Expand Down Expand Up @@ -941,47 +1138,48 @@ void Input::parse_input_event(const Ref<InputEvent> &p_event) {
}
#endif

if (use_accumulated_input) {
if (buffered_events.is_empty() || !buffered_events.back()->get()->accumulate(p_event)) {
buffered_events.push_back(p_event);
}
} else if (use_input_buffering) {
buffered_events.push_back(p_event);
if (data.buffering_mode == Input::BUFFERING_MODE_NONE) {
_parse_input_event_impl(p_event, false, true);
} else {
_parse_input_event_impl(p_event, false);
// We can accumulate immediately on input if in frame mode,
// but if in agile / logical mode accumulation is deferred until flushing,
// so we can just push directly.
if ((data.buffering_mode == Input::BUFFERING_MODE_FRAME) && data.use_accumulated_input) {
_event_buffer.accumulate_or_push_event(p_event, OS::get_singleton()->get_ticks_usec());
} else {
_event_buffer.push_event(p_event, OS::get_singleton()->get_ticks_usec());
}
}
}

void Input::flush_buffered_events() {
_THREAD_SAFE_METHOD_

while (buffered_events.front()) {
// The final delivery of the input event involves releasing the lock.
// While the lock is released, another thread may lock it and add new events to the back.
// Therefore, we get each event and pop it while we still have the lock,
// to ensure the list is in a consistent state.
List<Ref<InputEvent>>::Element *E = buffered_events.front();
Ref<InputEvent> e = E->get();
buffered_events.pop_front();

_parse_input_event_impl(e, false);
}
void Input::_flush_buffered_events_ex(uint64_t p_up_to_timestamp) {
_event_buffer.flush_events(p_up_to_timestamp, *this, data.use_accumulated_input);
}

bool Input::is_using_input_buffering() {
return use_input_buffering;
}
void Input::flush_buffered_events_iteration() {
// legacy did not flush here.
if (data.use_legacy_flushing) {
return;
}

void Input::set_use_input_buffering(bool p_enable) {
use_input_buffering = p_enable;
if (data.buffering_mode == BUFFERING_MODE_FRAME) {
_flush_buffered_events_ex(UINT64_MAX);
}
}

void Input::set_use_accumulated_input(bool p_enable) {
use_accumulated_input = p_enable;
void Input::flush_buffered_events_tick(uint64_t p_tick_timestamp) {
if (data.buffering_mode == BUFFERING_MODE_AGILE) {
_flush_buffered_events_ex(p_tick_timestamp);
}
}

bool Input::is_using_accumulated_input() {
return use_accumulated_input;
void Input::flush_buffered_events_frame() {
// If we are in legacy mode, if not NONE or FRAME,
// then it will be AGILE, in which case legacy had a flush
// here, so the new logic works as before.
if (data.buffering_mode == BUFFERING_MODE_AGILE) {
_flush_buffered_events_ex(UINT64_MAX);
}
}

void Input::release_pressed_events() {
Expand Down Expand Up @@ -1531,6 +1729,9 @@ Input::Input() {
parse_mapping(entries[i]);
}
}

// Make sure buffering mode is appropriate for the state.
_update_buffering_mode();
}

Input::~Input() {
Expand Down
Loading

0 comments on commit 045601e

Please sign in to comment.