Модель синхронизации потоков - Взаимодействие потоков
Взаимодействие потоков обычно представлено через каналы и реализовано в std::sync::mpsc(channel) модуле стандартной библиотеки Rust
|
📌 Новый раздел
- 👾 Best practice
- Варианты применения
|
|
|
Не стоит передавать информацию с помощью разделяемой памяти; лучше делитесь памятью, передавая информацию
|
|
crate postage
Каналы, которые хорошо интегрируются с асинхронным кодом, с другими опциями, чем Tokio
|
Когда использовать postage?
Если проект не использует Tokio или хочет быть независимым от рантайма.
Если нужен лёгкий и минималистичный API для асинхронных каналов.
В приложениях, где важна многопоточная или многозадачная асинхронная коммуникация.
use postage::prelude::*;
use postage::broadcast;
use async_std::task;
#[async_std::main]
async fn main() -> Result<(), Box> {
let mut sender = broadcast::channel(16).0;
let mut receiver1 = sender.subscribe();
let mut receiver2 = sender.subscribe();
task::spawn(async move {
while let Some(msg) = receiver1.recv().await {
println!("Receiver 1 got: {}", msg);
}
});
task::spawn(async move {
while let Some(msg) = receiver2.recv().await {
println!("Receiver 2 got: {}", msg);
}
});
sender.send("Hello, World!").await?;
sender.send("Another message").await?;
Ok(())
}
|
|
|
Точные способы синхронизации потоков - Канал channel.
Асинхронную связь потоков.
Sender tx Трансмиттер - отправитель
Receiver rx Ресивер - получатель
Каналы - лучший способ связи между потоками. Это как очередь между потоками. Объекты могут быть отправлены «Sender» из разных потоков, в то время как все отправленные значения могут быть получены «Receiver» в другом потоке.
|
|
async channel в Tokio
tokio/sync
|
|
|
Приоритет для обмена данными каналам channel
Mutex vs Channel
lock-and-condition-variable
|
Если вы можете смоделировать параллелизм с использованием channel передачи сообщений и избежать совместного использования Mutex, это обычно делает систему более простой для понимания.
Тем не менее, иногда, даже по историческим причинам, совместное общее состояник может быть подходящим вариантом.
В таком случае condvars может дать вам очень четкую модель того, «как ваши потоки работают на основе изменений в общем состоянии».
Модель передачи сообщений имеет тенденцию работать хорошо, когда есть естественный ресурс, который уже сериализует доступы (например, устройство ввода-вывода), линейный конечный автомат, лучше всего выраженный в виде одной процедуры, или когда критические секции длинные.
Мьютексы хорошо работают, когда критические секции короткие и могут быть вызваны во многих местах, или когда можно эффективно использовать блокировки чтения и записи.
|
|
Struct std::sync::mpsc::Sender
send() - Пытается отправить значение по этому каналу и возвращает его обратно, если отправить его не удалось
std::sync::mpsc::Sender
|
use std::sync::mpsc::channel;
use std::thread;
fn main(){
let (sender, receiver) = channel();
let sender2 = sender.clone();
// First thread owns sender
thread::spawn(move || {
sender.send(1).unwrap();
});
// Second thread owns sender2
thread::spawn(move || {
sender2.send(2).unwrap();
});
let msg = receiver.recv().unwrap();
let msg2 = receiver.recv().unwrap();
assert_eq!(3, msg + msg2);
}
|
|
std::sync::mpsc::sync_channel() - Создает новый синхронный ограниченный канал.
Struct std::sync::mpsc::SyncSender - С ограниченными (синхронными) каналами send можно заблокировать текущий поток
send() - Отправляет значение по этому синхронному каналу, блокирует пока не освободится буфер или получит Receiver
try_send() - Пытается отправить значение по этому каналу без блокировки, сбой если буфер канала заполнен или ни один получатель не ожидает
|
В стандартной библиотеке Rust мы можем выбирать между использованием канала по умолчанию std::sync::mpsc::channel, который поставляется с отправителем с неограниченным буфером и, следовательно, никогда не блокирует отправку, и std::sync::mpsc::sync_channel, который поставляется с SyncSender, который блокирует отправку, если буфер заполнен.
use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
use std::thread;
fn main() {
let (sender, receiver):(SyncSender,Receiver) = sync_channel(1);
// this returns immediately
sender.send(1).unwrap();
thread::spawn(move|| {
// this will block until the previous message has been received
sender.send(2).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
}
|
|
Struct std::sync::mpsc::Receiver
try_recv() - Пытается вернуть ожидающее значение получателю без блокировки.
recv() - Пытается дождаться значения, возвращая ошибку, если соответствующий канал завис, с блокировкой
recv_timeout() - как recv() но если он ожидает более timeout
recv_deadline() - как recv() но если deadline достигнут
iter() - итератор блокирующий ожидание сообщений, но никогда panic!. Он вернется None, когда канал повесит трубку
try_iter() - итератор попытается получить все ожидающие значения. Вернется None, если больше нет ожидающих значений или если канал повесил трубку.
|
use std::sync::mpsc::channel;
use std::thread;
fn main(){
let (send, recv) = channel();
thread::spawn(move || {
send.send(1).unwrap();
send.send(2).unwrap();
send.send(3).unwrap();
});
let mut iter = recv.iter();
assert_eq!(iter.next(), Some(1));
assert_eq!(iter.next(), Some(2));
assert_eq!(iter.next(), Some(3));
assert_eq!(iter.next(), None);
}
|
|
Rust обеспечивает асинхронную channels связь между потоками.
Каналы позволяют иметь однонаправленный поток информации между двумя конечными точками: Sender и Receiver
rust-by-example/std_misc/channels
|
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;
static NTHREADS: i32 = 3;
fn main() {
// Каналы имеют две конечные точки: `Sender ` и `Receiver `, где `T` - тип передаваемого сообщения
let (tx, rx): (Sender, Receiver) = mpsc::channel();
for id in 0..NTHREADS {
let thread_tx = tx.clone();// Конечную точку отправителя можно скопировать
if id%2==0{continue;} // для проверки ожидания Receiver всех отработанных Sender
// Каждый поток будет отправлять свой идентификатор через канал
thread::spawn(move || {
//Поток берет на себя ответственность за `thread_tx`
// Каждый поток ставит в очередь сообщение в канале
// `send` не блокирует поток
thread_tx.send(id).unwrap();
// Отправка является неблокирующей операцией, поток будет продолжаться
// сразу после отправки своего сообщенияe
println!("thread {} finished", id);
});
}
// Если не удалить sender то receiver будет ждать, пока все senders завершат свою отправку
// исходный receiver никогда не отбрасывается, так что receiver ждет вечно
// Канал считается закрытым , если либо передающая, либо принимающая его половина уничтожена.
std::mem::drop(tx);
// Здесь собраны все сообщения
let mut ids = Vec::with_capacity(NTHREADS as usize);
for _ in 0..NTHREADS {
// Метод `recv` забирает сообщение из канала
// `recv` блокирует текущий поток, если сообщений нет
ids.push(rx.recv());
}
println!("{:?}", ids); //Показывать порядок отправки сообщений
}
|
|
Передача изменяемых данных не копируя их
|
Есть поток и надо что-либо в него передать.
Передача изменяемых данных не копируя их - синхронизированные изменяемые данные.
use std::sync::mpsc::channel;
fn main(){
let (tx,rx) = channel();
let result = std::thread::spawn( move ||{
let mut xs:Vec = rx.recv().unwrap();// Ресивер получает данные
xs[2]=10;
xs.push(40);
println!("{:?}",xs);// [1, 2, 10, 40]
});
let mut xs = vec![1,2,3];// буффер останется в куче передается только ссылка на буффер
tx.send(xs).unwrap();// Трансмитер отправляет данные
result.join();// результат получим после отправки в канал данных иначе зависнем
}
|
|
Получение данных через итератор
|
fn main(){
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// можно получить из канала данные с помощью итератора
// println!("Got: {}", rx.iter().next().unwrap());
for received in rx {
println!("Got: {}", received);
}
/*
loop {
match rx.iter().next() {
Some(x) => {
println!("продолжим {}", x);
},
None => { break }
}
}*/
}
|
|
Несколько отправителей tx в один приемник rx
Способ которым стандартная библиотека Rust реализует каналы, означает, что канал может иметь несколько отправляющих источников генерирующих значения, но только одну принимающую сторону, которая потребляет эти значения.
|
use std::sync::mpsc::{Sender, Receiver};
use std::thread::JoinHandle;
fn test2() {
let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel::<String>();
// Создаем 2 потока и в каждом клонируем передатчик tx который отправляет данные в общий приемник rx
for i in (0..2){
let tx_ = mpsc::Sender::clone(&tx);
thread::Builder::new().name(i.to_string()).spawn(move || {
let vals = vec![
format!("{} {}", "hi ", thread::current().name().unwrap_or("unknown name")),
format!("{} {}", "from ", thread::current().name().unwrap_or("unknown name")),
format!("{} {}", "the ", thread::current().name().unwrap_or("unknown name")),
format!("{} {}", "thread ", thread::current().name().unwrap_or("unknown name"))
];
for val in vals {
tx_.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
}
/*let mut ids = Vec::with_capacity(8usize);
for _ in 0..8 {
ids.push(rx.recv());
}
for val in ids {
println!("Got: {:?}", val);
}*/
// Что бы не зависало нужно получить все данные из оригинального Sender tx а не клона его или через фиксированный цикл итераций
let tx_ = mpsc::Sender::clone(&tx);
thread::Builder::new().name("tx".to_string()).spawn(move || {
let vals = vec![
format!("{} {}", "hi ", thread::current().name().unwrap_or("unknown name")),
format!("{} {}", "from ", thread::current().name().unwrap_or("unknown name")),
format!("{} {}", "the ", thread::current().name().unwrap_or("unknown name")),
format!("{} {}", "thread ", thread::current().name().unwrap_or("unknown name"))
];
for val in vals {
tx_.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
std::mem::drop(tx);
while let Ok(j) = rx.recv() {
println!("Got: {:?}", j);
}
}
|
|
|
Мы можем обернуть Receiver мьюксом и сделать его разделяемым.
pub mod shared_channel {
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver};
/// Потокобезопасная обертка вокруг `Receiver`.
#[derive(Clone)]
pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>);
impl<T> Iterator for SharedReceiver<T> {
type Item = T;
/// Получает следующий объект от обернутого получателя.
fn next(&mut self) -> Option<T> {
let guard = self.0.lock().unwrap();
guard.recv().ok()
}
}
/// Создает новый канал, получатель которого может разделяться между потоками.
/// Возвращает отправителя и получателя, как стандартная функция
/// `channel()`, и иногда может быть подставлена вместо нее.
pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
let (sender, receiver) = channel();
(sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
}
}
|
|
|
crate crossbeam может гарантировать что поток не переживет ссылку на данные т.е. можно передавать ссылку на изменяемые данные
|
|
crossbeam дает гарантию что потоки завершатся когда мы выйдем из ф-ции scope
|
crate crossbeam может гарантировать что поток не переживет ссылку на данные т.е. можно передавать ссылку на изменяемые данные.
extern crate crossbeam;
fn main(){
let mut xs:[i32;4] =[0,0,0,0];
crossbeam::scope(|scope_|{
for i in &mut xs{
scope_.spawn(move || {
*i+=1;
});
}
});
println!("{:?}",xs);
}
|
|
|
fn main(){
let mut v:Vec = vec![0,0,0,0];
/*thread::spawn(/*move*/ || { // в таком виде только через перемещение вектора но после потока вектора уже не будет
v.push(1);
println!("{:?}",v);
}).join();*/
crossbeam::scope(|scope_|{
for i in v.iter_mut(){
scope_.spawn(move || {
*i=1;
}).join();
}
});
println!("{:?}",v);
}
|
|
|
fn main(){
let mut v:Vec = vec![];
crossbeam::scope(|scope_|{
let mut v2 = &mut v;
scope_.spawn(move || {
v2.push(1);
}).join();
});
assert_eq!(1,v[0]); // println!("{:?}",v);
}
|
|
|
fn main(){
let xs = Mutex::new([0,0,0,0]);
crossbeam::scope(|scope_|{
for _ in 0..10{ // запускаем 10 потоков
scope_.spawn(||{
let mut guard = xs.lock().unwrap();// блокируем данные
// теперь с данными безопасно обращаться так как мы держим блокировку Mutex
let xs:&mut[i32;4] = &mut guard;// разименовывание Mutex
for i in xs{
*i+=1;
}
});
}
});
println!("{:?}",*xs.lock().unwrap());// [10,10,10,10]
}
|
|
|
Каналы Crossbeam являются альтернативой std::sync::mpsc каналам, предоставляемым стандартной библиотекой.
Это улучшение с точки зрения производительности, эргономики и возможностей.
Каналы создаются с использованием двух функций:
- bounded создает канал ограниченной емкости, т. е. существует ограничение на количество сообщений, которое он может удерживать.
- unbounded создает канал неограниченной емкости, т. е. он может содержать произвольное количество сообщений в любое время.
|
|
|
Канал не более 5 сообщений
fn main(){
let (tx, rx): (crossbeam_channel::Sender, crossbeam_channel::Receiver) = crossbeam_channel::bounded( 5 );
}
|
|
|
extern crate crossbeam;
use crossbeam_channel as channel;
fn main(){
let (s, r) = channel::unbounded();
crossbeam::scope(|scope| {
// Spawn a thread that sends one message and then receives one.
scope.spawn(|| {
s.send(1);
r.recv().unwrap();
});
// Spawn another thread that does the same thing.
scope.spawn(|| {
s.send(2);
r.recv().unwrap();
});
});
}
|
|
select! выдаст первое пришедшее сообщение или случайное в случае одновременного прибытия
|
select! выдаст первое пришедшее сообщение или случайное в случае одновременного прибытия
#[macro_use]
extern crate crossbeam_channel;
fn main(){
let (tx, rx): (crossbeam_channel::Sender<&str>, crossbeam_channel::Receiver<&str>) = crossbeam_channel::unbounded();
let (tx2, rx2): (crossbeam_channel::Sender<&str>, crossbeam_channel::Receiver<&str>) = crossbeam_channel::unbounded();
thread::spawn(move || {
tx.send("Hello");
});
thread::spawn(move || {
tx2.send("Hi");
});
select! {
recv(rx, msg) => println!("Message: {:?}",msg.unwrap()),
recv(rx2, msg) => println!("Message: {:?}",msg.unwrap()),
default => println!("the channel is full"),
}
}
|
|
crossbeam_channel::tick - Создает приемник, который периодически отправляет по одному сообщению.
|
fn main(){
let ms:fn(u64)->Duration = |ms| Duration::from_millis(ms);
let r = crossbeam_channel::tick( ms(100) );
println!("{:?}",r.recv().unwrap());// Instant { tv_sec: 15753, tv_nsec: 440847088 }
std::thread::sleep(Duration::new(2, 0));
println!("{:?}",r.recv().unwrap());//Instant { tv_sec: 15753, tv_nsec: 540949952 }
}
|
|
Ждем 100 миллисекунд ответ или обрыв
|
use std::time::Duration;
fn test_crossbeam(){
let (tx, rx): (crossbeam_channel::Sender<&str>, crossbeam_channel::Receiver<&str>) = crossbeam_channel::unbounded();
let (tx2, rx2): (crossbeam_channel::Sender<&str>, crossbeam_channel::Receiver<&str>) = crossbeam_channel::unbounded();
thread::spawn(move || {
std::thread::sleep(Duration::new(2, 0));
tx.send("Hello");
});
thread::spawn(move || {
std::thread::sleep(Duration::new(2, 0));
tx2.send("Hi");
});
/* select! {
recv(rx, msg) => println!("Message: {:?}",msg.unwrap()),
recv(rx2, msg) => println!("Message: {:?}",msg.unwrap()),
default => println!("the channel is full"),
}*/
let timeout = Duration::from_millis(100);
select! {
recv(rx, msg) => match msg {
Some(msg) => println!("received {:?}", msg),
None => println!("the channel is closed"),
}
recv(rx2, msg) => match msg {
Some(msg) => println!("received {:?}", msg),
None => println!("the channel is closed"),
}
recv(crossbeam_channel::after(timeout)) => println!("timed out; the channel is still empty"),
}
}
|
|
Жизненный цикл состоит из генерации квадратных матриц одним Producer и вычисление этих матриц двумя Consumer.
|
extern crate crossbeam;
extern crate rand;
extern crate rayon;
#[macro_use]
extern crate crossbeam_channel;
use rand::thread_rng;
use rand::Rng;
use rayon::prelude::*;
use std::collections::HashMap;
use std::io::Write;
use std::sync::mpsc;
use std::thread;
/// # Parallel matrix counting.
///
/// The life cycle consists of the generation of square matrices by a single `Producer`
/// and the calculation of these matrices by two `Consumer`.
///
/// ## Examples
///
/// Basic usage:
///
/// ` ` `rust
///
/// use threads_synchronization_and_parallelism::*;
///
/// let (tx, rx): (
/// crossbeam_channel::Sender>,
/// crossbeam_channel::Receiver>,
/// ) = crossbeam_channel::unbounded();
///
/// let rx_2 = rx.clone();
///
/// crossbeam::scope(|scope_| {
/// scope_.spawn(move || loop {
/// tx.send(Producer::generate_matrix());
/// });
///
/// scope_.spawn(move || {
/// for _i in rx {
/// Consumer::sum_matrix(_i);
/// }
/// });
///
/// scope_.spawn(move || {
/// for _i in rx_2 {
/// Consumer::sum_matrix(_i);
/// }
/// });
///
/// });
/// ` ` `
mod threads_synchronization_and_parallelism {
use super::*;
/// `Producer` continuously generates square matrixes of random `u8` elements and size `4096`.
pub struct Producer;
/// Implement Producer.
impl Producer {
/// Implement generates square matrixes.
pub fn generate_matrix() -> HashMap<(i32, i32), u8> {
let mut matrix: HashMap<(i32, i32), u8> = HashMap::with_capacity(4096);
let mut rng = thread_rng();
for x in (1..65) {
for y in (1..65) {
matrix.insert((x, y), rng.gen::());
}
}
matrix
}
}
/// `Consumer` takes generated matrix, counts sum of all its elements and prints the sum to STDOUT.
#[derive(Debug)]
pub struct Consumer;
/// Implement Consumer.
impl Consumer {
/// Implement the calculation of the sum of a square matrix.
/// The matrix is counted in parallel.
pub fn sum_matrix(matrix: HashMap<(i32, i32), u8>) {
let sum: u32 = matrix.par_iter().map(|(&k, &val)| val as u32).sum();
writeln!(std::io::stdout(), "Matrix sum:{}", sum);
}
}
}
fn main() {
use threads_synchronization_and_parallelism::*;
let (tx, rx): (
crossbeam_channel::Sender>,
crossbeam_channel::Receiver>,
) = crossbeam_channel::unbounded();
let rx_2 = rx.clone();
crossbeam::scope(|scope_| {
scope_.spawn(move || loop {
tx.send(Producer::generate_matrix());
});
scope_.spawn(move || {
for _i in rx {
Consumer::sum_matrix(_i);
}
});
scope_.spawn(move || {
for _i in rx_2 {
Consumer::sum_matrix(_i);
}
});
});
}
|
|
Жизненный цикл состоит из генерации квадратных матриц одним Producer и вычисление этих матриц двумя Consumer.
|
use std::sync::mpsc;
use std::thread;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
struct Producer(mpsc::Receiver<[[u8; 64]; 64]>);
impl Producer {
fn new() -> Self {
let (tx, rx) = mpsc::channel();
thread::spawn(move || loop {
let mut matrix = [[0_u8; 64]; 64];
for row in matrix.iter_mut() {
thread_rng().fill(row);
}
tx.send(matrix).unwrap();
});
Producer(rx)
}
fn recv(&self) -> [[u8; 64]; 64] {
self.0.recv().unwrap()
}
}
fn main() {
let producer = Producer::new();
loop {
let matrix = producer.recv();
crossbeam::scope(|scope| {
let handles = (0..)
.map(|_| {
scope.spawn(|_| {
let sum: u64 = matrix
.into_par_iter()
.map(|x: &[u8; 64]| x.iter().fold(0_u64, |a, b| a + u64::from(*b)))
.sum();
sum
})
})
.take(2)
.collect::>();
let results = handles
.into_iter()
.map(|handle| handle.join().unwrap())
.collect::>();
assert_eq!(results[0], results[1]);
println!("{}", results[0]);
})
.unwrap();
}
}
|
|
|
|
|
|
use tokio::sync::mpsc::{self, Receiver};
async fn ping_handler(mut input: Receiver<()>) {
let mut count: usize = 0;
while let Some(_) = input.recv().await {
count += 1;
println!("Received {count} pings so far.");
}
println!("ping_handler complete");
}
#[tokio::main]
async fn main() {
let (sender, receiver) = mpsc::channel(32);
let ping_handler_task = tokio::spawn(ping_handler(receiver));
for i in 0..10 {
sender.send(()).await.expect("Failed to send ping.");
println!("Sent {} pings so far.", i + 1);
}
drop(sender);
ping_handler_task.await.expect("Something went wrong in ping handler task.");
}
|
|
|
Crate flume — это библиотека для асинхронных и синхронных каналов, обеспечивающая удобный и мощный интерфейс для передачи сообщений между потоками или задачами. Flume предоставляет альтернативу стандартным каналам Rust (std::sync::mpsc) и каналам из Tokio, с акцентом на производительность и удобство использования.
Предоставляет богатую функциональность, включая таймауты, выборку (select) сообщений из нескольких каналов и поддержку AsyncRead/AsyncWrite.
use flume::{unbounded, Selector};
fn main() {
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let selector = Selector::new()
.recv(&rx1, |msg| println!("Received from channel 1: {}", msg))
.recv(&rx2, |msg| println!("Received from channel 2: {}", msg));
std::thread::spawn(move || {
tx1.send("Message 1").unwrap();
tx2.send("Message 2").unwrap();
});
selector.wait(); // Ожидаем сообщение из любого канала
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|