Kafka事务

Kafka0.11版本开始引入了事务支持事务可以保证KafkaExactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败 。

Producer事务:

    为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PIDTransaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID 
    为了管理TransactionKafka引入了一个新的组件Transaction CoordinatorProducer就是通过和Transaction Coordinator 交互获得Transaction ID 对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
 
Consumer事务:

    上述事务机制主要是从Producer方面考虑,对于Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费。这是由于Consumer 可以通过offset访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。 

 

Kafka进阶之幂等性以及分区分配策略

Exactly Once语义:

    将服务器的ACK级别设置为-1,可以保证ProducerServer之间不会丢失数据,即At Least Once 语义。相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。

    At Least Once可以保证数据不丢失,但是不能保证数据不重复;相对的, At Least Once可以保证数据不重复,但是不能保证数据不丢失。 
对于一些非常重要的信息,比如交易数据,下游数据消费者要求数据既不重复也不丢失0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。

    0.11 版本的 Kafka,引入了一项重大特性:幂等性。幂等性就是指Producer不论向Broker发送多少次重复数据,Broker都只会持久化一条。幂等性结合At Least Once语义,就构成了KafkaExactly Once语义。

                At Least Once + 幂等性 = Exactly Once



    要启用幂等性,只需要将Producer的参数中enable.idompotence设置为true(设置为true时,ACK默认设置为-1)    Kafka的幂等性实现是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条

    PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once 

Kafka消费者:

     consumer 采用 pull(拉) 模式从 broker 中读取数据。
 
     push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,pull模式则可以根据consumer的消费能力以适当的速率消费消息。

    pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回

分区分配策略 :

     一个consumer group中有多个consumer,一个topic有多个partition,所以会涉及到partition的分配问题,如何确定哪个partition由哪个consumer来消费Kafka有两种分配策略:RoundRobin(轮循)Range(范围)。

Kafka默认使用的是Range分配策略。
可设置partition.assignment.strategy的值进行自由调整(RoundRobin、Range


RoundRobin(轮循)根据消费者组来进行轮循划分,把消费者组看成一个整体来进行轮循。(需要保证同一个消费者组下的消费者订阅同一个topic)







Range(范围):根据(分区数量 / 消费者组数量 = 范围)划分。

假设有2个消费者(cA,cB)和2个topic分别有3个partition 
(topic-A{a-p-0,a-p-1,a-p-2}), topic-B{b-p-0,b-p-1,b-p-2}) 

分配后的结果是: 
M = 3/2
cA = a-p-0,a-p-1,b-p-0,b-p-1 
cB = a-p-2,b-p-2


导致前M个分配不均匀,消费者负载

Kafka框架基础概念

Kafka是一个分布式流平台,高吞吐量的分布式发布/订阅模式的消息队列(系统),它可以处理消费者在网站中的所有动作流数据,应用于大数据处理领域。

消息队列的好处:

1.灵活性&峰值处理能力 
    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 

2.解耦:
    允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

3.异步通信:
 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

4.可恢复性:

     系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 

5.缓冲:
     有助于控制和优化数据流经过系统的速度, 解决生产消息和消费消息的处理速度不一致的情况。

消息队列的两种模式:

1.点对点模式:一对一,消费者主动拉取数据,消费者收到数据后从队列中删除,即使有多个消费者同时消费数据,也能保证数据处理的顺序。

如下图:生产者发送消息到Queue,只有一个消费者能收到。

2.发布/订阅模式:一对多,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。

如下图:生产者发送到topic的消息,消费者订阅了生产者的topic才会收到消息。

主动拉取数据:

1.维护长轮询
2.生产者消息来了,通知消费者根据自身条件拉取

推送消费者数据:

主动推送多个消费者数据,并且多个消费方消费生产者数据

Kafka基础架构以及工作流程图:

Kafka-0.9版本之前,消费者消费offset(位置信息)存储在zk里面,Kafka-0.9版本之后(包含0.9版本)消费者消费位置信息offset存放在Kafka一个内置的topic中,该topic为__consumer_offsets。

