Kafka-Eagle搭建可视化工具

集群三台机器

在Hosts文件中建立域名和IP的映射配置(方便解析): vim /etc/hosts

192.168.0.1                     master
192.168.0.2                     slave1
192.168.0.3                     slave2

配置环境变量:vim /etc/profile

注意修改安装目录

#kafka eagle config
export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin

使环境变量生效:source /etc/profile

主要配置修改:

######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=master,slave1,slave2
master.zk.list=master:2181,slave1:2181,slave2:2181
slave1.zk.list=master:2181,slave1:2181,slave2:2181
slave2.zk.list=master:2181,slave1:2181,slave2:2181

######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=25

######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048

######################################
# kafka offset storage
######################################
master.kafka.eagle.offset.storage=kafka
slave1.kafka.eagle.offset.storage=kafka
slave2.kafka.eagle.offset.storage=kafka

######################################
# kafka metrics, 30 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=30

######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=false

######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
master.kafka.eagle.sasl.enable=false
master.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
master.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
master.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
master.kafka.eagle.sasl.client.id=
master.kafka.eagle.sasl.cgroup.enable=false
master.kafka.eagle.sasl.cgroup.topics=

slave1.kafka.eagle.sasl.enable=false
slave1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
slave1.kafka.eagle.sasl.mechanism=PLAIN
slave1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
slave1.kafka.eagle.sasl.client.id=
slave1.kafka.eagle.sasl.cgroup.enable=false
slave1.kafka.eagle.sasl.cgroup.topics=

slave2.kafka.eagle.sasl.enable=false
slave2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
slave2.kafka.eagle.sasl.mechanism=PLAIN
slave2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
slave2.kafka.eagle.sasl.client.id=
slave2.kafka.eagle.sasl.cgroup.enable=false
slave2.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
#kafka.eagle.driver=org.sqlite.JDBC
#如需使用eagle自带数据库,则修改安装路径
#kafka.eagle.url=jdbc:sqlite:/opt/module/eagle/db/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://slave2:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123a123@

##授权启动文件:chomd 777 ke.sh

##启动ke.sh start

eagle常用命令start|stop|restart|status|stats|find|gc|jdk|version

访问:ip:8048/ke  admin/123456

Centos安装mysql8.0

注意:本次安装目录为/opt/module/

###下载mysql安装包
[root@chenxi module]# wget https://downloads.mysql.com/archives/get/p/23/file/mysql-8.0.18-linux-glibc2.12-x86_64.tar.xz

###解压安装包
[root@chenxi module]# tar -xf mysql-8.0.18-linux-glibc2.12-x86_64.tar.xz
[root@chenxi module]# mv mysql-8.0.18-linux-glibc2.12-x86_64 mysql
###在/etc目录下创建my.cnf初始文件插入以下并保存
[root@chenxi module]# vim /etc/my.cnf

[mysqld]
# 设置3306端口
port=3306
# 设置mysql的安装目录
basedir=/opt/module/mysql
# 设置mysql数据库的数据的存放目录
datadir=/opt/module/mysql/data
# 允许最大连接数
max_connections=10000
# 允许连接失败的次数。这是为了防止有人从该主机试图攻击数据库系统
max_connect_errors=10
# 服务端使用的字符集默认为UTF8MB4
# 搭建时使用UTF8报出警告如下:
# --character-set-server: 'utf8' is currently an alias for the character
# set UTF8MB3, but will be an alias for UTF8MB4 in a future release.
# Please consider using UTF8MB4 in order to be unambiguous
character-set-server=UTF8MB4
# 创建新表时将使用的默认存储引擎
default-storage-engine=INNODB
# 默认使用“mysql_native_password”插件认证
default_authentication_plugin=mysql_native_password
[mysql]
# 设置mysql客户端默认字符集
default-character-set=UTF8MB4
[client]
# 设置mysql客户端连接服务端时默认使用的端口
port=3306
default-character-set=UTF8MB4

