失效链接处理 |
Kafka跨集群容灾备份 PDF 下载
本站整理下载:
提取码:u6rv
相关截图:
主要内容:
Kafka跨集群容灾备份
1、使用场景:
一般情况我们都是使用一套Kafka集群来完成业务,但是有些情况,如一些灾难恢复场景或者为地理相近的客户提供低延时服务场景下,我们会有多套Kafka集群同时工作的需求。这里主要说一下通过镜像工具MirrorMaker实现Kafka的跨集群数据备份
注:我们通常把数据在单个集群下不同节点之间的拷贝称为备份,把数据在集群间的拷贝称为镜像。
2、MirrorMaker架构
A、镜像流程:
从本质上说,MirrorMaker 就是一个消费者 + 生产者的程序。消费者负责从源集群(Source Cluster)消费数据,生产者负责向目标集群(Target Cluster)发送消息。MirrorMaker 连接的源集群和目标集群,会实时同步消息。整个镜像流程如下图所示:
B、多套集群间数据备份
下图中部署了三套集群:左边的源集群负责主要的业务处理;右上角的目标集群可以用于执行数据分析;而右下角的目标集群则充当源集群的热备份。
3、MirrorMaker工具使用
Kafka默认提供irrorMaker 命令行工具 kafka-mirror-maker 脚本,执行如下命令即可:
$ bin/kafka-mirror-maker.sh --consumer.config ./config/consumer.properties --producer.config ./config/producer.properties --num.streams 8 --whitelist ".*"
参数含义:
consumer.config 参数。它指定了 MirrorMaker 中消费者的配置文件地址
producer.config 参数。它指定了 MirrorMaker 内部生产者组件的配置文件地址
num.streams 参数。它告诉 MirrorMaker 要创建多少个 KafkaConsumer 实例
whitelist 参数。这个参数接收一个正则表达式。所有匹配该正则表达式的主题都会被自动地执行镜像。
注:建议在拷贝前将要同步的所有主题按照源集群上的规格在目标集群上等价地创建出来,否则默认就是根Broker 端参数 num.partitions 和 default.replication.factor 的默认值,自动将主题创建出来。
4、验证消息是否拷贝成功
A、验证是否所有的主题都能正确地同步到目标集群上。
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9093 --topic test --time -2
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9093 --topic test --time -1
-1 和 -2 分别表示获取某分区最新的位移和最早的位移,这两个位移值的差值就是这个分区当前的消息数
5、跨机房备份案例
该案例使用两个单机版kafka,模拟跨机房的两个集群。
源kafka:Hadoop001:9002
目标kafka:Hadoop002:9002
Kafka跨集群同步、创建Kafka集群镜像等相关问题,主要使用Kafka内置的MirrorMaker工具实现, 该工具通过Kafka consumer从源Kafka集群消费数据,然后通过一个内置的Kafka producer将数据重新推送到目标Kafka集群。(一般同步脚本跑在目标端集群,因为考虑到任务挂掉后不至于丢数据)
目标端创建配置文件:
执行命令:
../bin/kafka-mirror-maker.sh --consumer.config source_consumer.properties --producer.config Target_producer.properties --num.streams 1 --whitelist "test1"
测试:在源端topic中写入数据,在目标端进行消费:
6、注意事项:
进程开启方式—nohup
nohup ../bin/kafka-mirror-maker.sh --consumer.config source_consumer.properties --producer.config Target_producer.properties --num.streams 1 --whitelist "test1" &
配置文件参数说明:
ource_consumer.properties
#新版consumer摈弃了对zookeeper的依赖,使用bootstrap.servers告诉consumer kafka server的位置
bootstrap.servers=ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092
#如果使用旧版Consumer,则使用zookeeper.connect
#zookeeper.connect=ip-188-33-33-31.eu-central-1.compute.internal:2181,ip-188-33-33-32.eu-central-1.compute.internal:2181,ip-188-33-33-33.eu-central-1.compute.internal:2181
1.compute.internal:2181
#change the default 40000 to 50000
request.timeout.ms=50000
#hange default heartbeat interval from 3000 to 15000
heartbeat.interval.ms=30000
#change default session timeout from 30000 to 40000
session.timeout.ms=40000
#consumer group id
group.id=africaBetMirrorGroupTest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
#restrict the max poll records from 2147483647 to 200000
max.poll.records=20000
#set receive buffer from default 64kB to 512kb
receive.buffer.bytes=524288
#set max amount of data per partition to override default 1048576
max.partition.fetch.bytes=5248576
#consumer timeout
#consumer.timeout.ms=5000
Target_producer.properties
bootstrap.servers=10.120.241.146:9092,10.120.241.82:9092,10.120.241.110:9092
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder
serializer.class=kafka.serializer.DefaultEncoder
使用kafka-consumer-groups.sh循环监控消费延迟:
bin/kafka-consumer-groups.sh --bootstrap-server ip-188-33-33-31.eu-central-1.compute.internal:9092,ip-188-33-33-32.eu-central-1.compute.internal:9092,ip-188-33-33-33.eu-central-1.compute.internal:9092 --describe --group africaBetMirrorGroupTest --new-consumer
Mirror Maker启动后,包含了一组消费者,这些消费者属于同一个group,并从多个topic上读取数据,所有的topic均使用该group.id,每个MirrorMaker 进程仅有一个生产者,该生产者将数据发送给目标集群的多个topic;
MirrorMaker 为每个消费者分配一个线程,消费者从源集群的topic和分区上读取数据,然后通过公共生产者将数据发送到目标集群上,官方建议尽量让 MirrorMaker 运行在目标数据中心里,因为长距离的跨机房网络相对而言更加不可靠,如果发生了网络分区,数据中心之间断开了连接,无法连接到集群的消费者要比一个无法连接到集群的生产者要安全得多。
如果消费者无法连接到集群,最多也就是无法消费数据,数据仍然会在 Kafka 集群里保留很长的一段时间,不会有丢失的风险。相反,在发生网络分区时如果 MirrorMaker 已经读取了数据,但无法将数据生产到目标集群上,就会造成数据丢失。 所以说远程读取比远程生成更加安全。
|