Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Тема
Описание
Доп.

Модель синхронизации потоков - Атомарные операции

Атомарные операции представлены модулем стандартной библиотеки Rust std::sync::atomic (и дополнительно crate atomic).

📌 Новый раздел

  • 👾 Best practice
  • Варианты применения

Для синхронизации доступа к ресурсу в многопоточной среде, который должен быть инициализирован только один раз

std::sync::Once используется для выполнения какой-либо операции только один раз в течение жизни программы. Это особенно полезно для инициализации ресурсов, таких как глобальные или статические переменные.

use std::sync::{Once, ONCE_INIT};
use std::sync::atomic::{AtomicU64, Ordering};

static INIT: Once = Once::new();
static mut X: Option<AtomicU64> = None;

fn get_x() -> u64 {
    // Инициализация переменной X
    INIT.call_once(|| {
        unsafe {
            X = Some(AtomicU64::new(calculate_x()));
        }
    });

    // Доступ к переменной X
    unsafe {
        X.as_ref().unwrap().load(Ordering::Relaxed)
    }
}

fn calculate_x() -> u64 {
    // Пример дорогостоящих вычислений
    42
}

OnceLock: Предоставляет удобный способ для хранения значения, инициализированного только один раз, и позволяет безопасно получать это значение в многопоточной среде.

use std::sync::OnceLock;
static X: OnceLock<AtomicU64> = OnceLock::new();

fn get_x() -> u64 {
    // Инициализация переменной X
    X.get_or_init(|| AtomicU64::new(calculate_x()));

    // Доступ к переменной X
    X.get().unwrap().load(Ordering::Relaxed)
}

fn calculate_x() -> u64 {
    // Пример дорогостоящих вычислений
    42
}

Модуль std::sync::atomic содержит атомарные типы для без блокировочного конкурентного программирования.

Атомные типы AtomicBool, AtomicIsize и AtomicUsize представляют операции, которые при правильном использовании синхронизируют обновления между потоками.

Атомарные переменные безопасно разделять между потоками (они реализуют Sync), но сами по себе они не предоставляют механизма для совместного использования и следуют потоковой модели Rust.

Атомарная операция — операция, которая либо выполняется целиком, либо не выполняется вовсе; операция, которая не может быть частично выполнена и частично не выполнена. Несколько потоков может одновременно читать и записывать атомарное значение, не опасаясь гонок данных.

Несмотря на то, что Mutex обеспечивает превосходную безопасность потоков для совместно используемых изменяемых данных, существуют сценарии, в которых требуется одновременный доступ без блокировок.

Совместное использование в потоках благодаря Arc

Atomic type - это аналог Cell для Copy типов (i32, bool, etc) в многопоточной среде.
Как и Cell, атомарные типы предотвращают неопределённое поведение, позволяя работать с данными целиком, не позволяя напрямую заимствовать их содержимое.

Обеспечивают безопасный доступ к данным в многопоточном контексте через атомарные операции, которые являются атомарными (неделимыми). Это означает, что операции над такими типами выполняются целиком без прерывания и видны всем потокам как единое целое. Например, операции вроде fetch_add или compare_and_swap на атомарных типах обеспечивают согласованность данных без необходимости использования дополнительных механизмов синхронизации, таких как мьютексы.

Атомарные типы часто не содержат непосредственно информацию, которую нужно передавать между потоками, потому что они ограничены по размеру. Обычно атомарные типы используются как инструменты для координации доступа к более крупным данным или структурам данных.

Однако атомарные операции позволяют различным потокам безопасно читать и изменять одну и ту же переменную. Поскольку такая операция неделима, она происходит либо полностью до, либо полностью после другой операции, избегая неопределенного поведения.

Атомарные операции являются основным строительным блоком всего, что связано с несколькими потоками. Все остальные примитивы параллелизма, такие как Mutex и Condvar, реализуются с использованием атомарных операций. Атомарные операции допускают модификацию через общую ссылку (например, &AtomicU8)

Логика проверки того, что определенное переупорядочение или другая оптимизация компилятора не повлияет на поведение вашей программы, не учитывает другие потоки. Мы должны явно сообщать компилятору и процессору, что они могут и не могут делать с нашими атомарными операциями, поскольку их обычная логика игнорирует взаимодействия между потоками и может допускать оптимизации, которые действительно изменяют результат вашей программы.

Порядок памяти (std::sync::atomic::Ordering) используется для контроля того, как операции с атомарными переменными синхронизируются между потоками. Понимание различных уровней порядка памяти важно для правильного управления конкурентными и многопоточными программами

