JVM运行时数据区-虚拟机栈

Java虚拟机栈(Java Virtual Machine Stack):

基本概述:

      跨平台设计,Java指令根据栈来设计的,不同平台CPU架构不同,所以不能设计为基于寄存器的

      线程私有的,每个线程创建时都会创建一个虚拟机栈,其内部保存一个个栈帧(Stack Frame),一个栈帧对应一个Java方法。

内存中的栈与堆的区别:

    堆:存储数据区域
    栈:运行时数据区域


栈与堆数据存放区别:
   堆:主体数据存放堆区(对象存放堆中)
   栈:局部变量存放在栈空间中(基本数据类型),如果是引用数据类型(在栈空间存放数据对象的引用)。

栈的生命周期:

 栈的生命周期和线程一致。

栈的作用:

栈是Java程序运行时的一块区域,用于保存局部变量,部分结果并参与方法的调用和返回。

局部变量:
基本类型变量:8种基本数据类型变量
引用类型变量:(类、数组、接口)在栈中存放,对象引用地址。

栈的优点:

栈是一种有效的分配存储方式,访问速度仅次于程序计数器(PC寄存器)。

Java栈操作:
    1) 每个方法执行(入栈)
    2) 方法执行结束(出栈)
    3) 遵循先进后出原则

栈不存在GC垃圾回收问题(因为都是入栈/出栈操作)
栈会出现内存溢出问题(可通过设置栈内存大小 -Xss)

栈中异常问题:

Java虚拟机规范允许Java栈大小是动态扩展固定大小的。

固定大小:如果线程请求分配的栈容量超过Java虚拟机栈允许的最大容量,Java虚拟机栈会抛出StackOverflowError(栈溢出)异常

动态扩展:如果扩展时无法申请到足够内存,或在创建新的线程时没有足够的内存去创建对应的虚拟机栈,会抛出OutOfMemoryError(内存溢出)异常

栈的存储单位:

        栈中的数据是以栈帧(Stack Frame)的格式存放。
每个线程都有一个栈,当前线程正在执行的每个方法都对应一个栈帧(方法执行->入站,执行结束->出栈)
栈帧是一个内存区块/数据集,维护方法执行过程中各种数据信息。

栈的运行原理:

        在一条活动的线程中,只会有一个活动栈帧,只有当前正在执行方法的栈帧(栈顶栈帧)是有效的,这个栈帧被称为“当前栈帧”(Current Frame),与当前栈帧相对应的方法是“当前方法”(Current Method),定义这个方法的类是“当前类”(Current Class)

        执行引擎运行的所有字节码指令只针对当前栈帧进行操作。

        如果在方法中调用了其他方法,对应的新的栈帧会被创建出来,放在栈的顶端,成为新的当前栈帧方法返回时,当前栈帧会传回此方法的执行结果给前一个栈帧,然后虚拟机会丢弃当前栈帧,使前一个栈帧重新成为当前栈帧

        不同线程中所包含的栈帧不允许存在互相引用的。

        Java中方法有两种返回方式:1) Return    2) 抛出异常(未try catch处理异常)不管哪种方式返回都会导致栈帧被弹出。

JVM运行时数据区-程序计数器

运行时数据区分为:
        程序计数器(PC寄存器)、虚拟机栈、本地方法栈、方法区、堆区

线程共享区:方法区,堆区。
线程独立区:程序计数器(PC寄存器)、虚拟机栈、本地方法栈。

JVM线程:

1.一个JVM线程对应一个Runtime(运行时数据区)

2.JVM允许一个应用有多个线程并执行。

3.当一个Java线程准备好执行后,此时操作系统的本地线程也同时创建,Java线程执行终止后,本地线程也会回收。

4.当一个Java线程准备好执行后(初始化-> 1) 程序计数器 2)虚拟机栈 3)本地方法栈),此时操作系统的本地线程也同时创建并初始化,本地线程初始化成功后,就会调用Java线程中的run()方法

如果Java线程启动,发现未处理异常,Java线程终止,操作系统线程决定要不要回收取决于该线程是守护线程或普通线程。

程序计数器:(PC寄存器  Program Counter Register)

        寄存器存储指令相关现场信息,CPU只有把数据装载到寄存器中才能够运行。JVM中的PC寄存器是对物理PC寄存器的一种抽象模拟。

