Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Тема
Описание
Доп.

Модель синхронизации потоков - Взаимодействие потоков

Взаимодействие потоков обычно представлено через каналы и реализовано в std::sync::mpsc(channel) модуле стандартной библиотеки Rust

📌 Новый раздел

  • 👾 Best practice
  • Варианты применения

Не стоит передавать информацию с помощью разделяемой памяти; лучше делитесь памятью, передавая информацию

crate postage

Каналы, которые хорошо интегрируются с асинхронным кодом, с другими опциями, чем Tokio

Когда использовать postage? Если проект не использует Tokio или хочет быть независимым от рантайма. Если нужен лёгкий и минималистичный API для асинхронных каналов. В приложениях, где важна многопоточная или многозадачная асинхронная коммуникация.


use postage::prelude::*;
use postage::broadcast;
use async_std::task;

#[async_std::main]
async fn main() -> Result<(), Box> {
    let mut sender = broadcast::channel(16).0;
    let mut receiver1 = sender.subscribe();
    let mut receiver2 = sender.subscribe();

    task::spawn(async move {
        while let Some(msg) = receiver1.recv().await {
            println!("Receiver 1 got: {}", msg);
        }
    });

    task::spawn(async move {
        while let Some(msg) = receiver2.recv().await {
            println!("Receiver 2 got: {}", msg);
        }
    });

    sender.send("Hello, World!").await?;
    sender.send("Another message").await?;

    Ok(())
}

Точные способы синхронизации потоков - Канал channel.

Асинхронную связь потоков.

Sender tx Трансмиттер - отправитель

Receiver rx Ресивер - получатель

Каналы - лучший способ связи между потоками. Это как очередь между потоками. Объекты могут быть отправлены «Sender» из разных потоков, в то время как все отправленные значения могут быть получены «Receiver» в другом потоке.

async channel в Tokio

tokio/sync

Приоритет для обмена данными каналам channel

Mutex vs Channel

lock-and-condition-variable

Если вы можете смоделировать параллелизм с использованием channel передачи сообщений и избежать совместного использования Mutex, это обычно делает систему более простой для понимания.

Тем не менее, иногда, даже по историческим причинам, совместное общее состояник может быть подходящим вариантом. В таком случае condvars может дать вам очень четкую модель того, «как ваши потоки работают на основе изменений в общем состоянии».

Модель передачи сообщений имеет тенденцию работать хорошо, когда есть естественный ресурс, который уже сериализует доступы (например, устройство ввода-вывода), линейный конечный автомат, лучше всего выраженный в виде одной процедуры, или когда критические секции длинные.

Мьютексы хорошо работают, когда критические секции короткие и могут быть вызваны во многих местах, или когда можно эффективно использовать блокировки чтения и записи.

Struct std::sync::mpsc::Sender

send() - Пытается отправить значение по этому каналу и возвращает его обратно, если отправить его не удалось

std::sync::mpsc::Sender


use std::sync::mpsc::channel;
use std::thread;
fn main(){
    let (sender, receiver) = channel();
    let sender2 = sender.clone();

    // First thread owns sender
    thread::spawn(move || {
        sender.send(1).unwrap();
    });

    // Second thread owns sender2
    thread::spawn(move || {
        sender2.send(2).unwrap();
    });

    let msg = receiver.recv().unwrap();
    let msg2 = receiver.recv().unwrap();

    assert_eq!(3, msg + msg2);
}

std::sync::mpsc::sync_channel() - Создает новый синхронный ограниченный канал.

Struct std::sync::mpsc::SyncSender - С ограниченными (синхронными) каналами send можно заблокировать текущий поток

send() - Отправляет значение по этому синхронному каналу, блокирует пока не освободится буфер или получит Receiver

try_send() - Пытается отправить значение по этому каналу без блокировки, сбой если буфер канала заполнен или ни один получатель не ожидает

