Kafka -- 消费者
基本概念消费者 + 消费者群组 消费者从属于消费者群组 一个消费者群组里的消费者订阅的是同一个主题,每个消费者接收主题的部分分区的消息 消费者横向扩展1个消费者 主题T1有4个分区,然后创建消费者C1,C1是消费者群组G1里唯一的消费者,C1订阅T1 消费者C1将接收主题T1的全部4个分区的消息 2个消费者 如果群组G1新增一个消费者C2,那么每个消费者将分别从两个分区接收消息 假设C1接收分区0和分区2的消息,C2接收分区1和分区3的消息 4个消费者 如果群组G1有4个消费者,那么每个消费者可以分配到一个分区 5个消费者 如果群组G1有5个消费者,_**消费者数量超过主题的分区数量**_,那么有1个消费者就会被**闲置**,不会接收到任何消息 总结 往群组里增加消费者是横向伸缩消费能力的主要方式 消费者经常会做一些高延迟的操作,比如把数据写到数据库或HDFS,或者使用数据进行比较耗时的计算 有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者,减少消息堆积 不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置 消费者群组横向扩展 Kafka设计的主要目标...
Kafka -- Avro + Twitter Bijection
Avro + Kafka Native API 比较繁琐 编译Schema 依赖于Avro实现自定义的序列化器和反序列化器 引入依赖12345<dependency> <groupId>com.twitter</groupId> <artifactId>bijection-avro_2.12</artifactId> <version>0.9.6</version></dependency> Schema路径:src/main/resources/user.json 123456789{ "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"...
Kafka -- Avro + Kafka Native API
Schema123456789101112131415{ "namespace": "me.zhongmingmao.avro", "type": "record", "name": "Stock", "fields": [ {"name": "stockCode", "type": "string"}, {"name": "stockName", "type": "string"}, {"name": "tradeTime", "type": "long"}, {&qu...
Kafka -- Avro入门
引入依赖12345<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.8.2</version></dependency> 1234567891011121314151617<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.8.2</version> <executions> <execution> <phase>generate-sources</phase> <goals> ...
Kafka -- 生产者
生产者概述 创建一个ProducerRecord对象,ProducerRecord对象包含Topic和Value,还可以指定Key或Partition 在发送ProducerRecord对象时,生产者先将Key和Partition序列化成字节数组,以便于在网络上传输 字节数组被传给分区器 如果在ProducerRecord对象里指定了Partition 那么分区器就不会做任何事情,直接返回指定的分区 如果没有指定分区,那么分区器会根据ProducerRecord对象的Key来选择一个Partition 选择好分区后,生产者就知道该往哪个主题和分区发送这条记录 这条记录会被添加到一个记录批次里,一个批次内的所有消息都会被发送到相同的Topic和Partition上 有一个单独的线程负责把这些记录批次发送到相应的Broker 服务器在收到这些消息时会返回一个响应 如果消息成功写入Kafka,就会返回一个RecordMetaData对象 包含了Topic和Partition信息,以及记录在分区里的偏移量 如果写入失败,就会返回一个错误 生产者在收到错误之后会尝试重新发送消息,几次之后如果还...
Kafka -- 集群安装与配置(Docker)
配置文件文件列表123$ tree.└── docker-compose.yml docker-compose.yml123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103version: '2'services: zk1: image: confluentinc/cp-zookeeper:latest hostname: zk1 container_name: zk1 restart: always ports: - "12181:2181" environment: ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_CLIENT_...
Kafka -- 集群安装与配置(Ubuntu)
单节点安装Java添加ppa12$ sudo add-apt-repository ppa:webupd8team/java$ sudo apt-get update 安装oracle-java8-installer1$ sudo apt-get install oracle-java8-installer 设置系统默认JDK1$ sudo update-java-alternatives -s java-8-oracle 下载解压Kafka12345$ mkdir ~/Downloads & cd ~/Downloads$ wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz$ mkdir ~/kafka && cd ~/kafka$ kafka tar -xvzf ~/Downloads/kafka_2.11-2.0.0.tgz --strip 1 允许Kafka删除主题1234$ vim ~/kafka/config/server.properties# 添加delete...
Kafka -- 单节点安装与配置(Mac)
安装步骤Kafka与ZookeeperKafka使用Zookeeper保存集群的元数据信息和消费者信息 安装Zookeeper和Kafka12345678910111213$ brew install kafka==> Installing dependencies for kafka: zookeeper==> Caveats==> zookeeperTo have launchd start zookeeper now and restart at login: brew services start zookeeperOr, if you don't want/need a background service you can just run: zkServer start==> kafkaTo have launchd start kafka now and restart at login: brew services start kafkaOr, if you don't want/need a background service...
Kafka -- 简介
相关概念 Kafka一般被称为分布式提交日志或者分布式流平台 文件系统或数据库提交日志用来提供所有事务的持久记录,通过重放这些日志可以重建系统的状态 Kafka的数据是按照一定顺序持久化保存的,可以按需读取 Kafka的数据分布在整个系统里,具备数据故障保护和性能伸缩的能力 消息和批次 Kafka的数据单元被称为消息 消息由字节数组组成,消息里的数据没有特别的格式或者含义 消息可以有一个可选的元数据,就是键,键也是一个字节数组,同样没有特殊含义 消息以一种可控的方式写入不同的分区时,会用到键 例如为键生成一个一致性散列值,然后使用散列值对主题分区数进行取模 为消息选取分区,从而保证具有相同键的消息总是被写入到相同的分区 为了提高效率,消息被分批写入Kafka,批次就是一组消息,这些消息属于同一个主题和分区 如果每个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传输可以减少网络开销 不过,这需要在时间延迟和吞吐量之间做出权衡 批次越大,单位时间内处理的消息就会越多,耽搁消息的传输时间就会越长 批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算...
Zookeeper -- 分布式锁InterProcessMutex
Curator是ZooKeeper的一个客户端框架,其中封装了分布式互斥锁的实现,最为常用的是InterProcessMutex,本文将对其进行代码剖析 简介InterProcessMutex基于Zookeeper实现了_**分布式的公平可重入互斥锁**_,类似于单个JVM进程内的ReentrantLock(fair=true) 构造函数1234567891011121314151617// 最常用public InterProcessMutex(CuratorFramework client, String path){ // Zookeeper利用path创建临时顺序节点,实现公平锁的核心 this(client, path, new StandardLockInternalsDriver());}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver){ // maxLeases=1,表示可以获得分布式锁的线程数量(跨J...













