Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Тема
Описание
Доп.

Гонка данных

атомарные инструкции

nomicon/races

Гонка данных (data race) возникает при несинхронизированном обращении к данным из нескольких потоков, при условии, что как минимум одно из этих обращений является записью. Гонка данных (data race) возникает, когда несколько потоков одновременно обращаются к одной и той же памяти, и хотя бы один из них выполняет запись, что приводит к неопределенному поведению.

Фактически, утверждение о предотвращении всех гонок данных — это такой способ сказать, что вы не сможете случайно "поделиться состоянием" между потоками.

Синхронизация здесь включает в себя такие низкоуровневые вещи, как атомарные инструкции. По сути, это способ сказать, что вы не можете случайно «разделить состояние» между потоками; любой (изменяющийся) доступ к состоянию должен быть обеспечен некоторой формой синхронизации.

Rust предотвращает гонки данных на уровне системы типов.

  • Использование unsafe может привести к гонке данных, если вы неправильно управляете памятью.
  • Для безопасного параллельного программирования используйте предоставленные Rust инструменты: Mutex, RwLock, Arc, и атомарные типы.
use std::thread;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

let data = vec![1, 2, 3, 4];

let idx = Arc::new(AtomicUsize::new(0));
let other_idx = idx.clone();

// `move` захватывает other_idx по значению, перемещая его в этот поток
thread::spawn(move || {
    // Мутировать idx можно, поскольку это значение является атомарным и не может вызвать гонку данных.
    other_idx.fetch_add(10, Ordering::SeqCst);
});

// println!("{}", data[idx.load(Ordering::SeqCst)]);
 или Мы можем вызвать гонку данных, если вместо  заранее выполним проверку привязки, а затем небезопасно получим доступ к данным с непроверенным значением:

if idx.load(Ordering::SeqCst) < data.len() {
    unsafe {
       // Неправильная загрузка idx после проверки границ.
        // idx могло измениться. Это состояние гонки, и опасно
        // потому что мы решили сделать `get_unchecked`, что unsafe `небезопасно`.
        println!("{}", data.get_unchecked(idx.load(Ordering::SeqCst)));
    }
}

Гонка данных

Почему здесь гонка данных:

  • Два потока одновременно обращаются к data_ptr.
  • Оба потока изменяют значение, что создает состояние гонки.
  • Результат может быть непредсказуемым, например, вместо увеличения на 2, итоговое значение может быть увеличено только на 1

use std::thread;
use std::sync::Arc;
fn main() {
    let data = Arc::new(42); // Оборачиваем в Arc для разделяемого доступа
    let data_ptr = Arc::into_raw(data) as *mut i32; // Преобразуем в указатель

    // Создаем два потока, которые обращаются к одной и той же памяти
    let handle1 = thread::spawn(move || {
        unsafe {
            *data_ptr += 1; // Поток 1 изменяет значение
        }
    });

    let handle2 = thread::spawn(move || {
        unsafe {
            *data_ptr += 1; // Поток 2 изменяет значение
        }
    });

    handle1.join().unwrap();
    handle2.join().unwrap();

    // Освобождаем память
    unsafe {
        drop(Arc::from_raw(data_ptr)); // Преобразуем обратно в Arc, чтобы вызвать drop
    }
    println!("Программа завершена.");
}

Rust не предотвращает общие состояния гонки


use std::sync::{Arc, Mutex, MutexGuard, Condvar}; 
use std::time::{Instant, Duration};
use std::thread;
use std::sync::atomic::{AtomicUsize, Ordering};
fn main(){
    let data = vec![1, 2, 3, 4];
  
    let idx = Arc::new(AtomicUsize::new(0));
    let other_idx = idx.clone();
    
    thread::spawn(move || {
        other_idx.fetch_add(10, Ordering::SeqCst);
    });
    
    thread::sleep(std::time::Duration::from_millis(100));
    println!("{}", data[idx.load(Ordering::SeqCst)]);  // panic вышли за пределы мыссива, так как idx мог успеть увеличится сверх диапазона массива
}

Потоки ОС не требуют каких-либо изменений в модели программирования, что позволяет очень легко выразить параллелизм. Однако синхронизация между потоками может быть сложной, и велики издержки на производительность. Пулы потоков могут снизить некоторые из этих затрат, но этого недостаточно для поддержки массивных рабочих нагрузок с привязкой к вводу-выводу.

Программирование, управляемое событиями, в сочетании с callback вызовами может быть очень производительным, но, как правило, приводит к многословному, «нелинейному» потоку управления. За потоком данных и распространением ошибок часто трудно проследить.

Сопрограммы, как и потоки, не требуют изменений модели программирования, что упрощает их использование. Как и async, они также могут поддерживать большое количество задач. Однако они абстрагируются от низкоуровневых деталей, которые важны для системного программирования и разработчиков пользовательской среды выполнения.

Модель акторов делит все параллельные вычисления на блоки, называемые акторами, которые общаются посредством передачи ошибочных сообщений, как и в распределенных системах. Модель актора может быть эффективно реализована, но она оставляет без ответа многие практические вопросы, такие как управление потоком и логика повторных попыток.

Асинхронное программирование - Он позволяет запускать большое количество одновременных задач в небольшом количестве потоков ОС. Асинхронное программирование позволяет реализовать высокопроизводительные реализации, подходящие для языков низкого уровня, таких как Rust, обеспечивая при этом большинство эргономических преимуществ потоков и сопрограмм.

Асинхронность - переключение между задачами без их блокировки,задачи могут выполняться не последовательно, а с переключением между ними, может работать в одном потоке. Цель не блокировать основной пото процесс.

Многопоточность - позволяет разбить процесс на потоки выполняемые конкурентно (concurrently), но не обязательно параллельно.

Параллелизм - позволяет разбить процесс на потоки на разных ядрах или процессорах.

Синхронизация потоков — это обширная тема, но обычно она осуществляется с помощью атомарных операций или общего состояния с монопольным доступом или взаимодействия потоков.

В Rust есть встроенная поддержка всех из них.

Атомарные операции представлены модулем стандартной библиотеки Rust std::sync::atomic (и дополнительно crate atomic).

Эксклюзивный доступ можно контролировать с помощью примитивов std::sync(Mutex, RwLock, Condvar) модуля стандартной библиотеки Rust

Взаимодействие потоков обычно представлено через каналы и реализовано в std::sync::mpsc(channel) модуле стандартной библиотеки Rust

Экосистема Rust:

  • crate crossbeam предоставляющий более многофункциональные и оптимизированные примитивы параллелизма и синхронизации.
  • Наиболее примечательным является crate crossbeam-channel улучшение реализации каналов std

Concurrency - это дело с множеством вещей одновременно. Parallelism заключается в том, чтобы делать много вещей сразу

Concurrency(логически) - это когда две или более задачи могут запускаться, выполняться и завершаться в перекрывающиеся периоды времени . Это не обязательно означает, что они оба будут работать одновременно . Например, многозадачность на одноядерной машине.В одноядерном процессоре вы можете получить Concurrency, но НЕ parallelism. Concurrency связан с управлением доступом к общему состоянию из разных потоков.

Parallelism(физически) - это когда задачи буквально выполняются одновременно, например, на многоядерном процессоре. При Parallelism'e задачи выполняются параллельно на разных процессорах не завися от общих данных

Конкурентность и параллелизм — это не одно и то же. Если вы чередуете две задачи, то вы работаете над обеими задачами одновременно, а не параллельно. Чтобы его можно было считать параллельным, вам потребуются два человека, по одному на каждую задачу.

Concurrency + Parallelism это когда два повара будут не последовательно выполнять свои задачи. Пример с шахматным турниром на котором профессионал играет с множеством любителей делая по одному ходу с каждым игроком по кругу это Concurrency но если пригласить еще одного профессионала и разделить любителей между ними и профессионалы будут играть с каждым любителем полностью игру и переходить к следующему т.е. последовательно это Parallelism, а когда профессионалы будут ходить по одному шагу с каждым по кругу в своей группе это Concurrency + Parallelism

Поток - это последовательность инструкций внутри процесса и ведет себя как «процесс внутри процесса». Он отличается от процесса, потому что у него нет собственного блока управления процессом (сбор информации о процессах). Обычно в процессе создается несколько потоков. Потоки выполняются в процессе и процессы выполняются в ядре операционной системы.

Rust Parallelism

crate simd

Экосистема Rust поддерживает Parallelism в виде crate rayon и dpc-pariter, которые позволяют легко преобразовать последовательный итератор для выполнения в параллельных потоках.

Другой способ выполнить параллельную обработку данных без использования потоков — использование инструкций SIMD. Если алгоритм достаточно распараллеливаем, применение инструкций SIMD может значительно повысить производительность. Экосистема Rust обеспечивает базовую поддержку инструкций SIMD в виде crate packed_simd

Подсчитайте частоту букв в текстах с помощью параллельных вычислений.

Параллелизм - это параллельное выполнение вещей, которые также могут выполняться последовательно. Типичный пример - подсчет количества букв. Создайте функцию, которая возвращает общую частоту каждой буквы в списке текстов и использует параллелизм.

Просто вернуть значение из потока Joinhandle


use std::collections::HashMap;
use std::thread;
pub fn frequency(input: &[&str], worker_count: usize) -> HashMap {
    let mut result: HashMap = HashMap::new();
    let chunks = input.chunks((input.len() / worker_count).max(1));
    let mut handles = Vec::new();

    for chunk in chunks {
        let string = chunk.join("");
        // return a HashMap from each thread, the JoinHandle wraps this hashmap
        let handle = thread::spawn(move || {
            let mut map: HashMap = HashMap::new();
            for c in string.chars().filter(|c| c.is_alphabetic()) {
                *map.entry(c.to_ascii_lowercase()).or_default() += 1;
            }
            map
        });
        handles.push(handle);
    }
    for handle in handles {
        let map = handle.join().unwrap();
        for (key, value) in map {
            *result.entry(key).or_default() += value;
        }
    }
    result
}
fn main() {
   let content:&str = "abcafdabc";
   let v: Vec<&str>   = content.split("").filter(|s|s.len()>0).collect();// ["a","b"]
   println!("{:#?}", frequency(&v[..],3));
}

Подсчитайте частоту букв в текстах с помощью параллельных вычислений.

Параллелизм - это параллельное выполнение вещей, которые также могут выполняться последовательно. Типичный пример - подсчет количества букв. Создайте функцию, которая возвращает общую частоту каждой буквы в списке текстов и использует параллелизм.

Через каналы mpsc::channel

use std::collections::HashMap;
use std::mem;
use std::sync::mpsc;
use std::thread;
pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
    let mut result: HashMap<char, usize> = HashMap::new();
    let chunks = input.chunks((input.len() / worker_count).max(1));
    let (sender, receiver) = mpsc::channel();
    for chunk in chunks {
        let sender = sender.clone();
        let string = chunk.join("");
        thread::spawn(move || {
            // Solve each chunk and send the resulting HashMap down the channel
            let mut map: HashMap<char, usize> = HashMap::new();
            for c in string.chars().filter(|c| c.is_alphabetic()) {
                *map.entry(c.to_ascii_lowercase()).or_default() += 1;
            }
            sender.send(map).unwrap();
        });
    }
     // Если не удалить sender то receiver будет ждать, пока все senders не будут отброшены
    // исходный receiver никогда не отбрасывается, так что receiver ждет вечно
    // удалить оригинал sender, иначе канал останется открытым, заставляя получателя бесконечно ждать
    mem::drop(sender);
    // combine every received HashMap
    for received in receiver {
        for (key, value) in received {
            *result.entry(key).or_default() += value;
        }
    }
    result
}

Подсчитайте частоту букв в текстах с помощью параллельных вычислений.

Параллелизм - это параллельное выполнение вещей, которые также могут выполняться последовательно. Типичный пример - подсчет количества букв. Создайте функцию, которая возвращает общую частоту каждой буквы в списке текстов и использует параллелизм.

Через Mutex

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
pub fn frequency(input: &[&str], worker_count: usize) -> HashMap<char, usize> {
    let result = Arc::new(Mutex::new(HashMap::new()));
    let chunks = input.chunks((input.len() / worker_count).max(1));
    let mut handles: Vec<_> = Vec::new();
    for chunk in chunks {
        let string = chunk.join("");
        let result = Arc::clone(&result);
        let handle = thread::spawn(move || {
            let mut map: HashMap<char, usize> = HashMap::new();
            for c in string.chars().filter(|c| c.is_alphabetic()) {
                *map.entry(c.to_ascii_lowercase()).or_default() += 1;
            }
            let mut result = result.lock().unwrap();
            for (key, value) in map {
                *result.entry(key).or_default() += value;
            }
        });
        handles.push(handle);
    }
    // wait for each thread to finish
    for handle in handles {
        handle.join().unwrap()
    }
    // получить HashMap из Arc<Mutex<HashMap>>
    Arc::try_unwrap(result).unwrap().into_inner().unwrap()
}

rayon::ThreadPoolBuilder

Пример устойчивой логики кеширования к недетерминированности

Пример пула потоков

rust_five_easy_pieces

rust-concurrency-five-easy-pieces

Более эффективное решение для проверки кеша - ввести еще одну часть общих данных, и на этот раз соединить ее с условной переменной для сигнализации, что мы и сделаем в пятом и последнем примере.

Дополнением в этом раунде является Condvar и еще одна часть общих данных, «состояние кеша», которое сообщает нам для каждого ключа, готов ли кеш для чтения. Если кеш не готов для данного ключа, рабочий поток блокирует condvar до тех пор, пока он не будет активирован. Это делает нашу логику кеширования устойчивой к недетерминированности, присущей параллелизму рабочих потоков: рабочий либо находит кеш «готовым» для данного ключа, либо нет. Если нет, он ждет, пока он станет «готовым».

Это означает, что если работник «выполняет работу», связанную с данным ключом, другие рабочие, выполняющие несвязанную работу (для других ключей), могут просто выполнять свою работу параллельно, в то время как работник, выполняющий связанную работу, обнаружит, что соответствующее состояние кеша будет « не готов », и просто дождитесь, пока он станет« готовым ». Мы избегаем потери параллелизма, которая может быть вызвана включением критического раздела в кэше «выполнение работы», сохраняя при этом детерминированный результат в случае, если работник уже «выполняет связанную работу».

