MST

星途 面试题库

面试题:Go 高阶函数在并发场景中的应用

假设你有一个任务列表,每个任务是一个函数,这些函数执行一些耗时的 I/O 操作(例如模拟网络请求或文件读取)。使用 Go 的高阶函数和 goroutine 实现一个调度器,能够并发执行这些任务,并在所有任务完成后返回结果。你需要处理可能出现的错误,并展示如何合理地控制并发数,避免资源耗尽。请详细解释你的设计思路和代码逻辑。
42.9万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 任务定义:将每个 I/O 操作封装成一个函数,这些函数返回操作结果和可能的错误。
  2. 并发控制:使用 channel 来限制并发数。我们创建一个带有固定缓冲区大小的 channel,这个缓冲区大小决定了可以同时执行的最大任务数。
  3. Goroutine 调度:为每个任务启动一个 goroutine 执行,将任务结果发送到一个结果 channel 中。
  4. 错误处理:在每个任务执行过程中捕获错误,并将错误与结果一起发送到结果 channel。
  5. 等待所有任务完成:使用 sync.WaitGroup 来等待所有 goroutine 完成任务。

代码逻辑

package main

import (
    "fmt"
    "sync"
)

// 定义任务函数类型
type Task func() (interface{}, error)

// 调度器函数
func scheduler(tasks []Task, concurrency int) ([]interface{}, []error) {
    var wg sync.WaitGroup
    resultChan := make(chan interface{}, len(tasks))
    errorChan := make(chan error, len(tasks))
    semaphore := make(chan struct{}, concurrency)

    // 启动每个任务
    for _, task := range tasks {
        semaphore <- struct{}{} // 获取信号量
        wg.Add(1)
        go func(t Task) {
            defer func() {
                <-semaphore // 释放信号量
                wg.Done()
            }()
            result, err := t()
            if err != nil {
                errorChan <- err
            } else {
                resultChan <- result
            }
        }(task)
    }

    // 等待所有任务完成
    go func() {
        wg.Wait()
        close(resultChan)
        close(errorChan)
    }()

    var results []interface{}
    var errors []error

    // 收集结果和错误
    for i := 0; i < len(tasks); i++ {
        select {
        case result := <-resultChan:
            results = append(results, result)
        case err := <-errorChan:
            errors = append(errors, err)
        }
    }

    return results, errors
}

使用示例

func main() {
    // 模拟任务
    task1 := func() (interface{}, error) {
        // 模拟网络请求或文件读取
        fmt.Println("Task 1 started")
        return "Result of Task 1", nil
    }
    task2 := func() (interface{}, error) {
        // 模拟网络请求或文件读取
        fmt.Println("Task 2 started")
        return "Result of Task 2", nil
    }
    task3 := func() (interface{}, error) {
        // 模拟网络请求或文件读取
        fmt.Println("Task 3 started")
        return nil, fmt.Errorf("Task 3 failed")
    }

    tasks := []Task{task1, task2, task3}
    results, errors := scheduler(tasks, 2)

    fmt.Println("Results:", results)
    fmt.Println("Errors:", errors)
}

代码解释

  1. Task 类型:定义了任务函数的类型,每个任务函数返回一个接口类型的结果和可能的错误。
  2. scheduler 函数
    • tasks 是要执行的任务列表,concurrency 是最大并发数。
    • sync.WaitGroup 用于等待所有任务完成。
    • resultChanerrorChan 分别用于接收任务的结果和错误。
    • semaphore 是一个带缓冲区的 channel,用于控制并发数。
    • 遍历任务列表,为每个任务启动一个 goroutine,获取信号量后执行任务,完成后释放信号量。
    • 使用 select 语句从 resultChanerrorChan 中收集结果和错误。
  3. main 函数:创建一些模拟任务并调用 scheduler 函数,最后打印结果和错误。