在上一篇文章中,咱们介绍了Elasticsearch java client的一些基本用法,为了达到生产级别的运用规范,下面介绍一些进阶的用法。
1.客户端tcp衔接超时
在咱们创立客户端时,实践上创立的是RestClient
,而底层运用的是apache
的HttpClient
,在创立后长期无操作时这个衔接或许会被封闭,此时客户端并不知晓,直接运用就会提示下文中的过错。再次恳求又是正常的,因为客户端会重新创立衔接。
java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-6 [ACTIVE]
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:915)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:300)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:288)
at co.elastic.clients.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:147)
at co.elastic.clients.elasticsearch.ElasticsearchClient.search(ElasticsearchClient.java:1833)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-6 [ACTIVE]
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:381)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:748)
这个问题在Github的官方Issues上面也有相关的讨论:github.com/elastic/ela…
解决问题的关键在于让衔接可以坚持tcp keepalive
,有两种计划:
计划一:在客户端中显式的敞开keepalive
选项
RestClient httpClient = RestClient.builder(new HttpHost(hostName, port))
.setHttpClientConfigCallback(hc -> hc
.setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build())
).build();
别的,还需求设置体系层面的tcp keepalive
探测时间,默认值7200s太长或许会被自动封闭,建议修改为300s,默认配置如下:
net.ipv4.tcp_keepalive_time = 7200
net.ipv4.tcp_keepalive_intvl = 75
net.ipv4.tcp_keepalive_probes = 9
计划二:在客户端设置keepalive
策略,在超越指定时间后由客户端自行封闭,运用时再重新创立
RestClient httpClient = RestClient.builder(new HttpHost(hostName, port))
.setHttpClientConfigCallback(hc -> hc
.setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis()))
.build();
2.聚合核算
在Elasticsearch中Aggregation 分为3种类型:
- Metric: 核算类型,对字段进行核算平均值、求和等;
- Bucket: 分组核算,根据字段或许规模将文档分组到桶中进行核算;
- Pipeline:对聚合成果再次进行聚合核算;
在分组核算中有2个参数需求特别关注:Size、Shard size。官方文档:www.elastic.co/guide/en/el…
Size:在运用terms
对字段进行分桶时,默认值回来top 10文档,即只有10个核算成果,经过设置size的巨细可以回来所需巨细,最大值不超越search.max_buckets
。
MultiTermsAggregation aggregation = MultiTermsAggregation.of(s -> s.terms(
MultiTermLookup.of(t->t.field("product")),
MultiTermLookup.of(t->t.field("user"))
).size(100));
Shard size:在上一篇文章中,咱们介绍过Elasticsearch的查询进程,需求从每个分片获取成果后,再由和谐节点进行兼并排序。因为数据分布不均匀的原因,如果每个分片只获取size
巨细的文档,或许会出现核算误差。
Elasticsearch的解决计划是获取比所需更多的文档,在必定程度上避免这个问题,也就是Shard size
参数的用处。默认值:Shard size = size * 1.5 + 10
,在数据偏斜严重的情况下,可以恰当调大这个参数,当然也意味着更多的功能损耗。
3.数据快照
之前咱们介绍过search_after
可以完成深度分页功用,而在一些大批量数据导出的场景下,一般需求坚持数据游标不变来导出完整的数据,类似于快照的功用。而这就需求用到 point in time (PIT)
。
//获取pit id
OpenPointInTimeRequest openRequest = OpenPointInTimeRequest.of(o -> o
.index(getIndex())
.keepAlive(Time.of(t->t.time("10m"))));
OpenPointInTimeResponse openResponse = elasticsearchClient.openPointInTime(openRequest);
//查询数据
SearchRequest searchRequest = new SearchRequest.Builder()
.size(pageSize)
.sort(sortOptions)
.pit(p -> p.id(params.getPit()));
.build();
elasticsearchClient.search(searchRequest);
//封闭pit
ClosePointInTimeRequest closeRequest = ClosePointInTimeRequest.of(c -> c.id(pit));
elasticsearchClient.closePointInTime(closeRequest);
4.获取查找成果数量
在默认情况下search
接口回来的hits size
最大值是10000,如果需求获取实践的成果总数,需求敞开TrackHits
SearchRequest searchRequest = new SearchRequest.Builder()
.trackTotalHits(TrackHits.of(t->t.enabled(true)));
.build();
elasticsearchClient.search(searchRequest);
5.并发写入
一般情况下,Elasticsearch数据的写入会经过mq来进行触发,理论上可以经过mq的有序性来操控并发写入导致的数据覆盖问题,现实情况中考虑到功能、可靠性,较少选用这种方式。
计划一:添加version数据版别字段,经过CAS操作来完成乐观锁;
计划二:运用分布式锁,确保同一时间单个文档只有一个线程在履行更新操作,重试操作可以由mq来完成;
6.数据库业务
假定你正在运用Elasticsearch存储订单数据,在业务代码中的履行步骤如下:
- 更新MySQL中订单表数据;
- 发送订单变更的mq通知;
- 消费mq消息,从MySQL读取最新的数据写入Elasticsearch;
在运转一段时间后,你或许会发现Elasticsearch的数据与MySQL不一致,不是最新的版别;仔细分析上述进程会发现一个问题,在履行第2步操作时第1步的数据业务还没提交完成,将导致第3读取的不是最新数据。提供一种解决问题的思路,在业务提交完成后再发送mq消息。
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter(){
@Override
public void afterCommit() {
//发送mq
}
});
今日就先写到这里,你学”废”了吗。