NATS Streaming实践

Rocket MQ在业务开发中用的比较多,无论是其功能还是易用性方面表现很不错。先前云平台代码中也是用RMQ作为消息中间件,但是在上生产的前夜,QA对微服务做高可用测试时发现一些连接相关的问题。连夜分析,深入追踪当时使用的一个RMQ非官方golang库,发现其代码比较乱,最终评估下来决定在放弃对RMQ的使用。

于是CNCF的NATS就进入了我们的视野,由于其本来就是基于golang实现,对于云平台来讲适配会更加友好;但是NATS缺乏消息持久化的支持,必须通过NATS Streaming才能够弥补这块的缺陷。

本文重点不是介绍NATS,而是NAT Streaming;接下来,就来讲下我在NAT Streaming使用中的一些实践。

NATS Streaming介绍

官方有这样一副图,大致意思是NATS Streaming Module都一定是需要attach到NATS上,将消息内容持久化到存储中,且并不单独对外提供访问接口的。

主要功能包括

  • 保障可达 (At-Least-Once delivery)
  • 消息、事件的持久化 (persistence)
  • 消费者可按需重复消费消息 (replay by subject)
  • 生产者、消费者端均支持限速 (rate limit)
  • 提供持久化的订阅 (Durable subscriptions)

HA环境

在这部分,我们先搭建一个由三个节点组成的集群,然后再简单了解一些常用的监控和调试方式。

NATS Streaming自带内嵌NATS Server,但通过分析参数列表,我们发现其貌似无法使用内嵌的NATS Server建立NAST集群。为了实现NATS Streaming的高可用,我们需要建立两层集群。

  • NATS Server集群
    无法使用NAT Streaming内嵌的NATS,必须维护一套独立的NATS集群。

  • NATS Streaming集群
    通过指定external NATS的形势来连接NATS,NATS Streaming集群只负责数据持久化,只要有一个外部NATS Server可用,就不会受到影响。

集群启动

将下列粘贴到一个文件(nats-boot.sh)中,然后接下来介绍几种方法来替换变量,然后分别在三台主机上运行生成的命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
### node-1 命令
###############
docker run --restart=always -d --net=host --name=nats-node-1 nats -m 8222 -p 4222 -cluster nats://0.0.0.0:6222 -routes nats://${NODE-2-IP}:6222,nats://${NODE-3-IP}:6222

docker run --restart=always --name=nats-streaming-node-1 -p 9222:9222 -d nats-streaming -m 9222 --stan_trace=true --stan_debug=true --debug=true --trace=true -store file -dir datastore -clustered -cluster_id ${NATS-CLUSTER-ID} -cluster_node_id node-1 -cluster_peers node-1,node-2,node-3 -ns nats://${NODE-1-IP}:6222,nats://${NODE-2-IP}:6222,nats://${NODE-3-IP}:6222

### node-2 命令
###############
docker run --restart=always -d --net=host --name=nats-node-2 nats -m 8222 -p 4222 -cluster nats://0.0.0.0:6222 -routes nats://${NODE-1-IP}:6222,nats://${NODE-3-IP}:6222

docker run --restart=always --name=nats-streaming-node-2 -p 9222:9222 -d nats-streaming -m 9222 --stan_trace=true --stan_debug=true --debug=true --trace=true -store file -dir datastore -clustered -cluster_id ${NATS-CLUSTER-ID} -cluster_node_id node-2 -cluster_peers node-1,node-2,node-3 -ns nats://${NODE-1-IP}:6222,nats://${NODE-2-IP}:6222,nats://${NODE-3-IP}:6222

### node-3 命令
###############
docker run --restart=always -d --net=host --name=nats-node-3 nats -m 8222 -p 4222 -cluster nats://0.0.0.0:6222 -routes nats://${NODE-1-IP}:6222,nats://${NODE-2-IP}:6222

docker run --restart=always --name=nats-streaming-node-3 -p 9222:9222 -d nats-streaming -m 9222 --stan_trace=true --stan_debug=true --debug=true --trace=true -store file -dir datastore -clustered -cluster_id ${NATS-CLUSTER-ID} -cluster_node_id node-3 -cluster_peers node-1,node-2,node-3 -ns nats://${NODE-1-IP}:6222,nats://${NODE-2-IP}:6222,nats://${NODE-3-IP}:6222
  • 通过环境变量设置

    环境变量设置模板如下:

    1
    2
    3
    4
    5
    NATS-CLUSTER-ID=cluster-1

    NODE-1-IP=1.1.1.1
    NODE-2-IP=2.2.2.2
    NODE-3-IP=3.3.3.3
  • 直接在vim中替换变量

    1
    2
    3
    4
    5
    6
    vim ./nats-boot.sh

    :%s/NATS-CLUSTER-ID/cluster-1/g
    :%s/NODE-1-IP/1.1.1.1/g
    :%s/NODE-2-IP/2.2.2.2/g
    :%s/NODE-3-IP/3.3.3.3/g
  • 命令行替换变量

    1
    2
    3
    4
    sed -i 's/NATS-CLUSTER-ID/cluster-1/g' nats-boot.sh
    sed -i 's/NODE-1-IP/1.1.1.1/g' nats-boot.sh
    sed -i 's/NODE-2-IP/2.2.2.2/g' nats-boot.sh
    sed -i 's/NODE-3-IP/3.3.3.3/g' nats-boot.sh

