原因

最开端是想学习下通用协程池的写法,想自己实现一下。
然后想到了最近很火的ChatGPT,看看能不能让它帮我写一个,或许写完之后我再拿去优化一下~
最终生成的代码可以直接划到文章结尾的最终版

动手

简略版

image.png

最开端我的提问很简略,便是期望它帮我生成一个Go的协程池,
它写完之后 ,本着不会百分百相信AI的准则,自己check一下
发现这个协程池其实是有几个问题的

  1. 超时操控:子func履行时刻过长的话是无法处理或感知到的
  2. 资源耗尽:池中的func没有限制,无法管理资源

简略版剖析

接着我想让ChatGPT帮我剖析这个协程池的危险

image.png
ChatGPT帮我剖析出了四个或许的危险

  1. 竞赛条件:在Submit函数中,咱们向使命通道中发送使命,而在worker函数中,咱们从使命通道中读取使命。假如多个goroutine一起调用Submit函数来提交使命,那么它们或许会抢夺向使命通道中发送使命的权限,然后导致竞赛条件。为了防止这种状况,咱们需求运用互斥锁或其他同步机制来维护使命通道的并发访问。
  2. 内存走漏:在Submit函数中,咱们创建了一个新的Task目标,并将其添加到使命通道中。假如咱们没有在某个时刻清理这些Task目标,那么它们或许会一向占用内存,然后导致内存走漏。为了防止这种状况,咱们需求在worker函数中正确处理一切的使命,以确保它们在履行完成后可以被垃圾回收。
    3.. 资源耗尽:在worker函数中,咱们创建了一个新的goroutine来履行每个使命,并运用select句子来等候使命完成或超时。假如咱们提交了很多的长期运转的使命,那么这些goroutine或许会占用很多的系统资源,然后导致资源耗尽。为了防止这种状况,咱们需求限制协程池的最大并发数,并且合理地设置使命的最长履行时刻,以防止过多的长期运转的使命一起履行。
  3. 使命超时:在worker函数中,咱们运用time.After函数来设置使命的最长履行时刻,并在select句子中等候使命完成或超时。假如咱们设置的最长履行时刻太短,那么或许会导致一些正常的使命被过错地标记为超时。假如咱们设置的最长履行时刻太长,那么或许会导致一些长期运转的使命占用过多的系统资源,然后影响系统的性能和稳定性。为了防止这种状况,咱们需求根据实际状况合理地设置使命的最长履行时刻,以确保使命可以在预期的时刻内完成。

改善

接着我让它帮我生成一个优化的版别

image.png

简略看了一下,它用了简略的生产-消费模型生成了一个支撑单Task过期时刻设置的协程池,单Task的过期时刻运用方式契合预期,可是它引入了其他问题

  1. 内存走漏:假如Task行列中积压了很多使命,会导致内存一向被占用,无法被释放
  2. 过期时刻:由于使命在每个worker中归于串行消费,或许导致前面的func履行耗时过长,后边的func还没被消费到 就已通过期了

因此我让它再帮我改善一下~
以下的代码其实通过屡次调试改善,因为我感觉ChatGPT有时候无法理解上下文

