开启生长之旅!这是我参与「日新计划 12 月更文应战」的第8天,点击查看活动概况

作者:谢泽华

背景

众所周知单个机房在出现不可抗拒的问题(如断电、断网等要素)时,会导致无法正常供给服务,会对事务形成潜在的丢失。所以在协同作业范畴,一种能够依据同城或异地多活机制的高可用规划,在确保数据一致性的同时,能够最大程度下降因为机房的仅单点可用所导致的潜在高可用问题,最大程度上确保事务的用户体验,下降单点问题对事务形成的潜在丢失显得尤为重要。

同城双活,关于出产的高可用确保,重大的意义和价值是不可言喻的。表面上同城双活仅仅简单的部署了一套出产环境罢了,但是在架构上,这个改变的影响是巨大的,无状态运用的高可用办理、恳求流量的办理、版别发布的办理、网络架构的办理等,其提升的架构复杂度巨大。

结合真实的协同作业产品:京办(为北京市政府供给协同作业服务的综合性平台)出产环境面对的复杂的政务网络以及京办同城双活架构演进的案例,给我们介绍下京办继续改善、分阶段演进过程中的一些考虑和实践经历的总结。本文仅针对ES集群在跨机房同步过程中的计划和经历进行介绍和总结。

架构

1.部署Logstash在金山云机房上,Logstash启动多个实例(按不同的类型分类,提高同步功率),并且和金山云机房的ES集群在相同的VPC

2.Logstash需求装备大网访问权限,确保Logstash和ES原集群和方针集群互通。

3.数据搬迁能够全量搬迁和增量搬迁,首次搬迁都是全量搬迁后续的添加数据挑选增量搬迁。

4.增量搬迁需求改造添加辨认的增量数据的标识,具体方法后续进行介绍。



跨机房ES同步实战



原理

Logstash作业原理



跨机房ES同步实战



Logstash分为三个部分input 、filter、ouput:

1.input处理接纳数据,数据能够来源ES,日志文件,kafka等通道.

2.filter对数据进行过滤,清洗。

3.ouput输出数据到方针设备,能够输出到ES,kafka,文件等。

增量同步原理

  1. 关于T时刻的数据,先运用Logstash将T曾经的一切数据搬迁到有孚机房京东云ES,假定用时∆T

  2. 关于T到T+∆T的增量数据,再次运用logstash将数据导入到有孚机房京东云的ES集群

  3. 重复上述步骤2,直到∆T足够小,此时将事务切换到华为云,终究完结新增数据的搬迁

适用范围:ES的数据中带有时刻戳或许其他能够区别新旧数据的标签

流程



跨机房ES同步实战



预备作业

1.创立ECS和装置JDK忽略,自行装置即可

2.下载对应版别的Logstash,尽量挑选与Elasticsearch版别一致,或挨近的版别装置即可

www.elastic.co/cn/download…

1) 源码下载直接解压装置包,开箱即用

2)修改对内存运用,logstash默许的堆内存是1G,依据ECS集群挑选合适的内存,能够加快集群数据的搬迁功率。



跨机房ES同步实战



  1. 搬迁索引

Logstash会协助用户自动创立索引,但是自动创立的索引和用户本身的索引会有少许差异,导致终究数据的搜索格式不一致,一般索引需求手动创立,确保索引的数据完全一致。

以下供给创立索引的python脚本,用户能够运用该脚本创立需求的索引。

create_mapping.py文件是同步索引的python脚本,config.yaml是集群地址装备文件。

注:运用该脚本需求装置相关依靠

yum install -y PyYAML
yum install -y python-requests

复制以下代码保存为 create_mapping.py:

import yaml
import requests
import json
import getopt
import sys
def help():
    print
    """
    usage:
    -h/--help print this help.
    -c/--config config file path, default is config.yaml
    example:  
    python create_mapping.py -c config.yaml 
    """