Почему важен порядок операций с памятью: В многопоточных программах, компилятор и процессор могут менять порядок выполнения команд для оптимизации, что нарушает согласованность данных между потоками. Управление порядком операций необходимо для синхронизации.

SeqCst не так хорош в качестве значения по умолчанию, как вы думаете, и в большинстве случаев его следует избегать.

Relaxed подходит, когда вы хотите применить атомарные операции к отдельным переменным, но не заботитесь о синхронизации чтения и записи для нескольких переменных. Например, для увеличения простых счетчиков или сбора статистики.

Release/Acquire используется, когда вы хотите синхронизировать чтение и запись в несколько переменных в нескольких потоках. Сохранение значения в заданной переменной с помощью Release гарантирует, что все записи, которые произошли с этим моментом или до него, будут видны после загрузки той же переменной с помощью Acquire.

Release/Acquire полезны для построения всех видов примитивов синхронизации, включая блокировки, каналы и сигналы. Думайте об Release освобождении некоторых изменений (или блокировки) для других потоков, в то время как Acquire получает эти изменения (или блокировку).

Категории порядка операций:

Relaxed: Используется, когда не требуется синхронизация между потоками, например, для обновления отдельных переменных.

Release (для записи) /Acquire(для чтения): Используется для синхронизации потоков. Release гарантирует, что все записи до этого момента будут видимы после Acquire.

Sequential Consistency (SeqCst): Строгий, но редко применяемый порядок, из-за высокого влияния на производительность.

AcqRel (эквивалент «Приобретать для загрузки и выпуска для хранения»; полезно, когда оба участвуют в одной операции, такой как compare-and-swap)

Пример использования Release/Acquire: Для передачи данных между потоками используется сигнализация с помощью атомарных переменных. Например, поток может записать данные с Release, и другой поток сможет их считать, после того как загрузит сигнал с Acquire.

Пример синхронизации через блокировки: Простые примеры реализации блокировок с использованием атомарных переменных, где Acquire используется для получения блокировки, а Release — для её освобождения, гарантируя, что все изменения завершены.

Relaxed - Это означает, что два потока могут видеть, что операции с разными переменными выполняются в разном порядке. Например, если один поток сначала записывает в одну переменную, а затем очень быстро после этого во вторую переменную, другой поток может увидеть, что это происходит в противоположном порядке.

Каждый метод принимает значение Ordering, которое представляет собой силу барьера памяти для этой операции. Эти порядки такие же, как и атомарные порядки C++20 При доступе к / модификации атомного типа следует указывать порядок памяти, представляющий силу барьера памяти. Rust обеспечивает 5 примитивов порядка доступа к памяти (упорядочения памяти): нечто похожее на уровень изоляции транзакций в базе данных.

Пример сигнализации/Signaling example

Давайте рассмотрим пример Мар, где используется метод AtomicBoolto для оповещения о готовности некоторых данных: Основной поток ожидает DATA готовности и спит в течение 100 миллисекунд между каждой проверкой.

Важно то, что строка READY.store(true, Release); гарантирует, что все записи, которые произошли с этим моментом или до него, будут видны после Acquire на этой же переменной . Обратите внимание, как DATA написано до момента Release, и это только с использованием Relaxed порядка.

Когда основной поток наконец замечает, READY.load(Acquire) что true, мы выходим из while цикла и наконец считываем значение. Даже если DATA.load(Relaxed) использует Relaxed, он гарантированно увидит значение. Запись произошла до Release момента на READY переменной, и это load происходит после соответствующего Acquire на READY.


use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};

static DATA: AtomicU64 = AtomicU64::new(0);
static READY: AtomicBool = AtomicBool::new(false);

fn main() {
    std::thread::spawn(|| {
        DATA.store(123, Relaxed);
        READY.store(true, Release); // Everything from before this store ..
    });
    while !READY.load(Acquire) { // .. is visible after this loads `true`.
        std::thread::sleep(std::time::Duration::from_millis(100));
        println!("waiting...");
    }
    println!("{}", DATA.load(Relaxed));
}

Пример блокировки/Locking example

Следующий пример Мары для упорядочивания Release/Acquire — это очень элементарная блокировка, которая использует AtomicBool для защиты доступа к a String.

