# 用 Golang 从0到1实现一个高性能的 Worker Pool(一) - 每天5分钟玩转 GPT 编程系列(3)


## 1. 概述

> 2023年7月26日注：GoPool 项目已经重写，本文内容过期。请移步我的个人博客网站 danielhu.cn 查看新版对应文章[《仅三天，我用 GPT-4 生成了性能全网第一的 Golang Worker Pool，轻松打败 GitHub 万星项目》](https://www.danielhu.cn/golang-gopool-1/)。

话说，早几天一口气过了一遍 Golang 的并发机制：

- [跟着 GPT-4 从0到1学习 Golang 并发机制(一)](https://www.danielhu.cn/from_0_to_1_go_concurrency_with_gpt_1/)
- [跟着 GPT-4 从0到1学习 Golang 并发机制(二)](https://www.danielhu.cn/from_0_to_1_go_concurrency_with_gpt_1/)
- [跟着 GPT-4 从0到1学习 Golang 并发机制(三)](https://www.danielhu.cn/from_0_to_1_go_concurrency_with_gpt_1/)

这些机制是使用 Golang 并发编程的基础知识，但是要使用 Golang 编写出高效、易维护、高质量的代码，我们可以使用一些通用的“并发模式”。在 Golang 并发编程中大家经常用到的一些模式主要有：

- Worker-Pool 模式
- Producer-Consumer 模式
- Pipeline 模式
- Event-Driven 模式
- Reactor 模式
- Futures and Promises 模式
- ……

从 **Worker-Pool 模式**开始聊吧，Worker-Pool 模式是一种常见的并发设计模式，主要用于控制并发任务的数量，提高系统性能，以及更有效地管理系统资源。

如果在 GitHub 上搜索 Worker Pool 的开源实现，你会看到结果琳琅满目，五花八门，各有千秋，眼花缭乱。咱干脆试试看能不能让 GPT-4 来实现一个 Worker Pool，然后对比下 Star 数较高的开源实现，看下 GPT-4 “信手拈来”的 Worker Pool 能不能在**功能集、易用性、性能**上相比各种千星万星 GitHub 开源流行款更胜一筹。

> 下文通过 DevChat 和 GPT-4 交互，如果你对 DevChat 感兴趣，可以跳转下文浏览：
> - [《DevChat：将 GPT-4 无缝融入 VS Code，极致提升你的编程体验》](https://zh.devchat.blog/devchat-gpt4-vscode)

## 2. 设计

下面先来明确这个程序需要实现哪些功能，怎么实现。

### 2.1 让 GPT-4 给出功能点

先问下 DevChat 我应该实现哪些功能：

![](1.png)

- **DevChat 的回答文字版：**

> 在Golang中实现一个高性能、功能丰富且简单易用的Worker Pool，你可能需要考虑以下功能点：
> 
> 1. **任务队列**：你需要一个队列来存储待处理的任务。这个队列应该是线程安全的，以便多个goroutine可以并发地向队列中添加任务和从队列中取出任务。
> 
> 2. **工作线程**：你需要一组工作线程（goroutine）来并发地执行任务。这些工作线程应该可以从任务队列中取出任务> 并执行。
> 
> 3. **线程池大小控制**：你需要一种机制来控制线程池的大小，即同时运行的工作线程的数量。这可以帮助你控制并发级别，防止过度消耗系统资源。
> 
> 4. **优雅的关闭**：你需要一种机制来优雅地关闭线程池，即在关闭线程池之前，确保所有已经开始的任务都已经完成。
> 
> 5. **错误处理**：你需要一种机制来处理任务执行过程中的错误。这可能包括记录错误日志，或者将错误返回给任务的提交者。
> 
> 6. **任务超时控制**：对于可能长时间运行的任务，你可能需要一种机制来控制任务的执行时间，如果任务运行时间超过了预设的超时时间，那么应该中止任务的执行。
> 
> 7. **任务优先级**：如果你的应用场景中有不同优先级的任务，你可能需要一种机制来控制任务的执行顺序，确保高优先级的任务先于低优先级的任务执行。
> 
> 8. **任务结果获取**：对于需要返回结果的任务，你需要一种机制来获取任务的执行结果。
> 
> 以上就是实现一个Worker Pool可能需要考虑的功能点，具体的功能点可能会根据你的具体需求和应用场景有所不同。

酷，看起来清新脱俗，“花里胡哨”。目测完全实现这些功能涉及到的代码量会超出目前 GPT-4 的上下文限制，外加细节太多，直接让 GPT-4 上手写代码大概率拿不到满意的结果，还是继续只把 AI 当一个“编程助手”吧。

### 2.2 自己总结需求，再给 GPT 派活

**1. 给项目起一个名字（起名字是最难的，纠结死人）**

就叫 GoPool 吧，对应代码库就叫 [gopool](https://github.com/devchat-ai/gopool)，放到 GitHub 上开源出去。

**2. 用代码的方式总结需求**

整个程序涉及到的几个核心对象是 **Task、Worker 和 Pool**。Pool 中需要维护一个 workers 栈（或者队列，不过栈更易引入“Worker 过期”等逻辑）；Task 可以先用一个最简单的函数类型；tasks 似乎由 Pool 来维护会导致过多的 workers（goroutines）上下文切换，影响性能…… 不行，细节太多了，文字不足以描述我想在第一个版本中包含哪些功能（肯定不是上来就实现一个GA版本），或者我先写几行对象、接口定义，然后让 GPT 在此基础上逐步实现相关逻辑吧。

于是，我写了[这些代码](https://github.com/devchat-ai/gopool/commit/b30bc386ebba85c45b69c9bce5bea518d875a677)，有效行数不到50，看起来却金光闪闪有没有？

- `pool.go`

```go
type Pool struct {
 	// capacity represents the maximum number of workers the pool can have
 	capacity int
 	// running represents the number of workers that are currently running
 	running int
 	// workers represents a pool of workers that are executing tasks
 	workers WorkerStack
 	// lock represents a mutex to protect the workers slice
 	lock sync.Mutex
 }

 func (p *Pool) Submit(task Task) {
 	// TODO
 }

 func (p *Pool) Running() int {
 	return p.running
 }

 func (p *Pool) Cap() int {
 	return p.capacity
 }

 func (p *Pool) Release() {
 	// TODO
 }
```

- `worker.go`

```go
type Task func()

type Worker struct {
	// pool represents a reference to the pool that owns this worker
	pool *Pool
	// task represents the task that the worker will execute
	task chan Task
	// lastUsedTime represents the last time the worker was used
	lastUsedTime time.Time
}
type WorkerStack interface {
	Push(w *Worker)
	Pop() *Worker
	Len() int
}
type workerStack struct {
	workers []*Worker
}
```

这个骨架看着还算清晰吧，其实一开始只想定义几个结构体和方法，但是结构体名字一出来，GitHub Copilot 就开始自动补全相应属性和方法，包括注释了（没错，我写的为数不多的几十行代码里还有一大半是 Copilot 补全的）。于是结合 Copilot 的补全，加上部分的取舍，最终就有了这个初始代码段。

## 3. 实现

关门，放 GPT-4！

### 3.1 你先随意发挥

- **我对 GPT 说：**

> 我要用 Golang 实现一个 worker pool 程序，项目名叫做 GoPool，代码库是 gopool。在 pool.go 和 worker.go 中我已经放了一些相关代码，请在此基础上实现 GoPool 程序。

这里需要基于选中代码/文件来聊天，DevChat 的这个小功能还是很酷：

![](2.png)

- **GPT 回答：**

> 首先，我们需要理解需求和已有代码：
> 
> - 用户需要在 Golang 中实现一个名为 GoPool 的 worker pool 程序。
> - 项目的代码库名为 gopool，已有的代码文件包括 pool.go 和 worker.go。
> - pool.go 文件中定义了 Pool 结构体，包含了一些字段如 capacity（池的最大工作数）、running（当前运行的工作数）、workers（执行任务的工作池）和 lock（保护 workers 切片的互斥锁）。此外，还定义了一些方法，如 Submit（提交任务）、Running（获取当前运行的工作数）、Cap（获取池的最大工作数）和 Release（释放池）。
> - worker.go 文件中定义了 Task 类型（一个函数类型）、Worker 结构体（包含了一些字段如 pool（拥有此工作的池的引用）、task（工作将执行的任务）和 lastUsedTime（工作最后使用的时间））和 WorkerStack 接口（包含了一些方法如 Push（推入工作）、Pop（弹出工作）和 Len（获取工作数））。
> 
> 基于以上信息，我们需要实现以下功能：
> 
> 1. 在 Pool 结构体的 Submit 方法中，实现任务的提交。
> 2. 在 Pool 结构体的 Release 方法中，实现池的释放。
> 3. 实现 WorkerStack 接口的方法。
> 
> 下面是实现代码：
> 
> ```go
> // pool.go
> package gopool
> 
> import (
> 	"sync"
> )
> 
> type Pool struct {
> 	capacity int
> 	running  int
> 	workers  WorkerStack
> 	lock     sync.Mutex
> }
> 
> func (p *Pool) Submit(task Task) {
> 	p.lock.Lock()
> 	defer p.lock.Unlock()
> 
> 	if p.running < p.capacity {
> 		worker := p.workers.Pop()
> 		if worker == nil {
> 			worker = &Worker{
> 				pool: p,
> 				task: make(chan Task),
> 			}
> 		}
> 		worker.task <- task
> 		p.running++
> 	} else {
> 		// TODO: handle the case when the pool is full
> 	}
> }
> 
> func (p *Pool) Running() int {
> 	return p.running
> }
> 
> func (p *Pool) Cap() int {
> 	return p.capacity
> }
> 
> func (p *Pool) Release() {
> 	p.lock.Lock()
> 	defer p.lock.Unlock()
> 
> 	for p.workers.Len() > 0 {
> 		worker := p.workers.Pop()
> 		close(worker.task)
> 	}
> 	p.running = 0
> }
> ```
> 
> ```go
> // worker.go
> package gopool
> 
> import (
> 	"time"
> )
> 
> type Task func()
> 
> type Worker struct {
> 	pool         *Pool
> 	task         chan Task
> 	lastUsedTime time.Time
> }
> 
> type WorkerStack interface {
> 	Push(w *Worker)
> 	Pop() *Worker
> 	Len() int
> }
> 
> type workerStack struct {
> 	workers []*Worker
> }
> 
> func (ws *workerStack) Push(w *Worker) {
> 	ws.workers = append(ws.workers, w)
> }
> 
> func (ws *workerStack) Pop() *Worker {
> 	if len(ws.workers) == 0 {
> 		return nil
> 	}
> 	w := ws.workers[len(ws.workers)-1]
> 	ws.workers = ws.workers[:len(ws.workers)-1]
> 	return w
> }
> 
> func (ws *workerStack) Len() int {
> 	return len(ws.workers)
> }
> ```
> 
> 注意：在 Submit 方法中，当池已满时，我们需要处理这种情况。这里我没有实现，因为这取决于你想如何处理这种情况（例如，你可能想要阻塞直到有可用的工作，或者你可能想要返回一个错误）。

不管三七二十一，先把代码替换到我的源文件中再说。（不过这里实际操作的时候还是要注意，如下图所示，多数时候“插入”是最有用的；有时候“替换全文件”也不错；写到后面其实先看 Diff，然后选择性接受 GPT 的代码还是非常实用）

![](3.png)

> 这一轮聊天的成果是[这个 Commit](https://github.com/devchat-ai/gopool/commit/8921e1d9af408a350fbcdff70ea8fb91a59590ac)。

### 3.2 你得让 Worker 跑起来呀

- **我对 GPT 说：**

> 看来 Worker 还需要一个 run() 方法用来异步执行 task。当这个 task 执行完成后，Worker 应该被放回 workStack。在 Submit() 方法中将 task 给到 worker 后，顺带调用一下这个 run() 方法吧。请继续编码。

GPT 给出了加上 `run()` 方法后的完整源文件，贴出来有点长，直接看增加的 `run()` 部分吧：

```go
func (w *Worker) run() {
 	go func() {
 		for task := range w.task {
 			if task == nil {
 				return
 			}
 			task()
 			w.pool.lock.Lock()
 			w.pool.workers.Push(w)
 			w.pool.running--
 			w.pool.lock.Unlock()
 		}
 	}()
 }
```

> 这一轮聊天的成果是[这个 Commit](https://github.com/devchat-ai/gopool/commit/aa152680ba745ccdc85e606d347684587f463b9b)。

### 3.3 你说说 Pool 满了怎么办

前面 GPT 给的代码中 Summit() 方法里有一段 TODO：

```go
func (p *Pool) Submit(task Task) {
	// ……
    else {
		// TODO: handle the case when the pool is full
	}
}
```

满了怎么办？挺好办的，满了就等呀，空闲了再继续。于是，我想到了条件变量 `sync.Cond`。那就不拐弯抹角，让 GPT 自由发挥了，给点明确的提示吧。

- **我对 GPT 说：**

> 下面继续来实现 handle the case when the pool is full 相关逻辑吧。这里可以引入 sync.Cond 来实现，当 pool full 的时候就 Wait()；同时 Worker 的 run() 方法中当 Worker 被放回到 Pool 中时，说明又有空闲的 Worker 可用了，这时候通过 Signal() 来唤醒 Wait() 的 goroutine。

这时候上下文越来越长了，GPT 的回答贴出来有点碍眼。直接看结果吧：

> 这一轮聊天的成果是[这个 Commit](https://github.com/devchat-ai/gopool/commit/678a72b321924305249eb98d9d68bd460fa5b64f)。

这时候 `Submit()` 方法就变成了这样：

```go
func (p *Pool) Submit(task Task) {
 	p.lock.Lock()
 	defer p.lock.Unlock()

 	for p.running == p.capacity {
 		p.cond.Wait()
 	}

 	worker := p.workers.Pop()
 	if worker == nil {
 		worker = &Worker{
 			pool: p,
 			task: make(chan Task),
 		}
 	}
 	worker.task <- task
 	worker.run()
 	p.running++
 }
```

看起来基本功能差不多了，继续添加 UT 吧。

### 3.4 你得把 UT 给我加上

- **我对 GPT 说：**

> 接下来我想添加一个 pool_test.go 来放单元测试测一下 Submit() 的功能，请给出测试代码。

- **GPT 回答：**

> 为了测试 `Submit()` 方法，我们可以创建一个 `Pool` 对象，然后提交一些任务，检查是否所有的任务都被正确地执行了。以下是测试代码：
> 
> ```go
> // pool_test.go
> package gopool
> 
> import (
> 	"sync"
> 	"testing"
> )
> 
> func TestSubmit(t *testing.T) {
> 	var wg sync.WaitGroup
> 	p := NewPool(10)
> 
> 	for i := 0; i < 20; i++ {
> 		wg.Add(1)
> 		task := func() {
> 			defer wg.Done()
> 		}
> 		p.Submit(task)
> 	}
> 
> 	wg.Wait()
> 
> 	if p.Running() != 0 {
> 		t.Errorf("Expected running workers to be 0, but got %d", p.Running())
> 	}
> }
> ```
> 
> 在这个测试中，我们首先创建了一个新的 `Pool` 对象，然后提交了 20 个任务。每个任务都会在完成时调用 `wg.Done()`。我们使用 `sync.WaitGroup` 来确保所有的任务都被执行了。最后，我们检查 `Running()` 方法的返回值是否为 0，如果不是，那么说明有一些任务没有被执行，测试就失败了。

我决定手动加上一条日志，把 for 循环改成这样：

```go
	for i := 0; i < 20; i++ {
		wg.Add(1)
		taskNum := i // Capture the task number
		task := func() {
			t.Logf("Executing task %d\n", taskNum)
			defer wg.Done()
		}
		p.Submit(task)
	}
```

这时候运行下单元测试，惨惨惨，死锁了。不过这时候我也不想去考 GPT 了，看到死锁我已经第一时间想到了 Channel 阻塞问题，也就是这段代码：

```go
func (p *Pool) Submit(task Task) {
	// ……
	if worker == nil {
		worker = &Worker{
			pool: p,
			task: make(chan Task),
		}
	}
    worker.task <- task
	// ……
}
```

这里给 `task` 赋值了一个 `make(chan Task)`，导致下面这行 `worker.task <- task` 直接阻塞了。随手改一下吧：

```go
func (p *Pool) Submit(task Task) {
	// ……
	if worker == nil {
		worker = &Worker{
			pool: p,
			task: make(chan Task, 1),
		}
	}
    worker.task <- task
	// ……
}
```

没错，把 `task: make(chan Task)` 改成 `task: make(chan Task, 1)` 就可以了。

运行一下吧：

```bash
$ go test . -v
=== RUN   TestSubmit
    pool_test.go:16: Executing task 9
    pool_test.go:16: Executing task 7
    pool_test.go:16: Executing task 8
    pool_test.go:16: Executing task 10
    pool_test.go:16: Executing task 11
    pool_test.go:16: Executing task 12
    pool_test.go:16: Executing task 13
    pool_test.go:16: Executing task 14
    pool_test.go:16: Executing task 15
    pool_test.go:16: Executing task 16
    pool_test.go:16: Executing task 17
    pool_test.go:16: Executing task 18
    pool_test.go:16: Executing task 19
    pool_test.go:16: Executing task 1
    pool_test.go:16: Executing task 0
    pool_test.go:16: Executing task 2
    pool_test.go:16: Executing task 4
    pool_test.go:16: Executing task 3
    pool_test.go:16: Executing task 6
    pool_test.go:16: Executing task 5
--- PASS: TestSubmit (0.00s)
PASS
ok      github.com/devchat-ai/gopool    0.753s
```

> 这一轮聊天的成果是[这个 Commit](https://github.com/devchat-ai/gopool/commit/60884278e8e5077b14cab4319a3c5de43b60b528)。

### 3.5 你再给我把文档补上

- **我对 GPT 说：**

> 我已经初步实现了 GoPool 程序，请帮我完善 README.md 文件内容。

今天最后一轮聊天了，我决定把所有文件内容都丢给 GPT，让它自由发挥一下。

![](4.png)

> 这一轮聊天的成果是[这个 Commit](https://github.com/devchat-ai/gopool/commit/ae16928941f490830243c34f75f73a03cb77f807)。

## 4. 总结

经过一顿瞎聊，最终 GPT-4 写了差不多200行，我写了20行，Copilot 写了20行，加一起，完成了一个初级的 Golang worker pool 程序 [GoPool](https://github.com/devchat-ai/gopool)。

> 本文发布时对应的代码版本看[这里](https://github.com/devchat-ai/gopool/tree/ae16928941f490830243c34f75f73a03cb77f807)。

接下来几天我准备继续和 GPT-4 瞎聊，让它帮着把一个 worker pool 该有的功能都加上，并且不断优化性能，看能不能打造一款炫酷的“开源 Golang Goroutine/Worker Pool 库”。

当然，大伙有啥好想法，需求，或者 bug 反馈，欢迎直接提到 [GitHub Issues](https://github.com/devchat-ai/gopool/issues)

（关注不迷路，我的个人微信公众号：“胡说云原生”）

（关注不迷路，我的个人微信公众号：“胡说云原生”）

（关注不迷路，我的个人微信公众号：“胡说云原生”）

