kafka-0.9.0

原文:http://kafka.apache.org/documentation.html

自己翻译的kafka官方文档(哈哈),持续更新中。。。

开始

介绍

kafka是一个分布式的(distributed),分区的(partitioned),可复制的(replicated) commit log 服务。它提供了作为一个消息系统的常用功能,但又有独一无二的设计。

什么意思呢?

首先,让我们回顾一些基本的消息术语:

  • kafka维持消息流分类的称作topics.

  • 我们称处理发布消息到kafka topic的为producers.

  • 我们称处理从topics中订阅并且处理已发布的消息流为consumers.

所以,以一个高层次来说,producers通过网络发送消息给kafka集群,集群轮流把它们提供给consumers,如下图:

客户端和服务器之间的交流是以一个单一,高性能,跨语言的tcp协议完成。我们为kafka提供了一个java客户端,但是客户端可以是很多其他语言。

topics 和 logs

让我们首先投入到kafka提供的高级抽象概念-topic

一个topic就是一个种类或者是已发布消息的名称。对于每一个topic,kafka集群(cluster)维持一个分区日志如下图:

每一区都是一系列顺序的,不可改变的消息,消息被连续添加到一个commit log。在分区里的消息都被赋予一个序列id号称作offset,在相应区里面是唯一的。

在一个可以配置的时间段里,kafka保留所有已发布的消息,无论它们是否被消费。例如,如果日志保留时间设置为2天,那么一个消息在发布后的两天内对于消费者是有效的,之后它将会被丢弃从而释放空间。在数据大小方面而言,kafka的性能持续有效,所以保留大量数据没有问题。

事实上,在日志中以每一个消费者的方式保留的唯一元数据是消费者的位置,称作“offset”。这个偏移量由消费者控制:当消费者读消息的时候将线性推进它的偏移量,但是事实上该位置由消费者控制,它可以以任何顺序消费消息。例如,一个消费者可以重置到一个旧的偏移量重新执行。

这些混合的特征意味着kafka消费者非常廉价,它们可以在集群或者其他消费者之间来或去,而没有太多影响。例如,当一些现存的消费者在消费一些主题内容时,你可以用我们的命令行工具来“tail”出这些内容而不会改变什么。

在日志服务中的分区有以下几个目的:首先,它们允许日志按大小比例适应每一个服务器。每一个分区必须适应绑定它的服务器,但是一个主题可能有很多分区,所以它可以随意处理很多数据。第二,它们以相似的单位行动,跟多的是以位。

Distribution 分布式

日志分区分布在kafka集群中的各个服务器上,每一个服务器处理来自一串分区的请求和数据。为了容错,每一个分区会在配置数量的服务器间复制。

每一个分区都有一个服务器作为“leader”-领导者,0或多个服务器当作“followers”-追随者。领导者处理所有来自分区的读和写请求,而追随者被动的从领导者那复制。如果领导者失败了,其中一个追随者将自动成为领导者。(这里有一个疑问,是随机一个follower称为leader吗?)在集群内,每一个服务器在一些分区中扮演一个领导者,而在其他分区中扮演一个追随者,所以可以平衡运行。

Producers 生产者

生产者们发布数据到它们选择的主题里。生产者在主题里有责任选择消息分配到哪个分区里。这可以在仅仅一个循环内进行平衡加载,或者根据某些语义分区函数完成(根据消息中的一些key)。更多分区用法在第二章。

Consumers 消费者

传统消息有2个模块:queuingpublish-subscribe。在一个队列里,一个消费者池从一个服务器中读取消息,然后消息到达它们中的一个;而在publish-subscribe模式中,消息被广播到所有消费者中。kafka提出一个单一的消费者抽象概念,概括了这2个模块-consumer group消费组

消费者用消费组名称作标签,每一个发布到主题的消息会被转发给订阅消费组中的消费实例。消费实例可以是单独进程,或者单独机器。

如果所有的消费实例都属于同个消费组,那么这就像是一个传统的队列在各个消费者间负载均衡。

如果所有的消费实例都属于不同的消费组,那么这就像是pub-sub模块,所有消息广播到所有消费者中。

更通常的,但是,我们发现topics主题有小数目的consumer groups消费群体,用于每个“logical subscriber”逻辑订阅者。每一组由许多可扩展和容错的消费实例组成,就是pub-sub语义上是消费群中的订阅者而不是一个单一的进程。

kafka也比传统消息系统更强的排序保证。

一个传统队列在服务器上顺序保留了消息,如果多个消费者从队列里消费,那么服务器会顺序处理保存的消息。但是,虽然服务器顺序分发消息,但是消息还是被异步转发给消费者,所以它们可能不是顺序到达不同的消费者。这明显意味着,消息的顺序性在并行消费面前失效了。消息系统经常通过一个“exclusive consumer”独家消费的概念来解决这个问题,即只允许一个进程从服务器消费,但是这意味着在进程中没有并行。

一个2个服务器的kafka集群,用2个消费群组绑定了4个分区(p0-p3)。消费群A有2个消费实例,消费群B有4个。

kafka在这方面做的更好。通过一个在主题中的并行(parallelism)概念-分区,kafka不仅可以保证顺序,而且还可以在消费进程池里面负载均衡。这个通过赋予消费群里的消费者主题分区实现,在一个组里面每一个分区被一个消费者消费。通过这个,我们保证消费者是分区里的唯一读取者并且顺序消费数据。所以有很多这样的分区仍在很多消费者实例中负载均衡。但是要注意,在一个消费群里面不可能有比分区更多的消费实例。

kafka只在一个分区内提供全序消息,而不是在分类里的分区之间。对于大部分应用来说,预分区通过key分区数据的能力已经足够了。但是,如果你请求一个全序消息,这只能在一个只有一个分区的分类里实现,虽然这意味着一个消费者处理一个消费群。

