Rocket MQ在业务开发中用的比较多,无论是其功能还是易用性方面表现很不错。先前云平台代码中也是用RMQ作为消息中间件,但是在上生产的前夜,QA对微服务做高可用测试时发现一些连接相关的问题。连夜分析,深入追踪当时使用的一个RMQ非官方golang库,发现其代码比较乱,最终评估下来决定在放弃对RMQ的使用。
于是CNCF的NATS就进入了我们的视野,由于其本来就是基于golang实现,对于云平台来讲适配会更加友好;但是NATS缺乏消息持久化的支持,必须通过NATS Streaming才能够弥补这块的缺陷。
本文重点不是介绍NATS,而是NAT Streaming;接下来,就来讲下我在NAT Streaming使用中的一些实践。
NATS Streaming介绍
官方有这样一副图,大致意思是NATS Streaming Module都一定是需要attach到NATS上,将消息内容持久化到存储中,且并不单独对外提供访问接口的。
主要功能包括
- 保障可达 (At-Least-Once delivery)
- 消息、事件的持久化 (persistence)
- 消费者可按需重复消费消息 (replay by subject)
- 生产者、消费者端均支持限速 (rate limit)
- 提供持久化的订阅 (Durable subscriptions)
HA环境
在这部分,我们先搭建一个由三个节点组成的集群,然后再简单了解一些常用的监控和调试方式。
NATS Streaming自带内嵌NATS Server,但通过分析参数列表,我们发现其貌似无法使用内嵌的NATS Server建立NAST集群。为了实现NATS Streaming的高可用,我们需要建立两层集群。
NATS Server集群
无法使用NAT Streaming内嵌的NATS,必须维护一套独立的NATS集群。NATS Streaming集群
通过指定external NATS的形势来连接NATS,NATS Streaming集群只负责数据持久化,只要有一个外部NATS Server可用,就不会受到影响。
集群启动
将下列粘贴到一个文件(nats-boot.sh)中,然后接下来介绍几种方法来替换变量,然后分别在三台主机上运行生成的命令。
1 | ### node-1 命令 |
通过环境变量设置
环境变量设置模板如下:
1
2
3
4
5NATS-CLUSTER-ID=cluster-1
NODE-1-IP=1.1.1.1
NODE-2-IP=2.2.2.2
NODE-3-IP=3.3.3.3直接在vim中替换变量
1
2
3
4
5
6vim ./nats-boot.sh
:%s/NATS-CLUSTER-ID/cluster-1/g
:%s/NODE-1-IP/1.1.1.1/g
:%s/NODE-2-IP/2.2.2.2/g
:%s/NODE-3-IP/3.3.3.3/g命令行替换变量
1
2
3
4sed -i 's/NATS-CLUSTER-ID/cluster-1/g' nats-boot.sh
sed -i 's/NODE-1-IP/1.1.1.1/g' nats-boot.sh
sed -i 's/NODE-2-IP/2.2.2.2/g' nats-boot.sh
sed -i 's/NODE-3-IP/3.3.3.3/g' nats-boot.sh
状态监控
nats streaming实现了监控的对应接口,当然还不够完善;不过可以对接外部exporter来将监控信息收集到prometheus等。
集群状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16curl localhost:9222/streaming/serverz
{
"cluster_id": "${NATS-CLUSTER-ID}",
"server_id": "2DTn1aiWKHXcrTWScNMYbB",
"version": "0.11.2",
"go": "go1.11.1",
"state": "CLUSTERED",
"now": "2019-03-13T08:05:32.976885437Z",
"start_time": "2019-03-13T07:54:17.961989986Z",
"uptime": "11m15s",
"clients": 0,
"subscriptions": 0,
"channels": 0,
"total_msgs": 0,
"total_bytes": 0
}存储使用情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17curl localhost:9222/streaming/storez
{
"cluster_id": "${NATS-CLUSTER-ID}",
"server_id": "2DTn1aiWKHXcrTWScNMYbB",
"now": "2019-03-13T08:08:49.634481176Z",
"type": "RAFT_FILE",
"limits": {
"max_channels": 100,
"max_msgs": 1000000,
"max_bytes": 1024000000,
"max_age": 0,
"max_subscriptions": 1000,
"MaxInactivity": 0
},
"total_msgs": 0,
"total_bytes": 0
}client列表
1
2
3
4
5
6
7
8
9
10
11curl localhost:9222/streaming/clientsz
{
"cluster_id": "${NATS-CLUSTER-ID}",
"server_id": "2DTn1aiWKHXcrTWScNMYbB",
"now": "2019-03-13T08:10:38.048664063Z",
"offset": 0,
"limit": 1024,
"count": 0,
"total": 0,
"clients": []
}channel信息查询
1
2
3
4
5
6
7
8
9
10curl localhost:9222/streaming/channelsz
{
"cluster_id": "${NATS-CLUSTER-ID}",
"server_id": "2DTn1aiWKHXcrTWScNMYbB",
"now": "2019-03-13T08:11:22.338425862Z",
"offset": 0,
"limit": 1024,
"count": 0,
"total": 0
}
NATS Streming客户端
官方文档见nats-straming client golang version
Subscribe
- subscribe
类似于广播,如果不使用用Queue Group,只要订阅了该topic的消费者均能够收取消息; - durable subscription
通过client-id和durable name来唯一表示一个client。当该client网络中断后,nats会将错过的消息都记录下来,等待该client-id对应的client重新起来之后再发送消息过去继续消费。
Queue Group VS. Durable Queue Group
名称 | 功能 | 生命周期 | 效果 |
---|---|---|---|
Queue Group | 用来将一堆消费者归类,在这个组内的消费者,只有一个会收到消息,而非广播的。 | 当queue group中最后一个消费者离开的时候(调用unscribe或者close),queue group会被销毁。 | 假设最后一个member调用close退出queue group,queue group将被销毁。而在该member(相同client-id)重新上线之前,queue group所订阅topic(channel)的消息将被丢失。 |
Durable Queue Group | 除了queue group的功能外,可以持久化消息,直到有新的消费者加入后继续消费。 | 可以当queueu group中所有的消费者都离开的时候,依然存在;当最后一个消费者调用unscribe的时候销毁。 | 假设最后一个member调用close退出queue group,queue group会被保留。而在该member(相同client-id)重新上线之前,queue group所订阅topic(channel)的消息将被在member起来之后,发送到member重新消费。 |
Client Id维护
client-id相同的两个member无法同时连接到nats streaming,而如果member挂掉了,新上线的member可以继续使用之前的client-id(IP地址可以不一致)可以继续从durable queue group中消费宕机时漏掉的消息。这里具体分析一下nats streaming行为。
已知使用durable queue group的last member未来得及执行unsubscribe和close操作就异常终止了;然后新的member(使用相同client-id)在另外一台主机上(IP不一致)启动,日志如下。
1 | [1] 2019/03/17 10:15:14.464874 [DBG] STREAM: [Client:consumer-5] Suspended durable queue subscription, subject=nautilus-test, inbox=_INBOX.1ENluyqy0UX4IeohElmiGm, queue=mydurableName:oceanus, subid=9 |
先前的member异常终止,server无法感知,因此等待新的member起来之后,才去check老的member的状态。具体操作流程为:向老的member对应client-id的Inbox发送消息,并等待反馈
(因为nats-streaming不直接与客户端通信),如果无法得到响应就开始执行:
- Suspended durable queue subscription
- Close旧member的connection
- 由于两个member都使用相同的client-id,于是改变client-id映射的Inbox,即替换client-id对应的member
- Resumed durable queue subscription