###初始化mysql 
[root@chenxi module]# /opt/module/mysql/bin/mysqld --initialize --user=mysqluser --basedir=/opt/module/mysql/ --datadir=/opt/module/mysql/data/ 

###暂且保留好密码:root@localhost: dj91/8JC%k=Z

###创建用户组和用户并赋予mysql文件夹操作权限
[root@chenxi module]# groupadd mysqlgroup
[root@chenxi module]# useradd mysqluser
[root@chenxi module]# chown -R mysqluser:mysqlgroup /opt/module/mysql

###把启动命令加入到系统服务service管理并授权
[root@chenxi module]# cp support-files/mysql.server /etc/init.d/mysqld
[root@chenxi module]# chmod 755 /etc/init.d/mysqld

###切换用户启动
[root@chenxi module]# su mysqluser

###注意:切换用户如不切换用户启动会报如下错误:(大坑)



[mysqluser@chenxi module]# service mysqld start

###进入mysql(/opt/module/mysql/bin/mysql为mysql安装目录下的bin/mysql)
[mysqluser@chenxi module]# /opt/module/mysql/bin/mysql -uroot -p dj91/8JC%k=Z

###修改当前用户密码、创建新用户并授权、刷新缓存
mysql> alter user user() identified by '密码';
mysql> create user '用户名'@'%' identified by '密码';
mysql> grant all privileges on *.* to '用户名'@'%' with grant option;
mysql> flush privileges;

Kafka事务

Kafka0.11版本开始引入了事务支持事务可以保证KafkaExactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败 。

Producer事务:

    为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PIDTransaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID 
    为了管理TransactionKafka引入了一个新的组件Transaction CoordinatorProducer就是通过和Transaction Coordinator 交互获得Transaction ID 对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
 
Consumer事务:

    上述事务机制主要是从Producer方面考虑,对于Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费。这是由于Consumer 可以通过offset访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。 

 

Zookeeper+Kafka集群部署