Guarantees 保证

高级kafka给出以下保证:

  • 生产者发送消息到一个指定的分类分区将会被顺序添加。那就是说,如果消息m1和m2被同样的生产者发送,m1先发送,然后m1将会比m2低一点偏移,在日志中出现的更早。
  • 一个消费实例顺序看到在日志中存储的消息。
  • 对于一个分类冗余因子n,我们将容忍n-1个服务器丢失提交给日志的信息。

更多这些保证的详细信息会在文档的设计部分给出。

使用案例

以下是一些apache kafka常见使用案例的描述。要查看这些领域的更多概述,请看这个blog.

messaging消息传递

作为一个传统消息代理的替代者,kafka运作的很好。消息代理基于很多种原因使用(从数据生产者中解耦,缓存还没处理的消息等等)。与大部分消息系统对比,kafka有更好的吞吐量,内置分区,冗余和容错机制,使得它能够很好的置身于大规模消息处理的应用中。

以我们的经验,消息传递使用通常相当于低吞吐量,但是端对端低延时,基于kafka提供的高持久保证。

在这个领域,kafka可以比得上传统的消息系统比如ActiveMQ或者RabbitMQ

网站行为轨迹

kafka的最初用途是可以重建一个用户的行为轨迹线作为一组实时的发布-订阅feed。这意味着网站活动(比如pv,搜索,或者其他可获得的用户行为)会以每个行为类型一个topic分类的发布到中央topic。这些feed被一系列用途使用,包括实时进程、实时监控、加载进hadoop或者离线数据仓库系统做离线处理和报告。

活跃的轨迹通常包含了高容量的每个用户pv产生的活跃信息。

Metrics度量

kafka经常被用来运转中监控数据,这需要从分布式应用中,生产集中运转中的数据feed,然后汇总统计。

日志聚合

很多朋友用kafka作为日志聚合的一个解决替代方案。典型的,日志聚合线下收集物理日志文件,然后把它们放入一个集中的地方(一个文件系统或者HDFS)来处理。kafka理论上远离了文件细节,给了一个更干净的文件抽象概念,甚至数据作为信息流。这对多种数据源和分布式数据消费有着低延迟和更好的支持。相比于日志中心系统比如scribe和flume,kafka同样提供高性能,强持久冗余保证和更低的端对端延迟。

流式处理

很多用户都是最终阶段处理那些从分类消费而来的未经处理的数据,然后汇总,丰富,或者转换进新的kafka分类作进一步的消费。例如一个文章推荐处理流程可能是先从rss feed流中爬文章内容,然后发布到’articles’分类中;更进一步的处理可能是规范化或者对内容去重,扔到一个放干净文章内容的分类里;最后一步可能是匹配这些内容给相关用户。这创建了一个有别于个别分类的实时数据流曲线图。StormSamza是实现这些转换的流行框架。

事件源

事件源是一种状态改变以时间顺序记录的一种应用设计方式。用这种方式,kafka以非常大的存储日志数据支持使的它成为一个应用极好的后端。

日志提交

kafka可以以一种外在的日志提交为分布式系统服务。这个日志帮助在节点之间复制数据,和作为一种预同步机制为那些失败节点恢复数据。kafka的这个log compaction特点帮助支持了这个用例。在这个用法上kafka和Apache BookKeeper项目类似。

快速开始

这部分指导假定你是从新开始,没有kafka或者zookeeper数据。

第一步:下载源代码

下载0.9.0.0版本,然后解压它。

> tar -xzf kafka_2.11-0.9.0.0.tgz
> cd kafka_2.11-0.9.0.0

第二步:启动服务

kafka用了zookeeper,所以你需要首先启动zookeeper服务。你可以用kafka包里面的简单脚本来启动一个快速简便的单一节点zookeeper实例。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)

现在启动kafka服务:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

第三步:创建一个分类

让我们创建一个名叫”test”的分类,仅有一个分区和复制:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

现在我们可以看见这个分类,如果执行了列出分类的命令:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

或者不手动创建分类,你可以配置你的服务器,当一个不存在的分类发布时自动创建这个分类。

第四步:发送一些消息

kafka自带一个命令行客户端,这个可以从一个文件或者表格式得到输入,然后作为消息发送到kafka集群。默认的,每一行作为一个单独的消息被送出。

运行生产者,然后输入一些消息进控制台发送到服务器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

第五步:启动一个消费者

kafka也有一个消费者命令行,将消息转储到标准输出。

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

如果你在不同终端运行着以上命令,那么你应当可以输入消息到生产者终端,然后在消费者终端出现这些消息。

所有的命令后工具都有额外的选项;不带参数运行命令将会显示出更多用法的详细信息。

第六步:设置多服务器集群

到目前为止,我们已经执行了一个单一服务器,但是这没什么意思。对于kafka,一个单一服务器仅仅是集群中的一个,所以启动更多的服务器实例将没有太多改变。但是为了感受一下,我们还是扩充我们的集群到3个节点(还是在我们的本地机器上)。

首先我们为每一个broker服务器生成一个配置文件:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

现在编辑这些新的文件,设置如下内容:

config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2

在集群的每一个节点中,这个broker.id是唯一和永久的名字。我们修改了端口和日志目录,因为我们是在相同的机器上运行这些,并且想让服务器都注册到相同的端口,彼此覆盖数据。

我们已经有了zookeeper,我们的单一节点也启动了,所以我们只要启动下面两个新节点:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

现在创建一个新的具有3个冗余因子的分类:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好了,我们有了一个集群,但是我们如何知道每一个broker服务器在干什么呢?