Skip to content

Kafka 从入门到实战:分布式流处理平台完整学习指南

Published: at 07:41 AM

最近工作中需要使用到 Kafka,基于 mysqlbin logpostgresWAL log 把数据发布到 KafkaKafka 另外一大用途主要用作消息中间件,虽然没有在业务系统中使用它作为消息队列,但这也是一个令人心动的功能。趁此契机,从环境搭建,系统原理,到生产使用再系统过一遍 Kafka

术语

用来做什么

一个工具可以用来做什么,是我们为什么要学习他的主要原因,另外了解它和同类产品比有什么优缺点,可以更好地根据自己的需求做出抉择。官网这么介绍 Kafka

Apache Kafka® is a distributed streaming platform.

在官网描述 Kafka 是一个分布式流处理(distributed streaming)平台,作为一个流处理的工具,它可以应用在一下几方面。

参考 http://kafka.apache.org/uses

它有四个核心 API

安装

CentOS

安装 java

kafka 依赖于 java 的环境,所以第一步先安装 JDK 和 JRE。

yum install java-1.8.0-openjdk
yum install java-1.8.0-openjdk-devel

安装 ZookeeperKafka

安装好 java 后按照以下命令安装和启动 Kafka

Kafka 依赖于 Zookeeper,启动 Kafka 之前先启动 Zookeeper。这里启动一个单实例的 Kafka

关于 kafkaZookeeper 的下载地址,可以在国内的镜像源进行下载

https://mirrors.tuna.tsinghua.edu.cn/apache

$ curl -O http://mirrors.shu.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz

$ # 在国内可以使用以下镜像源
$ # curl -O https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz

$ tar -xzf kafka_2.12-2.2.0.tgz
$ cd kafka_2.12-2.2.0

# 以以下配置文件启动 Zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
...
[2019-04-04 16:07:30,141] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

# 以以下配置文件启动 Kafka
$ bin/kafka-server-start.sh config/server.properties
...
[2019-04-04 16:13:07,794] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

启动成功后,Zookeeper 会占用 2181 端口号,而 Kafka 会占用 9092 端口号,对于这些特殊服务的端口号需要留意一下,在以下很多命令中需要指定端口号

如何获取版本号

如果是自己搭建的 Kafka,自然可以很容易地通过下载包的命名知道版本号。如果是别人搭建的,那又如何获取版本号?

你可以先试一试,kafka-topics.sh --version

$ ./kafka-topics.sh --version
2.2.0 (Commit:05fcfde8f69b0349)

如果不成功,那说明版本在 2.0 一下,你只能通过安装文件夹下文件的命名用肉眼来辨别了

如果你在 kafka_install_dir/libs 下发现了文件 kafka-clients-2.2.0.jar,那么它的版本号就是 2.2.0。

Hello, world | 一个简单的示例

先开始一个最简单的示例,一方在终端发布数据,一方在终端订阅数据。在这之前,先了解下什么是 Topic

Topic 简介

上边术语中提到过,Topic 是数据流的集合,类似于 sql 中的 table,并由 record 组成。

topic

如上图,由于 kafka 是分布式的,每个 Topic 由分区日志组成,每个分区日志由 不可变的,顺序的 record 组成。Consumer 会标记每个分区日志中已处理的 record 位置,即 offset,使用 offset 可以唯一标志每个 record。

多个 Consumer 并不会对已处理数据丢失,数据永久存储在 kafka 上,除非过了 kafka 设置的过期时间。

创建 Topic

# 创建一个 Topic: test
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.

# 列出 Topic 列表
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

生产者(Producer)/消费者(Consumer)

运行以下命令,从终端生产数据发布消息

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
hello
world

打开消费者,会从终端看到订阅的数据

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world

这样,一个简单版的 kafka 的 Demo 完成了。

不过,此时他有两个问题

  1. 单节点
  2. 数据来源只有终端

接下来看如何处理这两方面问题

Broker 集群

bin/kafka-server-start.sh config/server.properties

从以上单节点的示例中,我们看到它会读取 config/server.properties 作为配置文件。

默默吐槽下配置文件的 DSL 太多了

先复制出来两份文件,修改下部分属性,作为三个 broker 的配置文件进行启动,就可以有三个 broker 了

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

以下几项是需要修改的配置

以下是修改后的配置文件内容

$ cat config/server-1.properties
...
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
...
$ cat config/server-2.properties
...
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
...

接下来启动多个 broker

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

创建一个多 broker 的 Topic

--replication-facor 指定备份数目,现在有三个 broker,因此指定三个备份。

现在有了三个 broker 的地址,现在创建 Topic 时指定 Zookeeper 地址。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic todo
Error while executing topic command : Replication factor: 3 larger than available brokers: 2.
[2019-04-11 15:27:28,265] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
 (kafka.admin.TopicCommand$)

原来是由于我的机器配置太差,内存不足以支持三个副本运行。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic todo
Created topic todo.

那这时候又引进来了一个新的问题,如何查看目前集群中的 broker 个数

如何查看目前的 broker 个数

TODO

查看 Topic 信息

当为 Topic 使用了多个备份时,可以使用 --describe 查看信息

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic todo
Topic:todo    PartitionCount:1        ReplicationFactor:2     Configs:segment.bytes=1073741824
        Topic: todo2    Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1

生产者/消费者

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic todo
hello
world
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic todo
hello
world

如何读取数据

mysql 或者 postgres 中可以使用命令行的方式进入数据库,使用 select 来读取数据库表中的数据。那么如何在 kafka 中读取 Topic 的数据呢?

答案是通过 Consumer API 来读取数据。

那可以使用命令行式的交互工具读取数据吗? 恩,好像不能。

Consumer

以上无论是单节点还是多节点,我们都是用 Consumer 从终端中读取数据

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

从文件中读取和写入数据

使用 Connect API 控制数据从文件中读写

TODO

可靠性

At most once——消息可能会丢失但绝不重传。 At least once——消息可以重传但绝不丢失。 Exactly once——这正是人们想要的, 每一条消息只被传递一次.

Kafka Cheat Sheets

查看版本号

$ bin/kafka-topics.sh --version
2.2.0

启动 zookeeper

config/zookeeper.properties 是 zookeeper 的配置文件

bin/zookeeper-server-start.sh config/zookeeper.properties

启动 Kafka

config/server.properties 是 Kafka 的配置文件

bin/kafka-server-start.sh config/server.properties

创建一个 Topic

创建命名为 test 的 Topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

查看 Topic 列表

__consumer_offsetsKafka 自动生成的,代表每个 consumer 的 offset。

$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
test

发布 Topic 数据 (console-producer)

以下是使用 console-producer 发布数据到 test

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
hello
world

订阅 Topic 数据 (console-consumer)

以下是使用 console-consumer 读取 test 中的数据

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world

查看 Consumer Group 列表

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-54112

查看 Consumer Group 中的 Consumer 信息

可以查看组内的每个 Consumer 的 id 以及 offset

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-54112
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
todo            0          -               11              -               consumer-1-ab662bd3-60b3-4e97-829d-84f27a799f0d /192.168.1.214  consumer-1