MST

星途 面试题库

面试题:Go信号量在复杂业务场景下的并发限制应用

假设你正在开发一个爬虫程序,需要爬取大量网页。为避免对目标服务器造成过大压力,要求同时并发的爬取任务数不超过10个。请结合Go的信号量机制,设计一个完整的、可复用的并发控制模块,包括任务的排队、执行以及出错处理等功能,并说明你的设计思路和关键代码实现。
32.8万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 信号量机制:使用sync.Mutexsync.Cond实现信号量,控制同时运行的任务数量。
  2. 任务排队:使用一个通道taskQueue来存储待执行的任务。
  3. 任务执行:启动固定数量(不超过10个)的goroutine从任务队列中取出任务并执行。
  4. 出错处理:在每个任务执行函数中捕获可能的错误,并通过返回值或全局错误处理机制进行处理。

关键代码实现

package main

import (
    "fmt"
    "sync"
)

// 定义信号量结构体
type Semaphore struct {
    count int
    mutex sync.Mutex
    cond  sync.Cond
}

// 创建新的信号量
func NewSemaphore(count int) *Semaphore {
    sem := &Semaphore{
        count: count,
    }
    sem.cond.L = &sem.mutex
    return sem
}

// 获取信号量
func (s *Semaphore) Acquire() {
    s.mutex.Lock()
    for s.count <= 0 {
        s.cond.Wait()
    }
    s.count--
    s.mutex.Unlock()
}

// 释放信号量
func (s *Semaphore) Release() {
    s.mutex.Lock()
    s.count++
    s.cond.Broadcast()
    s.mutex.Unlock()
}

// 定义任务类型
type Task func() error

// 并发控制模块
type TaskScheduler struct {
    semaphore *Semaphore
    taskQueue chan Task
    wg        sync.WaitGroup
}

// 创建新的任务调度器
func NewTaskScheduler(maxConcurrent int, queueSize int) *TaskScheduler {
    return &TaskScheduler{
        semaphore: NewSemaphore(maxConcurrent),
        taskQueue: make(chan Task, queueSize),
    }
}

// 启动任务调度器
func (ts *TaskScheduler) Start() {
    for i := 0; i < cap(ts.taskQueue); i++ {
        ts.wg.Add(1)
        go func() {
            defer ts.wg.Done()
            for task := range ts.taskQueue {
                ts.semaphore.Acquire()
                err := task()
                ts.semaphore.Release()
                if err != nil {
                    fmt.Printf("Task failed: %v\n", err)
                }
            }
        }()
    }
}

// 提交任务
func (ts *TaskScheduler) Submit(task Task) {
    ts.taskQueue <- task
}

// 等待所有任务完成
func (ts *TaskScheduler) Wait() {
    close(ts.taskQueue)
    ts.wg.Wait()
}

使用示例

func main() {
    scheduler := NewTaskScheduler(10, 100)
    scheduler.Start()

    for i := 0; i < 100; i++ {
        task := func() error {
            // 模拟爬取任务
            fmt.Printf("Task %d is running\n", i)
            return nil
        }
        scheduler.Submit(task)
    }

    scheduler.Wait()
}

在上述代码中:

  1. Semaphore结构体实现了信号量机制,Acquire方法用于获取信号量,Release方法用于释放信号量。
  2. TaskScheduler结构体包含信号量和任务队列,Start方法启动固定数量的goroutine执行任务,Submit方法用于提交任务,Wait方法用于等待所有任务完成。
  3. main函数中,创建了一个任务调度器,启动后提交100个模拟任务,最后等待所有任务完成。