消息中间件Kafka

1、简介

  • Kafka是一个分布式的基于发布/订阅模式消息队列Message Queue),主要应用于大数据实时处理领域。

  • 发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息

  • Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道流分析数据集成关键任务应用

  • 目前企业中比较常见的消息队列产品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。在大数据场景主要采用Kafka作为消息队列。在 JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ。

  • 使用消息队列的好处:

    • 解耦(类似SpringIOC):允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

    • 可恢复性:系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

  • 缓冲:有助于控制和优化数据流经过系统的速度, 解决生产消息和消费消息的处理速度不一致的情况。

  • 灵活性 & 峰值处理能力(削峰):在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

    • 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们

2、消费模式

2.1 点对点模式

  • 一对一,消费者主动拉取数据,消息收到后消息清除。

  • 消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后, queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

2.2 发布/订阅模式

  • 一对多,消费者消费数据之后不会清除消息。

  • 消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

3、基础架构

  • kafka架构有如下特点:

    • 为方便扩展,并提高吞吐量,一个topic分为多个partition。
    • 配合分区的设计,提出消费者组的概念,组内每个消费者并行消费。
    • 为提高可用性,为每个partition增加若干副本,类似NameNode HA。
    • ZK中记录谁是leader,Kafka2.8.0以后也可以配置不采用ZK。
  • Kafka整体架构如下图:

    • Producer:消息生产者,就是向kafka broker发消息的客户端。
    • Consumer:消息消费者,向kafka broker取消息的客户端(一个Partition(分区)只能被同个消费者组里的某个消费者消费,从而避免重复消费,即如果消费者组里的消费者个数如果大于某个topic的分区个数是无意义的,但使用消费者组能提高消费能力)。
    • Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
    • Broker:一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic
    • Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic
    • Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上, **一个topic可以分为多个partition**,每个partition是一个有序的队列。
    • Replica: 副本(Replication),为保证集群中的某个节点发生故障时, 该节点上的partition数据不丢失,且Kafka仍然能够继续工作Kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower
    • leader:每个分区多个副本的”主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader
    • follower:每个分区多个副本中的”从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的leader
  • Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。Controller的管理工作都是依赖于Zookeeper的。以下为partitionleader选举过程:

4、安装kafka

  • 使用docker安装单机版的kafka

    • 安装单机版zookeeper。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      [root@k8s-node4 ~]# docker pull wurstmeister/zookeeper
      Using default tag: latest
      latest: Pulling from wurstmeister/zookeeper
      a3ed95caeb02: Pull complete
      ef38b711a50f: Pull complete
      e057c74597c7: Pull complete
      666c214f6385: Pull complete
      c3d6a96f1ffc: Pull complete
      3fe26a83e0ca: Pull complete
      3d3a7dd3a3b1: Pull complete
      f8cc938abe5f: Pull complete
      9978b75f7a58: Pull complete
      4d4dbcc8f8cc: Pull complete
      8b130a9baa49: Pull complete
      6b9611650a73: Pull complete
      5df5aac51927: Pull complete
      76eea4448d9b: Pull complete
      8b66990876c6: Pull complete
      f0dd38204b6f: Pull complete
      Digest: sha256:7a7fd44a72104bfbd24a77844bad5fabc86485b036f988ea927d1780782a6680
      Status: Downloaded newer image for wurstmeister/zookeeper:latest
      docker.io/wurstmeister/zookeeper:latest
      [root@k8s-node4 ~]# docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
      89c7b8a1aac50835e5647bb2a60ff706a81ba4bdc246b62e72e1229d440c2853
    • 安装kafka集群。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      [root@k8s-node4 ~]# docker pull wurstmeister/kafka
      Using default tag: latest
      latest: Pulling from wurstmeister/kafka
      540db60ca938: Pull complete
      f0698009749d: Pull complete
      d67ee08425e3: Pull complete
      1a56bfced4ac: Pull complete
      dccb9e5a402a: Pull complete
      Digest: sha256:4916aa312512d255a6d82bed2dc5fbee29df717fd9efbdfd673fc81c6ce03a5f
      Status: Downloaded newer image for wurstmeister/kafka:latest
      docker.io/wurstmeister/kafka:latest

      # -e KAFKA_BROKER_ID=0:在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
      # -e KAFKA_ZOOKEEPER_CONNECT=ip:2181/kafka:配置zookeeper管理kafka的路径
      # -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://ip:9092:把kafka的地址端口注册给zookeeper
      # -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:配置kafka的监听端口
      # 部署第一台kafka
      [root@k8s-node4 ~]# docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=119.91.110.181:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://119.91.110.181:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
      c08c7ef80c1e7ab3090bded93e931e82524c52d91a55885c6dc0e8eccd9432c2

      # 部署第二台kafka
      [root@k8s-node4 ~]# docker run -d --name kafka1 -p 9093:9093 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=119.91.110.181:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://119.91.110.181:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka
      6ce02e83203539d2010bc0ec3531d35b9914729678b04e41b6e64e5e6aa2bc12

      # 部署第三台kafka
      [root@k8s-node4 ~]# docker run -d --name kafka2 -p 9094:9094 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=119.91.110.181:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://119.91.110.181:9094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka
      265e18a4a1f60aec02a3fa81891bd00921fce01c8fa93c425ede2b84643937db
    • 测试生产并消费消息。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      # 生产者
      [root@k8s-node4 ~]# docker exec -it kafka /bin/sh
      / # cd /opt/kafka_2.13-2.8.1/bin
      /opt/kafka_2.13-2.8.1/bin # ./kafka-console-producer.sh --broker-list 10.0.8.3:9092 --topic test_kafka
      >hello kafka!

      # 消费者
      [root@k8s-node4 ~]# docker exec -it kafka /bin/sh
      / # cd /opt/kafka_2.13-2.8.1/bin
      /opt/kafka_2.13-2.8.1/bin # kafka-console-consumer.sh --bootstrap-server 10.0.8.3:9092 --topic test_kafka --from-beginning
      hello kafka!

5、命令行操作

  • 查看当前服务器中的所有topic。

    1
    2
    3
    4
    5
    6
    [root@k8s-node4 ~]# docker exec -it kafka /bin/sh
    / # cd /opt/kafka_2.13-2.8.1
    /opt/kafka_2.13-2.8.1 # bin/kafka-topics.sh --bootstrap-server 10.0.8.3:9092 --list
    __consumer_offsets
    test_kafk
    test_kafka
  • 创建first topic。

    1
    2
    3
    4
    5
    6
    # --topic:定义topic名
    # --replication-factor:定义副本数
    # --partitions:定义分区数
    # 为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个 partition,每个partition是一个有序的队列
    /opt/kafka_2.13-2.8.1 # bin/kafka-topics.sh --bootstrap-server 10.0.8.3:9092 --topic first --create --partitions 1 --replication-factor 3
    Created topic first.
  • 查看某个topic的详情。

    1
    2
    3
    /opt/kafka_2.13-2.8.1 # bin/kafka-topics.sh --bootstrap-server 10.0.8.3:9092 --topic first --describe
    Topic: first TopicId: DEqWNAD6SJ62Jm3xwoj89A PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
    Topic: first Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
  • 修改分区数(注意:分区数只能增加,不能减少;不能通过命令行的方式修改副本数)。

    1
    2
    3
    4
    5
    6
    /opt/kafka_2.13-2.8.1 # bin/kafka-topics.sh --bootstrap-server 10.0.8.3:9092 --topic first --alter --partitions 3
    /opt/kafka_2.13-2.8.1 # bin/kafka-topics.sh --bootstrap-server 10.0.8.3:9092 --topic first --describe
    Topic: first TopicId: DEqWNAD6SJ62Jm3xwoj89A PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
    Topic: first Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
    Topic: first Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
    Topic: first Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
  • 生产者发送消息,消费者监听消息。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    # 第一个消费者监听topic消息
    /opt/kafka_2.13-2.8.1 # bin/kafka-console-consumer.sh --bootstrap-server 10.0.8.3:9092 --topic first

    # 生产者发送消息
    /opt/kafka_2.13-2.8.1 # bin/kafka-console-producer.sh --bootstrap-server 10.0.8.3:9092 --topic first
    >hello

    # 第一个消费者收到消息
    /opt/kafka_2.13-2.8.1 # bin/kafka-console-consumer.sh --bootstrap-server 10.0.8.3:9092 --topic first
    hello

    # 开启新的消费者,设置--from-beginning参数从头获取消息,即会把主题中以往所有的数据都读取出来
    /opt/kafka_2.13-2.8.1 # kafka-console-consumer.sh --bootstrap-server 10.0.8.3:9092 --topic first --from-beginning
    hello
  • 删除指定topic

    1
    2
    # 需要server.properties中设置delete.topic.enable=true,否则只是标记删除,这里默认就是true。
    /opt/kafka_2.13-2.8.1 # bin/kafka-topics.sh --bootstrap-server 10.0.8.3:9092 --topic first --delete

