Elastic-Stack学习笔记

在容器云的日志收集和分析中,没有采用CNCF的fluentd方案,而是直接采用了elastic-stack的一整套解决方案。

通过服务编排的时候写日志收集的CRD到K8S,在主机上通过daemonset的方式来运行fileBeat收集容器日志;fileBeat的output是kafka,通过消息中间件来做汇聚和压力削峰,接下来再是logstach和elasticsearch。这只是日志收集的流程,其实里面还涉及具体如何做系统的负载均衡,日志流的告警等,这里就不一一分析。

本文重点聊下elasticsearch相关的一些知识点。

ES安装

部署

1
docker run --name=elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms256m -Xmx256m" -d elasticsearch

插件命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
root@2c03a6312e66:/usr/share/elasticsearch# elasticsearch-plugin --help
A tool for managing installed elasticsearch plugins
Commands
--------
list - Lists installed elasticsearch plugins
install - Install a plugin
remove - removes a plugin from Elasticsearch

Non-option arguments:
command

Option Description
------ -----------
-h, --help show help
-s, --silent show minimal output
-v, --verbose show verbose output

# 安装插件
elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.2.4/elasticsearch-analysis-ik-6.2.4.zip

Kibana安装

1
docker run -d --link elasticsearch:elasticsearch --name kibana -p 5601:5601 kibana:5.6.16

ES核心知识

主要涉及对ES整体框架认识理解的几个方面。

集群管理

节点角色

  • Master节点

    Master节点控制Elasticsearch集群,并负责在集群范围内创建/删除索引,跟踪哪些节点是集群的一部分,并将分片分配给这些节点。主节点一次处理一个集群状态,并将状态广播到所有其他节点,这些节点需要响应并确认主节点的信息。

    在elasticsearch.yml中,将nodes.master属性设置为true(默认),可以将节点配置为有资格成为主节点的节点。对于大型生产集群,建议拥有一个专用主​​节点来控制集群,并且不服务任何用户请求。

  • Data节点

    数据节点用来保存数据和倒排索引。默认情况下,每个节点都配置为一个data节点,并且在elasticsearch.yml中将属性node.data设置为true。如果您想要一个专用的master节点,那么将node.data属性更改为false。

  • Client节点

    如果将node.master和node.data设置为false,则将节点配置为客户端节点,并充当负载平衡器,将传入的请求路由到集群中的不同节点。若你连接的是作为客户端的节点,该节点称为协调节点(coordinating node)。协调节点将客户机请求路由到集群中对应分片所在的节点。对于读取请求,协调节点每次选择不同的分片来提供请求以平衡负载。

跨集群搜索

相当于在多个ES集群之外,配置一个ES作为代理,这个代理节点不会加入到存储数据的ES集群中;它只是作为查询代理节点。

配置的方式可以是直接修改yaml文件,或者通过HTTP的方式下发配置,指定remote ES cluster。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
PUT _cluster/settings
{
"persistent": {
"cluster": {
"remote": {
"cluster_one": {
"seeds": [
"127.0.0.1:9300"
]
},
"cluster_two": {
"seeds": [
"127.0.0.1:9301"
]
},
"cluster_three": {
"seeds": [
"127.0.0.1:9302"
]
}
}
}
}
}

具体参考这里

故障检测

zen discovery模块

  • Ping:执行该过程的节点,用来发现对方。
  • Unicast:该模块包含一个主机名列表,用于控制要ping哪个节点。

故障检测的过程是这样的: 主节点ping所有的节点来检查这些节点是否存活,而所有的节点回ping主节点来汇报他们是存活的。

discovery.zen.minimum_master_nodes

推荐的值为: (num_master_node / 2) + 1

假设我们有一个集群,有3个master节点,当网络发生故障的时候,有可能其中一个节点不能和其他节点进行通信了。这个时候,当discovery.zen.minimum_master_nodes设置为1,就会分成两个小的独立集群,当网络好的时候,就会出现数据错误或者丢失数据的情况。当discovery.zen.minimum_master_nodes设置为2的时候,一个网络中有两个主资格节点,可以继续工作,另一部分,由于只有一个master节点,则不会形成一个独立的集群,这个时候当网络恢复的时候,节点又会从新加入集群。

