Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync, coop: apply cooperative scheduling to sync::watch #6846

Merged

Conversation

tglane
Copy link
Contributor

@tglane tglane commented Sep 14, 2024

Motivation

#6839 showed that sync::watch::Receiver::changed will not cooperatively yield control back to the runtime which led to it blocking the thread it was running on.
Edit: This is also true for sync::watch::Receiver::wait_for and sync::watch::Sender::closed

Solution

Added cooperative scheduling to sync::watch::Receiver::changed by utilizing the runtime::coop module.
More specifically I added a new future type that wraps the implementation of sync::watch::Receiver::changed and checks if the current task has exceeded its budget before polling the wrapped future.

Closes #6839.

@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label Sep 14, 2024
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-sync Module: tokio/sync M-coop Module: tokio/coop labels Sep 14, 2024
Comment on lines 745 to 748
pub async fn changed(&mut self) -> Result<(), error::RecvError> {
changed_impl(&self.shared, &mut self.version).await
crate::runtime::coop::budget_constraint(changed_impl(&self.shared, &mut self.version)).await
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! It has to be a loom test, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not necessary. You can do something like this:

let my_fut = async { loop { chan.changed().await } };
tokio::select! {
    biased;
    _ = my_fut => {},
    _ = core::future::ready(()) => {},
}

Without coop, this test would run forever.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added tests to tokio::tests::sync_watch for changed, wait_for and closed.

tokio/src/sync/watch.rs Show resolved Hide resolved
tokio/src/runtime/coop.rs Outdated Show resolved Hide resolved
tokio/src/runtime/coop.rs Outdated Show resolved Hide resolved
tokio/src/runtime/coop.rs Outdated Show resolved Hide resolved
@tglane tglane changed the title sync, coop: apply cooperative scheduling to sync::watch::Receiver::changed sync, coop: apply cooperative scheduling to sync::watch Sep 14, 2024
tokio/tests/sync_watch.rs Outdated Show resolved Hide resolved
Comment on lines 847 to 850
closed = changed_impl(&self.shared, &mut self.version).await.is_err();
closed = cooperative(changed_impl(&self.shared, &mut self.version))
.await
.is_err();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't make wait_for cooperative, as it may exit before it reaches this call without touching the budget. You need to wrap the entire function.

Also, you may want to make a wait_for_inner method instead of wrapping the entire body in cooperative(async { ... }) to avoid extra indentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yea I totally missed that! Thanks. Done in d5a6b3b.

biased;
_ = async {
loop {
let _ = rx.wait_for(|val| *val == 1).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let _ = rx.wait_for(|val| *val == 1).await;
assert!(rx.wait_for(|val| *val == 1).await.is_err());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in d5a6b3b.

tokio/tests/sync_watch.rs Show resolved Hide resolved
@tglane tglane force-pushed the sync-cooperative-scheduling-for-watch branch from d5a6b3b to 95e4f9b Compare September 26, 2024 10:50
Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you.

@Darksonn Darksonn enabled auto-merge (squash) September 26, 2024 10:56
@Darksonn Darksonn merged commit c8af499 into tokio-rs:master Sep 26, 2024
81 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-coop Module: tokio/coop M-sync Module: tokio/sync R-loom-sync Run loom sync tests on this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

watch::Receiver::changed does use cooperative scheduling
2 participants