6、kafka架构深入

6.1 工作流程

  • Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
  • topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文 件,该log文件中存储的就是producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

6.1.1 Zookeeper存储的Kafka信息

  • 启动Zookeeper客户端,通过ls命令可以查看kafka相关信息。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    [root@k8s-node4 ~]# docker exec -it zookeeper /bin/bash
    root@830ec2ba2435:/opt/zookeeper-3.4.13# bin/zkCli.sh

    [zk: localhost:2181(CONNECTED) 0] ls /
    [kafka, zookeeper]
    [zk: localhost:2181(CONNECTED) 1] ls /kafka
    [log_dir_event_notification, isr_change_notification, admin, consumers, cluster, config, feature, latest_producer_id_block, controller, brokers, controller_epoch]
    [zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers
    [seqid, topics, ids]
    [zk: localhost:2181(CONNECTED) 3] ls /kafka/brokers/ids
    [2, 1, 0]
  • 在zookeeper的服务端存储的Kafka相关信息:

6.1.2 Broker总体工作流程

  • (1)每台broker启动后都会在zk中注册brokerid信息。
  • (2)(3)向zk中注册controller,集群中哪个kafka实例先注册,谁将成为接下来的“老大”,注意,kafka集群中,每个实例自身都有一个controller,但是需要一个类似leader的controller,负责监听brokers节点的变化。
  • (4)controller进行leader选举。
  • (5)假设选举出的leader为broker1,controller将节点信息上传到zk。
  • (6)其他contorller从zk同步相关信息,目的是为了当controller leader挂了时可以上位。
  • (7假设broker1中leader挂了,(8)被leader controller监听到节点信息变化后,(9)将会从zk中重新拉取leader信息和isr信息,(10)进行新的leader的选举。
  • (11)重新选举完毕后更新leader以及isr信息到zk。

6.1.3 Broker重要参数

6.2 文件存储

6.2.1 文件存储机制

  • Topic数据的存储机制:

    • Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数 据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片索引机制, 将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件".index"文件存储大量的索引信息,".log"文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如,first这个topic有三个分区,则其对应的文件夹为first-0first-1first-2

    • 示例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      # 创建first主题
      [root@k8s-node4 ~]# docker exec -it kafka /bin/sh
      / # cd /opt/kafka_2.13-2.8.1
      /opt/kafka_2.13-2.8.1 # bin/kafka-topics.sh --bootstrap-server 10.0.8.3:9092 --topic first --create --partitions 3 --replication-factor 3
      Created topic first.

      # 启动生产者,并发送消息。
      /opt/kafka_2.13-2.8.1 # bin/kafka-console-producer.sh --bootstrap-server 10.0.8.3:9092 --topic first
      >hello
      >world
      >kafka
      >aaa
      >bbb
      >ccc
      >ddd

      # 查看日志文件所在位置
      /opt/kafka_2.13-2.8.1 # cat config/server.properties
      …………
      # A comma separated list of directories under which to store log files
      log.dirs=/kafka/kafka-logs-227869cad198
      …………

      # 查看日志文件
      /opt/kafka_2.13-2.8.1 # cd /kafka/kafka-logs-227869cad198
      /kafka/kafka-logs-227869cad198 # ls -l
      total 28
      …………
      drwxr-xr-x 2 root root 4096 May 14 03:10 first-0
      drwxr-xr-x 2 root root 4096 May 14 03:10 first-1
      drwxr-xr-x 2 root root 4096 May 14 03:10 first-2
      …………
      /kafka/kafka-logs-227869cad198 # cd first-0
      /kafka/kafka-logs-227869cad198/first-0 # ls -l
      total 8
      -rw-r--r-- 1 root root 10485760 May 14 03:42 00000000000000000000.index
      -rw-r--r-- 1 root root 144 May 14 03:43 00000000000000000000.log
      -rw-r--r-- 1 root root 10485756 May 14 03:42 00000000000000000000.timeindex
      -rw-r--r-- 1 root root 8 May 14 03:42 leader-epoch-checkpoint

      # 通过工具查看index和log信息。
      /kafka/kafka-logs-227869cad198/first-1 # kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
      Dumping ./00000000000000000000.index
      offset: 0 position: 0
      /kafka/kafka-logs-227869cad198/first-1 # kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
      Dumping ./00000000000000000000.log
      Starting offset: 0
      baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1652499768062 size: 73 magic: 2 compresscodec: NONE crc: 1262695588 isvalid: true
      baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 73 CreateTime: 1652499785630 size: 71 magic: 2 compresscodec: NONE crc: 675604454 isvalid: true
    • 说明:日志存储参数配置:

6.2.2 文件清理策略

  • Kafka中默认的日志保存时间为7天,可以通过调整如下参数修改保存时间。

    • log.retention.hours,最低优先级小时,默认7天。
    • log.retention.minutes,分钟。
    • log.retention.ms,最高优先级毫秒。
    • log.retention.check.interval.ms,负责设置检查周期,默认5分钟。
  • 那么日志一旦超过了设置的时间,怎么处理呢?Kafka中提供的日志清理策略有deletecompact两种。

    • delete日志删除:将过期数据删除。log.cleanup.policy = delete所有数据启用删除策略。

      • 基于时间:默认打开。以segment中所有记录中的最大时间戳作为该文件时间戳。
      • 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。log.retention.bytes,默认等于-1,表示无穷大。
    • compact日志压缩。对于相同key的不同value值,只保留最后一个版本。log.cleanup.policy = compact 所有数据启用压缩策略。

      • 压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大 的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。
      • 这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。

6.3 生产者

6.3.1 分区策略

分区的原因

  • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

  • 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了。

  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

分区的原则

  • 默认的分区器DefaultPartitioner。

  • 需要将producer发送的数据封装成一个ProducerRecord对象。

    • 指明partition的情况下,直接将指明的值直接作为partiton值。
    • 没有指明partition值但有key的情况下,将keyhash值与topicpartition数进行取余得到partition值。
    • 既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成(linger.ms时间到),Kafka再随机一个分区进行使用(和上一次的分区不同)。

自定义分区器

  • 如果研发人员可以根据企业需求,自己重新实现分区器。例如我们实现一个分区器实现,发送过来的数据中如果包含aaa,就发往0号分区, 不包含aaa,就发往1号分区。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    // 自定义分区器,默认的分区器为org.apache.kafka.clients.producer.internals.DefaultPartitioner
    // 生产者使用自定义分区器需要:
    // Properties props = new Properties();
    // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
    public class MyPartitioner implements Partitioner {
    /**
    * 返回信息对应的分区
    *
    * @param topic 主题
    * @param key 消息的 key
    * @param keyBytes 消息的 key 序列化后的字节数组
    * @param value 消息的 value
    * @param valueBytes 消息的 value 序列化后的字节数组
    * @param cluster 集群元数据可以查看分区信息
    * @return
    */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    String msgValues = value.toString();
    int partition;
    if (msgValues.contains("aaa")) {
    partition = 0;
    } else {
    partition = 1;
    }
    return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
    }

6.3.2 数据可靠性保证

  • 为保证producer发送的数据,能可靠的发送到指定的topictopic的每个partition收到producer发送的数据后,都需要向producer发送ackacknowledgement 确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

    1635647612298
    • 发送ack的时机:确保有followerleader同步完成,leader再发送ack,这样才能保证leader挂掉之后,能在follower中选举出新的leader

    • 对于有多少个follower同步完成之后再发送ack,有以下两种方案(副本数据同步策略):

      方案 优点 缺点
      半数以上完成同步,就发送ack 延迟低 选举新的leader时,容忍n台节点的故障,需要2n+1个副本
      全部完成同步,才发送ack 选举新的leader时,容忍n台节点的故障,需要n+1个副本 延迟高
      • Kafka选择了第二种方案,原因如下:
        • 同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
        • 虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。
        • 采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。为解决这个问题,**Leader维护了一个动态的in-sync replica set(ISR),意为和leader保持同步的follower集合(例如leader:0,isr:0,1,2) 。当ISR中的follower完成数据的同步之后,就会给leader发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定(默认30s,例如2超时则leader:0,isr:0,1)。 这样就不用等长期联系不上或者已经故障的节点,Leader发生故障之后,就会从ISR中选举新的leader。**
          • 老版本的kafka会同时考虑条数(follower和leader的相差条数,大于这个数会被踢出ISR)和时间(超过此同步时间的follower会被踢出ISR)两个因素。但在新版本中只保留了时间这个因素,因为实际上生产者是可以批量发送数据的,假设限制条数为10,而此时生产者一下子发送12条数据或者更多,那么在此瞬间ISR中的follower和leader相差条数就大于10了,于是会被踢出ISR。经过一段时间的同步后又会被加入ISR,如果接下来生产者又批量发送一堆数据,又会造成follower不断进出ISR,这期间会不断操作内存和zookeeper,耗费资源。
        • 如果分区副本设置为1个,或者ISR里应答的最小副本数量( min.insync.replicas默认为1)设置为1,和ack=1的效果是一 样的,仍然有丢数的风险(leader:0,isr:0)。因此数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
  • 对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡, 选择以下的配置。即acks参数配置:

    • 0:producer不等待brokerack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据。

    • 1:producer等待brokerackpartitionleader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据。

    • -1(all):producer等待brokerackpartitionleaderfollower(ISR里的)全部落盘成功后才返回ack。但是如果考虑ISR中只有一个leader的情况(其它副本都被踢出ISR),这时leader写完数据并发送ack后发生故障也会造成数据丢失(这是极端情况,一般不会发生)。而且如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复

    1
    2
    3
    4
    // 设置acks
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    // 重试次数retries,默认是 int 最大值,2147483647
    properties.put(ProducerConfig.RETRIES_CONFIG, 3);
  • 可靠性总结:

    • acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
    • acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
    • acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
    • 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。
  • 为了保证集群内服务器的数据一致性问题,还引入了LEOHW这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复):

    1652446798554
    • LEO (Log End Offset) :每个副本的最后一个offset,LEO其实就是最新的offset + 1。

    • HW (High Watermark) :指的是消费者能见到的最大的offsetISR队列中最小的LEO,所有副本中最小的LEO。

      • 如果follower发生故障。

        • ①Follower发生故障后会被临时踢出ISR。

          1652447089635
        • ②这个期间Leader和Follower继续接收数据。

          1652447152877
        • ③待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。

        • ④等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

          1652447400805
      • 如果leader发生故障。

        • ①Leader发生故障之后,会从ISR中选出一个新的Leader(看6.7.2节的leader选举流程)。

          1652447665758
        • ②为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。

          1652447752316

6.3.3 数据去重

  • 数据重复分析:

    • 如果设置应答级别为-1,设想如果当生产者发送的消息已经完成了leader和follower的同步操作,但在leader向生产者应答之前宕机了,那么此时会挑一个其中一个follower成为新的leader,此时由于之前生产者没收到ack,那么隔一段时间后会继续重发消息,导致集群消息重复。

  • 数据传递语义:

    • 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2。

      • 将服务器的ACK级别设置为-1,可以保证ProducerServer之间不会丢失数据,即At Least Once语义。
      • At Least Once可以保证数据不丢失,但是不能保证数据不重复。
    • 最多一次(At Most Once)= ACK级别设置为0。

      • 将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。
      • At Most Once可以保证数据不重复,但是不能保证数据不丢失。
    • 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

      • 在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。

      • 0.11版本的Kafka,引入了一项重大特性:幂等性

        • 所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了KafkaExactly Once语义。即:At Least Once(保证数据不丢失) + 幂等性(保证数据不重复) = Exactly Once

        • 重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的。所以幂等性只能保证的是在单分区单会话内不重复

        • 要启用幂等性,只需要将Producer的参数中enable.idompotence设置为true即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once

6.3.4 数据有序与乱序

  • 单分区内有序,但需满足一定条件。

    • kafka在1.x版本之前保证数据单分区有序,条件如下:

      1
      max.in.flight.requests.per.connection=1  # 不需要考虑是否开启幂等性
    • kafka在1.x及以后版本保证数据单分区有序,条件如下:

      • 未开启幂等性:max.in.flight.requests.per.connection需要设置为1。
      • 开启幂等性:max.in.flight.requests.per.connection需要设置小于等于5。

      原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据(可以根据<PID, Partition, SeqNumber>中的SeqNumber保证有序), 故无论如何,都可以保证最近5个request的数据都是有序的。

  • 多分区,分区与分区间无序。

6.3.5 生产者如何提高吞吐量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class CustomProducerParameters {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();

// 连接kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");

// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

// linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

// 压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

// 1 创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "test" + i));
}

// 3 关闭资源
kafkaProducer.close();
}
}

