Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Тема
Описание
Доп.

📌 Собрать все до кучи, плохая структура

📌 Новый раздел

  • 👾 Best practice
  • Варианты применения

Повод использовать асинхронность по сравнению с одной из моделей параллелизма (Потоки ОС)

Потоки ОС не требуют каких-либо изменений в модели программирования, что упрощает реализацию параллелизма. Однако синхронизация между потоками может быть затруднена, а издержки производительности велики. Пулы потоков могут снизить некоторые из этих затрат, но не настолько, чтобы поддерживать массивные рабочие нагрузки, связанные с вводом-выводом.

Отличие асинхронности от выделения отдельного потока

02_why_async

Наивный способ написать приложение, которое одновременно работает со многими вещами, - это создать новый поток для каждой задачи. Если количество задач невелико, это прекрасное решение, но по мере того, как количество задач становится большим, вы в конечном итоге столкнетесь с проблемами из-за большого количества потоков.

Главная цель асинхронного выполнения — эффективное использование потоков для обработки большого количества входящих событий, таких как сетевые запросы. Если в таком окружении выполняется тяжелая задача с высокой нагрузкой на ЦП, которая не выполняет ожидания ввода-вывода (например, если вам нужно выполнить ресурсозатратные операции, такие как обработка больших массивов данных, шифрование или рендеринг. Блокирующий ввод-вывод. Операции, требующие синхронного взаимодействия с файловой системой или базой данных, которая не поддерживает асинхронный интерфейс. Тогда используйте tokio::spawn_blocking), это может блокировать поток, мешая обработке других асинхронных задач. Это называется блокировкой среды выполнения. Решение: tokio::spawn_blocking Tokio предоставляет метод tokio::spawn_blocking для выполнения подобных блокирующих задач. Вот что происходит: tokio::spawn_blocking запускает задачу в отдельном пуле потоков, называемом blocking thread pool. Этот пул используется специально для задач, которые могут блокировать выполнение (например, вычислительные задачи или операции, требующие взаимодействия с диском). Главный пул потоков (где выполняются асинхронные задачи) продолжает работать, обрабатывая новые входящие события.

Асинхронный код позволяет нам запускать несколько задач параллельно(Concurrency) в одном потоке ОС. И выделенный поток так же может запускать несколько задач параллельно в одном потоке ОС но в процессе переключения между разными потоками и обменом данными между ними возникает много накладных расходов. Даже поток, который сидит и ничего не делает, использует ценные системные ресурсы. Асинхронный код предназначен для устранения этих проблем. В целом, асинхронные приложения могут быть намного быстрее и использовать меньше ресурсов, чем соответствующая многопоточная реализация. Важно помнить, что традиционные приложения с потоками могут быть вполне эффективными и предсказуемость Rust и небольшой объём используемой памяти могут значить, что вы можете далеко продвинуться и без использования async.

Возможность отменить операцию на лету — одна из главных причин использования асинхронного подхода


Синхронная функция в скомпилированном виде - это цельный кусок машинного кода. Мы не можем остановить ее выполнение посередине или даже поставить на паузу, если мы не хотим/не можем пользоваться инструментами, которые нам предоставляет ядро (например, таким инструментом будет SIGINT+обработчик, или выведение синхронной функции в отдельный поток, а затем остановка этого потока из другого потока нашей программы)

Напротив, асинхронная функция - это конечный автомат, по сути - множество синхронных функций с промежуточными точками между ними. Каждая (кроме последней) из этих синхронных функций, представляющих собой этапы выполнения большой асинхронной функции, возвращает нам "Я частично готова, можно пока выполнить что-то еще". Последняя же возвращает сам результат. Мы можем прервать выполнение асинхронной функции в любой такой точке, можем послать на исполнение (в т.ч. так же с середины) другую асинхронную или синхронную функцию, а затем вернуться к первой, а когда в Rust завезут async iters - сможем в таких точках получать промежуточные значения и сразу посылать их дальше. С синхронной функцией мы такого сделать не можем.

Собственно, в этом и есть главный плюс и главный минус асинхронного подхода. С одной стороны, неблокирующее IO благодаря промежуточным точкам. С другой стороны - при неаккуратном использовании можно получить то, что получил автор в первом примере - асинхронная функция выполнилась частично, была остановлена на середине, а поскольку она внутри себя меняла некоторую внешнюю структуру - эта самая внешняя структура была оставлена в не консистентном состоянии.


Например, для расчетных задач, где нет большой работы с файлами или сетью и зависимостей между ними потоки могут стать более предпочтительным выбором. Для сервиса — асинхронность работает лучше, потому что сеть отвечает не мгновенно, да и человек на том конце соединения может "задуматься".

Вызов асинхронного метода обычно используется для процесса, который должен выполнять работу вдали от текущего приложения и мы не хотим ждать и блокировать наше приложение, ожидающее ответа.

Например, получение данных из базы данных может занять некоторое время, но мы не хотим блокировать наш пользовательский интерфейс, ожидающий данных. Асинхронный вызов берет ссылку обратного вызова и возвращает выполнение обратно в ваш код, как только запрос будет помещен в удаленную систему.

Асинхронность

Асинхронность позволяет создавать меньше потоков по сравнению с синхронной моделью Асинхронное программирование в основном занимается concurrency, но если используется среда выполнения, поддерживающая многопоточность (например, Tokio с рабочими потоками), возможно и сочетание concurrency и parallelism.

При очень больших масштабах потоки слишком дороги; они используют больше памяти, чем необходимо, и значительная стоимость при переключении контекста. При асинхронной работе поток запоминает что в него нужно вернуться если он застрял на какой то задаче. На высокопроизводительном сервере тысячи задач могут ждать и обрабатывать вход одновременно, используя только один поток ОС (или один поток ОС на ядро).

Когда использовать асинхронный код

Если вы столкнулись с ситуацией, когда вам может понадобиться один из следующих блокирующих модулей (std::sync, std::thread, std::fs, std::net), вы должны проверить, есть ли в вашей среде выполнения асинхронная альтернатива для задачи, которую вы хотите выполнить. Если в вашей среде выполнения нет эквивалента, вы можете использовать spawn_blocking и выполнять операцию в пуле потоков, как если бы вы делали это с задачами, интенсивно использующими процессор, и await результатом.

std::sync: Вместо стандартных примитивов синхронизации (например, Mutex или RwLock из std::sync), используйте их асинхронные аналоги, такие как tokio::sync::Mutex или tokio::sync::RwLock.

std::thread: Вместо создания потоков вручную (std::thread::spawn), используйте асинхронные задачи (tokio::spawn), которые более легковесны, так как не создают отдельный поток.

std::fs: Для асинхронного взаимодействия с файловой системой используйте, например,** tokio::fs**, который предоставляет такие методы, как tokio::fs::read или tokio::fs::write.

std::net: Для работы с сетью используйте асинхронные аналоги, такие как tokio::net::TcpStream или tokio::net::TcpListener.

Если альтернативы нет, используйте spawn_blocking.

В таких ситуациях использование tokio::task::spawn_blocking — правильный подход. Этот метод позволяет передать блокирующую операцию в специальный пул потоков, выделенный для таких задач. Это предотвращает блокировку основной асинхронной среды выполнения.

Асинхронная среда выполнения, такая как Tokio, работает по принципу кооперативной многозадачности. Если вы выполняете блокирующую операцию (например, чтение файла через std::fs), она блокирует поток, на котором выполняется, что мешает другим задачам работать.

spawn_blocking решает эту проблему, передавая задачу в пул потоков, предназначенный для таких операций. Это сохраняет производительность и отзывчивость асинхронной среды.

Отличие Многопоточности от Асинхронной модели

В Rust доступны как однопоточные, так и многопоточные среды выполнения асинхронного кода, которые имеют разные сильные и слабые стороны.

Асинхронный однопоточный - работает и приостанавливает и начинает работает постоянно с несколькими задачами прерывая их

Асинхронный многопоточный - все потоки могут работать в асинхронной модели т.е. все задачи прыгаю по всем потокам

Синхронный однопоточный - задачи идут одна за одной полностью заканчивается и начинается следующая

Синхронный многопоточный - на каждый поток своя полностью задача после ее завершения берет свободную не начатую задачу

Отличие Многопоточности от Асинхронной модели

В синхронном многопоточном коде каждый поток работает со своей задачей полностью ее завершая и приступая к новой задаче беря ее целиком. В асинхронном многопоточном коде все потоки работают со всеми задачами разбив их на кусочки

ввод-вывод и потоки

В то время как потоки представляют собой решение проблем, связанных с процессором(когда мало ядер), то для проблем, связанных с вводом-выводом (проблема в том что процессор намного быстрее обрабатывает задачи чем они вводятся потоком IO) , традиционно решением является асинхронный ввод-вывод

Давайте предположим, что вы хотите собрать что-то наподобие веб-сервера. Он будет обрабатывать тысячи запросов в каждый момент времени (проблема c10k). Говоря общими словами, рассматриваемая нами проблема состоит из многих задач, выполняющих в основном I/O операции (особенно связанных с сетевым взаимодействием).

«Одновременная обработка N задач» — такая задача лучше всего решается использованием threads потоков. Однако… Тысячи потоков? Наверное, это слишком много. Работа с потоками может быть довольно ресурсозатратной: каждый поток должен выделить большой стек (stack), настроить поток, используя набор системных вызовов. Ко всему прочему переключение контекста тоже затратно.

Конечно, тысячи одновременно работающих потоков не будет: вы имеете ограниченное число ядер (core), и в любой момент времени только один поток будет исполняться на этом ядре.

Но в примерах подобных данному веб-серверу большинство этих потоков не будут делать никакой работы. Они будут ждать или поступления запроса, или отправки ответа.

C обычными потоками, когда вы производите блокирующую I/O операцию, системный вызов возвращает управление ядру, которое не возвратит управление обратно потоку, потому что, вероятно, I/O операция еще не завершилась. Вместо этого ядро будет использовать данный момент как возможность «подгрузить» (swap in) другой поток и продолжить выполнение исходного потока (начавшей I/O операцию) когда I/O операция будет завершена, то есть когда исходный поток будет «разблокирована» (unblocked). Вот так вы решаете такие задачи в Rust, когда не используете Tokio и подобные ей библиотеки — запускаете миллион потоков и позволяете ОС самостоятельно планировать (schedule) запуск и завершение потоков в зависимости от I/O.

Но, как мы уже выяснили, использование потоков не очень хорошо масштабируется для задач, подобных данной

Зелёные потоки в Rust В ранних версиях Rust действительно использовались зелёные потоки (green threads). Зелёные потоки — это лёгкие, абстрактные потоки, реализованные на уровне среды выполнения, а не операционной системы. Они имеют следующие особенности:

  • Позволяют управлять потоками самостоятельно, без участия ОС.
  • Могут эффективно переключаться между задачами.
  • Но требуют сложной среды выполнения (runtime), что увеличивает накладные расходы и усложняет отладку. Rust отказался от зелёных потоков ещё в версии 1.0, чтобы минимизировать накладные расходы и дать пользователям больше контроля над производительностью. Это решение было принято, чтобы Rust соответствовал своей философии "нулевая стоимость абстракции" (zero-cost abstractions).

Подход Rust: минимализм и контроль В отличие от Go и Node.js, Rust поставляется без встроенной среды выполнения (runtime) для асинхронности. Вместо этого он предлагает асинхронный синтаксис и ключевые ингредиенты, которые можно использовать для создания кастомной среды выполнения. Основные моменты:

  • Нет встроенного runtime: Rust предоставляет минимальный синтаксис (async/await) и API (Future, Pin, Waker), но сам по себе не реализует полноценный runtime. В итоге разработчики могут выбирать сторонние библиотеки (например, Tokio, async-std, или smol).
  • Асинхронные концепции: Rust понимает асинхронные задачи благодаря концепции Future — это "обещание" выполнить что-то в будущем. Комбинаторы Future и встроенные примитивы позволяют разработчикам эффективно использовать асинхронность, при этом сохраняя контроль над производительностью.

Rust предоставляет инструменты для создания runtime Rust предоставляет несколько ключевых "ингредиентов" для построения асинхронной среды выполнения:

  • Future: Основной строительный блок асинхронного программирования.
  • Pin и Waker: Механизмы для управления состоянием и сигнализации о готовности задачи.
  • Синтаксис async/await: Удобный способ определения асинхронных операций.
  • Интеграция с системой типов: Rust гарантирует безопасность при работе с конкурентными задачами (например, через Send и Sync). Среды выполнения, такие как Tokio, используют эти ингредиенты для создания высокопроизводительных и гибких runtime.

Модель «polling»

Отложенные вычисления

Модель «polling»: В отличие от языков с моделью «callback» (например, JavaScript), где вы работаете с колбэками и обещаниями, Rust использует модель опроса (polling), что означает, что Future должен быть опрошен, чтобы проверить, готов ли он к выполнению. Это делает асинхронный код более предсказуемым и легко управляемым.

Отложенные вычисления: В Rust асинхронные функции, такие как async { ... }, создают объекты Future, которые не выполняются до тех пор, пока их не опросят. В отличие от Python или JavaScript, где await вызывает выполнение немедленно, в Rust await инициирует опрос, и код внутри async блока не выполняется сразу.

Основы

‌Асинхронное приложение должно извлекать минимум два ящика из экосистемы Rust:

  1. futures, официальный ящик Rust, который живет в репозитории rust-lang
  2. Среда выполнения, такие как Tokio, async_std, smol и т.д.

Trait std::future::Future и модуль std::task. Эти сущности связывают программы и библиотеки с разными средами асинхронного исполнения (пример такой среды — Tokio)

Выбор среды выполнения (В Rust доступны как однопоточные, так и многопоточные среды выполнения.

Исполнители Async могут быть однопоточными или многопоточными.

Например, crate async-executor обрешетка бывает как однопоточной, так LocalExecutor и многопоточной Executor)

Самим языком Rust исполнитель не предоставляется.

Доступны несколько исполнителей в виде библиотек.

На практике нужны асинхронные средства межзадачной связи, утилиты ввода-вывода и так далее.

Большинство важных библиотек используют Toki , зрелую среду выполнения производственного качества (которая фактически предшествует современным функциям асинхронного языка Rust)

Многопоточные среды выполнения Примеры: tokio многопоточный планировщик , async-executor of async-std, futures::executor::ThreadPool

Однопоточные среды выполнения Примеры: tokio планировщик текущего потока, tokio-uring, futures::executor::LocalPool

Асинхронные среды выполнения

Асинхронные среды выполнения - это библиотеки, используемые для выполнения асинхронных приложений. Среды выполнения обычно объединяют реактор с одним или несколькими исполнителями.

Реакторы предоставляют механизмы подписки для внешних событий, таких как асинхронный ввод-вывод, межпроцессное взаимодействие и таймеры.

