错误处理
- Result类型包装:在异步函数中,返回值使用
Result
类型来包装可能出现的错误。例如:
use std::io;
async fn async_operation() -> Result<i32, io::Error> {
// 模拟异步操作,这里可能返回错误
Ok(42)
}
- await 与? 操作符:使用
await
时,可以通过?
操作符来便捷地传播错误。例如:
async fn handle_async_operation() -> Result<(), io::Error> {
let result = async_operation().await?;
println!("The result is: {}", result);
Ok(())
}
- try_join! 宏:当有多个异步任务并行执行且需要处理错误时,
try_join!
宏很有用。它并行运行多个异步任务,一旦其中一个任务返回错误,整个操作立即返回错误。
use futures::try_join;
async fn task1() -> Result<i32, &'static str> {
Ok(1)
}
async fn task2() -> Result<i32, &'static str> {
Ok(2)
}
async fn parallel_tasks() -> Result<(i32, i32), &'static str> {
try_join!(task1(), task2())
}
控制并发数量
- tokio::sync::Semaphore:Tokio库中的
Semaphore
可以用来限制并发任务的数量。例如:
use tokio::sync::Semaphore;
async fn limited_task(semaphore: &Semaphore) {
let permit = semaphore.acquire().await.unwrap();
// 执行异步任务
drop(permit); // 任务完成后释放许可
}
async fn run_limited_tasks() {
let semaphore = Semaphore::new(5); // 最多允许5个并发任务
let tasks: Vec<_> = (0..10).map(|_| {
let semaphore = semaphore.clone();
tokio::spawn(async move {
limited_task(&semaphore).await;
})
}).collect();
for task in tasks {
task.await.unwrap();
}
}
- futures::stream::FuturesUnordered:结合
FuturesUnordered
和try_fold
也可以控制并发数量并处理错误。
use futures::{stream::FuturesUnordered, TryStreamExt};
async fn async_task() -> Result<i32, &'static str> {
Ok(42)
}
async fn run_tasks() -> Result<(), &'static str> {
let mut tasks = FuturesUnordered::new();
for _ in 0..10 {
tasks.push(async_task());
}
tasks.try_fold(0, |acc, result| async move {
let new_acc = acc + result?;
Ok(new_acc)
}).await?;
Ok(())
}