6.4 消费者

6.4.1 消费方式

  • pull(拉)模式

    • consumer采用pull(拉)模式从broker中主动读取数据。
    • pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout
  • push(推)模式

    • Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有消费者的消费速率。例如推送的速度是50m/s, Consumer1、Consumer2就来不及处理消息。

      1652681667991
    • push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

6.4.2 消费者工作流程

6.4.3 消费者组原理

  • Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

    • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费

    • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者

      1652682626363
    • 如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。

6.4.4 消费者组初始化流程

  • coordinator:辅助实现消费者组的初始化和分区的分配。

  • coordinator节点选择 = groupid的hashcode值 % 50(__consumer_offsets的分区数量)

    • 例如:groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。

6.4.5 消费者组详细消费流程

1652684607496

6.4.5 分区分配策略以及再平衡

  • 一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定哪个partition由哪个consumer来消费。

  • 首先kafka设定了默认的消费逻辑:一个分区只能被同一个消费组(ConsumerGroup)内的一个消费者消费。在这个消费逻辑设定下,假设目前某消费组内只有一个消费者C0,订阅了一个topic,这个topic包含6个分区,也就是说这个消费者C0订阅了6个分区,这时候可能会发生下列三种情况:

    • 如果这时候消费者组内新增了一个消费者C1,这个时候就需要把之前分配给C0的6个分区拿出来3个分配给C1。
    • 如果这时候这个topic多了一些分区,就要按照某种策略,把多出来的分区分配给C0C1
    • 如果这时候C1消费者挂掉了或者退出了,不在消费者组里了,那所有的分区需要再次分配给C0
  • 以上三种情况其实就是kafka进行分区分配的前提条件(只有满足了这三个条件的任意一个,才会进行分区分配):

    • 同一个Consumer Group 内新增消费者。

    • 订阅的主题新增分区。

    • 消费者离开当前所属的Consumer Group,包括shuts downcrashes

  • Kafka有四种主流的分区分配策略:RangeRoundRobinStickyCooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略,把它们以逗号分隔就可以了。

