MST

星途 面试题库

面试题:Go扇入扇出模式复杂场景下的代码复用优化

假设你有多个不同类型的数据源(如数据库查询、文件读取、网络请求),需要将这些数据源的数据通过扇入模式聚合,然后再通过扇出模式分发到不同的处理模块。请设计并实现一个Go程序,着重展示如何复用代码以适应这种复杂场景,包括如何封装数据源读取逻辑、数据处理逻辑以及分发逻辑。
40.2万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试
package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "sync"
)

// 定义数据源读取函数类型
type DataSourceReader func() (interface{}, error)

// 封装数据库查询逻辑
func databaseQuery() (interface{}, error) {
    // 实际实现中执行数据库查询
    return "Database data", nil
}

// 封装文件读取逻辑
func fileRead() (interface{}, error) {
    data, err := ioutil.ReadFile("test.txt")
    if err != nil {
        return nil, err
    }
    return string(data), nil
}

// 封装网络请求逻辑
func networkRequest() (interface{}, error) {
    resp, err := http.Get("http://example.com")
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    data, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }
    return string(data), nil
}

// 扇入模式,聚合多个数据源的数据
func fanIn(dataSources []DataSourceReader) <-chan interface{} {
    var wg sync.WaitGroup
    out := make(chan interface{})

    // 为每个数据源启动一个goroutine
    for _, src := range dataSources {
        wg.Add(1)
        go func(reader DataSourceReader) {
            defer wg.Done()
            data, err := reader()
            if err != nil {
                fmt.Println("Error reading data:", err)
                return
            }
            out <- data
        }(src)
    }

    // 所有数据源读取完成后关闭通道
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

// 定义数据处理函数类型
type DataProcessor func(interface{}) interface{}

// 数据处理逻辑1
func processor1(data interface{}) interface{} {
    return "Processed by processor1: " + data.(string)
}

// 数据处理逻辑2
func processor2(data interface{}) interface{} {
    return "Processed by processor2: " + data.(string)
}

// 扇出模式,分发数据到不同处理模块
func fanOut(in <-chan interface{}, processors []DataProcessor) []<-chan interface{} {
    var out []<-chan interface{}
    for _, proc := range processors {
        c := make(chan interface{})
        go func(p DataProcessor) {
            for data := range in {
                processed := p(data)
                c <- processed
            }
            close(c)
        }(proc)
        out = append(out, c)
    }
    return out
}

func main() {
    // 定义数据源
    dataSources := []DataSourceReader{databaseQuery, fileRead, networkRequest}

    // 扇入数据源数据
    mergedData := fanIn(dataSources)

    // 定义数据处理逻辑
    processors := []DataProcessor{processor1, processor2}

    // 扇出数据到不同处理模块
    processedData := fanOut(mergedData, processors)

    // 输出处理结果
    for _, c := range processedData {
        for result := range c {
            fmt.Println(result)
        }
    }
}

上述代码实现了:

  1. 封装数据源读取逻辑:通过定义 DataSourceReader 函数类型,将数据库查询、文件读取、网络请求等数据源读取逻辑封装成独立的函数。
  2. 扇入模式fanIn 函数将多个数据源的数据聚合到一个通道中。它为每个数据源启动一个goroutine进行读取,并使用 sync.WaitGroup 确保所有读取完成后关闭通道。
  3. 封装数据处理逻辑:通过定义 DataProcessor 函数类型,将不同的数据处理逻辑封装成独立的函数。
  4. 扇出模式fanOut 函数将扇入的通道数据分发到多个处理模块对应的通道中,每个处理模块在独立的goroutine中执行处理逻辑。
  5. 复用性:通过函数类型和模块化设计,使得添加新的数据源读取逻辑、数据处理逻辑都很方便,提高了代码的复用性和可维护性。