В асинхронной среде выполнения подписчики обычно являются фьючерсами, представляющими низкоуровневые операции ввода-вывода.

Исполнители занимаются планированием и выполнением задач. Они отслеживают запущенные и приостановленные задачи, опрашивают будущее до завершения и "будят" задачи, когда они могут добиться прогресса. Слово «исполнитель» часто используется как синоним слова «время выполнения». Здесь мы используем слово «экосистема» для описания среды выполнения, объединенной с совместимыми характеристиками и функциями.

futures-rs имеет собственный исполнитель, но не собственный реактор, поэтому он не поддерживает выполнение асинхронного ввода-вывода или фьючерсов таймера. По этой причине он не считается полноценной средой выполнения. Распространенный выбор - использовать утилиты из futures с исполнителем из другого ящика.

Популярные асинхронные среды выполнения

В стандартной библиотеке нет асинхронной среды выполнения, и она официально не рекомендуется. Следующие ящики предоставляют популярные среды выполнения.

  • Tokio: популярная асинхронная экосистема с HTTP, gRPC и фреймворками трассировки.
  • async-std: ящик, который предоставляет асинхронные аналоги стандартных библиотечных компонентов.
  • smol: небольшая упрощенная асинхронная среда выполнения. Предоставляет Trait Async, который можно использовать для обертывания таких структур, как UnixStream или TcpListener.
  • fuchsia-async: исполнитель для использования в ОС Fuchsia.

Асинхронный код никогда не должен долго находиться, не достигнув .await.

Ниже вы найдете памятку о том, какие методы вы можете использовать, если хотите заблокировать:

МетодВычисления с привязкой к ЦПСинхронный ввод-выводБеги вечно
spawn_blockingНе оптимальноOkНет
rayonOkНетНет
Выделенная веткаOkOkOk

Асинхронное программирование невозможно без поддержки неблокирующего ввода-вывода

Ржавчина экосистема имеет crate mio ящик, который обеспечивает очень сырые примитивы для асинхронной I/O, и Tokio ящик, который обеспечивает высокого уровень фьючерсы основанной абстракцию для использования асинхронного ввода/вывода

crate mio (Metal IO) играет фундаментальную роль в реализации асинхронности в Rust, особенно в низкоуровневых библиотеках и runtime, таких как Tokio. Оно предоставляет интерфейс для мониторинга событий ввода-вывода, используя нативные механизмы операционных систем, такие как epoll (Linux), kqueue (macOS, BSD), и IOCP (Windows).

Основные функции Mio:

  • Механизм мониторинга событий: Mio позволяет наблюдать за событиями на файловых дескрипторах, сокетах и других ресурсах, не блокируя поток. Например, вы можете узнать, когда сокет готов для чтения или записи.
  • Базовый элемент асинхронного runtime: Mio предоставляет минимальный интерфейс для управления событиями, который используется в высокоуровневых фреймворках, таких как Tokio, для построения эффективных асинхронных систем.
  • Платформонезависимость: Mio абстрагирует работу с механизмами мониторинга событий, что позволяет писать переносимый код.

Роль Mio в асинхронности Rust

  • Низкоуровневая основа для runtime: Mio предоставляет базовые строительные блоки для создания асинхронных runtime, таких как: Tokio: Использует Mio для управления событиями ввода-вывода в своем многопоточном и однопоточном режиме выполнения. Actix: Внутренне использует Mio для обработки событий.
  • Асинхронный ввод-вывод: Mio управляет событиями ввода-вывода (например, готовностью сокета) и предоставляет интерфейс для обработки этих событий без блокировки. Вместо использования стандартных блокирующих операций (например, read() или write()), вы можете регистрировать события и быть уведомлены, когда операция доступна.
  • Минимальные накладные расходы: Mio ориентирован на производительность. Он не включает сложной логики, такой как задачи и планировщики, оставляя это runtime высокого уровня (например, Tokio).

Когда использовать Mio напрямую:

  • Если вы создаете собственный runtime или у вас есть уникальные требования к обработке событий.
  • Для обучения и понимания, как работают низкоуровневые механизмы асинхронности.
  • Для большинства задач рекомендуется использовать более высокоуровневые библиотеки, такие как Tokio.

std::future::Future

У std::future::Future мало возможностей, есть только две структуры:

  • Ready,
  • Pending

И две функции:

  • ready(T),
  • pending()

async-book

futures-tutorial

Введение в futures-rs: асинхронщина на Rust

Future — трейт для асинхронных (конкурентных) вычислений с poll-based кооперативной моделью исполнения. Объекты Future являются программными компонентами первого класса: мы можем с ними обращаться как нам вздумается, а сами по себе они ничего не делают — чтобы их выполнить, необходимо вызывать метод Future::poll до того момента, когда он вернёт Poll::Ready(Output). Обычно этим занимается отдельный модуль, называемый экзекьютором (или планировщиком) Далее — про то, что позволяет безжизненные футуры превратить в работающий код: про асинхронную среду исполнения Tokio

Базовый примитив async story в Rust - это абстракция будущего (в других языках программирования ее можно назвать «обещанием»). Вот основные концепции, которые отличают реализацию фьючерсов в Rust от других языков программирования:

1.Futures основаны на опросах, а не на push-уведомлениях. Это означает, что после создания future не будет выполняться автоматически на месте, а скорее должен быть явно выполнен каким-либо исполнителем (runtime / event-loop для futures). Future ничего не делает, если не опрашивается , поэтому обычно представляет собой ленивые вычисления .

2.Futures нулевые. Это означает, что код, написанный на futures, компилируется до чего-то эквивалентного (или лучшего, чем) «ручная» реализация, которая обычно использует ручные конечные автоматы и тщательное управление памятью.

Где используют Futures типаж Future, который полезен в случае вычисления всего лишь одного значения в течение всего времени

Запрос базы данных, выполняемый в пуле потоков. Когда запрос завершается, будущее завершается, и его значение является результатом запроса.

Вызов RPC для сервера. Когда сервер отвечает, будущее завершается, и его значение является ответом сервера.

Тайм-аут. Когда время закончится, будущее будет завершено, и его значение будет справедливым () (значение «единицы» в Rust).

Долгосрочная задача с интенсивным процессором, работающая в пуле потоков. Когда задача завершается, будущее завершается, и ее значение является возвращаемым значением задачи.

Чтение байтов из сокета. Когда байты готовы, будущее будет завершено - и в зависимости от стратегии буферизации байты могут быть возвращены напрямую или записаны как побочный эффект в некоторый существующий буфер.

Пример функций которые могут возвращать Futures:

// Поиск строки в таблице по указанному идентификатору, выдача строки по завершении
fn get_row(id: i32) -> impl Future<Item = Row>;

// Выполняет вызов RPC, который выдаст i32
fn id_rpc(server: &RpcServer) -> impl Future<Item = i32>;

// Записывает всю строку в TcpStream, возвращая поток по завершении
fn write_string(socket: TcpStream, data: String) -> impl Future<Item = TcpStream>;

Для чего может потребоваться реализовывать std::future::Future

1. Создание кастомных асинхронных задач

Если стандартные абстракции, предоставляемые библиотеками вроде async-std или tokio, недостаточны для ваших нужд, вы можете создать свой собственный тип, реализующий Future, чтобы кастомизировать поведение асинхронной задачи.

Пример: Реализация асинхронного таймера или специфической логики, которая требует управления состоянием внутри Future.

Реализация трейта std::future::Future в Rust нужна для создания объектов, которые представляют собой отложенные вычисления, результат которых станет доступен в будущем. Это ключевой компонент асинхронного программирования в Rust.

[dependencies]
futures = "0.3"

/// Этот пример показывает минимальную реализацию std::future::Future для задачи с задержкой на один цикл выполнения.
/// Т.е. при первом опросе `poll` изменяыется состояние обьекта которые при втором опросе 
/// отдаст результат завершения `Poll::Ready`
mod one_execution_cycle{
    use futures::task::Context;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Poll;
    struct SimpleFuture{
        completed: bool,
    }
    impl Future for SimpleFuture {
        type Output = u32;
    
        fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll {
            if self.completed {
                Poll::Ready(42) // Возвращаем результат, когда готовы
            } else {
                self.completed = true;
                Poll::Pending // Говорим, что нужно подождать
            }
        }
    }
    
    // Output:
    // Still pending...
    // 42
    pub fn run() {
        let mut future = SimpleFuture { completed: false };
        let waker = futures::task::noop_waker();
        let mut context = Context::from_waker(&waker);
        let mut pinned = Box::pin(future);
        loop{
            match pinned.as_mut().poll(&mut context) {
                Poll::Ready(result) => {println!("{}", result);break;},
                Poll::Pending => {println!("Still pending...");},
            }        
        }
    }
}
fn main(){
   one_execution_cycle::run();
}

Для чего может потребоваться реализовывать std::future::Future

1. Создание кастомных асинхронных задач

Если стандартные абстракции, предоставляемые библиотеками вроде async-std или tokio, недостаточны для ваших нужд, вы можете создать свой собственный тип, реализующий Future, чтобы кастомизировать поведение асинхронной задачи.

Пример: Реализация асинхронного таймера или специфической логики, которая требует управления состоянием внутри Future.

Если ваш Future должен ждать несколько циклов выполнения, то его состояние должно отслеживать, сколько времени или попыток прошло, прежде чем он сможет завершиться. В этом случае вам нужно сохранять информацию о промежуточных состояниях в структуре Future и использовать метод poll для отслеживания этих состояний.


mod multiple_execution_cycle{
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::{Duration, Instant};
    
    struct DelayedFuture {
        start_time: Instant,
        delay_duration: Duration,
        poll_count: u32,
    }
    impl DelayedFuture {
        fn new(delay_duration: Duration) -> Self {
            DelayedFuture {
                start_time: Instant::now(),
                delay_duration,
                poll_count: 0,
            }
        }
    }
    impl Future for DelayedFuture {
        type Output = u32; // Результат, который будет возвращен, когда Future завершится
    
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
            self.poll_count += 1;
    
            // Проверка, прошло ли достаточно времени для завершения
            if self.start_time.elapsed() >= self.delay_duration {
                // Если прошло достаточно времени, возвращаем результат
                Poll::Ready(self.poll_count)
            } else {
                // Если еще не прошло времени, говорим, что нужно подождать
                Poll::Pending
            }
        }
    }
    // Output:
    // Poll count: 1
    // Pending... waiting for the future to complete.
    // Poll count: 2
    // Pending... waiting for the future to complete.
    // Poll count: 3
    // Pending... waiting for the future to complete.
    // Poll count: 4
    // Pending... waiting for the future to complete.
    // Poll count: 5
    // Pending... waiting for the future to complete.
    // Poll count: 6
    // Future completed with output: 6
    pub fn run() {
        use std::thread;
        use std::time::Duration;
        use futures::task::noop_waker_ref;
        use futures::future::FutureExt;
    
        let mut future = DelayedFuture::new(Duration::from_secs(5));
        let waker = noop_waker_ref();
        let mut cx = Context::from_waker(&waker);
    
        let mut pinned_future = Pin::new(&mut future);
    
        // Имитация циклов опроса
        for i in 1..=10 {
            println!("Poll count: {}", i);
            match pinned_future.as_mut().poll(&mut cx) {
                Poll::Pending => {
                    println!("Pending... waiting for the future to complete.");
                    thread::sleep(Duration::from_secs(1)); // Ожидание между опросами
                }
                Poll::Ready(output) => {
                    println!("Future completed with output: {}", output);
                    break;
                }
            }
        }
    }
}
fn main(){
    multiple_execution_cycle::run();
}

Для чего может потребоваться реализовывать std::future::Future

1. Создание кастомных асинхронных задач

Чтобы избежать активного ожидания (busy-waiting) и излишней загрузки процессора, необходим (Waker и Executor) планировщик задач с асинхронным таймером, который опросит Future когда настанет время. Для этого нам нужен асинхронный рантайм в котором реализован механизм таймера.

Работа с таймерами и задержками: Таймеры, такие как tokio::time::sleep, требуют планировщика задач, который будет отслеживать время и уведомлять задачи, когда наступит заданное время. Без рантайма невозможно реализовать эту функциональность, потому что нет системы, которая может отслеживать и запускать задачи по расписанию.

Если ваш Future возвращает Poll::Pending, вам нужно будет вызвать wake() из другого места, чтобы пробудить задачу и позволить ей продолжить выполнение, так как планировщик будет ждать от вас сигнала cx.waker(). Когда poll возвращает Poll::Pending, это сигнализирует исполнителю (например, tokio или async-std), что Future не готов к завершению и должен быть повторно опрошен позже. Для того чтобы исполнитель знал, когда нужно повторно попробовать, ваш Future может использовать cx.waker() для регистрации колбэка (waker). Этот waker будет пробуждать Future позже, когда условия его завершения будут выполнены (например, когда I/O событие будет готово)

Когда вы устанавливаете waker через cx.waker(), вы просто сохраняете его, чтобы позднее использовать для уведомления системы, что Future должен быть повторно опрошен. Но сам по себе вызов cx.waker().clone() не вызывает никакого действия — это просто регистрация waker, который будет пробуждать Future позже. Чтобы использовать его, нужно подключить его к системе, которая будет управлять временем (например, таймеру), чтобы пробудить Future


mod not_busy_waiting{
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll, Waker};
    use std::time::{Duration, Instant};
    use std::thread;
    use futures::task::{waker_ref, noop_waker_ref, ArcWake};
    use std::sync::{Arc, Mutex};
    use tokio::time::{sleep, Sleep};

    struct DelayedFuture {
        start_time: Instant,
        delay_duration: Duration,
        waker: Option, // Поле для хранения Waker-а
    }
    
    impl DelayedFuture {
        fn new(delay_duration: Duration) -> Self {
            DelayedFuture {
                start_time: Instant::now(),
                delay_duration,
                waker: None,
            }
        }
    }
    impl Future for DelayedFuture {
        type Output = u32;
    
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
            if self.start_time.elapsed() >= self.delay_duration {
                // Если прошло достаточно времени, возвращаем Poll::Ready
                Poll::Ready(42)
            } else {
                // Если еще не прошло время, сохраняем waker, чтобы пробудить его позже
                if self.waker.is_none() {
                    self.waker = Some(cx.waker().clone());
                    // Здесь мы бы регистрировали `waker` с таймером или другим механизмом,
                    // который пробудит `Future`, когда пройдет нужное время.
                    let delay = self.delay_duration - self.start_time.elapsed();
                    let waker_clone = self.waker.as_ref().unwrap().clone();
                    tokio::spawn(async move {
                        sleep(delay).await;
                        waker_clone.wake();
                    });
                }
                // Указываем, что `Future` еще не готов
                Poll::Pending
            }
        }
    }
    #[tokio::main]
    pub async fn run() {
        let delay_duration = Duration::from_secs(5);
        let mut future = DelayedFuture::new(delay_duration);

        // Параллельно с ожиданием можно запускать таймер, который ограничивыет время ожидания Future
        /*let result = tokio::time::timeout(delay_duration, async {
            future.await
        })
        .await;
    
        match result {
            Ok(42) => println!("Future completed successfully!"),
            Ok(_) => println!("Unexpected result."),
            Err(_) => println!("Future did not complete in time."),
        }    
       */

        // но можно и без дополнительно ограничения
        match future.await {
            42 => println!("Future completed successfully!"),
            _ => println!("Unexpected result."),
        }
    }
}
fn main(){
    not_busy_waiting::run();
}