1.下载zookeeper和kafka:

  • zk下载地址:http://archive.apache.org/dist/zookeeper/
  • kafka下载地址:https://kafka.apache.org/downloads.html
  • 本次搭建下载的zk和kafka版本是(zookeeper-3.4.14.tar.gz/kafka_2.11-0.11.0.0.tgz

服务器环境->准备三台服务器集群

Hosts文件中建立域名和IP的映射配置(方便解析): vim /etc/hosts

192.168.0.1                     master
192.168.0.2                     slave1
192.168.0.3                     slave2

1.每台服务器配置环境变量

##配置zk和kafka环境变量
[root@kafka-master module]# vim /etc/profile

###配置zk环境 
#zookeeper config
export ZK_HOME=/opt/module/zookeeper
export PATH=$ZK_HOME/bin:$PATH

###配置kafka环境
#kafka config
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

###使环境变量立即生效 
[root@kafka-master module]# source /etc/profile

本次把zookeeper和kafka的安装目录统一为:/opt/module

Zookeeper搭建集群:

###解压zookeeper:
[root@kafka-master module]# tar -zxvf zookeeper-3.4.14.tar.gz
###重命名zookeeper-3.4.14为zookeeper
[root@kafka-master module]# mv zookeeper-3.4.14 zookeeper
[root@kafka-master module]# cd zookeeper
###重命名zoo_sample.cfg为zoo.cfg
[root@kafka-master zookeeper]# mv conf/zoo_sample.cfg zoo.cfg
[root@kafka-master zookeeper]# vim conf/zoo.cfg
###修改zoo.cfg配置:

# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
# 修改zk数据存储路径
dataDir=/opt/module/zookeeper/data/

#在文件末尾追加以下配置项
#zk cluster
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888

###新建刚配置的zk存储路径
[root@kafka-master zookeeper]# mkdir data
###新建myid文件并写入[master(0),slave1(1),slave2(2)]并保存
[root@kafka-master zookeeper]# vim data/myid

###配置其他两个服务器,然后依次启动三台机器 master,slave1,slave2
zk启动命令分别为:zkServer.sh start/stop/restart/status

启动完成分别查看三台zk状态:

[root@kafka-master zookeeper]# bin/zkServer.sh status

[root@kafka-slave-1 zookeeper]# bin/zkServer.sh status

[root@kafka-slave-2 zookeeper]# bin/zkServer.sh status


slave1被选举称为leader,master和slave2被选为follower。

三台分别显示leader或follower则搭建成功。

zookeeper搭建常见问题:https://chenxitag.com/zookeeper/problem
centos firewalld防火墙:https://chenxitag.com/centos/firewalld

Kafka搭建集群:

###解压kafka:
[root@kafka-master module]# tar -zxvf kafka_2.11-0.11.0.0.tgz
###重命名kafka_2.11-0.11.0.0为kafka
[root@kafka-master module]# mv kafka_2.11-0.11.0.0 kafka
[root@kafka-master module]# cd kafka
###修改kafka启动配置文件
[root@kafka-master kafka]# vim config/server.properties

###broker.id不可重复
master:broker.id = 0
slave1:broker.id = 1
slave2:broker.id = 2





###启动Kafka
[root@kafka-master kafka]# bin/kafka-server-start.sh -daemon config/server.properties

slave1,slave2机器亦是如此

kafka常用命令:https://chenxitag.com/kafka/cmd

Kafka常用命令

kafka启动:bin/kafka-server-start.sh -daemon config/server.properties
kafka停止:bin/kafka-server-stop.sh
选项说明:
--topic topic名称
--replication-factor 副本数
--partitions 分区数
--zookeeper 本机ip:zk端口号
--bootstrap-list 本机ip:kafka端口号
--broker-list 本机ip:kafka端口号
--from-beginning 查看历史消息

###查看当前服务器所有topic
[root@kafka-master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --list

###创建topic
[root@kafka-master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --create --topic topicname --partitions 3 --replication-factor 1

##删除topic
[root@kafka-master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --delete --topic topicname

##修改topic分区数  注意:只能增加,不能减少
[root@kafka-master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --alter --topic topicname --partitions 4

##查看topic详情
[root@kafka-master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --describe --topic topicname

##生产者发送消息
[root@kafka-master kafka]# bin/kafka-console-producer.sh --broker-list master:9092 --topic topicname

##消费者消费消息
[root@kafka-master kafka]# bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic topicname --from-beginning

Zookeeper搭建集群问题

错误日志:ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.14/bin/../conf/zoo.cfg
Error contacting service. It is probably not running.

解决方案:

1.注意myid文件内容是否对应zk集群配置的server.id
2.注意Server的防火墙是否开放zk端口

错误日志:
2020-02-18 14:36:53,275 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:FastLeaderElection@847] – Notification time out: 12800
2020-02-18 14:37:06,076 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:QuorumCnxManager@347] – Have smaller server identifier, so dropping the connection: (1, 0)
2020-02-18 14:37:06,077 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:QuorumCnxManager@347] – Have smaller server identifier, so dropping the connection: (2, 0)
2020-02-18 14:37:06,077 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:FastLeaderElection@847] – Notification time out: 25600
2020-02-18 14:37:31,678 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:QuorumCnxManager@347] – Have smaller server identifier, so dropping the connection: (1, 0)
2020-02-18 14:37:31,679 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:QuorumCnxManager@347] – Have smaller server identifier, so dropping the connection: (2, 0)
2020-02-18 14:37:31,680 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:FastLeaderElection@847] – Notification time out: 51200

解决方案:  保持这台有问题zk的现状, 按myid从小到大依次重启其他的zk机器;