Вот слегка измененная версия этого примера: Две критические линии:

  1. if LOCKED.compare_exchange(false, true, Acquire, Relaxed).is_ok() {
  2. LOCKED.store(false, Release);

Первая строка атомарно считывается LOCKED с использованием Acquire порядка, и если значение равно false, она устанавливает значение с true использованием Relaxed порядка. Вторая строка LOCKED возвращает нас к false использованию Release. Как мы уже видели, Release означает, что все записи, которые произошли с этим моментом или до него, будут видны после Acquire этой же переменной. Объединение этих двух строк гарантирует, что unsafe запись в DATA и Release запись в READY будут завершены до того, как другой поток заметит LOCKED использование false порядка Acquire.


use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::atomic::AtomicBool;

static mut DATA: String = String::new();
static LOCKED: AtomicBool = AtomicBool::new(false);

fn f() {
    if LOCKED.compare_exchange(false, true, Acquire, Relaxed).is_ok() {
        // Safety: We hold the exclusive lock, so nothing else is accessing DATA.
        unsafe { DATA.push('!') };
        LOCKED.store(false, Release);
    }
}

fn main() {
    std::thread::scope(|s| {
        for _ in 0..100 {
            s.spawn(f);
        }
    });
    println!("{}", unsafe { &DATA });
}

std::sync::atomic::Ordering::Relaxed (самый слабый)

Этот порядок обеспечивает только атомарность операций (т.е., операция выполняется как единое целое, без прерываний). Он не гарантирует никакой дополнительной синхронизации между потоками. Подходит для случаев, когда вам не нужно заботиться о том, как значения соотносятся между потоками, кроме обеспечения атомарности самой операции.

Пример: Подсчёт количества событий в многопоточном приложении, где важен только корректный счёт, но не порядок этих событий.


use std::sync::atomic::{AtomicU64, Ordering};
fn main(){
    let atomic_value = AtomicU64::new(0);
    atomic_value.store(1, Ordering::Relaxed);
    let value = atomic_value.load(Ordering::Relaxed);
}

std::sync::atomic::Ordering::Acquire (для чтения, только к операциям load)

Гарантирует, что все операции чтения и записи, которые следуют за этой операцией в коде, видят обновлённое состояние памяти. Используется для синхронизации чтения с другими потоками.

Применяется, когда вам нужно убедиться, что все предыдущие операции (в рамках одной логической последовательности) завершены до того, как будет выполнено чтение.

Пример: Чтение флага, который показывает, что поток завершил какую-то задачу.

std::sync::atomic::Release (для записи, только к операциям store)

Гарантирует, что все операции записи, которые были сделаны до этой записи в коде, завершены до того, как будет выполнена запись. Используется для синхронизации записи с другими потоками.

Применяется, когда вы хотите убедиться, что все предыдущие операции (в рамках одной логической последовательности) завершены перед записью нового значения.

Пример: Установка флага, указывающего, что поток завершил задачу.

std::sync::atomic::AcqRel (эквивалент «Acquire для загрузки и Release для хранения») (для чтения и записи)

Обеспечивает как порядок чтения, так и порядок записи, то есть, гарантирует, что все предыдущие операции завершены до чтения, и все последующие операции завершены после записи. Полезен в случаях, когда требуется полная синхронизация между операциями чтения и записи.

Пример: Операции, где вы хотите гарантировать, что предыдущие записи и чтения корректно синхронизированы.


use std::sync::atomic::{AtomicU64, Ordering};
fn main(){
    let atomic_value = AtomicU64::new(0);
    atomic_value.store(1, Ordering::Release);
    let value = atomic_value.load(Ordering::Acquire);
}

std::sync::atomic::SeqCst (самый сильный)

Гарантирует полный последовательный порядок для всех операций. Это означает, что все потоки видят операции в том же порядке, в каком они были выполнены. Это самый строгий порядок памяти. Применяется, когда требуется гарантировать, что все операции в программе видны в том же порядке, независимо от того, какие потоки их выполняли.

Пример: Сложные синхронизационные примеры, где необходима полная последовательность операций.


use std::sync::atomic::Ordering::SeqCst;
static A: AtomicBool = AtomicBool::new(false);
static B: AtomicBool = AtomicBool::new(false);
static mut S: String = String::new();
fn main() {
    let a = thread::spawn(|| {
        A.store(true, SeqCst);
        if !B.load(SeqCst) {
            unsafe { S.push('!') };
        }
    });
    let b = thread::spawn(|| {
        B.store(true, SeqCst);
        if !A.load(SeqCst) {
            unsafe { S.push('!') };
        }
    });
    a.join().unwrap();
    b.join().unwrap();
}

Переполнение типа

compare_exchange_weak

static

Самая продвинутая и гибкая атомарная операция — это операция сравнения и обмена compare_exchange.

Эта операция проверяет, равно ли атомарное значение заданному значению, и только если это так, она заменяет его новым значением, все атомарно как одна операция. Она вернет предыдущее значение и сообщит нам, заменила ли она его или нет.

use std::sync::atomic::AtomicU32;
fn allocate_new_id() -> u32 {
    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
    NEXT_ID.fetch_add(1, Relaxed)
}

fn main() {
  println!("{}",allocate_new_id());// 0
  println!("{}",allocate_new_id());// 1
}

Единственная проблема здесь — поведение переноса при переполнении. 4 294 967 296-й вызов переполнит 32-битное целое число, так что следующий вызов снова вернет 0

Чтобы остановить увеличение NEXT_ID сверх определенного предела и предотвратить переполнение, мы можем использовать Compare_exchange для реализации атомарного сложения с верхней границей. Используя эту идею, давайте создадим версию allocate_new_id, которая всегда корректно обрабатывает переполнение, даже в практически невозможных ситуациях:

fn allocate_new_id() -> u32 {
    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
    let mut id = NEXT_ID.load(Relaxed);
    loop {
        assert!(id < u32::MAX, "too many IDs!");
        match NEXT_ID.compare_exchange_weak(id, id + 1, Relaxed, Relaxed) {
            Ok(_) => return id,
            Err(v) => id = v,
        }
    }
}

Теперь мы проверяем и впадаем в панику перед изменением, гарантируя, что он никогда не увеличится больше U32::MAX, что делает переполнение невозможным.

Применение в static переменных


use std::sync::atomic::{AtomicUsize, Ordering};

static CALL_COUNT: AtomicUsize = AtomicUsize::new(0);

fn do_a_call() {
    CALL_COUNT.fetch_add(1, Ordering::SeqCst);
}
fn main() {
    do_a_call();
    do_a_call();
    do_a_call();

    println!("called {}", CALL_COUNT.load(Ordering::SeqCst));
}


#[macro_use]
extern crate lazy_static;
use std::sync::Mutex;

lazy_static! {
    static ref ARRAY: Mutex> = Mutex::new(vec![]);
}

fn do_a_call() {
    ARRAY.lock().unwrap().push(1);
}
fn main(){
    do_a_call();
    do_a_call();
    do_a_call();

    println!("called {}", ARRAY.lock().unwrap().len());
}

std::sync::atomic::fence

синхронизации между операциями на уровне атомарных операций

marabos.nl/atomics/memory-ordering.html#fences

fence используется для создания синхронизации между операциями на уровне атомарных операций. Это позволяет разработчикам контролировать порядок выполнения операций в многопоточной среде и обеспечивает дополнительные гарантии для предотвращения нежелательных оптимизаций компилятора и процессора.


use std::sync::atomic::{AtomicBool, fence, Ordering};
use std::thread;
use std::sync::Arc;
fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    let flag_clone = Arc::clone(&flag);
    // Поток 1
    let thread1 = thread::spawn(move || {
        // Выполнение какой-то работы
        flag_clone.store(true, Ordering::Release);
        fence(Ordering::Release); // Обеспечиваем порядок записи
    });
    // Поток 2
    let thread2 = thread::spawn(move || {
        fence(Ordering::Acquire); // Обеспечиваем порядок чтения
        if flag.load(Ordering::Acquire) {
            println!("Flag was set!");
        }
    });
    thread1.join().unwrap();
    thread2.join().unwrap();
}

