面试题答案
一键面试- 异步任务编排
- 批量查询:将大规模数据查询拆分成多个较小的查询任务。例如,如果要查询表中所有记录,可以按照一定的ID范围或者分页方式,把查询操作分成多个子任务。这样每个任务查询的数据量较小,执行时间相对较短,能更高效地利用异步资源。
- 并发执行:使用Tokio的
join!
宏或futures::future::join_all
函数来并发执行这些子查询任务。例如:
use tokio;
use sqlx::PgPool;
async fn query_subset(pool: &PgPool, start: i64, end: i64) -> sqlx::Result<Vec<MyRowType>> {
sqlx::query_as!(MyRowType, "SELECT * FROM my_table WHERE id BETWEEN $1 AND $2", start, end)
.fetch_all(pool)
.await
}
async fn query_large_data(pool: &PgPool) -> sqlx::Result<Vec<MyRowType>> {
let num_subtasks = 10;
let mut tasks = Vec::with_capacity(num_subtasks);
let total_count = get_total_count(pool).await?;
let step = total_count / num_subtasks;
for i in 0..num_subtasks {
let start = i * step;
let end = if i == num_subtasks - 1 { total_count } else { (i + 1) * step };
tasks.push(tokio::spawn(query_subset(pool, start, end)));
}
let mut all_results = Vec::new();
for task in tasks {
let result = task.await??;
all_results.extend(result);
}
Ok(all_results)
}
- **任务优先级**:如果有其他重要的任务(如系统监控任务、紧急事务处理任务等),可以通过`tokio::task::spawn_with_priority`设置任务优先级,确保关键任务优先执行。
2. 数据库连接池管理
- 合理设置连接池大小:根据系统的硬件资源(如CPU核心数、内存大小)和预期的并发请求数来设置sqlx
连接池的大小。一般来说,可以根据经验公式,例如连接池大小为CPU核心数的2 - 4倍。例如:
use sqlx::PgPool;
let pool = PgPool::connect_with(
sqlx::postgres::PgConnectOptions::new()
.host("localhost")
.database("mydb")
.username("user")
.password("password")
.pool_size(4 * num_cpus::get())
).await.expect("Failed to connect to database");
- **连接复用**:`sqlx`连接池会自动管理连接的复用。在异步任务中,每次获取连接后,使用`let conn = pool.acquire().await.expect("Failed to acquire connection");`,任务完成后连接会自动返回连接池,供其他任务复用,避免频繁创建和销毁连接带来的开销。
- **连接健康检查**:定期对连接池中的连接进行健康检查。可以使用`sqlx`提供的`Pool::recycle`方法来检查并清理无效连接,确保连接池中的连接始终可用。例如,可以在一个单独的异步任务中定时执行:
use tokio::time::{sleep, Duration};
async fn check_connection_health(pool: &PgPool) {
loop {
for conn in pool.clone().get_connections() {
if let Err(e) = sqlx::query("SELECT 1").fetch_one(&conn).await {
pool.recycle(&conn).await.expect("Failed to recycle connection");
}
}
sleep(Duration::from_secs(60)).await;
}
}
- 避免资源耗尽
- 限制并发任务数量:除了设置连接池大小外,还可以限制同时执行的异步任务数量。可以使用
tokio::sync::Semaphore
来实现这一点。例如:
- 限制并发任务数量:除了设置连接池大小外,还可以限制同时执行的异步任务数量。可以使用
use tokio::sync::Semaphore;
let max_tasks = 10;
let semaphore = Semaphore::new(max_tasks);
let mut tasks = Vec::new();
for _ in 0..100 {
let permit = semaphore.acquire().await.expect("Failed to acquire permit");
tasks.push(tokio::spawn(async move {
// 执行查询任务
drop(permit);
}));
}
- **内存管理**:在处理大量数据时,要注意内存的使用。如果查询结果数据量过大,可以考虑分块处理,避免一次性加载过多数据到内存中。例如,在查询时使用`fetch`方法逐块获取数据,而不是`fetch_all`一次性获取所有数据。
let mut rows = sqlx::query_as!(MyRowType, "SELECT * FROM my_table")
.fetch(pool);
while let Some(row) = rows.next().await {
let row = row.expect("Failed to fetch row");
// 处理每一行数据
}