Для чего может потребоваться реализовывать std::future::Future

2. Интеграция со сторонними системами

При необходимости работы с API или библиотеками, которые не поддерживают асинхронность напрямую, вы можете обернуть их в Future, чтобы встроить в экосистему Rust асинхронного программирования.

Пример: Обертывание блокирующего API в асинхронное поведение с использованием пула потоков.


use std::fs::File;
use std::io::{self, Read};
use std::path::Path;
use sha2::{Sha256, Digest};
use tokio::task;

fn calculate_file_hash_sync(file_path: &Path) -> io::Result {
    let mut file = File::open(file_path)?;
    let mut hasher = Sha256::new();
    let mut buffer = Vec::new();

    file.read_to_end(&mut buffer)?;
    hasher.update(buffer);

    let hash_result = hasher.finalize();
    Ok(format!("{:x}", hash_result))
}
async fn calculate_file_hash(file_path: &Path) -> io::Result {
    let path = file_path.to_path_buf();

    // Оборачиваем синхронный код в асинхронный Future
    task::spawn_blocking(move || calculate_file_hash_sync(&path)).await?
}
#[tokio::main]
async fn main() {
    let file_path = std::path::Path::new("example.txt");

    match calculate_file_hash(file_path).await {
        Ok(hash) => println!("Hash of the file: {}", hash),
        Err(e) => eprintln!("Failed to calculate hash: {}", e),
    }
}

Для чего может потребоваться реализовывать std::future::Future

3. Оптимизация асинхронных операций

Реализация Future вручную может быть полезной для оптимизации сложных задач. Вы можете уменьшить накладные расходы, связанные с автоматической генерацией кода через async/await, за счет более точного управления состоянием.

  • Минимизация переключений:

Вместо того чтобы возвращаться из poll на каждом шаге, цикл loop внутри poll сразу переходит к следующему состоянию, если это возможно.

  • Ручное управление состоянием:

Это уменьшает накладные расходы, связанные с async/await, за счет исключения необходимости компиляции и поддержки автоматических состояний имеет накладные расходы: Так как если бы вы создали код на основе синтаксиса async/await то:

  • Компилятор добавляет промежуточные состояния.
  • Дополнительная память используется для хранения временных данных.
  • Управление состояниями требует больше инструкций во время выполнения

Ручная реализация Future для оптимизации асинхронных операций позволяет вам точно контролировать, как и когда происходит переход между состояниями, что помогает минимизировать накладные расходы. Это особенно полезно, если вы выполняете сложные вычисления или интегрируете несколько асинхронных операций с минимальной задержкой. В этом примере создается Future, который выполняет последовательность асинхронных операций с минимальным переключением состояний:


use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{sleep, Duration};

struct OptimizedFuture {
    state: State,
}
enum State {
    Step1,
    Step2,
    Step3,
    Complete,
}
impl OptimizedFuture {
    fn new() -> Self {
        OptimizedFuture {
            state: State::Step1,
        }
    }
}
impl Future for OptimizedFuture {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
        loop {
            match self.state {
                State::Step1 => {
                    println!("Performing step 1...");
                    // Имитация асинхронной работы
                    let waker = cx.waker().clone();
                    tokio::spawn(async move {
                        sleep(Duration::from_secs(1)).await;
                        waker.wake();
                    });
                    self.state = State::Step2;
                    return Poll::Pending; // Ждем завершения асинхронной операции
                }
                State::Step2 => {
                    println!("Performing step 2...");
                    // Имитация второй асинхронной операции
                    let waker = cx.waker().clone();
                    tokio::spawn(async move {
                        sleep(Duration::from_secs(1)).await;
                        waker.wake();
                    });
                    self.state = State::Step3;
                    return Poll::Pending;
                }
                State::Step3 => {
                    println!("Performing step 3...");
                    self.state = State::Complete;
                    return Poll::Ready(42); // Возвращаем готовый результат
                }
                State::Complete => panic!("Polling a completed future."),
            }
        }
    }
}
#[tokio::main]
async fn main() {
    let result = OptimizedFuture::new().await;
    println!("Future completed with result: {}", result);
}

Когда вы используете async/await, компилятор Rust автоматически генерирует сложный код на основе вашего async fn или async {} блока. Этот код включает создание структуры, которая представляет ваше асинхронное состояние, управление переходами между этими состояниями, а также механизм для хранения данных, необходимых между вызовами .poll().

Как работает async/await за кулисами:

  1. Генерация конечного автомата:

    • Компилятор преобразует каждый async fn или async блок в конечный автомат (state machine), который имеет состояния, соответствующие вашему коду между await вызовами.
    • Например, если в вашем коде есть три await, компилятор создаст состояния для:
      • Начала выполнения,
      • После первого await,
      • После второго await,
      • Завершения выполнения.
  2. Сохранение контекста:

    • Компилятор добавляет поля в сгенерированную структуру, чтобы хранить промежуточные значения, которые нужны между await вызовами.
  3. Накладные расходы:

    • Создание и управление этим автоматом требует дополнительной памяти и времени выполнения.
    • Компилятор вынужден генерировать много дополнительного кода для управления состояниями.

Пример накладных расходов async/await

async fn example() -> u32 {
    let x = compute().await;
    let y = compute().await;
    x + y
}

Компилятор генерирует структуру, которая примерно эквивалентна:

enum ExampleState {
    Start,
    AwaitingCompute1(Pin<Box<dyn Future<Output = u32>>>),
    AwaitingCompute2(u32, Pin<Box<dyn Future<Output = u32>>>),
    Done,
}

struct ExampleFuture {
    state: ExampleState,
}

impl Future for ExampleFuture {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.state {
            ExampleState::Start => {
                let fut1 = compute();
                self.state = ExampleState::AwaitingCompute1(Box::pin(fut1));
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            ExampleState::AwaitingCompute1(ref mut fut1) => {
                if let Poll::Ready(val1) = fut1.as_mut().poll(cx) {
                    let fut2 = compute();
                    self.state = ExampleState::AwaitingCompute2(val1, Box::pin(fut2));
                    cx.waker().wake_by_ref();
                    Poll::Pending
                } else {
                    Poll::Pending
                }
            }
            ExampleState::AwaitingCompute2(val1, ref mut fut2) => {
                if let Poll::Ready(val2) = fut2.as_mut().poll(cx) {
                    self.state = ExampleState::Done;
                    Poll::Ready(val1 + val2)
                } else {
                    Poll::Pending
                }
            }
            ExampleState::Done => panic!("Polling after completion"),
        }
    }
}

Этот автоматически созданный код работает корректно, но он имеет накладные расходы:

  • Компилятор добавляет промежуточные состояния.
  • Дополнительная память используется для хранения временных данных (val1 в примере).
  • Управление состояниями требует больше инструкций во время выполнения.

Ручная реализация Future:

Когда вы реализуете Future вручную:

  • Вы сами управляете состояниями, поэтому компилятор не создает конечный автомат.
  • Вы можете минимизировать количество состояний и точно определить, что должно сохраняться между вызовами poll.
  • Это снижает накладные расходы и может быть полезно в задачах, где требуется высокая производительность.

Пример выше с OptimizedFuture минимизирует накладные расходы за счет явного управления состоянием, что делает его более легковесным по сравнению с аналогичным кодом на async/await.

Кастомная реализация исполнителя

...


mod custom_executor{

    use std::future::Future;
    use std::task::{Context, Poll, Waker};
    use std::sync::{Arc, Mutex};
    use std::collections::VecDeque;
    use std::pin::Pin;
    use std::time::{Duration, Instant};
    use std::thread;
    use std::sync::mpsc;
    
    struct Task {
        future: Mutex + Send>>>,
        waker: Mutex>,
    }
    
    struct Executor {
        tasks: Arc>>>,
    }
    
    impl Executor {
        fn new() -> Self {
            Executor {
                tasks: Arc::new(Mutex::new(VecDeque::new())),
            }
        }
    
        fn spawn(&self, future: impl Future + Send + 'static) {
            let task = Arc::new(Task {
                future: Mutex::new(Box::pin(future)),
                waker: Mutex::new(None),
            });
            self.tasks.lock().unwrap().push_back(task);
        }
    
        fn run(&self) {
            while let Some(task) = self.tasks.lock().unwrap().pop_front() {
                let waker = waker_fn::waker_fn({
                    let task = task.clone(); // Клонируем `task` для того, чтобы создать новый указатель.
                    move || {
                        task.waker.lock().unwrap().take().map(|w| w.wake());
                    }
                });
    
                let mut context = Context::from_waker(&waker);
                let mut future = task.future.lock().unwrap();
                let poll_result = future.as_mut().poll(&mut context);
                drop(future); // Освобождаем блокировку перед повторным использованием `task`.
    
                if let Poll::Pending = poll_result {
                    // Если задача не завершена, возвращаем её в очередь.
                    self.tasks.lock().unwrap().push_back(task);
                }
                // Если задача завершена (Poll::Ready), она больше не добавляется обратно в очередь.
            }
        }
    }
    
    /// Модуль waker_fn отвечает за создание Waker — объекта, который позволяет пробуждать задачи в контексте асинхронного выполнения. 
    /// В асинхронной среде Waker используется для уведомления системы о том, что задача готова 
    /// к выполнению и должна быть снова поставлена в очередь для планирования.
    mod waker_fn {
        use std::sync::Arc;
        use std::task::{RawWaker, RawWakerVTable, Waker};
    
        type FnPtr = Arc;
    
        pub fn waker_fn(f: impl Fn() + Send + Sync + 'static) -> Waker {
            let data = Arc::new(f) as FnPtr;
            unsafe {
                let raw_waker = raw_waker(Arc::into_raw(data.into()));
                Waker::from_raw(raw_waker)
            }
        }
    
        unsafe fn raw_waker(data: *const FnPtr) -> RawWaker {
            RawWaker::new(
                data as *const (),
                &RawWakerVTable::new(clone, wake, wake_by_ref, drop_data),
            )
        }
    
        unsafe fn clone(data: *const ()) -> RawWaker {
            let arc = Arc::from_raw(data as *const FnPtr);
            let cloned = Arc::clone(&arc);
            std::mem::forget(arc);
            raw_waker(Arc::into_raw(cloned))
        }
    
        unsafe fn wake(data: *const ()) {
            let arc = Arc::from_raw(data as *const FnPtr);
            arc();
        }
    
        unsafe fn wake_by_ref(data: *const ()) {
            let arc = Arc::from_raw(data as *const FnPtr);
            arc();
            std::mem::forget(arc);
        }
    
        unsafe fn drop_data(data: *const ()) {
            drop(Arc::from_raw(data as *const FnPtr));
        }
    }
    
    async fn simple_sleep(duration: Duration) {
        let start = Instant::now();
        while start.elapsed() < duration {
            // Простой способ задержки без использования сторонних библиотек.
            thread::sleep(Duration::from_millis(100));
        }
    }
    
    pub fn run() {
        let executor = Executor::new();
        let my_future = async {
            println!("Task 1 started");
            simple_sleep(Duration::from_secs(1)).await;
            println!("Task 1 completed");
        };

        executor.spawn(my_future);
    
        executor.spawn(async {
            println!("Task 2 started");
            simple_sleep(Duration::from_secs(2)).await;
            println!("Task 2 completed");
        });
    
        executor.run();
    }
}
fn main(){
    custom_executor::run();
}
 

Runtime Tokio

Если использовать Исполнитель (Runtime) от Tokio, то мы получаем планировщиком задач (executor), который отвечает за выполнение асинхронных задач.

Но еще Асинхронные примитивы (Таймеры и задержки tokio::time::sleep, асинхронные каналы: tokio::sync::mpsc, Асинхронные блокировки: Использование типов, таких как tokio::sync::Mutex)

Еще Встроенные утилиты:

  • Параллельное выполнение: Например, tokio::join!, который позволяет запускать несколько задач параллельно и ждать их завершения.
  • Асинхронные потоки: Модули, такие как tokio::stream, для работы с потоками данных.

Поддержка потоков

  • tokio может выполнять задачи в одном или нескольких потоках

async: Используется для обозначения асинхронной функции или блока кода.

Когда вы добавляете ключевое слово async перед функцией или блоком кода, компилятор Rust превращает его в std::future::Future, который может быть опрошен и выполнен позже.

async fn my_async_function() -> u32 { 42 }

// Превращается в нечто подобное:
fn my_async_function() -> impl Future<Output = u32> {
    async { 42 }
}

Как запустить среду выполнения Tokio (runtime tokio сам планирует когда опрашивать задачу "task ruture" через метод "poll")

[dependencies]
futures = "0.3"
tokio =  { version = "1.12", features = ["full"] }

async fn app() {
    println!("Hello");
}
// Способ через tokio::runtime::Runtime
fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    let future = app();
    rt.block_on(future);
}
// Способ через  #[tokio::main]
#[tokio::main]
async fn main() {
    println!("Hello");
}

Как запустить среду выполнения Tokio

tokio::runtime::Builder


use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};

fn main() {
    // Создаем кастомный экземпляр Runtime
    let rt = Builder::new_multi_thread()
        .worker_threads(4) // Указываем количество рабочих потоков
        .max_blocking_threads(2) // Максимальное количество блокирующих потоков
        .enable_all() // Включаем таймеры, блокировку и другие функции
        .build()
        .expect("Failed to build runtime");

    // Используем созданный Runtime для выполнения асинхронной задачи
    rt.block_on(async {
        let task1 = async {
            println!("Task 1: Sleeping for 2 seconds");
            sleep(Duration::from_secs(2)).await;
            println!("Task 1: Done");
        };

        let task2 = async {
            println!("Task 2: Sleeping for 1 second");
            sleep(Duration::from_secs(1)).await;
            println!("Task 2: Done");
        };

        tokio::join!(task1, task2);
    });
}

Оператор "turbofish" указать тип возвращаемого значения

err_in_async_blocks


fn main(){
    let fut = async {
        foo().await?;
        bar().await?;
        Ok::<(), MyError>(())  
    };
}

Вернуть Ready в async

    type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
     async fn future(name:&str) -> Result<String> {
        Ok(format!("Hello {}",name)) 
    }