В стандартной библиотеке Rust мы можем выбирать между использованием канала по умолчанию std::sync::mpsc::channel, который поставляется с отправителем с неограниченным буфером и, следовательно, никогда не блокирует отправку, и std::sync::mpsc::sync_channel, который поставляется с SyncSender, который блокирует отправку, если буфер заполнен.


use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
use std::thread;
fn main() {
    let (sender, receiver):(SyncSender,Receiver) = sync_channel(1);
    // this returns immediately
    sender.send(1).unwrap();
    
    thread::spawn(move|| {
        // this will block until the previous message has been received
        sender.send(2).unwrap();
    });
    assert_eq!(receiver.recv().unwrap(), 1);
    assert_eq!(receiver.recv().unwrap(), 2);
}

Struct std::sync::mpsc::Receiver

try_recv() - Пытается вернуть ожидающее значение получателю без блокировки.

recv() - Пытается дождаться значения, возвращая ошибку, если соответствующий канал завис, с блокировкой

recv_timeout() - как recv() но если он ожидает более timeout

recv_deadline() - как recv() но если deadline достигнут

iter() - итератор блокирующий ожидание сообщений, но никогда panic!. Он вернется None, когда канал повесит трубку

try_iter() - итератор попытается получить все ожидающие значения. Вернется None, если больше нет ожидающих значений или если канал повесил трубку.


use std::sync::mpsc::channel;
use std::thread;
fn main(){
    let (send, recv) = channel();

    thread::spawn(move || {
        send.send(1).unwrap();
        send.send(2).unwrap();
        send.send(3).unwrap();
    });

    let mut iter = recv.iter();
    assert_eq!(iter.next(), Some(1));
    assert_eq!(iter.next(), Some(2));
    assert_eq!(iter.next(), Some(3));
    assert_eq!(iter.next(), None);
}

Rust обеспечивает асинхронную channels связь между потоками.

Каналы позволяют иметь однонаправленный поток информации между двумя конечными точками: Sender и Receiver

rust-by-example/std_misc/channels


use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;

static NTHREADS: i32 = 3;
fn main() {
    // Каналы имеют две конечные точки: `Sender ` и `Receiver `, где `T` - тип передаваемого сообщения
    let (tx, rx): (Sender, Receiver) = mpsc::channel();
    for id in 0..NTHREADS {
        let thread_tx = tx.clone();// Конечную точку отправителя можно скопировать
        if id%2==0{continue;} // для проверки ожидания Receiver всех отработанных Sender
        // Каждый поток будет отправлять свой идентификатор через канал
        thread::spawn(move || {
            //Поток берет на себя ответственность за `thread_tx`
            //  Каждый поток ставит в очередь сообщение в канале
            // `send` не блокирует поток
            thread_tx.send(id).unwrap();

            // Отправка является неблокирующей операцией, поток будет продолжаться
            // сразу после отправки своего сообщенияe
            println!("thread {} finished", id);
        });
    }

    // Если не удалить sender то receiver будет ждать, пока все senders завершат свою отправку
    // исходный receiver никогда не отбрасывается, так что receiver ждет вечно
    // Канал считается закрытым , если либо передающая, либо принимающая его половина уничтожена.
    std::mem::drop(tx);

    // Здесь собраны все сообщения
    let mut ids = Vec::with_capacity(NTHREADS as usize);
    for _ in 0..NTHREADS {
        // Метод `recv` забирает сообщение из канала
        //  `recv` блокирует текущий поток, если сообщений нет
        ids.push(rx.recv());
    }

    println!("{:?}", ids);  //Показывать порядок отправки сообщений
}

Передача изменяемых данных не копируя их

Есть поток и надо что-либо в него передать. Передача изменяемых данных не копируя их - синхронизированные изменяемые данные.


use std::sync::mpsc::channel;
fn main(){
    let (tx,rx) = channel();
    let result = std::thread::spawn( move ||{ 
        let mut xs:Vec = rx.recv().unwrap();// Ресивер получает данные
        xs[2]=10;
        xs.push(40);
        println!("{:?}",xs);// [1, 2, 10, 40]
    });

    let mut xs = vec![1,2,3];// буффер останется в куче передается только ссылка на буффер
    tx.send(xs).unwrap();// Трансмитер отправляет данные

    result.join();// результат получим после отправки в канал данных иначе зависнем
}