std::sync::atomic::fence

синхронизации между операциями на уровне атомарных операций

marabos.nl/atomics/memory-ordering.html#fences

В этом примере 10 потоков выполняют некоторые вычисления и сохраняют свои результаты в (неатомарной) общей переменной. Каждый поток устанавливает атомарное логическое значение, чтобы указать, что данные готовы к чтению основным потоком, используя обычное хранилище выпуска. Основной поток ждет полсекунды, проверяет все 10 логических значений, чтобы увидеть, какие потоки выполнены, и выводит готовые результаты.

Вместо использования 10 операций acquire-load для чтения булевых значений, основной поток использует ослабленные операции и один забор acquire. Он выполняет забор перед чтением данных, но только если есть данные для чтения.

Хотя в этом конкретном примере может быть совершенно излишним прилагать какие-либо усилия для такой оптимизации, этот шаблон для экономии накладных расходов на дополнительные операции получения данных может быть важен при построении высокоэффективных параллельных структур данных.


use std::sync::atomic::fence;
static mut DATA: [u64; 10] = [0; 10];

const ATOMIC_FALSE: AtomicBool = AtomicBool::new(false);
static READY: [AtomicBool; 10] = [ATOMIC_FALSE; 10];

fn main() {
    for i in 0..10 {
        thread::spawn(move || {
            let data = some_calculation(i);
            unsafe { DATA[i] = data };
            READY[i].store(true, Release);
        });
    }
    thread::sleep(Duration::from_millis(500));
    let ready: [bool; 10] = std::array::from_fn(|i| READY[i].load(Relaxed));
    if ready.contains(&true) {
        fence(Acquire);
        for i in 0..10 {
            if ready[i] {
                println!("data{i} = {}", unsafe { DATA[i] });
            }
        }
    }
}

