kafka入门

2017-11-13 15:46:14

下载kafka

> tar -xzf kafka_2.11-0.11.0.1.tgz
> cd kafka_2.11-0.11.0.1

kafka依赖于zookeeper,要是还没有zookeeper,可以使用kafka中自带的zookeeper,

bin/zookeeper-server-start.sh config/zookeeper.properties >>zk.log 2>&1 &

看到如下日志,zookeeper启动成功

[2017-10-15 00:47:20,447] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-15 00:47:20,447] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-15 00:47:20,447] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2017-10-15 00:47:20,458] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

启动kafka
kafka启动默认占用1G内存,如果内存不足,可以修改bin/kafka-server-start.sh,将

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"

启动kafka

> bin/kafka-server-start.sh config/server.properties >>kafka.log 2>&1 &

创建一个topic
topic,主题,消费者可以从一个topic中拉取消息,生产者可以发送消息到toipc上

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

–zookeeper 指定zookeeper地址
–replication-factor 备份数
–partitions 分区数

查看topic

> bin/kafka-topics.sh --list --zookeeper localhost:2181

发送消息
Kafka附带一个命令行客户端,它可以将文件或标准输入作为输入,并将输入内容作为消息发送到Kafka集群。 默认情况下,每行将作为单独的消息发送。

>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

–topic test 指定发送的topic
上面例子中发送了两条消息

消费消息
Kafka也提供了一个消费者命令行,可以将消息输出到标准输出。

>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

–from-beginning 从topic开始读取消息

外网访问
kafka默认不允许外网访问,如果需要才外网访问,需要打开config/server.properties

advertised.listeners=PLAINTEXT://your.host.name:9092

基本概念

生产者(Producers)

生产者往某个Topic上发布消息。

消费者(Consumers)

消费者会被被划分成到一个消费者组(group)

主题(Topics)

Kafka将消息分类, 每一类的消息称之为话题(Topic). Producers发送消息要指定topic, Consumers也要指定监听的topic.

Broker

已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker).

分区(Partition)

Kafka会将每个主题的消息划分为若干部分, 每一部分为一个Partition,
每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。
对于消息, kakfa抽象为log, 对于每一个Topic, Kafka集群维护这些分区的log,

分布式(Distribution)

Log的分区被分布到集群中的多个服务器上。每个服务器处理它分到的分区。 根据配置每个分区还可以复制到其它服务器作为备份容错。
每个分区有一个leader,零或多个follower。Leader处理此分区的所有的读写请求而follower被动的复制数据。如果leader当机,其它的一个follower会被推举为新的leader。
一台服务器可能同时是一个分区的leader,另一个分区的follower。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。

kakfa使用的是一种”副本与数据分布”的数据分布方式, 推荐阅读
分布式系统原理介绍

java Consumer

gradle引用

org.apache.kafka:kafka-clients:0.11.0.1

kafka-clients版本最好和kafak版本一致

消费者

Properties props = new Properties();
props.put("bootstrap.servers", "aws.binecy.com:9092");
props.put("group.id", "test");      // 配置用户group
props.put("enable.auto.commit", "true");    // 是否自动提交用户offset
props.put("auto.commit.interval.ms", "1000");   //用户offset自动提交的频率
props.put("session.timeout.ms", "30000");   // broker超时时间, 如果broker在该时间内没有回应心跳将被remove
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

