Go流量操控 系列文章目录 结合自己在作业中运用nacos+go-sentinel完结动态流量操控
第一篇会介绍一下Nacos的布置,服务端,客户端的原理以及Nacos共同性协议Raft
Nacos介绍
Nacos的适用场景涵盖了许多领域,其间一些包括:
- 数据库衔接信息办理:Nacos能够用于办理数据库衔接信息,使得运用程序能够动态地获取最新的数据库衔接信息,然后完结数据库衔接的动态办理和装备。
- 限流规矩和降级开关:经过Nacos,能够动态地办理限流规矩和降级开关的装备,以应对体系在高负载或异常状况下的流量操控需求,确保体系的稳定性和可靠性。
- 流量的动态调度:Nacos能够用于动态调整流量的分发战略和装备,依据实时的事务需求和体系负载状况,灵敏地调整流量的分配,完结流量的动态调度和负载均衡。
在Nacos 1.X架构中,装备中心的推送功能经过长轮询构建,周期性地由客户端主动发送HTTP恳求并在产生更新时回来改变内容;而服务注册中心的推送则经过UDP推送+HTTP定期对账来完结。
但是,装备中心的长轮训、服务注册中心的定期对账,都需求周期性地关于服务端进行一次主动建连和装备传送,增大服务端的内存开支;跟着Nacos用户的服务数和装备数规划的增大,服务端的内存泄漏危险也大大添加。
为了更好的支撑用户的功能要求,克服HTTP短衔接架构固有的功能瓶颈,Nacos社区进行了一次依据长衔接的重构晋级。长衔接时代的Nacos2.x在本来1.x的架构基础上新增了对gRPC长衔接模型的支撑,一起保存对旧客户端和OpenAPI的兼容。
通讯层目前经过gRPC完结了长衔接RPC调用和推送能力。晋级完结之后,服务改变、装备改变等信息会经过gRPC的双向流主动推送给客户端,而客户端只需求针对各个长衔接主动发送轻量级的心跳即可。晋级后的技术架构极大地减少了服务端处理数据的开支;一起,因为长衔接依据可复用TCP的机制,也大大降低了网络阻塞的危险。
牛逼完了就该布置了
布置单机Nacos
docker 拉取镜像
留意:官方镜像中以slim结尾支撑arm64架构
docker pull nacos/nacos-server:v2.1.2-slim
创立容器
docker run --env PREFER_HOST_MODE=hostname --env MODE=standalone --env NACOS_AUTH_ENABLE=true -p 8848:8848 -p 9848:9848 -p 9849:9849 -d nacos/nacos-server:v2.1.2-slim
注:这儿三个端口一个要对应,不然或许会呈现衔接不上的问题
进入监管界面
[http://ip:8848/nacos](服务器布置进不去记住检查一些防火墙,三个端口都放开)
注:初始账户暗码均为 nacos,只要进入了页面就代表发动成功了
4、修正暗码:为安全起见,主张及时修正暗码
自己玩玩能够单机,出产怎么能单机
集群布置Nacos
布置架构
- 一般来说,在出产环境上,咱们需求完结Nacos高可用
- 方案包括多节点反向代理,多节点Nacos,高可用Mysql
Nacos作为装备中心的集群结构中,是一种无中心化节点的规划,因为没有主从节点,也没有推举机制,所以为了能够完结热备,就需求添加虚拟IP(VIP)。
Nacos的数据存储分为两部分
- Mysql数据库存储,一切Nacos节点共享同一份数据,数据的副本机制由Mysql本身的主从方案来处理,然后确保数据的可靠性。
- 每个节点的本地磁盘,会保存一份全量数据,详细途径:
/data/program/nacos-1/data/config-data/${GROUP}
.
在Nacos的规划中,Mysql是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的。 除此之外,Nacos在发动时会把Mysql中的数据写一份到本地磁盘。
这么规划的好处是能够进步功能,当客户端需求恳求某个装备项时,服务端会想Ian从磁盘中读取对应文件回来,而磁盘的读取功率要比数据库功率高。
当装备产生改变时:
- Nacos会把改变的装备保存到数据库,然后再写入本地文件。
- 接着发送一个HTTP恳求,给到集群中的其他节点,其他节点收到事情后,从Mysql中dump刚刚写入的数据到本地文件中。
别的,NacosServer发动后,会同步发动一个守时使命,每隔6小时,会dump一次全量数据到本地文件
docker-compose
- nacos docker-compose 7848为http端口,nacos装备文件以环境变量nacos-embedded.env装备
- hostname: 修正hostname为主机ip
- 挂载日志到/data/software/nacos/cluster-logs/
version:"3"
services:
nacos1:
hostname:
container_name: nacos
image: nacos
volumes:
- /data/software/nacos/logs/:/home/nacos/logs
- /data/software/nacos/application.properties:/home/nacos/conf/application.properties
ports:
-"7848:7848"
-"8848:8848"
-"9848:9848"
-"9849:9849"
env_file:
- ./nacos-embedded.env
restart: always
application.properties
# spring
server.servlet.contextPath =${SERVER_SERVLET_CONTEXTPATH: / nacos}
server.contextPath = / nacos
server.port =${NACOS_APPLICATION_PORT: 8848}
server.tomcat.accesslog.max - days = 30
server.tomcat.accesslog.pattern =
server.tomcat.accesslog.enabled =${TOMCAT_ACCESSLOG_ENABLED: false}
server.error.include - message = ALWAYS
# default current work dir
server.tomcat.basedir = file:.
# *************** Config Module Related Configurations ***************#
### Deprecated configuration property, it is recommended to use `spring.sql.init.platform` replaced.
# spring.datasource.platform=${SPRING_DATASOURCE_PLATFORM:}
spring.sql.init.platform =${SPRING_DATASOURCE_PLATFORM:}
nacos.cmdb.dumpTaskInterval = 3600
nacos.cmdb.eventTaskInterval = 10
nacos.cmdb.labelTaskInterval = 300
nacos.cmdb.loadDataAtStart = false
db.num =${MYSQL_DATABASE_NUM: 1}
db.url= jdbc:mysql: // ${MYSQL_SERVICE_HOST}:${MYSQL_SERVICE_PORT: 3306} /${MYSQL_SERVICE_DB_NAME}?${MYSQL_SERVICE_DB_PARAM: characterEncoding = utf8 & connectTimeout = 1000 & socketTimeout = 3000 & autoReconnect = true & useSSL = false}
db.user=${MYSQL_SERVICE_USER}
db.password=${MYSQL_SERVICE_PASSWORD}
### The auth system to use, currently only 'nacos' and 'ldap' is supported:
nacos.core.auth.system.type =${NACOS_AUTH_SYSTEM_TYPE: nacos}
### worked when nacos.core.auth.system.type=nacos
### The token expiration in seconds:
nacos.core.auth.plugin.nacos.token.expire.seconds =${NACOS_AUTH_TOKEN_EXPIRE_SECONDS: 18000}
### The default token:
nacos.core.auth.plugin.nacos.token.secret.key =${NACOS_AUTH_TOKEN:}
### Turn on/off caching of auth information. By turning on this switch, the update of auth information would have a 15 seconds delay.
nacos.core.auth.caching.enabled =${NACOS_AUTH_CACHE_ENABLE: false}
nacos.core.auth.enable.userAgentAuthWhite =${NACOS_AUTH_USER_AGENT_AUTH_WHITE_ENABLE: false}
nacos.core.auth.server.identity.key =${NACOS_AUTH_IDENTITY_KEY:}
nacos.core.auth.server.identity.value =${NACOS_AUTH_IDENTITY_VALUE:}
## spring security config
### turn off security
nacos.security.ignore.urls =${}
# metrics for elastic search
management.metrics.export.elastic.enabled = false
management.metrics.export.influx.enabled = false
nacos.naming.distro.taskDispatchThreadCount = 10
nacos.naming.distro.taskDispatchPeriod = 200
nacos.naming.distro.batchSyncKeyCount = 1000
nacos.naming.distro.initDataRatio = 0.9
nacos.naming.distro.syncRetryDelay = 5000
nacos.naming.data.warmup = true
management.endpoints.web.exposure.include = *
nacos-embedded.env
PREFER_HOST_MODE=hostname
EMBEDDED_STORAGE=embedded
NACOS_SERVERS=ip1,ip2,ip3
SPRING_DATASOURCE_PLATFORM=mysql
MYSQL_SERVICE_HOST=xxxxxxxxx
MYSQL_SERVICE_DB_NAME=nacos_config
MYSQL_SERVICE_PORT=3306
MYSQL_SERVICE_USER=xxx
MYSQL_SERVICE_PASSWORD=xxx
NACOS_AUTH_ENABLE=true
NACOS_AUTH_IDENTITY_KEY=xxxxxxxx
NACOS_AUTH_IDENTITY_VALUE=xxxxxxxxxxx
NACOS_AUTH_TOKEN=SecretKey012345678901234567890123456789012345678901234567890123456789
NACOS_AUTH_TOKEN_EXPIRE_SECONDS=86400
Nginx装备
upstream nacos-cluster { #这一部分装备在http{}内
server ip:8848;
server ip:8848;
server ip:8848;
}
server {
listen 9915;
server_name xxxxx;
location / {
proxy_pass http://nacos-cluster/;
proxy_set_header Host $host;
proxy_set_header X-Rea:l-IP $remote_addr;
}
access_log /var/log/nginx/nacos.log main;
检查集群状态
- nacos自带了metrics,能够对接上promethus
- 参阅Nacos 监控手册
客户端原理(要点)
当客户端拿到装备后,需求动态刷新,然后确保数据和服务器端是共同的,这个过程是怎么完结的呢?
Nacos选用长轮训机制来完结数据改变的同步,原理如下!
整体作业流程如下:
- 客户端建议长轮训恳求
- 服务端收到恳求今后,先比较服务端缓存中的数据是否相同,假如不通,则直接回来
- 假如相同,则经过schedule推迟29.5s之后再履行比较
- 为了确保当服务端在29.5s之内产生数据改变能够及时告诉给客户端,服务端选用事情订阅的办法来监听服务端本地数据改变的事情,一旦收到事情,则触发DataChangeTask的告诉,而且遍历allStubs队列中的ClientLongPolling,把成果写回到客户端,就完结了一次数据的推送
- 假如 DataChangeTask 使命完结了数据的 “推送” 之后,ClientLongPolling 中的调度使命又开端履行了怎么办呢?
很简略,只要在进行 “推送” 操作之前,先将本来等待履行的调度使命取消掉就能够了,这样就防止了推送操作写完呼应数据之后,调度使命又去写呼应数据,这时肯定会报错的。所以,在ClientLongPolling办法中,最开端的一个过程便是删去订阅事情
长轮训使命发动进口
在NewConfigClient的办法中,当ConfigClient被实例化今后,有做一些事情
- 创立一个root的config.listenConfigExecutor,用于监听装备(主协程)
- 初始化一个GetHttpAgent
- 确定装备文件、日志目录、缓存目录
func NewConfigClient(nc nacos_client.INacosClient) (*ConfigClient, error) {
config := &ConfigClient{
cacheMap: cache.NewConcurrentMap(),
schedulerMap: cache.NewConcurrentMap(),
}
config.schedulerMap.Set("root", true)
# 主协程
go config.delayScheduler(time.NewTimer(1*time.Millisecond), 500*time.Millisecond, "root", config.listenConfigExecutor())
config.INacosClient = nc
clientConfig, err := nc.GetClientConfig()
if err != nil {
return config, err
}
# HttpAgent
serverConfig, err := nc.GetServerConfig()
if err != nil {
return config, err
}
httpAgent, err := nc.GetHttpAgent()
if err != nil {
return config, err
}
loggerConfig := logger.Config{
LogFileName: constant.LOG_FILE_NAME,
Level: clientConfig.LogLevel,
Sampling: clientConfig.LogSampling,
LogRollingConfig: clientConfig.LogRollingConfig,
LogDir: clientConfig.LogDir,
CustomLogger: clientConfig.CustomLogger,
LogStdout: clientConfig.AppendToStdout,
}
err = logger.InitLogger(loggerConfig)
if err != nil {
return config, err
}
logger.GetLogger().Infof("logDir:<%s> cacheDir:<%s>", clientConfig.LogDir, clientConfig.CacheDir)
config.configCacheDir = clientConfig.CacheDir + string(os.PathSeparator) + "config"
config.configProxy, err = NewConfigProxy(serverConfig, clientConfig, httpAgent)
if clientConfig.OpenKMS {
kmsClient, err := kms.NewClientWithAccessKey(clientConfig.RegionId, clientConfig.AccessKey, clientConfig.SecretKey)
if err != nil {
return config, err
}
config.kmsClient = kmsClient
}
return config, err
}
咱们要点关注这行代码
go config.delayScheduler(time.NewTimer(1*time.Millisecond), 500*time.Millisecond, "root", config.listenConfigExecutor())
listenConfigExecutor
// Listen for the configuration executor
func (client *ConfigClient) listenConfigExecutor() func() error {
return func() error {
listenerSize := client.cacheMap.Count()
taskCount := int(math.Ceil(float64(listenerSize) / float64(perTaskConfigSize)))
currentTaskCount := int(atomic.LoadInt32(&client.currentTaskCount))
if taskCount > currentTaskCount {
for i := currentTaskCount; i < taskCount; i++ {
client.schedulerMap.Set(strconv.Itoa(i), true)
go client.delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, strconv.Itoa(i), client.longPulling(i))
}
atomic.StoreInt32(&client.currentTaskCount, int32(taskCount))
} else if taskCount < currentTaskCount {
for i := taskCount; i < currentTaskCount; i++ {
if _, ok := client.schedulerMap.Get(strconv.Itoa(i)); ok {
client.schedulerMap.Set(strconv.Itoa(i), false)
}
}
atomic.StoreInt32(&client.currentTaskCount, int32(taskCount))
}
return nil
}
}
-
获取监听器数量和核算使命数量:
- 经过
client.cacheMap.Count()
获取当时注册的监听器数量listenerSize
。 - 依据监听器数量核算需求履行的使命数量
taskCount
,运用了数学库中的math.Ceil()
函数和类型转换。
- 经过
-
动态调整使命履行状况:
- 经过比较
taskCount
和currentTaskCount
的大小,判别是否需求发动新的使命或中止部分使命。 - 假如
taskCount
大于currentTaskCount
,则依据新增使命数量,经过循环发动新的使命,并将其加入到schedulerMap
中。 - 发动新使命的办法是调用
delayScheduler()
办法,在新的 Goroutine 中履行longPulling()
办法。 - 终究,运用原子操作更新
currentTaskCount
的值。
- 经过比较
要点咱们看看client.longPulling(i)。go的客户端是一层套一层,读起来还是很舒服的
go client.delayScheduler(time.NewTimer(1*time.Millisecond), 10*time.Millisecond, strconv.Itoa(i), client.longPulling(i))
longPulling
先经过本地装备的读取和检查来判别数据是否产生改变然后完结改变的告诉
接着,当时的协程还需求去长途服务器上获得最新的数据,检查哪些数据产生了改变
- 经过匹配taskId获取长途服务器上数据改变的listeningConfigs
- 遍历这些改变的集合,然后调用ListenConfig从长途服务器获得对应的内容
- 更新本地的cache,设置为服务器端回来的内容
- 终究遍历cacheDatas,找到改变的数据进行调用callListener进行告诉
// Long polling listening configuration
func (client *ConfigClient) longPulling(taskId int) func() error {
return func() error {
var listeningConfigs string
initializationList := make([]cacheData, 0)
for _, key := range client.cacheMap.Keys() {
if value, ok := client.cacheMap.Get(key); ok {
cData := value.(cacheData)
if cData.taskId == taskId {
if cData.isInitializing {
initializationList = append(initializationList, cData)
}
if len(cData.tenant) > 0 {
listeningConfigs += cData.dataId + constant.SPLIT_CONFIG_INNER + cData.group + constant.SPLIT_CONFIG_INNER +
cData.md5 + constant.SPLIT_CONFIG_INNER + cData.tenant + constant.SPLIT_CONFIG
} else {
listeningConfigs += cData.dataId + constant.SPLIT_CONFIG_INNER + cData.group + constant.SPLIT_CONFIG_INNER +
cData.md5 + constant.SPLIT_CONFIG
}
}
}
}
if len(listeningConfigs) > 0 {
clientConfig, err := client.GetClientConfig()
if err != nil {
logger.Errorf("[checkConfigInfo.GetClientConfig] err: %+v", err)
return err
}
// http get
params := make(map[string]string)
params[constant.KEY_LISTEN_CONFIGS] = listeningConfigs
var changed string
changedTmp, err := client.configProxy.ListenConfig(params, len(initializationList) > 0, clientConfig.NamespaceId, clientConfig.AccessKey, clientConfig.SecretKey)
if err == nil {
changed = changedTmp
} else {
if _, ok := err.(*nacos_error.NacosError); ok {
changed = changedTmp
} else {
logger.Errorf("[client.ListenConfig] listen config error: %+v", err)
}
return err
}
for _, v := range initializationList {
v.isInitializing = false
client.cacheMap.Set(util.GetConfigCacheKey(v.dataId, v.group, v.tenant), v)
}
if len(strings.ToLower(strings.Trim(changed, " "))) == 0 {
logger.Info("[client.ListenConfig] no change")
} else {
logger.Info("[client.ListenConfig] config changed:" + changed)
client.callListener(changed, clientConfig.NamespaceId)
}
}
return nil
}
}
callListener
那是怎么调用,告诉的呢?
需求遍历装备列表changedConfigs
,对每个装备进行拆分并测验从cacheMap
中获取相应的装备信息。
- 假如获取成功,依据装备信息获取最新装备内容,并与缓存的 MD5 值进行比对。
- 假如 MD5 值产生改变,则履行监听器的回调函数,并更新缓存数据的 MD5 值和内容。
- 终究,在新的 Goroutine 中履行监听器的回调函数,并更新装备信息到缓存中。
这样就完结了装备监听告诉改变
graph LR
NewConfigClient --> listenConfigExecutor
listenConfigExecutor --> longPulling
longPulling --> changedTmp
changedTmp --> callListener
装备文件注册
前面咱们了解到客户端是怎么监听装备的。
那咱们想要完结监听某个dataId,只需求注册进listenConfigExecutor即可
err = nacos.ConfClient.ListenConfig(vo.ConfigParam{
DataId: DataId,
Group: APIConfig.Nacos.NacosConfig.Group,
OnChange: func(namespace, group, dataId, data string)}
func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) {
if len(param.DataId) <= 0 {
err = errors.New("[client.ListenConfig] DataId can not be empty")
return err
}
if len(param.Group) <= 0 {
err = errors.New("[client.ListenConfig] Group can not be empty")
return err
}
clientConfig, err := client.GetClientConfig()
if err != nil {
err = errors.New("[checkConfigInfo.GetClientConfig] failed")
return err
}
key := util.GetConfigCacheKey(param.DataId, param.Group, clientConfig.NamespaceId)
var cData cacheData
if v, ok := client.cacheMap.Get(key); ok {
cData = v.(cacheData)
cData.isInitializing = true
} else {
var (
content string
md5Str string
)
if content, _ = cache.ReadConfigFromFile(key, client.configCacheDir); len(content) > 0 {
md5Str = util.Md5(content)
}
listener := &cacheDataListener{
listener: param.OnChange,
lastMd5: md5Str,
}
cData = cacheData{
isInitializing: true,
dataId: param.DataId,
group: param.Group,
tenant: clientConfig.NamespaceId,
content: content,
md5: md5Str,
cacheDataListener: listener,
taskId: client.cacheMap.Count() / perTaskConfigSize,
}
}
client.cacheMap.Set(key, cData)
return
}
整个代码其实便是为了凑cData,终究就client.cacheMap.Set(key, cData)
终究,再从头梳理一下,构成真正的闭环
graph LR
NewConfigClient --> listenConfigExecutor
ListenConfig --> listenConfigExecutor
listenConfigExecutor --> longPulling
longPulling --> changedTmp
changedTmp --> callListener
callListener --> ListenConfig
ConfigProxy ListenConfig
来了,要点中的要点。客户端怎么跟服务端恳求数据的细节来了
nacos
选用的是客户端主动拉pull
模型,运用长轮询(Long Polling
)的办法来获取装备数据
客户端建议恳求后,服务端不会当即回来恳求成果,而是将恳求挂起等待一段时刻,假如此段时刻内服务端数据改变,当即呼应客户端恳求,若是一向无改变则等到指定的超时时刻后呼应恳求,客户端从头建议长链接。
func (cp *ConfigProxy) ListenConfig(params map[string]string, isInitializing bool, tenant, accessKey, secretKey string) (string, error) {
//fixed at 30000ms,avoid frequent request on the server
var listenInterval uint64 = 30000
headers := map[string]string{
"Content-Type": "application/x-www-form-urlencoded;charset=utf-8",
"Long-Pulling-Timeout": strconv.FormatUint(listenInterval, 10),
}
if isInitializing {
headers["Long-Pulling-Timeout-No-Hangup"] = "true"
}
headers["accessKey"] = accessKey
headers["secretKey"] = secretKey
if len(tenant) > 0 {
params["tenant"] = tenant
}
logger.Infof("[client.ListenConfig] request params:%+v header:%+v n", params, headers)
// In order to prevent the server from handling the delay of the client's long task,
// increase the client's read timeout to avoid this problem.
timeout := listenInterval + listenInterval/10
result, err := cp.nacosServer.ReqConfigApi(constant.CONFIG_LISTEN_PATH, params, headers, http.MethodPost, timeout)
return result, err
}
- 首先,设置了一个固定的监听距离时刻
listenInterval
,这个时刻是30000毫秒,即30秒,用来操控客户端和服务端之间的恳求距离,避免频繁恳求。 - 然后,界说了一个恳求头
headers
,包括了Content-Type
和Long-Pulling-Timeout
,用来指定恳求的内容类型和长轮询的超时时刻。 - 假如
isInitializing
为true,即当时为初始化阶段,会设置Long-Pulling-Timeout-No-Hangup
为true,表明在长轮询超时时不断开衔接。 - 接下来,依据传入的
accessKey
和secretKey
设置恳求头中的accessKey
和secretKey
字段,用于进行身份验证。 - 假如传入的
tenant
参数不为空,将其添加到params
中。 - 然后,经过
cp.nacosServer
发送一个POST恳求,恳求途径为constant.CONFIG_LISTEN_PATH
,即装备监听途径。恳求携带了params
作为参数,headers
作为恳求头,以及超时时刻为listenInterval
加上listenInterval
的十分之一的时长。
客户端缓存装备长轮训机制总结
整体完结的中心点就一下几个部分
-
对本地缓存的装备做使命拆分,每一个批次是3000条
-
针对每3000条创立一个线程去履行
-
先把每一个批次的缓存和本地磁盘文件中的数据进行比较,
- 假如和本地装备不共同,则表明该缓存产生了更新,直接告诉客户端监听
- 假如本地缓存和磁盘数据共同,则需求建议长途恳求检查装备改变
-
先以tenent/groupId/dataId拼接成字符串,发送到服务端进行检查,回来产生了改变的装备
-
客户端收到改变装备列表,再逐项遍历发送到服务端获取装备内容
服务端原理(要点)
服务端是怎么处理客户端的恳求的?那么相同,咱们需求考虑几个问题
- 服务端是怎么完结长轮训机制的
- 客户端的超时时刻为什么要设置30s
客户端建议的恳求地址是:/v1/cs/configs/listener
,于是找到这个接口进行检查,代码如下。
//# ConfigController.java
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
//解析客户端传递过来的或许产生改变的装备项目,转化为Map集合(key=dataId,value=md5)
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// 开端履行长轮训。
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
doPollingConfig
这个办法主要是用来做长轮训和短轮询的判别
- 假如是长轮训,直接走addLongPollingClient办法
- 假如是短轮询,直接比较服务端的数据,假如存在md5不共同,直接把数据回来。
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
// 判别当时恳求是否支撑长轮训。()
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
//假如是短轮询,走下面的恳求,下面的恳求便是把客户端传过来的数据和服务端的数据逐项进行比较,保存到changeGroups中。
// Compatible with short polling logic.
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// Compatible with short polling result.
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);
// Before 2.0.4 version, return value is put into header.
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}
Loggers.AUTH.info("new content:" + newResult);
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}
addLongPollingClient
把客户端的恳求,保存到长轮训的履行引擎中。
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
//获取客户端长轮训的超时时刻
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
//不允许断开的标记
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
//运用称号
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
//
String tag = req.getHeader("Vipserver-Tag");
//延期时刻,默认为500ms
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
// 提前500ms回来一个呼应,避免客户端呈现超时
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// Do nothing but set fix polling timeout.
} else {
long start = System.currentTimeMillis();
//经过md5判别客户端恳求过来的key是否有和服务器端有不共同的,假如有,则保存到changedGroups中。
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) { //假如发现有改变,则直接把恳求回来给客户端
generateResponse(req, rsp, changedGroups);
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { //假如noHangUpFlag为true,说明不需求挂起客户端,所以直接回来。
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
//获取恳求端的ip
String ip = RequestUtil.getRemoteIp(req);
// Must be called by http thread, or send response.
//把当时恳求转化为一个异步恳求(意味着此刻tomcat线程被释放,也便是客户端的恳求,需求经过asyncContext来手动触发回来,否则一向挂起)
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() is incorrect, Control by oneself
asyncContext.setTimeout(0L); //设置异步恳求超时时刻,
//履行长轮训恳求
ConfigExecutor.executeLongPolling(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
ClientLongPolling
- 智能化推迟履行:clientLongPolling 使命巧妙地选用推迟履行机制,在约 29.5 秒后履行。这不只有效减少了不必要的恳求,还能在一定程度上节省网络资源。这种智能化的推迟履行战略有助于进步体系的功能体现。
- 精准监控机制:经过定期履行使命并比对 MD5 值,clientLongPolling 使命精准地监控装备信息的改变。这种高效的监控机制确保了客户端能够在数据产生改变时及时获取到最新的装备信息。
- 即时告诉订阅机制:客户端经过订阅机制与服务端建立了即时通讯的桥梁,一旦装备信息产生改变,服务端能够当即告诉一切订阅了相关装备的客户端。这种高效的订阅告诉机制确保了体系能够快速呼应改变,完结了装备信息的实时更新和同步。
这些优化办法不只提升了体系的功能和功率,一起也增强了体系的可靠性和实时性,为用户供给了更优质的服务体会。
class ClientLongPolling implements Runnable {
@Override
public void run() {
//构建一个异步使命,拖延29.5s履行
asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
@Override
public void run() { //假如达到29.5s,说明这个期间没有做任何装备修正,则主动触发履行
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
// Delete subsciber's relations.
allSubs.remove(ClientLongPolling.this); //移除订阅联系
if (isFixedPolling()) { //假如是固定距离的长轮训
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
//比较改变的key
List<String> changedGroups = MD5Util
.compareMd5((HttpServletRequest) asyncContext.getRequest(),
(HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
if (changedGroups.size() > 0) {//假如大于0,表明有改变,直接呼应
sendResponse(changedGroups);
} else {
sendResponse(null); //否则回来null
}
} else {
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
"polling", clientMd5Map.size(), probeRequestSize);
sendResponse(null);
}
} catch (Throwable t) {
LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
allSubs.add(this); //把当时线程添加到订阅事情队列中
}
}
allSubs
/**
* 长轮询订阅联系
*/
final Queue<ClientLongPolling> allSubs;
allSubs.add(this);
LongPollingService
在LongPollingService
的构造办法中,经过订阅LocalDataChangeEvent
事情来监听服务端数据的改变是一个高效的做法。当事情触发时,履行DataChangeTask
线程以处理数据改变。这种规划确保了体系能够实时呼应数据的改变,并在需求时进行相应的处理。
DataChangeTask
class DataChangeTask implements Runnable {
@Override
public void run() {
try {
ConfigCacheService.getContentBetaMd5(groupKey); //
//遍历一切订阅事情表
for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next(); //得到ClientLongPolling
//判别当时的ClientLongPolling中,恳求的key是否包含当时修正的groupKey
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// If published tag is not in the beta list, then it skipped.
if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) { //假如是beta办法且betaIps不包含当时客户端ip,直接回来
continue;
}
// If published tag is not in the tag list, then it skipped.
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {//假如装备了tag标签且不包含当时客户端的tag,直接回来
continue;
}
//
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); // Delete subscribers' relationships. 移除当时客户端的订阅联系
LogUtil.CLIENT_LOG
.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
RequestUtil
.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
"polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
clientSub.sendResponse(Arrays.asList(groupKey)); //呼应客户端恳求。
}
}
} catch (Throwable t) {
LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
}
}
}
服务端总结
Nacos ⼀致性协议
单机下其实问题不大,简略的内嵌联系型数据库即可;
但是集群形式下,就需求考虑怎么确保各个节点之间的数据⼀致性以及数据同步,而要处理这个问题,就不得不引进一致算法,经过算法来确保各个节点之间的数据的⼀致性。
为什么 Nacos 挑选了 Raft 以及 Distro
为什么 Nacos 会在单个集群中一起运转 CP 协议以及 AP 协议呢?这其实要从 Nacos 的场景出发的:Nacos 是⼀个集服务注册发现以及装备办理于⼀体的组件,因而关于集群下,各个节点之间的数据⼀致性确保问题,需求拆分红两个方面
从服务注册发现来看
服务之间感知对方服务的当时可正常供给服务的实例信息,有必要从服务发现注册中心进行获取,因而关于服务注册发现中心组件的可用性,提出了很高的要求,需求在任何场景下,尽最大或许确保服务注册发现能力能够对外供给服务;
一起 Nacos 的服务注册发现规划,采取了心跳可主动完结服务数据补偿的机制。假如数据丢掉的话,是能够经过该机制快速补偿数据丢掉。
因而,为了满意服务发现注册中心的可用性,强⼀致性的一致算法这儿就不太合适了,因为强⼀致性一致算法能否对外供给服务是有要求的,假如当时集群可用的节点数没有过半的话,整个算法直接“罢工”,而终究⼀致一致算法的话,更多确保服务的可用性,而且能够确保在⼀定的时刻内各个节点之间的数据能够达成⼀致。
上述的都是针关于 Nacos 服务发现注册中的非持久化服务而言(即需求客户端上报心跳进行服务实例续约)。
而关于 Nacos 服务发现注册中的持久化服务,因为一切的数据都是直接运用调用 Nacos服务端直接创立,因而需求由 Nacos 确保数据在各个节点之间的强⼀致性,故而针对此类型的服务数据,挑选了强⼀致性一致算法来确保数据的⼀致性
从装备办理来看
装备数据,是直接在 Nacos 服务端进行创立并进行办理的,有必要确保大部分的节点都保存了此装备数据才能认为装备被成功保存了,否则就会丢掉装备的改变,假如呈现这种状况,问题是很严峻的,假如是发布重要装备改变呈现了丢掉改变动作的状况,那八成就要引起严峻的现网故障了,因而关于装备数据的办理,是有必要要求集群中大部分的节点是强⼀致的,而这儿的话只能运用强⼀致性一致算法
Raft (CP形式)
关于强⼀致性一致算法,当时工业出产中,最多运用的便是 Raft 协议,Raft 协议更容易让人理解,而且有许多成熟的工业算法完结,比方
- 蚂蚁金服的 JRaft
- Zookeeper 的 ZAB
- Consul 的 Raft
- 百度的 braft
- Apache Ratis
因为 Nacos 是 Java 技术栈,因而只能在 JRaft、ZAB、ApacheRatis 中挑选,但是 ZAB 因为和 Zookeeper 强绑定,再加上希望能够和 Raft 算法库的支撑团队沟通交流,因而挑选了 JRaft,挑选 JRaft 也是因为 JRaft 支撑多 RaftGroup,为 Nacos 后边的多数据分片带来了或许。
Distro (AP形式)
而 Distro 协议是阿里巴巴自研的⼀个终究⼀致性协议,而终究⼀致性协议有许多,比方 Gossip、Eureka 内的数据同步算法。而 Distro 算法是集 Gossip 以及 Eureka 协议的长处并加以优化而出来的,关于原生的 Gossip,因为随机选取发送音讯的节点,也就不可避免的存在音讯重复发送给同⼀节点的状况,添加了网络的传输的压力,也给音讯节点带来额定的处理负载,而 Distro 算法引进
了威望 Server 的概念,每个节点负责⼀部分数据以及将自己的数据同步给其他节点,有效的降低了音讯冗余的问题。