6.4.5.1 Range以及再平衡

  • Range策略:该策略是面向每个主题的,首先会对同一个主题里面的分区按照序号进行排序,并把消费者线程按照字母顺序进行排序。然后用分区数除以消费者线程数量来判断每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。

    • 假设有个名为T1的主题,包含了7个分区,它有两个消费者(C0C1),其中C0num.streams(消费者线程)=1,C1num.streams=2。排序后的分区是:0,1,2,3,4,5,6。消费者线程排序后是:C0-0,C1-0,C1-1。一共有7个分区,3个消费者线程,进行计算7/3=2…1,商为2余数为1,则每个消费者线程消费2个分区,并且前面1个消费者线程多消费一个分区,结果会是这样的:

      消费者线程 对应消费的分区序号
      C0-0 0,1,2
      C1-0 3,4
      C1-1 5,6
      • 如果分区数是8,那么8/3=2余2,除不尽,那么C0-0和C1-0分别多消费一个。
    • 如果在上面的基础上订阅多个主题(分区数还是为7),我们假设有3个主题(T1T2T3),都有7个分区,那么按照咱们上面这种Range分配策略分配后的消费结果如下:

      消费者线程 对应消费的分区序号
      C0-0 T1(0,1,2),T2(0,1,2),T3(0,1,2)
      C1-0 T1(3,4),T2(3,4),T3(3,4)
      C1-1 T1(5,6),T2(5,6),T3(5,6)
    • 结论:

      • 如果只是针对1个topic而言,C0-0消费者多消费1个分区影响不是很大。但是如果有N多个topic,那么针对每个topic,消费者C0-0都将多消费1个分区,topic越多,C0-0消费的分区会比其他消费者明显多消费N个分区。
      • 在这种情况下,C0-0消费线程要多消费3个分区,这显然是不合理的,其实这就是Range分区分配策略的缺点。
    • 测试案例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      # 修改主题first为7个分区。
      # 分区数可以增加,但是不能减少。
      [root@k8s-node4 ~]# docker exec -it kafka /bin/sh
      / # cd /opt/kafka_2.13-2.8.1
      /opt/kafka_2.13-2.8.1 # bin/kafka-topics.sh --bootstrap-server 119.91.110.181:9092 --alter --topic first --partitions 7
      /opt/kafka_2.13-2.8.1 # bin/kafka-topics.sh --bootstrap-server 119.91.110.181:9092 --describe --topic first
      Topic: first TopicId: PtxUiol9QW6-oDJgyohupQ PartitionCount: 7 ReplicationFactor: 3 Configs: segment.bytes=1073741824
      Topic: first Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
      Topic: first Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
      Topic: first Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
      Topic: first Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
      Topic: first Partition: 4 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
      Topic: first Partition: 5 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
      Topic: first Partition: 6 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2

      复制CustomConsumer类,创建CustomConsumer2。这样可以由三个消费者CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”,同时启动3个消费者。

      启动CustomProducer生产者,发送500条消息,随机发送到不同的分区(说明:Kafka默认的分区分配策略就是Range + CooperativeSticky,所以不需要修改策略)。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      public class CallBackProducer {
      public static void main(String[] args) throws InterruptedException {
      Properties props = new Properties();
      props.put("bootstrap.servers", "119.91.110.181:9092");
      props.put("acks", "all");
      props.put("retries", 1);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<>(props);
      for (int i = 0; i < 500; i++) {
      producer.send(new ProducerRecord<>("first","test" + i,
      "test" + i), new Callback() {
      // 回调函数,该方法会在Producer收到ack时调用,为异步调用
      @Override
      public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception == null) {
      System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
      } else {
      exception.printStackTrace();
      }
      }
      });
      // 为了发送到不同分区
      Thread.sleep(2);
      }
      producer.close();
      }
      }

      观察3个消费者分别消费哪些分区的数据。

      停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。

      • 1号消费者:消费到3、4号分区数据。
      • 2号消费者:消费到5、6号分区数据。
      • 0号消费者的任务会整体被分配到1号消费者或者2号消费者。

      说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

      再次重新发送消息观看结果(45s以后)。

      • 1号消费者:消费到0、1、2、3号分区数据。

      • 2号消费者:消费到4、5、6号分区数据。

      说明:消费者0已经被踢出消费者组,所以重新按照range方式分配(7/2=3余1)。

6.4.5.2 RoundRobin以及再平衡

  • RoundRobin策略:是针对集群中所有Topic而言,原理是把所有的partition和所有的consumer都列出来,然后按照hashcode进行排序,最后通过轮询算法来分配partition给到各个消费者。

    1652690982220
    • 测试案例:依次在CustomConsumer、CustomConsumer1、CustomConsumer2三个消费者代码中修改分区分配策略为RoundRobin。

      1
      2
      // 修改分区分配策略
      properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

      重启3个消费者,重复发送消息的步骤,观看分区结果。

      停止掉2号消费者,快速重新发送消息观看结果(45s以内,越快越好)。

      • 0号消费者:消费到0、1、4、6号分区数据。
      • 1号消费者:消费到2、3、5号分区数据。
      • 2号消费者的任务会按照RoundRobin的方式,把数据轮询分成0、3和6号分区数据,分别由1号消费者或者2号消费者消费。

      说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

      再次重新发送消息观看结果(45s以后)。

      • 0号消费者:消费到0、2、4、6号分区数据。

      • 1号消费者:消费到1、3、5号分区数据。

      说明:消费者0已经被踢出消费者组,所以重新按照RoundRobin方式分配。

    • 使用RoundRobin分配策略时会出现两种情况:

      • 如果同一消费组内,所有的消费者订阅的消息都是相同的,那么RoundRobin策略的分区分配会是均匀的。 比如我们有3个消费者(C0C1C2),都订阅了2个主题(T0T1)并且每个主题都有3个分区(p0p1p2),那么所订阅的所有分区可以标识为T0p0T0p1T0p2T1p0T1p1T1p2。此时使用RoundRobin分配策略后,得到的分区分配结果如下:

        消费者线程 对应消费的分区序号
        C0 T0p0、T1p0
        C1 T0p1、T1p1
        C2 T0p2、T1p2
        • 可以看到,这时候的分区分配策略是比较平均的。
      • 如果同一消费者组内,所订阅的消息是不相同的,那么在执行分区分配的时候,就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个topic,那么在分配分区的时候,此消费者将不会分配到这个topic的任何分区。比如我们依然有3个消费者(C0C1C2),他们合在一起订阅了3个主题:T0T1T2C0订阅的是主题T0,消费者C1订阅的是主题T0T1,消费者C2订阅的是主题T0T1T2),这3个主题分别有1、2、3 个分区(即T0有1个分区(p0),T1有2个分区(p0p1),T2有3个分区(p0p1p2)),即整个消费者所订阅的所有分区可以标识为T0p0T1p0T1p1T2p0T2p1T2p2。此时如果使用RoundRobin分配策略,得到的分区分配过程和结果如下:

        • 依次以按顺序轮询的方式将这六个分区分配给三个consumer,如果当前consumer没有订阅当前分区所在的topic,则轮询的判断下一个consumer

          • 尝试将T0p0分配给C0,由于C0订阅了T0,因而可以分配成功。

          • 尝试将T1p0分配给C1,由于C1订阅了T1,因而可以分配成功。

          • 尝试将T1p1分配给C2,由于C2订阅了T1,因而可以分配成功。

          • 尝试将T2p0分配给C0,由于C0没有订阅T2,因而会轮询下一个consumer

          • 尝试将T2p0分配给C1,由于C1没有订阅t2,因而会轮询下一个consumer

          • 尝试将T2p0分配给C2,由于C2订阅了t2,因而可以分配成功。

          • 尝试将T2p1分配给C0,由于C0没有订阅T2,因而会轮询下一个consumer

          • 尝试将T2p1分配给C1,由于C1没有订阅T2,因而会轮询下一个consumer

          • 尝试将T2p1分配给C2,由于C2订阅了T2,因而可以分配成功。

          • 尝试将T2p2分配给C0,由于C0没有订阅T2,因而会轮询下一个consumer

          • 尝试将T2p2分配给C1,由于C1没有订阅T2,因而会轮询下一个consumer

          • 尝试将T2p2分配给C2,由于C2订阅了T2,因而可以分配成功。

            消费者线程 对应消费的分区序号
            C0 T0p0
            C1 T1p0
            C2 T1p1、T2p0、T2p1、T2p2
        • 这时候显然分配是不均匀的,因此在使用RoundRobin分配策略时,为了保证得均匀的分区分配结果,需要满足两个条件(如果无法满足,那最好不要使用RoundRobin分配策略):

          • 同一个消费者组里的每个消费者订阅的主题必须相同。
          • 同一个消费者组里面的所有消费者的num.streams(消费者线程)必须相等。

6.4.5.3 Sticky以及再平衡

  • 粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

  • Sticky策略:该策略在kafka的0.11.X版本才开始引入的,是目前最复杂也是最优秀的分配策略。它的设计主要实现了两个目的(如果这两个目的发生了冲突,优先实现第一个目的):①分区的分配要尽可能的均匀。②分区的分配尽可能的与上次分配的保持相同。

    • 测试案例:设置主题为first,7个分区;准备3个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

      1
      2
      // 注意:3 个消费者都应该注释掉,之后重启3个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。
      properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");

    重启3个消费者,重复发送消息的步骤,观看分区结果。可以看到会尽量保持分区的个数近似划分分区。

{% asset_img 1652693812147.png 1652693812147 %}
  
停止掉0号消费者,快速重新发送消息观看结果(45s以内,越快越好)。