PC寄存器作用:

        PC寄存器用来存储指向下一条指令的地址,也即将要执行的指令代码,由执行引擎读取下一条指令

PC寄存器介绍:
1.PC寄存器是一块很小的内存空间,也是运行速度最快的存储区域。没有GC。

2.在JVM规范中,每个线程都有独立的程序计数器,线程私有的,生命周期与线程生命周期保持一致。

3.程序计数器指令会在存储当前线程正在执行的Java方法的Jvm地址,如果是正在执行Native(本地)方法,则是未指定值(undefined)。

字节码解释器工作是通过改变程序计数器值来选取下一条需要执行的字节码指令。

唯一一个在Java虚拟机规范中没有规定任何Out Of Memory(内存溢出)情况的区域。

JVM指令集、类加载子系统介绍

Jvm整体架构图:

Jvm指令集架构:

指令集架构分为两种:
    1.基于栈的指令集架构(Java编译器是基于栈的指令集架构)。
    2.基于寄存器指令架构。

栈的指令架构:
    优势:跨平台、零地址指令、指令集更小、移植性高。(编译器更容易实现)
    劣势:指令多,性能下降(实现同样的功能需要更多指令)
寄存器指令架构:
    优势:性能优秀、执行效率高,指令少
    劣势:移植性差

Jvm生命周期:

跟随线程的结束或终止生命周期就结束(System.exit()方法或Runtime类的Halt方法)。

Jvm类加载子系统图:

 

类加载子系统作用:

类加载子系统负责从文件系统或网络中加载class文件,class文件在文件开头有特定文件标识(Ca Fe Ba Be)。ClassLoad只负责class文件的加载,是否可以运行由Execution Engine决定。

Jvm加载器:

1.启动类加载器(引导类加载器 Bootstrap ClassLoader):
    1) 使用C/C++语言实现,嵌套在Jvm内部。
    2) 用来加载Java核心类库。
    3) 加载扩展类和系统类,指定为它们的父类加载器2.扩展类加载器(Extension ClassLoader):
    1) Java语言编写。
    2) 派生于ClassLoader类。
    3) 从java.ext.dirs系统属性所指定目录中加载或JDK的安装目录jre/lib/ext子目录下加载。
    4) 如果用户创建的Jar放在jre/lib/ext子目录下,自动由扩展类加载器加载3.系统类加载器(App ClassLoader):
    1) Java语言编写。
    2) 派生于ClassLoader类。
    3) 父类为扩展类加载器。
    4) 负责加载环境变量classpath或系统属性java.class.path指定下的类库。
    5) Java应用类,自定义类都是由系统加载器完成加载自定义加载器的好处:
    1) 隔离加载类 2) 修改类的加载方式 3) 扩展加载源 4) 防止源码泄漏

如何自定义加载器:
    1) 通过继承抽象类java.lang.ClassLoader的方式,实现自己的类加载器。
    2) 如果没有复杂的需求,可以直接继承URLClassLoader类,可以避免编写findClass()方法及获取字节码的方式,使自定义加载器更加简洁。
/**
 * 自定义类加载器
 */
public class MyClassLoader extends ClassLoader{

 private String classPath;

 public MyClassLoader(String classPath){
 this.classPath = classPath;
 }

 @Override
 protected Class<?> findClass(String name) {
 byte[] classByte = loadClassByte(name);
 return defineClass(name, classByte, 0, classByte.length);
 }

 private byte[] loadClassByte(String name) {
 // 获取该类在文件系统中保存的格式 (即路径加文件名)
 String fileName = classPath + File.separator + name.replace(".",File.separator) +".class";

 File file = new File(fileName);
 ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
 try(InputStream is = new FileInputStream(file)){

 int len;
 byte[] b = new byte[1024*8];
 while ((len = is.read(b)) != -1) {
 outputStream.write(b, 0, len);
 }
 } catch (FileNotFoundException e) {
 e.printStackTrace();
 } catch (IOException e) {
 e.printStackTrace();
 }
 return outputStream.toByteArray();
 }
}
public static void main(String[] args) throws Exception {
 //路径请使用classpath外的一个class文件,并且类路径下不能包含此class,才能使用到自定义的类加载器
 MyClassLoader myClassLoader = new MyClassLoader("F:\\chenxi");
 Class clazz = myClassLoader.loadClass("com.MyClassLoading");
 System.out.println(clazz + "-" + clazz.newInstance()+ "-" +clazz.getClassLoader());
}

