Tokio: популярная асинхронная экосистема с HTTP, gRPC и фреймворками трассировки.
API для выполнения асинхронного ввода-вывода , включая сокеты TCP и UDP, операции с файловой системой , а также управление процессами и сигналами
Асинхронные среды выполнения - это библиотеки, используемые для выполнения асинхронных приложений.
Среды выполнения обычно объединяют реактор с одним или несколькими исполнителями.
Реакторы предоставляют механизмы подписки для внешних событий, таких как асинхронный ввод-вывод, межпроцессное взаимодействие и таймеры.
В асинхронной среде выполнения подписчики обычно являются фьючерсами, представляющими низкоуровневые операции ввода-вывода.
Исполнители занимаются планированием и выполнением задач.
Исполнитель отвечает за вызов Future::poll внешнего будущего, доводя асинхронные вычисления до завершения.
Они отслеживают запущенные и приостановленные задачи, опрашивают будущее до завершения и будят задачи, когда они могут добиться прогресса.
Rust имеет библиотеку mio, которая является платформо-независимой обёрткой (wrapper) над неблокирующим I/O и инструментами
Tokio — это в сущности замечательная обёртка над mio, которая использует футуры.
Tokio имеет главный цикл обработки событий (event loop) и вы передаете ему замыкания, которые возвращают футуры.
Этот цикл будет выполнять все замыкания, которые вы ему передадите, используя mio для выяснения того, какие футуры, могут прогрессировать, и продвигать их далее (вызывая poll())
[dependencies]
tokio = { version = "1.12", features = ["full"] }
Однопоточные среды выполнения
Примеры: tokio планировщик текущего потока Current-Thread Scheduler
Ускорение вычислений, связанных с процессором, за счет их параллельного выполнения в нескольких потоках.
Tokio разработан для приложений с привязкой к вводу-выводу, где каждая отдельная задача большую часть времени проводит в ожидании ввода-вывода.
Если единственное, что делает ваше приложение, - это параллельные вычисления, вы должны использовать rayon.
Тем не менее, все еще можно «смешивать и сочетать», если вам нужно делать и то, и другое.
Чтение большого количества файлов. Хотя кажется, что Tokio будет полезен для проектов, которым просто нужно читать много файлов,
Tokio не дает здесь никаких преимуществ по сравнению с обычным пулом потоков.
Это связано с тем, что операционные системы обычно не предоставляют асинхронные файловые API.
Отправка одного веб-запроса. Tokio дает вам преимущество, когда вам нужно делать много вещей одновременно.
Если вам нужно использовать библиотеку, предназначенную для асинхронного Rust, такую как reqwest, но вам не нужно делать сразу много вещей, вам следует предпочесть блокирующую версию этой библиотеки, так как это упростит ваш проект.
Использование Tokio, конечно, по-прежнему будет работать, но не дает реальных преимуществ перед блокирующим API.
Если библиотека не предоставляет блокирующий API, см. Главу о мостовом соединении с кодом синхронизации.
Для синхронного ввода-вывода используйте spawn_blocking.
Для вычислений с привязкой к процессору используйте отдельный пул потоков с разветвлением, например rayon.
Для синхронной работы, которая выполняется вечно (например, прослушивание соединения с базой данных), создайте собственный выделенный поток, чтобы не забирать его из пула Tokio/Rayon.
An async fn используется, когда мы хотим войти в асинхронный контекст. Однако асинхронные функции должны выполняться во время выполнения. Среда выполнения содержит асинхронный планировщик задач, обеспечивает событийный ввод-вывод, таймеры и т. д. Среда выполнения не запускается автоматически, поэтому ее должна запустить основная функция.
макрос #[tokio::main] преобразует async fn main() в синхронный fn main(), который инициализирует экземпляр среды выполнения и выполняет асинхронную основную функцию
Задача Tokio (tokio::task::JoinHandle) — это асинхронный зеленый поток.
Задачи — это единица выполнения, управляемая планировщиком.
Создание задачи отправляет его в планировщик Tokio, который затем гарантирует выполнение задачи, когда у нее есть работа.
Задача аналогична потоку ОС, но вместо того, чтобы управляться планировщиком ОС, ею управляет среда выполнения Tokio
Задачи, как правило, не должны выполнять системные вызовы или другие операции, которые могут заблокировать поток, поскольку это также помешает выполнению других задач, выполняющихся в том же потоке.
Вместо этого этот модуль предоставляет API для выполнения операций блокировки в асинхронном контексте.
#[tokio::main]
async fn main() {
let handle:tokio::task::JoinHandle = tokio::spawn(async {
// Do some async work
"return value"
});
// Выполняйте другую работу
let out = handle.await.unwrap();
println!("GOT {}", out);
}
Каналы ( oneshot, mpsc, watch и broadcast) для отправки значений между задачами:
mpsc: канал с несколькими производителями и одним потребителем. Можно отправить множество значений.
oneshot: один производитель, один потребительский канал. Можно отправить одно значение.
broadcast: мультипроизводитель, мультипотребитель. Можно отправить множество значений. Каждый получатель видит каждое значение.
watch: монопроизводители, многопотребители. Можно отправить множество значений, но история не сохраняется. Получатели видят только самое последнее значение.
Неблокирующий Mutex, для контроля доступа к общему изменяемому значению,
асинхронный Barrier тип, позволяющий синхронизировать несколько задач перед началом вычислений.
Если вам нужен канал с несколькими производителями Sender и несколькими потребителями Receiver, где только один потребитель видит каждое сообщение, вы можете использовать async-channel ящик.
Существуют также каналы для использования вне асинхронного Rust, такие как std::sync::mpsc и crossbeam::channel. Эти каналы ожидают сообщений, блокируя поток, что недопустимо в асинхронном коде.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
tokio::spawn(async move {
tx.send("sending from first handle").await;
});
tokio::spawn(async move {
tx2.send("sending from second handle").await;
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
tokio::task::spawn_blocking - переносит операцию блокировки в поток за пределами пула потоков Tokio
tokio::task::block_in_place - Хотя эта функция позволяет избежать остановки других независимо созданных задач, любой другой код, выполняющийся одновременно в той же задаче, будет приостановлен во время вызова block_in_place. Это может произойти, например, при использовании join!макроса. Чтобы избежать этой проблемы, используйте spawn_blocking вместо этого.
Асинхронный код никогда не должен проводить долгое время без достижения .await
Чтобы дать представление о том, сколько времени — это слишком много, хорошее эмпирическое правило — не более 10–100 микросекунд между каждым .await
Иногда мы просто хотим заблокировать поток. Это совершенно нормально. Для этого есть две распространенные причины:
Дорогостоящие вычисления, связанные с процессором.
Синхронный ввод-вывод.
В обоих случаях мы имеем дело с операцией, которая не позволяет задаче достичь a .awaitв течение длительного периода времени. Чтобы решить эту проблему, мы должны перенести операцию блокировки в поток за пределами пула потоков Tokio.
На этот счет есть три вариации:
Используйте функцию tokio::task::spawn_blocking
Используйте crate rayon rayon::spawn(move || {});
Создайте специальный поток с std::thread::spawn
Функция spawn_blocking
Среда выполнения Tokio включает отдельный пул потоков специально для запуска функций блокировки, и вы можете создавать в нем задачи, используя spawn_blocking.
Верхний предел этого пула потоков составляет около 500 потоков, поэтому вы можете создавать довольно много операций блокировки в этом пуле потоков.
Поскольку в пуле потоков очень много потоков, он лучше всего подходит для блокировки ввода-вывода, например взаимодействия с файловой системой или использования библиотеки блокировки базы данных, такой как diesel.
Пул потоков плохо подходит для дорогостоящих вычислений, связанных с ЦП, поскольку в нем гораздо больше потоков, чем ядер ЦП на вашем компьютере. Вычисления, связанные с ЦП, выполняются наиболее эффективно, если количество потоков равно количеству ядер ЦП. Тем не менее, если вам нужно всего лишь несколько вычислений, связанных с процессором, я не буду винить вас за их выполнение, spawn_blockingпоскольку это довольно просто сделать.
Параллелизм и организация очередей должны быть введены явно. Способы сделать это включают в себя:
tokio::task::spawn
tokio::spawn
select!
tokio::join!
mpsc::channel
При этом позаботьтесь о том, чтобы общий объем параллелизма был ограничен.
Например, при написании цикла принятия TCP убедитесь, что общее количество открытых сокетов ограничено.
Sleep - это будущее, которое не выполняет никакой работы и завершается в определенное Instant время.
Interval - это поток, дающий значение в фиксированный период. Он инициализируется с помощью a Duration и многократно выдается каждый раз, когда истекает продолжительность.
Timeout - оборачивает будущее или поток, устанавливая верхнюю границу времени, в течение которого ему разрешено выполнение. Если будущее или поток не завершаются вовремя, то оно отменяется и возвращается ошибка.
Поток — это асинхронная серия значений. Это асинхронный эквивалент Rust std::iter::Iterator и представлен признаком Stream.
Потоки можно повторять в async функциях.
Их также можно трансформировать с помощью адаптеров.
Tokio предоставляет ряд распространенных адаптеров для этой StreamExt особенности.
Tokio предоставляет поддержку потоков в отдельном ящике: tokio-stream.
Частичный асинхрон
Варианты использования асинхронного кода в синхронной среде
Создайте Runtime и вызовите асинхронный код в block_on.
Создайте Runtime и вызовите spawn.
Запустите Runtime в отдельной ветке и отправляйте в нее сообщения.
Общее состояние
Какой Mutex использовать
В Токио есть несколько разных способов поделиться состоянием.
Защитите общее состояние с помощью Mutex.
Создайте задачу для управления состоянием и используйте передачу сообщений для работы с ней.
Если один фрагмент данных должен быть доступен из нескольких задач одновременно, его необходимо использовать совместно с использованием примитивов синхронизации, таких как Arc
Обратите внимание, std::sync::Mutex а не tokio::sync::Mutex используется для охраны HashMap.
Распространенной ошибкой является безоговорочное использование tokio::sync::Mutex асинхронного кода.
Асинхронный мьютекс - это мьютекс, который заблокирован для вызовов .await.
Синхронный мьютекс блокирует текущий поток, ожидая получения блокировки.
Это, в свою очередь, заблокирует обработку других задач.
Однако переключение на tokio::sync::Mutex обычно не помогает, поскольку асинхронный мьютекс использует синхронный мьютекс внутри.
Как показывает практика, использование синхронного мьютекса из асинхронного кода нормально, пока конкуренция остается низкой и блокировка не удерживается между вызовами .await.
Кроме того, рассмотрите возможность использования parking_lot::Mutex в качестве более быстрой альтернативы std::sync::Mutex
tokio::sync::Mutex Основная особенность мьютекса Tokio заключается в том, что его можно .await без проблем удерживать в сети.
Тем не менее, асинхронный мьютекс дороже обычного мьютекса, и обычно лучше использовать один из двух других подходов.
Когда происходит борьба за блокировку, поток, выполняющий задачу, должен блокировать и ждать мьютекса. Это не только заблокирует текущую задачу, но также заблокирует все другие задачи, запланированные в текущем потоке.
По умолчанию среда выполнения Tokio использует многопоточный планировщик. Задачи планируются для любого количества потоков, управляемых средой выполнения. Если запланировано выполнение большого количества задач и всем им требуется доступ к мьютексу, возникнет конкуренция. С другой стороны, если используется current_thread разновидность среды выполнения, то мьютекс никогда не будет оспариваться.
Среда current_thread выполнения - это легкая однопоточная среда выполнения. Это хороший выбор, когда создается всего несколько задач и открывается несколько сокетов. Например, этот вариант хорошо работает при предоставлении синхронного моста API поверх асинхронной клиентской библиотеки.
Если конкуренция за синхронный мьютекс становится проблемой, лучшим решением будет редко переключаться на мьютекс Tokio.
Вместо этого можно рассмотреть следующие варианты:
Переключение на специальную задачу для управления состоянием и передачи сообщений.
В нашем случае, поскольку каждый ключ независим, сегментирование мьютексов будет работать хорошо.
Для этого вместо одного Mutex<HashMap<_, _>> экземпляра мы вводим N отдельные экземпляры.
fn main(){
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;
// Затем поиск ячейки для любого заданного ключа становится двухэтапным процессом.
// Во-первых, ключ используется для определения того, частью какого шарда он является. Затем ключ ищется в папке HashMap.
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);
}
3. Измените структуру кода, чтобы избежать мьютекса
Например, вы можете обернуть мьютекс в структуру и заблокировать мьютекс только внутри не асинхронных методов этой структуры.
Этот шаблон гарантирует, что вы не столкнетесь с Send ошибкой, потому что защита мьютекса не появляется нигде в асинхронной функции.
use std::sync::Mutex;
struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// This function is not marked async.
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}
async fn increment_and_do_stuff(can_incr: &CanIncrement) {
can_incr.increment();
do_something_async().await;
}
1. Переключение на специальную задачу для управления состоянием и передачи сообщений
// Это Клиент, для запуска сервера $ mini-redis-server
// Передача команд
use bytes::Bytes;
use mini_redis::client;
use tokio::sync::{mpsc, oneshot};
/// Несколько разных команд мультиплексируются по одному каналу.
#[derive(Debug)]
enum Command {
Get {
key: String, resp: Responder
Пример асинхронного TCP
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
let handle = tokio::spawn(async move {
process(socket).await
});
let out = handle.await.unwrap();
println!("GOT {}", out);
}
}
async fn process(socket: TcpStream) -> bool{
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// Respond with an error
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
return true;
}
false
}
1. TCP server_async
use tokio::net::{TcpListener,TcpStream};
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use std::error::Error;
use std::fs::File;
use std::io::{Write,BufWriter};
use tokio::time::{sleep, Duration};
use std::net::SocketAddr;
pub struct Buffer(BufWriter<std::fs::File>);
impl Buffer {
pub fn new<P: AsRef<std::path::Path>>(file: P, capacity: usize) -> Self{
Buffer(BufWriter::with_capacity(capacity, File::create(file).unwrap()))
}
pub fn write_all(&mut self,d:&[u8]) {
let _ = self.0.write(d);
}
pub fn flush(&mut self){
self.0.flush().unwrap();
}
}
async fn process_socket(mut stream: TcpStream,addr:SocketAddr) -> Result<(), Box<dyn Error>>{
let (rd_stream,wr_stream):(tokio::net::tcp::ReadHalf<'_>, tokio::net::tcp::WriteHalf<'_>) = stream.split();
rd_stream.readable().await?;
let mut store:Buffer = Buffer::new(format!("source/async_new_pictures_{}.jpg",addr),8388608);
let mut buf:Vec<u8> = vec![0u8;8192];
loop {
rd_stream.readable().await?;
match rd_stream.try_read(&mut buf) {
Ok(0) => {
println!("{} data read successfully",addr);
store.flush();
break
},
Ok(size) => {
//println!("read {} bytes", size);
if buf[0]==8u8{
sleep(Duration::from_secs(3)).await;
}
store.write_all(&buf[0..size]);
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
println!("Error WouldBlock");
continue;
},
Err(e) => {
println!("Error {}",e);
store.flush();
return Err(e.into());
}
}
}
Ok(())
}
// cargo run --bin server_async
#[tokio::main]
async fn main() -> io::Result<()> {
let listener:TcpListener = TcpListener::bind("0.0.0.0:3333").await?;
/*let std_listener = std::net::TcpListener::bind("0.0.0.0:3333")?;
std_listener.set_nonblocking(true)?;
let listener:TcpListener = TcpListener::from_std(std_listener)?;*/
listener.set_ttl(128).expect("could not set TTL");
println!("Время жизни пакета {:?}",listener.ttl().unwrap_or(0));
loop {
let (socket, addr) = listener.accept().await?;
println!("{:?}",&addr);
tokio::spawn(async move {
let _ = process_socket(socket,addr).await;
});
println!("work in progress, do something!!!");
}
}
2. TCP client_async
use tokio::net::TcpStream;
use std::error::Error;
use std::time::Duration;
use std::net::SocketAddr;
use tokio::io::BufWriter;
// cargo run --bin client_async
#[tokio::main]
async fn main() -> Result<(), Box> {
let addr = "127.0.0.1:3333".parse::().unwrap();
let stream:TcpStream = TcpStream::connect(addr).await?;
stream.set_nodelay(false).expect("set_nodelay call failed");// включить алгоритм Нэгла, накопить и отправить
// Теперь, когда вы отключите поток, продолжит ли ОС отправлять данные в фоновом режиме?
// Это зависит от задержки и от того, включена она или нет.
let _ = stream.set_linger(Some(Duration::from_secs(5)));
let stream = BufWriter::new(stream);
let mut payload:Vec = std::fs::read("source/src.jpg").unwrap();
for (_count,chank) in payload.chunks_mut(8192).enumerate(){
stream.get_ref().writable().await?;
match stream.get_ref().try_write(&chank) {
Ok(_size) => {
continue;
},
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
println!("Error WouldBlock");
continue;
},
Err(e) => {
println!("Error {}",e);
return Err(e.into());
}
};
}
Ok(())
}
Как запустить среду выполнения Tokio
use tokio::task;
// Ошибки в многопоточном асинхронные контексты требуют дополнительных ограничений
type Result = std::result::Result>;
// Теперь мы хотим как получить некоторые данные, так и провести на них интенсивный анализ процессора.
async fn get_and_analyze(n: usize) -> Result<(u64, u64)> {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
println!("Dataset {}", n);
let txt = "dlfkjgnkldngldfnknngldfknglkdfngld";
// Мы отправляем нашу аналитическую работу в поток, в котором не запущена среда выполнения.
// чтобы мы не блокировали время выполнения анализом данных
let res = tokio::task::spawn_blocking(move ||analyze(&txt)).await?;
// let res = futures::future::ok::<(u64, u64),Box>((78_u64,88_u64) ).await?;
println!("Processed {}", n);
Ok(res)
}
// Подсчитав количество единиц и нулей в байтах
fn analyze(txt: &str) -> (u64, u64) {
let txt = txt.as_bytes();
// Давайте потратим как можно больше времени и посчитаем их за два прохода
let ones = txt.iter().fold(0u64, |acc, b: &u8| acc + b.count_ones() as u64);
let zeros = txt.iter().fold(0u64, |acc, b: &u8| acc + b.count_zeros() as u64);
(ones, zeros)
}
async fn app() -> Result<()> {
// Мы можем собирать фьючерсы в коллекцию.
let mut futures = vec![];
for i in 1..=10 {
let fut = task::spawn(get_and_analyze(i));
futures.push(fut);
}
let results = futures::future::join_all(futures).await;
let mut total_ones = 0;
let mut total_zeros = 0;
// Возврат ошибок с использованием символа `?` В итераторах может быть немного сложным.
// Используя цикл for для проверки и работы с результатами часто может быть более эргономично
for result in results {
// `spawn_blocking` returns a `JoinResult` we need to unwrap first
let ones_res: Result<(u64, u64)> = result?;
let (ones, zeros) = ones_res?;
total_ones += ones;
total_zeros += zeros;
}
println!("Ratio of ones/zeros: {:.02}",total_ones as f64 / total_zeros as f64);
Ok(())
}
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
match rt.block_on(app()) {
Ok(_) => println!("Done"),
Err(e) => eprintln!("An error ocurred: {}", e),
};
}
Как создать Future
tokio::task::spawn
Задача является легким весом, неблокирующая блок исполнения (зеленые нити)
Выполняются в исполнителе tokio
Эквивалент стандартной библиотеки thread::spawn. Требуется async блок или другое будущее и создается новая задача для одновременного выполнения этой работы
Код, выполняемый в асинхронных задачах task::spawn, не должен выполнять операции, которые могут блокировать. Операция блокировки, выполняемая в задаче, выполняемой в потоке, который также выполняет другие задачи, заблокировала бы весь поток, предотвращая выполнение других задач.
use tokio::task;
fn example() -> std::future::Ready{
let f: std::future::Ready = std::future::ready("Hello world ex_1".to_string());
f
}
fn main(){
let future = async {
// Требуется async блок или другое будущее и создается новая задача для одновременного выполнения этой работы:
// let concurrent_future: tokio::task::JoinHandle = tokio::task::spawn(example());
let concurrent_future: tokio::task::JoinHandle = tokio::task::spawn(std::future::ready("Hello world".to_string()));
concurrent_future.await
};
let mut rt = tokio::runtime::Runtime::new().unwrap();
let result = rt.block_on(future);
println!("{:?}",result);
}
Пример:
async fn work(sleep:usize) -> String{
let mut s = "".to_owned();
for i in (0..10000000*sleep){
s.push_str(&format!("{}",i));
}
println!("Sleep {}",sleep);
std::future::ready(format!("Sleep {}",sleep)).await
}
fn main(){
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(
async {
// не блокирующие вычисления
tokio::join!(
tokio::task::spawn(async {
work(5).await;
}),
tokio::task::spawn(async {
work(1).await;
}),
tokio::task::spawn(async {
work(4).await;
}),
tokio::task::spawn(async {
work(5).await;
})
);
// Output: Sleep 1, Sleep 4, Sleep 5, Sleep 5 ( вариант 5 не ждал 5 секунд, они работали когда шло время у 1 и 4 секунд)
// Варианты ниже не будут работать параллельно так как в ф-ции work нет await на долгую работу и переключения не будет!!!
// 1. Этот вариант без `tokio::task::spawn` блокирует следующие await пока предыдущие не отработают
tokio::join!(
work(5),work(1),work(4)
);
// Output: Sleep 5, Sleep 1, Sleep 4
// 2. Или так, все равно await тут не переключается на другой await т.е. блокирует
work(5).await;
work(1).await;
work(4).await;
}
);
}
Как вызвать блокирующие или ресурсоёмкие задачи
tokio::task::spawn_blocking , block_in_place
Так же, как task::spawn, task::spawn_blocking возвращает JoinHandle
В отличие от task::spawn где не блокируется поток, spawn_blocking вместо этого порождает функцию блокировки в выделенном пуле потоков для блокировки задач
Что, если я захочу заблокировать?
Иногда мы просто хотим заблокировать поток. Это совершенно нормально. Для этого есть две распространенные причины:
Дорогие вычисления, связанные с ЦП.
Синхронный ввод-вывод.
В обоих случаях мы имеем дело с операцией, которая не позволяет задаче достичь ожидаемого значения в течение длительного периода времени.
Чтобы решить эту проблему, мы должны переместить операцию блокировки в поток за пределами пула потоков Tokio.
Есть три варианта этого:
Используйте tokio::task::spawn_blocking функцию.
Используйте rayon crate.
Создайте специальный поток с помощью std::thread::spawn.
Это обычная проблема при написании асинхронного кода в целом. Если вы хотите воспользоваться средой выполнения, которая запускает ваш код одновременно, вам следует избегать блокировки или запуска кода, интенсивно использующего ЦП, в самих Futures.
(т.е. запуск нового потока в котором нет среды выполнения, находят в асинхронном коде, что бы не блокировать текущий поток)
use tokio::task;
// Ошибки в многопоточном асинхронные контексты требуют дополнительных ограничений
type Result = std::result::Result>;
fn fib_cpu_intensive(n: u32) -> u32 {
match n {
0 => 0,
1 => 1,
n => fib_cpu_intensive(n - 1) + fib_cpu_intensive(n - 2),
}
}
async fn app() -> Result<()> {
// сдесь помимо асинхронных задач с .await мы можем запустить отдельный поток для задачи что бы не блокировать текущий аснхронный поток
let threadpool_future = task::spawn_blocking(||fib_cpu_intensive(30));
let res = threadpool_future.await?;
println!("fib={}",res);
Ok(())
}
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
match rt.block_on(app()) {
Ok(_) => println!("Done"),
Err(e) => eprintln!("An error ocurred: {}", e),
};
}
Эти две черты предоставляют возможности для асинхронного чтения и записи в байтовые потоки.
Методы этих признаков обычно не вызываются напрямую ,вместо этого вы будете использовать их с помощью AsyncReadExt, AsyncWriteExt
При традиционном блокировании ввода-вывода приложение останавливается, ожидая завершения каждой операции ввода-вывода, прежде чем продолжить работу. Это может привести к проблемам с производительностью и ограничить масштабируемость приложения. Чтобы решить эти проблемы, мы обратимся к асинхронному вводу-выводу.
use tokio::io::{self, BufWriter, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let f = File::create("foo.txt").await?;
{
let mut writer = BufWriter::new(f);
// Записать байт в буфер
writer.write(&[42u8]).await?;
// Очистить буфер до того, как он выйдет за пределы области действия.
writer.flush().await?;
} // Если он не очищен или не выключен, содержимое буфера удаляется при drop
Ok(())
}
use tokio::io::{BufReader, AsyncBufReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let f = File::open("foo.txt").await?;
let mut reader = BufReader::new(f);
let mut buffer = String::new();
// Read a line into the buffer
reader.read_line(&mut buffer).await?;
println!("{}", buffer);
Ok(())
}
AsyncReadExt::read предоставляет асинхронный метод для чтения данных в буфер, возвращающий количество прочитанных байтов.
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// read up to 10 bytes
let n = f.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", &buffer[..n]);
Ok(())
}
AsyncReadExt::read_to_end читает все байты из потока до EOF
use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = Vec::new();
// read the whole file
f.read_to_end(&mut buffer).await?;
Ok(())
}
AsyncWriteExt::write записывает буфер в писатель, возвращая, сколько байтов было записано
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("foo.txt").await?;
// Writes some prefix of the byte string, but not necessarily all of it.
let n = file.write(b"some bytes").await?;
println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}
AsyncWriteExt::write_all записывает весь буфер в писатель
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut buffer = File::create("foo.txt").await?;
buffer.write_all(b"some bytes").await?;
Ok(())
}
tokio::io::copy асинхронно копирует все содержимое модуля чтения в модуль записи.
use tokio::fs::File;
use tokio::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("foo.txt").await?;
io::copy(&mut reader, &mut file).await?;
Ok(())
}
use std::io::Result;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::ReadBuf;
struct MemBuffer {
data: Vec,
read_pos: usize,
}
impl MemBuffer {
fn new() -> Self {
Self {
data: Vec::new(),
read_pos: 0,
}
}
}
impl AsyncRead for MemBuffer {
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context,
buf: &mut ReadBuf<'_>,
) -> Poll> {
// Calculate the number of available bytes to read
let available = self.data.len() - self.read_pos;
// Determine the number of bytes to read, taking the minimum of the remaining capacity of ReadBuf and the available bytes
let bytes_to_read = available.min(buf.remaining());
// Get the slice of data to be read from MemBuffer
let data = &self.data[self.read_pos..self.read_pos + bytes_to_read];
// Put the data slice into the ReadBuf
buf.put_slice(data);
// Update the read position in MemBuffer
self.read_pos += bytes_to_read;
Poll::Ready(Ok(()))
}
}
impl AsyncWrite for MemBuffer {
fn poll_write(
mut self: Pin<&mut Self>,
_cx: &mut Context,
buf: &[u8],
) -> Poll> {
self.data.extend_from_slice(buf);
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> {
// Since our buffer is in-memory, we don't need to do anything to flush.
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> {
// No special shutdown procedure is required for our in-memory buffer.
Poll::Ready(Ok(()))
}
}
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> std::result::Result<(), Box> {
let mut buffer = MemBuffer::new();
// Write data to the buffer
buffer.write_all(b"Hello, world!").await?;
// Reset the read position to read from the beginning
buffer.read_pos = 0;
// Read the data back from the buffer
let mut read_buf = vec![0; 13];
buffer.read_exact(&mut read_buf).await?;
assert_eq!(read_buf, b"Hello, world!");
println!("Successfully read data: {:?}", String::from_utf8(read_buf)?);
Ok(())
}
Это асинхронный эквивалент std::iter::Iterator который представлен Stream трейтом
Читать из steam/потока пока есть данные
Tokio предоставляет ряд распространенных адаптеров для StreamExt трейта
tokio-stream = "0.1"
В настоящее время в tokio-stream ящике находятся утилиты Tokio's Stream. Как только Stream трейт стабилизируется в стандартной библиотеке Rust, потоковые утилиты Tokio будут перемещены в tokio ящик
Асинхронные циклы итерация потоков
с while let в паре с StreamExt::next()
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
let doubled: Vec =
stream::iter(vec![1, 2, 3])
.map(|x| x * 2)
.collect()
.await;
assert_eq!(vec![2, 4, 6], doubled);
}
fn main(){
let content = String::from("Mary had a little lamb");
let mut stream = tokio_stream::iter(content.split(" "))
.map(|msg|msg.to_uppercase());
// let content: Vec = stream.collect().await;
while let Some(v) = stream.next().await {
println!("GOT = {:?}", v);
}
}
Преобразовать sync::mpsc::Receiver в impl Stream
async_stream::stream!
Преобразовать sync::mpsc::Receiver в impl Stream.
use tokio::sync::mpsc;
fn main(){
let (tx, mut rx) = mpsc::channel::(16);
let stream = async_stream::stream! {
while let Some(item) = rx.recv().await {
yield item;
}
};
}
use bytes::Bytes;
use tokio::io::{AsyncReadExt, Result};
use tokio_util::io::StreamReader;
fn main(){
// Create a stream from an iterator.
let stream = tokio::stream::iter(vec![
Result::Ok(Bytes::from_static(&[0, 1, 2, 3])),
Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
Result::Ok(Bytes::from_static(&[8, 9, 10, 11])),
]);
// Convert it to an AsyncRead.
let mut read = StreamReader::new(stream);
// Read five bytes from the stream.
let mut buf = [0; 5];
read.read_exact(&mut buf).await?;
assert_eq!(buf, [0, 1, 2, 3, 4]);
// Read the rest of the current chunk.
assert_eq!(read.read(&mut buf).await?, 3);
assert_eq!(&buf[..3], [5, 6, 7]);
// Read the next chunk.
assert_eq!(read.read(&mut buf).await?, 4);
assert_eq!(&buf[..4], [8, 9, 10, 11]);
// We have now reached the end.
assert_eq!(read.read(&mut buf).await?, 0);
}
Stream::poll_next() Функция очень похожа на Future::poll, за исключением того, что можно назвать несколько раз, чтобы получить много значений из потока.
Обычно при ручной реализации a Stream это делается путем компоновки фьючерсов и других потоков.
Поток, который выдает () три раза с интервалом 10 мс.
use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
struct Interval {
rem: usize,
delay: Delay,
}
impl Stream for Interval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<()>>
{
if self.rem == 0 {
// No more delays
return Poll::Ready(None);
}
match Pin::new(&mut self.delay).poll(cx) {
Poll::Ready(_) => {
let when = self.delay.when + Duration::from_millis(10);
self.delay = Delay { when };
self.rem -= 1;
Poll::Ready(Some(()))
}
Poll::Pending => Poll::Pending,
}
}
}
Реализация потоков вручную с использованием Stream трейта может быть утомительной.
К сожалению, язык программирования Rust пока не поддерживает async/await синтаксис для определения потоков.
async-stream crate доступен в качестве временного решения. Этот ящик предоставляет stream! макрос, который преобразует ввод в поток.
Используя этот ящик, указанный выше интервал можно реализовать следующим образом:
use async_stream::stream;
use std::time::{Duration, Instant};
fn main(){
stream! {
let mut when = Instant::now();
for _ in 0..3 {
let delay = Delay { when };
delay.await;
yield ();
when += Duration::from_millis(10);
}
}
}