#[test]
fn fifth() {
    enum WorkMsg {
        Work(u8),
        Exit,
    }

    #[derive(Debug, Eq, PartialEq)]
    enum WorkPerformed {
        FromCache,
        New,
    }

    #[derive(Debug, Eq, PartialEq)]
    enum CacheState {
        Ready,
        WorkInProgress,
    }

    enum ResultMsg {
        Result(u8, WorkPerformed),
        Exited,
    }

    #[derive(Eq, Hash, PartialEq)]
    struct CacheKey(u8);

    let (work_sender, work_receiver) = unbounded();
    let (result_sender, result_receiver) = unbounded();
    let (pool_result_sender, pool_result_receiver) = unbounded();
    let mut ongoing_work = 0;
    let mut exiting = false;
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(2)
        .build()
        .unwrap();
    let cache: Arc<Mutex<HashMap<CacheKey, u8>>> = Arc::new(Mutex::new(HashMap::new()));

    // A new `cache_state` shared piece of data, indicating whether for a given key,
    // the cache is ready to be read from.
    let cache_state: Arc<Mutex<HashMap<CacheKey, Arc<(Mutex<CacheState>, Condvar)>>>> =
        Arc::new(Mutex::new(HashMap::new()));

    let _ = thread::spawn(move || loop {
        select! {
            recv(work_receiver) -> msg => {
                match msg {
                    Ok(WorkMsg::Work(num)) => {
                        let result_sender = result_sender.clone();
                        let pool_result_sender = pool_result_sender.clone();
                        let cache = cache.clone();
                        let cache_state = cache_state.clone();
                        ongoing_work += 1;
                        pool.spawn(move || {
                            let num = {
                                let (lock, cvar) = {
                                    // Start of critical section on `cache_state`.
                                    let mut state_map = cache_state.lock().unwrap();
                                    &*state_map
                                        .entry(CacheKey(num.clone()))
                                        .or_insert_with(|| {
                                            Arc::new((
                                                Mutex::new(CacheState::Ready),
                                                Condvar::new(),
                                            ))
                                        })
                                        .clone()
                                    // End of critical section on `cache_state`.
                                };

                                // Start of critical section on `state`.
                                let mut state = lock.lock().unwrap();

                                // Note: the `while` loop is necessary
                                // for the logic to be robust to spurious wake-ups.
                                while let CacheState::WorkInProgress = *state {
                                    // Block until the state is `CacheState::Ready`.
                                    //
                                    // Note: this will atomically release the lock,
                                    // and reacquire it on wake-up.
                                    let current_state = cvar
                                        .wait(state)
                                        .unwrap();
                                    state = current_state;
                                }

                                // Here, since we're out of the loop,
                                // we can be certain that the state is "ready".
                                assert_eq!(*state, CacheState::Ready);

                                let (num, result) = {
                                    // Start of critical section on the cache.
                                    let cache = cache.lock().unwrap();
                                    let key = CacheKey(num);
                                    let result = match cache.get(&key) {
                                        Some(result) => Some(result.clone()),
                                        None => None,
                                    };
                                    (key.0, result)
                                    // End of critical section on the cache.
                                };

                                if let Some(result) = result {
                                    // We're getting a result from the cache,
                                    // send it back,
                                    // along with a flag indicating we got it from the cache.
                                    let _ = result_sender.send(ResultMsg::Result(result, WorkPerformed::FromCache));
                                    let _ = pool_result_sender.send(());

                                    // Don't forget to notify the waiting thread,
                                    // if any, that the state is ready.
                                    cvar.notify_one();
                                    return;
                                } else {
                                    // If we didn't find a result in the cache,
                                    // switch the state to in-progress.
                                    *state = CacheState::WorkInProgress;
                                    num
                                }
                                // End of critical section on `state`.
                            };

                            // Do some "expensive work", outside of any critical section.

                            let _ = result_sender.send(ResultMsg::Result(num.clone(), WorkPerformed::New));

                            {
                                // Start of critical section on the cache.
                                // Insert the result of the work into the cache.
                                let mut cache = cache.lock().unwrap();
                                let key = CacheKey(num.clone());
                                cache.insert(key, num);
                                // End of critical section on the cache.
                            }

                            let (lock, cvar) = {
                                let mut state_map = cache_state.lock().unwrap();
                                &*state_map
                                    .get_mut(&CacheKey(num))
                                    .expect("Entry in cache state to have been previously inserted")
                                    .clone()
                            };
                            // Re-enter the critical section on `state`.
                            let mut state = lock.lock().unwrap();

                            // Here, since we've set it earlier,
                            // and any other worker would wait
                            // on the state to switch back to ready,
                            // we can be certain the state is "in-progress".
                            assert_eq!(*state, CacheState::WorkInProgress);

                            // Switch the state to ready.
                            *state = CacheState::Ready;

                            // Notify the waiting thread, if any, that the state has changed.
                            // This can be done while still inside the critical section.
                            cvar.notify_one();

                            let _ = pool_result_sender.send(());
                        });
                    },
                    Ok(WorkMsg::Exit) => {
                        exiting = true;
                        if ongoing_work == 0 {
                            let _ = result_sender.send(ResultMsg::Exited);
                            break;
                        }
                    },
                    _ => panic!("Error receiving a WorkMsg."),
                }
            },
            recv(pool_result_receiver) -> _ => {
                if ongoing_work == 0 {
                    panic!("Received an unexpected pool result.");
                }
                ongoing_work -=1;
                if ongoing_work == 0 && exiting {
                    let _ = result_sender.send(ResultMsg::Exited);
                    break;
                }
            },
        }
    });

    let _ = work_sender.send(WorkMsg::Work(0));
    let _ = work_sender.send(WorkMsg::Work(1));
    let _ = work_sender.send(WorkMsg::Work(1));
    let _ = work_sender.send(WorkMsg::Exit);

    let mut counter = 0;

    // A new counter for work on 1.
    let mut work_one_counter = 0;

    loop {
        match result_receiver.recv() {
            Ok(ResultMsg::Result(num, cached)) => {
                counter += 1;

                if num == 1 {
                    work_one_counter += 1;
                }

                // Now we can assert that by the time
                // the second result for 1 has been received,
                // it came from the cache.
                if num == 1 && work_one_counter == 2 {
                    assert_eq!(cached, WorkPerformed::FromCache);
                }
            }
            Ok(ResultMsg::Exited) => {
                assert_eq!(3, counter);
                break;
            }
            _ => panic!("Error receiving a ResultMsg."),
        }
    }
}
//! From https://doc.rust-lang.org/book/ch20-03-graceful-shutdown-and-cleanup.html
#![allow(unused_variables)]
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

enum Message {
    NewJob(Job),
    Terminate,
}
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Message>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool { workers, sender }
    }
    pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static {
        let job = Box::new(f);
        self.sender.send(Message::NewJob(job)).unwrap();
    }
}
impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Sending terminate message to all workers.");
        for _ in &mut self.workers {
            self.sender.send(Message::Terminate).unwrap();
        }
        println!("Shutting down all workers.");
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();
            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);
                    break;
                }
            }
        });
        Worker { id, thread: Some(thread) }
    }
}

crate threadpool используется для создания пула потоков (thread pool) — механизма управления группой потоков для выполнения задач. Это позволяет эффективно управлять многопоточностью, избегая затрат на создание и завершение потоков для каждой новой задачи. Вместо этого задачи ставятся в очередь, а выделенные потоки из пула обрабатывают их по мере освобождения.

Основные преимущества threadpool

Повышение производительности: Потоки создаются один раз, и переиспользуются для выполнения задач, что экономит ресурсы.

Управление количеством потоков: Пул ограничивает количество одновременно работающих потоков, предотвращая избыточное использование ресурсов.

Простота использования: Удобный API для отправки задач в пул.

Снижение накладных расходов: Нет необходимости создавать новые потоки при запуске каждой задачи.

Rust имеет встроенную поддержку собственных потоков в виде модуля std::thread стандартной библиотеки.

crate crossbeam также обеспечивает реализацию потоков с ограниченной областью действия, которые позволяют заимствовать значения из стека. Они также доступны в виде std::thread::scope

Выполняемая программа Rust состоит из набора собственных потоков ОС, каждый из которых имеет собственный стек и локальное состояние. Фатальные логические ошибки в Rust вызывают панику потока , во время которой поток раскручивает стек, запуская деструкторы и освобождая принадлежащие ему ресурсы. Порожденный поток может пережить или не пережить порождающий поток, если только порождающий поток не является основным потоком.

Rust имеет встроенную поддержку собственных потоков в виде std::thread модуля стандартной библиотеки.

Традиционно потоки используются для решения проблем, связанных с процессором(когда мало ядер), поскольку они позволяют выполнять задачи параллельно. Однако на практике потоки также часто используются для решения проблем, связанных с вводом-выводом, особенно когда асинхронный ввод-вывод не поддерживается должным образом (что верно для библиотеки Rust std на данный момент).

Процессы: Процесс — это программа, которая выполняется. У него есть свой стек памяти, регистры для переменных и код.

Потоки: Поток — это легковесный процесс, который управляется независимо планировщиком. Однако он делится данными с другими потоками и main программой.

Связь между потоками может осуществляться через каналы, типы передачи сообщений Rust, а также другие формы синхронизации потоков и структуры данных с разделяемой памятью.

В частности, типы, которые гарантируют, что они являются потокобезопасными, легко разделяются между потоками, использующими контейнер с атомно-ориентированным отсчетом Arc

std::thread::Thread

id() - id потока

name() - имя потока

unpark() - разблокирует поток


std::thread::Builder

name() - Задает имя потоку для целей идентификации.

new() - Создает базовую конфигурацию

spawn() - Создает новый поток

spawn_scoped() - Создает новый поток с заданной областью действия, авто-присоединение к порожденному

spawn_unchecked()

stack_size()


std::thread::JoinHandle

is_finished()

join() - Ожидает завершения связанного потока.

thread()->&Thread - Извлекает дескриптор базового потока


use std::thread;
use std::thread::JoinHandle;
fn main() {
    let handler:JoinHandle = thread::Builder::new()
        .name("thread1".into())
        .stack_size(2000000) // Размер стека по умолчанию 2 MB
        .spawn(move || {
            let thread:thread::Thread = thread::current();
            assert_eq!(thread.name(), Some("thread1"));
            println!("ID:{:?}",thread.id());
            1_i32
    }).unwrap();
   let res:i32 = handler.join().unwrap();
}

Function std::thread::spawn - Создает новый поток


use std::thread;
use std::thread::JoinHandle;
fn main() {
    let handler:JoinHandle<()> = thread::spawn(move || {
        println!("ID:{:?}",thread::current().id());
    });
}

Как средство изоляции panic!


fn main(){
    let result = std::panic::catch_unwind(||
      if *guard == 7{
        panic!("Aaaaa");
      }
    ).map_err(|e|{
        println!("PANICKED thread:{:?}",thread::current().name())
    }); 
}
  • std::thread::available_parallelism() - Возвращает оценку степени параллелизма по умолчанию
  • std::thread::current() - Возвращает дескриптор потока, который вызывает его.
  • std::thread::panicking() - Определяет, освобождается ли текущий поток из-за паники.
  • std::thread::park() - Блокирует, если или пока токен текущего потока не будет доступен.
  • std::thread::park_timeout() - Блокирует, если или до тех пор, пока токен текущего потока не будет доступен или не будет достигнута указанная продолжительность (может просыпаться ложно).
  • std::thread::sleep() - Заставляет текущий поток спать в течение указанного времени.
  • std::thread::spawn() - Создает новый поток, возвращая для него JoinHandle.
  • std::thread::yield_now() - Совместно отказывается от времени на планировщик ОС.
  • std::thread::scope() - Создает новый поток с заданной областью действия, авто-присоединение к порожденному

use std::thread;
use std::thread::JoinHandle;
fn main() -> std::io::Result<()>{
    let handler:JoinHandle = thread::spawn(move || {
        thread::sleep(std::time::Duration::from_millis(1));
        println!("ID:{:?}",thread::current().id());
        1
    });
    let join:Result> = handler.join();
    assert_eq!(1,join.unwrap());

    let count = thread::available_parallelism()?.get();
    println!("ID:{:?}",thread::current().id());
    println!("степень параллелизма по умолчанию:{count}");// 12
    Ok(())
}

Rust запрещает передавать ссылку внутрь потока

thread::scope - поток с ограниченной областью действия

В отличие от потоков без ограниченной области действия thread::spawn, потоки с ограниченной областью действия thread::scope могут заимствовать не только 'static, поскольку область действия гарантирует, что все потоки будут объединены в конце области действия».

Отлично! Это значит, что вам не нужно использовать .join() ни то, ни другое, потому что потоки будут автоматически объединены в конце области действия.


fn main() {
    let v:Vec = vec![1,2];
    // По умолчанию лямбда захватывает значение по ссылке
    // но поток может пережить ф-цию main и мы можем обратится к не существующей памяти вектора
    thread::spawn(|| {   
            println!("{:?}",v);
     }).join();
    // решение move захватывает переменную по значению, передается ссылка на вектор а буффер остается на месте не копируется каждый раз 
    thread::spawn(move || {   
            println!("{:?}",v);
     }).join();
    // ... тут вектора уже нет
}

Обычные потоки не могут заимствовать данные из своего окружения. Однако для этого вы можете использовать поток с ограниченной областью действия thread::scope


use std::thread;
fn main() {
    let s = String::from("Hello");

    thread::scope(|scope| {
        scope.spawn(|| {
            println!("Length: {}", s.len());
        });
    });
}

Создает новый поток с заданной областью действия, авто-присоединение к порожденному потоку

std::thread::Builder::spawn_scoped()

std::thread::scope()

Без надоюности собирать все JoinHandle и вызывать им join


use std::{thread::{self,JoinHandle}};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; 
use std::str::FromStr;
fn main(){
    let arc_mutex:Arc> = Arc::new(Mutex::new(0));
    let mut handles:Vec> = Vec::new();
    for n_thread in 1..=10 {
        let mutex_clone:Arc> = Arc::clone(&arc_mutex);
        thread::scope(|s| { 
           let scoped_h:thread::ScopedJoinHandle<'_,()> = thread::Builder::new()
                .name(format!("{n_thread}"))
                .spawn_scoped(s,move || {
                    // try_lock не ожидет получения блокировки Mutex
                    if let Ok(ref mut guard) = mutex_clone.try_lock(){
                        **guard += 1;
                    }
            }).unwrap();
        }); 
    }
    println!("{}",*arc_mutex.lock().unwrap()); // после thread::scope данные сразу доступны для развопачивания
}

Function std::thread::panicking() Определяет, освобождается ли текущий поток из-за паники


struct SomeStruct;
impl Drop for SomeStruct {
    fn drop(&mut self) {
        if thread::panicking() {
            println!("dropped while unwinding");
        } else {
            println!("dropped while not unwinding");
        }
    }
}
fn main(){
    {
        print!("a: ");
        let a = SomeStruct;
    }
    {
        print!("b: ");
        let b = SomeStruct;
        panic!()
    }
}

park_timeout()

Блокирует, если или до тех пор, пока токен текущего потока не будет доступен или не будет достигнута указанная продолжительность

use std::thread::park_timeout;
use std::time::{Instant, Duration};
fn main(){
    let timeout = Duration::from_secs(2);
    let beginning_park = Instant::now();
    let mut timeout_remaining = timeout;
    loop {
        park_timeout(timeout_remaining);
        let elapsed = beginning_park.elapsed();
        if elapsed >= timeout {
            break;
        }
        println!("restarting park_timeout after {:?}", elapsed);
        timeout_remaining = timeout - elapsed;
    }
}

Синхронизация

После того, как последний элемент будет обработан, главному потоку может потребоваться до одной целой секунды, что приведет к ненужной задержке в конце. Чтобы решить эту проблему, мы можем использовать парковку потоков, чтобы вывести основной поток из режима сна всякий раз, когда появляется новая информация, которая может его заинтересовать.

atomics

parking


fn main() { 
    let num_done = AtomicUsize::new(0); 
    пусть main_thread = thread::current(); 

    thread::scope(|s| { 
        // Фоновый поток для обработки всех 100 элементов. 
        s.spawn(|| { 
            for i in 0..100 { 
                process_item(i); // Предполагая, что это занимает некоторое время. 
                num_done. store(i + 1, Relaxed); 
                main_thread.unpark(); // Пробуждаем основной поток. 
            } 
        }); 

        // Главный поток показывает обновления статуса. 
        цикла { 
            let n = num_done.load(Relaxed); 
            if n == 100 {break; } 
            println!("Работаем.. {n}/100 выполнено");
            thread::park_timeout(Duration::from_secs(1)); 
        } 
    }); 
    println!("Готово!"); 
}