原因:  zk是需要集群中所有机器两两建立连接的, 其中配置中的3555端口是用来进行选举时机器直接建立通讯的端口, 大id的server才会去连接小id的server,避免连接浪费.如果是最后重启myid最小的实例,该实例将不能加入到集群中, 因为不能和其他集群建立连接

Kafka进阶之幂等性以及分区分配策略

Exactly Once语义:

    将服务器的ACK级别设置为-1,可以保证ProducerServer之间不会丢失数据,即At Least Once 语义。相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。

    At Least Once可以保证数据不丢失,但是不能保证数据不重复;相对的, At Least Once可以保证数据不重复,但是不能保证数据不丢失。 
对于一些非常重要的信息,比如交易数据,下游数据消费者要求数据既不重复也不丢失0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。

    0.11 版本的 Kafka,引入了一项重大特性:幂等性。幂等性就是指Producer不论向Broker发送多少次重复数据,Broker都只会持久化一条。幂等性结合At Least Once语义,就构成了KafkaExactly Once语义。

                At Least Once + 幂等性 = Exactly Once



    要启用幂等性,只需要将Producer的参数中enable.idompotence设置为true(设置为true时,ACK默认设置为-1)    Kafka的幂等性实现是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条

    PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once 

Kafka消费者:

     consumer 采用 pull(拉) 模式从 broker 中读取数据。
 
     push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,pull模式则可以根据consumer的消费能力以适当的速率消费消息。

    pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回

分区分配策略 :

     一个consumer group中有多个consumer,一个topic有多个partition,所以会涉及到partition的分配问题,如何确定哪个partition由哪个consumer来消费Kafka有两种分配策略:RoundRobin(轮循)Range(范围)。

Kafka默认使用的是Range分配策略。
可设置partition.assignment.strategy的值进行自由调整(RoundRobin、Range


RoundRobin(轮循)根据消费者组来进行轮循划分,把消费者组看成一个整体来进行轮循。(需要保证同一个消费者组下的消费者订阅同一个topic)







Range(范围):根据(分区数量 / 消费者组数量 = 范围)划分。

假设有2个消费者(cA,cB)和2个topic分别有3个partition 
(topic-A{a-p-0,a-p-1,a-p-2}), topic-B{b-p-0,b-p-1,b-p-2}) 

分配后的结果是: 
M = 3/2
cA = a-p-0,a-p-1,b-p-0,b-p-1 
cB = a-p-2,b-p-2


导致前M个分配不均匀,消费者负载

Centos安装禅道

1.安装包下载

根据系统内核版本选择合适的安装包:https://www.zentao.net/dynamic/80201.html

查看服务器版本:
    [root@chenxi opt]# cat /etc/redhat-release
    [root@chenxi opt]# cat /proc/version

注意:linux一键安装包只能安装在/opt目录下,如果需要安装到其他目录。可以自己搭建环境后使用源码包安装,参考文档:http://www.zentao.net/book/zentaopmshelp/101.html

2.上传至安装目录:

sftp:/opt> put F:\下载\ZenTaoPMS.biz3.6.1.zbox_64.tar.gz

2.解压到/opt/module目录下

[root@chenxi opt]# tar -zxvf ZenTaoPMS.9.0.1.zbox_64.tar.gz -C /opt/

3.修改禅道自带apache(80)mysql(3306)端口

为了不占用服务器端口,根据自己情况可不修改

        ①修改禅道自带apache端口:

[root@chenxi opt]# /opt/zbox/zbox -ap 9000

       ② 修改禅道自带mysql端口:

[root@chenxi opt]#  /opt/zbox/zbox -mp 9001

4.重启禅道服务

[root@chenxi opt]# /opt/zbox/zbox restart

    显示  Apache is running    Mysql is running   则启动成功

相关命令:

/opt/zbox/zbox start :开启Apache和Mysql
/opt/zbox/zbox stop :停止Apache和Mysql
/opt/zbox/zbox restart :重启Apache和Mysql
5.创建数据库账号
    [root@chenxi opt]# /opt/zbox/auth/adduser.sh

