原因
最开端是想学习下通用协程池的写法,想自己实现一下。
然后想到了最近很火的ChatGPT,看看能不能让它帮我写一个,或许写完之后我再拿去优化一下~
最终生成的代码可以直接划到文章结尾的最终版
动手
简略版
最开端我的提问很简略,便是期望它帮我生成一个Go的协程池,
它写完之后 ,本着不会百分百相信AI的准则,自己check一下
发现这个协程池其实是有几个问题的
- 超时操控:子func履行时刻过长的话是无法处理或感知到的
- 资源耗尽:池中的func没有限制,无法管理资源
简略版剖析
接着我想让ChatGPT帮我剖析这个协程池的危险
ChatGPT帮我剖析出了四个或许的危险
- 竞赛条件:在
Submit
函数中,咱们向使命通道中发送使命,而在worker
函数中,咱们从使命通道中读取使命。假如多个goroutine一起调用Submit
函数来提交使命,那么它们或许会抢夺向使命通道中发送使命的权限,然后导致竞赛条件。为了防止这种状况,咱们需求运用互斥锁或其他同步机制来维护使命通道的并发访问。 - 内存走漏:在
Submit
函数中,咱们创建了一个新的Task
目标,并将其添加到使命通道中。假如咱们没有在某个时刻清理这些Task
目标,那么它们或许会一向占用内存,然后导致内存走漏。为了防止这种状况,咱们需求在worker
函数中正确处理一切的使命,以确保它们在履行完成后可以被垃圾回收。
3.. 资源耗尽:在worker
函数中,咱们创建了一个新的goroutine来履行每个使命,并运用select
句子来等候使命完成或超时。假如咱们提交了很多的长期运转的使命,那么这些goroutine或许会占用很多的系统资源,然后导致资源耗尽。为了防止这种状况,咱们需求限制协程池的最大并发数,并且合理地设置使命的最长履行时刻,以防止过多的长期运转的使命一起履行。 - 使命超时:在
worker
函数中,咱们运用time.After
函数来设置使命的最长履行时刻,并在select
句子中等候使命完成或超时。假如咱们设置的最长履行时刻太短,那么或许会导致一些正常的使命被过错地标记为超时。假如咱们设置的最长履行时刻太长,那么或许会导致一些长期运转的使命占用过多的系统资源,然后影响系统的性能和稳定性。为了防止这种状况,咱们需求根据实际状况合理地设置使命的最长履行时刻,以确保使命可以在预期的时刻内完成。
改善
接着我让它帮我生成一个优化的版别
简略看了一下,它用了简略的生产-消费模型生成了一个支撑单Task过期时刻设置的协程池,单Task的过期时刻运用方式契合预期,可是它引入了其他问题
- 内存走漏:假如Task行列中积压了很多使命,会导致内存一向被占用,无法被释放
- 过期时刻:由于使命在每个worker中归于串行消费,或许导致前面的func履行耗时过长,后边的func还没被消费到 就已通过期了
因此我让它再帮我改善一下~
以下的代码其实通过屡次调试改善,因为我感觉ChatGPT有时候无法理解上下文
package goroutine_pool
import (
"context"
"fmt"
"sync"
"time"
)
// Task represents a function to be executed in the worker pool
type Task struct {
Handler func() error
Timeout time.Duration
}
// Pool represents a worker pool
type Pool struct {
size int // number of workers
taskQueue chan *Task // channel to store tasks
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
isClosed bool
workerCallback func(*Worker)
mutex sync.Mutex
}
// Worker represents a worker in the pool
type Worker struct {
id int // worker id
taskQueue chan *Task // channel to receive tasks from the pool
ctx context.Context
cancel context.CancelFunc
pool *Pool // pointer to the pool
}
// NewPool creates a new worker pool with the given size
func NewPool(size int) *Pool {
pool := &Pool{
size: size,
taskQueue: make(chan *Task, size*10),
isClosed: false,
}
pool.ctx, pool.cancel = context.WithCancel(context.Background())
return pool
}
// NewWorker creates a new worker and starts it
func NewWorker(id int, pool *Pool) *Worker {
ctx, cancel := context.WithCancel(pool.ctx)
worker := &Worker{
id: id,
taskQueue: make(chan *Task),
ctx: ctx,
cancel: cancel,
pool: pool,
}
go worker.Start()
return worker
}
// Start starts the worker
func (w *Worker) Start() {
defer w.pool.wg.Done()
for {
select {
case task := <-w.taskQueue:
if task == nil {
// channel closed
return
}
if task.Timeout > 0 {
_, cancel := context.WithTimeout(w.ctx, task.Timeout)
defer cancel()
w.pool.workerCallback(w)
err := task.Handler()
if err != nil {
fmt.Printf("worker %d: error executing task: %v\n", w.id, err)
}
} else {
w.pool.workerCallback(w)
err := task.Handler()
if err != nil {
fmt.Printf("worker %d: error executing task: %v\n", w.id, err)
}
}
case <-w.ctx.Done():
// worker is shutting down
return
}
}
}
// Submit adds a new task to the task queue
func (p *Pool) Submit(handler func() error, timeout time.Duration) error {
if p.isClosed {
return fmt.Errorf("pool is closed")
}
select {
case p.taskQueue <- &Task{Handler: handler, Timeout: timeout}:
return nil
default:
return fmt.Errorf("task queue is full")
}
}
// Close closes the worker pool
func (p *Pool) Close() {
p.mutex.Lock()
defer p.mutex.Unlock()
if p.isClosed {
return
}
p.isClosed = true
close(p.taskQueue)
p.cancel()
p.wg.Wait()
}
// SetWorkerCallback sets the callback function to be called before and after a worker executes a task
func (p *Pool) SetWorkerCallback(callback func(*Worker)) {
p.workerCallback = callback
}
func main() {
pool := NewPool(5)
defer pool.Close()
pool.SetWorkerCallback(func(w *Worker) {
fmt.Printf("worker %d: start executing task\n", w.id)
})
for i := 0; i < 10; i++ {
j := i
err := pool.Submit(func() error {
fmt.Printf("task %d: start\n", j)
time.Sleep(time.Second)
fmt.Printf("task %d: end\n", j)
return nil
}, time.Second*2)
if err != nil {
fmt.Printf("error submitting task: %v\n", err)
}
}
}
改善版危险
我自己看了下代码,感觉现在这个版别现已足够了~可是不知道还有没有什么细节的危险,因此再问了ChatGPT,一起期望它能帮我生成单测代码来验证这个Pool的正确性
ChatGPT告诉我这个pool有三个危险:
- Task的Timeout是以阻塞方式处理的,假如task履行时刻过长,或许会影响整个协程池的性能。
- Submit函数里的select中没有default分支,当task行列满了之后,调用方会被阻塞,而没有任何反应。
- Submit函数并没有回来使命履行成果,无法判断使命是否履行成功。
可是我自己剖析了一下:
(1) 我觉得这个可以接受
(2) Submit函数里边其实有default分支的
(3) 这个是没办法防止的工作,或许说可以解决可是投入本钱太大~当前计划就现已可以接受了
最终版
我期望线程池最好仍是可以带上一定的可监控性,所以最终让ChatGPT帮我完善了一下之前的协程池~
package goroutine_pool
import (
"context"
"fmt"
"sync"
"time"
)
// 使命结构体
type Task struct {
Handler func(ctx context.Context) error // 使命履行的函数
Timeout time.Duration // 使命履行超时时刻
}
// 协程池结构体
type Pool struct {
capacity int // 协程池容量
workers []*Worker // 协程池中的协程
taskChan chan *Task // 使命通道
wg sync.WaitGroup // 等候一切协程履行完毕
}
// 协程结构体
type Worker struct {
id int // 协程id
taskChan chan *Task // 使命通道
stopChan chan struct{} // 中止协程的通道
running bool // 协程是否在运转中
terminate bool // 是否需求中止协程
}
// 创建协程池
func NewPool(capacity int) *Pool {
return &Pool{
capacity: capacity,
workers: make([]*Worker, capacity),
taskChan: make(chan *Task),
}
}
// 创建一个协程
func newWorker(id int, taskChan chan *Task) *Worker {
return &Worker{
id: id,
taskChan: taskChan,
stopChan: make(chan struct{}),
running: true,
terminate: false,
}
}
// 启动协程
func (w *Worker) start() {
go func() {
for {
select {
case task := <-w.taskChan:
// 假如使命为空,则退出协程
if task == nil {
w.running = false
w.terminate = true
return
}
ctx, cancel := context.WithTimeout(context.Background(), task.Timeout)
defer cancel()
err := task.Handler(ctx)
if err != nil {
// 处理使命履行失败的状况
}
case <-w.stopChan:
w.running = false
w.terminate = true
return
}
}
}()
}
// 中止协程
func (w *Worker) stop() {
w.stopChan <- struct{}{}
}
// 启动协程池
func (p *Pool) Start() {
for i := 0; i < p.capacity; i++ {
p.workers[i] = newWorker(i, p.taskChan)
p.workers[i].start()
}
}
// 中止协程池
func (p *Pool) Stop() {
for i := 0; i < p.capacity; i++ {
p.workers[i].stop()
}
for i := 0; i < p.capacity; i++ {
// 等候一切协程退出
if p.workers[i].running {
p.workers[i].stop()
}
}
close(p.taskChan)
p.wg.Wait()
}
// 提交使命到协程池
func (p *Pool) Submit(task *Task) {
p.wg.Add(1)
p.taskChan <- task
}
// 监控协程池
func (p *Pool) Monitor() {
for {
// 统计使命行列长度
queueLen := len(p.taskChan)
// 统计协程池中正在运转的协程数和需求退出的协程数
var runningCount, terminateCount int
for i := 0; i < p.capacity; i++ {
if p.workers[i].running {
runningCount++
}
if p.workers[i].terminate {
terminateCount++
}
}
// 打印监控信息
fmt.Printf("Queue length: %d, Running workers: %d, Terminating workers: %d\n", queueLen, runningCount, terminateCount)
time.Sleep(time.Second)
}
}
测验代码
自己简略跑了下功能测验和benchmark ,下面直接上代码
package goroutine_pool
import (
"context"
"fmt"
"log"
"net/http"
"sync"
"testing"
"time"
)
var (
pool = NewPool(10)
ctx = context.Background()
)
func init() {
pool.Start()
log.Println(http.ListenAndServe("localhost:8080", nil))
}
func TestPool(t *testing.T) {
testSubmit(ctx)
}
func testSubmit(ctx context.Context) {
wg := sync.WaitGroup{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func(ii int) {
defer wg.Done()
task := &Task{
Handler: func(ctx context.Context) error {
err := handlePrintFunc(ctx, ii)
return err
},
Timeout: time.Second,
}
pool.Submit(task)
}(i)
}
wg.Wait()
pool.Stop()
go hang()
select {}
}
func handlePrintFunc(ctx context.Context, i int) (err error) {
fmt.Println(fmt.Sprintf("task:[%d] in", i))
time.Sleep(2 * time.Second)
fmt.Println(fmt.Sprintf("task:[%d] out", i))
return nil
}
func hang() {
time.Sleep(time.Hour)
}
func BenchmarkPool(b *testing.B) {
go hang()
b.ResetTimer()
for i := 0; i < b.N; i++ {
task := &Task{
Handler: func(ctx context.Context) error {
time.Sleep(time.Millisecond)
return nil
},
Timeout: time.Second,
}
pool.Submit(task)
}
pool.Stop()
}
// copilot生成压测代码
func BenchmarkPoolV2(b *testing.B) {
go hang()
b.ResetTimer()
for i := 0; i < b.N; i++ {
task := &Task{
Handler: func(ctx context.Context) error {
time.Sleep(time.Millisecond)
return nil
},
Timeout: time.Second,
}
pool.Submit(task)
}
pool.Stop()
}
总结
在调试的过程中,发现ChapGPT应该是学习了不少开源项目的代码风格,全体来说代码写的仍是比较高雅,便是有时候生成的代码编译不通过,需求再次生成。
生成的代码能用,可是根本需求手动再改改 -.-