Получение данных через итератор


fn main(){
 let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    // можно получить из канала данные с помощью итератора
    // println!("Got: {}", rx.iter().next().unwrap());
    for received in rx {
        println!("Got: {}", received);
    }
    /*
    loop {
        match rx.iter().next() {
            Some(x) => {
                println!("продолжим {}", x);
            },
            None => { break }
        }
    }*/
}

Несколько отправителей tx в один приемник rx

Способ которым стандартная библиотека Rust реализует каналы, означает, что канал может иметь несколько отправляющих источников генерирующих значения, но только одну принимающую сторону, которая потребляет эти значения.

use std::sync::mpsc::{Sender, Receiver};
use std::thread::JoinHandle;
fn test2() {
    let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel::<String>();
    // Создаем 2 потока и в каждом клонируем передатчик tx который отправляет данные в общий приемник rx
    for i in (0..2){
       let  tx_ = mpsc::Sender::clone(&tx);
        thread::Builder::new().name(i.to_string()).spawn(move || {
            let vals = vec![
                format!("{} {}", "hi ", thread::current().name().unwrap_or("unknown name")),
                format!("{} {}", "from ", thread::current().name().unwrap_or("unknown name")),
                format!("{} {}", "the ", thread::current().name().unwrap_or("unknown name")),
                format!("{} {}", "thread ", thread::current().name().unwrap_or("unknown name"))
            ];
            for val in vals {
                tx_.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
     }
    /*let mut ids = Vec::with_capacity(8usize);
   for _ in 0..8 {
       ids.push(rx.recv());
   }
   for val in ids {
       println!("Got: {:?}", val);
   }*/
    // Что бы не зависало нужно получить все данные из оригинального Sender tx а не клона его или через фиксированный цикл итераций
    let  tx_ = mpsc::Sender::clone(&tx);
    thread::Builder::new().name("tx".to_string()).spawn(move || {
        let vals = vec![
            format!("{} {}", "hi ", thread::current().name().unwrap_or("unknown name")),
            format!("{} {}", "from ", thread::current().name().unwrap_or("unknown name")),
            format!("{} {}", "the ", thread::current().name().unwrap_or("unknown name")),
            format!("{} {}", "thread ", thread::current().name().unwrap_or("unknown name"))
        ];
        for val in vals {
            tx_.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

   std::mem::drop(tx);
   
    while let Ok(j) = rx.recv() {
        println!("Got: {:?}", j);
    }
}

Разделяемый Receiver

Мы можем обернуть Receiver мьюксом и сделать его разделяемым.

pub mod shared_channel {
use std::sync::{Arc, Mutex}; 
use std::sync::mpsc::{channel, Sender, Receiver};
    /// Потокобезопасная обертка вокруг `Receiver`.
    #[derive(Clone)]
    pub struct SharedReceiver<T>(Arc<Mutex<Receiver<T>>>);
    impl<T> Iterator for SharedReceiver<T> {
        type Item = T;
        /// Получает следующий объект от обернутого получателя.
        fn next(&mut self) -> Option<T> {
            let guard = self.0.lock().unwrap();
            guard.recv().ok()
        }
    }
    /// Создает новый канал, получатель которого может разделяться между потоками.
    /// Возвращает отправителя и получателя, как стандартная функция
    /// `channel()`, и иногда может быть подставлена вместо нее.
    pub fn shared_channel<T>() -> (Sender<T>, SharedReceiver<T>) {
        let (sender, receiver) = channel();
        (sender, SharedReceiver(Arc::new(Mutex::new(receiver))))
    } 
}

crate crossbeam может гарантировать что поток не переживет ссылку на данные т.е. можно передавать ссылку на изменяемые данные

crossbeam дает гарантию что потоки завершатся когда мы выйдем из ф-ции scope

crate crossbeam может гарантировать что поток не переживет ссылку на данные т.е. можно передавать ссылку на изменяемые данные.


extern crate crossbeam;
fn main(){
    let mut xs:[i32;4] =[0,0,0,0];
    crossbeam::scope(|scope_|{
        for i in &mut xs{
            scope_.spawn(move || {
                *i+=1;
            });
        }
    });
    println!("{:?}",xs);
}

fn main(){
    let mut v:Vec = vec![0,0,0,0];
    /*thread::spawn(/*move*/ || { // в таком виде только через перемещение вектора но после потока вектора уже не будет
            v.push(1);
            println!("{:?}",v);
      }).join();*/
     
    crossbeam::scope(|scope_|{
        for i in v.iter_mut(){
          scope_.spawn(move || {
               *i=1; 
          }).join();
        }
    });
    println!("{:?}",v);
}

fn main(){
    let mut v:Vec = vec![];
    crossbeam::scope(|scope_|{
          let mut v2 = &mut v;
          scope_.spawn(move || {
               v2.push(1);
          }).join();
        
    });
    assert_eq!(1,v[0]); // println!("{:?}",v);
}

fn main(){
let xs = Mutex::new([0,0,0,0]);

crossbeam::scope(|scope_|{
    for _ in 0..10{ // запускаем 10 потоков
        scope_.spawn(||{
            let mut guard = xs.lock().unwrap();// блокируем данные
            // теперь с данными безопасно обращаться так как мы держим блокировку Mutex
            let xs:&mut[i32;4] = &mut guard;// разименовывание Mutex
            for i in xs{
                *i+=1;
            }
        });
    }
});
println!("{:?}",*xs.lock().unwrap());// [10,10,10,10]
}

Каналы Crossbeam являются альтернативой std::sync::mpsc каналам, предоставляемым стандартной библиотекой. Это улучшение с точки зрения производительности, эргономики и возможностей.

Каналы создаются с использованием двух функций:

  • bounded создает канал ограниченной емкости, т. е. существует ограничение на количество сообщений, которое он может удерживать.
  • unbounded создает канал неограниченной емкости, т. е. он может содержать произвольное количество сообщений в любое время.

Канал не более 5 сообщений


fn main(){
   let (tx, rx): (crossbeam_channel::Sender, crossbeam_channel::Receiver) = crossbeam_channel::bounded( 5 );
}

extern crate crossbeam;
use crossbeam_channel as channel;
fn main(){
    let (s, r) = channel::unbounded();
    crossbeam::scope(|scope| {
        // Spawn a thread that sends one message and then receives one.
        scope.spawn(|| {
            s.send(1);
            r.recv().unwrap();
        });
        // Spawn another thread that does the same thing.
        scope.spawn(|| {
            s.send(2);
            r.recv().unwrap();
        });
    });
}

select! выдаст первое пришедшее сообщение или случайное в случае одновременного прибытия

select! выдаст первое пришедшее сообщение или случайное в случае одновременного прибытия


#[macro_use]
extern crate crossbeam_channel;
fn main(){
    let (tx, rx): (crossbeam_channel::Sender<&str>, crossbeam_channel::Receiver<&str>) = crossbeam_channel::unbounded();
    let (tx2, rx2): (crossbeam_channel::Sender<&str>, crossbeam_channel::Receiver<&str>) = crossbeam_channel::unbounded();

     thread::spawn(move || {
            tx.send("Hello");
     });
     thread::spawn(move || {
           tx2.send("Hi");
    });

     select! {
          recv(rx, msg) => println!("Message: {:?}",msg.unwrap()),
          recv(rx2, msg) => println!("Message: {:?}",msg.unwrap()),
          default => println!("the channel is full"),
    }
}

crossbeam_channel::tick - Создает приемник, который периодически отправляет по одному сообщению.


fn main(){
    let ms:fn(u64)->Duration = |ms| Duration::from_millis(ms);
    let r = crossbeam_channel::tick( ms(100) );

    println!("{:?}",r.recv().unwrap());// Instant { tv_sec: 15753, tv_nsec: 440847088 }
    std::thread::sleep(Duration::new(2, 0));
    println!("{:?}",r.recv().unwrap());//Instant { tv_sec: 15753, tv_nsec: 540949952 }
}

Ждем 100 миллисекунд ответ или обрыв

use std::time::Duration;
fn test_crossbeam(){
    let (tx, rx): (crossbeam_channel::Sender<&str>, crossbeam_channel::Receiver<&str>) = crossbeam_channel::unbounded();
    let (tx2, rx2): (crossbeam_channel::Sender<&str>, crossbeam_channel::Receiver<&str>) = crossbeam_channel::unbounded();

     thread::spawn(move || {
         std::thread::sleep(Duration::new(2, 0));
         tx.send("Hello");
     });
    thread::spawn(move || {
        std::thread::sleep(Duration::new(2, 0));
        tx2.send("Hi");
    });

   /* select! {
       recv(rx, msg) => println!("Message: {:?}",msg.unwrap()),
       recv(rx2, msg) => println!("Message: {:?}",msg.unwrap()),
       default => println!("the channel is full"),
    }*/
    
    let timeout = Duration::from_millis(100);

    select! {
        recv(rx, msg) => match msg {
            Some(msg) => println!("received {:?}", msg),
            None => println!("the channel is closed"),
        }
         recv(rx2, msg) => match msg {
            Some(msg) => println!("received {:?}", msg),
            None => println!("the channel is closed"),
        }
    recv(crossbeam_channel::after(timeout)) => println!("timed out; the channel is still empty"),
    }
}

Жизненный цикл состоит из генерации квадратных матриц одним Producer и вычисление этих матриц двумя Consumer.


extern crate crossbeam;
extern crate rand;
extern crate rayon;
#[macro_use]
extern crate crossbeam_channel;

use rand::thread_rng;
use rand::Rng;
use rayon::prelude::*;
use std::collections::HashMap;
use std::io::Write;
use std::sync::mpsc;
use std::thread;

/// # Parallel matrix counting.
///
/// The life cycle consists of the generation of square matrices by a single `Producer`
/// and the calculation of these matrices by two `Consumer`.
///
/// ## Examples
///
/// Basic usage:
///
/// ` ` `rust
///
///    use threads_synchronization_and_parallelism::*;
///
///    let (tx, rx): (
///        crossbeam_channel::Sender>,
///        crossbeam_channel::Receiver>,
///    ) = crossbeam_channel::unbounded();
///
///    let rx_2 = rx.clone();
///
///    crossbeam::scope(|scope_| {
///        scope_.spawn(move || loop {
///            tx.send(Producer::generate_matrix());
///        });
///
///        scope_.spawn(move || {
///            for _i in rx {
///                Consumer::sum_matrix(_i);
///            }
///        });
///
///        scope_.spawn(move || {
///            for _i in rx_2 {
///                Consumer::sum_matrix(_i);
///            }
///        });
///
///    });
/// ` ` `
mod threads_synchronization_and_parallelism {
    use super::*;

    /// `Producer` continuously generates square matrixes of random `u8` elements and size `4096`.
    pub struct Producer;
    /// Implement Producer.
    impl Producer {
        /// Implement generates square matrixes.
        pub fn generate_matrix() -> HashMap<(i32, i32), u8> {
            let mut matrix: HashMap<(i32, i32), u8> = HashMap::with_capacity(4096);
            let mut rng = thread_rng();
            for x in (1..65) {
                for y in (1..65) {
                    matrix.insert((x, y), rng.gen::());
                }
            }
            matrix
        }
    }

    /// `Consumer` takes generated matrix, counts sum of all its elements and prints the sum to STDOUT.
    #[derive(Debug)]
    pub struct Consumer;
    /// Implement Consumer.
    impl Consumer {
        /// Implement the calculation of the sum of a square matrix.
        /// The matrix is counted in parallel.
        pub fn sum_matrix(matrix: HashMap<(i32, i32), u8>) {
            let sum: u32 = matrix.par_iter().map(|(&k, &val)| val as u32).sum();
            writeln!(std::io::stdout(), "Matrix sum:{}", sum);
        }
    }
}
fn main() {
    use threads_synchronization_and_parallelism::*;
    let (tx, rx): (
        crossbeam_channel::Sender>,
        crossbeam_channel::Receiver>,
    ) = crossbeam_channel::unbounded();

    let rx_2 = rx.clone();

    crossbeam::scope(|scope_| {
        scope_.spawn(move || loop {
            tx.send(Producer::generate_matrix());
        });
        scope_.spawn(move || {
            for _i in rx {
                Consumer::sum_matrix(_i);
            }
        });
        scope_.spawn(move || {
            for _i in rx_2 {
                Consumer::sum_matrix(_i);
            }
        });
    });
}

Жизненный цикл состоит из генерации квадратных матриц одним Producer и вычисление этих матриц двумя Consumer.


use std::sync::mpsc;
use std::thread;

use rand::{thread_rng, Rng};
use rayon::prelude::*;

struct Producer(mpsc::Receiver<[[u8; 64]; 64]>);

impl Producer {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel();

        thread::spawn(move || loop {
            let mut matrix = [[0_u8; 64]; 64];
            for row in matrix.iter_mut() {
                thread_rng().fill(row);
            }

            tx.send(matrix).unwrap();
        });

        Producer(rx)
    }

    fn recv(&self) -> [[u8; 64]; 64] {
        self.0.recv().unwrap()
    }
}

fn main() {
    let producer = Producer::new();

    loop {
        let matrix = producer.recv();

        crossbeam::scope(|scope| {
            let handles = (0..)
                .map(|_| {
                    scope.spawn(|_| {
                        let sum: u64 = matrix
                            .into_par_iter()
                            .map(|x: &[u8; 64]| x.iter().fold(0_u64, |a, b| a + u64::from(*b)))
                            .sum();
                        sum
                    })
                })
                .take(2)
                .collect::>();

            let results = handles
                .into_iter()
                .map(|handle| handle.join().unwrap())
                .collect::>();

            assert_eq!(results[0], results[1]);
            println!("{}", results[0]);
        })
        .unwrap();
    }
}

use tokio::sync::mpsc::{self, Receiver};

async fn ping_handler(mut input: Receiver<()>) {
    let mut count: usize = 0;

    while let Some(_) = input.recv().await {
        count += 1;
        println!("Received {count} pings so far.");
    }

    println!("ping_handler complete");
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(32);
    let ping_handler_task = tokio::spawn(ping_handler(receiver));
    for i in 0..10 {
        sender.send(()).await.expect("Failed to send ping.");
        println!("Sent {} pings so far.", i + 1);
    }

    drop(sender);
    ping_handler_task.await.expect("Something went wrong in ping handler task.");
}

crate flume

Crate flume — это библиотека для асинхронных и синхронных каналов, обеспечивающая удобный и мощный интерфейс для передачи сообщений между потоками или задачами. Flume предоставляет альтернативу стандартным каналам Rust (std::sync::mpsc) и каналам из Tokio, с акцентом на производительность и удобство использования.

Предоставляет богатую функциональность, включая таймауты, выборку (select) сообщений из нескольких каналов и поддержку AsyncRead/AsyncWrite.


use flume::{unbounded, Selector};

fn main() {
    let (tx1, rx1) = unbounded();
    let (tx2, rx2) = unbounded();

    let selector = Selector::new()
        .recv(&rx1, |msg| println!("Received from channel 1: {}", msg))
        .recv(&rx2, |msg| println!("Received from channel 2: {}", msg));

    std::thread::spawn(move || {
        tx1.send("Message 1").unwrap();
        tx2.send("Message 2").unwrap();
    });

    selector.wait(); // Ожидаем сообщение из любого канала
}