Модель синхронизации потоков - Эксклюзивный доступ
Эксклюзивный доступ можно контролировать с помощью примитивов:
crate parking_lot
|
📌 Новый раздел
- 👾 Best practice
- Варианты применения
|
|
Общая изменяемость (shared mutability)
|
Общая изменяемость (shared mutability) в Rust означает, что несколько частей программы могут одновременно иметь доступ к изменению данных.
Это возможно благодаря использованию механизмов синхронизации, таких как мьютексы (mutexes), атомарные типы данных или другие механизмы, которые обеспечивают безопасное и согласованное изменение общих данных.
Основные сценарии использования общей изменяемости включают:
Параллельное программирование: В многопоточных или распределенных приложениях, где несколько потоков или процессов могут одновременно изменять общие данные, общая изменяемость позволяет безопасно и согласованно обновлять эти данные.
Кэширование: В случаях, когда несколько компонентов программы должны иметь доступ к кэшированным данным и одновременно обновлять их, общая изменяемость обеспечивает безопасное обновление кэшированных данных.
Совместное использование ресурсов: При работе с общими ресурсами, такими как базы данных или сетевые соединения, несколько частей программы могут одновременно обновлять или изменять состояние этих ресурсов с помощью общей изменяемости.
Событийно-ориентированное программирование: В асинхронных или событийно-ориентированных приложениях, где события могут происходить параллельно и обрабатываться несколькими компонентами, общая изменяемость позволяет безопасно обмениваться данными между этими компонентами.
Общая изменяемость важна для обеспечения согласованности данных и избежания гонок данных (data races) в параллельных или многопоточных программах. Однако ее использование требует внимательного проектирования и обеспечения безопасности, чтобы избежать потенциальных проблем с синхронизацией и конфликтами при доступе к данным.
|
|
|
Когда использовать?
- Когда у вас есть глобальные или разделяемые данные, к которым обращается множество потоков для чтения.
- Когда требуется безопасное обновление данных, не мешая остальным потокам.
Например:
- Кэш данных в многопоточном сервере.
- Общие настройки приложения, которые изменяются редко, но читаются часто.
- Реализация подписки на события.
Заменяет или улучшает использование примитивов для управления доступом к разделяемым данным в многопоточных приложениях.
- crate arc-swap эффективнее, чем RwLock, в сценариях с редкими записями и частыми чтениями.
- При использовании RwLock читатели могут блокироваться, если писатель активен. В случае arc-swap старые данные остаются доступны для читателей даже во время обновления.
use arc_swap::ArcSwap;
use std::sync::Arc;
fn main() {
// Создаём данные, которые будут обновляться
let data = Arc::new("Initial data".to_string());
let shared_data = ArcSwap::new(data);
// Чтение данных из другого потока
let reader_thread = std::thread::spawn({
let shared_data = shared_data.clone();
move || {
for _ in 0..5 {
let current_data = shared_data.load();
println!("Read data: {}", current_data);
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
});
// Обновление данных
let writer_thread = std::thread::spawn({
let shared_data = shared_data.clone();
move || {
std::thread::sleep(std::time::Duration::from_millis(200));
let new_data = Arc::new("Updated data".to_string());
shared_data.store(new_data);
}
});
reader_thread.join().unwrap();
writer_thread.join().unwrap();
}
|
|
|
Блокировка на запись . Разделяет кому давать какую блокировку.
Этот тип блокировки позволяет много читателей или не более одного писателя.
Это похоже на Mutex и RefCell
|
|
|
Стандартная библиотека Rust предоставляет только один RwLock тип общего назначения, но его реализация зависит от операционной системы.
Существует множество тонких различий между реализациями блокировки чтения-записи.
Большинство реализаций будут блокировать новых читателей, когда есть ожидающий писатель, даже если блокировка уже заблокирована чтением.
Это делается для предотвращения «голода» писателя , ситуации, когда множество читателей коллективно не дают возможности разблокировать блокировку, не позволяя ни одному писателю обновить данные.
|
|
Почему RwLock требует T: Send + Sync
а Mutex только T: Send?
|
Для сравнения, Mutex не различает читателей или писателей, которые приобретают блокировку, поэтому блокирует любые потоки, ожидающие появления блокировки. Это RwLock позволит любому числу читателей приобретать замок до тех пор, пока писатель не удерживает блокировку.
|
|
Вы можете создавать объекты Mutex и RwLock на этапе компиляции, а не только во время выполнения программы.
|
Теперь вы можете создавать глобальные константные или статические переменные, содержащие мьютексы или блокировки чтения-записи, что раньше было невозможно. Ранее такие конструкции требовали дополнительных трюков, например, использования макроса lazy_static!.
Улучшение производительности: Поскольку данные инициализируются на этапе компиляции, снижается накладная стоимость при старте программы.
use std::sync::Mutex;
static GLOBAL_MUTEX: Mutex = Mutex::new(42);
fn main() {
let lock = GLOBAL_MUTEX.lock().unwrap();
println!("Value: {}", *lock);
}
Хотя создание Mutex возможно на этапе компиляции, сами операции блокировки (lock(), try_lock()) всё равно остаются операциями времени выполнения.
Это не устраняет проблемы с мёртвыми блокировками или другими нюансами многопоточности — эти аспекты должны быть обработаны программно.
|
|
|
- new() - Создает новый экземпляр, RwLock
<T>
- get_mut() - Возвращает изменяемую ссылку на базовые данные.
- into_inner() - Расходует RwLock возвращая данные
- is_poisoned(&self) -> bool Определяет паниковал ли lock в потоке
- clear_poison() - Очистить отравленное состояние из RwLock
- read() - Блокирует с общим доступом для чтения, блокируя текущий поток до тех пор, пока он не будет получен.
- try_read() - Пытается получить это RwLock с общим доступом на чтение.
- try_write() - Пытается заблокировать это RwLock с эксклюзивным доступом на запись.
- write() - Блокирует с эксклюзивным доступом на запись, блокируя текущий поток до тех пор, пока он не будет получен.
|
|
RwLock одновременный доступ для чтения и записи
try_read() - Пытается получить это RwLock с общим доступом на чтение.
try_write() - Пытается заблокировать это RwLock с эксклюзивным доступом на запись.
RwLock допускает одновременный доступ для чтения, то есть несколько потоков могут получить блокировку чтения одновременно, если нет активных блокировок записи.
Это позволяет эффективно выполнять параллельные операции чтения.
Однако доступ для записи является эксклюзивным, то есть только один поток может одновременно получить блокировку записи, что обеспечивает целостность данных.
|
use std::sync::Arc;
use std::sync::RwLock;
use std::thread;
fn main() {
let rw_data = Arc::new(RwLock::new(Vec::::new()));
let shared_data = Arc::clone(&rw_data);
let shared_data2 = Arc::clone(&rw_data);
let reader_thread = thread::spawn(move || {
for _ in 0..5{
if let Ok(ref shared_data_reader) = shared_data.try_read(){
println!("Reader Thread: {:?}", *shared_data_reader);
}
std::thread::sleep(std::time::Duration::from_millis(1));
}
});
let writer_thread = thread::spawn(move || {
if let Ok(mut shared_data_writer) = shared_data2.try_write() {
(*shared_data_writer).push(42);
println!("Writer Thread {:?}", shared_data_writer);
} else {
println!("Couldn't get write access, sorry!")
};
});
reader_thread.join().expect("Reader thread panicked");
writer_thread.join().expect("Writer thread panicked");
let final_data = rw_data.read().unwrap();
println!("Final Data: {:?}", *final_data);
}
|
|
get_mut(&mut self) -> LockResult<&mut T> - Возвращает изменяемую ссылку на базовые данные
|
use std::sync::RwLock;
fn main(){
let mut lock = RwLock::new(0);
*lock.get_mut().unwrap() = 10;
assert_eq!(*lock.read().unwrap(), 10);
}
|
|
new(t: T) -> RwLock<T> - Создает новый экземпляр RwLock , который разблокирован.
|
use std::sync::RwLock;
fn main(){
let lock = RwLock::new(5);
}
|
|
into_inner(self) -> LockResult<T> - Расходует RwLock возвращая данные
|
use std::sync::RwLock;
fn main(){
let lock = RwLock::new(String::new());
{
let mut s = lock.write().unwrap();
*s = "modified".to_owned();
}
assert_eq!(lock.into_inner().unwrap(), "modified");
}
|
|
read(&self) -> LockResult<RwLockReadGuard<T>> - Блокирует этот rwlock с общим доступом для чтения, блокируя текущий поток, пока он не будет получен.
Вызывающий поток будет заблокирован, пока не будет больше писателей, удерживающих блокировку.
|
use std::sync::{Arc, RwLock};
use std::thread;
fn main(){
let lock = Arc::new(RwLock::new(1));
let c_lock = Arc::clone(&lock);
let n = lock.read().unwrap();
assert_eq!(*n, 1);
thread::spawn(move || {
let r = c_lock.read();
assert!(r.is_ok());
}).join().unwrap();
}
|
|
try_read(&self) -> TryLockResult<RwLockReadGuard<T>> - Попытки приобрести этот Rwlock с общим доступом для чтения.
Если доступ не может быть предоставлен в это время, то возвращается Err
|
use std::sync::RwLock;
fn main(){
let lock = RwLock::new(1);
match lock.try_read() {
Ok(n) => assert_eq!(*n, 1),
Err(_) => unreachable!(),
};
}
|
|
write(&self) -> LockResult<RwLockWriteGuard<T>> - Блокирует этот Rwlock с исключительным доступом для записи, блокируя текущий поток, пока он не будет получен.
Эта функция не вернется, пока другие авторы или другие читатели имеют доступ к блокировке.
|
use std::sync::RwLock;
fn main(){
let lock = RwLock::new(1);
let mut n = lock.write().unwrap();
*n = 2;
assert!(lock.try_read().is_err());
}
|
|
try_write(&self) -> TryLockResult<RwLockWriteGuard<T>> - Попытки заблокировать этот rwlock с исключительным доступом для записи.
|
use std::sync::RwLock;
fn main(){
let lock = RwLock::new(1);
let n = lock.read().unwrap();
assert_eq!(*n, 1);
assert!(lock.try_write().is_err());
}
use std::sync::RwLock;
fn main() {
let my_rwlock = RwLock::new(5);
let read1 = my_rwlock.read().unwrap();
let read2 = my_rwlock.read().unwrap();
if let Ok(mut number) = my_rwlock.try_write() {
*number += 10;
println!("Now the number is {}", number);
} else {
println!("Couldn't get write access, sorry!")
};
}
|
|
is_poisoned(&self) -> bool Определяет паниковал ли lock в потоке
clear_poison() - Очистить отравленное состояние из RwLock
|
#![feature(mutex_unpoison)]
use std::sync::{Arc, RwLock};
use std::thread;
fn main(){
let lock = Arc::new(RwLock::new(0));
let c_lock = Arc::clone(&lock);
let _ = thread::spawn(move || {
let _lock = c_lock.write().unwrap();
panic!(); // the mutex gets poisoned
}).join();
assert_eq!(lock.is_poisoned(), true);
let guard = lock.write().unwrap_or_else(|mut e| {
**e.get_mut() = 1;
lock.clear_poison();
e.into_inner()
});
assert_eq!(lock.is_poisoned(), false);
assert_eq!(*guard, 1);
}
|
|
|
|
|
|
use std::sync::RwLock;
use std::mem::drop;
fn main() {
let my_rwlock = RwLock::new(5);
let read1 = my_rwlock.read().unwrap();
let read2 = my_rwlock.read().unwrap();
println!("{:?}, {:?}", read1, read2);
drop(read1);
drop(read2); // we dropped both, so we can use .write() now
let mut write1 = my_rwlock.write().unwrap();
*write1 = 6;
std::mem::drop(write1);
println!("{:?}", my_rwlock);
}
|
|
Пример с потоком и channel
|
use std::sync::{Arc, RwLock};
use std::thread;
use std::sync::mpsc::{Receiver,Sender};
fn main() {
let (tx, rx): (Sender<()>, Receiver<()>) = std::sync::mpsc::channel();
let mut v:Vec = vec![1,2,3,4];
let rwlock = Arc::new(RwLock::new(v));
for i in 0..4 {
let (c_rwlock, tx) = (Arc::clone(&rwlock), tx.clone());
thread::spawn(move || {
if let Ok(mut n) = c_rwlock.try_write() {
n[i]+=10;
};
tx.send(());
});
}
//thread::sleep_ms(50);
for _ in 0..4 {
rx.recv();
}
if rwlock.is_poisoned() == false{
println!("{:?}",*rwlock.try_read().unwrap());
}
/* match rwlock.try_read() {
Ok(n) => println!("{:?}",*n), // [11, 12, 13, 14]
Err(_) =>println!(""),
};*/
}
|
|
|
Механизм парковки и размораживания (thread::park и thread::unpark) полезен для простых случаев синхронизации и может работать в сценариях с несколькими потоками, если вы внимательно управляете его использованием. Однако для более сложных сценариев с несколькими потоками и необходимостью более сложной синхронизации рекомендуется использовать условные переменные (Condvar) или каналы (channel). Эти инструменты обеспечивают более высокоуровневую и безопасную абстракцию для работы с многопоточностью в Rust.
|
|
|
При работе с несколькими потоками механизм парковки и размораживания (thread::park и thread::unpark) может стать сложным и менее эффективным, особенно если требуется синхронизация между множественными потребляющими потоками.
Когда у вас несколько потребляющих потоков, производящий поток не знает, какой из потребляющих потоков ожидает данных.
Если несколько потребляющих потоков одновременно ожидают данные, вызов unpark() только для одного из них может быть неэффективным. Остальные потребляющие потоки будут продолжать ожидать, несмотря на наличие новых данных.
Использование park и unpark требует тщательного управления уведомлениями, чтобы избежать ситуации, когда уведомление теряется или поток пробуждается без необходимости. Это усложняет код и может приводить к ошибкам.
Альтернативные подходы
Вместо использования park и unpark для управления несколькими потоками можно рассмотреть следующие альтернативы: Condvar или channel
|
|
Parking and Condition Variables
Один из способов ожидания уведомления от другого потока называется парковкой потока
|
Когда данные мутируют несколько потоков, есть много ситуаций, когда им нужно будет ждать некоторого события, некоторого условия о данных, чтобы оно стало истинным.
Например, если у нас есть мьютекс, защищающий a Vec, мы можем захотеть подождать, пока он не будет содержать что-либо.
Хотя мьютекс позволяет потокам ждать, пока он не будет разблокирован, он не предоставляет функционал для ожидания каких-либо других условий.
Если бы мьютекс был всем, что у нас было, нам пришлось бы постоянно блокировать мьютекс, чтобы повторно проверять, есть ли что-нибудь в Vec
Поток может припарковать себя, что усыпит его и не даст ему потреблять циклы ЦП. Затем другой поток может распарковать припаркованный поток, выведя его из состояния сна.
park() не гарантирует, что она вернется только из-за сопоставления unpark().
Хотя это и довольно редко, она может иметь ложные пробуждения
Соблюдайте очередность park=>unpark
Важным свойством парковки потоков является то, что вызов unpark() до того, как поток сам припаркуется, не теряется. Запрос на распарковку все равно записывается, и в следующий раз, когда поток попытается припарковаться, он очищает этот запрос и напрямую продолжает работу, фактически не переходя в спящий режим.
|
|
Важно вызывать thread::park() только когда действительно нет элементов в очереди, а не после каждого обработанного элемента. Это предотвращает лишние блокировки и разблокировки очереди.
|
Если очередь пуста, поток будет "парковаться" с помощью thread::park(). Это значит, что поток приостановит свою работу до тех пор, пока кто-то не вызовет thread::unpark() на этом потоке.
use std::collections::VecDeque;
fn main() {
let queue = Mutex::new(VecDeque::new());
thread::scope(|s| {
// Consuming thread
let t = s.spawn(|| loop {
let item = queue.lock().unwrap().pop_front();
if let Some(item) = item {
dbg!(item);
} else {
thread::park(); // приостановка пока не будет вызван unpark даюший понять что данные появились
}
});
// Producing thread
for i in 0.. {
queue.lock().unwrap().push_back(i);
t.thread().unpark();
thread::sleep(Duration::from_secs(1));
}
});
}
|
|
|
Замена для Mutex, RwLock, Condvar, and Once меньше, быстрее и более гибким, чем в стандартной библиотеке ржавчины, а также ReentrantMutex тип, который поддерживает рекурсивную блокировку.
Он также предоставляет низкоуровневый API для создания ваших собственных эффективных примитивов синхронизации.
Когда использовать crate parking_lot?
- Для высоконагруженных многопоточных приложений, где важна производительность.
- Когда требуется меньший размер блокировок или удобство работы с Condvar.
- Для улучшения производительности в сценариях с высокой конкуренцией потоков.
crate parking_lot может быть особенно полезен в серверных приложениях или системах реального времени.
|
|
|
Преимущества parking_lot над std::sync::Mutex:
-
Более низкие накладные расходы:
parking_lot::Mutex использует более эффективный алгоритм блокировки, который минимизирует использование системных вызовов.
-
Без необходимости Arc для Condvar:
В отличие от стандартной реализации, parking_lot позволяет напрямую использовать переменные условия (Condvar) без обёртки в Arc.
-
Меньший размер примитивов:
Примитивы parking_lot занимают меньше памяти по сравнению с std::sync.
|
|
Сравнение с std::sync::Mutex
При высокой конкурентности parking_lot::Mutex показывает лучшее время выполнения.
Код с std::sync::Mutex будет аналогичен, но parking_lot работает быстрее из-за более эффективного алгоритма блокировки.
|
use parking_lot::Mutex;
use std::thread;
fn main() {
let data = Mutex::new(0);
let handles: Vec<_> = (0..10).map(|_| {
let data = data.clone();
thread::spawn(move || {
let mut lock = data.lock();
*lock += 1;
})
}).collect();
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *data.lock());
}
|
|
Использование с Condvar (переменная условия)
Пример с использованием Condvar для синхронизации потоков
|
use parking_lot::{Condvar, Mutex};
use std::thread;
fn main() {
let pair = Mutex::new(false);
let condvar = Condvar::new();
let thread = thread::spawn({
let pair = &pair;
let condvar = &condvar;
move || {
let mut lock = pair.lock();
*lock = true;
condvar.notify_one();
}
});
let mut lock = pair.lock();
while !*lock {
condvar.wait(&mut lock);
}
println!("Condition met!");
thread.join().unwrap();
}
|
|
|
Семафор поддерживает набор разрешений.
Разрешения используются для синхронизации доступа к общему ресурсу.
Семафор отличается от мьютекса тем, что он может разрешить одновременному доступу к общему ресурсу более чем одному вызывающему объекту.
Перед доступом к ресурсу поток уменьшает счетчик семафора, а после завершения работы с ресурсом увеличивает его счетчик.
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{Semaphore, TryAcquireError};
use std::sync::Arc;
#[tokio::main]
async fn main() {
let semaphore_a = Arc::new(Semaphore::new(10));
let semaphore_b = Arc::new(Semaphore::new(10));
let mut handles = Vec::new();
{
let a = Arc::clone(&semaphore_a);
let b = Arc::clone(&semaphore_b);
let handle = tokio::spawn(async move{
let response = 0;
if let Ok(permit_a) = a.try_acquire(){
if let Ok(permit_b) = b.try_acquire(){
// Send the request.
let response = 1;
// Drop the permit after the request has been sent.
//drop(permit_a);
//drop(permit_b);
std::thread::sleep(std::time::Duration::from_secs(2));
return response;
}
}
response
});
handles.push(handle);
}
{
let a = Arc::clone(&semaphore_a);
let b = Arc::clone(&semaphore_b);
let handle = tokio::spawn(async move{
/*
let response = 0;
if let Ok(permit_b) = b.try_acquire(){
if let Ok(permit_a) = a.try_acquire(){
// Send the request.
let response = 2;
// Drop the permit after the request has been sent.
//drop(permit_b);
//drop(permit_a);
//std::thread::sleep(std::time::Duration::from_secs(2));
return response;
}
}*/
let permit_b = b.acquire().await.unwrap();
let permit_a = a.acquire().await.unwrap();
let response = 2;
response
});
handles.push(handle);
}
// Collect responses from tasks.
let mut responses = Vec::new();
for jh in handles {
let response = jh.await.unwrap();
responses.push(response);
}
// Process responses.
println!("{:?}",responses);
}
|
|
|
Mutex Send "внутренняя изменчивость" - Один из способов использования общих данных между потоками (Потокобезопасный аналог RefCell<T>)
У мьютексов есть репутация труднодоступности, потому что вы должны помнить два правила:
- Прежде чем использовать данные, вы должны попытаться приобрести блокировку.
- Когда вы закончите с данными, которые защищают мьютекс, вы должны разблокировать данные, чтобы другие потоки могли получить блокировку.
|
|
📌 Рассмотрите возможность использования parking_lot::Mutex в качестве более быстрой альтернативы std::sync::Mutex
crate parking_lot
|
|
|
Мьютексы в других языках программирования
|
Самое большое отличие в том, что Rust Mutex<T> содержит данные, которые он защищает.
В C++, например, не содержит данных, которые он защищает, и даже не знает, что он защищает. Это означает, что пользователь должен помнить, какие данные защищены и каким мьютексом, и обеспечивать блокировку нужного мьютекса каждый раз, когда осуществляется доступ к «защищенным» данным.
|
|
|
Mutex<T> это умный указатель. Точнее, вызов lock возвращает смарт - указатель MutexGuard. Этот интеллектуальный указатель реализует, Deref и DerefMut чтобы указать на наши внутренние данные; интеллектуальный указатель также имеет Drop реализацию, которая автоматически освобождает блокировку, когда MutexGuard выходит из области видимости, что происходит в конце внутренней области
Обеспечивает взаимное исключение потоков в rinetime. Mutex будет блокировать потоки, ожидая, когда блокировка станет доступной.
Нам нужен тип, который может обеспечить изменение своего содержимого лишь одним пользователем одновременно - это будет Mutex, какой поток первым получи блокировку тот и сможет изменить данные, а остальные только читать
Если мы присваиваем охраннику имя с помощью выражения let, относительно просто понять, когда он будет освобожден, поскольку локальные переменные освобождаются в конце области видимости, в которой они определены.
let mut guard: MutexGuard<'_,i32> = mutex_clone.lock().unwrap();
Также возможно использовать охранника без присвоения ему имени, и это может быть очень удобно в некоторых случаях.
Например, если у вас есть Mutex<Vec<i32>>, вы можете заблокировать mutex, добавить элемент в Vec и снова разблокировать mutex в одном выражении:
mutex_clone.lock().unwrap().push(1);
Любые временные значения, возникающие в рамках более крупного выражения, такие как охранник, возвращаемый методом lock(), будут освобождены в конце выражения.
Временный охранник не будет освобожден до конца всего выражения if let, что означает, что мы ненужно удерживаем блокировку во время обработки элемента.
if let Some(item) = list.lock().unwrap().pop() {
process_item(item);
}
Мы можем избежать этой проблемы, переместив операцию pop в отдельное выражение let. Тогда охранник будет освобожден в конце этого выражения до if let:
let item = list.lock().unwrap().pop();
if let Some(item) = item {
process_item(item);
}
|
|
Отравление (poisoning) мьютекса
|
В Rust poisoning (отравление) мьютекса происходит, если поток, удерживающий мьютекс, завершился аварийно (например, из-за паники) до того, как освободил мьютекс.
Это сигнализирует другим потокам о том, что состояние защищённых данным мьютексом данных может быть неконсистентным или повреждённым.
Другие потоки всё ещё могут попытаться захватить мьютекс.
Отравление не блокирует доступ к мьютексу для других потоков.
Это означает, что другие потоки могут успешно вызвать метод lock() и получить доступ к мьютексу.
Метод lock() возвращает Result<MutexGuard<T>, PoisonError<MutexGuard<T>>>.
Если мьютекс был отравлён, lock() вернёт Err, содержащий информацию об отравлении.
Это позволяет вызывающему коду решить, как поступить дальше:
- Обработка ошибки: Можно явно обработать ошибку отравления, например, попытаться восстановить состояние или выполнить какие-то корректирующие действия.
- Использование
unwrap(): В практических случаях, как вы упомянули, часто используется .unwrap(), что приводит к панике, если мьютекс отравлён. Это оправдано, если отравление мьютекса указывает на критическую ошибку, после которой безопасно продолжать выполнение программы невозможно.
|
|
Взаимоблокировка (deadlock)
Cооветуют предопределить очередность захвата этих мьютексов, и во всех потоках придерживаться этой очередности
deadlock
|
Риск создания взаимоблокировок.
Это происходит, когда операция должна блокировать два ресурса, и каждый из двух потоков приобрел один из lock (замков), заставляя ждать друг друга навсегда.
Поток может заблокировать сам себя, когда пытается захватить блокировку, которую уже удерживает.
let mut guard1 = self.waiting_list.lock().unwrap();
let mut guard2 = self.waiting_list.lock().unwrap(); // взаимоблокировка
Предположим, что первый вызов self.waiting_list.lock() завершился успешно,т. е. блокировка захвачена. Второй вызов обнаруживает, что блокировка кем-то
захвачена, и ждет ее освобождения. Ждать он будет вечно, поскольку именно ожидающий поток и удерживает блокировку.
Иными словами, блокировка в Mutex не является рекурсивной.
В данном случае ошибка очевидна. Но в реальной программе вызовы lock()
могут находиться в двух разных методах, один из которых вызывает другой. Код каждого метода по отдельности выглядит нормально.
Взаимоблокировка возможна и при работе с каналами. Например, два потока могут взаимно заблокироваться, если будут ждать получения сообщений друг от друга.
Упрощенная попытка решить эту проблему заключается в уменьшении области действия блокировок, чтобы не было момента, когда обе блокировки будут удерживаться одновременно
Если невозможно избежать параллелизма с разделяемым состоянием, то есть несколько способов снизить вероятность написания кода, подверженного взаимоблокировкам:
- Поместите структуры данных, которые должны быть согласованы друг с другом, под одну защиту .
- Сохраняйте области блокировки небольшими и очевидными
- Избегайте вызова замыканий при удерживаемых блокировках; это ставит код в зависимость от любого замыкания, которое будет добавлено в кодовую базу в будущем.
- Аналогично, избегайте возвращать сообщение MutexGuard вызывающему абоненту: с точки зрения deadlock ситуации это все равно, что передавать заряженное ружье. (Если вызывающий код хранит MutexGuard дольше, чем ожидалось, это может привести к тому, что мьютекс будет заблокирован дольше, чем необходимо, что увеличивает риск возникновения дедлоков. Если несколько функций или потоков начинают возвращать и удерживать свои MutexGuard, это может привести к ситуации, когда разные части программы ожидают освобождения мьютексов, которые никогда не будут освобождены из-за неправильного управления MutexGuard. Возвращая MutexGuard, вы нарушаете инкапсуляцию, позволяя внешнему коду напрямую взаимодействовать с мьютексом. Это может привести к нежелательным изменениям состояния, которые сложно отслеживать и отлаживать.)
- Включите в свою систему CI инструменты обнаружения взаимоблокировок, такие как
no_deadlocks, ThreadSanitizer или parking_lot::deadlock
#![feature(mutex_unlock)]
use std::thread;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
fn main(){
let a = Arc::new(Mutex::new(0));
let b = Arc::new(Mutex::new(0));
let mut handles = vec![];
{
let a = Arc::clone(&a);
let b = Arc::clone(&b);
let handle = thread::spawn(move || {
let mut a_num = a.lock().unwrap();
*a_num += 1;
println!("Thread 1 holds a lock and starts waiting b lock");
//let mut b_num = b.lock().unwrap(); --------------- deadlock
if let Ok(ref mut b_num) = b.try_lock(){
**b_num += 1;
}else{
Mutex::unlock(a_num);
}
});
handles.push(handle);
}
{
let a = Arc::clone(&a);
let b = Arc::clone(&b);
let handle = thread::spawn(move || {
let mut b_num = b.lock().unwrap();
*b_num += 1;
println!("Thread 2 holds b lock and starts waiting a lock");
//let mut a_num = a.lock().unwrap(); --------------- deadlock
if let Ok(ref mut a_num) = a.try_lock(){
**a_num += 1;
}else{
Mutex::unlock(b_num);
}
println!("Thread 2");
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Done {}", *a.lock().unwrap()); // never reach here
}
|
|
|
- new() - Создает новый Mutex в разблокированном состоянии
- get_mut() - Возвращает изменчивую ссылку на базовые данные.
- into_inner() - Потребляет Mutex, возвращая базовые данные.
- lock() - Блокирует локальный поток, пока он не будет доступен для получения Mutex.
- try_lock() - Попытки взять блокировку
- unlock() - Немедленно сбрасывает защиту и, следовательно, разблокирует Mutex.
- is_poisoned() - Проверка отравлен ли Mutex, была ли паника?
- clear_poison() - Очистить отравленное состояние из Mutex
|
|
new() - Создает новый Mutex в разблокированном состоянии
into_inner() - Потребляет Mutex, возвращая базовые данные.
|
use std::sync::{Mutex, Arc};
use std::thread;
fn main(){
// new() - создаем защищенный счет
let account = Arc::new(Mutex::new(1000)); // начальный баланс
let mut handles = vec![];
// Несколько потоков работают с счетом
for _ in 0..10 {
let account = Arc::clone(&account);
handles.push(thread::spawn(move || {
// lock() - блокируем для безопасного доступа
let mut balance = account.lock().unwrap();
*balance += 100; // пополнение
// автоматическая разблокировка
}));
}
for handle in handles {
handle.join().unwrap();
}
// into_inner() - забираем финальный результат
let final_balance = Arc::try_unwrap(account)
.unwrap()
.into_inner()
.unwrap();
println!("Final balance: {}", final_balance); // 2000
}
|
|
try_lock() - Попытки взять блокировку
|
use std::sync::{Mutex, Arc};
use std::thread;
use std::time::Duration;
fn main(){
let data = Arc::new(Mutex::new(0));
let data_clone = Arc::clone(&data);
let producer = thread::spawn(move || {
for i in 1..=5 {
let mut lock = data_clone.lock().unwrap();
*lock = i;
thread::sleep(Duration::from_millis(100)); // имитация работы
}
});
let consumer = thread::spawn(move || {
while let Ok(lock) = data.try_lock() {
// try_lock() - не блокируем поток если мьютекс занят
if *lock == 5 { break; }
println!("Current value: {}", *lock);
thread::sleep(Duration::from_millis(50));
}
println!("Producer finished!");
});
producer.join().unwrap();
consumer.join().unwrap();
}
|
|
get_mut() - Возвращает изменчивую ссылку на базовые данные.
Работает ТОЛЬКО когда есть эксклюзивный доступ
|
Правильное использование:
- до создания потоков
- инициализация перед многопоточным использованием
- локальное использование/тестирование без многопоточности
Неправильное использование:
- попытка использовать get_mut() с Arc
- после передачи в поток
✅ Правильное использование - до создания потоков
use std::sync::Mutex;
fn main() {
// Создаем мьютекс в основном потоке
let mut mutex = Mutex::new(vec![1, 2, 3]);
// get_mut() безопасен - нет других ссылок на Mutex
let data = mutex.get_mut().unwrap();
data.push(4);
data.push(5);
println!("До создания потоков: {:?}", data); // [1, 2, 3, 4, 5]
// Теперь можно передавать в потоки
let shared_mutex = std::sync::Arc::new(mutex);
// Дальше работаем через lock()
}
✅ Инициализация перед многопоточным использованием
use std::sync::{Mutex, Arc};
fn initialize_config() -> Arc<Mutex<Config>> {
let mut mutex = Mutex::new(Config::default());
// Безопасная инициализация без блокировок
let config = mutex.get_mut().unwrap();
config.timeout = 30;
config.retries = 3;
config.load_credentials();
Arc::new(mutex) // Теперь передаем в потоки
}
struct Config {
timeout: u32,
retries: u32,
// другие поля...
}
impl Default for Config {
fn default() -> Self {
Self { timeout: 0, retries: 0 }
}
}
❌ Попытка использовать get_mut() с Arc
use std::sync::{Mutex, Arc};
fn main(){
let shared_mutex = Arc::new(Mutex::new(42));
// ОШИБКА! get_mut() требует эксклюзивного доступа
// let data = shared_mutex.get_mut().unwrap(); // Не скомпилируется
// Правильно - через lock()
let data = shared_mutex.lock().unwrap();
}
❌ После передачи в поток
use std::sync::{Mutex, Arc};
use std::thread;
fn main(){
let mutex = Arc::new(Mutex::new(0));
let thread_mutex = Arc::clone(&mutex);
thread::spawn(move || {
let _lock = thread_mutex.lock().unwrap();
});
// ОШИБКА! Мьютекс уже разделяется между потоками
// let data = mutex.get_mut(); // Не скомпилируется
}
|
|
is_poisoned(&self) -> bool - Проверка отравлен ли мьютекс, была ли паника?
|
Проблема отравления блокировок
Вызовы unwrap() в приведенных выше примерах связаны с проблемой отравления блокировок.
Mutex в Rust помечается как отравленный, когда поток вызывает панику, удерживая блокировку. Когда это происходит, Mutex больше не будет заблокирован, но вызов его метода lock приведет к возврату Err, чтобы указать, что он был отравлен.
Это механизм для защиты от оставления данных, защищенных mutex, в непоследовательном состоянии. В приведенном выше примере, если поток вызовет панику после увеличения целого числа менее чем на 100, mutex разблокируется, и целое число останется в неожиданном состоянии, где оно больше не является кратным 100, что может нарушить предположения других потоков. Автоматическая пометка mutex как отравленного в таком случае заставляет пользователя обработать эту возможность.
Вызов lock() на отравленном mutex все равно заблокирует mutex. Err, возвращаемый lock(), содержит MutexGuard, что позволяет нам при необходимости исправить непоследовательное состояние.
Хотя отравление блокировок может казаться мощным механизмом, на практике восстановление из потенциально непоследовательного состояния не всегда осуществляется. Большинство кода либо игнорирует отравление, либо использует unwrap(), чтобы вызвать панику, если блокировка была отравлена, тем самым эффективно распространяя паники на всех пользователей mutex.
use std::thread;
use std::thread::JoinHandle;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::str::FromStr;
fn main(){
let arc_mutex:Arc> = Arc::new(Mutex::new(0));
let mut handles:Vec> = Vec::new();
for n_thread in 1..=10 {
let mutex_clone:Arc> = Arc::clone(&arc_mutex);
let handle:JoinHandle<()> = thread::Builder::new()
.name(format!("{n_thread}"))
.spawn(move || {
let id:i32 = i32::from_str(thread::current().name().unwrap()).unwrap();
if id%2 == 0{
panic!("value is a multiple of 2");
}
// try_lock не ожидет получения блокировки Mutex
if let Ok(ref mut guard) = mutex_clone.try_lock(){
**guard += 1;
}
// lock ожидет получения блокировки т.е. блокирует текуший поток до получения возможности заблокировать Mutex
//let mut guard:MutexGuard<'_,i32> = mutex_clone.lock().unwrap();
// *guard += 1;
}).unwrap();
handles.push(handle);
}
for handle in handles {
if let Ok(_) = handle.join(){
}
}
println!("data:{}",*arc_mutex.lock().unwrap());
println!("{}",*arc_mutex.lock().unwrap());
let mut mutex:Mutex = Arc::into_inner(arc_mutex).unwrap();
if !mutex.is_poisoned(){
let ref_mutex:Result<&mut i32,PoisonError<_>> = mutex.get_mut();
let data:&mut i32 = ref_mutex.unwrap();
*data+=1;
}
println!("data:{}",mutex.into_inner().unwrap());
}
|
|
|
|
|
|
Доступ к этим значениям может занять нетривиальное количество времени.
Если к нескольким таким значениям обычно обращаются вместе, может быть лучше поместить их в одну оболочку.
❌ Например, такая структура:
struct S {
x: Arc<Mutex<u32>>,
y: Arc<Mutex<u32>>,
}
✅ может быть лучше представлено так:
struct S {
xy: Arc<Mutex<(u32, u32)>>
}
|
|
crossbeam - может гарантировать что поток не переживет ссылку на данные т.е. можно передавать ссылку на изменяемые данные
|
fn main(){
let xs = Mutex::new([0,0,0,0]);
crossbeam::scope(|scope_|{
for _ in 0..10{// запускаем 10 потоков
scope_.spawn(||{
let mut guard = xs.lock().unwrap();// блокируем данные
// теперь с данными безопасно обращаться так как мы держим блокировку Mutex
let xs:&mut[i32;4] = &mut guard;// разименовывание Mutex
for i in xs{
*i+=1;
}
});
}
});
println!("{:?}",*xs.lock().unwrap());
}
|
|
В примере создается 10 потоков, каждый из которых увеличивает значение внутри мьютекса.
Нет гарантии, в каком порядке выполняются потоки, и try_lock может не получить блокировку и пропустить поток дальше
|
use std::thread;
use std::thread::JoinHandle;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
fn main(){
let arc_mutex:Arc> = Arc::new(Mutex::new(0));
let mut handles:Vec> = Vec::new();
for _ in 0..10 {
let mutex_clone:Arc> = Arc::clone(&arc_mutex);
let handle = thread::spawn(move || {
// try_lock не ожидет получения блокировки Mutex
if let Ok(ref mut guard) = mutex_clone.try_lock(){
**guard += 1;
}
// lock ожидет получения блокировки т.е. блокирует текуший поток до получения возможности заблокировать Mutex
let mut guard:MutexGuard<'_,i32> = mutex_clone.lock().unwrap();
*guard += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("data:{}",*arc_mutex.lock().unwrap());
let mut mutex:Mutex = Arc::into_inner(arc_mutex).unwrap();
if !mutex.is_poisoned(){
let ref_mutex:Result<&mut i32,PoisonError<_>> = mutex.get_mut();
let data:&mut i32 = ref_mutex.unwrap();
*data+=1;
}
println!("data:{}",mutex.into_inner().unwrap());
}
|
|
|
fn main(){
let arc_mutex:Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
let mut handles:Vec<JoinHandle<()>> = Vec::new();
for id in 1..=10 {
let mutex_clone:Arc<Mutex<i32>> = Arc::clone(&arc_mutex);
let handle = thread::Builder::new()
.name(format!("{}",id)).spawn(move || {
// lock ожидет получения блокировки т.е. блокирует текуший поток до получения возможности заблокировать Mutex
let mut guard: MutexGuard<'_,i32> = mutex_clone.lock().unwrap();
*guard += 1;
let result = std::panic::catch_unwind(||
if *guard == 7{
panic!("Aaaaa");
}
).map_err(|e|{
println!("PANICKED thread:{:?}",thread::current().name())
});
println!("+++ thread:{:?}",thread::current().name());
}).unwrap();
handles.push(handle);
}
}
|
|
Пример Mutex синхронизация через std::sync::mpsc::channel
|
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;// канал
use std::sync::mpsc::{Receiver,Sender};
fn main() {
let data = Arc::new(Mutex::new(vec![1u32, 2, 3]));
// Каналы имеют две конечные точки: `Sender ` и `Receiver `,
// где `T` - тип передаваемого сообщения
let (tx, rx): (Sender, Receiver) = std::sync::mpsc::channel();// создать новый канал , tx --> rx
for i in 0..3 {
// clone увеличивает внутренний счетчик, и ее забирает поток
let (data, tx) = (Arc::clone(&data), tx.clone());
// поток завладевает data в качестве окружения замыкания
thread::spawn(move || {
// новый поток
let mut data = data.lock();// lock(), который захватывает блокировку мьютекса.
match data {
Ok( mut _data) => { _data[i] += 1; },// мы свободно изменяем данные, так как у нас есть блокировка.
Err(e) => {}
}
tx.send(i as i32);// передает по каналу данные или пустой кортеж (), а затем в главном потоке ждем, пока не будут приняты все значений
// tx.send(()) пустой кортеж () не несёт никаких данных это просто сигнал
// мы можем отправить по каналу любое значение, которое реализует типаж Send
});
}
//println!("{:?}",data);// Mutex { data: [1, 2, 3] } еще не завершились потоки
// Пробежимся по всем потокам и будем знать что они завершились
for _ in 0..3 {
// то что вернул после синхронизации канал
let r = rx.recv().ok().expect("Could not receive answer");
println!("{}",r);// 120 021 короче пралелльная работа
}
//thread::sleep_ms(50);
println!("{:?}",data);// Mutex { data: [2, 3, 4] }
}
|
|
|
Переменная состояния
Условные переменные используются для синхронизации потоков, которые ждут наступления определённых событий или условий. Они позволяют потокам ожидать, пока не произойдут изменения в данных, защищённых мьютексом, и уведомляют их о том, что событие произошло.
Переменные условия представляют собой возможность заблокировать поток, чтобы он не потреблял процессорное время во время ожидания возникновения события.
Условные переменные обычно связаны с логическим предикатом (условием) и мьютексом.
Предикат всегда проверяется внутри мьютекса, прежде чем определить, что поток должен блокироваться.
Функции в этом модуле блокируют текущий поток выполнения.
|
|
|
Обычно a Condvar используется только вместе с одним Mutex. Если два потока попытаются одновременно wait использовать переменную условия, используя два разных мьютекса, это может вызвать панику.
Недостатком a Condvar является то, что он работает только вместе с a Mutex, но для большинства случаев использования этого вполне достаточно, поскольку именно он и так уже используется для защиты данных.
Иногда существует удобный блокирующий API, точно соответствующий ожидаемому условию, например в примере с остановкой сервера подошел бы метод JoinHandle::join.
В других случаях готового блокирующего API нет.
Для его построения программы могут воспользоваться условными переменными, которые в Rust реализованы типом std::sync::Condvar.
|
|
|
- new() - Создает новую переменную условия, которая готова к ожиданию и уведомлению
- notify_all() - разбудить ждущие его потоки
- notify_one() - разбудить ждущие его поток
- wait() - Блокирует текущий поток до тех пор, пока эта условная переменная не получит уведомление.
- wait_timeout() - Ожидает уведомления от этой условной переменной, истекая по истечении заданного времени.
- wait_timeout_ms()
- wait_timeout_while() - Сочетает ожидание условия с таймаутом и дополнительной проверкой предиката.
- wait_while() - Блокирует текущий поток до тех пор, пока эта условная переменная не получит уведомление и предоставленное условие не станет ложным.
|
|
|
|
|
|
|
|
notify_all() - будит все потоки, ожидающие на этом condition variable.
|
Идеальные use cases для notify_all():
- Graceful shutdown — остановка всех потоков
- Запуск группы потоков — одновременный старт, когда работу должны выполнить все потоки
- Изменение конфигурации — все потоки должны узнать об изменении
- Барьеры синхронизации — когда все потоки достигли точки
- Глобальные события — события которые касаются всех потоков
use std::sync::{Mutex, Condvar, Arc};
use std::thread;
fn main(){
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let (lock, cvar) = &*pair;
// Несколько потоков ждут
for i in 0..3 {
let pair = Arc::clone(&pair);
thread::spawn(move || {
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
println!("Поток {} проснулся!", i);
});
}
// Главный поток будит всех
thread::sleep(std::time::Duration::from_secs(1));
{
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_all(); // Будим ВСЕ ожидающие потоки
}}
|
|
wait_timeout_while() - Сочетает ожидание условия с таймаутом и дополнительной проверкой предиката.
|
wait_timeout_while - это самый продвинутый и безопасный метод ожидания в Rust, который сочетает все необходимые механизмы для надежной синхронизации!
Чем лучше обычного wait:
| Метод | Преимущества |
wait_timeout_while | + Таймаут + Предикат + Защита от ложных пробуждений |
wait | Может ждать вечно |
wait_timeout | Только таймаут, без предиката |
wait_while | Только предикат, без таймаута |
Идеальные use cases:
- Очереди с таймаутом — ждать задачу не больше X времени
- Resource pooling — ждать свободный ресурс с таймаутом
- Graceful shutdown — ждать завершения с максимальным временем
- Progress waiting — ждать определенного прогресса с таймаутом
- Coordination — синхронизация потоков с ограничением по времени
Важные нюансы:
Ложные пробуждения (spurious wakeups)
// Предикат защищает от ложных пробуждений
cvar.wait_timeout_while(
guard,
timeout,
|data| data.is_empty() // Перепроверяем условие при пробуждении
)
Обработка результата
fn main(){
let (guard, wait_result) = cvar.wait_timeout_while(guard, timeout, predicate).unwrap();
if wait_result.timed_out() {
// Таймаут - условие не выполнено
} else {
// Условие выполнено (получено notify или предикат изменился)
}
}
Возвращаемое значение
fn main(){
// Возвращает кортеж:
// - Guard: возобновленная блокировка мьютекса
// - WaitTimeoutResult: информация о таймауте
let (mut guard, result) = cvar.wait_timeout_while(guard, timeout, |x| x < 10);
// Можно использовать guard дальше
*guard += 1;
if result.timed_out() {
println!("Не дождались за {} мс", timeout.as_millis());
}
}
|
|
wait_timeout_while() - Сочетает ожидание условия с таймаутом и дополнительной проверкой предиката.
|
Что делает:
#![allow(unused)]
fn main() {
// Ждет пока:
// 1. Получит уведомление ИЛИ истечет таймаут
// 2. И условие предиката вернет true
// 3. Возвращает результат: (MutexGuard, bool) - где bool = таймаут ли?
}
Примеры использования:
1. Ожидание данных с таймаутом
use std::sync::{Mutex, Condvar};
use std::time::Duration;
fn main(){
let pair = (Mutex::new(false), Condvar::new());
let (lock, cvar) = &pair;
// Поток-ожидатель
let (guard, timeout) = cvar.wait_timeout_while(
lock.lock().unwrap(),
Duration::from_secs(5), // 5 секунд таймаут
|&mut data_ready| !data_ready // Ждем пока data_ready != true
).unwrap();
if timeout.timed_out() {
println!("Таймаут! Данные не получены за 5 секунд");
} else {
println!("Данные получены!");
}
}
2. Ожидание очереди с условием
struct TaskQueue {
tasks: Mutex<Vec<Task>>,
cvar: Condvar,
}
impl TaskQueue {
fn pop_with_timeout(&self, timeout: Duration) -> Option<Task> {
let (mut guard, result) = self.cvar.wait_timeout_while(
self.tasks.lock().unwrap(),
timeout,
|tasks| tasks.is_empty() // Ждем пока очередь НЕ пуста
).unwrap();
if result.timed_out() {
None // Таймаут - задач нет
} else {
guard.pop() // Берем задачу
}
}
}
3. Ожидание конкретного состояния
struct Processor {
status: Mutex<Status>,
cvar: Condvar,
}
#[derive(PartialEq)]
enum Status {
Idle,
Processing,
Complete,
}
impl Processor {
fn wait_for_completion(&self, timeout: Duration) -> bool {
let (guard, result) = self.cvar.wait_timeout_while(
self.status.lock().unwrap(),
timeout,
|status| *status != Status::Complete // Ждем завершения
).unwrap();
!result.timed_out() // true если завершился, false если таймаут
}
}
4. Реализация connection pool с таймаутом
struct ConnectionPool {
connections: Mutex<Vec<Connection>>,
cvar: Condvar,
}
impl ConnectionPool {
fn get_connection(&self, timeout: Duration) -> Option<Connection> {
let (mut guard, result) = self.cvar.wait_timeout_while(
self.connections.lock().unwrap(),
timeout,
|conns| conns.is_empty() // Ждем пока есть свободные соединения
).unwrap();
if result.timed_out() {
None // Нет свободных соединений
} else {
guard.pop() // Берем соединение
}
}
}
5. Ожидание с прогрессом
struct Downloader {
progress: Mutex<f64>,
cvar: Condvar,
}
impl Downloader {
fn wait_for_progress(&self, target: f64, timeout: Duration) -> bool {
let (guard, result) = self.cvar.wait_timeout_while(
self.progress.lock().unwrap(),
timeout,
|progress| *progress < target // Ждем пока прогресс >= target
).unwrap();
if result.timed_out() {
println!("Таймаут! Прогресс: {}", *guard);
false
} else {
println!("Достигнут прогресс: {}", *guard);
true
}
}
}
|
|
|
|
|
Аналогичный пример был в примере для park но тут используем переменнаую состояния Condvar
|
use std::sync::Condvar;
fn main(){
let queue = Mutex::new(VecDeque::new());
let not_empty = Condvar::new();
thread::scope(|s| {
s.spawn(|| {
loop {
let mut q = queue.lock().unwrap();
let item = loop {
if let Some(item) = q.pop_front() {
break item;
} else {
q = not_empty.wait(q).unwrap();
}
};
drop(q);
dbg!(item);
}
});
for i in 0.. {
queue.lock().unwrap().push_back(i);
not_empty.notify_one();
thread::sleep(Duration::from_secs(1));
}
});
}
|
|
|
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
fn main(){
let pair:Arc<(Mutex,Condvar)> = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
// Внутри нашей блокировки создайте новый поток и дождитесь его запуска.
thread::spawn(move|| {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
println!("1");
thread::sleep(std::time::Duration::from_millis(1000));
// Мы уведомляем condvar о том, что значение изменилось.
cvar.notify_one();
println!("2");
});
let (lock, cvar) = &*pair;
let mut started:MutexGuard<'_,bool> = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap(); // Подождите, пока поток запустится.
println!("3");
}
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|