или
    async fn future(name:&str) -> String {
        format!("Hello {}",name)
           или
        // std::future::ready(format!("Hello {}",name)).await
        // futures::future::ready("Hello world".to_string()).await
        // futures_util::future::err(ErrorBadRequest("no luck"))   
    }
или 
    // Синтаксис async/await заворачивает String в  impl Future<Output = String>   
    async fn future(name:&str) -> String {
        // use crate futures
        let f: futures::future::Ready<u32> = futures::future::ready(format!("Hello {}",name));
        //let f: futures::future::Pending<String> = futures::future::pending(); 

        // use std
        //let f: std::future::Ready<String> = std::future::ready(format!("Hello {}",name));
        //let f: std::future::Pending<String> = std::future::pending();
        f.await
    }
или
    fn future(name:&str) -> impl std::future::Future<Output = String> {
        // use crate futures
        let f: futures::future::Ready<String> = futures::future::ready(format!("Hello {}",name));
        //let f: futures::future::Pending<String> = futures::future::pending(); 
    
        // use std
        //let f: std::future::Ready<String> = std::future::ready(format!("Hello {}",name));
        //let f: std::future::Pending<String> = std::future::pending();
        f
    }
или
    fn future(name:&str) ->  impl std::future::Future<Output = String>{
        async {
            format!("Hello {}",name)
        }
    }

Чтобы ещё глубже понять, как выглядит Future, можно вручную вызывать метод poll. Мы вызываем poll самостоятельно без использования Executor runtime tokio


use std::future::Future;
use std::task::{Context, Poll};
use futures::task::noop_waker;

async fn my_async_function() -> u32 {  42 }

fn main() {
        let fut:impl Future = my_async_function(); // Получаем Future
        let mut pinned = Box::pin(fut); // Оборачиваем в Pin>

        let waker = noop_waker();
        let mut context = Context::from_waker(&waker);

        match pinned.as_mut().poll(&mut context) {
            Poll::Ready(val) => println!("Future completed with value: {}", val),
            Poll::Pending => println!("Future is still pending"),
        }
}

Преимущества синергии

Когда вы комбинируете акторов с асинхронным программированием в Tokio, вы получаете:

  • Эффективную изоляцию: каждый актор имеет своё независимое состояние и обрабатывает сообщения, избегая гонок данных.
  • Управление большим количеством задач: Tokio позволяет создавать тысячи акторов, обрабатывая их на ограниченном числе потоков.
  • Простое масштабирование: сообщения между акторами могут быть легко отправлены по сети, что упрощает распределение задач в кластере.

В модели Акторов отдельные "акторы" взаимодействуют между собой, отправляя и получая сообщения, что позволяет организовать асинхронное выполнение задач без необходимости использовать блокировки или мьютексы для защиты данных.

Асинхронная коммуникация: Акторы обмениваются сообщениями асинхронно, что позволяет избегать блокировок и делает взаимодействие между ними более естественным и безопасным. Сообщения обычно помещаются в очередь, и актор обрабатывает их по мере своей готовности.

Простота асинхронного программирования: Модель акторов совместима с асинхронным программированием и может использовать async/await для обработки сообщений и выполнения задач. Это упрощает написание асинхронного кода, так как каждый актор может быть асинхронным и работать с асинхронными операциями.

Как связаны акторы, async и Tokio

Асинхронность и модель акторов идеально сочетаются.

Вот ключевые моменты их взаимодействия:

Акторы как асинхронные задачи. В контексте Tokio, акторы могут быть реализованы как асинхронные задачи (асинхронные функции или блоки кода), которые:

  • Выполняются внутри планировщика Tokio.
  • Обрабатывают сообщения через асинхронные каналы.

Изоляция состояний через async

  • Каждому актору можно предоставить своё состояние, которое никогда не будет разделяться напрямую, что соответствует изоляции модели акторов.

Акторы и планировщик задач

  • Tokio, как среда выполнения, выступает в роли диспетчера задач, аналогично тому, как модель акторов требует некоторого "менеджера", который распределяет задачи.
  • Планировщик Tokio вызывает poll на каждой асинхронной задаче, управляя их прогрессом.

Сообщения через каналы

  • Tokio предоставляет каналы (mpsc, oneshot), которые используются для передачи сообщений между задачами, включая акторов.
  • Актор может ожидать сообщения через канал:

use tokio::sync::mpsc;
struct Actor { // Здесь актор -- это задача, которая асинхронно ждёт сообщения
    rx: mpsc::Receiver,
}
impl Actor {
    async fn run(mut self) {
        while let Some(message) = self.rx.recv().await {
            println!("Received: {}", message);
        }
    }
}
#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(32);
    let actor = Actor { rx };
    tokio::spawn(actor.run());
    tx.send("Hello, Actor!".to_string()).await.unwrap();
}

Асинхронные структуры синхронизации:

  • tokio::sync::Mutex: Асинхронный Mutex.
  • tokio::sync::RwLock: Асинхронный rw-lock.
  • tokio::sync::mpsc: Асинхронные каналы передачи сообщений.
  • tokio::sync::oneshot: Одноразовые каналы.
  • tokio::sync::watch: Канал для передачи последних обновлений.

Асинхронный Mutex позволяет задаче "припарковаться", не блокируя поток, пока ресурс занят. Это предотвращает простой потоков, освобождая их для выполнения других задач.


use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(0));

    let data_clone = Arc::clone(&data);
    tokio::spawn(async move {
        let mut lock = data_clone.lock().await;
        *lock += 1;
    });

    let lock = data.lock().await;
    println!("Value: {}", *lock);
}

std::pin::Pin

std/pin

std::pin::Pin

Зачем это нужно? Асинхронные функции (async fn) компилируются в (state machines), которые хранят промежуточные данные между вызовами await.

Эти данные включают:

  • Локальные переменные функции.
  • Промежуточные результаты асинхронных операций. Если такие данные переместить в памяти, то ссылки или указатели внутри машины могут стать недействительными, что нарушит правила безопасности Rust и приведёт к временным ссылкам на недопустимую память (dangling references). Pin предотвращает перемещение таких объектов в памяти, гарантируя, что их размещение остаётся стабильным.

std::pin::Unpin — это автоматическая черта (auto-trait), указывающая, что тип безопасен для перемещения, даже если он был обёрнут в Pin.

Типы, реализующие Unpin, могут свободно перемещаться в памяти.

Например:

  • Простые примитивные типы (i32, f64).
  • Ссылки (&T, &mut T), так как они не хранят состояние.
  • Типы, не содержащие самореференций (self-references).

Асинхронные задачи, которые не требуют фиксации в памяти (например, не используют ссылки на собственные поля), автоматически реализуют Unpin

Запретить перемещение в памяти асинхронных операций во время их выполнения, что и делает self: Pin<&mut Self>

Как вы уже могли заметить, в сигнатуре метода Future::poll присутствует self: Pin<&mut Self>. Что это значит? Это даёт гарантию компилятору, что асинхронная операция не переместится в памяти во время выполнения.

У большинства типов нет проблем с перемещением. Эти типы реализуют типаж Unpin. Указатели на Unpin-типы могут свободно помещаться в Pin или извлекаться из него. Например, тип u8 реализует Unpin, таким образом Pin<&mut u8> ведёт себя также, как и &mut u8.

Некоторые функции требуют, чтобы футуры, с которыми они работают, были Unpin. Чтобы использовать Future или Stream, который не реализует Unpin, с функцией, которая требует Unpin-типы, сначала нужно закрепить значение, используя либо Box::pin (чтобы создать Pin<Box<T>>) или макрос pin_utils::pin_mut! (чтобы создать Pin<&mut T>). Pin<Box<Fut>> и Pin<&mut Fut> могут быть использованы как футура и оба реализуют Unpin.

Например:


fn main(){
    use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io

    // A function which takes a `Future` that implements `Unpin`.
    fn execute_unpin_future(x: impl Future + Unpin) { /* ... */ }

    let fut = async { /* ... */ };
    execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait

    // Pinning with `Box`:
    let fut = async { /* ... */ };
    let fut = Box::pin(fut);
    execute_unpin_future(fut); // OK

    // Pinning with `pin_mut!`:
    let fut = async { /* ... */ };
    pin_mut!(fut);
    execute_unpin_future(fut); // OK
}

Асинхронные блоки и функции возвращают типы, реализующие этот Futureпризнак. Возвращаемый тип является результатом преобразования компилятора, который превращает локальные переменные в данные, хранящиеся внутри будущего.

Некоторые из этих переменных могут содержать указатели на другие локальные переменные. По этой причине будущее никогда не следует перемещать в другую ячейку памяти, поскольку это сделает эти указатели недействительными.

Чтобы предотвратить перемещение будущего типа в памяти, его можно опросить только через закрепленный указатель. Pin— это оболочка ссылки, которая запрещает все операции, которые могли бы переместить экземпляр, на который она указывает, в другую ячейку памяти.


use tokio::sync::{mpsc, oneshot};
use tokio::task::spawn;
use tokio::time::{sleep, Duration};

// A work item. В этом случае просто поспите заданное время и ответьте 
// сообщением в канале `respond_on`.
#[derive(Debug)]
struct Work {
    input: u32,
    respond_on: oneshot::Sender,
}
// worker, который прослушивает очередь заданий и выполняет их.
async fn worker(mut work_queue: mpsc::Receiver) {
    let mut iterations = 0;
    loop {
        tokio::select! {
            Some(work) = work_queue.recv() => {
                sleep(Duration::from_millis(10)).await; // Притворяйтесь, что работаете.
                work.respond_on
                    .send(work.input * 1000)
                    .expect("failed to send response");
                iterations += 1;
            }
            // TODO: сообщать количество итераций каждые 100 мс
        }
    }
}
// Инициатор запроса, который запрашивает работу и ждет ее завершения.
async fn do_work(work_queue: &mpsc::Sender, input: u32) -> u32 {
    let (tx, rx) = oneshot::channel();
    work_queue
        .send(Work { input, respond_on: tx })
        .await
        .expect("failed to send on work queue");
    rx.await.expect("failed waiting for response")
}
#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10);
    spawn(worker(rx));
    for i in 0..100 {
        let resp = do_work(&tx, i).await;
        println!("work result for iteration {i}: {resp}");
    }
}

Если одна из асинхронных операций блокирует поток, другие также остановятся.

Макрос join! позволяет среде выполнения запускать несколько асинхронных операций в одном потоке одновременно (но не параллельно). Если одна из этих асинхронных операций блокирует поток, другие также остановятся.

А как запускать параллельно асинхронные операции если одна из них заблокирует поток то что бы другие смогли продолжать работать?

Если одна из асинхронных операций блокирует поток, а вы хотите, чтобы другие продолжали работать, нужно использовать многопоточность. В Rust это можно реализовать с помощью tokio::spawn или других эквивалентных механизмов, использование пула потоков (tokio::task::block_in_place или spawn_blocking).

Использование tokio::spawn для независимых задач. Вместо запуска всех операций в одном потоке, tokio::spawn запускает каждую операцию в отдельной задаче (которая может быть запущена на другом потоке в рамках пула потоков)

Смешивание и сопоставление синхронизации и асинхронности; контекст потока tokio::task::spawn_blocking

rust-polyglot/async

Есть средства для вызова асинхронного кода из синхронной среды и из асинхронной.

Но есть подводные камни. В частности, существуют сложные правила о том, какую функцию вы можете вызывать из какого контекста времени выполнения (т. е. в каком потоке).

Например, если вы вызываете tokio::runtime::Handle::block_on из не асинхронной функции, чтобы запустить асинхронный код из неасинхронного кода, думая, что вы находитесь не в контексте асинхронного выполнения, а на самом деле текущий поток является потоком-исполнителем Tokio, это вызовет панику. Конечно, оболочка синхронизации над асинхронной библиотекой может не знать, была ли она вызвана косвенно из асинхронной задачи. Если вы думаете, что это может случиться, вы должны использовать spawn_blocking.

Зачем использовать tokio::task::spawn_blocking?

spawn_blocking создает задачу и помещает её в специальный пул потоков, предназначенный для выполнения блокирующего кода. Этот пул изначально имеет ограниченное количество потоков (количество можно настроить с помощью tokio::runtime::Builder), что позволяет контролировать использование системных ресурсов.

spawn_blocking следует использовать только для задач, которые не имеют асинхронных API и которые могут блокировать поток, например, когда используется библиотека, работающая только синхронно или требуется выполнение долгих вычислительных операций, которые не требуют асинхронности.

spawn_blocking в Tokio используется для выполнения блокирующих операций в асинхронном контексте. В отличие от обычных асинхронных задач, которые могут выполняться асинхронно и не блокировать поток, блокирующие операции могут задерживать выполнение всех задач на данном потоке, что приводит к проблемам с производительностью и отзывчивостью приложения.

Блокирующий код: Если у вас есть операция, которая не является асинхронной и блокирует поток, например:

  • работа с файловой системой (асинхронные аналоги: tokio::fs и тогда не надо использовать spawn_blocking);
  • синхронные сетевые запросы(асинхронные аналоги: tokio интегрируется с библиотекой reqwest, которая имеет асинхронную версию. Она поддерживает асинхронные HTTP-запросы без блокировки потока.);
  • работа с базами данных, которые не поддерживают асинхронный интерфейс (асинхронные аналоги: tokio-postgres, sqlx);
  • сложные вычислительные задачи, которые занимают значительное время(Асинхронные аналоги: Если задача может быть разделена на более мелкие независимые части, её можно выполнять с использованием асинхронных функций и tokio::task::spawn)

#[tokio::main]
async fn main() {
    let handle = tokio::task::spawn_blocking(|| {
        // Эта операция блокирует поток, например, синхронная работа с файлом
        let result = std::fs::read_to_string("large_file.txt");
        result.unwrap_or_else(|_| "Error reading file".to_string())
    });
    // Асинхронный код может продолжать выполняться без блокировки
    println!("Doing other work...");
    // Ожидание завершения блокирующей задачи
    let result = handle.await.unwrap();
    println!("File content: {}", result);
 ()}
}

Пример выполнения сложной вычислительной задачи с использованием tokio::task::spawn:


#[tokio::main]
async fn main() {
    let handle = tokio::task::spawn(async {
        // Сложная вычислительная задача
        let result = (0..1_000_000).sum::();
        result
    });
    // Асинхронный код может продолжать выполняться здесь
    println!("Doing other work...");
    // Ожидание завершения вычислительной задачи
    let result = handle.await.unwrap();
    println!("Computation result: {}", result);

Как вызвать блокирующие или ресурсоёмкие задачи

tokio::task::spawn_blocking , block_in_place Так же, как task::spawn, task::spawn_blocking возвращает JoinHandle

В отличие от task::spawn где не блокируется поток, spawn_blocking вместо этого порождает функцию блокировки в выделенном пуле потоков для блокировки задач

tokio/task

async-what-is-blocking

Что, если я захочу заблокировать? Иногда мы просто хотим заблокировать поток. Это совершенно нормально. Для этого есть две распространенные причины:

  • Дорогие вычисления, связанные с ЦП.
  • Синхронный ввод-вывод.

В обоих случаях мы имеем дело с операцией, которая не позволяет задаче достичь ожидаемого значения в течение длительного периода времени. Чтобы решить эту проблему, мы должны переместить операцию блокировки в поток за пределами пула потоков Tokio. Есть три варианта этого:

  • Используйте tokio::task::spawn_blocking функцию.
  • Используйте crate rayon.
  • Создайте специальный поток с помощью std::thread::spawn.

Это обычная проблема при написании асинхронного кода в целом. Если вы хотите воспользоваться средой выполнения, которая запускает ваш код одновременно, вам следует избегать блокировки или запуска кода, интенсивно использующего ЦП, в самих Futures. (т.е. запуск нового потока в котором нет среды выполнения, находят в асинхронном коде, что бы не блокировать текущий поток)


use tokio::task;
// Ошибки в многопоточном асинхронные контексты требуют дополнительных ограничений
type Result = std::result::Result>;

fn fib_cpu_intensive(n: u32) -> u32 {
    match n {
        0 => 0,
        1 => 1,
        n => fib_cpu_intensive(n - 1) + fib_cpu_intensive(n - 2),
    }
}
async fn app() -> Result<()>  {
    // сдесь помимо асинхронных задач с .await мы можем запустить отдельный поток для задачи что бы не блокировать текущий аснхронный поток
    let threadpool_future = task::spawn_blocking(||fib_cpu_intensive(30));
    let res = threadpool_future.await?;
    println!("fib={}",res);
    Ok(())
}
fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();

    match rt.block_on(app()) {
        Ok(_) => println!("Done"),
        Err(e) => eprintln!("An error ocurred: {}", e),
    };
}

crate rayon vs tokio::task::spawn_blocking

crate rayon

rayon: Его пул оптимизирован для распараллеливания множества вычислений. Он использует алгоритм "work-stealing" (кража работы): если один поток завершил свою задачу, он "крадет" часть работы у другого, еще занятого потока. Это обеспечивает максимальную загрузку всех ядер процессора и высокую пропускную способность для большого количества задач. Поэтому он идеально подходит для дорогостоящих вычислений, которые нужно распараллеливать.

spawn_blocking (из tokio): Его пул предназначен для изоляции. Его задача — принять блокирующий код, выполнить его в отдельном потоке и тем самым не заблокировать главный асинхронный runtime. Если вы отправите в него 100 тяжелых задач, они встанут в очередь и будут выполняться медленно, по мере освобождения каждого из небольшого числа потоков. Он НЕ подходит для параллелизации множества дорогостоящих вычислений.

Мы будем использовать сумму большого списка в качестве примера дорогостоящих вычислений, но обратите внимание, что на практике, если массив не очень-очень большой, простое вычисление суммы, вероятно, будет достаточно дешевым, чтобы вы могли просто сделать это прямо в Tokio через spawn_blocking.

Основная опасность использования rayon заключается в том, что вы должны быть осторожны, чтобы не заблокировать поток во время ожидания завершения вискозы.

Для этого совместите rayon::spawn с tokio::sync::oneshot таким образом:


async fn parallel_sum(nums: Vec) -> i32 {
    let (send, recv) = tokio::sync::oneshot::channel();

    // Spawn a task on rayon.
    rayon::spawn(move || {
        // Perform an expensive computation.
        let mut sum = 0;
        for num in nums {
            sum += num;
        }

        // Send the result back to Tokio.
        let _ = send.send(sum);
    });

    // Wait for the rayon task.
    recv.await.expect("Panic in rayon::spawn")
}

#[tokio::main]
async fn main() {
    let nums = vec![1; 1024 * 1024];
    println!("{}", parallel_sum(nums).await);
}

Это использует пул потоков района для выполнения дорогостоящей операции. Имейте в виду, что в приведенном выше примере для каждого вызова используется только один поток в пуле потоков rayon parallel_sum. Это имеет смысл, если у вас много вызовов parallel_sum в вашем приложении, но также можно использовать параллельные итераторы rayon для вычисления суммы в нескольких потоках:


use rayon::prelude::*;
fn main(){
    // Spawn a task on rayon.
    rayon::spawn(move || {
        // Compute the sum on multiple threads.
        let sum = nums.par_iter().sum();

        // Send the result back to Tokio.
        let _ = send.send(sum);
    });
}

Обратите внимание, что rayon::spawn вызов по- прежнему требуется при использовании параллельных итераторов, поскольку параллельные итераторы блокируются.


Создать выделенный поток

Если операция блокировки продолжается вечно, вы должны запустить ее в выделенном потоке. Например, рассмотрим поток, который управляет подключением к базе данных, используя канал для приема операций с базой данных для выполнения. Поскольку этот поток прослушивает этот канал в цикле, он никогда не завершается.

Выполнение такой задачи в любом из двух других пулов потоков представляет собой проблему, поскольку по существу она навсегда отбирает поток из пула. После того, как вы сделаете это несколько раз, у вас больше не будет потоков в пуле потоков, и все другие блокирующие задачи не будут выполнены.

Конечно, вы также можете использовать выделенные потоки для более коротких целей, если вы готовы платить за создание нового потока каждый раз, когда вы запускаете новый.

tokio::task::spawn (tokio::spawn (глобальный) это синоним tokio::task::spawn и используется аналогично.)

Задача является легким весом, неблокирующая блок исполнения (зеленые нити) Выполняются в исполнителе tokio Эквивалент стандартной библиотеки thread::spawn. Требуется async блок или другое будущее и создается новая задача для одновременного выполнения этой работы

Код, выполняемый в асинхронных задачах task::spawn, не должен выполнять операции, которые могут блокировать. Операция блокировки, выполняемая в задаче, выполняемой в потоке, который также выполняет другие задачи, заблокировала бы весь поток, предотвращая выполнение других задач.

Улучшение производительности через параллелизм Если у вас есть несколько задач, которые можно выполнять параллельно, tokio::task::spawn позволяет запускать их одновременно:


#[tokio::main]
async fn main() {
    let task1 = tokio::task::spawn(async {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        println!("Task 1 completed");
    });
    let task2 = tokio::task::spawn(async {
        tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        println!("Task 2 completed");
    });
    // Ждём завершения обеих задач
    let _ = tokio::join!(task1, task2);
}

tokio::join! — это удобный макрос, предоставляемый библиотекой Tokio для выполнения нескольких асинхронных операций параллельно. (аналог futures::futures::join!)

tokio::join! завершает все асинхронные задачи, даже если одна из них завершилась с ошибкой. Если вы хотите остановиться при первой ошибке, вам нужно использовать другие подходы, такие как try_join! (из futures).

Недостатки

  • Все задачи, переданные в tokio::join!, должны быть "Send" и 'static, так как они выполняются в изолированных потоках исполнения.
  • Задачи, переданные в tokio::join!, не могут захватывать значения с ограниченным временем жизни, что может ограничивать их использование в некоторых сценариях.

Для простых асинхронных задач достаточно tokio::join!

Для сложных задач можно завернуть вашу асинхронную операцию в выделенный поток tokio::spawn и его уже отдать в tokio::join! Это позволяет переместить задачу в пул потоков, освобождая текущий поток.

 async fn work(delay: u64) -> String {
    tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
    println!("Sleep:{delay}");
    std::future::ready(format!("Sleep {}",delay)).await
}
#[tokio::main]
pub async fn run(){  
    // Не блокирующие вычисления
    let (first, second,..) = tokio::join!(
        // парралельное выполнение задач
        tokio::task::spawn(async {
            work(5).await
        }),
        tokio::task::spawn(async {
            work(2).await
        }),
        tokio::task::spawn(async {
            work(1).await
        }),
    ); 
    // Не блокирующие вычисления
    tokio::join!(
        work(5),work(2),work(1),
    );
   // Блокирующее исполнение т.е. последовательно синхронно
    work(5).await;
    work(2).await;
    work(1).await;  
}

Отличие tokio::join! от tokio::spawn

tokio::spawn используется для запуска задач в фоновом режиме на пуле потоков и возвращает handle, через который можно дожидаться завершения.

join! управляет задачами внутри текущего контекста исполнения.

spawn запускает задачи в пуле потоков и может масштабироваться между потоками.


#[tokio::main]
async fn main() {
    // `join!` запускает несколько задач и ждет их завершения.
    let (res1, res2) = tokio::join!(task_one(), task_two());
    // `spawn` создает задачи, которые могут работать в фоновом режиме.
    let handle1 = tokio::spawn(task_one());
    let handle2 = tokio::spawn(task_two());
    // Ожидаем завершения фоновых задач
    let res1 = handle1.await.unwrap();
    let res2 = handle2.await.unwrap();
    println!("Results: {}, {}", res1, res2);
}

Пример ожидания программы c tokio::task::spawn

wait...
wait...
wait...
wait...
wait...
wait...
wait...
{
  "content": [
    "/home/jeka/Projects/Rust/ttttttest/src/main.rs",
    "/home/jeka/Projects/Rust/ttttttest/src/main_rs"
  ]
}

use std::process::{Stdio, Command};

#[tokio::main]
async fn main() {
    let handle = tokio::task::spawn(async move {
        let (is_err,s) = read_dir().await;
        print(is_err,s);
    });
     
    // Цикл для периодической проверки состояния задачи
    loop {
        // Проверяем, завершилась ли задача
        if handle.is_finished() {
            // Дожидаемся завершения задачи и обрабатываем результат
            handle.await.unwrap();
            break;
        }
        println!("wait...");// Выполняем другие задачи, если нужно
        // Ждем некоторое время перед следующей проверкой
        std::thread::sleep(std::time::Duration::from_millis(500));
    }
}
async fn read_dir() -> (bool, String){
    std::thread::sleep(std::time::Duration::from_secs(3));// stopper

    let executable_path = "/home/jeka/Projects/Rust/58/58web3/target/debug/llm-system-fuction";
    
    // Запускаем первую программу асинхронно
    let output = Command::new(executable_path)
        .args(["rl","-d","/home/jeka/Projects/Rust/ttttttest/src"])
        .stdout(Stdio::piped())
        .output()
        .expect("Failed to execute process");
 
    if output.status.success() {
        let stdout: String = String::from_utf8_lossy(&output.stdout).into_owned();
       return (true,stdout);
    } else {
        let stderr: String = String::from_utf8_lossy(&output.stderr).into_owned();
        return (false,stderr);
    }
}
fn print(is_err: bool, out: String){
    if is_err {  eprintln!("{}", out); } else { println!("{}", out); }
}

Убедитесь, что ваши асинхронные функции и замыкания реализуют трейт Send, если вы планируете передавать их между потоками


#[tokio::main]
async fn main() {
    // Определяем асинхронную функцию, которая принимает замыкание
    async fn my_async_function(f: F)
    where
        F: Fn() + Send + 'static, {
          // Вызываем замыкание
          f();
    }
    // Замыкание, которое будет передано в асинхронную функцию
    let my_closure = || {
        println!("Hello from the closure!");
    };
    // Запускаем асинхронную задачу и передаем в нее замыкание
    let handle = tokio::task::spawn(async move {
        my_async_function(my_closure).await; // Вызываем асинхронную функцию
    });
    handle.await.unwrap(); // Ожидаем завершения задачи
}

tokio::select!

Позволяет ожидать завершения первой из нескольких асинхронных операций. Удобен для реализации таймаутов или конкуренции задач


use tokio::time::{sleep, Duration};
async fn fast_task() -> &'static str {
    sleep(Duration::from_secs(1)).await; // Быстрая задача (завершается через 1 секунду)
    "Fast task completed"
}
async fn slow_task() -> &'static str {
    sleep(Duration::from_secs(3)).await; // Медленная задача (завершается через 3 секунды)
    "Slow task completed"
}

#[tokio::main]
async fn main() {
    tokio::select! {
        result = fast_task() => {
            println!("{}", result); // Этот блок выполнится первым
        }
        result = slow_task() => {
            println!("{}", result);
        }
    }
    println!("One of the tasks completed!");
}

tokio::task::block_in_place

Позволяет выполнять блокирующий код в асинхронной среде, не блокируя текущий поток. Используется для интеграции с блокирующими библиотеками.


#[tokio::main]
async fn main() {
    tokio::task::block_in_place(|| {
        // Блокирующий код
        std::thread::sleep(std::time::Duration::from_secs(1));
        println!("Blocking operation completed");
    });
}

tokio::time::timeout

Устанавливает таймаут на выполнение задачи. т.е. завершит ее если она раньше не вернет Ready


use tokio::time::{sleep, timeout, Duration};
#[tokio::main]
async fn main() {
    let fut = sleep(Duration::from_secs(3));

    match timeout(Duration::from_secs(1), fut).await {
        Ok(_) => println!("Task completed"),
        Err(_) => println!("Task timed out"),
    }
}

tokio::time::sleep

Асинхронная версия std::thread::sleep. Позволяет приостанавливать выполнение задачи без блокировки потока.


use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
    println!("Start sleeping...");
    sleep(Duration::from_secs(2)).await;
    println!("Finished sleeping!");
}

Канал tokio::sync::mpsc позволяет задачам ожидать передачи данных, не блокируя потоки.

В std::sync::mpsc вызов recv() на приемнике блокирует поток, пока данные не будут доступны. Это неприемлемо в асинхронном контексте, так как поток, выполняющий асинхронные задачи, должен быть свободен для других задач.

tokio::sync::mpsc предоставляет асинхронный метод recv().await, который позволяет "припарковать" задачу, не блокируя поток, пока данные не станут доступными.


use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    tokio::spawn(async move {
        tx.send("Hello, world!").await.unwrap();
    });

    while let Some(message) = rx.recv().await {
        println!("Received: {}", message);
    }
}


// std::sync::mpsc (блокирует поток)
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        tx.send("Message").unwrap();
    });

    // Блокирует выполнение потока, пока не появятся данные
    let message = rx.recv().unwrap();
    println!("Received: {}", message);
}

tokio::sync::broadcast

Канал для передачи сообщений нескольким подписчикам.

Каждый подписчик получает копию сообщения.


use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(10);
    let mut rx2 = tx.subscribe();

    tokio::spawn(async move {
        while let Ok(msg) = rx1.recv().await {
            println!("Receiver 1 got: {}", msg);
        }
    });
    tokio::spawn(async move {
        while let Ok(msg) = rx2.recv().await {
            println!("Receiver 2 got: {}", msg);
        }
    });
    tx.send("Hello, world!").unwrap();
}

