1. 数据结构设计
任务结构体
type Task struct {
ID string
Func func() error
RetryCount int
MaxRetries int
ErrorType string
ErrorMsg string
Status string // 如 "pending", "running", "success", "failed"
}
任务队列
type TaskQueue struct {
tasks []Task
}
错误日志结构体
type ErrorLog struct {
TaskID string
ErrorType string
ErrorMsg string
Timestamp string
}
2. 算法设计
任务调度算法
- 从任务队列中取出任务。
- 执行任务,如果任务成功,标记任务状态为“success”并从队列移除。
- 如果任务失败,记录错误日志,根据错误类型判断是否重试:
- 如果重试次数未达到最大重试次数,增加重试次数,重新将任务放入队列。
- 如果达到最大重试次数,标记任务状态为“failed”。
系统重启恢复算法
- 系统启动时,从持久化存储(如数据库)中读取所有任务的状态和重试信息。
- 将处于“pending”或“failed”状态的任务重新加入任务队列。
3. 关键代码逻辑
任务执行与重试
func executeTask(task *Task) {
err := task.Func()
if err != nil {
task.RetryCount++
logError(task.ID, err)
if task.RetryCount <= task.MaxRetries {
task.Status = "pending"
addTaskToQueue(task)
} else {
task.Status = "failed"
}
} else {
task.Status = "success"
}
}
错误日志记录
func logError(taskID string, err error) {
var errorType string
// 根据具体错误类型判断
if _, ok := err.(net.Error); ok {
errorType = "network_error"
} else if _, ok := err.(resourceError); ok {
errorType = "resource_error"
}
log := ErrorLog{
TaskID: taskID,
ErrorType: errorType,
ErrorMsg: err.Error(),
Timestamp: time.Now().Format(time.RFC3339),
}
// 这里可写入文件、数据库等持久化存储
fmt.Println(log)
}
任务队列操作
func addTaskToQueue(task *Task) {
queue.tasks = append(queue.tasks, *task)
}
func getTaskFromQueue() *Task {
if len(queue.tasks) == 0 {
return nil
}
task := queue.tasks[0]
queue.tasks = queue.tasks[1:]
return &task
}
系统重启恢复
func recoverTasks() {
// 从数据库读取任务
var tasks []Task
// 假设这里有从数据库查询任务的代码,将结果填充到tasks
for _, task := range tasks {
if task.Status == "pending" || task.Status == "failed" {
addTaskToQueue(&task)
}
}
}