1.Producer:消息生产者,向Kafka broker发消息的客户端

2.Consumer:消息消费者,向Kafka broker获取消息的客户端

3.Consumer Group(CG):消费者组,由多个consumer组成,消费者组内每个消费者负责消费不同的分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

4.Broker:一台Kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic

5.Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic

6.Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列

7.Replica(副本):为了保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且Kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower

8.Leader:每个分区多个副本的主,生产者发送数据的对象以及消费者的对象都是Leader

9.Follower:每个分区多个副本中的从,定时从Leader中同步数据,保持和Leader数据的同步,Leader发生故障时,某个follower会成为新的leader

Kafka文件存储机制:

一个topic可以有多个生产者写入数据,Kafka集群都会维护每个Topic的分区日志。

Kafka集群使用可配置的保留期限持久保留所有已发布的消息,如果将保留策略设置为两天,则在发布消息后的两天内,该记录可供使用,之后将被丢弃以释放空间。 

每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。 producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的 offset。 消费者组中的每个消费者, 都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

    由于生产者的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个 segment对应两个文件——“.index”文件和“.log”文件。 这些文件位于一个文件夹下,该文件夹的命名规则为(topic 名称+分区序号) 
 
我集群了三台机器,定义topic名称为"start",创建topic时设置了3个partition(分区)。因此分别为:
(server-1/start-0,server-2/start-1,server-3/start-2)

server-1

server-2
server-3





    Kafka log文件查找,设置log文件索引,分区块查找,比如设置区间1kb大小文件索引下标为1,以此类推查找速度快
    .index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

Kafka生产者:

Kafka分区策略:

分区原因:
     1.方便在集群中扩展,每个Partition可以通过调整以适应所在的机器,而一个Topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了。
     2.可以提高高并发,因为可以以Partition为单位读写。

Kafka Akc(确认消息收到,保证数据的可靠性) :

    为保证Producer发送的数据能可靠的发送到指定的Topic,Topic的每个Partition收到Producer发送的数据后,都需要向Producer发送Akc(acknowledgement 确认收到),如果Producer收到Ack就会进行下一轮的发送,否则重新发送数据。

副本数据同步策略:

方案一:
  半数以上完成同步发送ack
  优点:延迟低
  缺点:选举新的leader时,容忍N台节点的故障,需要2N+1个副本

方案二:
  全部完成同步发送ack
  优点:选举新的leader时,容忍N台节点故障,需要N+1个副本
  缺点:延迟高

Kafka选择第二种方案,原因:
    ①.为了容忍N台节点故障,第一种方案需要2N+1个副本,第二种方案只需要N+1个副本,而Kafka每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
    ②.网络延迟对Kafka影响较小

ISR(in-syncreplica set):

    Leader维护了一个动态的ISR,意为和leader保持同步的follower集合,当ISR中follower完成数据的同步后,leader就会给follower发送ack,如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阀值由replica.lag.time.max.ms参数设定,leader发生故障后,从ISR中选举新的leader

Kafka Ack答应机制:

    对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功

Kafka Akc三种可靠级别:

acks:
     0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没写入磁盘就已经返回,当broker故障时有可能丢失数据。

     1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步之前leader故障,那么将丢失数据。

    -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,但是如果follower在同步完成后、broker发送数据之前,leader发生故障,会造成数据重复

故障处理细节:

Log文件中的HW和LEO:

HW(Hight Watermark):所有副本中最小的LEO,指的是消费者能见到的最大的offset,ISR队列中最小的LEO。
LEO(Log End Offset):每个副本最后一个offset。

如果此时Server-A-Leader(服务器-A-主节点)挂掉

Leader故障:
    leader发生故障后,会从ISR中选举新的leader,为了保证多个副本之间的数据一致性,其余的follower会先将个字的log文件高于HW的部分截取掉,然后从新的leader中同步数据。
    注意:只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

Follower故障:
    follower发生故障会被临时踢出ISR,等待该follower恢复后,follower会读取本地磁盘记录上的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等待该follower的LEO大于等于该partition的HW,等follower追上leader之后就可以重新加入ISR了。