В Substrate мы реализовали практичное решение в виде «контролируемого неограниченного канала». Количество элементов, входящих и исходящих из каждого канала, сообщается клиенту Prometheus. Когда разница между этими двумя показателями превышает определенный порог, инициируется оповещение. Ограниченные каналы тоже используются, но только в тех местах, где управление потоком передачи данных действительно важно (например, в сетях). Так что этот подход оказался практичным и, похоже, работает. Думаю, что ситуация значительно улучшится, когда у экосистемы Rust найдутся надежные вспомогательные средства для шаблонов асинхронного кода. Например, фоновая задача, которая получает сообщения и отправляет обратно ответы, или широковещательный канал tokio.

crate flume — это библиотека для асинхронных и синхронных каналов, обеспечивающая удобный и мощный интерфейс для передачи сообщений между потоками или задачами. Flume предоставляет альтернативу стандартным каналам Rust (std::sync::mpsc) и каналам из Tokio, с акцентом на производительность и удобство использования.

Предоставляет богатую функциональность, включая таймауты, выборку (select) сообщений из нескольких каналов и поддержку AsyncRead/AsyncWrite.


use flume::{unbounded, Selector};

fn main() {
    let (tx1, rx1) = unbounded();
    let (tx2, rx2) = unbounded();

    let selector = Selector::new()
        .recv(&rx1, |msg| println!("Received from channel 1: {}", msg))
        .recv(&rx2, |msg| println!("Received from channel 2: {}", msg));

    std::thread::spawn(move || {
        tx1.send("Message 1").unwrap();
        tx2.send("Message 2").unwrap();
    });

    selector.wait(); // Ожидаем сообщение из любого канала
}

tokio::fs

Асинхронные операции с файловой системой:

Чтение (tokio::fs::read, tokio::fs::File::read).

Запись (tokio::fs::write, tokio::fs::File::write).

Удаление, создание и другие операции.


use tokio::fs;
#[tokio::main]
async fn main() {
    let data = "Hello, Tokio!";
    fs::write("example.txt", data).await.unwrap();

    let content = fs::read_to_string("example.txt").await.unwrap();
    println!("File content: {}", content);
}

tokio::io

Асинхронные операции ввода-вывода:

Используется для работы с файлами, сетевыми соединениями и др.


use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() {
    let mut file = File::create("example.txt").await.unwrap();
    file.write_all(b"Hello, Tokio!").await.unwrap();
}

tokio::signal

Асинхронная обработка сигналов операционной системы, таких как SIGINT, SIGTERM, и других. Это полезно в приложениях, где требуется безопасное завершение или перезапуск в ответ на сигналы.


use tokio::signal;
#[tokio::main]
async fn main() {
    println!("Press Ctrl+C to exit");

    signal::ctrl_c().await.unwrap();
    println!("Received Ctrl+C, shutting down.");
}

std::stream::Stream — это асинхронный поток, предоставляющий интерфейс для работы с последовательностью асинхронных значений. Эта концепция имеет аналогию с итераторами (Iterator), но работает асинхронно и подходит для работы с асинхронными операциями, где элементы могут поступать с задержкой, например, при чтении данных из сети или при обработке асинхронных событий.

tokio::stream::Stream — расширенный трейт, реализует std::stream::Stream и добавляет асинхронные методы для работы с асинхронными потоками, поддерживающий асинхронные методы и интегрирующийся с async/await

Основные цели и применения Stream:

  • Асинхронное получение данных: Stream используется для асинхронного получения последовательных данных. Например, при обработке входящих данных от клиента через сокет, при чтении из асинхронного файла или при получении событий из потокового API.
  • Обработка событий: Когда необходимо обрабатывать события в реальном времени, такие как обновления данных или поступающие сообщения, поток может быть использован для их последовательного получения и обработки.
  • Параллельная работа: Комбинирование Stream с асинхронной функцией и инструментами, такими как futures::stream или tokio::stream, позволяет создавать сложные обработки, где множество потоков могут быть объединены или обрабатываться одновременно.

std/stream

Предварительно мы рассмотрели типаж Future, который полезен в случае вычисления всего лишь одного значения в течение всего времени. Но иногда вычисления лучше представить в виде потока значений. Для примера, TCP слушатель производит множество TCP соединений в течение своего времени жизни.


use tokio::stream::{self, StreamExt};
use tokio::sync::mpsc;
use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    // Создаем канал для отправки и получения сообщений.
    let (tx, mut rx) = mpsc::channel(32);
    // Запускаем задачу, которая будет отправлять сообщения в канал каждую секунду.
    tokio::spawn(async move {
        for i in 1..=5 {
            tx.send(i).await.expect("Failed to send message");
            time::sleep(Duration::from_secs(1)).await;
        }
    });
    // Обрабатываем входящие сообщения из канала, используя поток.
    while let Some(value) = rx.recv().await {
        println!("Получено значение: {}", value);
    }
    println!("Все сообщения были обработаны.");
}


use tokio::stream::{self, StreamExt};
use tokio::time::{self, Duration};

#[tokio::main]
async fn main() {
    // Создаем поток с числами от 1 до 5, где каждый элемент будет появляться с интервалом 1 секунда.
    let mut stream = stream::unfold(1, |state| async move {
        if state <= 5 {
            // Ждем 1 секунду перед тем, как вернуть новое значение.
            time::sleep(Duration::from_secs(1)).await;
            Some((state, state + 1))
        } else {
            None
        }
    });
    while let Some(value) = stream.next().await {
        println!("Получено значение: {}", value);
    }
    println!("Поток завершил обработку.");
}

futures — это библиотека, которая предоставляет дополнительные инструменты для работы с Future и Stream, такие как:

  • Различные утилиты для комбинаторов, которые упрощают работу с цепочками вызовов (join_all, and_then).
  • Множественные типы Future и Stream, которые используются для создания сложных асинхронных операций.
  • future::poll_fn и другие вспомогательные функции, которые позволяют создавать кастомные Future

futures может быть полезен, если вы:

  • Хотите использовать дополнительные утилиты для комбинаторов, обработки ошибок и управления потоками выполнения.
  • Разрабатываете библиотеки или инструменты, которые должны работать вне зависимости от конкретного асинхронного рантайма, и вам нужно использовать Future и Stream API, которые предоставляют больше гибкости.

futures-rs - это библиотека, которая реализует фьючерсы и потоки нулевой стоимости в Rust.

futures имеет собственный исполнитель, но не собственный реактор, поэтому он не поддерживает выполнение асинхронного ввода-вывода или фьючерсов таймера.

По этой причине он не считается полноценной средой выполнения. Распространенный выбор - использовать утилиты из futures с исполнителем из другого ящика.

Контейнер futures является низкоуровневой реализацией futures, которая не несёт в себе какой-либо среды выполнения или слоя ввода/вывода.

Когда стоит использовать futures вместе с tokio?

Расширенная функциональность: Если вам нужны дополнительные комбинаторы и методы, которые предоставляет библиотека futures, вы можете добавить ее как зависимость.

Допустим, у нас есть несколько асинхронных операций, и мы хотим объединить их таким образом, чтобы выполнить их параллельно и дождаться завершения всех.

futures предоставляет удобные комбинаторы для таких задач, например, future::join_all

В отличие от использования tokio::join!, который работает только с фиксированным количеством задач, join_all позволяет работать с коллекцией Future, что дает больше гибкости.


use tokio::time::{sleep, Duration};
use futures::future::join_all;

async fn task_one() -> u32 {
    sleep(Duration::from_secs(2)).await;
    println!("Task one completed");
    42
}
async fn task_two() -> u32 {
    sleep(Duration::from_secs(1)).await;
    println!("Task two completed");
    24
}
async fn task_three() -> u32 {
    sleep(Duration::from_secs(3)).await;
    println!("Task three completed");
    7
}
#[tokio::main]
async fn main() {
    // Создаем список задач
    let tasks = vec![task_one(), task_two(), task_three()];
    // Используем `join_all` для запуска всех задач параллельно и ожидания их завершения
    let results = join_all(tasks).await;
    // Печать результатов выполнения всех задач
    for result in results {
        println!("Task completed with result: {}", result);
    }
}

and_then — это метод, который позволяет цепочечно обрабатывать результаты выполнения Future. Он используется для последовательного выполнения операций, где следующая операция зависит от результата предыдущей. Этот метод может быть полезен, когда нужно выполнить дополнительную асинхронную задачу после завершения текущей.

future::map — позволяет трансформировать результат выполнения асинхронной задачи (Future)


use tokio::time::{sleep, Duration};
use futures::future::FutureExt; // Для использования метода `and_then`

async fn task_one() -> u32 {
    sleep(Duration::from_secs(2)).await;
    println!("Task one completed with result: 42");
    42
}
async fn task_two(value: u32) -> String {
    sleep(Duration::from_secs(1)).await;
    println!("Task two received value: {}", value);
    format!("Processed value: {}", value * 2)
}
#[tokio::main]
async fn main() {
    // Запуск первой задачи
    let future = task_one()
        .map(|value| value + 1) // Дополнительная обработка результата
        .and_then(|value| {
            // Запуск второй задачи с результатом первой
            async move {
                let result = task_two(value).await;
                Ok(result)
            }
        });
    // Ожидание завершения всей цепочки и вывод результата
    match future.await {
        Ok(result) => println!("Final result: {}", result),
        Err(e) => eprintln!("An error occurred: {}", e),
    }
}

future::or_else — это комбинатор, который используется для обработки ошибок в Future и для создания цепочек выполнения, которые могут переключаться между различными вариантами в случае возникновения ошибки.

Обработка ошибок: or_else позволяет вам определить альтернативный путь выполнения, если асинхронная операция завершится с ошибкой.


use futures::future::{self, FutureExt};
use std::error::Error;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Основная асинхронная задача, которая может завершиться с ошибкой
    let main_future = async {
        println!("Running main future...");
        Err::("Something went wrong!")
    };
    // Вторая асинхронная задача, которая будет выполнена в случае ошибки
    let fallback_future = async {
        println!("Running fallback future...");
        Ok(42)
    };
    // Использование `or_else` для переключения на альтернативную операцию при ошибке
    let result = main_future.or_else(|err| {
        println!("Error occurred: {}", err);
        fallback_future
    }).await;
    match result {
        Ok(value) => println!("Final result: {}", value),
        Err(err) => println!("Failed with error: {}", err),
    }
}

futures::future::BoxFuture: Тип Future, обернутый в Box, что позволяет создать динамический тип Future.
Когда вы хотите вернуть Future из функции, но не знаете его точный тип (рекурсивные типы, трейт-обьекты), например, при использовании разных ветвлений или различных асинхронных операций, BoxFuture помогает сделать тип возвращаемого значения унифицированным.

use futures::future::{BoxFuture, FutureExt};
use std::time::Duration;
use tokio::time::sleep;

// Определение рекурсивного типа (дерево)
enum Tree {
    Leaf(i32),
    Node(i32, Box<Tree>, Box<Tree>),
}
// Функция, которая возвращает BoxFuture с рекурсивным типом Tree
fn create_tree(condition: bool) -> BoxFuture<'static, Tree> {
    if condition {
        // Возвращаем асинхронную задачу, которая создает дерево с одной вершиной (Node)
        async move {
            sleep(Duration::from_secs(1)).await;
            Tree::Node(10, Box::new(Tree::Leaf(5)), Box::new(Tree::Leaf(15)))
        }
        .boxed()
    } else {
        // Возвращаем асинхронную задачу, которая создает простое дерево с одним узлом (Leaf)
        async move {
            sleep(Duration::from_secs(1)).await;
            Tree::Leaf(20)
        }
        .boxed()
    }
}
#[tokio::main]
pub async fn run() {
    let tree_future = create_tree(true);
    let tree = tree_future.await;
    match tree {
        Tree::Node(value, left, right) => {
            println!("Node value: {}", value);
            if let Tree::Leaf(left_value) = *left {
                println!("Left child value: {}", left_value);
            }
            if let Tree::Leaf(right_value) = *right {
                println!("Right child value: {}", right_value);
            }
        }
        Tree::Leaf(value) => {
            println!("Leaf value: {}", value);
        }
    }
}

Пример для типа без размера

use futures::future::{BoxFuture, FutureExt};
use std::time::Duration;
use tokio::time::sleep;

// Структура Cacher с полем, содержащим замыкание, обернутое в Box
struct Cacher<N, M> {
    calculation: Box<dyn Fn(N) -> M>,
}
// Имплементация функции, которая создает асинхронную задачу, использующую Cacher
impl<N, M> Cacher<N, M>
where
    M: 'static + std::marker::Send, // Чтобы результат был совместим с BoxFuture, нужен 'static
{
    fn new<F>(calculation: F) -> Self
    where
        F: Fn(N) -> M + 'static,
    {
        Cacher {
            calculation: Box::new(calculation),
        }
    }
    fn run(&self, input: N) -> BoxFuture<'static, M> {
        let result = (self.calculation)(input); // Выполнение замыкания
        async move { result }.boxed()
    }
}
#[tokio::main]
pub async fn run() {
    // Создаем новый Cacher с функцией, которая возвращает квадрат числа
    let cacher = Cacher::new(|x: i32| x * x);
    // Запускаем асинхронную задачу, которая выполняет функцию и возвращает результат
    let future = cacher.run(5);
    let result = future.await;
    println!("Result: {}", result); // Печатает "Result: 25"
}

futures::future::poll_fn — это функция, которая позволяет создать Future из функции, принимающей &mut Context и возвращающей Poll. Это особенно полезно, когда вы хотите создать простой Future без необходимости создавать целую структуру и реализовывать для неё трейт Future.


use futures::future::{poll_fn, FutureExt};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let start_time = Instant::now();
    let my_future = poll_fn(|cx| {
        if start_time.elapsed() >= Duration::from_secs(2) {
            Poll::Ready(42) // Возвращаем результат, если прошло достаточно времени
        } else {
            cx.waker().wake_by_ref(); // Указываем, что нужно вызвать poll снова
            Poll::Pending
        }
    });
    let result = my_future.await;
    println!("Future completed with result: {}", result);
}

future::loop_fn — позволяет создавать асинхронные циклы, работающие с Future. Он полезен, когда вы хотите повторять асинхронную задачу до тех пор, пока не будет выполнено определенное условие.

Вместо использования обычного loop с .await, future::loop_fn предоставляет более декларативный способ организации цикла, который возвращает асинхронные задачи и управляет их выполнением.

Когда использовать future::loop_fn

  • Если вы хотите создать цикл с асинхронной логикой, который выполняется до выполнения определенного условия.
  • Если вам нужно больше контроля над состоянием и последовательностью выполнения цикла, чем это возможно с обычным loop

Асинхронные итерации: Обеспечивает возможность асинхронного ожидания в каждой итерации, что особенно полезно при работе с асинхронными задачами, требующими ожидания (например, тайм-ауты, ожидание ответа от сервера и т.д.)

В большинстве случаев, для простых циклов, асинхронный loop с использованием .await будет достаточно, но для более сложных сценариев, включая управление состоянием, future::loop_fn может быть более удобным инструментом.