类加载过程:

分为三个阶段:1.加载  2.链接  3.初始化

加载阶段: 
     1.通过一个类的全限定名获取定义此类的二进制字节流。
     2.将字节流所代表的静态存储结构化为方法区的运行时数据结构。
     3.在内存中生成一个代表这个类的java.lang.class对象作为一个方法区,这个类的各种数据访问入口。

     加载.class文件的方式:
         1.Jar,War格式的基础,从zip压缩包中读取
         2.动态代理技术,运行时计算生成。

链接阶段:
    1.验证:确保class文件中的字节流中包含信息符合当前虚拟机要求,保证被加载类的正确性,不会危害虚拟机自身安全(四种验证:1.文件格式 2.元数据 3.字节码 4.符号引用)。
            
     
    2.准备:为变量分配内存并设置该类变量的默认初始值(零值)(不包含用final修饰的static,因为final在编译时就分配了,准备阶段会显式初始化)。

    3.解析:将常量池内的符号引用转换为直接引用的过程,解析操作随着Jvm在执行空初始化后再执行(解析主要针对于:类、接口、字段、类方法、接口方法、方法类型等)。

初始化阶段:
    执行类构造器的方法<clinit>()的过程。此方法不需要定义,是Javac编译器自动收集类中的所有类变量的赋值动作和静态代码块中的语句合并而来。
    static int a = 1;
    static{
      a = 2;
    }
    构造器方法中指令按语句在源码文件中出现的顺序执行。
    <clinit>()不同于类的构造器。若该类具有父类,Jvm会保证子类的<clinit>()执行前,父类的<clinit>()已经执行完毕。
    虚拟机必须保证一个类的<clinit>()方法在多线程下被同步加锁。(保证一个类只加载一次)
    <init>对应类中的构造器。

    IDEA安装jclasslib插件:





双亲委派机制

工作原理:

    1) 如果一个类加载器收到了类加载的请求,它并不会自己去加载,而是先将这个请求委托给父类的加载器去执行。

    2) 如果父类的加载器还存在其父类加载器,则进一步向上委托,依次递归,请求最终达到顶层的引导类加载器(启动类加载器)。

    3) 如果父类加载器可以完成此加载任务,就成功返回,如果父类加载器无法完成此加载任务,子类加载器才会尝试自己加载

优势:
    1) 避免重复加载 
    2) 保护程序安全,防止核心Api被随意篡改(沙箱安全机制)。

Kafka-Eagle搭建可视化工具

集群三台机器

在Hosts文件中建立域名和IP的映射配置(方便解析): vim /etc/hosts

192.168.0.1                     master
192.168.0.2                     slave1
192.168.0.3                     slave2

配置环境变量:vim /etc/profile

注意修改安装目录

#kafka eagle config
export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin

使环境变量生效:source /etc/profile

主要配置修改:

######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=master,slave1,slave2
master.zk.list=master:2181,slave1:2181,slave2:2181
slave1.zk.list=master:2181,slave1:2181,slave2:2181
slave2.zk.list=master:2181,slave1:2181,slave2:2181

######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20

######################################
# zk client thread limit
######################################
kafka.zk.limit.size=25

######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048

######################################
# kafka offset storage
######################################
master.kafka.eagle.offset.storage=kafka
slave1.kafka.eagle.offset.storage=kafka
slave2.kafka.eagle.offset.storage=kafka

######################################
# kafka metrics, 30 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=30

######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=false

######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
master.kafka.eagle.sasl.enable=false
master.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
master.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
master.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
master.kafka.eagle.sasl.client.id=
master.kafka.eagle.sasl.cgroup.enable=false
master.kafka.eagle.sasl.cgroup.topics=

slave1.kafka.eagle.sasl.enable=false
slave1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
slave1.kafka.eagle.sasl.mechanism=PLAIN
slave1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
slave1.kafka.eagle.sasl.client.id=
slave1.kafka.eagle.sasl.cgroup.enable=false
slave1.kafka.eagle.sasl.cgroup.topics=

