对于大型的互联网运用程序,如电商渠道、交际网络、金融交易渠道
等,每秒钟都会收到很多的恳求。在这些运用程序中,需求运用高效的技能来应对高并发的恳求,尤其是在短时间内处理很多的恳求,如1分钟百万恳求。
一起,为了下降用户的运用门槛和提升用户体会,前端需求实现参数
的无感知传递。这样用户在运用时,无需忧虑参数传递的问题,能够轻松地享受运用程序的服务。
在处理1分钟百万恳求时,需求运用高效的技能和算法,以提高恳求的响应速度和处理才能。Go言语以其高效性和并发性而闻名,因而成为处理高并发恳求的优异挑选。Go中有多种形式可供挑选,如依据goroutine和channel的并发模型、运用池技能的协程模型等,以便依据详细运用的需求来挑选合适的技能形式。
本文代码参阅搬至
marksuper.xyz/2021/10/08/… marcio.io/2015/07/han…
W1
W1 结构体类型,它有五个成员:
- WgSend 用于等候使命发送的 goroutine 完结。
- Wg 用于等候使命处理的 goroutine 完结。
- MaxNum 表示 goroutine 池的大小。
- Ch 是一个字符串类型的通道,用于传递使命。
- DispatchStop 是一个空结构体类型的通道,用于间断使命分发。
type W1 struct {
WgSend *sync.WaitGroup
Wg *sync.WaitGroup
MaxNum int
Ch chan string
DispatchStop chan struct{}
}
接下来是 Dispatch 办法,它将使命发送到通道 Ch 中。它经过 for 循环来发送 10 倍于 MaxNum 的使命,每个使命都是一个 goroutine。defer 句子用于在使命完结时削减 WgSend 的计数。select 句子用于在使命分发被间断时退出使命发送。
Dispatch
func (w *W1) Dispatch(job string) {
w.WgSend.Add(10 * w.MaxNum)
for i := 0; i < 10*w.MaxNum; i++ {
go func(i int) {
defer w.WgSend.Done()
select {
case w.Ch <- fmt.Sprintf("%d", i):
return
case <-w.DispatchStop:
fmt.Println("退出发送 job: ", fmt.Sprintf("%d", i))
return
}
}(i)
}
}
StartPool
然后是 StartPool
办法,它创立了一个 goroutine 池来处理从通道 Ch 中读取到的使命。
假如通道 Ch 还没有被创立,那么它将被创立。假如计数器 WgSend 还没有被创立,那么它也将被创立。假如计数器 Wg 还没有被创立,那么它也将被创立。
假如通道 DispatchStop 还没有被创立,那么它也将被创立。
for 循环用于创立 MaxNum 个 goroutine
来处理从通道中读取到的使命。defer 句子用于在使命完结时削减 Wg 的计数。
func (w *W1) StartPool() {
if w.Ch == nil {
w.Ch = make(chan string, w.MaxNum)
}
if w.WgSend == nil {
w.WgSend = &sync.WaitGroup{}
}
if w.Wg == nil {
w.Wg = &sync.WaitGroup{}
}
if w.DispatchStop == nil {
w.DispatchStop = make(chan struct{})
}
w.Wg.Add(w.MaxNum)
for i := 0; i < w.MaxNum; i++ {
go func() {
defer w.Wg.Done()
for v := range w.Ch {
fmt.Printf("完结作业: %s \n", v)
}
}()
}
}
Stop
最终是 Stop 办法,它间断使命分发并等候一切使命完结。
它封闭了通道 DispatchStop
,等候 WgSend 中的使命发送 goroutine 完结,然后封闭通道 Ch,等候 Wg 中的使命处理 goroutine 完结。
func (w *W1) Stop() {
close(w.DispatchStop)
w.WgSend.Wait()
close(w.Ch)
w.Wg.Wait()
}
W2
SubWorker
type SubWorker struct {
JobChan chan string
}
子协程,它有一个 JobChan,用于接纳使命。
Run:SubWorker 的办法,用于发动一个子协程,从 JobChan 中读取使命并履行。
func (sw *SubWorker) Run(wg *sync.WaitGroup, poolCh chan chan string, quitCh chan struct{}) {
if sw.JobChan == nil {
sw.JobChan = make(chan string)
}
wg.Add(1)
go func() {
defer wg.Done()
for {
poolCh <- sw.JobChan
select {
case res := <-sw.JobChan:
fmt.Printf("完结作业: %s \n", res)
case <-quitCh:
fmt.Printf("顾客完毕...... \n")
return
}
}
}()
}
W2
type W2 struct {
SubWorkers []SubWorker
Wg *sync.WaitGroup
MaxNum int
ChPool chan chan string
QuitChan chan struct{}
}
Dispatch
Dispatch:W2 的办法,用于从 ChPool 中获取 TaskChan,将使命发送给一个 SubWorker 履行。
func (w *W2) Dispatch(job string) {
jobChan := <-w.ChPool
select {
case jobChan <- job:
fmt.Printf("发送使命 : %s 完结 \n", job)
return
case <-w.QuitChan:
fmt.Printf("发送者(%s)完毕 \n", job)
return
}
}
StartPool
StartPool:W2 的办法,用于初始化协程池,发动一切子协程并把 TaskChan 存储在 ChPool 中。
func (w *W2) StartPool() {
if w.ChPool == nil {
w.ChPool = make(chan chan string, w.MaxNum)
}
if w.SubWorkers == nil {
w.SubWorkers = make([]SubWorker, w.MaxNum)
}
if w.Wg == nil {
w.Wg = &sync.WaitGroup{}
}
for i := 0; i < len(w.SubWorkers); i++ {
w.SubWorkers[i].Run(w.Wg, w.ChPool, w.QuitChan)
}
}
Stop
Stop:W2 的办法,用于间断协程的作业,并等候一切协程完毕。
func (w *W2) Stop() {
close(w.QuitChan)
w.Wg.Wait()
close(w.ChPool)
}
DealW2 函数则是整个协程池的进口,它经过 NewWorker 办法创立一个 W2 实例,然后调用 StartPool 发动协程池,并经过 Dispatch 发送使命,最终调用 Stop 间断协程池。
func DealW2(max int) {
w := NewWorker(w2, max)
w.StartPool()
for i := 0; i < 10*max; i++ {
go w.Dispatch(fmt.Sprintf("%d", i))
}
w.Stop()
}
个人见解
- 看到这里对于w2我已经有点迷糊了,还能传递
w.Wg, w.ChPool, w.QuitChan
?
原来是golang里假如办法传递的不是地址,那么就会做一个复制,所以这里调用的wg根本就不是一个对象。
传递的当地传递地址就能够了,假如不传递地址,将会呈现死锁
go doSomething(i, &wg, ch)
func doSomething(index int, wg *sync.WaitGroup, ch chan int) {
- w1也有一个比较大的问题。在处理恳求时,每个 Goroutine 都会占用一定的系统资源,假如恳求量过大,会
造成 Goroutine 数量的剧增
,耗费过多系统资源,程序或许会崩溃
- 能不能用上我之前写的协程池 探求 Go 的高档特性之 【Go 协程池】
探求原文
在这段代码中,poolCh代表作业者池,sw.JobChan代表作业者的作业通道。当一个作业者完结了作业后,它会将作业成果发送到sw.JobChan,此刻能够经过case res := <-sw.JobChan:来接纳该作业的成果。
在这个代码块中,还需求处理一个退出信号quitCh。因而,第二个case <-quitCh:用于检测是否接纳到了退出信号。假如接纳到了退出信号,程序将打印出消息并完毕。
需求留意的是,这两个case句子是互斥的,只有当作业者完结作业或收到退出信号时,才会进入其中一个句子。因而,这个循环能够保证在作业者完结作业或收到退出信号时退出。
需求读取两次sw.JobChan的原因是:第一次读取用于将作业者的作业通道放回作业者池中,这样其他作业者就能够运用该通道。第二次读取用于接纳作业者的作业成果或退出信号。因而,这两次读取是为了保证能够在正确的时刻将作业者的作业通道放回作业者池中并正确地处理作业成果或退出信号。
依据w2的特色 我自己写了一个w2
import (
"fmt"
"sync"
)
type SubWorkerNew struct {
JobChan chan string
}
type W2New struct {
SubWorkers []SubWorkerNew
Wg *sync.WaitGroup
MaxNum int
ChPool chan chan string
QuitChan chan struct{}
}
func NewW2(maxNum int) *W2New {
subWorkers := make([]SubWorkerNew, maxNum)
for i := 0; i < maxNum; i++ {
subWorkers[i] = SubWorkerNew{JobChan: make(chan string)}
}
pool := make(chan chan string, maxNum)
for i := 0; i < maxNum; i++ {
pool <- subWorkers[i].JobChan
}
return &W2New{
SubWorkers: subWorkers,
Wg: &sync.WaitGroup{},
MaxNum: maxNum,
ChPool: pool,
QuitChan: make(chan struct{}),
}
}
func (w *W2New) Dispatch(job string) {
select {
case jobChannel := <-w.ChPool:
jobChannel <- job
default:
fmt.Println("All workers busy")
}
}
func (w *W2New) StartPool() {
for i := 0; i < w.MaxNum; i++ {
go func(subWorker *SubWorkerNew) {
w.Wg.Add(1)
defer w.Wg.Done()
for {
select {
case job := <-subWorker.JobChan:
fmt.Println("processing ", job)
case <-w.QuitChan:
return
}
}
}(&w.SubWorkers[i])
}
}
func (w *W2New) Stop() {
close(w.QuitChan)
w.Wg.Wait()
close(w.ChPool)
for _, subWorker := range w.SubWorkers {
close(subWorker.JobChan)
}
}
func main() {
w := NewW2(5)
w.StartPool()
for i := 0; i < 20; i++ {
w.Dispatch(fmt.Sprintf("job %d", i))
}
w.Stop()
}
但是有几个点需求留意
1.没有考虑JobChan通道的缓冲区大小
,假如有很多使命被并发分配,容易导致内存占用过高;
2.每个线程都会履行无限循环
,此刻线程退出的条件是接纳到QuitChan通道的信号,或许导致线程的堵塞等问题;
3.Dispatch
函数的默许情况下只会输出”All workers busy”,而不是堵塞,这意味着当一切线程都处于忙碌状态时,使命会丢掉
4.线程池发动后无法动态扩展或缩小
。
优化
这个优化版本改了很多次。有一些需求留意的点是,不然会一直死锁
1.运用sync.WaitGroup来保证线程池中一切线程都能够发动并运转;
2.在Stop函数中,先向SubWorker的JobChan中发送一个封闭信号,再等候一切SubWorker线程退出;
3.在Dispatch函数中,将默许情况下的输出改为堵塞等候可用通道;
w2new
package handle_million_requests
import (
"fmt"
"sync"
"time"
)
type SubWorkerNew struct {
Id int
JobChan chan string
}
type W2New struct {
SubWorkers []SubWorkerNew
MaxNum int
ChPool chan chan string
QuitChan chan struct{}
Wg *sync.WaitGroup
}
func NewW2(maxNum int) *W2New {
chPool := make(chan chan string, maxNum)
subWorkers := make([]SubWorkerNew, maxNum)
for i := 0; i < maxNum; i++ {
subWorkers[i] = SubWorkerNew{Id: i, JobChan: make(chan string)}
chPool <- subWorkers[i].JobChan
}
wg := new(sync.WaitGroup)
wg.Add(maxNum)
return &W2New{
MaxNum: maxNum,
SubWorkers: subWorkers,
ChPool: chPool,
QuitChan: make(chan struct{}),
Wg: wg,
}
}
func (w *W2New) StartPool() {
for i := 0; i < w.MaxNum; i++ {
go func(wg *sync.WaitGroup, subWorker *SubWorkerNew) {
defer wg.Done()
for {
select {
case job := <-subWorker.JobChan:
fmt.Printf("SubWorker %d processing job %s\n", subWorker.Id, job)
time.Sleep(time.Second) // 模仿使命处理进程
case <-w.QuitChan:
return
}
}
}(w.Wg, &w.SubWorkers[i])
}
}
func (w *W2New) Stop() {
close(w.QuitChan)
for i := 0; i < w.MaxNum; i++ {
close(w.SubWorkers[i].JobChan)
}
w.Wg.Wait()
}
func (w *W2New) Dispatch(job string) {
select {
case jobChan := <-w.ChPool:
jobChan <- job
default:
fmt.Println("All workers busy")
}
}
func (w *W2New) AddWorker() {
newWorker := SubWorkerNew{Id: w.MaxNum, JobChan: make(chan string)}
w.SubWorkers = append(w.SubWorkers, newWorker)
w.ChPool <- newWorker.JobChan
w.MaxNum++
w.Wg.Add(1)
go func(subWorker *SubWorkerNew) {
defer w.Wg.Done()
for {
select {
case job := <-subWorker.JobChan:
fmt.Printf("SubWorker %d processing job %s\n", subWorker.Id, job)
time.Sleep(time.Second) // 模仿使命处理进程
case <-w.QuitChan:
return
}
}
}(&newWorker)
}
func (w *W2New) RemoveWorker() {
if w.MaxNum > 1 {
worker := w.SubWorkers[w.MaxNum-1]
close(worker.JobChan)
w.MaxNum--
w.SubWorkers = w.SubWorkers[:w.MaxNum]
}
}
AddWorker
和RemoveWorker
,用于动态扩展/缩小线程池。
- 在
AddWorker
函数中,咱们首先将MaxNum增加了1,然后创立一个新的SubWorkerNew结构体,将其添加到SubWorkers中,并将其JobChan通道添加到ChPool通道中。最终,咱们创立一个新的协程来处理新添加的SubWorkerNew并让它进入无限循环,等候接纳使命。 - 在
RemoveWorker
函数中,咱们首先将MaxNum削减1,然后获取最终一个SubWorkerNew结构体,将它的JobChan通道发送到ChPool通道中,并从其通道中读取任何待处理的使命,最终创立一个新的协程来处理SubWorkerNew,继续处理使命。
测验用例
func TestW2New(t *testing.T) {
pool := NewW2(3)
pool.StartPool()
pool.Dispatch("task 1")
pool.Dispatch("task 2")
pool.Dispatch("task 3")
pool.AddWorker()
pool.AddWorker()
pool.RemoveWorker()
pool.Stop()
}
当Dispatch函数向ChPool通道获取可用通道时,会从通道中取出一个SubWorker的JobChan通道,并将使命发送到该通道中。而对于SubWorker来说,并没有进行使命的运用次数约束,所以它能够处理多个使命。
在这个例子中,当使命数量比SubWorker数量多时,一个SubWorker的JobChan通道会接纳到多个使命,它们会在SubWorker的循环中按顺序依次处理,直到JobChan中没有未处理的使命停止。因而,假如使命数量特别大,或许会导致某些SubWorker的JobChan通道暂时处于未处理使命状态,而其他的SubWorker在履行使命。
在测验成果中,最终三行中呈现了多个”SubWorker 0 processing job”,阐明SubWorker 0的JobChan通道接纳了多个使命,而且在其循环中处理这些使命。下面的代码片段显示了这个进程:
// SubWorker 0 的循环部分
for {
select {
case job := <-subWorker.JobChan:
fmt.Printf("SubWorker %d processing job %s\n", subWorker.Id, job)
case <-w.QuitChan:
return
}
}