Kafka -- 生产者
生产者概述
创建一个ProducerRecord对象,ProducerRecord对象包含Topic和Value,还可以指定Key或Partition
在发送ProducerRecord对象时,生产者先将Key和Partition序列化成字节数组,以便于在网络上传输
字节数组被传给分区器
如果在ProducerRecord对象里指定了Partition
那么分区器就不会做任何事情,直接返回指定的分区
如果没有指定分区,那么分区器会根据ProducerRecord对象的Key来选择一个Partition
选择好分区后,生产者就知道该往哪个主题和分区发送这条记录
这条记录会被添加到一个记录批次里,一个批次内的所有消息都会被发送到相同的Topic和Partition上
有一个单独的线程负责把这些记录批次发送到相应的Broker
服务器在收到这些消息时会返回一个响应
如果消息成功写入Kafka,就会返回一个RecordMetaData对象
包含了Topic和Partition信息,以及记录在分区里的偏移量
如果写入失败,就会返回一个错误
生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败 ...
Kafka -- 集群安装与配置(Docker)
配置文件文件列表123$ tree.└── docker-compose.yml
docker-compose.yml123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103version: '2'services: zk1: image: confluentinc/cp-zookeeper:latest hostname: zk1 container_name: zk1 restart: always ports: - "12181:2181" environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_POR ...
Kafka -- 集群安装与配置(Ubuntu)
单节点安装Java添加ppa12$ sudo add-apt-repository ppa:webupd8team/java$ sudo apt-get update
安装oracle-java8-installer1$ sudo apt-get install oracle-java8-installer
设置系统默认JDK1$ sudo update-java-alternatives -s java-8-oracle
下载解压Kafka12345$ mkdir ~/Downloads & cd ~/Downloads$ wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz$ mkdir ~/kafka && cd ~/kafka$ kafka tar -xvzf ~/Downloads/kafka_2.11-2.0.0.tgz --strip 1
允许Kafka删除主题1234$ vim ~/kafka/config/server.properties# 添加delete.to ...
Kafka -- 单节点安装与配置(Mac)
安装步骤Kafka与ZookeeperKafka使用Zookeeper保存集群的元数据信息和消费者信息
安装Zookeeper和Kafka12345678910111213$ brew install kafka==> Installing dependencies for kafka: zookeeper==> Caveats==> zookeeperTo have launchd start zookeeper now and restart at login: brew services start zookeeperOr, if you don't want/need a background service you can just run: zkServer start==> kafkaTo have launchd start kafka now and restart at login: brew services start kafkaOr, if you don't want/need a background service yo ...
Kafka -- 简介
相关概念
Kafka一般被称为分布式提交日志或者分布式流平台
文件系统或数据库提交日志用来提供所有事务的持久记录,通过重放这些日志可以重建系统的状态
Kafka的数据是按照一定顺序持久化保存的,可以按需读取
Kafka的数据分布在整个系统里,具备数据故障保护和性能伸缩的能力
消息和批次
Kafka的数据单元被称为消息
消息由字节数组组成,消息里的数据没有特别的格式或者含义
消息可以有一个可选的元数据,就是键,键也是一个字节数组,同样没有特殊含义
消息以一种可控的方式写入不同的分区时,会用到键
例如为键生成一个一致性散列值,然后使用散列值对主题分区数进行取模
为消息选取分区,从而保证具有相同键的消息总是被写入到相同的分区
为了提高效率,消息被分批写入Kafka,批次就是一组消息,这些消息属于同一个主题和分区
如果每个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传输可以减少网络开销
不过,这需要在时间延迟和吞吐量之间做出权衡
批次越大,单位时间内处理的消息就会越多,耽搁消息的传输时间就会越长
批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理
...
Zookeeper -- 分布式锁InterProcessMutex
Curator是ZooKeeper的一个客户端框架,其中封装了分布式互斥锁的实现,最为常用的是InterProcessMutex,本文将对其进行代码剖析
简介InterProcessMutex基于Zookeeper实现了_**分布式的公平可重入互斥锁**_,类似于单个JVM进程内的ReentrantLock(fair=true)
构造函数1234567891011121314151617// 最常用public InterProcessMutex(CuratorFramework client, String path){ // Zookeeper利用path创建临时顺序节点,实现公平锁的核心 this(client, path, new StandardLockInternalsDriver());}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver){ // maxLeases=1,表示可以获得分布式锁的线程数量(跨JVM) ...
Zookeeper -- ZAB协议
本文将简要介绍Zookeeper的ZAB协议
基本概念ZAB VS Base-Paxos
Base-Paxos是**通用的分布式一致性算法**
ZAB协议不是Base-Paxos的典型实现,而是特别为Zookeeper设计的一种**支持崩溃恢复的原子广播协议**
相对于ZAB协议,Base-Paxos主要存在2个问题:**活锁问题+全序问题**
活锁问题是指在Base-Paxos算法中,由于并不存在Leader角色,**新轮次可以不断抢占旧轮次**,如此循环往复,产生活锁
全序问题是指如果消息a在消息b之前发送,则所有Server应该看到相同的结果,但Base-Paxos并不保证这一点
ZAB的解决方案
为了解决活锁问题,ZAB协议引入了Leader角色,所有的事务请求只能由Leader处理,但是单Leader会存在单点问题,ZAB协议进而引入崩溃恢复模式
为了解决全序问题,ZAB协议引入了ZXID(全局单调递增的唯一ID)和利用TCP的FIFO特性
服务器角色Zookeeper中服务器有三种角色:**Leader、Follower和Observer**,其中Observer与ZAB协议本身 ...
Java 8 -- Optional
本文主要介绍Java 8的 Optional 的简单使用
Address1234567@Data@AllArgsConstructor@NoArgsConstructorpublic class Address { private String province; private String city;}
of + ofNullable相关代码托管在java8_demo
123456789101112131415161718@Test(expected = NoSuchElementException.class)public void emptyTest() { // 声明一个空的Optional对象 Optional<Address> nameOptional = Optional.empty(); // java.util.NoSuchElementException: No value present nameOptional.get();}@Test(expected = NullPointerExceptio ...
Java 8 -- Default Method
本文主要介绍Java 8的 default 方法的简单使用
简介
default方法作为接口的一部分由实现类继承
default方法的目标用户是类库设计者
以兼容的方式解决类库的演进问题
冲突解决一个类可以实现多个拥有默认方法的接口,从而实现行为的多继承,按照下列步骤解决冲突
类或父类中声明的方法的优先级高于任何声明为默认方法的优先级
子接口的default方法优先级高于父接口的default方法
显式选择使用哪一个default方法
类与接口定义相关代码托管在java8_demo
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647interface A { default String hello() { return "Hello From A"; }}interface B extends A { @Override default String hello ...
Java 8 -- Stream
本文主要介绍Java 8的 Stream的简单使用
简介流与集合的区别计算的时机
是否全部载入内存
能否添加或删除元素
类似于
集合
是
能
DVD
流
否,按需计算
不能
网络流媒体
消费一次流只能遍历一次,遍历后即被消费,类似于网络流相关代码托管在java8_demo
12345List<String> strs = Arrays.asList("zhong", "ming", "mao");Stream<String> stream = strs.stream();stream.forEach(s -> System.out.println(s)); // Lambda// throw java.lang.IllegalStateException: stream has already been operated upon or closedstream.forEach(System.out::println); // 方法引用,相关内容请参照「Java8回忆录 - Lambda」
...