输入账号 和 密码 mysql数据库:/opt/zbox/bin/mysql -u root -P 9001 -p

6.配置Firewall防火墙规则,允许端口访问
    开启9000端口:
    [root@chenxi opt]# firewall-cmd –zone=public –add-port=9000/tcp
    开启9001端口:

[root@chenxi opt]# firewall-cmd –zone=public –add-port=9001/tcp

7.重启防火墙,使规则生效:

[root@chenxi opt]# systemctl restart firewalld.service

8.浏览器访问 
    http://IP:9000/zentao/user-login-L3plbnRhby8=.html
    默认账号密码:admin/123456

Centos Firewalld防火墙设置

Centos7及以上默认安装了firewalld,如果没有安装的话,可以使用 
yum install firewalld firewalld-config进行安装。
1.启动防火墙
systemctl start firewalld
2.禁用防火墙
systemctl stop firewalld
3.设置开机启动
systemctl enable firewalld
4.停止并禁用开机启动
sytemctl disable firewalld
5.重启防火墙
firewall-cmd --reload
6.查看状态
systemctl status firewalld 或者 firewall-cmd --state
7.查看版本
firewall-cmd --version
8.查看帮助
firewall-cmd --help
9.查看区域信息
firewall-cmd --get-active-zones
10.查看指定接口所属区域信息
firewall-cmd --get-zone-of-interface=eth0
11.拒绝所有包
firewall-cmd --panic-on
12.取消拒绝状态
firewall-cmd --panic-off
13.查看是否拒绝
firewall-cmd --query-panic
14.将接口添加到区域(默认接口都在public)
firewall-cmd --zone=public --add-interface=eth0(永久生效再加上 --permanent然后reload(重启)防火墙)
15.设置默认接口区域
firewall-cmd --set-default-zone=public(立即生效,无需重启)
16.更新防火墙规则
firewall-cmd --reload:无需断开连接,就是firewalld特性之一动态 添加规则
firewall-cmd --complete-reload:需要断开连接,类似重启服务
17.查看指定区域所有打开的端口
firewall-cmd --zone=public --list-ports
18.在指定区域打开端口(记得重启防火墙)
firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)

Kafka框架基础概念

Kafka是一个分布式流平台,高吞吐量的分布式发布/订阅模式的消息队列(系统),它可以处理消费者在网站中的所有动作流数据,应用于大数据处理领域。

消息队列的好处:

1.灵活性&峰值处理能力 
    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 

2.解耦:
    允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

3.异步通信:
 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

4.可恢复性:

     系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 

5.缓冲:
     有助于控制和优化数据流经过系统的速度, 解决生产消息和消费消息的处理速度不一致的情况。

消息队列的两种模式:

1.点对点模式:一对一,消费者主动拉取数据,消费者收到数据后从队列中删除,即使有多个消费者同时消费数据,也能保证数据处理的顺序。

如下图:生产者发送消息到Queue,只有一个消费者能收到。

2.发布/订阅模式:一对多,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。

如下图:生产者发送到topic的消息,消费者订阅了生产者的topic才会收到消息。

主动拉取数据:

1.维护长轮询
2.生产者消息来了,通知消费者根据自身条件拉取

推送消费者数据:

主动推送多个消费者数据,并且多个消费方消费生产者数据

Kafka基础架构以及工作流程图:

Kafka-0.9版本之前,消费者消费offset(位置信息)存储在zk里面,Kafka-0.9版本之后(包含0.9版本)消费者消费位置信息offset存放在Kafka一个内置的topic中,该topic为__consumer_offsets。

1.Producer:消息生产者,向Kafka broker发消息的客户端

2.Consumer:消息消费者,向Kafka broker获取消息的客户端

3.Consumer Group(CG):消费者组,由多个consumer组成,消费者组内每个消费者负责消费不同的分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