Для чего используют Атомные типы

В этом примере мы используем счетчик AtomicUsize, который поддерживает атомарные операции без блокировки. Каждый поток вызывает fetch_add безопасное увеличение счетчика, и мы используем этот load метод для чтения конечного значения.


use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("Thread panicked");
    }
    println!("Counter: {}", counter.load(Ordering::SeqCst));
}

то же самое, но в scope:


fn main(){
    let counter = Arc::new(AtomicUsize::new(0));  
    thread::scope(|s| { 
        for n_thread in 1..=10 {
            let counter = Arc::clone(&counter);
            let _ = thread::Builder::new().name(format!("{n_thread}"))
                .spawn_scoped(s,move || {
                    counter.fetch_add(1, Ordering::SeqCst);
            }).unwrap();            
        }
    }); 
    println!("Counter: {}", counter.load(Ordering::SeqCst));
}

Одно из простых применений атомарных типов – прерывание потока


use std::sync::atomic::{AtomicBool, Ordering};
fn main(){
    let cancel_flag = Arc::new(AtomicBool::new(false));
    let worker_cancel_flag = cancel_flag.clone();
}

Ниже приведен код рабочего потока:


fn main(){
    let worker_handle = spawn(move || {
        for pixel in animation.pixels_mut() {
             render(pixel); // трассировка лучей – занимает несколько микросекунд
             if worker_cancel_flag.load(Ordering::SeqCst) {
                  return None;
             }
        }
        Some(animation)
   });
}

Если в главном потоке мы решим прервать рабочий поток, то сохраним значение true в AtomicBool, а затем дождемся завершения потока:

// Прервать рендеринг.
cancel_flag.store(true, Ordering::SeqCst);
// Отбросить результат, который, скорее всего, равен `None`.
worker_handle.join().unwrap();

Конечно, задачу можно решить и по-другому. Тип AtomicBool можно было бы заменить мьютексом Mutex<bool> или каналом. Основное отличие состоит в том, что у атомарных типов накладные расходы минимальны. Атомарные операции никогда не приводят к системному вызову. Загрузка и сохранение значений часто компилируются в одну машинную команду.

Спин-блокировка — это механизм синхронизации, который позволяет потокам ожидать освобождения ресурса, активно "крутясь" (то есть, постоянно проверяя состояние ресурса), вместо того чтобы переходить в спящий режим. Это полезно, когда время ожидания ресурса предполагается коротким, поскольку спин-блокировки могут быть более эффективными, чем более сложные механизмы, такие как мьютексы.

Преимущества: Спин-блокировка может быть более эффективной, чем использование мьютексов или других сложных механизмов синхронизации, если время ожидания невелико. Это связано с тем, что потоки не переходят в спящий режим, а продолжают активно проверять состояние ресурса.

Недостатки: Спин-блокировки могут быть неэффективными, если время ожидания ресурса значительно, поскольку активное ожидание потребляет процессорное время, которое могло бы быть использовано для выполнения другой работы. Это также может привести к увеличению энергопотребления и снижению общей производительности.


use std::ops::{Deref, DerefMut};
use std::cell::UnsafeCell;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Acquire, Release};

pub struct SpinLock {
    locked: AtomicBool,
    value: UnsafeCell,
}

unsafe impl Sync for SpinLock where T: Send {}

pub struct Guard<'a, T> {
    lock: &'a SpinLock,
}

unsafe impl Sync for Guard<'_, T> where T: Sync {}

