Kafka -- Avro + Twitter Bijection
Avro + Kafka Native API
比较繁琐
编译Schema
依赖于Avro实现自定义的序列化器和反序列化器
引入依赖12345<dependency> <groupId>com.twitter</groupId> <artifactId>bijection-avro_2.12</artifactId> <version>0.9.6</version></dependency>
Schema路径:src/main/resources/user.json
123456789{ "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, ...
Kafka -- Avro + Kafka Native API
Schema123456789101112131415{ "namespace": "me.zhongmingmao.avro", "type": "record", "name": "Stock", "fields": [ {"name": "stockCode", "type": "string"}, {"name": "stockName", "type": "string"}, {"name": "tradeTime", "type": "long"}, {" ...
Kafka -- Avro入门
引入依赖12345<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.8.2</version></dependency>
1234567891011121314151617<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.8.2</version> <executions> <execution> <phase>generate-sources</phase> <goals> ...
Kafka -- 生产者
生产者概述
创建一个ProducerRecord对象,ProducerRecord对象包含Topic和Value,还可以指定Key或Partition
在发送ProducerRecord对象时,生产者先将Key和Partition序列化成字节数组,以便于在网络上传输
字节数组被传给分区器
如果在ProducerRecord对象里指定了Partition
那么分区器就不会做任何事情,直接返回指定的分区
如果没有指定分区,那么分区器会根据ProducerRecord对象的Key来选择一个Partition
选择好分区后,生产者就知道该往哪个主题和分区发送这条记录
这条记录会被添加到一个记录批次里,一个批次内的所有消息都会被发送到相同的Topic和Partition上
有一个单独的线程负责把这些记录批次发送到相应的Broker
服务器在收到这些消息时会返回一个响应
如果消息成功写入Kafka,就会返回一个RecordMetaData对象
包含了Topic和Partition信息,以及记录在分区里的偏移量
如果写入失败,就会返回一个错误
生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败 ...
Kafka -- 集群安装与配置(Docker)
配置文件文件列表123$ tree.└── docker-compose.yml
docker-compose.yml123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103version: '2'services: zk1: image: confluentinc/cp-zookeeper:latest hostname: zk1 container_name: zk1 restart: always ports: - "12181:2181" environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_POR ...
Kafka -- 集群安装与配置(Ubuntu)
单节点安装Java添加ppa12$ sudo add-apt-repository ppa:webupd8team/java$ sudo apt-get update
安装oracle-java8-installer1$ sudo apt-get install oracle-java8-installer
设置系统默认JDK1$ sudo update-java-alternatives -s java-8-oracle
下载解压Kafka12345$ mkdir ~/Downloads & cd ~/Downloads$ wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz$ mkdir ~/kafka && cd ~/kafka$ kafka tar -xvzf ~/Downloads/kafka_2.11-2.0.0.tgz --strip 1
允许Kafka删除主题1234$ vim ~/kafka/config/server.properties# 添加delete.to ...
Kafka -- 单节点安装与配置(Mac)
安装步骤Kafka与ZookeeperKafka使用Zookeeper保存集群的元数据信息和消费者信息
安装Zookeeper和Kafka12345678910111213$ brew install kafka==> Installing dependencies for kafka: zookeeper==> Caveats==> zookeeperTo have launchd start zookeeper now and restart at login: brew services start zookeeperOr, if you don't want/need a background service you can just run: zkServer start==> kafkaTo have launchd start kafka now and restart at login: brew services start kafkaOr, if you don't want/need a background service yo ...
Kafka -- 简介
相关概念
Kafka一般被称为分布式提交日志或者分布式流平台
文件系统或数据库提交日志用来提供所有事务的持久记录,通过重放这些日志可以重建系统的状态
Kafka的数据是按照一定顺序持久化保存的,可以按需读取
Kafka的数据分布在整个系统里,具备数据故障保护和性能伸缩的能力
消息和批次
Kafka的数据单元被称为消息
消息由字节数组组成,消息里的数据没有特别的格式或者含义
消息可以有一个可选的元数据,就是键,键也是一个字节数组,同样没有特殊含义
消息以一种可控的方式写入不同的分区时,会用到键
例如为键生成一个一致性散列值,然后使用散列值对主题分区数进行取模
为消息选取分区,从而保证具有相同键的消息总是被写入到相同的分区
为了提高效率,消息被分批写入Kafka,批次就是一组消息,这些消息属于同一个主题和分区
如果每个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传输可以减少网络开销
不过,这需要在时间延迟和吞吐量之间做出权衡
批次越大,单位时间内处理的消息就会越多,耽搁消息的传输时间就会越长
批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理
...
Zookeeper -- 分布式锁InterProcessMutex
Curator是ZooKeeper的一个客户端框架,其中封装了分布式互斥锁的实现,最为常用的是InterProcessMutex,本文将对其进行代码剖析
简介InterProcessMutex基于Zookeeper实现了_**分布式的公平可重入互斥锁**_,类似于单个JVM进程内的ReentrantLock(fair=true)
构造函数1234567891011121314151617// 最常用public InterProcessMutex(CuratorFramework client, String path){ // Zookeeper利用path创建临时顺序节点,实现公平锁的核心 this(client, path, new StandardLockInternalsDriver());}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver){ // maxLeases=1,表示可以获得分布式锁的线程数量(跨JVM) ...
Zookeeper -- ZAB协议
本文将简要介绍Zookeeper的ZAB协议
基本概念ZAB VS Base-Paxos
Base-Paxos是**通用的分布式一致性算法**
ZAB协议不是Base-Paxos的典型实现,而是特别为Zookeeper设计的一种**支持崩溃恢复的原子广播协议**
相对于ZAB协议,Base-Paxos主要存在2个问题:**活锁问题+全序问题**
活锁问题是指在Base-Paxos算法中,由于并不存在Leader角色,**新轮次可以不断抢占旧轮次**,如此循环往复,产生活锁
全序问题是指如果消息a在消息b之前发送,则所有Server应该看到相同的结果,但Base-Paxos并不保证这一点
ZAB的解决方案
为了解决活锁问题,ZAB协议引入了Leader角色,所有的事务请求只能由Leader处理,但是单Leader会存在单点问题,ZAB协议进而引入崩溃恢复模式
为了解决全序问题,ZAB协议引入了ZXID(全局单调递增的唯一ID)和利用TCP的FIFO特性
服务器角色Zookeeper中服务器有三种角色:**Leader、Follower和Observer**,其中Observer与ZAB协议本身 ...