+ 1号消费者:消费到4、5、6号分区数据。
  • 2号消费者:消费到0、3号分区数据。
    • 0号消费者的任务会按照粘性规则,尽可能均衡的随机分成0和1号分区数据,分别由1号消费者或者2号消费者消费。

说明:0号消费者挂掉后,消费者组需要按照超时时间45s来判断它是否退出,所以需要等待,时间到了45s后,判断它真的退出就会把任务分配给其他broker执行。

再次重新发送消息观看结果(45s以后)。

  • 1号消费者:消费到2、4、5、6号分区数据。

    • 2号消费者:消费到0、1、3号分区数据。

    说明:消费者0已经被踢出消费者组,所以重新按照粘性方式分配。

  • 比如我们有3个消费者(C0C1C2),都订阅了2个主题(T0T1)并且每个主题都有3个分区(p0p1p2),那么所订阅的所有分区可以标识为T0p0、T0p1、T0p2、T1p0、T1p1、T1p2。此时使用Sticky分配策略后,得到的分区分配结果如下:

    消费者线程 对应消费的分区序号
    C0 T0p0、T1p0
    C1 T0p1、T1p1
    C2 T0p2、T1p2
    • 上面结果和RoundRobin一样,但底层实现并不一样,这里假设C2故障退出了消费者组,然后需要对分区进行再平衡操作,如果使用的是RoundRobin分配策略,它会按照消费者C0C1进行重新轮询分配,再平衡后的结果如下:

      消费者线程 对应消费的分区序号

    | C0 | T0p0、T0p2、T1p1 |
    | C1 | T0p1、T1p0、T1p2 |

    • 但是如果使用的是Sticky分配策略,再平衡后的结果会是这样:

      消费者线程 对应消费的分区序号
      C0 T0p0、T1p0、T0p2
      C1 T0p1、T1p1、T1p2
      • 其保留了再平衡之前的消费分配结果,并将原来消费者C2的分配结果分配给了剩余的两个消费者C0C1,最终C0C1的分配还保持了均衡。
    • 这样处理的好处:发生分区重分配后,对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次处理一遍,这时就会浪费系统资源。而使用Sticky策略就可以让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而可以减少系统资源的损耗以及其它异常情况的发生。

    • 如果使用Sticky策略来解决上面RoundRobin策略出现的缺陷,即比如我们依然有3个消费者(C0C1C2),他们合在一起订阅了3个主题:T0T1T2C0订阅的是主题T0,消费者C1订阅的是主题T0T1,消费者C2订阅的是主题T0T1T2),这3个主题分别有1、2、3 个分区(即T0有1个分区(p0),T1有2个分区(p0p1),T2有3个分区(p0p1p2)),即整个消费者所订阅的所有分区可以标识为T0p0T1p0T1p1T2p0T2p1T2p2。此时如果使用Sticky分配策略,得到的分区分配过程和结果如下:

      消费者线程 对应消费的分区序号
      C0 T0p0
      C1 T1p0、T1p1
      C2 T2p0、T2p1、T2p2
      • 由于C0消费者没有订阅T1T2主题,因此如上这样的分配策略已经是这个问题的最优解了,这时如果消费者C0挂了,发生再平衡后的分配结果,如果是RoundRobin策略为:

        消费者线程 对应消费的分区序号
        C1 T0p0、T1p1
        C2 T1p0、T2p0、T2p1、T2p2
      • 如果使用Sticky策略,再平衡后分分配情况:

        消费者线程 对应消费的分区序号
        C1 T1p0、T1p1、T0p0
        C2 T2p0、T2p1、T2p2
        • 发现sticky只是把之前C0消耗的T0p0分配给了C1,我们结合资源消耗来看,这相比RoundRobin能节省更多的资源。

6.4.6 offset的默认维护位置

  • 由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

  • Kafka 0.9版本之前,consumer 默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic__consumer_offsets

    • __consumer_offsets主题里面采用key和value的方式存储数据。keygroup.id+topic+分区号value就是当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,也就是每个group.id+topic+分区号就保留最新数据。
  • 测试案例:

    • ①修改配置文件consumer.properties中的属性exclude.internal.topics=false,即设置成可以正常消费系统的主题。默认是true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      [root@k8s-node4 ~]# docker exec -it kafka /bin/sh
      / # cd /opt/kafka_2.13-2.8.1
      /opt/kafka_2.13-2.8.1 # cd config/
      /opt/kafka_2.13-2.8.1/config # ls
      connect-console-sink.properties connect-file-source.properties consumer.properties server.properties
      connect-console-source.properties connect-log4j.properties kraft tools-log4j.properties
      connect-distributed.properties connect-mirror-maker.properties log4j.properties trogdor.conf
      connect-file-sink.properties connect-standalone.properties producer.properties zookeeper.properties
      /opt/kafka_2.13-2.8.1/config # vim consumer.properties
      /bin/sh: vim: not found
      /opt/kafka_2.13-2.8.1/config # vi consumer.properties
      # 添加exclude.internal.topics=false
    • ②采用命令行方式,创建一个新的topic。

      1
      2
      /opt/kafka_2.13-2.8.1 # bin/kafka-topics.sh --bootstrap-server 119.91.110.181:9092 --create --topic testoffset --partitions 2 --replication-factor 2
      Created topic testoffset.
    • ③启动消费者消费testoffset数据。

      1
      /opt/kafka_2.13-2.8.1 # bin/kafka-console-consumer.sh --bootstrap-server 119.91.110.181:9092 --topic testoffset --group test
    • ④启动生产者往testoffset生产数据。

      1
      2
      3
      /opt/kafka_2.13-2.8.1 # bin/kafka-console-producer.sh --topic testoffset --bootstrap-server 119.91.110.181:9092
      >hello
      >world
    • ⑤此时消费者消费到数据。

      1
      2
      3
      /opt/kafka_2.13-2.8.1 # bin/kafka-console-consumer.sh --bootstrap-server 119.91.110.181:9092 --topic testoffset --group test
      hello
      world
    • ⑥查看消费者消费主题__consumer_offsets。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      /opt/kafka_2.13-2.8.1 # bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 119.91.110.181:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
      [test-consumer-group,__consumer_offsets,22]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1652755294817, expireTimestamp=None)
      [test-consumer-group,__consumer_offsets,30]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1652755294817, expireTimestamp=None)
      [test-consumer-group,__consumer_offsets,25]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1652755294817, expireTimestamp=None)
      …………
      [test,testoffset,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1652755298527, expireTimestamp=None)
      [test,testoffset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1652755298527, expireTimestamp=None)
      [test,testoffset,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1652755303527, expireTimestamp=None)
      [test,testoffset,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1652755303527, expireTimestamp=None)
      [test,testoffset,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1652755308527, expireTimestamp=None)
      …………

6.4.7 消费者组案例

  • 测试同一个消费者组中的消费者, 同一时刻只能有一个消费者消费。

    • ①修改%KAFKA_HOME\config\consumer.properties%文件中的group.id属性为任意其它值。

      1
      2
      3
      4
      [root@centos7-01 ~]# docker exec -it kafka /bin/sh
      / # cd /opt/kafka_2.13-2.7.1/config
      /opt/kafka_2.13-2.7.1/config # vi consumer.properties
      group.id=test-consumer-group1
    • ②打开两个终端,分别启动两个消费者监听。

      1
      2
      3
      [root@centos7-01 ~]# docker exec -it kafka /bin/sh
      / # cd /opt/kafka_2.13-2.7.1/
      /opt/kafka_2.13-2.7.1 # bin/kafka-console-consumer.sh --bootstrap-server 192.168.200.130:9092 --topic first --consumer.config config/consumer.properties
    • ③再打开一个终端,启动一个生产者并发送数据,会发现两个消费者窗口中,只有一个才会弹出消息

      1
      2
      3
      4
      [root@centos7-01 ~]# docker exec -it kafka /bin/sh
      / # cd /opt/kafka_2.13-2.7.1/
      /opt/kafka_2.13-2.7.1 # bin/kafka-console-producer.sh --broker-list 192.168.200.130:9092 --topic first
      > hello

6.5 高效读写数据

  • Kafka本身是分布式集群,可以采用分区技术,并行度高。

  • 读数据采用稀疏索引,可以快速定位要消费的数据。

  • 顺序写磁盘:

    • Kafkaproducer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

  • 页缓存 + 零拷贝技术:

    • 零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高
    1635690695165
    • PageCache页缓存:Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存 都当做了磁盘缓存来使用。

