前语

在数据结构中,行列遵循着FIFO(先进先出)的规则。在此基础上,人们引申出了“优先级行列”的概念。

优先级行列,是带有优先级特点的行列,一切的行列元素依照优先级进行排序,顾客会先对优先级高的行列元素进行处理。

优先级行列的运用场景也是十分多的。比方,作业调度体系,当一个作业完成后,需求从剩下的作业中取出优先级最高的作业进行处理。又比方,一个商城的用户分为普通用户和vip用户,vip用户更容易抢到那些秒杀商品。

在本文中,我将和咱们一同评论,golang优先级行列的一种完成计划。

你能够收成

  • golang切片特性
  • golang map特性
  • golang并发场景下的解决计划
  • golang优先级行列的完成思路

正文

内容脉络

为了让咱们脑海里有个大致的概括,我先把正文的纲要展现出来。

golang优先级队列的实现

基础知识

在正式开端“优先级行列”这个论题之前,咱们首要要明确以下的一些golang特性。

  • 切片的特性

    • 元素的有序性
    • 非线程安全
  • map的特性

    • 元素的无序性
    • 非线程安全
  • 并发场景下的解决计划

    • 互斥锁:能够对非线程安全的数据结构创立临界区,一般用于同步场景;
    • 管道:能够对非线程安全的数据结构进行异步处理

完成思路

已然,咱们了解了golang的一些特性,那么,咱们接下来就要明确,怎样去完成优先级行列了。

咱们都知道,无论是哪一种行列,必定是存在生产者和顾客两个部分,关于优先级行列来说,更是如此。因而,咱们的完成思路,也将从这两个部分来谈。

1、生产者

关于生产者来说,他只需求推送一个使命及其优先级过来,咱们就得依据优先级处理他的使命。

由于,咱们不大好判别,究竟会有多少种不同的优先级传过来,也无法确定,每种优先级下有多少个使命要处理,所以,咱们能够考虑运用map来存储优先级行列。其中key为优先级,value为归于该优先级下的使命行列(即管道)

golang优先级队列的实现
2、顾客

关于顾客来说,他需求获取优先级最高的使命进行消费。

可是,假如只依照上面所说的map来存储优先级行列的话,咱们是无法找到优先级最高的使命行列的,由于map的元素是无序的。那么,咱们怎样处理这个问题呢?

咱们都知道,在golang的数据结构里,切片的元素是具有有序性的。那么,咱们只需求将一切的优先级按从小到大的方法,存储在一个切片里,就能够了。比及消费的时分,咱们能够先从切片中,取出最大的优先级,然后再依据这个key去优先级行列的map中查询,是不是就能够了?

golang优先级队列的实现

方针规划

想好了完成思路之后,咱们就得对接下来的代码完成做一个规划了。

  • 数据结构

    • 存储优先级行列的map
    • 存储优先级的切片
    • 互斥锁
    • 其他……
  • 生产者

    • 添加使命到优先级行列
  • 顾客

    • 从优先级行列获取使命

步步为营

1、数据流

(1)调用NewPriorityQueue() ,初始化优先级行列目标。

(2)初始化优先级行列map。

(3)开启协程,监听一个接收推送使命的全局管道pushChan

(4)用户调用Push() ,推送的使命进入pushChan

(5)推送的使命被加到优先级行列中。

(6)顾客从优先级行列中获取优先级最高的一个使命。

(7)顾客履行使命。

golang优先级队列的实现

2、数据结构

(1)优先级行列目标

type PriorityQueue struct {
   mLock      sync.Mutex         // 互斥锁,queues和priorities并发操作时运用,当然针对当时读多写少的场景,也能够运用读写锁
   queues     map[int]chan *task // 优先级行列map
   pushChan   chan *task         // 推送使命管道
   priorities []int              // 记载优先级的切片(优先级从小到大摆放)
}

(2)使命目标

type task struct {
   priority int    // 使命的优先级
   f        func() // 使命的履行函数
}

3、初始化优先级行列目标

func NewPriorityQueue() *PriorityQueue {
   pq := &PriorityQueue{
      queues:   make(map[int]chan *task), // 初始化优先级行列map
      pushChan: make(chan *task, 100),
   }
   return pq
}