impl SpinLock {
    pub const fn new(value: T) -> Self {
        Self {
            locked: AtomicBool::new(false),
            value: UnsafeCell::new(value),
        }
    }
    pub fn lock(&self) -> Guard {
        while self.locked.swap(true, Acquire) {
            std::hint::spin_loop();
        }
        Guard { lock: self }
    }
}
impl Deref for Guard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        // Safety: The very existence of this Guard
        // guarantees we've exclusively locked the lock.
        unsafe { &*self.lock.value.get() }
    }
}
impl DerefMut for Guard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        // Safety: The very existence of this Guard
        // guarantees we've exclusively locked the lock.
        unsafe { &mut *self.lock.value.get() }
    }
}
impl Drop for Guard<'_, T> {
    fn drop(&mut self) {
        self.lock.locked.store(false, Release);
    }
}
fn main() {
    use std::thread;
    let x = SpinLock::new(Vec::new());
    thread::scope(|s| {
        s.spawn(|| x.lock().push(1));
        s.spawn(|| {
            let mut g = x.lock();
            g.push(2);
            g.push(2);
        });
    });
    let g = x.lock();
    assert!(g.as_slice() == [1, 2, 2] || g.as_slice() == [2, 2, 1]);
}
  • store() - Сохраняет новое значение
  • load() - Загружает (возвращает) значение
  • as_ptr()
  • compare_and_swap()
  • compare_exchange()
  • compare_exchange_weak()
  • fetch_add() - Добавляет к текущему значению, возвращая предыдущее значение./ Побитовое «И» с текущим значением.
  • fetch_max() - Максимум при текущем значении.
  • fetch_min() - Минимум при текущем значении.
  • fetch_nand() - Побитовое «nand» с текущим значением.
  • fetch_or() - Побитовое «или» с текущим значением.
  • fetch_sub() - Вычитает из текущего значения, возвращая предыдущее значение.
  • swap() - Сохраняет значение в атомарном целом числе, возвращая предыдущее значение.
  • fetch_xor() - Побитовое «исключающее ИЛИ» с текущим значением.
  • fetch_update()
  • from_mut()
  • from_mut_slice()
  • from_ptr()
  • get_mut()
  • get_mut_slice()
  • into_inner()
  • new()

use std::sync::atomic::AtomicI32;
fn main(){
    let a = AtomicI32::new(100);
    let b = a.fetch_add(23, Relaxed);
    let c = a.load(Relaxed);

    assert_eq!(b, 100);
    assert_eq!(c, 123);
}

AtomicPtr<T>— это атомарная версия *mut T: указатель на T

marabos.nl/atomics/memory-ordering

Для гораздо большего типа данных, который не помещается в одну атомарную переменную AtomicU64, нам нужно AtomicPtr

use std::sync::atomic::AtomicPtr;
fn get_data() -> &'static Data {
    static PTR: AtomicPtr<Data> = AtomicPtr::new(std::ptr::null_mut());

    let mut p = PTR.load(Acquire);

    if p.is_null() {
        p = Box::into_raw(Box::new(generate_data()));
        if let Err(e) = PTR.compare_exchange(
            std::ptr::null_mut(), p, Release, Acquire
        ) {
            // Safety: p взят из Box::into_raw прямо выше, 
            // и не был передан ни одному другому потоку.
            drop(unsafe { Box::from_raw(p) });
            p = e;
        }
    }
    unsafe { &*p }
// Safety: p не равен нулю и указывает на правильно инициализированное значение.
// Чтобы убедиться в справедливости нашего предположения, мы используем порядок освобождения и получения памяти, чтобы убедиться, что инициализация данных действительно произошла, прежде чем создавать ссылку на них.
}

Нужно использовать Acquire как для упорядочивания загрузочной памяти, так и для упорядочивания памяти ошибок Compare_exchange, чтобы иметь возможность синхронизироваться с операцией, сохраняющей указатель. Это сохранение происходит, когда операция Compare_exchange завершается успешно, поэтому мы должны использовать Release в качестве порядка ее успеха.

Пример: отчет о ходе работы


use std::sync::atomic::AtomicUsize;
fn main() {
    let num_done = AtomicUsize::new(0);
    thread::scope(|s| {
        // Фоновый поток для обработки всех 100 элементов.
        s.spawn(|| {
            for i in 0..100 {
                process_item(i); // Assuming this takes some time.
                num_done.store(i + 1, Relaxed);
            }
        });
        // В основной теме каждую секунду отображаются обновления статуса.
       // т.е. будет выведено много дублей текущего состояния num_done пока не обновиться до нового состояния !!!
        loop {
            let n = num_done.load(Relaxed);
            if n >= 100 { break; }
            println!("Working.. {n}/100 done");
            thread::sleep(std::time::Duration::from_millis(1));
        }
    });
    println!("Done!");
}
fn process_item(n:usize){}