use futures::future::{self, Loop};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let mut counter = 0;
    // Создаем цикл с использованием future::loop_fn
    let future = future::loop_fn(counter, |mut state| {
        let new_state = state + 1;
        // Асинхронная задача, которая выполняется на каждой итерации
        Box::pin(async move {
            println!("Counter value: {}", state);
            // Условие завершения цикла
            if state >= 5 {
                // Если условие выполнено, возвращаем Loop::Break с результатом
                return Loop::Break(state);
            }
            // В противном случае продолжаем цикл
            sleep(Duration::from_secs(1)).await; // Пауза на 1 секунду
            Loop::Continue(new_state)
        })
    });
    // Выполняем асинхронный цикл
    let result = future.await;
    println!("Loop completed with final value: {}", result);
}

future::catch_unwind — это комбинатор, который используется для ловли паник в асинхронных задачах и возврата ошибки вместо того, чтобы программа аварийно завершалась. Это полезно, когда вы хотите обработать возможные ошибки в коде, который может вызвать панику, и безопасно вернуть результат вместо того, чтобы приложение крашилось.


use futures::future::{self, FutureExt};
use std::panic;

#[tokio::main]
async fn main() {
    // Асинхронная задача, которая может вызвать панику
    let safe_future = async {
        let result: Result = Err("An error occurred!");
        match result {
            Ok(value) => println!("Task succeeded with value: {}", value),
            Err(err) => {
                println!("Task failed with error: {}", err);
                panic!("Simulating panic for demonstration purposes");
            }
        }
    };
    // Использование `catch_unwind` для безопасности
    let safe_result = future::catch_unwind(safe_future).await;
    match safe_result {
        Ok(_) => println!("Task completed without panic."),
        Err(err) => {
            if let Some(message) = err.downcast_ref::<&str>() {
                println!("Caught panic: {}", message);
            } else {
                println!("Caught an unknown panic type.");
            }
        }
    }
}

future::shared — это комбинатор из библиотеки futures, который позволяет вам создать "общий" (shared) Future. Это полезно, когда вам нужно, чтобы несколько задач могли совместно использовать один и тот же Future и получать его результат.

Метод shared можно применять к типам, которые реализуют Future. Основное ограничение заключается в том, что переданный тип должен быть реализован с помощью комбинаторов из futures, и он должен быть ленивым и обеспечивать возможность Clone.


use futures::future::{self, Shared};
use futures::FutureExt; // Для метода `shared`

async fn async_task() -> u32 {
    // Здесь могут быть более сложные асинхронные операции, например, сетевые запросы или вычисления
    42
}
#[tokio::main]
async fn main() {
    let shared_future = async_task().shared();
    let future_clone1 = shared_future.clone();
    let future_clone2 = shared_future.clone();
    let result1 = future_clone1.await;
    println!("Result from first clone: {}", result1);
    let result2 = future_clone2.await;
    println!("Result from second clone: {}", result2);
}

Асинхронные замыкания?

RFC 62290

То есть асинхронные замыкания уже стабилизировались (RFC)


fn main() {
    let closure = async || {
        dbg!();
    };
}

Не путайте асинхронные замыкания и замыкания, возвращающие Future:

Первое — async [move] |...| { ... } (стабилизированное) Второе — |...| async [move] { ... } (стабилизированное)

Времена жизни async

async-book

Один общий обходной путь для включения async fn со ссылками в аргументах в 'static футуру состоит в том, чтобы связать аргументы с вызовом async fn внутри async-блока

fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}
fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}

.await в многопоточном исполнителе

send-approximation

Обратите внимание, что при использовании Future в многопоточном исполнителе, Future может перемещаться между потоками. Поэтому любые переменные, используемые в телах async, должны иметь возможность перемещаться между потоками, так как любой .await потенциально может привести к переключению на новый поток.

Это означает, что не безопасно использовать Rc, &RefCell или любые другие типы, не реализующие типаж Send (включая ссылки на типы, которые не реализуют типаж Sync).

(Предостережение: можно использовать эти типы до тех пор, пока они не находятся в области действия вызова .await.)

Точно так же не очень хорошая идея держать традиционную non-futures-aware блокировку через .await, так как это может привести к блокировке пула потоков: одна задача может получить объект блокировки, вызвать .await и передать управление исполнителю, разрешив другой задаче совершить попытку взять блокировку, что вызовет взаимную блокировку. Чтобы избежать этого, используйте Mutex из futures::lock, а не из std::sync.

Например, рассмотрим простой не-Send тип, например, содержащий Rc:

use std::rc::Rc;

#[derive(Default)]
struct NotSend(Rc<()>);

async fn bad() {
    NotSend::default();
    bar().await;
}

// вы можете создать блок, инкапсулирующий любые не-Send переменные. 
// С помощью этого, компилятору будет проще понять, что такие переменные не переживут момент вызова .await

async fn good() {
    {
        let x = NotSend::default();
    }
    bar().await;
}
fn get_future(val:String)-> impl Future<Item=String, Error=()>{
    MyTask(val)
}

#[derive(Debug)]
struct Client {
    ping_count: u8,
}

impl Client {
    fn new() -> Self {
        Client { ping_count: 0 }
    }
    fn send_ping(self) -> futures::future::FutureResult<Self, std::io::Error> {
        futures::future::ok(Client { ping_count: self.ping_count + 1 })
    }
    fn receive_pong(self) -> futures::future::FutureResult<(Self, bool), std::io::Error> {
        let done = self.ping_count >= 5;
        futures::future::ok((self, done))
    }
}

Рекурсия

futures::future::BoxFuture

05_recursion

// Эта функция:
async fn foo() {
    step_one().await;
    step_two().await;
}
// создаёт типы, подобные следующим:
enum Foo {
    First(StepOne),
    Second(StepTwo),
}
// создаёт такие типы:
enum Recursive {
    First(Recursive),
    Second(Recursive),
}

// А эта функция не будет работать  - мы создали тип бесконечного размера!:
async fn recursive() {
    recursive().await;
    recursive().await;
}

Чтобы исправить это, мы должны ввести косвенность при помощи Box. К сожалению, из-за ограничений компилятора, обернуть вызов recursive() в Box::pin не достаточно. Чтобы это заработало, мы должны сделать recursive не асинхронной функцией, которая возвращает .boxed() с async блоком:

use futures::future::{BoxFuture, FutureExt};
fn recursive() -> BoxFuture<'static, ()> {
    async move {
        recursive().await;
        recursive().await;
    }.boxed()
}

Реализуйте инструмент CLI, управляемый асинхронным режимом, который загружает указанные веб-страницы:

cargo run -p step_3_11 - [--max-threads = <число> ] <файл>

Он должен прочитать список ссылок из <file>, а затем одновременно загрузить содержимое каждой ссылки в отдельный .html файл (названный ссылкой).

--max-threads Аргумент должен контролировать максимальное количество одновременно работающих потоков в программе (по умолчанию должно быть количество процессоров).


use futures::{future, StreamExt};
use once_cell::sync::Lazy;
use regex::Regex;
use reqwest::Response;
use structopt::StructOpt;
use tokio::{fs, io::AsyncWriteExt, runtime};

#[derive(Debug, StructOpt)]
struct Options {
    #[structopt(long, takes_value = true)]
    max_threads: Option,
    #[structopt(takes_value = true, value_name = "file")]
    file: String,
}
impl Options {
    fn init() -> Self {
        let mut options: Options = Options::from_args();
        options.file = format!("3_ecosystem/3_11_async/{}", options.file);
        if options.max_threads.is_none() {
            options.max_threads = Some(num_cpus::get());
        }
        options
    }
}
fn main() {
    let options = Options::init();
    let mut rt = runtime::Builder::new()
        .core_threads(options.max_threads.unwrap())
        .threaded_scheduler()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let urls = fs::read_to_string(options.file.as_str())
            .await
            .expect("No urls file found");
        let urls: Vec<&str> = urls.split('\n').collect();
        let requests = urls.into_iter().map(reqwest::get).collect::>();
        let responses = future::join_all(requests)
            .await
            .into_iter()
            .map(std::result::Result::unwrap)
            .collect::>();
        let streams = responses
            .into_iter()
            .map(|resp| async move {
                static RE: Lazy = Lazy::new(|| {
                    Regex::new(r"((https://)|(http://))?(www\.)?(?P[^.]+)").unwrap()
                });
                let captures = RE.captures(resp.url().as_ref()).unwrap();
                let filename = captures.name("name").unwrap().as_str();
                let mut file =
                    fs::File::create(format!("3_ecosystem/3_11_async/{}.html", filename))
                        .await
                        .unwrap();
                let mut stream = resp.bytes_stream();
                while let Some(chunk) = stream.next().await {
                    file.write_all(&chunk.unwrap()).await.unwrap();
                }
            })
            .collect::>();
        future::join_all(streams).await;
    });
}

worker pool

File Cargo.toml

[dependencies]
rand = "0.8"

# sync
rayon = "1"

# async
tokio = { version = "1", features = ["full"] }
futures = "0.3"

Async


use futures::{stream, StreamExt};
use rand::{thread_rng, Rng};
use std::time::Duration;

 async fn compute_job(job: i64) -> i64 {
     let mut rng = thread_rng();
     let sleep_ms: u64 = rng.gen_range(0..10);
     tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
     job * job
 }
 async fn process_result(result: i64) {
     println!("{}", result);
 }
 #[tokio::main]
 async fn main() {
    let jobs = 0..100;
    let concurrency = 42;
    stream::iter(jobs)
         .for_each_concurrent(concurrency, |job| async move {
             let result = compute_job(job).await;
             process_result(result).await;
         })
         .await;
   stream::iter(jobs)
     .map(compute_job)
     .buffer_unordered(concurrency)
     .for_each(process_result)
     .await;
   let results: Vec = stream::iter(jobs)
     .map(compute_job)
     .buffer_unordered(concurrency)
     .collect()
     .await;
 }

Sync


use rand::{thread_rng, Rng};
use rayon::prelude::*;
use std::time::Duration;

fn compute_job(job: i64) -> i64 {
    let mut rng = thread_rng();
    let sleep_ms: u64 = rng.gen_range(0..10);
    std::thread::sleep(Duration::from_millis(sleep_ms));
    job * job
}
fn process_result(result: i64) {
    println!("{}", result);
}
fn main() {
    let jobs = 0..100;
    jobs.into_par_iter()
        .map(compute_job)
        .for_each(process_result);
}

Я только начал экспериментировать с фьючерсами/токио в Rust. Я могу делать действительно простые вещи с фьючерсами или просто с потоками. Мне было интересно, как вы можете выбирать между будущим и потоком.

Как я могу продлить игрушку из документации tokio, чтобы использовать tokio_timer::Timer для выполнения запрошенного HTTPS-запроса?


extern crate futures;
extern crate native_tls;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_tls;

use std::io;
use std::net::ToSocketAddrs;
use futures::Future;
use native_tls::TlsConnector;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use tokio_tls::TlsConnectorExt;

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let addr = "www.rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap();

    let cx = TlsConnector::builder().unwrap().build().unwrap();
    let socket = TcpStream::connect(&addr, &handle);

    let tls_handshake = socket.and_then(|socket| {
        let tls = cx.connect_async("www.rust-lang.org", socket);
        tls.map_err(|e| {
            io::Error::new(io::ErrorKind::Other, e)
        })
    });
    let request = tls_handshake.and_then(|socket| {
        tokio_io::io::write_all(socket, "\
            GET / HTTP/1.0\r\n\
            Host: www.rust-lang.org\r\n\
            \r\n\
        ".as_bytes())
    });
    let response = request.and_then(|(socket, _request)| {
        tokio_io::io::read_to_end(socket, Vec::new())
    });
    let (_socket, data) = core.run(response).unwrap();
    println!("{}", String::from_utf8_lossy(&data));
}

Вы можете преобразовать Future в Stream, а затем выбрать два потока:

extern crate futures; // 0.2.1
use futures::{Future, FutureExt, Stream, StreamExt};

fn select_stream_or_future_as_stream<S, F>(
    stream: S,
    future: F,
) -> impl Stream<Item = S::Item, Error = S::Error>
where
    S: Stream + 'static,
    F: Future<Item = S::Item, Error = S::Error> + 'static,
{
    future.into_stream().select(stream)
}
let timer = tokio_timer::Timer::default();
    // Error out when timeout is reached
    let timeout = timer.sleep(time::Duration::from_millis(950)).then(|_| {
        future::err(io::Error::new(io::ErrorKind::Other, "Timeout"))
    });
    let handle = core.handle();

    // this returns IoFuture = BoxFuture<T, io::Error>;
    let addresses = tokio_dns::CpuPoolResolver::new(1 as usize).resolve("www.google.cz");
    let socket = addresses.and_then(|all_addresses| {
        let mut ipv4_addresses =  all_addresses.iter().filter(|x| is_ipv4(**x));
        let addr = ipv4_addresses.next().unwrap();
        let sock = TcpStream::connect(&SocketAddr::new(*addr, 443), &handle);
        sock.map_err(|e| {
            println!("{:?}", e);
            io::Error::new(io::ErrorKind::Other, e)
        })
    });
    let tls_handshake = socket.and_then(|socket| {
        println!("Got socket");
        let cx = TlsConnector::builder().unwrap().build().unwrap();
        let tls = cx.connect_async("www.google.cz", socket);
        tls.map_err(|e| {
            println!("{:?}", e);
            io::Error::new(io::ErrorKind::Other, e)
        })
    });
    let request = tls_handshake.and_then(|socket| {
        println!("SSL Handshake Successful");
        let write_all = tokio_io::io::write_all(socket, "\
            GET / HTTP/1.0\r\n\
            Host: www.google.cz\r\n\
            \r\n\
        ".as_bytes());
        println!("Wrote to socket");
        write_all.map_err(|e| {
            println!("{:?}", e);
            io::Error::new(io::ErrorKind::Other, e)
        })
    });
    let response = request.and_then(|(socket, _request)| {
        let read_till_end = tokio_io::io::read_to_end(socket, Vec::new());
        println!("Read till end of socket");
        read_till_end
    });
    let waiter = response.select(timeout).map(|(win, _)| {
        let (_socket, data) = win;
        data
    });
    let result = core.run(waiter);

Самый простой способ сделать HTTP в Rust - это reqwest, Это оболочка для создания Hyper проще в использовании.

Hyper является популярной библиотекой HTTP для Rust и использует две библиотеки: Tokio для создания неблокирующих запросов и фьючерсные-RS для фьючерсов / обещаний. Hyper приведенный ниже пример и во многом вдохновлен пример в его документации

File В Cargo.toml:

[dependencies]
hyper = "0.11"
tokio-core = "0.1"
futures = "0.1"