状态监控

nats streaming实现了监控的对应接口,当然还不够完善;不过可以对接外部exporter来将监控信息收集到prometheus等。

  • 集群状态

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    curl localhost:9222/streaming/serverz
    {
    "cluster_id": "${NATS-CLUSTER-ID}",
    "server_id": "2DTn1aiWKHXcrTWScNMYbB",
    "version": "0.11.2",
    "go": "go1.11.1",
    "state": "CLUSTERED",
    "now": "2019-03-13T08:05:32.976885437Z",
    "start_time": "2019-03-13T07:54:17.961989986Z",
    "uptime": "11m15s",
    "clients": 0,
    "subscriptions": 0,
    "channels": 0,
    "total_msgs": 0,
    "total_bytes": 0
    }
  • 存储使用情况

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    curl localhost:9222/streaming/storez
    {
    "cluster_id": "${NATS-CLUSTER-ID}",
    "server_id": "2DTn1aiWKHXcrTWScNMYbB",
    "now": "2019-03-13T08:08:49.634481176Z",
    "type": "RAFT_FILE",
    "limits": {
    "max_channels": 100,
    "max_msgs": 1000000,
    "max_bytes": 1024000000,
    "max_age": 0,
    "max_subscriptions": 1000,
    "MaxInactivity": 0
    },
    "total_msgs": 0,
    "total_bytes": 0
    }
  • client列表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    curl localhost:9222/streaming/clientsz
    {
    "cluster_id": "${NATS-CLUSTER-ID}",
    "server_id": "2DTn1aiWKHXcrTWScNMYbB",
    "now": "2019-03-13T08:10:38.048664063Z",
    "offset": 0,
    "limit": 1024,
    "count": 0,
    "total": 0,
    "clients": []
    }
  • channel信息查询

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    curl localhost:9222/streaming/channelsz
    {
    "cluster_id": "${NATS-CLUSTER-ID}",
    "server_id": "2DTn1aiWKHXcrTWScNMYbB",
    "now": "2019-03-13T08:11:22.338425862Z",
    "offset": 0,
    "limit": 1024,
    "count": 0,
    "total": 0
    }

NATS Streming客户端

官方文档见nats-straming client golang version

Subscribe

  • subscribe
    类似于广播,如果不使用用Queue Group,只要订阅了该topic的消费者均能够收取消息;
  • durable subscription
    通过client-id和durable name来唯一表示一个client。当该client网络中断后,nats会将错过的消息都记录下来,等待该client-id对应的client重新起来之后再发送消息过去继续消费。

Queue Group VS. Durable Queue Group

名称 功能 生命周期 效果
Queue Group 用来将一堆消费者归类,在这个组内的消费者,只有一个会收到消息,而非广播的。 当queue group中最后一个消费者离开的时候(调用unscribe或者close),queue group会被销毁。 假设最后一个member调用close退出queue group,queue group将被销毁。而在该member(相同client-id)重新上线之前,queue group所订阅topic(channel)的消息将被丢失。
Durable Queue Group 除了queue group的功能外,可以持久化消息,直到有新的消费者加入后继续消费。 可以当queueu group中所有的消费者都离开的时候,依然存在;当最后一个消费者调用unscribe的时候销毁。 假设最后一个member调用close退出queue group,queue group会被保留。而在该member(相同client-id)重新上线之前,queue group所订阅topic(channel)的消息将被在member起来之后,发送到member重新消费。

Client Id维护

client-id相同的两个member无法同时连接到nats streaming,而如果member挂掉了,新上线的member可以继续使用之前的client-id(IP地址可以不一致)可以继续从durable queue group中消费宕机时漏掉的消息。这里具体分析一下nats streaming行为。

已知使用durable queue group的last member未来得及执行unsubscribe和close操作就异常终止了;然后新的member(使用相同client-id)在另外一台主机上(IP不一致)启动,日志如下。

1
2
3
4
[1] 2019/03/17 10:15:14.464874 [DBG] STREAM: [Client:consumer-5] Suspended durable queue subscription, subject=nautilus-test, inbox=_INBOX.1ENluyqy0UX4IeohElmiGm, queue=mydurableName:oceanus, subid=9
[1] 2019/03/17 10:15:14.464912 [DBG] STREAM: [Client:consumer-5] Closed (Inbox=_INBOX.1ENluyqy0UX4IeohElmhyO)
[1] 2019/03/17 10:15:14.464925 [DBG] STREAM: [Client:consumer-5] Replaced old client (Inbox=_INBOX.Ah2vp3eh5BKTfcSVfq1Sv8)
[1] 2019/03/17 10:15:14.679385 [DBG] STREAM: [Client:consumer-5] Resumed durable queue subscription, subject=nautilus-test, inbox=_INBOX.Ah2vp3eh5BKTfcSVfq1T4o, queue=mydurableName:oceanus, subid=9

先前的member异常终止,server无法感知,因此等待新的member起来之后,才去check老的member的状态。具体操作流程为:向老的member对应client-id的Inbox发送消息,并等待反馈(因为nats-streaming不直接与客户端通信),如果无法得到响应就开始执行:

  1. Suspended durable queue subscription
  2. Close旧member的connection
  3. 由于两个member都使用相同的client-id,于是改变client-id映射的Inbox,即替换client-id对应的member
  4. Resumed durable queue subscription
0%