面试题答案
一键面试设计思路
- 信号量机制:使用
sync.Mutex
和sync.Cond
实现信号量,控制同时运行的任务数量。 - 任务排队:使用一个通道
taskQueue
来存储待执行的任务。 - 任务执行:启动固定数量(不超过10个)的goroutine从任务队列中取出任务并执行。
- 出错处理:在每个任务执行函数中捕获可能的错误,并通过返回值或全局错误处理机制进行处理。
关键代码实现
package main
import (
"fmt"
"sync"
)
// 定义信号量结构体
type Semaphore struct {
count int
mutex sync.Mutex
cond sync.Cond
}
// 创建新的信号量
func NewSemaphore(count int) *Semaphore {
sem := &Semaphore{
count: count,
}
sem.cond.L = &sem.mutex
return sem
}
// 获取信号量
func (s *Semaphore) Acquire() {
s.mutex.Lock()
for s.count <= 0 {
s.cond.Wait()
}
s.count--
s.mutex.Unlock()
}
// 释放信号量
func (s *Semaphore) Release() {
s.mutex.Lock()
s.count++
s.cond.Broadcast()
s.mutex.Unlock()
}
// 定义任务类型
type Task func() error
// 并发控制模块
type TaskScheduler struct {
semaphore *Semaphore
taskQueue chan Task
wg sync.WaitGroup
}
// 创建新的任务调度器
func NewTaskScheduler(maxConcurrent int, queueSize int) *TaskScheduler {
return &TaskScheduler{
semaphore: NewSemaphore(maxConcurrent),
taskQueue: make(chan Task, queueSize),
}
}
// 启动任务调度器
func (ts *TaskScheduler) Start() {
for i := 0; i < cap(ts.taskQueue); i++ {
ts.wg.Add(1)
go func() {
defer ts.wg.Done()
for task := range ts.taskQueue {
ts.semaphore.Acquire()
err := task()
ts.semaphore.Release()
if err != nil {
fmt.Printf("Task failed: %v\n", err)
}
}
}()
}
}
// 提交任务
func (ts *TaskScheduler) Submit(task Task) {
ts.taskQueue <- task
}
// 等待所有任务完成
func (ts *TaskScheduler) Wait() {
close(ts.taskQueue)
ts.wg.Wait()
}
使用示例
func main() {
scheduler := NewTaskScheduler(10, 100)
scheduler.Start()
for i := 0; i < 100; i++ {
task := func() error {
// 模拟爬取任务
fmt.Printf("Task %d is running\n", i)
return nil
}
scheduler.Submit(task)
}
scheduler.Wait()
}
在上述代码中:
Semaphore
结构体实现了信号量机制,Acquire
方法用于获取信号量,Release
方法用于释放信号量。TaskScheduler
结构体包含信号量和任务队列,Start
方法启动固定数量的goroutine执行任务,Submit
方法用于提交任务,Wait
方法用于等待所有任务完成。- 在
main
函数中,创建了一个任务调度器,启动后提交100个模拟任务,最后等待所有任务完成。