В std::thread_local! заключается механизм создания переменных, уникальных для каждого потока. Эти переменные хранятся в области памяти, которая доступна только текущему потоку. Это означает, что каждый поток имеет собственный экземпляр переменной, и изменения этой переменной в одном потоке не влияют на её значения в других потоках.

Разрешены публичность и атрибуты для каждой статики. Локальный объект потока инициализируется при первом использовании в потоке. И, как следует из названия, каждый поток будет получать новую копию, независимую от других потоков.

std::thread_loca!

Struct std::thread::LocalKey Ключ локального хранилища потока, которому принадлежит его содержимое. Этот ключ использует самую быструю возможную реализацию, доступную ему для целевой платформы.

Он создается с помощью макроса thread_local!, а основным методом является метод with.

Метод with дает ссылку на содержащееся значение, которое не может быть послано через потоки или избежать данного закрытия. Метод with получает ссылку на значение в этом ключе TLS.

Потоковое локальное хранилище - это способ хранения данных в глобальной переменной, чтобы каждый поток в программе имел свою собственную копию. Темы не разделяют эти данные, поэтому обращения не требуют синхронизации. Локальный ключ потока владеет значением, которое он содержит, и будет уничтожать значение при выходе потока.


use std::thread;
use std::cell::RefCell;
 
thread_local! {
    // создание локальной глобальной переменной
        pub static FOO: RefCell = RefCell::new(1);
        #[allow(unused)]
        static BAR: RefCell = RefCell::new(1.0);
 }
fn main(){
    FOO.with(|f| {
        assert_eq!(*f.borrow(), 1);
        *f.borrow_mut() = 2;
    });
    //каждый thread начинается с начального значения 1 (локальный поток) так как каждый поток имеет свою собственную копию которая будет уничтожена при выходе потока
    thread::spawn(move|| {
        FOO.with(|f| {
            assert_eq!(*f.borrow(), 1);
            *f.borrow_mut() = 3;
        });
    });
// мы по прежнему имеем исходное значение 2, несмотря на дочерний thread
    FOO.with(|f| {
        assert_eq!(*f.borrow(), 2);
    });
   // test();
}
use std::thread::AccessError;
fn test() -> Result<(), AccessError>{
    FOO.try_with(|f| {
        assert_eq!(*f.borrow(), 1);
        *f.borrow_mut() = 2;
    })?;
    Ok(())
}

Для выполнения concurrency параллельных вычислений разработаны ряд математических моделей, в том числе сети Петри, исчисление процессов, модели параллельных случайных доступов к вычислениям и модели акторов.

Она обеспечивает высокую степень изоляции (Каждый актор управляет собственным состоянием. Это устраняет необходимость в использовании мьютексов или других механизмов синхронизации, что упрощает код и снижает вероятность гонок данных), масштабируемости (Акторы могут быть распределены по нескольким потокам или даже по нескольким узлам в распределённой системе. Модель легко масштабируется горизонтально.) и упрощает управление состоянием в многопоточном окружении.

Устойчивость (Resilience) - Модель акторов поддерживает механизм самовосстановления через супервизоры. Один актор может наблюдать за другими и перезапускать их в случае сбоя.Это повышает устойчивость системы к отказам и упрощает обработку ошибок.

Природная асинхронность - Отправка и получение сообщений являются асинхронными операциями. Сообщения ставятся в очередь, и актор обрабатывает их по одному. Это позволяет избежать состояния гонки, так как обработка сообщений выполняется последовательно.

Распределённость - Модель акторов подходит для распределённых систем, где акторам не нужно находиться на одном физическом устройстве.

Построение concurrency параллельных систем требует поиска надёжных методов координации выполняемых процессов, обмена данными, распределения памяти и планирования для минимизации времени отклика и увеличения пропускной способности.

Модель актера - еще одна очень распространенная и известная парадигма concurrency параллельного программирования. Он подходит для решения основных проблем Concurrency параллельной связи, поэтому многие языки приняли его в качестве основной парадигмы параллелизма (самой известной версией является Akka).

Rust имеет хорошо продуманную и реалистичную реализацию модели актора в форме crate actix. Прочтите его документацию и руководство, чтобы ознакомиться с его концепциями и использованием.

Кроме того, crate actix-web обеспечивает чрезвычайно быструю и многофункциональную клиентскую / серверную HTTP-реализацию и веб-инфраструктуру.

Поскольку actix предоставляет как синхронизирующие актеры, так и асинхронные актеры, которые делают его очень хорошим как «клей» для объединения миров синхронного / асинхронного, что является серьезной проблемой в настоящее время Rust (потому что многие библиотеки действуют только синхронно).

Основная идея актера — создать автономную задачу, выполняющую некоторую работу независимо от других частей программы. Обычно эти субъекты общаются с остальной частью программы посредством использования каналов передачи сообщений. Поскольку каждый актер работает независимо, программы, разработанные с его использованием, естественно, параллельны.

Актеры - это объекты, которые инкапсулируют состояние и поведение, они общаются исключительно путем обмена сообщениями. Актеры Actix реализованы поверх Токио . Несколько участников могут работать в одном потоке. Актеры могут работать в нескольких потоках с поддержкой Arbiter. Актеры обмениваются введенными сообщениями.

Trait actix::Actor Methods

  • create()-> Addr<Self> - Запустите нового асинхронного актера с учетом actix::Context
  • start()-> Addr<Self> - Запустите нового асинхронного актера, вернув его адрес.
  • start_default()-> Addr<Self> - Создайте и запустите нового асинхронного актера, возвращающего его адрес
  • start_in_arbiter()-> Addr<Self> - Запустите нового актера в треде арбитра.
  • started() Обработчик события вызова актер при первом опрашивании.
  • stopped() Обработчик события срабатывает после остановки актера.
  • stopping() Обработчик события при попадании в состояние Actor::Stopping

Struct actix::Addr

  • connected()->bool - жив ли актер.
  • downgrade() -> WeakAddr<A>
  • new()-> Addr<A>
  • recipient()-> Recipient<M> - Возвращает Получателя для определенного типа сообщения.
  • send() - Отправляет асинхронное сообщение и ожидает ответа. Канал связи с актером ограничен.
  • try_send() - Пытается отправить сообщение. Этот метод не работает, если почтовый ящик актера заполнен или закрыт.
  • do_send() - Отправляет сообщение безоговорочно, игнорируя любые потенциальные ошибки. Сообщение всегда находится в очереди, даже если почтовый ящик получателя заполнен. Если почтовый ящик закрыт, сообщение удаляется автоматически.

Struct actix::Context

Позволяет субъекту определять свой собственный адрес, изменять ограничения почтового ящика или останавливать его выполнение.

  • connected()-> bool - подключены ли какие-либо адреса.
  • handle()-> SpawnHandle - handle running future, AsyncContext::spawn()
  • into_future()-> ContextFut<A, Self>
  • new()->Self - Создайте контекст, не создавая его.
  • run()-> Addr<A>
  • with_receiver(rx: AddressReceiver<A>) -> Self
  • **set_mailbox_capacity() - Устанавливает емкость почтового ящика. Емкость почтового ящика по умолчанию составляет 16 сообщений.

impl<A> ActorContext for Context<A>

  • Context::stop() - В контексте выполнения актеров вы можете запретить актеру обрабатывать любые будущие сообщения почтового ящика.
  • Context::terminate() - Безоговорочно прекратить выполнение актера.
  • Context::state() -> ActorState Получить текущее состояние выполнения Актера.

impl<A> AsyncContext<A> for Context<A>

  • address()
  • cancel_future()
  • spawn()-> SpawnHandle
  • wait()
  • waiting()
  • add_message_stream()
  • add_stream()
  • notify()
  • notify_later()
  • run_interval()
  • run_later()

Struct actix::Addr

Получатель — это специализированная версия адреса, поддерживающая только один тип сообщения. Его можно использовать в случае, если сообщение необходимо отправить субъекту другого типа. Объект-получатель может быть создан из адреса с помощью Addr::recipient().

Для объектов адреса требуется тип субъекта, но если мы просто хотим отправить конкретное сообщение субъекту, который может обработать это сообщение, мы можем использовать интерфейс Recipient.

Например, получателя можно использовать для системы подписки.


use actix::prelude::*;

struct MyActor {
    val: usize,
}
impl Actor for MyActor {
    type Context = Context;
}

#[actix::main]
async fn main() {
    let addr:Addr = MyActor::create(|ctx: &mut Context| MyActor { val: 10 });
}

struct Actor1 {
    actor2_addr: Addr,
}
struct Actor2 {
    actor1_addr: Addr,
}
fn main(){
    let ctx1 = Context::::new();
    let ctx2 = Context::::new();

    let actor1 = Actor1 { actor2_addr: ctx2.address() };
    let actor2 = Actor2 { actor1_addr: ctx1.address() };
    ctx1.run(actor1);
    ctx2.run(actor2);
}

Состояние выполнения Актера меняется на stopping состояние в следующих ситуациях:

  • Context::stop вызывается самим актером
  • все обращения к актеру удаляются. т.е. ни один другой актер не ссылается на него.
  • в контексте не регистрируются объекты событий.

Актер может восстановиться из состояния stopping в состояние running, создав новый адрес или добавив объект события, а также вернув Running::Continue.

Struct actix::Arbiter

  • current()
  • handle()
  • join()
  • new()
  • spawn()
  • spawn_fn()
  • stop()
  • try_current()
  • with_tokio_rt()

Один Arbiter управляет одним потоком с одним пулом событий.

Когда арбитр порождает задачу (с помощью Arbiter::spawn, Context<Actor>::run_later или подобных конструкций), арбитр ставит задачу для выполнения в эту очередь задач.

Когда вы думаете Arbiter, вы можете подумать об «однопоточном цикле событий».

Actix в целом поддерживает параллелизм, но обычные Arbiter(не SyncArbiters) — нет. Чтобы использовать Actix одновременно, вы можете развернуть несколько Arbiters, используя Arbiter::new, ArbiterBuilder или Arbiter::start.

Struct actix::sync::SyncArbiter

  • start()
  • start_with_thread_builder()

SyncArbiter

SyncContext

Когда вы обычно запускаете Актеров, в потоке Арбитра системы работает несколько Актеров, используя его цикл событий. Однако для рабочих нагрузок, связанных с ЦП или высокопараллельных рабочих нагрузок, вам может потребоваться, чтобы актер запускал несколько экземпляров параллельно.

SyncArbite rпредоставляет ресурсы для одного актера синхронизации для работы в выделенном потоке или потоках. Обычно это используется для параллельных рабочих нагрузок, связанных с ЦП. Важно отметить, что генерируется SyncArbite rодин адрес для пула размещенных актеров синхронизации. Любое сообщение, отправленное на этот адрес, будет обрабатываться одним субъектом синхронизации из пула.

В отличие от Context, актер, использующий a, SyncContext не может быть остановлен вызовом stop или terminate: вместо этого они запускают перезапуск актера. Аналогично, возврат false из fn stopping не может предотвратить перезапуск или завершение работы Актера.

Простой пинг


use actix::prelude::*;
#[derive(Message)]
#[rtype(result = "Result")]
struct Ping(usize);
// impl Message for Ping { type Result = Result;}

struct MyActor {
    count: usize,
}
impl Actor for MyActor {
    type Context = Context;
    fn started(&mut self, ctx: &mut Context) {
        // Асинхронный актер может получить свой адрес из Context структуры. Контекст должен реализовать эту AsyncContext черту. AsyncContext::address() предоставляет адрес актера.
        let addr = ctx.address();
     }
     fn stopped(&mut self, ctx: &mut Context) {
        println!("Actor is stopped");
     }
}
impl Handler for MyActor {
    type Result = Result;

    fn handle(&mut self, msg: Ping, _: &mut Context) -> Self::Result {
        self.count += msg.0;
        Ok(self.count)
    }
}
#[actix::main]
async fn main() {
    let addr:Addr = MyActor::create(|ctx| {
        // теперь мы можем получить адрес первого актера
        let addr:Addr = ctx.address();

        // start new actor
        let my_actor = MyActor { count: 10 };
        my_actor
    });
    // или
    // start new actor
    //let addr = MyActor { count: 10 }.start();

    // send message and get future for result
    let res = addr.send(Ping(10)).await;

    // handle() returns tokio handle
    println!("RESULT: {}", res.unwrap().unwrap() == 20);

    // stop system and exit
    System::current().stop();
}

Кастомный тип возврата у сообщения


use actix::prelude::*;
use actix::dev::{MessageResponse, OneshotSender};
#[derive(Message)]
#[rtype(result = "Responses")]
struct Ping(usize);
// impl Message for Ping { type Result = Responses;}

#[derive(PartialEq)]
enum Responses {
    Value(usize),
    NotValue,
}
impl MessageResponse for Responses
where
    A: Actor,
    M: Message,
{
    fn handle(self, ctx: &mut A::Context, tx: Option>) {
        if let Some(tx) = tx {
            tx.send(self);
        }
    }
}
struct MyActor {
    count: usize,
}
impl Actor for MyActor {
    type Context = Context;
    fn started(&mut self, ctx: &mut Context) {
        println!("Actor is alive");
     }
     fn stopped(&mut self, ctx: &mut Context) {
        println!("Actor is stopped");
     }
}
impl Handler for MyActor {
    type Result = Responses;

    fn handle(&mut self, msg: Ping, _: &mut Context) -> Self::Result {
        self.count += msg.0;
        Responses::Value(self.count)
    }
}

#[actix::main]
async fn main() {
    let addr:Addr = MyActor::create(|ctx| {
        // теперь мы можем получить адрес первого актера
        let addr:Addr = ctx.address();

        // start new actor
        let my_actor = MyActor { count: 10 };
        my_actor
    });
    // или
    // start new actor
    //let addr = MyActor { count: 10 }.start();

    // send message and get future for result
    let res = addr.send(Ping(10)).await;

    // handle() returns tokio handle
    println!("RESULT: {}", res.unwrap() == Responses::Value(20));

    // stop system and exit
    System::current().stop();
}

use actix::prelude::*;
use std::time::Duration;
#[derive(Message)]
#[rtype(result = "()")]
struct Ping {
    pub id: usize,
}
// Actor definition
struct Game {
    counter: usize,
    name: String,
    recipient: Recipient,
}
impl Actor for Game {
    type Context = Context;
}
// простой обработчик сообщений для сообщения Ping
impl Handler for Game {
    type Result = ();
    fn handle(&mut self, msg: Ping, ctx: &mut Context) {
        self.counter += 1;
        if self.counter > 10 {
            System::current().stop();
        } else {
            println!("[{0}] Ping received {1}", self.name, msg.id);
            // wait 100 nanoseconds
            ctx.run_later(Duration::new(0, 100), move |act, _| {
                act.recipient.do_send(Ping { id: msg.id + 1 });
            });
        }
    }
}
fn main() {
    let mut system = System::new();
    // Чтобы получить объект Recipient, нам нужно использовать другой метод построителя что позволит отложить создание актера
    let addr = system.block_on(async {
        Game::create(|ctx| {
            // теперь мы можем получить адрес первого актера и создать второго актера
            let addr = ctx.address();
            let addr2 = Game {
                counter: 0,
                name: String::from("Game 2"),
                recipient: addr.recipient(),
            }
            .start();
            // давай начнем пинги
            addr2.do_send(Ping { id: 10 });
            // теперь мы наконец можем создать первого актера
            Game {
                counter: 0,
                name: String::from("Game 1"),
                recipient: addr2.recipient(),
            }
        });
    });
    system.run();
}

Актеры из Tokio (без библиотеки Actix)


