最近在倒腾自建博客后端体系,需要用到延时使命的功能,但手头只要一套MySQL和Redis,假如搞一套MQ成本有点大,所以想着用redis完成延时音讯行列。有些场景用数据库的守时扫表也能简略完成延时音讯的功能,不过关于我这边的实践场景(比方计数体系)其实是把数据存到redis中,假如用数据库完成延时音讯会对数据库有比较大的压力。

体系规划

这里参考了有赞的推迟行列规划

基于Redis实现高性能延时消息队列

数据结构规划

事情音讯体

type EventEntity struct {
	EventId    int64
	Topic      string
	Body       string
	EffectTime time.Time
}
  • EVENT_POOL: 运用redis的hash,里边存储了使命事情的完整信息,key=prefix+namespace+topic,field=EventId, val=EventEntity;
  • EVENT_BUCKET: 运用redis的zset,里边存储了使命事情的有序调集,key=prefix+namespace+topic,score=EffectTime, member=EventId;
  • EVENT_QUEUE: 运用redis的list, list中存储了到期待消费使命的EventId。

推迟行列的执行流程

基于Redis实现高性能延时消息队列

1、当有新增延时使命过来时,会在EVENT_POOL对应的topic中增加一条记载,一起也会把使命增加到EVENT_BUCKET中,按收效时刻排序;

2、转移线程会守时扫描EVENT_BUCKET中已经到期的使命,将这些使命push到EVENT_QUEUE对应topic的行列当中,之后将这些使命从EVENT_BUCKET中删去;

3、EVENT_QUEUE每个topic会有一个监听线程,当发现当时topic行列中有待消费的使命,则会将使命pop出来,并从EVENT_POOL中查询使命详情,交给consumer消费。

代码完成

核心代码

发布延时使命
func (q *DelayQueue) PublishEvent(ctx context.Context, event *EventEntity) error {
	pipeline := q.redisClient.WithContext(ctx).Pipeline()
	defer pipeline.Close()
    // 向EVENT_POOL中增加使命
	pipeline.HSet(q.genPoolKey(event.Topic), strconv.FormatInt(event.EventId, 10), util.ToJsonString(event))
	// 将使命id增加到EVENT_BUCKET中,按收效时刻排序
	pipeline.ZAdd(q.genBucketKey(event.Topic), redis.Z{
		Member: strconv.FormatInt(event.EventId, 10),
		Score:  float64(event.EffectTime.Unix()),
	})
	_, err := pipeline.Exec()
	if err != nil {
		logs.CtxWarn(ctx, "pipeline.Exec", logs.String("err", err.Error()))
		return err
	}
	return nil
}
转移线程扫描到期使命
func (q *DelayQueue) carryEventToQueue(topic string) error {
	ctx := context.Background()
	// 扫描zset中到期的使命
	members, err := q.redisClient.WithContext(ctx).ZRangeByScoreWithScores(q.genBucketKey(topic), redis.ZRangeBy{Min: "0", Max: util.ToString(time.Now().Unix())}).Result()
	if err != nil && err != redis.Nil {
		logs.CtxWarn(ctx, "[carryEventToQueue] ZRangeByScoreWithScores", logs.String("err", err.Error()))
		return err
	}
	if len(members) == 0 {
		return nil
	}
	errMap := make(map[string]error)
	// 将使命增加到对应topic的待消费行列里
	for _, m := range members {
		eventId := m.Member.(string)
		err = q.redisClient.WithContext(ctx).LPush(q.genQueueKey(topic), eventId).Err()
		if err != nil {
			logs.CtxWarn(ctx, "[carryEventToQueue] LPush", logs.String("err", err.Error()))
			errMap[eventId] = err
		}
	}
	// 从Bucket中删去已进入待消费行列的事情
	var doneMembers []interface{}
	for _, m := range members {
		eventId := m.Member.(string)
		if _, ok := errMap[eventId]; !ok {
			doneMembers = append(doneMembers, eventId)
		}
	}
	if len(doneMembers) == 0 {
		return nil
	}
	err = q.redisClient.WithContext(ctx).ZRem(q.genBucketKey(topic), doneMembers...).Err()
	if err != nil {
		logs.CtxWarn(ctx, "[carryEventToQueue] ZRem", logs.String("err", err.Error()))
	}
	return nil
}
监听线程消费使命

这里运用了List的BLPop指令,当有数据时会立即回来,没有数据则会一直堵塞直到有数据过来;这样可以避免守时扫描列表浪费资源。

func (q *DelayQueue) runConsumer(topic string, subscriberList []IEventSubscriber) error {
	for {
		ctx := context.Background()
		kvPair, err := q.redisClient.WithContext(ctx).BLPop(60*time.Second, q.genQueueKey(topic)).Result()
		if err != nil {
			logs.CtxWarn(ctx, "[InitOnce] BLPop", logs.String("err", err.Error()))
			continue
		}
		if len(kvPair) < 2 {
			continue
		}
		eventId := kvPair[1]
		data, err := q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
		if err != nil && err != redis.Nil {
			logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
			if q.persistFn != nil {
				_ = q.persistFn(&EventEntity{
					EventId: util.String2Int64(eventId),
					Topic:   topic,
				})
			}
			continue
		}
		event := &EventEntity{}
		_ = jsoniter.UnmarshalFromString(data, event)
		for _, subscriber := range subscriberList {
			util.Retry(3, 0, func() (success bool) {
				err = subscriber.Handle(ctx, event)
				if err != nil {
					logs.CtxWarn(ctx, "[InitOnce] subscriber.Handle", logs.String("err", err.Error()))
					return false
				}
				return true
			})
		}
		err = q.redisClient.WithContext(ctx).HDel(q.genPoolKey(topic), eventId).Err()
		if err != nil {
			logs.CtxWarn(ctx, "[InitOnce] HDel", logs.String("err", err.Error()))
		}
	}
}
其他

1、高雅封闭

DelayQueue目标中运用wg、isRunning、stop三个变量来完成高雅封闭,详细可参考源码。

type DelayQueue struct {
	namespace   string
	redisClient *redis.Client
	once        sync.Once
	wg          sync.WaitGroup
	isRunning   int32
	stop        chan struct{}
	persistFn   PersistFn
}
// gracefully shudown
func (q *DelayQueue) ShutDown() {
	if !atomic.CompareAndSwapInt32(&q.isRunning, 1, 0) {
		return
	}
	close(q.stop)
	q.wg.Wait()
}

2、消费失败后耐久化使命

可为DelayQueue目标设置耐久化办法persistFn,用来在监听线程消费使命失败时将使命id耐久化以便人工处理。

...
q.redisClient.WithContext(ctx).HGet(q.genPoolKey(topic), eventId).Result()
if err != nil && err != redis.Nil {
	logs.CtxWarn(ctx, "[InitOnce] HGet", logs.String("err", err.Error()))
	if q.persistFn != nil {
		_ = q.persistFn(&EventEntity{
			EventId: util.String2Int64(eventId),
			Topic:   topic,
		})
	}
	continue
}
...

源码地址

redis_delay_queue: github.com/hudingyu/re…