Peterfei

上主是我的牧者,我实在一无所缺


  • 首页

  • 归档

  • 标签

Kafka入门及进阶段

发表于 2023-04-18   |  

1.1 Kafka入门

一、Kafka 是什么?

有人说世界上有三个伟大的发明:火,轮子,以及 Kafka。

发展到现在,Apache Kafka 无疑是很成功的,Confluent 公司曾表示世界五百强中有三分之一的企业在使用 Kafka。在流式计算中,Kafka 一般用来缓存数据,例如 Flink 通过消费 Kafka 的数据进行计算。

image

关于Kafka,我们最开始需要了解的是以下四点:

1.Apache Kafka 是一个开源 消息 系统,由 Scala 写成。是由 Apache 软件基金会开发的 一个开源消息系统项目。

2.Kafka 最初是由 LinkedIn 公司开发,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础,现在它已被多家不同类型的公司作为多种类型的数据管道和消息系统使用。

3.Kafka 是一个分布式消息队列。Kafka 对消息保存时根据 Topic 进行归类,发送消息 者称为 Producer,消息接受者称为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个 实例(server)称为 broker。

4.无论是 kafka 集群,还是 consumer 都依赖于 Zookeeper 集群保存一些 meta 信息, 来保证系统可用性。

二、为什么要有 Kafka?

kafka 之所以受到越来越多的青睐,与它所扮演的三大角色是分不开的的:

消息系统:kafka与传统的消息中间件都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,kafka还提供了大多数消息系统难以实现的消息顺序性保障及回溯性消费的功能。

存储系统:kafka把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效的降低了消息丢失的风险。这得益于其消息持久化和多副本机制。也可以将kafka作为长期的存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题日志压缩功能。

流式处理平台:kafka为流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理框架,比如窗口、连接、变换和聚合等各类操作。

image.png

三、Kafka 基本概念

在深入理解 Kafka 之前,可以先了解下 Kafka 的基本概念。

一个典型的 Kafka 包含若干Producer、若干 Broker、若干 Consumer 以及一个 Zookeeper 集群。Zookeeper 是 Kafka 用来负责集群元数据管理、控制器选举等操作的。Producer 是负责将消息发送到 Broker 的,Broker 负责将消息持久化到磁盘,而 Consumer 是负责从Broker 订阅并消费消息。Kafka体系结构如下所示:

image

概念一:生产者(Producer)与消费者(Consumer)

image

对于 Kafka 来说客户端有两种基本类型:生产者(Producer)和 消费者(Consumer)。除此之外,还有用来做数据集成的 Kafka Connect API 和流式处理的 Kafka Streams 等高阶客户端,但这些高阶客户端底层仍然是生产者和消费者API,只不过是在上层做了封装。

Producer :消息生产者,就是向 Kafka broker 发消息的客户端;

Consumer :消息消费者,向 Kafka broker 取消息的客户端;

概念二:Broker 和集群(Cluster)

一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并存入磁盘;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。