use tokio::sync::{oneshot, mpsc};
struct MyActor {
    receiver: mpsc::Receiver,
    next_id: u32,
}
enum ActorMessage {
    GetUniqueId {
        respond_to: oneshot::Sender,
    },
}
impl MyActor {
    fn new(receiver: mpsc::Receiver) -> Self {
        MyActor {
            receiver,
            next_id: 0,
        }
    }
    fn handle_message(&mut self, msg: ActorMessage) {
        match msg {
            ActorMessage::GetUniqueId { respond_to } => {
                self.next_id += 1;

                // The `let _ =` ignores any errors when sending.
                //
                // This can happen if the `select!` macro is used
                // to cancel waiting for the response.
                let _ = respond_to.send(self.next_id);
            },
        }
    }
}
async fn run_my_actor(mut actor: MyActor) {
    while let Some(msg) = actor.receiver.recv().await {
        actor.handle_message(msg);
    }
}

#[derive(Clone)]
pub struct MyActorHandle {
    sender: mpsc::Sender,
}

impl MyActorHandle {
    pub fn new() -> Self {
        let (sender, receiver) = mpsc::channel(8);
        let actor = MyActor::new(receiver);
        tokio::spawn(run_my_actor(actor));

        Self { sender }
    }
    pub async fn get_unique_id(&self) -> u32 {
        let (send, recv) = oneshot::channel();
        let msg = ActorMessage::GetUniqueId {
            respond_to: send,
        };
        // Ignore send errors. If this send fails, so does the
        // recv.await below. There's no reason to check the
        // failure twice.
        let _ = self.sender.send(msg).await;
        recv.await.expect("Actor task has been killed")
    }
}
fn main(){
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let h = MyActorHandle::new();

        let id:u32 = h.get_unique_id().await;
        assert_eq!(1_u32,id);
        let id:u32 = h.get_unique_id().await;
        assert_eq!(2_u32,id);
        let id:u32 = h.get_unique_id().await;
        assert_eq!(3_u32,id);
    }); 
}

1. Задача суммировать числа параллельно

только SyncContext для MyActorTask это делает

вариант Context для MyActorTask синхронно работает

use actix::prelude::*;
use std::time::{Instant};
struct MyActorTask {
    id:usize,
    //data:String,
    recipient: Recipient<ResultMessage>
}
impl Actor for MyActorTask {
    type Context = Context<Self>;
    fn started(&mut self, ctx: &mut Context<Self>) {
        println!("MyActorTask started");
    }
}
#[derive(Message)]
#[rtype(result = "()")]
struct TaskMessage{id:usize,payload:String}
impl Handler<TaskMessage> for MyActorTask {
    type Result = ();
    fn handle(&mut self, msg: TaskMessage, _ctx: &mut Context<Self>) -> Self::Result {
        let result:usize = msg.payload
        .chars()
        .map(|c| c.to_digit(10).expect("should be a digit") as usize)
        .sum();
         std::thread::sleep(std::time::Duration::from_secs(1));
        let _ = self.recipient.do_send(ResultMessage{id:msg.id,sum:result});
    }
}

struct MyActorResult {
    sum: usize,
    finish_len: usize,
    state_len: usize,
    start:Instant
}
impl Actor for MyActorResult {
    type Context = Context<Self>;
    fn stopped(&mut self, ctx: &mut Context<Self>) {
        println!("MyActorResult:{}",self.sum);
    }
}
#[derive(Message,Debug)]
#[rtype(result = "()")]
struct ResultMessage{
    id:usize,
    sum:usize,
}
impl Handler<ResultMessage> for MyActorResult {
    type Result = ();

    fn handle(&mut self, msg: ResultMessage, _ctx: &mut Context<Self>) -> Self::Result {
        println!("{:?}",&msg);
        self.sum+=msg.sum;
        self.state_len+=1;

        if self.state_len==self.finish_len{
           println!("MyActorResult is stopped Millis:{}",self.start.elapsed().as_millis());// Millis:8002
           System::current().stop();  
        }  
    }
}

2. Задача суммировать числа параллельно

только SyncContext для MyActorTask это делает

вариант Context для MyActorTask синхронно работает

use actix::prelude::*;
use std::time::{Instant};
struct MyActorTask {
    id:usize,
    recipient: Recipient<ResultMessage>
}
impl Actor for MyActorTask {
    type Context = SyncContext<Self>;
    fn started(&mut self, ctx: &mut SyncContext<Self>) {
        //println!("MyActorTask started");
    }
}
#[derive(Message)]
#[rtype(result = "()")]
struct TaskMessage{id:usize,payload:String}
impl Handler<TaskMessage> for MyActorTask {
    type Result = ();
    fn handle(&mut self, msg: TaskMessage, _ctx: &mut SyncContext<Self>) -> Self::Result {
        //let thread:std::thread::Thread = std::thread::current();println!("{:?}",thread.name());
        let result:usize = msg.payload
        .chars()
        .map(|c| c.to_digit(10).expect("should be a digit") as usize)
        .sum();
        std::thread::sleep(std::time::Duration::from_secs(1));
        let _ = self.recipient.do_send(ResultMessage{id:msg.id,sum:result});
    }
}

struct MyActorResult {
    sum: usize,
    finish_len: usize,
    state_len: usize,
    start:Instant
}
impl Actor for MyActorResult {
    type Context = Context<Self>;
    fn stopped(&mut self, ctx: &mut Context<Self>) {
        println!("MyActorResult:{}",self.sum);
    }
}
#[derive(Message,Debug)]
#[rtype(result = "()")]
struct ResultMessage{
    id:usize,
    sum:usize
}
#[derive(Message,Debug)]
#[rtype(result = "usize")]
struct TotalResultMessage;
impl Handler<ResultMessage> for MyActorResult {
    type Result = ();
    fn handle(&mut self, msg: ResultMessage, _ctx: &mut Context<Self>) -> Self::Result {
        println!("{:?}",&msg);
        self.sum+=msg.sum;
        self.state_len+=1;
        if self.state_len==self.finish_len{
           println!("MyActorResult is stopped Millis:{}",self.start.elapsed().as_millis());// Millis:1000
           System::current().stop();  
        }  
    }
}

3. Задача суммировать числа параллельно

только SyncContext для MyActorTask это делает

вариант Context для MyActorTask синхронно работает


// Executor System::block_on
fn main() {
    let sys = System::new();
    let addr = sys.block_on(async {
        let data = "86967897737416471853297327050364959
        11861322575564723963297542624962850
        70856234701860851907960690014725639
        38397966707106094172783238747669219
        52380795257888236525459303330302837
        58495327135744041048897885734297812
        69920216438980873548808413720956532
        16278424637452589860345374828574668";
        let chunked_data = data.split_whitespace();
        let count = 7;
      
        let addr_res = MyActorResult{
            sum:0,
            finish_len: count,
            state_len:0,
            start:Instant::now()
        }.start();
        
        let addr_task = MyActorTask {id:0,recipient: addr_res.clone().recipient() }.start();
    
        for (i, data_segment) in chunked_data.enumerate() {
            addr_task.do_send(TaskMessage{id:i,payload:data_segment.to_owned()});
        } 
        // stop system and exit
        //System::current().stop();
    });
    sys.run().unwrap();
}

// Executor Arbiter::spawn
fn _main() {
    let system = System::new();

    let execution = async {
        let data = "86967897737416471853297327050364959
        11861322575564723963297542624962850
        70856234701860851907960690014725639
        38397966707106094172783238747669219
        52380795257888236525459303330302837
        58495327135744041048897885734297812
        69920216438980873548808413720956532
        16278424637452589860345374828574668";

        let chunked_data = data.split_whitespace();
        let count = 7;
      
        let addr_res = MyActorResult{
            sum:0,
            finish_len: count,
            state_len:0,
            start:Instant::now()
        }.start();

        let addr_task = MyActorTask {id:0,recipient: addr_res.recipient()}.start();
        for (i, data_segment) in chunked_data.enumerate() {
            addr_task.do_send(TaskMessage{id:i,payload:data_segment.to_owned()});
        } 
    };
    Arbiter::current().spawn(execution);

    let _ = system.run(); 
}


// Executor System::block_on
fn _main() {
    let system = System::new();
    let execution = async {
        let data = "86967897737416471853297327050364959
        11861322575564723963297542624962850
        70856234701860851907960690014725639
        38397966707106094172783238747669219
        52380795257888236525459303330302837
        58495327135744041048897885734297812
        69920216438980873548808413720956532
        16278424637452589860345374828574668";
        let addr_res = MyActorResult{
            sum:0,
            finish_len: 8,
            state_len:0,
            start:Instant::now()
        }.start();
        let chunked_data = data.split_whitespace();
        let count = 8;
        let recipient = addr_res.clone().recipient();
        let addr_task = SyncArbiter::start(48, move|| MyActorTask {id:0,recipient: recipient.clone()});
        for (i, data_segment) in chunked_data.enumerate() {
            addr_task.do_send(TaskMessage{id:i,payload:data_segment.to_owned()});
        }
    };
    system.block_on(execution);
    let _ = system.run(); 
}
// Executor Arbiter::spawn
fn main() {
    let system = System::new();
    let execution = async {
        let data = "86967897737416471853297327050364959
        11861322575564723963297542624962850
        70856234701860851907960690014725639
        38397966707106094172783238747669219
        52380795257888236525459303330302837
        58495327135744041048897885734297812
        69920216438980873548808413720956532
        16278424637452589860345374828574668";
        let chunked_data = data.split_whitespace();
        let count = 8;
        // Получатель один в одном потоке и синхронный
        let addr_res = MyActorResult{
            sum:0,
            finish_len: 8,
            state_len:0,
            start:Instant::now(),
        }.start();
     
       let recipient = addr_res.clone().recipient();
       let addr_task = SyncArbiter::start(48, move|| MyActorTask {id:0,recipient: recipient.clone()});
       /*let addr_task = SyncArbiter::start_with_thread_builder(
        count, 
        move || std::thread::Builder::new().name("thread_1".into()).stack_size(2000000) ,
        move|| MyActorTask {id:0,recipient: addr_res.clone().recipient()});*/

        for (i, data_segment) in chunked_data.enumerate() {
            addr_task.do_send(TaskMessage{id:i,payload:data_segment.to_owned()});
        }
        //-----------------------------------------------------------------------------------------------------------
        /*for (i, data_segment) in chunked_data.enumerate() {
            let recipient =  addr_res.clone().recipient();
            //let addr_task = SyncArbiter::start(1, move|| MyActorTask {id:0,recipient: recipient.clone() });
            let addr_task = SyncArbiter::start_with_thread_builder(
                1, 
                move || std::thread::Builder::new().name(format!("thread_{}",i)).stack_size(2000000) ,
                move|| MyActorTask {id:0,recipient: recipient.clone()});
        
            addr_task.do_send(TaskMessage{id:i,payload:data_segment.to_owned()});
        }*/   
    };
    Arbiter::current().spawn(execution);
    let _ = system.run(); 
}

Вся поддержка гарантии безопасности в условиях многозадачности реализована в стандартной библиотеке Rust предоставляет два trait Send и Sync

Unsafe Маркеры ограничители (т.е. реализовывать их не безопасно): Sync можно безопасно передавать ссылки &T между потоками (т.е. к данным будет синхронизированный доступ, уникальный для мутаций) Send можно безопасно передавать значения T между потоками (т.е. они будут скопированны/склонированы)

Send это черта маркера (характеристика без API), которая обещает разработчикам безопасно отправлять (перемещать) в другой поток.

Sync это признак маркера, который обещает, что потоки могут безопасно обмениваться разработчиками через общую ссылку.

Почему существуют Send и Sync?

Send: Тип, реализующий трейт Send, указывает, что его значение может быть безопасно передано (перемещено) между потоками. Пример: Box<T> реализует Send, если T реализует Send.

Sync: Тип, реализующий трейт Sync, указывает, что ссылки на его значение могут быть безопасно переданы (поделены) между потоками. Пример: &T реализует Sync, если T реализует Sync.

Большинство типов являются Send

Прежде всего, важно понять, что большинство структур (или перечислений) Send: Многие типы реализуют Send, но не Sync (большинство типов Send, на самом деле)

Пример Send типов - Option, Vec, Cell, RefCell но они не Sync, потому что они могут быть безопасно отправлены между потоками, но не разделены между ними ссылкой.

  • любая структура, которая не содержит ссылки, может быть Send + 'static
  • любая структура, содержащая ссылки с нижним сроком жизни 'a, может быть Send + 'a

В результате вы обычно ожидаете, что любой Sync struct будет Send тоже, потому что Send - это такая простая панель для достижения (по сравнению с гораздо более сложной строкой Sync, которой требуется безопасная одновременная модификация из нескольких потоков).

Пример Sync типов: Arc, Mutex

Пример не Sync (!Sync): Cell, RefCell

Пример не Sync и не Send вместе:

  • необработанные указатели *mut T и *const T (поскольку у них нет средств защиты)
  • Rc (не имеет атомарность операций с данными)

Send + Sync (можно передавать между потоками и совместный доступ)

  • Примитивы: i8, i16, i32, i64, i128, isize
  • Примитивы: u8, u16, u32, u64, u128, usize
  • Примитивы: f32, f64, bool, char
  • Умные указатели: Box<T> (если T: Send + Sync)
  • Атомарные типы: AtomicBool, AtomicI8, ..., AtomicUsize
  • Синхронизация: Mutex<T>, RwLock<T>, Arc<T> (если T: Send + Sync)
  • Каналы: Sender<T>, Receiver<T> (если T: Send)
  • String, Vec<T> (если T: Send + Sync)
  • Option<T>, Result<T, E> (если T,E: Send + Sync)

!Send (нельзя передавать между потоками)

  • Ссылки: Rc<T> - подсчет ссылок не атомарный
  • Ссылки: &'static mut T - исключительная мутабельная ссылка
  • Селлы: Cell<T> (если T: !Copy)
  • Ссылки на локальные данные: &'a T где 'a не 'static
  • *mut T, *const T (сырые указатели)
  • MutexGuard<'a, T> - привязан к конкретному Mutex в потоке
  • dyn Trait (если Trait: !Send)

!Sync (нельзя передавать между потоками)

  • Клетки: Cell<T>, RefCell<T> - внутренняя мутабельность без синхронизации
  • Ссылки: Rc<T> - не атомарный подсчет ссылок
  • *mut T, *const T (сырые указатели)
  • UnsafeCell<T> - основа внутренней мутабельности
  • dyn Trait (если Trait: !Sync)

!Send + !Sync (полностью не thread-safe)

  • Ссылки с подсчетом: Rc<T> - основной пример!
  • Клетки: Cell<T>, RefCell<T> (если T: !Send)
  • *mut T, *const T (сырые указатели)
  • UnsafeCell<T> (если T: !Send)
  • dyn Trait (если Trait: !Send + !Sync)

use std::rc::Rc;
use std::cell::Cell;
use std::sync::Arc;
use std::sync::Mutex;
fn main(){
    // Send + Sync
    let atomic = Arc::new(5);
    let mutex = Mutex::new(42);

    // !Send
    let rc = Rc::new(5);
    let cell = Cell::new(42);

    // !Sync  
    let cell = Cell::new(42);
    let ref_cell = std::cell::RefCell::new(42);

    // !Send + !Sync
    let rc_cell = Rc::new(Cell::new(42));
}

Это ограничение гарантирует, что между задачами не будут передаваться такие "небезопасные" объекты, как заимствованные &T и управляемые указатели. Таким образом, доступ к составной структуре данных может быть осуществлён только через её собственное "корневое" значение, следовательно, никаких дополнительных блокировок или копирований не требуется для того, чтобы избежать состояния гонки за ресурсом (data races) при использовании структур такого типа.

