什么是heap
Heap 是一种数据结构,其中包括一个特殊的根节点,且每个节点的值都不小于(或不大于)其一切子节点的值。这种数据结构常用于完成优先行列。
Heap的数据结构
Heap 能够经过一个数组来完成,这个数组满意以下条件:
-
和二叉查找树不同,堆并不需求满意左子节点小于父节点的值,右子节点大于父节点的值的条件。
-
堆中的一些列节点依照某种特定的次序摆放。这样的次序能够是最小的元素在最前面,也能够是最大的元素在最前面。这个次序满意父节点必定小于(或大于)它的一切子节点。
-
堆中的元素数量纷歧定是满的,也就是说堆并纷歧定是一个彻底二叉树。
堆具有以下特点。
- 任何节点都小于(或大于)其一切后代,并且最小元素(或最大元素)坐落堆的根(堆有序性)。
- 堆始终是一棵完好的树。即各级节点都填充除底层以外的元素,并且底层尽可能从左到右填充。
彻底二叉树和满二叉树的差异如下所示。
根节点最大的堆称为最大堆或大根堆,根节点最小的堆称为最小堆或小根堆。
因为堆是彻底二叉树,因而它们能够表明为次序数组,如下所示。
怎么完成优先级行列
优先行列是一种数据结构,其中每个元素都有一个优先级,优先级高的元素在前面,优先级相一起依照刺进次序摆放。能够运用堆来完成优先行列。完成优先行列的关键是将一个元素添加到行列中,并保持行列中的元素有序。假如运用数组来存储元素,需求频频对数组进行调整,时刻复杂度是O(n),不行高效。假如运用堆来存储元素,则能够在刺进时进行堆化,时刻复杂度是O(nlogn)。
在堆中,节点的方位与它们在数组中的方位有必定的联系。例如,根节点坐落数组的第一个元素,其他节点顺次摆放。左子节点坐落(2i),右子节点坐落(2i+1),父节点坐落(i/2)。这个联系能够方便地完成在数组上进行堆化的操作。
为什么需求运用优先级行列
优先级行列是一种十分有用的数据结构,在许多运用中都会被广泛运用。比如作业调度、事情管理等范畴,都需求运用优先级行列来帮助处理使命以及事情等的优先级次序。
长处和缺陷
长处
-
简略高效:优先级行列的完成较为简略,查找和刺进等操作都能够在 O(log(n))O(log(n)) 的时刻复杂度内完成,所以在完成简略的情况下,能够极大进步程序性能。
-
优先级:优先级行列能够依据使命或者事情的优先级,对其依照优先级巨细进行排序,并在需求的时候顺次处理。
缺陷
-
空间占用:优先级行列需求占用额外的内存空间,以存储使命和事情的优先级信息。
-
使命时效性:当优先级较高的使命过多时,可能会导致低优先级使命的响应推迟,从而影响使命的时效性。
heap PriorityQueue完成
代码来自github.com/hashicorp/v…
package go_pool_priority
import (
"container/heap"
"errors"
"sync"
"github.com/mitchellh/copystructure"
)
// ErrEmpty is returned for queues with no items
var ErrEmpty = errors.New("queue is empty")
// ErrDuplicateItem is returned when the queue attmepts to push an item to a key that
// already exists. The queue does not attempt to update, instead returns this
// error. If an Item needs to be updated or replaced, pop the item first.
var ErrDuplicateItem = errors.New("duplicate item")
// New initializes the internal data structures and returns a new
// PriorityQueue
func NewPriorityQueue() *PriorityQueue {
pq := PriorityQueue{
data: make(queue, 0),
dataMap: make(map[string]*Item),
}
heap.Init(&pq.data)
return &pq
}
// PriorityQueue facilitates queue of Items, providing Push, Pop, and
// PopByKey convenience methods. The ordering (priority) is an int64 value
// with the smallest value is the highest priority. PriorityQueue maintains both
// an internal slice for the queue as well as a map of the same items with their
// keys as the index. This enables users to find specific items by key. The map
// must be kept in sync with the data slice.
// See https://golang.org/pkg/container/heap/#example__priorityQueue
type PriorityQueue struct {
// data is the internal structure that holds the queue, and is operated on by
// heap functions
data queue
// dataMap represents all the items in the queue, with unique indexes, used
// for finding specific items. dataMap is kept in sync with the data slice
dataMap map[string]*Item
// lock is a read/write mutex, and used to facilitate read/write locks on the
// data and dataMap fields
lock sync.RWMutex
}
// queue is the internal data structure used to satisfy heap.Interface. This
// prevents users from calling Pop and Push heap methods directly
type queue []*Item
// Item is something managed in the priority queue
type Item struct {
// Key is a unique string used to identify items in the internal data map
Key string
// Value is an unspecified type that implementations can use to store
// information
Value interface{}
// Priority determines ordering in the queue, with the lowest value being the
// highest priority
Priority int64
// index is an internal value used by the heap package, and should not be
// modified by any consumer of the priority queue
index int
}
// Len returns the count of items in the Priority Queue
func (pq *PriorityQueue) Len() int {
pq.lock.RLock()
defer pq.lock.RUnlock()
return pq.data.Len()
}
// Pop pops the highest priority item from the queue. This is a
// wrapper/convenience method that calls heap.Pop, so consumers do not need to
// invoke heap functions directly
func (pq *PriorityQueue) Pop() (*Item, error) {
pq.lock.Lock()
defer pq.lock.Unlock()
if pq.data.Len() == 0 {
return nil, ErrEmpty
}
item := heap.Pop(&pq.data).(*Item)
delete(pq.dataMap, item.Key)
return item, nil
}
// Push pushes an item on to the queue. This is a wrapper/convenience
// method that calls heap.Push, so consumers do not need to invoke heap
// functions directly. Items must have unique Keys, and Items in the queue
// cannot be updated. To modify an Item, users must first remove it and re-push
// it after modifications
func (pq *PriorityQueue) Push(i *Item) error {
if i == nil || i.Key == "" {
return errors.New("error adding item: Item Key is required")
}
pq.lock.Lock()
defer pq.lock.Unlock()
if _, ok := pq.dataMap[i.Key]; ok {
return ErrDuplicateItem
}
// Copy the item value(s) so that modifications to the source item does not
// affect the item on the queue
clone, err := copystructure.Copy(i)
if err != nil {
return err
}
pq.dataMap[i.Key] = clone.(*Item)
heap.Push(&pq.data, clone)
return nil
}
// PopByKey searches the queue for an item with the given key and removes it
// from the queue if found. Returns nil if not found. This method must fix the
// queue after removing any key.
func (pq *PriorityQueue) PopByKey(key string) (*Item, error) {
pq.lock.Lock()
defer pq.lock.Unlock()
item, ok := pq.dataMap[key]
if !ok {
return nil, nil
}
// Remove the item the heap and delete it from the dataMap
itemRaw := heap.Remove(&pq.data, item.index)
delete(pq.dataMap, key)
if itemRaw != nil {
if i, ok := itemRaw.(*Item); ok {
return i, nil
}
}
return nil, nil
}
// Len returns the number of items in the queue data structure. Do not use this
// method directly on the queue, use PriorityQueue.Len() instead.
func (q queue) Len() int { return len(q) }
// Less returns whether the Item with index i should sort before the Item with
// index j in the queue. This method is used by the queue to determine priority
// internally; the Item with the lower value wins. (priority zero is higher
// priority than 1). The priority of Items with equal values is undetermined.
func (q queue) Less(i, j int) bool {
return q[i].Priority < q[j].Priority
}
// Swap swaps things in-place; part of sort.Interface
func (q queue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
q[i].index = i
q[j].index = j
}
// Push is used by heap.Interface to push items onto the heap. This method is
// invoked by container/heap, and should not be used directly.
// See: https://golang.org/pkg/container/heap/#Interface
func (q *queue) Push(x interface{}) {
n := len(*q)
item := x.(*Item)
item.index = n
*q = append(*q, item)
}
// Pop is used by heap.Interface to pop items off of the heap. This method is
// invoked by container/heap, and should not be used directly.
// See: https://golang.org/pkg/container/heap/#Interface
func (q *queue) Pop() interface{} {
old := *q
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*q = old[0 : n-1]
return item
}
– 内部运用container/heap中的Interface接口完成堆结构;
– 提供了Push、Pop和PopByKey等一系列方法;
– 运用一个内部slice和一个以Key为索引的映射map来保护行列元素;
– 依据元素的Priority值进行优先级排序,Priority值越小表明优先级越高;
– 在Push时需求保证Key值仅有;
– PopByKey方法能够依据Key查找并移除对应的元素。
完成思路
既然,咱们了解了heap的一些特性,那么咱们接下来就要考虑怎么用现有的数据结构,完成优先行列。
咱们都知道,无论是哪一种行列,必然是存在生产者和顾客两个部分,关于优先级行列来说,更是如此。因而,咱们的完成思路,也将从这两个部分来谈。
生产者
关于生产者来说,他只需求推送一个使命及其优先级过来,咱们就得依据优先级处理他的使命。
因为,咱们不大好判别,究竟会有多少种不同的优先级传过来,也无法确定,每种优先级下有多少个使命要处理,所以,咱们能够直接运用heap存储task
顾客
关于顾客来说,他需求获取优先级最高的使命进行消费。运用heap pop 取出优先级最高的使命即可
数据结构
(1)优先级行列目标
type PriorityQueueTask struct {
mLock sync.Mutex // 互斥锁,queues和priorities并发操作时运用,当然针对当前读多写少的场景,也能够运用读写锁
pushChan chan *task // 推送使命管道
pq *PriorityQueue
}
(2)使命目标
type task struct {
priority int64 // 使命的优先级
value interface{}
key string
}
初始化优先级行列目标
在初始化目标时,需求先经过 NewPriorityQueue() 函数创立一个空的 PriorityQueue,然后再创立一个 PriorityQueueTask 目标,并将刚刚创立的 PriorityQueue 赋值给该目标的 pq 特点。一起,还要创立一个用于接收推送使命的管道,用于在生产者推送使命时,将新使命添加到行列中。
func NewPriorityQueueTask() *PriorityQueueTask {
pq := &PriorityQueueTask{
pushChan: make(chan *task, 100),
pq: NewPriorityQueue(),
}
// 监听pushChan
go pq.listenPushChan()
return pq
}
func (pq *PriorityQueueTask) listenPushChan() {
for {
select {
case taskEle := <-pq.pushChan:
pq.mLock.Lock()
pq.pq.Push(&Item{Key: taskEle.key, Priority: taskEle.priority, Value: taskEle.value})
pq.mLock.Unlock()
}
}
}
生产者推送使命
生产者向推送使命管道中推送新使命时,实际上是将一个 task 结构体实例发送到了管道中。在 task 结构体中,priority 特点表明这个使命的优先级,value 特点表明这个使命的值,key 特点表明这个使命的键。
// 刺进work
func (pq *PriorityQueueTask) Push(priority int64, value interface{}, key string) {
pq.pushChan <- &task{
value: value,
priority: priority,
key: key,
}
}
顾客消费行列
顾客从行列中取出一个使命,然后进行相应的操作。在这段代码中,顾客轮询获取最高优先级的使命。假如没有获取到使命,则持续轮询;假如获取到了使命,则执行对应的操作。在这里,执行操作的具体形式是打印使命的编号、优先级等信息。
// Consume 顾客轮询获取最高优先级的使命
func (pq *PriorityQueueTask) Consume() {
for {
task := pq.Pop()
if task == nil {
// 未获取到使命,则持续轮询
time.Sleep(time.Millisecond)
continue
}
// 获取到了使命,就执行使命
fmt.Println("推送使命的编号为:", task.Value)
fmt.Println("推送的使命优先级为:", task.Priority)
fmt.Println("============")
}
}
完好代码
package go_pool_priority
import (
"fmt"
"sync"
"time"
)
type PriorityQueueTask struct {
mLock sync.Mutex // 互斥锁,queues和priorities并发操作时运用,当然针对当前读多写少的场景,也能够运用读写锁
pushChan chan *task // 推送使命管道
pq *PriorityQueue
}
type task struct {
priority int64 // 使命的优先级
value interface{}
key string
}
func NewPriorityQueueTask() *PriorityQueueTask {
pq := &PriorityQueueTask{
pushChan: make(chan *task, 100),
pq: NewPriorityQueue(),
}
// 监听pushChan
go pq.listenPushChan()
return pq
}
func (pq *PriorityQueueTask) listenPushChan() {
for {
select {
case taskEle := <-pq.pushChan:
pq.mLock.Lock()
pq.pq.Push(&Item{Key: taskEle.key, Priority: taskEle.priority, Value: taskEle.value})
pq.mLock.Unlock()
}
}
}
// 刺进work
func (pq *PriorityQueueTask) Push(priority int64, value interface{}, key string) {
pq.pushChan <- &task{
value: value,
priority: priority,
key: key,
}
}
// Pop 取出最高优先级行列中的一个使命
func (pq *PriorityQueueTask) Pop() *Item {
pq.mLock.Lock()
defer pq.mLock.Unlock()
item, err := pq.pq.Pop()
if err != nil {
return nil
}
// 假如一切行列都没有使命,则回来null
return item
}
// Consume 顾客轮询获取最高优先级的使命
func (pq *PriorityQueueTask) Consume() {
for {
task := pq.Pop()
if task == nil {
// 未获取到使命,则持续轮询
time.Sleep(time.Millisecond)
continue
}
// 获取到了使命,就执行使命
fmt.Println("推送使命的编号为:", task.Value)
fmt.Println("推送的使命优先级为:", task.Priority)
fmt.Println("============")
}
}
测试用例
func TestQueue(t *testing.T) {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
pq := NewPriorityQueueTask()
// 咱们在这里,随机生成一些优先级使命
for i := 0; i < 100; i++ {
a := rand.Intn(1000)
go func(a int64) {
pq.Push(a, a, strconv.Itoa(int(a)))
}(int64(a))
}
// 这里会阻塞,顾客会轮询查询使命行列
pq.Consume()
}