pub struct Sender<T> { /* private fields */ }
Expand description
Implementations§
source§impl<T> Sender<T>
impl<T> Sender<T>
sourcepub fn new(init: T) -> Self
pub fn new(init: T) -> Self
Creates the sending-half of the watch
channel.
See documentation of watch::channel
for errors when calling this function.
Beware that attempting to send a value when there are no receivers will
return an error.
Examples
let sender = tokio::sync::watch::Sender::new(0u8);
assert!(sender.send(3).is_err());
let _rec = sender.subscribe();
assert!(sender.send(4).is_ok());
sourcepub fn send(&self, value: T) -> Result<(), SendError<T>>
pub fn send(&self, value: T) -> Result<(), SendError<T>>
Sends a new value via the channel, notifying all receivers.
This method fails if the channel is closed, which is the case when
every receiver has been dropped. It is possible to reopen the channel
using the subscribe
method. However, when send
fails, the value
isn’t made available for future receivers (but returned with the
SendError
).
To always make a new value available for future receivers, even if no
receiver currently exists, one of the other send methods
(send_if_modified
, send_modify
, or send_replace
) can be
used instead.
sourcepub fn send_modify<F>(&self, modify: F)where
F: FnOnce(&mut T),
pub fn send_modify<F>(&self, modify: F)where F: FnOnce(&mut T),
Modifies the watched value unconditionally in-place, notifying all receivers.
This can be useful for modifying the watched value, without having to allocate a new instance. Additionally, this method permits sending values even when there are no receivers.
Prefer to use the more versatile function Self::send_if_modified()
if the value is only modified conditionally during the mutable borrow
to prevent unneeded change notifications for unmodified values.
Panics
This function panics when the invocation of the modify
closure panics.
No receivers are notified when panicking. All changes of the watched
value applied by the closure before panicking will be visible in
subsequent calls to borrow
.
Examples
use tokio::sync::watch;
struct State {
counter: usize,
}
let (state_tx, state_rx) = watch::channel(State { counter: 0 });
state_tx.send_modify(|state| state.counter += 1);
assert_eq!(state_rx.borrow().counter, 1);
sourcepub fn send_if_modified<F>(&self, modify: F) -> boolwhere
F: FnOnce(&mut T) -> bool,
pub fn send_if_modified<F>(&self, modify: F) -> boolwhere F: FnOnce(&mut T) -> bool,
Modifies the watched value conditionally in-place, notifying all receivers only if modified.
This can be useful for modifying the watched value, without having to allocate a new instance. Additionally, this method permits sending values even when there are no receivers.
The modify
closure must return true
if the value has actually
been modified during the mutable borrow. It should only return false
if the value is guaranteed to be unmodified despite the mutable
borrow.
Receivers are only notified if the closure returned true
. If the
closure has modified the value but returned false
this results
in a silent modification, i.e. the modified value will be visible
in subsequent calls to borrow
, but receivers will not receive
a change notification.
Returns the result of the closure, i.e. true
if the value has
been modified and false
otherwise.
Panics
This function panics when the invocation of the modify
closure panics.
No receivers are notified when panicking. All changes of the watched
value applied by the closure before panicking will be visible in
subsequent calls to borrow
.
Examples
use tokio::sync::watch;
struct State {
counter: usize,
}
let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
let inc_counter_if_odd = |state: &mut State| {
if state.counter % 2 == 1 {
state.counter += 1;
return true;
}
false
};
assert_eq!(state_rx.borrow().counter, 1);
assert!(!state_rx.has_changed().unwrap());
assert!(state_tx.send_if_modified(inc_counter_if_odd));
assert!(state_rx.has_changed().unwrap());
assert_eq!(state_rx.borrow_and_update().counter, 2);
assert!(!state_rx.has_changed().unwrap());
assert!(!state_tx.send_if_modified(inc_counter_if_odd));
assert!(!state_rx.has_changed().unwrap());
assert_eq!(state_rx.borrow_and_update().counter, 2);
sourcepub fn send_replace(&self, value: T) -> T
pub fn send_replace(&self, value: T) -> T
Sends a new value via the channel, notifying all receivers and returning the previous value in the channel.
This can be useful for reusing the buffers inside a watched value. Additionally, this method permits sending values even when there are no receivers.
Examples
use tokio::sync::watch;
let (tx, _rx) = watch::channel(1);
assert_eq!(tx.send_replace(2), 1);
assert_eq!(tx.send_replace(3), 2);
sourcepub fn borrow(&self) -> Ref<'_, T>
pub fn borrow(&self) -> Ref<'_, T>
Returns a reference to the most recently sent value
Outstanding borrows hold a read lock on the inner value. This means that
long-lived borrows could cause the producer half to block. It is recommended
to keep the borrow as short-lived as possible. Additionally, if you are
running in an environment that allows !Send
futures, you must ensure that
the returned Ref
type is never held alive across an .await
point,
otherwise, it can lead to a deadlock.
Examples
use tokio::sync::watch;
let (tx, _) = watch::channel("hello");
assert_eq!(*tx.borrow(), "hello");
sourcepub fn is_closed(&self) -> bool
pub fn is_closed(&self) -> bool
Checks if the channel has been closed. This happens when all receivers have dropped.
Examples
let (tx, rx) = tokio::sync::watch::channel(());
assert!(!tx.is_closed());
drop(rx);
assert!(tx.is_closed());
sourcepub async fn closed(&self)
pub async fn closed(&self)
Completes when all receivers have dropped.
This allows the producer to get notified when interest in the produced
values is canceled and immediately stop doing work. Once a channel is
closed, the only way to reopen it is to call Sender::subscribe
to
get a new receiver.
If the channel becomes closed for a brief amount of time (e.g., the last
receiver is dropped and then subscribe
is called), then this call to
closed
might return, but it is also possible that it does not “notice”
that the channel was closed for a brief amount of time.
Cancel safety
This method is cancel safe.
Examples
use tokio::sync::watch;
#[tokio::main]
async fn main() {
let (tx, rx) = watch::channel("hello");
tokio::spawn(async move {
// use `rx`
drop(rx);
});
// Waits for `rx` to drop
tx.closed().await;
println!("the `rx` handles dropped")
}
sourcepub fn subscribe(&self) -> Receiver<T>
pub fn subscribe(&self) -> Receiver<T>
Creates a new Receiver
connected to this Sender
.
All messages sent before this call to subscribe
are initially marked
as seen by the new Receiver
.
This method can be called even if there are no other receivers. In this case, the channel is reopened.
Examples
The new channel will receive messages sent on this Sender
.
use tokio::sync::watch;
#[tokio::main]
async fn main() {
let (tx, _rx) = watch::channel(0u64);
tx.send(5).unwrap();
let rx = tx.subscribe();
assert_eq!(5, *rx.borrow());
tx.send(10).unwrap();
assert_eq!(10, *rx.borrow());
}
The most recent message is considered seen by the channel, so this test is guaranteed to pass.
use tokio::sync::watch;
use tokio::time::Duration;
#[tokio::main]
async fn main() {
let (tx, _rx) = watch::channel(0u64);
tx.send(5).unwrap();
let mut rx = tx.subscribe();
tokio::spawn(async move {
// by spawning and sleeping, the message is sent after `main`
// hits the call to `changed`.
tokio::time::sleep(Duration::from_millis(10)).await;
tx.send(100).unwrap();
});
rx.changed().await.unwrap();
assert_eq!(100, *rx.borrow());
}
sourcepub fn receiver_count(&self) -> usize
pub fn receiver_count(&self) -> usize
Returns the number of receivers that currently exist.
Examples
use tokio::sync::watch;
#[tokio::main]
async fn main() {
let (tx, rx1) = watch::channel("hello");
assert_eq!(1, tx.receiver_count());
let mut _rx2 = rx1.clone();
assert_eq!(2, tx.receiver_count());
}
sourcepub fn sender_count(&self) -> usize
pub fn sender_count(&self) -> usize
Returns the number of senders that currently exist.
Examples
use tokio::sync::watch;
#[tokio::main]
async fn main() {
let (tx1, rx) = watch::channel("hello");
assert_eq!(1, tx1.sender_count());
let tx2 = tx1.clone();
assert_eq!(2, tx1.sender_count());
assert_eq!(2, tx2.sender_count());
}
sourcepub fn same_channel(&self, other: &Self) -> bool
pub fn same_channel(&self, other: &Self) -> bool
Returns true
if senders belong to the same channel.
Examples
let (tx, rx) = tokio::sync::watch::channel(true);
let tx2 = tx.clone();
assert!(tx.same_channel(&tx2));
let (tx3, rx3) = tokio::sync::watch::channel(true);
assert!(!tx3.same_channel(&tx2));