Sync позволяет делиться значением между потоками.

Sync указывает компилятору, что использование переменных этого типа не приводит к небезопасной работе с памятью в многопоточной среде.

Например, совместное использование неизменяемых данных (тип Arc<T>) с помощью атомарного счетчика ссылок является потокобезопасным.

Реализуется автоматически если тип T Sync если &T Send т.е. T должен реализовывать Send, а Send также реализуется автоматически если компилятору это уместно !!!

impl<T> Send for Arc<T> where T: Send + Sync + ?Sized
impl<T> Sync for Arc<T> where T: Send + Sync + ?Sized

impl<T: ?Sized + Send> Send for Mutex<T>
impl<T: ?Sized + Send> Sync for Mutex<T>

Вот зачем мы оборачивали db в мьютекс: он делает вложенные данные Send и Sync — синхронизированными и отправляемыми в другой поток.

#![feature(negative_impls)]

struct SpecialThreadToken(u8);

impl !Send for SpecialThreadToken {}
impl !Sync for SpecialThreadToken {}

!Sync

Пример !Sync типа это Cell

Из-за свойств Cell/RefCell мутировать содержимое без разрешения то при совместном владении в разных потоках мы можем получить Гонку данных

nomicon/send-and-sync

#[derive(Debug, Clone)]
pub struct Point {
        x: Cell<i32>,
        y: Cell<i32>,
}
impl Point {
        pub fn new(x: Cell<i32>, y: Cell<i32>) -> Point {
            Point { x, y }
        }
        pub fn set_x(&mut self, x: i32) {
            self.x.set(x);
        }
        pub fn set_y(&mut self, y: i32) {
            self.y.set(y);
        }
        pub fn get_x(&self) -> i32 {
            self.x.get()
        }
}
#[test]
fn test() {
    let mut point: Point = Point::new(Cell::new(3), Cell::new(3));
    {
        let mut ref_point: &mut Point = &mut point;
        //let point_clone:Point_send = point.clone();
        crossbeam::scope(|scope_| {
            scope_
            .spawn(move || {
                //ref_point.x.set(0);
                //point_clone.x.set(0);
                ref_point.set_x(0);
                // println!("point_clone={:#?}",point_clone);
            })
            .join();
        });
    }
    thread::sleep_ms(50);
    // println!("point={:#?}",point);
    assert_eq!(point.get_x(), 0);
}

only Sync

Пример реализации типа onlysync. Небезопасная работа с необработанными указателями не реализует черты Send и Sync по умолчанию.


#[derive(Debug)]
pub struct OnlySync {
    pub field: *mut i32,
}
unsafe impl Sync for OnlySync {}
impl OnlySync {
    pub fn new() -> Arc> {
        Arc::new(Mutex::new(OnlySync { field: &mut 1 }))
    }
}
impl Drop for OnlySync {
    fn drop(&mut self) {}
}

fn main() {
    let mut onlySync: Arc> = OnlySync::new();

    use notsync::{self, Point as Point_send};
    let mut point: Point_send = Point_send::new(Cell::new(3), Cell::new(3));
    {
        let mut ref_point: &mut Point_send = &mut point;

        crossbeam::scope(|scope_| {
            scope_
                .spawn(move || {
                    //ref_point.x.set(0);
                    //point_clone.x.set(0);
                    ref_point.set_x(0);
                    // println!("point_clone={:#?}",point_clone);
                })
                .join();
        });
    }
    thread::sleep_ms(50);
    assert_eq!(point.get_x(), 0);
}

Parallelism

SIMD

rayon

rayon FAQ

rayon Example

packed_simd

syncbox Набор утилит для написания параллельного кода.

crate rayon является библиотекой Parallelism данных (data-parallelism) для Rust. Он чрезвычайно легкий и облегчает преобразование последовательного вычисления в параллельное. Упрощает преобразование последовательного итератора для выполнения в параллельных потоках.

Парралельные вычисления сильны когда характер операций предназначен больше для процессора т.е. арифметика сложить/вычесть/разделить, но если задачи характерны длительными операциями выделения памяти, парсинга то это для асинхронного программирования и выделения потоков

SIMD позволяет одновременно выполнять одну операцию над несколькими элементами данных, что повышает производительность для задач, таких как обработка массивов, мультимедиа и научные вычисления. Другой способ выполнить параллельную обработку данных без использования потоков - это использование инструкций SIMD. Если алгоритм достаточно распараллеливаемый, применение инструкций SIMD может значительно повысить производительность.

std::simd предоставляет удобный способ работы с SIMD (Single Instruction, Multiple Data) непосредственно через стандартную библиотеку Rust.

SIMD

Пример использования std::simd

Чтобы включить поддержку std::simd, необходимо использовать ночной выпуск Rust (nightly) и включить соответствующую экспериментальную функцию.


#![feature(portable_simd)] // Включение функции std::simd
use std::simd::{Simd, SimdPartialEq, SimdFloat};
fn main() {
    // Создаем SIMD-векторы с 4 элементами типа f32
    let a = Simd::::from_array([1.0, 2.0, 3.0, 4.0]);
    let b = Simd::::from_array([5.0, 6.0, 7.0, 8.0]);

    // Арифметические операции
    let sum = a + b;
    let product = a * b;

    println!("Sum: {:?}", sum); // Вывод: Simd([6.0, 8.0, 10.0, 12.0])
    println!("Product: {:?}", product); // Вывод: Simd([5.0, 12.0, 21.0, 32.0])

    // Сравнения
    let comparison = a.lanes_eq(b);
    println!("Comparison: {:?}", comparison); // Вывод: Mask([false, false, false, false])

    // Логические операции
    let mask = a.lanes_gt(Simd::splat(2.0)); // > 2.0
    println!("Mask: {:?}", mask); // Вывод: Mask([false, false, true, true])

    // Условные выборки
    let result = mask.select(a, b);
    println!("Select: {:?}", result); // Вывод: Simd([5.0, 6.0, 3.0, 4.0])
}

SIMD (Single Instruction, Multiple Data) — это когда одна инструкция процессора работает не с одним числом, а с целым вектором.

Представь, что обычные регистры (rax, rcx) — это стаканы. А SIMD-регистры (ymm в архитектуре AVX) — это длинные подносы, куда влезает сразу 8 чисел по 32 бита (int).

SIMD: 1 итерация = 8 сложений за раз.

#![allow(unused)]
fn main() {
use std::arch::x86_64::*; // Импортируем команды для x86_64

pub fn sum_avx2(a: &[i32]) -> i32 {
    // В реальности нужно проверить, поддерживает ли процессор AVX2
    // if is_x86_feature_detected!("avx2") { ... }

    let mut sum = 0;
    let chunks = a.chunks_exact(8); // Берем по 8 элементов (256 бит)
    let remainder = chunks.remainder();

    unsafe {
        // Создаем "нулевой" вектор-аккумулятор (поднос)
        let mut total_vec = _mm256_setzero_si256();

        for chunk in chunks {
            // Загружаем 8 чисел из памяти в один векторный регистр
            let data_vec = _mm256_loadu_si256(chunk.as_ptr() as *const __m256i);
            
            // Складываем 8 чисел из data_vec с 8 числами в total_vec
            // ОДНОЙ командой процессора!
            total_vec = _mm256_add_epi32(total_vec, data_vec);
        }

        // В конце нужно вытащить 8 чисел из вектора и сложить их между собой
        let mut results = [0i32; 8];
        _mm256_storeu_si256(results.as_mut_ptr() as *mut __m256i, total_vec);
        sum = results.iter().sum::<i32>();
    }

    sum + remainder.iter().sum::<i32>()
}
}

std::simd Обработка массивов


#![feature(portable_simd)]
use std::simd::Simd;
fn main() {
    let data = [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0];
    let mut results = [0.0f32; 8];

    // Размер SIMD-вектора
    let lanes = Simd::::LANES;

    for (chunk, result_chunk) in data.chunks_exact(lanes).zip(results.chunks_exact_mut(lanes)) {
        let v = Simd::::from_slice(chunk);
        let squared = v * v; // Квадрат каждого элемента
        squared.write_to_slice(result_chunk);
    }

    println!("Squared results: {:?}", results);
    // Вывод: Squared results: [1.0, 4.0, 9.0, 16.0, 25.0, 36.0, 49.0, 64.0]
}

std::simd Перестановка элементов (Swizzling)


#![feature(portable_simd)]
use std::simd::{Simd, SimdSwizzle};
fn main() {
    let a = Simd::::from_array([10, 20, 30, 40]);

    // Перестановка элементов
    let swizzled = a.swizzle::<{ [3, 2, 1, 0] }>();
    println!("Swizzled: {:?}", swizzled); // Вывод: Simd([40, 30, 20, 10])
}

std::simd Реализация матричных операций


#![feature(portable_simd)]
use std::simd::Simd;
fn main() {
    let row1 = Simd::::from_array([1.0, 0.0, 0.0, 0.0]);
    let row2 = Simd::::from_array([0.0, 1.0, 0.0, 0.0]);

    let column = Simd::::from_array([5.0, 6.0, 7.0, 8.0]);

    let result1 = row1 * column;
    let result2 = row2 * column;

    println!("Row1 * Column: {:?}", result1); // Вывод: Simd([5.0, 0.0, 0.0, 0.0])
    println!("Row2 * Column: {:?}", result2); // Вывод: Simd([0.0, 6.0, 0.0, 0.0])
}

Rust поддерживает параллелизм в виде rayon. Чтобы использовать правильный инструмент в нужном месте, важно понять, как concurrency и параллелизм различаются

crate ryan является библиотекой для параллельной обработки данных с использованием концепции актеров. Он предоставляет простую и безопасную модель акторов для создания многозадачных и параллельных приложений в Rust. Основная цель ryan — это облегчение работы с многозадачностью, абстрагируя низкоуровневые детали потоков и синхронизации, предоставляя разработчику удобный способ создания параллельных задач.

Параллельность без явного использования потоков

Альтернативы:

Если вы ищете альтернативы для многозадачности или работы с акторной моделью, то можете обратить внимание на следующие библиотеки:

  • tokio или async-std для асинхронного программирования.
  • actix — другая популярная библиотека для акторной модели в Rust.

use rayon::prelude::*;

// Определим сообщение
struct MyMessage(String);

// Определим актера, который будет обрабатывать сообщения
struct MyActor;

impl Actor for MyActor {
    type Msg = MyMessage;

    fn receive(&mut self, msg: Self::Msg) {
        println!("Received message: {}", msg.0);
    }
}

fn main() {
    // Создаем актера
    let actor = MyActor;

    // Создаем систему акторов
    let system = ActorSystem::new();

    // Запускаем актера в системе
    let actor_handle = system.spawn(actor);

    // Отправляем сообщение актору
    actor_handle.send(MyMessage("Hello, world!".to_string()));

    // Завершаем выполнение системы
    system.shutdown().unwrap();
}

Паралельная загрузка изображений с подсчетом количества файлов png

Параллельная загрузка изображений с подсчетом количества файлов png

fn load_images(paths:&[PathBuf]) -> Vec<_Image> {
  let pngs = paths.par_iter().filter(|p|p.ends_with("png")).map(|_|1).sum(); // подсчет количества png файлов
  paths.par_iter().map(|path| _Image::load(path)).collect()
}

или

use std::sync::atomic::{AtomicUsize,Ordering};
fn load_images(paths:&[PathBuf]) -> Vec<_Image> {
 let pngs = AtomicUsize::new(0);
 paths.par_iter().map(|path| {
   if path.ends_with("png"){
    pngs.fetch_add(1,Ordering::SeqCst);
   }
   _Image::load(path);
  }).collect()
}

Параллельный инкремент

youtube

fn dot_product(vec1:&[i32],vec2:&[i32])-> i32{
 // vec1.iter().zip(vec2).map(|(e1, e2)|e1*e2).fold(0,|a,b| a+b) ❌ не паралельно
    vec1.par_iter().zip(vec2).map(|(e1, e2)|e1*e2).reduce(0,|a,b| a+b) // ✅ паралельно
}

Не параллельный инкремент

fn increment_all(counts:&mut[u32]){
    for c in counts.iter_mut(){
        *c+=1;
    }
}

Параллельный инкремент

fn par_increment_all(counts:&mut[u32]){
    counts.par_iter_mut().for_each(|c| *c+=1);
}

fn main(){
   let mut m = [1,2,3];
   par_increment_all(&mut m);
   println!("{:?}",m);
}

Сравнение подсчета суммы

trait.ParallelIterator.html#method.fold

use rayon::prelude::*;
use std::time::Instant;

#[derive(Debug)]
struct Data{
    payload: i32,
}
#[derive(Debug)]
struct Nodes{
    data: Vec<Data>
}

let mut buf = vec![];
let numbers = 50_000_000;
for i in 0..numbers{
    buf.push(Data::new(1));
}
let mut nodes: Nodes = Nodes::new(buf);
let now = Instant::now();
algorithm(&mut nodes);
println!("Time #1:{} millis", now.elapsed().as_millis());

fn algorithm(nodes: &mut Nodes) -> i32{
    // 336 millis for 50 mln
    let mut sum = 0;
    for d in nodes.data.iter(){ sum+=d.payload;}
    sum 
    
    // 108 millis for 50 mln
    // nodes.data.par_iter_mut().map(|v|v.payload).reduce(||0,| a, b|{ a + b }) 
     
    // 209 millis for 50 mln
    // nodes.data.par_iter_mut().fold(||0_i32,|a, b| a + b.payload).sum::<i32>()

    // 528 millis for 50 mln
    nodes.data.iter().map(|v|v.payload).sum()   
}

Functions:

  • current_num_threads - Возвращает количество потоков в текущем реестре.
  • join - Делает два closures и потенциально запускает их параллельно. Он возвращает пару результатов от этих closures. Разделяет задачи на две пока это выгодно и происходит воровство между потоками освободившихся задач
  • join_context - Идентично join, за исключением того, что у closures есть параметр, который предоставляет контекст для способа вызова closures
  • scope - Создайте область «fork-join» sи вызовите closures со ссылкой s. В результате этого closures могут возникать асинхронные задачи s.
  • spawn - Отправляет задачу в Raypool threadpool в «статическом» или глобальном масштабе.

Подобно стандартным потокам, эта задача не привязана к текущему фрейму стека и, следовательно, не может содержать ссылок, отличных от тех, которые имеют 'static срок службы. Если вы хотите создать задачу, которая ссылается на данные стека, используйте эту scope() функцию для создания области.

rayon::current_num_threads

Возвращает количество потоков в текущем реестре. Если этот код выполняется в пуле потоков Rayon, то это будет число потоков для пула потоков текущего потока.

В противном случае это будет число потоков для глобального пула потоков.

Это может быть полезно при попытке судить о том, сколько раз разделить параллельную работу (для этой цели используются параллельные итераторы).

print!("{}", rayon::current_num_threads() );

rayon::join

rayon::join могут использовать эксклюзивный стек

Делает два закрытия и потенциально запускает их параллельно. Он возвращает пару результатов от этих закрытий.

Предполагается, что указанные замыкания join() являются задачами, связанными с ЦП, которые не выполняют операции ввода-вывода или другие блокирующие операции.

Более того, если вы заблокируете одно закрытие, ожидающее другого (например, используя канал), это может привести к блокировке.


