实现思路
- 定义任务结构体,包含大文件路径及处理操作相关信息。
- 创建任务数组,填充任务。
- 使用
for
循环启动goroutine
处理任务。
- 利用
sync.WaitGroup
等待所有goroutine
完成。
- 为避免数据竞争,对共享资源(如全局计数器等)操作使用
sync.Mutex
保护。
- 为避免资源耗尽,限制并发数,可使用带缓冲的通道控制同时运行的
goroutine
数量。
关键点
- 任务结构体设计:清晰定义任务所需参数,方便传递和处理。
- 同步机制:
sync.WaitGroup
用于等待所有goroutine
完成任务,确保程序在所有任务处理完毕后退出。sync.Mutex
用于保护可能产生数据竞争的共享资源。
- 并发控制:通过带缓冲的通道限制同时运行的
goroutine
数量,避免资源耗尽。例如:semaphore := make(chan struct{}, maxConcurrent)
,在启动goroutine
前semaphore <- struct{}{}
获取信号量,goroutine
结束时<-semaphore
释放信号量。
示例代码
package main
import (
"fmt"
"sync"
)
// 定义任务结构体
type Task struct {
filePath string
// 其他处理操作相关字段
}
func processTask(task Task, wg *sync.WaitGroup, semaphore chan struct{}) {
defer wg.Done()
<-semaphore
defer func() { semaphore <- struct{}{} }()
// 实际的大文件处理操作
fmt.Printf("Processing file: %s\n", task.filePath)
}
func main() {
tasks := []Task{
{filePath: "file1.txt"},
{filePath: "file2.txt"},
// 更多任务
}
var wg sync.WaitGroup
maxConcurrent := 3 // 最大并发数
semaphore := make(chan struct{}, maxConcurrent)
for _, task := range tasks {
wg.Add(1)
go processTask(task, &wg, semaphore)
}
wg.Wait()
}