slave2.kafka.eagle.sasl.enable=false
slave2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
slave2.kafka.eagle.sasl.mechanism=PLAIN
slave2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
slave2.kafka.eagle.sasl.client.id=
slave2.kafka.eagle.sasl.cgroup.enable=false
slave2.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka sqlite jdbc driver address
######################################
#kafka.eagle.driver=org.sqlite.JDBC
#如需使用eagle自带数据库,则修改安装路径
#kafka.eagle.url=jdbc:sqlite:/opt/module/eagle/db/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://slave2:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123a123@

##授权启动文件:chomd 777 ke.sh

##启动ke.sh start

eagle常用命令start|stop|restart|status|stats|find|gc|jdk|version

访问:ip:8048/ke  admin/123456

Centos安装mysql8.0

注意:本次安装目录为/opt/module/

###下载mysql安装包
[root@chenxi module]# wget https://downloads.mysql.com/archives/get/p/23/file/mysql-8.0.18-linux-glibc2.12-x86_64.tar.xz

###解压安装包
[root@chenxi module]# tar -xf mysql-8.0.18-linux-glibc2.12-x86_64.tar.xz
[root@chenxi module]# mv mysql-8.0.18-linux-glibc2.12-x86_64 mysql
###在/etc目录下创建my.cnf初始文件插入以下并保存
[root@chenxi module]# vim /etc/my.cnf

[mysqld]
# 设置3306端口
port=3306
# 设置mysql的安装目录
basedir=/opt/module/mysql
# 设置mysql数据库的数据的存放目录
datadir=/opt/module/mysql/data
# 允许最大连接数
max_connections=10000
# 允许连接失败的次数。这是为了防止有人从该主机试图攻击数据库系统
max_connect_errors=10
# 服务端使用的字符集默认为UTF8MB4
# 搭建时使用UTF8报出警告如下:
# --character-set-server: 'utf8' is currently an alias for the character
# set UTF8MB3, but will be an alias for UTF8MB4 in a future release.
# Please consider using UTF8MB4 in order to be unambiguous
character-set-server=UTF8MB4
# 创建新表时将使用的默认存储引擎
default-storage-engine=INNODB
# 默认使用“mysql_native_password”插件认证
default_authentication_plugin=mysql_native_password
[mysql]
# 设置mysql客户端默认字符集
default-character-set=UTF8MB4
[client]
# 设置mysql客户端连接服务端时默认使用的端口
port=3306
default-character-set=UTF8MB4

###初始化mysql 
[root@chenxi module]# /opt/module/mysql/bin/mysqld --initialize --user=mysqluser --basedir=/opt/module/mysql/ --datadir=/opt/module/mysql/data/ 

###暂且保留好密码:root@localhost: dj91/8JC%k=Z

###创建用户组和用户并赋予mysql文件夹操作权限
[root@chenxi module]# groupadd mysqlgroup
[root@chenxi module]# useradd mysqluser
[root@chenxi module]# chown -R mysqluser:mysqlgroup /opt/module/mysql

###把启动命令加入到系统服务service管理并授权
[root@chenxi module]# cp support-files/mysql.server /etc/init.d/mysqld
[root@chenxi module]# chmod 755 /etc/init.d/mysqld

###切换用户启动
[root@chenxi module]# su mysqluser

###注意:切换用户如不切换用户启动会报如下错误:(大坑)



[mysqluser@chenxi module]# service mysqld start

###进入mysql(/opt/module/mysql/bin/mysql为mysql安装目录下的bin/mysql)
[mysqluser@chenxi module]# /opt/module/mysql/bin/mysql -uroot -p dj91/8JC%k=Z

###修改当前用户密码、创建新用户并授权、刷新缓存
mysql> alter user user() identified by '密码';
mysql> create user '用户名'@'%' identified by '密码';
mysql> grant all privileges on *.* to '用户名'@'%' with grant option;
mysql> flush privileges;

Kafka事务

Kafka0.11版本开始引入了事务支持事务可以保证KafkaExactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败 。

Producer事务:

    为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PIDTransaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID 
    为了管理TransactionKafka引入了一个新的组件Transaction CoordinatorProducer就是通过和Transaction Coordinator 交互获得Transaction ID 对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
 