若干个 Broker 组成一个 集群(Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。下图是一个样例:

image

概念三:主题(Topic)与分区(Partition)

image

在 Kafka 中,消息以 主题(Topic)来分类,每一个主题都对应一个「消息队列」,这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储。

Kafka是天然分布式的。

备份分区仅仅用作于备份,不做读写。如果某个Broker挂了,那就会选举出其他Broker的partition来作为主分区,这就实现了高可用。

另外值得一提的是:当生产者把数据丢进topic时,我们知道是写在partition上的,那partition是怎么将其持久化的呢?(不持久化如果Broker中途挂了,那肯定会丢数据嘛)。

Kafka是将partition的数据写在磁盘的(消息日志),不过Kafka只允许追加写入(顺序访问),避免缓慢的随机 I/O 操作。

  • Kafka也不是partition一有数据就立马将数据写到磁盘上,它会先缓存一部分,等到足够多数据量或等待一定的时间再批量写入(flush)。

消费者在读的时候也很有讲究:正常的读磁盘数据是需要将内核态数据拷贝到用户态的,而Kafka 通过调用sendfile()直接从内核空间(DMA的)到内核空间(Socket的),少做了一步拷贝的操作。

img

附docker-compose.yml 脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
version: "3"
services:
zookeeper:
image: 'confluentinc/cp-zookeeper:6.2.0'
hostname: zookeeper
ports:
- '2181:2181'
environment:
# 匿名登录--必须开启
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
volumes:
- ./zookeeper/data:/var/lib/zookeeper/data:Z
- ./zookeeper/log:/var/lib/zookeeper/log:Z
# 该镜像具体配置参考 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.md
broker:
image: 'confluentinc/cp-kafka:6.2.0'
hostname: broker
ports:
- '9092:9092'
- "29092:29092"
#- '9999:9999'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.92:9092,PLAINTEXT_HOST://localhost:29092
#- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT
# 客户端访问地址,更换成自己的
#- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.3.92:9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
# 允许使用PLAINTEXT协议(镜像中默认为关闭,需要手动开启)
- ALLOW_PLAINTEXT_LISTENER=yes
# 关闭自动创建 topic 功能
#- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
# 全局消息过期时间 6 小时(测试时可以设置短一点)
- KAFKA_CFG_LOG_RETENTION_HOURS=6
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
- KAFKA_HEAP_OPTS=-Xmx256M -Xms128M
# 开启JMX监控
#- JMX_PORT=9999
#- KAFKA_JMX_OPTS= -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.3.92 -Dcom.sun.management.jmxremote.rmi.port=9999 -Dcom.sun.management.jmxremote.port=9999
volumes:
- ./kafka/data:/var/lib/kafka/data:Z
- ./kafka/config:/var/lib/kafka/config:Z
depends_on:
- zookeeper
schema-registry:
image: confluentinc/cp-schema-registry:6.2.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
connect:
image: confluentinc/cp-kafka-connect:latest
hostname: connect
container_name: connect
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: broker:29092
CONNECT_REST_ADVERTISED_HOST_NAME: localhost
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: ksql-connect-cluster
CONNECT_OFFSET_STORAGE_TOPIC: ksql-connect-configs
CONNECT_CONFIG_STORAGE_TOPIC: ksql-connect-topics
CONNECT_STATUS_STORAGE_TOPIC: ksql-connect-statuses
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
ksqldb-server:
image: confluentinc/ksqldb-server:latest
hostname: ksqldb-server
depends_on:
- broker
ports:
- "8088:8088"
#healthcheck:
# test: curl -f http://ksqldb-server:8088/ || exit 1
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: 192.168.3.92:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_CONNECT_URL: http://connect:8083
ksqldb-cli:
image: confluentinc/ksqldb-cli:latest
container_name: ksqldb-cli
depends_on:
- ksqldb-server
- broker
entrypoint: /bin/sh
tty: true

Kafka topic CLI:

1
kafka-topics --bootstrap-server localhost:9092 --list

image-20230418103634592

1
kafka-console-producer --broker-list localhost:9092 --topic test

image-20230418103936415

1
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

image-20230418104000562

1
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

image-20230418110419615

1
show topics;

image-20230418110443996

1
2
CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
WITH (kafka_topic='locations', value_format='json', partitions=1);
1
2
3
4
5
6
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
select * from riderLocations;
1
ksql> select * from riderLocations;

image-20230418111709787

1
2
CREATE TABLE myNearDemo AS \
>select profileId,la,lo from currentLocation ;

image-20230418151308205

监测变化

1
select * from myNearDemo EMIT CHANGES;

image-20230418151345474

原始流 stream 里插入元数据

image-20230418151452071

1
2
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 38.7857, -122.4012);

image-20230418151605116

Mysql使用pt-query-digest分析日志

发表于 2023-02-22   |  

1. 开启慢查询日志

(1)首先我们要创建一个文件夹用于保存慢查询日志文件,并且设置 mysql 有权读写该目录:

1
mkdir` `/``var``/log/mysql``sudo ``chown` `mysql:mysql -R /``var``/log/mysql

(2)我们可以登入 mysql 命令行后执行如下命令,使用 set 设置变量来临时开启。注意这种方式重启服务即失效。

1
set ``global` `slow_query_log=on; ``//开启慢查询功能``set ``global` `slow_query_log_file=``'/var/log/mysql/mysql-slow.log'``; ``//指定慢查询日志文件位置``set ``global` `log_queries_not_using_indexes=on; ``//记录没有使用索引的查询(非必须)``set ``global` `long_query_time=1; ``//只记录处理时间1s以上的慢查询

(3)或者我们也可以通过修改配置文件来永久开启慢查询日志功能,首先编辑配置文件:

1
vi /etc/my.cnf
  • 然后在里面添加如下高亮配置:
1
[mysqld]``slow_query_log=on #开启慢查询功能``slow_query_log_file=``'/var/log/mysql/mysql-slow.log'` `#指定慢查询日志文件位置``log_queries_not_using_indexes=on #记录没有使用索引的查询(非必须)``long_query_time=1 #只记录处理时间1s以上的慢查询
  • 保存关闭文件后,执行如下命令重启 mysql 即可:
1
service mysqld restart

2. 查看慢查询功能是否开启

(1)登入 mysql 命令行后执行如下命令可以查看慢查询开启状态,以及慢查询日志存放的位置:

1
show variables like ``'slow_query%'``;

(2)执行如下命令可以查看查询超过多少秒才记录:

1
show variables like ``'long_query_time'``;

3. 慢查询测试

(1)首先我们执行一个如下的 sql,模拟一个 2 秒的慢查询:

1
select sleep(2);

(2)查看日志可以发现这个慢查询已经被记录:

1
cat /``var``/log/mysql/mysql-slow.log

使用 pt-query-digest 工具分析慢查询日志

1,工具安装

(1)首先我们执行如下命令将 rpm 包下载到本地:

注意:如果下载不下来也可访问其官网(点击打开),手动下载下来再上传到服务器上。

1
wget https:``//downloads.percona.com/downloads/percona-toolkit/3.2.1/binary/redhat/7/x86_64/percona-toolkit-3.2.1-1.el7.x86_64.rpm

(2)接着使用 yum 命令进行安装:

1
yum install -y percona-toolkit-3.2.1-1.el7.x86_64.rpm

2. 分析慢查询日志

(1)执行如下命令可以分析指定的慢查询日志文件:

1
pt-query-digest /``var``/log/mysql/mysql-slow.log

(2)分析结果分为三部分,第一部分是总体统计结果:

  • Overall:总共有多少条查询
  • Time range:查询执行的时间范围
  • unique:唯一查询数量,即对查询条件进行参数化以后,总共有多少个不同的查询
  • total:所有查询总计时长
  • min:所有查询最小时长
  • max:所有查询最大时长
  • avg:所有查询平均时长
  • 95%:把所有时长值从小到大排列,位置位于 95% 的那个时长数,这个数一般最具有参考价值
  • median:中位数,把所有时长值从小到大排列,位置位于中间那个时长数

(3)第二部分是查询分组统计结果:

  • Rank:所有语句的排名,默认按查询时间降序排列,通过 –order-by 指定
  • Query ID:语句的 ID(去掉多余空格和文本字符,计算 hash 值)
  • Response:总的响应时间
  • time:该查询在本次分析中总的时间占比
  • Calls:执行次数,即本次分析总共有多少条这种类型的查询语句
  • R/Call:平均每次执行的响应时间
  • V/M:响应时间 Variance-to-mean 的比率
  • Item:查询对象

(4)第三部分是每一种查询比较慢的 sql 的详细统计结果:

  • pct:该 sql 语句某执行属性占所有慢查询语句某执行属性的百分比
  • total:该 sql 语句某执行属性的所有属性时间。
  • Count:sql 语句执行的次数。对应的 pct 表示此 sql 语句执行次数占所有慢查询语句执行次数的 % 比(下图为 10%),对应的 total 表示总共执行了 3 次。
  • Exec time:sql 执行时间
  • Lock time:sql 执行期间被锁定的时间
  • Rows sent:传输的有效数据,在 select 查询语句中才有值
  • Rows examine:总共查询的数据,非目标数据。
  • Query_time distribution:查询时间分布
  • SQL 语句:下图中为 select sleep(7)\G

3. 进阶用法

(1)分析 slow.log 日志,并将分析报告输入到 slow_report.log 中:

1
pt-query-digest slow.log > slow_report.log

(2)分析最近 12 小时内的查询:

1
pt-query-digest --since=12h slow.log > slow_report2.log

(3)分析指定时间范围内的查询:

1
pt-query-digest slow.log --since ``'2020-04-17 09:30:00'` `--until ``'2020-04-17 10:00:00'` `> slow_report3.log

(4)分析指含有 select 语句的慢查询:

1
pt-query-digest --filter ``'$event->{fingerprint} =~ m/^select/i'` `slow.log> slow_report4.log

(5)针对某个用户的慢查询:

1
pt-query-digest --filter ``'($event->{user} || "") =~ m/^root/i'` `slow.log> slow_report5.log

(6)查询所有的全表扫描或 full join 的慢查询:

1
pt-query-digest --filter ``'(($event->{Full_scan} || "") eq "yes") ||(($event->{Full_join} || "") eq "yes")'` `slow.log> slow_report6.log

(7)把查询保存到 query_review 表:

1
pt-query-digest --user=root –password=abc123 --review h=localhost,D=test,t=query_review--create-review-table slow.log

(8)通过 tcpdump 抓取 mysql 的 tcp 协议数据,然后再分析:

1
tcpdump -s 65535 -x -nn -q -tttt -i any -c 1000 port 3306 > mysql.tcp.txt``pt-query-digest --type tcpdump mysql.tcp.txt> slow_report9.log

(9)分析 binlog:

1
mysqlbinlog mysql-bin.000093 > mysql-bin000093.sql``pt-query-digest --type=binlog mysql-bin000093.sql > slow_report10.log

(10)分析 general log:

1
pt-query-digest --type=genlog localhost.log > slow_report11.log

Elastic 使用极限网关进行数据双向切换

发表于 2023-02-09   |  

极限网关 (INFINI Gateway) 是一个面向 Elasticsearch 的高性能应用网关,它包含丰富的特性,使用起来也非常简单。极限网关工作的方式和普通的反向代理一样,我们一般是将网关部署在 Elasticsearch 集群前面, 将以往直接发送给 Elasticsearch 的请求都发送给网关,再由网关转发给请求到后端的 Elasticsearch 集群。因为网关位于在用户端和后端 Elasticsearch 之间,所以网关在中间可以做非常多的事情, 比如可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等

下载安装

1
2
3
wget https://release.infinilabs.com/gateway/stable/gateway-1.8.6-769-linux-amd64.tar.gz
tar vxzf gateway-1.8.6-769-linux-amd64.tar.gz
mv gateway-linux-amd64 bin/gateway

验证安装

极限网关下载解压之后,我们可以执行这个命令来验证安装包是否有效,如下:

1
2
✗ ./bin/gateway -v
gateway 1.0.0_SNAPSHOT 2021-01-03 22:45:28 6a54bb2

如果能够正常看到上面的版本信息,说明网关程序本身一切正常。

启动网关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
[root@k8s-master gateway]# cat /opt/gateway/gateway.yml
# 数据路径
path.data: data
# 日志路径
path.logs: log
# 定义 Elasticsearch 集群地址
elasticsearch:
# cluster01 集群
- name: cluster01
enabled: true
endpoint: http://192.168.10.15:9200
basic_auth:
username: elastic
password: IQsMLRniP5BcYfoNzTBT
discovery:
enabled: true
refresh:
enabled: true
interval: 1s
# cluster02 集群
- name: cluster02
enabled: true
endpoint: http://192.168.10.15:30993
basic_auth:
username: elastic
password: IQsMLRniP5BcYfoNzTBT
discovery:
enabled: true
refresh:
enabled: true
interval: 1s
# 定义网关入口
entry:
- name: my_es_entry
enabled: true
router: my_router
network:
binding: 0.0.0.0:8000
# 定义工作流
flow:
- name: auth-flow
filter:
#- basic_auth:
# valid_users:
# elastic: ******
- set_basic_auth:
username: elastic
password: IQsMLRniP5BcYfoNzTBT
- name: set-auth-for-backup-flow
filter:
- set_basic_auth: #覆盖备集群的身份信息用于备集群正常处理请求
username: elastic
password: IQsMLRniP5BcYfoNzTBT
# 写请求优先发给主集群, 当主集群不可用时发给备集群
# 当主集群数据写入成功时,记录到队列中,异步消费写入备集群
- name: write-flow
filter:
- flow:
flows:
- auth-flow
- if:
# 当主集群可用时
cluster_available: ["cluster01"]
then:
# 先将数据写入主集群
- elasticsearch:
elasticsearch: "cluster01"
# 写入消息队列,等待 pipeline 异步消费到备集群
- queue:
queue_name: "cluster02-queue"
else:
- elasticsearch:
elasticsearch: "cluster02"
- queue:
queue_name: "cluster01-queue"
# 读请求优先发给主集群, 当主集群不可用时发给备集群
- name: read-flow
filter:
- flow:
flows:
- set-auth-for-backup-flow
- if:
cluster_available: ["cluster01"]
then:
- elasticsearch:
elasticsearch: "cluster01"
else:
- elasticsearch:
elasticsearch: "cluster02"
# 路由规则
router:
- name: my_router
# 默认路由
default_flow: write-flow
# 读请求路由
rules:
- method:
- "GET"
- "HEAD"
pattern:
- "/{any:*}"
flow:
- read-flow
- method:
- "POST"
- "GET"
pattern:
- "/_refresh"
- "/_count"
- "/_search"
- "/_msearch"
- "/_mget"
- "/{any_index}/_count"
- "/{any_index}/_search"
- "/{any_index}/_msearch"
- "/{any_index}/_mget"
flow:
- read-flow
# 定义管道, 异步将数据写入备集群
pipeline:
- name: cluster01-consumer
auto_start: true
keep_running: true
processor:
- queue_consumer:
input_queue: "cluster01-queue"
elasticsearch: "cluster01"
when:
cluster_available: ["cluster01"] # 当集群可用时,才消费队列中的数据
- name: cluster02-consumer
auto_start: true
keep_running: true
processor:
- queue_consumer:
input_queue: "cluster02-queue"
elasticsearch: "cluster02"
when:
cluster_available: ["cluster02"]
elastic:
enabled: true
remote_configs: false
health_check:
enabled: true
interval: 1s
availability_check:
enabled: true
interval: 1s
metadata_refresh:
enabled: true
interval: 1s
cluster_settings_check:
enabled: false
interval: 1s

启动网关

[root@k8s-master gateway]# ./bin/gateway

测试准备

建立本地docker es 集群,配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
[root@k8s-master gateway]# cat docker-compose.yml
version: '3.8'
services:
# 集群 cluster01
# Elasticsearch
es01:
image: docker.io/library/elasticsearch:7.9.3
container_name: es01
environment:
# 节点名
- node.name=es01
# 集群名
- cluster.name=cluster01
# 指定单节点启动
- discovery.type=single-node
# 开启内存锁定
- bootstrap.memory_lock=true
# 设置内存大小
- "ES_JAVA_OPTS=-Xms2g -Xmx2g"
# 启用安全
- xpack.security.enabled=true
# 设置 elastic 用户密码
- ELASTIC_PASSWORD=IQsMLRniP5BcYfoNzTBT
ulimits:
memlock:
soft: -1
hard: -1
# 映射到主机名的端口 宿主机端口:容器端口
ports:
- 9200:9200
volumes:
- data01:/usr/share/elasticsearch/data
networks:
- elastic
# Kibana
kib01:
image: kibana:7.9.3
container_name: kib01
ports:
- 5601:5601
environment:
# Elasticsearch 连接信息
ELASTICSEARCH_URL: http://es01:9200
ELASTICSEARCH_HOSTS: '["http://es01:9200"]'
ELASTICSEARCH_USERNAME: elastic
ELASTICSEARCH_PASSWORD: IQsMLRniP5BcYfoNzTBT
networks:
- elastic
# 存储卷
volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: local
# 网络
networks:
elastic:
driver: bridge

k8s 集群(已提前搭建)

kubectl exec -it $(kubectl get pods -n esbeta| grep elasticsearch-client | sed -n 1p | awk ‘{print $1}’) -n esbeta – bin/elasticsearch-setup-passwords interactive

大数据-实时计算Flink之FlinkSQL

发表于 2022-09-17   |  

背景

大数据处理技术现今已广泛应用于各个行业,为业务解决海量存储和海量分析的需求。但数据量的爆发式增长,对数据处理能力提出了更大的挑战,同时对时效性也提出了更高的要求。业务通常已不再满足滞后的分析结果,希望看到更实时的数据,从而在第一时间做出判断和决策。典型的场景如电商大促和金融风控等,基于延迟数据的分析结果已经失去了价值。

如何构建一个统一的数据湖存储,并在其上进行多种形式的数据分析,成了企业构建大数据生态的一个重要方向。如何快速、一致、原子性地在数据湖存储上构建起 Data Pipeline,成了亟待解决的问题。

下载flink-sql-connector-mysql-cdc, flink-sql-connector-kafka ,flink-sql-connector-hive到 /usr/lib/flink/lib/

1
2
3
4
5
6
7
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.2/flink-sql-connector-kafka_2.12-1.14.2.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.14.2/flink-sql-connector-hive-3.1.2_2.12-1.14.2.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.2/flink-connector-jdbc_2.12-1.14.2.jar

Flink1.11引入了CDC的connector,通过这种方式可以很方便地捕获变化的数据,大大简化了数据处理的流程。Flink1.11的CDC connector主要包括:MySQL CDC和Postgres CDC,同时对Kafka的Connector支持canal-json和debezium-json以及changelog-json的format。

简介

Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC))从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能。

特点

  • 支持读取数据库快照,并且能够持续读取数据库的变更日志,即使发生故障,也支持exactly-once 的处理语义
  • 对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中使用多个数据库和表上的变更数据。
  • 对于Table/SQL API 的CDC connector,用户可以使用SQL DDL创建CDC数据源,来监视单个表上的数据变更。

使用场景

  • 数据库之间的增量数据同步
  • 审计日志
  • 数据库之上的实时物化视图
  • 基于CDC的维表join

Flink提供的 table format

Flink提供了一系列可以用于table connector的table format,具体如下:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

使用过程中的注意点

使用MySQL CDC的注意点

如果要使用MySQL CDC connector,对于程序而言,需要添加如下依赖:

1
2
3
4
5
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.0.0</version>
</dependency>

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。

之前, mysql 中需加入配置

1
2
3
4
server_id=1
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL

重启mysql.

mysql-cdc的操作实践

创建MySQL数据源表

在创建MySQL CDC表之前,需要先创建MySQL的数据表,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
-- MySQL
/*Table structure for table `order_info` */
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
`consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
`total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
`order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1表示下单,2表示支付',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
`payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
`delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
`order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
`out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
`trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`operate_time` datetime DEFAULT NULL COMMENT '操作时间',
`expire_time` datetime DEFAULT NULL COMMENT '失效时间',
`tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
`parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
`province_id` int(20) DEFAULT NULL COMMENT '地区',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
-- ----------------------------
-- Records of order_info
-- ----------------------------
INSERT INTO `order_info`
VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
INSERT INTO `order_info`
VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
INSERT INTO `order_info`
VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);
/*Table structure for table `order_detail` */
CREATE TABLE `order_detail` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
`sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
`sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
`img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
`order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
`sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单明细表';
-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail`
VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');
INSERT INTO `order_detail`
VALUES (1330, 477, 9, '荣耀10 GT游戏加速 AIS手持夜景 6GB+64GB 幻影蓝全网通 移动联通电信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');
INSERT INTO `order_detail`
VALUES (1331, 478, 4, '小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');
INSERT INTO `order_detail`
VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');
INSERT INTO `order_detail`
VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');

Flink SQL Cli创建CDC数据源

启动 Flink 集群,再启动 SQL CLI,执行下面命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- 创建订单信息表
CREATE TABLE order_info(
id BIGINT,
user_id BIGINT,
create_time TIMESTAMP(0),
operate_time TIMESTAMP(0),
province_id INT,
order_status STRING,
total_amount DECIMAL(10, 5)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'kms-1',
'port' = '3306',
'username' = 'root',
'password' = '123qwe',
'database-name' = 'mydw',
'table-name' = 'order_info'
);

在Flink SQL Cli中查询该表的数据:result-mode: tableau,+表示数据的insert

img

在SQL CLI中创建订单详情表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE order_detail(
id BIGINT,
order_id BIGINT,
sku_id BIGINT,
sku_name STRING,
sku_num BIGINT,
order_price DECIMAL(10, 5),
create_time TIMESTAMP(0)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'kms-1',
'port' = '3306',
'username' = 'root',
'password' = '123qwe',
'database-name' = 'mydw',
'table-name' = 'order_detail'
);

查询结果如下:

img

执行JOIN操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
SELECT
od.id,
oi.id order_id,
oi.user_id,
oi.province_id,
od.sku_id,
od.sku_name,
od.sku_num,
od.order_price,
oi.create_time,
oi.operate_time
FROM
(
SELECT *
FROM order_info
WHERE
order_status = '2'-- 已支付
) oi
JOIN
(
SELECT *
FROM order_detail
) od
ON oi.id = od.order_id;

Headscale 之部署私有 DERP 中继服务器

发表于 2022-09-05   |  

STUN 是什么

Tailscale 的终极目标是让两台处于网络上的任何位置的机器建立点对点连接(直连),但现实世界是复杂的,大部份情况下机器都位于 NAT 和防火墙后面,这时候就需要通过打洞来实现直连,也就是 NAT 穿透。

NAT 按照 NAT 映射行为和有状态防火墙行为可以分为多种类型,但对于 NAT 穿透来说根本不需要关心这么多类型,只需要看 NAT 或者有状态防火墙是否会严格检查目标 Endpoint,根据这个因素,可以将 NAT 分为 Easy NAT 和 Hard NAT。

  • Easy NAT 及其变种称为 “Endpoint-Independent Mapping” (EIM,终点无关的映射)
    这里的 Endpoint 指的是目标 Endpoint,也就是说,有状态防火墙只要看到有客户端自己发起的出向包,就会允许相应的入向包进入,不管这个入向包是谁发进来的都可以。
  • hard NAT 以及变种称为 “Endpoint-Dependent Mapping”(EDM,终点相关的映射)
    这种 NAT 会针对每个目标 Endpoint 来生成一条相应的映射关系。 在这样的设备上,如果客户端向某个目标 Endpoint 发起了出向包,假设客户端的公网 IP 是 2.2.2.2,那么有状态防火墙就会打开一个端口,假设是 4242。那么只有来自该目标 Endpoint 的入向包才允许通过 2.2.2.2:4242,其他客户端一律不允许。这种 NAT 更加严格,所以叫 Hard NAT。

对于 Easy NAT,我们只需要提供一个第三方的服务,它能够告诉客户端“它看到的客户端的公网 ip:port 是什么”,然后将这个信息以某种方式告诉通信对端(peer),后者就知道该和哪个地址建连了!这种服务就叫 STUN (Session Traversal Utilities for NAT,NAT会话穿越应用程序)。它的工作流程如下图所示:

  • 笔记本向 STUN 服务器发送一个请求:“从你的角度看,我的地址什么?”
  • STUN 服务器返回一个响应:“我看到你的 UDP 包是从这个地址来的:ip:port”。

中继是什么

对于 Hard NAT 来说,STUN 就不好使了,即使 STUN 拿到了客户端的公网 ip:port 告诉通信对端也于事无补,因为防火墙是和 STUN 通信才打开的缺口,这个缺口只允许 STUN 的入向包进入,其他通信对端知道了这个缺口也进不来。通常企业级 NAT 都属于 Hard NAT。

这种情况下打洞是不可能了,但也不能就此放弃,可以选择一种折衷的方式:创建一个中继服务器(relay server),客户端与中继服务器进行通信,中继服务器再将包中继(relay)给通信对端。

至于中继的性能,那要看具体情况了:

  • 如果能直连,那显然没必要用中继方式;
  • 但如果无法直连,而中继路径又非常接近双方直连的真实路径,并且带宽足够大,那中继方式并不会明显降低通信质量。延迟肯定会增加一点,带宽会占用一些,但相比完全连接不上,还是可以接受的。

事实上对于大部分网络而言,Tailscale 都可以通过各种黑科技打洞成功,只有极少数情况下才会选择中继,中继只是一种 fallback 机制。

中继协议简介

中继协议有多种实现方式。

TURN

TURN 即 Traversal Using Relays around NAT,这是一种经典的中继实现方式,核心理念是:

  • 用户(人)先去公网上的 TURN 服务器认证,成功后后者会告诉你:“我已经为你分配了 ip:port,接下来将为你中继流量”,
  • 然后将这个 ip:port 地址告诉对方,让它去连接这个地址,接下去就是非常简单的客户端/服务器通信模型了。

与 STUN 不同,这种协议没有真正的交互性,不是很好用,因此 Tailscale 并没有采用 TURN 作为中继协议。

DERP

DERP 即 Detoured Encrypted Routing Protocol,这是 Tailscale 自研的一个协议:

  • 它是一个通用目的包中继协议,运行在 HTTP 之上,而大部分网络都是允许 HTTP 通信的。
  • 它根据目的公钥(destination’s public key)来中继加密的流量(encrypted payloads)。

自建私有 DERP server

Tailscale 的私钥只会保存在当前节点,因此 DERP server 无法解密流量,它只能和互联网上的其他路由器一样,呆呆地将加密的流量从一个节点转发到另一个节点,只不过 DERP 使用了一个稍微高级一点的协议来防止滥用。

Tailscale 开源了 DERP 服务器的代码,如果你感兴趣,可以阅读 DERP 的源代码。


Tailscale 官方内置了很多 DERP 服务器,分步在全球各地,惟独不包含中国大陆,原因你懂得。这就导致了一旦流量通过 DERP 服务器进行中继,延时就会非常高。而且官方提供的 DERP 服务器是万人骑,存在安全隐患。

为了实现低延迟、高安全性,我们可以参考 Tailscale 官方文档自建私有的 DERP 服务器。有两种部署模式,一种是基于域名,另外一种不需要域名,可以直接使用 IP,不过需要一点黑科技。我们先来看最简单的使用域名的方案。

还需要创建自签名证书,可以通过脚本来创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# build_cert.sh
#!/bin/bash
CERT_HOST=$1
CERT_DIR=$2
CONF_FILE=$3
echo "[req]
default_bits = 2048
distinguished_name = req_distinguished_name
req_extensions = req_ext
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
countryName = XX
stateOrProvinceName = N/A
localityName = N/A
organizationName = Self-signed certificate
commonName = $CERT_HOST: Self-signed certificate
[req_ext]
subjectAltName = @alt_names
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = $CERT_HOST
" > "$CONF_FILE"
mkdir -p "$CERT_DIR"
openssl req -x509 -nodes -days 730 -newkey rsa:2048 -keyout "$CERT_DIR/$CERT_HOST.key" -out "$CERT_DIR/$CERT_HOST.crt" -config "$CONF_FILE"

重新编写 Dockerfile,将 derper 的域名设置为 127.0.0.1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
FROM golang:latest AS builder
WORKDIR /app
# ========= CONFIG =========
# - download links
ENV MODIFIED_DERPER_GIT=https://github.com/yangchuansheng/ip_derper.git
ENV BRANCH=ip_derper
# ==========================
# build modified derper
RUN git clone -b $BRANCH $MODIFIED_DERPER_GIT tailscale --depth 1 && \
cd /app/tailscale/cmd/derper && \
/usr/local/go/bin/go build -ldflags "-s -w" -o /app/derper && \
cd /app && \
rm -rf /app/tailscale
FROM ubuntu:20.04
WORKDIR /app
# ========= CONFIG =========
# - derper args
ENV DERP_HOST=127.0.0.1
ENV DERP_CERTS=/app/certs/
ENV DERP_STUN true
ENV DERP_VERIFY_CLIENTS false
# ==========================
# apt
RUN apt-get update && \
apt-get install -y openssl curl
COPY build_cert.sh /app/
COPY --from=builder /app/derper /app/derper
# build self-signed certs && start derper
CMD bash /app/build_cert.sh $DERP_HOST $DERP_CERTS /app/san.conf && \
/app/derper --hostname=$DERP_HOST \
--certmode=manual \
--certdir=$DERP_CERTS \
--stun=$DERP_STUN \
--verify-clients=$DERP_VERIFY_CLIENTS

构建好镜像后,就可以在你想部署 derper 的主机上直接通过该镜像启动 derper 容器了,命令如下:

1
🐳 → docker run --restart always --net host --name derper -p 12345:12345 -p 3478:3478/udp -e DERP_ADDR=:12345 -d ghcr.io/yangchuansheng/ip_derper

Headscale 的本地 YAML 文件目前还不支持这个配置项,所以没办法,咱只能使用在线 URL 了。JSON 配置内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[root@jetlink headscale]# cat derp.json
{
"Regions": {
"901": {
"RegionID": 901,
"RegionCode": "ali-sh",
"RegionName": "Aliyun Shanghai",
"Nodes": [
{
"Name": "901a",
"RegionID": 901,
"DERPPort": 12345,
"HostName": "xx.xx.xx.118",
"IPv4": "xx.xx.xx.118",
"InsecureForTests": true
}
]
}
}
}

配置解析:

  • HostName 直接填 derper 的公网 IP,即和 IPv4 的值相同。
  • InsecureForTests 一定要设置为 true,以跳过域名验证。

你需要把这个 JSON 文件变成 Headscale 服务器可以访问的 URL,比如在 Headscale 主机上搭个 Nginx,或者上传到对象存储(比如阿里云 OSS)

接下来还需要修改 Headscale 的配置文件,引用上面的自定义 DERP 的 URL。需要修改的配置项如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# /etc/headscale/config.yaml
derp:
# List of externally available DERP maps encoded in JSON
urls:
# - https://controlplane.tailscale.com/derpmap/default
- https://xxxxx/derp.json
# Locally available DERP map files encoded in YAML
#
# This option is mostly interesting for people hosting
# their own DERP servers:
# https://tailscale.com/kb/1118/custom-derp-servers/
#
# paths:
# - /etc/headscale/derp-example.yaml
paths:
- /etc/headscale/derp.yaml
# If enabled, a worker will be set up to periodically
# refresh the given sources and update the derpmap
# will be set up.
auto_update_enabled: true
# How often should we check for DERP updates?
update_frequency: 24h

修改完配置后,重启 headscale 服务:

1
$ systemctl restart headscale

在 Tailscale 客户端上使用以下命令查看目前可以使用的 DERP 服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ tailscale netcheck
Report:
* UDP: true
* IPv4: yes, 192.168.100.1:49656
* IPv6: no
* MappingVariesByDestIP: true
* HairPinning: false
* PortMapping: UPnP
* Nearest DERP: Home Hangzhou
* DERP latency:
- home: 9.7ms (Home Hangzhou)
- hs: 25.2ms (Huawei Shanghai)
- thk: 43.5ms (Tencent Hongkong)

再次查看与通信对端的连接方式:

1
2
3
4
$ tailscale status
coredns default linux active; direct xxxx:45986; offline, tx 131012 rx 196020
oneplus-8t default android active; relay "home"; offline, tx 211900 rx 22780
openwrt default linux active; direct 192.168.100.254:41641; offline, tx 189868 rx 4074772

可以看到这一次 Tailscale 自动选择了一个线路最优的国内的 DERP 服务器作为中继,可以测试一下延迟:

1
2
3
4
$ tailscale ping 10.1.0.8
pong from oneplus-8t (10.1.0.8) via DERP(home) in 30ms
pong from oneplus-8t (10.1.0.8) via DERP(home) in 45ms
pong from oneplus-8t (10.1.0.8) via DERP(home) in 30ms

完美!这里的 home 当然是我的家庭宽带,部署方式与上面所说的国内云主机类似,你需要额外开启公网的端口映射(12345/tcp, 3478/udp)。还有一点需要注意的是配置内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
{
"Regions": {
"901": {
"RegionID": 901,
"RegionCode": "ali-sh",
"RegionName": "Aliyun Shanghai",
"Nodes": [
{
"Name": "901a",
"RegionID": 901,
"DERPPort": 443,
"HostName": "xxxx",
"IPv4": "xxxx",
"InsecureForTests": true
}
]
},
"902": {
"RegionID": 902,
"RegionCode": "home",
"RegionName": "Home Hangzhou",
"Nodes": [
{
"Name": "902a",
"RegionID": 902,
"DERPPort": 12345,
"HostName": "xxxx",
"InsecureForTests": true
}
]
}
}
}

与国内云主机相比,家庭宽带的配置有两点不同:

  • 需要删除 IPv4 配置项。因为家用宽带的公网 IP 是动态变化的,所以你需要使用 DDNS 来动态解析公网 IP。
  • HostName 最好填域名,因为你的公网 IP 是动态变化的,没法填写 IP,除非你不停地修改配置文件。填域名也没关系啦,反正不会验证域名的,也不用关心证书的事情,只要域名能解析到你的公网 IP 即可。

防止 DERP 被白嫖

默认情况下 DERP 服务器是可以被白嫖的,只要别人知道了你的 DERP 服务器的地址和端口,就可以为他所用。如果你的服务器是个小水管,用的人多了可能会把你撑爆,因此我们需要修改配置来防止被白嫖。

特别声明:只有使用域名的方式才可以通过认证防止被白嫖,使用纯 IP 的方式无法防白嫖,你只能小心翼翼地隐藏好你的 IP 和端口,不能让别人知道。

只需要做两件事情:

1、在 DERP 服务器上安装 Tailscale。

第一步需要在 DERP 服务所在的主机上安装 Tailscale 客户端,启动 tailscaled 进程。

2、derper 启动时加上参数 --verify-clients。

本文推荐的是通过容器启动, Dockerfile 内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
FROM golang:latest AS builder
LABEL org.opencontainers.image.source https://github.com/yangchuansheng/docker-image
WORKDIR /app
# https://tailscale.com/kb/1118/custom-derp-servers/
RUN go install tailscale.com/cmd/derper@main
FROM ubuntu
WORKDIR /app
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && \
apt-get install -y --no-install-recommends apt-utils && \
apt-get install -y ca-certificates && \
mkdir /app/certs
ENV DERP_DOMAIN your-hostname.com
ENV DERP_CERT_MODE letsencrypt
ENV DERP_CERT_DIR /app/certs
ENV DERP_ADDR :443
ENV DERP_STUN true
ENV DERP_HTTP_PORT 80
ENV DERP_VERIFY_CLIENTS false
COPY --from=builder /go/bin/derper .
CMD /app/derper --hostname=$DERP_DOMAIN \
--certmode=$DERP_CERT_MODE \
--certdir=$DERP_CERT_DIR \
--a=$DERP_ADDR \
--stun=$DERP_STUN \
--http-port=$DERP_HTTP_PORT \
--verify-clients=$DERP_VERIFY_CLIENTS

默认情况下 --verify-clients 参数设置的是 false。我们不需要对 Dockerfile 内容做任何改动,只需在容器启动时加上环境变量即可,将之前的启动命令修改一下:

1
2
3
4
5
6
7
8
🐳 → docker run --restart always \
--name derper -p 12345:12345 -p 3478:3478/udp \
-v /root/.acme.sh/xxxx/:/app/certs \
-e DERP_CERT_MODE=manual \
-e DERP_ADDR=:12345 \
-e DERP_DOMAIN=xxxx \
-e DERP_VERIFY_CLIENTS=true \
-d ghcr.io/yangchuansheng/derper:latest

这样就大功告成了,别人即使知道了你的 DERP 服务器地址也无法使用,但还是要说明一点,即便如此,你也应该尽量不让别人知道你的服务器地址,防止别人有可趁之机。

总结

本文给大家介绍了 STUN 对于辅助 NAT 穿透的意义,科普了几种常见的中继协议,包含 Tailscale 自研的 DERP 协议。最后手把手教大家如何自建私有的 DERP 服务器,并让 Tailscale 使用我们自建的 DERP 服务器。

123…16
peterfei

peterfei

peterfei|技术|上主是我的牧者

80 日志
14 分类
62 标签
RSS
github
© 2025 peterfei
由 Hexo 强力驱动
主题 - NexT.Mist