作者: 京东物流 刘丽侠 姚再毅 康睿 刘斌 李振
一、Redis的特性
1.1 Redis为什么快?
- 依据内存操作,操作不需求跟磁盘交互,单次履行很快
- 指令履行是单线程,由所以依据内存操作,单次履行的时刻快于线程切换时刻,一起通讯选用多路复用
- Redis本身便是一个k-v结构,相似于hashMap,所以查询功能接近于O(1)
- 一起redis自己底层数据结构支撑,比方跳表、SDS
- lO多路复用,单个线程中经过记载盯梢每一个sock(I/O流)的状况来管理多个I/O流
1.2 Redis其他特性
- 更丰富的数据类型,虽然都是k、v结构,value能够存储很多的数据类型
- 完善的内存管理机制、确保数据一致性:耐久化机制、过期战略
- 支撑多种编程言语
- 高可用,集群、确保高可用
1.3 Redis高可用
- 很完善的内存管理机制,过期、筛选、耐久化
- 集群模式,主从、岗兵、cluster集群
二、Redis数据类型以及运用场景
Redis的数据类型有String、Hash、Set、List、Zset、bitMap(依据String类型)、 Hyperloglog(依据String类型)、Geo(地理方位)、Streams流。
2.1 String
2.1.1 根本指令
// 批量设置
> mset key1 value1 key2 value2
// 批量获取
> mget key1 key2
// 获取长度
> strlen key
// 字符串追加内容
> append key xxx
// 获取指定区间的字符
> getrange key 0 5
// 整数值递加 (递加指定的值)
> incr intkey (incrby intkey 10)
// 当key 存在时将掩盖
> SETEX key seconds value
// 将 key 的值设为 value ,当且仅当 key 不存在。
> SETNX key value
2.1.2 运用场景
- 缓存相关场景 缓存、 token(跟过期属性完美符合)
- 线程安全的计数场景 (软限流、分布式ID等)
2.2 Hash
2.2.1 根本指令
// 将哈希表 key 中的域 field 的值设为 value 。
> HSET key field value
// 回来哈希表 key 中给定域 field 的值。
> HGET key field
// 回来哈希表 key 中的一切域。
> HKEYS key
// 回来哈希表 key 中一切域的值。
> HVALS key
// 为哈希表 key 中的域 field 的值加上增量 increment 。
> HINCRBY key field increment
// 查看哈希表 key 中,给定域 field 是否存在。
> HEXISTS key field
2.2.2 运用场景
- 存储目标类的数据(官网说的)比方一个目标下有多个字段
- 统计类的数据,能够对单个统计数据进行独自操作
2.3 List
存储有序的字符串列表,元素能够重复。列表的最大长度为 2^32 – 1 个元素(4294967295,每个列表超越 40 亿个元素)。
2.3.1 根本指令
// 将一个或多个值 value 刺进到列表 key 的表头
> LPUSH key value [value ...]
// 将一个或多个值 value 刺进到列表 key 的表尾(最右边)。
> RPUSH key value [value ...]
// 移除并回来列表 key 的头元素。
> LPOP key
// 移除并回来列表 key 的尾元素。
> RPOP key
// BLPOP 是列表的堵塞式(blocking)弹出原语。
> BLPOP key [key ...] timeout
// BRPOP 是列表的堵塞式(blocking)弹出原语。
> BRPOP key [key ...] timeout
// 获取点拨方位元素
> LINDEX key index
2.3.2 运用场景
- 用户消息时刻线
由于list是有序的,所以咱们能够先进先出,先进后出的列表都能够做
2.4 Set
2.4.1 根本指令
// 将一个或多个 member 元素加入到调集 key 当中,现已存在于调集的 member 元素将被疏忽。
> SADD key member [member ...]
// 回来调集 key 中的一切成员。
> SMEMBERS key
// 回来调集 key 的基数(调集中元素的数量)。
> SCARD key
// 假设指令履行时,只供给了 key 参数,那么回来调集中的一个随机元素。
> SRANDMEMBER key [count]
// 移除并回来调集中的一个随机元素。
> SPOP key
// 移除调集 key 中的一个或多个 member 元素,不存在的 member 元素会被疏忽。
> SREM key member [member ...]
// 判别 member 元素是否调集 key 的成员。
> SISMEMBER key member
// 获取前一个调集有而后边1个调集没有的
> sdiff huihuiset huihuiset1
// 获取交集
> sinter huihuiset huihuiset1
// 获取并集
> sunion huihuiset huihuiset1
2.4.2 运用场景
- 抽奖 spop跟srandmember随机弹出或许获取元素
- 点赞、签到等,sadd调集存储
- 交集并集 关注等场景
2.5 ZSet(SortedSet)
sorted set,有序的set,每个元素有个score。
score相一起,依照key的ASCII码排序。
2.5.1 根本指令
//将一个或多个 member 元素及其 score 值加入到有序集 key 当中。
> ZADD key score member [[score member] [score member] ...]
// 回来有序集 key 中,指定区间内的成员。其间成员的方位按 score 值递加(从小到大)来排序
> ZRANGE key start stop [WITHSCORES]
// 回来有序集 key 中,指定区间内的成员。其间成员的方位按 score 值递减(从大到小)来排列。
> ZREVRANGE key start stop [WITHSCORES]
// 回来有序集 key 中,一切 score 值介于 min 和 max 之间(包含等于 min 或 max )的成员。有序集成员按 score 值递加(从小到大)次序排列。
> ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
// 移除有序集 key 中的一个或多个成员,不存在的成员将被疏忽。
> ZREM key member [member ...]
// 回来有序集 key 的基数。
> ZCARD key
// 为有序集 key 的成员 member 的 score 值加上增量 increment 。
> ZINCRBY key increment member
// 回来有序集 key 中, score 值在 min 和 max 之间(默许包含 score 值等于 min 或 max )的成员的数量。
> ZCOUNT key min max
// 回来有序集 key 中成员 member 的排名。其间有序集成员按 score 值递加(从小到大)次序排列。
> ZRANK key member
2.5.2 运用场景
- 排行榜
三、Redis的业务
3.1 业务根本操作
// 1. multi指令敞开业务,exec指令提交业务
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) OK
// 2. 组队的过程中能够经过discard来抛弃组队。
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> set k4 v4
QUEUED
127.0.0.1:6379(TX)> discard
OK
3.2 Redis业务特性
- 独自的阻隔操作
业务中的一切指令都会序列化、按次序地履行。
业务在履行过程中,不会被其他客户端发送来的指令恳求所打断。
- 没有阻隔级别的概念
队列中的指令没有提交之前,都不会被实际地履行,由于业务提交之前任何指令都不会被实际履行,也就不存在“业务内的查询要看到业务里的更新,在业务外查询不能看到”这个让人万分头疼的问题。
- 不确保原子性
Redis同一个业务中假设有一条指令履行失利,其后的指令仍然会被履行,没有回滚。
四、Redis 分布式锁
4.1 Lua 完结分布式锁
local lockSet = redis.call('exists', KEYS[1])
if lockSet == 0
then
redis.call('set', KEYS[1], ARG[1])
redis.call('expire', KEYS[1], ARG[2])
end
return lockSet
五、Redis总体数据结构
5.1 Redis dict字典
5.1.1 RedisDb数据接口(server.h文件)
数据最外层的结构为RedisDb
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */ //数据库的键
dict *expires; /* Timeout of keys with a timeout set */ //设置了超时时刻的键
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ //客户端等候的keys
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */ //所在数 据库ID
long long avg_ttl; /* Average TTL, just for tats */ //平均TTL,仅用于统计
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
5.1.2 dict数据结构(dict.h文件)
咱们发现数据存储在dict中
typedef struct dict {
dictType *type; //理解为面向目标思维,为支撑不同的数据类型对应dictType抽象办法,不同的数据类型能够不同完结
void *privdata; //也可不同的数据类型相关,不同类型特定函数的可选参数。
dictht ht[2]; //2个hash表,用来数据存储 2个dictht
long rehashidx; /* rehashing not in progress if rehashidx == -1 */ // rehash符号 -1代表不再rehash
unsigned long iterators; /* number of iterators currently running */
} dict;
5.1.3 dictht结构(dict.h文件)
typedef struct dictht {
dictEntry **table; //dictEntry 数组
unsigned long size; //数组巨细 默许为4 #define DICT_HT_INITIAL_SIZE 4
unsigned long sizemask; //size-1 用来取模得到数据的下标值
unsigned long used; //改hash表中已有的节点数据
} dictht;
5.1.4 dictEntry节点结构(dict.h文件)
typedef struct dictEntry {
void *key; //key
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v; //value
struct dictEntry *next; //指向下一个节点
} dictEntry;
5.1.5 RedisObject(service.h文件)
/* Objects encoding. Some kind of objects like Strings and
Hashes can be
* internally represented in multiple ways. The 'encoding'
field of the object
* is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0 /* Raw representation */
#define OBJ_ENCODING_INT 1 /* Encoded as integer */
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old
list encoding. */
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string
encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list
of ziplists */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree
of listpacks */
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of
obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution
in ms */
#define OBJ_SHARED_REFCOUNT INT_MAX /* Global object
never destroyed. */
#define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object
allocated in the stack. */
#define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
typedef struct redisObject {
unsigned type:4; //数据类型 string hash list
unsigned encoding:4; //底层的数据结构 跳表
unsigned lru:LRU_BITS; /* LRU time (relative to global
lru_clock) or
* LFU data (least significant
8 bits frequency
* and most significant 16 bits
access time). */
int refcount; //用于回收,引用计数法
void *ptr; //指向详细的数据结构的内存地址 比方 跳表、紧缩列表
} robj;
5.2 Redis数据结构图
5.3 Redis扩容机制
由于咱们的dictEntry数组默许巨细是4,假设不进行扩容,那么咱们一切的数据都会以链表的方式增加至数组下标 跟着数据量越来越大,之前只需求hash取模就能得到下标方位,现在得去循环我下标的链表,所以功能会越来越慢,当咱们的数据量抵达必定程度后,咱们就得去触发扩容操作。
5.3.1 什么时分扩容
5.3.1.1 扩容机制
- 当没有fork子进程在进行RDB或AOF耐久化时,ht[0]的used大于size时,触发扩容
- 假设有子进程在进行RDB或许AOF时,ht[0]的used大于等于size的5倍的时分,会触发扩容
5.3.1.2 源码验证
扩容,必定是在增加数据的时分才会扩容,所以咱们找一个增加数据的进口。
- 进口,增加或替换dictReplace (dict.c文件)
int dictReplace(dict *d, void *key, void *val) {
dictEntry *entry, *existing, auxentry;
/* Try to add the element. If the key
* does not exists dictAdd will succeed. */
entry = dictAddRaw(d,key,&existing);
if (entry) {
dictSetVal(d, entry, val);
return 1;
}
/* Set the new value and free the old one. Note that
it is important
* to do that in this order, as the value may just be
exactly the same
* as the previous one. In this context, think to
reference counting,
* you want to increment (set), and then decrement
(free), and not the
* reverse. */
auxentry = *existing;
dictSetVal(d, existing, val);
dictFreeVal(d, &auxentry);
return 0;
}
- 进入dictAddRaw办法 (dict.c文件)
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing){
long index;
dictEntry *entry;
dictht *ht;
if (dictIsRehashing(d)) _dictRehashStep(d); //假设正在Rehash 进行渐进式hash 按步rehash
/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key),existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that
in a database
* system it is more likely that recently added
entries are accessed
* more frequently. */
//假设在hash 放在ht[1] 否则放在ht[0]
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
//选用头插法刺进hash桶
entry->next = ht->table[index];
ht->table[index] = entry;
//已运用++
ht->used++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}
- 得到数据下标的办法 _dictKeyIndex (dict.c文件)
static long _dictKeyIndex(dict *d, const void *key,uint64_t hash, dictEntry **existing){
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
/* Expand the hash table if needed */
//扩容假设需求
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
//比较是否存在这个值
for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain
the given key */
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key,he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
return idx;
}
- _dictExpandIfNeeded 验证是否需求扩容 (dict.c文件)
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d){
/* Incremental rehashing already in progress. Return.*/
if (dictIsRehashing(d)) return DICT_OK; //正在rehash,回来
/* If the hash table is empty expand it to the initialsize. */
//假设ht[0]的长度为0,设置默许长度4 dictExpand为扩容的要害办法
if (d->ht[0].size == 0)
return dictExpand(d,DICT_HT_INITIAL_SIZE);
//扩容条件 hash表中已运用的数量大于等于 hash表的巨细
//而且dict_can_resize为1 或许 已运用的巨细大于hash表巨细的 5倍,大于等于1倍的时分,下面2个满意一个条件即可
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize || d->ht[0].used/d->ht[0].size >dict_force_resize_ratio)){
//扩容成原本的2倍
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
5.3.2 怎样扩容
5.3.2.1 扩容过程
- 当满意我扩容条件,触发扩容时,判别是否在扩容,假设在扩容,或许 扩容的巨细跟我现在的ht[0].size相同,这次扩容不做
- new一个新的dictht,巨细为ht[0].used * 2(可是必须向上2的幂,比 如6 ,那么巨细为8) ,而且ht[1]=新创建的dictht
- 咱们有个更大的table了,可是需求把数据搬迁到ht[1].table ,所以将 dict的rehashidx(数据搬迁的偏移量)赋值为0 ,代表能够进行数据迁 移了,也便是能够rehash了
- 等候数据搬迁完结,数据不会立刻搬迁,而是选用渐进式rehash,渐渐的把数据搬迁到ht[1]
- 当数据搬迁完结,ht[0].table=ht[1] ,ht[1] .table = NULL、ht[1] .size = 0、ht[1] .sizemask = 0、 ht[1] .used = 0
- 把dict的rehashidex=-1
5.3.2.2 源码验证
- dictExpand办法在_dictExpandIfNeeded 办法中调用 (dict.c文件),为扩容的要害办法
int dictExpand(dict *d, unsigned long size){
/* the size is invalid if it is smaller than the
number of
* elements already inside the hash table */
//正在扩容,不允许第二次扩容,或许ht[0]的数据量大于扩容后的数量,没有意义,这次不扩容
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
dictht n; /* the new hash table */
//得到最接近2的幂 跟hashMap思维相同
unsigned long realsize = _dictNextPower(size);
/* Rehashing to the same table size is not useful. */
//假设跟ht[0]的巨细一致 直接回来错误
if (realsize == d->ht[0].size) return DICT_ERR;
/* Allocate the new hash table and initialize all
pointers to NULL */
//为新的dictht赋值
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* Is this the first initialization? If so it's not
really a rehashing
* we just set the first hash table so that it can
accept keys. */
//ht[0].table为空,代表是初始化
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
//将扩容后的dictht赋值给ht[1]
d->ht[1] = n;
//符号为0,代表能够开始rehash
d->rehashidx = 0;
return DICT_OK;
}
5.3.3 数据怎样搬迁(渐进式hash)
5.3.3.1 搬迁方式
假设一次性把数据悉数搬迁会很耗时刻,会让单条指令等候好久,会构成堵塞。
所以,redis选用渐进式Rehash,所谓渐进式,便是渐渐的,不会一次性把一切数据搬迁。
那什么时分会进行渐进式数据搬迁?
- 每次进行key的crud操作都会进行一个hash桶的数据搬迁
- 守时使命,进行部分数据搬迁
5.3.3.2 源码验证
CRUD触发都会触发_dictRehashStep(渐进式hash)
- 每次增修正的时分都会调用_dictRehashStep办法
if (dictIsRehashing(d)) _dictRehashStep(d); //这个代码在增修正查的办法中都会调用
- _dictRehashStep办法
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}
- dictRehash办法
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty
buckets to visit. */ //要拜访的最大的空桶数 避免此次耗时过长
if (!dictIsRehashing(d)) return 0; //假设没有在rehash回来0
//循环,最多1次,而且ht[0]的数据没有搬迁完 进入循环
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are
sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
//rehashidx rehash的索引,默许0 假设hash桶为空 进入自旋 而且判别是否到了最大的拜访空桶数
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx]; //得到ht[0]的下标为rehashidx桶
/* Move all the keys in this bucket from the old
to the new hash HT */
while(de) { //循环hash桶的链表 而且搬运到ht[1]
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d-
>ht[1].sizemask;
de->next = d->ht[1].table[h]; //头插
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
//搬运完后 将ht[0]相对的hash桶设置为null
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
//ht[0].used=0 代表rehash悉数完结
if (d->ht[0].used == 0) {
//清空ht[0]table
zfree(d->ht[0].table);
//将ht[0] 从头指向现已完结rehash的ht[1]
d->ht[0] = d->ht[1];
//将ht[1]一切数据从头设置
_dictReset(&d->ht[1]);
//设置-1,代表rehash完结
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
//将ht[1]的属性复位
static void _dictReset(dictht *ht)
{
ht->table = NULL;
ht->size = 0;
ht->sizemask = 0;
ht->used = 0;
}
守时使命触发
再讲守时使命之前,咱们要知道Redis中有个时刻工作驱动,在 server.c文件下有个serverCron 办法。
serverCron 每隔多久会履行一次,履行频率依据redis.conf中的hz装备,serverCorn中有个办法databasesCron()
- 守时办法serverCron
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//.......
/* We need to do a few operations on clients
asynchronously. */
clientsCron();
/* Handle background operations on Redis databases. */
databasesCron(); //处理Redis数据库上的后台操作。
//.......
}
- databasesCron办法
void databasesCron(void) {
/* Expire keys by random sampling. Not required for
slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmMaster()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
/* Defrag keys gradually. */
activeDefragCycle();
/* Perform hash tables rehashing if needed, but only
if there are no
* other processes saving the DB on disk. Otherwise
rehashing is bad
* as will cause a lot of copy-on-write of memory
pages. */
if (!hasActiveChildProcess()) {
/* We use global counters so if we stop the
computation at a given
* DB we'll be able to start from the successive
in the next
* cron loop iteration. */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
/* Don't test more DBs than we have. */
if (dbs_per_call > server.dbnum)
dbs_per_call = server.dbnum;
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
/* Rehash */ //Rehash逻辑
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
//进入incrementallyRehash
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop
here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash,
we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}
- 进入incrementallyRehash办法(server.c)
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1;
/* already used our millisecond for this
loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this
loop... */
}
return 0;
}
- 进入dictRehashMilliseconds(dict.c)
int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;
//进入rehash办法 dictRehash 去完结渐进时HASH
while(dictRehash(d,100)) {
rehashes += 100;
//判别是否超时
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
5.4 String类型数据结构
Redis中String的底层没有用c的char来完结,而是选用了SDS(simple Dynamic String)的数据结构来完结。而且供给了5种不同的类型
5.4.1 SDS数据结构界说(sds.h文件)
typedef char *sds;
/* Note: sdshdr5 is never used, we just access the flags
byte directly.
* However is here to document the layout of type 5 SDS
strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of
string length */
char buf[];
}
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used */ //已运用的长度
uint8_t alloc; /* excluding the header and null terminator */ //分配的总容量 不包含头和空终止符
unsigned char flags; /* 3 lsb of type, 5 unused bits */ //低三位类型 高5bit未运用
char buf[]; //数据buf 存储字符串数组
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len; /* used */
uint16_t alloc; /* excluding the header and null
terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits
*/
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null
terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits
*/
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null
terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits
*/
char buf[];
};
5.4.2 SDS数据结构的优势
- char字符串不记载本身长度,所以获取一个字符串长度的复杂度是O(n),可是SDS记载分配的长度alloc,已运用的长度len,获取长度为 O(1)
- 削减修正字符串带来的内存重分配次数
- 空间预分配,SDS长度假设小于1MB,预分配跟长度相同的,大于1M,每次跟len的巨细多1M
- 惰性空间开释,截取的时分不立刻开释空间,供下次运用!一起供给相应的开释SDS未运用空间的API
- 二进制安全
5.5 Hash类型数据结构
Redis的字典相当于Java言语中的HashMap,他是无序字典。内部结构上同样是数组 + 链表二维结构。榜首维hash的数组方位磕碰时,就会将磕碰的元素运用链表串起来。
5.5.1 Hash数据结构图
5.5.2 Hash的紧缩列表
紧缩列表会依据存入的数据的不同类型以及不同巨细,分配不同巨细的空间。所以是为了节约内存而选用的。
由所以一块完好的内存空间,当里边的元素发生变更时,会发生连锁更新,严重影响咱们的拜访功能。所以,只适用于数据量比较小的场景。
所以,Redis会有相关装备,Hashes只需小数据量的时分才会用到ziplist,当hash目标一起满意以下两个条件的时分,运用ziplist编码:
- 哈希目标保存的键值对数量<512个
- 一切的键值对的键和值的字符串长度都<64byte(一个英文字母一个字节)
hash-max-ziplist-value 64 // ziplist中最大能寄存的值长度
hash-max-ziplist-entries 512 // ziplist中最多能寄存的entry节点数量
5.6 List类型数据结构
5.6.1 quickList快速列表
兼顾了ziplist的节约内存,而且必定程度上处理了连锁更新的问题,咱们的 quicklistNode节点里边是个ziplist,每个节点是分开的。那么就算发生了连锁更新,也只会发生在一个quicklistNode节点。
quicklist.h文件
typedef struct{
struct quicklistNode *prev; //前指针
struct quicklistNode *next; //后指针
unsigned char *zl; //数据指针 指向ziplist结果
unsigned int sz; //ziplist巨细 /* ziplist
size in bytes */
unsigned int count : 16; /* count of items in ziplist */ //ziplist的元素
unsigned int encoding : 2; /* RAW==1 or LZF==2 */ //
是否紧缩, 1没有紧缩 2 lzf紧缩
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ //预留容器字段
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */ //预留字段
} quicklistNode;
5.6.2 list数据结构图
5.7 Set类型数据结构
Redis用intset或hashtable存储set。满意下面条件,就用inset存储。
- 假设不是整数类型,就用dictht hash表(数组+链表)
- 假设元素个数超越512个,也会用hashtable存储。跟一个装备有关:
set-max-intset-entries 512
不满意上述条件的,都运用hash表存储,value存null。
5.8 ZSet数据结构
5.8.1 ZipList
默许运用ziplist编码。
在ziplist的内部,依照score排序递加来存储。刺进的时分要移动之后的数据。
假设元素数量大于等于128个,或许任一member长度大于等于64字节运用 skiplist+dict存储。
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
假设不满意条件,选用跳表。
5.8.2 跳表(skiplist)
结构界说(servier.h)
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
sds ele; //sds数据
double score; //score
struct zskiplistNode *backward; //撤退指针
//层级数组
struct zskiplistLevel {
struct zskiplistNode *forward; //前进指针
unsigned long span; //跨度
} level[];
} zskiplistNode;
//跳表列表
typedef struct zskiplist {
struct zskiplistNode *header, *tail; //头尾节点
unsigned long length; //节点数量
int level; //最大的节点层级
} zskiplist;
zslInsert 增加节点办法 (t_zset.c)
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds
ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert
position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
咕泡出品 必属精品精品属精品 咕泡出品 必属精品 咕泡出品 必属精品 咕泡咕泡
(x->level[i].forward->score == score
&&
sdscmp(x->level[i].forward->ele,ele) <
0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since
we allow duplicated
* scores, reinserting the same element should never
happen since the
* caller of zslInsert() should test in the hash table
if the element is
* already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
//不同层级建立链表联络
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is
inserted here */
x->level[i].span = update[i]->level[i].span -
(rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) +
1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL :
update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}
ZSKIPLIST_MAXLEVEL默许32 界说在server.h
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64
elements */
六、Redis过期战略
6.1 惰性过期
所谓惰性过期,便是在每次拜访操作key的时分,判别这个key是不是过期了,过期了就删去。
6.1.1 源码验证
- expireIfNeeded办法(db.c文件)
int expireIfNeeded(redisDb *db, robj *key) {
if (!keyIsExpired(db,key)) return 0;
/* If we are running in the context of a slave,
instead of
* evicting the expired key from the database, we
return ASAP:
* the slave key expiration is controlled by the
master that will
* send us synthesized DEL operations for expired
keys.
*
* Still we try to return the right information to the
caller,
* that is, 0 if we think the key should be still
valid, 1 if
* we think the key is expired at this time. */
// 假设装备有masterhost,说明是从节点,那么不操作删去
if (server.masterhost != NULL) return 1;
/* Delete the key */
server.stat_expiredkeys++;
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
//是否是异步删去 避免单个Key的数据量很大 堵塞主线程 是4.0之后增加的新功能,默许封闭
int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key);
if (retval) signalModifiedKey(NULL,db,key);
return retval;
}
每次调用到相关指令时,才会履行expireIfNeeded判别是否过期,平时不会去判别是否过期。
该战略能够最大化的节约CPU资源。
可是却对内存十分不友爱。由于假设没有再次拜访,就或许一向堆积再内存中。占用内存
所以就需求另一种战略来合作运用,便是定期过期
6.2 定期过期
那么多久去清除一次,咱们在讲Rehash的时分,有个办法是serverCron,履行频率依据redis.conf中的hz装备。
这办法除了做Rehash以外,还会做很多其他的工作,比方:
- 整理数据库中的过期键值对
- 更新服务器的各类统计信息,比方时刻、内存占用、数据库占用情况等
- 封闭和整理连接失效的客户端
- 尝试进行耐久化操作
6.2.1 完结流程
- 守时serverCron办法去履行整理,履行频率依据redis.cong中的hz装备的值(默许是10,也便是1s履行10次,100ms履行一次)
- 履行整理的时分,不是去扫描一切的key,而是去扫描一切设置了过期时刻的key redisDB.expires
- 假设每次去把一切的key都拿出来,那么假设过期的key很多,就会很慢,所以也不是一次性拿出一切的key
- 依据hash桶的维度去扫描key,扫到20(可装备)个key为止。假设榜首个桶是15个key没有满意20,持续扫描第二个桶,第二个桶20个key,由所以以hash桶的维度扫描的,所以第二个扫到了就会全扫,一共扫描35个key
- 找到扫描的key里边过期的key,并进行删去
- 假设取了400个空桶(最多拿400个桶),或许扫描的删去比例跟扫描的总数超越10%,持续履行4、5步
- 也不能无限的循环,循环16次后回去检测时刻,超越指守时刻会跳出
6.2.2 源码验证
- 进口serverCrom
// serverCron办法调用databasesCron办法(server.c)
/* Handle background operations on Redis databases. */
databasesCron();
void databasesCron(void) {
/* Expire keys by random sampling. Not required for
slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmMaster()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
//过期删去办法
} else {
expireSlaveKeys();
}
}
}
七、Redis筛选战略
由于Redis内存是有巨细的,而且我或许里边的数据都没有过期,当快满的时分,我又没有过期的数据进行筛选,那么这个时分内存也会满。内存满了,Redis也会不能放入新的数据。所以,咱们不得已需求一些战略来处理这个问题来确保可用性。
7.1 筛选战略
7.1.1 noeviction
New values aren’t saved when memory limit is reached. When a database uses replication, this applies to the primary database.
默许,不筛选 能读不能写
7.1.2 allkeys-lru
Keeps most recently used keys; removes least recently used (LRU) keys
依据伪LRU算法,在一切的key中去筛选
7.1.3 allkeys-lfu
Keeps frequently used keys; removes least frequently used (LFU) keys.
依据伪LFU算法,在一切的key中去筛选
7.1.4 volatile-lru
Removes least recently used keys with the expire field set to true.
依据伪LRU算法,在设置了过期时刻的key中去筛选
7.1.5 volatile-lfu
Removes least frequently used keys with the expire field set to true.
依据伪LFU算法 在设置了过期时刻的key中去筛选
7.1.6 allkeys-random
Randomly removes keys to make space for the new data added.
依据随机算法,在一切的key中去筛选
7.1.7 volatile-random
Randomly removes keys with expire field set to true.
依据随机算法 在设置了过期时刻的key中去筛选
7.1.8 volatile-ttl
Removes least frequently used keys with expire field set to true and the shortest remaining time-to-live (TTL) value.
依据过期时刻来,筛选即将过期的
7.2 筛选流程
- 首要,咱们会有个筛选池,默许巨细是16,而且里边的数据是结尾筛选制
- 每次指令操作的时分,自旋会判别当时内存是否满意指令所需求的内存
- 假设当时不满意,会从筛选池的尾部拿一个最合适筛选的数据
- 会取样(装备 maxmemory-samples),从Redis中获取随机获取到取样的数据,处理一次性读取一切的数据慢的问题
- 在取样的数据中,依据筛选算法找到最合适筛选的数据
- 将最合适的那个数据跟筛选池中的数据进行比较,是否比筛选池的数据更合适筛选,假设更合适,放入筛选池
- 依照合适的程度进行排序,最合适筛选的放入尾部
- 将需求筛选的数据从Redis删去,而且从筛选池移除
7.3 LRU算法
Lru,Least Recently Used 翻译过来是最久未运用,依据时刻轴来走,仍好久没用的数据。只需最近有用过,我就默许是有用的。
那么它的一个衡量标准是啥?时刻!依据运用时刻,从近到远,越远的越简单筛选。
7.3.1 完结原理
需求:得到目标隔多久没拜访,隔的时刻越久,越简单被筛选
- 首要,LRU是依据这个目标的拜访操作时刻来进行筛选的,那么咱们需求知道这个目标最终操作的拜访时刻
- 知道了目标的最终操作拜访时刻后,咱们只需求跟当时系统时刻来进行比照,就能算出目标多久没拜访了
7.3.2 源码验证
redis中,目标都会被redisObject目标包装,其间有个字段叫lru。
redisObject目标 (server.h文件)
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global
lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
看LRU的字段的说明,那么咱们大概能猜出来,redis去完结lru筛选算法必定跟咱们这个目标的lru相关
而且这个字段的巨细为24bit,那么这个字段记载的是目标操作拜访时分的秒单位时刻的后24bit
相当于:
long timeMillis=System.currentTimeMillis();
System.out.println(timeMillis/1000); //获取当时秒
System.out.println(timeMillis/1000 & ((1<<24)-1)); //获取秒的后24位
咱们知道了这个目标的最终操作拜访的时刻。假设咱们要得到这个目标多久没拜访了,咱们是不是就很简单,用我当时的时刻-这个目标的拜访时刻就能够了!可是这个拜访时刻是秒单位时刻的后24bit 所以,也是用当时的时刻的秒单位的后24bit去减。
estimateObjectIdleTime办法(evict.c)
unsigned long long estimateObjectIdleTime(robj *o) {
//获取秒单位时刻的最终24位
unsigned long long lruclock = LRU_CLOCK();
//由于只需24位,一切最大的值为2的24次方-1
//超越最大值从0开始,所以需求判别lruclock(当时系统时刻)跟缓存目标的lru字段的巨细
if (lruclock >= o->lru) {
//假设lruclock>=robj.lru,回来lruclock-o->lru,再转换单位
return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
} else {
//否则选用lruclock + (LRU_CLOCK_MAX - o->lru),得到对
象的值越小,回来的值越大,越大越简单被筛选
return (lruclock + (LRU_CLOCK_MAX - o->lru)) *
LRU_CLOCK_RESOLUTION;
}
}
7.3.3 LRU总结
用lruclock与 redisObject.lru进行比较,由于lruclock只获取了当时秒单位时刻的后24位,所以必定有个轮询。
所以,咱们会用lruclock跟 redisObject.lru进行比较,假设lruclock>redisObject.lru,那么咱们用lruclock- redisObject.lru,否则lruclock+(24bit的最大值-redisObject.lru),得到lru越小,那么回来的数据越大,相差越大的越优先会被筛选!
www.processon.com/view/link/5…
7.4 LFU算法
LFU,英文 Least Frequently Used,翻译成中文便是最不常用的优先筛选。
不常用,它的衡量标准便是次数,次数越少的越简单被筛选。
这个完结起来应该也很简单,目标被操作拜访的时分,去记载次数,每次操作拜访一次,就+1;筛选的时分,直接去比较这个次数,次数越少的越简单筛选!
7.4.1 LFU的时效性问题
何为时效性问题?便是只会去考虑数量,可是不会去考虑时刻。
ps:上一年的某个新闻很火,点击量3000W,本年有个新闻刚出来点击量100次,原本咱们是想保留本年的新的新闻的,可是假设依据LFU来做的话,咱们发现筛选的是本年的新闻,明显是不合理的
导致的问题:新的数据进不去,旧的数据出不来。
那么怎么处理呢,且往下看
7.4.2 源码分析
咱们还是来看redisObject(server.h)
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global
lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
咱们看它的lru,它假设在LFU算法的时分!它前面16位代表的是时刻,后8位代表的是一个数值,frequency是频率,应该便是代表这个目标的拜访次数,咱们先给它叫做counter。
前16bits时刻有啥用?
跟时刻相关,我猜测应该是跟处理时效性相关的,那么怎样处理的?从生活中找例子
大家应该充过一些会员,比方我这个年岁的,小时分喜欢充腾讯的黄钻、绿钻、蓝钻等等。可是有一个点,假设哪天我没充钱了的话,或许没有续VIP的时分,我这个钻石等级会跟着时刻的丢掉而降低。比方我原本是黄钻V6,可是一年不充钱的话,或许就变成了V4。
那么回到Redis,大胆的去猜测,那么这个时刻是不是去记载这个目标有多久没拜访了,假设多久没拜访,我就去削减对应的次数。
LFUDecrAndReturn办法(evict.c)
unsigned long LFUDecrAndReturn(robj *o) {
//lru字段右移8位,得到前面16位的时刻
unsigned long ldt = o->lru >> 8;
//lru字段与255进行&运算(255代表8位的最大值),
//得到8位counter值
unsigned long counter = o->lru & 255;
//假设装备了lfu_decay_time,用LFUTimeElapsed(ldt) 除以装备的值
//LFUTimeElapsed(ldt)源码见下
//总的没拜访的分钟时刻/装备值,得到每分钟没拜访衰减多少
unsigned long num_periods = server.lfu_decay_time ?LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
if (num_periods)
//不能削减为负数,非负数用couter值减去衰减值
counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}
衰减因子由装备
lfu-decay-time 1 //多少分钟没操作拜访就去衰减一次
后8bits的次数,最大值是255,够不够?
必定不够,一个目标的拜访操作次数必定不止255次。
可是咱们能够让数据抵达255的很难。那么怎样做的?这个比较简单,咱们先看源码,然后再总结
LFULogIncr办法 (evict.c 文件)
uint8_t LFULogIncr(uint8_t counter) {
//假设现已到最大值255,回来255 ,8位的最大值
if (counter == 255) return 255;
//得到随机数(0-1)
double r = (double)rand()/RAND_MAX;
//LFU_INIT_VAL表明基数值(在server.h装备)
double baseval = counter - LFU_INIT_VAL;
//假设达不到基数值,表明快不行了,baseval =0
if (baseval < 0) baseval = 0;
//假设快不行了,必定给他加counter
//不然,依照几率是否加counter,一起跟baseval与lfu_log_factor相关
//都是在分子,所以2个值越大,加counter几率越小
double p = 1.0/(baseval*server.lfu_log_factor+1);
if (r < p) counter++;
return counter;
}
八、Redis 耐久化
8.1 RBD
8.1.1 何时触发RBD
主动触发
- 装备触发
save 900 1 900s查看一次,至少有1个key被修正就触发
save 300 10 300s查看一次,至少有10个key被修正就触发
save 60 10000 60s查看一次,至少有10000个key被修正
- shutdown正常封闭
任何组件在正常封闭的时分,都会去完结应该完结的事。比方Mysql 中的Redolog耐久化,正常封闭的时分也会去耐久化。
- flushall指令触发
数据清空指令会触发RDB操作,而且是触发一个空的RDB文件,所以, 假设在没有敞开其他的耐久化的时分,flushall是能够删库跑路的,在生产环境慎用。
8.1.2 RDB的优劣
优势
- 是个十分紧凑型的文件,十分合适备份与灾祸恢复
- 最大极限的提高了功能,会fork一个子进程,父进程永远不会产于磁盘 IO或许相似操作
- 更快的重启
缺乏
- 数据安全性不是很高,由所以依据装备的时刻来备份,假设每5分钟备份 一次,也会有5分钟数据的丢掉
- 经常fork子进程,所以比较耗CPU,对CPU不是很友爱
8.2 AOF
由于RDB的数据可靠性十分低,所以Redis又供给了别的一种耐久化方案: Append Only File 简称:AOF
AOF默许是封闭的,你能够在装备文件中进行敞开:
appendonly no //默许封闭,能够进行敞开
# The name of the append only file (default:"appendonly.aof")
appendfilename "appendonly.aof" //AOF文件名
追加文件,即每次更改的指令都会附加到我的AOF文件中。
8.2.1 同步机制
AOF会记载每个写操作,那么问题来了。我难道每次操作指令都要和磁盘交互?
当然不行,所以redis支撑几种战略,由用户来决议要不要每次都和磁盘交互
# appendfsync always 表明每次写入都履行fsync(改写)函数 功能会十分十分慢 可是十分安全
appendfsync everysec 每秒履行一次fsync函数 或许丢掉1s的数据
# appendfsync no 由操作系统确保数据同步到磁盘,速度最快 你的数据只需求交给操作系统就行
默许1s一次,最多有1s丢掉
8.2.2 重写机制
由于AOF是追加的方式,所以文件会越来越大,越大的话,数据加载越慢。 所以咱们需求对AOF文件进行重写。
何为重写
比方 咱们的incr指令,假设咱们incr了100次,现在数据是100,可是咱们的aof文件中会有100条incr指令,可是咱们发现这个100条指令用处不大, 假设咱们能把最新的内存里的数据保存下来的话。所以,重写便是做了这么一件工作,把当时内存的数据重写下来,然后把之前的追加的文件删去。
重写流程
- 在Redis7之前
- Redis fork一个子进程,在一个暂时文件中写入新的AOF(当时内存的数据生成的新的AOF)
- 那么在写入新的AOF的时分,主进程还会有指令进入,那么主进程会在内存缓存区中累计新的指令 (可是一起也会写在旧的AOF文件中,就算 重写失利,也不会导致AOF损坏或许数据丢掉)
- 假设子进程重写完结,父进程会收到完结信号,而且把内存缓存中的指令追加到新的AOF文件中
- 替换旧的AOF文件 ,而且将新的指令附加到重写好的AOF文件中
- 在Redis7之后,AOF文件不再是一个,所以会有暂时清单的概念
- 子进程开始在一个暂时文件中写入新的根底AOF
- 父级翻开一个新的增量 AOF 文件以持续写入更新。假设重写失利,旧的根底和增量文件(假设有的话)加上这个新翻开的增量文件就代表了完好的更新数据集,所以咱们是安全的
- 当子进程完结根底文件的重写后,父进程收到一个信号,并运用新翻开 的增量文件和子进程生成的根底文件来构建暂时清单,并将其耐久化
- 现在 Redis 对清单文件进行原子交流,以便此 AOF 重写的结果生效。 Redis 还会整理旧的根底文件和任何未运用的增量文件
可是重写是把当时内存的数据,写入一个新的AOF文件,假设当时数据比较大,然后以指令的方式写入的话,会很慢很慢
所以在4.0之后,在重写的时分是选用RDB的方式去生成新的AOF文件,然 后后续追加的指令,还是以指令追加的方式追加的文件结尾
aof-use-rdb-preamble yes //是否敞开RDB与AOF混合模式
什么时分重写
装备文件redis.conf
# 重写触发机制
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb 就算抵达了榜首个百分比的巨细,也必须大于 64M
在 aof 文件小于64mb的时分不进行重写,当抵达64mb的时分,就重写一
次。重写后的 aof 文件或许是40mb。上面装备了auto-aof-rewritepercentag为100,即 aof 文件到了80mb的时分,进行重写。
8.2.3 AOF的优势与缺乏
优势
- 安全性高,就算是默许的耐久化同步机制,也最多只会丢掉1s数据
- AOF由于某些原因,比方磁盘满了等导致追加失利,也能经过redischeck-aof 工具来修复
- 格局都是追加的日志,所以可读性更高
缺乏
- 数据集一般比RDB大
- 耐久化跟数据加载比RDB更慢
- 在7.0之前,重写的时分,由于重写的时分,新的指令会缓存在内存区,所以会导致很多的内存运用
- 而且重写期间,会跟磁盘进行2次IO,一个是写入老的AOF文件,一个是写入新的AOF文件
九、Redis常见问题总结
9.1 Redis数据丢掉场景
9.1.1 耐久化丢掉
选用RDB或许不耐久化, 会有数据丢掉,由所以手动或许装备以快照的方式来进行备份。
处理:启用AOF,以指令追加的方式进行备份,可是默许也会有1s丢掉, 这是在功能与数据安全性中寻求的一个最合适的方案,假设为了确保数据 一致性,能够将装备更改为always,可是功能很慢,一般不用。
9.1.2 主从切换
由于Redis的数据是主异步同步给从的,提高了功能,可是由所以异步同步到从。所以存在数据丢掉的或许
- master写入数据k1 ,由所以异步同步到slave,当master没有同步给slave的时分,master挂了
- slave会成为新的master,而且没有同步k1
- master重启,会成为新master的slave,同步数据会清空自己的数据,从新的master加载
- k1丢掉
9.2 Redis缓存雪崩、穿透、击穿问题分析
9.2.1 缓存雪崩
缓存雪崩便是Redis的很多热点数据一起过期(失效),由于设置了相同的过期时刻,刚好这个时分Redis恳求的并发量又很大,就会导致一切的恳求落到数据库。
- 确保Redis的高可用,避免由于Redis不可用导致悉数打到DB
- 加互斥锁或许运用队列,针对同一个key只允许一个线程到数据库查询
- 缓存守时预先更新,避免一起失效
- 经过加随机数,使key在不同的时刻过期
9.2.2 缓存穿透
缓存穿透是指缓存和数据库中都没有的数据,可是用户一向恳求不存在的数据!这时的用户很或许便是攻击者,歹意搞你们公司的,攻击会导致数据库压力过大。
处理方案:布隆过滤器
布隆过滤器的思维:已然是由于你Redis跟DB都没有,然后用户歹意一向拜访不存在的key,然后悉数打到Redis跟DB。那么咱们能不能独自把DB的数据独自存到别的一个当地,可是这个当地只存一个简单的符号,符号这个key存不存在,假设存在,就去拜访Redis跟DB,否则直接回来。而且这个内存最好很小。
9.2.3 缓存击穿
单个key过期的时分有很多并发,运用互斥锁,回写redis,而且选用双重查看锁来提高功能!削减对DB的拜访。