本文正在参与「金石方案」
前语
之前就遇到的一个问题吧,发生过一两次了,每次都是重启等着渐渐消费,但这样必定是不可的啦,仍是得上点魔法。依照既往常规,我都会在文章选题开端时,看看网上咱们都是怎么写的,看了不少文章吧,感觉咱们写的仍是比较抽象。(⊙﹏⊙),不过或许也是由于音讯积压这一块吧,这个处理方案事实是过分简略,的确也没啥好说的,导致咱们这个不怎么贴代码,老是讲思路,我也能够理解。可是作为好家伙棒小伙,我必定得给咱们端上一碗热腾腾的鸡汤,最近心境略差,压力过大,所以皮不动了。那么直接开端吧,鸡汤来咯!
事情背景
之前我不是自己保护了一套日志搜集体系嘛,是Filebeat+Kafka+数据处理服务+Elasticsearch+Kibana+Skywalking这样的架构,其间数据处理服务是一个spring boot项目布置在公司的DevOps渠道上。由所以我自己弄的,我就自己建了一个项目空间,专门放我写的组件的源码以及这个数据处理服务,平常我也是在上面自己布置的,由于安稳运转一年多嘛,我平常也不重视。
几个周前的时分,呃,不知道咋回事出产的那个节点挂了。周四晚上挂的,发的邮件告诉我没看到(邮件太多就不看了),周五没人看日志,就这么到了周一下午,有搭档过来问我为啥没有这几天的日志。我先登录Kibana看了下,的确少了这几天的日志,然后我就登录DevOps发现节点没了,上CMAK(Kafka监控)发现积压了八百多万数据,嚯嚯,小一千万,大的要来了。
宕机后的处理
首要告诉发现的小伙伴都别慌,小BUG小局面,先用DevOps上面自带的K8S日志输出看实时日志或许写指令看日志文件。
点击容器日志可检查实时输出的日志,如下图。我个人觉得这个其实也蛮便利的,可是要定位历史问题,或许要进行搜索、链路追踪啥的,就不如一套日志搜集体系了。
由于我这个自己做的日志搜集体系不是公司级的项目(架构组也在做,可是有技术问题,技术问题场景太多了,所以还没有彻底推广),大概就咱们大组几十个开发在运用,运用到了二十多个子体系上,现在日志量现已增长到了月度亿级。
由于我会在代码标准中明令禁止测验或许正式环境打印Sql,所以日志量总体还算是可控。再加上我之前在数据处理服务中写了定时,定时删去半年以上日志,因而磁盘表明它状况杰出。万幸的是,暂时没有遇到太大问题,可是后续或许要考虑紧缩历史日志归档的问题。
扯远了,说了一些正确的废话,成功让咱们摸鱼十秒钟,哈哈。回到正题,我是马上重启了数据处理服务进行消费,由于原Topic只要3个分区,所以并发度是3。或许有的人会说为啥消费的时分不开多线程呢,开多线程批量刺进ES就不受分区并发度约束了?对的,这是一个手法,可是我敞开了手动提交offset,为了确保日志不丢掉,所以这并发度没法进步了。
正式的环境不能乱动,可是测验的能够随便玩,那么这个音讯积压的问题场景就由我收下了!处理思路如下,非常简略,但问题就在于细节非常多,网上的实践内容也相对偏少,所以我就想着经过文章的方式来复盘下我悉数的操作。
- 修正现有consumer的问题,并将其停掉,再不停就真G了。
- 从头创立一个容量更大的Topic,比方patition是原本的N倍,大大大。
- 编写一个暂时consumer程序,消费原本积压的队列(留意该consumer不做任何耗时的操作,仅作为中转将音讯快速写入新创立的Topic里)。
- 将修正好的consumer布置到原本N倍的机器上消费新Topic。
- 音讯积压处理后,康复原有架构。
真实的操作场景复盘
新建一个中转Topic
这儿无论是经过指令仍是在可视化东西上创立,都能够啦,不论黑猫白猫,抓到耗子便是好猫。由于原本的Topic分区是3,这儿能够挑选进步10倍,改成30。可是我是挑选了相对保存的12作为分区数,由于我这资源有限啦。
暂时顾客,中转音讯
这儿咱们就在原数据处理服务直接写一个监听加中转就OK了。
@Autowired
@Qualifier("performanceKafkaTemplate")
private KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = {"log-zero-#{systemProperties['env']}"}, containerFactory = "testFactory")
public void forwardTopic(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
List<LogMsgDTO> logMsgList = new ArrayList<>(1024);
records.forEach(record -> {
ListenableFuture<SendResult<String, String>> sendListener =
kafkaTemplate.send("log-zero-test", record.value());
sendListener.addCallback(success -> {
}, err -> {
log.error("音讯发送失利", err);
});
});
ack.acknowledge();
}
这儿针对Kafka做了一个封装,用的是自己封装的高功用版出产者和测验版顾客。
消费新Topic
首要改写一下原有的消费逻辑,由于之前是按月动态索引,所以消费音讯时会先从redis中获取是否存在当月索引,现在我能够直接写死这个索引,新增即可,消费的代码如下。
@KafkaListener(topics = {"log-zero-test"}, containerFactory = "testFactory")
public void testLogListen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
List<LogMsgDTO> logMsgList = new ArrayList<>(1024);
records.forEach(record -> {
String value = record.value();
String[] split = value.split("\\|");
String system = split[2];
if (split.length > 13) {
String env = split[3];
LogInfoDTO info = new LogInfoDTO();
info.setCreateTime(split[0]);
info.setLogType(split[1]);
info.setSystem(system);
info.setEnv(env);
info.setLocalIp(split[4]);
info.setRequestIp(split[5]);
info.setTriceId(split[6]);
info.setRequestUrl(split[7]);
info.setRequestType(split[8]);
info.setRequestMethod(split[9]);
info.setUserId("null".equalsIgnoreCase(split[10]) ? "" : split[10]);
info.setUserName(split[11]);
info.setThreadName(split[12]);
info.setMsg(split[13]);
logMsgList.add(new LogMsgDTO("test-log-zero-uat-re", JSON.toJSONString(info)));
}
});
if (!CollectionUtils.isEmpty(logMsgList)) {
esService.bulkAddRequest(logMsgList);
}
ack.acknowledge();
}
/**
* 批量新增日志
*/
public void bulkAddRequest(List<LogMsgDTO> msgList) {
BulkRequest request = new BulkRequest();
//等候批量恳求作为执行的超时设置为2分钟
request.timeout(TimeValue.timeValueMinutes(3));
msgList.forEach(msg -> {
request.add(new IndexRequest(msg.getTopic()).source(msg.getMsg(), XContentType.JSON));
});
zeroClient.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
for (BulkItemResponse bulkItemResponse : bulkResponse) {
boolean failed = bulkItemResponse.isFailed();
if (failed) {
log.error("批量刺进音讯失利,详细信息={}", bulkItemResponse.getFailureMessage());
}
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
handleAddDocSuccess(indexResponse);
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
break;
default:
break;
}
}
}
@Override
public void onFailure(Exception e) {
log.error("批量刺进失利", e);
}
});
}
消费音讯这个场景做的很简略啊,便是转化日志信息,然后拼接ES批量刺进的恳求体,简直没什么费事的动作。这儿要留意一下,假如音讯早已发送到Topic,并且挑选了一个全新的顾客组,那么将auto.offset.reset改成earliest从头读起。
积压音讯消费之后,切回原本的架构
由于我这儿运用了DevOps渠道,所以切换很简略,选之前的容器改变布置即可。
年轻人,听说你也懂优化
我是真的栓Q,大乌鱼事情发生,现在不懂优化是真的不可的辣!
出产者优化
众所周知,Kafka不懂优化也没有太大关系,由于通用版现已够好用了,可是作为卷王怎么能够承受通用,必定是狠狠地定制,定在墙上那种。这儿简略说一下高功用版出产者参数的调整,顺道解说下调整意义。
- batch.size从16384(16KB)进步到163840(160KB),简略粗犷的进步10倍批量巨细,攒够这个巨细才允许发送音讯,然后削减恳求次数。
- linger.ms默许是0ms,也便是无延迟推送音讯,由于batch.size调大了所以这儿同步增加到20ms,留意这儿需求依据实际情况进行调整。
batch.size和linger.ms是决定吞吐量和延时的重要参数,两个条件任一满意即可发送音讯。batch.size过大会导致发送音讯难以攒够batch.size巨细导致音讯发不出去,因而需求linger.ms保底,linger.ms时刻一到也能发送音讯。
- buffer.memory默许是32MB,这个缓冲巨细要随着batch.size进步而进步到67108864(64MB)。防止过小导致音讯无法写入,然后堵塞后续的出产音讯。
- acks运用默许值1,确保分区leader副本写入即可。
- max.request.size默许1048576B(1MB),由于日志音讯,比方反常堆栈信息会超越1MB,所以这儿调整到5MB,留意Topic的message.max.bytes也要同步调整巨细。
- request.timeout.ms默许3000ms,增大了max.request.size也要适当进步下恳求超时参数,比方60000ms,暴力加参是这样的。
- compression.type默许无,由于日志场景会有较大量的输入,所以最好开一下紧缩。我这儿归纳考虑紧缩比和紧缩吞吐量挑选了lz4,留意这儿broker的compression.type默许是跟随producer的,假如自己有修正,一定要改回去,防止紧缩算法不一致导致的额外开支。
以上便是高功用通用版出产者的重要参数调整,接下来针对出产速度做一个比照。在本地笔记本8核16G环境下(远低于服务器功用,仅作比照参考),并发消费3分区向12分区(副本数为1)新Topic出产音讯,另一比照者reliableHighKafkaTemplate(仅敞开ACK为-1,调大恳求音讯体),成果峰值如下。高功用版的出产者是要比一般的版本要高出不少的,假如放在服务器上这个速度还会暴增,因而出产音讯的速度咱们并不需求太介意,必定是比消费更快的。
高功用版 | 高可靠版 |
---|---|
6045.41 条/s | 3925.80 条/s |
顾客优化
既然运用了Spring-Kafka就要老老实实的承受人家的设定(这儿是妄图经过Kafka-client写出顾客,可是被生命周期管理代码劝退,然后老老实实用Spring真香桶的败犬菜恐龙),依据Spring官方给的两条路,挑选并发消费。比较写装备文件调参,我更喜爱经过bean加载,除了能够更自由的挑选之外,还能够塞到组件里即插即用。接下来要点介绍下高功用模式下,怎么调整顾客参数,以及怎么定制Spring-Kafka的顾客工厂。
/**
* @author WangZY
* @date 2023/3/6 18:52
* @description 测验用手动
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> testFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> container =
new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> props = new HashMap<>(16);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, StringUtils.isEmpty(prop.getConsumerGroupId()) ?
"testConsumerGroup" : prop.getConsumerGroupId());
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
container.setConcurrency(3);
container.setBatchListener(true);
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return container;
}
- fetch.min.bytes该参数用来装备 Consumer 在一次拉取恳求(调用 poll() 办法)中能从 Kafka 中拉取的最小数据量,默许值为1B。为了削减网络IO,也是合作前面出产者的高功用装备,这儿果断上调到1048576(1MB),对应fetch.max.bytes为50MB,这个不必调。
- max.poll.records用来装备 Consumer 在一次拉取恳求中拉取的最大音讯数,默许值为500(条),略小略小,合作出产者调整至1000。这儿是预测值,要依据实际情况调整,下文有提,看到最终哟。
- max.poll.interval.ms这个参数界说了两次poll()之间的最大距离,默许值为3000(5分钟)。假如超越这个距离同样会触发rebalance。在大都情况下这个参数是导致rebalance音讯重复的关键,即事务处理音讯耗时太长。这儿一般来说不必调整,假如出现,也是优先调整代码,而不是修正该参数。
这儿Spring-Kafka的顾客工厂界说,除了调参之外,重要还有敞开并发监听setBatchListener(true)。只要敞开了这个参数,才能以public void testLogListen(List> records)这种入参为List的方式监听消费。同理这儿getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL)则是为了让入参中的Acknowledgment ack收效,改为手动提交offset后,虽然降低了消费速度,可是防止了因消费报错导致的音讯丢掉问题。
ES装备优化
前面ElasticSearch不停机重建索引引申来的优化与考虑 – ()也讲过这个问题,这一次当然仍是类似的装备副本数能够先设置为0,改写距离禁用,刷盘设置为异步即可。别的一些歪门邪道经测验没大用,除了前面三项最好的进步便是加内存,这个堆硬件最好使。插一个官方引荐的优化刺进速度的链接Tune for indexing speed | Elasticsearch Guide [8.6] | Elastic,顺道总结下ES官方引荐的进步刺进速度的办法
- 运用Bulk批量刺进
- 多线程恳求刺进ES
- 取消或许增加索引改写距离
- 刺进时禁用索引初始副本
- 服务器禁用swap
- 索引文档运用主动生成的ID
- 运用更快的硬件,比方SSD
- 加内存(这一点是我加的,由于ES很多装备是内存百分比的,进步内存一般是功用进步的最优解。懂吗朋友,加钱啊)
不同架构的消费速度比照
现在测验节点是三节点集群,都是mixed节点,服务器都是8核16G+1T机械磁盘(用SSD会更快)。ES除了一个节点因机器上服务过多(Kibana,SkywalkingUI,redis-cluster等),设置为4G(-Xms4G -Xmx4G),另两台设置为6G。接下来我会以表格的方式记载不同装备下,在Kibana上观测到的极限刺进速度。
贴一张Kibana的图,其他就不截图了,不然成灌水了。这个实测成果其实是契合我的预期的,不过让我意外的是1master(4G)+2data(6+6)的速度高于3mixed(4+6+6)。不知道是不是我的机器装备原因导致了这一点,可是我手里也没有空闲的服务器供我测验了,就留下读者们自己实验了,有测验过的大佬能够在评论区留言。简略来说,是契合上述ES官方引荐的优化装备的说法的,多线程的进步是巨量的,禁用副本和改写距离也会发生不小的进步。
消费的单次拉取值是需求依据实际情况调整的,我第一次调整是单次3000,后来调成2000,都发生了同一个问题,速度没上去,服务器负载却是飙升,如下图
影响的BUG战场
发送音讯体过大
新建了Topic后,写好中转出产者后,刚调用就G了,快如闪电。原因很简略,音讯体超越message.max.bytes默许值1MB了,调大即可。
ES刺进获取恳求衔接超时
java.util.concurrent.TimeoutException: Connection lease request time out
at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:411) ~[httpcore-nio-4.4.13.jar:4.4.13]
at org.apache.http.nio.pool.AbstractNIOConnPool.processNextPendingRequest(AbstractNIOConnPool.java:391) ~[httpcore-nio-4.4.13.jar:4.4.13]
at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:355) ~[httpcore-nio-4.4.13.jar:4.4.13]
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.releaseConnection(PoolingNHttpClientConnectionManager.java:391) ~[httpasyncclient-4.1.4.jar:4.1.4]
我是在spring boot数据处理服务中运用了elasticsearch-rest-high-level-client客户端,其间封装了HttpClient来向ES发送恳求。这儿的问题来源于运用了bulkAsync(后依据场景换成了同步恳求bulk),这是个异步恳求,底层是AbstractNIOConnPool提供了恳求衔接池。发生这个报错的原因是新的异步恳求在获取衔接时超越了获取衔接超时时刻,处理方案最直接的便是调大衔接池巨细和恳求衔接超时时刻。
httpClientBuilder.setMaxConnTotal(zeroProperties.getMaxConnectTotal());
httpClientBuilder.setMaxConnPerRoute(zeroProperties.getMaxConnectPerRoute());
ES装备超时时刻无效
RestClientBuilder builder = RestClient.builder(httpHostZeroList.toArray(new HttpHost[0]));
builder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnTotal(zeroProperties.getMaxConnectTotal());
httpClientBuilder.setMaxConnPerRoute(zeroProperties.getMaxConnectPerRoute());
return httpClientBuilder;
});
if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
builder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpClientBuilder;
});
}
在处理上面问题时Debug发现一个很奇特的现象,便是这儿的装备超时时刻的setMaxConnTotal和setMaxConnPerRoute,假如后边有相同的setHttpClientConfigCallback办法时,就会走最终一个set办法,前面的不会收效。在没有debug之前,我设置的参数一直不能收效,始终是默许值,我百思不得其解,最终经过debug,发现了这个奇特的现象。最终处理办法如下,便是放在一起进行装备,希望有大佬能解答我这个问题。
RestClientBuilder builder = RestClient.builder(httpHostZeroList.toArray(new HttpHost[0]));
if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
builder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
httpClientBuilder.setMaxConnTotal(zeroProperties.getMaxConnectTotal());
httpClientBuilder.setMaxConnPerRoute(zeroProperties.getMaxConnectPerRoute());
return httpClientBuilder;
});
}else {
builder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnTotal(zeroProperties.getMaxConnectTotal());
httpClientBuilder.setMaxConnPerRoute(zeroProperties.getMaxConnectPerRoute());
return httpClientBuilder;
});
}
ES过量刺进导致OOM
org.elasticsearch.client.ResponseException: method [POST], host [http://192.168.158.115:9200], URI [/_bulk?timeout=3m], status line [HTTP/1.1 429 Too Many Requests]
{“error”:{“root_cause”:[{“type”:”circuit_breaking_exception”,”reason”:”[parent] Data too large, data for [] would be [2077061194/1.9gb], which is larger than the limit of [2040109465/1.8gb], real usage: [2074284352/1.9gb], new bytes reserved: [2776842/2.6mb], usages [request=0/0b, fielddata=37121/36.2kb, in_flight_requests=10749974/10.2mb, model_inference=0/0b, accounting=8100600/7.7mb]”,”bytes_wanted”:2077061194,”bytes_limit”:2040109465,”durability”:”TRANSIENT”}],”type”:”circuit_breaking_exception”,”reason”:”[parent] Data too large, data for [] would be [2077061194/1.9gb], which is larger than the limit of [2040109465/1.8gb], real usage: [2074284352/1.9gb], new bytes reserved: [2776842/2.6mb], usages [request=0/0b, fielddata=37121/36.2kb, in_flight_requests=10749974/10.2mb, model_inference=0/0b, accounting=8100600/7.7mb]”,”bytes_wanted”:2077061194,”bytes_limit”:2040109465,”durability”:”TRANSIENT”},”status”:429}
ES默许是1G,我这儿调整到2G,上面这段话的意思便是说,ES的堆内存满了,放不下新的数据,也便是标准OOM了。要点观察报错日志的几个参数
- which is larger than the limit of [2040109465/1.8gb],这儿提示的是内存上限,默许是ES内存最大值的95%
- real usage: [2074284352/1.9gb],这儿是当前已运用内存巨细
- new bytes reserved: [2776842/2.6mb],这儿是本次恳求需求的内存空间巨细
- data for [] would be [2077061194/1.9gb],这儿指的是已运用内存巨细+本次恳求巨细,这个值大于内存上限,因而报错
此刻ES此节点挂掉,检查日志能够得到下面这段。
[2023-03-09T17:45:22,338][INFO ][o.e.i.b.HierarchyCircuitBreakerService] [node-2] attempting to trigger G1GC due to high heap usage [2049729624]
[2023-03-09T17:45:22,343][INFO ][o.e.i.b.HierarchyCircuitBreakerService] [node-2] GC did bring memory usage down, before [2049729624], after [2029398072], allocations [1], duration [5]
[2023-03-09T17:45:22,332][WARN ][o.e.h.AbstractHttpServerTransport] [node-2] caught exception while handling client http traffic, closing connection Netty4HttpChannel{localAddress=/192.168.158.114:9200, remoteAddress=/172.27.136.31:6096}
java.lang.Exception: java.lang.OutOfMemoryError: Java heap space
这个处理办法其实很简略,由于调任何参数都不治本了,所以仅有能做的便是加内存,我在测验阶段是加到4G就没有问题了。
微醺码头
代码场景设计实践 – 云雨雪的专栏 – (),忽然发现我这个专栏现已有26人订阅了,系列文章均3K阅览量40赞60收藏。本期也将收录至该专栏,依照传统,必定要登上微醺码头,为咱们带来一些散碎的知识点,坐稳了,开船啦!
更换节点
一般分为四种节点类型,分别是主节点、数据节点、客户端节点和混合节点。经过elasticsearch.yml中的node.master: true和node.data: true(默许值)来装备,默许节点是混合节点。
- 主节点(master),装备node.master: true、node.data: false。主要功用是保护元数据,管理集群节点状况,不担任数据写入和查询。此机器相对内存装备要求低一些,可是有必要安稳,由所以master,挂了就都没得玩了。
- 数据节点(data),装备node.master: false、node.data: true。主要功用是担任数据写入和查询,因而要求内存有必要够大。
- 客户端节点(client),装备node.master: false、node.data: false。主要功用是担任任务分发和成果汇聚,分管数据节点压力,计算用的,内存也得大大大。
- 混合节点(mixed),装备node.master: true、node.data: true。全能冠军,(⊙﹏⊙),就和全栈工程师一样,啥都会但也不会特别凶猛,压力一大就容易G了。
当集群到达一定规划之后,不主张运用mixed,而是应该对各节点进行角色区分,能够依照mixed—>master+data—>master+data+client逐渐区分。
接下来介绍下怎么从mixed进行转化,当然我的主张是一开端就要区分出来清晰的角色,别到一半再转。以下是从mixed转化到master的步骤,其他同理了。
- 修正elasticsearch.yml中的node.master: true和node.data:false
- 下线该节点,在es装置目录下的bin文件夹下执行./elasticsearch-node repurpose铲除分区数据
- 最终重启该节点就成功
留意这儿有几个问题,首要是在当前节点拥有分片数据的时分是不允许直接设置node.data:false的,会提示以下反常
我是运用的ES7.9版本,因而官方在报错信息后贴心的提示了能够运用东西www.elastic.co/guide/en/el…进行分片信息的删去。
这儿bin目录下执行指令后,会有个承认环节,再次让你判断是否要铲除分片数据。这儿我的主张是三个,一是手动经过ES的API搬迁分片(一次只能搬迁一个索引,摧残),二是一开端就弄好节点角色定位防止二次改变,三是所有索引都加上副本。
从上面这个图中能够看到,当挑选铲除主节点分片数据后,有副本的索引的数据不会丢,而是将副本晋级为分片,体现在未分配分片中。由于我的索引大部分设置为3分片,所以这时分索引大部分状况都是Yellow,小部分没有副本的则是变成了Red。(上面这个图截的不好,应该截索引那个界面的)
当我从头将master节点变成mixed节点后,未分配的分片会渐渐填充到该节点,节点和集群状况也会转Green,不要害怕,让子弹飞一瞬间。
Kafka各类型通用出产者装备
依照功用区分为三种特化型出产者和一种普适出产者。三种特化型分别是高功用(吞吐量)、高可靠性、次序性。高功用上面说过了,就不复述了。
- 高可靠性进步重试次数和重试时刻以及ack设置为all来保障音讯发送的可靠性
- 次序性则运用max.in.flight.requests.per.connection设置为1确保音讯不会由于重试而次序紊乱,一起topic设置分区为1或许出产者发送指定分区来协同确保次序性
import com.xxx.transfer.properties.TransferProperties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;
/**
* @Author WangZY
* @Date 2022/3/15 14:51
* @Description 多种出产者装备
**/
@EnableConfigurationProperties(value = {TransferProperties.class})
@Configuration
public class KafkaProducerConfig {
@Autowired
private TransferProperties prop;
/**
* @author WangZY
* @date 2022/7/14 11:38
* @description 高功用
*/
@Bean
public KafkaTemplate<String, String> performanceKafkaTemplate() {
Map<String, Object> props = new HashMap<>(16);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 163840);
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
//config.getInt,主动强转这儿不必介意是字符串仍是数字
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
/**
* @author WangZY
* @date 2022/7/14 11:40
* @description 高可靠性
*/
@Bean
public KafkaTemplate<String, String> reliableHighKafkaTemplate() {
Map<String, Object> props = new HashMap<>(16);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
props.put(ProducerConfig.ACKS_CONFIG, "-1");
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
/**
* @author WangZY
* @date 2022/7/14 11:40
* @description 次序性
*/
@Bean
public KafkaTemplate<String, String> timeKafkaTemplate() {
Map<String, Object> props = new HashMap<>(16);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.RETRIES_CONFIG, "1");
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
/**
* @author WangZY
* @date 2022/7/14 11:41
* @description 一般
*/
@Primary
@Bean
public KafkaTemplate<String, String> normalKafkaTemplate() {
Map<String, Object> props = new HashMap<>(16);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.RETRIES_CONFIG, "1");
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
}
Kafka各类型通用顾客装备
这个就相对朴素了,区分比较少,主要是是否主动提交offset的比照,高功用部分也在前面提到过,直接贴代码吧。
import com.xxx.transfer.properties.TransferProperties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
/**
* @Author WangZY
* @Date 2022/3/15 14:51
* @Description 多种顾客装备
**/
@EnableConfigurationProperties(value = {TransferProperties.class})
@Configuration
public class KafkaConsumerConfig {
@Autowired
private TransferProperties prop;
/**
* @author WangZY
* @date 2022/7/14 11:35
* @description 高功用
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> performanceFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> container =
new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> props = new HashMap<>(16);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, StringUtils.isEmpty(prop.getConsumerGroupId()) ?
"performanceConsumerGroup" : prop.getConsumerGroupId());
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
container.setConcurrency(3);
container.setBatchListener(true);
return container;
}
/**
* @author WangZY
* @date 2022/7/14 11:36
* @description 高可靠
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> reliableHighFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> container =
new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> props = new HashMap<>(16);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, StringUtils.isEmpty(prop.getConsumerGroupId()) ?
"reliableHighConsumerGroup" : prop.getConsumerGroupId());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
container.setConcurrency(3);
container.setBatchListener(true);
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return container;
}
/**
* @author WangZY
* @date 2022/7/14 11:38
* @description 一般
*/
@Primary
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> normalFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> container =
new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> props = new HashMap<>(16);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, StringUtils.isEmpty(prop.getConsumerGroupId()) ?
"normalConsumerGroup" : prop.getConsumerGroupId());
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
container.setConcurrency(3);
container.setBatchListener(true);
return container;
}
}
写在最终
在我看来音讯积压这个问题场景的处理思路是很简略的,的确没啥好讲的,但一起细节又非常丰富,因而我补充了一些优化知识和实际遇到的问题。处理问题的过程仍是蛮有意思的,非常影响可是遗憾的是,问题只能一个人来处理,事实有些寂寞,工作压力大,导致我最近也比较难过。一起也有一些事,延迟触发,让我更加难过,感觉仍是一时半会儿走不出去,摧残。2023年3月12日的下午补完了本篇,算是达成了今日的方针,原本想开开心心的来收个尾,可是忽然一下就被苦楚击倒,变得低沉,有时分,没那么多主意也挺好。
我本年开端的时分,给我自己定下的人生信条是,要做一个高兴的人,在这苦楚压抑的国际绽放幸福高兴之花。当时是和一个人谈天的时分,忽然想到这句话的,就直接打出来了,他说我中二,我没说话,由于他说得很对。人不中二枉少年,或许就要单纯一点、直白一点,但我做不到。所以我给自己定下一个方针,便是的个人介绍里的那句,作为方针吧,少年终究是要向前冲击,向美好的国际献上祝福!这句话对我来说有两个意思,谜语人哈,我也不想解说,不过给咱们的就单纯是一个美好的祝福吧。日子不易,愿国际绽放幸福高兴之花,诸君共勉!