consumer.subscribe(Arrays.asList("test"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    if(!records.isEmpty()) {
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
    }
}

这是个很简单的消息消费者。

offset机制

在kafka中, 每个topic是由分到不同partitions的logs组成的, Producers写到这些logs的尾部, Consumers根据自己的offset读取logs, kafka将topic中的partitions分给同一group的不同消费者,
每个partition都只会分配给gropu中唯一的一个成员。
下图标识一个topic有三个partitions,一个消费组有两个消费者成员。

对于每个消费组,kafka会选择一个brokers作为消费组的协调者(group coordinator).
协调者负责管理消费者组的状态。 它的主要工作是负责协调partition的分配(assignment): 当有新成员加入,旧成员退出,
或者topic的metadata发生变化(topic的partitions改变),它会重新分配partition, 这叫做消费组的平衡(group rebalance)

当消费组第一次被初始化时, 消费者通常从每个分区的最早或最近的偏移(offset)开始读取。 然后顺序地读取每个partition log的消息.
在消费者读取过程中,它会提交已经成功处理的消息的offsets。 下图中消费者的位置在6位置,消费者最近提交的offset则在位置1.

当一个partition被分配给消费组中的其他消费者,(新的消费者)初始位置会设置为(原始消费者)最近提交的offset.
如果示例中的消费者突然崩溃,接管partition的成员会从lastCommitOffset=1的位置开始消费.
这种情况下,,它将不得不重新处理消息直到原消费者崩溃的位置6。

上图中还有两个log中重要的位置信息. Log End Offset是写入log中最后一条消息的offset+1.
High Watermark是成功拷贝到log的所有副本节点的最近消息的offset(译注: 实际上是partition的所有ISR节点).

上图中还有两个log中重要的位置信息. Log End Offset是写入log中最后一条消息的offset+1(Producers会在这里写新消息)。
High Watermark是成功拷贝到log的所有副本节点的最近消息的offset。
从消费者的角度来看,最多只能读取到High watermark的位置, 这是为了防止消费者读取还没有完全复制的数据造成数据丢失.
(注:如果消费者读取了未完全复制的数据,但是这部分数据之后丢失了,导致读取不该读的消息,所以应该读取完全复制的数据)

用户group

一个消息, kafka保证 在同一个group中的消费者,只有一个会接收到该消息。

在消费组中,每个消费者会被分配它订阅的topics的一部分partitions,并且这些partitions只被该消费组这个消费者消费,就像在这些partitions上加了一个组锁。
只要锁被持有,组中的其他成员就不会读取他们(注:每个partition都对应唯一的消费者,partition锁只属于唯一的消费者).
当你的消费者是正常状态时,当然是最好不过了,因为这是防止重复消费的唯一方式.
但如果消费者失败了,你需要释放掉那个锁,这样可以将partitions分配给其他健康的成员.

kafka的消费组协调协议使用心跳机制解决了这个问题.在每次rebalance,当前generation所有的成员都会定时地发送心跳给group协调者.
只要协调者持续接收到心跳,它会假设这个成员是健康的. 每次接收到心跳,协调者就启动(或复位)一个计时器.
如果超过时间没有收到消费者的心跳,协调者标记消费者为死亡状态,并触发组中其他的消费者重新加入,来重新分配partitions.
计时器的时间间隔就是session timeout,即客户端应用程序中配置的session.timeout.ms

session timeout确保应用程序崩溃或者partition将网络分区隔离了使用者与协调器的情况下锁会被释放.
注意应用程序的失败(进程还存在)有点不同,因为消费者仍然会发送心跳给协调者,但这时应用程序不一定是健康的.

消费者的轮询循环被设计为解决这个问题. 所有的网络IO操作调用poll或者其他的阻塞API,都是在前台完成的.
消费者并不使用任何的后台线程. 这就意味着消费者的心跳只有在调用poll的时候才会发送给协调者.
如果应用程序停止polling(不管是处理代码抛出异常或者下游系统崩溃了),就不会再发送心跳了,
最终就会导致session超时(没有收到心跳,计时器开始增加), 然后消费组就会开始平衡操作.
如上面栗子中, consumer.poll在while循环中进行,如果程序出现问题导致while循环结束或阻塞,consumer.poll将停止, 就无法响应心跳了。

唯一存在的问题是如果消费者处理消息花费的时间比session timeout还要长,就会触发一个假的rebalance.
可以通过设置更长的session timeout防止发生这样的情况.默认的超时时间是30秒,设置为几分钟也不是不行的.
更长的session timeout的缺点是,协调者会花费较长时间才能检测到真正崩溃的消费者.

auto.commit.interval.ms

auto.commit.interval.ms表示kakfa是否接受消息后是否自动提交offset。

  • 该值配置为true,消费者接受消息后, 成功处理消息前崩溃, 新的消费者将从offset+1开始消费。这该offset位置的消息被丢弃。这时kafka保证消息“最多消费一次”。
  • 该值配置为false,用户需要在处理消息完成后调用consumer.commitSync();提交offset。如果消费者在处理消息完成前崩溃(offset未提交),这新的消费者会重新读取这些消息, 这时kafka保证消息“最少消费一次”

java Producer

Properties props = new Properties();
props.put("bootstrap.servers", "aws.binecy.com:9092");
// Producer接受broker ack的模式
// all:这意味着领导者将等待全套的in-sync副本确认记录。
props.put("acks", "all");
// 失败重发次数
props.put("retries", 0);    
// 批量发送记录的最大值
props.put("batch.size", 16384); 
// Producer可用于缓冲等待发送记录的最大字节数
props.put("buffer.memory", 33554432);
// 如果缓冲区已满或metadata不可用Producer将阻塞  max.block.ms配置可阻塞的多长时间,超时它将抛出一个异常
props.put("max.block.ms", 60000);

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);

for(int i = 0; i < 5; i++)
    producer.send(new ProducerRecord<>("test", "from java : " + i ));


producer.close();

关键配置:

acks

Producer发送记录后,可以等待broker回复ack确认。如果发送失败可以重发。acks配置Producer接受broker ack的模式

  • 0: 如果设置为零,则生产者不会等待来自服务器的任何确认。
  • 1: leader接收成功并写入本地日志就返回ack 如果leader在接收记录之后但在追随者复制之前立即崩溃,那么记录将会丢失。
  • all: 这意味着leader将等待所以追随者复制完成后才确认记录。

batch.size

如果有多个记录被发送到同一个分区,生产者就会尝试将记录批量提交。这时kafka提高吞吐量的一个策略。
batch.size过小会降低吞吐量, 过大也会浪费内存。

参考:
Kafka快速入门-鸟窝
译:使用新的Kafka消费者客户端
Kafka基础概念