MST

星途 面试题库

面试题:Go中基于channel封装复杂并发任务调度逻辑

假设你需要开发一个任务调度系统,有多种不同类型的任务(用不同结构体表示),任务需要并发执行,并且执行顺序要按照一定规则(比如优先级)。请使用Go语言的channel来封装这个并发逻辑,详细描述设计思路并给出关键代码片段,同时说明如何处理任务执行过程中的错误。
21.2万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 任务结构体定义:定义不同类型任务的结构体,在结构体中添加优先级字段用于任务排序。
  2. 任务队列:使用一个优先级队列(可以借助container/heap包实现)来存储任务,这样能保证按照优先级顺序取出任务。
  3. 调度器:调度器从优先级队列中取出任务,通过channel将任务发送给工作协程执行。
  4. 工作协程:多个工作协程从channel接收任务并执行,执行完成后向另一个channel发送执行结果(包括错误信息)。
  5. 错误处理:在调度器中监听结果channel,当接收到错误时,进行相应的错误处理,比如记录日志、重试任务等。

关键代码片段

package main

import (
    "container/heap"
    "fmt"
)

// 定义任务结构体
type Task struct {
    ID       int
    Priority int
    // 其他任务相关字段
}

// 定义优先级队列
type TaskQueue []*Task

func (pq TaskQueue) Len() int { return len(pq) }

func (pq TaskQueue) Less(i, j int) bool {
    // 按照优先级从高到低排序
    return pq[i].Priority > pq[j].Priority
}

func (pq TaskQueue) Swap(i, j int) {
    pq[i], pq[j] = pq[j], pq[i]
}

func (pq *TaskQueue) Push(x interface{}) {
    *pq = append(*pq, x.(*Task))
}

func (pq *TaskQueue) Pop() interface{} {
    old := *pq
    n := len(old)
    item := old[n - 1]
    *pq = old[0 : n - 1]
    return item
}

func main() {
    taskQueue := &TaskQueue{}
    heap.Init(taskQueue)

    // 向任务队列添加任务
    task1 := &Task{ID: 1, Priority: 3}
    task2 := &Task{ID: 2, Priority: 1}
    task3 := &Task{ID: 3, Priority: 2}
    heap.Push(taskQueue, task1)
    heap.Push(taskQueue, task2)
    heap.Push(taskQueue, task3)

    // 定义任务执行通道和结果通道
    taskCh := make(chan *Task)
    resultCh := make(chan error)

    // 启动工作协程
    numWorkers := 3
    for i := 0; i < numWorkers; i++ {
        go worker(taskCh, resultCh)
    }

    // 调度任务
    go func() {
        for taskQueue.Len() > 0 {
            task := heap.Pop(taskQueue).(*Task)
            taskCh <- task
        }
        close(taskCh)
    }()

    // 处理任务执行结果
    for i := 0; i < numWorkers; i++ {
        err := <-resultCh
        if err != nil {
            fmt.Printf("Task execution error: %v\n", err)
        }
    }
    close(resultCh)
}

func worker(taskCh <-chan *Task, resultCh chan<- error) {
    for task := range taskCh {
        // 模拟任务执行
        err := executeTask(task)
        resultCh <- err
    }
}

func executeTask(task *Task) error {
    // 实际任务执行逻辑,这里简单返回nil
    return nil
}

错误处理

  1. 任务执行函数:在executeTask函数中,在任务执行失败时返回错误信息,如return fmt.Errorf("task %d failed", task.ID)
  2. 结果通道监听:在调度器(main函数中的for i := 0; i < numWorkers; i++循环部分)通过resultCh接收错误信息,根据具体业务需求进行处理,如记录日志、重试任务。
  3. 重试逻辑:如果需要重试任务,可以在接收到错误后,将任务重新添加到优先级队列taskQueue中,然后重新调度执行。例如:
for i := 0; i < numWorkers; i++ {
    err := <-resultCh
    if err != nil {
        fmt.Printf("Task execution error: %v\n", err)
        // 重新添加任务到队列
        heap.Push(taskQueue, &Task{ID: 1, Priority: 3})
    }
}