协程并发策略设计
- 理论方面
- 限制并发数:在高并发网络爬虫应用中,过多的并发请求可能会耗尽系统资源或被目标服务器封禁。使用
Semaphore
来限制同时执行的协程数量。Semaphore
类似于一个信号量,允许一定数量的协程同时通过,其他协程则需要等待信号量释放。
- 任务优先级:根据任务的重要性或紧急程度为任务分配优先级。例如,对于某些关键页面的爬取任务可以设置较高优先级,优先执行。可以使用优先级队列来管理任务,在通道中按照优先级顺序发送和接收任务。
- 代码实现思路
import kotlinx.coroutines.*
import java.util.concurrent.Semaphore
// 定义最大并发数
private const val MAX_CONCURRENT = 10
private val semaphore = Semaphore(MAX_CONCURRENT)
fun main() = runBlocking {
val tasks = List(100) { "Task $it" }
val job = launch {
tasks.forEach { task ->
semaphore.acquire()
launch {
try {
// 模拟耗时操作
delay(100)
println("Processed $task")
} finally {
semaphore.release()
}
}
}
}
job.join()
}
优化挂起函数执行效率
- 理论方面
- 减少挂起次数:挂起函数会暂停协程执行,过多的挂起操作会增加上下文切换开销。尽量将多个相关的异步操作合并成一个挂起函数调用,减少不必要的挂起。
- 复用资源:对于网络连接等资源,尽量复用而不是每次请求都重新创建。例如,使用连接池来管理网络连接,避免频繁创建和销毁连接带来的性能损耗。
- 非阻塞I/O:使用Kotlin协程中的非阻塞I/O操作,如
OkHttp
的异步请求,它在执行I/O操作时不会阻塞主线程,提高系统整体的并发处理能力。
- 代码实现思路
import kotlinx.coroutines.*
import okhttp3.*
import java.io.IOException
val client = OkHttpClient()
suspend fun fetchData(url: String): String {
return suspendCancellableCoroutine { continuation ->
val request = Request.Builder()
.url(url)
.build()
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
continuation.resumeWithException(e)
}
override fun onResponse(call: Call, response: Response) {
response.use {
if (!response.isSuccessful) throw IOException("Unexpected code $response")
continuation.resume(response.body!!.string())
}
}
})
}
}
处理通道可能出现的性能瓶颈
- 理论方面
- 缓冲处理:通道的默认模式是无缓冲的,这意味着发送和接收操作必须同时准备好才能进行。对于高并发场景,可以使用有缓冲的通道(
Channel
的构造函数中指定缓冲区大小)。这样发送端可以先将数据发送到缓冲区,而不需要等待接收端立即处理,减少发送端的等待时间。
- 背压处理:当发送端速度远快于接收端时,会产生背压问题。可以使用
produce
函数创建一个生产者协程,并通过 ReceiveChannel
的 flow
扩展函数将其转换为 Flow
,然后使用 Flow
的背压策略(如 collectLatest
、conflate
等)来处理背压。
- 代码实现思路
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
fun main() = runBlocking {
// 创建一个有缓冲的通道
val channel = Channel<Int>(100)
launch {
for (i in 1..1000) {
channel.send(i)
}
channel.close()
}
launch {
for (num in channel) {
// 模拟处理数据
delay(10)
println("Processed $num")
}
}
delay(2000)
}
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val producer = produce<Int> {
for (i in 1..1000) {
send(i)
}
}
producer.asFlow()
.collectLatest { num ->
// 模拟处理数据
delay(10)
println("Processed $num")
}
}