当然,在这个过程中,咱们需求对pushChan进行监听。假如有使命推送过来,咱们得处理。

func (pq *PriorityQueue) listenPushChan() {
   for {
      select {
      case taskEle := <-pq.pushChan:
         // TODO 这儿接收到推送的使命,而且预备处理
      }
   }
}

将这个监听函数放到NewPriorityQueue()中:

func NewPriorityQueue() *PriorityQueue {
   pq := &PriorityQueue{
      queues:   make(map[int]chan *task),
      pushChan: make(chan *task, 100),
   }
   // 监听pushChan
   go pq.listenPushChan()
   return pq
}

4、生产者推送使命

生产者推送使命的时分,咱们只需求将使命放到pushChan中:

func (pq *PriorityQueue) Push(f func(), priority int) {
   pq.pushChan <- &task{
      f:        f,
      priority: priority,
   }
}

5、将推送使命加到优先级行列中

这一步就比较关键了。咱们前面谈到,优先级行列最核心的数据结构有两个:优先级行列map和优先级切片。因而,推送使命添加到优先级行列的操作,咱们得分两种状况来看:

(1)之前已经推过相同优先级的使命

这种状况十分简略,咱们其实只要操作优先级行列map就能够了。

func (pq *PriorityQueue) listenPushChan() {
   for {
      select {
      case taskEle := <-pq.pushChan:
         priority := taskEle.priority
         pq.mLock.Lock()
         if v, ok := pq.queues[priority]; ok {
            pq.mLock.Unlock()
            // 之前推送过相同优先级的使命
            // 将推送的使命塞到对应优先级的行列中
            v <- taskEle
            continue
         }
         // todo 之前未推过相同优先级使命的处理...
      }
   }
}

(2)之前未推过相同优先级的使命

这种状况会稍微杂乱一些。咱们不仅要将新的优先级刺进到优先级切片正确的方位,而且要将使命添加到对应优先级的行列。

1)将新的优先级刺进到优先级切片中

a. 首要,咱们得寻觅新优先级在切片中的刺进方位。这儿,咱们用了二分法

// 经过二分法寻觅新优先级的切片刺进方位
func (pq *PriorityQueue) getNewPriorityInsertIndex(priority int, leftIndex, rightIndex int) (index int) {
   if len(pq.priorities) == 0 {
      // 假如当时优先级切片没有元素,则刺进的index便是0
      return 0
   }
   length := rightIndex - leftIndex
   if pq.priorities[leftIndex] >= priority {
      // 假如当时切片中最小的元素都超越了刺进的优先级,则刺进方位应该是最左边
      return leftIndex
   }
   if pq.priorities[rightIndex] <= priority {
      // 假如当时切片中最大的元素都没超越刺进的优先级,则刺进方位应该是最右边
      return rightIndex + 1
   }
   if length == 1 && pq.priorities[leftIndex] < priority && pq.priorities[rightIndex] >= priority {
      // 假如刺进的优先级刚好在仅有的两个优先级之间,则中间的方位便是刺进方位
      return leftIndex + 1
   }
   middleVal := pq.priorities[leftIndex+length/2]
   // 这儿用二分法递归的方法,一向寻觅正确的刺进方位
   if priority <= middleVal {
      return pq.getNewPriorityInsertIndex(priority, leftIndex, leftIndex+length/2)
   } else {
      return pq.getNewPriorityInsertIndex(priority, leftIndex+length/2, rightIndex)
   }
}

b. 找到刺进方位之后,咱们才要刺进。在这个过程中,刺进方位右侧的元素全部都要向右边移动一位。

// index右侧元素均需求向后移动一个单位
func (pq *PriorityQueue) moveNextPriorities(index, priority int) {
   pq.priorities = append(pq.priorities, 0)
   copy(pq.priorities[index+1:], pq.priorities[index:])
   pq.priorities[index] = priority
}

这样,咱们就成功地将新的优先级刺进了切片。

2)将推送使命放入优先级行列map也就顺理成章。

// 创立一个新优先级管道
pq.queues[priority] = make(chan *task, 10000)
// 将使命塞到新的优先级管道中
pq.queues[priority] <- taskEle