// Rust 1.19, Hyper 0.11, tokio-core 0.1, futures 0.1
extern crate futures;
extern crate hyper;
extern crate tokio_core;
use futures::{Future};
use hyper::{Client, Uri};
use tokio_core::reactor::Core;

fn main() {
    // Core is the Tokio event loop used for making a non-blocking request
    let mut core = Core::new().unwrap();
    let client = Client::new(&core.handle());
    let url : Uri = "http://httpbin.org/response-headers?foo=bar".parse().unwrap();
    assert_eq!(url.query(), Some("foo=bar"));
    let request = client.get(url)
        .map(|res| {
            assert_eq!(res.status(), hyper::Ok);
        });
    // request is a Future, futures are lazy, so must explicitly run
    core.run(request).unwrap();
}

сервер на одном сокете

pub struct Server {
    listener: TcpListener,
    connections: Vec<Box<Future<Item = (), Error = io::Error> + Send>>,
}

impl Future for Server {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Result<Async<()>, io::Error> {
        // First, accept all new connections
        loop {
            match self.listener.poll_accept()? {
                Async::Ready((socket, _)) => {
                    let connection = process(socket);
                    self.connections.push(Box::new(connection));
                }
                Async::NotReady => break,
            }
        }

        // Now, poll all connection futures.
        let len = self.connections.len();

        for i in (0..len).rev() {
            match self.connections[i].poll()? {
                Async::Ready(_) => {
                    self.connections.remove(i);
                }
                Async::NotReady => {}
            }
        }

        // `NotReady` is returned here because the future never actually
        // completes. The server runs until it is dropped.
        Ok(Async::NotReady)
    }
}

Executor (Исполнитель)

Исполнители несут ответственность за повторное вызов poll задачи до Ready ее возвращения Например, CurrentThread исполнитель блокирует текущий поток и перебирает все нерешенные задачи, вызывая опрос на них. ThreadPool распределяет задачи по пулу потоков. Это также исполнитель по умолчанию, используемый средой выполнения runtime

extern crate futures;
extern crate rand;
extern crate tokio;
use rand::{thread_rng, Rng};
use std::collections::VecDeque;

// Исполнитель
pub struct SpinExecutor {
    tasks: VecDeque<Box<Future<Item = String, Error = () >>>,
}
impl SpinExecutor {
    pub fn spawn<T>(&mut self, task: T)
    where T: Future<Item = String, Error = ()> + 'static{
        self.tasks.push_back(Box::new(task));
    }
    pub fn run(&mut self) {
        while let Some(mut task) = self.tasks.pop_front() {
             match task.poll().unwrap()  {
                Async::Ready(_) => {}
                Async::NotReady => { self.tasks.push_back(task);}
            }
        /*
            match  task.poll(){
                Ok(value) => {
                    match value {
                        Async::Ready(_) => {}
                       Async::NotReady => { self.tasks.push_back(task);}
                   }
               },
               Err(e) =>{ eprintln!(""Error {:?}"",e); }
           }
        */
    }
    // Исполнитель вращается в цикле занятости и пытается опросить все задания, даже если задача снова вернет NotReady.
    //В идеале для исполнителя может быть какой-то способ узнать, когда изменяется состояние «готовности» задачи, то есть когда вызов опроса вернется в режим готовности.
    }
}

// Улучшенный исполнитель
pub struct SpinExecutorNew {
    ready_tasks: VecDeque<Box<Future<Item = String, Error = ()>>>,// для готовых задач на опрос
    not_ready_tasks:VecDeque<Box<Future<Item = String, Error = ()>>>// для не готовых задач, вернувшие состояние Async::NotReady
}

impl SpinExecutorNew {
    pub fn spawn<T>(&mut self, task: T)
    where T: Future<Item = String, Error = ()> + 'static{
        self.ready_tasks.push_back(Box::new(task));
    }
    pub fn run(&mut self) {
        // Исполнитель вращается в цикле занятости и пытается опросить все задания, даже если задача снова вернет NotReady.
        //В идеале для исполнителя может быть какой-то способ узнать, когда изменяется состояние «готовности» задачи, то есть когда вызов опроса вернется в режим готовности.
        loop {
            while let Some(mut task) = self.ready_tasks.pop_front() {
                match task.poll().unwrap() {
                    Async::Ready(_) => {}
                    Async::NotReady => {
                        self.not_ready_tasks.push_back(task);
                    }
                }
            }
            if self.not_ready_tasks.is_empty() {
                return;
            }
            println!(""готовых задач нет, ждем немного"");
            // Положите поток спать, пока не будет работы
            self.sleep_until_tasks_are_ready();
            // После паузы перебросим задачи на проверку
            if self.ready_tasks.is_empty() {
                while let Some(mut task) = self.not_ready_tasks.pop_front() {
                    self.ready_tasks.push_back(task);
                }
            }
        }
    }
    fn sleep_until_tasks_are_ready(&mut self){
        std::thread::sleep(std::time::Duration::from_millis(10));
    }
}

1. Task (Задача)

use futures::prelude::*;
fn poll_widget() -> futures::Async<u32> {
    let mut rng = thread_rng();
    let n: u32 = rng.gen_range(0, 100);
    if n > 89{
      futures::Async::Ready(n)
    }else{
     futures::Async::NotReady
    }
}
#[derive(Debug)]
pub struct MyTask(String);

impl Future for MyTask {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> Result<futures::Async<String>, ()> {
        let mut rng = thread_rng();
        let n: u32 = rng.gen_range(0, 100);
        if n > 89{
            println!(""Результат = {:?}"", n);
            Ok(futures::Async::Ready(format!(""{}"",n)))
        }else{
            Ok(futures::Async::NotReady)
        }

        /* 
        match poll_widget() {
            futures::Async::Ready(widget) => {
                println!(""widget={:?}"", widget);
                Ok(futures::Async::Ready(()))
            }
            futures::Async::NotReady => {
                Ok(futures::Async::NotReady)
            }
        }
        */
    }
}

2. Task (Задача)


fn main(){
    let task:_ = MyTask(""bla"".to_string());
    // let mut buf:VecDeque>> = VecDeque::new();
    // buf.push_back(Box::new(task));
    // let mut exec:SpinExecutor = SpinExecutor{tasks:buf};
    // exec.run();

    let mut exec:SpinExecutorNew = SpinExecutorNew{ready_tasks:buf,not_ready_tasks:VecDeque::new()};
    exec.spawn(Box::new(MyTask(""bla2"".to_string())));
    exec.spawn(Box::new(MyTask(""bla3"".to_string())));
    exec.run();
    //---------------------- изменение типа future (map, map_err);
    //map  преобразование future возвращает теперь u32
    let mut new_future = get_future(""11"".to_string()).map(|string| {
        string.parse::().unwrap()
    });
    //----------------------  futures::future::select_all получает результат из цикла событий
    let v=vec![ futures::future::result::(Ok(""result"".to_string()))];
    let (i, idx, v)  = futures::future::select_all(v).wait().ok().unwrap();
    println!(""{:?}"", i);
    //---------------------- запуск другого future, когда исходный будет выполнен (then, and_then, or_else)
    //and_then  вызов следующего future после успеха предыдущего
    let f = get_future(""future1"".to_string())
    .and_then(|res|{
        println!(""Второй Future  "");
        get_future(res)
    })
    .and_then(|res|{
        println!(""Третий Future  "");
        futures::future::ok::(""ok"".to_string())
    })
    .and_then(|res|{
        println!(""Четвертый Future  "");
        futures::lazy(|| {
            println!(""lazy."");
            futures::future::ok::(""lazy"".to_string())
        })
    })
    .and_then(|res|{
        println!(""Пятый Future  "");
        get_future_result()
    }).and_then(|res|{
        println!(""Шестой Future  "");
        futures::future::result::(Ok(""result"".to_string()))
    });

    fn get_future_result()-> futures::future::FutureResult{
        futures::future::ok(""ok"".to_string())
    }
    // Исполнители простой цикл перебора futures пока не вернуть готовый результат  Async::Ready
    let mut exec:SpinExecutorNew = SpinExecutorNew{ready_tasks:VecDeque::new(),not_ready_tasks:VecDeque::new()};
    exec.spawn(Box::new(  f ));
    exec.run();

    //let mut buf:VecDeque>> = VecDeque::new();
    //buf.push_back(Box::new( f ));
    //let mut exec:SpinExecutor = SpinExecutor{tasks:buf};
    //exec.run();
    //--------------------- futures::future::loop_fn создает цикл событий для объекта
    let ping_til_done = futures::future::loop_fn(Client::new(), |client| {
        client.send_ping()
        .and_then(|client| client.receive_pong())
        .and_then(|(client, done)| {// сами решаем из результата как крутить цикл
            if done {
               Ok(futures::future::Loop::Break(client))// остановить
            } else {
               Ok(futures::future::Loop::Continue(client)) // продолжить
            }
        })
    });
    println!(""{:?}"",  ping_til_done.wait().unwrap());// Client { ping_count: 5 }

    let v=vec![ ping_til_done ];
    let (i, idx, v)  = futures::future::select_all(v).wait().ok().unwrap();
    println!(""{:?}"", i);
    //--
    println!(""{:?}"", futures::executor::spawn( MyTask(""11"".to_string()).poll()) );
}

Проблема отмены future

Каждая точка await в асинхронном коде представляет собой момент, когда выполнение может быть прервано, а контроль — возвращен пользователю этого future. Пользователь может при желании удалить future в этой точке, полностью остановив его выполнение.

async fn read_send(file: &mut File, channel: &mut Sender<...>) {
  loop {
    let data = read_next(file).await;
    let items = parse(&data);
    for item in items {
      channel.send(item).await;
    }
  }
}

Когда пользователь вызывает функцию read_send, то опрашивает ее, пока она не достигнет второй точки await (отправка по каналу), затем уничтожает future read_send, а все локальные переменные (data, items и item) автоматически удаляются. Таким образом пользователь извлечет данные из file, но без отправки этих данных по каналу channel. Эти данные просто теряются. Вы спросите: «Зачем пользователю это делать? Зачем опрашивать future, а затем уничтожать до его завершения?». Именно это делает макрос futures::select!

Проблема не в том, что именно происходит, а скорее в том, что происходит не то, что задумал пользователь.

Я вижу четыре способа решения этой проблемы

Я вижу четыре способа решения этой проблемы

1. Чтобы не уничтожать future, переписываем select!.

Пример. Это, пожалуй, лучшее решение в данной конкретной ситуации. Но иногда оно чревато большими сложностями, например при воссоздании future с другим File, когда сокет получает сообщение.


fn main(){
    let mut file = ...;
    let mut channel = ...;
    let mut future = read_send(&mut file, &mut channel).fuse();
    pin_mut!(future);
    loop {
        futures::select! {
            _ => &mut future => {},
            some_data => socket.read_packet() => {
                // ...
            }
        }
    }
}

Я вижу четыре способа решения этой проблемы

2. Делаем так, чтобы read_send выполняла чтение и отправку атомарно.

Пример. В целом это лучшее решение, но оно не всегда возможно или приводит к накладным расходам в сложных ситуациях.

async fn read_send(file: &mut File, channel: &mut Sender<...>) {
  loop {
    future::poll_fn(|cx| channel.poll_ready(cx)).await; // Waits until channel has a slot
    let item = read_next_and_parse(file).await; // Only read the minimum for one item, to avoid multiple items
    channel.try_send(item).unwrap();  // We are guaranteed a slot in the channel as per above
  }
}

Я вижу четыре способа решения этой проблемы

3. Меняем API у read_send и избегаем любых локальных переменных в точке завершения.

Пример. Пример из реальной ситуации. Тоже хорошее решение, но такой код бывает трудно написать, так как он все больше походит на futures, написанные вручную.

struct ReadSend<'a> {
    file: &'a mut File,
    channel: &'a mut Sender<...>,
    items: Vec<...>,
}

impl ReadSend {
    async pub fn next(&mut self) {
        // Never use any local variable across an await point
        loop {
            for item in self.items.drain() {
                self.channel.send(item).await;
            }

            let data = read_next(self.file).await;
            self.items = parse(&data);
        }
    }
}

Я вижу четыре способа решения этой проблемы

4. Не задействуем select! и создаем фоновую задачу для чтения.

При необходимости используем канал для взаимодействия с фоновой задачей, так как на извлечение элементов из каналов отмена future не влияет.

Пример. Это часто оказывается лучшим решением, хотя добавляет временную задержку и делает невозможным последующий доступ к file и channel


fn main(){
    let mut file = ...;
    let mut channel = ...;

    tokio::spawn(async move {
        read_send(&mut file, &mut channel).await;
    });

    loop {
        let some_data = socket.read_packet().await;
        // ...
    }
}

Просто создайте новую задачу

самым простым решением проблем, связанных с отменой future, часто бывает создание дополнительной фоновой задачи. То есть, когда опрашивается больше 2–3 futures параллельно, все проблемы обычно решаются созданием дополнительных фоновых задач.

Обмен данными между несколькими такими фоновыми задачами осуществляется либо по каналам, либо с помощью Arc — как с Mutex (мьютексом), так и без него. Обычно это приводит к так называемой Arc-ификации кода. Вместо того, чтобы помещать объекты в стек, все оборачивают в Arc и передают. Ничего плохого нет в том, чтобы помещать все в Arc. Проблема в том, что… возникает ощущение, будто и не на Rust уже пишешь.


use std::io::{self, ErrorKind};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};

struct LinesReader {
    stream: DuplexStream,
}
impl LinesReader {
    fn new(stream: DuplexStream) -> Self {
        Self { stream }
    }
    async fn next(&mut self) -> io::Result> {
        let mut bytes = Vec::new();
        let mut buf = [0];
        while self.stream.read(&mut buf[..]).await? != 0 {
            bytes.push(buf[0]);
            if buf[0] == b'\n' {
                break;
            }
        }
        if bytes.is_empty() {
            return Ok(None);
        }
        let s = String::from_utf8(bytes)
            .map_err(|_| io::Error::new(ErrorKind::InvalidData, "not UTF-8"))?;
        Ok(Some(s))
    }
}
async fn slow_copy(source: String, mut dest: DuplexStream) -> std::io::Result<()> {
    for b in source.bytes() {
        dest.write_u8(b).await?;
        tokio::time::sleep(Duration::from_millis(10)).await
    }
    Ok(())
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (client, server) = tokio::io::duplex(5);
    let handle = tokio::spawn(slow_copy("hi\nthere\n".to_owned(), client));

    let mut lines = LinesReader::new(server);
    let mut interval = tokio::time::interval(Duration::from_millis(60));
    loop {
        tokio::select! {
            _ = interval.tick() => println!("tick!"),
            line = lines.next() => if let Some(l) = line? {
                print!("{}", l)
            } else {
                break
            },
        }
    }
    handle.await.unwrap()?;
    Ok(())
}