def process_mapping(index_mapping, dest_index):
    print(index_mapping)
    # remove unnecessary keys
    del index_mapping["settings"]["index"]["provided_name"]
    del index_mapping["settings"]["index"]["uuid"]
    del index_mapping["settings"]["index"]["creation_date"]
    del index_mapping["settings"]["index"]["version"]
    # check alias
    aliases = index_mapping["aliases"]
    for alias in list(aliases.keys()):
        if alias == dest_index:
            print(
                "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
            del index_mapping["aliases"][alias]
    if index_mapping["settings"]["index"].has_key("lifecycle"):
        lifecycle = index_mapping["settings"]["index"]["lifecycle"]
        opendistro = {"opendistro": {"index_state_management":
                                         {"policy_id": lifecycle["name"],
                                          "rollover_alias": lifecycle["rollover_alias"]}}}
        index_mapping["settings"].update(opendistro)
        # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
        del index_mapping["settings"]["index"]["lifecycle"]
    print(index_mapping)
    return index_mapping
def put_mapping_to_target(url, mapping, source_index, dest_auth=None):
    headers = {'Content-Type': 'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)
    if create_resp.status_code != 200:
        print(
            "create index " + url + " failed with response: " + str(create_resp) + ", source index is " + source_index)
        print(create_resp.text)
        with open(source_index + ".json", "w") as f:
            json.dump(mapping, f)
def main():
    config_yaml = "config.yaml"
    opts, args = getopt.getopt(sys.argv[1:], '-h-c:', ['help', 'config='])
    for opt_name, opt_value in opts:
        if opt_name in ('-h', '--help'):
            help()
            exit()
        if opt_name in ('-c', '--config'):
            config_yaml = opt_value
    config_file = open(config_yaml)
    config = yaml.load(config_file)
    source = config["source"]
    source_user = config["source_user"]
    source_passwd = config["source_passwd"]
    source_auth = None
    if source_user != "":
        source_auth = (source_user, source_passwd)
    dest = config["destination"]
    dest_user = config["destination_user"]
    dest_passwd = config["destination_passwd"]
    dest_auth = None
    if dest_user != "":
        dest_auth = (dest_user, dest_passwd)
    print(source_auth)
    print(dest_auth)
    # only deal with mapping list
    if config["only_mapping"]:
        for source_index, dest_index in config["mapping"].iteritems():
            print("start to process source index" + source_index + ", target index: " + dest_index)
            source_url = source + "/" + source_index
            response = requests.get(source_url, auth=source_auth)
            if response.status_code != 200:
                print("*** get ElasticSearch message failed. resp statusCode:" + str(
                    response.status_code) + " response is " + response.text)
                continue
            mapping = response.json()
            index_mapping = process_mapping(mapping[source_index], dest_index)
            dest_url = dest + "/" + dest_index
            put_mapping_to_target(dest_url, index_mapping, source_index, dest_auth)
            print("process source index " + source_index + " to target index " + dest_index + " successed.")
    else:
        # get all indices
        response = requests.get(source + "/_alias", auth=source_auth)
        if response.status_code != 200:
            print("*** get all index failed. resp statusCode:" + str(
                response.status_code) + " response is " + response.text)
            exit()
        all_index = response.json()
        for index in list(all_index.keys()):
            if "." in index:
                continue
            print("start to process source index" + index)
            source_url = source + "/" + index
            index_response = requests.get(source_url, auth=source_auth)
            if index_response.status_code != 200:
                print("*** get ElasticSearch message failed. resp statusCode:" + str(
                    index_response.status_code) + " response is " + index_response.text)
                continue
            mapping = index_response.json()
            dest_index = index
            if index in config["mapping"].keys():
                dest_index = config["mapping"][index]
            index_mapping = process_mapping(mapping[index], dest_index)
            dest_url = dest + "/" + dest_index
            put_mapping_to_target(dest_url, index_mapping, index, dest_auth)
            print("process source index " + index + " to target index " + dest_index + " successed.")
if __name__ == '__main__':
    main()

装备文件保存为config.yaml:

# 源端ES集群地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 意图端ES集群地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"
# 是否只处理这个文件中mapping地址的索引
# 假如设置成true,则只会将下面的mapping中的索引获取到并在意图端创立
# 假如设置成false,则会取源端集群的一切索引,除去(.kibana)
# 并且将索引称号与下面的mapping匹配,假如匹配到运用mapping的value作为意图端的索引称号
# 假如匹配不到,则运用源端原始的索引称号
only_mapping: true
# 要搬迁的索引,key为源端的索引姓名,value为意图端的索引姓名
mapping:
    source_index: dest_index

以上代码和装备文件预备完结,直接执行 python create_mapping.py 即可完结索引同步。

索引同步完结能够取方针集群的kibana上查看或许执行curl查看索引搬迁状况:

GET _cat/indices?v



跨机房ES同步实战



全量搬迁

Logstash装备位于config目录下。

用户能够参阅装备修改Logstash装备文件,为了确保搬迁数据的准确性,一般主张树立多组Logstash,分批次搬迁数据,每个Logstash搬迁部分数据。

装备集群间搬迁装备参阅:



跨机房ES同步实战



input{
    elasticsearch{
        # 源端地址
        hosts =>  ["ip1:port1","ip2:port2"]
        # 安全集群装备登录用户名暗码
        user => "username"
        password => "password"
        # 需求搬迁的索引列表,以逗号分隔,支持通配符
        index => "a_*,b_*"
        # 以下三项保持默许即可,包含线程数和搬迁数据大小和logstash jvm装备相关
        docinfo=>true
        slices => 10
        size => 2000
        scroll => "60m"
    }
}
filter {
  # 去掉一些logstash自己加的字段
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}
output{
    elasticsearch{
        # 意图端es地址
        hosts => ["http://ip:port"]
        # 安全集群装备登录用户名暗码
        user => "username"
        password => "password"
 # 意图端索引称号,以下装备为和源端保持一致
        index => "%{[@metadata][_index]}"
        # 意图端索引type,以下装备为和源端保持一致
        document_type => "%{[@metadata][_type]}"
        # 方针端数据的_id,假如不需求保留原_id,能够删去以下这行,删去后功能会更好
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }
    # 调试信息,正式搬迁去掉
    stdout { codec => rubydebug { metadata => true }}
}

增量搬迁

预处理:

  1. @timestamp 在elasticsearch2.0.0beta版别后弃用

www.elastic.co/guide/en/el…

  1. 本次关于京办从金山云机房搬迁到京东有孚机房,所涉及到的事务范畴多,各个事务线中所代表新增记录的时刻戳字段不一致,所涉及到的兼容作业量大,所以考虑经过elasticsearch中预处理功能pipeline进行预处理添加一致增量符号字段:gmt_created_at,以减少搬迁作业的复杂度(各自事务线可自行评估是否需求此步骤)。
PUT _ingest/pipeline/gmt_created_at
{
  "description": "Adds gmt_created_at timestamp to documents",
  "processors": [
    {
      "set": {
        "field": "_source.gmt_created_at",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
  1. 查看pipeline是否收效
GET _ingest/pipeline/*
  1. 各个index设置对应settings添加pipeline为默许预处理
PUT index_xxxx/_settings
{
  "settings": {
    "index.default_pipeline": "gmt_created_at"
  }
}
  1. 查看新增settings是否收效
GET index_xxxx/_settings



跨机房ES同步实战



增量搬迁脚本

schedule-migrate.conf

index:能够运用通配符的方法

query: 增量同步的DSL,一致gmt_create_at为增量同步的特别符号

schedule: 每分钟同步一把,”* * * * *”

input {
elasticsearch {
        hosts =>  ["ip:port"]
        # 安全集群装备登录用户名暗码
        user => "username"
        password => "password"
        index => "index_*"
        query => '{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'
        size => 5000
        scroll => "5m"
        docinfo => true
        schedule => "* * * * *"
      }
}
filter {
     mutate {
      remove_field => ["source", "@version"]
   }
}
output {
    elasticsearch {
        # 意图端es地址
        hosts => ["http://ip:port"]
        # 安全集群装备登录用户名暗码
        user => "username"
        password => "password"
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }
# 调试信息,正式搬迁去掉
stdout { codec => rubydebug { metadata => true }}
}

问题:

mapping中存在join父子类型的字段,直接搬迁报400反常



跨机房ES同步实战



[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400,
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #<LogStash::Event:0x3b3df773>], 
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400, 
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", 
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}

处理方法:

discuss.elastic.co/t/an-routin… github.com/elastic/ela…

结合事务特征,经过在filter中参加小量的ruby代码,将_routing的值取出来,放回logstah event中,由此问题得以处理。

示例:



跨机房ES同步实战