package goroutine_pool
import (
   "context"
   "fmt"
   "sync"
   "time"
)
// Task represents a function to be executed in the worker pool
type Task struct {
   Handler func() error
   Timeout time.Duration
}
// Pool represents a worker pool
type Pool struct {
   size           int        // number of workers
   taskQueue      chan *Task // channel to store tasks
   ctx            context.Context
   cancel         context.CancelFunc
   wg             sync.WaitGroup
   isClosed       bool
   workerCallback func(*Worker)
   mutex          sync.Mutex
}
// Worker represents a worker in the pool
type Worker struct {
   id        int        // worker id
   taskQueue chan *Task // channel to receive tasks from the pool
   ctx       context.Context
   cancel    context.CancelFunc
   pool      *Pool // pointer to the pool
}
// NewPool creates a new worker pool with the given size
func NewPool(size int) *Pool {
   pool := &Pool{
      size:      size,
      taskQueue: make(chan *Task, size*10),
      isClosed:  false,
   }
   pool.ctx, pool.cancel = context.WithCancel(context.Background())
   return pool
}
// NewWorker creates a new worker and starts it
func NewWorker(id int, pool *Pool) *Worker {
   ctx, cancel := context.WithCancel(pool.ctx)
   worker := &Worker{
      id:        id,
      taskQueue: make(chan *Task),
      ctx:       ctx,
      cancel:    cancel,
      pool:      pool,
   }
   go worker.Start()
   return worker
}
// Start starts the worker
func (w *Worker) Start() {
   defer w.pool.wg.Done()
   for {
      select {
      case task := <-w.taskQueue:
         if task == nil {
            // channel closed
            return
         }
         if task.Timeout > 0 {
            _, cancel := context.WithTimeout(w.ctx, task.Timeout)
            defer cancel()
            w.pool.workerCallback(w)
            err := task.Handler()
            if err != nil {
               fmt.Printf("worker %d: error executing task: %v\n", w.id, err)
            }
         } else {
            w.pool.workerCallback(w)
            err := task.Handler()
            if err != nil {
               fmt.Printf("worker %d: error executing task: %v\n", w.id, err)
            }
         }
      case <-w.ctx.Done():
         // worker is shutting down
         return
      }
   }
}
// Submit adds a new task to the task queue
func (p *Pool) Submit(handler func() error, timeout time.Duration) error {
   if p.isClosed {
      return fmt.Errorf("pool is closed")
   }
   select {
   case p.taskQueue <- &Task{Handler: handler, Timeout: timeout}:
      return nil
   default:
      return fmt.Errorf("task queue is full")
   }
}
// Close closes the worker pool
func (p *Pool) Close() {
   p.mutex.Lock()
   defer p.mutex.Unlock()
   if p.isClosed {
      return
   }
   p.isClosed = true
   close(p.taskQueue)
   p.cancel()
   p.wg.Wait()
}
// SetWorkerCallback sets the callback function to be called before and after a worker executes a task
func (p *Pool) SetWorkerCallback(callback func(*Worker)) {
   p.workerCallback = callback
}
func main() {
   pool := NewPool(5)
   defer pool.Close()
   pool.SetWorkerCallback(func(w *Worker) {
      fmt.Printf("worker %d: start executing task\n", w.id)
   })
   for i := 0; i < 10; i++ {
      j := i
      err := pool.Submit(func() error {
         fmt.Printf("task %d: start\n", j)
         time.Sleep(time.Second)
         fmt.Printf("task %d: end\n", j)
         return nil
      }, time.Second*2)
      if err != nil {
         fmt.Printf("error submitting task: %v\n", err)
      }
   }
}

改善版危险

我自己看了下代码,感觉现在这个版别现已足够了~可是不知道还有没有什么细节的危险,因此再问了ChatGPT,一起期望它能帮我生成单测代码来验证这个Pool的正确性

image.png

ChatGPT告诉我这个pool有三个危险:

  1. Task的Timeout是以阻塞方式处理的,假如task履行时刻过长,或许会影响整个协程池的性能。
  2. Submit函数里的select中没有default分支,当task行列满了之后,调用方会被阻塞,而没有任何反应。
  3. Submit函数并没有回来使命履行成果,无法判断使命是否履行成功。

可是我自己剖析了一下:
(1) 我觉得这个可以接受
(2) Submit函数里边其实有default分支的
(3) 这个是没办法防止的工作,或许说可以解决可是投入本钱太大~当前计划就现已可以接受了

最终版

我期望线程池最好仍是可以带上一定的可监控性,所以最终让ChatGPT帮我完善了一下之前的协程池~