fn main(){
     let res = rayon::join(|| { "str" }, || { 2 });
     print!("{:?}",res);// ("str", 2)

     let mut v = vec![5, 1, 8, 22, 0, 44];
     let (lo, hi):(&mut[i32],&mut[i32]) = v.split_at_mut(3);

     let res = rayon::join(|| { lo.sort(); lo}, || { hi.sort();hi });

     let y: Vec<_> = vec![res.0,res.1].into_par_iter().flatten().collect();
     print!("{:?}",y);// [1, 0, 5, 22, 8, 44]

    let par_iter = res.0.par_iter().chain(res.1.par_iter());
    let y: Vec<_> = par_iter.cloned().collect();
    print!("{:?}",y);// [1, 0, 5, 22, 8, 44]

    let y:Vec<_> = res.0.into_par_iter().interleave(res.1).collect();
    print!("{:?}",y);// [1, 0, 5, 22, 8, 44]
}

Сортировка разделением на части вектора


fn main(){
    let mut v = vec![5, 1, 8, 22, 0, 44];
    quick_sort(&mut v);
    assert_eq!(v, vec![0, 1, 5, 8, 22, 44]);
}

fn quick_sort(v: &mut [T]) {
    if v.len() > 1 {
        let mid = partition(v);
        let (lo, hi) = v.split_at_mut(mid);//split_at_mut() Разделяет один срез на два по индексу разделителя.

        println!("lo={:?} hi={:?}",lo,hi);
        rayon::join(|| quick_sort(lo),
                    || quick_sort(hi));
    }
}

// Partition rearranges all items `<=` to the pivot
// item (arbitrary selected to be the last item in the slice)
// to the first half of the slice. It then returns the
// "dividing point" where the pivot is placed.
fn partition(v: &mut [T]) -> usize {
    let pivot = v.len() - 1;
    let mut i = 0;
    for j in 0..pivot {
        if v[j] <= v[pivot] {
            v.swap(i, j);//меняет местами
            i += 1;
        }
    }
    v.swap(i, pivot);
    i
}

rayon::scope

Создайте область «fork-join» s и вызовите закрытие со ссылкой s. В результате этого закрытия могут возникать асинхронные задачи s. Эти задачи могут выполняться асинхронно по отношению к закрытию; они могут сами порождать дополнительные задачи s. Когда закрытие вернется, оно будет блокироваться до тех пор, пока все задачи, которые были порождены, не будут sзавершены.

scope() является более гибким строительным блоком по сравнению с join(), поскольку цикл может использоваться для создания любого количества задач без рекурсии. Однако эта гибкость достигается за счет производительности: задачи, порожденные использованием, scope()должны быть выделены в кучу, тогда как join()могут использовать эксклюзивный стек. Предпочитайте join() (или, что еще лучше, параллельные итераторы), где это возможно.


fn main(){
// point start
    rayon::scope(|s| {
        s.spawn(|s| { // task s.1
            s.spawn(|s| { // task s.1.1
                rayon::scope(|t| {
                    t.spawn(|_| ()); // task t.1
                    t.spawn(|_| ()); // task t.2
                });
            });
        });
        s.spawn(|s| { // task 2
        });
        // point mid
    });
// point end
}


fn main(){
    let mut v:Vec = vec![];
    rayon::scope(|s| {
        let mut v2 = &mut v;
        s.spawn(move |s| { // task s.1
            v2.push(1);
        });
    });
    println!("{:?}",v);
}

rayon::scope

Доступ к данным стека

Есть два варианта.

Мы можем сохранить замыкание как move замыкание, но вместо ссылки на переменную ok мы создаем затененную переменную, которая является заимствованием ok и захватом, который:


fn main(){
    let ok: Vec = vec![1, 2, 3];
    rayon::scope(|s| {
        let bad: Vec = vec![4, 5, 6];
        let ok: &Vec = &ok; // теневое оригинальное `ok`
        s.spawn(move |_| {
            println!("ok: {:?}", ok); // захватывает затененную версию
            println!("bad: {:?}", bad);
        });
        // Теперь мы также можем использовать теневое `ok`, так как` & Vec  `ссылки
        // могут свободно использоваться. Обратите внимание, что здесь нам нужно закрыть `move`,
        // если иначе мы будем пытаться заимствовать теневое` ok`,
        // и не пережить `scope`.
        s.spawn(move |_| println!("ok: {:?}", ok));
    });
}

Другой вариант - не использовать move ключевое слово, а вместо этого взять на себя ответственность за отдельные переменные:


fn main(){
    let ok: Vec = vec![1, 2, 3];
    rayon::scope(|s| {
        let bad: Vec = vec![4, 5, 6];
        s.spawn(|_| {
            // Передача права собственности на «bad» в локальную переменную (также называемую `bad`).
            // Это заставит закрытие завладеть «плохим» из среды.
            let bad = bad;
            println!("ok: {:?}", ok); // `ok` только заимствовано.
            println!("bad: {:?}", bad); // ссылается на нашу локальную переменную выше.
        });
        s.spawn(|_| println!("ok: {:?}", ok)); // мы тоже можем заимствовать `ok`
    });
}

Trait rayon::slice::ParallelSliceMut:

  • par_split_mut
  • par_chunks_mut
  • par_sort
  • par_sort_by
  • par_sort_by_key
  • par_sort_unstable
  • par_sort_unstable_by
  • par_sort_unstable_by_key
  • as_parallel_slice_mut

fn main(){
    extern crate rayon; use rayon::prelude::*;
    let mut v_par = vec![-5, 4, 1, -3, 2];
    let now = Instant::now();
    v_par.par_sort_unstable();
    println!("{:?}", now.elapsed());  assert_eq!(v_par[..], [-5, -3, 1, 2, 4]);
}


fn main(){
    let mut v = [5, 4, 1, 3, 2];
    v.par_sort_unstable_by(|a, b| a.cmp(b));
    assert_eq!(v, [1, 2, 3, 4, 5]);
}


fn main(){
    let mut v = [-5i32, 4, 1, -3, 2];
    v.par_sort_unstable_by_key(|k| k.abs());
    assert_eq!(v, [1, 2, -3, 4, -5]);
}


fn main(){
    let mut v = [5, 4, 1, 3, 2];
    v.par_sort_by(|a, b| a.cmp(b));
    assert_eq!(v, [1, 2, 3, 4, 5]);
}


fn main(){
    let mut v = [-5i32, 4, 1, -3, 2];
    v.par_sort_by_key(|k| k.abs());
    assert_eq!(v, [1, 2, -3, 4, -5]);
}

Возвращает параллельный итератор по большей части chunk_size элементов за self раз. Куски изменяемы и не перекрываются.


fn main(){
    let mut array = [1, 2, 3, 4, 5,6];
    array.par_chunks_mut(3).for_each(|slice| { println!("{:?}",slice); slice.reverse()});
    println!("{:?}",array);
    [1, 2, 3]  [4, 5, 6]  [3, 2, 1, 6, 5, 4]
}

Возвращает параллельный итератор над изменяемыми подклассами, разделенными элементами, которые соответствуют разделителю. В данном случае разделитель 0


fn main(){

    let mut array = [1, 2, 3, 0, 2, 4, 8, 0, 3, 6, 9];
    array.par_split_mut(|i| *i == 0)
        .for_each(|slice| slice.reverse());
    assert_eq!(array, [3, 2, 1, 0, 8, 4, 2, 0, 9, 6, 3]);
}

as_parallel_slice_mut(&mut self) -> &mut [T] - Возвращает простой измененный фрагмент, который используется для реализации остальных параллельных методов.


fn main(){
    let mut array = vec![1, 2, 3, 4, 5,6];
    assert_eq!(array.as_parallel_slice_mut(), [1, 2, 3, 4, 5,6]);
    println!("{:?}", array.as_parallel_slice_mut());
}

Trait rayon::slice::ParallelSlice:

  • par_split
  • par_windows
  • par_chunks
  • as_parallel_slice

Возвращает параллельный итератор над сегментами, разделенными элементами, которые соответствуют разделителю. Минимальный элемент из максимальных элементов отрезков разделенных нулем


fn main(){
    let smallest = [1, 2, 3, 0, 2, 4, 8, 0, 3, 6, 9]
        .par_split(|i| *i == 0)
        .map(|numbers| numbers.iter().max().unwrap())
        .min();
    assert_eq!(Some(&3), smallest);
}

Возвращает параллельный итератор по всем непрерывным окнам длины window_size. Окна перекрываются.


fn main(){
    let windows: Vec<_> = [1, 2, 3 , 4].par_windows(2).collect();
    assert_eq!(vec![[1, 2], [2, 3], [3, 4]], windows);
    let windows: Vec<_> = [1, 2, 3 , 4].par_windows(3).collect();
    assert_eq!(vec![[1, 2, 3], [2, 3, 4]], windows);
}

Возвращает параллельный итератор по большей части chunk_size элементов за self раз. Куски не перекрываются.


fn main(){
    let chunks: Vec<_> = [1, 2, 3, 4, 5].par_chunks(2).collect();
    assert_eq!(chunks, vec![&[1, 2][..], &[3, 4], &[5]]);

    let array = vec![1, 2, 3, 4, 5,6];
    assert_eq!(array.as_parallel_slice(), [1, 2, 3, 4, 5,6]);
    println!("{:?}", array.as_parallel_slice());
}

Modules:

  • collections - Параллельные типы итераторов для стандартных коллекций
  • iter - Черты для написания параллельных программ с использованием интерфейса в стиле итератора
  • option - Параллельные типы итераторов для опций
  • prelude - Превенция района включает в себя различные ParallelIterator черты. Цель состоит в том, что можно включать use * rayon::prelude::* и иметь легкий доступ к различным чертам и методам, которые вам понадобятся.
  • range - Параллельные типы итераторов для диапазонов, тип для значений, созданных a..b выражениями
  • result - Параллельные типы итераторов для результатов
  • slice - Параллельные типы итераторов для срезов
  • str - Параллельные типы итераторов для строк
  • vec - Параллельные типы итераторов для векторов (Vec<T>)

Trait rayon::iter::IntoParallelRefIterator

par_iter


fn sum_of_squares(input: &[i32]) -> i32 {
    // par_iter Преобразует self в параллельный итератор.
    input.par_iter() // <-- просто измените это !
        .map(|&i| i * i)
        .sum()
}
fn main(){
    let v_par: Vec<_> = (0..100).collect();
    println!("{:?}", sum_of_squares(&v_par[..]));

    let mut v_par = [-5, 4, 1, -3];
    println!("{:?}", sum_of_squares(&v_par[..]));
}

Trait rayon::iter::IntoParallelRefMutIterator


fn main(){
    let mut v = vec![0usize; 5];
    v.par_iter_mut().enumerate().for_each(|(i, x)| *x = i);
    assert_eq!(v, [0, 1, 2, 3, 4]);
}

 (0..10).into_par_iter()

Module rayon::str

Trait rayon::str::ParallelString

  • par_chars
  • par_char_indices
  • par_bytes
  • par_encode_utf16
  • par_split
  • par_split_terminator
  • par_lines
  • par_split_whitespace
  • par_matches
  • par_match_indices
  • as_parallel_string

for_each


fn main(){
//par_chars Возвращает итератор по символам строкового среза 
    let word = "hello";
    let mut chars = word.par_chars();
    let mut v: Vec = vec![];
    chars.for_each(|mut s|  {
        print!("{}",s);
    });// ehllo
}


use std::sync::mpsc::channel;
fn main(){
    let mut chars = word.par_chars();
    let (sender, receiver) = channel();
    chars.for_each_with(sender, |s, mut x| {  s.send(x.to_uppercase().to_string()).unwrap()});
    let mut res: String = receiver.iter().collect::();
    println!("{}",res);// LHLOE
}


fn main(){
    let (sender, receiver) = channel();
    (0..5).into_par_iter().for_each_with(sender, |s, x| s.send(x).unwrap());
    let mut res: Vec<_> = receiver.iter().collect();
    res.sort();
    assert_eq!(&res[..], &[0, 1, 2, 3, 4]);
}

par_char_indices - Возвращает параллельный итератор над символами строки с их позициями.


fn main(){
    let min = "hello".par_char_indices().min_by_key(|&(_i, c)| c as i32);
    assert_eq!(Some((1, 'e')), min);
}

par_bytes - Возвращает параллельный итератор по байтам строки


fn main(){

    let max = "hello".par_bytes().max();
    assert_eq!(Some(b'o'), max);
    //string.as_bytes().par_iter().cloned()
}

par_split - Возвращает параллельный итератор над подстроками, разделенными заданным символом или предикатом, похожим на str::split

par_split_terminator - не создает пустую подстроку после концевого терминатора


fn main(){

    let total = "1, 2, buckle, 3, 4, door"
        .par_split(',')
        .filter_map(|s| s.trim().parse::().ok())
        .sum();
    assert_eq!(10, total);
}

par_lines - Возвращает параллельный итератор по строкам


fn main(){
    let lengths: Vec<_> = "hello world\nfizbuzz"
        .par_lines()
        .map(|l| l.len())
        .collect();
    assert_eq!(vec![11, 7], lengths);
}

par_split_whitespace - Возвращает параллельный итератор над суб-срезами строки, разделяемой любым количеством пробелов.


fn main(){
    let longest = "which is the longest word?"
        .par_split_whitespace()
        .max_by_key(|word| word.len());
    assert_eq!(Some("longest"), longest);
}

par_matches - Возвращает параллельный итератор над подстроками, которые соответствуют заданному символу или предикату, похожим на str::matches


fn main(){
    let total = "1, 2, buckle, 3, 4, door"
        .par_matches(char::is_numeric)
        .map(|s| s.parse::().expect("digit"))
        .sum();
    assert_eq!(10, total);
}


fn main(){
    // par_match_indices как par_matches но возвращает и индекс
    let digits: Vec<_> = "1, 2, buckle, 3, 4, door"
        .par_match_indices(char::is_numeric)
        .collect();
    assert_eq!(digits, vec![(0, "1"), (3, "2"), (14, "3"), (17, "4")]);
}


fn main(){
    let total:String =  "1, 2, buckle, 3, 4, Door"
        .par_matches(char::is_uppercase).map(|s| s.to_string()).collect();
    println!("{:?}",total);
}

Module rayon::collections

Parallel iterator types for standard collections

fn check<I>(iter: I)
    where I: ParallelIterator + Clone,
          I::Item: std::fmt::Debug + PartialEq
{
    let a: Vec<_> = iter.clone().collect();
    let b: Vec<_> = iter.collect();
    assert_eq!(a, b);
}

fn check<I>(iter: I)
    where I: ParallelIterator + Debug
{
    println!("{:?}", iter);
}

use std::collections::HashMap;
fn main(){
    let mut map: HashMap<_,_> = (0..10).enumerate().collect();
    check(map.par_iter());
    check(map.par_iter_mut());
    check(map.into_par_iter());
}

use std::collections::BTreeMap;
fn main(){
    let mut map: BTreeMap<_,_> = (0..10).enumerate().collect();
    check(map.par_iter());
    check(map.par_iter_mut());
    check(map.into_par_iter());
}

use std::collections::VecDeque;
fn main(){
    let deque: VecDeque<_> = (0..1000).collect();
    check(deque.par_iter());
    check(deque.into_par_iter());
}

К методам строк добавляется приставка par_

fn main(){
    let s = String::from("hello")
    check(s.par_chars());
    check(s.par_lines());
    check(s.par_split('\n'));
    check(s.par_split_terminator('\n'));
    check(s.par_split_whitespace());

   let par_even: String = s.par_chars().filter(|&c| (c as u32) & 1 == 0).collect();
}
  • new
  • install
  • current_num_threads
  • current_thread_index
  • current_thread_has_pending_tasks
  • join
  • scope
  • spawn