Consumer事务:

    上述事务机制主要是从Producer方面考虑,对于Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费。这是由于Consumer 可以通过offset访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。 

 

Zookeeper+Kafka集群部署

1.下载zookeeper和kafka:

  • zk下载地址:http://archive.apache.org/dist/zookeeper/
  • kafka下载地址:https://kafka.apache.org/downloads.html
  • 本次搭建下载的zk和kafka版本是(zookeeper-3.4.14.tar.gz/kafka_2.11-0.11.0.0.tgz

服务器环境->准备三台服务器集群

Hosts文件中建立域名和IP的映射配置(方便解析): vim /etc/hosts

192.168.0.1                     master
192.168.0.2                     slave1
192.168.0.3                     slave2

1.每台服务器配置环境变量

##配置zk和kafka环境变量
[root@kafka-master module]# vim /etc/profile

###配置zk环境 
#zookeeper config
export ZK_HOME=/opt/module/zookeeper
export PATH=$ZK_HOME/bin:$PATH

###配置kafka环境
#kafka config
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

###使环境变量立即生效 
[root@kafka-master module]# source /etc/profile

本次把zookeeper和kafka的安装目录统一为:/opt/module

Zookeeper搭建集群:

###解压zookeeper:
[root@kafka-master module]# tar -zxvf zookeeper-3.4.14.tar.gz
###重命名zookeeper-3.4.14为zookeeper
[root@kafka-master module]# mv zookeeper-3.4.14 zookeeper
[root@kafka-master module]# cd zookeeper
###重命名zoo_sample.cfg为zoo.cfg
[root@kafka-master zookeeper]# mv conf/zoo_sample.cfg zoo.cfg
[root@kafka-master zookeeper]# vim conf/zoo.cfg
###修改zoo.cfg配置:

# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
# 修改zk数据存储路径
dataDir=/opt/module/zookeeper/data/

#在文件末尾追加以下配置项
#zk cluster
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888

###新建刚配置的zk存储路径
[root@kafka-master zookeeper]# mkdir data
###新建myid文件并写入[master(0),slave1(1),slave2(2)]并保存
[root@kafka-master zookeeper]# vim data/myid

###配置其他两个服务器,然后依次启动三台机器 master,slave1,slave2
zk启动命令分别为:zkServer.sh start/stop/restart/status

启动完成分别查看三台zk状态:

[root@kafka-master zookeeper]# bin/zkServer.sh status

[root@kafka-slave-1 zookeeper]# bin/zkServer.sh status

[root@kafka-slave-2 zookeeper]# bin/zkServer.sh status


slave1被选举称为leader,master和slave2被选为follower。

三台分别显示leader或follower则搭建成功。

zookeeper搭建常见问题:https://chenxitag.com/zookeeper/problem
centos firewalld防火墙:https://chenxitag.com/centos/firewalld

Kafka搭建集群:

###解压kafka:
[root@kafka-master module]# tar -zxvf kafka_2.11-0.11.0.0.tgz
###重命名kafka_2.11-0.11.0.0为kafka
[root@kafka-master module]# mv kafka_2.11-0.11.0.0 kafka
[root@kafka-master module]# cd kafka
###修改kafka启动配置文件
[root@kafka-master kafka]# vim config/server.properties

###broker.id不可重复
master:broker.id = 0
slave1:broker.id = 1
slave2:broker.id = 2





###启动Kafka
[root@kafka-master kafka]# bin/kafka-server-start.sh -daemon config/server.properties

slave1,slave2机器亦是如此

kafka常用命令:https://chenxitag.com/kafka/cmd

Kafka常用命令

kafka启动:bin/kafka-server-start.sh -daemon config/server.properties
kafka停止:bin/kafka-server-stop.sh
选项说明:
--topic topic名称
--replication-factor 副本数
--partitions 分区数
--zookeeper 本机ip:zk端口号
--bootstrap-list 本机ip:kafka端口号
--broker-list 本机ip:kafka端口号
--from-beginning 查看历史消息

###查看当前服务器所有topic
[root@kafka-master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --list

###创建topic
[root@kafka-master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --create --topic topicname --partitions 3 --replication-factor 1

##删除topic
[root@kafka-master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --delete --topic topicname

##修改topic分区数  注意:只能增加,不能减少
[root@kafka-master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --alter --topic topicname --partitions 4

##查看topic详情
[root@kafka-master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --describe --topic topicname

##生产者发送消息
[root@kafka-master kafka]# bin/kafka-console-producer.sh --broker-list master:9092 --topic topicname

##消费者消费消息
[root@kafka-master kafka]# bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic topicname --from-beginning

Zookeeper搭建集群问题

错误日志:ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.4.14/bin/../conf/zoo.cfg
Error contacting service. It is probably not running.

解决方案:

1.注意myid文件内容是否对应zk集群配置的server.id
2.注意Server的防火墙是否开放zk端口

错误日志:
2020-02-18 14:36:53,275 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:FastLeaderElection@847] – Notification time out: 12800
2020-02-18 14:37:06,076 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:QuorumCnxManager@347] – Have smaller server identifier, so dropping the connection: (1, 0)
2020-02-18 14:37:06,077 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:QuorumCnxManager@347] – Have smaller server identifier, so dropping the connection: (2, 0)
2020-02-18 14:37:06,077 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:FastLeaderElection@847] – Notification time out: 25600
2020-02-18 14:37:31,678 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:QuorumCnxManager@347] – Have smaller server identifier, so dropping the connection: (1, 0)
2020-02-18 14:37:31,679 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:QuorumCnxManager@347] – Have smaller server identifier, so dropping the connection: (2, 0)
2020-02-18 14:37:31,680 [myid:0] – INFO [QuorumPeer[myid=0]/0.0.0.0:2181:FastLeaderElection@847] – Notification time out: 51200