package goroutine_pool
import (
   "context"
   "fmt"
   "sync"
   "time"
)
// 使命结构体
type Task struct {
   Handler func(ctx context.Context) error // 使命履行的函数
   Timeout time.Duration                   // 使命履行超时时刻
}
// 协程池结构体
type Pool struct {
   capacity int            // 协程池容量
   workers  []*Worker      // 协程池中的协程
   taskChan chan *Task     // 使命通道
   wg       sync.WaitGroup // 等候一切协程履行完毕
}
// 协程结构体
type Worker struct {
   id        int           // 协程id
   taskChan  chan *Task    // 使命通道
   stopChan  chan struct{} // 中止协程的通道
   running   bool          // 协程是否在运转中
   terminate bool          // 是否需求中止协程
}
// 创建协程池
func NewPool(capacity int) *Pool {
   return &Pool{
      capacity: capacity,
      workers:  make([]*Worker, capacity),
      taskChan: make(chan *Task),
   }
}
// 创建一个协程
func newWorker(id int, taskChan chan *Task) *Worker {
   return &Worker{
      id:        id,
      taskChan:  taskChan,
      stopChan:  make(chan struct{}),
      running:   true,
      terminate: false,
   }
}
// 启动协程
func (w *Worker) start() {
   go func() {
      for {
         select {
         case task := <-w.taskChan:
            // 假如使命为空,则退出协程
            if task == nil {
               w.running = false
               w.terminate = true
               return
            }
            ctx, cancel := context.WithTimeout(context.Background(), task.Timeout)
            defer cancel()
            err := task.Handler(ctx)
            if err != nil {
               // 处理使命履行失败的状况
            }
         case <-w.stopChan:
            w.running = false
            w.terminate = true
            return
         }
      }
   }()
}
// 中止协程
func (w *Worker) stop() {
   w.stopChan <- struct{}{}
}
// 启动协程池
func (p *Pool) Start() {
   for i := 0; i < p.capacity; i++ {
      p.workers[i] = newWorker(i, p.taskChan)
      p.workers[i].start()
   }
}
// 中止协程池
func (p *Pool) Stop() {
   for i := 0; i < p.capacity; i++ {
      p.workers[i].stop()
   }
   for i := 0; i < p.capacity; i++ {
      // 等候一切协程退出
      if p.workers[i].running {
         p.workers[i].stop()
      }
   }
   close(p.taskChan)
   p.wg.Wait()
}
// 提交使命到协程池
func (p *Pool) Submit(task *Task) {
   p.wg.Add(1)
   p.taskChan <- task
}
// 监控协程池
func (p *Pool) Monitor() {
   for {
      // 统计使命行列长度
      queueLen := len(p.taskChan)
      // 统计协程池中正在运转的协程数和需求退出的协程数
      var runningCount, terminateCount int
      for i := 0; i < p.capacity; i++ {
         if p.workers[i].running {
            runningCount++
         }
         if p.workers[i].terminate {
            terminateCount++
         }
      }
      // 打印监控信息
      fmt.Printf("Queue length: %d, Running workers: %d, Terminating workers: %d\n", queueLen, runningCount, terminateCount)
      time.Sleep(time.Second)
   }
}

测验代码

自己简略跑了下功能测验和benchmark ,下面直接上代码

package goroutine_pool
import (
   "context"
   "fmt"
   "log"
   "net/http"
   "sync"
   "testing"
   "time"
)
var (
   pool = NewPool(10)
   ctx  = context.Background()
)
func init() {
   pool.Start()
   log.Println(http.ListenAndServe("localhost:8080", nil))
}
func TestPool(t *testing.T) {
   testSubmit(ctx)
}
func testSubmit(ctx context.Context) {
   wg := sync.WaitGroup{}
   for i := 0; i < 100; i++ {
      wg.Add(1)
      go func(ii int) {
         defer wg.Done()
         task := &Task{
            Handler: func(ctx context.Context) error {
               err := handlePrintFunc(ctx, ii)
               return err
            },
            Timeout: time.Second,
         }
         pool.Submit(task)
      }(i)
   }
   wg.Wait()
   pool.Stop()
   go hang()
   select {}
}
func handlePrintFunc(ctx context.Context, i int) (err error) {
   fmt.Println(fmt.Sprintf("task:[%d] in", i))
   time.Sleep(2 * time.Second)
   fmt.Println(fmt.Sprintf("task:[%d] out", i))
   return nil
}
func hang() {
   time.Sleep(time.Hour)
}
func BenchmarkPool(b *testing.B) {
   go hang()
   b.ResetTimer()
   for i := 0; i < b.N; i++ {
      task := &Task{
         Handler: func(ctx context.Context) error {
            time.Sleep(time.Millisecond)
            return nil
         },
         Timeout: time.Second,
      }
      pool.Submit(task)
   }
   pool.Stop()
}
// copilot生成压测代码
func BenchmarkPoolV2(b *testing.B) {
   go hang()
   b.ResetTimer()
   for i := 0; i < b.N; i++ {
      task := &Task{
         Handler: func(ctx context.Context) error {
            time.Sleep(time.Millisecond)
            return nil
         },
         Timeout: time.Second,
      }
      pool.Submit(task)
   }
   pool.Stop()
}

总结

在调试的过程中,发现ChapGPT应该是学习了不少开源项目的代码风格,全体来说代码写的仍是比较高雅,便是有时候生成的代码编译不通过,需求再次生成。
生成的代码能用,可是根本需求手动再改改 -.-