因而,listenPushChan()的代码如下:

func (pq *PriorityQueue) listenPushChan() {
   for {
      select {
      case taskEle := <-pq.pushChan:
         priority := taskEle.priority
         pq.mLock.Lock()
         if v, ok := pq.queues[priority]; ok {
            pq.mLock.Unlock()
            // 将推送的使命塞到对应优先级的行列中
            v <- taskEle
            continue
         }
         // 假如这是一个新的优先级,则需求刺进优先级切片,而且新建一个优先级的queue
         // 经过二分法寻觅新优先级的切片刺进方位
         index := pq.getNewPriorityInsertIndex(priority, 0, len(pq.priorities)-1)
         // index右侧元素均需求向后移动一个单位
         pq.moveNextPriorities(index, priority)
         // 创立一个新优先级行列
         pq.queues[priority] = make(chan *task, 10000)
         // 将使命塞到新的优先级行列中
         pq.queues[priority] <- taskEle
         pq.mLock.Unlock()
      }
   }
}

完成了生产者部分之后,接下来咱们看看顾客。

6、顾客消费行列

这儿分成两个步骤,首要咱们得拿到最高优先级行列的使命,然后再去履行使命。代码如下:

// 顾客轮询获取最高优先级的使命
func (pq *PriorityQueue) Consume() {
   for {
      task := pq.Pop()
      if task == nil {
         // 未获取到使命,则继续轮询
         time.Sleep(time.Millisecond)
         continue
      }
      // 获取到了使命,就履行使命
      task.f()
   }
}
// 取出最高优先级行列中的一个使命
func (pq *PriorityQueue) Pop() *task {
   pq.mLock.Lock()
   defer pq.mLock.Unlock()
   for i := len(pq.priorities) - 1; i >= 0; i-- {
      if len(pq.queues[pq.priorities[i]]) == 0 {
         // 假如当时优先级的行列没有使命,则看低一级优先级的行列中有没有使命
         continue
      }
      // 假如当时优先级的行列里有使命,则取出一个使命。
      return <-pq.queues[pq.priorities[i]]
   }
   // 假如一切行列都没有使命,则返回null
   return nil
}

7、完好代码

这样,咱们的优先级行列就完成了。下面,咱们将完好代码展现。

pq.go