shard (分片)

每个索引包含多个shard,默认是5个,Shard是一个最小的Lucene索引单元。

  1. 分片,ES是分布式搜索引擎,每个索引有一个或多个分片,索引的数据被分配到各个分片上,相当于一桶水用了N个杯子装。
  2. 分片有助于横向扩展,N个分片会被尽可能平均地(rebalance)分配在不同的节点上(例如你有2个节点,4个主分片,那么每个节点会分到2个分片,后来你增加了2个节点,那么你这4个节点上都会有1个分片,这个过程叫relocation,ES感知后自动完成)。
  3. 分片是独立的,对于一个Search Request的行为,每个分片都会执行这个Request。
  4. 每个分片都是一个Lucene Index,所以一个分片只能存放 Integer.MAX_VALUE - 128 = 2,147,483,519 个docs。

当写入一个document的时候,ES通过对docid进行hash来确定将其放在哪个shard上面,然后在shard上面进行索引存储(可以通过指定routing字段来固定hash值,从而将document固定到某个分片上)。

replicas (副本)

replicas就是备份,ES采用的是Push Replication模式,当你往master主分片上面索引一个文档,该分片会复制该文档到分片副本中,这些分片也会索引这个文档。默认情况下一个索引创建5个分片一个备份,即 5primary + 5replica = 10个分片

  1. 主分片和备分片不会出现在同一个节点上(防止单点故障)。
  2. 如果只有一个节点,那么5个replica都无法分配(unassigned),此时cluster status会变成Yellow。
  3. primary分片丢失,replica分片就会被顶上去成为新的主分片,同时根据这个新的主分片创建新的replica,集群数据安然无恙。
  4. 索引请求只能发生在主分片上,replica不能执行索引请求。
  5. 对于一个索引,除非重建索引(reindex,需先close索引)否则不能调整分片的数目(主分片数, number_of_shards),但可以随时调整replica数(number_of_replicas)。

写数据

主要涉及ES为何能够做到实时查询的相关原理。

flush/reflush

当向elasticsearch发送创建document索引请求的时候,document数据会先进入到index buffer之后,与此同时会将操作记录在translog之中,当发生refresh时(数据从index buffer中进入filesystem cache的过程,时间很短,默认是1s一次)translog中的操作记录并不会被清除,而是当数据从filesystem cache中被写入磁盘之后才会将translog中清空。而从filesystem cache写入磁盘的过程就是flush

translog

  • translog的功能

    保证在filesystem cache中的数据不会因为elasticsearch重启或是发生意外故障的时候丢失。
    当系统重启时会从translog中恢复之前记录的操作。
    当对elasticsearch进行CRUD操作的时候,会先到translog之中进行查找,因为tranlog之中保存的是最新的数据。
    translog的清除时间到期时进行flush操作之后(将数据从filesystem cache刷入disk之中)。

  • flush操作的时间点

    es的各个shard会每个30分钟进行一次flush操作。
    当translog的数据达到某个上限的时候会进行一次flush操作。

  • translog和flush的一些配置项

    index.translog.flush_threshold_ops:当发生多少次操作时进行一次flush。默认是 unlimited。
    index.translog.flush_threshold_size:当translog的大小达到此值时会进行一次flush操作。默认是512mb。
    index.translog.flush_threshold_period:在指定的时间间隔内如果没有进行flush操作,会进行一次强制flush操作。默认是30m。
    index.translog.interval:多少时间间隔内会检查一次translog,来进行一次flush操作。es会随机的在这个值到这个值的2倍大小之间进行一次操作,默认是5s。

reindex

可以实现将远端数据index到本地,也可以将本地的index做修改字段等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200", // 远程es的ip和port列表
"socket_timeout": "1m",
"connect_timeout": "10s" // 超时时间设置
},
"index": "my_index_name", // 源索引名称
"query": { // 满足条件的数据
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest_index_name" // 目标索引名称
},
"script": { // 修改信息
"source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
"lang": "painless"
}

}

查询