6.6 事务

  • Kafka从0.11版本开始引入了事务支持。事务可以保证KafkaExactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

6.6.1 Producer事务

  • 为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID(由客户端提供),并将Producer获得的PIDTransaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID

  • 为了管理TransactionKafka引入了一个新的组件Transaction CoordinatorProducer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

  • 说明:开启事务,必须开启幂等性

  • Producer在使用事务功能前,必须先自定义一个唯一的transactional.id。有了transactional.id,即使客户端挂掉了,它重启后也能继续处理未完成的事务。

    • ①:Kafka实现事务需要依靠幂等性,而幂等性需要指定producer id。所以Producer在启动事务之前,需要向TC服务申请producer id。
  • ②:producer id返回给Producer。

    • ③:Producer在接收到producer id后,就可以正常的发送消息了。不过发送消息之前,需要先将这些消息的分区地址,上传到TC服务。TC服务会将这些分区地址持久化到事务topic。然后Producer才会真正的发送消息,这些消息与普通消息不同,它们会有一个字段,表示自身是事务消息。
    • 这里需要注意下一种特殊的请求,提交消费位置请求,用于原子性的从某个topic读取消息,并且发送消息到另外一个topic。我们知道一般是消费者使用消费组订阅topic,才会发送提交消费位置的请求,而这里是由Producer发送的。Producer首先会发送一条请求,里面会包含这个消费组对应的分区(每个消费组的消费位置都保存在__consumer_offset topic的一个分区里),TC服务会将分区持久化之后,发送响应。Producer收到响应后,就会直接发送消费位置请求给GroupCoordinator。
    • ④:Producer发送完消息后,如果认为该事务可以提交了,就会发送提交请求到TC服务。Producer的工作至此就完成了,接下来它只需要等待响应。这里需要强调下,Producer会在发送事务提交请求之前,会等待之前所有的请求都已经发送并且响应成功。
  • ⑤、⑥:TC服务收到事务提交请求后,会先将提交信息先持久化到事务topic。持久化成功后,服务端就立即发送成功响应给Producer。

    • 在一般的二阶段提交中,协调者需要收到所有参与者的响应后,才能判断此事务是否成功,最后才将结果返回给客户。那如果TC服务在发送响应给Producer后,还没来及向分区发送请求就挂掉了,那么Kafka是如何保证事务完成。因为每次事务的信息都会持久化,所以TC服务挂掉重新启动后,会先从事务topic加载事务信息,如果发现只有事务提交信息,却没有后来的事务完成信息,说明存在事务结果信息没有提交到分区。
    • ⑦、⑧、⑨:找到该事务涉及到的所有分区,为每个分区生成提交请求,存到队列里等待发送。后台线程会不停的从队列里,拉取请求并且发送到分区。当一个分区收到事务结果消息后,会将结果保存到分区里,并且返回成功响应到TC服务。当TC服务收到所有分区的成功响应后,会持久化一条事务完成的消息到事务topic。至此,一个完整的事务流程就完成了。
  • Kafka的事务一共有如下5个API。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // 1 初始化事务
    void initTransactions();
    // 2 开启事务
    void beginTransaction() throws ProducerFencedException;
    // 3 在事务内提交已经消费的偏移量(主要用于消费者)
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
    String consumerGroupId) throws
    ProducerFencedException;
    // 4 提交事务
    void commitTransaction() throws ProducerFencedException;
    // 5 放弃事务(类似于回滚事务的操作)
    void abortTransaction() throws ProducerFencedException;
    • 单个Producer,使用事务保证消息的仅一次发送。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      public class CustomProducerTransaction {

      public static void main(String[] args) {
      // 0 配置
      Properties properties = new Properties();

      // 连接集群 bootstrap.servers
      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");

      // 指定对应的key和value的序列化类型 key.serializer
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

      // 指定事务id
      properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_01");

      // 1 创建kafka生产者对象
      KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

      kafkaProducer.initTransactions();

      kafkaProducer.beginTransaction();

      try {
      // 2 发送数据
      for (int i = 0; i < 5; i++) {
      kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));
      }
      int i = 1 / 0;
      kafkaProducer.commitTransaction();
      } catch (Exception e) {
      kafkaProducer.abortTransaction();
      } finally {
      // 3 关闭资源
      kafkaProducer.close();
      }
      }
      }

6.6.2 Consumer事务

  • 上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

  • 如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比 如 MySQL)。

6.7 kafka副本

6.7.1 副本基本信息

  • Kafka 副本作用:提高数据可靠性。
  • Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  • Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。
  • Kafka分区中的所有副本统称为AR(Assigned Repllicas)。AR = ISR + OSR
    • ISR,表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。
    • OSR,表示Follower与Leader副本同步时,延迟过多的副本。

6.7.2 Leader选举流程

  • Kafka集群中有一个broker的Controller会被选举为Controller Leader,负责管理集群broker的上下线,所有topic的分区副本分配Leader选举等工作。

  • Controller的信息同步工作是依赖于Zookeeper的。

6.7.3 分区副本分配

  • 如果kafka服务器只有4个节点,那么设置kafka的分区数大于服务器台数,在kafka底层如何分配存储副本呢?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # 创建16个分区,3个副本。
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 16 --replication-factor 3 --topic second

    # 查看分区和副本情况。
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic second

    Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
    Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
    Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
    Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
    Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
    Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
    Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
    Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
    Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
    Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
    Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
    Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
    Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
    Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
    Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
    Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
    1652450834672

6.7.4 手动调整分区副本存储

  • 在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。

  • 需求:创建一个新的topic,4个分区,两个副本,名称为three。将该topic的所有副本都存储到broker0和 broker1两台服务器上。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # 创建一个新的topic,名称为three。
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 4 --replication-factor 2 --topic three

    # 创建副本存储计划(所有副本都指定存储在broker0、broker1中)。
    vim increase-replication-factor.json

    {
    "version":1,
    "partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
    {"topic":"three","partition":1,"replicas":[0,1]},
    {"topic":"three","partition":2,"replicas":[1,0]},
    {"topic":"three","partition":3,"replicas":[1,0]}]
    }

    # 执行副本存储计划。
    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute

    # 验证副本存储计划。
    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify

    # 查看分区副本存储情况。
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic three

6.7.5 Leader Partition负载平衡

  • 正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的 broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡

  • 下面拿一个主题举例说明,假设集群只有一个主题如下图所示:

    • 针对broker0节点,分区2的AR优先副本是0节点,但是0节点却不是Leader节点,所以不平衡数加1,AR副本总数是4,所以broker0节点不平衡率为1/4>10%,需要再平衡。
    • broker2和broker3节点和broker0不平衡率一样,需要再平衡。 Broker1的不平衡数为0,不需要再平衡。

6.7.6 增加副本因子

  • 在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    # 创建topic。
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic four

    # 创建副本存储计划(所有副本都指定存储在broker0、broker1、broker2中)。
    vim increase-replication-factor.json

    {
    "version":1,
    "partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},
    {"topic":"four","partition":1,"replicas":[0,1,2]},
    {"topic":"four","partition":2,"replicas":[0,1,2]}]
    }

    # 执行副本存储计划。
    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute

6.8 数据积压

  • 如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可)

  • 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间 < 生产速度),使处理的数据小于生产的数据,也会造成数据积压。

7、Kakfa API

7.1 Producer API

7.1.1 消息发送流程

  • 在消息发送的过程中,涉及到了两个线程——main线程Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator, Sender线程不断从 RecordAccumulator中拉取消息发送到Kafka Broker

    • 相关参数:
      • batch.size:只有数据积累到batch.size之后,sender才会发送数据。
      • linger.ms: 如果数据迟迟未达到batch.sizesender等待linger.time之后就会发送数据。
  • 生产者重要参数列表:

    参数名称 描述
    bootstrap.servers 生产者连接集群所需的broker地 址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。
    key.serializer和value.serializer 指定发送消息的key和value的序列化类型。一定要写全类名。
    buffer.memory RecordAccumulator 缓冲区总大小,默认32m
    batch.size 缓冲区一批数据最大值,默认16k。适当增加该值,可 以提高吞吐量,但是如果该值设置太大,会导致数据 传输延迟增加。
    linger.ms 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
    acks 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和isr队列 里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的
    max.in.flight.requests.per.connection 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是1-5的数字。
    retries 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
    retry.backoff.ms 两次重试之间的时间间隔,默认是100ms
    enable.idempotence 是否开启幂等性,默认true,开启幂等性。
    compression.type 生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。