4.Broker:一台Kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic

5.Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic

6.Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列

7.Replica(副本):为了保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且Kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower

8.Leader:每个分区多个副本的主,生产者发送数据的对象以及消费者的对象都是Leader

9.Follower:每个分区多个副本中的从,定时从Leader中同步数据,保持和Leader数据的同步,Leader发生故障时,某个follower会成为新的leader

Kafka文件存储机制:

一个topic可以有多个生产者写入数据,Kafka集群都会维护每个Topic的分区日志。

Kafka集群使用可配置的保留期限持久保留所有已发布的消息,如果将保留策略设置为两天,则在发布消息后的两天内,该记录可供使用,之后将被丢弃以释放空间。 

每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。 producer 生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的 offset。 消费者组中的每个消费者, 都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

    由于生产者的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个 segment对应两个文件——“.index”文件和“.log”文件。 这些文件位于一个文件夹下,该文件夹的命名规则为(topic 名称+分区序号) 
 
我集群了三台机器,定义topic名称为"start",创建topic时设置了3个partition(分区)。因此分别为:
(server-1/start-0,server-2/start-1,server-3/start-2)

server-1

server-2
server-3





    Kafka log文件查找,设置log文件索引,分区块查找,比如设置区间1kb大小文件索引下标为1,以此类推查找速度快
    .index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

Kafka生产者:

Kafka分区策略:

分区原因:
     1.方便在集群中扩展,每个Partition可以通过调整以适应所在的机器,而一个Topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了。
     2.可以提高高并发,因为可以以Partition为单位读写。

Kafka Akc(确认消息收到,保证数据的可靠性) :

    为保证Producer发送的数据能可靠的发送到指定的Topic,Topic的每个Partition收到Producer发送的数据后,都需要向Producer发送Akc(acknowledgement 确认收到),如果Producer收到Ack就会进行下一轮的发送,否则重新发送数据。

副本数据同步策略:

方案一:
  半数以上完成同步发送ack
  优点:延迟低
  缺点:选举新的leader时,容忍N台节点的故障,需要2N+1个副本

方案二:
  全部完成同步发送ack
  优点:选举新的leader时,容忍N台节点故障,需要N+1个副本
  缺点:延迟高

Kafka选择第二种方案,原因:
    ①.为了容忍N台节点故障,第一种方案需要2N+1个副本,第二种方案只需要N+1个副本,而Kafka每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
    ②.网络延迟对Kafka影响较小

ISR(in-syncreplica set):

    Leader维护了一个动态的ISR,意为和leader保持同步的follower集合,当ISR中follower完成数据的同步后,leader就会给follower发送ack,如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阀值由replica.lag.time.max.ms参数设定,leader发生故障后,从ISR中选举新的leader

Kafka Ack答应机制:

    对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功

Kafka Akc三种可靠级别:

acks:
     0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没写入磁盘就已经返回,当broker故障时有可能丢失数据。

     1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步之前leader故障,那么将丢失数据。

    -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,但是如果follower在同步完成后、broker发送数据之前,leader发生故障,会造成数据重复

故障处理细节:

Log文件中的HW和LEO:

HW(Hight Watermark):所有副本中最小的LEO,指的是消费者能见到的最大的offset,ISR队列中最小的LEO。
LEO(Log End Offset):每个副本最后一个offset。

如果此时Server-A-Leader(服务器-A-主节点)挂掉

Leader故障:
    leader发生故障后,会从ISR中选举新的leader,为了保证多个副本之间的数据一致性,其余的follower会先将个字的log文件高于HW的部分截取掉,然后从新的leader中同步数据。
    注意:只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

Follower故障:
    follower发生故障会被临时踢出ISR,等待该follower恢复后,follower会读取本地磁盘记录上的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等待该follower的LEO大于等于该partition的HW,等follower追上leader之后就可以重新加入ISR了。