解决方案:  保持这台有问题zk的现状, 按myid从小到大依次重启其他的zk机器;

原因:  zk是需要集群中所有机器两两建立连接的, 其中配置中的3555端口是用来进行选举时机器直接建立通讯的端口, 大id的server才会去连接小id的server,避免连接浪费.如果是最后重启myid最小的实例,该实例将不能加入到集群中, 因为不能和其他集群建立连接

Kafka进阶之幂等性以及分区分配策略

Exactly Once语义:

    将服务器的ACK级别设置为-1,可以保证ProducerServer之间不会丢失数据,即At Least Once 语义。相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。

    At Least Once可以保证数据不丢失,但是不能保证数据不重复;相对的, At Least Once可以保证数据不重复,但是不能保证数据不丢失。 
对于一些非常重要的信息,比如交易数据,下游数据消费者要求数据既不重复也不丢失0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。

    0.11 版本的 Kafka,引入了一项重大特性:幂等性。幂等性就是指Producer不论向Broker发送多少次重复数据,Broker都只会持久化一条。幂等性结合At Least Once语义,就构成了KafkaExactly Once语义。

                At Least Once + 幂等性 = Exactly Once



    要启用幂等性,只需要将Producer的参数中enable.idompotence设置为true(设置为true时,ACK默认设置为-1)    Kafka的幂等性实现是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条

    PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once 

Kafka消费者:

     consumer 采用 pull(拉) 模式从 broker 中读取数据。
 
     push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,pull模式则可以根据consumer的消费能力以适当的速率消费消息。

    pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回

分区分配策略 :

     一个consumer group中有多个consumer,一个topic有多个partition,所以会涉及到partition的分配问题,如何确定哪个partition由哪个consumer来消费Kafka有两种分配策略:RoundRobin(轮循)Range(范围)。

Kafka默认使用的是Range分配策略。
可设置partition.assignment.strategy的值进行自由调整(RoundRobin、Range


RoundRobin(轮循)根据消费者组来进行轮循划分,把消费者组看成一个整体来进行轮循。(需要保证同一个消费者组下的消费者订阅同一个topic)







Range(范围):根据(分区数量 / 消费者组数量 = 范围)划分。

假设有2个消费者(cA,cB)和2个topic分别有3个partition 
(topic-A{a-p-0,a-p-1,a-p-2}), topic-B{b-p-0,b-p-1,b-p-2}) 

分配后的结果是: 
M = 3/2
cA = a-p-0,a-p-1,b-p-0,b-p-1 
cB = a-p-2,b-p-2


导致前M个分配不均匀,消费者负载