7.1.2 异步发送API

  • ①创建一个maven项目,引入以下kafka依赖:

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.1</version>
    </dependency>
  • ②编写代码,包括以下内容:

    • KafkaProducer:需要创建一个生产者对象,用来发送数据。

    • ProducerConfig:获取所需的一系列配置参数。

    • ProducerRecord:每条数据都要封装成一个ProducerRecord对象。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      // 不带回调函数的生产者
      public class MyProducer {
      public static void main(String[] args) {
      Properties props = new Properties();
      // 指定连接的kafka集群,在命令行中是broker-list
      // 可用ProducerConfig.BOOTSTRAP_SERVERS_CONFIG替代
      props.put("bootstrap.servers", "192.168.200.130:9092");
      // ACK应答级别,这里的ProducerConfig.ACKS_CONFIG="acks"
      props.put(ProducerConfig.ACKS_CONFIG, "all");
      // 重试次数
      props.put("retries", 1);
      // 批次大小,这里设置16k
      props.put("batch.size", 16384);
      // 等待时间
      props.put("linger.ms", 1);
      // RecordAccumulator,缓冲区大小,这里设置32M
      props.put("buffer.memory", 33554432);
      // 序列化机制,即指定key和value的序列化类
      // props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      // 创建生产者对象
      Producer<String, String> producer = new KafkaProducer<>(props);
      // 向test主题发送数据,假如kafka中不存在该主题会自动创建
      for (int i = 0; i < 10; i++) {
      producer.send(new ProducerRecord<>("test", "test-" + i,
      "test-" + i));
      }
      // 关闭连接
      producer.close();
      }
      }
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      // 带回调函数的生产者
      // 回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exception 为null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
      public class CallBackProducer {
      public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "192.168.200.130:9092");
      props.put("acks", "all");
      props.put("retries", 1);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<>(props);
      for (int i = 0; i < 10; i++) {
      producer.send(new ProducerRecord<>("first","test" + i,
      "test" + i), new Callback() {
      // 回调函数,该方法会在Producer收到ack时调用,为异步调用
      // 消息发送失败会自动重试,不需要我们在回调函数中手动重试。
      @Override
      public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception == null) {
      System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
      } else {
      exception.printStackTrace();
      }
      }
      });
      }
      producer.close();
      }
      }
      1652327219626

7.1.3 同步发送API

  • 同步发送的意思就是,当一条消息发送之后,会阻塞当前线程,直至返回ack

  • 由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需再调用Future对象的get方法即可。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 10; i++) {
    // 只需在异步发送的基础上,再调用一下get()方法即可。
    producer.send(new ProducerRecord<String, String>("first", "test - 1"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    ...
    }
    }).get();
    }

7.2 Consumer API

  • Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。
  • 由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。所以offset的维护是Consumer消费数据是必须考虑的问题。

7.2.1 订阅主题

  • 创建一个独立消费者,消费first主题中数据。注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id

    1652684855007
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public class CustomConsumer {
    public static void main(String[] args) {

    // 0 配置
    Properties properties = new Properties();

    // 连接 bootstrap.servers
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

    // 反序列化
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // 配置消费者组id
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");

    // 设置分区分配策略
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");

    // 1 创建一个消费者 "", "hello"
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

    // 2 订阅主题 first
    ArrayList<String> topics = new ArrayList<>();
    topics.add("first");
    kafkaConsumer.subscribe(topics);

    // 3 消费数据
    while (true){

    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    System.out.println(consumerRecord);
    }

    kafkaConsumer.commitAsync();
    }
    }
    }

7.2.2 订阅分区

  • 创建一个独立消费者,消费first主题0号分区的数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public class CustomConsumerPartition {

    public static void main(String[] args) {
    // 0 配置
    Properties properties = new Properties();

    // 连接
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

    // 反序列化
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // 组id
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

    // 1 创建一个消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

    // 2 订阅主题对应的分区
    ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
    topicPartitions.add(new TopicPartition("first",0));
    kafkaConsumer.assign(topicPartitions);

    // 3 消费数据
    while (true){

    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    System.out.println(consumerRecord);
    }
    }
    }
    }

7.2.3 消费者组案例

  • 测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

    1652685151128
    • 复制一份基础消费者的代码,在IDEA中同时启动,即可启动同一个消费者组中的两个消费者。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      public class CustomConsumer1 {
      public static void main(String[] args) {

      // 0 配置
      Properties properties = new Properties();

      // 连接 bootstrap.servers
      properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

      // 反序列化
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

      // 配置消费者组id
      properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");
      // 设置分区分配策略
      properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");

      // 1 创建一个消费者 "", "hello"
      KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

      // 2 订阅主题 first
      ArrayList<String> topics = new ArrayList<>();
      topics.add("first");
      kafkaConsumer.subscribe(topics);

      // 3 消费数据
      while (true){

      ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

      for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
      System.out.println(consumerRecord);
      }
      }
      }
      }
    • 启动代码中的生产者发送消息,在IDEA控制台即可看到两个消费者在消费不同分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码Thread.sleep(2);)。

7.2.4 自动提交offset

  • 需要用到的类:

    • KafkaConsumer:需要创建一个消费者对象,用来消费数据。
    • ConsumerConfig:获取所需的一系列配置参数。
    • ConsumerRecord:每条数据都要封装成一个ConsumerRecord对象。
  • 为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。 自动提交offset的相关参数:

    • enable.auto.commit:是否开启自动提交offset功能,默认是true。
    • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
      public class MyConsumer {
    public static void main(String[] args) {
    Properties props = new Properties();
    // 设置连接的kafka集群
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
    // 设置offset自动提交
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    // offset自动提交的延时
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    // key和value的反序列化
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
    // 设置消费者组
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata");
    // 创建消费者对象,"", "hello"
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // 订阅主题,假如kafka中不存在该主题会自动创建
    consumer.subscribe(Collections.singletonList("first"));
    while (true) {
    // 每隔多长时间拉取数据
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    // 解析并打印ConsumerRecords
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    }
    }
    }

    运行结果如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # 生产者生产数据时指定key
    offset = 57, key = test-1, value = test-1
    offset = 58, key = test-3, value = test-3
    offset = 59, key = test-4, value = test-4
    offset = 60, key = test-5, value = test-5
    offset = 61, key = test-7, value = test-7
    offset = 123, key = test-0, value = test-0
    offset = 124, key = test-2, value = test-2
    offset = 125, key = test-6, value = test-6
    offset = 126, key = test-8, value = test-8
    offset = 127, key = test-9, value = test-9
    # 生产者生产数据时不指定key
    offset = 62, key = null, value = test-0
    offset = 63, key = null, value = test-1
    offset = 64, key = null, value = test-2
    offset = 65, key = null, value = test-3
    offset = 66, key = null, value = test-4
    offset = 67, key = null, value = test-5
    offset = 68, key = null, value = test-6
    offset = 69, key = null, value = test-7
    offset = 70, key = null, value = test-8
    offset = 71, key = null, value = test-9

7.2.5 手动提交offset

  • 虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offsetAPI

  • 手动提交offset的方法有两种:分别是commitSync(同步提交)commitAsync(异步提交)。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

    • 同步提交:必须等待offset提交完毕,再去消费下一批数据。由于同步提交offset有失败重试机制,故更加可靠。以下为同步提交offset的示例。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      public class SyncCommitOffset {
      public static void main(String[] args) {
      Properties props = new Properties();
      ...
      // 关闭自动提交offset
      props.put("enable.auto.commit", "false");
      ...
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("test"));
      while (true) {
      // 消费者拉取数据
      ConsumerRecords<String, String> records =
      consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
      System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
      }
      // 同步提交,当前线程会阻塞直到offset提交成功
      consumer.commitSync();
      }
      }
      }
    • 异步提交:发送完提交offset请求后,就开始消费下一批数据了。虽然同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交offset的方式,以下为异步提交offset的示例。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      public class AsyncCommitOffset {
      public static void main(String[] args) {
      Properties props = new Properties();
      //关闭自动提交
      props.put("enable.auto.commit", "false");
      ...
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("test"));
      while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
      System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
      }
      // 异步提交
      consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
      if (exception != null) {
      System.err.println("Commit failed for" + offsets);
      }
      }
      });
      }
      }
      }
  • 无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据的重复消费。