use rayon::ThreadPool;
fn main(){
// Создается пул потоков с 22 потоками
    let pool:ThreadPool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
//Чтобы вместо этого создать глобальный пул потоков, используйте build_global():
   // rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
}

install - Выполняется op в threadpool. Любые попытки использовать join, scope или параллельные итераторы будут работать в пределах этого ThreadPool.


use rayon::ThreadPool;
fn main(){
    let pool:ThreadPool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
    let n = pool.install(|| fib(20));
    println!("{}", n);
    fn fib(n: usize) -> usize {
        if n == 0 || n == 1 {
            return n;
        }
        let (a, b) = rayon::join(|| fib(n - 1), || fib(n - 2)); // runs inside of `pool`
        return a + b;
    }
}

current_num_threads - Возвращает (текущее) количество потоков в пуле потоков.


use rayon::ThreadPool;
fn main(){
    let pool:ThreadPool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
    println!("current_num_threads = {}", pool.current_num_threads());// 22
}

scope - Создает область действия, которая выполняется в этом пуле потоков. Эквивалентно pool.install(|| my_func(...))


use rayon::ThreadPool;
fn main(){
    let pool:ThreadPool = rayon::ThreadPoolBuilder::new().num_threads(22).build().unwrap();
    let mut v:Vec = vec![];
    pool.scope(|s| {
        let mut v2 = &mut v;
        s.spawn(move |s| { // task s.1
            v2.push(1);
           // println!("Name thread = {} ,ID = {:?}",thread::current().name().unwrap_or("unknown name"),thread::current().id());
        });
    });
    println!("{:?}",v);
}

Задача про философов у которых вторая вилка находится в блокировке пока он ест, после ее освобождения начинает есть другой философ


use std::thread;
use std::time::Duration;
use std::sync::{Mutex,Arc};
struct Philosopher {
    name:String,
    left:usize,
    right:usize
}
impl  Philosopher{
    fn new(name:&str,left:usize,right:usize)->Self{
        Philosopher{name:name.to_string(),left:left,right:right}
    }
    fn eat(&self,table:&Table_){
        let left_=table.forks[self.left].lock().unwrap();
        thread::sleep(Duration::from_millis(150));
        let right_=table.forks[self.right].lock().unwrap();
        println!("{} начала есть.", self.name);
        thread::sleep(Duration::from_millis(1000));
        println!("{} закончила есть.", self.name);
    }
}
struct Table_{
    forks:Vec>
}

fn main(){
    let table:Arc = Arc::new(Table_{forks:vec![Mutex::new(()),Mutex::new(()),Mutex::new(()),Mutex::new(()),Mutex::new(()) ]});
    let philosophers = vec![
        Philosopher::new("Джудит Батлер", 0, 1),
        Philosopher::new("Рая Дунаевская", 1, 2),
        Philosopher::new("Зарубина Наталья", 2, 3),
        Philosopher::new("Эмма Гольдман", 3, 4),
        Philosopher::new("Анна Шмидт", 0, 4),
    ];
    let handlers:Vec<_> = philosophers.into_iter().map(|p|{
        let table=table.clone();
        thread::spawn(move||{
            p.eat(&table);
        })
    }).collect();
    for h in handlers{
        h.join().unwrap();
    }
}

Пример распределения поиска предложения в файлах по потокам


use std::fs::File;
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::thread;
use std::sync::mpsc::{self,Receiver,Sender};
use std::time::{Duration, Instant};
 
const DIR:&str = "Text";
fn main(){
    let mut paths:Vec = Vec::::new();
 
    if let Ok(entries) = std::fs::read_dir(DIR) {     
        for entry in entries {
            if let Ok(entry) = entry {
                 paths.push(entry.path());
            }
        }
    }
    let size = paths.len();

    let now = Instant::now();
    let (tx, rx): (Sender>, Receiver>) = mpsc::channel();

    let input:String = "Месопотамии".to_lowercase();
    let search = Arc::new(RwLock::new(input));

    let rwlock = Arc::new(RwLock::new(paths));

    for i in 0..size {
       let (c_rwlock, tx,search_) = (Arc::clone(&rwlock), tx.clone(),Arc::clone(&search));
        
       thread::spawn(move || {
             let mut result:Option<(String,u64)> = None;

            if let Ok(path_) = c_rwlock.try_read() {
                 
                let path:&Path = Path::new(path_[i].as_path());

                let mut file:File = File::open(path).unwrap();
                let len:u64 = file.metadata().unwrap().len();

                let mut contents:String  = String::with_capacity(len as usize);
                file.read_to_string(&mut contents).unwrap();

                let search:&str =  &search_.try_read().unwrap();

                if contents.as_str().to_lowercase().contains(search) == true {   
                 result=Some((String::from(path.to_str().unwrap()) ,contents.find(search).unwrap_or(0) as u64));
                } 
            }; 
            tx.send(result);// отправка результата
        });
    }
    // Получение результата
    for _ in 0..size {
        if let Some(r) =  rx.recv().ok(){
         if r.is_some(){
          println!("{:?}",r.unwrap()); 
         }
       }           
    }
   println!("{:?}", now.elapsed());// Duration { secs: 0, nanos: 22426377 }
  }

Посчитать числа с помощью потоков


use std::thread;
fn main() {
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";
    let mut children = vec![];
    let chunked_data = data.split_whitespace();
    for (i, data_segment) in chunked_data.enumerate() {
        println!("data segment {} is \"{}\"", i, data_segment);
        children.push(thread::spawn(move || -> u32 {
             let result = data_segment
                 .chars()
                 .map(|c| c.to_digit(10).expect("should be a digit"))
                 .sum();
            println!("processed segment {}, result={}", i, result);
            result
        }));
    }
    let mut intermediate_sums = vec![];
    for child in children {
        // collect each child thread's return-value
        let intermediate_sum = child.join().unwrap();
        intermediate_sums.push(intermediate_sum);
    }
    let final_result = intermediate_sums.iter().sum::();
    println!("Final sum result: {}", final_result);
    assert_eq!(final_result,1342);
}

Посчитать числа с помощью pool'а потоков threadpool::ThreadPool

threadpool


use threadpool::ThreadPool;
use std::sync::mpsc::channel;
fn main() {
    let pool = ThreadPool::new(8);
    let (tx, rx) = channel();

    let data = "86967897737416471853297327050364959
    11861322575564723963297542624962850
    70856234701860851907960690014725639
    38397966707106094172783238747669219
    52380795257888236525459303330302837
    58495327135744041048897885734297812
    69920216438980873548808413720956532
    16278424637452589860345374828574668";
   
    let chunked_data = data.split_whitespace();
    for (i, data_segment) in chunked_data.enumerate() {
        let tx = tx.clone();
        pool.execute(move|| { 
            let result:usize = data_segment
            .chars()
            .map(|c| c.to_digit(10).expect("should be a digit") as usize)
            .sum();
            tx.send(result).expect("channel will be there waiting for the pool");
        });
    }
    assert_eq!(rx.iter().take(8).fold(0, |a, b| a + b), 1342);
}

Посчитать числа с помошью pool'а потоков rayon::ThreadPoolBuilder


use std::sync::mpsc::channel;
fn main(){
    let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
    let (tx, rx) = channel();

    let data = "86967897737416471853297327050364959
    11861322575564723963297542624962850
    70856234701860851907960690014725639
    38397966707106094172783238747669219
    52380795257888236525459303330302837
    58495327135744041048897885734297812
    69920216438980873548808413720956532
    16278424637452589860345374828574668";
   
    let chunked_data = data.split_whitespace();
    for (i, data_segment) in chunked_data.enumerate() {
        let tx = tx.clone();
        pool.spawn(move|| { 
            let result:usize = data_segment
            .chars()
            .map(|c| c.to_digit(10).expect("should be a digit") as usize)
            .sum();
            tx.send(result).expect("channel will be there waiting for the pool");
        });
    }
    assert_eq!(rx.iter().take(8).fold(0, |a, b| a + b), 1342);
}

Посчитать числа с помошью channel

use std::thread;
use std::sync::mpsc;

fn channel_data(){
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";
    let chunked_data = data.split_whitespace();
    let mut lenght:i32=0;// количество ответов Sender
    let (tx, rx): (Sender<u32>, Receiver<u32>) = mpsc::channel::<u32>();
    for (i, data_segment) in chunked_data.enumerate() {
        let  tx_ = mpsc::Sender::clone(&tx);
        lenght+=1;
       thread::spawn(move || {
           println!("data segment {} is \"{}\"", i, data_segment);
            let result = data_segment
                .chars()
                .map(|c| c.to_digit(10).expect("should be a digit"))
                .sum();
            println!("processed segment {}, result={}", i, result);
           tx_.send(result).unwrap();
        });
    }
    let mut intermediate_sums = vec![];
    for child in (0..lenght) {
        intermediate_sums.push(rx.recv().unwrap());
    }
    let final_result = intermediate_sums.iter().sum::<u32>();
    println!("Final sum result: {}", final_result);
}

Посчитать числа с помошью crossbeam_channel

#[macro_use]
extern crate crossbeam_channel;// лучьше производительность чем у std::sync::mpsc::channel

fn crossbeam_channel_data(){
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    let chunked_data = data.split_whitespace();
    let mut lenght:i32=0;
    let (tx, rx): (crossbeam_channel::Sender<u32>, crossbeam_channel::Receiver<u32>) = crossbeam_channel::unbounded();
    for (i, data_segment) in chunked_data.enumerate() {
        let  tx_ =  tx.clone();
        lenght+=1;

        thread::spawn(move || {
            println!("data segment {} is \"{}\"", i, data_segment);
            let result = data_segment
                .chars()
                .map(|c| c.to_digit(10).expect("should be a digit"))
                .sum();
            println!("processed segment {}, result={}", i, result);
            tx_.send(result);
        });
    }
    
    let mut intermediate_sums = vec![];
    for child in (0..lenght) {
        intermediate_sums.push(rx.recv().unwrap());
    }

    let final_result = intermediate_sums.iter().sum::<u32>();
    println!("Final sum result: {}", final_result);
}

Посчитать числа с помошью Mutex

use std::sync::{Arc, Mutex};
fn mutex_data(){
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";
    let chunked_data = data.split_whitespace();
    let mut res:u32=0;// общая разделяемая память
    let data = Arc::new(Mutex::new(res));
    let mut sync_vec:Vec<thread::JoinHandle<_>> = vec![];
    for (i, data_segment) in chunked_data.enumerate() {
        let  data  = Arc::clone(&data) ;
        sync_vec.push(  thread::spawn(move || {
            println!("data segment {} is \"{}\"", i, data_segment);
            let result:u32 = data_segment
                .chars()
                .map(|c| c.to_digit(10).expect("should be a digit"))
                .sum();
            println!("processed segment {}, result={}", i, result);
            let mut data = data.lock();
            match data {
                Ok( mut _data) => { *_data += result;  },
                Err(_e) => {}
            }
        }));
    }
     // для синхронизации потоков, проходимся по всем потокам
    for i in sync_vec{
        i.join();
    }
    // берем результат с разделяемой памяти    
    println!("Result: {}", *data.lock().unwrap());
    if let Ok(res) = Arc::try_unwrap(data){
        match res.into_inner() {
            Ok(_data) => { println!("Final sum result: {}", _data); },
            Err(e) => {  println!("Final sum result: {}", e ); }
        }
    }
}

Посчитать числа с помощью atomic

use std::sync::atomic::{AtomicUsize, Ordering};
fn atomic_data(){
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";
    let chunked_data = data.split_whitespace();
    let data = Arc::new(AtomicUsize::new(0));
    let mut sync_vec:Vec<thread::JoinHandle<_>> = vec![];
    for (i, data_segment) in chunked_data.enumerate() {
        let  data  = Arc::clone(&data) ;
        sync_vec.push(  thread::spawn(move || {
            println!("data segment {} is \"{}\"", i, data_segment);
            let result:u32 = data_segment
                .chars()
                .map(|c| c.to_digit(10).expect("should be a digit"))
                .sum();
            println!("processed segment {}, result={}", i, result);
            data.fetch_add(result as usize, Ordering::SeqCst);
        }));
    }
    // для синхронизации потоков, проходимся по всем потокам
    for i in sync_vec{
        i.join();
    }
    println!("Result: {:?}", *data);
    if let Ok(res) = Arc::try_unwrap(data){
        println!("Result: {:?}", res);
    }
}

Посчитать числа с помощью atomic

То же самое только завершение всех потоков гарантирует crossbeam т.е. не надо собирать в вектор завершающие потоки для их прохода ожидая завершения всех потоков

fn atomic_data2(){
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";
    let chunked_data = data.split_whitespace();
    let data = Arc::new(AtomicUsize::new(0));
    
    for (i, data_segment) in chunked_data.enumerate() {
        let  data  = Arc::clone(&data) ;
        crossbeam::scope(|scope_| {
            scope_.spawn(move || {
                println!("data segment {} is \"{}\"", i, data_segment);
                let result: u32 = data_segment
                    .chars()
                    .map(|c| c.to_digit(10).expect("should be a digit"))
                    .sum();
                println!("processed segment {}, result={}", i, result);
                data.fetch_add(result as usize, Ordering::SeqCst);
            })
        });
    }
    println!("Result: {:?}", *data);
    if let Ok(res) = Arc::try_unwrap(data){
        println!("Result: {:?}", res);
    }
}

Посчитать числа с помошью crossbeam

extern crate crossbeam;// [dependencies] crossbeam = "0.3.2"
fn crossbeam_data(){
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";
    let chunked_data = data.split_whitespace();
    let mut data:u32 = 0;
    for (i, data_segment) in chunked_data.enumerate() {
        crossbeam::scope(|scope_| {
            let mut data_mut:&mut u32 = &mut data;
                scope_.spawn(move || {
                    let result:u32 = data_segment
                        .chars()
                        .map(|c| c.to_digit(10).expect("should be a digit"))
                        .sum();

                    *data_mut += result;
                })
        });
    }
    println!("Result: {:?}", data);
}

Посчитать числа с помощью Parallelism

rayon/ParallelIterator/for_each


use rayon::prelude::*; 
use std::sync::mpsc::channel;
fn main(){
    let (sender, receiver) = channel();
    let data = "86967897737416471853297327050364959
    11861322575564723963297542624962850
    70856234701860851907960690014725639
    38397966707106094172783238747669219
    52380795257888236525459303330302837
    58495327135744041048897885734297812
    69920216438980873548808413720956532
    16278424637452589860345374828574668";
    
    let chunked_data = data.par_split_whitespace();
     
    chunked_data.for_each_with(sender,|s,row|{
        let result:usize = row.to_owned()
            .chars()
            .map(|c| c.to_digit(10).expect("should be a digit") as usize)
            .sum();
        s.send(result).unwrap()
    }); 
    let mut res: Vec = receiver.iter().collect();
     
    let RESULT:usize = res.into_iter().sum();
    assert_eq!(RESULT,1342_usize);
}

Посчитать числа с помошью асинхронного кода


async fn calculate(data_segment:&str) -> usize{
    let result:usize = data_segment
    .chars()
    .map(|c| c.to_digit(10).expect("should be a digit") as usize)
    .sum();
    result
}

#[tokio::main]
async fn main(){
    let mut handlers:Vec> = vec![];
    let mut sum = 0usize;
    let data = "86967897737416471853297327050364959
    11861322575564723963297542624962850
    70856234701860851907960690014725639
    38397966707106094172783238747669219
    52380795257888236525459303330302837
    58495327135744041048897885734297812
    69920216438980873548808413720956532
    16278424637452589860345374828574668";
   
    let chunked_data = data.split_whitespace();
    for (i, data_segment) in chunked_data.enumerate() {
        handlers.push(tokio::spawn(async move {
            calculate(data_segment).await
        }));
    }

    for h in handlers.into_iter(){
        sum+=h.await.unwrap();
    }
    assert_eq!(sum,1342);
}

Посчитать числа с помощью Actor

ResultMessage { id: 0, sum: 187 }
ResultMessage { id: 3, sum: 177 }
ResultMessage { id: 2, sum: 154 }
ResultMessage { id: 1, sum: 157 }
ResultMessage { id: 4, sum: 153 }
ResultMessage { id: 5, sum: 172 }
ResultMessage { id: 7, sum: 177 }
ResultMessage { id: 6, sum: 165 }
MyActorResult is stopped Millis:1001
MyActorResult:1342
=====================================
Ok(1342)

use actix::prelude::*;
use std::time::{Instant};
struct MyActorTask {
    id:usize,
    recipient: Recipient
}
impl Actor for MyActorTask {
    type Context = SyncContext;
    fn started(&mut self, ctx: &mut SyncContext) {
        //println!("MyActorTask started");
    }
}
#[derive(Message)]
#[rtype(result = "()")]
struct TaskMessage{id:usize,payload:String}

impl Handler for MyActorTask {
    type Result = ();
    fn handle(&mut self, msg: TaskMessage, _ctx: &mut SyncContext) -> Self::Result {
        //let thread:std::thread::Thread = std::thread::current(); println!("{:?}",thread.name());
        let result:usize = msg.payload
        .chars()
        .map(|c| c.to_digit(10).expect("should be a digit") as usize)
        .sum();
        std::thread::sleep(std::time::Duration::from_secs(1));
        let _ = self.recipient.do_send(ResultMessage{id:msg.id,sum:result});
    }
}
//------------------------------------------------------
struct MyActorResult {
    sum: usize,
    finish_len: usize,
    state_len: usize,
    start:Instant,
    tx:Option>
}
impl Actor for MyActorResult {
    type Context = Context;
    fn stopped(&mut self, ctx: &mut Context) {
        println!("MyActorResult:{}",self.sum);
    }
}
#[derive(Message,Debug)]
#[rtype(result = "()")]
struct ResultMessage{
    id:usize,
    sum:usize
}
#[derive(Message,Debug)]
#[rtype(result = "usize")]
struct TotalResultMessage;
impl Handler for MyActorResult {
    type Result = ();
    fn handle(&mut self, msg: ResultMessage, _ctx: &mut Context) -> Self::Result {
        println!("{:?}",&msg);
        self.sum+=msg.sum;
        self.state_len+=1;
        if self.state_len==self.finish_len{
           println!("MyActorResult is stopped Millis:{}",self.start.elapsed().as_millis());// Millis:1000
           let sender = std::mem::take(&mut self.tx).unwrap();
           sender.send(self.sum);
           System::current().stop();  
        }  
    }
}
// Executor System::block_on
fn main() {
    let (tx, rx) = tokio::sync::oneshot::channel();
    let mut rx = Some(rx);
    std::thread::spawn(||{
        let system = System::new();
        
        let execution = async {
            let data = "86967897737416471853297327050364959
            11861322575564723963297542624962850
            70856234701860851907960690014725639
            38397966707106094172783238747669219
            52380795257888236525459303330302837
            58495327135744041048897885734297812
            69920216438980873548808413720956532
            16278424637452589860345374828574668";

            let addr_res = MyActorResult{
                sum:0,
                finish_len: 8,
                state_len:0,
                start:Instant::now(),
                tx:Some(tx)
            }.start();

            let chunked_data = data.split_whitespace();
            let count = 8;

            let recipient = addr_res.clone().recipient();
            let addr_task = SyncArbiter::start(48, move|| MyActorTask {id:0,recipient: recipient.clone()});
            for (i, data_segment) in chunked_data.enumerate() {
                addr_task.do_send(TaskMessage{id:i,payload:data_segment.to_owned()});
            }
        };
        system.block_on(execution);
        
        let _ = system.run(); 
    }).join();

    println!("=====================================");
    let system = System::new();
    system.block_on(async move{
        let receiver = std::mem::take(&mut rx).unwrap();
        println!("{:?}",receiver.await);
        System::current().stop();  
    });
    let _ = system.run(); 
}

Типы сеансов - это способ рассказать компилятору о протоколе, который вы хотите использовать для связи между потоками, а не протоколе, как в HTTP или FTP, а о шаблоне потока информации между потоками. Это полезно, так как компилятор теперь остановит вас от случайного нарушения вашего протокола и возникновения взаимоблокировок или живого потока между потоками - некоторые из наиболее печально известных проблем с отладкой и главный источник Heisenbugs. Типы сеансов работают аналогично описанным выше каналам, но могут быть более запугивающими для начала использования. Вот простая двухпоточная связь

Когда стоит использовать Session Types

Строгая типизация коммуникаций: Если вы хотите, чтобы процесс обмена сообщениями между потоками или процессами был четко структурирован, с гарантией, что сообщения передаются в правильном порядке и в соответствии с ожиданиями каждого процесса, Session Types обеспечивают типовую безопасность. Например, вы можете гарантировать, что отправляется сначала одно сообщение, а потом другое, и каждое сообщение имеет правильный тип.

Когда Session Types не нужны и можно использовать другие подходы

Высокая производительность без явных ограничений на порядок сообщений: Если ваша задача не требует строгой типизации и соблюдения порядка операций, и при этом важна производительность, то подходы, такие как каналы и tokio для асинхронных операций, могут быть предпочтительнее. Использование Session Types может накладывать дополнительные ограничения, которые не всегда оправданы в таких ситуациях.

Существует гораздо больше типов связи, чем просто Send and Recv - например, Offer дает другой стороне канала возможность выбирать между двумя возможными ветвями протокола, а Rec и Var работают вместе, чтобы разрешить циклы и рекурсию в протоколе

Session Types (Типы сеансов) — это концепция в теории типов и в парадигме программирования, которая используется для описания коммуникационных паттернов между различными частями системы, таких как процессы или потоки. В контексте межпоточной связи это позволяет формализовать и проверять, какие типы сообщений могут быть отправлены или получены между процессами (или потоками), обеспечивая безопасную и структурированную коммуникацию.

Основные идеи Session Types: Типизация каналов связи: В модели типов сеансов каналы (или сокеты, или любые другие каналы для передачи данных) типизируются с учетом последовательности операций ввода-вывода. Это означает, что передаваемые данные могут быть проверены на этапе компиляции, чтобы обеспечить корректную работу системы, избегая несоответствий и ошибок при коммуникации.

Управление состоянием общения: Каждый канал связи может иметь тип, который определяет, что можно отправить или получить через этот канал и в каком порядке. Это позволяет моделировать коммуникационные паттерны, такие как запрос-ответ, однонаправленные или двунаправленные потоки сообщений.

Безопасность типов: Использование типов сеансов предоставляет дополнительные гарантии безопасности при взаимодействии между процессами или потоками. Например, оно предотвращает ситуации, когда один процесс пытается отправить данные, которые не соответствуют ожидаемому формату или когда один поток пытается получить данные, которые не были отправлены.

Применение к многозадачности и параллелизму: В многозадачных или многопоточных приложениях использование типов сеансов помогает моделировать и контролировать обмен сообщениями между потоками, что снижает вероятность ошибок и гонок данных.

Вы должны заметить, что основной метод очень похож на основной метод межпоточной связи, определенный выше, если сервер был перемещен в свою собственную функцию.


extern crate session_types;
use session_types::*;
type Client = Send;
type Server = Recv;

fn run_client(channel: Chan<(), Client>) {
    let channel = channel.send(42);
    println!("The client just sent the number 42!");
    channel.close();
}
fn run_server(channel: Chan<(), Server>) {
    let (channel, data) = channel.recv();
    println!("The server received some data: {}", data);
    channel.close();
}
fn main() {
    let (server_channel, client_channel) = session_channel(); 
    let server_thread = std::thread::spawn(move || {
        run_server(server_channel);
    });

    run_client(client_channel);
    server_thread.join().unwrap();
}

Зачем решать все проблемы с определением типов клиентов и серверов? И почему мы переопределяем канал на клиенте и сервере? Эти вопросы имеют один и тот же ответ: компилятор остановит нас от нарушения протокола! Если клиент попытался получить данные вместо их отправки (что приведет к тупиковой ситуации в обычном коде), программа не будет компилироваться, поскольку объект канала клиента не имеет на нем метода recv . Кроме того, если мы попытались определить протокол таким образом, который может привести к тупиковой ситуации (например, если и клиент, и сервер попытались получить значение), тогда компиляция завершится неудачно, когда мы создадим каналы. Это связано с тем, что Send и Recv являются « Recv типами», то есть если сервер делает это, клиент должен сделать другой - если оба попытаются Recv , у вас будут проблемы. Eps - это свой собственный двойной тип, так как для клиента и сервера вполне нормально соглашаться закрыть канал.

Конечно, когда мы выполняем некоторую операцию на канале, мы переходим к новому состоянию в протоколе, и доступные нам функции могут измениться - поэтому нам нужно переопределить привязку канала. К счастью, session_types позаботится об этом для нас и всегда возвращает новый канал (кроме close, и в этом случае нет нового канала). Это также означает, что все методы на канале также владеют каналом, поэтому, если вы забудете переопределить канал, компилятор также даст вам об этом ошибку. Если вы отпустите канал, не закрывая его, это также ошибка времени выполнения (к сожалению, это невозможно проверить во время компиляции).

Существует гораздо больше типов связи, чем просто Send and Recv - например, Offer дает другой стороне канала возможность выбирать между двумя возможными ветвями протокола, а Rec и Var работают вместе, чтобы разрешить циклы и рекурсию в протоколе , В репозитории session_types GitHub доступно еще много примеров типов сеансов и других типов.

Простой пример с использованием Rumpsteak может выглядеть как создание типизированной коммуникации между сервером и клиентом. Это может быть полезно для сервис-ориентированных архитектур, где нужно точно знать, какие сообщения и в каком порядке передаются между сервисами.


// Пример из библиотеки Rumpsteak
use rumpsteak::{Session, send, receive};

#[session]
fn session_example() -> send!(String, receive!(i32)) {
    send!("Hello").and_then(|_| receive!(i32))
}
fn main() {
    let result = session_example(); // строгая типизация сессии
    // обрабатываем результат
}

Почему CRDT проще чем блокировки

Избегаем блокировок — разрешаем конфликты через структуры данных.

CRDT (Conflict-Free Replicated Data Types) — это структура данных, которая автоматически разрешает конфликты при репликации без координации между узлами.

Это подход без блокировок (lock-free), который масштабируется на распределённые системы и выдерживает сетевые сбои!

CRDT — это про данные, Mutex — про доступ.

Но CRDT работает только если данные можно определить как моноид. CRDT существует только там, где можно определить merge, т.е. на поведение реализующего типа наложены очень жёсткие ограничения:

  • Ассоциативность merge( merge(a, b), c ) == merge( a, merge(b, c) )
  • Коммутативность merge(a, b) == merge(b, a)
  • Идемпотентность merge(a, a) == a
  • Нейтральный элемент merge(x, empty) == x

CRDT силён, тем что не требует конфликт-резолвера (например, timestamp last-write-wins). То есть не требует внешней функции: «если конфликт — выбери левый или правый». CRDT = структура, в которой конфликтов НЕ бывает. Конфликты невозможны по определению, потому что операция merge всегда детерминирована, коммутативна, ассоциативна и идемпотентна.

Что может быть CRDT:

  • структуры данных могут быть CRDT: Множества (HashSet, BTreeSet), Карты, словари (Dictionary / Map / HashMap)
  • Счётчики: G-Counter (только увеличение), PN-Counter (увеличение/уменьшение через два счётчика)
  • Регистр (одно значение): LWW-Register (last-write-wins → timestamp max), MV-Register (множество всех одновременных значений)
  • Графы: Работает, если вершины и рёбра — CRDT множества.
  • Логи: если каждая запись формирует множество

Практические примеры где используются CRDT:

  1. Offline-First совместное редактирование (как Google Docs), заметки (Apple Notes, Simplenote), задачники (Todoist)
  2. Multiplayer игры, видимость объектов, позиции игроков, распределенный инвентарь магазина
  3. Онлайн-чаты и комментарии
  4. Распределенные очереди
  5. Геораспределенные системы
  6. IoT системы конфигурации устройств, хранилище сенсорных данных
  7. Социальные сети: счётчики, лайки, реактивные фиды
  8. Riak — самая известная CRDT-база
  9. Redis использует CRDT для: множества пользователей онлайн, Geo-распределённых ключей

CRDT не подходит для:

  • Транзакции, требующие строгой согласованности (банковские переводы)
  • Системы, где важен абсолютный порядок операций, т.е. если важна последовательность прихода событий
  • Если синхронизация должна быть быстрой, в CRDT merge линейный от размера структуры
  • CRDT ломаются, если есть side-effects от событий (логирование, отправка email, запись в стороннюю систему, списание денег в банке)
  • CRDT плохо подходят для больших бинарных объектов (видео, изображения). Потому merge будет тяжелым, а структуры grow-only → огромные.
    • CRDT = структура, которая хранит операции, а не только финальное состояние.
    • CRDT никогда не хранит «только текущее состояние», оно хранит кучу метаданных для merge, изменил 10 000 пикселей = 10 000 операций.
    • Картинка превращается в гигантское раздутое дерево CRDT-операций, огромный размер, гигантская память, дикая нагрузка.
    • CRDT оправдан, когда данные редактируют многие одновременно, для картинок это случается крайне редко.
// С блокировками:
def update_with_lock(shared_data):
    lock.acquire()          # ❌ Блокируем всех
    try:
        shared_data.value += 1
    finally:
        lock.release()

// С CRDT: 
def update_crdt(local_counter, increment):
    local_counter.value += increment  # ✅ Каждый работает локально
    # Потом асинхронно синхронизируем

CRDT vs Mutex

CRDT работают независимо от порядка событий

(а блокировки требуют строгого порядка)

Блокировки гарантируют:

  • один поток → входит
  • остальные → ждут
  • все операции → в строгом порядке

То есть порядок критичен.

Если порядок меняется — меняется результат.

Где CRDT бесполезны (и нужен Mutex / RwLock)

  • Последовательные операции balance -= 10. Эта операция зависит от предыдущего значения. Если два потока одновременно делают -10, мы можем уйти в минус.
  • Ограниченные ресурсы (состояния некоммутативные). Например:зарезервировать место в самолёте, два потока могут попытаться взять одно и то же место. Тут нет коммутативного merge.
  • Мутация единого объекта

Типы для CRDT

Обычная String не может быть CRDT из-за важности порядка символов и операции конкатенации и одновременного изменения.

Но можно использовать String если использовать спец. CRDT для последовательностей:

  • RGA (Replicated Growable Array)
  • LSEQ
  • WOOT
  • Yjs Text CRDT
  • Automerge Text

Так же и с Vec, если использовать спец. CRDT-последовательность:

  • Vec<CRDT_Item>
  • CRDT sequence (RGA, LSEQ, Yjs Array)

Массивы фиксированного размера [T; N]. Если T — CRDT → массив CRDT.

Может ли struct быть CRDT? Да — если каждая её часть (поле) — CRDT.