После обработки последнего элемента может потребоваться до одной целой секунды, чтобы основной поток узнал об этом, что приводит к ненужной задержке в конце. Чтобы решить эту проблему, мы можем использовать парковку потоков чтобы вывести основной поток из состояния сна всякий раз, когда появляется новая информация, которая может его заинтересовать.


fn main() {
    let num_done = AtomicUsize::new(0);
    let main_thread = thread::current();
    thread::scope(|s| {
        // Фоновый поток для обработки всех 100 элементов.
        s.spawn(|| {
            for i in 0..100 {
                process_item(i); // Если предположить, что это займет некоторое время.
                num_done.store(i + 1, Relaxed);
                main_thread.unpark(); // Разбудите основной поток.
            }
        });
        // В основной теме отображаются обновления статуса.
        loop {
            let n = num_done.load(Relaxed);
            if n == 100 { break; }
            println!("Working.. {n}/100 done");
            thread::park_timeout(Duration::from_secs(1));// если 1 сек пройдет до снятия unpark то он снова покажет прошлое значение
        }
    });
    println!("Done!");
}

use std::thread;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
fn main() {
    let data = vec![1, 2, 3, 4];
    let idx = Arc::new(AtomicUsize::new(0));
    let other_idx = idx.clone();
    thread::spawn(move || {
        other_idx.fetch_add(10, Ordering::SeqCst);
    });
    println!("{}", data[idx.load(Ordering::SeqCst)]);

    if idx.load(Ordering::SeqCst) < data.len() {
        unsafe {
            println!("{}", data.get_unchecked(idx.load(Ordering::SeqCst)));
        }
    }
}

В приведенном ниже примере мы продемонстрируем, как порядок «Relaxed» отличается от порядка «Acquire» и «Release»

learntutorials

Атомные типы являются строительными блоками незакрепленных структур данных и других параллельных типов. При доступе к / модификации атомного типа следует указывать порядок памяти, представляющий силу барьера памяти. Rust обеспечивает 5 примитивов упорядочения памяти: Relaxed (самый слабый), Acquire (для чтения aka load), Release (для записи aka магазинов), AcqRel (эквивалент «Приобретать для загрузки и выпуска для хранения»; полезно, когда оба участвуют в одной операции, такой как compare-and-swap) и SeqCst (самый сильный).


use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

struct UsizePair {
    atom: AtomicUsize,
    norm: UnsafeCell,
}

// UnsafeCell не является потокобезопасным. Поэтому вручную пометьте наш UsizePair как Sync. 
// (Фактически сообщая компилятору: «Я сам об этом позабочусь!»)
unsafe impl Sync for UsizePair {}

static NTHREADS: usize = 8;
static NITERS: usize = 1000000;

fn main() {
    let upair = Arc::new(UsizePair::new(0));

// Barrier — это структура синхронизации, похожая на счетчик (не путать
// с барьером памяти). Он блокируется при вызове 'wait' до тех пор, пока не будет получено фиксированное число
// вызовов 'wait' из различных потоков (например, ожидание, пока все
// игроки доберутся до стартовой линии, прежде чем выстрелить из стартового пистолета).
    let barrier = Arc::new(Barrier::new(NTHREADS + 1));

    let mut children = vec![];

    for _ in 0..NTHREADS {
        let upair = upair.clone();
        let barrier = barrier.clone();
        children.push(thread::spawn(move || {
            barrier.wait();

            let mut v = 0;
            while v < NITERS - 1 {
                // Прочитайте оба члена `atom` и `norm` и проверьте, содержит ли `atom` 
                // более новое значение, чем `norm`. См. описание реализации `UsizePair` для подробностей.
                let (atom, norm) = upair.get();
                if atom > norm {
                    // Если в `get` и `set` используется порядок `Acquire`-`Release`, 
                   // то этот оператор никогда не будет выполнен.
                    println!("Reordered! {} > {}", atom, norm);
                }
                v = atom;
            }
        }));
    }

    barrier.wait();

    for v in 1..NITERS {
        // Обновите оба члена `atom` и `norm`, указав значение `v`. См. описание реализации.  
        upair.set(v);
    }

    for child in children {
        let _ = child.join();
    }
}