7.2.6 自定义存储offset

  • Kafka 0.9版本之前,offset存储在zookeeper,0.9版本及之后,默认将offset存储在Kafka的一个内置的topic中。除此之外,Kafka还可以选择自定义存储offset

  • offset的维护是相当繁琐的,因为需要考虑到消费者的Rebalace

  • 当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做Rebalance

  • 消费者发生Rebalance之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的offset位置继续消费。

  • 要实现自定义存储offset,需要借助ConsumerRebalanceListener,以下为示例代码,其中提交和获取offset的方法,需要根据所选的offset存储系统自行实现。

    • 如果是保存到mysql,可以在mysql的表中定义消费者组、主题、分区、offset字段
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    public class CustomSaveOffset {
    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();

    public static void main(String[] args) {
    // 创建配置信息
    Properties props = new Properties();
    ...
    // 关闭自动提交offset
    props.put("enable.auto.commit", "false");
    ...
    // 创建一个消费者
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // 消费者订阅主题
    consumer.subscribe(Arrays.asList("first"),
    new ConsumerRebalanceListener() {
    // 该方法会在Rebalance之前调用
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    commitOffset(currentOffset);
    }

    // 该方法会在Rebalance之后调用
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    currentOffset.clear();
    for (TopicPartition partition : partitions) {
    consumer.seek(partition, getOffset(partition));// 定位到最近提交的offset位置继续消费
    }
    }
    });

    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);// 消费者拉取数据
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
    }
    commitOffset(currentOffset);// 异步提交
    }
    }

    // 获取某分区的最新offset
    private static long getOffset(TopicPartition partition) {
    return 0;
    }

    // 提交该消费者所有分区的offset
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    }
    }

7.2.7 指定Offset消费

  • 重置消费者offset可以通过设置auto.offset.reset(值为earliest、none或者latest(默认))来实现。

    • earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
    • latest(默认值):自动将偏移量重置为最新偏移量。
    • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
  • 此参数只有在以下两种情况之一会生效:

    • 该消费者组第一次消费(更换消费者组)。
    • 该消费者组的offset过期被删除。
    1
    2
    3
    4
    5
    6
    7
    8
    Properties props = new Properties();
    ...
    // earliest指从该消费者组上次已经消费的offset开始消费,与--from-beginning拥有相同的作用
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // 换组
    props.put("group.id", "abcd");
    ...
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    • ①开启一个消费者组消费test主题。
    • ②开启一个生产者往test主题写入数据。
    • ③消费者接收到消息并在控制台打印。
    • ④关闭消费者。
    • ⑤重启生产者继续往test主题写入数据。
    • ⑥重启消费者后,能够读取到第⑤步中生产者新写入的数据,即因为是同个消费者组,从上次earliest的位置即为已经消费到的offset的位置继续读取。(如果更换消费者组则是把test主题的消息从头消费)
  • 如果设置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")关闭自动提交,则会重复消费数据,因为当前消费者组的offset没有更新到kafka里进行持久化(offset在消费者的内存中,断电会失效)。

  • 任意指定offset位移开始消费:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    public class CustomConsumerSeek {

    public static void main(String[] args) {

    // 0 配置信息
    Properties properties = new Properties();

    // 连接
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

    // 反序列化
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // 组id
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");

    // 1 创建消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

    // 2 订阅主题
    ArrayList<String> topics = new ArrayList<>();
    topics.add("first");
    kafkaConsumer.subscribe(topics);

    // 指定位置进行消费
    Set<TopicPartition> assignment = kafkaConsumer.assignment();

    // 保证分区分配方案已经制定完毕
    while (assignment.size() == 0){
    kafkaConsumer.poll(Duration.ofSeconds(1));
    assignment = kafkaConsumer.assignment();
    }

    // 指定消费的offset
    for (TopicPartition topicPartition : assignment) {
    kafkaConsumer.seek(topicPartition,100);
    }

    // 3 消费数据
    while (true){

    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

    System.out.println(consumerRecord);
    }
    }
    }
    }

7.2.8 指定时间消费

  • 在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    public class CustomConsumerSeekTime {

    public static void main(String[] args) {

    // 0 配置信息
    Properties properties = new Properties();

    // 连接
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

    // 反序列化
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // 组id
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

    // 1 创建消费者
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

    // 2 订阅主题
    ArrayList<String> topics = new ArrayList<>();
    topics.add("first");
    kafkaConsumer.subscribe(topics);

    // 指定位置进行消费
    Set<TopicPartition> assignment = kafkaConsumer.assignment();

    // 保证分区分配方案已经制定完毕
    while (assignment.size() == 0){
    kafkaConsumer.poll(Duration.ofSeconds(1));

    assignment = kafkaConsumer.assignment();
    }

    // 希望把时间转换为对应的offset
    HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();

    // 封装对应集合
    for (TopicPartition topicPartition : assignment) {
    topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
    }

    Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);

    // 指定消费的offset
    for (TopicPartition topicPartition : assignment) {

    OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);

    kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
    }

    // 3 消费数据
    while (true){

    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

    System.out.println(consumerRecord);
    }
    }
    }
    }

7.2.9 漏消费和重复消费

  • 重复消费:已经消费了数据,但是offset没提交。自动提交offset引起

  • 漏消费:先提交offset后消费,有可能会造成数据的漏消费。设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。

  • 怎么能做到既不漏消费也不重复消费呢?详看消费者事务

7.3 自定义Interceptor

  • Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。

  • 对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

    • ①configure(configs):获取配置信息和初始化数据时调用。
    • ②onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
    • ③onAcknowledgement(RecordMetadata,Exception):该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producerIO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
    • ④close:关闭interceptor,主要用于执行一些资源清理工作。
  • 如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

  • 拦截器案例:实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。

    • ①创建一个时间戳拦截器。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      public class TimeInterceptor implements ProducerInterceptor<String, String> {
      @Override
      public void configure(Map<String, ?> configs) {
      }

      @Override
      public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
      // 创建一个新的record,把时间戳写入消息体的最前部
      return new ProducerRecord<String,String>(record.topic(), record.partition(), record.timestamp(), record.key(),
      "TimeInterceptor: " + System.currentTimeMillis() + "," + record.value());
      }

      @Override
      public void close() {
      }

      @Override
      public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
      }
      }
    • ②创建一个计数拦截器,用于统计发送消息成功和发送失败消息数,并在producer关闭时打印这两个计数器。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      public class CounterInterceptor implements ProducerInterceptor<String, String> {
      private int errorCounter = 0;
      private int successCounter = 0;

      @Override
      public void configure(Map<String, ?> configs) {
      }

      @Override
      public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
      return record;
      }

      @Override
      public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
      // 统计成功和失败的次数
      if (exception == null) {
      successCounter++;
      } else {
      errorCounter++;
      }
      }

      @Override
      public void close() {
      // 保存结果
      System.out.println("Successful sent: " + successCounter);
      System.out.println("Failed sent: " + errorCounter);
      }
      }
    • ③创建producer主程序。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      public class InterceptorProducer {
      public static void main(String[] args) {
      // 设置配置信息
      Properties props = new Properties();
      props.put("bootstrap.servers", "192.168.200.130:9092");
      props.put("acks", "all");
      props.put("retries", 3);
      props.put("batch.size", 16384);
      props.put("linger.ms", 1);
      props.put("buffer.memory", 33554432);
      props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");
      // 构建拦截链,有顺序
      List<String> interceptors = new ArrayList<>();
      interceptors.add("com.demo.interceptor.TimeInterceptor");
      interceptors.add("com.demo.interceptor.CounterInterceptor");
      props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
      String topic = "test";
      Producer<String, String> producer = new KafkaProducer<>(props);
      // 发送消息
      for (int i = 0; i < 10; i++) {
      ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
      producer.send(record);
      }
      // 一定要关闭producer,这样才会调用interceptor的close方法
      producer.close();
      }
      }
    • ④创建消费者:

      1
      2
      3
      4
      [root@centos7-01 ~]# docker exec -it kafka /bin/sh
      / # cd /opt/kafka_2.13-2.7.1/
      /opt/kafka_2.13-2.7.1 # bin/kafka-console-consumer.sh --bootstrap-server 192.168.200.130:9092 --topic test

      启动生产者后打印消息如下:

      消费者监听到消息: