Kafka入门及进阶段

1.1 Kafka入门

一、Kafka 是什么?

有人说世界上有三个伟大的发明:火,轮子,以及 Kafka。

发展到现在,Apache Kafka 无疑是很成功的,Confluent 公司曾表示世界五百强中有三分之一的企业在使用 Kafka。在流式计算中,Kafka 一般用来缓存数据,例如 Flink 通过消费 Kafka 的数据进行计算。

image

关于Kafka,我们最开始需要了解的是以下四点:

1.Apache Kafka 是一个开源 消息 系统,由 Scala 写成。是由 Apache 软件基金会开发的 一个开源消息系统项目。

2.Kafka 最初是由 LinkedIn 公司开发,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础,现在它已被多家不同类型的公司作为多种类型的数据管道和消息系统使用。

3.Kafka 是一个分布式消息队列。Kafka 对消息保存时根据 Topic 进行归类,发送消息 者称为 Producer,消息接受者称为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个 实例(server)称为 broker。

4.无论是 kafka 集群,还是 consumer 都依赖于 Zookeeper 集群保存一些 meta 信息, 来保证系统可用性。

二、为什么要有 Kafka?

kafka 之所以受到越来越多的青睐,与它所扮演的三大角色是分不开的的:

消息系统:kafka与传统的消息中间件都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,kafka还提供了大多数消息系统难以实现的消息顺序性保障及回溯性消费的功能。

存储系统:kafka把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效的降低了消息丢失的风险。这得益于其消息持久化和多副本机制。也可以将kafka作为长期的存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题日志压缩功能。

流式处理平台:kafka为流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理框架,比如窗口、连接、变换和聚合等各类操作。

image.png

三、Kafka 基本概念

在深入理解 Kafka 之前,可以先了解下 Kafka 的基本概念。

一个典型的 Kafka 包含若干Producer、若干 Broker、若干 Consumer 以及一个 Zookeeper 集群。Zookeeper 是 Kafka 用来负责集群元数据管理、控制器选举等操作的。Producer 是负责将消息发送到 Broker 的,Broker 负责将消息持久化到磁盘,而 Consumer 是负责从Broker 订阅并消费消息。Kafka体系结构如下所示:

image

概念一:生产者(Producer)与消费者(Consumer)

image

对于 Kafka 来说客户端有两种基本类型:生产者(Producer)和 消费者(Consumer)。除此之外,还有用来做数据集成的 Kafka Connect API 和流式处理的 Kafka Streams 等高阶客户端,但这些高阶客户端底层仍然是生产者和消费者API,只不过是在上层做了封装。

Producer :消息生产者,就是向 Kafka broker 发消息的客户端;

Consumer :消息消费者,向 Kafka broker 取消息的客户端;

概念二:Broker 和集群(Cluster)

一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并存入磁盘;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。

若干个 Broker 组成一个 集群(Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。下图是一个样例:

image

概念三:主题(Topic)与分区(Partition)

image

在 Kafka 中,消息以 主题(Topic)来分类,每一个主题都对应一个「消息队列」,这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储。

Kafka是天然分布式的

备份分区仅仅用作于备份,不做读写。如果某个Broker挂了,那就会选举出其他Broker的partition来作为主分区,这就实现了高可用

另外值得一提的是:当生产者把数据丢进topic时,我们知道是写在partition上的,那partition是怎么将其持久化的呢?(不持久化如果Broker中途挂了,那肯定会丢数据嘛)。

Kafka是将partition的数据写在磁盘的(消息日志),不过Kafka只允许追加写入(顺序访问),避免缓慢的随机 I/O 操作。

  • Kafka也不是partition一有数据就立马将数据写到磁盘上,它会先缓存一部分,等到足够多数据量或等待一定的时间再批量写入(flush)。

消费者在读的时候也很有讲究:正常的读磁盘数据是需要将内核态数据拷贝到用户态的,而Kafka 通过调用sendfile()直接从内核空间(DMA的)到内核空间(Socket的),少做了一步拷贝的操作。

img

附docker-compose.yml 脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
version: "3"
services:
zookeeper:
image: 'confluentinc/cp-zookeeper:6.2.0'
hostname: zookeeper
ports:
- '2181:2181'
environment:
# 匿名登录--必须开启
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
volumes:
- ./zookeeper/data:/var/lib/zookeeper/data:Z
- ./zookeeper/log:/var/lib/zookeeper/log:Z
# 该镜像具体配置参考 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.md
broker:
image: 'confluentinc/cp-kafka:6.2.0'
hostname: broker
ports:
- '9092:9092'
- "29092:29092"
#- '9999:9999'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.92:9092,PLAINTEXT_HOST://localhost:29092
#- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT
# 客户端访问地址,更换成自己的
#- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.92:9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
# 允许使用PLAINTEXT协议(镜像中默认为关闭,需要手动开启)
- ALLOW_PLAINTEXT_LISTENER=yes
# 关闭自动创建 topic 功能
#- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
# 全局消息过期时间 6 小时(测试时可以设置短一点)
- KAFKA_CFG_LOG_RETENTION_HOURS=6
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
- KAFKA_HEAP_OPTS=-Xmx256M -Xms128M
# 开启JMX监控
#- JMX_PORT=9999
#- KAFKA_JMX_OPTS= -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.3.92 -Dcom.sun.management.jmxremote.rmi.port=9999 -Dcom.sun.management.jmxremote.port=9999
volumes:
- ./kafka/data:/var/lib/kafka/data:Z
- ./kafka/config:/var/lib/kafka/config:Z
depends_on:
- zookeeper
schema-registry:
image: confluentinc/cp-schema-registry:6.2.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
connect:
image: confluentinc/cp-kafka-connect:latest
hostname: connect
container_name: connect
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: broker:29092
CONNECT_REST_ADVERTISED_HOST_NAME: localhost
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: ksql-connect-cluster
CONNECT_OFFSET_STORAGE_TOPIC: ksql-connect-configs
CONNECT_CONFIG_STORAGE_TOPIC: ksql-connect-topics
CONNECT_STATUS_STORAGE_TOPIC: ksql-connect-statuses
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
ksqldb-server:
image: confluentinc/ksqldb-server:latest
hostname: ksqldb-server
depends_on:
- broker
ports:
- "8088:8088"
#healthcheck:
# test: curl -f http://ksqldb-server:8088/ || exit 1
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: 192.168.3.92:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_CONNECT_URL: http://connect:8083
ksqldb-cli:
image: confluentinc/ksqldb-cli:latest
container_name: ksqldb-cli
depends_on:
- ksqldb-server
- broker
entrypoint: /bin/sh
tty: true

Kafka topic CLI:

1
kafka-topics --bootstrap-server localhost:9092 --list

image-20230418103634592

1
kafka-console-producer --broker-list localhost:9092 --topic test

image-20230418103936415

1
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

image-20230418104000562

1
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

image-20230418110419615

1
show topics;

image-20230418110443996

1
2
CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
WITH (kafka_topic='locations', value_format='json', partitions=1);
1
2
3
4
5
6
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
select * from riderLocations;
1
ksql> select * from riderLocations;

image-20230418111709787

1
2
CREATE TABLE myNearDemo AS \
>select profileId,la,lo from currentLocation ;

image-20230418151308205

监测变化

1
select * from myNearDemo EMIT CHANGES;

image-20230418151345474

原始流 stream 里插入元数据

image-20230418151452071

1
2
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 38.7857, -122.4012);

image-20230418151605116