当进行查询时,如果提供了查询的DocID,ES通过hash就知道Doc存在哪个shard上面,再通过routing table查询就知道在哪个node上面,然后去node上面去取就好了。如果不提供DocID,那么ES会在该Index(indics)分片所在的所有node上执行搜索,然后返回搜索结果,由coordinating node gather之后返回给用户。

mapping

ES默认是动态创建索引和索引类型的mapping的。这就相当于无需定义Schema,无需指定各个字段的索引规则就可以索引文件,很方便。但有时方便就代表着不灵活。比如,ES默认一个字段是要做分词的,但我们有时要搜索匹配整个字段却不行。如有统计工作要记录每个城市出现的次数。对于NAME字段,若记录“new york”文本,ES可能会把它拆分成“new”和“york”这两个词,分别计算这个两个单词的次数,而不是我们期望的“new york”。

这时,就需要我们在创建索引时定义mapping。此外,es支持多字段结构,例如:我们希望两个字段中有相同的值,一个用于搜索,一个用户排序;或者一个用于分词器分析,一个用于空白字符。例如:编写mapping文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{	
"index_type":{
"properties":{
"ID":{
"type":"string",
"index":"not_analyzed"
},
"NAME":{
"type":"string",
"fields":{
"NAME":{
"type":"string"
},
"raw":{
"type":"string",
"index":"not_analyzed"
}
}
}
}
}
}

就是说我们对于index_type这个索引类型,定义了它的mapping。重点是将NAME这个字段映射为两个,一个是需要做索引分析的NAME,另一个是不分析的raw,即不会拆分new york这种词组。这样我们在做搜索的时候,就可以对NAME.raw这个字段做term aggregation,获得所有城市出现的次数了。

phrase

首先说,wildcard/regexp/prefix这三个的性能都不好!wildcard和regexp查询的工作方式和prefix查询完全一样。它们也需要遍历倒排索引中的词条列表来找到所有的匹配词条,然后逐个词条地收集对应的文档ID。它们和prefix查询的唯一区别在于它们能够支持更加复杂的模式。

prefix,wildcard以及regexp查询基于词条进行操作。如果你在一个analyzed字段上使用了它们,它们会检查字段中的每个词条,而不是整个字段。比如,假设我们的title字段中含有”Quick brown fox”,它会产生词条quick,brown和fox。这个查询能够匹配:

1
{ "regexp": { "title": "br.*" }}

而不会匹配:

1
{ "regexp": { "title": "quick br*" }}

如果需要匹配字符串”quick brown fox”,可以采用phrase。phrase在ES中叫邻近匹配, slop值来用指定查询项之间可以分隔多远的距离。

1
2
3
4
5
6
7
8
9
10
11
12
POST /bookdb_index/book/_search
{
"query": {
"multi_match" : {
"query": "quick brown fox",
"fields": ["title", "summary"],
"type": "phrase",
"slop": 1
}
},
"_source": [ "title", "summary", "publish_date" ]
}

前面的查询的数据必须满足下面3个条件:

  1. quick, brown, fox 三个词必须同时出现
  2. brown 的位置必须比quick大1
  3. fox 的位置必须比quick大2

有关ES查询相关的方法,可以参考这篇文档, 总结的比较好!

Logstash

pipeline

  • 可以运行多个pipeline,这个需要指定一个配置文件pipelines.yml,在这个文件中指定每一个pipeline所使用的具体的input,filter,ouput的配置文件;
  • 一般情况下,一个logstash只运行一个pipeline,可以指定pipeline的worker数为cpu cores数,但是这样容易出现对多个input的相互阻塞的问题;
  • 多个pipeline的分别拥有隔离的Persistent queues 和 dead letter queues,但是需要小心设置其workers,因为会出现资源竞争的问题;

高可用场景

  • 直接使用beats对接到logstash,这样logstash只需要扩展节点,因为beats可以做客户端负载均衡;
  • 使用kafka,然后将多个logstash节点配置为consumer去消费同一个topic的group,这样kafka可做负载均衡;
  • 对于udp、tcp之类的力量,需要在logstash nodes前添加负载均衡设备

Plugins

Logstash有按照条件发送邮件的output plugins,按照这个的逻辑,可以实现一个告警的output plugins,在被匹配的规则满足条件后,调用webhook或者是直接发送邮件,从而实现日志告警功能。

0%