package priority_queue
import (
   "sync"
)
type PriorityQueue struct {
   mLock      sync.Mutex         // 互斥锁,queues和priorities并发操作时运用,当然针对当时读多写少的场景,也能够选用读写锁
   queues     map[int]chan *task // 优先级行列map
   pushChan   chan *task         // 推送使命管道
   priorities []int              // 记载优先级的切片(优先级从小到大摆放)
}
type task struct {
   priority int    // 使命的优先级
   f        func() // 使命的履行函数
}
func NewPriorityQueue() *PriorityQueue {
   pq := &PriorityQueue{
      queues:   make(map[int]chan *task),
      pushChan: make(chan *task, 100),
   }
   go pq.listenPushChan()
   return pq
}
func (pq *PriorityQueue) listenPushChan() {
   for {
      select {
      case taskEle := <-pq.pushChan:
         priority := taskEle.priority
         pq.mLock.Lock()
         if v, ok := pq.queues[priority]; ok {
            pq.mLock.Unlock()
            // 将推送的使命塞到对应优先级的行列中
            v <- taskEle
            continue
         }
         // 假如这是一个新的优先级,则需求刺进优先级切片,而且新建一个优先级的queue
         // 经过二分法寻觅新优先级的切片刺进方位
         index := pq.getNewPriorityInsertIndex(priority, 0, len(pq.priorities)-1)
         // index右侧元素均需求向后移动一个单位
         pq.moveNextPriorities(index, priority)
         // 创立一个新优先级行列
         pq.queues[priority] = make(chan *task, 10000)
         // 将使命塞到新的优先级行列中
         pq.queues[priority] <- taskEle
         pq.mLock.Unlock()
      }
   }
}
// 刺进work
func (pq *PriorityQueue) Push(f func(), priority int) {
   pq.pushChan <- &task{
      f:        f,
      priority: priority,
   }
}
// index右侧元素均需求向后移动一个单位
func (pq *PriorityQueue) moveNextPriorities(index, priority int) {
   pq.priorities = append(pq.priorities, 0)
   copy(pq.priorities[index+1:], pq.priorities[index:])
   pq.priorities[index] = priority
}
// 经过二分法寻觅新优先级的切片刺进方位
func (pq *PriorityQueue) getNewPriorityInsertIndex(priority int, leftIndex, rightIndex int) (index int) {
   if len(pq.priorities) == 0 {
      // 假如当时优先级切片没有元素,则刺进的index便是0
      return 0
   }
   length := rightIndex - leftIndex
   if pq.priorities[leftIndex] >= priority {
      // 假如当时切片中最小的元素都超越了刺进的优先级,则刺进方位应该是最左边
      return leftIndex
   }
   if pq.priorities[rightIndex] <= priority {
      // 假如当时切片中最大的元素都没超越刺进的优先级,则刺进方位应该是最右边
      return rightIndex + 1
   }
   if length == 1 && pq.priorities[leftIndex] < priority && pq.priorities[rightIndex] >= priority {
      // 假如刺进的优先级刚好在仅有的两个优先级之间,则中间的方位便是刺进方位
      return leftIndex + 1
   }
   middleVal := pq.priorities[leftIndex+length/2]
   // 这儿用二分法递归的方法,一向寻觅正确的刺进方位
   if priority <= middleVal {
      return pq.getNewPriorityInsertIndex(priority, leftIndex, leftIndex+length/2)
   } else {
      return pq.getNewPriorityInsertIndex(priority, leftIndex+length/2, rightIndex)
   }
}
// 取出最高优先级行列中的一个使命
func (pq *PriorityQueue) Pop() *task {
   pq.mLock.Lock()
   defer pq.mLock.Unlock()
   for i := len(pq.priorities) - 1; i >= 0; i-- {
      if len(pq.queues[pq.priorities[i]]) == 0 {
         // 假如当时优先级的行列没有使命,则看低一级优先级的行列中有没有使命
         continue
      }
      // 假如当时优先级的行列里有使命,则取出一个使命。
      return <-pq.queues[pq.priorities[i]]
   }
   // 假如一切行列都没有使命,则返回null
   return nil
}
// 顾客轮询获取最高优先级的使命
func (pq *PriorityQueue) Consume() {
   for {
      task := pq.Pop()
      if task == nil {
         // 未获取到使命,则继续轮询
         time.Sleep(time.Millisecond)
         continue
      }
      // 获取到了使命,就履行使命
      task.f()
   }
}

测验代码pq_test.go

package priority_queue
import (
   "fmt"
   "math/rand"
   "testing"
   "time"
)
func TestQueue(t *testing.T) {
   defer func() {
      if err := recover(); err != nil {
         fmt.Println(err)
      }
   }()
   pq := NewPriorityQueue()
   rand.Seed(time.Now().Unix())
   // 咱们在这儿,随机生成一些优先级使命
   for i := 0; i < 100; i++ {
      a := rand.Intn(10)
      go func(i int) {
         pq.Push(func() {
            fmt.Println("推送使命的编号为:", i)
            fmt.Println("推送的使命优先级为:", a)
            fmt.Println("============")
         }, a)
      }(i)
   }
   // 这儿会阻塞,顾客会轮询查询使命行列
   pq.Consume()
}

发散思想

上面的计划的确是完成了优先级行列,可是,有一种极点状况:假如顾客的消费速度远远小于生产者的生产速度,而且高优先级的使命被不断刺进,这样,低优先级的使命就会有“饿死”的风险。

关于这种状况,咱们在消费的时分,能够考虑给每一个优先级行列分配一个权重,高优先级的行列有更大的概率被消费,低优先级的概率相对较小。感兴趣的朋友们,能够自己去完成一下。

小结

本文和咱们评论了优先级行列在golang中的一种完成计划,里面使用到了切片、map、互斥锁、管道等许多golang特性,能够说是一个十分典型的案例。其实,优先级行列在实践的事务场景中运用广泛,其完成方法也不止一种,咱们需求依据实践的需求,选择最优解。