impl UsizePair {
    pub fn new(v: usize) -> UsizePair {
        UsizePair {
            atom: AtomicUsize::new(v),
            norm: UnsafeCell::new(v),
        }
    }

    pub fn get(&self) -> (usize, usize) {
        let atom = self.atom.load(Ordering::Relaxed); //Ordering::Acquire

        // Если указанная выше операция загрузки выполняется с упорядочением `Acquire`, 
        // то все записи перед соответствующим хранилищем `Release` 
        // гарантированно будут видны ниже.

        let norm = unsafe { *self.norm.get() };
        (atom, norm)
    }

    pub fn set(&self, v: usize) {
        unsafe { *self.norm.get() = v };

// Если приведенная ниже операция хранения выполняется с упорядочением 'Release',
// то запись в 'norm' выше гарантированно будет видна всем
// потокам, которые 'загружают 'atom' с порядком 'Acquire' и видят то же
// значение, которое было сохранено ниже'. Тем не менее, никаких гарантий не предоставляется, так как
// когда другие читатели станут свидетелями приведенного ниже сохранения, и, следовательно
// вышеупомянутая запись. С другой стороны, также нет никакой гарантии, что
// эти два значения будут синхронизированы для читателей. Даже если другой поток
// увидит то же значение, которое было сохранено ниже, он может на самом деле увидеть значение
// 'позже' в 'норме' по сравнению с тем, что было написано выше. То есть, там
// нет ограничений на видимость будущего.

        self.atom.store(v, Ordering::Relaxed); //Ordering::Release
    }
}

Примечание. Архитектуры x86 имеют сильную модель памяти. Эта статья объясняет это подробно. Также взгляните на страницу Википедии для сравнения архитектур.

Эта библиотека определяет общий тип атомарной оболочки Atomic<T> для всех T: NoUninit типов. Привязка NoUninit взята из крейта bytemuck и указывает, что тип не имеет внутренних байтов заполнения. Вам нужно будет получить или реализовать эту черту для всех типов, используемых с Atomic<T>

Атомарные переменные безопасно совместно использовать между потоками (они реализуют Sync), но сами по себе они не предоставляют механизма совместного использования. Самый распространенный способ совместного использования атомарной переменной — это поместить ее в Arc

extern crate atomic;

#[test]
fn atomic_i128() {
    let a = Atomic::new(0i128);
    assert_eq!(
        Atomic::<i128>::is_lock_free(),
        cfg!(feature = "nightly") & cfg!(target_has_atomic = "128")
    );
    assert_eq!(format!("{:?}", a), "Atomic(0)");
    assert_eq!(a.load(SeqCst), 0);
    a.store(1, SeqCst);
    assert_eq!(a.swap(2, SeqCst), 1);
    assert_eq!(a.compare_exchange(5, 45, SeqCst, SeqCst), Err(2));
    assert_eq!(a.compare_exchange(2, 3, SeqCst, SeqCst), Ok(2));
    assert_eq!(a.fetch_add(123, SeqCst), 3);
    assert_eq!(a.fetch_sub(-56, SeqCst), 126);
    assert_eq!(a.fetch_and(7, SeqCst), 182);
    assert_eq!(a.fetch_or(64, SeqCst), 6);
    assert_eq!(a.fetch_xor(1, SeqCst), 70);
    assert_eq!(a.fetch_min(30, SeqCst), 71);
    assert_eq!(a.fetch_max(-25, SeqCst), 30);
    assert_eq!(a.load(SeqCst), 30);
}
extern crate atomic;
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, NoUninit)]
#[repr(C)]
struct Bar(u64, u64);

#[test]
fn atomic_bar() {
    let a = Atomic::default();
    assert_eq!(Atomic::<Bar>::is_lock_free(), false);
    assert_eq!(format!("{:?}", a), "Atomic(Bar(0, 0))");
    assert_eq!(a.load(SeqCst), Bar(0, 0));
    a.store(Bar(1, 1), SeqCst);
    assert_eq!(a.swap(Bar(2, 2), SeqCst), Bar(1, 1));
    assert_eq!(
        a.compare_exchange(Bar(5, 5), Bar(45, 45), SeqCst, SeqCst),
        Err(Bar(2, 2))
    );
    assert_eq!(
        a.compare_exchange(Bar(2, 2), Bar(3, 3), SeqCst, SeqCst),
        Ok(Bar(2, 2))
    );
    assert_eq!(a.load(SeqCst), Bar(3, 3));
}